001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2009-2010 Sun Microsystems, Inc.
025 *      Portions Copyright 2014-2015 ForgeRock AS
026 */
027package org.opends.server.tools.tasks;
028
029import static org.forgerock.opendj.ldap.ResultCode.*;
030import static org.opends.messages.ToolMessages.*;
031import static org.opends.server.config.ConfigConstants.*;
032
033import java.io.IOException;
034import java.text.SimpleDateFormat;
035import java.util.ArrayList;
036import java.util.Collections;
037import java.util.Date;
038import java.util.LinkedHashSet;
039import java.util.List;
040import java.util.UUID;
041import java.util.concurrent.atomic.AtomicInteger;
042
043import org.forgerock.i18n.LocalizableMessage;
044import org.forgerock.opendj.ldap.ByteString;
045import org.forgerock.opendj.ldap.DecodeException;
046import org.forgerock.opendj.ldap.DereferenceAliasesPolicy;
047import org.forgerock.opendj.ldap.ModificationType;
048import org.forgerock.opendj.ldap.SearchScope;
049import org.opends.server.backends.task.FailedDependencyAction;
050import org.opends.server.backends.task.TaskState;
051import org.opends.server.config.ConfigConstants;
052import org.opends.server.protocols.ldap.AddRequestProtocolOp;
053import org.opends.server.protocols.ldap.AddResponseProtocolOp;
054import org.opends.server.protocols.ldap.DeleteRequestProtocolOp;
055import org.opends.server.protocols.ldap.DeleteResponseProtocolOp;
056import org.opends.server.protocols.ldap.LDAPAttribute;
057import org.opends.server.protocols.ldap.LDAPConstants;
058import org.opends.server.protocols.ldap.LDAPFilter;
059import org.opends.server.protocols.ldap.LDAPMessage;
060import org.opends.server.protocols.ldap.LDAPModification;
061import org.opends.server.protocols.ldap.LDAPResultCode;
062import org.opends.server.protocols.ldap.ModifyRequestProtocolOp;
063import org.opends.server.protocols.ldap.ModifyResponseProtocolOp;
064import org.opends.server.protocols.ldap.SearchRequestProtocolOp;
065import org.opends.server.protocols.ldap.SearchResultEntryProtocolOp;
066import org.opends.server.tools.LDAPConnection;
067import org.opends.server.tools.LDAPReader;
068import org.opends.server.tools.LDAPWriter;
069import org.opends.server.types.Control;
070import org.opends.server.types.Entry;
071import org.opends.server.types.LDAPException;
072import org.opends.server.types.RawAttribute;
073import org.opends.server.types.RawModification;
074import org.opends.server.types.SearchResultEntry;
075import org.opends.server.util.StaticUtils;
076
077/**
078 * Helper class for interacting with the task backend on behalf of utilities
079 * that are capable of being scheduled.
080 */
081public class TaskClient {
082
083  /**
084   * Connection through which task scheduling will take place.
085   */
086  protected LDAPConnection connection;
087
088  /**
089   * Keeps track of message IDs.
090   */
091  private final AtomicInteger nextMessageID = new AtomicInteger(0);
092
093  /**
094   * Creates a new TaskClient for interacting with the task backend remotely.
095   * @param conn for accessing the task backend
096   */
097  public TaskClient(LDAPConnection conn) {
098    this.connection = conn;
099  }
100
101  /**
102   * Returns the ID of the task entry for a given list of task attributes.
103   * @param taskAttributes the task attributes.
104   * @return the ID of the task entry for a given list of task attributes.
105   */
106  public static String getTaskID(List<RawAttribute> taskAttributes)
107  {
108    String taskID = null;
109
110    RawAttribute recurringIDAttr = getAttribute(ATTR_RECURRING_TASK_ID,
111        taskAttributes);
112
113    if (recurringIDAttr != null) {
114      taskID = recurringIDAttr.getValues().get(0).toString();
115    } else {
116      RawAttribute taskIDAttr = getAttribute(ATTR_TASK_ID,
117          taskAttributes);
118      taskID = taskIDAttr.getValues().get(0).toString();
119    }
120
121    return taskID;
122  }
123
124  private static RawAttribute getAttribute(String attrName,
125      List<RawAttribute> taskAttributes)
126  {
127    for (RawAttribute attr : taskAttributes)
128    {
129      if (attr.getAttributeType().equalsIgnoreCase(attrName))
130      {
131        return attr;
132      }
133    }
134    return null;
135  }
136
137  /**
138   * Returns the DN of the task entry for a given list of task attributes.
139   * @param taskAttributes the task attributes.
140   * @return the DN of the task entry for a given list of task attributes.
141   */
142  public static String getTaskDN(List<RawAttribute> taskAttributes)
143  {
144    String entryDN = null;
145    String taskID = getTaskID(taskAttributes);
146    RawAttribute recurringIDAttr = getAttribute(ATTR_RECURRING_TASK_ID,
147        taskAttributes);
148
149    if (recurringIDAttr != null) {
150      entryDN = ATTR_RECURRING_TASK_ID + "=" +
151      taskID + "," + RECURRING_TASK_BASE_RDN + "," + DN_TASK_ROOT;
152    } else {
153      entryDN = ATTR_TASK_ID + "=" + taskID + "," +
154      SCHEDULED_TASK_BASE_RDN + "," + DN_TASK_ROOT;
155    }
156    return entryDN;
157  }
158
159  private static boolean isScheduleRecurring(TaskScheduleInformation information)
160  {
161    return information.getRecurringDateTime() != null;
162  }
163
164  /**
165   * This is a commodity method that returns the common attributes (those
166   * related to scheduling) of a task entry for a given
167   * {@link TaskScheduleInformation} object.
168   * @param information the scheduling information.
169   * @return the schedule attributes of the task entry.
170   */
171  public static ArrayList<RawAttribute> getTaskAttributes(
172      TaskScheduleInformation information)
173  {
174    String taskID = null;
175    boolean scheduleRecurring = isScheduleRecurring(information);
176
177    if (scheduleRecurring) {
178      taskID = information.getTaskId();
179      if (taskID == null || taskID.length() == 0) {
180        taskID = information.getTaskClass().getSimpleName() + "-" + UUID.randomUUID();
181      }
182    } else {
183      // Use a formatted time/date for the ID so that is remotely useful
184      SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHHmmssSSS");
185      taskID = df.format(new Date());
186    }
187
188    ArrayList<RawAttribute> attributes = new ArrayList<>();
189
190    ArrayList<String> ocValues = new ArrayList<>(4);
191    ocValues.add("top");
192    ocValues.add(ConfigConstants.OC_TASK);
193    if (scheduleRecurring) {
194      ocValues.add(ConfigConstants.OC_RECURRING_TASK);
195    }
196    ocValues.add(information.getTaskObjectclass());
197    attributes.add(new LDAPAttribute(ATTR_OBJECTCLASS, ocValues));
198
199    if (scheduleRecurring) {
200      attributes.add(new LDAPAttribute(ATTR_RECURRING_TASK_ID, taskID));
201    }
202    attributes.add(new LDAPAttribute(ATTR_TASK_ID, taskID));
203
204    String classValue = information.getTaskClass().getName();
205    attributes.add(new LDAPAttribute(ATTR_TASK_CLASS, classValue));
206
207    // add the start time if necessary
208    Date startDate = information.getStartDateTime();
209    if (startDate != null) {
210      String startTimeString = StaticUtils.formatDateTimeString(startDate);
211      attributes.add(new LDAPAttribute(ATTR_TASK_SCHEDULED_START_TIME, startTimeString));
212    }
213
214    if (scheduleRecurring) {
215      String recurringPatternValues = information.getRecurringDateTime();
216      attributes.add(new LDAPAttribute(ATTR_RECURRING_TASK_SCHEDULE, recurringPatternValues));
217    }
218
219    // add dependency IDs
220    List<String> dependencyIds = information.getDependencyIds();
221    if (dependencyIds != null && !dependencyIds.isEmpty()) {
222      attributes.add(new LDAPAttribute(ATTR_TASK_DEPENDENCY_IDS, dependencyIds));
223
224      // add the dependency action
225      FailedDependencyAction fda = information.getFailedDependencyAction();
226      if (fda == null) {
227        fda = FailedDependencyAction.defaultValue();
228      }
229      attributes.add(new LDAPAttribute(ATTR_TASK_FAILED_DEPENDENCY_ACTION, fda.name()));
230    }
231
232    // add completion notification email addresses
233    List<String> compNotifEmailAddresss = information.getNotifyUponCompletionEmailAddresses();
234    if (compNotifEmailAddresss != null && !compNotifEmailAddresss.isEmpty()) {
235      attributes.add(new LDAPAttribute(ATTR_TASK_NOTIFY_ON_COMPLETION, compNotifEmailAddresss));
236    }
237
238    // add error notification email addresses
239    List<String> errNotifEmailAddresss = information.getNotifyUponErrorEmailAddresses();
240    if (errNotifEmailAddresss != null && !errNotifEmailAddresss.isEmpty()) {
241      attributes.add(new LDAPAttribute(ATTR_TASK_NOTIFY_ON_ERROR, errNotifEmailAddresss));
242    }
243
244    information.addTaskAttributes(attributes);
245
246    return attributes;
247  }
248
249  /**
250   * Schedule a task for execution by writing an entry to the task backend.
251   *
252   * @param information to be scheduled
253   * @return String task ID assigned the new task
254   * @throws IOException if there is a stream communication problem
255   * @throws LDAPException if there is a problem getting information
256   *         out to the directory
257   * @throws DecodeException if there is a problem with the encoding
258   * @throws TaskClientException if there is a problem with the task entry
259   */
260  public synchronized TaskEntry schedule(TaskScheduleInformation information)
261          throws LDAPException, IOException, DecodeException, TaskClientException
262  {
263    LDAPReader reader = connection.getLDAPReader();
264    LDAPWriter writer = connection.getLDAPWriter();
265
266    ArrayList<Control> controls = new ArrayList<>();
267    ArrayList<RawAttribute> attributes = getTaskAttributes(information);
268
269    ByteString entryDN = ByteString.valueOfUtf8(getTaskDN(attributes));
270    AddRequestProtocolOp addRequest = new AddRequestProtocolOp(entryDN, attributes);
271    LDAPMessage requestMessage =
272         new LDAPMessage(nextMessageID.getAndIncrement(), addRequest, controls);
273
274    // Send the request to the server and read the response.
275    LDAPMessage responseMessage;
276    writer.writeMessage(requestMessage);
277
278    responseMessage = reader.readMessage();
279    if (responseMessage == null)
280    {
281      throw new LDAPException(
282              LDAPResultCode.CLIENT_SIDE_SERVER_DOWN,
283              ERR_TASK_CLIENT_UNEXPECTED_CONNECTION_CLOSURE.get());
284    }
285
286    if (responseMessage.getProtocolOpType() !=
287        LDAPConstants.OP_TYPE_ADD_RESPONSE)
288    {
289      throw new LDAPException(
290              LDAPResultCode.CLIENT_SIDE_LOCAL_ERROR,
291              ERR_TASK_CLIENT_INVALID_RESPONSE_TYPE.get(
292                responseMessage.getProtocolOpName()));
293    }
294
295    AddResponseProtocolOp addResponse =
296         responseMessage.getAddResponseProtocolOp();
297    if (addResponse.getResultCode() != 0) {
298      throw new LDAPException(
299              LDAPResultCode.CLIENT_SIDE_LOCAL_ERROR,
300              addResponse.getErrorMessage());
301    }
302    return getTaskEntry(getTaskID(attributes));
303  }
304
305  /**
306   * Gets all the ds-task entries from the task root.
307   *
308   * @return list of entries from the task root
309   * @throws IOException if there is a stream communication problem
310   * @throws LDAPException if there is a problem getting information
311   *         out to the directory
312   * @throws DecodeException if there is a problem with the encoding
313   */
314  public synchronized List<TaskEntry> getTaskEntries()
315          throws LDAPException, IOException, DecodeException {
316    List<Entry> entries = new ArrayList<>();
317
318    writeSearch(new SearchRequestProtocolOp(
319        ByteString.valueOfUtf8(ConfigConstants.DN_TASK_ROOT),
320            SearchScope.WHOLE_SUBTREE,
321            DereferenceAliasesPolicy.NEVER,
322            Integer.MAX_VALUE,
323            Integer.MAX_VALUE,
324            false,
325            LDAPFilter.decode("(objectclass=ds-task)"),
326            new LinkedHashSet<String>()));
327
328    LDAPReader reader = connection.getLDAPReader();
329    byte opType;
330    do {
331      LDAPMessage responseMessage = reader.readMessage();
332      if (responseMessage == null) {
333        throw new LDAPException(
334                LDAPResultCode.CLIENT_SIDE_SERVER_DOWN,
335                ERR_TASK_CLIENT_UNEXPECTED_CONNECTION_CLOSURE.get());
336      } else {
337        opType = responseMessage.getProtocolOpType();
338        if (opType == LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY) {
339          SearchResultEntryProtocolOp searchEntryOp =
340                  responseMessage.getSearchResultEntryProtocolOp();
341          SearchResultEntry entry = searchEntryOp.toSearchResultEntry();
342          entries.add(entry);
343        }
344      }
345    }
346    while (opType != LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE);
347    List<TaskEntry> taskEntries = new ArrayList<>(entries.size());
348    for (Entry entry : entries) {
349      taskEntries.add(new TaskEntry(entry));
350    }
351    return Collections.unmodifiableList(taskEntries);
352  }
353
354  /**
355   * Gets the entry of the task whose ID is <code>id</code> from the directory.
356   *
357   * @param id of the entry to retrieve
358   * @return Entry for the task
359   * @throws IOException if there is a stream communication problem
360   * @throws LDAPException if there is a problem getting information
361   *         out to the directory
362   * @throws DecodeException if there is a problem with the encoding
363   * @throws TaskClientException if there is no task with the requested id
364   */
365  public synchronized TaskEntry getTaskEntry(String id)
366          throws LDAPException, IOException, DecodeException, TaskClientException
367  {
368    Entry entry = null;
369
370    writeSearch(new SearchRequestProtocolOp(
371        ByteString.valueOfUtf8(ConfigConstants.DN_TASK_ROOT),
372            SearchScope.WHOLE_SUBTREE,
373            DereferenceAliasesPolicy.NEVER,
374            Integer.MAX_VALUE,
375            Integer.MAX_VALUE,
376            false,
377            LDAPFilter.decode("(" + ATTR_TASK_ID + "=" + id + ")"),
378            new LinkedHashSet<String>()));
379
380    LDAPReader reader = connection.getLDAPReader();
381    byte opType;
382    do {
383      LDAPMessage responseMessage = reader.readMessage();
384      if (responseMessage == null) {
385        LocalizableMessage message = ERR_TASK_CLIENT_UNEXPECTED_CONNECTION_CLOSURE.get();
386        throw new LDAPException(UNAVAILABLE.intValue(), message);
387      } else {
388        opType = responseMessage.getProtocolOpType();
389        if (opType == LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY) {
390          SearchResultEntryProtocolOp searchEntryOp =
391                  responseMessage.getSearchResultEntryProtocolOp();
392          entry = searchEntryOp.toSearchResultEntry();
393        }
394      }
395    }
396    while (opType != LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE);
397    if (entry == null) {
398      throw new TaskClientException(ERR_TASK_CLIENT_UNKNOWN_TASK.get(id));
399    }
400    return new TaskEntry(entry);
401  }
402
403
404  /**
405   * Changes that the state of the task in the backend to a canceled state.
406   *
407   * @param  id if the task to cancel
408   * @throws IOException if there is a stream communication problem
409   * @throws LDAPException if there is a problem getting information
410   *         out to the directory
411   * @throws DecodeException if there is a problem with the encoding
412   * @throws TaskClientException if there is no task with the requested id
413   */
414  public synchronized void cancelTask(String id)
415          throws TaskClientException, IOException, DecodeException, LDAPException
416  {
417    LDAPReader reader = connection.getLDAPReader();
418    LDAPWriter writer = connection.getLDAPWriter();
419
420    TaskEntry entry = getTaskEntry(id);
421    TaskState state = entry.getTaskState();
422    if (state != null) {
423      if (!TaskState.isDone(state)) {
424
425        ByteString dn = ByteString.valueOfUtf8(entry.getDN().toString());
426
427        ArrayList<RawModification> mods = new ArrayList<>();
428
429        String newState;
430        if (TaskState.isPending(state)) {
431          newState = TaskState.CANCELED_BEFORE_STARTING.name();
432        } else {
433          newState = TaskState.STOPPED_BY_ADMINISTRATOR.name();
434        }
435        LDAPAttribute attr = new LDAPAttribute(ATTR_TASK_STATE, newState);
436        mods.add(new LDAPModification(ModificationType.REPLACE, attr));
437
438        ModifyRequestProtocolOp modRequest =
439                new ModifyRequestProtocolOp(dn, mods);
440        LDAPMessage requestMessage =
441             new LDAPMessage(nextMessageID.getAndIncrement(), modRequest, null);
442
443        writer.writeMessage(requestMessage);
444
445        LDAPMessage responseMessage = reader.readMessage();
446
447        if (responseMessage == null) {
448          LocalizableMessage message = ERR_TASK_CLIENT_UNEXPECTED_CONNECTION_CLOSURE.get();
449          throw new LDAPException(UNAVAILABLE.intValue(), message);
450        }
451
452        if (responseMessage.getProtocolOpType() !=
453                LDAPConstants.OP_TYPE_MODIFY_RESPONSE)
454        {
455          throw new LDAPException(
456                  LDAPResultCode.CLIENT_SIDE_LOCAL_ERROR,
457                  ERR_TASK_CLIENT_INVALID_RESPONSE_TYPE.get(
458                    responseMessage.getProtocolOpName()));
459        }
460
461        ModifyResponseProtocolOp modResponse =
462                responseMessage.getModifyResponseProtocolOp();
463        LocalizableMessage errorMessage = modResponse.getErrorMessage();
464        if (errorMessage != null) {
465          throw new LDAPException(
466                  LDAPResultCode.CLIENT_SIDE_LOCAL_ERROR,
467                  errorMessage);
468        }
469      } else if (TaskState.isRecurring(state)) {
470
471        ByteString dn = ByteString.valueOfUtf8(entry.getDN().toString());
472        DeleteRequestProtocolOp deleteRequest =
473          new DeleteRequestProtocolOp(dn);
474
475        LDAPMessage requestMessage = new LDAPMessage(
476          nextMessageID.getAndIncrement(), deleteRequest, null);
477
478        writer.writeMessage(requestMessage);
479
480        LDAPMessage responseMessage = reader.readMessage();
481
482        if (responseMessage == null) {
483          LocalizableMessage message = ERR_TASK_CLIENT_UNEXPECTED_CONNECTION_CLOSURE.get();
484          throw new LDAPException(UNAVAILABLE.intValue(), message);
485        }
486
487        if (responseMessage.getProtocolOpType() !=
488                LDAPConstants.OP_TYPE_DELETE_RESPONSE)
489        {
490          throw new LDAPException(
491                  LDAPResultCode.CLIENT_SIDE_LOCAL_ERROR,
492                  ERR_TASK_CLIENT_INVALID_RESPONSE_TYPE.get(
493                    responseMessage.getProtocolOpName()));
494        }
495
496        DeleteResponseProtocolOp deleteResponse =
497                responseMessage.getDeleteResponseProtocolOp();
498        LocalizableMessage errorMessage = deleteResponse.getErrorMessage();
499        if (errorMessage != null) {
500          throw new LDAPException(
501                  LDAPResultCode.CLIENT_SIDE_LOCAL_ERROR,
502                  errorMessage);
503        }
504      } else {
505        throw new TaskClientException(
506                ERR_TASK_CLIENT_UNCANCELABLE_TASK.get(id));
507      }
508    } else {
509      throw new TaskClientException(
510              ERR_TASK_CLIENT_TASK_STATE_UNKNOWN.get(id));
511    }
512  }
513
514
515  /**
516   * Writes a search to the directory writer.
517   * @param searchRequest to write
518   * @throws IOException if there is a stream communication problem
519   */
520  private void writeSearch(SearchRequestProtocolOp searchRequest)
521          throws IOException {
522    LDAPWriter writer = connection.getLDAPWriter();
523    LDAPMessage requestMessage = new LDAPMessage(
524            nextMessageID.getAndIncrement(),
525            searchRequest,
526            new ArrayList<Control>());
527
528    // Send the request to the server and read the response.
529    writer.writeMessage(requestMessage);
530  }
531
532}