001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2006-2010 Sun Microsystems, Inc.
025 *      Portions Copyright 2013-2015 ForgeRock AS.
026 */
027package org.opends.server.extensions;
028
029import static org.opends.messages.ConfigMessages.*;
030import static org.opends.messages.CoreMessages.*;
031
032import java.util.ArrayList;
033import java.util.List;
034import java.util.concurrent.ConcurrentLinkedQueue;
035import java.util.concurrent.Semaphore;
036import java.util.concurrent.TimeUnit;
037import java.util.concurrent.atomic.AtomicLong;
038
039import org.forgerock.i18n.LocalizableMessage;
040import org.forgerock.i18n.slf4j.LocalizedLogger;
041import org.forgerock.opendj.config.server.ConfigException;
042import org.forgerock.opendj.ldap.ResultCode;
043import org.opends.server.admin.server.ConfigurationChangeListener;
044import org.opends.server.admin.std.server.ParallelWorkQueueCfg;
045import org.opends.server.api.WorkQueue;
046import org.opends.server.core.DirectoryServer;
047import org.opends.server.monitors.ParallelWorkQueueMonitor;
048import org.opends.server.types.CancelRequest;
049import org.forgerock.opendj.config.server.ConfigChangeResult;
050import org.opends.server.types.DirectoryException;
051import org.opends.server.types.InitializationException;
052import org.opends.server.types.Operation;
053
054/**
055 * This class defines a data structure for storing and interacting with the
056 * Directory Server work queue.
057 */
058public class ParallelWorkQueue
059       extends WorkQueue<ParallelWorkQueueCfg>
060       implements ConfigurationChangeListener<ParallelWorkQueueCfg>
061{
062  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
063
064
065
066
067  /**
068   * The maximum number of times to retry getting the next operation from the
069   * queue if an unexpected failure occurs.
070   */
071  private static final int MAX_RETRY_COUNT = 5;
072
073
074
075  /** The set of worker threads that will be used to process this work queue. */
076  private ArrayList<ParallelWorkerThread> workerThreads;
077
078  /**
079   * The number of operations that have been submitted to the work queue for
080   * processing.
081   */
082  private AtomicLong opsSubmitted;
083
084  /**
085   * Indicates whether one or more of the worker threads needs to be killed at
086   * the next convenient opportunity.
087   */
088  private boolean killThreads;
089
090  /** Indicates whether the Directory Server is shutting down. */
091  private boolean shutdownRequested;
092
093  /** The thread number used for the last worker thread that was created. */
094  private int lastThreadNumber;
095
096  /**
097   * The number of worker threads that should be active (or will be shortly if a
098   * configuration change has not been completely applied).
099   */
100  private int numWorkerThreads;
101
102  /** The queue that will be used to actually hold the pending operations. */
103  private ConcurrentLinkedQueue<Operation> opQueue;
104
105  /** The lock used to provide threadsafe access for the queue. */
106  private final Object queueLock = new Object();
107
108
109  private final Semaphore queueSemaphore = new Semaphore(0, false);
110
111
112  /**
113   * Creates a new instance of this work queue.  All initialization should be
114   * performed in the <CODE>initializeWorkQueue</CODE> method.
115   */
116  public ParallelWorkQueue()
117  {
118    // No implementation should be performed here.
119  }
120
121
122
123  /** {@inheritDoc} */
124  @Override
125  public void initializeWorkQueue(ParallelWorkQueueCfg configuration)
126         throws ConfigException, InitializationException
127  {
128    shutdownRequested = false;
129    killThreads       = false;
130    opsSubmitted      = new AtomicLong(0);
131
132    // Register to be notified of any configuration changes.
133    configuration.addParallelChangeListener(this);
134
135    // Get the necessary configuration from the provided entry.
136    numWorkerThreads =
137        computeNumWorkerThreads(configuration.getNumWorkerThreads());
138
139    // Create the actual work queue.
140    opQueue = new ConcurrentLinkedQueue<>();
141
142    // Create the set of worker threads that should be used to service the work queue.
143    workerThreads = new ArrayList<>(numWorkerThreads);
144    for (lastThreadNumber = 0; lastThreadNumber < numWorkerThreads;
145    lastThreadNumber++)
146    {
147      ParallelWorkerThread t =
148           new ParallelWorkerThread(this, lastThreadNumber);
149      t.start();
150      workerThreads.add(t);
151    }
152
153
154    // Create and register a monitor provider for the work queue.
155    try
156    {
157      ParallelWorkQueueMonitor monitor =
158           new ParallelWorkQueueMonitor(this);
159      monitor.initializeMonitorProvider(null);
160      DirectoryServer.registerMonitorProvider(monitor);
161    }
162    catch (Exception e)
163    {
164      logger.traceException(e);
165      logger.error(ERR_CONFIG_WORK_QUEUE_CANNOT_CREATE_MONITOR, ParallelWorkQueueMonitor.class, e);
166    }
167  }
168
169
170
171  /** {@inheritDoc} */
172  @Override
173  public void finalizeWorkQueue(LocalizableMessage reason)
174  {
175    shutdownRequested = true;
176
177
178    // Send responses to any operations in the pending queue to indicate that
179    // they won't be processed because the server is shutting down.
180    CancelRequest cancelRequest = new CancelRequest(true, reason);
181    ArrayList<Operation> pendingOperations = new ArrayList<>();
182    opQueue.removeAll(pendingOperations);
183
184    for (Operation o : pendingOperations)
185    {
186      try
187      {
188        // The operation has no chance of responding to the cancel
189        // request so avoid waiting for a cancel response.
190        if (o.getCancelResult() == null) {
191          o.abort(cancelRequest);
192        }
193      }
194      catch (Exception e)
195      {
196        logger.traceException(e);
197        logger.warn(WARN_QUEUE_UNABLE_TO_CANCEL, o, e);
198      }
199    }
200
201
202    // Notify all the worker threads of the shutdown.
203    for (ParallelWorkerThread t : workerThreads)
204    {
205      try
206      {
207        t.shutDown();
208      }
209      catch (Exception e)
210      {
211        logger.traceException(e);
212        logger.warn(WARN_QUEUE_UNABLE_TO_NOTIFY_THREAD, t.getName(), e);
213      }
214    }
215  }
216
217
218
219  /**
220   * Indicates whether this work queue has received a request to shut down.
221   *
222   * @return  <CODE>true</CODE> if the work queue has recieved a request to shut
223   *          down, or <CODE>false</CODE> if not.
224   */
225  public boolean shutdownRequested()
226  {
227    return shutdownRequested;
228  }
229
230
231
232  /**
233   * Submits an operation to be processed by one of the worker threads
234   * associated with this work queue.
235   *
236   * @param  operation  The operation to be processed.
237   *
238   * @throws  DirectoryException  If the provided operation is not accepted for
239   *                              some reason (e.g., if the server is shutting
240   *                              down or the pending operation queue is already
241   *                              at its maximum capacity).
242   */
243  @Override
244  public void submitOperation(Operation operation) throws DirectoryException
245  {
246    if (shutdownRequested)
247    {
248      LocalizableMessage message = WARN_OP_REJECTED_BY_SHUTDOWN.get();
249      throw new DirectoryException(ResultCode.UNAVAILABLE, message);
250    }
251
252    opQueue.add(operation);
253    queueSemaphore.release();
254
255    opsSubmitted.incrementAndGet();
256  }
257
258  /** {@inheritDoc} */
259  @Override
260  public boolean trySubmitOperation(Operation operation)
261      throws DirectoryException
262  {
263    submitOperation(operation);
264    return true;
265  }
266
267
268  /**
269   * Retrieves the next operation that should be processed by one of the worker
270   * threads, blocking if necessary until a new request arrives.  This method
271   * should only be called by a worker thread associated with this work queue.
272   *
273   * @param  workerThread  The worker thread that is requesting the operation.
274   *
275   * @return  The next operation that should be processed, or <CODE>null</CODE>
276   *          if the server is shutting down and no more operations will be
277   *          processed.
278   */
279  public Operation nextOperation(ParallelWorkerThread workerThread)
280  {
281    return retryNextOperation(workerThread, 0);
282  }
283
284
285
286  /**
287   * Retrieves the next operation that should be processed by one of the worker
288   * threads following a previous failure attempt.  A maximum of five
289   * consecutive failures will be allowed before returning <CODE>null</CODE>,
290   * which will cause the associated thread to exit.
291   *
292   * @param  workerThread  The worker thread that is requesting the operation.
293   * @param  numFailures   The number of consecutive failures that the worker
294   *                       thread has experienced so far.  If this gets too
295   *                       high, then this method will return <CODE>null</CODE>
296   *                       rather than retrying.
297   *
298   * @return  The next operation that should be processed, or <CODE>null</CODE>
299   *          if the server is shutting down and no more operations will be
300   *          processed, or if there have been too many consecutive failures.
301   */
302  private Operation retryNextOperation(
303                                       ParallelWorkerThread workerThread,
304                                       int numFailures)
305  {
306    // See if we should kill off this thread.  This could be necessary if the
307    // number of worker threads has been decreased with the server online. If
308    // so, then return null and the thread will exit.
309    if (killThreads)
310    {
311      synchronized (queueLock)
312      {
313        try
314        {
315          int currentThreads = workerThreads.size();
316          if (currentThreads > numWorkerThreads)
317          {
318            if (workerThreads.remove(Thread.currentThread()))
319            {
320              currentThreads--;
321            }
322
323            if (currentThreads <= numWorkerThreads)
324            {
325              killThreads = false;
326            }
327
328            workerThread.setStoppedByReducedThreadNumber();
329            return null;
330          }
331        }
332        catch (Exception e)
333        {
334          logger.traceException(e);
335        }
336      }
337    }
338
339    if (shutdownRequested || numFailures > MAX_RETRY_COUNT)
340    {
341      if (numFailures > MAX_RETRY_COUNT)
342      {
343        logger.error(ERR_CONFIG_WORK_QUEUE_TOO_MANY_FAILURES, Thread
344            .currentThread().getName(), numFailures, MAX_RETRY_COUNT);
345      }
346
347      return null;
348    }
349
350    try
351    {
352      while (true)
353      {
354        Operation nextOperation = null;
355        if (queueSemaphore.tryAcquire(5, TimeUnit.SECONDS)) {
356          nextOperation = opQueue.poll();
357        }
358        if (nextOperation == null)
359        {
360          // There was no work to do in the specified length of time.  See if
361          // we should shutdown, and if not then just check again.
362          if (shutdownRequested)
363          {
364            return null;
365          }
366          else if (killThreads)
367          {
368            synchronized (queueLock)
369            {
370              try
371              {
372                int currentThreads = workerThreads.size();
373                if (currentThreads > numWorkerThreads)
374                {
375                  if (workerThreads.remove(Thread.currentThread()))
376                  {
377                    currentThreads--;
378                  }
379
380                  if (currentThreads <= numWorkerThreads)
381                  {
382                    killThreads = false;
383                  }
384
385                  workerThread.setStoppedByReducedThreadNumber();
386                  return null;
387                }
388              }
389              catch (Exception e)
390              {
391                logger.traceException(e);
392              }
393            }
394          }
395        }
396        else
397        {
398          return nextOperation;
399        }
400      }
401    }
402    catch (Exception e)
403    {
404      logger.traceException(e);
405
406      // This should not happen.  The only recourse we have is to log a message
407      // and try again.
408      logger.warn(WARN_WORKER_WAITING_UNCAUGHT_EXCEPTION, Thread.currentThread().getName(), e);
409      return retryNextOperation(workerThread, numFailures + 1);
410    }
411  }
412
413
414
415  /**
416   * Attempts to remove the specified operation from this queue if it has not
417   * yet been picked up for processing by one of the worker threads.
418   *
419   * @param  operation  The operation to remove from the queue.
420   *
421   * @return  <CODE>true</CODE> if the provided request was present in the queue
422   *          and was removed successfully, or <CODE>false</CODE> it not.
423   */
424  public boolean removeOperation(Operation operation)
425  {
426    return opQueue.remove(operation);
427  }
428
429
430
431  /**
432   * Retrieves the total number of operations that have been successfully
433   * submitted to this work queue for processing since server startup.  This
434   * does not include operations that have been rejected for some reason like
435   * the queue already at its maximum capacity.
436   *
437   * @return  The total number of operations that have been successfully
438   *          submitted to this work queue since startup.
439   */
440  public long getOpsSubmitted()
441  {
442    return opsSubmitted.longValue();
443  }
444
445
446
447  /**
448   * Retrieves the number of pending operations in the queue that have not yet
449   * been picked up for processing.  Note that this method is not a
450   * constant-time operation and can be relatively inefficient, so it should be
451   * used sparingly.
452   *
453   * @return  The number of pending operations in the queue that have not yet
454   *          been picked up for processing.
455   */
456  public int size()
457  {
458    return opQueue.size();
459  }
460
461
462
463  /** {@inheritDoc} */
464  @Override
465  public boolean isConfigurationChangeAcceptable(
466                      ParallelWorkQueueCfg configuration,
467                      List<LocalizableMessage> unacceptableReasons)
468  {
469    return true;
470  }
471
472
473
474  /** {@inheritDoc} */
475  @Override
476  public ConfigChangeResult applyConfigurationChange(
477                                 ParallelWorkQueueCfg configuration)
478  {
479    int newNumThreads =
480        computeNumWorkerThreads(configuration.getNumWorkerThreads());
481
482    // Apply a change to the number of worker threads if appropriate.
483    int currentThreads = workerThreads.size();
484    if (newNumThreads != currentThreads)
485    {
486      synchronized (queueLock)
487      {
488        try
489        {
490          int threadsToAdd = newNumThreads - currentThreads;
491          if (threadsToAdd > 0)
492          {
493            for (int i = 0; i < threadsToAdd; i++)
494            {
495              ParallelWorkerThread t =
496                   new ParallelWorkerThread(this, lastThreadNumber++);
497              workerThreads.add(t);
498              t.start();
499            }
500
501            killThreads = false;
502          }
503          else
504          {
505            killThreads = true;
506          }
507
508          numWorkerThreads = newNumThreads;
509        }
510        catch (Exception e)
511        {
512          logger.traceException(e);
513        }
514      }
515    }
516    return new ConfigChangeResult();
517  }
518
519
520
521  /** {@inheritDoc} */
522  @Override
523  public boolean isIdle()
524  {
525    if (!opQueue.isEmpty()) {
526      return false;
527    }
528
529    synchronized (queueLock)
530    {
531      for (ParallelWorkerThread t : workerThreads)
532      {
533        if (t.isActive())
534        {
535          return false;
536        }
537      }
538
539      return true;
540    }
541  }
542
543  /**
544   * Return the number of worker threads used by this WorkQueue.
545   *
546   * @return the number of worker threads used by this WorkQueue
547   */
548  @Override
549  public int getNumWorkerThreads()
550  {
551    return this.numWorkerThreads;
552  }
553}