001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2006-2010 Sun Microsystems, Inc.
025 *      Portions Copyright 2013-2015 ForgeRock AS
026 */
027package org.opends.server.tasks;
028
029import java.util.List;
030
031import org.forgerock.i18n.LocalizableMessage;
032import org.forgerock.i18n.LocalizableMessageBuilder;
033import org.opends.messages.TaskMessages;
034import org.opends.server.backends.task.Task;
035import org.opends.server.backends.task.TaskState;
036import org.forgerock.i18n.slf4j.LocalizedLogger;
037import org.opends.server.replication.common.CSN;
038import org.opends.server.replication.plugin.LDAPReplicationDomain;
039import org.opends.server.types.*;
040import org.forgerock.opendj.ldap.ResultCode;
041import org.opends.server.util.TimeThread;
042
043import static org.opends.server.config.ConfigConstants.*;
044import static org.opends.server.core.DirectoryServer.*;
045
046/**
047 * This class provides an implementation of a Directory Server task that can
048 * be used to purge the replication historical informations stored in the
049 * user entries to solve conflicts.
050 */
051public class PurgeConflictsHistoricalTask extends Task
052{
053  /** The default value for the maximum duration of the purge expressed in seconds. */
054  public static final int DEFAULT_MAX_DURATION = 60 * 60;
055  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
056
057  private String domainString;
058  private LDAPReplicationDomain domain;
059
060  /**
061   *                 current historical purge delay
062   *                <--------------------------------->
063   * -----------------------------------------------------------------> t
064   *               |                           |            |
065   *           current                      task           task
066   *           CSN being purged           start date    max end date
067   *                                           <------------>
068   *                                          config.purgeMaxDuration
069   *
070   * The task will start purging the change with the oldest CSN found.
071   * The task run as long as :
072   *  - the end date (computed from the configured max duration) is not reached
073   *  - the CSN purged is oldest than the configured historical purge delay
074   */
075  private int purgeTaskMaxDurationInSec = DEFAULT_MAX_DURATION;
076
077  private TaskState initState;
078
079
080  /** {@inheritDoc} */
081  @Override
082  public LocalizableMessage getDisplayName() {
083    return TaskMessages.INFO_TASK_PURGE_CONFLICTS_HIST_NAME.get();
084  }
085
086  /** {@inheritDoc} */
087  @Override public void initializeTask() throws DirectoryException
088  {
089    if (TaskState.isDone(getTaskState()))
090    {
091      return;
092    }
093
094    // FIXME -- Do we need any special authorization here?
095    Entry taskEntry = getTaskEntry();
096
097    AttributeType typeDomainBase = getAttributeTypeOrDefault(ATTR_TASK_CONFLICTS_HIST_PURGE_DOMAIN_DN);
098    List<Attribute> attrList = taskEntry.getAttribute(typeDomainBase);
099    domainString = TaskUtils.getSingleValueString(attrList);
100
101    try
102    {
103      DN dn = DN.valueOf(domainString);
104      // We can assume that this is an LDAP replication domain
105      domain = LDAPReplicationDomain.retrievesReplicationDomain(dn);
106    }
107    catch(DirectoryException e)
108    {
109      LocalizableMessageBuilder mb = new LocalizableMessageBuilder();
110      mb.append(TaskMessages.ERR_TASK_INITIALIZE_INVALID_DN.get());
111      mb.append(e.getMessage());
112      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, mb.toMessage());
113    }
114
115    AttributeType typeMaxDuration = getAttributeTypeOrDefault(ATTR_TASK_CONFLICTS_HIST_PURGE_MAX_DURATION);
116    attrList = taskEntry.getAttribute(typeMaxDuration);
117    String maxDurationStringInSec = TaskUtils.getSingleValueString(attrList);
118
119    if (maxDurationStringInSec != null)
120    {
121      try
122      {
123        purgeTaskMaxDurationInSec = Integer.decode(maxDurationStringInSec);
124      }
125      catch(Exception e)
126      {
127        throw new DirectoryException(
128            ResultCode.UNWILLING_TO_PERFORM,
129            TaskMessages.ERR_TASK_INVALID_ATTRIBUTE_VALUE.get(
130        ATTR_TASK_CONFLICTS_HIST_PURGE_MAX_DURATION, e.getLocalizedMessage()));
131      }
132    }
133  }
134
135  /** {@inheritDoc} */
136  @Override
137  protected TaskState runTask()
138  {
139    Boolean purgeCompletedInTime = false;
140    logger.trace("PurgeConflictsHistoricalTask is starting on domain: %s max duration (sec): %d",
141        domain.getBaseDN(), purgeTaskMaxDurationInSec);
142    try
143    {
144      replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COMPLETED_IN_TIME, purgeCompletedInTime.toString());
145
146      // launch the task
147      domain.purgeConflictsHistorical(this, TimeThread.getTime() + purgeTaskMaxDurationInSec*1000);
148
149      purgeCompletedInTime = true;
150      replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COMPLETED_IN_TIME, purgeCompletedInTime.toString());
151
152      initState =  TaskState.COMPLETED_SUCCESSFULLY;
153    }
154    catch(DirectoryException de)
155    {
156      logger.trace("PurgeConflictsHistoricalTask exception %s", de.getLocalizedMessage());
157      if (de.getResultCode() != ResultCode.ADMIN_LIMIT_EXCEEDED)
158      {
159        // Error raised at submission time
160        logger.error(de.getMessageObject());
161        initState = TaskState.STOPPED_BY_ERROR;
162      }
163      else
164      {
165        initState =  TaskState.COMPLETED_SUCCESSFULLY;
166      }
167    }
168    finally
169    {
170      try
171      {
172        // sets in the attributes the last stats values
173        replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COUNT, String.valueOf(purgeCount));
174        replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_LAST_CSN, lastCSN.toStringUI());
175        logger.trace("PurgeConflictsHistoricalTask write attrs %d", purgeCount);
176      }
177      catch(Exception e)
178      {
179        logger.trace("PurgeConflictsHistoricalTask exception %s", e.getLocalizedMessage());
180        initState = TaskState.STOPPED_BY_ERROR;
181      }
182    }
183
184    logger.trace("PurgeConflictsHistoricalTask is ending with state: %s completedInTime: %s",
185        initState, purgeCompletedInTime);
186    return initState;
187  }
188
189  private int updateAttrPeriod;
190  private CSN lastCSN;
191  private int purgeCount;
192
193  /**
194   * Set the last CSN purged and the count of purged values in order to monitor
195   * the historical purge.
196   *
197   * @param lastCSN
198   *          the last CSN purged.
199   * @param purgeCount
200   *          the count of purged values.
201   */
202  public void setProgressStats(CSN lastCSN, int purgeCount)
203  {
204    try
205    {
206      if (purgeCount == 0)
207      {
208        replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_FIRST_CSN, lastCSN.toStringUI());
209      }
210
211      // we don't want the update of the task to overload too much task duration
212      this.purgeCount = purgeCount;
213      this.lastCSN = lastCSN;
214      if (++updateAttrPeriod % 100 == 0)
215      {
216        replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COUNT, String.valueOf(purgeCount));
217        replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_LAST_CSN, lastCSN.toStringUI());
218        logger.trace("PurgeConflictsHistoricalTask write attrs %d", purgeCount);
219      }
220    }
221    catch(DirectoryException de)
222    {
223      logger.trace("PurgeConflictsHistoricalTask exception %s", de.getLocalizedMessage());
224      initState = TaskState.STOPPED_BY_ERROR;
225    }
226  }
227}