001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2006-2009 Sun Microsystems, Inc.
025 *      Portions Copyright 2013-2015 ForgeRock AS.
026 */
027package org.opends.server.extensions;
028
029
030import java.util.Map;
031
032import org.forgerock.i18n.LocalizableMessage;
033import org.opends.server.api.DirectoryThread;
034import org.opends.server.core.DirectoryServer;
035import org.forgerock.i18n.slf4j.LocalizedLogger;
036import org.opends.server.types.CancelRequest;
037import org.opends.server.types.DisconnectReason;
038import org.opends.server.types.Operation;
039
040import static org.opends.messages.CoreMessages.*;
041import static org.opends.server.util.StaticUtils.*;
042
043
044/**
045 * This class defines a data structure for storing and interacting with a
046 * Directory Server worker thread.
047 */
048public class ParallelWorkerThread
049       extends DirectoryThread
050{
051  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
052
053  /**
054   * Indicates whether the Directory Server is shutting down and this thread
055   * should stop running.
056   */
057  private boolean shutdownRequested;
058
059  /**
060   * Indicates whether this thread was stopped because the server threadnumber
061   * was reduced.
062   */
063  private boolean stoppedByReducedThreadNumber;
064
065  /** Indicates whether this thread is currently waiting for work. */
066  private boolean waitingForWork;
067
068  /** The operation that this worker thread is currently processing. */
069  private Operation operation;
070
071  /** The handle to the actual thread for this worker thread. */
072  private Thread workerThread;
073
074  /** The work queue that this worker thread will service. */
075  private ParallelWorkQueue workQueue;
076
077
078
079  /**
080   * Creates a new worker thread that will service the provided work queue and
081   * process any new requests that are submitted.
082   *
083   * @param  workQueue  The work queue with which this worker thread is
084   *                    associated.
085   * @param  threadID   The thread ID for this worker thread.
086   */
087  public ParallelWorkerThread(ParallelWorkQueue workQueue, int threadID)
088  {
089    super("Worker Thread " + threadID);
090
091
092    this.workQueue = workQueue;
093
094    stoppedByReducedThreadNumber = false;
095    shutdownRequested            = false;
096    waitingForWork               = false;
097    operation                    = null;
098    workerThread                 = null;
099  }
100
101
102
103  /**
104   * Indicates that this thread is about to be stopped because the Directory
105   * Server configuration has been updated to reduce the number of worker
106   * threads.
107   */
108  public void setStoppedByReducedThreadNumber()
109  {
110    stoppedByReducedThreadNumber = true;
111  }
112
113
114
115  /**
116   * Indicates whether this worker thread is actively processing a request.
117   * Note that this is a point-in-time determination and if a reliable answer is
118   * expected then the server should impose some external constraint to ensure
119   * that no new requests are enqueued.
120   *
121   * @return  {@code true} if this worker thread is actively processing a
122   *          request, or {@code false} if it is idle.
123   */
124  public boolean isActive()
125  {
126    return isAlive() && operation != null;
127  }
128
129
130
131  /**
132   * Operates in a loop, retrieving the next request from the work queue,
133   * processing it, and then going back to the queue for more.
134   */
135  @Override
136  public void run()
137  {
138    workerThread = currentThread();
139
140    while (! shutdownRequested)
141    {
142      try
143      {
144        waitingForWork = true;
145        operation = null;
146        operation = workQueue.nextOperation(this);
147        waitingForWork = false;
148
149
150        if (operation == null)
151        {
152          // The operation may be null if the server is shutting down.  If that
153          // is the case, then break out of the while loop.
154          break;
155        }
156        else
157        {
158          // The operation is not null, so process it.  Make sure that when
159          // processing is complete.
160          operation.run();
161          operation.operationCompleted();
162        }
163      }
164      catch (Throwable t)
165      {
166        if (logger.isTraceEnabled())
167        {
168          logger.trace(
169            "Uncaught exception in worker thread while processing " +
170                "operation %s: %s", operation, t);
171
172          logger.traceException(t);
173        }
174
175        try
176        {
177          LocalizableMessage message =
178              ERR_UNCAUGHT_WORKER_THREAD_EXCEPTION.get(getName(), operation, stackTraceToSingleLineString(t));
179          logger.error(message);
180
181          operation.setResultCode(DirectoryServer.getServerErrorResultCode());
182          operation.appendErrorMessage(message);
183          operation.getClientConnection().sendResponse(operation);
184        }
185        catch (Throwable t2)
186        {
187          if (logger.isTraceEnabled())
188          {
189            logger.trace(
190              "Exception in worker thread while trying to log a " +
191                  "message about an uncaught exception %s: %s", t, t2);
192
193            logger.traceException(t2);
194          }
195        }
196
197
198        try
199        {
200          LocalizableMessage message = ERR_UNCAUGHT_WORKER_THREAD_EXCEPTION.get(
201              getName(), operation, stackTraceToSingleLineString(t));
202          operation.disconnectClient(DisconnectReason.SERVER_ERROR, true, message);
203        }
204        catch (Throwable t2)
205        {
206          logger.traceException(t2);
207        }
208      }
209    }
210
211    // If we have gotten here, then we presume that the server thread is
212    // shutting down.  However, if that's not the case then that is a problem
213    // and we will want to log a message.
214    if (stoppedByReducedThreadNumber)
215    {
216      logger.debug(INFO_WORKER_STOPPED_BY_REDUCED_THREADNUMBER, getName());
217    }
218    else if (! workQueue.shutdownRequested())
219    {
220      logger.warn(WARN_UNEXPECTED_WORKER_THREAD_EXIT, getName());
221    }
222
223
224    if (logger.isTraceEnabled())
225    {
226      logger.trace(getName() + " exiting.");
227    }
228  }
229
230
231
232  /**
233   * Indicates that the Directory Server has received a request to stop running
234   * and that this thread should stop running as soon as possible.
235   */
236  public void shutDown()
237  {
238    if (logger.isTraceEnabled())
239    {
240      logger.trace(getName() + " being signaled to shut down.");
241    }
242
243    // Set a flag that indicates that the thread should stop running.
244    shutdownRequested = true;
245
246
247    // Check to see if the thread is waiting for work.  If so, then interrupt
248    // it.
249    if (waitingForWork)
250    {
251      try
252      {
253        workerThread.interrupt();
254      }
255      catch (Exception e)
256      {
257        if (logger.isTraceEnabled())
258        {
259          logger.trace(
260            "Caught an exception while trying to interrupt the worker " +
261                "thread waiting for work: %s", e);
262          logger.traceException(e);
263        }
264      }
265    }
266    else
267    {
268      try
269      {
270        CancelRequest cancelRequest =
271          new CancelRequest(true, INFO_CANCELED_BY_SHUTDOWN.get());
272        operation.cancel(cancelRequest);
273      }
274      catch (Exception e)
275      {
276        if (logger.isTraceEnabled())
277        {
278          logger.trace(
279            "Caught an exception while trying to abandon the " +
280                "operation in progress for the worker thread: %s", e);
281          logger.traceException(e);
282        }
283      }
284    }
285  }
286
287  /**
288   * Retrieves any relevent debug information with which this tread is
289   * associated so they can be included in debug messages.
290   *
291   * @return debug information about this thread as a string.
292   */
293  @Override
294  public Map<String, String> getDebugProperties()
295  {
296    Map<String, String> properties = super.getDebugProperties();
297    properties.put("clientConnection",
298                   operation.getClientConnection().toString());
299    properties.put("operation", operation.toString());
300
301    return properties;
302  }
303}
304