001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2006-2010 Sun Microsystems, Inc. 025 * Portions Copyright 2013-2015 ForgeRock AS 026 */ 027package org.opends.server.tasks; 028 029import java.util.List; 030 031import org.forgerock.i18n.LocalizableMessage; 032import org.forgerock.i18n.LocalizableMessageBuilder; 033import org.opends.messages.TaskMessages; 034import org.opends.server.backends.task.Task; 035import org.opends.server.backends.task.TaskState; 036import org.forgerock.i18n.slf4j.LocalizedLogger; 037import org.opends.server.replication.common.CSN; 038import org.opends.server.replication.plugin.LDAPReplicationDomain; 039import org.opends.server.types.*; 040import org.forgerock.opendj.ldap.ResultCode; 041import org.opends.server.util.TimeThread; 042 043import static org.opends.server.config.ConfigConstants.*; 044import static org.opends.server.core.DirectoryServer.*; 045 046/** 047 * This class provides an implementation of a Directory Server task that can 048 * be used to purge the replication historical informations stored in the 049 * user entries to solve conflicts. 050 */ 051public class PurgeConflictsHistoricalTask extends Task 052{ 053 /** The default value for the maximum duration of the purge expressed in seconds. */ 054 public static final int DEFAULT_MAX_DURATION = 60 * 60; 055 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 056 057 private String domainString; 058 private LDAPReplicationDomain domain; 059 060 /** 061 * current historical purge delay 062 * <---------------------------------> 063 * -----------------------------------------------------------------> t 064 * | | | 065 * current task task 066 * CSN being purged start date max end date 067 * <------------> 068 * config.purgeMaxDuration 069 * 070 * The task will start purging the change with the oldest CSN found. 071 * The task run as long as : 072 * - the end date (computed from the configured max duration) is not reached 073 * - the CSN purged is oldest than the configured historical purge delay 074 */ 075 private int purgeTaskMaxDurationInSec = DEFAULT_MAX_DURATION; 076 077 private TaskState initState; 078 079 080 /** {@inheritDoc} */ 081 @Override 082 public LocalizableMessage getDisplayName() { 083 return TaskMessages.INFO_TASK_PURGE_CONFLICTS_HIST_NAME.get(); 084 } 085 086 /** {@inheritDoc} */ 087 @Override public void initializeTask() throws DirectoryException 088 { 089 if (TaskState.isDone(getTaskState())) 090 { 091 return; 092 } 093 094 // FIXME -- Do we need any special authorization here? 095 Entry taskEntry = getTaskEntry(); 096 097 AttributeType typeDomainBase = getAttributeTypeOrDefault(ATTR_TASK_CONFLICTS_HIST_PURGE_DOMAIN_DN); 098 List<Attribute> attrList = taskEntry.getAttribute(typeDomainBase); 099 domainString = TaskUtils.getSingleValueString(attrList); 100 101 try 102 { 103 DN dn = DN.valueOf(domainString); 104 // We can assume that this is an LDAP replication domain 105 domain = LDAPReplicationDomain.retrievesReplicationDomain(dn); 106 } 107 catch(DirectoryException e) 108 { 109 LocalizableMessageBuilder mb = new LocalizableMessageBuilder(); 110 mb.append(TaskMessages.ERR_TASK_INITIALIZE_INVALID_DN.get()); 111 mb.append(e.getMessage()); 112 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, mb.toMessage()); 113 } 114 115 AttributeType typeMaxDuration = getAttributeTypeOrDefault(ATTR_TASK_CONFLICTS_HIST_PURGE_MAX_DURATION); 116 attrList = taskEntry.getAttribute(typeMaxDuration); 117 String maxDurationStringInSec = TaskUtils.getSingleValueString(attrList); 118 119 if (maxDurationStringInSec != null) 120 { 121 try 122 { 123 purgeTaskMaxDurationInSec = Integer.decode(maxDurationStringInSec); 124 } 125 catch(Exception e) 126 { 127 throw new DirectoryException( 128 ResultCode.UNWILLING_TO_PERFORM, 129 TaskMessages.ERR_TASK_INVALID_ATTRIBUTE_VALUE.get( 130 ATTR_TASK_CONFLICTS_HIST_PURGE_MAX_DURATION, e.getLocalizedMessage())); 131 } 132 } 133 } 134 135 /** {@inheritDoc} */ 136 @Override 137 protected TaskState runTask() 138 { 139 Boolean purgeCompletedInTime = false; 140 logger.trace("PurgeConflictsHistoricalTask is starting on domain: %s max duration (sec): %d", 141 domain.getBaseDN(), purgeTaskMaxDurationInSec); 142 try 143 { 144 replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COMPLETED_IN_TIME, purgeCompletedInTime.toString()); 145 146 // launch the task 147 domain.purgeConflictsHistorical(this, TimeThread.getTime() + purgeTaskMaxDurationInSec*1000); 148 149 purgeCompletedInTime = true; 150 replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COMPLETED_IN_TIME, purgeCompletedInTime.toString()); 151 152 initState = TaskState.COMPLETED_SUCCESSFULLY; 153 } 154 catch(DirectoryException de) 155 { 156 logger.trace("PurgeConflictsHistoricalTask exception %s", de.getLocalizedMessage()); 157 if (de.getResultCode() != ResultCode.ADMIN_LIMIT_EXCEEDED) 158 { 159 // Error raised at submission time 160 logger.error(de.getMessageObject()); 161 initState = TaskState.STOPPED_BY_ERROR; 162 } 163 else 164 { 165 initState = TaskState.COMPLETED_SUCCESSFULLY; 166 } 167 } 168 finally 169 { 170 try 171 { 172 // sets in the attributes the last stats values 173 replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COUNT, String.valueOf(purgeCount)); 174 replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_LAST_CSN, lastCSN.toStringUI()); 175 logger.trace("PurgeConflictsHistoricalTask write attrs %d", purgeCount); 176 } 177 catch(Exception e) 178 { 179 logger.trace("PurgeConflictsHistoricalTask exception %s", e.getLocalizedMessage()); 180 initState = TaskState.STOPPED_BY_ERROR; 181 } 182 } 183 184 logger.trace("PurgeConflictsHistoricalTask is ending with state: %s completedInTime: %s", 185 initState, purgeCompletedInTime); 186 return initState; 187 } 188 189 private int updateAttrPeriod; 190 private CSN lastCSN; 191 private int purgeCount; 192 193 /** 194 * Set the last CSN purged and the count of purged values in order to monitor 195 * the historical purge. 196 * 197 * @param lastCSN 198 * the last CSN purged. 199 * @param purgeCount 200 * the count of purged values. 201 */ 202 public void setProgressStats(CSN lastCSN, int purgeCount) 203 { 204 try 205 { 206 if (purgeCount == 0) 207 { 208 replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_FIRST_CSN, lastCSN.toStringUI()); 209 } 210 211 // we don't want the update of the task to overload too much task duration 212 this.purgeCount = purgeCount; 213 this.lastCSN = lastCSN; 214 if (++updateAttrPeriod % 100 == 0) 215 { 216 replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COUNT, String.valueOf(purgeCount)); 217 replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_LAST_CSN, lastCSN.toStringUI()); 218 logger.trace("PurgeConflictsHistoricalTask write attrs %d", purgeCount); 219 } 220 } 221 catch(DirectoryException de) 222 { 223 logger.trace("PurgeConflictsHistoricalTask exception %s", de.getLocalizedMessage()); 224 initState = TaskState.STOPPED_BY_ERROR; 225 } 226 } 227}