001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2006-2009 Sun Microsystems, Inc. 025 * Portions Copyright 2013-2015 ForgeRock AS. 026 */ 027package org.opends.server.extensions; 028 029 030import java.util.Map; 031 032import org.forgerock.i18n.LocalizableMessage; 033import org.opends.server.api.DirectoryThread; 034import org.opends.server.core.DirectoryServer; 035import org.forgerock.i18n.slf4j.LocalizedLogger; 036import org.opends.server.types.CancelRequest; 037import org.opends.server.types.DisconnectReason; 038import org.opends.server.types.Operation; 039 040import static org.opends.messages.CoreMessages.*; 041import static org.opends.server.util.StaticUtils.*; 042 043 044/** 045 * This class defines a data structure for storing and interacting with a 046 * Directory Server worker thread. 047 */ 048public class ParallelWorkerThread 049 extends DirectoryThread 050{ 051 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 052 053 /** 054 * Indicates whether the Directory Server is shutting down and this thread 055 * should stop running. 056 */ 057 private boolean shutdownRequested; 058 059 /** 060 * Indicates whether this thread was stopped because the server threadnumber 061 * was reduced. 062 */ 063 private boolean stoppedByReducedThreadNumber; 064 065 /** Indicates whether this thread is currently waiting for work. */ 066 private boolean waitingForWork; 067 068 /** The operation that this worker thread is currently processing. */ 069 private Operation operation; 070 071 /** The handle to the actual thread for this worker thread. */ 072 private Thread workerThread; 073 074 /** The work queue that this worker thread will service. */ 075 private ParallelWorkQueue workQueue; 076 077 078 079 /** 080 * Creates a new worker thread that will service the provided work queue and 081 * process any new requests that are submitted. 082 * 083 * @param workQueue The work queue with which this worker thread is 084 * associated. 085 * @param threadID The thread ID for this worker thread. 086 */ 087 public ParallelWorkerThread(ParallelWorkQueue workQueue, int threadID) 088 { 089 super("Worker Thread " + threadID); 090 091 092 this.workQueue = workQueue; 093 094 stoppedByReducedThreadNumber = false; 095 shutdownRequested = false; 096 waitingForWork = false; 097 operation = null; 098 workerThread = null; 099 } 100 101 102 103 /** 104 * Indicates that this thread is about to be stopped because the Directory 105 * Server configuration has been updated to reduce the number of worker 106 * threads. 107 */ 108 public void setStoppedByReducedThreadNumber() 109 { 110 stoppedByReducedThreadNumber = true; 111 } 112 113 114 115 /** 116 * Indicates whether this worker thread is actively processing a request. 117 * Note that this is a point-in-time determination and if a reliable answer is 118 * expected then the server should impose some external constraint to ensure 119 * that no new requests are enqueued. 120 * 121 * @return {@code true} if this worker thread is actively processing a 122 * request, or {@code false} if it is idle. 123 */ 124 public boolean isActive() 125 { 126 return isAlive() && operation != null; 127 } 128 129 130 131 /** 132 * Operates in a loop, retrieving the next request from the work queue, 133 * processing it, and then going back to the queue for more. 134 */ 135 @Override 136 public void run() 137 { 138 workerThread = currentThread(); 139 140 while (! shutdownRequested) 141 { 142 try 143 { 144 waitingForWork = true; 145 operation = null; 146 operation = workQueue.nextOperation(this); 147 waitingForWork = false; 148 149 150 if (operation == null) 151 { 152 // The operation may be null if the server is shutting down. If that 153 // is the case, then break out of the while loop. 154 break; 155 } 156 else 157 { 158 // The operation is not null, so process it. Make sure that when 159 // processing is complete. 160 operation.run(); 161 operation.operationCompleted(); 162 } 163 } 164 catch (Throwable t) 165 { 166 if (logger.isTraceEnabled()) 167 { 168 logger.trace( 169 "Uncaught exception in worker thread while processing " + 170 "operation %s: %s", operation, t); 171 172 logger.traceException(t); 173 } 174 175 try 176 { 177 LocalizableMessage message = 178 ERR_UNCAUGHT_WORKER_THREAD_EXCEPTION.get(getName(), operation, stackTraceToSingleLineString(t)); 179 logger.error(message); 180 181 operation.setResultCode(DirectoryServer.getServerErrorResultCode()); 182 operation.appendErrorMessage(message); 183 operation.getClientConnection().sendResponse(operation); 184 } 185 catch (Throwable t2) 186 { 187 if (logger.isTraceEnabled()) 188 { 189 logger.trace( 190 "Exception in worker thread while trying to log a " + 191 "message about an uncaught exception %s: %s", t, t2); 192 193 logger.traceException(t2); 194 } 195 } 196 197 198 try 199 { 200 LocalizableMessage message = ERR_UNCAUGHT_WORKER_THREAD_EXCEPTION.get( 201 getName(), operation, stackTraceToSingleLineString(t)); 202 operation.disconnectClient(DisconnectReason.SERVER_ERROR, true, message); 203 } 204 catch (Throwable t2) 205 { 206 logger.traceException(t2); 207 } 208 } 209 } 210 211 // If we have gotten here, then we presume that the server thread is 212 // shutting down. However, if that's not the case then that is a problem 213 // and we will want to log a message. 214 if (stoppedByReducedThreadNumber) 215 { 216 logger.debug(INFO_WORKER_STOPPED_BY_REDUCED_THREADNUMBER, getName()); 217 } 218 else if (! workQueue.shutdownRequested()) 219 { 220 logger.warn(WARN_UNEXPECTED_WORKER_THREAD_EXIT, getName()); 221 } 222 223 224 if (logger.isTraceEnabled()) 225 { 226 logger.trace(getName() + " exiting."); 227 } 228 } 229 230 231 232 /** 233 * Indicates that the Directory Server has received a request to stop running 234 * and that this thread should stop running as soon as possible. 235 */ 236 public void shutDown() 237 { 238 if (logger.isTraceEnabled()) 239 { 240 logger.trace(getName() + " being signaled to shut down."); 241 } 242 243 // Set a flag that indicates that the thread should stop running. 244 shutdownRequested = true; 245 246 247 // Check to see if the thread is waiting for work. If so, then interrupt 248 // it. 249 if (waitingForWork) 250 { 251 try 252 { 253 workerThread.interrupt(); 254 } 255 catch (Exception e) 256 { 257 if (logger.isTraceEnabled()) 258 { 259 logger.trace( 260 "Caught an exception while trying to interrupt the worker " + 261 "thread waiting for work: %s", e); 262 logger.traceException(e); 263 } 264 } 265 } 266 else 267 { 268 try 269 { 270 CancelRequest cancelRequest = 271 new CancelRequest(true, INFO_CANCELED_BY_SHUTDOWN.get()); 272 operation.cancel(cancelRequest); 273 } 274 catch (Exception e) 275 { 276 if (logger.isTraceEnabled()) 277 { 278 logger.trace( 279 "Caught an exception while trying to abandon the " + 280 "operation in progress for the worker thread: %s", e); 281 logger.traceException(e); 282 } 283 } 284 } 285 } 286 287 /** 288 * Retrieves any relevent debug information with which this tread is 289 * associated so they can be included in debug messages. 290 * 291 * @return debug information about this thread as a string. 292 */ 293 @Override 294 public Map<String, String> getDebugProperties() 295 { 296 Map<String, String> properties = super.getDebugProperties(); 297 properties.put("clientConnection", 298 operation.getClientConnection().toString()); 299 properties.put("operation", operation.toString()); 300 301 return properties; 302 } 303} 304