001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2006-2010 Sun Microsystems, Inc.
025 *      Portions Copyright 2013-2015 ForgeRock AS.
026 */
027package org.opends.server.extensions;
028
029
030
031import static org.opends.messages.ConfigMessages.*;
032import static org.opends.messages.CoreMessages.*;
033
034import java.util.ArrayList;
035import java.util.List;
036import java.util.concurrent.LinkedBlockingQueue;
037import java.util.concurrent.TimeUnit;
038import java.util.concurrent.atomic.AtomicLong;
039import java.util.concurrent.locks.ReentrantReadWriteLock;
040import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
041import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
042
043import org.forgerock.i18n.LocalizableMessage;
044import org.forgerock.i18n.slf4j.LocalizedLogger;
045import org.forgerock.opendj.config.server.ConfigException;
046import org.forgerock.opendj.ldap.ResultCode;
047import org.opends.server.admin.server.ConfigurationChangeListener;
048import org.opends.server.admin.std.server.TraditionalWorkQueueCfg;
049import org.opends.server.api.WorkQueue;
050import org.opends.server.core.DirectoryServer;
051import org.opends.server.monitors.TraditionalWorkQueueMonitor;
052import org.opends.server.types.CancelRequest;
053import org.forgerock.opendj.config.server.ConfigChangeResult;
054import org.opends.server.types.DirectoryException;
055import org.opends.server.types.InitializationException;
056import org.opends.server.types.Operation;
057
058
059
060/**
061 * This class defines a data structure for storing and interacting with the
062 * Directory Server work queue.
063 */
064public class TraditionalWorkQueue extends WorkQueue<TraditionalWorkQueueCfg>
065    implements ConfigurationChangeListener<TraditionalWorkQueueCfg>
066{
067  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
068
069  /**
070   * The maximum number of times to retry getting the next operation from the
071   * queue if an unexpected failure occurs.
072   */
073  private static final int MAX_RETRY_COUNT = 5;
074
075  /** The set of worker threads that will be used to process this work queue. */
076  private final ArrayList<TraditionalWorkerThread> workerThreads = new ArrayList<>();
077
078  /** The number of operations that have been submitted to the work queue for processing. */
079  private AtomicLong opsSubmitted;
080
081  /**
082   * The number of times that an attempt to submit a new request has been
083   * rejected because the work queue is already at its maximum capacity.
084   */
085  private AtomicLong queueFullRejects;
086
087  /**
088   * Indicates whether one or more of the worker threads needs to be killed at
089   * the next convenient opportunity.
090   */
091  private boolean killThreads;
092
093  /** Indicates whether the Directory Server is shutting down. */
094  private boolean shutdownRequested;
095
096  /** The thread number used for the last worker thread that was created. */
097  private int lastThreadNumber;
098
099  /**
100   * The maximum number of pending requests that this work queue will allow
101   * before it will start rejecting them.
102   */
103  private int maxCapacity;
104
105  /**
106   * The number of worker threads that should be active (or will be shortly if a
107   * configuration change has not been completely applied).
108   */
109  private int numWorkerThreads;
110
111  /**
112   * The queue overflow policy: true indicates that operations will be blocked
113   * until the queue has available capacity, otherwise operations will be
114   * rejected.
115   * <p>
116   * This is hard-coded to true for now because a reject on full policy does not
117   * seem to have a valid use case.
118   * </p>
119   */
120  private final boolean isBlocking = true;
121
122  /** The queue that will be used to actually hold the pending operations. */
123  private LinkedBlockingQueue<Operation> opQueue;
124
125  /**
126   * The lock used to provide threadsafe access for the queue, used for
127   * non-config changes.
128   */
129  private final ReadLock queueReadLock;
130
131  /**
132   * The lock used to provide threadsafe access for the queue, used for config
133   * changes.
134   */
135  private final WriteLock queueWriteLock;
136  {
137    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
138    queueReadLock = lock.readLock();
139    queueWriteLock = lock.writeLock();
140  }
141
142
143
144  /**
145   * Creates a new instance of this work queue. All initialization should be
146   * performed in the <CODE>initializeWorkQueue</CODE> method.
147   */
148  public TraditionalWorkQueue()
149  {
150    // No implementation should be performed here.
151  }
152
153
154
155  /** {@inheritDoc} */
156  @Override
157  public void initializeWorkQueue(TraditionalWorkQueueCfg configuration)
158      throws ConfigException, InitializationException
159  {
160    queueWriteLock.lock();
161    try
162    {
163      shutdownRequested = false;
164      killThreads = false;
165      opsSubmitted = new AtomicLong(0);
166      queueFullRejects = new AtomicLong(0);
167
168      // Register to be notified of any configuration changes.
169      configuration.addTraditionalChangeListener(this);
170
171      // Get the necessary configuration from the provided entry.
172      numWorkerThreads =
173          computeNumWorkerThreads(configuration.getNumWorkerThreads());
174      maxCapacity = configuration.getMaxWorkQueueCapacity();
175
176      // Create the actual work queue.
177      if (maxCapacity > 0)
178      {
179        opQueue = new LinkedBlockingQueue<>(maxCapacity);
180      }
181      else
182      {
183        // This will never be the case, since the configuration definition
184        // ensures that the capacity is always finite.
185        opQueue = new LinkedBlockingQueue<>();
186      }
187
188      // Create the set of worker threads that should be used to service the
189      // work queue.
190      for (lastThreadNumber = 0; lastThreadNumber < numWorkerThreads;
191        lastThreadNumber++)
192      {
193        TraditionalWorkerThread t = new TraditionalWorkerThread(this,
194            lastThreadNumber);
195        t.start();
196        workerThreads.add(t);
197      }
198
199      // Create and register a monitor provider for the work queue.
200      try
201      {
202        TraditionalWorkQueueMonitor monitor = new TraditionalWorkQueueMonitor(
203            this);
204        monitor.initializeMonitorProvider(null);
205        DirectoryServer.registerMonitorProvider(monitor);
206      }
207      catch (Exception e)
208      {
209        logger.traceException(e);
210        logger.error(ERR_CONFIG_WORK_QUEUE_CANNOT_CREATE_MONITOR, TraditionalWorkQueueMonitor.class, e);
211      }
212    }
213    finally
214    {
215      queueWriteLock.unlock();
216    }
217  }
218
219
220
221  /** {@inheritDoc} */
222  @Override
223  public void finalizeWorkQueue(LocalizableMessage reason)
224  {
225    queueWriteLock.lock();
226    try
227    {
228      shutdownRequested = true;
229    }
230    finally
231    {
232      queueWriteLock.unlock();
233    }
234
235    // From now on no more operations can be enqueued or dequeued.
236
237    // Send responses to any operations in the pending queue to indicate that
238    // they won't be processed because the server is shutting down.
239    CancelRequest cancelRequest = new CancelRequest(true, reason);
240    ArrayList<Operation> pendingOperations = new ArrayList<>();
241    opQueue.removeAll(pendingOperations);
242    for (Operation o : pendingOperations)
243    {
244      try
245      {
246        // The operation has no chance of responding to the cancel
247        // request so avoid waiting for a cancel response.
248        if (o.getCancelResult() == null)
249        {
250          o.abort(cancelRequest);
251        }
252      }
253      catch (Exception e)
254      {
255        logger.traceException(e);
256        logger.warn(WARN_QUEUE_UNABLE_TO_CANCEL, o, e);
257      }
258    }
259
260    // Notify all the worker threads of the shutdown.
261    for (TraditionalWorkerThread t : workerThreads)
262    {
263      try
264      {
265        t.shutDown();
266      }
267      catch (Exception e)
268      {
269        logger.traceException(e);
270        logger.warn(WARN_QUEUE_UNABLE_TO_NOTIFY_THREAD, t.getName(), e);
271      }
272    }
273  }
274
275
276
277  /**
278   * Indicates whether this work queue has received a request to shut down.
279   *
280   * @return <CODE>true</CODE> if the work queue has recieved a request to shut
281   *         down, or <CODE>false</CODE> if not.
282   */
283  public boolean shutdownRequested()
284  {
285    queueReadLock.lock();
286    try
287    {
288      return shutdownRequested;
289    }
290    finally
291    {
292      queueReadLock.unlock();
293    }
294  }
295
296
297
298  /**
299   * Submits an operation to be processed by one of the worker threads
300   * associated with this work queue.
301   *
302   * @param operation
303   *          The operation to be processed.
304   * @throws DirectoryException
305   *           If the provided operation is not accepted for some reason (e.g.,
306   *           if the server is shutting down or the pending operation queue is
307   *           already at its maximum capacity).
308   */
309  @Override
310  public void submitOperation(Operation operation) throws DirectoryException
311  {
312    submitOperation(operation, isBlocking);
313  }
314
315  /** {@inheritDoc} */
316  @Override
317  public boolean trySubmitOperation(Operation operation)
318      throws DirectoryException
319  {
320    try
321    {
322      submitOperation(operation, false);
323      return true;
324    }
325    catch (DirectoryException e)
326    {
327      if (ResultCode.BUSY == e.getResultCode())
328      {
329        return false;
330      }
331      throw e;
332    }
333  }
334
335  private void submitOperation(Operation operation,
336      boolean blockEnqueuingWhenFull) throws DirectoryException
337  {
338    queueReadLock.lock();
339    try
340    {
341      if (shutdownRequested)
342      {
343        LocalizableMessage message = WARN_OP_REJECTED_BY_SHUTDOWN.get();
344        throw new DirectoryException(ResultCode.UNAVAILABLE, message);
345      }
346
347      if (blockEnqueuingWhenFull)
348      {
349        try
350        {
351          // If the queue is full and there is an administrative change taking
352          // place then starvation could arise: this thread will hold the read
353          // lock, the admin thread will be waiting on the write lock, and the
354          // worker threads may be queued behind the admin thread. Since the
355          // worker threads cannot run, the queue will never empty and allow
356          // this thread to proceed. To help things out we can periodically
357          // yield the read lock when the queue is full.
358          while (!opQueue.offer(operation, 1, TimeUnit.SECONDS))
359          {
360            queueReadLock.unlock();
361            Thread.yield();
362            queueReadLock.lock();
363
364            if (shutdownRequested)
365            {
366              LocalizableMessage message = WARN_OP_REJECTED_BY_SHUTDOWN.get();
367              throw new DirectoryException(ResultCode.UNAVAILABLE, message);
368            }
369          }
370        }
371        catch (InterruptedException e)
372        {
373          // We cannot handle the interruption here. Reject the request and
374          // re-interrupt this thread.
375          Thread.currentThread().interrupt();
376
377          queueFullRejects.incrementAndGet();
378
379          LocalizableMessage message = WARN_OP_REJECTED_BY_QUEUE_INTERRUPT.get();
380          throw new DirectoryException(ResultCode.BUSY, message);
381        }
382      }
383      else
384      {
385        if (!opQueue.offer(operation))
386        {
387          queueFullRejects.incrementAndGet();
388
389          LocalizableMessage message = WARN_OP_REJECTED_BY_QUEUE_FULL.get(maxCapacity);
390          throw new DirectoryException(ResultCode.BUSY, message);
391        }
392      }
393
394      opsSubmitted.incrementAndGet();
395    }
396    finally
397    {
398      queueReadLock.unlock();
399    }
400  }
401
402
403
404  /**
405   * Retrieves the next operation that should be processed by one of the worker
406   * threads, blocking if necessary until a new request arrives. This method
407   * should only be called by a worker thread associated with this work queue.
408   *
409   * @param workerThread
410   *          The worker thread that is requesting the operation.
411   * @return The next operation that should be processed, or <CODE>null</CODE>
412   *         if the server is shutting down and no more operations will be
413   *         processed.
414   */
415  public Operation nextOperation(TraditionalWorkerThread workerThread)
416  {
417    return retryNextOperation(workerThread, 0);
418  }
419
420
421
422  /**
423   * Retrieves the next operation that should be processed by one of the worker
424   * threads following a previous failure attempt. A maximum of five consecutive
425   * failures will be allowed before returning <CODE>null</CODE>, which will
426   * cause the associated thread to exit.
427   *
428   * @param workerThread
429   *          The worker thread that is requesting the operation.
430   * @param numFailures
431   *          The number of consecutive failures that the worker thread has
432   *          experienced so far. If this gets too high, then this method will
433   *          return <CODE>null</CODE> rather than retrying.
434   * @return The next operation that should be processed, or <CODE>null</CODE>
435   *         if the server is shutting down and no more operations will be
436   *         processed, or if there have been too many consecutive failures.
437   */
438  private Operation retryNextOperation(TraditionalWorkerThread workerThread,
439      int numFailures)
440  {
441    // See if we should kill off this thread. This could be necessary if the
442    // number of worker threads has been decreased with the server online. If
443    // so, then return null and the thread will exit.
444    queueReadLock.lock();
445    try
446    {
447      if (shutdownRequested)
448      {
449        return null;
450      }
451
452      if (killThreads && tryKillThisWorkerThread(workerThread))
453      {
454        return null;
455      }
456
457      if (numFailures > MAX_RETRY_COUNT)
458      {
459        logger.error(ERR_CONFIG_WORK_QUEUE_TOO_MANY_FAILURES, Thread
460            .currentThread().getName(), numFailures, MAX_RETRY_COUNT);
461
462        return null;
463      }
464
465      while (true)
466      {
467        Operation nextOperation = opQueue.poll(5, TimeUnit.SECONDS);
468        if (nextOperation != null)
469        {
470          return nextOperation;
471        }
472
473        // There was no work to do in the specified length of time. Release the
474        // read lock allowing shutdown or config changes to proceed and then see
475        // if we should give up or check again.
476        queueReadLock.unlock();
477        Thread.yield();
478        queueReadLock.lock();
479
480        if (shutdownRequested)
481        {
482          return null;
483        }
484
485        if (killThreads && tryKillThisWorkerThread(workerThread))
486        {
487          return null;
488        }
489      }
490    }
491    catch (InterruptedException ie)
492    {
493      // This is somewhat expected so don't log.
494      // assert debugException(CLASS_NAME, "retryNextOperation", ie);
495
496      // If this occurs, then the worker thread must have been interrupted for
497      // some reason. This could be because the Directory Server is shutting
498      // down, in which case we should return null.
499      if (shutdownRequested)
500      {
501        return null;
502      }
503
504      // If we've gotten here, then the worker thread was interrupted for some
505      // other reason. This should not happen, and we need to log a message.
506      logger.warn(WARN_WORKER_INTERRUPTED_WITHOUT_SHUTDOWN, Thread.currentThread().getName(), ie);
507    }
508    catch (Exception e)
509    {
510      logger.traceException(e);
511
512      // This should not happen. The only recourse we have is to log a message
513      // and try again.
514      logger.warn(WARN_WORKER_WAITING_UNCAUGHT_EXCEPTION, Thread.currentThread().getName(), e);
515    }
516    finally
517    {
518      queueReadLock.unlock();
519    }
520
521    // An exception has occurred - retry.
522    return retryNextOperation(workerThread, numFailures + 1);
523  }
524
525
526
527  /**
528   * Kills this worker thread if needed. This method assumes that the read lock
529   * is already taken and ensure that it is taken on exit.
530   *
531   * @param workerThread
532   *          The worker thread associated with this thread.
533   * @return {@code true} if this thread was killed or is about to be killed as
534   *         a result of shutdown.
535   */
536  private boolean tryKillThisWorkerThread(TraditionalWorkerThread workerThread)
537  {
538    queueReadLock.unlock();
539    queueWriteLock.lock();
540    try
541    {
542      if (shutdownRequested)
543      {
544        // Shutdown may have been requested between unlock/lock. This thread is
545        // about to shutdown anyway, so return true.
546        return true;
547      }
548
549      int currentThreads = workerThreads.size();
550      if (currentThreads > numWorkerThreads)
551      {
552        if (workerThreads.remove(Thread.currentThread()))
553        {
554          currentThreads--;
555        }
556
557        if (currentThreads <= numWorkerThreads)
558        {
559          killThreads = false;
560        }
561
562        workerThread.setStoppedByReducedThreadNumber();
563        return true;
564      }
565    }
566    finally
567    {
568      queueWriteLock.unlock();
569      queueReadLock.lock();
570
571      if (shutdownRequested)
572      {
573        // Shutdown may have been requested between unlock/lock. This thread is
574        // about to shutdown anyway, so return true.
575        return true;
576      }
577    }
578    return false;
579  }
580
581
582
583  /**
584   * Retrieves the total number of operations that have been successfully
585   * submitted to this work queue for processing since server startup. This does
586   * not include operations that have been rejected for some reason like the
587   * queue already at its maximum capacity.
588   *
589   * @return The total number of operations that have been successfully
590   *         submitted to this work queue since startup.
591   */
592  public long getOpsSubmitted()
593  {
594    return opsSubmitted.longValue();
595  }
596
597
598
599  /**
600   * Retrieves the total number of operations that have been rejected because
601   * the work queue was already at its maximum capacity.
602   *
603   * @return The total number of operations that have been rejected because the
604   *         work queue was already at its maximum capacity.
605   */
606  public long getOpsRejectedDueToQueueFull()
607  {
608    return queueFullRejects.longValue();
609  }
610
611
612
613  /**
614   * Retrieves the number of pending operations in the queue that have not yet
615   * been picked up for processing. Note that this method is not a constant-time
616   * operation and can be relatively inefficient, so it should be used
617   * sparingly.
618   *
619   * @return The number of pending operations in the queue that have not yet
620   *         been picked up for processing.
621   */
622  public int size()
623  {
624    queueReadLock.lock();
625    try
626    {
627      return opQueue.size();
628    }
629    finally
630    {
631      queueReadLock.unlock();
632    }
633  }
634
635
636
637  /** {@inheritDoc} */
638  @Override
639  public boolean isConfigurationChangeAcceptable(
640      TraditionalWorkQueueCfg configuration, List<LocalizableMessage> unacceptableReasons)
641  {
642    return true;
643  }
644
645
646
647  /** {@inheritDoc} */
648  @Override
649  public ConfigChangeResult applyConfigurationChange(
650      TraditionalWorkQueueCfg configuration)
651  {
652    int newNumThreads =
653        computeNumWorkerThreads(configuration.getNumWorkerThreads());
654    int newMaxCapacity = configuration.getMaxWorkQueueCapacity();
655
656    // Apply a change to the number of worker threads if appropriate.
657    int currentThreads = workerThreads.size();
658    if (newNumThreads != currentThreads)
659    {
660      queueWriteLock.lock();
661      try
662      {
663        int threadsToAdd = newNumThreads - currentThreads;
664        if (threadsToAdd > 0)
665        {
666          for (int i = 0; i < threadsToAdd; i++)
667          {
668            TraditionalWorkerThread t = new TraditionalWorkerThread(this,
669                lastThreadNumber++);
670            workerThreads.add(t);
671            t.start();
672          }
673
674          killThreads = false;
675        }
676        else
677        {
678          killThreads = true;
679        }
680
681        numWorkerThreads = newNumThreads;
682      }
683      catch (Exception e)
684      {
685        logger.traceException(e);
686      }
687      finally
688      {
689        queueWriteLock.unlock();
690      }
691    }
692
693
694    // Apply a change to the maximum capacity if appropriate. Since we can't
695    // change capacity on the fly, then we'll have to create a new queue and
696    // transfer any remaining items into it. Any thread that is waiting on the
697    // original queue will time out after at most a few seconds and further
698    // checks will be against the new queue.
699    if (newMaxCapacity != maxCapacity)
700    {
701      // First switch the queue with the exclusive lock.
702      queueWriteLock.lock();
703      LinkedBlockingQueue<Operation> oldOpQueue;
704      try
705      {
706        LinkedBlockingQueue<Operation> newOpQueue = null;
707        if (newMaxCapacity > 0)
708        {
709          newOpQueue = new LinkedBlockingQueue<>(newMaxCapacity);
710        }
711        else
712        {
713          newOpQueue = new LinkedBlockingQueue<>();
714        }
715
716        oldOpQueue = opQueue;
717        opQueue = newOpQueue;
718
719        maxCapacity = newMaxCapacity;
720      }
721      finally
722      {
723        queueWriteLock.unlock();
724      }
725
726      // Now resubmit any pending requests - we'll need the shared lock.
727      Operation pendingOperation = null;
728      queueReadLock.lock();
729      try
730      {
731        // We have to be careful when adding any existing pending operations
732        // because the new capacity could be less than what was already
733        // backlogged in the previous queue. If that happens, we may have to
734        // loop a few times to get everything in there.
735        while ((pendingOperation = oldOpQueue.poll()) != null)
736        {
737          opQueue.put(pendingOperation);
738        }
739      }
740      catch (InterruptedException e)
741      {
742        // We cannot handle the interruption here. Cancel pending requests and
743        // re-interrupt this thread.
744        Thread.currentThread().interrupt();
745
746        LocalizableMessage message = WARN_OP_REJECTED_BY_QUEUE_INTERRUPT.get();
747        CancelRequest cancelRequest = new CancelRequest(true, message);
748        if (pendingOperation != null)
749        {
750          pendingOperation.abort(cancelRequest);
751        }
752        while ((pendingOperation = oldOpQueue.poll()) != null)
753        {
754          pendingOperation.abort(cancelRequest);
755        }
756      }
757      finally
758      {
759        queueReadLock.unlock();
760      }
761    }
762
763    return new ConfigChangeResult();
764  }
765
766
767
768  /** {@inheritDoc} */
769  @Override
770  public boolean isIdle()
771  {
772    queueReadLock.lock();
773    try
774    {
775      if (!opQueue.isEmpty())
776      {
777        return false;
778      }
779
780      for (TraditionalWorkerThread t : workerThreads)
781      {
782        if (t.isActive())
783        {
784          return false;
785        }
786      }
787
788      return true;
789    }
790    finally
791    {
792      queueReadLock.unlock();
793    }
794  }
795
796  /**
797   * Return the number of worker threads used by this WorkQueue.
798   *
799   * @return the number of worker threads used by this WorkQueue
800   */
801  @Override
802  public int getNumWorkerThreads()
803  {
804    return this.numWorkerThreads;
805  }
806}