001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2006-2010 Sun Microsystems, Inc.
025 *      Portions Copyright 2011-2014 ForgeRock AS
026 */
027package org.opends.server.extensions;
028
029import java.util.Map;
030
031import org.forgerock.i18n.LocalizableMessage;
032import org.opends.server.api.DirectoryThread;
033import org.opends.server.core.DirectoryServer;
034import org.forgerock.i18n.slf4j.LocalizedLogger;
035import org.opends.server.types.CancelRequest;
036import org.opends.server.types.DisconnectReason;
037import org.opends.server.types.Operation;
038
039import static org.opends.messages.CoreMessages.*;
040import static org.opends.server.util.StaticUtils.*;
041
042/**
043 * This class defines a data structure for storing and interacting with a
044 * Directory Server worker thread.
045 */
046public class TraditionalWorkerThread
047       extends DirectoryThread
048{
049  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
050
051  /**
052   * Indicates whether the Directory Server is shutting down and this thread
053   * should stop running.
054   */
055  private volatile boolean shutdownRequested;
056
057  /**
058   * Indicates whether this thread was stopped because the server thread number
059   * was reduced.
060   */
061  private boolean stoppedByReducedThreadNumber;
062
063  /** Indicates whether this thread is currently waiting for work. */
064  private boolean waitingForWork;
065
066  /** The operation that this worker thread is currently processing. */
067  private volatile Operation operation;
068
069  /** The handle to the actual thread for this worker thread. */
070  private Thread workerThread;
071
072  /** The work queue that this worker thread will service. */
073  private final TraditionalWorkQueue workQueue;
074
075
076
077  /**
078   * Creates a new worker thread that will service the provided work queue and
079   * process any new requests that are submitted.
080   *
081   * @param  workQueue  The work queue with which this worker thread is
082   *                    associated.
083   * @param  threadID   The thread ID for this worker thread.
084   */
085  public TraditionalWorkerThread(TraditionalWorkQueue workQueue, int threadID)
086  {
087    super("Worker Thread " + threadID);
088
089
090    this.workQueue = workQueue;
091
092    stoppedByReducedThreadNumber = false;
093    shutdownRequested            = false;
094    waitingForWork               = false;
095    operation                    = null;
096    workerThread                 = null;
097  }
098
099
100
101  /**
102   * Indicates that this thread is about to be stopped because the Directory
103   * Server configuration has been updated to reduce the number of worker
104   * threads.
105   */
106  public void setStoppedByReducedThreadNumber()
107  {
108    stoppedByReducedThreadNumber = true;
109  }
110
111
112
113  /**
114   * Indicates whether this worker thread is actively processing a request.
115   * Note that this is a point-in-time determination and if a reliable answer is
116   * expected then the server should impose some external constraint to ensure
117   * that no new requests are enqueued.
118   *
119   * @return  {@code true} if this worker thread is actively processing a
120   *          request, or {@code false} if it is idle.
121   */
122  public boolean isActive()
123  {
124    return isAlive() && operation != null;
125  }
126
127
128
129  /**
130   * Operates in a loop, retrieving the next request from the work queue,
131   * processing it, and then going back to the queue for more.
132   */
133  @Override
134  public void run()
135  {
136    workerThread = currentThread();
137
138    while (! shutdownRequested)
139    {
140      try
141      {
142        waitingForWork = true;
143        operation = null; // this line is necessary because next line can block
144        operation = workQueue.nextOperation(this);
145        waitingForWork = false;
146
147
148        if (operation == null)
149        {
150          // The operation may be null if the server is shutting down.  If that
151          // is the case, then break out of the while loop.
152          break;
153        }
154        else
155        {
156          // The operation is not null, so process it.  Make sure that when
157          // processing is complete.
158          operation.run();
159          operation.operationCompleted();
160        }
161      }
162      catch (Throwable t)
163      {
164        if (logger.isTraceEnabled())
165        {
166          logger.trace(
167            "Uncaught exception in worker thread while processing " +
168                "operation %s: %s", operation, t);
169          logger.traceException(t);
170        }
171
172        try
173        {
174          LocalizableMessage message =
175              ERR_UNCAUGHT_WORKER_THREAD_EXCEPTION.get(getName(), operation, stackTraceToSingleLineString(t));
176          logger.error(message);
177
178          // Ensure that the client receives some kind of result so that it does
179          // not hang.
180          operation.setResultCode(DirectoryServer.getServerErrorResultCode());
181          operation.appendErrorMessage(message);
182          operation.getClientConnection().sendResponse(operation);
183        }
184        catch (Throwable t2)
185        {
186          if (logger.isTraceEnabled())
187          {
188            logger.trace(
189              "Exception in worker thread while trying to log a " +
190                  "message about an uncaught exception %s: %s", t, t2);
191
192            logger.traceException(t2);
193          }
194        }
195
196
197        try
198        {
199          LocalizableMessage message = ERR_UNCAUGHT_WORKER_THREAD_EXCEPTION.get(
200              getName(), operation, stackTraceToSingleLineString(t));
201
202          operation.disconnectClient(DisconnectReason.SERVER_ERROR, true, message);
203        }
204        catch (Throwable t2)
205        {
206          logger.traceException(t2);
207        }
208      }
209    }
210
211    // If we have gotten here, then we presume that the server thread is
212    // shutting down.  However, if that's not the case then that is a problem
213    // and we will want to log a message.
214    if (stoppedByReducedThreadNumber)
215    {
216      logger.debug(INFO_WORKER_STOPPED_BY_REDUCED_THREADNUMBER, getName());
217    }
218    else if (! workQueue.shutdownRequested())
219    {
220      logger.warn(WARN_UNEXPECTED_WORKER_THREAD_EXIT, getName());
221    }
222
223
224    if (logger.isTraceEnabled())
225    {
226      logger.trace(getName() + " exiting.");
227    }
228  }
229
230
231
232  /**
233   * Indicates that the Directory Server has received a request to stop running
234   * and that this thread should stop running as soon as possible.
235   */
236  public void shutDown()
237  {
238    if (logger.isTraceEnabled())
239    {
240      logger.trace(getName() + " being signaled to shut down.");
241    }
242
243    // Set a flag that indicates that the thread should stop running.
244    shutdownRequested = true;
245
246
247    // Check to see if the thread is waiting for work.  If so, then interrupt
248    // it.
249    if (waitingForWork)
250    {
251      try
252      {
253        workerThread.interrupt();
254      }
255      catch (Exception e)
256      {
257        if (logger.isTraceEnabled())
258        {
259          logger.trace(
260            "Caught an exception while trying to interrupt the worker " +
261                "thread waiting for work: %s", e);
262          logger.traceException(e);
263        }
264      }
265    }
266    else
267    {
268      try
269      {
270        final Operation localOperation = operation;
271        if (localOperation != null)
272        {
273          CancelRequest cancelRequest = new CancelRequest(true,
274              INFO_CANCELED_BY_SHUTDOWN.get());
275          localOperation.cancel(cancelRequest);
276        }
277      }
278      catch (Exception e)
279      {
280        if (logger.isTraceEnabled())
281        {
282          logger.trace(
283            "Caught an exception while trying to abandon the " +
284                "operation in progress for the worker thread: %s", e);
285          logger.traceException(e);
286        }
287      }
288    }
289  }
290
291  /**
292   * Retrieves any relevant debug information with which this tread is
293   * associated so they can be included in debug messages.
294   *
295   * @return debug information about this thread as a string.
296   */
297  @Override
298  public Map<String, String> getDebugProperties()
299  {
300    Map<String, String> properties = super.getDebugProperties();
301    properties.put("clientConnection", operation != null
302        ? String.valueOf(operation.getClientConnection()) : "none");
303    properties.put("operation", String.valueOf(operation));
304    return properties;
305  }
306}
307