001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2006-2010 Sun Microsystems, Inc. 025 * Portions Copyright 2013-2015 ForgeRock AS. 026 */ 027package org.opends.server.api; 028 029import static org.opends.messages.CoreMessages.*; 030 031import org.forgerock.i18n.slf4j.LocalizedLogger; 032import org.forgerock.i18n.LocalizableMessage; 033import org.opends.server.admin.std.server.WorkQueueCfg; 034import org.forgerock.opendj.config.server.ConfigException; 035import org.opends.server.types.DirectoryException; 036import org.opends.server.types.InitializationException; 037import org.opends.server.types.Operation; 038import org.opends.server.util.Platform; 039 040/** 041 * This class defines the structure and methods that must be 042 * implemented by a Directory Server work queue. The work queue is 043 * the component of the server that accepts requests from connection 044 * handlers and ensures that they are properly processed. The manner 045 * in which the work queue is able to accomplish this may vary between 046 * implementations, but in general it is assumed that one or more 047 * worker threads will be associated with the queue and may be used to 048 * process requests in parallel. 049 * 050 * @param <T> The type of configuration handled by this work queue. 051 */ 052@org.opends.server.types.PublicAPI( 053 stability=org.opends.server.types.StabilityLevel.VOLATILE, 054 mayInstantiate=false, 055 mayExtend=true, 056 mayInvoke=true) 057public abstract class WorkQueue<T extends WorkQueueCfg> 058{ 059 060 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 061 062 /** 063 * Initializes this work queue based on the information in the 064 * provided configuration entry. 065 * 066 * @param configuration The configuration to use to initialize 067 * the work queue. 068 * 069 * @throws ConfigException If the provided configuration entry 070 * does not have a valid work queue 071 * configuration. 072 * 073 * @throws InitializationException If a problem occurs during 074 * initialization that is not 075 * related to the server 076 * configuration. 077 */ 078 public abstract void initializeWorkQueue(T configuration) 079 throws ConfigException, InitializationException; 080 081 082 083 /** 084 * Performs any necessary finalization for this work queue, 085 * including ensuring that all active operations are interrupted or 086 * will be allowed to complete, and that all pending operations will 087 * be cancelled. 088 * 089 * @param reason The human-readable reason that the work queue is 090 * being shut down. 091 */ 092 public abstract void finalizeWorkQueue(LocalizableMessage reason); 093 094 095 096 /** 097 * Submits an operation to be processed in the server. 098 * 099 * @param operation The operation to be processed. 100 * 101 * @throws DirectoryException If the provided operation is not 102 * accepted for some reason (e.g., if 103 * the server is shutting down or 104 * already has too many pending 105 * requests in the queue). 106 */ 107 public abstract void submitOperation(Operation operation) 108 throws DirectoryException; 109 110 111 112 /** 113 * Tries to submit an operation to be processed in the server, without 114 * blocking. 115 * 116 * @param operation 117 * The operation to be processed. 118 * @return true if the operation could be submitted to the queue, false if the 119 * queue was full 120 * @throws DirectoryException 121 * If the provided operation is not accepted for some reason (e.g., 122 * if the server is shutting down). 123 */ 124 public abstract boolean trySubmitOperation(Operation operation) 125 throws DirectoryException; 126 127 128 /** 129 * Indicates whether the work queue is currently processing any 130 * requests. Note that this is a point-in-time determination, and 131 * if any component of the server wishes to depend on a quiescent 132 * state then it should use some external mechanism to ensure that 133 * no other requests are submitted to the queue. 134 * 135 * @return {@code true} if the work queue is currently idle, or 136 * {@code false} if it is being used to process one or more 137 * operations. 138 */ 139 public abstract boolean isIdle(); 140 141 142 /** 143 * Return the maximum number of worker threads that can be used by this 144 * WorkQueue (The WorkQueue could have a thread pool which adjusts its size). 145 * 146 * @return the maximum number of worker threads that can be used by this 147 * WorkQueue 148 */ 149 public abstract int getNumWorkerThreads(); 150 151 152 /** 153 * Computes the number of worker threads to use by the working queue based on 154 * the configured number. 155 * 156 * @param configuredNumWorkerThreads 157 * the configured number of worker threads to use 158 * @return the number of worker threads to use 159 */ 160 protected int computeNumWorkerThreads(Integer configuredNumWorkerThreads) 161 { 162 if (configuredNumWorkerThreads != null) 163 { 164 return configuredNumWorkerThreads; 165 } 166 else 167 { 168 // Automatically choose based on the number of processors. 169 int value = Platform.computeNumberOfThreads(16, 2.0f); 170 logger.debug(INFO_ERGONOMIC_SIZING_OF_WORKER_THREAD_POOL, value); 171 return value; 172 } 173 } 174 175 /** 176 * Waits for the work queue to become idle before returning. Note 177 * that this is a point-in-time determination, and if any component 178 * of the server wishes to depend on a quiescent state then it 179 * should use some external mechanism to ensure that no other 180 * requests are submitted to the queue. 181 * 182 * @param timeLimit The maximum length of time in milliseconds 183 * that this method should wait for the queue to 184 * become idle before giving up. A time limit 185 * that is less than or equal to zero indicates 186 * that there should not be a time limit. 187 * 188 * @return {@code true} if the work queue is idle at the time that 189 * this method returns, or {@code false} if the wait time 190 * limit was reached before the server became idle. 191 */ 192 public boolean waitUntilIdle(long timeLimit) 193 { 194 long stopWaitingTime; 195 if (timeLimit <= 0) 196 { 197 stopWaitingTime = Long.MAX_VALUE; 198 } 199 else 200 { 201 stopWaitingTime = System.currentTimeMillis() + timeLimit; 202 } 203 204 while (System.currentTimeMillis() < stopWaitingTime) 205 { 206 if (isIdle()) 207 { 208 return true; 209 } 210 211 try 212 { 213 Thread.sleep(1); 214 } catch (InterruptedException ie) {} 215 } 216 217 return false; 218 } 219} 220