001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2009-2010 Sun Microsystems, Inc. 025 * Portions Copyright 2014-2015 ForgeRock AS 026 */ 027package org.opends.server.tools.tasks; 028 029import static org.forgerock.opendj.ldap.ResultCode.*; 030import static org.opends.messages.ToolMessages.*; 031import static org.opends.server.config.ConfigConstants.*; 032 033import java.io.IOException; 034import java.text.SimpleDateFormat; 035import java.util.ArrayList; 036import java.util.Collections; 037import java.util.Date; 038import java.util.LinkedHashSet; 039import java.util.List; 040import java.util.UUID; 041import java.util.concurrent.atomic.AtomicInteger; 042 043import org.forgerock.i18n.LocalizableMessage; 044import org.forgerock.opendj.ldap.ByteString; 045import org.forgerock.opendj.ldap.DecodeException; 046import org.forgerock.opendj.ldap.DereferenceAliasesPolicy; 047import org.forgerock.opendj.ldap.ModificationType; 048import org.forgerock.opendj.ldap.SearchScope; 049import org.opends.server.backends.task.FailedDependencyAction; 050import org.opends.server.backends.task.TaskState; 051import org.opends.server.config.ConfigConstants; 052import org.opends.server.protocols.ldap.AddRequestProtocolOp; 053import org.opends.server.protocols.ldap.AddResponseProtocolOp; 054import org.opends.server.protocols.ldap.DeleteRequestProtocolOp; 055import org.opends.server.protocols.ldap.DeleteResponseProtocolOp; 056import org.opends.server.protocols.ldap.LDAPAttribute; 057import org.opends.server.protocols.ldap.LDAPConstants; 058import org.opends.server.protocols.ldap.LDAPFilter; 059import org.opends.server.protocols.ldap.LDAPMessage; 060import org.opends.server.protocols.ldap.LDAPModification; 061import org.opends.server.protocols.ldap.LDAPResultCode; 062import org.opends.server.protocols.ldap.ModifyRequestProtocolOp; 063import org.opends.server.protocols.ldap.ModifyResponseProtocolOp; 064import org.opends.server.protocols.ldap.SearchRequestProtocolOp; 065import org.opends.server.protocols.ldap.SearchResultEntryProtocolOp; 066import org.opends.server.tools.LDAPConnection; 067import org.opends.server.tools.LDAPReader; 068import org.opends.server.tools.LDAPWriter; 069import org.opends.server.types.Control; 070import org.opends.server.types.Entry; 071import org.opends.server.types.LDAPException; 072import org.opends.server.types.RawAttribute; 073import org.opends.server.types.RawModification; 074import org.opends.server.types.SearchResultEntry; 075import org.opends.server.util.StaticUtils; 076 077/** 078 * Helper class for interacting with the task backend on behalf of utilities 079 * that are capable of being scheduled. 080 */ 081public class TaskClient { 082 083 /** 084 * Connection through which task scheduling will take place. 085 */ 086 protected LDAPConnection connection; 087 088 /** 089 * Keeps track of message IDs. 090 */ 091 private final AtomicInteger nextMessageID = new AtomicInteger(0); 092 093 /** 094 * Creates a new TaskClient for interacting with the task backend remotely. 095 * @param conn for accessing the task backend 096 */ 097 public TaskClient(LDAPConnection conn) { 098 this.connection = conn; 099 } 100 101 /** 102 * Returns the ID of the task entry for a given list of task attributes. 103 * @param taskAttributes the task attributes. 104 * @return the ID of the task entry for a given list of task attributes. 105 */ 106 public static String getTaskID(List<RawAttribute> taskAttributes) 107 { 108 String taskID = null; 109 110 RawAttribute recurringIDAttr = getAttribute(ATTR_RECURRING_TASK_ID, 111 taskAttributes); 112 113 if (recurringIDAttr != null) { 114 taskID = recurringIDAttr.getValues().get(0).toString(); 115 } else { 116 RawAttribute taskIDAttr = getAttribute(ATTR_TASK_ID, 117 taskAttributes); 118 taskID = taskIDAttr.getValues().get(0).toString(); 119 } 120 121 return taskID; 122 } 123 124 private static RawAttribute getAttribute(String attrName, 125 List<RawAttribute> taskAttributes) 126 { 127 for (RawAttribute attr : taskAttributes) 128 { 129 if (attr.getAttributeType().equalsIgnoreCase(attrName)) 130 { 131 return attr; 132 } 133 } 134 return null; 135 } 136 137 /** 138 * Returns the DN of the task entry for a given list of task attributes. 139 * @param taskAttributes the task attributes. 140 * @return the DN of the task entry for a given list of task attributes. 141 */ 142 public static String getTaskDN(List<RawAttribute> taskAttributes) 143 { 144 String entryDN = null; 145 String taskID = getTaskID(taskAttributes); 146 RawAttribute recurringIDAttr = getAttribute(ATTR_RECURRING_TASK_ID, 147 taskAttributes); 148 149 if (recurringIDAttr != null) { 150 entryDN = ATTR_RECURRING_TASK_ID + "=" + 151 taskID + "," + RECURRING_TASK_BASE_RDN + "," + DN_TASK_ROOT; 152 } else { 153 entryDN = ATTR_TASK_ID + "=" + taskID + "," + 154 SCHEDULED_TASK_BASE_RDN + "," + DN_TASK_ROOT; 155 } 156 return entryDN; 157 } 158 159 private static boolean isScheduleRecurring(TaskScheduleInformation information) 160 { 161 return information.getRecurringDateTime() != null; 162 } 163 164 /** 165 * This is a commodity method that returns the common attributes (those 166 * related to scheduling) of a task entry for a given 167 * {@link TaskScheduleInformation} object. 168 * @param information the scheduling information. 169 * @return the schedule attributes of the task entry. 170 */ 171 public static ArrayList<RawAttribute> getTaskAttributes( 172 TaskScheduleInformation information) 173 { 174 String taskID = null; 175 boolean scheduleRecurring = isScheduleRecurring(information); 176 177 if (scheduleRecurring) { 178 taskID = information.getTaskId(); 179 if (taskID == null || taskID.length() == 0) { 180 taskID = information.getTaskClass().getSimpleName() + "-" + UUID.randomUUID(); 181 } 182 } else { 183 // Use a formatted time/date for the ID so that is remotely useful 184 SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHHmmssSSS"); 185 taskID = df.format(new Date()); 186 } 187 188 ArrayList<RawAttribute> attributes = new ArrayList<>(); 189 190 ArrayList<String> ocValues = new ArrayList<>(4); 191 ocValues.add("top"); 192 ocValues.add(ConfigConstants.OC_TASK); 193 if (scheduleRecurring) { 194 ocValues.add(ConfigConstants.OC_RECURRING_TASK); 195 } 196 ocValues.add(information.getTaskObjectclass()); 197 attributes.add(new LDAPAttribute(ATTR_OBJECTCLASS, ocValues)); 198 199 if (scheduleRecurring) { 200 attributes.add(new LDAPAttribute(ATTR_RECURRING_TASK_ID, taskID)); 201 } 202 attributes.add(new LDAPAttribute(ATTR_TASK_ID, taskID)); 203 204 String classValue = information.getTaskClass().getName(); 205 attributes.add(new LDAPAttribute(ATTR_TASK_CLASS, classValue)); 206 207 // add the start time if necessary 208 Date startDate = information.getStartDateTime(); 209 if (startDate != null) { 210 String startTimeString = StaticUtils.formatDateTimeString(startDate); 211 attributes.add(new LDAPAttribute(ATTR_TASK_SCHEDULED_START_TIME, startTimeString)); 212 } 213 214 if (scheduleRecurring) { 215 String recurringPatternValues = information.getRecurringDateTime(); 216 attributes.add(new LDAPAttribute(ATTR_RECURRING_TASK_SCHEDULE, recurringPatternValues)); 217 } 218 219 // add dependency IDs 220 List<String> dependencyIds = information.getDependencyIds(); 221 if (dependencyIds != null && !dependencyIds.isEmpty()) { 222 attributes.add(new LDAPAttribute(ATTR_TASK_DEPENDENCY_IDS, dependencyIds)); 223 224 // add the dependency action 225 FailedDependencyAction fda = information.getFailedDependencyAction(); 226 if (fda == null) { 227 fda = FailedDependencyAction.defaultValue(); 228 } 229 attributes.add(new LDAPAttribute(ATTR_TASK_FAILED_DEPENDENCY_ACTION, fda.name())); 230 } 231 232 // add completion notification email addresses 233 List<String> compNotifEmailAddresss = information.getNotifyUponCompletionEmailAddresses(); 234 if (compNotifEmailAddresss != null && !compNotifEmailAddresss.isEmpty()) { 235 attributes.add(new LDAPAttribute(ATTR_TASK_NOTIFY_ON_COMPLETION, compNotifEmailAddresss)); 236 } 237 238 // add error notification email addresses 239 List<String> errNotifEmailAddresss = information.getNotifyUponErrorEmailAddresses(); 240 if (errNotifEmailAddresss != null && !errNotifEmailAddresss.isEmpty()) { 241 attributes.add(new LDAPAttribute(ATTR_TASK_NOTIFY_ON_ERROR, errNotifEmailAddresss)); 242 } 243 244 information.addTaskAttributes(attributes); 245 246 return attributes; 247 } 248 249 /** 250 * Schedule a task for execution by writing an entry to the task backend. 251 * 252 * @param information to be scheduled 253 * @return String task ID assigned the new task 254 * @throws IOException if there is a stream communication problem 255 * @throws LDAPException if there is a problem getting information 256 * out to the directory 257 * @throws DecodeException if there is a problem with the encoding 258 * @throws TaskClientException if there is a problem with the task entry 259 */ 260 public synchronized TaskEntry schedule(TaskScheduleInformation information) 261 throws LDAPException, IOException, DecodeException, TaskClientException 262 { 263 LDAPReader reader = connection.getLDAPReader(); 264 LDAPWriter writer = connection.getLDAPWriter(); 265 266 ArrayList<Control> controls = new ArrayList<>(); 267 ArrayList<RawAttribute> attributes = getTaskAttributes(information); 268 269 ByteString entryDN = ByteString.valueOfUtf8(getTaskDN(attributes)); 270 AddRequestProtocolOp addRequest = new AddRequestProtocolOp(entryDN, attributes); 271 LDAPMessage requestMessage = 272 new LDAPMessage(nextMessageID.getAndIncrement(), addRequest, controls); 273 274 // Send the request to the server and read the response. 275 LDAPMessage responseMessage; 276 writer.writeMessage(requestMessage); 277 278 responseMessage = reader.readMessage(); 279 if (responseMessage == null) 280 { 281 throw new LDAPException( 282 LDAPResultCode.CLIENT_SIDE_SERVER_DOWN, 283 ERR_TASK_CLIENT_UNEXPECTED_CONNECTION_CLOSURE.get()); 284 } 285 286 if (responseMessage.getProtocolOpType() != 287 LDAPConstants.OP_TYPE_ADD_RESPONSE) 288 { 289 throw new LDAPException( 290 LDAPResultCode.CLIENT_SIDE_LOCAL_ERROR, 291 ERR_TASK_CLIENT_INVALID_RESPONSE_TYPE.get( 292 responseMessage.getProtocolOpName())); 293 } 294 295 AddResponseProtocolOp addResponse = 296 responseMessage.getAddResponseProtocolOp(); 297 if (addResponse.getResultCode() != 0) { 298 throw new LDAPException( 299 LDAPResultCode.CLIENT_SIDE_LOCAL_ERROR, 300 addResponse.getErrorMessage()); 301 } 302 return getTaskEntry(getTaskID(attributes)); 303 } 304 305 /** 306 * Gets all the ds-task entries from the task root. 307 * 308 * @return list of entries from the task root 309 * @throws IOException if there is a stream communication problem 310 * @throws LDAPException if there is a problem getting information 311 * out to the directory 312 * @throws DecodeException if there is a problem with the encoding 313 */ 314 public synchronized List<TaskEntry> getTaskEntries() 315 throws LDAPException, IOException, DecodeException { 316 List<Entry> entries = new ArrayList<>(); 317 318 writeSearch(new SearchRequestProtocolOp( 319 ByteString.valueOfUtf8(ConfigConstants.DN_TASK_ROOT), 320 SearchScope.WHOLE_SUBTREE, 321 DereferenceAliasesPolicy.NEVER, 322 Integer.MAX_VALUE, 323 Integer.MAX_VALUE, 324 false, 325 LDAPFilter.decode("(objectclass=ds-task)"), 326 new LinkedHashSet<String>())); 327 328 LDAPReader reader = connection.getLDAPReader(); 329 byte opType; 330 do { 331 LDAPMessage responseMessage = reader.readMessage(); 332 if (responseMessage == null) { 333 throw new LDAPException( 334 LDAPResultCode.CLIENT_SIDE_SERVER_DOWN, 335 ERR_TASK_CLIENT_UNEXPECTED_CONNECTION_CLOSURE.get()); 336 } else { 337 opType = responseMessage.getProtocolOpType(); 338 if (opType == LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY) { 339 SearchResultEntryProtocolOp searchEntryOp = 340 responseMessage.getSearchResultEntryProtocolOp(); 341 SearchResultEntry entry = searchEntryOp.toSearchResultEntry(); 342 entries.add(entry); 343 } 344 } 345 } 346 while (opType != LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE); 347 List<TaskEntry> taskEntries = new ArrayList<>(entries.size()); 348 for (Entry entry : entries) { 349 taskEntries.add(new TaskEntry(entry)); 350 } 351 return Collections.unmodifiableList(taskEntries); 352 } 353 354 /** 355 * Gets the entry of the task whose ID is <code>id</code> from the directory. 356 * 357 * @param id of the entry to retrieve 358 * @return Entry for the task 359 * @throws IOException if there is a stream communication problem 360 * @throws LDAPException if there is a problem getting information 361 * out to the directory 362 * @throws DecodeException if there is a problem with the encoding 363 * @throws TaskClientException if there is no task with the requested id 364 */ 365 public synchronized TaskEntry getTaskEntry(String id) 366 throws LDAPException, IOException, DecodeException, TaskClientException 367 { 368 Entry entry = null; 369 370 writeSearch(new SearchRequestProtocolOp( 371 ByteString.valueOfUtf8(ConfigConstants.DN_TASK_ROOT), 372 SearchScope.WHOLE_SUBTREE, 373 DereferenceAliasesPolicy.NEVER, 374 Integer.MAX_VALUE, 375 Integer.MAX_VALUE, 376 false, 377 LDAPFilter.decode("(" + ATTR_TASK_ID + "=" + id + ")"), 378 new LinkedHashSet<String>())); 379 380 LDAPReader reader = connection.getLDAPReader(); 381 byte opType; 382 do { 383 LDAPMessage responseMessage = reader.readMessage(); 384 if (responseMessage == null) { 385 LocalizableMessage message = ERR_TASK_CLIENT_UNEXPECTED_CONNECTION_CLOSURE.get(); 386 throw new LDAPException(UNAVAILABLE.intValue(), message); 387 } else { 388 opType = responseMessage.getProtocolOpType(); 389 if (opType == LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY) { 390 SearchResultEntryProtocolOp searchEntryOp = 391 responseMessage.getSearchResultEntryProtocolOp(); 392 entry = searchEntryOp.toSearchResultEntry(); 393 } 394 } 395 } 396 while (opType != LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE); 397 if (entry == null) { 398 throw new TaskClientException(ERR_TASK_CLIENT_UNKNOWN_TASK.get(id)); 399 } 400 return new TaskEntry(entry); 401 } 402 403 404 /** 405 * Changes that the state of the task in the backend to a canceled state. 406 * 407 * @param id if the task to cancel 408 * @throws IOException if there is a stream communication problem 409 * @throws LDAPException if there is a problem getting information 410 * out to the directory 411 * @throws DecodeException if there is a problem with the encoding 412 * @throws TaskClientException if there is no task with the requested id 413 */ 414 public synchronized void cancelTask(String id) 415 throws TaskClientException, IOException, DecodeException, LDAPException 416 { 417 LDAPReader reader = connection.getLDAPReader(); 418 LDAPWriter writer = connection.getLDAPWriter(); 419 420 TaskEntry entry = getTaskEntry(id); 421 TaskState state = entry.getTaskState(); 422 if (state != null) { 423 if (!TaskState.isDone(state)) { 424 425 ByteString dn = ByteString.valueOfUtf8(entry.getDN().toString()); 426 427 ArrayList<RawModification> mods = new ArrayList<>(); 428 429 String newState; 430 if (TaskState.isPending(state)) { 431 newState = TaskState.CANCELED_BEFORE_STARTING.name(); 432 } else { 433 newState = TaskState.STOPPED_BY_ADMINISTRATOR.name(); 434 } 435 LDAPAttribute attr = new LDAPAttribute(ATTR_TASK_STATE, newState); 436 mods.add(new LDAPModification(ModificationType.REPLACE, attr)); 437 438 ModifyRequestProtocolOp modRequest = 439 new ModifyRequestProtocolOp(dn, mods); 440 LDAPMessage requestMessage = 441 new LDAPMessage(nextMessageID.getAndIncrement(), modRequest, null); 442 443 writer.writeMessage(requestMessage); 444 445 LDAPMessage responseMessage = reader.readMessage(); 446 447 if (responseMessage == null) { 448 LocalizableMessage message = ERR_TASK_CLIENT_UNEXPECTED_CONNECTION_CLOSURE.get(); 449 throw new LDAPException(UNAVAILABLE.intValue(), message); 450 } 451 452 if (responseMessage.getProtocolOpType() != 453 LDAPConstants.OP_TYPE_MODIFY_RESPONSE) 454 { 455 throw new LDAPException( 456 LDAPResultCode.CLIENT_SIDE_LOCAL_ERROR, 457 ERR_TASK_CLIENT_INVALID_RESPONSE_TYPE.get( 458 responseMessage.getProtocolOpName())); 459 } 460 461 ModifyResponseProtocolOp modResponse = 462 responseMessage.getModifyResponseProtocolOp(); 463 LocalizableMessage errorMessage = modResponse.getErrorMessage(); 464 if (errorMessage != null) { 465 throw new LDAPException( 466 LDAPResultCode.CLIENT_SIDE_LOCAL_ERROR, 467 errorMessage); 468 } 469 } else if (TaskState.isRecurring(state)) { 470 471 ByteString dn = ByteString.valueOfUtf8(entry.getDN().toString()); 472 DeleteRequestProtocolOp deleteRequest = 473 new DeleteRequestProtocolOp(dn); 474 475 LDAPMessage requestMessage = new LDAPMessage( 476 nextMessageID.getAndIncrement(), deleteRequest, null); 477 478 writer.writeMessage(requestMessage); 479 480 LDAPMessage responseMessage = reader.readMessage(); 481 482 if (responseMessage == null) { 483 LocalizableMessage message = ERR_TASK_CLIENT_UNEXPECTED_CONNECTION_CLOSURE.get(); 484 throw new LDAPException(UNAVAILABLE.intValue(), message); 485 } 486 487 if (responseMessage.getProtocolOpType() != 488 LDAPConstants.OP_TYPE_DELETE_RESPONSE) 489 { 490 throw new LDAPException( 491 LDAPResultCode.CLIENT_SIDE_LOCAL_ERROR, 492 ERR_TASK_CLIENT_INVALID_RESPONSE_TYPE.get( 493 responseMessage.getProtocolOpName())); 494 } 495 496 DeleteResponseProtocolOp deleteResponse = 497 responseMessage.getDeleteResponseProtocolOp(); 498 LocalizableMessage errorMessage = deleteResponse.getErrorMessage(); 499 if (errorMessage != null) { 500 throw new LDAPException( 501 LDAPResultCode.CLIENT_SIDE_LOCAL_ERROR, 502 errorMessage); 503 } 504 } else { 505 throw new TaskClientException( 506 ERR_TASK_CLIENT_UNCANCELABLE_TASK.get(id)); 507 } 508 } else { 509 throw new TaskClientException( 510 ERR_TASK_CLIENT_TASK_STATE_UNKNOWN.get(id)); 511 } 512 } 513 514 515 /** 516 * Writes a search to the directory writer. 517 * @param searchRequest to write 518 * @throws IOException if there is a stream communication problem 519 */ 520 private void writeSearch(SearchRequestProtocolOp searchRequest) 521 throws IOException { 522 LDAPWriter writer = connection.getLDAPWriter(); 523 LDAPMessage requestMessage = new LDAPMessage( 524 nextMessageID.getAndIncrement(), 525 searchRequest, 526 new ArrayList<Control>()); 527 528 // Send the request to the server and read the response. 529 writer.writeMessage(requestMessage); 530 } 531 532}