001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2006-2010 Sun Microsystems, Inc. 025 * Portions Copyright 2013-2015 ForgeRock AS 026 */ 027package org.opends.server.backends.task; 028 029import static org.opends.messages.BackendMessages.*; 030import static org.opends.server.config.ConfigConstants.*; 031import static org.opends.server.util.CollectionUtils.*; 032import static org.opends.server.util.ServerConstants.*; 033import static org.opends.server.util.StaticUtils.*; 034 035import java.io.File; 036import java.io.IOException; 037import java.util.*; 038import java.util.concurrent.TimeUnit; 039import java.util.concurrent.locks.ReentrantLock; 040 041import org.forgerock.i18n.LocalizableMessage; 042import org.forgerock.i18n.slf4j.LocalizedLogger; 043import org.forgerock.opendj.ldap.ByteString; 044import org.forgerock.opendj.ldap.ResultCode; 045import org.opends.server.api.AlertGenerator; 046import org.opends.server.api.DirectoryThread; 047import org.opends.server.core.DirectoryServer; 048import org.opends.server.core.SearchOperation; 049import org.opends.server.core.ServerContext; 050import org.opends.server.types.*; 051import org.opends.server.types.LockManager.DNLock; 052import org.opends.server.util.LDIFException; 053import org.opends.server.util.LDIFReader; 054import org.opends.server.util.LDIFWriter; 055import org.opends.server.util.TimeThread; 056 057/** 058 * This class defines a task scheduler for the Directory Server that will 059 * control the execution of scheduled tasks and other administrative functions 060 * that need to occur on a regular basis. 061 */ 062public class TaskScheduler 063 extends DirectoryThread 064 implements AlertGenerator 065{ 066 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 067 068 /** 069 * The fully-qualified name of this class. 070 */ 071 private static final String CLASS_NAME = 072 "org.opends.server.backends.task.TaskScheduler"; 073 074 075 076 /** 077 * The maximum length of time in milliseconds to sleep between iterations 078 * through the scheduler loop. 079 */ 080 private static long MAX_SLEEP_TIME = 5000; 081 082 083 /** Indicates whether the scheduler is currently running. */ 084 private boolean isRunning; 085 /** Indicates whether a request has been received to stop the scheduler. */ 086 private boolean stopRequested; 087 088 /** The entry that serves as the immediate parent for recurring tasks. */ 089 private Entry recurringTaskParentEntry; 090 /** The entry that serves as the immediate parent for scheduled tasks. */ 091 private Entry scheduledTaskParentEntry; 092 /** The top-level entry at the root of the task tree. */ 093 private Entry taskRootEntry; 094 095 /** The set of recurring tasks defined in the server. */ 096 private final HashMap<String,RecurringTask> recurringTasks = new HashMap<>(); 097 /** The set of tasks associated with this scheduler. */ 098 private final HashMap<String,Task> tasks = new HashMap<>(); 099 /** The set of worker threads that are actively busy processing tasks. */ 100 private final HashMap<String,TaskThread> activeThreads = new HashMap<>(); 101 102 /** The thread ID for the next task thread to be created;. */ 103 private int nextThreadID = 1; 104 105 /** The set of worker threads that may be used to process tasks. */ 106 private final LinkedList<TaskThread> idleThreads = new LinkedList<>(); 107 108 /** The lock used to provide threadsafe access to the scheduler. */ 109 private final ReentrantLock schedulerLock = new ReentrantLock(); 110 111 /** The task backend with which this scheduler is associated. */ 112 private TaskBackend taskBackend; 113 114 /** The thread being used to actually run the scheduler. */ 115 private Thread schedulerThread; 116 117 /** The set of recently-completed tasks that need to be retained. */ 118 private final TreeSet<Task> completedTasks = new TreeSet<>(); 119 /** The set of tasks that have been scheduled but not yet arrived. */ 120 private final TreeSet<Task> pendingTasks = new TreeSet<>(); 121 /** The set of tasks that are currently running. */ 122 private final TreeSet<Task> runningTasks = new TreeSet<>(); 123 124 private ServerContext serverContext; 125 126 /** 127 * Creates a new task scheduler that will be used to ensure that tasks are 128 * invoked at the appropriate times. 129 * @param serverContext 130 * The server context 131 * @param taskBackend The task backend with which this scheduler is 132 * associated. 133 * 134 * @throws InitializationException If a problem occurs while initializing 135 * the scheduler from the backing file. 136 */ 137 public TaskScheduler(ServerContext serverContext, TaskBackend taskBackend) 138 throws InitializationException 139 { 140 super("Task Scheduler Thread"); 141 142 this.serverContext = serverContext; 143 this.taskBackend = taskBackend; 144 145 DirectoryServer.registerAlertGenerator(this); 146 147 initializeTasksFromBackingFile(); 148 149 for (RecurringTask recurringTask : recurringTasks.values()) { 150 Task task = null; 151 try { 152 task = recurringTask.scheduleNextIteration(new GregorianCalendar()); 153 } catch (DirectoryException de) { 154 logger.error(de.getMessageObject()); 155 } 156 if (task != null) { 157 try { 158 scheduleTask(task, false); 159 } catch (DirectoryException de) { 160 // This task might have been already scheduled from before 161 // and thus got initialized from backing file, otherwise 162 // log error and continue. 163 if (de.getResultCode() != ResultCode.ENTRY_ALREADY_EXISTS) { 164 logger.error(de.getMessageObject()); 165 } 166 } 167 } 168 } 169 } 170 171 172 173 /** 174 * Adds a recurring task to the scheduler, optionally scheduling the first 175 * iteration for processing. 176 * 177 * @param recurringTask The recurring task to add to the scheduler. 178 * @param scheduleIteration Indicates whether to schedule an iteration of 179 * the recurring task. 180 * 181 * @throws DirectoryException If a problem occurs while trying to add the 182 * recurring task (e.g., there's already another 183 * recurring task defined with the same ID). 184 */ 185 public void addRecurringTask(RecurringTask recurringTask, 186 boolean scheduleIteration) 187 throws DirectoryException 188 { 189 schedulerLock.lock(); 190 191 try 192 { 193 String id = recurringTask.getRecurringTaskID(); 194 195 if (recurringTasks.containsKey(id)) 196 { 197 LocalizableMessage message = 198 ERR_TASKSCHED_DUPLICATE_RECURRING_ID.get(id); 199 throw new DirectoryException(ResultCode.ENTRY_ALREADY_EXISTS, message); 200 } 201 202 Attribute attr = Attributes.create(ATTR_TASK_STATE, TaskState.RECURRING.toString()); 203 Entry recurringTaskEntry = recurringTask.getRecurringTaskEntry(); 204 recurringTaskEntry.putAttribute(attr.getAttributeType(), newArrayList(attr)); 205 206 if (scheduleIteration) 207 { 208 Task task = recurringTask.scheduleNextIteration( 209 new GregorianCalendar()); 210 if (task != null) 211 { 212 // If there is an existing task with the same id 213 // and it is in completed state, take its place. 214 Task t = tasks.get(task.getTaskID()); 215 if (t != null && TaskState.isDone(t.getTaskState())) 216 { 217 removeCompletedTask(t.getTaskID()); 218 } 219 220 scheduleTask(task, false); 221 } 222 } 223 224 recurringTasks.put(id, recurringTask); 225 writeState(); 226 } 227 finally 228 { 229 schedulerLock.unlock(); 230 } 231 } 232 233 234 235 /** 236 * Removes the recurring task with the given ID. 237 * 238 * @param recurringTaskID The ID of the recurring task to remove. 239 * 240 * @return The recurring task that was removed, or <CODE>null</CODE> if there 241 * was no such recurring task. 242 * 243 * @throws DirectoryException If there is currently a pending or running 244 * iteration of the associated recurring task. 245 */ 246 public RecurringTask removeRecurringTask(String recurringTaskID) 247 throws DirectoryException 248 { 249 schedulerLock.lock(); 250 251 try 252 { 253 RecurringTask recurringTask = recurringTasks.remove(recurringTaskID); 254 HashMap<String,Task> iterationsMap = new HashMap<>(); 255 256 for (Task t : tasks.values()) 257 { 258 // Find any existing task iterations and try to cancel them. 259 if (t.getRecurringTaskID() != null && 260 t.getRecurringTaskID().equals(recurringTaskID)) 261 { 262 TaskState state = t.getTaskState(); 263 if (!TaskState.isDone(state) && !TaskState.isCancelled(state)) 264 { 265 cancelTask(t.getTaskID()); 266 } 267 iterationsMap.put(t.getTaskID(), t); 268 } 269 } 270 271 // Remove any completed task iterations. 272 for (Map.Entry<String,Task> iterationEntry : iterationsMap.entrySet()) 273 { 274 if (TaskState.isDone(iterationEntry.getValue().getTaskState())) 275 { 276 removeCompletedTask(iterationEntry.getKey()); 277 } 278 } 279 280 writeState(); 281 return recurringTask; 282 } 283 finally 284 { 285 schedulerLock.unlock(); 286 } 287 } 288 289 290 291 /** 292 * Schedules the provided task for execution. If the scheduler is active and 293 * the start time has arrived, then the task will begin execution immediately. 294 * Otherwise, it will be placed in the pending queue to be started at the 295 * appropriate time. 296 * 297 * @param task The task to be scheduled. 298 * @param writeState Indicates whether the current state information for 299 * the scheduler should be persisted to disk once the 300 * task is scheduled. 301 * 302 * @throws DirectoryException If a problem occurs while trying to schedule 303 * the task (e.g., there's already another task 304 * defined with the same ID). 305 */ 306 public void scheduleTask(Task task, boolean writeState) 307 throws DirectoryException 308 { 309 schedulerLock.lock(); 310 311 312 try 313 { 314 String id = task.getTaskID(); 315 316 if (tasks.containsKey(id)) 317 { 318 LocalizableMessage message = ERR_TASKSCHED_DUPLICATE_TASK_ID.get(id); 319 throw new DirectoryException(ResultCode.ENTRY_ALREADY_EXISTS, message); 320 } 321 322 for (String dependencyID : task.getDependencyIDs()) 323 { 324 Task t = tasks.get(dependencyID); 325 if (t == null) 326 { 327 LocalizableMessage message = ERR_TASKSCHED_DEPENDENCY_MISSING.get(id, dependencyID); 328 throw new DirectoryException(ResultCode.NO_SUCH_OBJECT, message); 329 } 330 } 331 332 tasks.put(id, task); 333 334 TaskState state = shouldStart(task); 335 task.setTaskState(state); 336 337 if (state == TaskState.RUNNING) 338 { 339 TaskThread taskThread; 340 if (idleThreads.isEmpty()) 341 { 342 taskThread = new TaskThread(this, nextThreadID++); 343 taskThread.start(); 344 } 345 else 346 { 347 taskThread = idleThreads.removeFirst(); 348 } 349 350 runningTasks.add(task); 351 activeThreads.put(task.getTaskID(), taskThread); 352 taskThread.setTask(task); 353 } 354 else if (TaskState.isDone(state)) 355 { 356 if (state == TaskState.CANCELED_BEFORE_STARTING && task.isRecurring()) 357 { 358 pendingTasks.add(task); 359 } 360 else 361 { 362 completedTasks.add(task); 363 } 364 } 365 else 366 { 367 pendingTasks.add(task); 368 } 369 370 if (writeState) 371 { 372 writeState(); 373 } 374 } 375 finally 376 { 377 schedulerLock.unlock(); 378 } 379 } 380 381 382 383 /** 384 * Attempts to cancel the task with the given task ID. This will only cancel 385 * the task if it has not yet started running. If it has started, then it 386 * will not be interrupted. 387 * 388 * @param taskID The task ID of the task to cancel. 389 * 390 * @return The requested task, which may or may not have actually been 391 * cancelled (the task state should make it possible to determine 392 * whether it was cancelled), or <CODE>null</CODE> if there is no 393 * such task. 394 */ 395 public Task cancelTask(String taskID) 396 { 397 schedulerLock.lock(); 398 399 try 400 { 401 Task t = tasks.get(taskID); 402 if (t == null) 403 { 404 return null; 405 } 406 407 if (TaskState.isPending(t.getTaskState())) 408 { 409 pendingTasks.remove(t); 410 t.setTaskState(TaskState.CANCELED_BEFORE_STARTING); 411 addCompletedTask(t); 412 writeState(); 413 } 414 415 return t; 416 } 417 finally 418 { 419 schedulerLock.unlock(); 420 } 421 } 422 423 424 425 /** 426 * Removes the specified pending task. It will be completely removed rather 427 * than moving it to the set of completed tasks. 428 * 429 * @param taskID The task ID of the pending task to remove. 430 * 431 * @return The task that was removed. 432 * 433 * @throws DirectoryException If the requested task is not in the pending 434 * queue. 435 */ 436 public Task removePendingTask(String taskID) 437 throws DirectoryException 438 { 439 schedulerLock.lock(); 440 441 try 442 { 443 Task t = tasks.get(taskID); 444 if (t == null) 445 { 446 LocalizableMessage message = ERR_TASKSCHED_REMOVE_PENDING_NO_SUCH_TASK.get(taskID); 447 throw new DirectoryException(ResultCode.NO_SUCH_OBJECT, message); 448 } 449 450 if (TaskState.isPending(t.getTaskState())) 451 { 452 tasks.remove(taskID); 453 pendingTasks.remove(t); 454 writeState(); 455 return t; 456 } 457 else 458 { 459 LocalizableMessage message = ERR_TASKSCHED_REMOVE_PENDING_NOT_PENDING.get(taskID); 460 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message); 461 } 462 } 463 finally 464 { 465 schedulerLock.unlock(); 466 } 467 } 468 469 470 471 /** 472 * Removes the specified completed task. 473 * 474 * @param taskID The task ID of the completed task to remove. 475 * 476 * @return The task that was removed. 477 * 478 * @throws DirectoryException If the requested task could not be found. 479 */ 480 public Task removeCompletedTask(String taskID) 481 throws DirectoryException 482 { 483 schedulerLock.lock(); 484 485 try 486 { 487 Iterator<Task> iterator = completedTasks.iterator(); 488 while (iterator.hasNext()) 489 { 490 Task t = iterator.next(); 491 if (t.getTaskID().equals(taskID)) 492 { 493 iterator.remove(); 494 tasks.remove(taskID); 495 writeState(); 496 return t; 497 } 498 } 499 500 LocalizableMessage message = ERR_TASKSCHED_REMOVE_COMPLETED_NO_SUCH_TASK.get(taskID); 501 throw new DirectoryException(ResultCode.NO_SUCH_OBJECT, message); 502 } 503 finally 504 { 505 schedulerLock.unlock(); 506 } 507 } 508 509 510 511 /** 512 * Indicates that processing has completed on the provided task thread and 513 * that it is now available for processing other tasks. The thread may be 514 * immediately used for processing another task if appropriate. 515 * 516 * @param taskThread The thread that has completed processing on its 517 * previously-assigned task. 518 * @param completedTask The task for which processing has been completed. 519 * @param taskState The task state for this completed task. 520 * 521 * @return <CODE>true</CODE> if the thread should continue running and 522 * wait for the next task to process, or <CODE>false</CODE> if it 523 * should exit immediately. 524 */ 525 public boolean threadDone(TaskThread taskThread, Task completedTask, 526 TaskState taskState) 527 { 528 schedulerLock.lock(); 529 530 try 531 { 532 completedTask.setCompletionTime(TimeThread.getTime()); 533 completedTask.setTaskState(taskState); 534 addCompletedTask(completedTask); 535 536 try 537 { 538 completedTask.sendNotificationEMailMessage(); 539 } 540 catch (Exception e) 541 { 542 logger.traceException(e); 543 } 544 545 String taskID = completedTask.getTaskID(); 546 if (activeThreads.remove(taskID) == null) 547 { 548 return false; 549 } 550 551 // See if the task is part of a recurring task. 552 // If so, then schedule the next iteration. 553 scheduleNextRecurringTaskIteration(completedTask, 554 new GregorianCalendar()); 555 writeState(); 556 557 if (isRunning) 558 { 559 idleThreads.add(taskThread); 560 return true; 561 } 562 else 563 { 564 return false; 565 } 566 } 567 finally 568 { 569 schedulerLock.unlock(); 570 } 571 } 572 573 574 575 /** 576 * Check if a given task is a recurring task iteration and re-schedule it. 577 * @param completedTask The task for which processing has been completed. 578 * @param calendar The calendar date and time to schedule from. 579 */ 580 protected void scheduleNextRecurringTaskIteration(Task completedTask, 581 GregorianCalendar calendar) 582 { 583 String recurringTaskID = completedTask.getRecurringTaskID(); 584 if (recurringTaskID != null) 585 { 586 RecurringTask recurringTask = recurringTasks.get(recurringTaskID); 587 if (recurringTask != null) 588 { 589 Task newIteration = null; 590 try { 591 newIteration = recurringTask.scheduleNextIteration(calendar); 592 } catch (DirectoryException de) { 593 logger.error(de.getMessageObject()); 594 } 595 if (newIteration != null) 596 { 597 try 598 { 599 // If there is an existing task with the same id 600 // and it is in completed state, take its place. 601 Task t = tasks.get(newIteration.getTaskID()); 602 if (t != null && TaskState.isDone(t.getTaskState())) 603 { 604 removeCompletedTask(t.getTaskID()); 605 } 606 607 scheduleTask(newIteration, false); 608 } 609 catch (DirectoryException de) 610 { 611 // This task might have been already scheduled from before 612 // and thus got initialized from backing file, otherwise 613 // log error and continue. 614 if (de.getResultCode() != ResultCode.ENTRY_ALREADY_EXISTS) 615 { 616 logger.traceException(de); 617 618 LocalizableMessage message = 619 ERR_TASKSCHED_ERROR_SCHEDULING_RECURRING_ITERATION.get( 620 recurringTaskID, de.getMessageObject()); 621 logger.error(message); 622 623 DirectoryServer.sendAlertNotification(this, 624 ALERT_TYPE_CANNOT_SCHEDULE_RECURRING_ITERATION, 625 message); 626 } 627 } 628 } 629 } 630 } 631 } 632 633 634 635 /** 636 * Adds the provided task to the set of completed tasks associated with the 637 * scheduler. It will be automatically removed after the appropriate 638 * retention time has elapsed. 639 * 640 * @param completedTask The task for which processing has completed. 641 */ 642 public void addCompletedTask(Task completedTask) 643 { 644 // The scheduler lock is reentrant, so even if we already hold it, we can 645 // acquire it again. 646 schedulerLock.lock(); 647 648 try 649 { 650 completedTasks.add(completedTask); 651 runningTasks.remove(completedTask); 652 653 // If the task never ran set its completion 654 // time here explicitly so that it can be 655 // correctly evaluated for retention later. 656 if (completedTask.getCompletionTime() == -1) 657 { 658 completedTask.setCompletionTime(TimeThread.getTime()); 659 } 660 } 661 finally 662 { 663 schedulerLock.unlock(); 664 } 665 } 666 667 668 669 /** 670 * Stops the scheduler so that it will not start any scheduled tasks. It will 671 * not attempt to interrupt any tasks that are already running. Note that 672 * once the scheduler has been stopped, it cannot be restarted and it will be 673 * necessary to restart the task backend to start a new scheduler instance. 674 */ 675 public void stopScheduler() 676 { 677 stopRequested = true; 678 679 try 680 { 681 schedulerThread.interrupt(); 682 } 683 catch (Exception e) 684 { 685 logger.traceException(e); 686 } 687 688 try 689 { 690 schedulerThread.join(); 691 } 692 catch (Exception e) 693 { 694 logger.traceException(e); 695 } 696 697 pendingTasks.clear(); 698 runningTasks.clear(); 699 completedTasks.clear(); 700 tasks.clear(); 701 702 for (TaskThread thread : idleThreads) 703 { 704 LocalizableMessage message = INFO_TASKBE_INTERRUPTED_BY_SHUTDOWN.get(); 705 thread.interruptTask(TaskState.STOPPED_BY_SHUTDOWN, message, true); 706 } 707 } 708 709 710 711 /** 712 * Attempts to interrupt any tasks that are actively running. This will not 713 * make any attempt to stop the scheduler. 714 * 715 * @param interruptState The state that should be assigned to the tasks if 716 * they are successfully interrupted. 717 * @param interruptReason A message indicating the reason that the tasks 718 * are to be interrupted. 719 * @param waitForStop Indicates whether this method should wait until 720 * all active tasks have stopped before returning. 721 */ 722 public void interruptRunningTasks(TaskState interruptState, 723 LocalizableMessage interruptReason, 724 boolean waitForStop) 725 { 726 // Grab a copy of the running threads so that we can operate on them without 727 // holding the lock. 728 LinkedList<TaskThread> threadList = new LinkedList<>(); 729 730 schedulerLock.lock(); 731 try 732 { 733 threadList.addAll(activeThreads.values()); 734 } 735 finally 736 { 737 schedulerLock.unlock(); 738 } 739 740 741 // Iterate through all the task threads and request that they stop 742 // processing. 743 for (TaskThread t : threadList) 744 { 745 try 746 { 747 t.interruptTask(interruptState, interruptReason, true); 748 } 749 catch (Exception e) 750 { 751 logger.traceException(e); 752 } 753 } 754 755 756 // If we should actually wait for all the task threads to stop, then do so. 757 if (waitForStop) 758 { 759 for (TaskThread t : threadList) 760 { 761 try 762 { 763 t.join(); 764 } 765 catch (Exception e) 766 { 767 logger.traceException(e); 768 } 769 } 770 } 771 } 772 773 774 775 /** 776 * Operates in a loop, launching tasks at the appropriate time and performing 777 * any necessary periodic cleanup. 778 */ 779 @Override 780 public void run() 781 { 782 isRunning = true; 783 schedulerThread = currentThread(); 784 785 try 786 { 787 while (! stopRequested) 788 { 789 schedulerLock.lock(); 790 791 boolean writeState = false; 792 long sleepTime = MAX_SLEEP_TIME; 793 794 try 795 { 796 // If there are any pending tasks that need to be started, then do so 797 // now. 798 Iterator<Task> iterator = pendingTasks.iterator(); 799 while (iterator.hasNext()) 800 { 801 Task t = iterator.next(); 802 TaskState state = shouldStart(t); 803 804 if (state == TaskState.RUNNING) 805 { 806 TaskThread taskThread; 807 if (idleThreads.isEmpty()) 808 { 809 taskThread = new TaskThread(this, nextThreadID++); 810 taskThread.start(); 811 } 812 else 813 { 814 taskThread = idleThreads.removeFirst(); 815 } 816 817 runningTasks.add(t); 818 activeThreads.put(t.getTaskID(), taskThread); 819 taskThread.setTask(t); 820 821 iterator.remove(); 822 writeState = true; 823 } 824 else if (state == TaskState.WAITING_ON_START_TIME) 825 { 826 // If we're waiting for the start time to arrive, then see if that 827 // will come before the next sleep time is up. 828 long waitTime = t.getScheduledStartTime() - TimeThread.getTime(); 829 sleepTime = Math.min(sleepTime, waitTime); 830 } 831 // Recurring task iteration has to spawn the next one 832 // even if the current iteration has been canceled. 833 else if (state == TaskState.CANCELED_BEFORE_STARTING && t.isRecurring()) 834 { 835 if (t.getScheduledStartTime() > TimeThread.getTime()) { 836 // If we're waiting for the start time to arrive, 837 // then see if that will come before the next sleep time is up. 838 long waitTime = 839 t.getScheduledStartTime() - TimeThread.getTime(); 840 sleepTime = Math.min(sleepTime, waitTime); 841 } else { 842 TaskThread taskThread; 843 if (idleThreads.isEmpty()) { 844 taskThread = new TaskThread(this, nextThreadID++); 845 taskThread.start(); 846 } else { 847 taskThread = idleThreads.removeFirst(); 848 } 849 runningTasks.add(t); 850 activeThreads.put(t.getTaskID(), taskThread); 851 taskThread.setTask(t); 852 } 853 } 854 855 if (state != t.getTaskState()) 856 { 857 t.setTaskState(state); 858 writeState = true; 859 } 860 } 861 862 863 // Clean up any completed tasks that have been around long enough. 864 long retentionTimeMillis = 865 TimeUnit.SECONDS.toMillis(taskBackend.getRetentionTime()); 866 long oldestRetainedCompletionTime = 867 TimeThread.getTime() - retentionTimeMillis; 868 iterator = completedTasks.iterator(); 869 while (iterator.hasNext()) 870 { 871 Task t = iterator.next(); 872 if (t.getCompletionTime() < oldestRetainedCompletionTime) 873 { 874 iterator.remove(); 875 tasks.remove(t.getTaskID()); 876 writeState = true; 877 } 878 } 879 880 // If anything changed, then make sure that the on-disk state gets 881 // updated. 882 if (writeState) 883 { 884 writeState(); 885 } 886 } 887 finally 888 { 889 schedulerLock.unlock(); 890 } 891 892 893 try 894 { 895 if (sleepTime > 0) 896 { 897 Thread.sleep(sleepTime); 898 } 899 } catch (InterruptedException ie) {} 900 } 901 } 902 finally 903 { 904 isRunning = false; 905 } 906 } 907 908 909 910 /** 911 * Determines whether the specified task should start running. This is based 912 * on the start time, the set of dependencies, and whether or not the 913 * scheduler is active. Note that the caller to this method must hold the 914 * scheduler lock. 915 * 916 * @param task The task for which to make the determination. 917 * 918 * @return The task state that should be used for the task. It should be 919 * RUNNING if the task should be started, or some other state if not. 920 */ 921 private TaskState shouldStart(Task task) 922 { 923 // If the task has finished we don't want to restart it 924 TaskState state = task.getTaskState(); 925 926 // Reset task state if recurring. 927 if (state == TaskState.RECURRING) { 928 state = null; 929 } 930 931 if (state != null && TaskState.isDone(state)) 932 { 933 return state; 934 } 935 936 if (! isRunning) 937 { 938 return TaskState.UNSCHEDULED; 939 } 940 941 if (task.getScheduledStartTime() > TimeThread.getTime()) 942 { 943 return TaskState.WAITING_ON_START_TIME; 944 } 945 946 LinkedList<String> dependencyIDs = task.getDependencyIDs(); 947 if (dependencyIDs != null) 948 { 949 for (String dependencyID : dependencyIDs) 950 { 951 Task t = tasks.get(dependencyID); 952 if (t != null) 953 { 954 TaskState tState = t.getTaskState(); 955 if (!TaskState.isDone(tState)) 956 { 957 return TaskState.WAITING_ON_DEPENDENCY; 958 } 959 if (!TaskState.isSuccessful(tState)) 960 { 961 FailedDependencyAction action = task.getFailedDependencyAction(); 962 switch (action) 963 { 964 case CANCEL: 965 cancelTask(task.getTaskID()); 966 return task.getTaskState(); 967 case DISABLE: 968 task.setTaskState(TaskState.DISABLED); 969 return task.getTaskState(); 970 default: 971 break; 972 } 973 } 974 } 975 } 976 } 977 978 return TaskState.RUNNING; 979 } 980 981 982 983 /** 984 * Populates the scheduler with information read from the task backing file. 985 * If no backing file is found, then create a new one. The caller must 986 * already hold the scheduler lock or otherwise ensure that this is a 987 * threadsafe operation. 988 * 989 * @throws InitializationException If a fatal error occurs while attempting 990 * to perform the initialization. 991 */ 992 private void initializeTasksFromBackingFile() 993 throws InitializationException 994 { 995 String backingFilePath = taskBackend.getTaskBackingFile(); 996 997 try 998 { 999 File backingFile = getFileForPath(backingFilePath); 1000 if (! backingFile.exists()) 1001 { 1002 createNewTaskBackingFile(); 1003 return; 1004 } 1005 1006 1007 LDIFImportConfig importConfig = new LDIFImportConfig(backingFilePath); 1008 LDIFReader ldifReader = new LDIFReader(importConfig); 1009 1010 taskRootEntry = null; 1011 recurringTaskParentEntry = null; 1012 scheduledTaskParentEntry = null; 1013 1014 while (true) 1015 { 1016 Entry entry; 1017 1018 try 1019 { 1020 entry = ldifReader.readEntry(); 1021 } 1022 catch (LDIFException le) 1023 { 1024 logger.traceException(le); 1025 1026 if (le.canContinueReading()) 1027 { 1028 logger.error(ERR_TASKSCHED_CANNOT_PARSE_ENTRY_RECOVERABLE, 1029 backingFilePath, le.getLineNumber(), le.getMessage()); 1030 1031 continue; 1032 } 1033 else 1034 { 1035 try 1036 { 1037 ldifReader.close(); 1038 } 1039 catch (Exception e) 1040 { 1041 logger.traceException(e); 1042 } 1043 1044 LocalizableMessage message = ERR_TASKSCHED_CANNOT_PARSE_ENTRY_FATAL.get( 1045 backingFilePath, le.getLineNumber(), le.getMessage()); 1046 throw new InitializationException(message); 1047 } 1048 } 1049 1050 if (entry == null) 1051 { 1052 break; 1053 } 1054 1055 DN entryDN = entry.getName(); 1056 if (entryDN.equals(taskBackend.getTaskRootDN())) 1057 { 1058 taskRootEntry = entry; 1059 } 1060 else if (entryDN.equals(taskBackend.getRecurringTasksParentDN())) 1061 { 1062 recurringTaskParentEntry = entry; 1063 } 1064 else if (entryDN.equals(taskBackend.getScheduledTasksParentDN())) 1065 { 1066 scheduledTaskParentEntry = entry; 1067 } 1068 else 1069 { 1070 DN parentDN = entryDN.getParentDNInSuffix(); 1071 if (parentDN == null) 1072 { 1073 logger.error(ERR_TASKSCHED_ENTRY_HAS_NO_PARENT, entryDN, taskBackend.getTaskRootDN()); 1074 } 1075 else if (parentDN.equals(taskBackend.getScheduledTasksParentDN())) 1076 { 1077 try 1078 { 1079 Task task = entryToScheduledTask(entry, null); 1080 if (TaskState.isDone(task.getTaskState())) 1081 { 1082 String id = task.getTaskID(); 1083 if (tasks.containsKey(id)) 1084 { 1085 logger.warn(WARN_TASKSCHED_DUPLICATE_TASK_ID, id); 1086 } 1087 else 1088 { 1089 completedTasks.add(task); 1090 tasks.put(id, task); 1091 } 1092 } 1093 else 1094 { 1095 scheduleTask(task, false); 1096 } 1097 } 1098 catch (DirectoryException de) 1099 { 1100 logger.traceException(de); 1101 logger.error(ERR_TASKSCHED_CANNOT_SCHEDULE_TASK_FROM_ENTRY, entryDN, de.getMessageObject()); 1102 } 1103 } 1104 else if (parentDN.equals(taskBackend.getRecurringTasksParentDN())) 1105 { 1106 try 1107 { 1108 RecurringTask recurringTask = entryToRecurringTask(entry); 1109 addRecurringTask(recurringTask, false); 1110 } 1111 catch (DirectoryException de) 1112 { 1113 logger.traceException(de); 1114 logger.error(ERR_TASKSCHED_CANNOT_SCHEDULE_RECURRING_TASK_FROM_ENTRY, entryDN, de.getMessageObject()); 1115 } 1116 } 1117 else 1118 { 1119 logger.error(ERR_TASKSCHED_INVALID_TASK_ENTRY_DN, entryDN, backingFilePath); 1120 } 1121 } 1122 } 1123 1124 ldifReader.close(); 1125 } 1126 catch (IOException ioe) 1127 { 1128 logger.traceException(ioe); 1129 1130 LocalizableMessage message = ERR_TASKSCHED_ERROR_READING_TASK_BACKING_FILE.get( 1131 backingFilePath, stackTraceToSingleLineString(ioe)); 1132 throw new InitializationException(message, ioe); 1133 } 1134 } 1135 1136 1137 1138 /** 1139 * Creates a new task backing file that contains only the basic structure but 1140 * no scheduled or recurring task entries. The caller must already hold the 1141 * scheduler lock or otherwise ensure that this is a threadsafe operation. 1142 * 1143 * @throws InitializationException If a problem occurs while attempting to 1144 * create the backing file. 1145 */ 1146 private void createNewTaskBackingFile() 1147 throws InitializationException 1148 { 1149 String backingFile = taskBackend.getTaskBackingFile(); 1150 LDIFExportConfig exportConfig = 1151 new LDIFExportConfig(backingFile, ExistingFileBehavior.OVERWRITE); 1152 1153 try 1154 { 1155 LDIFWriter writer = new LDIFWriter(exportConfig); 1156 1157 // First, write a header to the top of the file to indicate that it should 1158 // not be manually edited. 1159 writer.writeComment(INFO_TASKBE_BACKING_FILE_HEADER.get(), 80); 1160 1161 1162 // Next, create the required hierarchical entries and add them to the 1163 // LDIF. 1164 taskRootEntry = createEntry(taskBackend.getTaskRootDN()); 1165 writer.writeEntry(taskRootEntry); 1166 1167 scheduledTaskParentEntry = 1168 createEntry(taskBackend.getScheduledTasksParentDN()); 1169 writer.writeEntry(scheduledTaskParentEntry); 1170 1171 recurringTaskParentEntry = 1172 createEntry(taskBackend.getRecurringTasksParentDN()); 1173 writer.writeEntry(recurringTaskParentEntry); 1174 1175 1176 // Close the file and we're done. 1177 writer.close(); 1178 } 1179 catch (IOException ioe) 1180 { 1181 logger.traceException(ioe); 1182 1183 LocalizableMessage message = ERR_TASKSCHED_CANNOT_CREATE_BACKING_FILE.get( 1184 backingFile, stackTraceToSingleLineString(ioe)); 1185 throw new InitializationException(message, ioe); 1186 } 1187 catch (LDIFException le) 1188 { 1189 logger.traceException(le); 1190 LocalizableMessage message = ERR_TASKSCHED_CANNOT_CREATE_BACKING_FILE.get( 1191 backingFile, le.getMessage()); 1192 throw new InitializationException(message, le); 1193 } 1194 } 1195 1196 1197 1198 /** 1199 * Writes state information about all tasks and recurring tasks to disk. 1200 */ 1201 public void writeState() 1202 { 1203 String backingFilePath = taskBackend.getTaskBackingFile(); 1204 String tmpFilePath = backingFilePath + ".tmp"; 1205 LDIFExportConfig exportConfig = 1206 new LDIFExportConfig(tmpFilePath, ExistingFileBehavior.OVERWRITE); 1207 1208 1209 schedulerLock.lock(); 1210 1211 try 1212 { 1213 LDIFWriter writer = new LDIFWriter(exportConfig); 1214 1215 // First, write a header to the top of the file to indicate that it should 1216 // not be manually edited. 1217 writer.writeComment(INFO_TASKBE_BACKING_FILE_HEADER.get(), 80); 1218 1219 1220 // Next, write the structural entries to the top of the LDIF. 1221 writer.writeEntry(taskRootEntry); 1222 writer.writeEntry(scheduledTaskParentEntry); 1223 writer.writeEntry(recurringTaskParentEntry); 1224 1225 1226 // Iterate through all the recurring tasks and write them to LDIF. 1227 for (RecurringTask recurringTask : recurringTasks.values()) 1228 { 1229 writer.writeEntry(recurringTask.getRecurringTaskEntry()); 1230 } 1231 1232 1233 // Iterate through all the scheduled tasks and write them to LDIF. 1234 for (Task task : tasks.values()) 1235 { 1236 writer.writeEntry(task.getTaskEntry()); 1237 } 1238 1239 1240 // Close the file. 1241 writer.close(); 1242 1243 1244 // See if there is a ".save" file. If so, then delete it. 1245 File saveFile = getFileForPath(backingFilePath + ".save"); 1246 try 1247 { 1248 if (saveFile.exists()) 1249 { 1250 saveFile.delete(); 1251 } 1252 } 1253 catch (Exception e) 1254 { 1255 logger.traceException(e); 1256 } 1257 1258 1259 // If there is an existing backing file, then rename it to ".save". 1260 File backingFile = getFileForPath(backingFilePath); 1261 try 1262 { 1263 if (backingFile.exists()) 1264 { 1265 backingFile.renameTo(saveFile); 1266 } 1267 } 1268 catch (Exception e) 1269 { 1270 logger.traceException(e); 1271 1272 LocalizableMessage message = WARN_TASKSCHED_CANNOT_RENAME_CURRENT_BACKING_FILE.get( 1273 backingFilePath, saveFile.getAbsolutePath(), stackTraceToSingleLineString(e)); 1274 logger.warn(message); 1275 DirectoryServer.sendAlertNotification( 1276 this, ALERT_TYPE_CANNOT_RENAME_CURRENT_TASK_FILE, message); 1277 } 1278 1279 1280 // Rename the ".tmp" file into place. 1281 File tmpFile = getFileForPath(tmpFilePath); 1282 try 1283 { 1284 tmpFile.renameTo(backingFile); 1285 } 1286 catch (Exception e) 1287 { 1288 logger.traceException(e); 1289 1290 LocalizableMessage message = ERR_TASKSCHED_CANNOT_RENAME_NEW_BACKING_FILE.get( 1291 tmpFilePath, backingFilePath, stackTraceToSingleLineString(e)); 1292 logger.error(message); 1293 DirectoryServer.sendAlertNotification( 1294 this, ALERT_TYPE_CANNOT_RENAME_NEW_TASK_FILE, message); 1295 } 1296 } 1297 catch (LDIFException le) 1298 { 1299 logger.traceException(le); 1300 LocalizableMessage message = 1301 ERR_TASKSCHED_CANNOT_WRITE_BACKING_FILE.get(tmpFilePath, le 1302 .getMessage()); 1303 logger.error(message); 1304 DirectoryServer.sendAlertNotification(this, 1305 ALERT_TYPE_CANNOT_WRITE_TASK_FILE, message); 1306 } 1307 catch (Exception e) 1308 { 1309 logger.traceException(e); 1310 LocalizableMessage message = 1311 ERR_TASKSCHED_CANNOT_WRITE_BACKING_FILE.get(tmpFilePath, 1312 stackTraceToSingleLineString(e)); 1313 logger.error(message); 1314 DirectoryServer.sendAlertNotification(this, 1315 ALERT_TYPE_CANNOT_WRITE_TASK_FILE, message); 1316 } 1317 finally 1318 { 1319 schedulerLock.unlock(); 1320 } 1321 } 1322 1323 1324 1325 /** 1326 * Retrieves the total number of entries in the task backend. 1327 * 1328 * @return The total number of entries in the task backend. 1329 */ 1330 public long getEntryCount() 1331 { 1332 schedulerLock.lock(); 1333 1334 try 1335 { 1336 return tasks.size() + recurringTasks.size() + 3; 1337 } 1338 finally 1339 { 1340 schedulerLock.unlock(); 1341 } 1342 } 1343 1344 /** 1345 * Retrieves the number of scheduled tasks in the task backend. 1346 * 1347 * @return The total number of entries in the task backend. 1348 */ 1349 public long getScheduledTaskCount() 1350 { 1351 schedulerLock.lock(); 1352 1353 try 1354 { 1355 return tasks.size(); 1356 } 1357 finally 1358 { 1359 schedulerLock.unlock(); 1360 } 1361 } 1362 1363 1364 1365 /** 1366 * Retrieves the number of recurring tasks in the task backend. 1367 * 1368 * @return The total number of entries in the task backend. 1369 */ 1370 public long getRecurringTaskCount() 1371 { 1372 schedulerLock.lock(); 1373 1374 try 1375 { 1376 return recurringTasks.size(); 1377 } 1378 finally 1379 { 1380 schedulerLock.unlock(); 1381 } 1382 } 1383 1384 1385 1386 /** 1387 * Retrieves the task backend with which this scheduler is associated. 1388 * 1389 * @return The task backend with which this scheduler is associated. 1390 */ 1391 public TaskBackend getTaskBackend() 1392 { 1393 return taskBackend; 1394 } 1395 1396 1397 1398 /** 1399 * Retrieves the root entry that is the common ancestor for all entries in the 1400 * task backend. 1401 * 1402 * @return The root entry that is the common ancestor for all entries in the 1403 * task backend. 1404 */ 1405 public Entry getTaskRootEntry() 1406 { 1407 return taskRootEntry.duplicate(true); 1408 } 1409 1410 1411 1412 /** 1413 * Retrieves the entry that is the immediate parent for all scheduled task 1414 * entries in the task backend. 1415 * 1416 * @return The entry that is the immediate parent for all scheduled task 1417 * entries in the task backend. 1418 */ 1419 public Entry getScheduledTaskParentEntry() 1420 { 1421 return scheduledTaskParentEntry.duplicate(true); 1422 } 1423 1424 1425 1426 /** 1427 * Retrieves the entry that is the immediate parent for all recurring task 1428 * entries in the task backend. 1429 * 1430 * @return The entry that is the immediate parent for all recurring task 1431 * entries in the task backend. 1432 */ 1433 public Entry getRecurringTaskParentEntry() 1434 { 1435 return recurringTaskParentEntry.duplicate(true); 1436 } 1437 1438 1439 1440 /** 1441 * Retrieves the scheduled task with the given task ID. 1442 * 1443 * @param taskID The task ID for the scheduled task to retrieve. 1444 * 1445 * @return The requested scheduled task, or <CODE>null</CODE> if there is no 1446 * such task. 1447 */ 1448 public Task getScheduledTask(String taskID) 1449 { 1450 schedulerLock.lock(); 1451 1452 try 1453 { 1454 return tasks.get(taskID); 1455 } 1456 finally 1457 { 1458 schedulerLock.unlock(); 1459 } 1460 } 1461 1462 1463 1464 /** 1465 * Retrieves the scheduled task created from the specified entry. 1466 * 1467 * @param taskEntryDN The DN of the task configuration entry associated 1468 * with the task to retrieve. 1469 * 1470 * @return The requested scheduled task, or <CODE>null</CODE> if there is no 1471 * such task. 1472 */ 1473 public Task getScheduledTask(DN taskEntryDN) 1474 { 1475 schedulerLock.lock(); 1476 1477 try 1478 { 1479 for (Task t : tasks.values()) 1480 { 1481 if (taskEntryDN.equals(t.getTaskEntry().getName())) 1482 { 1483 return t; 1484 } 1485 } 1486 1487 return null; 1488 } 1489 finally 1490 { 1491 schedulerLock.unlock(); 1492 } 1493 } 1494 1495 1496 1497 /** 1498 * Indicates whether the current thread already holds a lock on the scheduler. 1499 * 1500 * @return {@code true} if the current thread holds the scheduler lock, or 1501 * {@code false} if not. 1502 */ 1503 boolean holdsSchedulerLock() 1504 { 1505 return schedulerLock.isHeldByCurrentThread(); 1506 } 1507 1508 1509 1510 /** 1511 * Attempts to acquire a write lock on the specified entry, trying as many 1512 * times as necessary until the lock has been acquired. 1513 * 1514 * @param entryDN The DN of the entry for which to acquire the write lock. 1515 * 1516 * @return The write lock that has been acquired for the entry. 1517 */ 1518 DNLock writeLockEntry(DN entryDN) 1519 { 1520 DNLock lock = null; 1521 while (lock == null) 1522 { 1523 lock = DirectoryServer.getLockManager().tryWriteLockEntry(entryDN); 1524 } 1525 return lock; 1526 } 1527 1528 1529 1530 /** 1531 * Attempts to acquire a read lock on the specified entry. 1532 * 1533 * @param entryDN The DN of the entry for which to acquire the read lock. 1534 * 1535 * @return The read lock that has been acquired for the entry. 1536 * 1537 * @throws DirectoryException If the read lock cannot be acquired. 1538 */ 1539 DNLock readLockEntry(DN entryDN) throws DirectoryException 1540 { 1541 final DNLock lock = DirectoryServer.getLockManager().tryReadLockEntry(entryDN); 1542 if (lock != null) 1543 { 1544 return lock; 1545 } 1546 throw new DirectoryException(ResultCode.BUSY, ERR_BACKEND_CANNOT_LOCK_ENTRY.get(entryDN)); 1547 } 1548 1549 1550 1551 /** 1552 * Retrieves the scheduled task entry with the provided DN. The caller should 1553 * hold a read lock on the target entry. 1554 * 1555 * @param scheduledTaskEntryDN The entry DN that indicates which scheduled 1556 * task entry to retrieve. 1557 * 1558 * @return The scheduled task entry with the provided DN, or 1559 * <CODE>null</CODE> if no scheduled task has the provided DN. 1560 */ 1561 public Entry getScheduledTaskEntry(DN scheduledTaskEntryDN) 1562 { 1563 schedulerLock.lock(); 1564 1565 try 1566 { 1567 for (Task task : tasks.values()) 1568 { 1569 Entry taskEntry = task.getTaskEntry(); 1570 1571 if (scheduledTaskEntryDN.equals(taskEntry.getName())) 1572 { 1573 return taskEntry.duplicate(true); 1574 } 1575 } 1576 1577 return null; 1578 } 1579 finally 1580 { 1581 schedulerLock.unlock(); 1582 } 1583 } 1584 1585 1586 1587 /** 1588 * Compares the filter in the provided search operation against each of the 1589 * task entries, returning any that match. Note that only the search filter 1590 * will be used -- the base and scope will be ignored, so the caller must 1591 * ensure that they are correct for scheduled tasks. 1592 * 1593 * @param searchOperation The search operation to use when performing the 1594 * search. 1595 * 1596 * @return <CODE>true</CODE> if processing should continue on the search 1597 * operation, or <CODE>false</CODE> if it should not for some reason 1598 * (e.g., a size or time limit was reached). 1599 * 1600 * @throws DirectoryException If a problem occurs while processing the 1601 * search operation against the scheduled tasks. 1602 */ 1603 public boolean searchScheduledTasks(SearchOperation searchOperation) 1604 throws DirectoryException 1605 { 1606 SearchFilter filter = searchOperation.getFilter(); 1607 1608 schedulerLock.lock(); 1609 1610 try 1611 { 1612 for (Task t : tasks.values()) 1613 { 1614 DN taskEntryDN = t.getTaskEntryDN(); 1615 DNLock lock = readLockEntry(taskEntryDN); 1616 try 1617 { 1618 Entry e = t.getTaskEntry().duplicate(true); 1619 if (filter.matchesEntry(e) && !searchOperation.returnEntry(e, null)) 1620 { 1621 return false; 1622 } 1623 } 1624 finally 1625 { 1626 lock.unlock(); 1627 } 1628 } 1629 1630 return true; 1631 } 1632 finally 1633 { 1634 schedulerLock.unlock(); 1635 } 1636 } 1637 1638 1639 1640 /** 1641 * Retrieves the recurring task with the given recurring task ID. 1642 * 1643 * @param recurringTaskID The recurring task ID for the recurring task to 1644 * retrieve. 1645 * 1646 * @return The requested recurring task, or <CODE>null</CODE> if there is no 1647 * such recurring task. 1648 */ 1649 public RecurringTask getRecurringTask(String recurringTaskID) 1650 { 1651 schedulerLock.lock(); 1652 1653 try 1654 { 1655 return recurringTasks.get(recurringTaskID); 1656 } 1657 finally 1658 { 1659 schedulerLock.unlock(); 1660 } 1661 } 1662 1663 1664 1665 /** 1666 * Retrieves the recurring task with the given recurring task ID. 1667 * 1668 * @param recurringTaskEntryDN The recurring task ID for the recurring task 1669 * to retrieve. 1670 * 1671 * @return The requested recurring task, or <CODE>null</CODE> if there is no 1672 * such recurring task. 1673 */ 1674 public RecurringTask getRecurringTask(DN recurringTaskEntryDN) 1675 { 1676 schedulerLock.lock(); 1677 1678 try 1679 { 1680 for (RecurringTask rt : recurringTasks.values()) 1681 { 1682 if (recurringTaskEntryDN.equals(rt.getRecurringTaskEntry().getName())) 1683 { 1684 return rt; 1685 } 1686 } 1687 1688 return null; 1689 } 1690 finally 1691 { 1692 schedulerLock.unlock(); 1693 } 1694 } 1695 1696 1697 1698 /** 1699 * Retrieves the recurring task entry with the provided DN. The caller should 1700 * hold a read lock on the target entry. 1701 * 1702 * @param recurringTaskEntryDN The entry DN that indicates which recurring 1703 * task entry to retrieve. 1704 * 1705 * @return The recurring task entry with the provided DN, or 1706 * <CODE>null</CODE> if no recurring task has the provided DN. 1707 */ 1708 public Entry getRecurringTaskEntry(DN recurringTaskEntryDN) 1709 { 1710 schedulerLock.lock(); 1711 1712 try 1713 { 1714 for (RecurringTask recurringTask : recurringTasks.values()) 1715 { 1716 Entry recurringTaskEntry = recurringTask.getRecurringTaskEntry(); 1717 1718 if (recurringTaskEntryDN.equals(recurringTaskEntry.getName())) 1719 { 1720 return recurringTaskEntry.duplicate(true); 1721 } 1722 } 1723 1724 return null; 1725 } 1726 finally 1727 { 1728 schedulerLock.unlock(); 1729 } 1730 } 1731 1732 1733 1734 /** 1735 * Compares the filter in the provided search operation against each of the 1736 * recurring task entries, returning any that match. Note that only the 1737 * search filter will be used -- the base and scope will be ignored, so the 1738 * caller must ensure that they are correct for recurring tasks. 1739 * 1740 * @param searchOperation The search operation to use when performing the 1741 * search. 1742 * 1743 * @return <CODE>true</CODE> if processing should continue on the search 1744 * operation, or <CODE>false</CODE> if it should not for some reason 1745 * (e.g., a size or time limit was reached). 1746 * 1747 * @throws DirectoryException If a problem occurs while processing the 1748 * search operation against the recurring tasks. 1749 */ 1750 public boolean searchRecurringTasks(SearchOperation searchOperation) 1751 throws DirectoryException 1752 { 1753 SearchFilter filter = searchOperation.getFilter(); 1754 1755 schedulerLock.lock(); 1756 1757 try 1758 { 1759 for (RecurringTask rt : recurringTasks.values()) 1760 { 1761 DN recurringTaskEntryDN = rt.getRecurringTaskEntryDN(); 1762 DNLock lock = readLockEntry(recurringTaskEntryDN); 1763 try 1764 { 1765 Entry e = rt.getRecurringTaskEntry().duplicate(true); 1766 if (filter.matchesEntry(e) && ! searchOperation.returnEntry(e, null)) 1767 { 1768 return false; 1769 } 1770 } 1771 finally 1772 { 1773 lock.unlock(); 1774 } 1775 } 1776 1777 return true; 1778 } 1779 finally 1780 { 1781 schedulerLock.unlock(); 1782 } 1783 } 1784 1785 1786 1787 /** 1788 * Decodes the contents of the provided entry as a scheduled task. The 1789 * resulting task will not actually be scheduled for processing. 1790 * 1791 * @param entry The entry to decode as a scheduled task. 1792 * @param operation The operation used to create this task in the server, or 1793 * {@code null} if the operation is not available. 1794 * 1795 * @return The scheduled task decoded from the provided entry. 1796 * 1797 * @throws DirectoryException If the provided entry cannot be decoded as a 1798 * scheduled task. 1799 */ 1800 public Task entryToScheduledTask(Entry entry, Operation operation) 1801 throws DirectoryException 1802 { 1803 // Get the name of the class that implements the task logic. 1804 AttributeType attrType = DirectoryServer.getAttributeTypeOrDefault(ATTR_TASK_CLASS.toLowerCase(), ATTR_TASK_CLASS); 1805 List<Attribute> attrList = entry.getAttribute(attrType); 1806 if (attrList == null || attrList.isEmpty()) 1807 { 1808 LocalizableMessage message = ERR_TASKSCHED_NO_CLASS_ATTRIBUTE.get(ATTR_TASK_ID); 1809 throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, message); 1810 } 1811 1812 if (attrList.size() > 1) 1813 { 1814 LocalizableMessage message = ERR_TASKSCHED_MULTIPLE_CLASS_TYPES.get(ATTR_TASK_ID); 1815 throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, message); 1816 } 1817 1818 Attribute attr = attrList.get(0); 1819 if (attr.isEmpty()) 1820 { 1821 LocalizableMessage message = ERR_TASKSCHED_NO_CLASS_VALUES.get(ATTR_TASK_ID); 1822 throw new DirectoryException(ResultCode.CONSTRAINT_VIOLATION, message); 1823 } 1824 1825 Iterator<ByteString> iterator = attr.iterator(); 1826 ByteString value = iterator.next(); 1827 if (iterator.hasNext()) 1828 { 1829 LocalizableMessage message = ERR_TASKSCHED_MULTIPLE_CLASS_VALUES.get(ATTR_TASK_ID); 1830 throw new DirectoryException(ResultCode.OBJECTCLASS_VIOLATION, message); 1831 } 1832 1833 // Try to load the specified class. 1834 String taskClassName = value.toString(); 1835 Class<?> taskClass; 1836 try 1837 { 1838 taskClass = DirectoryServer.loadClass(taskClassName); 1839 } 1840 catch (Exception e) 1841 { 1842 logger.traceException(e); 1843 1844 LocalizableMessage message = ERR_TASKSCHED_CANNOT_LOAD_CLASS. 1845 get(taskClassName, ATTR_TASK_CLASS, stackTraceToSingleLineString(e)); 1846 throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), message); 1847 } 1848 1849 // Instantiate the class as a task. 1850 Task task; 1851 try 1852 { 1853 task = (Task) taskClass.newInstance(); 1854 } 1855 catch (Exception e) 1856 { 1857 logger.traceException(e); 1858 1859 LocalizableMessage message = ERR_TASKSCHED_CANNOT_INSTANTIATE_CLASS_AS_TASK.get( 1860 taskClassName, Task.class.getName()); 1861 throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), message); 1862 } 1863 1864 // Perform the necessary internal and external initialization for the task. 1865 try 1866 { 1867 task.initializeTaskInternal(serverContext, this, entry); 1868 } 1869 catch (InitializationException ie) 1870 { 1871 logger.traceException(ie); 1872 1873 LocalizableMessage message = ERR_TASKSCHED_CANNOT_INITIALIZE_INTERNAL.get( 1874 taskClassName, ie.getMessage()); 1875 throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), message); 1876 } 1877 catch (Exception e) 1878 { 1879 LocalizableMessage message = ERR_TASKSCHED_CANNOT_INITIALIZE_INTERNAL.get( 1880 taskClassName, stackTraceToSingleLineString(e)); 1881 throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), message); 1882 } 1883 1884 if (!TaskState.isDone(task.getTaskState()) && 1885 !DirectoryServer.getAllowedTasks().contains(taskClassName)) 1886 { 1887 LocalizableMessage message = ERR_TASKSCHED_NOT_ALLOWED_TASK.get(taskClassName); 1888 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message); 1889 } 1890 1891 task.setOperation(operation); 1892 1893 // Avoid task specific initialization for completed tasks. 1894 if (!TaskState.isDone(task.getTaskState())) { 1895 task.initializeTask(); 1896 } 1897 task.setOperation(null); 1898 1899 return task; 1900 } 1901 1902 1903 1904 /** 1905 * Decodes the contents of the provided entry as a recurring task. The 1906 * resulting recurring task will not actually be added to the scheduler. 1907 * 1908 * @param entry The entry to decode as a recurring task. 1909 * 1910 * @return The recurring task decoded from the provided entry. 1911 * 1912 * @throws DirectoryException If the provided entry cannot be decoded as a 1913 * recurring task. 1914 */ 1915 public RecurringTask entryToRecurringTask(Entry entry) 1916 throws DirectoryException 1917 { 1918 return new RecurringTask(serverContext, this, entry); 1919 } 1920 1921 1922 1923 /** 1924 * Retrieves the DN of the configuration entry with which this alert generator 1925 * is associated. 1926 * 1927 * @return The DN of the configuration entry with which this alert generator 1928 * is associated. 1929 */ 1930 @Override 1931 public DN getComponentEntryDN() 1932 { 1933 return taskBackend.getConfigEntryDN(); 1934 } 1935 1936 1937 1938 /** 1939 * Retrieves the fully-qualified name of the Java class for this alert 1940 * generator implementation. 1941 * 1942 * @return The fully-qualified name of the Java class for this alert 1943 * generator implementation. 1944 */ 1945 @Override 1946 public String getClassName() 1947 { 1948 return CLASS_NAME; 1949 } 1950 1951 1952 1953 /** 1954 * Retrieves information about the set of alerts that this generator may 1955 * produce. The map returned should be between the notification type for a 1956 * particular notification and the human-readable description for that 1957 * notification. This alert generator must not generate any alerts with types 1958 * that are not contained in this list. 1959 * 1960 * @return Information about the set of alerts that this generator may 1961 * produce. 1962 */ 1963 @Override 1964 public LinkedHashMap<String,String> getAlerts() 1965 { 1966 LinkedHashMap<String, String> alerts = new LinkedHashMap<>(); 1967 1968 alerts.put(ALERT_TYPE_CANNOT_SCHEDULE_RECURRING_ITERATION, 1969 ALERT_DESCRIPTION_CANNOT_SCHEDULE_RECURRING_ITERATION); 1970 alerts.put(ALERT_TYPE_CANNOT_RENAME_CURRENT_TASK_FILE, 1971 ALERT_DESCRIPTION_CANNOT_RENAME_CURRENT_TASK_FILE); 1972 alerts.put(ALERT_TYPE_CANNOT_RENAME_NEW_TASK_FILE, 1973 ALERT_DESCRIPTION_CANNOT_RENAME_NEW_TASK_FILE); 1974 alerts.put(ALERT_TYPE_CANNOT_WRITE_TASK_FILE, 1975 ALERT_DESCRIPTION_CANNOT_WRITE_TASK_FILE); 1976 1977 return alerts; 1978 } 1979}