001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2006-2010 Sun Microsystems, Inc. 025 * Portions Copyright 2013-2015 ForgeRock AS. 026 */ 027package org.opends.server.extensions; 028 029 030 031import static org.opends.messages.ConfigMessages.*; 032import static org.opends.messages.CoreMessages.*; 033 034import java.util.ArrayList; 035import java.util.List; 036import java.util.concurrent.LinkedBlockingQueue; 037import java.util.concurrent.TimeUnit; 038import java.util.concurrent.atomic.AtomicLong; 039import java.util.concurrent.locks.ReentrantReadWriteLock; 040import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; 041import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; 042 043import org.forgerock.i18n.LocalizableMessage; 044import org.forgerock.i18n.slf4j.LocalizedLogger; 045import org.forgerock.opendj.config.server.ConfigException; 046import org.forgerock.opendj.ldap.ResultCode; 047import org.opends.server.admin.server.ConfigurationChangeListener; 048import org.opends.server.admin.std.server.TraditionalWorkQueueCfg; 049import org.opends.server.api.WorkQueue; 050import org.opends.server.core.DirectoryServer; 051import org.opends.server.monitors.TraditionalWorkQueueMonitor; 052import org.opends.server.types.CancelRequest; 053import org.forgerock.opendj.config.server.ConfigChangeResult; 054import org.opends.server.types.DirectoryException; 055import org.opends.server.types.InitializationException; 056import org.opends.server.types.Operation; 057 058 059 060/** 061 * This class defines a data structure for storing and interacting with the 062 * Directory Server work queue. 063 */ 064public class TraditionalWorkQueue extends WorkQueue<TraditionalWorkQueueCfg> 065 implements ConfigurationChangeListener<TraditionalWorkQueueCfg> 066{ 067 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 068 069 /** 070 * The maximum number of times to retry getting the next operation from the 071 * queue if an unexpected failure occurs. 072 */ 073 private static final int MAX_RETRY_COUNT = 5; 074 075 /** The set of worker threads that will be used to process this work queue. */ 076 private final ArrayList<TraditionalWorkerThread> workerThreads = new ArrayList<>(); 077 078 /** The number of operations that have been submitted to the work queue for processing. */ 079 private AtomicLong opsSubmitted; 080 081 /** 082 * The number of times that an attempt to submit a new request has been 083 * rejected because the work queue is already at its maximum capacity. 084 */ 085 private AtomicLong queueFullRejects; 086 087 /** 088 * Indicates whether one or more of the worker threads needs to be killed at 089 * the next convenient opportunity. 090 */ 091 private boolean killThreads; 092 093 /** Indicates whether the Directory Server is shutting down. */ 094 private boolean shutdownRequested; 095 096 /** The thread number used for the last worker thread that was created. */ 097 private int lastThreadNumber; 098 099 /** 100 * The maximum number of pending requests that this work queue will allow 101 * before it will start rejecting them. 102 */ 103 private int maxCapacity; 104 105 /** 106 * The number of worker threads that should be active (or will be shortly if a 107 * configuration change has not been completely applied). 108 */ 109 private int numWorkerThreads; 110 111 /** 112 * The queue overflow policy: true indicates that operations will be blocked 113 * until the queue has available capacity, otherwise operations will be 114 * rejected. 115 * <p> 116 * This is hard-coded to true for now because a reject on full policy does not 117 * seem to have a valid use case. 118 * </p> 119 */ 120 private final boolean isBlocking = true; 121 122 /** The queue that will be used to actually hold the pending operations. */ 123 private LinkedBlockingQueue<Operation> opQueue; 124 125 /** 126 * The lock used to provide threadsafe access for the queue, used for 127 * non-config changes. 128 */ 129 private final ReadLock queueReadLock; 130 131 /** 132 * The lock used to provide threadsafe access for the queue, used for config 133 * changes. 134 */ 135 private final WriteLock queueWriteLock; 136 { 137 ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); 138 queueReadLock = lock.readLock(); 139 queueWriteLock = lock.writeLock(); 140 } 141 142 143 144 /** 145 * Creates a new instance of this work queue. All initialization should be 146 * performed in the <CODE>initializeWorkQueue</CODE> method. 147 */ 148 public TraditionalWorkQueue() 149 { 150 // No implementation should be performed here. 151 } 152 153 154 155 /** {@inheritDoc} */ 156 @Override 157 public void initializeWorkQueue(TraditionalWorkQueueCfg configuration) 158 throws ConfigException, InitializationException 159 { 160 queueWriteLock.lock(); 161 try 162 { 163 shutdownRequested = false; 164 killThreads = false; 165 opsSubmitted = new AtomicLong(0); 166 queueFullRejects = new AtomicLong(0); 167 168 // Register to be notified of any configuration changes. 169 configuration.addTraditionalChangeListener(this); 170 171 // Get the necessary configuration from the provided entry. 172 numWorkerThreads = 173 computeNumWorkerThreads(configuration.getNumWorkerThreads()); 174 maxCapacity = configuration.getMaxWorkQueueCapacity(); 175 176 // Create the actual work queue. 177 if (maxCapacity > 0) 178 { 179 opQueue = new LinkedBlockingQueue<>(maxCapacity); 180 } 181 else 182 { 183 // This will never be the case, since the configuration definition 184 // ensures that the capacity is always finite. 185 opQueue = new LinkedBlockingQueue<>(); 186 } 187 188 // Create the set of worker threads that should be used to service the 189 // work queue. 190 for (lastThreadNumber = 0; lastThreadNumber < numWorkerThreads; 191 lastThreadNumber++) 192 { 193 TraditionalWorkerThread t = new TraditionalWorkerThread(this, 194 lastThreadNumber); 195 t.start(); 196 workerThreads.add(t); 197 } 198 199 // Create and register a monitor provider for the work queue. 200 try 201 { 202 TraditionalWorkQueueMonitor monitor = new TraditionalWorkQueueMonitor( 203 this); 204 monitor.initializeMonitorProvider(null); 205 DirectoryServer.registerMonitorProvider(monitor); 206 } 207 catch (Exception e) 208 { 209 logger.traceException(e); 210 logger.error(ERR_CONFIG_WORK_QUEUE_CANNOT_CREATE_MONITOR, TraditionalWorkQueueMonitor.class, e); 211 } 212 } 213 finally 214 { 215 queueWriteLock.unlock(); 216 } 217 } 218 219 220 221 /** {@inheritDoc} */ 222 @Override 223 public void finalizeWorkQueue(LocalizableMessage reason) 224 { 225 queueWriteLock.lock(); 226 try 227 { 228 shutdownRequested = true; 229 } 230 finally 231 { 232 queueWriteLock.unlock(); 233 } 234 235 // From now on no more operations can be enqueued or dequeued. 236 237 // Send responses to any operations in the pending queue to indicate that 238 // they won't be processed because the server is shutting down. 239 CancelRequest cancelRequest = new CancelRequest(true, reason); 240 ArrayList<Operation> pendingOperations = new ArrayList<>(); 241 opQueue.removeAll(pendingOperations); 242 for (Operation o : pendingOperations) 243 { 244 try 245 { 246 // The operation has no chance of responding to the cancel 247 // request so avoid waiting for a cancel response. 248 if (o.getCancelResult() == null) 249 { 250 o.abort(cancelRequest); 251 } 252 } 253 catch (Exception e) 254 { 255 logger.traceException(e); 256 logger.warn(WARN_QUEUE_UNABLE_TO_CANCEL, o, e); 257 } 258 } 259 260 // Notify all the worker threads of the shutdown. 261 for (TraditionalWorkerThread t : workerThreads) 262 { 263 try 264 { 265 t.shutDown(); 266 } 267 catch (Exception e) 268 { 269 logger.traceException(e); 270 logger.warn(WARN_QUEUE_UNABLE_TO_NOTIFY_THREAD, t.getName(), e); 271 } 272 } 273 } 274 275 276 277 /** 278 * Indicates whether this work queue has received a request to shut down. 279 * 280 * @return <CODE>true</CODE> if the work queue has recieved a request to shut 281 * down, or <CODE>false</CODE> if not. 282 */ 283 public boolean shutdownRequested() 284 { 285 queueReadLock.lock(); 286 try 287 { 288 return shutdownRequested; 289 } 290 finally 291 { 292 queueReadLock.unlock(); 293 } 294 } 295 296 297 298 /** 299 * Submits an operation to be processed by one of the worker threads 300 * associated with this work queue. 301 * 302 * @param operation 303 * The operation to be processed. 304 * @throws DirectoryException 305 * If the provided operation is not accepted for some reason (e.g., 306 * if the server is shutting down or the pending operation queue is 307 * already at its maximum capacity). 308 */ 309 @Override 310 public void submitOperation(Operation operation) throws DirectoryException 311 { 312 submitOperation(operation, isBlocking); 313 } 314 315 /** {@inheritDoc} */ 316 @Override 317 public boolean trySubmitOperation(Operation operation) 318 throws DirectoryException 319 { 320 try 321 { 322 submitOperation(operation, false); 323 return true; 324 } 325 catch (DirectoryException e) 326 { 327 if (ResultCode.BUSY == e.getResultCode()) 328 { 329 return false; 330 } 331 throw e; 332 } 333 } 334 335 private void submitOperation(Operation operation, 336 boolean blockEnqueuingWhenFull) throws DirectoryException 337 { 338 queueReadLock.lock(); 339 try 340 { 341 if (shutdownRequested) 342 { 343 LocalizableMessage message = WARN_OP_REJECTED_BY_SHUTDOWN.get(); 344 throw new DirectoryException(ResultCode.UNAVAILABLE, message); 345 } 346 347 if (blockEnqueuingWhenFull) 348 { 349 try 350 { 351 // If the queue is full and there is an administrative change taking 352 // place then starvation could arise: this thread will hold the read 353 // lock, the admin thread will be waiting on the write lock, and the 354 // worker threads may be queued behind the admin thread. Since the 355 // worker threads cannot run, the queue will never empty and allow 356 // this thread to proceed. To help things out we can periodically 357 // yield the read lock when the queue is full. 358 while (!opQueue.offer(operation, 1, TimeUnit.SECONDS)) 359 { 360 queueReadLock.unlock(); 361 Thread.yield(); 362 queueReadLock.lock(); 363 364 if (shutdownRequested) 365 { 366 LocalizableMessage message = WARN_OP_REJECTED_BY_SHUTDOWN.get(); 367 throw new DirectoryException(ResultCode.UNAVAILABLE, message); 368 } 369 } 370 } 371 catch (InterruptedException e) 372 { 373 // We cannot handle the interruption here. Reject the request and 374 // re-interrupt this thread. 375 Thread.currentThread().interrupt(); 376 377 queueFullRejects.incrementAndGet(); 378 379 LocalizableMessage message = WARN_OP_REJECTED_BY_QUEUE_INTERRUPT.get(); 380 throw new DirectoryException(ResultCode.BUSY, message); 381 } 382 } 383 else 384 { 385 if (!opQueue.offer(operation)) 386 { 387 queueFullRejects.incrementAndGet(); 388 389 LocalizableMessage message = WARN_OP_REJECTED_BY_QUEUE_FULL.get(maxCapacity); 390 throw new DirectoryException(ResultCode.BUSY, message); 391 } 392 } 393 394 opsSubmitted.incrementAndGet(); 395 } 396 finally 397 { 398 queueReadLock.unlock(); 399 } 400 } 401 402 403 404 /** 405 * Retrieves the next operation that should be processed by one of the worker 406 * threads, blocking if necessary until a new request arrives. This method 407 * should only be called by a worker thread associated with this work queue. 408 * 409 * @param workerThread 410 * The worker thread that is requesting the operation. 411 * @return The next operation that should be processed, or <CODE>null</CODE> 412 * if the server is shutting down and no more operations will be 413 * processed. 414 */ 415 public Operation nextOperation(TraditionalWorkerThread workerThread) 416 { 417 return retryNextOperation(workerThread, 0); 418 } 419 420 421 422 /** 423 * Retrieves the next operation that should be processed by one of the worker 424 * threads following a previous failure attempt. A maximum of five consecutive 425 * failures will be allowed before returning <CODE>null</CODE>, which will 426 * cause the associated thread to exit. 427 * 428 * @param workerThread 429 * The worker thread that is requesting the operation. 430 * @param numFailures 431 * The number of consecutive failures that the worker thread has 432 * experienced so far. If this gets too high, then this method will 433 * return <CODE>null</CODE> rather than retrying. 434 * @return The next operation that should be processed, or <CODE>null</CODE> 435 * if the server is shutting down and no more operations will be 436 * processed, or if there have been too many consecutive failures. 437 */ 438 private Operation retryNextOperation(TraditionalWorkerThread workerThread, 439 int numFailures) 440 { 441 // See if we should kill off this thread. This could be necessary if the 442 // number of worker threads has been decreased with the server online. If 443 // so, then return null and the thread will exit. 444 queueReadLock.lock(); 445 try 446 { 447 if (shutdownRequested) 448 { 449 return null; 450 } 451 452 if (killThreads && tryKillThisWorkerThread(workerThread)) 453 { 454 return null; 455 } 456 457 if (numFailures > MAX_RETRY_COUNT) 458 { 459 logger.error(ERR_CONFIG_WORK_QUEUE_TOO_MANY_FAILURES, Thread 460 .currentThread().getName(), numFailures, MAX_RETRY_COUNT); 461 462 return null; 463 } 464 465 while (true) 466 { 467 Operation nextOperation = opQueue.poll(5, TimeUnit.SECONDS); 468 if (nextOperation != null) 469 { 470 return nextOperation; 471 } 472 473 // There was no work to do in the specified length of time. Release the 474 // read lock allowing shutdown or config changes to proceed and then see 475 // if we should give up or check again. 476 queueReadLock.unlock(); 477 Thread.yield(); 478 queueReadLock.lock(); 479 480 if (shutdownRequested) 481 { 482 return null; 483 } 484 485 if (killThreads && tryKillThisWorkerThread(workerThread)) 486 { 487 return null; 488 } 489 } 490 } 491 catch (InterruptedException ie) 492 { 493 // This is somewhat expected so don't log. 494 // assert debugException(CLASS_NAME, "retryNextOperation", ie); 495 496 // If this occurs, then the worker thread must have been interrupted for 497 // some reason. This could be because the Directory Server is shutting 498 // down, in which case we should return null. 499 if (shutdownRequested) 500 { 501 return null; 502 } 503 504 // If we've gotten here, then the worker thread was interrupted for some 505 // other reason. This should not happen, and we need to log a message. 506 logger.warn(WARN_WORKER_INTERRUPTED_WITHOUT_SHUTDOWN, Thread.currentThread().getName(), ie); 507 } 508 catch (Exception e) 509 { 510 logger.traceException(e); 511 512 // This should not happen. The only recourse we have is to log a message 513 // and try again. 514 logger.warn(WARN_WORKER_WAITING_UNCAUGHT_EXCEPTION, Thread.currentThread().getName(), e); 515 } 516 finally 517 { 518 queueReadLock.unlock(); 519 } 520 521 // An exception has occurred - retry. 522 return retryNextOperation(workerThread, numFailures + 1); 523 } 524 525 526 527 /** 528 * Kills this worker thread if needed. This method assumes that the read lock 529 * is already taken and ensure that it is taken on exit. 530 * 531 * @param workerThread 532 * The worker thread associated with this thread. 533 * @return {@code true} if this thread was killed or is about to be killed as 534 * a result of shutdown. 535 */ 536 private boolean tryKillThisWorkerThread(TraditionalWorkerThread workerThread) 537 { 538 queueReadLock.unlock(); 539 queueWriteLock.lock(); 540 try 541 { 542 if (shutdownRequested) 543 { 544 // Shutdown may have been requested between unlock/lock. This thread is 545 // about to shutdown anyway, so return true. 546 return true; 547 } 548 549 int currentThreads = workerThreads.size(); 550 if (currentThreads > numWorkerThreads) 551 { 552 if (workerThreads.remove(Thread.currentThread())) 553 { 554 currentThreads--; 555 } 556 557 if (currentThreads <= numWorkerThreads) 558 { 559 killThreads = false; 560 } 561 562 workerThread.setStoppedByReducedThreadNumber(); 563 return true; 564 } 565 } 566 finally 567 { 568 queueWriteLock.unlock(); 569 queueReadLock.lock(); 570 571 if (shutdownRequested) 572 { 573 // Shutdown may have been requested between unlock/lock. This thread is 574 // about to shutdown anyway, so return true. 575 return true; 576 } 577 } 578 return false; 579 } 580 581 582 583 /** 584 * Retrieves the total number of operations that have been successfully 585 * submitted to this work queue for processing since server startup. This does 586 * not include operations that have been rejected for some reason like the 587 * queue already at its maximum capacity. 588 * 589 * @return The total number of operations that have been successfully 590 * submitted to this work queue since startup. 591 */ 592 public long getOpsSubmitted() 593 { 594 return opsSubmitted.longValue(); 595 } 596 597 598 599 /** 600 * Retrieves the total number of operations that have been rejected because 601 * the work queue was already at its maximum capacity. 602 * 603 * @return The total number of operations that have been rejected because the 604 * work queue was already at its maximum capacity. 605 */ 606 public long getOpsRejectedDueToQueueFull() 607 { 608 return queueFullRejects.longValue(); 609 } 610 611 612 613 /** 614 * Retrieves the number of pending operations in the queue that have not yet 615 * been picked up for processing. Note that this method is not a constant-time 616 * operation and can be relatively inefficient, so it should be used 617 * sparingly. 618 * 619 * @return The number of pending operations in the queue that have not yet 620 * been picked up for processing. 621 */ 622 public int size() 623 { 624 queueReadLock.lock(); 625 try 626 { 627 return opQueue.size(); 628 } 629 finally 630 { 631 queueReadLock.unlock(); 632 } 633 } 634 635 636 637 /** {@inheritDoc} */ 638 @Override 639 public boolean isConfigurationChangeAcceptable( 640 TraditionalWorkQueueCfg configuration, List<LocalizableMessage> unacceptableReasons) 641 { 642 return true; 643 } 644 645 646 647 /** {@inheritDoc} */ 648 @Override 649 public ConfigChangeResult applyConfigurationChange( 650 TraditionalWorkQueueCfg configuration) 651 { 652 int newNumThreads = 653 computeNumWorkerThreads(configuration.getNumWorkerThreads()); 654 int newMaxCapacity = configuration.getMaxWorkQueueCapacity(); 655 656 // Apply a change to the number of worker threads if appropriate. 657 int currentThreads = workerThreads.size(); 658 if (newNumThreads != currentThreads) 659 { 660 queueWriteLock.lock(); 661 try 662 { 663 int threadsToAdd = newNumThreads - currentThreads; 664 if (threadsToAdd > 0) 665 { 666 for (int i = 0; i < threadsToAdd; i++) 667 { 668 TraditionalWorkerThread t = new TraditionalWorkerThread(this, 669 lastThreadNumber++); 670 workerThreads.add(t); 671 t.start(); 672 } 673 674 killThreads = false; 675 } 676 else 677 { 678 killThreads = true; 679 } 680 681 numWorkerThreads = newNumThreads; 682 } 683 catch (Exception e) 684 { 685 logger.traceException(e); 686 } 687 finally 688 { 689 queueWriteLock.unlock(); 690 } 691 } 692 693 694 // Apply a change to the maximum capacity if appropriate. Since we can't 695 // change capacity on the fly, then we'll have to create a new queue and 696 // transfer any remaining items into it. Any thread that is waiting on the 697 // original queue will time out after at most a few seconds and further 698 // checks will be against the new queue. 699 if (newMaxCapacity != maxCapacity) 700 { 701 // First switch the queue with the exclusive lock. 702 queueWriteLock.lock(); 703 LinkedBlockingQueue<Operation> oldOpQueue; 704 try 705 { 706 LinkedBlockingQueue<Operation> newOpQueue = null; 707 if (newMaxCapacity > 0) 708 { 709 newOpQueue = new LinkedBlockingQueue<>(newMaxCapacity); 710 } 711 else 712 { 713 newOpQueue = new LinkedBlockingQueue<>(); 714 } 715 716 oldOpQueue = opQueue; 717 opQueue = newOpQueue; 718 719 maxCapacity = newMaxCapacity; 720 } 721 finally 722 { 723 queueWriteLock.unlock(); 724 } 725 726 // Now resubmit any pending requests - we'll need the shared lock. 727 Operation pendingOperation = null; 728 queueReadLock.lock(); 729 try 730 { 731 // We have to be careful when adding any existing pending operations 732 // because the new capacity could be less than what was already 733 // backlogged in the previous queue. If that happens, we may have to 734 // loop a few times to get everything in there. 735 while ((pendingOperation = oldOpQueue.poll()) != null) 736 { 737 opQueue.put(pendingOperation); 738 } 739 } 740 catch (InterruptedException e) 741 { 742 // We cannot handle the interruption here. Cancel pending requests and 743 // re-interrupt this thread. 744 Thread.currentThread().interrupt(); 745 746 LocalizableMessage message = WARN_OP_REJECTED_BY_QUEUE_INTERRUPT.get(); 747 CancelRequest cancelRequest = new CancelRequest(true, message); 748 if (pendingOperation != null) 749 { 750 pendingOperation.abort(cancelRequest); 751 } 752 while ((pendingOperation = oldOpQueue.poll()) != null) 753 { 754 pendingOperation.abort(cancelRequest); 755 } 756 } 757 finally 758 { 759 queueReadLock.unlock(); 760 } 761 } 762 763 return new ConfigChangeResult(); 764 } 765 766 767 768 /** {@inheritDoc} */ 769 @Override 770 public boolean isIdle() 771 { 772 queueReadLock.lock(); 773 try 774 { 775 if (!opQueue.isEmpty()) 776 { 777 return false; 778 } 779 780 for (TraditionalWorkerThread t : workerThreads) 781 { 782 if (t.isActive()) 783 { 784 return false; 785 } 786 } 787 788 return true; 789 } 790 finally 791 { 792 queueReadLock.unlock(); 793 } 794 } 795 796 /** 797 * Return the number of worker threads used by this WorkQueue. 798 * 799 * @return the number of worker threads used by this WorkQueue 800 */ 801 @Override 802 public int getNumWorkerThreads() 803 { 804 return this.numWorkerThreads; 805 } 806}