001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2006-2010 Sun Microsystems, Inc.
025 *      Portions Copyright 2011-2016 ForgeRock AS
026 */
027package org.opends.server.replication.plugin;
028
029import static org.opends.messages.ReplicationMessages.*;
030import static org.opends.server.replication.plugin.ReplicationRepairRequestControl.*;
031import static org.opends.server.util.ServerConstants.*;
032import static org.opends.server.util.StaticUtils.*;
033
034import java.util.ArrayList;
035import java.util.HashSet;
036import java.util.Iterator;
037import java.util.List;
038import java.util.Map;
039import java.util.Set;
040import java.util.concurrent.BlockingQueue;
041import java.util.concurrent.ConcurrentHashMap;
042import java.util.concurrent.LinkedBlockingQueue;
043import java.util.concurrent.atomic.AtomicReference;
044
045import org.forgerock.i18n.LocalizableMessage;
046import org.forgerock.i18n.slf4j.LocalizedLogger;
047import org.forgerock.opendj.config.server.ConfigChangeResult;
048import org.forgerock.opendj.config.server.ConfigException;
049import org.forgerock.opendj.ldap.ResultCode;
050import org.opends.server.admin.server.ConfigurationAddListener;
051import org.opends.server.admin.server.ConfigurationChangeListener;
052import org.opends.server.admin.server.ConfigurationDeleteListener;
053import org.opends.server.admin.std.server.ReplicationDomainCfg;
054import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg;
055import org.opends.server.api.Backend;
056import org.opends.server.api.BackupTaskListener;
057import org.opends.server.api.ExportTaskListener;
058import org.opends.server.api.ImportTaskListener;
059import org.opends.server.api.RestoreTaskListener;
060import org.opends.server.api.SynchronizationProvider;
061import org.opends.server.core.DirectoryServer;
062import org.opends.server.replication.service.DSRSShutdownSync;
063import org.opends.server.types.BackupConfig;
064import org.opends.server.types.Control;
065import org.opends.server.types.DN;
066import org.opends.server.types.DirectoryException;
067import org.opends.server.types.Entry;
068import org.opends.server.types.LDIFExportConfig;
069import org.opends.server.types.LDIFImportConfig;
070import org.opends.server.types.Modification;
071import org.opends.server.types.Operation;
072import org.opends.server.types.RestoreConfig;
073import org.opends.server.types.SynchronizationProviderResult;
074import org.opends.server.types.operation.PluginOperation;
075import org.opends.server.types.operation.PostOperationAddOperation;
076import org.opends.server.types.operation.PostOperationDeleteOperation;
077import org.opends.server.types.operation.PostOperationModifyDNOperation;
078import org.opends.server.types.operation.PostOperationModifyOperation;
079import org.opends.server.types.operation.PostOperationOperation;
080import org.opends.server.types.operation.PreOperationAddOperation;
081import org.opends.server.types.operation.PreOperationDeleteOperation;
082import org.opends.server.types.operation.PreOperationModifyDNOperation;
083import org.opends.server.types.operation.PreOperationModifyOperation;
084import org.opends.server.util.Platform;
085
086/**
087 * This class is used to load the Replication code inside the JVM
088 * and to trigger initialization of the replication.
089 *
090 * It also extends the SynchronizationProvider class in order to have some
091 * replication code running during the operation process
092 * as pre-op, conflictResolution, and post-op.
093 */
094public class MultimasterReplication
095       extends SynchronizationProvider<ReplicationSynchronizationProviderCfg>
096       implements ConfigurationAddListener<ReplicationDomainCfg>,
097                  ConfigurationDeleteListener<ReplicationDomainCfg>,
098                  ConfigurationChangeListener
099                  <ReplicationSynchronizationProviderCfg>,
100                  BackupTaskListener, RestoreTaskListener, ImportTaskListener,
101                  ExportTaskListener
102{
103
104  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
105
106  private ReplicationServerListener replicationServerListener;
107  private static final Map<DN, LDAPReplicationDomain> domains = new ConcurrentHashMap<>(4);
108  private static final DSRSShutdownSync dsrsShutdownSync = new DSRSShutdownSync();
109  /** The queue of received update messages, to be treated by the ReplayThread threads. */
110  private static final BlockingQueue<UpdateToReplay> updateToReplayQueue = new LinkedBlockingQueue<>(10000);
111  /** The list of ReplayThread threads. */
112  private static final List<ReplayThread> replayThreads = new ArrayList<>();
113  /** The configurable number of replay threads. */
114  private static int replayThreadNumber = 10;
115
116  /** Enum that symbolizes the state of the multimaster replication. */
117  private static enum State
118  {
119    STARTING, RUNNING, STOPPING
120  }
121
122  private static final AtomicReference<State> state = new AtomicReference<>(State.STARTING);
123
124  /** The configurable connection/handshake timeout. */
125  private static volatile int connectionTimeoutMS = 5000;
126
127  /**
128   * Finds the domain for a given DN.
129   *
130   * @param dn         The DN for which the domain must be returned.
131   * @param pluginOp   An optional operation for which the check is done.
132   *                   Can be null is the request has no associated operation.
133   * @return           The domain for this DN.
134   */
135  public static LDAPReplicationDomain findDomain(DN dn, PluginOperation pluginOp)
136  {
137    /*
138     * Don't run the special replication code on Operation that are
139     * specifically marked as don't synchronize.
140     */
141    if (pluginOp instanceof Operation)
142    {
143        final Operation op = (Operation) pluginOp;
144        if (op.dontSynchronize())
145        {
146          return null;
147        }
148
149        /*
150         * Check if the provided operation is a repair operation and set the
151         * synchronization flags if necessary.
152         * The repair operations are tagged as synchronization operations so
153         * that the core server let the operation modify the entryuuid and
154         * ds-sync-hist attributes.
155         * They are also tagged as dontSynchronize so that the replication code
156         * running later do not generate CSN, solve conflicts and forward the
157         * operation to the replication server.
158         */
159        final List<Control> controls = op.getRequestControls();
160        for (Iterator<Control> iter = controls.iterator(); iter.hasNext();)
161        {
162          Control c = iter.next();
163          if (OID_REPLICATION_REPAIR_CONTROL.equals(c.getOID()))
164          {
165            op.setSynchronizationOperation(true);
166            op.setDontSynchronize(true);
167            /*
168            remove this control from the list of controls since it has now been
169            processed and the local backend will fail if it finds a control that
170            it does not know about and that is marked as critical.
171            */
172            iter.remove();
173            return null;
174          }
175        }
176    }
177
178
179    LDAPReplicationDomain domain = null;
180    DN temp = dn;
181    while (domain == null && temp != null)
182    {
183      domain = domains.get(temp);
184      temp = temp.getParentDNInSuffix();
185    }
186
187    return domain;
188  }
189
190  /**
191   * Creates a new domain from its configEntry, do the
192   * necessary initialization and starts it so that it is
193   * fully operational when this method returns.
194   * @param configuration The entry with the configuration of this domain.
195   * @return The domain created.
196   * @throws ConfigException When the configuration is not valid.
197   */
198  public static LDAPReplicationDomain createNewDomain(
199      ReplicationDomainCfg configuration)
200      throws ConfigException
201  {
202    try
203    {
204      final LDAPReplicationDomain domain = new LDAPReplicationDomain(
205          configuration, updateToReplayQueue, dsrsShutdownSync);
206      if (domains.isEmpty())
207      {
208        // Create the threads that will process incoming update messages
209        createReplayThreads();
210      }
211
212      domains.put(domain.getBaseDN(), domain);
213      return domain;
214    }
215    catch (ConfigException e)
216    {
217      logger.error(ERR_COULD_NOT_START_REPLICATION, configuration.dn(),
218          e.getLocalizedMessage() + " " + stackTraceToSingleLineString(e));
219    }
220    return null;
221  }
222
223  /**
224   * Creates a new domain from its configEntry, do the necessary initialization
225   * and starts it so that it is fully operational when this method returns. It
226   * is only used for tests so far.
227   *
228   * @param configuration The entry with the configuration of this domain.
229   * @param queue         The BlockingQueue that this domain will use.
230   *
231   * @return              The domain created.
232   *
233   * @throws ConfigException When the configuration is not valid.
234   */
235  static LDAPReplicationDomain createNewDomain(
236      ReplicationDomainCfg configuration,
237      BlockingQueue<UpdateToReplay> queue)
238      throws ConfigException
239  {
240    final LDAPReplicationDomain domain =
241        new LDAPReplicationDomain(configuration, queue, dsrsShutdownSync);
242    domains.put(domain.getBaseDN(), domain);
243    return domain;
244  }
245
246  /**
247   * Deletes a domain.
248   * @param dn : the base DN of the domain to delete.
249   */
250  public static void deleteDomain(DN dn)
251  {
252    LDAPReplicationDomain domain = domains.remove(dn);
253    if (domain != null)
254    {
255      domain.delete();
256    }
257
258    // No replay threads running if no replication need
259    if (domains.isEmpty()) {
260      stopReplayThreads();
261    }
262  }
263
264  /** {@inheritDoc} */
265  @Override
266  public void initializeSynchronizationProvider(
267      ReplicationSynchronizationProviderCfg cfg) throws ConfigException
268  {
269    domains.clear();
270    replicationServerListener = new ReplicationServerListener(cfg, dsrsShutdownSync);
271
272    // Register as an add and delete listener with the root configuration so we
273    // can be notified if Multimaster domain entries are added or removed.
274    cfg.addReplicationDomainAddListener(this);
275    cfg.addReplicationDomainDeleteListener(this);
276
277    // Register as a root configuration listener so that we can be notified if
278    // number of replay threads is changed and apply changes.
279    cfg.addReplicationChangeListener(this);
280
281    replayThreadNumber = getNumberOfReplayThreadsOrDefault(cfg);
282    connectionTimeoutMS = (int) Math.min(cfg.getConnectionTimeout(), Integer.MAX_VALUE);
283
284    //  Create the list of domains that are already defined.
285    for (String name : cfg.listReplicationDomains())
286    {
287      createNewDomain(cfg.getReplicationDomain(name));
288    }
289
290    // If any schema changes were made with the server offline, then handle them now.
291    List<Modification> offlineSchemaChanges =
292         DirectoryServer.getOfflineSchemaChanges();
293    if (offlineSchemaChanges != null && !offlineSchemaChanges.isEmpty())
294    {
295      processSchemaChange(offlineSchemaChanges);
296    }
297
298    DirectoryServer.registerBackupTaskListener(this);
299    DirectoryServer.registerRestoreTaskListener(this);
300    DirectoryServer.registerExportTaskListener(this);
301    DirectoryServer.registerImportTaskListener(this);
302
303    DirectoryServer.registerSupportedControl(
304        ReplicationRepairRequestControl.OID_REPLICATION_REPAIR_CONTROL);
305  }
306
307  private int getNumberOfReplayThreadsOrDefault(ReplicationSynchronizationProviderCfg cfg)
308  {
309    Integer value = cfg.getNumUpdateReplayThreads();
310    return value == null ? Platform.computeNumberOfThreads(16, 2.0f) : value;
311  }
312
313  /**
314   * Create the threads that will wait for incoming update messages.
315   */
316  private static synchronized void createReplayThreads()
317  {
318    replayThreads.clear();
319
320    for (int i = 0; i < replayThreadNumber; i++)
321    {
322      ReplayThread replayThread = new ReplayThread(updateToReplayQueue);
323      replayThread.start();
324      replayThreads.add(replayThread);
325    }
326  }
327
328  /**
329   * Stop the threads that are waiting for incoming update messages.
330   */
331  private static synchronized void stopReplayThreads()
332  {
333    //  stop the replay threads
334    for (ReplayThread replayThread : replayThreads)
335    {
336      replayThread.shutdown();
337    }
338
339    for (ReplayThread replayThread : replayThreads)
340    {
341      try
342      {
343        replayThread.join();
344      }
345      catch(InterruptedException e)
346      {
347        Thread.currentThread().interrupt();
348      }
349    }
350    replayThreads.clear();
351  }
352
353  /** {@inheritDoc} */
354  @Override
355  public boolean isConfigurationAddAcceptable(
356      ReplicationDomainCfg configuration, List<LocalizableMessage> unacceptableReasons)
357  {
358    return LDAPReplicationDomain.isConfigurationAcceptable(
359      configuration, unacceptableReasons);
360  }
361
362  /** {@inheritDoc} */
363  @Override
364  public ConfigChangeResult applyConfigurationAdd(
365     ReplicationDomainCfg configuration)
366  {
367    ConfigChangeResult ccr = new ConfigChangeResult();
368    try
369    {
370      LDAPReplicationDomain rd = createNewDomain(configuration);
371      if (State.RUNNING.equals(state.get()))
372      {
373        rd.start();
374        if (State.STOPPING.equals(state.get())) {
375          rd.shutdown();
376        }
377      }
378    } catch (ConfigException e)
379    {
380      // we should never get to this point because the configEntry has
381      // already been validated in isConfigurationAddAcceptable()
382      ccr.setResultCode(ResultCode.CONSTRAINT_VIOLATION);
383    }
384    return ccr;
385  }
386
387  /** {@inheritDoc} */
388  @Override
389  public void doPostOperation(PostOperationAddOperation addOperation)
390  {
391    DN dn = addOperation.getEntryDN();
392    genericPostOperation(addOperation, dn);
393  }
394
395
396  /** {@inheritDoc} */
397  @Override
398  public void doPostOperation(PostOperationDeleteOperation deleteOperation)
399  {
400    DN dn = deleteOperation.getEntryDN();
401    genericPostOperation(deleteOperation, dn);
402  }
403
404  /** {@inheritDoc} */
405  @Override
406  public void doPostOperation(PostOperationModifyDNOperation modifyDNOperation)
407  {
408    DN dn = modifyDNOperation.getEntryDN();
409    genericPostOperation(modifyDNOperation, dn);
410  }
411
412  /** {@inheritDoc} */
413  @Override
414  public void doPostOperation(PostOperationModifyOperation modifyOperation)
415  {
416    DN dn = modifyOperation.getEntryDN();
417    genericPostOperation(modifyOperation, dn);
418  }
419
420  /** {@inheritDoc} */
421  @Override
422  public SynchronizationProviderResult handleConflictResolution(
423      PreOperationModifyOperation modifyOperation)
424  {
425    LDAPReplicationDomain domain = findDomain(modifyOperation.getEntryDN(), modifyOperation);
426    if (domain != null)
427    {
428      return domain.handleConflictResolution(modifyOperation);
429    }
430    return new SynchronizationProviderResult.ContinueProcessing();
431  }
432
433  /** {@inheritDoc} */
434  @Override
435  public SynchronizationProviderResult handleConflictResolution(
436      PreOperationAddOperation addOperation) throws DirectoryException
437  {
438    LDAPReplicationDomain domain = findDomain(addOperation.getEntryDN(), addOperation);
439    if (domain != null)
440    {
441      return domain.handleConflictResolution(addOperation);
442    }
443    return new SynchronizationProviderResult.ContinueProcessing();
444  }
445
446  /** {@inheritDoc} */
447  @Override
448  public SynchronizationProviderResult handleConflictResolution(
449      PreOperationDeleteOperation deleteOperation) throws DirectoryException
450  {
451    LDAPReplicationDomain domain = findDomain(deleteOperation.getEntryDN(), deleteOperation);
452    if (domain != null)
453    {
454      return domain.handleConflictResolution(deleteOperation);
455    }
456    return new SynchronizationProviderResult.ContinueProcessing();
457  }
458
459  /** {@inheritDoc} */
460  @Override
461  public SynchronizationProviderResult handleConflictResolution(
462      PreOperationModifyDNOperation modifyDNOperation) throws DirectoryException
463  {
464    LDAPReplicationDomain domain = findDomain(modifyDNOperation.getEntryDN(), modifyDNOperation);
465    if (domain != null)
466    {
467      return domain.handleConflictResolution(modifyDNOperation);
468    }
469    return new SynchronizationProviderResult.ContinueProcessing();
470  }
471
472  /** {@inheritDoc} */
473  @Override
474  public SynchronizationProviderResult
475         doPreOperation(PreOperationModifyOperation modifyOperation)
476  {
477    DN operationDN = modifyOperation.getEntryDN();
478    LDAPReplicationDomain domain = findDomain(operationDN, modifyOperation);
479
480    if (domain == null || !domain.solveConflict())
481    {
482      return new SynchronizationProviderResult.ContinueProcessing();
483    }
484
485    EntryHistorical historicalInformation = (EntryHistorical)
486      modifyOperation.getAttachment(EntryHistorical.HISTORICAL);
487    if (historicalInformation == null)
488    {
489      Entry entry = modifyOperation.getModifiedEntry();
490      historicalInformation = EntryHistorical.newInstanceFromEntry(entry);
491      modifyOperation.setAttachment(EntryHistorical.HISTORICAL,
492          historicalInformation);
493    }
494    historicalInformation.setPurgeDelay(domain.getHistoricalPurgeDelay());
495    historicalInformation.setHistoricalAttrToOperation(modifyOperation);
496
497    if (modifyOperation.getModifications().isEmpty())
498    {
499      /*
500       * This operation becomes a no-op due to conflict resolution
501       * stop the processing and send an OK result
502       */
503      return new SynchronizationProviderResult.StopProcessing(
504          ResultCode.SUCCESS, null);
505    }
506
507    return new SynchronizationProviderResult.ContinueProcessing();
508  }
509
510  /** {@inheritDoc} */
511  @Override
512  public SynchronizationProviderResult doPreOperation(
513         PreOperationDeleteOperation deleteOperation) throws DirectoryException
514  {
515    return new SynchronizationProviderResult.ContinueProcessing();
516  }
517
518  /** {@inheritDoc} */
519  @Override
520  public SynchronizationProviderResult doPreOperation(
521         PreOperationModifyDNOperation modifyDNOperation)
522         throws DirectoryException
523  {
524    DN operationDN = modifyDNOperation.getEntryDN();
525    LDAPReplicationDomain domain = findDomain(operationDN, modifyDNOperation);
526
527    if (domain == null || !domain.solveConflict())
528    {
529      return new SynchronizationProviderResult.ContinueProcessing();
530    }
531
532    // The historical object is retrieved from the attachment created
533    // in the HandleConflictResolution phase.
534    EntryHistorical historicalInformation = (EntryHistorical)
535    modifyDNOperation.getAttachment(EntryHistorical.HISTORICAL);
536    if (historicalInformation == null)
537    {
538      // When no Historical attached, create once by loading from the entry
539      // and attach it to the operation
540      Entry entry = modifyDNOperation.getUpdatedEntry();
541      historicalInformation = EntryHistorical.newInstanceFromEntry(entry);
542      modifyDNOperation.setAttachment(EntryHistorical.HISTORICAL,
543          historicalInformation);
544    }
545    historicalInformation.setPurgeDelay(domain.getHistoricalPurgeDelay());
546
547    // Add to the operation the historical attribute : "dn:changeNumber:moddn"
548    historicalInformation.setHistoricalAttrToOperation(modifyDNOperation);
549
550    return new SynchronizationProviderResult.ContinueProcessing();
551  }
552
553  /** {@inheritDoc} */
554  @Override
555  public SynchronizationProviderResult doPreOperation(
556         PreOperationAddOperation addOperation)
557  {
558    // Check replication domain
559    LDAPReplicationDomain domain =
560      findDomain(addOperation.getEntryDN(), addOperation);
561    if (domain == null)
562    {
563      return new SynchronizationProviderResult.ContinueProcessing();
564    }
565
566    // For LOCAL op only, generate CSN and attach Context
567    if (!addOperation.isSynchronizationOperation())
568    {
569      domain.doPreOperation(addOperation);
570    }
571
572    // Add to the operation the historical attribute : "dn:changeNumber:add"
573    EntryHistorical.setHistoricalAttrToOperation(addOperation);
574
575    return new SynchronizationProviderResult.ContinueProcessing();
576  }
577
578  /** {@inheritDoc} */
579  @Override
580  public void finalizeSynchronizationProvider()
581  {
582    setState(State.STOPPING);
583
584    for (LDAPReplicationDomain domain : domains.values())
585    {
586      domain.shutdown();
587    }
588    domains.clear();
589
590    stopReplayThreads();
591
592    if (replicationServerListener != null)
593    {
594      replicationServerListener.shutdown();
595    }
596
597    DirectoryServer.deregisterBackupTaskListener(this);
598    DirectoryServer.deregisterRestoreTaskListener(this);
599    DirectoryServer.deregisterExportTaskListener(this);
600    DirectoryServer.deregisterImportTaskListener(this);
601  }
602
603  /**
604   * This method is called whenever the server detects a modification
605   * of the schema done by directly modifying the backing files
606   * of the schema backend.
607   * Call the schema Domain if it exists.
608   *
609   * @param  modifications  The list of modifications that was
610   *                                      applied to the schema.
611   *
612   */
613  @Override
614  public void processSchemaChange(List<Modification> modifications)
615  {
616    LDAPReplicationDomain domain = findDomain(DirectoryServer.getSchemaDN(), null);
617    if (domain != null)
618    {
619      domain.synchronizeSchemaModifications(modifications);
620    }
621  }
622
623  /** {@inheritDoc} */
624  @Override
625  public void processBackupBegin(Backend backend, BackupConfig config)
626  {
627    for (DN dn : backend.getBaseDNs())
628    {
629      LDAPReplicationDomain domain = findDomain(dn, null);
630      if (domain != null)
631      {
632        domain.backupStart();
633      }
634    }
635  }
636
637  /** {@inheritDoc} */
638  @Override
639  public void processBackupEnd(Backend backend, BackupConfig config,
640                               boolean successful)
641  {
642    for (DN dn : backend.getBaseDNs())
643    {
644      LDAPReplicationDomain domain = findDomain(dn, null);
645      if (domain != null)
646      {
647        domain.backupEnd();
648      }
649    }
650  }
651
652  /** {@inheritDoc} */
653  @Override
654  public void processRestoreBegin(Backend backend, RestoreConfig config)
655  {
656    for (DN dn : backend.getBaseDNs())
657    {
658      LDAPReplicationDomain domain = findDomain(dn, null);
659      if (domain != null)
660      {
661        domain.disable();
662      }
663    }
664  }
665
666  /** {@inheritDoc} */
667  @Override
668  public void processRestoreEnd(Backend backend, RestoreConfig config,
669                                boolean successful)
670  {
671    for (DN dn : backend.getBaseDNs())
672    {
673      LDAPReplicationDomain domain = findDomain(dn, null);
674      if (domain != null)
675      {
676        domain.enable();
677      }
678    }
679  }
680
681  /** {@inheritDoc} */
682  @Override
683  public void processImportBegin(Backend backend, LDIFImportConfig config)
684  {
685    for (DN dn : backend.getBaseDNs())
686    {
687      LDAPReplicationDomain domain = findDomain(dn, null);
688      if (domain != null)
689      {
690        domain.disable();
691      }
692    }
693  }
694
695  /** {@inheritDoc} */
696  @Override
697  public void processImportEnd(Backend backend, LDIFImportConfig config,
698                               boolean successful)
699  {
700    for (DN dn : backend.getBaseDNs())
701    {
702      LDAPReplicationDomain domain = findDomain(dn, null);
703      if (domain != null)
704      {
705        domain.enable();
706      }
707    }
708  }
709
710  /** {@inheritDoc} */
711  @Override
712  public void processExportBegin(Backend backend, LDIFExportConfig config)
713  {
714    for (DN dn : backend.getBaseDNs())
715    {
716      LDAPReplicationDomain domain = findDomain(dn, null);
717      if (domain != null)
718      {
719        domain.backupStart();
720      }
721    }
722  }
723
724  /** {@inheritDoc} */
725  @Override
726  public void processExportEnd(Backend backend, LDIFExportConfig config,
727                               boolean successful)
728  {
729    for (DN dn : backend.getBaseDNs())
730    {
731      LDAPReplicationDomain domain = findDomain(dn, null);
732      if (domain != null)
733      {
734        domain.backupEnd();
735      }
736    }
737  }
738
739  /** {@inheritDoc} */
740  @Override
741  public ConfigChangeResult applyConfigurationDelete(
742      ReplicationDomainCfg configuration)
743  {
744    deleteDomain(configuration.getBaseDN());
745
746    return new ConfigChangeResult();
747  }
748
749  /** {@inheritDoc} */
750  @Override
751  public boolean isConfigurationDeleteAcceptable(
752      ReplicationDomainCfg configuration, List<LocalizableMessage> unacceptableReasons)
753  {
754    return true;
755  }
756
757  /**
758   * Generic code for all the postOperation entry point.
759   *
760   * @param operation The Operation for which the post-operation is called.
761   * @param dn The Dn for which the post-operation is called.
762   */
763  private void genericPostOperation(PostOperationOperation operation, DN dn)
764  {
765    LDAPReplicationDomain domain = findDomain(dn, operation);
766    if (domain != null) {
767      domain.synchronize(operation);
768    }
769  }
770
771  /**
772   * Returns the replication server listener associated to that Multimaster
773   * Replication.
774   * @return the listener.
775   */
776  public ReplicationServerListener getReplicationServerListener()
777  {
778    return replicationServerListener;
779  }
780
781  /** {@inheritDoc} */
782  @Override
783  public boolean isConfigurationChangeAcceptable(
784      ReplicationSynchronizationProviderCfg configuration,
785      List<LocalizableMessage> unacceptableReasons)
786  {
787    return true;
788  }
789
790  @Override
791  public ConfigChangeResult applyConfigurationChange(ReplicationSynchronizationProviderCfg configuration)
792  {
793
794    // Stop threads then restart new number of threads
795    stopReplayThreads();
796    replayThreadNumber = getNumberOfReplayThreadsOrDefault(configuration);
797    if (!domains.isEmpty())
798    {
799      createReplayThreads();
800    }
801
802    connectionTimeoutMS = (int) Math.min(configuration.getConnectionTimeout(),
803        Integer.MAX_VALUE);
804
805    return new ConfigChangeResult();
806  }
807
808  /** {@inheritDoc} */
809  @Override
810  public void completeSynchronizationProvider()
811  {
812    for (LDAPReplicationDomain domain : domains.values())
813    {
814      domain.start();
815    }
816    setState(State.RUNNING);
817  }
818
819  private void setState(State newState)
820  {
821    state.set(newState);
822    synchronized (state)
823    {
824      state.notifyAll();
825    }
826  }
827
828  /**
829   * Gets the number of handled domain objects.
830   * @return The number of handled domain objects
831   */
832  public static int getNumberOfDomains()
833  {
834    return domains.size();
835  }
836
837  /**
838   * Gets the Set of domain baseDN which are disabled for the external changelog.
839   *
840   * @return The Set of domain baseDNs which are disabled for the external changelog.
841   * @throws DirectoryException
842   *            if a problem occurs
843   */
844  public static Set<DN> getExcludedChangelogDomains() throws DirectoryException
845  {
846    final Set<DN> disabledBaseDNs = new HashSet<>(domains.size() + 1);
847    disabledBaseDNs.add(DN.valueOf(DN_EXTERNAL_CHANGELOG_ROOT));
848    for (LDAPReplicationDomain domain : domains.values())
849    {
850      if (!domain.isECLEnabled())
851      {
852        disabledBaseDNs.add(domain.getBaseDN());
853      }
854    }
855    return disabledBaseDNs;
856  }
857
858  /**
859   * Returns whether the provided baseDN represents a replication domain enabled
860   * for the external changelog.
861   *
862   * @param baseDN
863   *          the replication domain to check
864   * @return true if the provided baseDN is enabled for the external changelog,
865   *         false if the provided baseDN is disabled for the external changelog
866   *         or unknown to multimaster replication.
867   */
868  public static boolean isECLEnabledDomain(DN baseDN)
869  {
870    waitForStartup();
871    // if state is STOPPING, then we need to return from this method
872    final LDAPReplicationDomain domain = domains.get(baseDN);
873    return domain != null && domain.isECLEnabled();
874  }
875
876  /**
877   * Returns whether the external change-log contains data from at least a domain.
878   * @return whether the external change-log contains data from at least a domain
879   */
880  public static boolean isECLEnabled()
881  {
882    waitForStartup();
883    for (LDAPReplicationDomain domain : domains.values())
884    {
885      if (domain.isECLEnabled())
886      {
887        return true;
888      }
889    }
890    return false;
891  }
892
893  private static void waitForStartup()
894  {
895    if (State.STARTING.equals(state.get()))
896    {
897      synchronized (state)
898      {
899        while (State.STARTING.equals(state.get()))
900        {
901          try
902          {
903            state.wait();
904          }
905          catch (InterruptedException ignored)
906          {
907            // loop and check state again
908          }
909        }
910      }
911    }
912  }
913
914  /**
915   * Returns the connection timeout in milli-seconds.
916   *
917   * @return The connection timeout in milli-seconds.
918   */
919  public static int getConnectionTimeoutMS()
920  {
921    return connectionTimeoutMS;
922  }
923
924}