001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2006-2010 Sun Microsystems, Inc. 025 * Portions Copyright 2011-2014 ForgeRock AS 026 */ 027package org.opends.server.extensions; 028 029import java.util.Map; 030 031import org.forgerock.i18n.LocalizableMessage; 032import org.opends.server.api.DirectoryThread; 033import org.opends.server.core.DirectoryServer; 034import org.forgerock.i18n.slf4j.LocalizedLogger; 035import org.opends.server.types.CancelRequest; 036import org.opends.server.types.DisconnectReason; 037import org.opends.server.types.Operation; 038 039import static org.opends.messages.CoreMessages.*; 040import static org.opends.server.util.StaticUtils.*; 041 042/** 043 * This class defines a data structure for storing and interacting with a 044 * Directory Server worker thread. 045 */ 046public class TraditionalWorkerThread 047 extends DirectoryThread 048{ 049 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 050 051 /** 052 * Indicates whether the Directory Server is shutting down and this thread 053 * should stop running. 054 */ 055 private volatile boolean shutdownRequested; 056 057 /** 058 * Indicates whether this thread was stopped because the server thread number 059 * was reduced. 060 */ 061 private boolean stoppedByReducedThreadNumber; 062 063 /** Indicates whether this thread is currently waiting for work. */ 064 private boolean waitingForWork; 065 066 /** The operation that this worker thread is currently processing. */ 067 private volatile Operation operation; 068 069 /** The handle to the actual thread for this worker thread. */ 070 private Thread workerThread; 071 072 /** The work queue that this worker thread will service. */ 073 private final TraditionalWorkQueue workQueue; 074 075 076 077 /** 078 * Creates a new worker thread that will service the provided work queue and 079 * process any new requests that are submitted. 080 * 081 * @param workQueue The work queue with which this worker thread is 082 * associated. 083 * @param threadID The thread ID for this worker thread. 084 */ 085 public TraditionalWorkerThread(TraditionalWorkQueue workQueue, int threadID) 086 { 087 super("Worker Thread " + threadID); 088 089 090 this.workQueue = workQueue; 091 092 stoppedByReducedThreadNumber = false; 093 shutdownRequested = false; 094 waitingForWork = false; 095 operation = null; 096 workerThread = null; 097 } 098 099 100 101 /** 102 * Indicates that this thread is about to be stopped because the Directory 103 * Server configuration has been updated to reduce the number of worker 104 * threads. 105 */ 106 public void setStoppedByReducedThreadNumber() 107 { 108 stoppedByReducedThreadNumber = true; 109 } 110 111 112 113 /** 114 * Indicates whether this worker thread is actively processing a request. 115 * Note that this is a point-in-time determination and if a reliable answer is 116 * expected then the server should impose some external constraint to ensure 117 * that no new requests are enqueued. 118 * 119 * @return {@code true} if this worker thread is actively processing a 120 * request, or {@code false} if it is idle. 121 */ 122 public boolean isActive() 123 { 124 return isAlive() && operation != null; 125 } 126 127 128 129 /** 130 * Operates in a loop, retrieving the next request from the work queue, 131 * processing it, and then going back to the queue for more. 132 */ 133 @Override 134 public void run() 135 { 136 workerThread = currentThread(); 137 138 while (! shutdownRequested) 139 { 140 try 141 { 142 waitingForWork = true; 143 operation = null; // this line is necessary because next line can block 144 operation = workQueue.nextOperation(this); 145 waitingForWork = false; 146 147 148 if (operation == null) 149 { 150 // The operation may be null if the server is shutting down. If that 151 // is the case, then break out of the while loop. 152 break; 153 } 154 else 155 { 156 // The operation is not null, so process it. Make sure that when 157 // processing is complete. 158 operation.run(); 159 operation.operationCompleted(); 160 } 161 } 162 catch (Throwable t) 163 { 164 if (logger.isTraceEnabled()) 165 { 166 logger.trace( 167 "Uncaught exception in worker thread while processing " + 168 "operation %s: %s", operation, t); 169 logger.traceException(t); 170 } 171 172 try 173 { 174 LocalizableMessage message = 175 ERR_UNCAUGHT_WORKER_THREAD_EXCEPTION.get(getName(), operation, stackTraceToSingleLineString(t)); 176 logger.error(message); 177 178 // Ensure that the client receives some kind of result so that it does 179 // not hang. 180 operation.setResultCode(DirectoryServer.getServerErrorResultCode()); 181 operation.appendErrorMessage(message); 182 operation.getClientConnection().sendResponse(operation); 183 } 184 catch (Throwable t2) 185 { 186 if (logger.isTraceEnabled()) 187 { 188 logger.trace( 189 "Exception in worker thread while trying to log a " + 190 "message about an uncaught exception %s: %s", t, t2); 191 192 logger.traceException(t2); 193 } 194 } 195 196 197 try 198 { 199 LocalizableMessage message = ERR_UNCAUGHT_WORKER_THREAD_EXCEPTION.get( 200 getName(), operation, stackTraceToSingleLineString(t)); 201 202 operation.disconnectClient(DisconnectReason.SERVER_ERROR, true, message); 203 } 204 catch (Throwable t2) 205 { 206 logger.traceException(t2); 207 } 208 } 209 } 210 211 // If we have gotten here, then we presume that the server thread is 212 // shutting down. However, if that's not the case then that is a problem 213 // and we will want to log a message. 214 if (stoppedByReducedThreadNumber) 215 { 216 logger.debug(INFO_WORKER_STOPPED_BY_REDUCED_THREADNUMBER, getName()); 217 } 218 else if (! workQueue.shutdownRequested()) 219 { 220 logger.warn(WARN_UNEXPECTED_WORKER_THREAD_EXIT, getName()); 221 } 222 223 224 if (logger.isTraceEnabled()) 225 { 226 logger.trace(getName() + " exiting."); 227 } 228 } 229 230 231 232 /** 233 * Indicates that the Directory Server has received a request to stop running 234 * and that this thread should stop running as soon as possible. 235 */ 236 public void shutDown() 237 { 238 if (logger.isTraceEnabled()) 239 { 240 logger.trace(getName() + " being signaled to shut down."); 241 } 242 243 // Set a flag that indicates that the thread should stop running. 244 shutdownRequested = true; 245 246 247 // Check to see if the thread is waiting for work. If so, then interrupt 248 // it. 249 if (waitingForWork) 250 { 251 try 252 { 253 workerThread.interrupt(); 254 } 255 catch (Exception e) 256 { 257 if (logger.isTraceEnabled()) 258 { 259 logger.trace( 260 "Caught an exception while trying to interrupt the worker " + 261 "thread waiting for work: %s", e); 262 logger.traceException(e); 263 } 264 } 265 } 266 else 267 { 268 try 269 { 270 final Operation localOperation = operation; 271 if (localOperation != null) 272 { 273 CancelRequest cancelRequest = new CancelRequest(true, 274 INFO_CANCELED_BY_SHUTDOWN.get()); 275 localOperation.cancel(cancelRequest); 276 } 277 } 278 catch (Exception e) 279 { 280 if (logger.isTraceEnabled()) 281 { 282 logger.trace( 283 "Caught an exception while trying to abandon the " + 284 "operation in progress for the worker thread: %s", e); 285 logger.traceException(e); 286 } 287 } 288 } 289 } 290 291 /** 292 * Retrieves any relevant debug information with which this tread is 293 * associated so they can be included in debug messages. 294 * 295 * @return debug information about this thread as a string. 296 */ 297 @Override 298 public Map<String, String> getDebugProperties() 299 { 300 Map<String, String> properties = super.getDebugProperties(); 301 properties.put("clientConnection", operation != null 302 ? String.valueOf(operation.getClientConnection()) : "none"); 303 properties.put("operation", String.valueOf(operation)); 304 return properties; 305 } 306} 307