001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2006-2010 Sun Microsystems, Inc. 025 * Portions Copyright 2013-2015 ForgeRock AS. 026 */ 027package org.opends.server.extensions; 028 029import static org.opends.messages.ConfigMessages.*; 030import static org.opends.messages.CoreMessages.*; 031 032import java.util.ArrayList; 033import java.util.List; 034import java.util.concurrent.ConcurrentLinkedQueue; 035import java.util.concurrent.Semaphore; 036import java.util.concurrent.TimeUnit; 037import java.util.concurrent.atomic.AtomicLong; 038 039import org.forgerock.i18n.LocalizableMessage; 040import org.forgerock.i18n.slf4j.LocalizedLogger; 041import org.forgerock.opendj.config.server.ConfigException; 042import org.forgerock.opendj.ldap.ResultCode; 043import org.opends.server.admin.server.ConfigurationChangeListener; 044import org.opends.server.admin.std.server.ParallelWorkQueueCfg; 045import org.opends.server.api.WorkQueue; 046import org.opends.server.core.DirectoryServer; 047import org.opends.server.monitors.ParallelWorkQueueMonitor; 048import org.opends.server.types.CancelRequest; 049import org.forgerock.opendj.config.server.ConfigChangeResult; 050import org.opends.server.types.DirectoryException; 051import org.opends.server.types.InitializationException; 052import org.opends.server.types.Operation; 053 054/** 055 * This class defines a data structure for storing and interacting with the 056 * Directory Server work queue. 057 */ 058public class ParallelWorkQueue 059 extends WorkQueue<ParallelWorkQueueCfg> 060 implements ConfigurationChangeListener<ParallelWorkQueueCfg> 061{ 062 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 063 064 065 066 067 /** 068 * The maximum number of times to retry getting the next operation from the 069 * queue if an unexpected failure occurs. 070 */ 071 private static final int MAX_RETRY_COUNT = 5; 072 073 074 075 /** The set of worker threads that will be used to process this work queue. */ 076 private ArrayList<ParallelWorkerThread> workerThreads; 077 078 /** 079 * The number of operations that have been submitted to the work queue for 080 * processing. 081 */ 082 private AtomicLong opsSubmitted; 083 084 /** 085 * Indicates whether one or more of the worker threads needs to be killed at 086 * the next convenient opportunity. 087 */ 088 private boolean killThreads; 089 090 /** Indicates whether the Directory Server is shutting down. */ 091 private boolean shutdownRequested; 092 093 /** The thread number used for the last worker thread that was created. */ 094 private int lastThreadNumber; 095 096 /** 097 * The number of worker threads that should be active (or will be shortly if a 098 * configuration change has not been completely applied). 099 */ 100 private int numWorkerThreads; 101 102 /** The queue that will be used to actually hold the pending operations. */ 103 private ConcurrentLinkedQueue<Operation> opQueue; 104 105 /** The lock used to provide threadsafe access for the queue. */ 106 private final Object queueLock = new Object(); 107 108 109 private final Semaphore queueSemaphore = new Semaphore(0, false); 110 111 112 /** 113 * Creates a new instance of this work queue. All initialization should be 114 * performed in the <CODE>initializeWorkQueue</CODE> method. 115 */ 116 public ParallelWorkQueue() 117 { 118 // No implementation should be performed here. 119 } 120 121 122 123 /** {@inheritDoc} */ 124 @Override 125 public void initializeWorkQueue(ParallelWorkQueueCfg configuration) 126 throws ConfigException, InitializationException 127 { 128 shutdownRequested = false; 129 killThreads = false; 130 opsSubmitted = new AtomicLong(0); 131 132 // Register to be notified of any configuration changes. 133 configuration.addParallelChangeListener(this); 134 135 // Get the necessary configuration from the provided entry. 136 numWorkerThreads = 137 computeNumWorkerThreads(configuration.getNumWorkerThreads()); 138 139 // Create the actual work queue. 140 opQueue = new ConcurrentLinkedQueue<>(); 141 142 // Create the set of worker threads that should be used to service the work queue. 143 workerThreads = new ArrayList<>(numWorkerThreads); 144 for (lastThreadNumber = 0; lastThreadNumber < numWorkerThreads; 145 lastThreadNumber++) 146 { 147 ParallelWorkerThread t = 148 new ParallelWorkerThread(this, lastThreadNumber); 149 t.start(); 150 workerThreads.add(t); 151 } 152 153 154 // Create and register a monitor provider for the work queue. 155 try 156 { 157 ParallelWorkQueueMonitor monitor = 158 new ParallelWorkQueueMonitor(this); 159 monitor.initializeMonitorProvider(null); 160 DirectoryServer.registerMonitorProvider(monitor); 161 } 162 catch (Exception e) 163 { 164 logger.traceException(e); 165 logger.error(ERR_CONFIG_WORK_QUEUE_CANNOT_CREATE_MONITOR, ParallelWorkQueueMonitor.class, e); 166 } 167 } 168 169 170 171 /** {@inheritDoc} */ 172 @Override 173 public void finalizeWorkQueue(LocalizableMessage reason) 174 { 175 shutdownRequested = true; 176 177 178 // Send responses to any operations in the pending queue to indicate that 179 // they won't be processed because the server is shutting down. 180 CancelRequest cancelRequest = new CancelRequest(true, reason); 181 ArrayList<Operation> pendingOperations = new ArrayList<>(); 182 opQueue.removeAll(pendingOperations); 183 184 for (Operation o : pendingOperations) 185 { 186 try 187 { 188 // The operation has no chance of responding to the cancel 189 // request so avoid waiting for a cancel response. 190 if (o.getCancelResult() == null) { 191 o.abort(cancelRequest); 192 } 193 } 194 catch (Exception e) 195 { 196 logger.traceException(e); 197 logger.warn(WARN_QUEUE_UNABLE_TO_CANCEL, o, e); 198 } 199 } 200 201 202 // Notify all the worker threads of the shutdown. 203 for (ParallelWorkerThread t : workerThreads) 204 { 205 try 206 { 207 t.shutDown(); 208 } 209 catch (Exception e) 210 { 211 logger.traceException(e); 212 logger.warn(WARN_QUEUE_UNABLE_TO_NOTIFY_THREAD, t.getName(), e); 213 } 214 } 215 } 216 217 218 219 /** 220 * Indicates whether this work queue has received a request to shut down. 221 * 222 * @return <CODE>true</CODE> if the work queue has recieved a request to shut 223 * down, or <CODE>false</CODE> if not. 224 */ 225 public boolean shutdownRequested() 226 { 227 return shutdownRequested; 228 } 229 230 231 232 /** 233 * Submits an operation to be processed by one of the worker threads 234 * associated with this work queue. 235 * 236 * @param operation The operation to be processed. 237 * 238 * @throws DirectoryException If the provided operation is not accepted for 239 * some reason (e.g., if the server is shutting 240 * down or the pending operation queue is already 241 * at its maximum capacity). 242 */ 243 @Override 244 public void submitOperation(Operation operation) throws DirectoryException 245 { 246 if (shutdownRequested) 247 { 248 LocalizableMessage message = WARN_OP_REJECTED_BY_SHUTDOWN.get(); 249 throw new DirectoryException(ResultCode.UNAVAILABLE, message); 250 } 251 252 opQueue.add(operation); 253 queueSemaphore.release(); 254 255 opsSubmitted.incrementAndGet(); 256 } 257 258 /** {@inheritDoc} */ 259 @Override 260 public boolean trySubmitOperation(Operation operation) 261 throws DirectoryException 262 { 263 submitOperation(operation); 264 return true; 265 } 266 267 268 /** 269 * Retrieves the next operation that should be processed by one of the worker 270 * threads, blocking if necessary until a new request arrives. This method 271 * should only be called by a worker thread associated with this work queue. 272 * 273 * @param workerThread The worker thread that is requesting the operation. 274 * 275 * @return The next operation that should be processed, or <CODE>null</CODE> 276 * if the server is shutting down and no more operations will be 277 * processed. 278 */ 279 public Operation nextOperation(ParallelWorkerThread workerThread) 280 { 281 return retryNextOperation(workerThread, 0); 282 } 283 284 285 286 /** 287 * Retrieves the next operation that should be processed by one of the worker 288 * threads following a previous failure attempt. A maximum of five 289 * consecutive failures will be allowed before returning <CODE>null</CODE>, 290 * which will cause the associated thread to exit. 291 * 292 * @param workerThread The worker thread that is requesting the operation. 293 * @param numFailures The number of consecutive failures that the worker 294 * thread has experienced so far. If this gets too 295 * high, then this method will return <CODE>null</CODE> 296 * rather than retrying. 297 * 298 * @return The next operation that should be processed, or <CODE>null</CODE> 299 * if the server is shutting down and no more operations will be 300 * processed, or if there have been too many consecutive failures. 301 */ 302 private Operation retryNextOperation( 303 ParallelWorkerThread workerThread, 304 int numFailures) 305 { 306 // See if we should kill off this thread. This could be necessary if the 307 // number of worker threads has been decreased with the server online. If 308 // so, then return null and the thread will exit. 309 if (killThreads) 310 { 311 synchronized (queueLock) 312 { 313 try 314 { 315 int currentThreads = workerThreads.size(); 316 if (currentThreads > numWorkerThreads) 317 { 318 if (workerThreads.remove(Thread.currentThread())) 319 { 320 currentThreads--; 321 } 322 323 if (currentThreads <= numWorkerThreads) 324 { 325 killThreads = false; 326 } 327 328 workerThread.setStoppedByReducedThreadNumber(); 329 return null; 330 } 331 } 332 catch (Exception e) 333 { 334 logger.traceException(e); 335 } 336 } 337 } 338 339 if (shutdownRequested || numFailures > MAX_RETRY_COUNT) 340 { 341 if (numFailures > MAX_RETRY_COUNT) 342 { 343 logger.error(ERR_CONFIG_WORK_QUEUE_TOO_MANY_FAILURES, Thread 344 .currentThread().getName(), numFailures, MAX_RETRY_COUNT); 345 } 346 347 return null; 348 } 349 350 try 351 { 352 while (true) 353 { 354 Operation nextOperation = null; 355 if (queueSemaphore.tryAcquire(5, TimeUnit.SECONDS)) { 356 nextOperation = opQueue.poll(); 357 } 358 if (nextOperation == null) 359 { 360 // There was no work to do in the specified length of time. See if 361 // we should shutdown, and if not then just check again. 362 if (shutdownRequested) 363 { 364 return null; 365 } 366 else if (killThreads) 367 { 368 synchronized (queueLock) 369 { 370 try 371 { 372 int currentThreads = workerThreads.size(); 373 if (currentThreads > numWorkerThreads) 374 { 375 if (workerThreads.remove(Thread.currentThread())) 376 { 377 currentThreads--; 378 } 379 380 if (currentThreads <= numWorkerThreads) 381 { 382 killThreads = false; 383 } 384 385 workerThread.setStoppedByReducedThreadNumber(); 386 return null; 387 } 388 } 389 catch (Exception e) 390 { 391 logger.traceException(e); 392 } 393 } 394 } 395 } 396 else 397 { 398 return nextOperation; 399 } 400 } 401 } 402 catch (Exception e) 403 { 404 logger.traceException(e); 405 406 // This should not happen. The only recourse we have is to log a message 407 // and try again. 408 logger.warn(WARN_WORKER_WAITING_UNCAUGHT_EXCEPTION, Thread.currentThread().getName(), e); 409 return retryNextOperation(workerThread, numFailures + 1); 410 } 411 } 412 413 414 415 /** 416 * Attempts to remove the specified operation from this queue if it has not 417 * yet been picked up for processing by one of the worker threads. 418 * 419 * @param operation The operation to remove from the queue. 420 * 421 * @return <CODE>true</CODE> if the provided request was present in the queue 422 * and was removed successfully, or <CODE>false</CODE> it not. 423 */ 424 public boolean removeOperation(Operation operation) 425 { 426 return opQueue.remove(operation); 427 } 428 429 430 431 /** 432 * Retrieves the total number of operations that have been successfully 433 * submitted to this work queue for processing since server startup. This 434 * does not include operations that have been rejected for some reason like 435 * the queue already at its maximum capacity. 436 * 437 * @return The total number of operations that have been successfully 438 * submitted to this work queue since startup. 439 */ 440 public long getOpsSubmitted() 441 { 442 return opsSubmitted.longValue(); 443 } 444 445 446 447 /** 448 * Retrieves the number of pending operations in the queue that have not yet 449 * been picked up for processing. Note that this method is not a 450 * constant-time operation and can be relatively inefficient, so it should be 451 * used sparingly. 452 * 453 * @return The number of pending operations in the queue that have not yet 454 * been picked up for processing. 455 */ 456 public int size() 457 { 458 return opQueue.size(); 459 } 460 461 462 463 /** {@inheritDoc} */ 464 @Override 465 public boolean isConfigurationChangeAcceptable( 466 ParallelWorkQueueCfg configuration, 467 List<LocalizableMessage> unacceptableReasons) 468 { 469 return true; 470 } 471 472 473 474 /** {@inheritDoc} */ 475 @Override 476 public ConfigChangeResult applyConfigurationChange( 477 ParallelWorkQueueCfg configuration) 478 { 479 int newNumThreads = 480 computeNumWorkerThreads(configuration.getNumWorkerThreads()); 481 482 // Apply a change to the number of worker threads if appropriate. 483 int currentThreads = workerThreads.size(); 484 if (newNumThreads != currentThreads) 485 { 486 synchronized (queueLock) 487 { 488 try 489 { 490 int threadsToAdd = newNumThreads - currentThreads; 491 if (threadsToAdd > 0) 492 { 493 for (int i = 0; i < threadsToAdd; i++) 494 { 495 ParallelWorkerThread t = 496 new ParallelWorkerThread(this, lastThreadNumber++); 497 workerThreads.add(t); 498 t.start(); 499 } 500 501 killThreads = false; 502 } 503 else 504 { 505 killThreads = true; 506 } 507 508 numWorkerThreads = newNumThreads; 509 } 510 catch (Exception e) 511 { 512 logger.traceException(e); 513 } 514 } 515 } 516 return new ConfigChangeResult(); 517 } 518 519 520 521 /** {@inheritDoc} */ 522 @Override 523 public boolean isIdle() 524 { 525 if (!opQueue.isEmpty()) { 526 return false; 527 } 528 529 synchronized (queueLock) 530 { 531 for (ParallelWorkerThread t : workerThreads) 532 { 533 if (t.isActive()) 534 { 535 return false; 536 } 537 } 538 539 return true; 540 } 541 } 542 543 /** 544 * Return the number of worker threads used by this WorkQueue. 545 * 546 * @return the number of worker threads used by this WorkQueue 547 */ 548 @Override 549 public int getNumWorkerThreads() 550 { 551 return this.numWorkerThreads; 552 } 553}