001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2006-2010 Sun Microsystems, Inc.
025 *      Portions Copyright 2013-2015 ForgeRock AS.
026 */
027package org.opends.server.api;
028
029import static org.opends.messages.CoreMessages.*;
030
031import org.forgerock.i18n.slf4j.LocalizedLogger;
032import org.forgerock.i18n.LocalizableMessage;
033import org.opends.server.admin.std.server.WorkQueueCfg;
034import org.forgerock.opendj.config.server.ConfigException;
035import org.opends.server.types.DirectoryException;
036import org.opends.server.types.InitializationException;
037import org.opends.server.types.Operation;
038import org.opends.server.util.Platform;
039
040/**
041 * This class defines the structure and methods that must be
042 * implemented by a Directory Server work queue.  The work queue is
043 * the component of the server that accepts requests from connection
044 * handlers and ensures that they are properly processed.  The manner
045 * in which the work queue is able to accomplish this may vary between
046 * implementations, but in general it is assumed that one or more
047 * worker threads will be associated with the queue and may be used to
048 * process requests in parallel.
049 *
050 * @param  <T>  The type of configuration handled by this work queue.
051 */
052@org.opends.server.types.PublicAPI(
053     stability=org.opends.server.types.StabilityLevel.VOLATILE,
054     mayInstantiate=false,
055     mayExtend=true,
056     mayInvoke=true)
057public abstract class WorkQueue<T extends WorkQueueCfg>
058{
059
060  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
061
062  /**
063   * Initializes this work queue based on the information in the
064   * provided configuration entry.
065   *
066   * @param  configuration  The configuration to use to initialize
067   *                        the work queue.
068   *
069   * @throws  ConfigException  If the provided configuration entry
070   *                           does not have a valid work queue
071   *                           configuration.
072   *
073   * @throws  InitializationException  If a problem occurs during
074   *                                   initialization that is not
075   *                                   related to the server
076   *                                   configuration.
077   */
078  public abstract void initializeWorkQueue(T configuration)
079         throws ConfigException, InitializationException;
080
081
082
083  /**
084   * Performs any necessary finalization for this work queue,
085   * including ensuring that all active operations are interrupted or
086   * will be allowed to complete, and that all pending operations will
087   * be cancelled.
088   *
089   * @param  reason  The human-readable reason that the work queue is
090   *                 being shut down.
091   */
092  public abstract void finalizeWorkQueue(LocalizableMessage reason);
093
094
095
096  /**
097   * Submits an operation to be processed in the server.
098   *
099   * @param  operation  The operation to be processed.
100   *
101   * @throws  DirectoryException  If the provided operation is not
102   *                              accepted for some reason (e.g., if
103   *                              the server is shutting down or
104   *                              already has too many pending
105   *                              requests in the queue).
106   */
107  public abstract void submitOperation(Operation operation)
108         throws DirectoryException;
109
110
111
112  /**
113   * Tries to submit an operation to be processed in the server, without
114   * blocking.
115   *
116   * @param operation
117   *          The operation to be processed.
118   * @return true if the operation could be submitted to the queue, false if the
119   *         queue was full
120   * @throws DirectoryException
121   *           If the provided operation is not accepted for some reason (e.g.,
122   *           if the server is shutting down).
123   */
124  public abstract boolean trySubmitOperation(Operation operation)
125      throws DirectoryException;
126
127
128  /**
129   * Indicates whether the work queue is currently processing any
130   * requests.  Note that this is a point-in-time determination, and
131   * if any component of the server wishes to depend on a quiescent
132   * state then it should use some external mechanism to ensure that
133   * no other requests are submitted to the queue.
134   *
135   * @return  {@code true} if the work queue is currently idle, or
136   *          {@code false} if it is being used to process one or more
137   *          operations.
138   */
139  public abstract boolean isIdle();
140
141
142  /**
143   * Return the maximum number of worker threads that can be used by this
144   * WorkQueue (The WorkQueue could have a thread pool which adjusts its size).
145   *
146   * @return the maximum number of worker threads that can be used by this
147   *         WorkQueue
148   */
149  public abstract int getNumWorkerThreads();
150
151
152  /**
153   * Computes the number of worker threads to use by the working queue based on
154   * the configured number.
155   *
156   * @param configuredNumWorkerThreads
157   *          the configured number of worker threads to use
158   * @return the number of worker threads to use
159   */
160  protected int computeNumWorkerThreads(Integer configuredNumWorkerThreads)
161  {
162    if (configuredNumWorkerThreads != null)
163    {
164      return configuredNumWorkerThreads;
165    }
166    else
167    {
168      // Automatically choose based on the number of processors.
169      int value = Platform.computeNumberOfThreads(16, 2.0f);
170      logger.debug(INFO_ERGONOMIC_SIZING_OF_WORKER_THREAD_POOL, value);
171      return value;
172    }
173  }
174
175  /**
176   * Waits for the work queue to become idle before returning.  Note
177   * that this is a point-in-time determination, and if any component
178   * of the server wishes to depend on a quiescent state then it
179   * should use some external mechanism to ensure that no other
180   * requests are submitted to the queue.
181   *
182   * @param  timeLimit  The maximum length of time in milliseconds
183   *                    that this method should wait for the queue to
184   *                    become idle before giving up.  A time limit
185   *                    that is less than or equal to zero indicates
186   *                    that there should not be a time limit.
187   *
188   * @return  {@code true} if the work queue is idle at the time that
189   *          this method returns, or {@code false} if the wait time
190   *          limit was reached before the server became idle.
191   */
192  public boolean waitUntilIdle(long timeLimit)
193  {
194    long stopWaitingTime;
195    if (timeLimit <= 0)
196    {
197      stopWaitingTime = Long.MAX_VALUE;
198    }
199    else
200    {
201      stopWaitingTime = System.currentTimeMillis() + timeLimit;
202    }
203
204    while (System.currentTimeMillis() < stopWaitingTime)
205    {
206      if (isIdle())
207      {
208        return true;
209      }
210
211      try
212      {
213        Thread.sleep(1);
214      } catch (InterruptedException ie) {}
215    }
216
217    return false;
218  }
219}
220