001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2006-2010 Sun Microsystems, Inc.
025 *      Portions Copyright 2011-2015 ForgeRock AS
026 */
027package org.opends.server.replication.plugin;
028
029import static org.forgerock.opendj.ldap.ResultCode.*;
030import static org.opends.messages.ReplicationMessages.*;
031import static org.opends.messages.ToolMessages.*;
032import static org.opends.server.protocols.internal.InternalClientConnection.*;
033import static org.opends.server.protocols.internal.Requests.*;
034import static org.opends.server.replication.plugin.EntryHistorical.*;
035import static org.opends.server.replication.protocol.OperationContext.*;
036import static org.opends.server.replication.service.ReplicationMonitor.*;
037import static org.opends.server.util.CollectionUtils.*;
038import static org.opends.server.util.ServerConstants.*;
039import static org.opends.server.util.StaticUtils.*;
040
041import java.io.File;
042import java.io.InputStream;
043import java.io.OutputStream;
044import java.io.StringReader;
045import java.util.*;
046import java.util.concurrent.BlockingQueue;
047import java.util.concurrent.TimeUnit;
048import java.util.concurrent.TimeoutException;
049import java.util.concurrent.atomic.AtomicBoolean;
050import java.util.concurrent.atomic.AtomicInteger;
051import java.util.concurrent.atomic.AtomicReference;
052import java.util.zip.DataFormatException;
053
054import org.forgerock.i18n.LocalizableMessage;
055import org.forgerock.i18n.slf4j.LocalizedLogger;
056import org.forgerock.opendj.config.server.ConfigChangeResult;
057import org.forgerock.opendj.config.server.ConfigException;
058import org.forgerock.opendj.ldap.ByteString;
059import org.forgerock.opendj.ldap.DecodeException;
060import org.forgerock.opendj.ldap.ModificationType;
061import org.forgerock.opendj.ldap.ResultCode;
062import org.forgerock.opendj.ldap.SearchScope;
063import org.opends.server.admin.server.ConfigurationChangeListener;
064import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.IsolationPolicy;
065import org.opends.server.admin.std.server.ExternalChangelogDomainCfg;
066import org.opends.server.admin.std.server.ReplicationDomainCfg;
067import org.opends.server.api.DirectoryThread;
068import org.opends.server.api.SynchronizationProvider;
069import org.opends.server.api.AlertGenerator;
070import org.opends.server.api.Backend;
071import org.opends.server.api.Backend.BackendOperation;
072import org.opends.server.api.BackendInitializationListener;
073import org.opends.server.api.ServerShutdownListener;
074import org.opends.server.backends.task.Task;
075import org.opends.server.core.*;
076import org.opends.server.protocols.internal.InternalClientConnection;
077import org.opends.server.protocols.internal.InternalSearchListener;
078import org.opends.server.protocols.internal.InternalSearchOperation;
079import org.opends.server.protocols.internal.Requests;
080import org.opends.server.protocols.internal.SearchRequest;
081import org.opends.server.protocols.ldap.LDAPAttribute;
082import org.opends.server.protocols.ldap.LDAPControl;
083import org.opends.server.protocols.ldap.LDAPFilter;
084import org.opends.server.protocols.ldap.LDAPModification;
085import org.opends.server.replication.common.CSN;
086import org.opends.server.replication.common.ServerState;
087import org.opends.server.replication.common.ServerStatus;
088import org.opends.server.replication.common.StatusMachineEvent;
089import org.opends.server.replication.protocol.*;
090import org.opends.server.replication.service.DSRSShutdownSync;
091import org.opends.server.replication.service.ReplicationBroker;
092import org.opends.server.replication.service.ReplicationDomain;
093import org.opends.server.tasks.PurgeConflictsHistoricalTask;
094import org.opends.server.tasks.TaskUtils;
095import org.opends.server.types.*;
096import org.opends.server.types.operation.*;
097import org.opends.server.util.LDIFReader;
098import org.opends.server.util.TimeThread;
099import org.opends.server.workflowelement.localbackend.LocalBackendModifyOperation;
100
101/**
102 *  This class implements the bulk part of the Directory Server side
103 *  of the replication code.
104 *  It contains the root method for publishing a change,
105 *  processing a change received from the replicationServer service,
106 *  handle conflict resolution,
107 *  handle protocol messages from the replicationServer.
108 * <p>
109 * FIXME Move this class to org.opends.server.replication.service
110 * or the equivalent package once this code is moved to a maven module.
111 */
112public final class LDAPReplicationDomain extends ReplicationDomain
113       implements ConfigurationChangeListener<ReplicationDomainCfg>,
114                  AlertGenerator, BackendInitializationListener, ServerShutdownListener
115{
  /**
   * Set of attributes that will return all the user attributes and the
   * replication related operational attributes when used in a search operation.
   */
  private static final Set<String> USER_AND_REPL_OPERATIONAL_ATTRS =
      newHashSet(HISTORICAL_ATTRIBUTE_NAME, ENTRYUUID_ATTRIBUTE_NAME, "*");

  /**
   * Initializing replication for the domain initiates a backend
   * finalization/initialization. This flag prevents the replication domain
   * from disabling/enabling itself when it is itself the initiator of that
   * event: while it is true, the backend initialization listener callbacks of
   * this class do nothing.
   */
  private boolean ignoreBackendInitializationEvent;

  /**
   * Set to true by {@link #processServerShutdown(LocalizableMessage)}. Once
   * set, the finalization of the backend no longer disables this domain.
   */
  private volatile boolean  serverShutdownRequested;
131
132  @Override
133  public String getShutdownListenerName() {
134    return "LDAPReplicationDomain " + getBaseDN();
135  }
136
137  @Override
138  public void processServerShutdown(LocalizableMessage reason) {
139    serverShutdownRequested = true;
140  }
141
142
143  /**
144   * This class is used in the session establishment phase
145   * when no Replication Server with all the local changes has been found
146   * and we therefore need to recover them.
147   * A search is then performed on the database using this
148   * internalSearchListener.
149   */
150  private class ScanSearchListener implements InternalSearchListener
151  {
152    private final CSN startCSN;
153    private final CSN endCSN;
154
155    public ScanSearchListener(CSN startCSN, CSN endCSN)
156    {
157      this.startCSN = startCSN;
158      this.endCSN = endCSN;
159    }
160
161    @Override
162    public void handleInternalSearchEntry(
163        InternalSearchOperation searchOperation, SearchResultEntry searchEntry)
164        throws DirectoryException
165    {
166      // Build the list of Operations that happened on this entry after startCSN
167      // and before endCSN and add them to the replayOperations list
168      Iterable<FakeOperation> updates =
169        EntryHistorical.generateFakeOperations(searchEntry);
170
171      for (FakeOperation op : updates)
172      {
173        CSN csn = op.getCSN();
174        if (csn.isNewerThan(startCSN) && csn.isOlderThan(endCSN))
175        {
176          synchronized (replayOperations)
177          {
178            replayOperations.put(csn, op);
179          }
180        }
181      }
182    }
183
184    @Override
185    public void handleInternalSearchReference(
186        InternalSearchOperation searchOperation,
187        SearchResultReference searchReference) throws DirectoryException
188    {
189       // Nothing to do.
190    }
191  }
192
193  @Override
194  public void performBackendPreInitializationProcessing(Backend<?> backend) {
195    // Nothing to do
196  }
197
198  @Override
199  public void performBackendPostFinalizationProcessing(Backend<?> backend) {
200    // Nothing to do
201  }
202
203  @Override
204  public void performBackendPostInitializationProcessing(Backend<?> backend) {
205    if (!ignoreBackendInitializationEvent
206            && getBackend().getBackendID().equals(backend.getBackendID())) {
207      enable();
208    }
209  }
210
211  @Override
212  public void performBackendPreFinalizationProcessing(Backend<?> backend) {
213    // Do not disable itself during a shutdown
214    // And ignore the event if this replica is the event trigger (e.g. importing).
215    if (!ignoreBackendInitializationEvent
216            && !serverShutdownRequested
217            && getBackend().getBackendID().equals(backend.getBackendID())) {
218      disable();
219    }
220  }
221
  /** The fully-qualified name of this class. */
  private static final String CLASS_NAME = LDAPReplicationDomain.class.getName();

  /**
   * The attribute used to mark conflicting entries.
   * The value of this attribute should be the DN that this entry was
   * supposed to have when it was marked as conflicting.
   */
  public static final String DS_SYNC_CONFLICT = "ds-sync-conflict";
  /** The logger of this class. */
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();

  /** Synchronization object for shutdown of combined DS/RS instances. */
  private final DSRSShutdownSync dsrsShutdownSync;
  /**
   * The update to replay message queue where the listener thread is going to
   * push incoming update messages.
   */
  private final BlockingQueue<UpdateToReplay> updateToReplayQueue;
  /** The number of naming conflicts successfully resolved. */
  private final AtomicInteger numResolvedNamingConflicts = new AtomicInteger();
  /** The number of modify conflicts successfully resolved. */
  private final AtomicInteger numResolvedModifyConflicts = new AtomicInteger();
  /** The number of unresolved naming conflicts. */
  private final AtomicInteger numUnresolvedNamingConflicts =
      new AtomicInteger();
  /** The number of updates replayed successfully by the replication. */
  private final AtomicInteger numReplayedPostOpCalled = new AtomicInteger();

  /**
   * The persistent server state used to store the last CSN seen from all LDAP
   * servers in the topology. It is saved periodically by the flush thread.
   */
  private final PersistentServerState state;
  // NOTE(review): presumably tells whether the generation id has been saved
  // in the backend; its readers and writers are outside this section - confirm.
  private volatile boolean generationIdSavedStatus;

  /**
   * This object is used to store the list of updates currently being
   * done on the local database.
   * It is useful to make sure that the local operations are sent in a
   * correct order to the replication server and that the ServerState
   * is not updated too early.
   */
  private final PendingChanges pendingChanges;
  /**
   * Reference to the running {@link RSUpdater} thread, if any. The thread
   * resets this reference to null when it finishes, so that another one can
   * be started if needed.
   */
  private final AtomicReference<RSUpdater> rsUpdater = new AtomicReference<>(null);

  /**
   * Contains the updates that were done on other servers, transmitted by the
   * replication server and that are currently replayed.
   * <p>
   * It is useful to make sure that dependencies between operations are
   * correctly fulfilled and to make sure that the ServerState is not updated
   * too early.
   */
  private final RemotePendingChanges remotePendingChanges;
  /**
   * Whether modify conflicts are solved for this domain. It is computed from
   * the configuration in the constructor (see isSolveConflict).
   */
  private boolean solveConflictFlag = true;

  /** Internal connection used to run internal searches against the backend. */
  private final InternalClientConnection conn = getRootConnection();
  private final AtomicBoolean shutdown = new AtomicBoolean();
  /** While true, the flush thread does not perform its periodic state save. */
  private volatile boolean disabled;
  /** While true, the flush thread does not perform its periodic state save. */
  private volatile boolean stateSavingDisabled;

  /**
   * This map is used to temporarily store operations that need to be replayed
   * at session establishment time. All accesses visible in this section are
   * done while holding the monitor of the map itself.
   */
  private final SortedMap<CSN, FakeOperation> replayOperations = new TreeMap<>();

  private ExternalChangelogDomain eclDomain;

  /** A boolean indicating if the thread used to save the persistentServerState is terminated. */
  private volatile boolean done = true;

  /** The thread that periodically saves {@link #state}; created by the constructor. */
  private final ServerStateFlush flushThread;

  /** The attribute name used to store the generation id in the backend. */
  private static final String REPLICATION_GENERATION_ID = "ds-sync-generation-id";
  /** The attribute name used to store the fractional include configuration in the backend. */
  static final String REPLICATION_FRACTIONAL_INCLUDE = "ds-sync-fractional-include";
  /** The attribute name used to store the fractional exclude configuration in the backend. */
  static final String REPLICATION_FRACTIONAL_EXCLUDE = "ds-sync-fractional-exclude";
297
  /*
   * Fractional replication variables.
   */

  /**
   * Holds the fractional configuration for this domain, if any. The instance
   * is created once by the constructor and then updated in place each time the
   * fractional configuration is (re)read.
   */
  private final FractionalConfig fractionalConfig;

  /** The list of attributes that cannot be used in fractional replication configuration. */
  private static final String[] FRACTIONAL_PROHIBITED_ATTRIBUTES = new String[]
  {
    "objectClass",
    "2.5.4.0" // objectClass OID
  };

  /**
   * When true, this flag is used to force the domain status to be put in bad
   * data set just after the connection to the replication server.
   * This must be used when fractional replication is enabled with a
   * configuration different from the previous one (or at the very first
   * fractional usage time): after connection, a ChangeStatusMsg is sent
   * requesting the bad data set status. Then none of the update messages
   * received from the replication server are taken into account until the
   * backend is synchronized with a brand new data set compliant with the new
   * fractional configuration (i.e. with a compliant fractional configuration
   * in the domain root entry).
   */
  private boolean forceBadDataSet;

  /**
   * The message id to be used when an import is stopped with error by
   * the fractional replication ldif import plugin.
   * The value -1 means that no error has been reported: the import is allowed
   * to continue as long as this field is -1.
   */
  private int importErrorMessageId = -1;
  /** LocalizableMessage type for ERR_FULL_UPDATE_IMPORT_FRACTIONAL_BAD_REMOTE. */
  static final int IMPORT_ERROR_MESSAGE_BAD_REMOTE = 1;
  /** LocalizableMessage type for ERR_FULL_UPDATE_IMPORT_FRACTIONAL_REMOTE_IS_FRACTIONAL. */
  static final int IMPORT_ERROR_MESSAGE_REMOTE_IS_FRACTIONAL = 2;

  /*
   * Definitions for the return codes of the
   * fractionalFilterOperation(PreOperationModifyOperation
   *  modifyOperation, boolean performFiltering) method
   */
  /**
   * The operation contains attributes subject to fractional filtering according
   * to the fractional configuration.
   */
  private static final int FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES = 1;
  /**
   * The operation contains no attributes subject to fractional filtering
   * according to the fractional configuration.
   */
  private static final int FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES = 2;
  /** The operation should become a no-op. */
  private static final int FRACTIONAL_BECOME_NO_OP = 3;

  /**
   * The last CSN purged in this domain. Allows to have a continuous purging
   * process from one purge processing (task run) to the next one. It is
   * initialized with a CSN built from (0,0,0) when the domain is created.
   */
  private CSN lastCSNPurgedFromHist = new CSN(0,0,0);
360
361  /**
362   * The thread that periodically saves the ServerState of this
363   * LDAPReplicationDomain in the database.
364   */
365  private class ServerStateFlush extends DirectoryThread
366  {
367    protected ServerStateFlush()
368    {
369      super("Replica DS(" + getServerId() + ") state checkpointer for domain \"" + getBaseDN() + "\"");
370    }
371
372    @Override
373    public void run()
374    {
375      done = false;
376
377      while (!isShutdownInitiated())
378      {
379        try
380        {
381          synchronized (this)
382          {
383            wait(1000);
384            if (!disabled && !stateSavingDisabled)
385            {
386              // save the ServerState
387              state.save();
388            }
389          }
390        }
391        catch (InterruptedException e)
392        {
393          // Thread interrupted: check for shutdown.
394          Thread.currentThread().interrupt();
395        }
396      }
397      state.save();
398
399      done = true;
400    }
401  }
402
403  /**
404   * The thread that is responsible to update the RS to which this domain is
405   * connected in case it is late and there is no RS which is up to date.
406   */
407  private class RSUpdater extends DirectoryThread
408  {
409    private final CSN startCSN;
410
411    protected RSUpdater(CSN replServerMaxCSN)
412    {
413      super("Replica DS(" + getServerId() + ") missing change publisher for domain \"" + getBaseDN() + "\"");
414      this.startCSN = replServerMaxCSN;
415    }
416
417    @Override
418    public void run()
419    {
420      // Replication server is missing some of our changes:
421      // let's send them to him.
422      logger.trace(DEBUG_GOING_TO_SEARCH_FOR_CHANGES);
423
424      /*
425       * Get all the changes that have not been seen by this
426       * replication server and publish them.
427       */
428      try
429      {
430        if (buildAndPublishMissingChanges(startCSN, broker))
431        {
432          logger.trace(DEBUG_CHANGES_SENT);
433          synchronized(replayOperations)
434          {
435            replayOperations.clear();
436          }
437        }
438        else
439        {
440          /*
441           * An error happened trying to search for the updates
442           * This server will start accepting again new updates but
443           * some inconsistencies will stay between servers.
444           * Log an error for the repair tool
445           * that will need to re-synchronize the servers.
446           */
447          logger.error(ERR_CANNOT_RECOVER_CHANGES, getBaseDN());
448        }
449      }
450      catch (Exception e)
451      {
452        /*
453         * An error happened trying to search for the updates
454         * This server will start accepting again new updates but
455         * some inconsistencies will stay between servers.
456         * Log an error for the repair tool
457         * that will need to re-synchronize the servers.
458         */
459        logger.error(ERR_CANNOT_RECOVER_CHANGES, getBaseDN());
460      }
461      finally
462      {
463        broker.setRecoveryRequired(false);
464        // RSUpdater thread has finished its work, let's remove it from memory
465        // so another RSUpdater thread can be started if needed.
466        rsUpdater.compareAndSet(this, null);
467      }
468    }
469  }
470
  /**
   * Creates a new LDAPReplicationDomain using the provided configuration.
   * <p>
   * The constructor reads the assured, fractional and ECL related
   * configuration, loads the generation id, creates the persistent server
   * state and its flush thread, registers this domain as configuration change
   * listener, alert generator, backend initialization listener and shutdown
   * listener, and finally starts the publish service.
   *
   * @param configuration    The configuration of this ReplicationDomain.
   * @param updateToReplayQueue The queue for update messages to replay.
   * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
   * @throws ConfigException In case of invalid configuration, in particular
   *                         when no backend can be found for this domain.
   */
  LDAPReplicationDomain(ReplicationDomainCfg configuration,
      BlockingQueue<UpdateToReplay> updateToReplayQueue,
      DSRSShutdownSync dsrsShutdownSync) throws ConfigException
  {
    // NOTE(review): the meaning of the -1 argument is defined by the parent
    // constructor, which is not visible from here - confirm before changing.
    super(configuration, -1);

    this.updateToReplayQueue = updateToReplayQueue;
    this.dsrsShutdownSync = dsrsShutdownSync;

    // Get assured configuration
    readAssuredConfig(configuration, false);

    // Get fractional configuration.
    // Reconnection is not allowed here (second argument is false), so reading
    // the fractional configuration never disables/enables the service.
    fractionalConfig = new FractionalConfig(getBaseDN());
    readFractionalConfig(configuration, false);
    storeECLConfiguration(configuration);
    solveConflictFlag = isSolveConflict(configuration);

    // A domain without a backend cannot work: reject the configuration.
    Backend<?> backend = getBackend();
    if (backend == null)
    {
      throw new ConfigException(ERR_SEARCHING_DOMAIN_BACKEND.get(getBaseDN()));
    }

    try
    {
      generationId = loadGenerationId();
    }
    catch (DirectoryException e)
    {
      // Not fatal for the construction: the failure is only logged and
      // generationId is left unassigned by this constructor.
      logger.error(ERR_LOADING_GENERATION_ID, getBaseDN(), stackTraceToSingleLineString(e));
    }

    /*
     * Create a new Persistent Server State that will be used to store
     * the last CSN seen from all LDAP servers in the topology.
     */
    state = new PersistentServerState(getBaseDN(), getServerId(),
        getServerState());
    // The flush thread is only created here, it is not started by this
    // constructor.
    flushThread = new ServerStateFlush();

    /*
     * CSNGenerator is used to create new unique CSNs for each operation done on
     * this replication domain.
     *
     * The generator time is adjusted to the time of the last CSN received from
     * remote other servers.
     */
    pendingChanges = new PendingChanges(getGenerator(), this);
    remotePendingChanges = new RemotePendingChanges(getServerState());

    // listen for changes on the configuration
    configuration.addChangeListener(this);

    // register as an AlertGenerator
    DirectoryServer.registerAlertGenerator(this);

    // register for backend initialization/finalization and shutdown events
    DirectoryServer.registerBackendInitializationListener(this);
    DirectoryServer.registerShutdownListener(this);

    startPublishService();
  }
541
542  /**
543   * Modify conflicts are solved for all suffixes but the schema suffix because
544   * we don't want to store extra information in the schema ldif files. This has
545   * no negative impact because the changes on schema should not produce
546   * conflicts.
547   */
548  private boolean isSolveConflict(ReplicationDomainCfg cfg)
549  {
550    return !getBaseDN().equals(DirectoryServer.getSchemaDN())
551        && cfg.isSolveConflicts();
552  }
553
554  /**
555   * Sets the error message id to be used when online import is stopped with
556   * error by the fractional replication ldif import plugin.
557   * @param importErrorMessageId The message to use.
558   */
559  void setImportErrorMessageId(int importErrorMessageId)
560  {
561    this.importErrorMessageId = importErrorMessageId;
562  }
563
564  /**
565   * This flag is used by the fractional replication ldif import plugin to stop
566   * the (online) import process if a fractional configuration inconsistency is
567   * detected by it.
568   *
569   * @return true if the online import currently in progress should continue,
570   *         false otherwise.
571   */
572  private boolean isFollowImport()
573  {
574    return importErrorMessageId == -1;
575  }
576
577  /**
578   * Gets and stores the fractional replication configuration parameters.
579   * @param configuration The configuration object
580   * @param allowReconnection Tells if one must reconnect if significant changes
581   *        occurred
582   */
583  private void readFractionalConfig(ReplicationDomainCfg configuration,
584    boolean allowReconnection)
585  {
586    // Read the configuration entry
587    FractionalConfig newFractionalConfig;
588    try
589    {
590      newFractionalConfig = FractionalConfig.toFractionalConfig(configuration);
591    }
592    catch(ConfigException e)
593    {
594      // Should not happen as normally already called without problem in
595      // isConfigurationChangeAcceptable or isConfigurationAcceptable
596      // if we come up to this method
597      logger.info(NOTE_ERR_FRACTIONAL, getBaseDN(), stackTraceToSingleLineString(e));
598      return;
599    }
600
601    /**
602     * Is there any change in fractional configuration ?
603     */
604
605    // Compute current configuration
606    boolean needReconnection;
607     try
608    {
609      needReconnection = !FractionalConfig.
610        isFractionalConfigEquivalent(fractionalConfig, newFractionalConfig);
611    }
612    catch  (ConfigException e)
613    {
614      // Should not happen
615      logger.info(NOTE_ERR_FRACTIONAL, getBaseDN(), stackTraceToSingleLineString(e));
616      return;
617    }
618
619    // Disable service if configuration changed
620    final boolean needRestart = needReconnection && allowReconnection;
621    if (needRestart)
622    {
623      disableService();
624    }
625    // Set new configuration
626    int newFractionalMode = newFractionalConfig.fractionalConfigToInt();
627    fractionalConfig.setFractional(newFractionalMode !=
628      FractionalConfig.NOT_FRACTIONAL);
629    if (fractionalConfig.isFractional())
630    {
631      // Set new fractional configuration values
632      fractionalConfig.setFractionalExclusive(
633          newFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL);
634      fractionalConfig.setFractionalSpecificClassesAttributes(
635        newFractionalConfig.getFractionalSpecificClassesAttributes());
636      fractionalConfig.setFractionalAllClassesAttributes(
637        newFractionalConfig.fractionalAllClassesAttributes);
638    } else
639    {
640      // Reset default values
641      fractionalConfig.setFractionalExclusive(true);
642      fractionalConfig.setFractionalSpecificClassesAttributes(
643        new HashMap<String, Set<String>>());
644      fractionalConfig.setFractionalAllClassesAttributes(new HashSet<String>());
645    }
646
647    // Reconnect if required
648    if (needRestart)
649    {
650      enableService();
651    }
652  }
653
654  /**
655   * Return true if the fractional configuration stored in the domain root
656   * entry of the backend is equivalent to the fractional configuration stored
657   * in the local variables.
658   */
659  private boolean isBackendFractionalConfigConsistent()
660  {
661    // Read config stored in domain root entry
662    if (logger.isTraceEnabled())
663    {
664      logger.trace("Attempt to read the potential fractional config in domain root entry " + getBaseDN());
665    }
666
667    // Search the domain root entry that is used to save the generation id
668    SearchRequest request = newSearchRequest(getBaseDN(), SearchScope.BASE_OBJECT)
669        .addAttribute(REPLICATION_GENERATION_ID, REPLICATION_FRACTIONAL_EXCLUDE, REPLICATION_FRACTIONAL_INCLUDE);
670    InternalSearchOperation search = conn.processSearch(request);
671
672    if (search.getResultCode() != ResultCode.SUCCESS
673        && search.getResultCode() != ResultCode.NO_SUCH_OBJECT)
674    {
675      String errorMsg = search.getResultCode().getName() + " " + search.getErrorMessage();
676      logger.error(ERR_SEARCHING_GENERATION_ID, errorMsg, getBaseDN());
677      return false;
678    }
679
680    SearchResultEntry resultEntry = findReplicationSearchResultEntry(search);
681    if (resultEntry == null)
682    {
683      /*
684       * The backend is probably empty: if there is some fractional
685       * configuration in memory, we do not let the domain being connected,
686       * otherwise, it's ok
687       */
688      return !fractionalConfig.isFractional();
689    }
690
691    // Now extract fractional configuration if any
692    Iterator<String> exclIt =
693        getAttributeValueIterator(resultEntry, REPLICATION_FRACTIONAL_EXCLUDE);
694    Iterator<String> inclIt =
695        getAttributeValueIterator(resultEntry, REPLICATION_FRACTIONAL_INCLUDE);
696
697    // Compare backend and local fractional configuration
698    return isFractionalConfigConsistent(fractionalConfig, exclIt, inclIt);
699  }
700
701  private SearchResultEntry findReplicationSearchResultEntry(
702      InternalSearchOperation searchOperation)
703  {
704    final SearchResultEntry resultEntry = getFirstResult(searchOperation);
705    if (resultEntry != null)
706    {
707      AttributeType synchronizationGenIDType = DirectoryServer.getAttributeTypeOrNull(REPLICATION_GENERATION_ID);
708      List<Attribute> attrs = resultEntry.getAttribute(synchronizationGenIDType);
709      if (attrs != null)
710      {
711        Attribute attr = attrs.get(0);
712        if (attr.size() == 1)
713        {
714          return resultEntry;
715        }
716        if (attr.size() > 1)
717        {
718          String errorMsg = "#Values=" + attr.size() + " Must be exactly 1 in entry " + resultEntry.toLDIFString();
719          logger.error(ERR_LOADING_GENERATION_ID, getBaseDN(), errorMsg);
720        }
721      }
722    }
723    return null;
724  }
725
726  private Iterator<String> getAttributeValueIterator(SearchResultEntry resultEntry, String attrName)
727  {
728    AttributeType attrType = DirectoryServer.getAttributeTypeOrNull(attrName);
729    List<Attribute> exclAttrs = resultEntry.getAttribute(attrType);
730    if (exclAttrs != null)
731    {
732      Attribute exclAttr = exclAttrs.get(0);
733      if (exclAttr != null)
734      {
735        return new AttributeValueStringIterator(exclAttr.iterator());
736      }
737    }
738    return null;
739  }
740
741  /**
742   * Return true if the fractional configuration passed as fractional
743   * configuration attribute values is equivalent to the fractional
744   * configuration stored in the local variables.
745   * @param fractionalConfig The local fractional configuration
746   * @param exclIt Fractional exclude mode configuration attribute values to
747   * analyze.
748   * @param inclIt Fractional include mode configuration attribute values to
749   * analyze.
750   * @return True if the fractional configuration passed as fractional
751   * configuration attribute values is equivalent to the fractional
752   * configuration stored in the local variables.
753   */
754  static boolean isFractionalConfigConsistent(
755      FractionalConfig fractionalConfig, Iterator<String> exclIt,
756      Iterator<String> inclIt)
757  {
758    /*
759     * Parse fractional configuration stored in passed fractional configuration
760     * attributes values
761     */
762
763    Map<String, Set<String>> storedFractionalSpecificClassesAttributes = new HashMap<>();
764    Set<String> storedFractionalAllClassesAttributes = new HashSet<>();
765
766    int storedFractionalMode;
767    try
768    {
769      storedFractionalMode = FractionalConfig.parseFractionalConfig(exclIt,
770        inclIt, storedFractionalSpecificClassesAttributes,
771        storedFractionalAllClassesAttributes);
772    } catch (ConfigException e)
773    {
774      // Should not happen as configuration in domain root entry is flushed
775      // from valid configuration in local variables
776      logger.info(NOTE_ERR_FRACTIONAL, fractionalConfig.getBaseDn(), stackTraceToSingleLineString(e));
777      return false;
778    }
779
780    FractionalConfig storedFractionalConfig = new FractionalConfig(
781      fractionalConfig.getBaseDn());
782    storedFractionalConfig.setFractional(storedFractionalMode !=
783      FractionalConfig.NOT_FRACTIONAL);
784    // Set stored fractional configuration values
785    if (storedFractionalConfig.isFractional())
786    {
787      storedFractionalConfig.setFractionalExclusive(
788          storedFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL);
789    }
790    storedFractionalConfig.setFractionalSpecificClassesAttributes(
791      storedFractionalSpecificClassesAttributes);
792    storedFractionalConfig.setFractionalAllClassesAttributes(
793      storedFractionalAllClassesAttributes);
794
795    /*
796     * Compare configuration stored in passed fractional configuration
797     * attributes with local variable one
798     */
799    try
800    {
801      return FractionalConfig.
802        isFractionalConfigEquivalent(fractionalConfig, storedFractionalConfig);
803    } catch (ConfigException e)
804    {
805      // Should not happen as configuration in domain root entry is flushed
806      // from valid configuration in local variables so both should have already
807      // been checked
808      logger.info(NOTE_ERR_FRACTIONAL, fractionalConfig.getBaseDn(), stackTraceToSingleLineString(e));
809      return false;
810    }
811  }
812
813  /**
814   * Utility class to have get a string iterator from an AtributeValue iterator.
815   * Assuming the attribute values are strings.
816   */
817  static class AttributeValueStringIterator implements Iterator<String>
818  {
819    private final Iterator<ByteString> attrValIt;
820
821    /**
822     * Creates a new AttributeValueStringIterator object.
823     * @param attrValIt The underlying attribute iterator to use, assuming
824     * internal values are strings.
825     */
826    AttributeValueStringIterator(Iterator<ByteString> attrValIt)
827    {
828      this.attrValIt = attrValIt;
829    }
830
831    @Override
832    public boolean hasNext()
833    {
834      return attrValIt.hasNext();
835    }
836
837    @Override
838    public String next()
839    {
840      return attrValIt.next().toString();
841    }
842
843    // Should not be needed anyway
844    @Override
845    public void remove()
846    {
847      attrValIt.remove();
848    }
849  }
850
851  /**
852   * Compare 2 attribute collections and returns true if they are equivalent.
853   *
854   * @param attributes1
855   *          First attribute collection to compare.
856   * @param attributes2
857   *          Second attribute collection to compare.
858   * @return True if both attribute collection are equivalent.
859   * @throws ConfigException
860   *           If some attributes could not be retrieved from the schema.
861   */
862  private static boolean areAttributesEquivalent(
863      Collection<String> attributes1, Collection<String> attributes2)
864      throws ConfigException
865  {
866    // Compare all classes attributes
867    if (attributes1.size() != attributes2.size())
868    {
869      return false;
870    }
871
872    // Check consistency of all classes attributes
873    Schema schema = DirectoryServer.getSchema();
874    /*
875     * For each attribute in attributes1, check there is the matching
876     * one in attributes2.
877     */
878    for (String attrName1 : attributes1)
879    {
880      // Get attribute from attributes1
881      AttributeType attributeType1 = schema.getAttributeType(attrName1);
882      if (attributeType1 == null)
883      {
884        throw new ConfigException(
885          NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_ATTRIBUTE_TYPE.get(attrName1));
886      }
887      // Look for matching one in attributes2
888      boolean foundAttribute = false;
889      for (String attrName2 : attributes2)
890      {
891        AttributeType attributeType2 = schema.getAttributeType(attrName2);
892        if (attributeType2 == null)
893        {
894          throw new ConfigException(
895            NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_ATTRIBUTE_TYPE.get(attrName2));
896        }
897        if (attributeType1.equals(attributeType2))
898        {
899          foundAttribute = true;
900          break;
901        }
902      }
903      // Found matching attribute ?
904      if (!foundAttribute)
905      {
906        return false;
907      }
908    }
909
910    return true;
911  }
912
913  /**
914   * Check that the passed fractional configuration is acceptable
915   * regarding configuration syntax, schema constraints...
916   * Throws an exception if the configuration is not acceptable.
917   * @param configuration The configuration to analyze.
918   * @throws org.opends.server.config.ConfigException if the configuration is
919   * not acceptable.
920   */
921  private static void isFractionalConfigAcceptable(
922    ReplicationDomainCfg configuration) throws ConfigException
923  {
924    /*
925     * Parse fractional configuration
926     */
927
928    // Read the configuration entry
929    FractionalConfig newFractionalConfig = FractionalConfig.toFractionalConfig(
930        configuration);
931
932    if (!newFractionalConfig.isFractional())
933    {
934      // Nothing to check
935      return;
936    }
937
938    // Prepare variables to be filled with config
939    Map<String, Set<String>> newFractionalSpecificClassesAttributes =
940      newFractionalConfig.getFractionalSpecificClassesAttributes();
941    Set<String> newFractionalAllClassesAttributes =
942      newFractionalConfig.getFractionalAllClassesAttributes();
943
944    /*
945     * Check attributes consistency : we only allow to filter MAY (optional)
946     * attributes of a class : to be compliant with the schema, no MUST
947     * (mandatory) attribute can be filtered by fractional replication.
948     */
949
950    // Check consistency of specific classes attributes
951    Schema schema = DirectoryServer.getSchema();
952    int fractionalMode = newFractionalConfig.fractionalConfigToInt();
953    for (String className : newFractionalSpecificClassesAttributes.keySet())
954    {
955      // Does the class exist ?
956      ObjectClass fractionalClass = schema.getObjectClass(
957        className.toLowerCase());
958      if (fractionalClass == null)
959      {
960        throw new ConfigException(
961          NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_OBJECT_CLASS.get(className));
962      }
963
964      boolean isExtensibleObjectClass =
965          "extensibleObject".equalsIgnoreCase(className);
966
967      Set<String> attributes =
968        newFractionalSpecificClassesAttributes.get(className);
969
970      for (String attrName : attributes)
971      {
972        // Not a prohibited attribute ?
973        if (isFractionalProhibitedAttr(attrName))
974        {
975          throw new ConfigException(
976            NOTE_ERR_FRACTIONAL_CONFIG_PROHIBITED_ATTRIBUTE.get(attrName));
977        }
978
979        // Does the attribute exist ?
980        AttributeType attributeType = schema.getAttributeType(attrName);
981        if (attributeType != null)
982        {
983          // No more checking for the extensibleObject class
984          if (!isExtensibleObjectClass
985              && fractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL
986              // Exclusive mode : the attribute must be optional
987              && !fractionalClass.isOptional(attributeType))
988          {
989            throw new ConfigException(
990                NOTE_ERR_FRACTIONAL_CONFIG_NOT_OPTIONAL_ATTRIBUTE.get(attrName,
991                    className));
992          }
993        }
994        else
995        {
996          throw new ConfigException(
997            NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_ATTRIBUTE_TYPE.get(attrName));
998        }
999      }
1000    }
1001
1002    // Check consistency of all classes attributes
1003    for (String attrName : newFractionalAllClassesAttributes)
1004    {
1005      // Not a prohibited attribute ?
1006      if (isFractionalProhibitedAttr(attrName))
1007      {
1008        throw new ConfigException(
1009          NOTE_ERR_FRACTIONAL_CONFIG_PROHIBITED_ATTRIBUTE.get(attrName));
1010      }
1011
1012      // Does the attribute exist ?
1013      if (schema.getAttributeType(attrName) == null)
1014      {
1015        throw new ConfigException(
1016          NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_ATTRIBUTE_TYPE.get(attrName));
1017      }
1018    }
1019  }
1020
1021  /**
1022   * Test if the passed attribute is not allowed to be used in configuration of
1023   * fractional replication.
1024   * @param attr Attribute to test.
1025   * @return true if the attribute is prohibited.
1026   */
1027  private static boolean isFractionalProhibitedAttr(String attr)
1028  {
1029    for (String forbiddenAttr : FRACTIONAL_PROHIBITED_ATTRIBUTES)
1030    {
1031      if (forbiddenAttr.equalsIgnoreCase(attr))
1032      {
1033        return true;
1034      }
1035    }
1036    return false;
1037  }
1038
1039  /**
1040   * If fractional replication is enabled, this analyzes the operation and
1041   * suppresses the forbidden attributes in it so that they are not added in
1042   * the local backend.
1043   *
1044   * @param addOperation The operation to modify based on fractional
1045   * replication configuration
1046   * @param performFiltering Tells if the effective attribute filtering should
1047   * be performed or if the call is just to analyze if there are some
1048   * attributes filtered by fractional configuration
1049   * @return true if the operation contains some attributes subject to filtering
1050   * by the fractional configuration
1051   */
1052  private boolean fractionalFilterOperation(
1053    PreOperationAddOperation addOperation, boolean performFiltering)
1054  {
1055    return fractionalRemoveAttributesFromEntry(fractionalConfig,
1056      addOperation.getEntryDN().rdn(), addOperation.getObjectClasses(),
1057      addOperation.getUserAttributes(), performFiltering);
1058  }
1059
1060  /**
1061   * If fractional replication is enabled, this analyzes the operation and
1062   * suppresses the forbidden attributes in it so that they are not added in
1063   * the local backend.
1064   *
1065   * @param modifyDNOperation The operation to modify based on fractional
1066   * replication configuration
1067   * @param performFiltering Tells if the effective modifications should
1068   * be performed or if the call is just to analyze if there are some
1069   * inconsistency with fractional configuration
1070   * @return true if the operation is inconsistent with fractional
1071   * configuration
1072   */
1073  private boolean fractionalFilterOperation(
1074    PreOperationModifyDNOperation modifyDNOperation, boolean performFiltering)
1075  {
1076    // Quick exit if not called for analyze and
1077    if (performFiltering && modifyDNOperation.deleteOldRDN())
1078    {
1079      // The core will remove any occurrence of attribute that was part of the
1080      // old RDN, nothing more to do.
1081      return true; // Will not be used as analyze was not requested
1082    }
1083
1084    // Create a list of filtered attributes for this entry
1085    Entry concernedEntry = modifyDNOperation.getOriginalEntry();
1086    Set<String> fractionalConcernedAttributes =
1087      createFractionalConcernedAttrList(fractionalConfig,
1088      concernedEntry.getObjectClasses().keySet());
1089
1090    boolean fractionalExclusive = fractionalConfig.isFractionalExclusive();
1091    if (fractionalExclusive && fractionalConcernedAttributes.isEmpty())
1092    {
1093      // No attributes to filter
1094      return false;
1095    }
1096
1097    /*
1098     * Analyze the old and new rdn to see if they are some attributes to be
1099     * removed: if the oldRDN contains some forbidden attributes (for instance
1100     * it is possible if the entry was created with an add operation and the
1101     * RDN used contains a forbidden attribute: in this case the attribute value
1102     * has been kept to be consistent with the dn of the entry.) that are no
1103     * more part of the new RDN, we must remove any attribute of this type by
1104     * putting a modification to delete the attribute.
1105     */
1106
1107    boolean inconsistentOperation = false;
1108    RDN rdn = modifyDNOperation.getEntryDN().rdn();
1109    RDN newRdn = modifyDNOperation.getNewRDN();
1110
1111    // Go through each attribute of the old RDN
1112    for (int i=0 ; i<rdn.getNumValues() ; i++)
1113    {
1114      AttributeType attributeType = rdn.getAttributeType(i);
1115      // Is it present in the fractional attributes established list ?
1116      boolean foundAttribute =
1117          exists(fractionalConcernedAttributes, attributeType);
1118      if (canRemoveAttribute(fractionalExclusive, foundAttribute)
1119          && !newRdn.hasAttributeType(attributeType)
1120          && !modifyDNOperation.deleteOldRDN())
1121      {
1122        /*
1123         * A forbidden attribute is in the old RDN and no more in the new RDN,
1124         * and it has not been requested to remove attributes from old RDN:
1125         * let's remove the attribute from the entry to stay consistent with
1126         * fractional configuration
1127         */
1128        Modification modification = new Modification(ModificationType.DELETE,
1129          Attributes.empty(attributeType));
1130        modifyDNOperation.addModification(modification);
1131        inconsistentOperation = true;
1132      }
1133    }
1134
1135    return inconsistentOperation;
1136  }
1137
1138  private boolean exists(Set<String> attrNames, AttributeType attrTypeToFind)
1139  {
1140    for (String attrName : attrNames)
1141    {
1142      if (DirectoryServer.getAttributeTypeOrNull(attrName).equals(attrTypeToFind))
1143      {
1144        return true;
1145      }
1146    }
1147    return false;
1148  }
1149
1150  /**
1151   * Remove attributes from an entry, according to the passed fractional
1152   * configuration. The entry is represented by the 2 passed parameters.
1153   * The attributes to be removed are removed using the remove method on the
1154   * passed iterator for the attributes in the entry.
1155   * @param fractionalConfig The fractional configuration to use
1156   * @param entryRdn The rdn of the entry to add
1157   * @param classes The object classes representing the entry to modify
1158   * @param attributesMap The map of attributes/values to be potentially removed
1159   * from the entry.
1160   * @param performFiltering Tells if the effective attribute filtering should
1161   * be performed or if the call is just an analyze to see if there are some
1162   * attributes filtered by fractional configuration
1163   * @return true if the operation contains some attributes subject to filtering
1164   * by the fractional configuration
1165   */
1166   static boolean fractionalRemoveAttributesFromEntry(
1167    FractionalConfig fractionalConfig, RDN entryRdn,
1168    Map<ObjectClass,String> classes, Map<AttributeType,
1169    List<Attribute>> attributesMap, boolean performFiltering)
1170  {
1171    boolean hasSomeAttributesToFilter = false;
1172    /*
1173     * Prepare a list of attributes to be included/excluded according to the
1174     * fractional replication configuration
1175     */
1176
1177    Set<String> fractionalConcernedAttributes =
1178      createFractionalConcernedAttrList(fractionalConfig, classes.keySet());
1179    boolean fractionalExclusive = fractionalConfig.isFractionalExclusive();
1180    if (fractionalExclusive && fractionalConcernedAttributes.isEmpty())
1181    {
1182      return false; // No attributes to filter
1183    }
1184
1185    // Prepare list of object classes of the added entry
1186    Set<ObjectClass> entryClasses = classes.keySet();
1187
1188    /*
1189     * Go through the user attributes and remove those that match filtered one
1190     * - exclude mode : remove only attributes that are in
1191     * fractionalConcernedAttributes
1192     * - include mode : remove any attribute that is not in
1193     * fractionalConcernedAttributes
1194     */
1195    List<List<Attribute>> newRdnAttrLists = new ArrayList<>();
1196    List<AttributeType> rdnAttrTypes = new ArrayList<>();
1197    final Set<AttributeType> attrTypes = attributesMap.keySet();
1198    for (Iterator<AttributeType> iter = attrTypes.iterator(); iter.hasNext();)
1199    {
1200      AttributeType attributeType = iter.next();
1201
1202      // Only optional attributes may be removed
1203      if (isMandatoryAttribute(entryClasses, attributeType)
1204      // Do not remove an attribute if it is a prohibited one
1205          || isFractionalProhibited(attributeType)
1206          || !canRemoveAttribute(attributeType, fractionalExclusive,
1207              fractionalConcernedAttributes))
1208      {
1209        continue;
1210      }
1211
1212      if (!performFiltering)
1213      {
1214        // The call was just to check : at least one attribute to filter
1215        // found, return immediately the answer;
1216        return true;
1217      }
1218
1219      // Do not remove an attribute/value that is part of the RDN of the
1220      // entry as it is forbidden
1221      if (entryRdn.hasAttributeType(attributeType))
1222      {
1223        /*
1224        We must remove all values of the attributes map for this
1225        attribute type but the one that has the value which is in the RDN
1226        of the entry. In fact the (underlying )attribute list does not
1227        support remove so we have to create a new list, keeping only the
1228        attribute value which is the same as in the RDN
1229        */
1230        ByteString rdnAttributeValue =
1231          entryRdn.getAttributeValue(attributeType);
1232        List<Attribute> attrList = attributesMap.get(attributeType);
1233        ByteString sameAttrValue = null;
1234        // Locate the attribute value identical to the one in the RDN
1235        for (Attribute attr : attrList)
1236        {
1237          if (attr.contains(rdnAttributeValue))
1238          {
1239            for (ByteString attrValue : attr) {
1240              if (rdnAttributeValue.equals(attrValue)) {
1241                // Keep the value we want
1242                sameAttrValue = attrValue;
1243              } else {
1244                hasSomeAttributesToFilter = true;
1245              }
1246            }
1247          }
1248          else
1249          {
1250            hasSomeAttributesToFilter = true;
1251          }
1252        }
1253        //    Recreate the attribute list with only the RDN attribute value
1254        if (sameAttrValue != null)
1255          // Paranoia check: should never be the case as we should always
1256          // find the attribute/value pair matching the pair in the RDN
1257        {
1258          // Construct and store new attribute list
1259          newRdnAttrLists.add(Attributes.createAsList(attributeType, sameAttrValue));
1260          /*
1261          Store matching attribute type
1262          The mapping will be done using object from rdnAttrTypes as key
1263          and object from newRdnAttrLists (at same index) as value in
1264          the user attribute map to be modified
1265          */
1266          rdnAttrTypes.add(attributeType);
1267        }
1268      }
1269      else
1270      {
1271        // Found an attribute to remove, remove it from the list.
1272        iter.remove();
1273        hasSomeAttributesToFilter = true;
1274      }
1275    }
1276    // Now overwrite the attribute values for the attribute types present in the
1277    // RDN, if there are some filtered attributes in the RDN
1278    for (int index = 0 ; index < rdnAttrTypes.size() ; index++)
1279    {
1280      attributesMap.put(rdnAttrTypes.get(index), newRdnAttrLists.get(index));
1281    }
1282    return hasSomeAttributesToFilter;
1283  }
1284
1285   private static boolean isMandatoryAttribute(Set<ObjectClass> entryClasses, AttributeType attributeType)
1286   {
1287     for (ObjectClass objectClass : entryClasses)
1288     {
1289       if (objectClass.isRequired(attributeType))
1290       {
1291         return true;
1292       }
1293     }
1294     return false;
1295   }
1296
1297   private static boolean isFractionalProhibited(AttributeType attrType)
1298   {
1299     String attributeName = attrType.getPrimaryName();
1300     return (attributeName != null && isFractionalProhibitedAttr(attributeName))
1301         || isFractionalProhibitedAttr(attrType.getOID());
1302   }
1303
1304  private static boolean canRemoveAttribute(AttributeType attributeType,
1305      boolean fractionalExclusive, Set<String> fractionalConcernedAttributes)
1306  {
1307    String attributeName = attributeType.getPrimaryName();
1308    String attributeOid = attributeType.getOID();
1309
1310    // Is the current attribute part of the established list ?
1311    boolean foundAttribute =
1312        contains(fractionalConcernedAttributes, attributeName, attributeOid);
1313    // Now remove the attribute or modification if:
1314    // - exclusive mode and attribute is in configuration
1315    // - inclusive mode and attribute is not in configuration
1316    return canRemoveAttribute(fractionalExclusive, foundAttribute);
1317  }
1318
1319  private static boolean canRemoveAttribute(boolean fractionalExclusive,
1320      boolean foundAttribute)
1321  {
1322    return (foundAttribute && fractionalExclusive)
1323        || (!foundAttribute && !fractionalExclusive);
1324  }
1325
1326  private static boolean contains(Set<String> attrNames, String attrName,
1327      String attrOID)
1328  {
1329    return attrNames.contains(attrOID)
1330        || (attrName != null && attrNames.contains(attrName.toLowerCase()));
1331  }
1332
1333  /**
1334   * Prepares a list of attributes of interest for the fractional feature.
1335   * @param fractionalConfig The fractional configuration to use
1336   * @param entryObjectClasses The object classes of an entry on which an
1337   * operation is going to be performed.
1338   * @return The list of attributes of the entry to be excluded/included
1339   * when the operation will be performed.
1340   */
1341  private static Set<String> createFractionalConcernedAttrList(
1342    FractionalConfig fractionalConfig, Set<ObjectClass> entryObjectClasses)
1343  {
1344    /*
1345     * Is the concerned entry of a type concerned by fractional replication
1346     * configuration ? If yes, add the matching attribute names to a set of
1347     * attributes to take into account for filtering
1348     * (inclusive or exclusive mode).
1349     * Using a Set to avoid duplicate attributes (from 2 inheriting classes for
1350     * instance)
1351     */
1352    Set<String> fractionalConcernedAttributes = new HashSet<>();
1353
1354    // Get object classes the entry matches
1355    Set<String> fractionalAllClassesAttributes =
1356      fractionalConfig.getFractionalAllClassesAttributes();
1357    Map<String, Set<String>> fractionalSpecificClassesAttributes =
1358      fractionalConfig.getFractionalSpecificClassesAttributes();
1359
1360    Set<String> fractionalClasses =
1361        fractionalSpecificClassesAttributes.keySet();
1362    for (ObjectClass entryObjectClass : entryObjectClasses)
1363    {
1364      for(String fractionalClass : fractionalClasses)
1365      {
1366        if (entryObjectClass.hasNameOrOID(fractionalClass.toLowerCase()))
1367        {
1368          fractionalConcernedAttributes.addAll(
1369              fractionalSpecificClassesAttributes.get(fractionalClass));
1370        }
1371      }
1372    }
1373
1374    // Add to the set any attribute which is class independent
1375    fractionalConcernedAttributes.addAll(fractionalAllClassesAttributes);
1376
1377    return fractionalConcernedAttributes;
1378  }
1379
1380  /**
1381   * If fractional replication is enabled, this analyzes the operation and
1382   * suppresses the forbidden attributes in it so that they are not added/
1383   * deleted/modified in the local backend.
1384   *
1385   * @param modifyOperation The operation to modify based on fractional
1386   * replication configuration
1387   * @param performFiltering Tells if the effective attribute filtering should
1388   * be performed or if the call is just to analyze if there are some
1389   * attributes filtered by fractional configuration
1390   * @return FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES,
1391   * FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES or FRACTIONAL_BECOME_NO_OP
1392   */
1393  private int fractionalFilterOperation(PreOperationModifyOperation
1394    modifyOperation, boolean performFiltering)
1395  {
1396    /*
1397     * Prepare a list of attributes to be included/excluded according to the
1398     * fractional replication configuration
1399     */
1400
1401    Entry modifiedEntry = modifyOperation.getCurrentEntry();
1402    Set<String> fractionalConcernedAttributes =
1403      createFractionalConcernedAttrList(fractionalConfig,
1404      modifiedEntry.getObjectClasses().keySet());
1405    boolean fractionalExclusive = fractionalConfig.isFractionalExclusive();
1406    if (fractionalExclusive && fractionalConcernedAttributes.isEmpty())
1407    {
1408      // No attributes to filter
1409      return FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES;
1410    }
1411
1412    // Prepare list of object classes of the modified entry
1413    DN entryToModifyDn = modifyOperation.getEntryDN();
1414    Entry entryToModify;
1415    try
1416    {
1417      entryToModify = DirectoryServer.getEntry(entryToModifyDn);
1418    }
1419    catch(DirectoryException e)
1420    {
1421      logger.info(NOTE_ERR_FRACTIONAL, getBaseDN(), stackTraceToSingleLineString(e));
1422      return FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES;
1423    }
1424    Set<ObjectClass> entryClasses = entryToModify.getObjectClasses().keySet();
1425
1426    /*
1427     * Now go through the attribute modifications and filter the mods according
1428     * to the fractional configuration (using the just established concerned
1429     * attributes list):
1430     * - delete attributes: remove them if regarding a filtered attribute
1431     * - add attributes: remove them if regarding a filtered attribute
1432     * - modify attributes: remove them if regarding a filtered attribute
1433     */
1434
1435    int result = FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES;
1436    List<Modification> mods = modifyOperation.getModifications();
1437    Iterator<Modification> modsIt = mods.iterator();
1438    while (modsIt.hasNext())
1439    {
1440      Modification mod = modsIt.next();
1441      Attribute attr = mod.getAttribute();
1442      AttributeType attrType = attr.getAttributeType();
1443      // Fractional replication ignores operational attributes
1444      if (attrType.isOperational()
1445          || isMandatoryAttribute(entryClasses, attrType)
1446          || isFractionalProhibited(attrType)
1447          || !canRemoveAttribute(attrType, fractionalExclusive,
1448              fractionalConcernedAttributes))
1449      {
1450        continue;
1451      }
1452
1453      if (!performFiltering)
1454      {
1455        // The call was just to check : at least one attribute to filter
1456        // found, return immediately the answer;
1457        return FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES;
1458      }
1459
1460      // Found a modification to remove, remove it from the list.
1461      modsIt.remove();
1462      result = FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES;
1463      if (mods.isEmpty())
1464      {
1465        // This operation must become a no-op as no more modification in it
1466        return FRACTIONAL_BECOME_NO_OP;
1467      }
1468    }
1469
1470    return result;
1471  }
1472
1473  /**
1474   * This is overwritten to allow stopping the (online) import process by the
1475   * fractional ldif import plugin when it detects that the (imported) remote
1476   * data set is not consistent with the local fractional configuration.
1477   * {@inheritDoc}
1478   */
1479  @Override
1480  protected byte[] receiveEntryBytes()
1481  {
1482    if (isFollowImport())
1483    {
1484      // Ok, next entry is allowed to be received
1485      return super.receiveEntryBytes();
1486    }
1487
1488    // Fractional ldif import plugin detected inconsistency between local and
1489    // remote server fractional configuration and is stopping the import
1490    // process:
1491    // This is an error termination during the import
1492    // The error is stored and the import is ended by returning null
1493    final ImportExportContext ieCtx = getImportExportContext();
1494    LocalizableMessage msg = null;
1495    switch (importErrorMessageId)
1496    {
1497    case IMPORT_ERROR_MESSAGE_BAD_REMOTE:
1498      msg = NOTE_ERR_FULL_UPDATE_IMPORT_FRACTIONAL_BAD_REMOTE.get(getBaseDN(), ieCtx.getImportSource());
1499      break;
1500    case IMPORT_ERROR_MESSAGE_REMOTE_IS_FRACTIONAL:
1501      msg = NOTE_ERR_FULL_UPDATE_IMPORT_FRACTIONAL_REMOTE_IS_FRACTIONAL.get(getBaseDN(), ieCtx.getImportSource());
1502      break;
1503    }
1504    ieCtx.setException(new DirectoryException(UNWILLING_TO_PERFORM, msg));
1505    return null;
1506  }
1507
1508  /**
1509   * This is overwritten to allow stopping the (online) export process if the
1510   * local domain is fractional and the destination is all other servers:
1511   * This make no sense to have only fractional servers in a replicated
1512   * topology. This prevents from administrator manipulation error that would
1513   * lead to whole topology data corruption.
1514   * {@inheritDoc}
1515   */
1516  @Override
1517  protected void initializeRemote(int target, int requestorID,
1518    Task initTask, int initWindow) throws DirectoryException
1519  {
1520    if (target == RoutableMsg.ALL_SERVERS && fractionalConfig.isFractional())
1521    {
1522      LocalizableMessage msg = NOTE_ERR_FRACTIONAL_FORBIDDEN_FULL_UPDATE_FRACTIONAL.get(getBaseDN(), getServerId());
1523      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, msg);
1524    }
1525
1526    super.initializeRemote(target, requestorID, initTask, initWindow);
1527  }
1528
1529  /**
1530   * Implement the  handleConflictResolution phase of the deleteOperation.
1531   *
1532   * @param deleteOperation The deleteOperation.
1533   * @return A SynchronizationProviderResult indicating if the operation
1534   *         can continue.
1535   */
1536  SynchronizationProviderResult handleConflictResolution(
1537         PreOperationDeleteOperation deleteOperation)
1538  {
1539    if (!deleteOperation.isSynchronizationOperation() && !brokerIsConnected())
1540    {
1541      LocalizableMessage msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(getBaseDN());
1542      return new SynchronizationProviderResult.StopProcessing(
1543          ResultCode.UNWILLING_TO_PERFORM, msg);
1544    }
1545
1546    DeleteContext ctx =
1547      (DeleteContext) deleteOperation.getAttachment(SYNCHROCONTEXT);
1548    Entry deletedEntry = deleteOperation.getEntryToDelete();
1549
1550    if (ctx != null)
1551    {
1552      /*
1553       * This is a replication operation
1554       * Check that the modified entry has the same entryuuid
1555       * as it was in the original message.
1556       */
1557      String operationEntryUUID = ctx.getEntryUUID();
1558      String deletedEntryUUID = getEntryUUID(deletedEntry);
1559      if (!operationEntryUUID.equals(deletedEntryUUID))
1560      {
1561        /*
1562         * The changes entry is not the same entry as the one on
1563         * the original change was performed.
1564         * Probably the original entry was renamed and replaced with
1565         * another entry.
1566         * We must not let the change proceed, return a negative
1567         * result and set the result code to NO_SUCH_OBJECT.
1568         * When the operation will return, the thread that started the operation
1569         * will try to find the correct entry and restart a new operation.
1570         */
1571        return new SynchronizationProviderResult.StopProcessing(
1572            ResultCode.NO_SUCH_OBJECT, null);
1573      }
1574    }
1575    else
1576    {
1577      // There is no replication context attached to the operation
1578      // so this is not a replication operation.
1579      CSN csn = generateCSN(deleteOperation);
1580      String modifiedEntryUUID = getEntryUUID(deletedEntry);
1581      ctx = new DeleteContext(csn, modifiedEntryUUID);
1582      deleteOperation.setAttachment(SYNCHROCONTEXT, ctx);
1583
1584      synchronized (replayOperations)
1585      {
1586        int size = replayOperations.size();
1587        if (size >= 10000)
1588        {
1589          replayOperations.remove(replayOperations.firstKey());
1590        }
1591        FakeOperation op = new FakeDelOperation(
1592            deleteOperation.getEntryDN(), csn, modifiedEntryUUID);
1593        replayOperations.put(csn, op);
1594      }
1595    }
1596
1597    return new SynchronizationProviderResult.ContinueProcessing();
1598  }
1599
1600  /**
1601   * Implement the  handleConflictResolution phase of the addOperation.
1602   *
1603   * @param addOperation The AddOperation.
1604   * @return A SynchronizationProviderResult indicating if the operation
1605   *         can continue.
1606   */
1607  SynchronizationProviderResult handleConflictResolution(
1608      PreOperationAddOperation addOperation)
1609  {
1610    if (!addOperation.isSynchronizationOperation() && !brokerIsConnected())
1611    {
1612      LocalizableMessage msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(getBaseDN());
1613      return new SynchronizationProviderResult.StopProcessing(
1614          ResultCode.UNWILLING_TO_PERFORM, msg);
1615    }
1616
1617    if (fractionalConfig.isFractional())
1618    {
1619      if (addOperation.isSynchronizationOperation())
1620      {
1621        /*
1622         * Filter attributes here for fractional replication. If fractional
1623         * replication is enabled, we analyze the operation to suppress the
1624         * forbidden attributes in it so that they are not added in the local
1625         * backend. This must be called before any other plugin is called, to
1626         * keep coherency across plugin calls.
1627         */
1628        fractionalFilterOperation(addOperation, true);
1629      }
1630      else
1631      {
1632        /*
1633         * Direct access from an LDAP client : if some attributes are to be
1634         * removed according to the fractional configuration, simply forbid
1635         * the operation
1636         */
1637        if (fractionalFilterOperation(addOperation, false))
1638        {
1639          LocalizableMessage msg = NOTE_ERR_FRACTIONAL_FORBIDDEN_OPERATION.get(getBaseDN(), addOperation);
1640          return new SynchronizationProviderResult.StopProcessing(
1641            ResultCode.UNWILLING_TO_PERFORM, msg);
1642        }
1643      }
1644    }
1645
1646    if (addOperation.isSynchronizationOperation())
1647    {
1648      AddContext ctx = (AddContext) addOperation.getAttachment(SYNCHROCONTEXT);
1649      /*
1650       * If an entry with the same entry uniqueID already exist then
1651       * this operation has already been replayed in the past.
1652       */
1653      String uuid = ctx.getEntryUUID();
1654      if (findEntryDN(uuid) != null)
1655      {
1656        return new SynchronizationProviderResult.StopProcessing(
1657            ResultCode.NO_OPERATION, null);
1658      }
1659
1660      /* The parent entry may have been renamed here since the change was done
1661       * on the first server, and another entry have taken the former dn
1662       * of the parent entry
1663       */
1664
1665      String parentEntryUUID = ctx.getParentEntryUUID();
1666      // root entry have no parent, there is no need to check for it.
1667      if (parentEntryUUID != null)
1668      {
1669        // There is a potential of perfs improvement here
1670        // if we could avoid the following parent entry retrieval
1671        DN parentDnFromCtx = findEntryDN(ctx.getParentEntryUUID());
1672        if (parentDnFromCtx == null)
1673        {
1674          // The parent does not exist with the specified unique id
1675          // stop the operation with NO_SUCH_OBJECT and let the
1676          // conflict resolution or the dependency resolution solve this.
1677          return new SynchronizationProviderResult.StopProcessing(
1678              ResultCode.NO_SUCH_OBJECT, null);
1679        }
1680
1681        DN entryDN = addOperation.getEntryDN();
1682        DN parentDnFromEntryDn = entryDN.getParentDNInSuffix();
1683        if (parentDnFromEntryDn != null
1684            && !parentDnFromCtx.equals(parentDnFromEntryDn))
1685        {
1686          // parentEntry has been renamed
1687          // replication name conflict resolution is expected to fix that
1688          // later in the flow
1689          return new SynchronizationProviderResult.StopProcessing(
1690              ResultCode.NO_SUCH_OBJECT, null);
1691        }
1692      }
1693    }
1694    return new SynchronizationProviderResult.ContinueProcessing();
1695  }
1696
1697  /**
1698   * Check that the broker associated to this ReplicationDomain has found
1699   * a Replication Server and that this LDAP server is therefore able to
1700   * process operations.
1701   * If not, set the ResultCode, the response message,
1702   * interrupt the operation, and return false
1703   *
1704   * @return  true when it OK to process the Operation, false otherwise.
1705   *          When false is returned the resultCode and the response message
1706   *          is also set in the Operation.
1707   */
1708  private boolean brokerIsConnected()
1709  {
1710    final IsolationPolicy isolationPolicy = config.getIsolationPolicy();
1711    if (isolationPolicy.equals(IsolationPolicy.ACCEPT_ALL_UPDATES))
1712    {
1713      // this policy imply that we always accept updates.
1714      return true;
1715    }
1716    if (isolationPolicy.equals(IsolationPolicy.REJECT_ALL_UPDATES))
1717    {
1718      // this isolation policy specifies that the updates are denied
1719      // when the broker had problems during the connection phase
1720      // Updates are still accepted if the broker is currently connecting..
1721      return !hasConnectionError();
1722    }
1723    // we should never get there as the only possible policies are
1724    // ACCEPT_ALL_UPDATES and REJECT_ALL_UPDATES
1725    return true;
1726  }
1727
1728  /**
1729   * Implement the  handleConflictResolution phase of the ModifyDNOperation.
1730   *
1731   * @param modifyDNOperation The ModifyDNOperation.
1732   * @return A SynchronizationProviderResult indicating if the operation
1733   *         can continue.
1734   */
1735  SynchronizationProviderResult handleConflictResolution(
1736      PreOperationModifyDNOperation modifyDNOperation)
1737  {
1738    if (!modifyDNOperation.isSynchronizationOperation() && !brokerIsConnected())
1739    {
1740      LocalizableMessage msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(getBaseDN());
1741      return new SynchronizationProviderResult.StopProcessing(
1742          ResultCode.UNWILLING_TO_PERFORM, msg);
1743    }
1744
1745    if (fractionalConfig.isFractional())
1746    {
1747      if (modifyDNOperation.isSynchronizationOperation())
1748      {
1749        /*
1750         * Filter operation here for fractional replication. If fractional
1751         * replication is enabled, we analyze the operation and modify it if
1752         * necessary to stay consistent with what is defined in fractional
1753         * configuration.
1754         */
1755        fractionalFilterOperation(modifyDNOperation, true);
1756      }
1757      else
1758      {
1759        /*
1760         * Direct access from an LDAP client : something is inconsistent with
1761         * the fractional configuration, forbid the operation.
1762         */
1763        if (fractionalFilterOperation(modifyDNOperation, false))
1764        {
1765          LocalizableMessage msg = NOTE_ERR_FRACTIONAL_FORBIDDEN_OPERATION.get(getBaseDN(), modifyDNOperation);
1766          return new SynchronizationProviderResult.StopProcessing(
1767            ResultCode.UNWILLING_TO_PERFORM, msg);
1768        }
1769      }
1770    }
1771
1772    ModifyDnContext ctx =
1773      (ModifyDnContext) modifyDNOperation.getAttachment(SYNCHROCONTEXT);
1774    if (ctx != null)
1775    {
1776      /*
1777       * This is a replication operation
1778       * Check that the modified entry has the same entryuuid
1779       * as was in the original message.
1780       */
1781      final String modifiedEntryUUID =
1782          getEntryUUID(modifyDNOperation.getOriginalEntry());
1783      if (!modifiedEntryUUID.equals(ctx.getEntryUUID()))
1784      {
1785        /*
1786         * The modified entry is not the same entry as the one on
1787         * the original change was performed.
1788         * Probably the original entry was renamed and replaced with
1789         * another entry.
1790         * We must not let the change proceed, return a negative
1791         * result and set the result code to NO_SUCH_OBJECT.
1792         * When the operation will return, the thread that started the operation
1793         * will try to find the correct entry and restart a new operation.
1794         */
1795        return new SynchronizationProviderResult.StopProcessing(
1796            ResultCode.NO_SUCH_OBJECT, null);
1797      }
1798
1799      if (modifyDNOperation.getNewSuperior() != null)
1800      {
1801        /*
1802         * Also check that the current id of the
1803         * parent is the same as when the operation was performed.
1804         */
1805        String newParentId = findEntryUUID(modifyDNOperation.getNewSuperior());
1806        if (newParentId != null && ctx.getNewSuperiorEntryUUID() != null
1807            && !newParentId.equals(ctx.getNewSuperiorEntryUUID()))
1808        {
1809        return new SynchronizationProviderResult.StopProcessing(
1810            ResultCode.NO_SUCH_OBJECT, null);
1811        }
1812      }
1813
1814      /*
1815       * If the object has been renamed more recently than this
1816       * operation, cancel the operation.
1817       */
1818      EntryHistorical hist = EntryHistorical.newInstanceFromEntry(
1819          modifyDNOperation.getOriginalEntry());
1820      if (hist.addedOrRenamedAfter(ctx.getCSN()))
1821      {
1822        return new SynchronizationProviderResult.StopProcessing(
1823            ResultCode.NO_OPERATION, null);
1824      }
1825    }
1826    else
1827    {
1828      // There is no replication context attached to the operation
1829      // so this is not a replication operation.
1830      CSN csn = generateCSN(modifyDNOperation);
1831      String newParentId = null;
1832      if (modifyDNOperation.getNewSuperior() != null)
1833      {
1834        newParentId = findEntryUUID(modifyDNOperation.getNewSuperior());
1835      }
1836
1837      Entry modifiedEntry = modifyDNOperation.getOriginalEntry();
1838      String modifiedEntryUUID = getEntryUUID(modifiedEntry);
1839      ctx = new ModifyDnContext(csn, modifiedEntryUUID, newParentId);
1840      modifyDNOperation.setAttachment(SYNCHROCONTEXT, ctx);
1841    }
1842    return new SynchronizationProviderResult.ContinueProcessing();
1843  }
1844
1845  /**
1846   * Handle the conflict resolution.
1847   * Called by the core server after locking the entry and before
1848   * starting the actual modification.
1849   * @param modifyOperation the operation
1850   * @return code indicating is operation must proceed
1851   */
1852  SynchronizationProviderResult handleConflictResolution(
1853         PreOperationModifyOperation modifyOperation)
1854  {
1855    if (!modifyOperation.isSynchronizationOperation() && !brokerIsConnected())
1856    {
1857      LocalizableMessage msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(getBaseDN());
1858      return new SynchronizationProviderResult.StopProcessing(
1859          ResultCode.UNWILLING_TO_PERFORM, msg);
1860    }
1861
1862    if (fractionalConfig.isFractional())
1863    {
1864      if  (modifyOperation.isSynchronizationOperation())
1865      {
1866        /*
1867         * Filter attributes here for fractional replication. If fractional
1868         * replication is enabled, we analyze the operation and modify it so
1869         * that no forbidden attribute is added/modified/deleted in the local
1870         * backend. This must be called before any other plugin is called, to
1871         * keep coherency across plugin calls.
1872         */
1873        if (fractionalFilterOperation(modifyOperation, true) ==
1874          FRACTIONAL_BECOME_NO_OP)
1875        {
1876          // Every modifications filtered in this operation: the operation
1877          // becomes a no-op
1878          return new SynchronizationProviderResult.StopProcessing(
1879            ResultCode.NO_OPERATION, null);
1880        }
1881      }
1882      else
1883      {
1884        /*
1885         * Direct access from an LDAP client : if some attributes are to be
1886         * removed according to the fractional configuration, simply forbid
1887         * the operation
1888         */
1889        switch(fractionalFilterOperation(modifyOperation, false))
1890        {
1891          case FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES:
1892            // Ok, let the operation happen
1893            break;
1894          case FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES:
1895            // Some attributes not compliant with fractional configuration :
1896            // forbid the operation
1897            LocalizableMessage msg = NOTE_ERR_FRACTIONAL_FORBIDDEN_OPERATION.get(getBaseDN(), modifyOperation);
1898            return new SynchronizationProviderResult.StopProcessing(
1899              ResultCode.UNWILLING_TO_PERFORM, msg);
1900        }
1901      }
1902    }
1903
1904    ModifyContext ctx =
1905      (ModifyContext) modifyOperation.getAttachment(SYNCHROCONTEXT);
1906
1907    Entry modifiedEntry = modifyOperation.getModifiedEntry();
1908    if (ctx == null)
1909    {
1910      // No replication ctx attached => not a replicated operation
1911      // - create a ctx with : CSN, entryUUID
1912      // - attach the context to the op
1913
1914      CSN csn = generateCSN(modifyOperation);
1915      ctx = new ModifyContext(csn, getEntryUUID(modifiedEntry));
1916
1917      modifyOperation.setAttachment(SYNCHROCONTEXT, ctx);
1918    }
1919    else
1920    {
1921      // Replication ctx attached => this is a replicated operation being
1922      // replayed here, it is necessary to
1923      // - check if the entry has been renamed
1924      // - check for conflicts
1925      String modifiedEntryUUID = ctx.getEntryUUID();
1926      String currentEntryUUID = getEntryUUID(modifiedEntry);
1927      if (currentEntryUUID != null
1928          && !currentEntryUUID.equals(modifiedEntryUUID))
1929      {
1930        /*
1931         * The current modified entry is not the same entry as the one on
1932         * the original modification was performed.
1933         * Probably the original entry was renamed and replaced with
1934         * another entry.
1935         * We must not let the modification proceed, return a negative
1936         * result and set the result code to NO_SUCH_OBJECT.
1937         * When the operation will return, the thread that started the
1938         * operation will try to find the correct entry and restart a new
1939         * operation.
1940         */
1941         return new SynchronizationProviderResult.StopProcessing(
1942              ResultCode.NO_SUCH_OBJECT, null);
1943      }
1944
1945      // Solve the conflicts between modify operations
1946      EntryHistorical historicalInformation =
1947        EntryHistorical.newInstanceFromEntry(modifiedEntry);
1948      modifyOperation.setAttachment(EntryHistorical.HISTORICAL,
1949                                    historicalInformation);
1950
1951      if (historicalInformation.replayOperation(modifyOperation, modifiedEntry))
1952      {
1953        numResolvedModifyConflicts.incrementAndGet();
1954      }
1955    }
1956    return new SynchronizationProviderResult.ContinueProcessing();
1957  }
1958
1959  /**
1960   * The preOperation phase for the add Operation.
1961   * Its job is to generate the replication context associated to the
1962   * operation. It is necessary to do it in this phase because contrary to
1963   * the other operations, the entry UUID is not set when the handleConflict
1964   * phase is called.
1965   *
1966   * @param addOperation The Add Operation.
1967   */
1968  void doPreOperation(PreOperationAddOperation addOperation)
1969  {
1970    final CSN csn = generateCSN(addOperation);
1971    final String entryUUID = getEntryUUID(addOperation);
1972    final AddContext ctx = new AddContext(csn, entryUUID,
1973        findEntryUUID(addOperation.getEntryDN().getParentDNInSuffix()));
1974    addOperation.setAttachment(SYNCHROCONTEXT, ctx);
1975  }
1976
1977  @Override
1978  public void publishReplicaOfflineMsg()
1979  {
1980    pendingChanges.putReplicaOfflineMsg();
1981    dsrsShutdownSync.replicaOfflineMsgSent(getBaseDN());
1982  }
1983
1984  /**
1985   * Check if an operation must be synchronized.
1986   * Also update the list of pending changes and the server RUV
1987   * @param op the operation
1988   */
1989  void synchronize(PostOperationOperation op)
1990  {
1991    ResultCode result = op.getResultCode();
1992    // Note that a failed non-replication operation might not have a change
1993    // number.
1994    CSN curCSN = OperationContext.getCSN(op);
1995    if (curCSN != null && config.isLogChangenumber())
1996    {
1997      op.addAdditionalLogItem(AdditionalLogItem.unquotedKeyValue(getClass(),
1998          "replicationCSN", curCSN));
1999    }
2000
2001    if (result == ResultCode.SUCCESS)
2002    {
2003      if (op.isSynchronizationOperation())
2004      { // Replaying a sync operation
2005        numReplayedPostOpCalled.incrementAndGet();
2006        try
2007        {
2008          remotePendingChanges.commit(curCSN);
2009        }
2010        catch (NoSuchElementException e)
2011        {
2012          logger.error(ERR_OPERATION_NOT_FOUND_IN_PENDING, op, curCSN);
2013          return;
2014        }
2015      }
2016      else
2017      {
2018        // Generate a replication message for a successful non-replication
2019        // operation.
2020        LDAPUpdateMsg msg = LDAPUpdateMsg.generateMsg(op);
2021
2022        if (msg == null)
2023        {
2024          /*
2025          * This is an operation type that we do not know about
2026          * It should never happen.
2027          */
2028          pendingChanges.remove(curCSN);
2029          logger.error(ERR_UNKNOWN_TYPE, op.getOperationType());
2030          return;
2031        }
2032
2033        addEntryAttributesForCL(msg,op);
2034
2035        // If assured replication is configured, this will prepare blocking
2036        // mechanism. If assured replication is disabled, this returns
2037        // immediately
2038        prepareWaitForAckIfAssuredEnabled(msg);
2039        try
2040        {
2041          msg.encode();
2042          pendingChanges.commitAndPushCommittedChanges(curCSN, msg);
2043        }
2044        catch (NoSuchElementException e)
2045        {
2046          logger.error(ERR_OPERATION_NOT_FOUND_IN_PENDING, op, curCSN);
2047          return;
2048        }
2049        // If assured replication is enabled, this will wait for the matching
2050        // ack or time out. If assured replication is disabled, this returns
2051        // immediately
2052        try
2053        {
2054          waitForAckIfAssuredEnabled(msg);
2055        } catch (TimeoutException ex)
2056        {
2057          // This exception may only be raised if assured replication is enabled
2058          logger.info(NOTE_DS_ACK_TIMEOUT, getBaseDN(), getAssuredTimeout(), msg);
2059        }
2060      }
2061
2062      /*
2063       * If the operation is a DELETE on the base entry of the suffix
2064       * that is replicated, the generation is now lost because the
2065       * DB is empty. We need to save it again the next time we add an entry.
2066       */
2067      if (OperationType.DELETE.equals(op.getOperationType())
2068          && ((PostOperationDeleteOperation) op)
2069                .getEntryDN().equals(getBaseDN()))
2070      {
2071        generationIdSavedStatus = false;
2072      }
2073
2074      if (!generationIdSavedStatus)
2075      {
2076        saveGenerationId(generationId);
2077      }
2078    }
2079    else if (!op.isSynchronizationOperation() && curCSN != null)
2080    {
2081      // Remove an unsuccessful non-replication operation from the pending
2082      // changes list.
2083      pendingChanges.remove(curCSN);
2084      pendingChanges.pushCommittedChanges();
2085    }
2086
2087    checkForClearedConflict(op);
2088  }
2089
2090  /**
2091   * Check if the operation that just happened has cleared a conflict :
2092   * Clearing a conflict happens if the operation has free a DN that
2093   * for which an other entry was in conflict.
2094   * Steps:
2095   * - get the DN freed by a DELETE or MODRDN op
2096   * - search for entries put in the conflict space (dn=entryUUID'+'....)
2097   *   because the expected DN was not available (ds-sync-conflict=expected DN)
2098   * - retain the entry with the oldest conflict
2099   * - rename this entry with the freedDN as it was expected originally
2100   */
2101   private void checkForClearedConflict(PostOperationOperation op)
2102   {
2103     OperationType type = op.getOperationType();
2104     if (op.getResultCode() != ResultCode.SUCCESS)
2105     {
2106       // those operations cannot have cleared a conflict
2107       return;
2108     }
2109
2110     DN freedDN;
2111     if (type == OperationType.DELETE)
2112     {
2113       freedDN = ((PostOperationDeleteOperation) op).getEntryDN();
2114     }
2115     else if (type == OperationType.MODIFY_DN)
2116     {
2117       freedDN = ((PostOperationModifyDNOperation) op).getEntryDN();
2118     }
2119     else
2120     {
2121       return;
2122     }
2123
2124    SearchFilter filter;
2125    try
2126    {
2127      filter = LDAPFilter.createEqualityFilter(DS_SYNC_CONFLICT,
2128          ByteString.valueOfUtf8(freedDN.toString())).toSearchFilter();
2129    }
2130    catch (DirectoryException e)
2131    {
2132      // can not happen?
2133      logger.traceException(e);
2134      return;
2135    }
2136
2137    SearchRequest request = newSearchRequest(getBaseDN(), SearchScope.WHOLE_SUBTREE, filter)
2138        .addAttribute(USER_AND_REPL_OPERATIONAL_ATTRS);
2139    InternalSearchOperation searchOp =  conn.processSearch(request);
2140
2141     Entry entryToRename = null;
2142     CSN entryToRenameCSN = null;
2143     for (SearchResultEntry entry : searchOp.getSearchEntries())
2144     {
2145       EntryHistorical history = EntryHistorical.newInstanceFromEntry(entry);
2146       if (entryToRename == null)
2147       {
2148         entryToRename = entry;
2149         entryToRenameCSN = history.getDNDate();
2150       }
2151       else if (!history.addedOrRenamedAfter(entryToRenameCSN))
2152       {
2153         // this conflict is older than the previous, keep it.
2154         entryToRename = entry;
2155         entryToRenameCSN = history.getDNDate();
2156       }
2157     }
2158
2159     if (entryToRename != null)
2160     {
2161       DN entryDN = entryToRename.getName();
2162       ModifyDNOperation newOp = renameEntry(
2163           entryDN, freedDN.rdn(), freedDN.parent(), false);
2164
2165       ResultCode res = newOp.getResultCode();
2166       if (res != ResultCode.SUCCESS)
2167       {
2168        logger.error(ERR_COULD_NOT_SOLVE_CONFLICT, entryDN, res);
2169       }
2170     }
2171   }
2172
2173  /**
2174   * Rename an Entry Using a synchronization, non-replicated operation.
2175   * This method should be used instead of the InternalConnection methods
2176   * when the operation that need to be run must be local only and therefore
2177   * not replicated to the RS.
2178   *
2179   * @param targetDN     The DN of the entry to rename.
2180   * @param newRDN       The new RDN to be used.
2181   * @param parentDN     The parentDN to be used.
2182   * @param markConflict A boolean indicating is this entry should be marked
2183   *                     as a conflicting entry. In such case the
2184   *                     DS_SYNC_CONFLICT attribute will be added to the entry
2185   *                     with the value of its original DN.
2186   *                     If false, the DS_SYNC_CONFLICT attribute will be
2187   *                     cleared.
2188   *
2189   * @return The operation that was run to rename the entry.
2190   */
2191  private ModifyDNOperation renameEntry(DN targetDN, RDN newRDN, DN parentDN,
2192      boolean markConflict)
2193  {
2194    ModifyDNOperation newOp = new ModifyDNOperationBasis(
2195        conn, nextOperationID(), nextMessageID(), new ArrayList<Control>(0),
2196        targetDN, newRDN, false, parentDN);
2197
2198    if (markConflict)
2199    {
2200      Attribute attr = Attributes.create(DS_SYNC_CONFLICT, targetDN.toString());
2201      newOp.addModification(new Modification(ModificationType.REPLACE, attr));
2202    }
2203    else
2204    {
2205      Attribute attr = Attributes.empty(DS_SYNC_CONFLICT);
2206      newOp.addModification(new Modification(ModificationType.DELETE, attr));
2207    }
2208
2209    runAsSynchronizedOperation(newOp);
2210    return newOp;
2211  }
2212
2213  private void runAsSynchronizedOperation(Operation op)
2214  {
2215    op.setInternalOperation(true);
2216    op.setSynchronizationOperation(true);
2217    op.setDontSynchronize(true);
2218    op.run();
2219  }
2220
2221  /** Delete this ReplicationDomain. */
2222  void delete()
2223  {
2224    shutdown();
2225    removeECLDomainCfg();
2226  }
2227
2228  /** Shutdown this ReplicationDomain. */
2229  public void shutdown()
2230  {
2231    if (shutdown.compareAndSet(false, true))
2232    {
2233      final RSUpdater rsUpdater = this.rsUpdater.get();
2234      if (rsUpdater != null)
2235      {
2236        rsUpdater.initiateShutdown();
2237      }
2238
2239      // stop the thread in charge of flushing the ServerState.
2240      if (flushThread != null)
2241      {
2242        flushThread.initiateShutdown();
2243        synchronized (flushThread)
2244        {
2245          flushThread.notify();
2246        }
2247      }
2248
2249      DirectoryServer.deregisterAlertGenerator(this);
2250      DirectoryServer.deregisterBackendInitializationListener(this);
2251      DirectoryServer.deregisterShutdownListener(this);
2252
2253      // stop the ReplicationDomain
2254      disableService();
2255    }
2256
2257    // wait for completion of the ServerStateFlush thread.
2258    try
2259    {
2260      while (!done)
2261      {
2262        Thread.sleep(50);
2263      }
2264    } catch (InterruptedException e)
2265    {
2266      Thread.currentThread().interrupt();
2267    }
2268  }
2269
2270  /**
2271   * Create and replay a synchronized Operation from an UpdateMsg.
2272   *
2273   * @param msg
2274   *          The UpdateMsg to be replayed.
2275   * @param shutdown
2276   *          whether the server initiated shutdown
2277   */
2278  void replay(LDAPUpdateMsg msg, AtomicBoolean shutdown)
2279  {
2280    // Try replay the operation, then flush (replaying) any pending operation
2281    // whose dependency has been replayed until no more left.
2282    do
2283    {
2284      Operation op = null; // the last operation on which replay was attempted
2285      boolean dependency = false;
2286      String replayErrorMsg = null;
2287      CSN csn = null;
2288      try
2289      {
2290        // The next operation for which to attempt replay.
2291        // This local variable allow to keep error messages in the "op" local
2292        // variable until the next loop iteration starts.
2293        // "op" is already initialized to the next Operation because of the
2294        // error handling paths.
2295        Operation nextOp = op = msg.createOperation(conn);
2296        dependency = remotePendingChanges.checkDependencies(op, msg);
2297
2298        boolean replayDone = false;
2299        int retryCount = 10;
2300        while (!dependency && !replayDone && retryCount-- > 0)
2301        {
2302          if (shutdown.get())
2303          {
2304            // shutdown initiated, let's leave
2305            return;
2306          }
2307          // Try replay the operation
2308          op = nextOp;
2309          op.setInternalOperation(true);
2310          op.setSynchronizationOperation(true);
2311
2312          // Always add the ManageDSAIT control so that updates to referrals
2313          // are processed locally.
2314          op.addRequestControl(new LDAPControl(OID_MANAGE_DSAIT_CONTROL));
2315
2316          csn = OperationContext.getCSN(op);
2317          op.run();
2318
2319          ResultCode result = op.getResultCode();
2320
2321          if (result != ResultCode.SUCCESS)
2322          {
2323            if (result == ResultCode.NO_OPERATION)
2324            {
2325              // Pre-operation conflict resolution detected that the operation
2326              // was a no-op. For example, an add which has already been
2327              // replayed, or a modify DN operation on an entry which has been
2328              // renamed by a more recent modify DN.
2329              replayDone = true;
2330            }
2331            else if (result == ResultCode.BUSY)
2332            {
2333              /*
2334               * We probably could not get a lock (OPENDJ-885). Give the server
2335               * another chance to process this operation immediately.
2336               */
2337              Thread.yield();
2338              continue;
2339            }
2340            else if (result == ResultCode.UNAVAILABLE)
2341            {
2342              /*
2343               * It can happen when a rebuild is performed or the backend is
2344               * offline (OPENDJ-49). Give the server another chance to process
2345               * this operation after some time.
2346               */
2347              Thread.sleep(50);
2348              continue;
2349            }
2350            else if (op instanceof ModifyOperation)
2351            {
2352              ModifyOperation castOp = (ModifyOperation) op;
2353              dependency = remotePendingChanges.checkDependencies(castOp);
2354              ModifyMsg modifyMsg = (ModifyMsg) msg;
2355              replayDone = solveNamingConflict(castOp, modifyMsg);
2356            }
2357            else if (op instanceof DeleteOperation)
2358            {
2359              DeleteOperation castOp = (DeleteOperation) op;
2360              dependency = remotePendingChanges.checkDependencies(castOp);
2361              replayDone = solveNamingConflict(castOp, msg);
2362            }
2363            else if (op instanceof AddOperation)
2364            {
2365              AddOperation castOp = (AddOperation) op;
2366              AddMsg addMsg = (AddMsg) msg;
2367              dependency = remotePendingChanges.checkDependencies(castOp);
2368              replayDone = solveNamingConflict(castOp, addMsg);
2369            }
2370            else if (op instanceof ModifyDNOperation)
2371            {
2372              ModifyDNOperation castOp = (ModifyDNOperation) op;
2373              replayDone = solveNamingConflict(castOp, msg);
2374            }
2375            else
2376            {
2377              replayDone = true; // unknown type of operation ?!
2378            }
2379
2380            if (replayDone)
2381            {
2382              // the update became a dummy update and the result
2383              // of the conflict resolution phase is to do nothing.
2384              // however we still need to push this change to the serverState
2385              updateError(csn);
2386            }
2387            else
2388            {
2389              /*
2390               * Create a new operation reflecting the new state of the
2391               * UpdateMsg after conflict resolution modified it.
2392               *  Note: When msg is a DeleteMsg, the DeleteOperation is properly
2393               *  created with subtreeDelete request control when needed.
2394               */
2395              nextOp = msg.createOperation(conn);
2396            }
2397          }
2398          else
2399          {
2400            replayDone = true;
2401          }
2402        }
2403
2404        if (!replayDone && !dependency)
2405        {
2406          // Continue with the next change but the servers could now become
2407          // inconsistent.
2408          // Let the repair tool know about this.
2409          final LocalizableMessage message = ERR_LOOP_REPLAYING_OPERATION.get(
2410              op, op.getErrorMessage());
2411          logger.error(message);
2412          numUnresolvedNamingConflicts.incrementAndGet();
2413          replayErrorMsg = message.toString();
2414          updateError(csn);
2415        }
2416      } catch (DecodeException | LDAPException | DataFormatException e)
2417      {
2418        replayErrorMsg = logDecodingOperationError(msg, e);
2419      } catch (Exception e)
2420      {
2421        if (csn != null)
2422        {
2423          /*
2424           * An Exception happened during the replay process.
2425           * Continue with the next change but the servers will now start
2426           * to be inconsistent.
2427           * Let the repair tool know about this.
2428           */
2429          LocalizableMessage message =
2430              ERR_EXCEPTION_REPLAYING_OPERATION.get(
2431                  stackTraceToSingleLineString(e), op);
2432          logger.error(message);
2433          replayErrorMsg = message.toString();
2434          updateError(csn);
2435        } else
2436        {
2437          replayErrorMsg = logDecodingOperationError(msg, e);
2438        }
2439      } finally
2440      {
2441        if (!dependency)
2442        {
2443          processUpdateDone(msg, replayErrorMsg);
2444        }
2445      }
2446
2447      // Now replay any pending update that had a dependency and whose
2448      // dependency has been replayed, do that until no more updates of that
2449      // type left...
2450      msg = remotePendingChanges.getNextUpdate();
2451    } while (msg != null);
2452  }
2453
2454  private String logDecodingOperationError(LDAPUpdateMsg msg, Exception e)
2455  {
2456    LocalizableMessage message =
2457        ERR_EXCEPTION_DECODING_OPERATION.get(msg + " " + stackTraceToSingleLineString(e));
2458    logger.error(message);
2459    return message.toString();
2460  }
2461
2462  /**
2463   * This method is called when an error happens while replaying
2464   * an operation.
2465   * It is necessary because the postOperation does not always get
2466   * called when error or Exceptions happen during the operation replay.
2467   *
2468   * @param csn the CSN of the operation with error.
2469   */
2470  private void updateError(CSN csn)
2471  {
2472    try
2473    {
2474      remotePendingChanges.commit(csn);
2475    }
2476    catch (NoSuchElementException e)
2477    {
2478      // A failure occurred after the change had been removed from the pending
2479      // changes table.
2480      if (logger.isTraceEnabled())
2481      {
2482        logger.trace(
2483            "LDAPReplicationDomain.updateError: Unable to find remote "
2484                + "pending change for CSN %s", csn);
2485      }
2486    }
2487  }
2488
2489  /**
2490   * Generate a new CSN and insert it in the pending list.
2491   *
2492   * @param operation
2493   *          The operation for which the CSN must be generated.
2494   * @return The new CSN.
2495   */
2496  private CSN generateCSN(PluginOperation operation)
2497  {
2498    return pendingChanges.putLocalOperation(operation);
2499  }
2500
2501  /**
2502   * Find the Unique Id of the entry with the provided DN by doing a
2503   * search of the entry and extracting its entryUUID from its attributes.
2504   *
2505   * @param dn The dn of the entry for which the unique Id is searched.
2506   *
2507   * @return The unique Id of the entry with the provided DN.
2508   */
2509  static String findEntryUUID(DN dn)
2510  {
2511    if (dn == null)
2512    {
2513      return null;
2514    }
2515    final SearchRequest request = newSearchRequest(dn, SearchScope.BASE_OBJECT)
2516        .addAttribute(ENTRYUUID_ATTRIBUTE_NAME);
2517    final InternalSearchOperation search = getRootConnection().processSearch(request);
2518    final SearchResultEntry resultEntry = getFirstResult(search);
2519    if (resultEntry != null)
2520    {
2521      return getEntryUUID(resultEntry);
2522    }
2523    return null;
2524  }
2525
2526  private static SearchResultEntry getFirstResult(InternalSearchOperation search)
2527  {
2528    if (search.getResultCode() == ResultCode.SUCCESS)
2529    {
2530      final LinkedList<SearchResultEntry> results = search.getSearchEntries();
2531      if (!results.isEmpty())
2532      {
2533        return results.getFirst();
2534      }
2535    }
2536    return null;
2537  }
2538
2539  /**
2540   * Find the current DN of an entry from its entry UUID.
2541   *
2542   * @param uuid the Entry Unique ID.
2543   * @return The current DN of the entry or null if there is no entry with
2544   *         the specified UUID.
2545   */
2546  private DN findEntryDN(String uuid)
2547  {
2548    try
2549    {
2550      final SearchRequest request = newSearchRequest(getBaseDN(), SearchScope.WHOLE_SUBTREE, "entryuuid=" + uuid);
2551      InternalSearchOperation search = conn.processSearch(request);
2552      final SearchResultEntry resultEntry = getFirstResult(search);
2553      if (resultEntry != null)
2554      {
2555        return resultEntry.getName();
2556      }
2557    }
2558    catch (DirectoryException e)
2559    {
2560      // never happens because the filter is always valid.
2561    }
2562    return null;
2563  }
2564
2565  /**
2566   * Solve a conflict detected when replaying a modify operation.
2567   *
2568   * @param op The operation that triggered the conflict detection.
2569   * @param msg The operation that triggered the conflict detection.
2570   * @return true if the process is completed, false if it must continue..
2571   */
2572  private boolean solveNamingConflict(ModifyOperation op, ModifyMsg msg)
2573  {
2574    ResultCode result = op.getResultCode();
2575    ModifyContext ctx = (ModifyContext) op.getAttachment(SYNCHROCONTEXT);
2576    String entryUUID = ctx.getEntryUUID();
2577
2578    if (result == ResultCode.NO_SUCH_OBJECT)
2579    {
2580      /*
2581       * The operation is a modification but
2582       * the entry has been renamed on a different master in the same time.
2583       * search if the entry has been renamed, and return the new dn
2584       * of the entry.
2585       */
2586      DN newDN = findEntryDN(entryUUID);
2587      if (newDN != null)
2588      {
2589        // There is an entry with the same unique id as this modify operation
2590        // replay the modify using the current dn of this entry.
2591        msg.setDN(newDN);
2592        numResolvedNamingConflicts.incrementAndGet();
2593        return false;
2594      }
2595      else
2596      {
2597        // This entry does not exist anymore.
2598        // It has probably been deleted, stop the processing of this operation
2599        numResolvedNamingConflicts.incrementAndGet();
2600        return true;
2601      }
2602    }
2603    else if (result == ResultCode.NOT_ALLOWED_ON_RDN)
2604    {
2605      DN currentDN = findEntryDN(entryUUID);
2606      RDN currentRDN;
2607      if (currentDN != null)
2608      {
2609        currentRDN = currentDN.rdn();
2610      }
2611      else
2612      {
2613        // The entry does not exist anymore.
2614        numResolvedNamingConflicts.incrementAndGet();
2615        return true;
2616      }
2617
2618      // The modify operation is trying to delete the value that is
2619      // currently used in the RDN. We need to alter the modify so that it does
2620      // not remove the current RDN value(s).
2621
2622      List<Modification> mods = op.getModifications();
2623      for (Modification mod : mods)
2624      {
2625        AttributeType modAttrType = mod.getAttribute().getAttributeType();
2626        if ((mod.getModificationType() == ModificationType.DELETE
2627              || mod.getModificationType() == ModificationType.REPLACE)
2628            && currentRDN.hasAttributeType(modAttrType))
2629        {
2630          // the attribute can't be deleted because it is used in the RDN,
2631          // turn this operation is a replace with the current RDN value(s);
2632          mod.setModificationType(ModificationType.REPLACE);
2633          Attribute newAttribute = mod.getAttribute();
2634          AttributeBuilder attrBuilder = new AttributeBuilder(newAttribute);
2635          attrBuilder.add(currentRDN.getAttributeValue(modAttrType));
2636          mod.setAttribute(attrBuilder.toAttribute());
2637        }
2638      }
2639      msg.setMods(mods);
2640      numResolvedNamingConflicts.incrementAndGet();
2641      return false;
2642    }
2643    else
2644    {
2645      // The other type of errors can not be caused by naming conflicts.
2646      // Log a message for the repair tool.
2647      logger.error(ERR_ERROR_REPLAYING_OPERATION,
2648          op, ctx.getCSN(), result, op.getErrorMessage());
2649      return true;
2650    }
2651  }
2652
2653 /**
2654  * Solve a conflict detected when replaying a delete operation.
2655  *
2656  * @param op The operation that triggered the conflict detection.
2657  * @param msg The operation that triggered the conflict detection.
2658  * @return true if the process is completed, false if it must continue..
2659  */
2660 private boolean solveNamingConflict(DeleteOperation op, LDAPUpdateMsg msg)
2661 {
2662   ResultCode result = op.getResultCode();
2663   DeleteContext ctx = (DeleteContext) op.getAttachment(SYNCHROCONTEXT);
2664   String entryUUID = ctx.getEntryUUID();
2665
2666   if (result == ResultCode.NO_SUCH_OBJECT)
2667   {
2668     /* Find if the entry is still in the database. */
2669     DN currentDN = findEntryDN(entryUUID);
2670     if (currentDN == null)
2671     {
2672       /*
2673        * The entry has already been deleted, either because this delete
2674        * has already been replayed or because another concurrent delete
2675        * has already done the job.
2676        * In any case, there is nothing more to do.
2677        */
2678       numResolvedNamingConflicts.incrementAndGet();
2679       return true;
2680     }
2681     else
2682     {
2683       // This entry has been renamed, replay the delete using its new DN.
2684       msg.setDN(currentDN);
2685       numResolvedNamingConflicts.incrementAndGet();
2686       return false;
2687     }
2688   }
2689   else if (result == ResultCode.NOT_ALLOWED_ON_NONLEAF)
2690   {
2691     /*
2692      * This may happen when we replay a DELETE done on a master
2693      * but children of this entry have been added on another master.
2694      *
2695      * Rename all the children by adding entryuuid in dn and delete this entry.
2696      *
2697      * The action taken here must be consistent with the actions
2698      * done in the solveNamingConflict(AddOperation) method
2699      * when we are adding an entry whose parent entry has already been deleted.
2700      */
2701     if (findAndRenameChild(op.getEntryDN(), op))
2702     {
2703       numUnresolvedNamingConflicts.incrementAndGet();
2704     }
2705
2706     return false;
2707   }
2708   else
2709   {
2710     // The other type of errors can not be caused by naming conflicts.
2711     // Log a message for the repair tool.
2712     logger.error(ERR_ERROR_REPLAYING_OPERATION,
2713         op, ctx.getCSN(), result, op.getErrorMessage());
2714     return true;
2715   }
2716 }
2717
2718/**
2719 * Solve a conflict detected when replaying a Modify DN operation.
2720 *
2721 * @param op The operation that triggered the conflict detection.
2722 * @param msg The operation that triggered the conflict detection.
2723 * @return true if the process is completed, false if it must continue.
2724 * @throws Exception When the operation is not valid.
2725 */
2726private boolean solveNamingConflict(ModifyDNOperation op, LDAPUpdateMsg msg)
2727    throws Exception
2728{
2729  ResultCode result = op.getResultCode();
2730  ModifyDnContext ctx = (ModifyDnContext) op.getAttachment(SYNCHROCONTEXT);
2731  String entryUUID = ctx.getEntryUUID();
2732  String newSuperiorID = ctx.getNewSuperiorEntryUUID();
2733
2734  /*
2735   * four possible cases :
2736   * - the modified entry has been renamed
2737   * - the new parent has been renamed
2738   * - the operation is replayed for the second time.
2739   * - the entry has been deleted
2740   * action :
2741   *  - change the target dn and the new parent dn and
2742   *        restart the operation,
2743   *  - don't do anything if the operation is replayed.
2744   */
2745
2746  // get the current DN of this entry in the database.
2747  DN currentDN = findEntryDN(entryUUID);
2748
2749  // Construct the new DN to use for the entry.
2750  DN entryDN = op.getEntryDN();
2751  DN newSuperior;
2752  RDN newRDN = op.getNewRDN();
2753
2754  if (newSuperiorID != null)
2755  {
2756    newSuperior = findEntryDN(newSuperiorID);
2757  }
2758  else
2759  {
2760    newSuperior = entryDN.parent();
2761  }
2762
2763  //If we could not find the new parent entry, we missed this entry
2764  // earlier or it has disappeared from the database
2765  // Log this information for the repair tool and mark the entry
2766  // as conflicting.
2767  // stop the processing.
2768  if (newSuperior == null)
2769  {
2770    markConflictEntry(op, currentDN, currentDN.parent().child(newRDN));
2771    numUnresolvedNamingConflicts.incrementAndGet();
2772    return true;
2773  }
2774
2775  DN newDN = newSuperior.child(newRDN);
2776
2777  if (currentDN == null)
2778  {
2779    // The entry targeted by the Modify DN is not in the database
2780    // anymore.
2781    // This is a conflict between a delete and this modify DN.
2782    // The entry has been deleted, we can safely assume
2783    // that the operation is completed.
2784    numResolvedNamingConflicts.incrementAndGet();
2785    return true;
2786  }
2787
2788  // if the newDN and the current DN match then the operation
2789  // is a no-op (this was probably a second replay)
2790  // don't do anything.
2791  if (newDN.equals(currentDN))
2792  {
2793    numResolvedNamingConflicts.incrementAndGet();
2794    return true;
2795  }
2796
2797  if (result == ResultCode.NO_SUCH_OBJECT
2798      || result == ResultCode.UNWILLING_TO_PERFORM
2799      || result == ResultCode.OBJECTCLASS_VIOLATION)
2800  {
2801    /*
2802     * The entry or it's new parent has not been found
2803     * reconstruct the operation with the DN we just built
2804     */
2805    ModifyDNMsg modifyDnMsg = (ModifyDNMsg) msg;
2806    modifyDnMsg.setDN(currentDN);
2807    modifyDnMsg.setNewSuperior(newSuperior.toString());
2808    numResolvedNamingConflicts.incrementAndGet();
2809    return false;
2810  }
2811  else if (result == ResultCode.ENTRY_ALREADY_EXISTS)
2812  {
2813    /*
2814     * This may happen when two modifyDn operation
2815     * are done on different servers but with the same target DN
2816     * add the conflict object class to the entry
2817     * and rename it using its entryuuid.
2818     */
2819    ModifyDNMsg modifyDnMsg = (ModifyDNMsg) msg;
2820    markConflictEntry(op, op.getEntryDN(), newDN);
2821    modifyDnMsg.setNewRDN(generateConflictRDN(entryUUID,
2822                          modifyDnMsg.getNewRDN()));
2823    modifyDnMsg.setNewSuperior(newSuperior.toString());
2824    numUnresolvedNamingConflicts.incrementAndGet();
2825    return false;
2826  }
2827  else
2828  {
2829    // The other type of errors can not be caused by naming conflicts.
2830    // Log a message for the repair tool.
2831    logger.error(ERR_ERROR_REPLAYING_OPERATION,
2832        op, ctx.getCSN(), result, op.getErrorMessage());
2833    return true;
2834  }
2835}
2836
2837  /**
2838   * Solve a conflict detected when replaying a ADD operation.
2839   *
2840   * @param op The operation that triggered the conflict detection.
2841   * @param msg The message that triggered the conflict detection.
2842   * @return true if the process is completed, false if it must continue.
2843   * @throws Exception When the operation is not valid.
2844   */
2845  private boolean solveNamingConflict(AddOperation op, AddMsg msg)
2846      throws Exception
2847  {
2848    ResultCode result = op.getResultCode();
2849    AddContext ctx = (AddContext) op.getAttachment(SYNCHROCONTEXT);
2850    String entryUUID = ctx.getEntryUUID();
2851    String parentUniqueId = ctx.getParentEntryUUID();
2852
2853    if (result == ResultCode.NO_SUCH_OBJECT)
2854    {
2855      /*
2856       * This can happen if the parent has been renamed or deleted
2857       * find the parent dn and calculate a new dn for the entry
2858       */
2859      if (parentUniqueId == null)
2860      {
2861        /*
2862         * This entry is the base dn of the backend.
2863         * It is quite surprising that the operation result be NO_SUCH_OBJECT.
2864         * There is nothing more we can do except log a
2865         * message for the repair tool to look at this problem.
2866         * TODO : Log the message
2867         */
2868        return true;
2869      }
2870      DN parentDn = findEntryDN(parentUniqueId);
2871      if (parentDn == null)
2872      {
2873        /*
2874         * The parent has been deleted
2875         * rename the entry as a conflicting entry.
2876         * The action taken here must be consistent with the actions
2877         * done when in the solveNamingConflict(DeleteOperation) method
2878         * when we are deleting an entry that have some child entries.
2879         */
2880        addConflict(msg);
2881
2882        String conflictRDN =
2883            generateConflictRDN(entryUUID, op.getEntryDN().rdn().toString());
2884        msg.setDN(DN.valueOf(conflictRDN + "," + getBaseDN()));
2885        // reset the parent entryUUID so that the check done is the
2886        // handleConflict phase does not fail.
2887        msg.setParentEntryUUID(null);
2888        numUnresolvedNamingConflicts.incrementAndGet();
2889      }
2890      else
2891      {
2892        msg.setDN(DN.valueOf(msg.getDN().rdn() + "," + parentDn));
2893        numResolvedNamingConflicts.incrementAndGet();
2894      }
2895      return false;
2896    }
2897    else if (result == ResultCode.ENTRY_ALREADY_EXISTS)
2898    {
2899      /*
2900       * This can happen if
2901       *  - two adds are done on different servers but with the
2902       *    same target DN.
2903       *  - the same ADD is being replayed for the second time on this server.
2904       * if the entryUUID already exist, assume this is a replay and
2905       *        don't do anything
2906       * if the entry unique id do not exist, generate conflict.
2907       */
2908      if (findEntryDN(entryUUID) != null)
2909      {
2910        // entry already exist : this is a replay
2911        return true;
2912      }
2913      else
2914      {
2915        addConflict(msg);
2916        String conflictRDN =
2917            generateConflictRDN(entryUUID, msg.getDN().toString());
2918        msg.setDN(DN.valueOf(conflictRDN));
2919        numUnresolvedNamingConflicts.incrementAndGet();
2920        return false;
2921      }
2922    }
2923    else
2924    {
2925      // The other type of errors can not be caused by naming conflicts.
2926      // log a message for the repair tool.
2927      logger.error(ERR_ERROR_REPLAYING_OPERATION,
2928          op, ctx.getCSN(), result, op.getErrorMessage());
2929      return true;
2930    }
2931  }
2932
2933  /**
2934   * Find all the entries below the provided DN and rename them
2935   * so that they stay below the baseDN of this replicationDomain and
2936   * use the conflicting name and attribute.
2937   *
2938   * @param entryDN    The DN of the entry whose child must be renamed.
2939   * @param conflictOp The Operation that generated the conflict.
2940   */
2941  private boolean findAndRenameChild(DN entryDN, Operation conflictOp)
2942  {
2943    /*
2944     * TODO JNR Ludo thinks that: "Ideally, the operation should verify that the
2945     * entryUUID has not changed or try to use the entryUUID rather than the
2946     * DN.". entryUUID can be obtained from the caller of the current method.
2947     */
2948    boolean conflict = false;
2949
2950    // Find and rename child entries.
2951    final SearchRequest request = newSearchRequest(entryDN, SearchScope.SINGLE_LEVEL)
2952        .addAttribute(ENTRYUUID_ATTRIBUTE_NAME, HISTORICAL_ATTRIBUTE_NAME);
2953    InternalSearchOperation op = conn.processSearch(request);
2954    if (op.getResultCode() == ResultCode.SUCCESS)
2955    {
2956      for (SearchResultEntry entry : op.getSearchEntries())
2957      {
2958        /*
2959         * Check the ADD and ModRDN date of the child entry
2960         * (All of them, not only the one that are newer than the DEL op)
2961         * and keep the entry as a conflicting entry.
2962         */
2963        conflict = true;
2964        renameConflictEntry(conflictOp, entry.getName(), getEntryUUID(entry));
2965      }
2966    }
2967    else
2968    {
2969      // log error and information for the REPAIR tool.
2970      logger.error(ERR_CANNOT_RENAME_CONFLICT_ENTRY, entryDN, conflictOp, op.getResultCode());
2971    }
2972
2973    return conflict;
2974  }
2975
2976  /**
2977   * Rename an entry that was conflicting so that it stays below the
2978   * baseDN of the replicationDomain.
2979   *
2980   * @param conflictOp The Operation that caused the conflict.
2981   * @param dn         The DN of the entry to be renamed.
2982   * @param entryUUID        The uniqueID of the entry to be renamed.
2983   */
2984  private void renameConflictEntry(Operation conflictOp, DN dn,
2985      String entryUUID)
2986  {
2987    LocalizableMessage alertMessage = NOTE_UNRESOLVED_CONFLICT.get(dn);
2988    DirectoryServer.sendAlertNotification(this,
2989        ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT, alertMessage);
2990
2991    RDN newRDN = generateDeleteConflictDn(entryUUID, dn);
2992    ModifyDNOperation newOp = renameEntry(dn, newRDN, getBaseDN(), true);
2993
2994    if (newOp.getResultCode() != ResultCode.SUCCESS)
2995    {
2996      // log information for the repair tool.
2997      logger.error(ERR_CANNOT_RENAME_CONFLICT_ENTRY,
2998          dn, conflictOp, newOp.getResultCode());
2999    }
3000  }
3001
3002  /**
3003   * Generate a modification to add the conflict attribute to an entry
3004   * whose Dn is now conflicting with another entry.
3005   *
3006   * @param op        The operation causing the conflict.
3007   * @param currentDN The current DN of the operation to mark as conflicting.
3008   * @param conflictDN     The newDn on which the conflict happened.
3009   */
3010  private void markConflictEntry(Operation op, DN currentDN, DN conflictDN)
3011  {
3012    // create new internal modify operation and run it.
3013    Attribute attr = Attributes.create(DS_SYNC_CONFLICT, conflictDN.toString());
3014    List<Modification> mods = newArrayList(new Modification(ModificationType.REPLACE, attr));
3015
3016    ModifyOperation newOp = new ModifyOperationBasis(
3017          conn, nextOperationID(), nextMessageID(), new ArrayList<Control>(0),
3018          currentDN, mods);
3019    runAsSynchronizedOperation(newOp);
3020
3021    if (newOp.getResultCode() != ResultCode.SUCCESS)
3022    {
3023      // Log information for the repair tool.
3024      logger.error(ERR_CANNOT_ADD_CONFLICT_ATTRIBUTE, op, newOp.getResultCode());
3025    }
3026
3027    // Generate an alert to let the administration know that some
3028    // conflict could not be solved.
3029    LocalizableMessage alertMessage = NOTE_UNRESOLVED_CONFLICT.get(conflictDN);
3030    DirectoryServer.sendAlertNotification(this,
3031        ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT, alertMessage);
3032  }
3033
3034  /**
3035   * Add the conflict attribute to an entry that could
3036   * not be added because it is conflicting with another entry.
3037   *
3038   * @param msg            The conflicting Add Operation.
3039   *
3040   * @throws DecodeException When an encoding error happened manipulating the
3041   *                       msg.
3042   */
3043  private void addConflict(AddMsg msg) throws DecodeException
3044  {
3045    String normalizedDN = msg.getDN().toString();
3046
3047    // Generate an alert to let the administrator know that some
3048    // conflict could not be solved.
3049    LocalizableMessage alertMessage = NOTE_UNRESOLVED_CONFLICT.get(normalizedDN);
3050    DirectoryServer.sendAlertNotification(this,
3051        ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT, alertMessage);
3052
3053    // Add the conflict attribute
3054    msg.addAttribute(DS_SYNC_CONFLICT, normalizedDN);
3055  }
3056
3057  /**
3058   * Generate the Dn to use for a conflicting entry.
3059   *
3060   * @param entryUUID The unique identifier of the entry involved in the
3061   * conflict.
3062   * @param rdn Original rdn.
3063   * @return The generated RDN for a conflicting entry.
3064   */
3065  private String generateConflictRDN(String entryUUID, String rdn)
3066  {
3067    return "entryuuid=" + entryUUID + "+" + rdn;
3068  }
3069
3070  /**
3071   * Generate the RDN to use for a conflicting entry whose father was deleted.
3072   *
3073   * @param entryUUID The unique identifier of the entry involved in the
3074   *                 conflict.
3075   * @param dn       The original DN of the entry.
3076   *
3077   * @return The generated RDN for a conflicting entry.
3078   */
3079  private RDN generateDeleteConflictDn(String entryUUID, DN dn)
3080  {
3081    String newRDN =  "entryuuid=" + entryUUID + "+" + dn.rdn();
3082    try
3083    {
3084      return RDN.decode(newRDN);
3085    } catch (DirectoryException e)
3086    {
3087      // cannot happen
3088      return null;
3089    }
3090  }
3091
3092  /**
3093   * Check if the domain solve conflicts.
3094   *
3095   * @return a boolean indicating if the domain should solve conflicts.
3096   */
3097  boolean solveConflict()
3098  {
3099    return solveConflictFlag;
3100  }
3101
3102  /**
3103   * Disable the replication on this domain.
3104   * The session to the replication server will be stopped.
3105   * The domain will not be destroyed but call to the pre-operation
3106   * methods will result in failure.
3107   * The listener thread will be destroyed.
3108   * The monitor informations will still be accessible.
3109   */
3110  public void disable()
3111  {
3112    state.save();
3113    state.clearInMemory();
3114    disabled = true;
3115    disableService(); // This will cut the session and wake up the listener
3116  }
3117
3118  /**
3119   * Do what necessary when the data have changed : load state, load
3120   * generation Id.
3121   * If there is no such information check if there is a
3122   * ReplicaUpdateVector entry and translate it into a state
3123   * and generationId.
3124   * @exception DirectoryException Thrown when an error occurs.
3125   */
3126  private void loadDataState() throws DirectoryException
3127  {
3128    state.clearInMemory();
3129    state.loadState();
3130    getGenerator().adjust(state.getMaxCSN(getServerId()));
3131
3132    // Retrieves the generation ID associated with the data imported
3133    generationId = loadGenerationId();
3134  }
3135
3136  /**
3137   * Enable back the domain after a previous disable.
3138   * The domain will connect back to a replication Server and
3139   * will recreate threads to listen for messages from the Synchronization
3140   * server.
3141   * The generationId will be retrieved or computed if necessary.
3142   * The ServerState will also be read again from the local database.
3143   */
3144  public void enable()
3145  {
3146    try
3147    {
3148      loadDataState();
3149    }
3150    catch (Exception e)
3151    {
3152      /* TODO should mark that replicationServer service is
3153       * not available, log an error and retry upon timeout
3154       * should we stop the modifications ?
3155       */
3156      logger.error(ERR_LOADING_GENERATION_ID, getBaseDN(), stackTraceToSingleLineString(e));
3157      return;
3158    }
3159
3160    enableService();
3161
3162    disabled = false;
3163  }
3164
3165  /**
3166   * Compute the data generationId associated with the current data present
3167   * in the backend for this domain.
3168   * @return The computed generationId.
3169   * @throws DirectoryException When an error occurs.
3170   */
3171  private long computeGenerationId() throws DirectoryException
3172  {
3173    final long genId = exportBackend(null, true);
3174    if (logger.isTraceEnabled())
3175    {
3176      logger.trace("Computed generationId: generationId=" + genId);
3177    }
3178    return genId;
3179  }
3180
3181  /**
3182   * Run a modify operation to update the entry whose DN is given as
3183   * a parameter with the generationID information.
3184   *
3185   * @param entryDN       The DN of the entry to be updated.
3186   * @param generationId  The value of the generationID to be saved.
3187   *
3188   * @return A ResultCode indicating if the operation was successful.
3189   */
3190  private ResultCode runSaveGenerationId(DN entryDN, long generationId)
3191  {
3192    // The generationId is stored in the root entry of the domain.
3193    final ByteString asn1BaseDn = ByteString.valueOfUtf8(entryDN.toString());
3194
3195    LDAPAttribute attr = new LDAPAttribute(REPLICATION_GENERATION_ID, Long.toString(generationId));
3196    List<RawModification> mods = new ArrayList<>(1);
3197    mods.add(new LDAPModification(ModificationType.REPLACE, attr));
3198
3199    ModifyOperation op = new ModifyOperationBasis(
3200          conn, nextOperationID(), nextMessageID(), new ArrayList<Control>(0),
3201          asn1BaseDn, mods);
3202    runAsSynchronizedOperation(op);
3203    return op.getResultCode();
3204  }
3205
3206  /**
3207   * Stores the value of the generationId.
3208   * @param generationId The value of the generationId.
3209   * @return a ResultCode indicating if the method was successful.
3210   */
3211  private ResultCode saveGenerationId(long generationId)
3212  {
3213    ResultCode result = runSaveGenerationId(getBaseDN(), generationId);
3214    if (result != ResultCode.SUCCESS)
3215    {
3216      generationIdSavedStatus = false;
3217      if (result == ResultCode.NO_SUCH_OBJECT)
3218      {
3219        // If the base entry does not exist, save the generation
3220        // ID in the config entry
3221        result = runSaveGenerationId(config.dn(), generationId);
3222      }
3223
3224      if (result != ResultCode.SUCCESS)
3225      {
3226        logger.error(ERR_UPDATING_GENERATION_ID, result.getName(), getBaseDN());
3227      }
3228    }
3229    else
3230    {
3231      generationIdSavedStatus = true;
3232    }
3233    return result;
3234  }
3235
3236  /**
3237   * Load the GenerationId from the root entry of the domain
3238   * from the REPLICATION_GENERATION_ID attribute in database
3239   * to memory, or compute it if not found.
3240   *
3241   * @return generationId The retrieved value of generationId
3242   * @throws DirectoryException When an error occurs.
3243   */
3244  private long loadGenerationId() throws DirectoryException
3245  {
3246    if (logger.isTraceEnabled())
3247    {
3248      logger.trace("Attempt to read generation ID from DB " + getBaseDN());
3249    }
3250
3251    // Search the database entry that is used to periodically save the generation id
3252    final SearchRequest request = newSearchRequest(getBaseDN(), SearchScope.BASE_OBJECT)
3253        .addAttribute(REPLICATION_GENERATION_ID);
3254    InternalSearchOperation search = conn.processSearch(request);
3255    if (search.getResultCode() == ResultCode.NO_SUCH_OBJECT)
3256    {
3257      // if the base entry does not exist look for the generationID
3258      // in the config entry.
3259      request.setName(config.dn());
3260      search = conn.processSearch(request);
3261    }
3262
3263    boolean found = false;
3264    long aGenerationId = -1;
3265    if (search.getResultCode() != ResultCode.SUCCESS)
3266    {
3267      if (search.getResultCode() != ResultCode.NO_SUCH_OBJECT)
3268      {
3269        String errorMsg = search.getResultCode().getName() + " " + search.getErrorMessage();
3270        logger.error(ERR_SEARCHING_GENERATION_ID, errorMsg, getBaseDN());
3271      }
3272    }
3273    else
3274    {
3275      List<SearchResultEntry> result = search.getSearchEntries();
3276      SearchResultEntry resultEntry = result.get(0);
3277      if (resultEntry != null)
3278      {
3279        AttributeType synchronizationGenIDType = DirectoryServer.getAttributeTypeOrNull(REPLICATION_GENERATION_ID);
3280        List<Attribute> attrs = resultEntry.getAttribute(synchronizationGenIDType);
3281        if (attrs != null)
3282        {
3283          Attribute attr = attrs.get(0);
3284          if (attr.size()>1)
3285          {
3286            String errorMsg = "#Values=" + attr.size() + " Must be exactly 1 in entry " + resultEntry.toLDIFString();
3287            logger.error(ERR_LOADING_GENERATION_ID, getBaseDN(), errorMsg);
3288          }
3289          else if (attr.size() == 1)
3290          {
3291            found = true;
3292            try
3293            {
3294              aGenerationId = Long.decode(attr.iterator().next().toString());
3295            }
3296            catch(Exception e)
3297            {
3298              logger.error(ERR_LOADING_GENERATION_ID, getBaseDN(), stackTraceToSingleLineString(e));
3299            }
3300          }
3301        }
3302      }
3303    }
3304
3305    if (!found)
3306    {
3307      aGenerationId = computeGenerationId();
3308      saveGenerationId(aGenerationId);
3309
3310      if (logger.isTraceEnabled())
3311      {
3312        logger.trace("Generation ID created for domain baseDN=" + getBaseDN() + " generationId=" + aGenerationId);
3313      }
3314    }
3315    else
3316    {
3317      generationIdSavedStatus = true;
3318      if (logger.isTraceEnabled())
3319      {
3320        logger.trace("Generation ID successfully read from domain baseDN=" + getBaseDN()
3321            + " generationId=" + aGenerationId);
3322      }
3323    }
3324    return aGenerationId;
3325  }
3326
3327  /**
3328   * Do whatever is needed when a backup is started.
3329   * We need to make sure that the serverState is correctly save.
3330   */
3331  void backupStart()
3332  {
3333    state.save();
3334  }
3335
3336  /** Do whatever is needed when a backup is finished. */
3337  void backupEnd()
3338  {
3339    // Nothing is needed at the moment
3340  }
3341
3342  /*
3343   * Total Update >>
3344   */
3345
3346  /**
3347   * This method trigger an export of the replicated data.
3348   *
3349   * @param output               The OutputStream where the export should
3350   *                             be produced.
3351   * @throws DirectoryException  When needed.
3352   */
3353  @Override
3354  protected void exportBackend(OutputStream output) throws DirectoryException
3355  {
3356    exportBackend(output, false);
3357  }
3358
3359  /**
3360   * Export the entries from the backend and/or compute the generation ID.
3361   * The ieContext must have been set before calling.
3362   *
3363   * @param output              The OutputStream where the export should
3364   *                            be produced.
3365   * @param checksumOutput      A boolean indicating if this export is
3366   *                            invoked to perform a checksum only
3367   *
3368   * @return The computed       GenerationID.
3369   *
3370   * @throws DirectoryException when an error occurred
3371   */
3372  private long exportBackend(OutputStream output, boolean checksumOutput)
3373      throws DirectoryException
3374  {
3375    Backend<?> backend = getBackend();
3376
3377    //  Acquire a shared lock for the backend.
3378    try
3379    {
3380      String lockFile = LockFileManager.getBackendLockFileName(backend);
3381      StringBuilder failureReason = new StringBuilder();
3382      if (! LockFileManager.acquireSharedLock(lockFile, failureReason))
3383      {
3384        LocalizableMessage message =
3385            ERR_LDIFEXPORT_CANNOT_LOCK_BACKEND.get(backend.getBackendID(), failureReason);
3386        logger.error(message);
3387        throw new DirectoryException(ResultCode.OTHER, message);
3388      }
3389    }
3390    catch (Exception e)
3391    {
3392      LocalizableMessage message =
3393          ERR_LDIFEXPORT_CANNOT_LOCK_BACKEND.get(backend.getBackendID(),
3394              stackTraceToSingleLineString(e));
3395      logger.error(message);
3396      throw new DirectoryException(ResultCode.OTHER, message);
3397    }
3398
3399    long numberOfEntries = backend.getNumberOfEntriesInBaseDN(getBaseDN());
3400    long entryCount = Math.min(numberOfEntries, 1000);
3401    OutputStream os;
3402    ReplLDIFOutputStream ros = null;
3403    if (checksumOutput)
3404    {
3405      ros = new ReplLDIFOutputStream(entryCount);
3406      os = ros;
3407      try
3408      {
3409        os.write(Long.toString(numberOfEntries).getBytes());
3410      }
3411      catch(Exception e)
3412      {
3413        // Should never happen
3414      }
3415    }
3416    else
3417    {
3418      os = output;
3419    }
3420
3421    // baseDN branch is the only one included in the export
3422    LDIFExportConfig exportConfig = new LDIFExportConfig(os);
3423    exportConfig.setIncludeBranches(newArrayList(getBaseDN()));
3424
3425    // For the checksum computing mode, only consider the 'stable' attributes
3426    if (checksumOutput)
3427    {
3428      String includeAttributeStrings[] = { "objectclass", "sn", "cn", "entryuuid" };
3429      Set<AttributeType> includeAttributes = new HashSet<>();
3430      for (String attrName : includeAttributeStrings)
3431      {
3432        includeAttributes.add(DirectoryServer.getAttributeTypeOrDefault(attrName));
3433      }
3434      exportConfig.setIncludeAttributes(includeAttributes);
3435    }
3436
3437    //  Launch the export.
3438    long genID = 0;
3439    try
3440    {
3441      backend.exportLDIF(exportConfig);
3442    }
3443    catch (DirectoryException de)
3444    {
3445      if (ros == null || ros.getNumExportedEntries() < entryCount)
3446      {
3447        LocalizableMessage message = ERR_LDIFEXPORT_ERROR_DURING_EXPORT.get(de.getMessageObject());
3448        logger.error(message);
3449        throw new DirectoryException(ResultCode.OTHER, message);
3450      }
3451    }
3452    catch (Exception e)
3453    {
3454      LocalizableMessage message = ERR_LDIFEXPORT_ERROR_DURING_EXPORT.get(stackTraceToSingleLineString(e));
3455      logger.error(message);
3456      throw new DirectoryException(ResultCode.OTHER, message);
3457    }
3458    finally
3459    {
3460      // Clean up after the export by closing the export config.
3461      // Will also flush the export and export the remaining entries.
3462      exportConfig.close();
3463
3464      if (checksumOutput)
3465      {
3466        genID = ros.getChecksumValue();
3467      }
3468
3469      //  Release the shared lock on the backend.
3470      try
3471      {
3472        String lockFile = LockFileManager.getBackendLockFileName(backend);
3473        StringBuilder failureReason = new StringBuilder();
3474        if (! LockFileManager.releaseLock(lockFile, failureReason))
3475        {
3476          LocalizableMessage message =
3477              WARN_LDIFEXPORT_CANNOT_UNLOCK_BACKEND.get(backend.getBackendID(), failureReason);
3478          logger.warn(message);
3479          throw new DirectoryException(ResultCode.OTHER, message);
3480        }
3481      }
3482      catch (Exception e)
3483      {
3484        LocalizableMessage message =
3485            WARN_LDIFEXPORT_CANNOT_UNLOCK_BACKEND.get(backend.getBackendID(),
3486                stackTraceToSingleLineString(e));
3487        logger.warn(message);
3488        throw new DirectoryException(ResultCode.OTHER, message);
3489      }
3490    }
3491    return genID;
3492  }
3493
3494  /**
3495   * Process backend before import.
3496   *
3497   * @param backend
3498   *          The backend.
3499   * @throws DirectoryException
3500   *           If the backend could not be disabled or locked exclusively.
3501   */
3502  private void preBackendImport(Backend<?> backend) throws DirectoryException
3503  {
3504    // Stop saving state
3505    stateSavingDisabled = true;
3506
3507    // Prevent the processing of the backend finalisation event as the import will disable the attached backend
3508    ignoreBackendInitializationEvent = true;
3509
3510    // FIXME setBackendEnabled should be part of TaskUtils ?
3511    TaskUtils.disableBackend(backend.getBackendID());
3512
3513    // Acquire an exclusive lock for the backend.
3514    String lockFile = LockFileManager.getBackendLockFileName(backend);
3515    StringBuilder failureReason = new StringBuilder();
3516    if (! LockFileManager.acquireExclusiveLock(lockFile, failureReason))
3517    {
3518      LocalizableMessage message = ERR_INIT_CANNOT_LOCK_BACKEND.get(backend.getBackendID(), failureReason);
3519      logger.error(message);
3520      throw new DirectoryException(ResultCode.OTHER, message);
3521    }
3522  }
3523
3524  /**
3525   * This method triggers an import of the replicated data.
3526   *
3527   * @param input                The InputStream from which the data are read.
3528   * @throws DirectoryException  When needed.
3529   */
3530  @Override
3531  protected void importBackend(InputStream input) throws DirectoryException
3532  {
3533    Backend<?> backend = getBackend();
3534
3535    LDIFImportConfig importConfig = null;
3536    ImportExportContext ieCtx = getImportExportContext();
3537    try
3538    {
3539      if (!backend.supports(BackendOperation.LDIF_IMPORT))
3540      {
3541        ieCtx.setExceptionIfNoneSet(new DirectoryException(OTHER,
3542            ERR_INIT_IMPORT_NOT_SUPPORTED.get(backend.getBackendID())));
3543        return;
3544      }
3545
3546      importConfig = new LDIFImportConfig(input);
3547      importConfig.setIncludeBranches(newLinkedHashSet(getBaseDN()));
3548      importConfig.setAppendToExistingData(false);
3549      importConfig.setSkipDNValidation(true);
3550      // We should not validate schema for replication
3551      importConfig.setValidateSchema(false);
3552      // Allow fractional replication ldif import plugin to be called
3553      importConfig.setInvokeImportPlugins(true);
3554      // Reset the follow import flag and message before starting the import
3555      importErrorMessageId = -1;
3556
3557      // TODO How to deal with rejected entries during the import
3558      File rejectsFile =
3559          getFileForPath("logs" + File.separator + "replInitRejectedEntries");
3560      importConfig.writeRejectedEntries(rejectsFile.getAbsolutePath(),
3561          ExistingFileBehavior.OVERWRITE);
3562
3563      // Process import
3564      preBackendImport(backend);
3565      backend.importLDIF(importConfig, DirectoryServer.getInstance().getServerContext());
3566
3567      stateSavingDisabled = false;
3568    }
3569    catch(Exception e)
3570    {
3571      ieCtx.setExceptionIfNoneSet(new DirectoryException(ResultCode.OTHER,
3572          ERR_INIT_IMPORT_FAILURE.get(stackTraceToSingleLineString(e))));
3573    }
3574    finally
3575    {
3576      try
3577      {
3578        // Cleanup
3579        if (importConfig != null)
3580        {
3581          importConfig.close();
3582          closeBackendImport(backend); // Re-enable backend
3583          backend = getBackend();
3584        }
3585
3586        loadDataState();
3587
3588        if (ieCtx.getException() != null)
3589        {
3590          // When an error occurred during an import, most of times
3591          // the generationId coming in the root entry of the imported data,
3592          // is not valid anymore (partial data in the backend).
3593          generationId = computeGenerationId();
3594          saveGenerationId(generationId);
3595        }
3596      }
3597      catch (DirectoryException fe)
3598      {
3599        // If we already catch an Exception it's quite possible
3600        // that the loadDataState() and setGenerationId() fail
3601        // so we don't bother about the new Exception.
3602        // However if there was no Exception before we want
3603        // to return this Exception to the task creator.
3604        ieCtx.setExceptionIfNoneSet(new DirectoryException(
3605            ResultCode.OTHER,
3606            ERR_INIT_IMPORT_FAILURE.get(stackTraceToSingleLineString(fe))));
3607      }
3608    }
3609
3610    if (ieCtx.getException() != null)
3611    {
3612      throw ieCtx.getException();
3613    }
3614  }
3615
3616  /**
3617   * Make post import operations.
3618   * @param backend The backend implied in the import.
3619   * @exception DirectoryException Thrown when an error occurs.
3620   */
3621  private void closeBackendImport(Backend<?> backend) throws DirectoryException
3622  {
3623    String lockFile = LockFileManager.getBackendLockFileName(backend);
3624    StringBuilder failureReason = new StringBuilder();
3625
3626    // Release lock
3627    if (!LockFileManager.releaseLock(lockFile, failureReason))
3628    {
3629      LocalizableMessage message =
3630          WARN_LDIFIMPORT_CANNOT_UNLOCK_BACKEND.get(backend.getBackendID(), failureReason);
3631      logger.warn(message);
3632      throw new DirectoryException(ResultCode.OTHER, message);
3633    }
3634
3635    TaskUtils.enableBackend(backend.getBackendID());
3636
3637    // Restore the processing of backend finalization events.
3638    ignoreBackendInitializationEvent = false;
3639
3640  }
3641
3642  /**
3643   * Retrieves a replication domain based on the baseDN.
3644   *
3645   * @param baseDN The baseDN of the domain to retrieve
3646   * @return The domain retrieved
3647   * @throws DirectoryException When an error occurred or no domain
3648   * match the provided baseDN.
3649   */
3650  public static LDAPReplicationDomain retrievesReplicationDomain(DN baseDN)
3651      throws DirectoryException
3652  {
3653    LDAPReplicationDomain replicationDomain = null;
3654
3655    // Retrieves the domain
3656    for (SynchronizationProvider<?> provider :
3657      DirectoryServer.getSynchronizationProviders())
3658    {
3659      if (!(provider instanceof MultimasterReplication))
3660      {
3661        LocalizableMessage message = ERR_INVALID_PROVIDER.get();
3662        throw new DirectoryException(ResultCode.OTHER, message);
3663      }
3664
3665      // From the domainDN retrieves the replication domain
3666      LDAPReplicationDomain domain =
3667        MultimasterReplication.findDomain(baseDN, null);
3668      if (domain == null)
3669      {
3670        break;
3671      }
3672      if (replicationDomain != null)
3673      {
3674        // Should never happen
3675        LocalizableMessage message = ERR_MULTIPLE_MATCHING_DOMAIN.get();
3676        throw new DirectoryException(ResultCode.OTHER, message);
3677      }
3678      replicationDomain = domain;
3679    }
3680
3681    if (replicationDomain == null)
3682    {
3683      throw new DirectoryException(ResultCode.OTHER, ERR_NO_MATCHING_DOMAIN.get(baseDN));
3684    }
3685    return replicationDomain;
3686  }
3687
3688  /**
3689   * Returns the backend associated to this domain.
3690   * @return The associated backend.
3691   */
3692  private Backend<?> getBackend()
3693  {
3694    return DirectoryServer.getBackend(getBaseDN());
3695  }
3696
3697  /*
3698   * <<Total Update
3699   */
3700
3701  /**
3702   * Push the schema modifications contained in the given parameter as a
3703   * modification that would happen on a local server. The modifications are not
3704   * applied to the local schema backend and historical information is not
3705   * updated; but a CSN is generated and the ServerState associated to the
3706   * schema domain is updated.
3707   *
3708   * @param modifications
3709   *          The schema modifications to push
3710   */
3711  void synchronizeSchemaModifications(List<Modification> modifications)
3712  {
3713    ModifyOperation op = new ModifyOperationBasis(
3714        conn, nextOperationID(), nextMessageID(), null,
3715        DirectoryServer.getSchemaDN(), modifications);
3716
3717    final Entry schema;
3718    try
3719    {
3720      schema = DirectoryServer.getEntry(DirectoryServer.getSchemaDN());
3721    }
3722    catch (DirectoryException e)
3723    {
3724      logger.traceException(e);
3725      logger.error(ERR_BACKEND_SEARCH_ENTRY.get(DirectoryServer.getSchemaDN().toString(),
3726              stackTraceToSingleLineString(e)));
3727      return;
3728    }
3729
3730    LocalBackendModifyOperation localOp = new LocalBackendModifyOperation(op);
3731    CSN csn = generateCSN(localOp);
3732    OperationContext ctx = new ModifyContext(csn, getEntryUUID(schema));
3733    localOp.setAttachment(SYNCHROCONTEXT, ctx);
3734    localOp.setResultCode(ResultCode.SUCCESS);
3735    synchronize(localOp);
3736  }
3737
3738  /**
3739   * Check if the provided configuration is acceptable for add.
3740   *
3741   * @param configuration The configuration to check.
3742   * @param unacceptableReasons When the configuration is not acceptable, this
3743   *                            table is use to return the reasons why this
3744   *                            configuration is not acceptable.
3745   *
3746   * @return true if the configuration is acceptable, false other wise.
3747   */
3748  static boolean isConfigurationAcceptable(ReplicationDomainCfg configuration,
3749      List<LocalizableMessage> unacceptableReasons)
3750  {
3751    // Check that there is not already a domain with the same DN
3752    final DN dn = configuration.getBaseDN();
3753    LDAPReplicationDomain domain = MultimasterReplication.findDomain(dn, null);
3754    if (domain != null && domain.getBaseDN().equals(dn))
3755    {
3756      unacceptableReasons.add(ERR_SYNC_INVALID_DN.get());
3757      return false;
3758    }
3759
3760    // Check that the base DN is configured as a base-dn of the directory server
3761    if (DirectoryServer.getBackend(dn) == null)
3762    {
3763      unacceptableReasons.add(ERR_UNKNOWN_DN.get(dn));
3764      return false;
3765    }
3766
3767    // Check fractional configuration
3768    try
3769    {
3770      isFractionalConfigAcceptable(configuration);
3771    } catch (ConfigException e)
3772    {
3773      unacceptableReasons.add(e.getMessageObject());
3774      return false;
3775    }
3776
3777    return true;
3778  }
3779
3780  @Override
3781  public ConfigChangeResult applyConfigurationChange(
3782         ReplicationDomainCfg configuration)
3783  {
3784    this.config = configuration;
3785    changeConfig(configuration);
3786
3787    // Read assured + fractional configuration and each time reconnect if needed
3788    readAssuredConfig(configuration, true);
3789    readFractionalConfig(configuration, true);
3790
3791    solveConflictFlag = isSolveConflict(configuration);
3792
3793    final ConfigChangeResult ccr = new ConfigChangeResult();
3794    try
3795    {
3796      storeECLConfiguration(configuration);
3797    }
3798    catch(Exception e)
3799    {
3800      ccr.setResultCode(ResultCode.OTHER);
3801    }
3802    return ccr;
3803  }
3804
3805  @Override
3806  public boolean isConfigurationChangeAcceptable(
3807         ReplicationDomainCfg configuration, List<LocalizableMessage> unacceptableReasons)
3808  {
3809    // Check that a import/export is not in progress
3810    if (ieRunning())
3811    {
3812      unacceptableReasons.add(
3813          NOTE_ERR_CANNOT_CHANGE_CONFIG_DURING_TOTAL_UPDATE.get());
3814      return false;
3815    }
3816
3817    // Check fractional configuration
3818    try
3819    {
3820      isFractionalConfigAcceptable(configuration);
3821      return true;
3822    }
3823    catch (ConfigException e)
3824    {
3825      unacceptableReasons.add(e.getMessageObject());
3826      return false;
3827    }
3828  }
3829
3830  @Override
3831  public Map<String, String> getAlerts()
3832  {
3833    Map<String, String> alerts = new LinkedHashMap<>();
3834
3835    alerts.put(ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT,
3836               ALERT_DESCRIPTION_REPLICATION_UNRESOLVED_CONFLICT);
3837    return alerts;
3838  }
3839
3840  @Override
3841  public String getClassName()
3842  {
3843    return CLASS_NAME;
3844  }
3845
3846  @Override
3847  public DN getComponentEntryDN()
3848  {
3849    return config.dn();
3850  }
3851
3852  /** Starts the Replication Domain. */
3853  public void start()
3854  {
3855    // Create the ServerStateFlush thread
3856    flushThread.start();
3857
3858    startListenService();
3859  }
3860
3861  /** Remove the configuration of the external changelog from this domain configuration. */
3862  private void removeECLDomainCfg()
3863  {
3864    try
3865    {
3866      DN eclConfigEntryDN = DN.valueOf("cn=external changeLog," + config.dn());
3867      if (DirectoryServer.getConfigHandler().entryExists(eclConfigEntryDN))
3868      {
3869        DirectoryServer.getConfigHandler().deleteEntry(eclConfigEntryDN, null);
3870      }
3871    }
3872    catch(Exception e)
3873    {
3874      logger.traceException(e);
3875      logger.error(ERR_CHECK_CREATE_REPL_BACKEND_FAILED, stackTraceToSingleLineString(e));
3876    }
3877  }
3878
3879  /**
3880   * Store the provided ECL configuration for the domain.
3881   * @param  domCfg       The provided configuration.
3882   * @throws ConfigException When an error occurred.
3883   */
3884  private void storeECLConfiguration(ReplicationDomainCfg domCfg)
3885      throws ConfigException
3886  {
3887    ExternalChangelogDomainCfg eclDomCfg = null;
3888    // create the ecl config if it does not exist
3889    // There may not be any config entry related to this domain in some
3890    // unit test cases
3891    try
3892    {
3893      DN configDn = config.dn();
3894      if (DirectoryServer.getConfigHandler().entryExists(configDn))
3895      {
3896        try
3897        { eclDomCfg = domCfg.getExternalChangelogDomain();
3898        } catch(Exception e) { /* do nothing */ }
3899        // domain with no config entry only when running unit tests
3900        if (eclDomCfg == null)
3901        {
3902          // no ECL config provided hence create a default one
3903          // create the default one
3904          DN eclConfigEntryDN = DN.valueOf("cn=external changelog," + configDn);
3905          if (!DirectoryServer.getConfigHandler().entryExists(eclConfigEntryDN))
3906          {
3907            // no entry exist yet for the ECL config for this domain
3908            // create it
3909            String ldif = makeLdif(
3910                "dn: cn=external changelog," + configDn,
3911                "objectClass: top",
3912                "objectClass: ds-cfg-external-changelog-domain",
3913                "cn: external changelog",
3914                "ds-cfg-enabled: " + !getBackend().isPrivateBackend());
3915            LDIFImportConfig ldifImportConfig = new LDIFImportConfig(
3916                new StringReader(ldif));
3917            // No need to validate schema in replication
3918            ldifImportConfig.setValidateSchema(false);
3919            LDIFReader reader = new LDIFReader(ldifImportConfig);
3920            Entry eclEntry = reader.readEntry();
3921            DirectoryServer.getConfigHandler().addEntry(eclEntry, null);
3922            ldifImportConfig.close();
3923          }
3924        }
3925      }
3926      eclDomCfg = domCfg.getExternalChangelogDomain();
3927      if (eclDomain != null)
3928      {
3929        eclDomain.applyConfigurationChange(eclDomCfg);
3930      }
3931      else
3932      {
3933        // Create the ECL domain object
3934        eclDomain = new ExternalChangelogDomain(this, eclDomCfg);
3935      }
3936    }
3937    catch (Exception e)
3938    {
3939      throw new ConfigException(NOTE_ERR_UNABLE_TO_ENABLE_ECL.get(
3940          "Replication Domain on " + getBaseDN(), stackTraceToSingleLineString(e)), e);
3941    }
3942  }
3943
3944  private static String makeLdif(String... lines)
3945  {
3946    final StringBuilder buffer = new StringBuilder();
3947    for (String line : lines) {
3948      buffer.append(line).append(EOL);
3949    }
3950    // Append an extra line so we can append LDIF Strings.
3951    buffer.append(EOL);
3952    return buffer.toString();
3953  }
3954
3955  @Override
3956  public void sessionInitiated(ServerStatus initStatus, ServerState rsState)
3957  {
3958    // Check domain fractional configuration consistency with local
3959    // configuration variables
3960    forceBadDataSet = !isBackendFractionalConfigConsistent();
3961
3962    super.sessionInitiated(initStatus, rsState);
3963
3964    // Now for bad data set status if needed
3965    if (forceBadDataSet)
3966    {
3967      signalNewStatus(StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT);
3968      logger.info(NOTE_FRACTIONAL_BAD_DATA_SET_NEED_RESYNC, getBaseDN());
3969      return; // Do not send changes to the replication server
3970    }
3971
3972    try
3973    {
3974      /*
3975       * We must not publish changes to a replicationServer that has
3976       * not seen all our previous changes because this could cause
3977       * some other ldap servers to miss those changes.
3978       * Check that the ReplicationServer has seen all our previous
3979       * changes.
3980       */
3981      CSN replServerMaxCSN = rsState.getCSN(getServerId());
3982
3983      // we don't want to update from here (a DS) an empty RS because
3984      // normally the RS should have been updated by other RSes except for
3985      // very last changes lost if the local connection was broken
3986      // ... hence the RS we are connected to should not be empty
3987      // ... or if it is empty, it is due to a voluntary reset
3988      // and we don't want to update it with our changes that could be huge.
3989      if (replServerMaxCSN != null && replServerMaxCSN.getSeqnum() != 0)
3990      {
3991        CSN ourMaxCSN = state.getMaxCSN(getServerId());
3992        if (ourMaxCSN != null
3993            && !ourMaxCSN.isOlderThanOrEqualTo(replServerMaxCSN))
3994        {
3995          pendingChanges.setRecovering(true);
3996          broker.setRecoveryRequired(true);
3997          final RSUpdater rsUpdater = new RSUpdater(replServerMaxCSN);
3998          if (this.rsUpdater.compareAndSet(null, rsUpdater))
3999          {
4000            rsUpdater.start();
4001          }
4002        }
4003      }
4004    } catch (Exception e)
4005    {
4006      logger.error(ERR_PUBLISHING_FAKE_OPS, getBaseDN(), stackTraceToSingleLineString(e));
4007    }
4008  }
4009
4010  /**
4011   * Build the list of changes that have been processed by this server after the
4012   * CSN given as a parameter and publish them using the given session.
4013   *
4014   * @param startCSN
4015   *          The CSN where we need to start the search
4016   * @param session
4017   *          The session to use to publish the changes
4018   * @return A boolean indicating he success of the operation.
4019   * @throws Exception
4020   *           if an Exception happens during the search.
4021   */
4022  boolean buildAndPublishMissingChanges(CSN startCSN, ReplicationBroker session)
4023      throws Exception
4024  {
4025    // Trim the changes in replayOperations that are older than the startCSN.
4026    synchronized (replayOperations)
4027    {
4028      Iterator<CSN> it = replayOperations.keySet().iterator();
4029      while (it.hasNext())
4030      {
4031        if (shutdown.get())
4032        {
4033          return false;
4034        }
4035        if (it.next().isNewerThan(startCSN))
4036        {
4037          break;
4038        }
4039        it.remove();
4040      }
4041    }
4042
4043    CSN lastRetrievedChange;
4044    InternalSearchOperation op;
4045    CSN currentStartCSN = startCSN;
4046    do
4047    {
4048      if (shutdown.get())
4049      {
4050        return false;
4051      }
4052
4053      lastRetrievedChange = null;
4054      // We can't do the search in one go because we need to store the results
4055      // so that we are sure we send the operations in order and because the
4056      // list might be large.
4057      // So we search by interval of 10 seconds and store the results in the
4058      // replayOperations list so that they are sorted before sending them.
4059      long missingChangesDelta = currentStartCSN.getTime() + 10000;
4060      CSN endCSN = new CSN(missingChangesDelta, 0xffffffff, getServerId());
4061
4062      ScanSearchListener listener =
4063        new ScanSearchListener(currentStartCSN, endCSN);
4064      op = searchForChangedEntries(getBaseDN(), currentStartCSN, endCSN,
4065              listener);
4066
4067      // Publish and remove all the changes from the replayOperations list
4068      // that are older than the endCSN.
4069      final List<FakeOperation> opsToSend = new LinkedList<>();
4070      synchronized (replayOperations)
4071      {
4072        Iterator<FakeOperation> itOp = replayOperations.values().iterator();
4073        while (itOp.hasNext())
4074        {
4075          if (shutdown.get())
4076          {
4077            return false;
4078          }
4079          FakeOperation fakeOp = itOp.next();
4080          if (fakeOp.getCSN().isNewerThan(endCSN) // sanity check
4081              || !state.cover(fakeOp.getCSN())
4082              // do not look for replay operations in the future
4083              || currentStartCSN.isNewerThan(now()))
4084          {
4085            break;
4086          }
4087
4088          lastRetrievedChange = fakeOp.getCSN();
4089          opsToSend.add(fakeOp);
4090          itOp.remove();
4091        }
4092      }
4093
4094      for (FakeOperation opToSend : opsToSend)
4095      {
4096        if (shutdown.get())
4097        {
4098          return false;
4099        }
4100        session.publishRecovery(opToSend.generateMessage());
4101      }
4102
4103      if (lastRetrievedChange != null)
4104      {
4105        if (logger.isInfoEnabled())
4106        {
4107          logger.info(LocalizableMessage.raw("publish loop"
4108                  + " >=" + currentStartCSN + " <=" + endCSN
4109                  + " nentries=" + op.getEntriesSent()
4110                  + " result=" + op.getResultCode()
4111                  + " lastRetrievedChange=" + lastRetrievedChange));
4112        }
4113        currentStartCSN = lastRetrievedChange;
4114      }
4115      else
4116      {
4117        if (logger.isInfoEnabled())
4118        {
4119          logger.info(LocalizableMessage.raw("publish loop"
4120                  + " >=" + currentStartCSN + " <=" + endCSN
4121                  + " nentries=" + op.getEntriesSent()
4122                  + " result=" + op.getResultCode()
4123                  + " no changes"));
4124        }
4125        currentStartCSN = endCSN;
4126      }
4127    } while (pendingChanges.recoveryUntil(currentStartCSN)
4128          && op.getResultCode().equals(ResultCode.SUCCESS));
4129
4130    return op.getResultCode().equals(ResultCode.SUCCESS);
4131  }
4132
4133  private static CSN now()
4134  {
4135    // ensure now() will always come last with isNewerThan() test,
4136    // even though the timestamp, or the timestamp and seqnum would be the same
4137    return new CSN(TimeThread.getTime(), Integer.MAX_VALUE, Integer.MAX_VALUE);
4138  }
4139
4140  /**
4141   * Search for the changes that happened since fromCSN based on the historical
4142   * attribute. The only changes that will be send will be the one generated on
4143   * the serverId provided in fromCSN.
4144   *
4145   * @param baseDN
4146   *          the base DN
4147   * @param fromCSN
4148   *          The CSN from which we want the changes
4149   * @param lastCSN
4150   *          The max CSN that the search should return
4151   * @param resultListener
4152   *          The listener that will process the entries returned
4153   * @return the internal search operation
4154   * @throws Exception
4155   *           when raised.
4156   */
4157  private static InternalSearchOperation searchForChangedEntries(DN baseDN,
4158      CSN fromCSN, CSN lastCSN, InternalSearchListener resultListener)
4159      throws Exception
4160  {
4161    String maxValueForId;
4162    if (lastCSN == null)
4163    {
4164      final Integer serverId = fromCSN.getServerId();
4165      maxValueForId = "ffffffffffffffff" + String.format("%04x", serverId)
4166                      + "ffffffff";
4167    }
4168    else
4169    {
4170      maxValueForId = lastCSN.toString();
4171    }
4172
4173    String filter =
4174        "(&(" + HISTORICAL_ATTRIBUTE_NAME + ">=dummy:" + fromCSN + ")" +
4175          "(" + HISTORICAL_ATTRIBUTE_NAME + "<=dummy:" + maxValueForId + "))";
4176    SearchRequest request = Requests.newSearchRequest(baseDN, SearchScope.WHOLE_SUBTREE, filter)
4177        .addAttribute(USER_AND_REPL_OPERATIONAL_ATTRS);
4178    return getRootConnection().processSearch(request, resultListener);
4179  }
4180
4181  /**
4182   * Search for the changes that happened since fromCSN based on the historical
4183   * attribute. The only changes that will be send will be the one generated on
4184   * the serverId provided in fromCSN.
4185   *
4186   * @param baseDN
4187   *          the base DN
4188   * @param fromCSN
4189   *          The CSN from which we want the changes
4190   * @param resultListener
4191   *          that will process the entries returned.
4192   * @return the internal search operation
4193   * @throws Exception
4194   *           when raised.
4195   */
4196  static InternalSearchOperation searchForChangedEntries(DN baseDN,
4197      CSN fromCSN, InternalSearchListener resultListener) throws Exception
4198  {
4199    return searchForChangedEntries(baseDN, fromCSN, null, resultListener);
4200  }
4201
4202  /**
4203   * This method should return the total number of objects in the
4204   * replicated domain.
4205   * This count will be used for reporting.
4206   *
4207   * @throws DirectoryException when needed.
4208   *
4209   * @return The number of objects in the replication domain.
4210   */
4211  @Override
4212  public long countEntries() throws DirectoryException
4213  {
4214    Backend<?> backend = getBackend();
4215    if (!backend.supports(BackendOperation.LDIF_EXPORT))
4216    {
4217      LocalizableMessage msg = ERR_INIT_EXPORT_NOT_SUPPORTED.get(backend.getBackendID());
4218      logger.error(msg);
4219      throw new DirectoryException(ResultCode.OTHER, msg);
4220    }
4221
4222    return backend.getNumberOfEntriesInBaseDN(getBaseDN());
4223  }
4224
4225  @Override
4226  public boolean processUpdate(UpdateMsg updateMsg)
4227  {
4228    // Ignore message if fractional configuration is inconsistent and
4229    // we have been passed into bad data set status
4230    if (forceBadDataSet)
4231    {
4232      return false;
4233    }
4234
4235    if (updateMsg instanceof LDAPUpdateMsg)
4236    {
4237      LDAPUpdateMsg msg = (LDAPUpdateMsg) updateMsg;
4238
4239      // Put the UpdateMsg in the RemotePendingChanges list.
4240      if (!remotePendingChanges.putRemoteUpdate(msg))
4241      {
4242        /*
4243         * Already received this change so ignore it. This may happen if there
4244         * are uncommitted changes in the queue and session failover occurs
4245         * causing a recovery of all changes since the current committed server
4246         * state. See OPENDJ-1115.
4247         */
4248        if (logger.isTraceEnabled())
4249        {
4250          logger.trace(
4251                  "LDAPReplicationDomain.processUpdate: ignoring "
4252                  + "duplicate change %s", msg.getCSN());
4253        }
4254        return true;
4255      }
4256
4257      // Put update message into the replay queue
4258      // (block until some place in the queue is available)
4259      final UpdateToReplay updateToReplay = new UpdateToReplay(msg, this);
4260      while (!isListenerShuttingDown())
4261      {
4262        // loop until we can offer to the queue or shutdown was initiated
4263        try
4264        {
4265          if (updateToReplayQueue.offer(updateToReplay, 1, TimeUnit.SECONDS))
4266          {
4267            // successful offer to the queue, let's exit the loop
4268            break;
4269          }
4270        }
4271        catch (InterruptedException e)
4272        {
4273          // Thread interrupted: check for shutdown.
4274          Thread.currentThread().interrupt();
4275        }
4276      }
4277
4278      return false;
4279    }
4280
4281    // unknown message type, this should not happen, just ignore it.
4282    return true;
4283  }
4284
4285  /**
4286   * Monitoring information for the LDAPReplicationDomain.
4287   *
4288   * @return Monitoring attributes specific to the LDAPReplicationDomain.
4289   */
4290  @Override
4291  public Collection<Attribute> getAdditionalMonitoring()
4292  {
4293    List<Attribute> attributes = new ArrayList<>();
4294
4295    // number of updates in the pending list
4296    addMonitorData(attributes, "pending-updates", pendingChanges.size());
4297
4298    addMonitorData(attributes, "replayed-updates-ok",
4299        numReplayedPostOpCalled.get());
4300    addMonitorData(attributes, "resolved-modify-conflicts",
4301        numResolvedModifyConflicts.get());
4302    addMonitorData(attributes, "resolved-naming-conflicts",
4303        numResolvedNamingConflicts.get());
4304    addMonitorData(attributes, "unresolved-naming-conflicts",
4305        numUnresolvedNamingConflicts.get());
4306    addMonitorData(attributes, "remote-pending-changes-size",
4307        remotePendingChanges.getQueueSize());
4308
4309    return attributes;
4310  }
4311
4312  /**
4313   * Verifies that the given string represents a valid source
4314   * from which this server can be initialized.
4315   * @param sourceString The string representing the source
4316   * @return The source as a integer value
4317   * @throws DirectoryException if the string is not valid
4318   */
4319  public int decodeSource(String sourceString) throws DirectoryException
4320  {
4321    int source = 0;
4322    try
4323    {
4324      source = Integer.decode(sourceString);
4325      if (source >= -1 && source != getServerId())
4326      {
4327        // TODO Verifies serverID is in the domain
4328        // We should check here that this is a server implied
4329        // in the current domain.
4330        return source;
4331      }
4332    }
4333    catch (Exception e)
4334    {
4335      LocalizableMessage message = ERR_INVALID_IMPORT_SOURCE.get(
4336          getBaseDN(), getServerId(), sourceString, stackTraceToSingleLineString(e));
4337      throw new DirectoryException(ResultCode.OTHER, message, e);
4338    }
4339
4340    LocalizableMessage message = ERR_INVALID_IMPORT_SOURCE.get(getBaseDN(), getServerId(), source, "");
4341    throw new DirectoryException(ResultCode.OTHER, message);
4342  }
4343
4344  /**
4345   * Called by synchronize post op plugin in order to add the entry historical
4346   * attributes to the UpdateMsg.
4347   * @param msg an replication update message
4348   * @param op  the operation in progress
4349   */
4350  private void addEntryAttributesForCL(UpdateMsg msg,
4351      PostOperationOperation op)
4352  {
4353    if (op instanceof PostOperationDeleteOperation)
4354    {
4355      PostOperationDeleteOperation delOp = (PostOperationDeleteOperation) op;
4356      final Set<String> names = getEclIncludesForDeletes();
4357      Entry entry = delOp.getEntryToDelete();
4358      final DeleteMsg deleteMsg = (DeleteMsg) msg;
4359      deleteMsg.setEclIncludes(getIncludedAttributes(entry, names));
4360
4361      // For delete only, add the Authorized DN since it's required in the
4362      // ECL entry but is not part of rest of the message.
4363      DN deleterDN = delOp.getAuthorizationDN();
4364      if (deleterDN != null)
4365      {
4366        deleteMsg.setInitiatorsName(deleterDN.toString());
4367      }
4368    }
4369    else if (op instanceof PostOperationModifyOperation)
4370    {
4371      PostOperationModifyOperation modOp = (PostOperationModifyOperation) op;
4372      Set<String> names = getEclIncludes();
4373      Entry entry = modOp.getCurrentEntry();
4374      ((ModifyMsg) msg).setEclIncludes(getIncludedAttributes(entry, names));
4375    }
4376    else if (op instanceof PostOperationModifyDNOperation)
4377    {
4378      PostOperationModifyDNOperation modDNOp =
4379        (PostOperationModifyDNOperation) op;
4380      Set<String> names = getEclIncludes();
4381      Entry entry = modDNOp.getOriginalEntry();
4382      ((ModifyDNMsg) msg).setEclIncludes(getIncludedAttributes(entry, names));
4383    }
4384    else if (op instanceof PostOperationAddOperation)
4385    {
4386      PostOperationAddOperation addOp = (PostOperationAddOperation) op;
4387      Set<String> names = getEclIncludes();
4388      Entry entry = addOp.getEntryToAdd();
4389      ((AddMsg) msg).setEclIncludes(getIncludedAttributes(entry, names));
4390    }
4391  }
4392
4393  private Collection<Attribute> getIncludedAttributes(Entry entry,
4394      Set<String> names)
4395  {
4396    if (names.isEmpty())
4397    {
4398      // Fast-path.
4399      return Collections.emptySet();
4400    }
4401    else if (names.size() == 1 && names.contains("*"))
4402    {
4403      // Potential fast-path for delete operations.
4404      List<Attribute> attributes = new LinkedList<>();
4405      for (List<Attribute> attributeList : entry.getUserAttributes().values())
4406      {
4407        attributes.addAll(attributeList);
4408      }
4409      Attribute objectClassAttribute = entry.getObjectClassAttribute();
4410      if (objectClassAttribute != null)
4411      {
4412        attributes.add(objectClassAttribute);
4413      }
4414      return attributes;
4415    }
4416    else
4417    {
4418      // Expand @objectclass references in attribute list if needed.
4419      // We do this now in order to take into account dynamic schema changes.
4420      final Set<String> expandedNames = getExpandedNames(names);
4421      final Entry filteredEntry =
4422          entry.filterEntry(expandedNames, false, false, false);
4423      return filteredEntry.getAttributes();
4424    }
4425  }
4426
4427  private Set<String> getExpandedNames(Set<String> names)
4428  {
4429    // Only rebuild the attribute set if necessary.
4430    if (!needsExpanding(names))
4431    {
4432      return names;
4433    }
4434
4435    final Set<String> expandedNames = new HashSet<>(names.size());
4436    for (String name : names)
4437    {
4438      if (name.startsWith("@"))
4439      {
4440        String ocName = name.substring(1);
4441        ObjectClass objectClass =
4442            DirectoryServer.getObjectClass(toLowerCase(ocName));
4443        if (objectClass != null)
4444        {
4445          for (AttributeType at : objectClass.getRequiredAttributeChain())
4446          {
4447            expandedNames.add(at.getNameOrOID());
4448          }
4449          for (AttributeType at : objectClass.getOptionalAttributeChain())
4450          {
4451            expandedNames.add(at.getNameOrOID());
4452          }
4453        }
4454      }
4455      else
4456      {
4457        expandedNames.add(name);
4458      }
4459    }
4460    return expandedNames;
4461  }
4462
4463  private boolean needsExpanding(Set<String> names)
4464  {
4465    for (String name : names)
4466    {
4467      if (name.startsWith("@"))
4468      {
4469        return true;
4470      }
4471    }
4472    return false;
4473  }
4474
4475  /**
4476   * Gets the fractional configuration of this domain.
4477   * @return The fractional configuration of this domain.
4478   */
4479  FractionalConfig getFractionalConfig()
4480  {
4481    return fractionalConfig;
4482  }
4483
4484  /**
4485   * This bean is a utility class used for holding the parsing
4486   * result of a fractional configuration. It also contains some facility
4487   * methods like fractional configuration comparison...
4488   */
4489  static class FractionalConfig
4490  {
4491    /**
4492     * Tells if fractional replication is enabled or not (some fractional
4493     * constraints have been put in place). If this is true then
4494     * fractionalExclusive explains the configuration mode and either
4495     * fractionalSpecificClassesAttributes or fractionalAllClassesAttributes or
4496     * both should be filled with something.
4497     */
4498    private boolean fractional;
4499
4500    /**
4501     * - If true, tells that the configured fractional replication is exclusive:
4502     * Every attributes contained in fractionalSpecificClassesAttributes and
4503     * fractionalAllClassesAttributes should be ignored when replaying operation
4504     * in local backend.
4505     * - If false, tells that the configured fractional replication is
4506     * inclusive:
4507     * Only attributes contained in fractionalSpecificClassesAttributes and
4508     * fractionalAllClassesAttributes should be taken into account in local
4509     * backend.
4510     */
4511    private boolean fractionalExclusive = true;
4512
4513    /**
4514     * Used in fractional replication: holds attributes of a specific object class.
4515     * - key = object class (name or OID of the class)
4516     * - value = the attributes of that class that should be taken into account
4517     * (inclusive or exclusive fractional replication) (name or OID of the
4518     * attribute)
4519     * When an operation coming from the network is to be locally replayed, if
4520     * the concerned entry has an objectClass attribute equals to 'key':
4521     * - inclusive mode: only the attributes in 'value' will be added/deleted/modified
4522     * - exclusive mode: the attributes in 'value' will not be added/deleted/modified
4523     */
4524    private Map<String, Set<String>> fractionalSpecificClassesAttributes = new HashMap<>();
4525
4526    /**
4527     * Used in fractional replication: holds attributes of any object class.
4528     * When an operation coming from the network is to be locally replayed:
4529     * - inclusive mode: only attributes of the matching entry not present in
4530     * fractionalAllClassesAttributes will be added/deleted/modified
4531     * - exclusive mode: attributes of the matching entry present in
4532     * fractionalAllClassesAttributes will not be added/deleted/modified
4533     * The attributes may be in human readable form of OID form.
4534     */
4535    private Set<String> fractionalAllClassesAttributes = new HashSet<>();
4536
4537    /** Base DN the fractional configuration is for. */
4538    private final DN baseDN;
4539
4540    /**
4541     * Constructs a new fractional configuration object.
4542     * @param baseDN The base DN the object is for.
4543     */
4544    private FractionalConfig(DN baseDN)
4545    {
4546      this.baseDN = baseDN;
4547    }
4548
4549    /**
4550     * Getter for fractional.
4551     * @return True if the configuration has fractional enabled
4552     */
4553    boolean isFractional()
4554    {
4555      return fractional;
4556    }
4557
4558    /**
4559     * Set the fractional parameter.
4560     * @param fractional The fractional parameter
4561     */
4562    private void setFractional(boolean fractional)
4563    {
4564      this.fractional = fractional;
4565    }
4566
4567    /**
4568     * Getter for fractionalExclusive.
4569     * @return True if the configuration has fractional exclusive enabled
4570     */
4571    boolean isFractionalExclusive()
4572    {
4573      return fractionalExclusive;
4574    }
4575
4576    /**
4577     * Set the fractionalExclusive parameter.
4578     * @param fractionalExclusive The fractionalExclusive parameter
4579     */
4580    private void setFractionalExclusive(boolean fractionalExclusive)
4581    {
4582      this.fractionalExclusive = fractionalExclusive;
4583    }
4584
4585    /**
4586     * Getter for fractionalSpecificClassesAttributes attribute.
4587     * @return The fractionalSpecificClassesAttributes attribute.
4588     */
4589    Map<String, Set<String>> getFractionalSpecificClassesAttributes()
4590    {
4591      return fractionalSpecificClassesAttributes;
4592    }
4593
4594    /**
4595     * Set the fractionalSpecificClassesAttributes parameter.
4596     * @param fractionalSpecificClassesAttributes The
4597     * fractionalSpecificClassesAttributes parameter to set.
4598     */
4599    private void setFractionalSpecificClassesAttributes(
4600        Map<String, Set<String>> fractionalSpecificClassesAttributes)
4601    {
4602      this.fractionalSpecificClassesAttributes =
4603        fractionalSpecificClassesAttributes;
4604    }
4605
4606    /**
4607     * Getter for fractionalSpecificClassesAttributes attribute.
4608     * @return The fractionalSpecificClassesAttributes attribute.
4609     */
4610    Set<String> getFractionalAllClassesAttributes()
4611    {
4612      return fractionalAllClassesAttributes;
4613    }
4614
4615    /**
4616     * Set the fractionalAllClassesAttributes parameter.
4617     * @param fractionalAllClassesAttributes The
4618     * fractionalSpecificClassesAttributes parameter to set.
4619     */
4620    private void setFractionalAllClassesAttributes(
4621        Set<String> fractionalAllClassesAttributes)
4622    {
4623      this.fractionalAllClassesAttributes = fractionalAllClassesAttributes;
4624    }
4625
4626    /**
4627     * Getter for the base baseDN.
4628     * @return The baseDN attribute.
4629     */
4630    DN getBaseDn()
4631    {
4632      return baseDN;
4633    }
4634
4635    /**
4636     * Extract the fractional configuration from the passed domain configuration
4637     * entry.
4638     * @param configuration The configuration object
4639     * @return The fractional replication configuration.
4640     * @throws ConfigException If an error occurred.
4641     */
4642    static FractionalConfig toFractionalConfig(
4643      ReplicationDomainCfg configuration) throws ConfigException
4644    {
4645      // Prepare fractional configuration variables to parse
4646      Iterator<String> exclIt = configuration.getFractionalExclude().iterator();
4647      Iterator<String> inclIt = configuration.getFractionalInclude().iterator();
4648
4649      // Get potentially new fractional configuration
4650      Map<String, Set<String>> newFractionalSpecificClassesAttributes = new HashMap<>();
4651      Set<String> newFractionalAllClassesAttributes = new HashSet<>();
4652
4653      int newFractionalMode = parseFractionalConfig(exclIt, inclIt,
4654        newFractionalSpecificClassesAttributes,
4655        newFractionalAllClassesAttributes);
4656
4657      // Create matching parsed config object
4658      FractionalConfig result = new FractionalConfig(configuration.getBaseDN());
4659      switch (newFractionalMode)
4660      {
4661        case NOT_FRACTIONAL:
4662          result.setFractional(false);
4663          result.setFractionalExclusive(true);
4664          break;
4665        case EXCLUSIVE_FRACTIONAL:
4666        case INCLUSIVE_FRACTIONAL:
4667          result.setFractional(true);
4668          result.setFractionalExclusive(
4669              newFractionalMode == EXCLUSIVE_FRACTIONAL);
4670          break;
4671      }
4672      result.setFractionalSpecificClassesAttributes(
4673        newFractionalSpecificClassesAttributes);
4674      result.setFractionalAllClassesAttributes(
4675        newFractionalAllClassesAttributes);
4676      return result;
4677    }
4678
4679    /**
4680     * Parses a fractional replication configuration, filling the empty passed
4681     * variables and returning the used fractional mode. The 2 passed variables
4682     * to fill should be initialized (not null) and empty.
4683     * @param exclIt The list of fractional exclude configuration values (may be
4684     *               null)
4685     * @param inclIt The list of fractional include configuration values (may be
4686     *               null)
4687     * @param fractionalSpecificClassesAttributes An empty map to be filled with
4688     *        what is read from the fractional configuration properties.
4689     * @param fractionalAllClassesAttributes An empty list to be filled with
4690     *        what is read from the fractional configuration properties.
4691     * @return the fractional mode deduced from the passed configuration:
4692     *         not fractional, exclusive fractional or inclusive fractional
4693     *         modes
4694     */
4695    private static int parseFractionalConfig (
4696      Iterator<String> exclIt, Iterator<String> inclIt,
4697      Map<String, Set<String>> fractionalSpecificClassesAttributes,
4698      Set<String> fractionalAllClassesAttributes) throws ConfigException
4699    {
4700      // Determine if fractional-exclude or fractional-include property is used:
4701      // only one of them is allowed
4702      int fractionalMode;
4703      Iterator<String> iterator;
4704      if (exclIt != null && exclIt.hasNext())
4705      {
4706        if (inclIt != null && inclIt.hasNext())
4707        {
4708          throw new ConfigException(
4709            NOTE_ERR_FRACTIONAL_CONFIG_BOTH_MODES.get());
4710        }
4711
4712        fractionalMode = EXCLUSIVE_FRACTIONAL;
4713        iterator = exclIt;
4714      }
4715      else
4716      {
4717        if (inclIt != null && inclIt.hasNext())
4718        {
4719          fractionalMode = INCLUSIVE_FRACTIONAL;
4720          iterator = inclIt;
4721        }
4722        else
4723        {
4724          return NOT_FRACTIONAL;
4725        }
4726      }
4727
4728      while (iterator.hasNext())
4729      {
4730        // Parse a value with the form class:attr1,attr2...
4731        // or *:attr1,attr2...
4732        String fractCfgStr = iterator.next();
4733        StringTokenizer st = new StringTokenizer(fractCfgStr, ":");
4734        int nTokens = st.countTokens();
4735        if (nTokens < 2)
4736        {
4737          throw new ConfigException(NOTE_ERR_FRACTIONAL_CONFIG_WRONG_FORMAT.
4738            get(fractCfgStr));
4739        }
4740        // Get the class name
4741        String classNameLower = st.nextToken().toLowerCase();
4742        boolean allClasses = "*".equals(classNameLower);
4743        // Get the attributes
4744        String attributes = st.nextToken();
4745        st = new StringTokenizer(attributes, ",");
4746        while (st.hasMoreTokens())
4747        {
4748          String attrNameLower = st.nextToken().toLowerCase();
4749          // Store attribute in the appropriate variable
4750          if (allClasses)
4751          {
4752            fractionalAllClassesAttributes.add(attrNameLower);
4753          }
4754          else
4755          {
4756            Set<String> attrList = fractionalSpecificClassesAttributes.get(classNameLower);
4757            if (attrList == null)
4758            {
4759              attrList = new LinkedHashSet<>();
4760              fractionalSpecificClassesAttributes.put(classNameLower, attrList);
4761            }
4762            attrList.add(attrNameLower);
4763          }
4764        }
4765      }
4766      return fractionalMode;
4767    }
4768
4769    /** Return type of the parseFractionalConfig method. */
4770    private static final int NOT_FRACTIONAL = 0;
4771    private static final int EXCLUSIVE_FRACTIONAL = 1;
4772    private static final int INCLUSIVE_FRACTIONAL = 2;
4773
4774    /**
4775     * Get an integer representation of the domain fractional configuration.
4776     * @return An integer representation of the domain fractional configuration.
4777     */
4778    private int fractionalConfigToInt()
4779    {
4780      if (!fractional)
4781      {
4782        return NOT_FRACTIONAL;
4783      }
4784      else if (fractionalExclusive)
4785      {
4786        return EXCLUSIVE_FRACTIONAL;
4787      }
4788      return INCLUSIVE_FRACTIONAL;
4789    }
4790
4791    /**
4792     * Compare 2 fractional replication configurations and returns true if they
4793     * are equivalent.
4794     * @param cfg1 First fractional configuration
4795     * @param cfg2 Second fractional configuration
4796     * @return True if both configurations are equivalent.
4797     * @throws ConfigException If some classes or attributes could not be
4798     * retrieved from the schema.
4799     */
4800    private static boolean isFractionalConfigEquivalent(FractionalConfig cfg1,
4801        FractionalConfig cfg2) throws ConfigException
4802    {
4803      // Compare base DNs just to be consistent
4804      if (!cfg1.getBaseDn().equals(cfg2.getBaseDn()))
4805      {
4806        return false;
4807      }
4808
4809      // Compare modes
4810      if (cfg1.isFractional() != cfg2.isFractional()
4811          || cfg1.isFractionalExclusive() != cfg2.isFractionalExclusive())
4812      {
4813        return false;
4814      }
4815
4816      // Compare all classes attributes
4817      Set<String> allClassesAttrs1 = cfg1.getFractionalAllClassesAttributes();
4818      Set<String> allClassesAttrs2 = cfg2.getFractionalAllClassesAttributes();
4819      if (!areAttributesEquivalent(allClassesAttrs1, allClassesAttrs2))
4820      {
4821        return false;
4822      }
4823
4824      // Compare specific classes attributes
4825      Map<String, Set<String>> specificClassesAttrs1 =
4826          cfg1.getFractionalSpecificClassesAttributes();
4827      Map<String, Set<String>> specificClassesAttrs2 =
4828          cfg2.getFractionalSpecificClassesAttributes();
4829      if (specificClassesAttrs1.size() != specificClassesAttrs2.size())
4830      {
4831        return false;
4832      }
4833
4834      /*
4835       * Check consistency of specific classes attributes
4836       *
4837       * For each class in specificClassesAttributes1, check that the attribute
4838       * list is equivalent to specificClassesAttributes2 attribute list
4839       */
4840      Schema schema = DirectoryServer.getSchema();
4841      for (String className1 : specificClassesAttrs1.keySet())
4842      {
4843        // Get class from specificClassesAttributes1
4844        ObjectClass objectClass1 = schema.getObjectClass(className1);
4845        if (objectClass1 == null)
4846        {
4847          throw new ConfigException(
4848            NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_OBJECT_CLASS.get(className1));
4849        }
4850
4851        // Look for matching one in specificClassesAttributes2
4852        boolean foundClass = false;
4853        for (String className2 : specificClassesAttrs2.keySet())
4854        {
4855          ObjectClass objectClass2 = schema.getObjectClass(className2);
4856          if (objectClass2 == null)
4857          {
4858            throw new ConfigException(
4859              NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_OBJECT_CLASS.get(className2));
4860          }
4861          if (objectClass1.equals(objectClass2))
4862          {
4863            foundClass = true;
4864            // Now compare the 2 attribute lists
4865            Set<String> attributes1 = specificClassesAttrs1.get(className1);
4866            Set<String> attributes2 = specificClassesAttrs2.get(className2);
4867            if (!areAttributesEquivalent(attributes1, attributes2))
4868            {
4869              return false;
4870            }
4871            break;
4872          }
4873        }
4874        // Found matching class ?
4875        if (!foundClass)
4876        {
4877          return false;
4878        }
4879      }
4880
4881      return true;
4882    }
4883  }
4884
4885  /**
4886   * Specifies whether this domain is enabled/disabled regarding the ECL.
4887   * @return enabled/disabled for the ECL.
4888   */
4889  boolean isECLEnabled()
4890  {
4891    return this.eclDomain.isEnabled();
4892  }
4893
4894  /**
4895   * Return the minimum time (in ms) that the domain keeps the historical
4896   * information necessary to solve conflicts.
4897   *
4898   * @return the purge delay.
4899   */
4900  long getHistoricalPurgeDelay()
4901  {
4902    return config.getConflictsHistoricalPurgeDelay() * 60 * 1000;
4903  }
4904
4905  /**
4906   * Check if the operation that just happened has cleared a conflict : Clearing
4907   * a conflict happens if the operation has freed a DN for which another entry
4908   * was in conflict.
4909   * <p>
4910   * Steps:
4911   * <ul>
4912   * <li>get the DN freed by a DELETE or MODRDN op</li>
4913   * <li>search for entries put in the conflict space (dn=entryUUID'+'....)
4914   * because the expected DN was not available (ds-sync-conflict=expected DN)
4915   * </li>
4916   * <li>retain the entry with the oldest conflict</li>
4917   * <li>rename this entry with the freedDN as it was expected originally</li>
4918   * </ul>
4919   *
4920   * @param task
4921   *          the task raising this purge.
4922   * @param endDate
4923   *          the date to stop this task whether the job is done or not.
4924   * @throws DirectoryException
4925   *           when an exception happens.
4926   */
4927  public void purgeConflictsHistorical(PurgeConflictsHistoricalTask task,
4928      long endDate) throws DirectoryException
4929  {
4930     logger.trace("[PURGE] purgeConflictsHistorical "
4931         + "on domain: " + getBaseDN()
4932         + "endDate:" + new Date(endDate)
4933         + "lastCSNPurgedFromHist: "
4934         + lastCSNPurgedFromHist.toStringUI());
4935
4936    String filter = "(" + HISTORICAL_ATTRIBUTE_NAME + ">=dummy:" + lastCSNPurgedFromHist + ")";
4937    SearchRequest request = Requests.newSearchRequest(getBaseDN(), SearchScope.WHOLE_SUBTREE, filter)
4938        .addAttribute(USER_AND_REPL_OPERATIONAL_ATTRS);
4939    InternalSearchOperation searchOp = conn.processSearch(request);
4940
4941     int count = 0;
4942     if (task != null)
4943     {
4944       task.setProgressStats(lastCSNPurgedFromHist, count);
4945     }
4946
4947     for (SearchResultEntry entry : searchOp.getSearchEntries())
4948     {
4949       long maxTimeToRun = endDate - TimeThread.getTime();
4950       if (maxTimeToRun < 0)
4951       {
4952        throw new DirectoryException(ResultCode.ADMIN_LIMIT_EXCEEDED,
4953            LocalizableMessage.raw(" end date reached"));
4954       }
4955
4956       EntryHistorical entryHist = EntryHistorical.newInstanceFromEntry(entry);
4957       lastCSNPurgedFromHist = entryHist.getOldestCSN();
4958       entryHist.setPurgeDelay(getHistoricalPurgeDelay());
4959       Attribute attr = entryHist.encodeAndPurge();
4960       count += entryHist.getLastPurgedValuesCount();
4961       List<Modification> mods = newArrayList(new Modification(ModificationType.REPLACE, attr));
4962
4963       ModifyOperation newOp = new ModifyOperationBasis(
4964           conn, nextOperationID(), nextMessageID(), new ArrayList<Control>(0),
4965           entry.getName(), mods);
4966       runAsSynchronizedOperation(newOp);
4967
4968       if (newOp.getResultCode() != ResultCode.SUCCESS)
4969       {
4970         // Log information for the repair tool.
4971         logger.error(ERR_CANNOT_ADD_CONFLICT_ATTRIBUTE, newOp, newOp.getResultCode());
4972       }
4973       else if (task != null)
4974       {
4975         task.setProgressStats(lastCSNPurgedFromHist, count);
4976       }
4977     }
4978  }
4979}