001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2008-2010 Sun Microsystems, Inc.
025 *      Portions Copyright 2011-2015 ForgeRock AS
026 */
027package org.opends.server.replication.service;
028
029import static org.opends.messages.ReplicationMessages.*;
030import static org.opends.server.replication.common.AssuredMode.*;
031import static org.opends.server.replication.common.StatusMachine.*;
032import static org.opends.server.util.CollectionUtils.*;
033
034import java.io.BufferedOutputStream;
035import java.io.IOException;
036import java.io.InputStream;
037import java.io.OutputStream;
038import java.net.SocketTimeoutException;
039import java.util.*;
040import java.util.concurrent.ConcurrentHashMap;
041import java.util.concurrent.TimeoutException;
042import java.util.concurrent.atomic.AtomicInteger;
043import java.util.concurrent.atomic.AtomicReference;
044
045import org.forgerock.i18n.LocalizableMessage;
046import org.forgerock.i18n.slf4j.LocalizedLogger;
047import org.forgerock.opendj.config.server.ConfigException;
048import org.forgerock.opendj.ldap.ResultCode;
049import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.AssuredType;
050import org.opends.server.admin.std.server.ReplicationDomainCfg;
051import org.opends.server.api.DirectoryThread;
052import org.opends.server.backends.task.Task;
053import org.opends.server.replication.common.*;
054import org.opends.server.replication.protocol.*;
055import org.opends.server.tasks.InitializeTargetTask;
056import org.opends.server.tasks.InitializeTask;
057import org.opends.server.types.Attribute;
058import org.opends.server.types.DN;
059import org.opends.server.types.DirectoryException;
060
061/**
062 * This class should be used as a base for Replication implementations.
063 * <p>
064 * Developers in need of a replication mechanism are expected to
065 * subclass this class with their own implementation.
066 * <p>
067 *   The startup phase of the ReplicationDomain subclass,
068 *   should read the list of replication servers from the configuration,
069 *   instantiate a {@link ServerState} then start the publish service
070 *   by calling {@link #startPublishService()}.
071 *   At this point it can start calling the {@link #publish(UpdateMsg)}
072 *   method if needed.
073 * <p>
074 *   When the startup phase reaches the point where the subclass is ready
075 *   to handle updates, the Replication Domain implementation should call the
076 *   {@link #startListenService()} method.
077 *   At this point a Listener thread is created on the Replication Service
078 *   and can start receiving updates.
079 * <p>
080 *   When updates are received the Replication Service calls the
081 *   {@link #processUpdate(UpdateMsg)} method.
082 *   ReplicationDomain implementation should implement the appropriate code
083 *   for replaying the update on the local repository.
084 *   When fully done the subclass must call the
085 *   {@link #processUpdateDone(UpdateMsg, String)} method.
086 *   This allows the update to be processed asynchronously if necessary.
087 *
088 * <p>
089 *   To propagate changes to other replicas, a ReplicationDomain implementation
090 *   must use the {@link #publish(UpdateMsg)} method.
091 * <p>
092 *   If the Full Initialization process is needed then implementations
093 *   of {@code importBackend(InputStream)} and
094 *   {@code exportBackend(OutputStream)} must be
095 *   provided.
096 * <p>
097 *   Full Initialization of a replica can be triggered by LDAP clients
098 *   by creating InitializeTasks or InitializeTargetTask.
099 *   Full initialization can also be triggered from the ReplicationDomain
100 *   implementation using methods {@link #initializeRemote(int, Task)}
101 *   or {@link #initializeFromRemote(int, Task)}.
102 * <p>
103 *   At shutdown time, the {@link #disableService()} method should be called to
104 *   cleanly stop the replication service.
105 */
106public abstract class ReplicationDomain
107{
108
109  /**
110   * Contains all the attributes included for the ECL (External Changelog).
111   */
112  // @Immutable
113  private static final class ECLIncludes
114  {
115
116    final Map<Integer, Set<String>> includedAttrsByServer;
117    final Set<String> includedAttrsAllServers;
118
119    final Map<Integer, Set<String>> includedAttrsForDeletesByServer;
120    final Set<String> includedAttrsForDeletesAllServers;
121
122    private ECLIncludes(
123        Map<Integer, Set<String>> includedAttrsByServer,
124        Set<String> includedAttrsAllServers,
125        Map<Integer, Set<String>> includedAttrsForDeletesByServer,
126        Set<String> includedAttrsForDeletesAllServers)
127    {
128      this.includedAttrsByServer = includedAttrsByServer;
129      this.includedAttrsAllServers = includedAttrsAllServers;
130
131      this.includedAttrsForDeletesByServer = includedAttrsForDeletesByServer;
132      this.includedAttrsForDeletesAllServers =includedAttrsForDeletesAllServers;
133    }
134
135    @SuppressWarnings("unchecked")
136    public ECLIncludes()
137    {
138      this(Collections.EMPTY_MAP, Collections.EMPTY_SET, Collections.EMPTY_MAP,
139          Collections.EMPTY_SET);
140    }
141
142    /**
143     * Add attributes to be included in the ECL.
144     *
145     * @param serverId
146     *          Server where these attributes are configured.
147     * @param includeAttributes
148     *          Attributes to be included with all change records, may include
149     *          wild-cards.
150     * @param includeAttributesForDeletes
151     *          Additional attributes to be included with delete change records,
152     *          may include wild-cards.
153     * @return a new {@link ECLIncludes} object if included attributes have
154     *         changed, or the current object otherwise.
155     */
156    public ECLIncludes addIncludedAttributes(int serverId,
157        Set<String> includeAttributes, Set<String> includeAttributesForDeletes)
158    {
159      boolean configurationChanged = false;
160
161      Set<String> s1 = new HashSet<>(includeAttributes);
162
163      // Combine all+delete attributes.
164      Set<String> s2 = new HashSet<>(s1);
165      s2.addAll(includeAttributesForDeletes);
166
167      Map<Integer,Set<String>> eclIncludesByServer = this.includedAttrsByServer;
168      if (!s1.equals(this.includedAttrsByServer.get(serverId)))
169      {
170        configurationChanged = true;
171        eclIncludesByServer = new HashMap<>(this.includedAttrsByServer);
172        eclIncludesByServer.put(serverId, Collections.unmodifiableSet(s1));
173      }
174
175      Map<Integer, Set<String>> eclIncludesForDeletesByServer = this.includedAttrsForDeletesByServer;
176      if (!s2.equals(this.includedAttrsForDeletesByServer.get(serverId)))
177      {
178        configurationChanged = true;
179        eclIncludesForDeletesByServer = new HashMap<>(this.includedAttrsForDeletesByServer);
180        eclIncludesForDeletesByServer.put(serverId, Collections.unmodifiableSet(s2));
181      }
182
183      if (!configurationChanged)
184      {
185        return this;
186      }
187
188      // and rebuild the global list to be ready for usage
189      Set<String> eclIncludesAllServer = new HashSet<>();
190      for (Set<String> attributes : eclIncludesByServer.values())
191      {
192        eclIncludesAllServer.addAll(attributes);
193      }
194
195      Set<String> eclIncludesForDeletesAllServer = new HashSet<>();
196      for (Set<String> attributes : eclIncludesForDeletesByServer.values())
197      {
198        eclIncludesForDeletesAllServer.addAll(attributes);
199      }
200      return new ECLIncludes(eclIncludesByServer,
201          Collections.unmodifiableSet(eclIncludesAllServer),
202          eclIncludesForDeletesByServer,
203          Collections.unmodifiableSet(eclIncludesForDeletesAllServer));
204    }
205  }
206
207  /**
208   * Current status of this replicated domain; NOT_CONNECTED_STATUS until changed.
209   */
210  private ServerStatus status = ServerStatus.NOT_CONNECTED_STATUS;
211  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
212
213  /** The configuration of the replication domain. */
214  protected volatile ReplicationDomainCfg config;
215  /**
216   * The assured configuration of the replication domain. It is a duplicate of
217   * {@link #config} because of its update model.
218   *
219   * @see #readAssuredConfig(ReplicationDomainCfg, boolean)
220   */
221  private volatile ReplicationDomainCfg assuredConfig;
222
223  /**
224   * The ReplicationBroker that is used by this ReplicationDomain to
225   * connect to the ReplicationService.
226   */
227  protected ReplicationBroker broker;
228
229  /**
230   * Stores the outgoing assured messages, keyed by their CSN, in order
231   * to be able to correlate the acks coming back with the original
232   * operation.
233   */
234  private final Map<CSN, UpdateMsg> waitingAckMsgs = new ConcurrentHashMap<>();
235  /**
236   * The context related to an import or export being processed.
237   * The reference holds null when none is being processed.
238   */
239  private final AtomicReference<ImportExportContext> importExportContext = new AtomicReference<>();
240
241  /**
242   * The Thread waiting for incoming update messages for this domain and pushing
243   * them to the global incoming update message queue for later processing by
244   * replay threads.
245   */
246  private volatile DirectoryThread listenerThread;
247
248  /** A set of counters used for Monitoring. */
249  private AtomicInteger numProcessedUpdates = new AtomicInteger(0);
250  private AtomicInteger numRcvdUpdates = new AtomicInteger(0);
251  private AtomicInteger numSentUpdates = new AtomicInteger(0);
252
253  /* Assured replication monitoring counters. */
254
255  /** Number of updates sent in Assured Mode, Safe Read. */
256  private AtomicInteger assuredSrSentUpdates = new AtomicInteger(0);
257  /**
258   * Number of updates sent in Assured Mode, Safe Read, that have been
259   * successfully acknowledged.
260   */
261  private AtomicInteger assuredSrAcknowledgedUpdates = new AtomicInteger(0);
262  /**
263   * Number of updates sent in Assured Mode, Safe Read, that have not been
264   * successfully acknowledged (either because of timeout, wrong status or error
265   * at replay).
266   */
267  private AtomicInteger assuredSrNotAcknowledgedUpdates = new AtomicInteger(0);
268  /**
269   * Number of updates sent in Assured Mode, Safe Read, that have not been
270   * successfully acknowledged because of timeout.
271   */
272  private AtomicInteger assuredSrTimeoutUpdates = new AtomicInteger(0);
273  /**
274   * Number of updates sent in Assured Mode, Safe Read, that have not been
275   * successfully acknowledged because of wrong status.
276   */
277  private AtomicInteger assuredSrWrongStatusUpdates = new AtomicInteger(0);
278  /**
279   * Number of updates sent in Assured Mode, Safe Read, that have not been
280   * successfully acknowledged because of replay error.
281   */
282  private AtomicInteger assuredSrReplayErrorUpdates = new AtomicInteger(0);
283  /**
284   * Number of updates sent in Assured Mode, Safe Read, that have not been
285   * successfully acknowledged (either because of timeout, wrong status or error
286   * at replay), kept per server (DS or RS): key is the server id, value the count.
287   * <p>
288   * String format: &lt;server id&gt;:&lt;number of failed updates&gt;
289   */
290  private final Map<Integer, Integer> assuredSrServerNotAcknowledgedUpdates = new HashMap<>();
291  /** Number of updates received in Assured Mode, Safe Read request. */
292  private AtomicInteger assuredSrReceivedUpdates = new AtomicInteger(0);
293  /**
294   * Number of updates received in Assured Mode, Safe Read request that we have
295   * acked without errors.
296   */
297  private AtomicInteger assuredSrReceivedUpdatesAcked = new AtomicInteger(0);
298  /**
299   * Number of updates received in Assured Mode, Safe Read request that we have
300   * acked with errors.
301   */
302  private AtomicInteger assuredSrReceivedUpdatesNotAcked = new AtomicInteger(0);
303  /** Number of updates sent in Assured Mode, Safe Data. */
304  private AtomicInteger assuredSdSentUpdates = new AtomicInteger(0);
305  /**
306   * Number of updates sent in Assured Mode, Safe Data, that have been
307   * successfully acknowledged.
308   */
309  private AtomicInteger assuredSdAcknowledgedUpdates = new AtomicInteger(0);
310  /**
311   * Number of updates sent in Assured Mode, Safe Data, that have not been
312   * successfully acknowledged because of timeout.
313   */
314  private AtomicInteger assuredSdTimeoutUpdates = new AtomicInteger(0);
315  /**
316   * Number of updates sent in Assured Mode, Safe Data, that have not been
317   * successfully acknowledged because of timeout, kept per RS: key is the
318   * server id, value the count.
319   * <p>
320   * String format: &lt;server id&gt;:&lt;number of failed updates&gt;
321   */
322  private final Map<Integer, Integer> assuredSdServerTimeoutUpdates = new HashMap<>();
323
324  /* Status related monitoring fields */
325
326  /**
327   * Indicates the date when the status changed. This may be used to indicate
328   * the date the session with the current replication server started (when
329   * status is NORMAL for instance). All the above assured monitoring fields
330   * are also reset each time the status is changed.
331   */
332  private Date lastStatusChangeDate = new Date();
333
334  /**
335   * The state maintained by the concrete class, supplied at construction time.
336   */
337  private final ServerState state;
338
339  /**
340   * The generator that will be used to generate {@link CSN}
341   * for this domain.
342   */
343  private final CSNGenerator generator;
344  /** Current attributes to include in the ECL (External Changelog). */
345  private final AtomicReference<ECLIncludes> eclIncludes = new AtomicReference<>(new ECLIncludes());
346
347  /**
348   * An object used to protect the initialization of the underlying broker
349   * session of this ReplicationDomain.
350   */
351  private final Object sessionLock = new Object();
352
353  /**
354   * The generationId for this replication domain. It is made of a hash of the
355   * 1000 first entries for this domain.
356   */
357  protected volatile long generationId;
358
359  /**
360   * Gives access to the generator producing the {@link CSN}s of this domain.
361   * The generator is created once, when the domain is constructed.
362   *
363   * @return the {@link CSNGenerator} used to generate {@link CSN} for this
364   *         domain
365   */
366  public CSNGenerator getGenerator()
367  {
368    return this.generator;
369  }
370
371  /**
372   * Creates a ReplicationDomain that starts with a newly created ServerState.
373   *
374   * @param config
375   *          The configuration object for this ReplicationDomain
376   * @param generationId
377   *          the generation id of this ReplicationDomain
378   */
379  public ReplicationDomain(ReplicationDomainCfg config, long generationId)
380  {
381    this(config, generationId, new ServerState());
382  }
383
384  /**
385   * Builds a ReplicationDomain on top of an existing server state (for unit
386   * test purpose only). The CSN generator is created from the server id.
387   *
388   * @param config
389   *          The configuration object for this ReplicationDomain
390   * @param generationId
391   *          the generation id of this ReplicationDomain
392   * @param serverState
393   *          The serverState to use
394   */
395  public ReplicationDomain(ReplicationDomainCfg config, long generationId,
396      ServerState serverState)
397  {
398    this.generationId = generationId;
399    this.state = serverState;
400    this.assuredConfig = config;
401    this.config = config;
402    this.generator = new CSNGenerator(getServerId(), serverState);
403  }
404
405  /**
406   * Enters the state machine with the provided status and adjusts the CSN
407   * generator. This method will be called by the Broker each time the
408   * ReplicationBroker establishes a new session to a Replication Server.
409   *
410   * An invalid initial status is logged and the current status is kept.
411   * Implementations may override this method when they need to perform
412   * additional computing after session establishment; the default
413   * implementation should be sufficient for the other ReplicationDomains.
414   *
415   * @param initStatus              The status to enter the state machine with.
416   * @param rsState                 The ServerState of the ReplicationServer
417   *                                with which the session was established.
418   */
419  public void sessionInitiated(ServerStatus initStatus, ServerState rsState)
420  {
421    // Only accept a status that is valid as an initial status
422    if (isValidInitialStatus(initStatus))
423    {
424      status = initStatus;
425    }
426    else
427    {
428      logger.error(ERR_DS_INVALID_INIT_STATUS, initStatus, getBaseDN(), getServerId());
429    }
430    generator.adjust(state);
431    generator.adjust(rsState);
432  }
433
434  /**
435   * Processes an incoming ChangeStatusMsg. Compute new status according to
436   * given order. Then update domain for being compliant with new status
437   * definition.
438   * @param csMsg The received status message
439   */
440  private void receiveChangeStatus(ChangeStatusMsg csMsg)
441  {
442    if (logger.isTraceEnabled())
443    {
444      logger.trace("Replication domain " + getBaseDN() +
445        " received change status message:\n" + csMsg);
446    }
447
448    ServerStatus reqStatus = csMsg.getRequestedStatus();
449
450    // Translate requested status to a state machine event
451    StatusMachineEvent event = StatusMachineEvent.statusToEvent(reqStatus);
452    if (event == StatusMachineEvent.INVALID_EVENT)
453    {
454      logger.error(ERR_DS_INVALID_REQUESTED_STATUS, reqStatus, getBaseDN(), getServerId());
455      return;
456    }
457
458    // Set the new status to the requested one
459    setNewStatus(event);
460  }
461
462  /**
463   * Called when first connection or disconnection detected.
464   */
465  void toNotConnectedStatus()
466  {
467    // Go into not connected status
468    setNewStatus(StatusMachineEvent.TO_NOT_CONNECTED_STATUS_EVENT);
469  }
470
471  /**
472   * Applies whatever is needed for the domain to comply with its new status,
473   * which is already stored in the status variable. Must be called in
474   * synchronized section for status.
475   */
476  private void updateDomainForNewStatus()
477  {
478    switch (status)
479    {
480      case NOT_CONNECTED_STATUS:
481      case NORMAL_STATUS:
482      case DEGRADED_STATUS:
483      case BAD_GEN_ID_STATUS:
484        break;
485      case FULL_UPDATE_STATUS:
486        // The RS must be told that we just entered the full update status
487        broker.signalStatusChange(status);
488        break;
489      default:
490        if (logger.isTraceEnabled())
491        {
492          logger.trace("updateDomainForNewStatus: unexpected status: " + status);
493        }
494    }
495  }
496
497  /**
498   * Gives the current status of this domain.
499   * @return the status currently stored for this domain
500   */
501  public ServerStatus getStatus()
502  {
503    return this.status;
504  }
505
506  /**
507   * Gives the base DN handled by this ReplicationDomain. All Replication Domain
508   * using this baseDN will be connected through the Replication Service.
509   *
510   * @return the base DN read from the current configuration
511   */
512  public DN getBaseDN()
513  {
514    return this.config.getBaseDN();
515  }
516
517  /**
518   * Gives the identifier of this Replication Domain inside the Replication
519   * Service. Each Domain must use a unique ServerID.
520   *
521   * @return the server ID read from the current configuration
522   */
523  public int getServerId()
524  {
525    return this.config.getServerId();
526  }
527
528  /**
529   * Gives the window size used during initialization. It applies between the
530   * initializer/exporter DS, which waits for acknowledgements and slows down
531   * data msg publishing based on the slowest server, and each
532   * initialized/importer DS, which publishes an acknowledgement each WINDOW/2
533   * data msg received.
534   *
535   * @return the initWindow
536   */
537  protected int getInitWindow()
538  {
539    return this.config.getInitializationWindowSize();
540  }
541
542  /**
543   * Tells if assured replication is enabled for this domain.
544   * @return True if the configured assured type is safe data or safe read.
545   */
546  public boolean isAssured()
547  {
548    final AssuredType type = assuredConfig.getAssuredType(); // read the volatile config once
549    return AssuredType.SAFE_DATA.equals(type) || AssuredType.SAFE_READ.equals(type);
550  }
551
552  /**
553   * Gives the mode for the assured replication of the domain (only used when
554   * assured is true).
555   *
556   * @return The mode for the assured replication of the domain.
557   */
558  public AssuredMode getAssuredMode()
559  {
560    switch (assuredConfig.getAssuredType())
561    {
562    case SAFE_READ:
563      return AssuredMode.SAFE_READ_MODE;
564    case NOT_ASSURED: // The assured mode will be ignored in that case anyway
565    case SAFE_DATA:
566      return AssuredMode.SAFE_DATA_MODE;
567    }
568    return null; // should never happen
569  }
570
571  /**
572   * Gives the assured Safe Data level of the replication of the domain (used
573   * when assuredMode is SAFE_DATA).
574   *
575   * @return the configured assured level, cast to a byte
576   */
577  public byte getAssuredSdLevel()
578  {
579    return (byte) this.assuredConfig.getAssuredSdLevel();
580  }
581
582  /**
583   * Gives the assured timeout of the replication of the domain (in ms).
584   * @return the assured timeout read from the assured configuration
585   */
586  public long getAssuredTimeout()
587  {
588    return this.assuredConfig.getAssuredTimeout();
589  }
590
591  /**
592   * Gives the group id of this domain.
593   * @return the configured group id, cast to a byte
594   */
595  public byte getGroupId()
596  {
597    return (byte) this.config.getGroupId();
598  }
599
600  /**
601   * Gives the referrals URLs this domain publishes, i.e. the referrals urls
602   * to be published to other servers of the topology.
603   * <p>
604   * TODO: fill that with all currently opened urls if no urls configured
605   *
606   * @return the referrals URLs read from the current configuration
607   */
608  public Set<String> getRefUrls()
609  {
610    return this.config.getReferralsUrl();
611  }
612
613  /**
614   * Gives the info for Replicas in the topology (except us), as known by the broker.
615   * @return The info for Replicas in the topology (except us)
616   */
617  public Map<Integer, DSInfo> getReplicaInfos()
618  {
619    return this.broker.getReplicaInfos();
620  }
621
622  /**
623   * Returns information about the DS server related to the provided serverId.
624   * based on the TopologyMsg we received when the remote replica connected or
625   * disconnected. Return null when no server with the provided serverId is
626   * connected.
627   *
628   * @param  dsId The provided serverId of the remote replica
629   * @return the info related to this remote server if it is connected,
630   *                  null is the server is NOT connected.
631   */
632  private DSInfo getConnectedRemoteDS(int dsId)
633  {
634    return getReplicaInfos().get(dsId);
635  }
636
637  /**
638   * Gets the States of all the Replicas currently in the
639   * Topology.
640   * When this method is called, a Monitoring message will be sent
641   * to the Replication Server to which this domain is currently connected
642   * so that it computes a table containing information about
643   * all Directory Servers in the topology.
644   * This computation involves communication with all the servers
645   * currently connected.
646   *
647   * @return The States of all Replicas in the topology (except us)
648   */
649  public Map<Integer, ServerState> getReplicaStates()
650  {
651    return this.broker.getReplicaStates();
652  }
653
654  /**
655   * Gives the info for RSs in the topology (except the one we are connected
656   * to), as known by the broker.
657   * @return The info for RSs in the topology (except the one we are connected
658   * to)
659   */
660  public List<RSInfo> getRsInfos()
661  {
662    return this.broker.getRsInfos();
663  }
664
665
666  /**
667   * Gives the server ID of the Replication Server to which the domain
668   * is currently connected, as reported by the broker.
669   *
670   * @return The server ID of the Replication Server to which the domain
671   *         is currently connected.
672   */
673  public int getRsServerId()
674  {
675    return this.broker.getRsServerId();
676  }
677
678  /**
679   * Adds one to the counter of processed updates.
680   */
681  private void incProcessedUpdates()
682  {
683    numProcessedUpdates.getAndIncrement();
684  }
685
686  /**
687   * Gives the number of updates replayed by the replication.
688   *
689   * @return the number of updates replayed, 0 when the counter is not set
690   */
691  int getNumProcessedUpdates()
692  {
693    if (numProcessedUpdates == null)
694    {
695      return 0;
696    }
697    return numProcessedUpdates.get();
698  }
699
700  /**
701   * Gives the number of updates received by the replication plugin.
702   *
703   * @return the number of updates received, 0 when the counter is not set
704   */
705  int getNumRcvdUpdates()
706  {
707    if (numRcvdUpdates == null)
708    {
709      return 0;
710    }
711    return numRcvdUpdates.get();
712  }
713
714  /**
715   * Gives the number of updates sent by the replication plugin.
716   *
717   * @return the number of updates sent, 0 when the counter is not set
718   */
719  int getNumSentUpdates()
720  {
721    if (numSentUpdates == null)
722    {
723      return 0;
724    }
725    return numSentUpdates.get();
726  }
727
728  /**
729   * Receives the next update message from the replication server, looping
730   * until one arrives. The other types of messages (acks, initialization,
731   * error and change status messages) are handled here, opaquely for the caller.
732   * @return the received update - null when the broker returned no message
733   */
734  private UpdateMsg receive()
735  {
736    UpdateMsg update = null;
737
738    while (update == null)
739    {
740      InitializeRequestMsg initReqMsg = null;
741      ReplicationMsg msg;
742      try
743      {
744        msg = broker.receive(true, true, false);
745        if (msg == null)
746        {
747          // The server is in the shutdown process
748          return null;
749        }
750
751        if (logger.isTraceEnabled() && !(msg instanceof HeartbeatMsg))
752        {
753          logger.trace("LocalizableMessage received <" + msg + ">");
754        }
755
756        if (msg instanceof AckMsg)
757        {
758          AckMsg ack = (AckMsg) msg;
759          receiveAck(ack);
760        }
761        else if (msg instanceof InitializeRequestMsg)
762        {
763          // Another server requests us to provide entries for a total
764          // update: only remember the request here, it is served below
765          initReqMsg = (InitializeRequestMsg)msg;
766        }
767        else if (msg instanceof InitializeTargetMsg)
768        {
769          // Another server is exporting its entries to us
770          InitializeTargetMsg initTargetMsg = (InitializeTargetMsg) msg;
771
772          /*
773          This must be done while we are still holding the broker lock
774          because we are now going to receive a bunch of entries from the
775          remote server and we want the import thread to catch them and
776          not the ListenerThread.
777          */
778          initialize(initTargetMsg, initTargetMsg.getSenderID());
779        }
780        else if (msg instanceof ErrorMsg)
781        {
782          ErrorMsg errorMsg = (ErrorMsg)msg;
783          ImportExportContext ieCtx = importExportContext.get();
784          if (ieCtx != null)
785          {
786            /*
787            This is an error termination for the 2 following cases :
788            - either during an export
789            - or before an import really started
790            For example, when we publish a request and the
791            replicationServer did not find the import source.
792
793            A remote error during the import will be received in the
794            receiveEntryBytes() method.
795            */
796            if (logger.isTraceEnabled())
797            {
798              logger.trace(
799                  "[IE] processErrorMsg:" + getServerId() +
800                  " baseDN: " + getBaseDN() +
801                  " Error Msg received: " + errorMsg);
802            }
803
804            if (errorMsg.getCreationTime() > ieCtx.startTime)
805            {
806              // consider only ErrorMsg that relate to the current import/export
807              processErrorMsg(errorMsg, ieCtx);
808            }
809            else
810            {
811              /*
812              Simply log - happens when the ErrorMsg relates to a previous
813              attempt of initialization while we have started a new one
814              on this side.
815              */
816              logger.error(ERR_ERROR_MSG_RECEIVED, errorMsg.getDetails());
817            }
818          }
819          else
820          {
821            // Simply log - happens if import/export has been terminated
822            // on our side before receiving this ErrorMsg.
823            logger.error(ERR_ERROR_MSG_RECEIVED, errorMsg.getDetails());
824          }
825        }
826        else if (msg instanceof ChangeStatusMsg)
827        {
828          ChangeStatusMsg csMsg = (ChangeStatusMsg)msg;
829          receiveChangeStatus(csMsg);
830        }
831        else if (msg instanceof UpdateMsg)
832        {
833          update = (UpdateMsg) msg;
834          generator.adjust(update.getCSN());
835        }
836        else if (msg instanceof InitializeRcvAckMsg)
837        {
838          ImportExportContext ieCtx = importExportContext.get();
839          if (ieCtx != null)
840          {
841            InitializeRcvAckMsg ackMsg = (InitializeRcvAckMsg) msg;
842            ieCtx.setAckVal(ackMsg.getSenderID(), ackMsg.getNumAck());
843          }
844          // Otherwise drop this msg: no import/export is running (should never happen)
845        }
846      }
847      catch (SocketTimeoutException e)
848      {
849        // The receive timed out: just retry
850      }
851      /*
852      Test if we have received an export request message and
853      if that's the case handle it now.
854      This must be done outside of the portion of code protected
855      by the broker lock so that we keep receiving updates
856      when we are doing an export and so that a possible
857      closure of the socket happening when we are publishing the
858      entries to the remote can be handled by the other
859      replay threads when they call this method and therefore the
860      broker.receive() method.
861      */
862      if (initReqMsg != null)
863      {
864        // Do this work in a thread to allow replay thread continue working
865        ExportThread exportThread = new ExportThread(
866            initReqMsg.getSenderID(), initReqMsg.getInitWindow());
867        exportThread.start();
868      }
869    }
870    // An update was received: refresh the monitoring counters
871    numRcvdUpdates.incrementAndGet();
872    if (update.isAssured()
873        && broker.getRsGroupId() == getGroupId()
874        && update.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
875    {
876      assuredSrReceivedUpdates.incrementAndGet();
877    }
878    return update;
879  }
880
881  /**
882   * Updates the passed monitoring list of errors received for assured messages
883   * (safe data or safe read, depending of the passed list to update) for a
884   * particular server in the list. This increments the counter of error for the
885   * passed server, or creates an initial value of 1 error for it if the server
886   * is not yet present in the map.
887   * @param errorsByServer map of number of errors per serverID
888   * @param sid the ID of the server which produced an error
889   */
890  private void updateAssuredErrorsByServer(Map<Integer,Integer> errorsByServer,
891    Integer sid)
892  {
893    synchronized (errorsByServer)
894    {
895      Integer serverErrCount = errorsByServer.get(sid);
896      if (serverErrCount == null)
897      {
898        // Server not present in list, create an entry with an
899        // initial number of errors set to 1
900        errorsByServer.put(sid, 1);
901      } else
902      {
903        // Server already present in list, just increment number of
904        // errors for the server
905        int val = serverErrCount;
906        val++;
907        errorsByServer.put(sid, val);
908      }
909    }
910  }
911
912  /**
913   * Do the necessary processing when an AckMsg is received.
914   *
915   * @param ack The AckMsg that was received.
916   */
917  private void receiveAck(AckMsg ack)
918  {
919    CSN csn = ack.getCSN();
920
921    // Remove the message for pending ack list (this may already make the thread
922    // that is waiting for the ack be aware of its reception)
923    UpdateMsg update = waitingAckMsgs.remove(csn);
924
925    // Signal waiting thread ack has been received
926    if (update != null)
927    {
928      synchronized (update)
929      {
930        update.notify();
931      }
932
933      // Analyze status of embedded in the ack to see if everything went well
934      boolean hasTimeout = ack.hasTimeout();
935      boolean hasReplayErrors = ack.hasReplayError();
936      boolean hasWrongStatus = ack.hasWrongStatus();
937
938      AssuredMode updateAssuredMode = update.getAssuredMode();
939
940      if ( hasTimeout || hasReplayErrors || hasWrongStatus)
941      {
942        /*
943        Some problems detected: message did not correctly reach every
944        requested servers. Log problem
945        */
946        logger.info(NOTE_DS_RECEIVED_ACK_ERROR, getBaseDN(), getServerId(), update, ack.errorsToString());
947
948        List<Integer> failedServers = ack.getFailedServers();
949
950        // Increment assured replication monitoring counters
951        switch (updateAssuredMode)
952        {
953          case SAFE_READ_MODE:
954            assuredSrNotAcknowledgedUpdates.incrementAndGet();
955            if (hasTimeout)
956            {
957              assuredSrTimeoutUpdates.incrementAndGet();
958            }
959            if (hasReplayErrors)
960            {
961              assuredSrReplayErrorUpdates.incrementAndGet();
962            }
963            if (hasWrongStatus)
964            {
965              assuredSrWrongStatusUpdates.incrementAndGet();
966            }
967            if (failedServers != null) // This should always be the case !
968            {
969              for(Integer sid : failedServers)
970              {
971                updateAssuredErrorsByServer(
972                  assuredSrServerNotAcknowledgedUpdates, sid);
973              }
974            }
975            break;
976          case SAFE_DATA_MODE:
977            // The only possible cause of ack error in safe data mode is timeout
978            if (hasTimeout) // So should always be the case
979            {
980              assuredSdTimeoutUpdates.incrementAndGet();
981            }
982            if (failedServers != null) // This should always be the case !
983            {
984              for(Integer sid : failedServers)
985              {
986                updateAssuredErrorsByServer(
987                  assuredSdServerTimeoutUpdates, sid);
988              }
989            }
990            break;
991          default:
992          // Should not happen
993        }
994      } else
995      {
996        // Update has been acknowledged without errors
997        // Increment assured replication monitoring counters
998        switch (updateAssuredMode)
999        {
1000          case SAFE_READ_MODE:
1001            assuredSrAcknowledgedUpdates.incrementAndGet();
1002            break;
1003          case SAFE_DATA_MODE:
1004            assuredSdAcknowledgedUpdates.incrementAndGet();
1005            break;
1006          default:
1007          // Should not happen
1008        }
1009      }
1010    }
1011  }
1012
1013
1014  /*
1015   * After this point the code is related to the Total Update.
1016   */
1017
1018  /**
1019   * This thread is launched when we want to export data to another server.
1020   *
1021   * When a task is created locally (so this local server is the initiator)
1022   * of the export (Example: dsreplication initialize-all),
1023   * this thread is NOT used but the task thread is running the export instead).
1024   */
1025  private class ExportThread extends DirectoryThread
1026  {
1027    /** Id of server that will be initialized. */
1028    private final int serverIdToInitialize;
1029    private final int initWindow;
1030
1031
1032
1033    /**
1034     * Constructor for the ExportThread.
1035     *
1036     * @param serverIdToInitialize
1037     *          serverId of server that will receive entries
1038     * @param initWindow
1039     *          The value of the initialization window for flow control between
1040     *          the importer and the exporter.
1041     */
1042    public ExportThread(int serverIdToInitialize, int initWindow)
1043    {
1044      super("Export thread from serverId=" + getServerId() + " to serverId="
1045          + serverIdToInitialize);
1046      this.serverIdToInitialize = serverIdToInitialize;
1047      this.initWindow = initWindow;
1048    }
1049
1050
1051
1052    /**
1053     * Run method for this class.
1054     */
1055    @Override
1056    public void run()
1057    {
1058      if (logger.isTraceEnabled())
1059      {
1060        logger.trace("[IE] starting " + getName());
1061      }
1062      try
1063      {
1064        initializeRemote(serverIdToInitialize, serverIdToInitialize, null,
1065            initWindow);
1066      } catch (DirectoryException de)
1067      {
1068        /*
1069        An error message has been sent to the peer
1070        This server is not the initiator of the export so there is
1071        nothing more to do locally.
1072        */
1073      }
1074
1075      if (logger.isTraceEnabled())
1076      {
1077        logger.trace("[IE] ending " + getName());
1078      }
1079    }
1080  }
1081
1082  /**
1083   * This class contains the context related to an import or export launched on
1084   * the domain.
1085   */
1086  protected class ImportExportContext
1087  {
1088    /** The private task that initiated the operation. */
1089    private Task initializeTask;
1090    /** The destination in the case of an export. */
1091    private int exportTarget = RoutableMsg.UNKNOWN_SERVER;
1092    /** The source in the case of an import. */
1093    private int importSource = RoutableMsg.UNKNOWN_SERVER;
1094
1095    /** The total entry count expected to be processed. */
1096    private long entryCount;
1097    /** The count for the entry not yet processed. */
1098    private long entryLeftCount;
1099
1100    /** Exception raised during the initialization. */
1101    private DirectoryException exception;
1102
1103    /** Whether the context is related to an import or an export. */
1104    private final boolean importInProgress;
1105
1106    /** Current counter of messages exchanged during the initialization. */
1107    private int msgCnt;
1108
1109    /**
1110     * Number of connections lost when we start the initialization. Will help
1111     * counting connections lost during initialization,
1112     */
1113    private int initNumLostConnections;
1114
1115    /**
1116     * Request message sent when this server has the initializeFromRemote task.
1117     */
1118    private InitializeRequestMsg initReqMsgSent;
1119
1120    /**
1121     * Start time of the initialization process. ErrorMsg timestamped before
1122     * this startTime will be ignored.
1123     */
1124    private final long startTime;
1125
1126    /** List for replicas (DS) connected to the topology when initialization started. */
1127    private final Set<Integer> startList = new HashSet<>(0);
1128
1129    /**
1130     * List for replicas (DS) with a failure (disconnected from the topology)
1131     * since the initialization started.
1132     */
1133    private final Set<Integer> failureList = new HashSet<>(0);
1134
1135    /**
1136     * Flow control during initialization: for each remote server, counter of
1137     * messages received.
1138     */
1139    private final Map<Integer, Integer> ackVals = new HashMap<>();
1140    /** ServerId of the slowest server (the one with the smallest non null counter). */
1141    private int slowestServerId = -1;
1142
1143    private short exporterProtocolVersion = -1;
1144
1145    /** Window used during this initialization. */
1146    private int initWindow;
1147
1148    /** Number of attempt already done for this initialization. */
1149    private short attemptCnt;
1150
1151    /**
1152     * Creates a new IEContext.
1153     *
1154     * @param importInProgress true if the IEContext will be used
1155     *                         for and import, false if the IEContext
1156     *                         will be used for and export.
1157     */
1158    private ImportExportContext(boolean importInProgress)
1159    {
1160      this.importInProgress = importInProgress;
1161      this.startTime = System.currentTimeMillis();
1162      this.attemptCnt = 0;
1163    }
1164
1165    /**
1166     * Returns a boolean indicating if a total update import is currently in
1167     * Progress.
1168     *
1169     * @return A boolean indicating if a total update import is currently in
1170     *         Progress.
1171     */
1172    boolean importInProgress()
1173    {
1174      return importInProgress;
1175    }
1176
1177    /**
1178     * Returns the total number of entries to be processed when a total update
1179     * is in progress.
1180     *
1181     * @return The total number of entries to be processed when a total update
1182     *         is in progress.
1183     */
1184    long getTotalEntryCount()
1185    {
1186      return entryCount;
1187    }
1188
1189    /**
1190     * Returns the number of entries still to be processed when a total update
1191     * is in progress.
1192     *
1193     * @return The number of entries still to be processed when a total update
1194     *         is in progress.
1195     */
1196    long getLeftEntryCount()
1197    {
1198      return entryLeftCount;
1199    }
1200
1201    /**
1202     * Initializes the import/export counters with the provider value.
1203     * @param total Total number of entries to be processed.
1204     * @throws DirectoryException if an error occurred.
1205     */
1206    private void initializeCounters(long total) throws DirectoryException
1207    {
1208      entryCount = total;
1209      entryLeftCount = total;
1210
1211      if (initializeTask instanceof InitializeTask)
1212      {
1213        final InitializeTask task = (InitializeTask) initializeTask;
1214        task.setTotal(entryCount);
1215        task.setLeft(entryCount);
1216      }
1217      else if (initializeTask instanceof InitializeTargetTask)
1218      {
1219        final InitializeTargetTask task = (InitializeTargetTask) initializeTask;
1220        task.setTotal(entryCount);
1221        task.setLeft(entryCount);
1222      }
1223    }
1224
1225    /**
1226     * Update the counters of the task for each entry processed during
1227     * an import or export.
1228     *
1229     * @param  entriesDone The number of entries that were processed
1230     *                     since the last time this method was called.
1231     *
1232     * @throws DirectoryException if an error occurred.
1233     */
1234    private void updateCounters(int entriesDone) throws DirectoryException
1235    {
1236      entryLeftCount -= entriesDone;
1237
1238      if (initializeTask != null)
1239      {
1240        if (initializeTask instanceof InitializeTask)
1241        {
1242          ((InitializeTask)initializeTask).setLeft(entryLeftCount);
1243        }
1244        else if (initializeTask instanceof InitializeTargetTask)
1245        {
1246          ((InitializeTargetTask)initializeTask).setLeft(entryLeftCount);
1247        }
1248      }
1249    }
1250
1251    /** {@inheritDoc} */
1252    @Override
1253    public String toString()
1254    {
1255      return "[Entry count=" + this.entryCount +
1256             ", Entry left count=" + this.entryLeftCount + "]";
1257    }
1258
1259    /**
1260     * Gets the server id of the exporting server.
1261     * @return the server id of the exporting server.
1262     */
1263    public int getExportTarget()
1264    {
1265      return exportTarget;
1266    }
1267
1268    /**
1269     * Gets the server id of the importing server.
1270     * @return the server id of the importing server.
1271     */
1272    public int getImportSource()
1273    {
1274      return importSource;
1275    }
1276
1277    /**
1278     * Get the exception that occurred during the import/export.
1279     * @return the exception that occurred during the import/export.
1280     */
1281    public DirectoryException getException()
1282    {
1283      return exception;
1284    }
1285
1286    /**
1287     * Set the exception that occurred during the import/export.
1288     * @param exception the exception that occurred during the import/export.
1289     */
1290    public void setException(DirectoryException exception)
1291    {
1292      this.exception = exception;
1293    }
1294
1295    /**
1296     * Only sets the exception that occurred during the import/export if none
1297     * was already set on this object.
1298     *
1299     * @param exception the exception that occurred during the import/export.
1300     */
1301    public void setExceptionIfNoneSet(DirectoryException exception)
1302    {
1303      if (exception == null)
1304      {
1305        this.exception = exception;
1306      }
1307    }
1308
1309    /**
1310     * Set the id of the EntryMsg acknowledged from a receiver (importer)server.
1311     * (updated via the listener thread)
1312     * @param serverId serverId of the acknowledger/receiver/importer server.
1313     * @param numAck   id of the message received.
1314     */
1315    private void setAckVal(int serverId, int numAck)
1316    {
1317      if (logger.isTraceEnabled())
1318      {
1319        logger.trace("[IE] setAckVal[" + serverId + "]=" + numAck);
1320      }
1321
1322      this.ackVals.put(serverId, numAck);
1323
1324      // Recompute the server with the minAck returned,means the slowest server.
1325      slowestServerId = serverId;
1326      for (Integer sid : importExportContext.get().ackVals.keySet())
1327      {
1328        if (this.ackVals.get(sid) < this.ackVals.get(slowestServerId))
1329        {
1330          slowestServerId = sid;
1331        }
1332      }
1333    }
1334
1335    /**
1336     * Returns the serverId of the server that acknowledged the smallest
1337     * EntryMsg id.
1338     * @return serverId of the server with latest acknowledge.
1339     *                  0 when no ack has been received yet.
1340     */
1341    public int getSlowestServer()
1342    {
1343      if (logger.isTraceEnabled())
1344      {
1345        logger.trace("[IE] getSlowestServer" + slowestServerId
1346            + " " + this.ackVals.get(slowestServerId));
1347      }
1348
1349      return this.slowestServerId;
1350    }
1351
1352  }
1353
1354  /**
1355   * Verifies that the given string represents a valid source
1356   * from which this server can be initialized.
1357   *
1358   * @param targetString The string representing the source
1359   * @return The source as a integer value
1360   * @throws DirectoryException if the string is not valid
1361   */
1362  public int decodeTarget(String targetString) throws DirectoryException
1363  {
1364    if ("all".equalsIgnoreCase(targetString))
1365    {
1366      return RoutableMsg.ALL_SERVERS;
1367    }
1368
1369    // So should be a serverID
1370    try
1371    {
1372      int target = Integer.decode(targetString);
1373      if (target >= 0)
1374      {
1375        // FIXME Could we check now that it is a know server in the domain ?
1376        // JNR: Yes please
1377      }
1378      return target;
1379    }
1380    catch (Exception e)
1381    {
1382      ResultCode resultCode = ResultCode.OTHER;
1383      LocalizableMessage message = ERR_INVALID_EXPORT_TARGET.get();
1384      throw new DirectoryException(resultCode, message, e);
1385    }
1386  }
1387
1388  /**
1389   * Initializes a remote server from this server.
1390   * <p>
1391   * The {@code exportBackend(OutputStream)} will therefore be called
1392   * on this server, and the {@code importBackend(InputStream)}
1393   * will be called on the remote server.
1394   * <p>
1395   * The InputStream and OutputStream given as a parameter to those
1396   * methods will be connected through the replication protocol.
1397   *
1398   * @param target   The server-id of the server that should be initialized.
1399   *                 The target can be discovered using the
1400   *                 {@link #getReplicaInfos()} method.
1401   * @param initTask The task that triggers this initialization and that should
1402   *                 be updated with its progress.
1403   *
1404   * @throws DirectoryException If it was not possible to publish the
1405   *                            Initialization message to the Topology.
1406   */
1407  public void initializeRemote(int target, Task initTask)
1408  throws DirectoryException
1409  {
1410    initializeRemote(target, getServerId(), initTask, getInitWindow());
1411  }
1412
1413  /**
1414   * Process the initialization of some other server or servers in the topology
1415   * specified by the target argument when this initialization specifying the
1416   * server that requests the initialization.
1417   *
1418   * @param serverToInitialize The target server that should be initialized.
1419   * @param serverRunningTheTask The server that initiated the export. It can
1420   * be the serverID of this server, or the serverID of a remote server.
1421   * @param initTask The task in this server that triggers this initialization
1422   * and that should be updated with its progress. Null when the export is done
1423   * following a request coming from a remote server (task is remote).
1424   * @param initWindow The value of the initialization window for flow control
1425   * between the importer and the exporter.
1426   *
1427   * @exception DirectoryException When an error occurs. No exception raised
1428   * means success.
1429   */
1430  protected void initializeRemote(int serverToInitialize,
1431      int serverRunningTheTask, Task initTask, int initWindow)
1432  throws DirectoryException
1433  {
1434    final ImportExportContext ieCtx = acquireIEContext(false);
1435
1436    /*
1437    We manage the list of servers to initialize in order :
1438    - to test at the end that all expected servers have reconnected
1439    after their import and with the right genId
1440    - to update the task with the server(s) where this test failed
1441    */
1442
1443    if (serverToInitialize == RoutableMsg.ALL_SERVERS)
1444    {
1445      logger.info(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START_ALL,
1446          countEntries(), getBaseDN(), getServerId());
1447
1448      ieCtx.startList.addAll(getReplicaInfos().keySet());
1449
1450      // We manage the list of servers with which a flow control can be enabled
1451      for (DSInfo dsi : getReplicaInfos().values())
1452      {
1453        if (dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
1454        {
1455          ieCtx.setAckVal(dsi.getDsId(), 0);
1456        }
1457      }
1458    }
1459    else
1460    {
1461      logger.info(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START, countEntries(),
1462          getBaseDN(), getServerId(), serverToInitialize);
1463
1464      ieCtx.startList.add(serverToInitialize);
1465
1466      // We manage the list of servers with which a flow control can be enabled
1467      for (DSInfo dsi : getReplicaInfos().values())
1468      {
1469        if (dsi.getDsId() == serverToInitialize &&
1470            dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
1471        {
1472          ieCtx.setAckVal(dsi.getDsId(), 0);
1473        }
1474      }
1475    }
1476
1477    DirectoryException exportRootException = null;
1478
1479    // loop for the case where the exporter is the initiator
1480    int attempt = 0;
1481    boolean done = false;
1482    while (!done && ++attempt < 2) // attempt loop
1483    {
1484      try
1485      {
1486        ieCtx.exportTarget = serverToInitialize;
1487        if (initTask != null)
1488        {
1489          ieCtx.initializeTask = initTask;
1490        }
1491        ieCtx.initializeCounters(countEntries());
1492        ieCtx.msgCnt = 0;
1493        ieCtx.initNumLostConnections = broker.getNumLostConnections();
1494        ieCtx.initWindow = initWindow;
1495
1496        // Send start message to the peer
1497        InitializeTargetMsg initTargetMsg = new InitializeTargetMsg(
1498            getBaseDN(), getServerId(), serverToInitialize,
1499            serverRunningTheTask, ieCtx.entryCount, initWindow);
1500
1501        broker.publish(initTargetMsg);
1502
1503        // Wait for all servers to be ok
1504        waitForRemoteStartOfInit(ieCtx);
1505
1506        // Servers that left in the list are those for which we could not test
1507        // that they have been successfully initialized.
1508        if (!ieCtx.failureList.isEmpty())
1509        {
1510          throw new DirectoryException(
1511              ResultCode.OTHER,
1512              ERR_INIT_NO_SUCCESS_START_FROM_SERVERS.get(getBaseDN(), ieCtx.failureList));
1513        }
1514
1515        exportBackend(new BufferedOutputStream(new ReplOutputStream(this)));
1516
1517        // Notify the peer of the success
1518        broker.publish(
1519            new DoneMsg(getServerId(), initTargetMsg.getDestination()));
1520      }
1521      catch(DirectoryException exportException)
1522      {
1523        // Give priority to the first exception raised - stored in the context
1524        final DirectoryException ieEx = ieCtx.exception;
1525        exportRootException = ieEx != null ? ieEx : exportException;
1526      }
1527
1528      if (logger.isTraceEnabled())
1529      {
1530        logger.trace("[IE] In " + broker.getReplicationMonitorInstanceName()
1531            + " export ends with connected=" + broker.isConnected()
1532            + " exportRootException=" + exportRootException);
1533      }
1534
1535      if (exportRootException != null)
1536      {
1537        try
1538        {
1539          /*
1540          Handling the errors during export
1541
1542          Note: we could have lost the connection and another thread
1543          the listener one) has already managed to reconnect.
1544          So we MUST rely on the test broker.isConnected()
1545          ONLY to do 'wait to be reconnected by another thread'
1546          (if not yet reconnected already).
1547          */
1548          if (!broker.isConnected())
1549          {
1550            // We are still disconnected, so we wait for the listener thread
1551            // to reconnect - wait 10s
1552            if (logger.isTraceEnabled())
1553            {
1554              logger.trace("[IE] Exporter wait for reconnection by the listener thread");
1555            }
1556            int att=0;
1557            while (!broker.shuttingDown()
1558                && !broker.isConnected()
1559                && ++att < 100)
1560            {
1561              try { Thread.sleep(100); }
1562              catch(Exception e){ /* do nothing */ }
1563            }
1564          }
1565
1566          if (initTask != null
1567              && broker.isConnected()
1568              && serverToInitialize != RoutableMsg.ALL_SERVERS)
1569          {
1570            /*
1571            NewAttempt case : In the case where
1572            - it's not an InitializeAll
1573            - AND the previous export attempt failed
1574            - AND we are (now) connected
1575            - and we own the task and this task is not an InitializeAll
1576            Let's :
1577            - sleep to let time to the other peer to reconnect if needed
1578            - and launch another attempt
1579            */
1580            try { Thread.sleep(1000); }
1581            catch(Exception e){ /* do nothing */ }
1582
1583            logger.info(NOTE_RESENDING_INIT_TARGET, exportRootException.getLocalizedMessage());
1584            continue;
1585          }
1586
1587          broker.publish(new ErrorMsg(
1588              serverToInitialize, exportRootException.getMessageObject()));
1589        }
1590        catch(Exception e)
1591        {
1592          // Ignore the failure raised while proceeding the root failure
1593        }
1594      }
1595
1596      // We are always done for this export ...
1597      // ... except in the NewAttempt case (see above)
1598      done = true;
1599
1600    } // attempt loop
1601
1602    // Wait for all servers to be ok, and build the failure list
1603    waitForRemoteEndOfInit(ieCtx);
1604
1605    // Servers that left in the list are those for which we could not test
1606    // that they have been successfully initialized.
1607    if (!ieCtx.failureList.isEmpty() && exportRootException == null)
1608    {
1609      exportRootException = new DirectoryException(ResultCode.OTHER,
1610              ERR_INIT_NO_SUCCESS_END_FROM_SERVERS.get(getGenerationID(), ieCtx.failureList));
1611    }
1612
1613    // Don't forget to release IEcontext acquired at beginning.
1614    releaseIEContext(); // FIXME should not this be in a finally?
1615
1616    final String cause = exportRootException == null ? ""
1617        : exportRootException.getLocalizedMessage();
1618    if (serverToInitialize == RoutableMsg.ALL_SERVERS)
1619    {
1620      logger.info(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END_ALL,
1621          getBaseDN(), getServerId(), cause);
1622    }
1623    else
1624    {
1625      logger.info(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END,
1626          getBaseDN(), getServerId(), serverToInitialize, cause);
1627    }
1628
1629
1630    if (exportRootException != null)
1631    {
1632      throw exportRootException;
1633    }
1634  }
1635
1636  /**
1637   * For all remote servers in the start list:
1638   * - wait it has finished the import and present the expected generationID,
1639   * - build the failureList.
1640   */
1641  private void waitForRemoteStartOfInit(ImportExportContext ieCtx)
1642  {
1643    final Set<Integer> replicasWeAreWaitingFor = new HashSet<>(ieCtx.startList);
1644
1645    if (logger.isTraceEnabled())
1646    {
1647      logger.trace("[IE] wait for start replicasWeAreWaitingFor=" + replicasWeAreWaitingFor);
1648    }
1649
1650    int waitResultAttempt = 0;
1651    boolean done;
1652    do
1653    {
1654      done = true;
1655      for (DSInfo dsi : getReplicaInfos().values())
1656      {
1657        if (logger.isTraceEnabled())
1658        {
1659          logger.trace(
1660            "[IE] wait for start dsId " + dsi.getDsId()
1661            + " " + dsi.getStatus()
1662            + " " + dsi.getGenerationId()
1663            + " " + getGenerationID());
1664        }
1665        if (ieCtx.startList.contains(dsi.getDsId()))
1666        {
1667          if (dsi.getStatus() != ServerStatus.FULL_UPDATE_STATUS)
1668          {
1669            // this one is still not doing the Full Update ... retry later
1670            done = false;
1671            try { Thread.sleep(100);
1672            }
1673            catch (InterruptedException e) {
1674              Thread.currentThread().interrupt();
1675            }
1676            waitResultAttempt++;
1677            break;
1678          }
1679          else
1680          {
1681            // this one is ok
1682            replicasWeAreWaitingFor.remove(dsi.getDsId());
1683          }
1684        }
1685      }
1686    }
1687    while (!done && waitResultAttempt < 1200 && !broker.shuttingDown());
1688
1689    ieCtx.failureList.addAll(replicasWeAreWaitingFor);
1690
1691    if (logger.isTraceEnabled())
1692    {
1693      logger.trace("[IE] wait for start ends with " + ieCtx.failureList);
1694    }
1695  }
1696
1697  /**
1698   * For all remote servers in the start list:
1699   * - wait it has finished the import and present the expected generationID,
1700   * - build the failureList.
1701   */
1702  private void waitForRemoteEndOfInit(ImportExportContext ieCtx)
1703  {
1704    final Set<Integer> replicasWeAreWaitingFor = new HashSet<>(ieCtx.startList);
1705
1706    if (logger.isTraceEnabled())
1707    {
1708      logger.trace("[IE] wait for end replicasWeAreWaitingFor=" + replicasWeAreWaitingFor);
1709    }
1710
1711    /*
1712    In case some new servers appear during the init, we want them to be
1713    considered in the processing of sorting the successfully initialized
1714    and the others
1715    */
1716    replicasWeAreWaitingFor.addAll(getReplicaInfos().keySet());
1717
1718    boolean done;
1719    do
1720    {
1721      done = true;
1722      int reconnectMaxDelayInSec = 10;
1723      int reconnectWait = 0;
1724      Iterator<Integer> it = replicasWeAreWaitingFor.iterator();
1725      while (it.hasNext())
1726      {
1727        int serverId = it.next();
1728        if (ieCtx.failureList.contains(serverId))
1729        {
1730          /*
1731          this server has already been in error during initialization
1732          don't wait for it
1733          */
1734          continue;
1735        }
1736
1737        DSInfo dsInfo = getConnectedRemoteDS(serverId);
1738        if (dsInfo == null)
1739        {
1740          /*
1741          this server is disconnected
1742          may be for a long time if it crashed or had been stopped
1743          may be just the time to reconnect after import : should be short
1744          */
1745          if (++reconnectWait<reconnectMaxDelayInSec)
1746          {
1747            // let's still wait to give a chance to this server to reconnect
1748            done = false;
1749          }
1750          // Else we left enough time to the servers to reconnect
1751        }
1752        else
1753        {
1754          // this server is connected
1755          if (dsInfo.getStatus() == ServerStatus.FULL_UPDATE_STATUS)
1756          {
1757            // this one is still doing the Full Update ... retry later
1758            done = false;
1759            break;
1760          }
1761
1762          if (dsInfo.getGenerationId() == getGenerationID())
1763          { // and with the expected generationId
1764            // We're done with this server
1765            it.remove();
1766          }
1767        }
1768      }
1769
1770      // loop and wait
1771      if (!done)
1772      {
1773        try { Thread.sleep(1000); }
1774        catch (InterruptedException e) {
1775          Thread.currentThread().interrupt();
1776        } // 1sec
1777      }
1778    }
1779    while (!done && !broker.shuttingDown()); // infinite wait
1780
1781    ieCtx.failureList.addAll(replicasWeAreWaitingFor);
1782
1783    if (logger.isTraceEnabled())
1784    {
1785      logger.trace("[IE] wait for end ends with " + ieCtx.failureList);
1786    }
1787  }
1788
1789  /**
1790   * Get the ServerState maintained by the Concrete class.
1791   *
1792   * @return the ServerState maintained by the Concrete class.
1793   */
1794  public ServerState getServerState()
1795  {
1796    return state;
1797  }
1798
1799  /**
1800   * Acquire and initialize the import/export context, verifying no other
1801   * import/export is in progress.
1802   */
1803  private ImportExportContext acquireIEContext(boolean importInProgress)
1804      throws DirectoryException
1805  {
1806    final ImportExportContext ieCtx = new ImportExportContext(importInProgress);
1807    if (!importExportContext.compareAndSet(null, ieCtx))
1808    {
1809      // Rejects 2 simultaneous exports
1810      LocalizableMessage message = ERR_SIMULTANEOUS_IMPORT_EXPORT_REJECTED.get();
1811      throw new DirectoryException(ResultCode.OTHER, message);
1812    }
1813    return ieCtx;
1814  }
1815
1816  private void releaseIEContext()
1817  {
1818    importExportContext.set(null);
1819  }
1820
1821  /**
1822   * Processes an error message received while an export is
1823   * on going, or an import will start.
1824   *
1825   * @param errorMsg The error message received.
1826   */
1827  private void processErrorMsg(ErrorMsg errorMsg, ImportExportContext ieCtx)
1828  {
1829    //Exporting must not be stopped on the first error, if we run initialize-all
1830    if (ieCtx != null && ieCtx.exportTarget != RoutableMsg.ALL_SERVERS)
1831    {
1832      // The ErrorMsg is received while we have started an initialization
1833      ieCtx.setExceptionIfNoneSet(new DirectoryException(
1834          ResultCode.OTHER, errorMsg.getDetails()));
1835
1836      /*
1837       * This can happen :
1838       * - on the first InitReqMsg sent when source in not known for example
1839       * - on the next attempt when source crashed and did not reconnect
1840       *   even after the nextInitAttemptDelay
1841       * During the import, the ErrorMsg will be received by receiveEntryBytes
1842       */
1843      if (ieCtx.initializeTask instanceof InitializeTask)
1844      {
1845        // Update the task that initiated the import
1846        ((InitializeTask) ieCtx.initializeTask)
1847            .updateTaskCompletionState(ieCtx.getException());
1848
1849        releaseIEContext();
1850      }
1851    }
1852  }
1853
1854  /**
1855   * Receives bytes related to an entry in the context of an import to
1856   * initialize the domain (called by ReplLDIFInputStream).
1857   *
1858   * @return The bytes. Null when the Done or Err message has been received
1859   */
1860  protected byte[] receiveEntryBytes()
1861  {
1862    ReplicationMsg msg;
1863    while (true)
1864    {
1865      ImportExportContext ieCtx = importExportContext.get();
1866      try
1867      {
1868        // In the context of the total update, we don't want any automatic
1869        // re-connection done transparently by the broker because of a better
1870        // RS or because of a connection failure.
1871        // We want to be notified of topology change in order to track a
1872        // potential disconnection of the exporter.
1873        msg = broker.receive(false, false, true);
1874
1875        if (logger.isTraceEnabled())
1876        {
1877          logger.trace("[IE] In "
1878              + broker.getReplicationMonitorInstanceName()
1879              + ", receiveEntryBytes " + msg);
1880        }
1881
1882        if (msg == null)
1883        {
1884          if (broker.shuttingDown())
1885          {
1886            // The server is in the shutdown process
1887            return null;
1888          }
1889          else
1890          {
1891            // Handle connection issues
1892            ieCtx.setExceptionIfNoneSet(new DirectoryException(
1893                ResultCode.OTHER, ERR_INIT_RS_DISCONNECTION_DURING_IMPORT
1894                    .get(broker.getReplicationServer())));
1895            return null;
1896          }
1897        }
1898
1899        // Check good ordering of msg received
1900        if (msg instanceof EntryMsg)
1901        {
1902          EntryMsg entryMsg = (EntryMsg)msg;
1903          byte[] entryBytes = entryMsg.getEntryBytes();
1904          ieCtx.updateCounters(countEntryLimits(entryBytes));
1905
1906          if (ieCtx.exporterProtocolVersion >=
1907            ProtocolVersion.REPLICATION_PROTOCOL_V4)
1908          {
1909            // check the msgCnt of the msg received to check ordering
1910            if (++ieCtx.msgCnt != entryMsg.getMsgId())
1911            {
1912              ieCtx.setExceptionIfNoneSet(new DirectoryException(
1913                  ResultCode.OTHER, ERR_INIT_BAD_MSG_ID_SEQ_DURING_IMPORT.get(ieCtx.msgCnt, entryMsg.getMsgId())));
1914              return null;
1915            }
1916
1917            // send the ack of flow control mgmt
1918            if ((ieCtx.msgCnt % (ieCtx.initWindow/2)) == 0)
1919            {
1920              final InitializeRcvAckMsg amsg = new InitializeRcvAckMsg(
1921                  getServerId(), entryMsg.getSenderID(), ieCtx.msgCnt);
1922              broker.publish(amsg, false);
1923              if (logger.isTraceEnabled())
1924              {
1925                logger.trace("[IE] In "
1926                    + broker.getReplicationMonitorInstanceName()
1927                    + ", publish InitializeRcvAckMsg" + amsg);
1928              }
1929            }
1930          }
1931          return entryBytes;
1932        }
1933        else if (msg instanceof DoneMsg)
1934        {
1935          /*
1936          This is the normal termination of the import
1937          No error is stored and the import is ended by returning null
1938          */
1939          return null;
1940        }
1941        else if (msg instanceof ErrorMsg)
1942        {
1943          /*
1944          This is an error termination during the import
1945          The error is stored and the import is ended by returning null
1946          */
1947          if (ieCtx.getException() == null)
1948          {
1949            ErrorMsg errMsg = (ErrorMsg)msg;
1950            if (errMsg.getCreationTime() > ieCtx.startTime)
1951            {
1952              ieCtx.setException(
1953                  new DirectoryException(ResultCode.OTHER,errMsg.getDetails()));
1954              return null;
1955            }
1956          }
1957        }
1958        else
1959        {
1960          // Other messages received during an import are trashed except
1961          // the topologyMsg.
1962          if (msg instanceof TopologyMsg
1963              && getConnectedRemoteDS(ieCtx.importSource) == null)
1964          {
1965            LocalizableMessage errMsg = ERR_INIT_EXPORTER_DISCONNECTION.get(
1966                getBaseDN(), getServerId(), ieCtx.importSource);
1967            ieCtx.setExceptionIfNoneSet(new DirectoryException(ResultCode.OTHER, errMsg));
1968            return null;
1969          }
1970        }
1971      }
1972      catch(Exception e)
1973      {
1974        ieCtx.setExceptionIfNoneSet(new DirectoryException(
1975            ResultCode.OTHER,
1976            ERR_INIT_IMPORT_FAILURE.get(e.getLocalizedMessage())));
1977      }
1978    }
1979  }
1980
1981  /**
1982   * Count the number of entries in the provided byte[].
1983   * This is based on the hypothesis that the entries are separated
1984   * by a "\n\n" String.
1985   *
1986   * @param   entryBytes the set of bytes containing one or more entries.
1987   * @return  The number of entries in the provided byte[].
1988   */
1989  private int countEntryLimits(byte[] entryBytes)
1990  {
1991    return countEntryLimits(entryBytes, 0, entryBytes.length);
1992  }
1993
1994  /**
1995   * Count the number of entries in the provided byte[].
1996   * This is based on the hypothesis that the entries are separated
1997   * by a "\n\n" String.
1998   *
1999   * @param   entryBytes the set of bytes containing one or more entries.
2000   * @return  The number of entries in the provided byte[].
2001   */
2002  private int countEntryLimits(byte[] entryBytes, int pos, int length)
2003  {
2004    int entryCount = 0;
2005    int count = 0;
2006    while (count<=length-2)
2007    {
2008      if (entryBytes[pos+count] == '\n' && entryBytes[pos+count+1] == '\n')
2009      {
2010        entryCount++;
2011        count++;
2012      }
2013      count++;
2014    }
2015    return entryCount;
2016  }
2017
2018  /**
2019   * Exports an entry in LDIF format.
2020   *
2021   * @param lDIFEntry The entry to be exported in byte[] form.
2022   * @param pos       The starting Position in the array.
2023   * @param length    Number of array elements to be copied.
2024   *
2025   * @throws IOException when an error occurred.
2026   */
2027  void exportLDIFEntry(byte[] lDIFEntry, int pos, int length)
2028      throws IOException
2029  {
2030    if (logger.isTraceEnabled())
2031    {
2032      logger.trace("[IE] Entering exportLDIFEntry entry=" + Arrays.toString(lDIFEntry));
2033    }
2034
2035    // build the message
2036    ImportExportContext ieCtx = importExportContext.get();
2037    EntryMsg entryMessage = new EntryMsg(
2038        getServerId(), ieCtx.getExportTarget(), lDIFEntry, pos, length,
2039        ++ieCtx.msgCnt);
2040
2041    // Waiting the slowest loop
2042    while (!broker.shuttingDown())
2043    {
2044      /*
2045      If an error was raised - like receiving an ErrorMsg from a remote
2046      server that have been stored by the listener thread in the ieContext,
2047      we just abandon the export by throwing an exception.
2048      */
2049      if (ieCtx.getException() != null)
2050      {
2051        throw new IOException(ieCtx.getException().getMessage());
2052      }
2053
2054      int slowestServerId = ieCtx.getSlowestServer();
2055      if (getConnectedRemoteDS(slowestServerId) == null)
2056      {
2057        ieCtx.setException(new DirectoryException(ResultCode.OTHER,
2058            ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT.get(ieCtx.getSlowestServer())));
2059
2060        throw new IOException("IOException with nested DirectoryException",
2061            ieCtx.getException());
2062      }
2063
2064      int ourLastExportedCnt = ieCtx.msgCnt;
2065      int slowestCnt = ieCtx.ackVals.get(slowestServerId);
2066
2067      if (logger.isTraceEnabled())
2068      {
2069        logger.trace("[IE] Entering exportLDIFEntry waiting " +
2070            " our=" + ourLastExportedCnt + " slowest=" + slowestCnt);
2071      }
2072
2073      if (ourLastExportedCnt - slowestCnt > ieCtx.initWindow)
2074      {
2075        if (logger.isTraceEnabled())
2076        {
2077          logger.trace("[IE] Entering exportLDIFEntry waiting");
2078        }
2079
2080        // our export is too far beyond the slowest importer - let's wait
2081        try { Thread.sleep(100); }
2082        catch(Exception e) { /* do nothing */ }
2083
2084        // process any connection error
2085        if (broker.hasConnectionError()
2086          || broker.getNumLostConnections() != ieCtx.initNumLostConnections)
2087        {
2088          // publish failed - store the error in the ieContext ...
2089          DirectoryException de = new DirectoryException(ResultCode.OTHER,
2090              ERR_INIT_RS_DISCONNECTION_DURING_EXPORT.get(broker.getRsServerId()));
2091          ieCtx.setExceptionIfNoneSet(de);
2092          // .. and abandon the export by throwing an exception.
2093          throw new IOException(de.getMessage());
2094        }
2095      }
2096      else
2097      {
2098        if (logger.isTraceEnabled())
2099        {
2100          logger.trace("[IE] slowest got to us => stop waiting");
2101        }
2102        break;
2103      }
2104    } // Waiting the slowest loop
2105
2106    if (logger.isTraceEnabled())
2107    {
2108      logger.trace("[IE] Entering exportLDIFEntry pub entry=" + Arrays.toString(lDIFEntry));
2109    }
2110
2111    boolean sent = broker.publish(entryMessage, false);
2112
2113    // process any publish error
2114    if (!sent
2115        || broker.hasConnectionError()
2116        || broker.getNumLostConnections() != ieCtx.initNumLostConnections)
2117    {
2118      // publish failed - store the error in the ieContext ...
2119      DirectoryException de = new DirectoryException(ResultCode.OTHER,
2120          ERR_INIT_RS_DISCONNECTION_DURING_EXPORT.get(broker.getRsServerId()));
2121      ieCtx.setExceptionIfNoneSet(de);
2122      // .. and abandon the export by throwing an exception.
2123      throw new IOException(de.getMessage());
2124    }
2125
2126    // publish succeeded
2127    try
2128    {
2129      ieCtx.updateCounters(countEntryLimits(lDIFEntry, pos, length));
2130    }
2131    catch (DirectoryException de)
2132    {
2133      ieCtx.setExceptionIfNoneSet(de);
2134      // .. and abandon the export by throwing an exception.
2135      throw new IOException(de.getMessage());
2136    }
2137  }
2138
2139  /**
2140   * Initializes asynchronously this domain from a remote source server.
2141   * Before returning from this call, for the provided task :
2142   * - the progressing counters are updated during the initialization using
2143   *   setTotal() and setLeft().
2144   * - the end of the initialization using updateTaskCompletionState().
2145   * <p>
2146   * When this method is called, a request for initialization is sent to the
2147   * remote source server requesting initialization.
2148   * <p>
2149   *
2150   * @param source   The server-id of the source from which to initialize.
2151   *                 The source can be discovered using the
2152   *                 {@link #getReplicaInfos()} method.
2153   *
2154   * @param initTask The task that launched the initialization
2155   *                 and should be updated of its progress.
2156   *
2157   * @throws DirectoryException If it was not possible to publish the
2158   *                            Initialization message to the Topology.
2159   *                            The task state is updated.
2160   */
2161  public void initializeFromRemote(int source, Task initTask)
2162  throws DirectoryException
2163  {
2164    if (logger.isTraceEnabled())
2165    {
2166      logger.trace("[IE] Entering initializeFromRemote for " + this);
2167    }
2168
2169    LocalizableMessage errMsg = !broker.isConnected()
2170        ? ERR_INITIALIZATION_FAILED_NOCONN.get(getBaseDN())
2171        : null;
2172
2173    /*
2174    We must not test here whether the remote source is connected to
2175    the topology by testing if it stands in the replicas list since.
2176    In the case of a re-attempt of initialization, the listener thread is
2177    running this method directly coming from initialize() method and did
2178    not processed any topology message in between the failure and the
2179    new attempt.
2180    */
2181    try
2182    {
2183      /*
2184      We must immediately acquire a context to store the task inside
2185      The context will be used when we (the listener thread) will receive
2186      the InitializeTargetMsg, process the import, and at the end
2187      update the task.
2188      */
2189
2190      final ImportExportContext ieCtx = acquireIEContext(true);
2191      ieCtx.initializeTask = initTask;
2192      ieCtx.attemptCnt = 0;
2193      ieCtx.initReqMsgSent = new InitializeRequestMsg(
2194          getBaseDN(), getServerId(), source, getInitWindow());
2195      broker.publish(ieCtx.initReqMsgSent);
2196
2197      /*
2198      The normal success processing is now to receive InitTargetMsg then
2199      entries from the remote server.
2200      The error cases are :
2201      - either local error immediately caught below
2202      - a remote error we will receive as an ErrorMsg
2203      */
2204    }
2205    catch(DirectoryException de)
2206    {
2207      errMsg = de.getMessageObject();
2208    }
2209    catch(Exception e)
2210    {
2211      // Should not happen
2212      errMsg = LocalizableMessage.raw(e.getLocalizedMessage());
2213      logger.error(errMsg);
2214    }
2215
2216    // When error, update the task and raise the error to the caller
2217    if (errMsg != null)
2218    {
2219      // No need to call here updateTaskCompletionState - will be done
2220      // by the caller
2221      releaseIEContext();
2222      throw new DirectoryException(ResultCode.OTHER, errMsg);
2223    }
2224  }
2225
2226  /**
2227   * Processes an InitializeTargetMsg received from a remote server
2228   * meaning processes an initialization from the entries expected to be
2229   * received now.
2230   *
2231   * @param initTargetMsgReceived The message received from the remote server.
2232   *
2233   * @param requesterServerId The serverId of the server that requested the
2234   *                          initialization meaning the server where the
2235   *                          task has initially been created (this server,
2236   *                          or the remote server).
2237   */
2238  private void initialize(InitializeTargetMsg initTargetMsgReceived, int requesterServerId)
2239  {
2240    if (logger.isTraceEnabled())
2241    {
2242      logger.trace("[IE] Entering initialize - domain=" + this);
2243    }
2244
2245    InitializeTask initFromTask = null;
2246    int source = initTargetMsgReceived.getSenderID();
2247    ImportExportContext ieCtx = importExportContext.get();
2248    try
2249    {
2250      // Log starting
2251      logger.info(NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START, getBaseDN(),
2252          initTargetMsgReceived.getSenderID(), getServerId());
2253
2254      // Go into full update status
2255      setNewStatus(StatusMachineEvent.TO_FULL_UPDATE_STATUS_EVENT);
2256
2257      // Acquire an import context if no already done (and initialize).
2258      if (initTargetMsgReceived.getInitiatorID() != getServerId())
2259      {
2260        /*
2261        The initTargetMsgReceived is for an import initiated by the remote server.
2262        Test and set if no import already in progress
2263        */
2264        ieCtx = acquireIEContext(true);
2265      }
2266
2267      // Initialize stuff
2268      ieCtx.importSource = source;
2269      ieCtx.initializeCounters(initTargetMsgReceived.getEntryCount());
2270      ieCtx.initWindow = initTargetMsgReceived.getInitWindow();
2271      ieCtx.exporterProtocolVersion = getProtocolVersion(source);
2272      initFromTask = (InitializeTask) ieCtx.initializeTask;
2273
2274      // Launch the import
2275      importBackend(new ReplInputStream(this));
2276    }
2277    catch (DirectoryException e)
2278    {
2279      /*
2280      Store the exception raised. It will be considered if no other exception
2281      has been previously stored in  the context
2282      */
2283      ieCtx.setExceptionIfNoneSet(e);
2284    }
2285    finally
2286    {
2287      if (logger.isTraceEnabled())
2288      {
2289        logger.trace("[IE] Domain=" + this
2290          + " ends import with exception=" + ieCtx.getException()
2291          + " connected=" + broker.isConnected());
2292      }
2293
2294      /*
2295      It is necessary to restart (reconnect to RS) for different reasons
2296      - when everything went well, reconnect in order to exchange
2297      new state, new generation ID
2298      - when we have connection failure, reconnect to retry a new import
2299      right here, right now
2300      we never want retryOnFailure if we fails reconnecting in the restart.
2301      */
2302      broker.reStart(false);
2303
2304      if (ieCtx.getException() != null
2305          && broker.isConnected()
2306          && initFromTask != null
2307          && ++ieCtx.attemptCnt < 2)
2308      {
2309          /*
2310          Worth a new attempt
2311          since initFromTask is in this server, connection is ok
2312          */
2313          try
2314          {
2315            /*
2316            Wait for the exporter to stabilize - eventually reconnect as
2317            well if it was connected to the same RS than the one we lost ...
2318            */
2319            Thread.sleep(1000);
2320
2321            /*
2322            Restart the whole import protocol exchange by sending again
2323            the request
2324            */
2325            logger.info(NOTE_RESENDING_INIT_FROM_REMOTE_REQUEST,
2326                ieCtx.getException().getLocalizedMessage());
2327
2328            broker.publish(ieCtx.initReqMsgSent);
2329
2330            ieCtx.initializeCounters(0);
2331            ieCtx.exception = null;
2332            ieCtx.msgCnt = 0;
2333
2334            // Processing of the received initTargetMsgReceived is done
2335            // let's wait for the next one
2336            return;
2337          }
2338          catch(Exception e)
2339          {
2340            /*
2341            An error occurs when sending a new request for a new import.
2342            This error is not stored, preferring to keep the initial one.
2343            */
2344            logger.error(ERR_SENDING_NEW_ATTEMPT_INIT_REQUEST,
2345                e.getLocalizedMessage(), ieCtx.getException().getLocalizedMessage());
2346          }
2347      }
2348
2349      // ===================
2350      // No new attempt case
2351
2352      if (logger.isTraceEnabled())
2353      {
2354        logger.trace("[IE] Domain=" + this
2355          + " ends initialization with exception=" + ieCtx.getException()
2356          + " connected=" + broker.isConnected()
2357          + " task=" + initFromTask
2358          + " attempt=" + ieCtx.attemptCnt);
2359      }
2360
2361      try
2362      {
2363        if (broker.isConnected() && ieCtx.getException() != null)
2364        {
2365          // Let's notify the exporter
2366          ErrorMsg errorMsg = new ErrorMsg(requesterServerId,
2367              ieCtx.getException().getMessageObject());
2368          broker.publish(errorMsg);
2369        }
2370        /*
2371        Update the task that initiated the import must be the last thing.
2372        Particularly, broker.restart() after import success must be done
2373        before some other operations/tasks to be launched,
2374        like resetting the generation ID.
2375        */
2376        if (initFromTask != null)
2377        {
2378          initFromTask.updateTaskCompletionState(ieCtx.getException());
2379        }
2380      }
2381      finally
2382      {
2383        String errorMsg = ieCtx.getException() != null ? ieCtx.getException().getLocalizedMessage() : "";
2384        logger.info(NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END,
2385            getBaseDN(), initTargetMsgReceived.getSenderID(), getServerId(), errorMsg);
2386        releaseIEContext();
2387      } // finally
2388    } // finally
2389  }
2390
2391  /**
2392   * Return the protocol version of the DS related to the provided serverId.
2393   * Returns -1 when the protocol version is not known.
2394   * @param dsServerId The provided serverId.
2395   * @return The protocol version.
2396   */
2397  private short getProtocolVersion(int dsServerId)
2398  {
2399    final DSInfo dsInfo = getReplicaInfos().get(dsServerId);
2400    if (dsInfo != null)
2401    {
2402      return dsInfo.getProtocolVersion();
2403    }
2404    return -1;
2405  }
2406
2407  /**
2408   * Sets the status to a new value depending of the passed status machine
2409   * event.
2410   * @param event The event that may make the status be changed
2411   */
2412  protected void signalNewStatus(StatusMachineEvent event)
2413  {
2414    setNewStatus(event);
2415    broker.signalStatusChange(status);
2416  }
2417
2418  private void setNewStatus(StatusMachineEvent event)
2419  {
2420    ServerStatus newStatus = StatusMachine.computeNewStatus(status, event);
2421    if (newStatus == ServerStatus.INVALID_STATUS)
2422    {
2423      logger.error(ERR_DS_CANNOT_CHANGE_STATUS, getBaseDN(), getServerId(), status, event);
2424      return;
2425    }
2426
2427    if (newStatus != status)
2428    {
2429      // Reset status date
2430      lastStatusChangeDate = new Date();
2431      // Reset monitoring counters if reconnection
2432      if (newStatus == ServerStatus.NOT_CONNECTED_STATUS)
2433      {
2434        resetMonitoringCounters();
2435      }
2436
2437      status = newStatus;
2438      if (logger.isTraceEnabled())
2439      {
2440        logger.trace("Replication domain " + getBaseDN()
2441            + " new status is: " + status);
2442      }
2443
2444      // Perform whatever actions are needed to apply properties for being
2445      // compliant with new status
2446      updateDomainForNewStatus();
2447    }
2448  }
2449
2450  /**
2451   * Returns a boolean indicating if an import or export is currently
2452   * processed.
2453   *
2454   * @return The status
2455   */
2456  public boolean ieRunning()
2457  {
2458    return importExportContext.get() != null;
2459  }
2460
2461  /**
2462   * Check the value of the Replication Servers generation ID.
2463   *
2464   * @param generationID        The expected value of the generation ID.
2465   *
2466   * @throws DirectoryException When the generation ID of the Replication
2467   *                            Servers is not the expected value.
2468   */
2469  private void checkGenerationID(long generationID) throws DirectoryException
2470  {
2471    boolean allSet = true;
2472
2473    for (int i = 0; i< 50; i++)
2474    {
2475      allSet = true;
2476      for (RSInfo rsInfo : getRsInfos())
2477      {
2478        // the 'empty' RSes (generationId==-1) are considered as good citizens
2479        if (rsInfo.getGenerationId() != -1 &&
2480            rsInfo.getGenerationId() != generationID)
2481        {
2482          try
2483          {
2484            Thread.sleep(i*100);
2485          } catch (InterruptedException e)
2486          {
2487            Thread.currentThread().interrupt();
2488          }
2489          allSet = false;
2490          break;
2491        }
2492      }
2493      if (allSet)
2494      {
2495        break;
2496      }
2497    }
2498    if (!allSet)
2499    {
2500      LocalizableMessage message = ERR_RESET_GENERATION_ID_FAILED.get(getBaseDN());
2501      throw new DirectoryException(ResultCode.OTHER, message);
2502    }
2503  }
2504
2505  /**
2506   * Reset the Replication Log.
2507   * Calling this method will remove all the Replication information that
2508   * was kept on all the Replication Servers currently connected in the
2509   * topology.
2510   *
2511   * @throws DirectoryException If this ReplicationDomain is not currently
2512   *                           connected to a Replication Server or it
2513   *                           was not possible to contact it.
2514   */
2515  void resetReplicationLog() throws DirectoryException
2516  {
2517    // Reset the Generation ID to -1 to clean the ReplicationServers.
2518    resetGenerationId(-1L);
2519
2520    // check that at least one ReplicationServer did change its generation-id
2521    checkGenerationID(-1);
2522
2523    // Reconnect to the Replication Server so that it adopts our GenerationID.
2524    restartService();
2525
2526    // wait for the domain to reconnect.
2527    int count = 0;
2528    while (!isConnected() && count < 10)
2529    {
2530      try
2531      {
2532        Thread.sleep(100);
2533      } catch (InterruptedException e)
2534      {
2535        Thread.currentThread().interrupt();
2536      }
2537    }
2538
2539    resetGenerationId(getGenerationID());
2540
2541    // check that at least one ReplicationServer did change its generation-id
2542    checkGenerationID(getGenerationID());
2543  }
2544
2545  /**
2546   * Reset the generationId of this domain in the whole topology.
2547   * A message is sent to the Replication Servers for them to reset
2548   * their change dbs.
2549   *
2550   * @param generationIdNewValue  The new value of the generation Id.
2551   * @throws DirectoryException   When an error occurs
2552   */
2553  public void resetGenerationId(Long generationIdNewValue)
2554      throws DirectoryException
2555  {
2556    if (logger.isTraceEnabled())
2557    {
2558      logger.trace("Server id " + getServerId() + " and domain "
2559          + getBaseDN() + " resetGenerationId " + generationIdNewValue);
2560    }
2561
2562    ResetGenerationIdMsg genIdMessage =
2563        new ResetGenerationIdMsg(getGenId(generationIdNewValue));
2564
2565    if (!isConnected())
2566    {
2567      LocalizableMessage message = ERR_RESET_GENERATION_CONN_ERR_ID.get(getBaseDN(),
2568          getServerId(), genIdMessage.getGenerationId());
2569      throw new DirectoryException(ResultCode.OTHER, message);
2570    }
2571    broker.publish(genIdMessage);
2572
2573    // check that at least one ReplicationServer did change its generation-id
2574    checkGenerationID(getGenId(generationIdNewValue));
2575  }
2576
2577  private long getGenId(Long generationIdNewValue)
2578  {
2579    if (generationIdNewValue != null)
2580    {
2581      return generationIdNewValue;
2582    }
2583    return getGenerationID();
2584  }
2585
2586
2587  /*
2588   ******** End of The total Update code *********
2589   */
2590
2591  /*
2592   ******* Start of Monitoring Code **********
2593   */
2594
2595  /**
2596   * Get the maximum receive window size.
2597   *
2598   * @return The maximum receive window size.
2599   */
2600  int getMaxRcvWindow()
2601  {
2602    if (broker != null)
2603    {
2604      return broker.getMaxRcvWindow();
2605    }
2606    return 0;
2607  }
2608
2609  /**
2610   * Get the current receive window size.
2611   *
2612   * @return The current receive window size.
2613   */
2614  int getCurrentRcvWindow()
2615  {
2616    if (broker != null)
2617    {
2618      return broker.getCurrentRcvWindow();
2619    }
2620    return 0;
2621  }
2622
2623  /**
2624   * Get the maximum send window size.
2625   *
2626   * @return The maximum send window size.
2627   */
2628  int getMaxSendWindow()
2629  {
2630    if (broker != null)
2631    {
2632      return broker.getMaxSendWindow();
2633    }
2634    return 0;
2635  }
2636
2637  /**
2638   * Get the current send window size.
2639   *
2640   * @return The current send window size.
2641   */
2642  int getCurrentSendWindow()
2643  {
2644    if (broker != null)
2645    {
2646      return broker.getCurrentSendWindow();
2647    }
2648    return 0;
2649  }
2650
2651  /**
2652   * Get the number of times the replication connection was lost.
2653   * @return The number of times the replication connection was lost.
2654   */
2655  int getNumLostConnections()
2656  {
2657    if (broker != null)
2658    {
2659      return broker.getNumLostConnections();
2660    }
2661    return 0;
2662  }
2663
2664  /**
2665   * Determine whether the connection to the replication server is encrypted.
2666   * @return true if the connection is encrypted, false otherwise.
2667   */
2668  boolean isSessionEncrypted()
2669  {
2670    return broker != null && broker.isSessionEncrypted();
2671  }
2672
2673  /**
2674   * Check if the domain is connected to a ReplicationServer.
2675   *
2676   * @return true if the server is connected, false if not.
2677   */
2678  public boolean isConnected()
2679  {
2680    return broker != null && broker.isConnected();
2681  }
2682
2683  /**
2684   * Check if the domain has a connection error.
2685   * A Connection error happens when the broker could not be created
2686   * or when the broker could not find any ReplicationServer to connect to.
2687   *
2688   * @return true if the domain has a connection error.
2689   */
2690  public boolean hasConnectionError()
2691  {
2692    return broker == null || broker.hasConnectionError();
2693  }
2694
2695  /**
2696   * Get the name of the replicationServer to which this domain is currently
2697   * connected.
2698   *
2699   * @return the name of the replicationServer to which this domain
2700   *         is currently connected.
2701   */
2702  public String getReplicationServer()
2703  {
2704    if (broker != null)
2705    {
2706      return broker.getReplicationServer();
2707    }
2708    return ReplicationBroker.NO_CONNECTED_SERVER;
2709  }
2710
2711  /**
2712   * Gets the number of updates sent in assured safe read mode.
2713   * @return The number of updates sent in assured safe read mode.
2714   */
2715  public int getAssuredSrSentUpdates()
2716  {
2717    return assuredSrSentUpdates.get();
2718  }
2719
2720  /**
2721   * Gets the number of updates sent in assured safe read mode that have been
2722   * acknowledged without errors.
2723   * @return The number of updates sent in assured safe read mode that have been
2724   * acknowledged without errors.
2725   */
2726  public int getAssuredSrAcknowledgedUpdates()
2727  {
2728    return assuredSrAcknowledgedUpdates.get();
2729  }
2730
2731  /**
2732   * Gets the number of updates sent in assured safe read mode that have not
2733   * been acknowledged.
2734   * @return The number of updates sent in assured safe read mode that have not
2735   * been acknowledged.
2736   */
2737  public int getAssuredSrNotAcknowledgedUpdates()
2738  {
2739    return assuredSrNotAcknowledgedUpdates.get();
2740  }
2741
2742  /**
2743   * Gets the number of updates sent in assured safe read mode that have not
2744   * been acknowledged due to timeout error.
2745   * @return The number of updates sent in assured safe read mode that have not
2746   * been acknowledged due to timeout error.
2747   */
2748  public int getAssuredSrTimeoutUpdates()
2749  {
2750    return assuredSrTimeoutUpdates.get();
2751  }
2752
2753  /**
2754   * Gets the number of updates sent in assured safe read mode that have not
2755   * been acknowledged due to wrong status error.
2756   * @return The number of updates sent in assured safe read mode that have not
2757   * been acknowledged due to wrong status error.
2758   */
2759  public int getAssuredSrWrongStatusUpdates()
2760  {
2761    return assuredSrWrongStatusUpdates.get();
2762  }
2763
2764  /**
2765   * Gets the number of updates sent in assured safe read mode that have not
2766   * been acknowledged due to replay error.
2767   * @return The number of updates sent in assured safe read mode that have not
2768   * been acknowledged due to replay error.
2769   */
2770  public int getAssuredSrReplayErrorUpdates()
2771  {
2772    return assuredSrReplayErrorUpdates.get();
2773  }
2774
2775  /**
2776   * Gets the number of updates sent in assured safe read mode that have not
2777   * been acknowledged per server.
2778   * @return A copy of the map that contains the number of updates sent in
2779   * assured safe read mode that have not been acknowledged per server.
2780   */
2781  public Map<Integer, Integer> getAssuredSrServerNotAcknowledgedUpdates()
2782  {
2783    synchronized(assuredSrServerNotAcknowledgedUpdates)
2784    {
2785      return new HashMap<>(assuredSrServerNotAcknowledgedUpdates);
2786    }
2787  }
2788
2789  /**
2790   * Gets the number of updates received in assured safe read mode request.
2791   * @return The number of updates received in assured safe read mode request.
2792   */
2793  public int getAssuredSrReceivedUpdates()
2794  {
2795    return assuredSrReceivedUpdates.get();
2796  }
2797
2798  /**
2799   * Gets the number of updates received in assured safe read mode that we acked
2800   * without error (no replay error).
2801   * @return The number of updates received in assured safe read mode that we
2802   * acked without error (no replay error).
2803   */
2804  public int getAssuredSrReceivedUpdatesAcked()
2805  {
2806    return this.assuredSrReceivedUpdatesAcked.get();
2807  }
2808
2809  /**
2810   * Gets the number of updates received in assured safe read mode that we did
2811   * not ack due to error (replay error).
2812   * @return The number of updates received in assured safe read mode that we
2813   * did not ack due to error (replay error).
2814   */
2815  public int getAssuredSrReceivedUpdatesNotAcked()
2816  {
2817    return this.assuredSrReceivedUpdatesNotAcked.get();
2818  }
2819
2820  /**
2821   * Gets the number of updates sent in assured safe data mode.
2822   * @return The number of updates sent in assured safe data mode.
2823   */
2824  public int getAssuredSdSentUpdates()
2825  {
2826    return assuredSdSentUpdates.get();
2827  }
2828
2829  /**
2830   * Gets the number of updates sent in assured safe data mode that have been
2831   * acknowledged without errors.
2832   * @return The number of updates sent in assured safe data mode that have been
2833   * acknowledged without errors.
2834   */
2835  public int getAssuredSdAcknowledgedUpdates()
2836  {
2837    return assuredSdAcknowledgedUpdates.get();
2838  }
2839
2840  /**
2841   * Gets the number of updates sent in assured safe data mode that have not
2842   * been acknowledged due to timeout error.
2843   * @return The number of updates sent in assured safe data mode that have not
2844   * been acknowledged due to timeout error.
2845   */
2846  public int getAssuredSdTimeoutUpdates()
2847  {
2848    return assuredSdTimeoutUpdates.get();
2849  }
2850
2851  /**
2852   * Gets the number of updates sent in assured safe data mode that have not
2853   * been acknowledged due to timeout error per server.
2854   * @return A copy of the map that contains the number of updates sent in
2855   * assured safe data mode that have not been acknowledged due to timeout
2856   * error per server.
2857   */
2858  public Map<Integer, Integer> getAssuredSdServerTimeoutUpdates()
2859  {
2860    synchronized(assuredSdServerTimeoutUpdates)
2861    {
2862      return new HashMap<>(assuredSdServerTimeoutUpdates);
2863    }
2864  }
2865
2866  /**
2867   * Gets the date of the last status change.
2868   * @return The date of the last status change.
2869   */
2870  public Date getLastStatusChangeDate()
2871  {
2872    return lastStatusChangeDate;
2873  }
2874
2875  /**
2876   * Resets the values of the monitoring counters.
2877   */
2878  private void resetMonitoringCounters()
2879  {
2880    numProcessedUpdates = new AtomicInteger(0);
2881    numRcvdUpdates = new AtomicInteger(0);
2882    numSentUpdates = new AtomicInteger(0);
2883
2884    assuredSrSentUpdates = new AtomicInteger(0);
2885    assuredSrAcknowledgedUpdates = new AtomicInteger(0);
2886    assuredSrNotAcknowledgedUpdates = new AtomicInteger(0);
2887    assuredSrTimeoutUpdates = new AtomicInteger(0);
2888    assuredSrWrongStatusUpdates = new AtomicInteger(0);
2889    assuredSrReplayErrorUpdates = new AtomicInteger(0);
2890    synchronized (assuredSrServerNotAcknowledgedUpdates)
2891    {
2892      assuredSrServerNotAcknowledgedUpdates.clear();
2893    }
2894    assuredSrReceivedUpdates = new AtomicInteger(0);
2895    assuredSrReceivedUpdatesAcked = new AtomicInteger(0);
2896    assuredSrReceivedUpdatesNotAcked = new AtomicInteger(0);
2897    assuredSdSentUpdates = new AtomicInteger(0);
2898    assuredSdAcknowledgedUpdates = new AtomicInteger(0);
2899    assuredSdTimeoutUpdates = new AtomicInteger(0);
2900    synchronized (assuredSdServerTimeoutUpdates)
2901    {
2902      assuredSdServerTimeoutUpdates.clear();
2903    }
2904  }
2905
2906  /*
2907   ********** End of Monitoring Code **************
2908   */
2909
2910  /**
2911   * Start the publish mechanism of the Replication Service. After this method
2912   * has been called, the publish service can be used by calling the
2913   * {@link #publish(UpdateMsg)} method.
2914   *
2915   * @throws ConfigException
2916   *           If the DirectoryServer configuration was incorrect.
2917   */
2918  public void startPublishService() throws ConfigException
2919  {
2920    synchronized (sessionLock)
2921    {
2922      if (broker == null)
2923      {
2924        // create the broker object used to publish and receive changes
2925        broker = new ReplicationBroker(
2926            this, state, config, new ReplSessionSecurity());
2927        broker.start();
2928      }
2929    }
2930  }
2931
2932  /**
2933   * Starts the receiver side of the Replication Service.
2934   * <p>
2935   * After this method has been called, the Replication Service will start
2936   * calling the {@link #processUpdate(UpdateMsg)}.
2937   * <p>
2938   * This method must be called once and must be called after the
2939   * {@link #startPublishService()}.
2940   */
2941  public void startListenService()
2942  {
2943    synchronized (sessionLock)
2944    {
2945      if (listenerThread != null)
2946      {
2947        return;
2948      }
2949
2950      final String threadName = "Replica DS(" + getServerId() + ") listener for domain \"" + getBaseDN() + "\"";
2951
2952      listenerThread = new DirectoryThread(new Runnable()
2953      {
2954        @Override
2955        public void run()
2956        {
2957          if (logger.isTraceEnabled())
2958          {
2959            logger.trace("Replication Listener thread starting.");
2960          }
2961
2962          // Loop processing any incoming update messages.
2963          while (!listenerThread.isShutdownInitiated())
2964          {
2965            final UpdateMsg updateMsg = receive();
2966            if (updateMsg == null)
2967            {
2968              // The server is shutting down.
2969              listenerThread.initiateShutdown();
2970            }
2971            else if (processUpdate(updateMsg)
2972                && updateMsg.contributesToDomainState())
2973            {
2974              /*
2975               * Warning: in synchronous mode, no way to tell the replay of an
2976               * update went wrong Just put null in processUpdateDone so that if
2977               * assured replication is used the ack is sent without error at
2978               * replay flag.
2979               */
2980              processUpdateDone(updateMsg, null);
2981              state.update(updateMsg.getCSN());
2982            }
2983          }
2984
2985          if (logger.isTraceEnabled())
2986          {
2987            logger.trace("Replication Listener thread stopping.");
2988          }
2989        }
2990      }, threadName);
2991
2992      listenerThread.start();
2993    }
2994  }
2995
2996  /**
2997   * Temporarily disable the Replication Service.
2998   * The Replication Service can be enabled again using
2999   * {@link #enableService()}.
3000   * <p>
3001   * It can be useful to disable the Replication Service when the
3002   * repository where the replicated information is stored becomes
3003   * temporarily unavailable and replicated updates can therefore not
3004   * be replayed during a while. This method is not MT safe.
3005   */
3006  public void disableService()
3007  {
3008    synchronized (sessionLock)
3009    {
3010      /*
3011      Stop the broker first in order to prevent the listener from
3012      reconnecting - see OPENDJ-457.
3013      */
3014      if (broker != null)
3015      {
3016        broker.stop();
3017      }
3018
3019      // Stop the listener thread
3020      if (listenerThread != null)
3021      {
3022        listenerThread.initiateShutdown();
3023        try
3024        {
3025          listenerThread.join();
3026        }
3027        catch (InterruptedException e)
3028        {
3029          // Give up waiting.
3030        }
3031        listenerThread = null;
3032      }
3033    }
3034  }
3035
3036  /**
3037   * Returns {@code true} if the listener thread is shutting down or has
3038   * shutdown.
3039   *
3040   * @return {@code true} if the listener thread is shutting down or has
3041   *         shutdown.
3042   */
3043  protected final boolean isListenerShuttingDown()
3044  {
3045    final DirectoryThread tmp = listenerThread;
3046    return tmp == null || tmp.isShutdownInitiated();
3047  }
3048
3049  /**
3050   * Restart the Replication service after a {@link #disableService()}.
3051   * <p>
3052   * The Replication Service will restart from the point indicated by the
3053   * {@link ServerState} that was given as a parameter to the
3054   * {@link #startPublishService()} at startup time.
3055   * <p>
3056   * If some data have changed in the repository during the period of time when
3057   * the Replication Service was disabled, this {@link ServerState} should
3058   * therefore be updated by the Replication Domain subclass before calling this
3059   * method. This method is not MT safe.
3060   */
3061  public void enableService()
3062  {
3063    synchronized (sessionLock)
3064    {
3065      broker.start();
3066      startListenService();
3067    }
3068  }
3069
3070  /**
3071   * Change some ReplicationDomain parameters.
3072   *
3073   * @param config
3074   *          The new configuration that this domain should now use.
3075   */
3076  protected void changeConfig(ReplicationDomainCfg config)
3077  {
3078    if (broker != null && broker.changeConfig(config))
3079    {
3080      restartService();
3081    }
3082  }
3083
3084  /**
3085   * Applies a configuration change to the attributes which should be included
3086   * in the ECL.
3087   *
3088   * @param includeAttributes
3089   *          attributes to be included with all change records.
3090   * @param includeAttributesForDeletes
3091   *          additional attributes to be included with delete change records.
3092   */
3093  public void changeConfig(Set<String> includeAttributes,
3094      Set<String> includeAttributesForDeletes)
3095  {
3096    final boolean attrsModified = setEclIncludes(
3097        getServerId(), includeAttributes, includeAttributesForDeletes);
3098    if (attrsModified && broker != null)
3099    {
3100      restartService();
3101    }
3102  }
3103
  /**
   * Restarts the Replication Service by calling {@link #disableService()}
   * followed by {@link #enableService()}.
   */
  private void restartService()
  {
    disableService();
    enableService();
  }
3109
3110  /**
3111   * This method should trigger an export of the replicated data.
3112   * to the provided outputStream.
3113   * When finished the outputStream should be flushed and closed.
3114   *
3115   * @param output               The OutputStream where the export should
3116   *                             be produced.
3117   * @throws DirectoryException  When needed.
3118   */
3119  protected abstract void exportBackend(OutputStream output)
3120           throws DirectoryException;
3121
3122  /**
3123   * This method should trigger an import of the replicated data.
3124   *
3125   * @param input                The InputStream from which
3126   *                             the import should be reading entries.
3127   *
3128   * @throws DirectoryException  When needed.
3129   */
3130  protected abstract void importBackend(InputStream input)
3131           throws DirectoryException;
3132
3133  /**
3134   * This method should return the total number of objects in the
3135   * replicated domain.
3136   * This count will be used for reporting.
3137   *
3138   * @throws DirectoryException when needed.
3139   *
3140   * @return The number of objects in the replication domain.
3141   */
3142  public abstract long countEntries() throws DirectoryException;
3143
3144
3145
3146  /**
3147   * This method should handle the processing of {@link UpdateMsg} receive from
3148   * remote replication entities.
3149   * <p>
3150   * This method will be called by a single thread and should therefore should
3151   * not be blocking.
3152   *
3153   * @param updateMsg
3154   *          The {@link UpdateMsg} that was received.
3155   * @return A boolean indicating if the processing is completed at return time.
3156   *         If <code> true </code> is returned, no further processing is
3157   *         necessary. If <code> false </code> is returned, the subclass should
3158   *         call the method {@link #processUpdateDone(UpdateMsg, String)} and
3159   *         update the ServerState When this processing is complete.
3160   */
3161  public abstract boolean processUpdate(UpdateMsg updateMsg);
3162
3163  /**
3164   * This method must be called after each call to
3165   * {@link #processUpdate(UpdateMsg)} when the processing of the
3166   * update is completed.
3167   * <p>
3168   * It is useful for implementation needing to process the update in an
3169   * asynchronous way or using several threads, but must be called even by
3170   * implementation doing it in a synchronous, single-threaded way.
3171   *
3172   * @param msg
3173   *          The UpdateMsg whose processing was completed.
3174   * @param replayErrorMsg
3175   *          if not null, this means an error occurred during the replay of
3176   *          this update, and this is the matching human readable message
3177   *          describing the problem.
3178   */
3179  protected void processUpdateDone(UpdateMsg msg, String replayErrorMsg)
3180  {
3181    broker.updateWindowAfterReplay();
3182
3183    /*
3184    Send an ack if it was requested and the group id is the same of the RS
3185    one. Only Safe Read mode makes sense in DS for returning an ack.
3186    */
3187    // Assured feature is supported starting from replication protocol V2
3188    if (msg.isAssured()
3189      && broker.getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V2)
3190    {
3191      if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
3192      {
3193        if (broker.getRsGroupId() == getGroupId())
3194        {
3195          // Send the ack
3196          AckMsg ackMsg = new AckMsg(msg.getCSN());
3197          if (replayErrorMsg != null)
3198          {
3199            // Mark the error in the ack
3200            //   -> replay error occurred
3201            ackMsg.setHasReplayError(true);
3202            //   -> replay error occurred in our server
3203            ackMsg.setFailedServers(newArrayList(getServerId()));
3204          }
3205          broker.publish(ackMsg);
3206          if (replayErrorMsg != null)
3207          {
3208            assuredSrReceivedUpdatesNotAcked.incrementAndGet();
3209          }
3210          else
3211          {
3212            assuredSrReceivedUpdatesAcked.incrementAndGet();
3213          }
3214        }
3215      }
3216      else if (getAssuredMode() != AssuredMode.SAFE_DATA_MODE)
3217      {
3218        logger.error(ERR_DS_UNKNOWN_ASSURED_MODE, getServerId(), msg.getAssuredMode(), getBaseDN(), msg);
3219      }
3220        // Nothing to do in Assured safe data mode, only RS ack updates.
3221    }
3222
3223    incProcessedUpdates();
3224  }
3225
3226  /**
3227   * Prepare a message if it is to be sent in assured mode.
3228   * If the assured mode is enabled, this method should be called before
3229   * publish(UpdateMsg msg) method. This will configure the update accordingly
3230   * before it is sent and will prepare the mechanism that will block until the
3231   * matching ack is received. To wait for the ack after publish call, use
3232   * the waitForAckIfAssuredEnabled() method.
3233   * The expected typical usage in a service inheriting from this class is
3234   * the following sequence:
3235   * UpdateMsg msg = xxx;
3236   * prepareWaitForAckIfAssuredEnabled(msg);
3237   * publish(msg);
3238   * waitForAckIfAssuredEnabled(msg);
3239   *
3240   * Note: prepareWaitForAckIfAssuredEnabled and waitForAckIfAssuredEnabled have
3241   * no effect if assured replication is disabled.
3242   * Note: this mechanism should not be used if using publish(byte[] msg)
3243   * version as usage of these methods is already hidden inside.
3244   *
3245   * @param msg The update message to be sent soon.
3246   */
3247  protected void prepareWaitForAckIfAssuredEnabled(UpdateMsg msg)
3248  {
3249    /*
3250     * If assured configured, set message accordingly to request an ack in the
3251     * right assured mode.
3252     * No ack requested for a RS with a different group id.
3253     * Assured replication supported for the same locality,
3254     * i.e: a topology working in the same geographical location).
3255     * If we are connected to a RS which is not in our locality,
3256     * no need to ask for an ack.
3257     */
3258    if (needsAck())
3259    {
3260      msg.setAssured(true);
3261      msg.setAssuredMode(getAssuredMode());
3262      if (getAssuredMode() == AssuredMode.SAFE_DATA_MODE)
3263      {
3264        msg.setSafeDataLevel(getAssuredSdLevel());
3265      }
3266
3267      // Add the assured message to the list of update that are waiting for acks
3268      waitingAckMsgs.put(msg.getCSN(), msg);
3269    }
3270  }
3271
3272  private boolean needsAck()
3273  {
3274    return isAssured() && broker.getRsGroupId() == getGroupId();
3275  }
3276
3277  /**
3278   * Wait for the processing of an assured message after it has been sent, if
3279   * assured replication is configured, otherwise, do nothing.
3280   * The prepareWaitForAckIfAssuredEnabled method should have been called
3281   * before, see its comment for the full picture.
3282   *
3283   * @param msg The UpdateMsg for which we are waiting for an ack.
3284   * @throws TimeoutException When the configured timeout occurs waiting for the
3285   * ack.
3286   */
3287  protected void waitForAckIfAssuredEnabled(UpdateMsg msg)
3288    throws TimeoutException
3289  {
3290    if (needsAck())
3291    {
3292      // Increment assured replication monitoring counters
3293      switch (getAssuredMode())
3294      {
3295        case SAFE_READ_MODE:
3296          assuredSrSentUpdates.incrementAndGet();
3297          break;
3298        case SAFE_DATA_MODE:
3299          assuredSdSentUpdates.incrementAndGet();
3300          break;
3301        default:
3302        // Should not happen
3303      }
3304    } else
3305    {
3306      // Not assured or bad group id, return immediately
3307      return;
3308    }
3309
3310    // Wait for the ack to be received, timing out if necessary
3311    long startTime = System.currentTimeMillis();
3312    synchronized (msg)
3313    {
3314      CSN csn = msg.getCSN();
3315      while (waitingAckMsgs.containsKey(csn))
3316      {
3317        try
3318        {
3319          /*
3320          WARNING: this timeout may be difficult to optimize: too low, it
3321          may use too much CPU, too high, it may penalize performance...
3322          */
3323          msg.wait(10);
3324        } catch (InterruptedException e)
3325        {
3326          if (logger.isTraceEnabled())
3327          {
3328            logger.trace("waitForAck method interrupted for replication " +
3329              "baseDN: " + getBaseDN());
3330          }
3331          break;
3332        }
3333        // Timeout ?
3334        if (System.currentTimeMillis() - startTime >= getAssuredTimeout())
3335        {
3336          /*
3337          Timeout occurred, be sure that ack is not being received and if so,
3338          remove the update from the wait list, log the timeout error and
3339          also update assured monitoring counters
3340          */
3341          final UpdateMsg update = waitingAckMsgs.remove(csn);
3342          if (update == null)
3343          {
3344            // Ack received just before timeout limit: we can exit
3345            break;
3346          }
3347
3348          // No luck, this is a real timeout
3349          // Increment assured replication monitoring counters
3350          switch (msg.getAssuredMode())
3351          {
3352          case SAFE_READ_MODE:
3353            assuredSrNotAcknowledgedUpdates.incrementAndGet();
3354            assuredSrTimeoutUpdates.incrementAndGet();
3355            // Increment number of errors for our RS
3356            updateAssuredErrorsByServer(assuredSrServerNotAcknowledgedUpdates,
3357                broker.getRsServerId());
3358            break;
3359          case SAFE_DATA_MODE:
3360            assuredSdTimeoutUpdates.incrementAndGet();
3361            // Increment number of errors for our RS
3362            updateAssuredErrorsByServer(assuredSdServerTimeoutUpdates,
3363                broker.getRsServerId());
3364            break;
3365          default:
3366            // Should not happen
3367          }
3368
3369          throw new TimeoutException("No ack received for message csn: " + csn
3370              + " and replication domain: " + getBaseDN() + " after "
3371              + getAssuredTimeout() + " ms.");
3372        }
3373      }
3374    }
3375  }
3376
3377  /**
3378   * Publish an {@link UpdateMsg} to the Replication Service.
3379   * <p>
3380   * The Replication Service will handle the delivery of this {@link UpdateMsg}
3381   * to all the participants of this Replication Domain. These members will be
3382   * receive this {@link UpdateMsg} through a call of the
3383   * {@link #processUpdate(UpdateMsg)} message.
3384   *
3385   * @param msg The UpdateMsg that should be published.
3386   */
3387  public void publish(UpdateMsg msg)
3388  {
3389    broker.publish(msg);
3390    if (msg.contributesToDomainState())
3391    {
3392      state.update(msg.getCSN());
3393    }
3394    numSentUpdates.incrementAndGet();
3395  }
3396
3397  /**
3398   * Publishes a replica offline message if all pending changes for current
3399   * replica have been sent out.
3400   */
3401  public void publishReplicaOfflineMsg()
3402  {
3403    // Here to be overridden
3404  }
3405
3406  /**
3407   * This method should return the generationID to use for this
3408   * ReplicationDomain.
3409   * This method can be called at any time after the ReplicationDomain
3410   * has been started.
3411   *
3412   * @return The GenerationID.
3413   */
3414  public long getGenerationID()
3415  {
3416    return generationId;
3417  }
3418
3419  /**
3420   * Sets the generationId for this replication domain.
3421   *
3422   * @param generationId
3423   *          the generationId to set
3424   */
3425  public void setGenerationID(long generationId)
3426  {
3427    this.generationId = generationId;
3428  }
3429
3430  /**
3431   * Subclasses should use this method to add additional monitoring information
3432   * in the ReplicationDomain.
3433   *
3434   * @return Additional monitoring attributes that will be added in the
3435   *         ReplicationDomain monitoring entry.
3436   */
3437  public Collection<Attribute> getAdditionalMonitoring()
3438  {
3439    return new ArrayList<>();
3440  }
3441
3442  /**
3443   * Returns the Import/Export context associated to this ReplicationDomain.
3444   *
3445   * @return the Import/Export context associated to this ReplicationDomain
3446   */
3447  protected ImportExportContext getImportExportContext()
3448  {
3449    return importExportContext.get();
3450  }
3451
3452  /**
3453   * Returns the local address of this replication domain, or the empty string
3454   * if it is not yet connected.
3455   *
3456   * @return The local address.
3457   */
3458  String getLocalUrl()
3459  {
3460    final ReplicationBroker tmp = broker;
3461    return tmp != null ? tmp.getLocalUrl() : "";
3462  }
3463
3464  /**
3465   * Set the attributes configured on a server to be included in the ECL.
3466   *
3467   * @param serverId
3468   *          Server where these attributes are configured.
3469   * @param includeAttributes
3470   *          Attributes to be included with all change records, may include
3471   *          wild-cards.
3472   * @param includeAttributesForDeletes
3473   *          Additional attributes to be included with delete change records,
3474   *          may include wild-cards.
3475   * @return {@code true} if the set of attributes was modified.
3476   */
3477  public boolean setEclIncludes(int serverId,
3478      Set<String> includeAttributes,
3479      Set<String> includeAttributesForDeletes)
3480  {
3481    ECLIncludes current;
3482    ECLIncludes updated;
3483    do
3484    {
3485      current = this.eclIncludes.get();
3486      updated = current.addIncludedAttributes(
3487          serverId, includeAttributes, includeAttributesForDeletes);
3488    }
3489    while (!this.eclIncludes.compareAndSet(current, updated));
3490    return current != updated;
3491  }
3492
3493
3494
3495  /**
3496   * Get the attributes to include in each change for the ECL.
3497   *
3498   * @return The attributes to include in each change for the ECL.
3499   */
3500  public Set<String> getEclIncludes()
3501  {
3502    return eclIncludes.get().includedAttrsAllServers;
3503  }
3504
3505
3506
3507  /**
3508   * Get the attributes to include in each delete change for the ECL.
3509   *
3510   * @return The attributes to include in each delete change for the ECL.
3511   */
3512  public Set<String> getEclIncludesForDeletes()
3513  {
3514    return eclIncludes.get().includedAttrsForDeletesAllServers;
3515  }
3516
3517
3518
3519  /**
3520   * Get the attributes to include in each change for the ECL for a given
3521   * serverId.
3522   *
3523   * @param serverId
3524   *          The serverId for which we want the include attributes.
3525   * @return The attributes.
3526   */
3527  Set<String> getEclIncludes(int serverId)
3528  {
3529    return eclIncludes.get().includedAttrsByServer.get(serverId);
3530  }
3531
3532
3533
3534  /**
3535   * Get the attributes to include in each change for the ECL for a given
3536   * serverId.
3537   *
3538   * @param serverId
3539   *          The serverId for which we want the include attributes.
3540   * @return The attributes.
3541   */
3542  Set<String> getEclIncludesForDeletes(int serverId)
3543  {
3544    return eclIncludes.get().includedAttrsForDeletesByServer.get(serverId);
3545  }
3546
3547  /**
3548   * Returns the CSN of the last Change that was fully processed by this
3549   * ReplicationDomain.
3550   *
3551   * @return The CSN of the last Change that was fully processed by this
3552   *         ReplicationDomain.
3553   */
3554  public CSN getLastLocalChange()
3555  {
3556    return state.getCSN(getServerId());
3557  }
3558
3559  /**
3560   * Gets and stores the assured replication configuration parameters. Returns a
3561   * boolean indicating if the passed configuration has changed compared to
3562   * previous values and the changes require a reconnection.
3563   *
3564   * @param config
3565   *          The configuration object
3566   * @param allowReconnection
3567   *          Tells if one must reconnect if significant changes occurred
3568   */
3569  protected void readAssuredConfig(ReplicationDomainCfg config,
3570      boolean allowReconnection)
3571  {
3572    // Disconnect if required: changing configuration values before
3573    // disconnection would make assured replication used immediately and
3574    // disconnection could cause some timeouts error.
3575    if (needReconnection(config) && allowReconnection)
3576    {
3577      disableService();
3578
3579      assuredConfig = config;
3580
3581      enableService();
3582    }
3583  }
3584
3585  private boolean needReconnection(ReplicationDomainCfg cfg)
3586  {
3587    final AssuredMode assuredMode = getAssuredMode();
3588    switch (cfg.getAssuredType())
3589    {
3590    case NOT_ASSURED:
3591      if (isAssured())
3592      {
3593        return true;
3594      }
3595      break;
3596    case SAFE_DATA:
3597      if (!isAssured() || assuredMode == SAFE_READ_MODE)
3598      {
3599        return true;
3600      }
3601      break;
3602    case SAFE_READ:
3603      if (!isAssured() || assuredMode == SAFE_DATA_MODE)
3604      {
3605        return true;
3606      }
3607      break;
3608    }
3609
3610    return isAssured()
3611        && assuredMode == SAFE_DATA_MODE
3612        && cfg.getAssuredSdLevel() != getAssuredSdLevel();
3613  }
3614
3615  /** {@inheritDoc} */
3616  @Override
3617  public String toString()
3618  {
3619    return getClass().getSimpleName() + " " + getBaseDN() + " " + getServerId();
3620  }
3621}