001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2006-2010 Sun Microsystems, Inc.
025 *      Portions Copyright 2011-2015 ForgeRock AS
026 */
027package org.opends.server.replication.service;
028
029import java.io.IOException;
030import java.math.BigDecimal;
031import java.math.MathContext;
032import java.math.RoundingMode;
033import java.net.*;
034import java.util.*;
035import java.util.Map.Entry;
036import java.util.concurrent.ConcurrentSkipListMap;
037import java.util.concurrent.Semaphore;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.atomic.AtomicBoolean;
040import java.util.concurrent.atomic.AtomicReference;
041
042import org.forgerock.i18n.LocalizableMessage;
043import org.forgerock.i18n.slf4j.LocalizedLogger;
044import org.forgerock.util.Utils;
045import org.opends.server.admin.std.server.ReplicationDomainCfg;
046import org.opends.server.core.DirectoryServer;
047import org.opends.server.replication.common.*;
048import org.opends.server.replication.plugin.MultimasterReplication;
049import org.opends.server.replication.protocol.*;
050import org.opends.server.types.DN;
051import org.opends.server.types.HostPort;
052
053import static org.opends.messages.ReplicationMessages.*;
054import static org.opends.server.replication.protocol.ProtocolVersion.*;
055import static org.opends.server.replication.server.ReplicationServer.*;
056import static org.opends.server.util.StaticUtils.*;
057
058/**
059 * The broker for Multi-master Replication.
060 */
061public class ReplicationBroker
062{
063
064  /**
065   * Immutable class containing information about whether the broker is
066   * connected to an RS and data associated to this connected RS.
067   */
068  // @Immutable
069  private static final class ConnectedRS
070  {
071
072    private static final ConnectedRS NO_CONNECTED_RS = new ConnectedRS(
073        NO_CONNECTED_SERVER);
074
075    /** The info of the RS we are connected to. */
076    private final ReplicationServerInfo rsInfo;
077    /** Contains a connected session to the RS if any exist, null otherwise. */
078    private final Session session;
079    private final String replicationServer;
080
081    private ConnectedRS(String replicationServer)
082    {
083      this.rsInfo = null;
084      this.session = null;
085      this.replicationServer = replicationServer;
086    }
087
088    private ConnectedRS(ReplicationServerInfo rsInfo, Session session)
089    {
090      this.rsInfo = rsInfo;
091      this.session = session;
092      this.replicationServer = session != null ?
093          session.getReadableRemoteAddress()
094          : NO_CONNECTED_SERVER;
095    }
096
097    private static ConnectedRS stopped()
098    {
099      return new ConnectedRS("stopped");
100    }
101
102    private static ConnectedRS noConnectedRS()
103    {
104      return NO_CONNECTED_RS;
105    }
106
107    public int getServerId()
108    {
109      return rsInfo != null ? rsInfo.getServerId() : -1;
110    }
111
112    private byte getGroupId()
113    {
114      return rsInfo != null ? rsInfo.getGroupId() : -1;
115    }
116
117    private boolean isConnected()
118    {
119      return session != null;
120    }
121
122    /** {@inheritDoc} */
123    @Override
124    public String toString()
125    {
126      final StringBuilder sb = new StringBuilder();
127      toString(sb);
128      return sb.toString();
129    }
130
131    public void toString(StringBuilder sb)
132    {
133      sb.append("connected=").append(isConnected()).append(", ");
134      if (!isConnected())
135      {
136        sb.append("no connectedRS");
137      }
138      else
139      {
140        sb.append("connectedRS(serverId=").append(rsInfo.getServerId())
141          .append(", serverUrl=").append(rsInfo.getServerURL())
142          .append(", groupId=").append(rsInfo.getGroupId())
143          .append(")");
144      }
145    }
146
147  }
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  /**
   * True while the broker is stopped. The constructor sets it to true, and
   * start() only proceeds (and clears it) when it is true.
   */
  private volatile boolean shutdown;
  /** Lock serializing start() (and presumably stop) of the broker. */
  private final Object startStopLock = new Object();
  /**
   * Current domain configuration (server id, base DN, RS URLs, group id).
   * It is volatile, presumably because it is replaced on configuration
   * changes; TODO confirm.
   */
  private volatile ReplicationDomainCfg config;
  /**
   * String reported in the monitoring data (presumably under cn=monitor)
   * when there is no connected RS.
   */
  static final String NO_CONNECTED_SERVER = "Not connected";
  /** ServerState used by this broker when negotiating sessions with an RS. */
  private final ServerState state;
  /**
   * Flow-control semaphore for messages sent to the RS. It is recreated with
   * maxSendWindow permits on every successful connection.
   */
  private Semaphore sendWindow;
  /** Window size advertised by the RS we connected to. */
  private int maxSendWindow;
  /** Current receive window; re-read from the configuration on (re)connection. */
  private int rcvWindow = 100;
  /** Half of the receive window as computed at construction time. */
  private int halfRcvWindow = rcvWindow / 2;
  // NOTE(review): units and usage of timeout are not visible in this chunk.
  private int timeout;
  /** Session security configuration used when opening sessions to an RS. */
  private final ReplSessionSecurity replSessionSecurity;
  /**
   * The RS this DS is currently connected to.
   * <p>
   * Always use {@link #setConnectedRS(ConnectedRS)} to set a new
   * connected RS.
   */
  // @NotNull // for the reference
  private final AtomicReference<ConnectedRS> connectedRS = new AtomicReference<>(ConnectedRS.noConnectedRS());
  /** Our replication domain. */
  private final ReplicationDomain domain;
  /**
   * This object is used as a conditional event to be notified about
   * the reception of monitor information from the Replication Server.
   */
  private final AtomicBoolean monitorResponse = new AtomicBoolean(false);
  /**
   * A Map containing the ServerStates of all the replicas in the topology
   * as seen by the ReplicationServer the last time it was polled or the last
   * time it published monitoring information.
   */
  private Map<Integer, ServerState> replicaStates = new HashMap<>();
  /** A thread to monitor heartbeats on the session. */
  private HeartbeatMonitor heartbeatMonitor;
  /** The number of times the connection was lost. */
  private int numLostConnections;
  /**
   * When the broker cannot connect to any replication server
   * it logs an error and keeps retrying every second.
   * This boolean is set when the first failure happens. It is used
   * to avoid repeating the error message on further failures to connect,
   * and to know that a new message must be printed when the broker
   * finally succeeds in connecting.
   */
  private volatile boolean connectionError;
  /**
   * Lock held while connecting. connectAsDataServer() calls notify() on it
   * once a connection attempt concludes; presumably other threads wait() on
   * it. TODO confirm.
   */
  private final Object connectPhaseLock = new Object();
  /**
   * The thread that publishes messages to the RS containing the current
   * change time of this DS.
   */
  private CTHeartbeatPublisherThread ctHeartbeatPublisherThread;
  /*
   * Properties for the last topology info received from the network.
   */
  /** Contains the last known state of the replication topology. */
  private final AtomicReference<Topology> topology = new AtomicReference<>(new Topology());
  /** <pre>@GuardedBy("this")</pre>. */
  private volatile int updateDoneCount;
  // NOTE(review): the meaning of connectRequiresRecovery is not visible in this chunk.
  private volatile boolean connectRequiresRecovery;

  /**
   * This integer defines when the best replication server checking algorithm
   * should be engaged.
   * Every time a monitoring message (each monitoring publisher period) is
   * received, it is incremented. When it reaches 2, we run the checking
   * algorithm to see if we must reconnect to another best replication server.
   * Then we reset the value to 0. But when a topology message is received, the
   * integer is reset to 0. This ensures that we wait at least one monitoring
   * publisher period before running the algorithm, but also that we wait at
   * least for a monitoring period after the last received topology message
   * (topology stabilization).
   */
  private int mustRunBestServerCheckingAlgorithm;

  /**
   * The monitor provider for this replication domain.
   * <p>
   * The name of the monitor includes the local address and must therefore be
   * re-registered every time the session is re-established or destroyed. The
   * monitor provider can only be created (i.e. non-null) if there is a
   * replication domain, which is not the case in unit tests.
   */
  private final ReplicationMonitor monitor;
235
236  /**
237   * Creates a new ReplicationServer Broker for a particular ReplicationDomain.
238   *
239   * @param replicationDomain The replication domain that is creating us.
240   * @param state The ServerState that should be used by this broker
241   *        when negotiating the session with the replicationServer.
242   * @param config The configuration to use.
243   * @param replSessionSecurity The session security configuration.
244   */
245  public ReplicationBroker(ReplicationDomain replicationDomain,
246      ServerState state, ReplicationDomainCfg config,
247      ReplSessionSecurity replSessionSecurity)
248  {
249    this.domain = replicationDomain;
250    this.state = state;
251    this.config = config;
252    this.replSessionSecurity = replSessionSecurity;
253    this.rcvWindow = getMaxRcvWindow();
254    this.halfRcvWindow = rcvWindow / 2;
255    this.shutdown = true;
256
257    /*
258     * Only create a monitor if there is a replication domain (this is not the
259     * case in some unit tests).
260     */
261    this.monitor = replicationDomain != null ? new ReplicationMonitor(
262        replicationDomain) : null;
263    registerReplicationMonitor();
264  }
265
266  /**
267   * Start the ReplicationBroker.
268   */
269  public void start()
270  {
271    synchronized (startStopLock)
272    {
273      if (!shutdown)
274      {
275        return;
276      }
277      shutdown = false;
278      this.rcvWindow = getMaxRcvWindow();
279      connectAsDataServer();
280    }
281  }
282
283  /**
284   * Gets the group id of the RS we are connected to.
285   * @return The group id of the RS we are connected to
286   */
287  public byte getRsGroupId()
288  {
289    return connectedRS.get().getGroupId();
290  }
291
292  /**
293   * Gets the server id of the RS we are connected to.
294   * @return The server id of the RS we are connected to
295   */
296  public int getRsServerId()
297  {
298    return connectedRS.get().getServerId();
299  }
300
301  /**
302   * Gets the server id.
303   * @return The server id
304   */
305  public int getServerId()
306  {
307    return config.getServerId();
308  }
309
310  private DN getBaseDN()
311  {
312    return config.getBaseDN();
313  }
314
315  private Set<String> getReplicationServerUrls()
316  {
317    return config.getReplicationServer();
318  }
319
320  private byte getGroupId()
321  {
322    return (byte) config.getGroupId();
323  }
324
325  /**
326   * Gets the server id.
327   * @return The server id
328   */
329  private long getGenerationID()
330  {
331    return domain.getGenerationID();
332  }
333
334  /**
335   * Set the generation id - for test purpose.
336   * @param generationID The generation id
337   */
338  public void setGenerationID(long generationID)
339  {
340    domain.setGenerationID(generationID);
341  }
342
343  /**
344   * Compares 2 replication servers addresses and returns true if they both
345   * represent the same replication server instance.
346   * @param rs1Url Replication server 1 address
347   * @param rs2Url Replication server 2 address
348   * @return True if both replication server addresses represent the same
349   * replication server instance, false otherwise.
350   */
351  private static boolean isSameReplicationServerUrl(String rs1Url,
352      String rs2Url)
353  {
354    try
355    {
356      final HostPort hp1 = HostPort.valueOf(rs1Url);
357      final HostPort hp2 = HostPort.valueOf(rs2Url);
358      return hp1.isEquivalentTo(hp2);
359    }
360    catch (RuntimeException ex)
361    {
362      // Not a RS url or not a valid port number: should not happen
363      return false;
364    }
365  }
366
367  /**
368   * Bag class for keeping info we get from a replication server in order to
369   * compute the best one to connect to. This is in fact a wrapper to a
370   * ReplServerStartMsg (V3) or a ReplServerStartDSMsg (V4). This can also be
371   * updated with a info coming from received topology messages or monitoring
372   * messages.
373   */
374  static class ReplicationServerInfo
375  {
376    private RSInfo rsInfo;
377    private final short protocolVersion;
378    private final DN baseDN;
379    private final int windowSize;
380    // @NotNull
381    private final ServerState serverState;
382    private final boolean sslEncryption;
383    private final int degradedStatusThreshold;
384    /** Keeps the 0 value if created with a ReplServerStartMsg. */
385    private int connectedDSNumber;
386    // @NotNull
387    private Set<Integer> connectedDSs;
388    /**
389     * Is this RS locally configured? (the RS is recognized as a usable server).
390     */
391    private boolean locallyConfigured = true;
392
393    /**
394     * Create a new instance of ReplicationServerInfo wrapping the passed
395     * message.
396     * @param msg LocalizableMessage to wrap.
397     * @param newServerURL Override serverURL.
398     * @return The new instance wrapping the passed message.
399     * @throws IllegalArgumentException If the passed message has an unexpected
400     *                                  type.
401     */
402    private static ReplicationServerInfo newInstance(
403      ReplicationMsg msg, String newServerURL) throws IllegalArgumentException
404    {
405      final ReplicationServerInfo rsInfo = newInstance(msg);
406      rsInfo.setServerURL(newServerURL);
407      return rsInfo;
408    }
409
410    /**
411     * Create a new instance of ReplicationServerInfo wrapping the passed
412     * message.
413     * @param msg LocalizableMessage to wrap.
414     * @return The new instance wrapping the passed message.
415     * @throws IllegalArgumentException If the passed message has an unexpected
416     *                                  type.
417     */
418    static ReplicationServerInfo newInstance(ReplicationMsg msg)
419        throws IllegalArgumentException
420    {
421      if (msg instanceof ReplServerStartMsg)
422      {
423        // RS uses protocol V3 or lower
424        return new ReplicationServerInfo((ReplServerStartMsg) msg);
425      }
426      else if (msg instanceof ReplServerStartDSMsg)
427      {
428        // RS uses protocol V4 or higher
429        return new ReplicationServerInfo((ReplServerStartDSMsg) msg);
430      }
431
432      // Unsupported message type: should not happen
433      throw new IllegalArgumentException("Unexpected PDU type: "
434          + msg.getClass().getName() + ":\n" + msg);
435    }
436
437    /**
438     * Constructs a ReplicationServerInfo object wrapping a
439     * {@link ReplServerStartMsg}.
440     *
441     * @param msg
442     *          The {@link ReplServerStartMsg} this object will wrap.
443     */
444    private ReplicationServerInfo(ReplServerStartMsg msg)
445    {
446      this.protocolVersion = msg.getVersion();
447      this.rsInfo = new RSInfo(msg.getServerId(), msg.getServerURL(),
448          msg.getGenerationId(), msg.getGroupId(), 1);
449      this.baseDN = msg.getBaseDN();
450      this.windowSize = msg.getWindowSize();
451      final ServerState ss = msg.getServerState();
452      this.serverState = ss != null ? ss : new ServerState();
453      this.sslEncryption = msg.getSSLEncryption();
454      this.degradedStatusThreshold = msg.getDegradedStatusThreshold();
455    }
456
457    /**
458     * Constructs a ReplicationServerInfo object wrapping a
459     * {@link ReplServerStartDSMsg}.
460     *
461     * @param msg
462     *          The {@link ReplServerStartDSMsg} this object will wrap.
463     */
464    private ReplicationServerInfo(ReplServerStartDSMsg msg)
465    {
466      this.rsInfo = new RSInfo(msg.getServerId(), msg.getServerURL(),
467          msg.getGenerationId(), msg.getGroupId(), msg.getWeight());
468      this.protocolVersion = msg.getVersion();
469      this.baseDN = msg.getBaseDN();
470      this.windowSize = msg.getWindowSize();
471      final ServerState ss = msg.getServerState();
472      this.serverState = ss != null ? ss : new ServerState();
473      this.sslEncryption = msg.getSSLEncryption();
474      this.degradedStatusThreshold = msg.getDegradedStatusThreshold();
475      this.connectedDSNumber = msg.getConnectedDSNumber();
476    }
477
478    /**
479     * Constructs a new replication server info with the passed RSInfo internal
480     * values and the passed connected DSs.
481     *
482     * @param rsInfo
483     *          The RSinfo to use for the update
484     * @param connectedDSs
485     *          The new connected DSs
486     */
487    ReplicationServerInfo(RSInfo rsInfo, Set<Integer> connectedDSs)
488    {
489      this.rsInfo =
490          new RSInfo(rsInfo.getId(), rsInfo.getServerUrl(), rsInfo
491              .getGenerationId(), rsInfo.getGroupId(), rsInfo.getWeight());
492      this.protocolVersion = 0;
493      this.baseDN = null;
494      this.windowSize = 0;
495      this.connectedDSs = connectedDSs;
496      this.connectedDSNumber = connectedDSs.size();
497      this.sslEncryption = false;
498      this.degradedStatusThreshold = -1;
499      this.serverState = new ServerState();
500    }
501
502    /**
503     * Get the server state.
504     * @return The server state
505     */
506    public ServerState getServerState()
507    {
508      return serverState;
509    }
510
511    /**
512     * Get the group id.
513     * @return The group id
514     */
515    public byte getGroupId()
516    {
517      return rsInfo.getGroupId();
518    }
519
520    /**
521     * Get the server protocol version.
522     * @return the protocolVersion
523     */
524    public short getProtocolVersion()
525    {
526      return protocolVersion;
527    }
528
529    /**
530     * Get the generation id.
531     * @return the generationId
532     */
533    public long getGenerationId()
534    {
535      return rsInfo.getGenerationId();
536    }
537
538    /**
539     * Get the server id.
540     * @return the serverId
541     */
542    public int getServerId()
543    {
544      return rsInfo.getId();
545    }
546
547    /**
548     * Get the server URL.
549     * @return the serverURL
550     */
551    public String getServerURL()
552    {
553      return rsInfo.getServerUrl();
554    }
555
556    /**
557     * Get the base DN.
558     *
559     * @return the base DN
560     */
561    public DN getBaseDN()
562    {
563      return baseDN;
564    }
565
566    /**
567     * Get the window size.
568     * @return the windowSize
569     */
570    public int getWindowSize()
571    {
572      return windowSize;
573    }
574
575    /**
576     * Get the ssl encryption.
577     * @return the sslEncryption
578     */
579    public boolean isSslEncryption()
580    {
581      return sslEncryption;
582    }
583
584    /**
585     * Get the degraded status threshold.
586     * @return the degradedStatusThreshold
587     */
588    public int getDegradedStatusThreshold()
589    {
590      return degradedStatusThreshold;
591    }
592
593    /**
594     * Get the weight.
595     * @return the weight. Null if this object is a wrapper for
596     * a ReplServerStartMsg.
597     */
598    public int getWeight()
599    {
600      return rsInfo.getWeight();
601    }
602
603    /**
604     * Get the connected DS number.
605     * @return the connectedDSNumber. Null if this object is a wrapper for
606     * a ReplServerStartMsg.
607     */
608    public int getConnectedDSNumber()
609    {
610      return connectedDSNumber;
611    }
612
613    /**
614     * Converts the object to a RSInfo object.
615     * @return The RSInfo object matching this object.
616     */
617    RSInfo toRSInfo()
618    {
619      return rsInfo;
620    }
621
622    /**
623     * Updates replication server info with the passed RSInfo internal values
624     * and the passed connected DSs.
625     * @param rsInfo The RSinfo to use for the update
626     * @param connectedDSs The new connected DSs
627     */
628    private void update(RSInfo rsInfo, Set<Integer> connectedDSs)
629    {
630      this.rsInfo = new RSInfo(this.rsInfo.getId(), this.rsInfo.getServerUrl(),
631          rsInfo.getGenerationId(), rsInfo.getGroupId(), rsInfo.getWeight());
632      this.connectedDSs = connectedDSs;
633      this.connectedDSNumber = connectedDSs.size();
634    }
635
636    private void setServerURL(String newServerURL)
637    {
638      rsInfo = new RSInfo(rsInfo.getId(), newServerURL,
639          rsInfo.getGenerationId(), rsInfo.getGroupId(), rsInfo.getWeight());
640    }
641
642    /**
643     * Updates replication server info with the passed server state.
644     * @param serverState The ServerState to use for the update
645     */
646    private void update(ServerState serverState)
647    {
648      this.serverState.update(serverState);
649    }
650
651    /**
652     * Get the getConnectedDSs.
653     * @return the getConnectedDSs
654     */
655    public Set<Integer> getConnectedDSs()
656    {
657      return connectedDSs;
658    }
659
660    /**
661     * Gets the locally configured status for this RS.
662     * @return the locallyConfigured
663     */
664    public boolean isLocallyConfigured()
665    {
666      return locallyConfigured;
667    }
668
669    /**
670     * Sets the locally configured status for this RS.
671     * @param locallyConfigured the locallyConfigured to set
672     */
673    public void setLocallyConfigured(boolean locallyConfigured)
674    {
675      this.locallyConfigured = locallyConfigured;
676    }
677
678    /**
679     * Returns a string representation of this object.
680     * @return A string representation of this object.
681     */
682    @Override
683    public String toString()
684    {
685      return "ReplServerInfo Url:" + getServerURL()
686          + " ServerId:" + getServerId()
687          + " GroupId:" + getGroupId()
688          + " connectedDSs:" + connectedDSs;
689    }
690  }
691
692  /**
693   * Contacts all replication servers to get information from them and being
694   * able to choose the more suitable.
695   * @return the collected information.
696   */
697  private Map<Integer, ReplicationServerInfo> collectReplicationServersInfo()
698  {
699    final Map<Integer, ReplicationServerInfo> rsInfos = new ConcurrentSkipListMap<>();
700
701    for (String serverUrl : getReplicationServerUrls())
702    {
703      // Connect to server + get and store info about it
704      final ConnectedRS rs = performPhaseOneHandshake(serverUrl, false);
705      final ReplicationServerInfo rsInfo = rs.rsInfo;
706      if (rsInfo != null)
707      {
708        rsInfos.put(rsInfo.getServerId(), rsInfo);
709      }
710    }
711
712    return rsInfos;
713  }
714
715  /**
716   * Connect to a ReplicationServer.
717   *
718   * Handshake sequences between a DS and a RS is divided into 2 logical
719   * consecutive phases (phase 1 and phase 2). DS always initiates connection
720   * and always sends first message:
721   *
722   * DS<->RS:
723   * -------
724   *
725   * phase 1:
726   * DS --- ServerStartMsg ---> RS
727   * DS <--- ReplServerStartDSMsg --- RS
728   * phase 2:
729   * DS --- StartSessionMsg ---> RS
730   * DS <--- TopologyMsg --- RS
731   *
732   * Before performing a full handshake sequence, DS searches for best suitable
733   * RS by making only phase 1 handshake to every RS he knows then closing
734   * connection. This allows to gather information on available RSs and then
735   * decide with which RS the full handshake (phase 1 then phase 2) will be
736   * finally performed.
737   *
738   * @throws NumberFormatException address was invalid
739   */
740  private void connectAsDataServer()
741  {
742    /*
743     * If a first connect or a connection failure occur, we go through here.
744     * force status machine to NOT_CONNECTED_STATUS so that monitoring can see
745     * that we are not connected.
746     */
747    domain.toNotConnectedStatus();
748
749    /*
750    Stop any existing heartbeat monitor and changeTime publisher
751    from a previous session.
752    */
753    stopRSHeartBeatMonitoring();
754    stopChangeTimeHeartBeatPublishing();
755    mustRunBestServerCheckingAlgorithm = 0;
756
757    synchronized (connectPhaseLock)
758    {
759      final int serverId = getServerId();
760      final DN baseDN = getBaseDN();
761
762      /*
763       * Connect to each replication server and get their ServerState then find
764       * out which one is the best to connect to.
765       */
766      if (logger.isTraceEnabled())
767      {
768        debugInfo("phase 1 : will perform PhaseOneH with each RS in order to elect the preferred one");
769      }
770
771      // Get info from every available replication servers
772      Map<Integer, ReplicationServerInfo> rsInfos =
773          collectReplicationServersInfo();
774      computeNewTopology(toRSInfos(rsInfos));
775
776      if (rsInfos.isEmpty())
777      {
778        setConnectedRS(ConnectedRS.noConnectedRS());
779      }
780      else
781      {
782        // At least one server answered, find the best one.
783        RSEvaluations evals = computeBestReplicationServer(true, -1, state,
784            rsInfos, serverId, getGroupId(), getGenerationID());
785
786        // Best found, now initialize connection to this one (handshake phase 1)
787        if (logger.isTraceEnabled())
788        {
789          debugInfo("phase 2 : will perform PhaseOneH with the preferred RS=" + evals.getBestRS());
790        }
791
792        final ConnectedRS electedRS = performPhaseOneHandshake(
793            evals.getBestRS().getServerURL(), true);
794        final ReplicationServerInfo electedRsInfo = electedRS.rsInfo;
795        if (electedRsInfo != null)
796        {
797          /*
798          Update replication server info with potentially more up to date
799          data (server state for instance may have changed)
800          */
801          rsInfos.put(electedRsInfo.getServerId(), electedRsInfo);
802
803          // Handshake phase 1 exchange went well
804
805          // Compute in which status we are starting the session to tell the RS
806          final ServerStatus initStatus = computeInitialServerStatus(
807              electedRsInfo.getGenerationId(), electedRsInfo.getServerState(),
808              electedRsInfo.getDegradedStatusThreshold(), getGenerationID());
809
810          // Perform session start (handshake phase 2)
811          final TopologyMsg topologyMsg =
812              performPhaseTwoHandshake(electedRS, initStatus);
813
814          if (topologyMsg != null) // Handshake phase 2 exchange went well
815          {
816            connectToReplicationServer(electedRS, initStatus, topologyMsg);
817          } // Could perform handshake phase 2 with best
818        } // Could perform handshake phase 1 with best
819      }
820
821      // connectedRS has been updated by calls above, reload it
822      final ConnectedRS rs = connectedRS.get();
823      if (rs.isConnected())
824      {
825        connectPhaseLock.notify();
826
827        final long rsGenId = rs.rsInfo.getGenerationId();
828        final int rsServerId = rs.rsInfo.getServerId();
829        if (rsGenId == getGenerationID() || rsGenId == -1)
830        {
831          logger.info(NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG, serverId, rsServerId, baseDN,
832              rs.replicationServer, getGenerationID());
833        }
834        else
835        {
836          logger.warn(WARN_NOW_FOUND_BAD_GENERATION_CHANGELOG, serverId, rsServerId, baseDN,
837              rs.replicationServer, getGenerationID(), rsGenId);
838        }
839      }
840      else
841      {
842         // This server could not find any replicationServer.
843         // It's going to start in degraded mode. Log a message.
844        if (!connectionError)
845        {
846          connectionError = true;
847          connectPhaseLock.notify();
848
849          if (!rsInfos.isEmpty())
850          {
851            logger.warn(WARN_COULD_NOT_FIND_CHANGELOG, serverId, baseDN,
852                Utils.joinAsString(", ", rsInfos.keySet()));
853          }
854          else
855          {
856            logger.warn(WARN_NO_AVAILABLE_CHANGELOGS, serverId, baseDN);
857          }
858        }
859      }
860    }
861  }
862
863  private void computeNewTopology(List<RSInfo> newRSInfos)
864  {
865    final int rsServerId = getRsServerId();
866
867    Topology oldTopo;
868    Topology newTopo;
869    do
870    {
871      oldTopo = topology.get();
872      newTopo = new Topology(oldTopo.replicaInfos, newRSInfos, getServerId(),
873          rsServerId, getReplicationServerUrls(), oldTopo.rsInfos);
874    }
875    while (!topology.compareAndSet(oldTopo, newTopo));
876
877    if (logger.isTraceEnabled())
878    {
879      debugInfo(topologyChange(rsServerId, oldTopo, newTopo));
880    }
881  }
882
883  private StringBuilder topologyChange(int rsServerId, Topology oldTopo,
884      Topology newTopo)
885  {
886    final StringBuilder sb = new StringBuilder();
887    sb.append("rsServerId=").append(rsServerId);
888    if (newTopo.equals(oldTopo))
889    {
890      sb.append(", unchangedTopology=").append(newTopo);
891    }
892    else
893    {
894      sb.append(", oldTopology=").append(oldTopo);
895      sb.append(", newTopology=").append(newTopo);
896    }
897    return sb;
898  }
899
900  /**
901   * Connects to a replication server.
902   *
903   * @param rs
904   *          the Replication Server to connect to
905   * @param initStatus
906   *          The status to enter the state machine with
907   * @param topologyMsg
908   *          the message containing the topology information
909   */
910  private void connectToReplicationServer(ConnectedRS rs,
911      ServerStatus initStatus, TopologyMsg topologyMsg)
912  {
913    final DN baseDN = getBaseDN();
914    final ReplicationServerInfo rsInfo = rs.rsInfo;
915
916    boolean connectCompleted = false;
917    try
918    {
919      maxSendWindow = rsInfo.getWindowSize();
920
921      receiveTopo(topologyMsg, rs.getServerId());
922
923      /*
924      Log a message to let the administrator know that the failure was resolved.
925      Wake up all the thread that were waiting on the window
926      on the previous connection.
927      */
928      connectionError = false;
929      if (sendWindow != null)
930      {
931        /*
932         * Fix (hack) for OPENDJ-401: we want to ensure that no threads holding
933         * this semaphore will get blocked when they acquire it. However, we
934         * also need to make sure that we don't overflow the semaphore by
935         * releasing too many permits.
936         */
937        final int MAX_PERMITS = Integer.MAX_VALUE >>> 2;
938        if (sendWindow.availablePermits() < MAX_PERMITS)
939        {
940          /*
941           * At least 2^29 acquisitions would need to occur for this to be
942           * insufficient. In addition, at least 2^30 releases would need to
943           * occur for this to potentially overflow. Hopefully this is unlikely
944           * to happen.
945           */
946          sendWindow.release(MAX_PERMITS);
947        }
948      }
949      sendWindow = new Semaphore(maxSendWindow);
950      rcvWindow = getMaxRcvWindow();
951
952      domain.sessionInitiated(initStatus, rsInfo.getServerState());
953
954      final byte groupId = getGroupId();
955      if (rs.getGroupId() != groupId)
956      {
957        /*
958        Connected to replication server with wrong group id:
959        warn user and start heartbeat monitor to recover when a server
960        with the right group id shows up.
961        */
962        logger.warn(WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID,
963            groupId, rs.getServerId(), rsInfo.getServerURL(), rs.getGroupId(), baseDN, getServerId());
964      }
965      startRSHeartBeatMonitoring(rs);
966      if (rsInfo.getProtocolVersion() >=
967        ProtocolVersion.REPLICATION_PROTOCOL_V3)
968      {
969        startChangeTimeHeartBeatPublishing(rs);
970      }
971      connectCompleted = true;
972    }
973    catch (Exception e)
974    {
975      logger.error(ERR_COMPUTING_FAKE_OPS, baseDN, rsInfo.getServerURL(),
976          e.getLocalizedMessage() + " " + stackTraceToSingleLineString(e));
977    }
978    finally
979    {
980      if (!connectCompleted)
981      {
982        setConnectedRS(ConnectedRS.noConnectedRS());
983      }
984    }
985  }
986
987  /**
988   * Determines the status we are starting with according to our state and the
989   * RS state.
990   *
991   * @param rsGenId The generation id of the RS
992   * @param rsState The server state of the RS
993   * @param degradedStatusThreshold The degraded status threshold of the RS
994   * @param dsGenId The local generation id
995   * @return The initial status
996   */
997  private ServerStatus computeInitialServerStatus(long rsGenId,
998    ServerState rsState, int degradedStatusThreshold, long dsGenId)
999  {
1000    if (rsGenId == -1)
1001    {
1002      // RS has no generation id
1003      return ServerStatus.NORMAL_STATUS;
1004    }
1005    else if (rsGenId != dsGenId)
1006    {
1007      // DS and RS do not have same generation id
1008      return ServerStatus.BAD_GEN_ID_STATUS;
1009    }
1010    else
1011    {
1012      /*
1013      DS and RS have same generation id
1014
1015      Determine if we are late or not to replay changes. RS uses a
1016      threshold value for pending changes to be replayed by a DS to
1017      determine if the DS is in normal status or in degraded status.
1018      Let's compare the local and remote server state using  this threshold
1019      value to determine if we are late or not
1020      */
1021
1022      int nChanges = ServerState.diffChanges(rsState, state);
1023      if (logger.isTraceEnabled())
1024      {
1025        debugInfo("computed " + nChanges + " changes late.");
1026      }
1027
1028      /*
1029      Check status to know if it is relevant to change the status. Do not
1030      take RSD lock to test. If we attempt to change the status whereas
1031      we are in a status that do not allows that, this will be noticed by
1032      the changeStatusFromStatusAnalyzer method. This allows to take the
1033      lock roughly only when needed versus every sleep time timeout.
1034      */
1035      if (degradedStatusThreshold > 0 && nChanges >= degradedStatusThreshold)
1036      {
1037        return ServerStatus.DEGRADED_STATUS;
1038      }
1039      // degradedStatusThreshold value of '0' means no degrading system used
1040      // (no threshold): force normal status
1041      return ServerStatus.NORMAL_STATUS;
1042    }
1043  }
1044
1045
1046
  /**
   * Connect to the provided server performing the first phase handshake (start
   * messages exchange) and return the reply message from the replication
   * server, wrapped in a ReplicationServerInfo object.
   * <p>
   * On failure, the session and socket are closed, the broker is reset to have
   * no connected replication server, and the failure is reported: logged as an
   * error only when {@code keepSession} is true, always traced, and not
   * reported at all while {@code connectionError} is set (the caught exception
   * itself is still traced).
   *
   * @param serverURL
   *          Server to connect to.
   * @param keepSession
   *          Do we keep session opened or not after handshake. Use true if want
   *          to perform handshake phase 2 with the same session and keep the
   *          session to create as the current one.
   * @return the replication server info wrapped in a {@link ConnectedRS},
   *         carrying the still-open session when {@code keepSession} is true
   *         and a null session otherwise; {@code ConnectedRS.noConnectedRS()}
   *         (never null) if the handshake failed.
   */
  private ConnectedRS performPhaseOneHandshake(String serverURL, boolean keepSession)
  {
    Session newSession = null;
    Socket socket = null;
    // Set once the handshake fully succeeded; drives the cleanup in finally.
    boolean hasConnected = false;
    // Description of the failure, reported in the finally block when non-null.
    LocalizableMessage errorMessage = null;

    try
    {
      // Open a socket connection to the next candidate.
      socket = new Socket();
      socket.setReceiveBufferSize(1000000);
      socket.setTcpNoDelay(true);
      if (config.getSourceAddress() != null)
      {
        // Bind to the configured source address; port 0 lets the system pick
        // an ephemeral local port.
        InetSocketAddress local = new InetSocketAddress(config.getSourceAddress(), 0);
        socket.bind(local);
      }
      int timeoutMS = MultimasterReplication.getConnectionTimeoutMS();
      socket.connect(HostPort.valueOf(serverURL).toInetSocketAddress(), timeoutMS);
      newSession = replSessionSecurity.createClientSession(socket, timeoutMS);
      boolean isSslEncryption = replSessionSecurity.isSslEncryption();

      // Send our ServerStartMsg.
      final HostPort hp = new HostPort(
          socket.getLocalAddress().getHostName(), socket.getLocalPort());
      final String url = hp.toString();
      final StartMsg serverStartMsg = new ServerStartMsg(getServerId(), url, getBaseDN(),
          getMaxRcvWindow(), config.getHeartbeatInterval(), state,
          getGenerationID(), isSslEncryption, getGroupId());
      newSession.publish(serverStartMsg);

      // Read the ReplServerStartMsg or ReplServerStartDSMsg that should
      // come back.
      ReplicationMsg msg = newSession.receive();
      if (logger.isTraceEnabled())
      {
        debugInfo("RB HANDSHAKE SENT:\n" + serverStartMsg + "\nAND RECEIVED:\n"
            + msg);
      }

      // Wrap received message in a server info object
      final ReplicationServerInfo replServerInfo =
          ReplicationServerInfo.newInstance(msg, serverURL);

      // Sanity check: the RS must serve the same base DN as this broker.
      // The finally block closes the session and reports errorMessage.
      final DN repDN = replServerInfo.getBaseDN();
      if (!getBaseDN().equals(repDN))
      {
        errorMessage = ERR_DS_DN_DOES_NOT_MATCH.get(repDN, getBaseDN());
        return setConnectedRS(ConnectedRS.noConnectedRS());
      }

      /*
       * We have sent our own protocol version to the replication server. The
       * replication server will use the same one (or an older one if it is an
       * old replication server).
       */
      newSession.setProtocolVersion(
          getCompatibleVersion(replServerInfo.getProtocolVersion()));

      // SSL encryption not required: stop encrypting the session after the
      // handshake.
      if (!isSslEncryption)
      {
        newSession.stopEncryption();
      }

      hasConnected = true;

      if (keepSession)
      {
        // cannot store it yet,
        // only store after a successful phase two handshake
        return new ConnectedRS(replServerInfo, newSession);
      }
      // Session not kept: the finally block closes it.
      return new ConnectedRS(replServerInfo, null);
    }
    catch (ConnectException e)
    {
      logger.traceException(e);
      errorMessage = WARN_NO_CHANGELOG_SERVER_LISTENING.get(getServerId(), serverURL, getBaseDN());
    }
    catch (SocketTimeoutException e)
    {
      logger.traceException(e);
      errorMessage = WARN_TIMEOUT_CONNECTING_TO_RS.get(getServerId(), serverURL, getBaseDN());
    }
    catch (Exception e)
    {
      logger.traceException(e);
      errorMessage = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(
          getServerId(), serverURL, getBaseDN(), stackTraceToSingleLineString(e));
    }
    finally
    {
      // Release the connection unless it is handed over to the caller
      // (successful handshake with keepSession).
      if (!hasConnected || !keepSession)
      {
        close(newSession);
        close(socket);
      }

      // Report the failure, unless connectionError is already set (avoids
      // flooding the log with repeated failures).
      if (!hasConnected && errorMessage != null && !connectionError)
      {
        // Log a notice; the caller may try the next replicationServer in the list
        if (keepSession) // Log error message only for final connection
        {
          // log the error message only once to avoid overflowing the error log
          logger.error(errorMessage);
        }

        logger.trace(errorMessage);
      }
    }
    return setConnectedRS(ConnectedRS.noConnectedRS());
  }
1175
1176  /**
1177   * Performs the second phase handshake (send StartSessionMsg and receive
1178   * TopologyMsg messages exchange) and return the reply message from the
1179   * replication server.
1180   *
1181   * @param electedRS Server we are connecting with.
1182   * @param initStatus The status we are starting with
1183   * @return The ReplServerStartMsg the server replied. Null if could not
1184   *         get an answer.
1185   */
1186  private TopologyMsg performPhaseTwoHandshake(ConnectedRS electedRS,
1187    ServerStatus initStatus)
1188  {
1189    try
1190    {
1191      // Send our StartSessionMsg.
1192      final StartSessionMsg startSessionMsg;
1193      startSessionMsg = new StartSessionMsg(
1194          initStatus,
1195          domain.getRefUrls(),
1196          domain.isAssured(),
1197          domain.getAssuredMode(),
1198          domain.getAssuredSdLevel());
1199      startSessionMsg.setEclIncludes(
1200          domain.getEclIncludes(domain.getServerId()),
1201          domain.getEclIncludesForDeletes(domain.getServerId()));
1202      final Session session = electedRS.session;
1203      session.publish(startSessionMsg);
1204
1205      // Read the TopologyMsg that should come back.
1206      final TopologyMsg topologyMsg = (TopologyMsg) session.receive();
1207
1208      if (logger.isTraceEnabled())
1209      {
1210        debugInfo("RB HANDSHAKE SENT:\n" + startSessionMsg
1211            + "\nAND RECEIVED:\n" + topologyMsg);
1212      }
1213
1214      // Alright set the timeout to the desired value
1215      session.setSoTimeout(timeout);
1216      setConnectedRS(electedRS);
1217      return topologyMsg;
1218    }
1219    catch (Exception e)
1220    {
1221      logger.error(WARN_EXCEPTION_STARTING_SESSION_PHASE,
1222          getServerId(), electedRS.rsInfo.getServerURL(), getBaseDN(), stackTraceToSingleLineString(e));
1223
1224      setConnectedRS(ConnectedRS.noConnectedRS());
1225      return null;
1226    }
1227  }
1228
1229  /**
1230   * Class holding evaluation results for electing the best replication server
1231   * for the local directory server.
1232   */
1233  static class RSEvaluations
1234  {
1235    private final int localServerId;
1236    private Map<Integer, ReplicationServerInfo> bestRSs;
1237    private final Map<Integer, LocalizableMessage> rsEvals = new HashMap<>();
1238
1239    /**
1240     * Ctor.
1241     *
1242     * @param localServerId
1243     *          the serverId for the local directory server
1244     * @param rsInfos
1245     *          a Map of serverId => {@link ReplicationServerInfo} with all the
1246     *          candidate replication servers
1247     */
1248    RSEvaluations(int localServerId,
1249        Map<Integer, ReplicationServerInfo> rsInfos)
1250    {
1251      this.localServerId = localServerId;
1252      this.bestRSs = rsInfos;
1253    }
1254
1255    private boolean keepBest(LocalEvaluation eval)
1256    {
1257      if (eval.hasAcceptedAny())
1258      {
1259        bestRSs = eval.getAccepted();
1260        rsEvals.putAll(eval.getRejected());
1261        return true;
1262      }
1263      return false;
1264    }
1265
1266    /**
1267     * Sets the elected best replication server, rejecting all the other
1268     * replication servers with the supplied evaluation.
1269     *
1270     * @param bestRsId
1271     *          the serverId of the elected replication server
1272     * @param rejectedRSsEval
1273     *          the evaluation for all the rejected replication servers
1274     */
1275    private void setBestRS(int bestRsId, LocalizableMessage rejectedRSsEval)
1276    {
1277      for (Iterator<Entry<Integer, ReplicationServerInfo>> it =
1278          this.bestRSs.entrySet().iterator(); it.hasNext();)
1279      {
1280        final Entry<Integer, ReplicationServerInfo> entry = it.next();
1281        final Integer rsId = entry.getKey();
1282        final ReplicationServerInfo rsInfo = entry.getValue();
1283        if (rsInfo.getServerId() != bestRsId)
1284        {
1285          it.remove();
1286        }
1287        rsEvals.put(rsId, rejectedRSsEval);
1288      }
1289    }
1290
1291    private void discardAll(LocalizableMessage eval)
1292    {
1293      for (Integer rsId : bestRSs.keySet())
1294      {
1295        rsEvals.put(rsId, eval);
1296      }
1297    }
1298
1299    private boolean foundBestRS()
1300    {
1301      return bestRSs.size() == 1;
1302    }
1303
1304    /**
1305     * Returns the {@link ReplicationServerInfo} for the best replication
1306     * server.
1307     *
1308     * @return the {@link ReplicationServerInfo} for the best replication server
1309     */
1310    ReplicationServerInfo getBestRS()
1311    {
1312      if (foundBestRS())
1313      {
1314        return bestRSs.values().iterator().next();
1315      }
1316      return null;
1317    }
1318
1319    /**
1320     * Returns the evaluations for all the candidate replication servers.
1321     *
1322     * @return a Map of serverId => LocalizableMessage containing the evaluation for each
1323     *         candidate replication servers.
1324     */
1325    Map<Integer, LocalizableMessage> getEvaluations()
1326    {
1327      if (foundBestRS())
1328      {
1329        final Integer bestRSServerId = getBestRS().getServerId();
1330        if (rsEvals.get(bestRSServerId) == null)
1331        {
1332          final LocalizableMessage eval = NOTE_BEST_RS.get(bestRSServerId, localServerId);
1333          rsEvals.put(bestRSServerId, eval);
1334        }
1335      }
1336      return Collections.unmodifiableMap(rsEvals);
1337    }
1338
1339    /**
1340     * Returns the evaluation for the supplied replication server Id.
1341     * <p>
1342     * Note: "unknown RS" message is returned if the supplied replication server
1343     * was not part of the candidate replication servers.
1344     *
1345     * @param rsServerId
1346     *          the supplied replication server Id
1347     * @return the evaluation {@link LocalizableMessage} for the supplied replication
1348     *         server Id
1349     */
1350    private LocalizableMessage getEvaluation(int rsServerId)
1351    {
1352      final LocalizableMessage evaluation = getEvaluations().get(rsServerId);
1353      if (evaluation != null)
1354      {
1355        return evaluation;
1356      }
1357      return NOTE_UNKNOWN_RS.get(rsServerId, localServerId);
1358    }
1359
1360    /** {@inheritDoc} */
1361    @Override
1362    public String toString()
1363    {
1364      return "Current best replication server Ids: " + bestRSs.keySet()
1365          + ", Evaluation of connected replication servers"
1366          + " (ServerId => Evaluation): " + rsEvals.keySet()
1367          + ", Any replication server not appearing here"
1368          + " could not be contacted.";
1369    }
1370  }
1371
1372  /**
1373   * Evaluation local to one filter.
1374   */
1375  private static class LocalEvaluation
1376  {
1377    private final Map<Integer, ReplicationServerInfo> accepted = new HashMap<>();
1378    private final Map<ReplicationServerInfo, LocalizableMessage> rsEvals = new HashMap<>();
1379
1380    private void accept(Integer rsId, ReplicationServerInfo rsInfo)
1381    {
1382      // forget previous eval, including undoing reject
1383      this.rsEvals.remove(rsInfo);
1384      this.accepted.put(rsId, rsInfo);
1385    }
1386
1387    private void reject(ReplicationServerInfo rsInfo, LocalizableMessage reason)
1388    {
1389      this.accepted.remove(rsInfo.getServerId()); // undo accept
1390      this.rsEvals.put(rsInfo, reason);
1391    }
1392
1393    private Map<Integer, ReplicationServerInfo> getAccepted()
1394    {
1395      return accepted;
1396    }
1397
1398    private ReplicationServerInfo[] getAcceptedRSInfos()
1399    {
1400      return accepted.values().toArray(
1401          new ReplicationServerInfo[accepted.size()]);
1402    }
1403
1404    public Map<Integer, LocalizableMessage> getRejected()
1405    {
1406      final Map<Integer, LocalizableMessage> result = new HashMap<>();
1407      for (Entry<ReplicationServerInfo, LocalizableMessage> entry : rsEvals.entrySet())
1408      {
1409        result.put(entry.getKey().getServerId(), entry.getValue());
1410      }
1411      return result;
1412    }
1413
1414    private boolean hasAcceptedAny()
1415    {
1416      return !accepted.isEmpty();
1417    }
1418
1419  }
1420
1421  /**
1422   * Returns the replication server that best fits our need so that we can
1423   * connect to it or determine if we must disconnect from current one to
1424   * re-connect to best server.
1425   * <p>
1426   * Note: this method is static for test purpose (access from unit tests)
1427   *
1428   * @param firstConnection True if we run this method for the very first
1429   * connection of the broker. False if we run this method to determine if the
1430   * replication server we are currently connected to is still the best or not.
1431   * @param rsServerId The id of the replication server we are currently
1432   * connected to. Only used when firstConnection is false.
1433   * @param myState The local server state.
1434   * @param rsInfos The list of available replication servers and their
1435   * associated information (choice will be made among them).
1436   * @param localServerId The server id for the suffix we are working for.
1437   * @param groupId The groupId we prefer being connected to if possible
1438   * @param generationId The generation id we are using
1439   * @return The computed best replication server. If the returned value is
1440   * null, the best replication server is undetermined but the local server must
1441   * disconnect (so the best replication server is another one than the current
1442   * one). Null can only be returned when firstConnection is false.
1443   */
1444  static RSEvaluations computeBestReplicationServer(
1445      boolean firstConnection, int rsServerId, ServerState myState,
1446      Map<Integer, ReplicationServerInfo> rsInfos, int localServerId,
1447      byte groupId, long generationId)
1448  {
1449    final RSEvaluations evals = new RSEvaluations(localServerId, rsInfos);
1450    // Shortcut, if only one server, this is the best
1451    if (evals.foundBestRS())
1452    {
1453      return evals;
1454    }
1455
1456    /**
1457     * Apply some filtering criteria to determine the best servers list from
1458     * the available ones. The ordered list of criteria is (from more important
1459     * to less important):
1460     * - replication server has the same group id as the local DS one
1461     * - replication server has the same generation id as the local DS one
1462     * - replication server is up to date regarding changes generated by the
1463     *   local DS
1464     * - replication server in the same VM as local DS one
1465     */
1466    /*
1467    The list of best replication servers is filtered with each criteria. At
1468    each criteria, the list is replaced with the filtered one if there
1469    are some servers from the filtering, otherwise, the list is left as is
1470    and the new filtering for the next criteria is applied and so on.
1471
1472    Use only servers locally configured: those are servers declared in
1473    the local configuration. When the current method is called, for
1474    sure, at least one server from the list is locally configured
1475    */
1476    filterServersLocallyConfigured(evals, localServerId);
1477    // Some servers with same group id ?
1478    filterServersWithSameGroupId(evals, localServerId, groupId);
1479    // Some servers with same generation id ?
1480    final boolean rssWithSameGenerationIdExist =
1481        filterServersWithSameGenerationId(evals, localServerId, generationId);
1482    if (rssWithSameGenerationIdExist)
1483    {
1484      // If some servers with the right generation id this is useful to
1485      // run the local DS change criteria
1486      filterServersWithAllLocalDSChanges(evals, myState, localServerId);
1487    }
1488    // Some servers in the local VM or local host?
1489    filterServersOnSameHost(evals, localServerId);
1490
1491    if (evals.foundBestRS())
1492    {
1493      return evals;
1494    }
1495
1496    /**
1497     * Now apply the choice based on the weight to the best servers list
1498     */
1499    if (firstConnection)
1500    {
1501      // We are not connected to a server yet
1502      computeBestServerForWeight(evals, -1, -1);
1503    }
1504    else
1505    {
1506      /*
1507       * We are already connected to a RS: compute the best RS as far as the
1508       * weights is concerned. If this is another one, some DS must disconnect.
1509       */
1510      computeBestServerForWeight(evals, rsServerId, localServerId);
1511    }
1512    return evals;
1513  }
1514
1515  /**
1516   * Creates a new list that contains only replication servers that are locally
1517   * configured.
1518   * @param evals The evaluation object
1519   */
1520  private static void filterServersLocallyConfigured(RSEvaluations evals,
1521      int localServerId)
1522  {
1523    final LocalEvaluation eval = new LocalEvaluation();
1524    for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
1525    {
1526      final Integer rsId = entry.getKey();
1527      final ReplicationServerInfo rsInfo = entry.getValue();
1528      if (rsInfo.isLocallyConfigured())
1529      {
1530        eval.accept(rsId, rsInfo);
1531      }
1532      else
1533      {
1534        eval.reject(rsInfo,
1535            NOTE_RS_NOT_LOCALLY_CONFIGURED.get(rsId, localServerId));
1536      }
1537    }
1538    evals.keepBest(eval);
1539  }
1540
1541  /**
1542   * Creates a new list that contains only replication servers that have the
1543   * passed group id, from a passed replication server list.
1544   * @param evals The evaluation object
1545   * @param groupId The group id that must match
1546   */
1547  private static void filterServersWithSameGroupId(RSEvaluations evals,
1548      int localServerId, byte groupId)
1549  {
1550    final LocalEvaluation eval = new LocalEvaluation();
1551    for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
1552    {
1553      final Integer rsId = entry.getKey();
1554      final ReplicationServerInfo rsInfo = entry.getValue();
1555      if (rsInfo.getGroupId() == groupId)
1556      {
1557        eval.accept(rsId, rsInfo);
1558      }
1559      else
1560      {
1561        eval.reject(rsInfo, NOTE_RS_HAS_DIFFERENT_GROUP_ID_THAN_DS.get(
1562            rsId, rsInfo.getGroupId(), localServerId, groupId));
1563      }
1564    }
1565    evals.keepBest(eval);
1566  }
1567
1568  /**
1569   * Creates a new list that contains only replication servers that have the
1570   * provided generation id, from a provided replication server list.
1571   * When the selected replication servers have no change (empty serverState)
1572   * then the 'empty'(generationId==-1) replication servers are also included
1573   * in the result list.
1574   *
1575   * @param evals The evaluation object
1576   * @param generationId The generation id that must match
1577   * @return whether some replication server passed the filter
1578   */
1579  private static boolean filterServersWithSameGenerationId(
1580      RSEvaluations evals, long localServerId, long generationId)
1581  {
1582    final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
1583    final LocalEvaluation eval = new LocalEvaluation();
1584    boolean emptyState = true;
1585
1586    for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
1587    {
1588      final Integer rsId = entry.getKey();
1589      final ReplicationServerInfo rsInfo = entry.getValue();
1590      if (rsInfo.getGenerationId() == generationId)
1591      {
1592        eval.accept(rsId, rsInfo);
1593        if (!rsInfo.serverState.isEmpty())
1594        {
1595          emptyState = false;
1596        }
1597      }
1598      else if (rsInfo.getGenerationId() == -1)
1599      {
1600        eval.reject(rsInfo, NOTE_RS_HAS_NO_GENERATION_ID.get(rsId,
1601            generationId, localServerId));
1602      }
1603      else
1604      {
1605        eval.reject(rsInfo, NOTE_RS_HAS_DIFFERENT_GENERATION_ID_THAN_DS.get(
1606            rsId, rsInfo.getGenerationId(), localServerId, generationId));
1607      }
1608    }
1609
1610    if (emptyState)
1611    {
1612      // If the RS with a generationId have all an empty state,
1613      // then the 'empty'(genId=-1) RSes are also candidate
1614      for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
1615      {
1616        ReplicationServerInfo rsInfo = entry.getValue();
1617        if (rsInfo.getGenerationId() == -1)
1618        {
1619          // will undo the reject of previously rejected RSs
1620          eval.accept(entry.getKey(), rsInfo);
1621        }
1622      }
1623    }
1624
1625    return evals.keepBest(eval);
1626  }
1627
1628  /**
1629   * Creates a new list that contains only replication servers that have the
1630   * latest changes from the passed DS, from a passed replication server list.
1631   * @param evals The evaluation object
1632   * @param localState The state of the local DS
1633   * @param localServerId The server id to consider for the changes
1634   */
1635  private static void filterServersWithAllLocalDSChanges(
1636      RSEvaluations evals, ServerState localState, int localServerId)
1637  {
1638    // Extract the CSN of the latest change generated by the local server
1639    final CSN localCSN = getCSN(localState, localServerId);
1640
1641    /**
1642     * Find replication servers that are up to date (or more up to date than us,
1643     * if for instance we failed and restarted, having sent some changes to the
1644     * RS but without having time to store our own state) regarding our own
1645     * server id. If some servers are more up to date, prefer this list but take
1646     * only the latest CSN.
1647     */
1648    final LocalEvaluation mostUpToDateEval = new LocalEvaluation();
1649    boolean foundRSMoreUpToDateThanLocalDS = false;
1650    CSN latestRsCSN = null;
1651    for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
1652    {
1653      final Integer rsId = entry.getKey();
1654      final ReplicationServerInfo rsInfo = entry.getValue();
1655      final CSN rsCSN = getCSN(rsInfo.getServerState(), localServerId);
1656
1657      // Has this replication server the latest local change ?
1658      if (rsCSN.isOlderThan(localCSN))
1659      {
1660        mostUpToDateEval.reject(rsInfo, NOTE_RS_LATER_THAN_LOCAL_DS.get(
1661            rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI()));
1662      }
1663      else if (rsCSN.equals(localCSN))
1664      {
1665        // This replication server has exactly the latest change from the
1666        // local server
1667        if (!foundRSMoreUpToDateThanLocalDS)
1668        {
1669          mostUpToDateEval.accept(rsId, rsInfo);
1670        }
1671        else
1672        {
1673          mostUpToDateEval.reject(rsInfo,
1674            NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get(
1675              rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI()));
1676        }
1677      }
1678      else if (rsCSN.isNewerThan(localCSN))
1679      {
1680        // This replication server is even more up to date than the local server
1681        if (latestRsCSN == null)
1682        {
1683          foundRSMoreUpToDateThanLocalDS = true;
1684          // all previous results are now outdated, reject them all
1685          rejectAllWithRSIsLaterThanBestRS(mostUpToDateEval, localServerId,
1686              localCSN);
1687          // Initialize the latest CSN
1688          latestRsCSN = rsCSN;
1689        }
1690
1691        if (rsCSN.equals(latestRsCSN))
1692        {
1693          mostUpToDateEval.accept(rsId, rsInfo);
1694        }
1695        else if (rsCSN.isNewerThan(latestRsCSN))
1696        {
1697          // This RS is even more up to date, reject all previously accepted RSs
1698          // and store this new RS
1699          rejectAllWithRSIsLaterThanBestRS(mostUpToDateEval, localServerId,
1700              localCSN);
1701          mostUpToDateEval.accept(rsId, rsInfo);
1702          latestRsCSN = rsCSN;
1703        }
1704        else
1705        {
1706          mostUpToDateEval.reject(rsInfo,
1707            NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get(
1708              rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI()));
1709        }
1710      }
1711    }
1712    evals.keepBest(mostUpToDateEval);
1713  }
1714
1715  private static CSN getCSN(ServerState state, int serverId)
1716  {
1717    final CSN csn = state.getCSN(serverId);
1718    if (csn != null)
1719    {
1720      return csn;
1721    }
1722    return new CSN(0, 0, serverId);
1723  }
1724
1725  private static void rejectAllWithRSIsLaterThanBestRS(
1726      final LocalEvaluation eval, int localServerId, CSN localCSN)
1727  {
1728    for (ReplicationServerInfo rsInfo : eval.getAcceptedRSInfos())
1729    {
1730      final String rsCSN =
1731          getCSN(rsInfo.getServerState(), localServerId).toStringUI();
1732      final LocalizableMessage reason =
1733          NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get(
1734            rsInfo.getServerId(), rsCSN, localServerId, localCSN.toStringUI());
1735      eval.reject(rsInfo, reason);
1736    }
1737  }
1738
1739  /**
1740   * Creates a new list that contains only replication servers that are on the
1741   * same host as the local DS, from a passed replication server list. This
1742   * method will gives priority to any replication server which is in the same
1743   * VM as this DS.
1744   *
1745   * @param evals The evaluation object
1746   */
1747  private static void filterServersOnSameHost(RSEvaluations evals,
1748      int localServerId)
1749  {
1750    /*
1751     * Initially look for all servers on the same host. If we find one in the
1752     * same VM, then narrow the search.
1753     */
1754    boolean foundRSInSameVM = false;
1755    final LocalEvaluation eval = new LocalEvaluation();
1756    for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet())
1757    {
1758      final Integer rsId = entry.getKey();
1759      final ReplicationServerInfo rsInfo = entry.getValue();
1760      final HostPort hp = HostPort.valueOf(rsInfo.getServerURL());
1761      if (hp.isLocalAddress())
1762      {
1763        if (isLocalReplicationServerPort(hp.getPort()))
1764        {
1765          if (!foundRSInSameVM)
1766          {
1767            // An RS in the same VM will always have priority.
1768            // Narrow the search to only include servers in this VM.
1769            rejectAllWithRSOnDifferentVMThanDS(eval, localServerId);
1770            foundRSInSameVM = true;
1771          }
1772          eval.accept(rsId, rsInfo);
1773        }
1774        else if (!foundRSInSameVM)
1775        {
1776          // OK, accept RSs on the same machine because we have not found an RS
1777          // in the same VM yet
1778          eval.accept(rsId, rsInfo);
1779        }
1780        else
1781        {
1782          // Skip: we have found some RSs in the same VM, but this RS is not.
1783          eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_VM_THAN_DS.get(rsId,
1784              localServerId));
1785        }
1786      }
1787      else
1788      {
1789        eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_HOST_THAN_DS.get(rsId,
1790            localServerId));
1791      }
1792    }
1793    evals.keepBest(eval);
1794  }
1795
1796  private static void rejectAllWithRSOnDifferentVMThanDS(LocalEvaluation eval,
1797      int localServerId)
1798  {
1799    for (ReplicationServerInfo rsInfo : eval.getAcceptedRSInfos())
1800    {
1801      eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_VM_THAN_DS.get(
1802          rsInfo.getServerId(), localServerId));
1803    }
1804  }
1805
1806  /**
1807   * Computes the best replication server the local server should be connected
1808   * to so that the load is correctly spread across the topology, following the
1809   * weights guidance.
1810   * Warning: This method is expected to be called with at least 2 servers in
1811   * bestServers
1812   * Note: this method is static for test purpose (access from unit tests)
1813   * @param evals The evaluation object
1814   * @param currentRsServerId The replication server the local server is
1815   *        currently connected to. -1 if the local server is not yet connected
1816   *        to any replication server.
1817   * @param localServerId The server id of the local server. This is not used
1818   *        when it is not connected to a replication server
1819   *        (currentRsServerId = -1)
1820   */
1821  static void computeBestServerForWeight(RSEvaluations evals,
1822      int currentRsServerId, int localServerId)
1823  {
1824    final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
1825    /*
1826     * - Compute the load goal of each RS, deducing it from the weights affected
1827     * to them.
1828     * - Compute the current load of each RS, deducing it from the DSs
1829     * currently connected to them.
1830     * - Compute the differences between the load goals and the current loads of
1831     * the RSs.
1832     */
1833    // Sum of the weights
1834    int sumOfWeights = 0;
1835    // Sum of the connected DSs
1836    int sumOfConnectedDSs = 0;
1837    for (ReplicationServerInfo rsInfo : bestServers.values())
1838    {
1839      sumOfWeights += rsInfo.getWeight();
1840      sumOfConnectedDSs += rsInfo.getConnectedDSNumber();
1841    }
1842
1843    // Distance (difference) of the current loads to the load goals of each RS:
1844    // key:server id, value: distance
1845    Map<Integer, BigDecimal> loadDistances = new HashMap<>();
1846    // Precision for the operations (number of digits after the dot)
1847    final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
1848    for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet())
1849    {
1850      final Integer rsId = entry.getKey();
1851      final ReplicationServerInfo rsInfo = entry.getValue();
1852
1853      //  load goal = rs weight / sum of weights
1854      BigDecimal loadGoalBd = BigDecimal.valueOf(rsInfo.getWeight()).divide(
1855          BigDecimal.valueOf(sumOfWeights), mathContext);
1856      BigDecimal currentLoadBd = BigDecimal.ZERO;
1857      if (sumOfConnectedDSs != 0)
1858      {
1859        // current load = number of connected DSs / total number of DSs
1860        int connectedDSs = rsInfo.getConnectedDSNumber();
1861        currentLoadBd = BigDecimal.valueOf(connectedDSs).divide(
1862            BigDecimal.valueOf(sumOfConnectedDSs), mathContext);
1863      }
1864      // load distance = load goal - current load
1865      BigDecimal loadDistanceBd =
1866        loadGoalBd.subtract(currentLoadBd, mathContext);
1867      loadDistances.put(rsId, loadDistanceBd);
1868    }
1869
1870    if (currentRsServerId == -1)
1871    {
1872      // The local server is not connected yet, find best server to connect to,
1873      // taking the weights into account.
1874      computeBestServerWhenNotConnected(evals, loadDistances, localServerId);
1875    }
1876    else
1877    {
1878      // The local server is currently connected to a RS, let's see if it must
1879      // disconnect or not, taking the weights into account.
1880      computeBestServerWhenConnected(evals, loadDistances, localServerId,
1881          currentRsServerId, sumOfWeights, sumOfConnectedDSs);
1882    }
1883  }
1884
1885  private static void computeBestServerWhenNotConnected(RSEvaluations evals,
1886      Map<Integer, BigDecimal> loadDistances, int localServerId)
1887  {
1888    final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
1889    /*
1890     * Find the server with the current highest distance to its load goal and
1891     * choose it. Make an exception if every server is correctly balanced,
1892     * that is every current load distances are equal to 0, in that case,
1893     * choose the server with the highest weight
1894     */
1895    int bestRsId = 0; // If all server equal, return the first one
1896    float highestDistance = Float.NEGATIVE_INFINITY;
1897    boolean allRsWithZeroDistance = true;
1898    int highestWeightRsId = -1;
1899    int highestWeight = -1;
1900    for (Integer rsId : bestServers.keySet())
1901    {
1902      float loadDistance = loadDistances.get(rsId).floatValue();
1903      if (loadDistance > highestDistance)
1904      {
1905        // This server is far more from its balance point
1906        bestRsId = rsId;
1907        highestDistance = loadDistance;
1908      }
1909      if (loadDistance != 0)
1910      {
1911        allRsWithZeroDistance = false;
1912      }
1913      int weight = bestServers.get(rsId).getWeight();
1914      if (weight > highestWeight)
1915      {
1916        // This server has a higher weight
1917        highestWeightRsId = rsId;
1918        highestWeight = weight;
1919      }
1920    }
1921    // All servers with a 0 distance ?
1922    if (allRsWithZeroDistance)
1923    {
1924      // Choose server with the highest weight
1925      bestRsId = highestWeightRsId;
1926    }
1927    evals.setBestRS(bestRsId, NOTE_BIGGEST_WEIGHT_RS.get(localServerId,
1928        bestRsId));
1929  }
1930
  /**
   * Decides, for a DS already connected to {@code currentRsServerId}, whether
   * it should stay on that RS or disconnect so that the DSs get spread across
   * the RSs according to their weights. The outcome is recorded in
   * {@code evals}: either the current RS is set as the best RS, or all the
   * candidates are discarded (the local DS is among those to disconnect).
   *
   * @param evals the evaluation object; its {@code bestRSs} are the candidates
   * @param loadDistances for each candidate RS id, its load goal minus its
   *        current load (positive: the RS lacks DSs, negative: it has too many)
   * @param localServerId the server id of the local DS
   * @param currentRsServerId the id of the RS the local DS is connected to.
   *        NOTE(review): assumed to be a key of loadDistances and bestRSs,
   *        otherwise the first lookup throws a NullPointerException
   * @param sumOfWeights sum of the weights of all the candidate RSs
   * @param sumOfConnectedDSs number of DSs connected to all the candidate RSs
   */
  private static void computeBestServerWhenConnected(RSEvaluations evals,
      Map<Integer, BigDecimal> loadDistances, int localServerId,
      int currentRsServerId, int sumOfWeights, int sumOfConnectedDSs)
  {
    final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs;
    final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
    float currentLoadDistance =
      loadDistances.get(currentRsServerId).floatValue();
    if (currentLoadDistance < 0)
    {
      /*
      Too many DSs connected to the current RS, compared with its load
      goal:
      Determine the potential number of DSs to disconnect from the current
      RS and see if the local DS is part of them: the DSs that must
      disconnect are those with the lowest server id.
      Compute the sum of the distances of the load goals of the other RSs
      */
      BigDecimal sumOfLoadDistancesOfOtherRSsBd = BigDecimal.ZERO;
      for (Integer rsId : bestServers.keySet())
      {
        if (rsId != currentRsServerId)
        {
          sumOfLoadDistancesOfOtherRSsBd = sumOfLoadDistancesOfOtherRSsBd.add(
            loadDistances.get(rsId), mathContext);
        }
      }

      if (sumOfLoadDistancesOfOtherRSsBd.floatValue() > 0)
      {
        /*
        The sum of the load distances of the other RSs shows a lack of DSs.
        Compute the number of DSs to disconnect from the current RS,
        rounding to the nearest integer number. Do only this if there is
        no risk of yoyo effect: when the exact balance cannot be
        established due to the current number of DSs connected, do not
        disconnect a DS. A simple example where the balance cannot be
        reached is:
        - RS1 has weight 1 and 2 DSs
        - RS2 has weight 1 and 1 DS
        => disconnecting a DS from RS1 to reconnect it to RS2 would make no
        sense as this would lead to the reverse situation. In that case,
        the perfect balance cannot be reached and we must stick to the
        current situation, otherwise the DS would keep moving between the 2
        RSs
        */
        float notRoundedOverloadingDSsNumber = sumOfLoadDistancesOfOtherRSsBd.
          multiply(BigDecimal.valueOf(sumOfConnectedDSs), mathContext)
              .floatValue();
        int overloadingDSsNumber = Math.round(notRoundedOverloadingDSsNumber);

        // Avoid yoyo effect: this check only applies when exactly one DS
        // would have to move.
        if (overloadingDSsNumber == 1)
        {
          // What would be the new load distance for the current RS if
          // one DS disconnected from it?
          ReplicationServerInfo currentReplicationServerInfo =
            bestServers.get(currentRsServerId);

          int currentRsWeight = currentReplicationServerInfo.getWeight();
          BigDecimal currentRsWeightBd = BigDecimal.valueOf(currentRsWeight);
          BigDecimal sumOfWeightsBd = BigDecimal.valueOf(sumOfWeights);
          BigDecimal currentRsLoadGoalBd =
            currentRsWeightBd.divide(sumOfWeightsBd, mathContext);
          BigDecimal potentialCurrentRsNewLoadBd = BigDecimal.ZERO;
          if (sumOfConnectedDSs != 0)
          {
            int connectedDSs = currentReplicationServerInfo.
              getConnectedDSNumber();
            BigDecimal potentialNewConnectedDSsBd =
                BigDecimal.valueOf(connectedDSs - 1);
            BigDecimal sumOfConnectedDSsBd =
                BigDecimal.valueOf(sumOfConnectedDSs);
            potentialCurrentRsNewLoadBd =
              potentialNewConnectedDSsBd.divide(sumOfConnectedDSsBd,
                mathContext);
          }
          BigDecimal potentialCurrentRsNewLoadDistanceBd =
            currentRsLoadGoalBd.subtract(potentialCurrentRsNewLoadBd,
              mathContext);

          // What would be the new sum of the load distances of the other RSs
          // once they receive that DS (one DS weighs 1 / sumOfConnectedDSs)?
          BigDecimal additionalDsLoadBd =
              BigDecimal.ONE.divide(
                  BigDecimal.valueOf(sumOfConnectedDSs), mathContext);
          BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBd =
            sumOfLoadDistancesOfOtherRSsBd.subtract(additionalDsLoadBd,
                  mathContext);

          /*
          Now compare both values: we must not disconnect the DS if this
          is for going in a situation where the load distance of the other
          RSs is the opposite of the future load distance of the local RS
          or we would evaluate that we should disconnect just after being
          arrived on the new RS. But we should disconnect if we reach the
          perfect balance (both values are 0).
          */
          if (mustAvoidYoyoEffect(potentialCurrentRsNewLoadDistanceBd,
              potentialNewSumOfLoadDistancesOfOtherRSsBd))
          {
            // Avoid the yoyo effect, and keep the local DS connected to its
            // current RS
            evals.setBestRS(currentRsServerId,
                NOTE_AVOID_YOYO_EFFECT.get(localServerId, currentRsServerId));
            return;
          }
        }

        ReplicationServerInfo currentRsInfo =
            bestServers.get(currentRsServerId);
        if (isServerOverloadingRS(localServerId, currentRsInfo,
            overloadingDSsNumber))
        {
          // The local server is part of the DSs to disconnect
          evals.discardAll(NOTE_DISCONNECT_DS_FROM_OVERLOADED_RS.get(
              localServerId, currentRsServerId));
        }
        else
        {
          // The local server is not part of the servers to disconnect from the
          // current RS.
          evals.setBestRS(currentRsServerId,
              NOTE_DO_NOT_DISCONNECT_DS_FROM_OVERLOADED_RS.get(localServerId,
                  currentRsServerId));
        }
      } else {
        // The load distances of the other RSs do not show a lack of DSs:
        // no need to disconnect any DS from the current RS.
        evals.setBestRS(currentRsServerId,
            NOTE_NO_NEED_TO_REBALANCE_DSS_BETWEEN_RSS.get(localServerId,
                currentRsServerId));
      }
    } else {
      // The RS load goal is reached or there are not enough DSs connected to
      // it to reach it: do not disconnect from this RS and return rsInfo for
      // this RS
      evals.setBestRS(currentRsServerId,
          NOTE_DO_NOT_DISCONNECT_DS_FROM_ACCEPTABLE_LOAD_RS.get(localServerId,
              currentRsServerId));
    }
  }
2072
2073  private static boolean mustAvoidYoyoEffect(BigDecimal rsNewLoadDistance,
2074      BigDecimal otherRSsNewSumOfLoadDistances)
2075  {
2076    final MathContext roundCtx = new MathContext(6, RoundingMode.DOWN);
2077    final BigDecimal rsLoadDistance = rsNewLoadDistance.round(roundCtx);
2078    final BigDecimal otherRSsSumOfLoadDistances =
2079        otherRSsNewSumOfLoadDistances.round(roundCtx);
2080
2081    return rsLoadDistance.compareTo(BigDecimal.ZERO) != 0
2082        && rsLoadDistance.compareTo(otherRSsSumOfLoadDistances.negate()) == 0;
2083  }
2084
2085  /**
2086   * Returns whether the local DS is overloading the RS.
2087   * <p>
2088   * There are an "overloadingDSsNumber" of DS overloading the RS. The list of
2089   * DSs connected to this RS is ordered by serverId to use a consistent
2090   * ordering across all nodes in the topology. The serverIds which index in the
2091   * List are lower than "overloadingDSsNumber" will be evicted first.
2092   * <p>
2093   * This ordering is unfair since nodes with the lower serverIds will be
2094   * evicted more often than nodes with higher serverIds. However, it is a
2095   * consistent and reliable ordering applicable anywhere in the topology.
2096   */
2097  private static boolean isServerOverloadingRS(int localServerId,
2098      ReplicationServerInfo currentRsInfo, int overloadingDSsNumber)
2099  {
2100    List<Integer> serversConnectedToCurrentRS = new ArrayList<>(currentRsInfo.getConnectedDSs());
2101    Collections.sort(serversConnectedToCurrentRS);
2102
2103    final int idx = serversConnectedToCurrentRS.indexOf(localServerId);
2104    return idx != -1 && idx < overloadingDSsNumber;
2105  }
2106
2107  /**
2108   * Start the heartbeat monitor thread.
2109   */
2110  private void startRSHeartBeatMonitoring(ConnectedRS rs)
2111  {
2112    final long heartbeatInterval = config.getHeartbeatInterval();
2113    if (heartbeatInterval > 0)
2114    {
2115      heartbeatMonitor = new HeartbeatMonitor(getServerId(), rs.getServerId(),
2116          getBaseDN().toString(), rs.session, heartbeatInterval);
2117      heartbeatMonitor.start();
2118    }
2119  }
2120
2121  /**
2122   * Stop the heartbeat monitor thread.
2123   */
2124  private synchronized void stopRSHeartBeatMonitoring()
2125  {
2126    if (heartbeatMonitor != null)
2127    {
2128      heartbeatMonitor.shutdown();
2129      heartbeatMonitor = null;
2130    }
2131  }
2132
2133  /**
2134   * Restart the ReplicationBroker.
2135   * @param infiniteTry the socket which failed
2136   */
2137  public void reStart(boolean infiniteTry)
2138  {
2139    reStart(connectedRS.get().session, infiniteTry);
2140  }
2141
2142  /**
2143   * Restart the ReplicationServer broker after a failure.
2144   *
2145   * @param failingSession the socket which failed
2146   * @param infiniteTry the socket which failed
2147   */
2148  private void reStart(Session failingSession, boolean infiniteTry)
2149  {
2150    if (failingSession != null)
2151    {
2152      failingSession.close();
2153      numLostConnections++;
2154    }
2155
2156    ConnectedRS rs = connectedRS.get();
2157    if (failingSession == rs.session && !rs.equals(ConnectedRS.noConnectedRS()))
2158    {
2159      rs = setConnectedRS(ConnectedRS.noConnectedRS());
2160    }
2161
2162    while (true)
2163    {
2164      // Synchronize inside the loop in order to allow shutdown.
2165      synchronized (startStopLock)
2166      {
2167        if (rs.isConnected() || shutdown)
2168        {
2169          break;
2170        }
2171
2172        try
2173        {
2174          connectAsDataServer();
2175          rs = connectedRS.get();
2176        }
2177        catch (Exception e)
2178        {
2179          logger.error(NOTE_EXCEPTION_RESTARTING_SESSION,
2180              getBaseDN(), e.getLocalizedMessage() + " " + stackTraceToSingleLineString(e));
2181        }
2182
2183        if (rs.isConnected() || !infiniteTry)
2184        {
2185          break;
2186        }
2187      }
2188      try
2189      {
2190          Thread.sleep(500);
2191      }
2192      catch (InterruptedException ignored)
2193      {
2194        // ignore
2195      }
2196    }
2197
2198    if (logger.isTraceEnabled())
2199    {
2200      debugInfo("end restart : connected=" + rs.isConnected() + " with RS("
2201          + rs.getServerId() + ") genId=" + getGenerationID());
2202    }
2203  }
2204
2205  /**
2206   * Publish a message to the other servers.
2207   * @param msg the message to publish
2208   */
2209  public void publish(ReplicationMsg msg)
2210  {
2211    publish(msg, false, true);
2212  }
2213
2214  /**
2215   * Publish a message to the other servers.
2216   * @param msg            The message to publish.
2217   * @param retryOnFailure Whether reconnect should automatically be done.
2218   * @return               Whether publish succeeded.
2219   */
2220  boolean publish(ReplicationMsg msg, boolean retryOnFailure)
2221  {
2222    return publish(msg, false, retryOnFailure);
2223  }
2224
2225  /**
2226   * Publish a recovery message to the other servers.
2227   * @param msg the message to publish
2228   */
2229  public void publishRecovery(ReplicationMsg msg)
2230  {
2231    publish(msg, true, true);
2232  }
2233
2234  /**
2235   * Publish a message to the other servers.
2236   * @param msg the message to publish
2237   * @param recoveryMsg the message is a recovery LocalizableMessage
2238   * @param retryOnFailure whether retry should be done on failure
2239   * @return whether the message was successfully sent.
2240   */
2241  private boolean publish(ReplicationMsg msg, boolean recoveryMsg,
2242      boolean retryOnFailure)
2243  {
2244    boolean done = false;
2245
2246    while (!done && !shutdown)
2247    {
2248      if (connectionError)
2249      {
2250        /*
2251        It was not possible to connect to any replication server.
2252        Since the operation was already processed, we have no other
2253        choice than to return without sending the ReplicationMsg
2254        and relying on the resend procedure of the connect phase to
2255        fix the problem when we finally connect.
2256        */
2257
2258        if (logger.isTraceEnabled())
2259        {
2260          debugInfo("publish(): Publishing a message is not possible due to"
2261              + " existing connection error.");
2262        }
2263
2264        return false;
2265      }
2266
2267      try
2268      {
2269        /*
2270        save the session at the time when we acquire the
2271        sendwindow credit so that we can make sure later
2272        that the session did not change in between.
2273        This is necessary to make sure that we don't publish a message
2274        on a session with a credit that was acquired from a previous
2275        session.
2276        */
2277        Session currentSession;
2278        Semaphore currentWindowSemaphore;
2279        synchronized (connectPhaseLock)
2280        {
2281          currentSession = connectedRS.get().session;
2282          currentWindowSemaphore = sendWindow;
2283        }
2284
2285        /*
2286        If the Replication domain has decided that there is a need to
2287        recover some changes then it is not allowed to send this
2288        change but it will be the responsibility of the recovery thread to
2289        do it.
2290        */
2291        if (!recoveryMsg & connectRequiresRecovery)
2292        {
2293          return false;
2294        }
2295
2296        boolean credit;
2297        if (msg instanceof UpdateMsg)
2298        {
2299          /*
2300          Acquiring the window credit must be done outside of the
2301          connectPhaseLock because it can be blocking and we don't
2302          want to hold off reconnection in case the connection dropped.
2303          */
2304          credit =
2305            currentWindowSemaphore.tryAcquire(500, TimeUnit.MILLISECONDS);
2306        }
2307        else
2308        {
2309          credit = true;
2310        }
2311
2312        if (credit)
2313        {
2314          synchronized (connectPhaseLock)
2315          {
2316            /*
2317            session may have been set to null in the connection phase
2318            when restarting the broker for example.
2319            Check the session. If it has changed, some disconnection or
2320            reconnection happened and we need to restart from scratch.
2321            */
2322            final Session session = connectedRS.get().session;
2323            if (session != null && session == currentSession)
2324            {
2325              session.publish(msg);
2326              done = true;
2327            }
2328          }
2329        }
2330        if (!credit && currentWindowSemaphore.availablePermits() == 0)
2331        {
2332          synchronized (connectPhaseLock)
2333          {
2334            /*
2335            the window is still closed.
2336            Send a WindowProbeMsg message to wake up the receiver in case the
2337            window update message was lost somehow...
2338            then loop to check again if connection was closed.
2339            */
2340            Session session = connectedRS.get().session;
2341            if (session != null)
2342            {
2343              session.publish(new WindowProbeMsg());
2344            }
2345          }
2346        }
2347      }
2348      catch (IOException e)
2349      {
2350        if (logger.isTraceEnabled())
2351        {
2352          debugInfo("publish(): IOException caught: "
2353              + stackTraceToSingleLineString(e));
2354        }
2355        if (!retryOnFailure)
2356        {
2357          return false;
2358        }
2359
2360        // The receive threads should handle reconnection or
2361        // mark this broker in error. Just retry.
2362        synchronized (connectPhaseLock)
2363        {
2364          try
2365          {
2366            connectPhaseLock.wait(100);
2367          }
2368          catch (InterruptedException ignored)
2369          {
2370            if (logger.isTraceEnabled())
2371            {
2372              debugInfo("publish(): InterruptedException caught 1: "
2373                  + stackTraceToSingleLineString(ignored));
2374            }
2375          }
2376        }
2377      }
2378      catch (InterruptedException ignored)
2379      {
2380        // just loop.
2381        if (logger.isTraceEnabled())
2382        {
2383          debugInfo("publish(): InterruptedException caught 2: "
2384              + stackTraceToSingleLineString(ignored));
2385        }
2386      }
2387    }
2388    return true;
2389  }
2390
2391  /**
2392   * Receive a message.
2393   * This method is not thread-safe and should either always be
2394   * called in a single thread or protected by a locking mechanism
2395   * before being called. This is a wrapper to the method with a boolean version
2396   * so that we do not have to modify existing tests.
2397   *
2398   * @return the received message
2399   * @throws SocketTimeoutException if the timeout set by setSoTimeout
2400   *         has expired
2401   */
2402  public ReplicationMsg receive() throws SocketTimeoutException
2403  {
2404    return receive(false, true, false);
2405  }
2406
2407  /**
2408   * Receive a message.
2409   * This method is not thread-safe and should either always be
2410   * called in a single thread or protected by a locking mechanism
2411   * before being called.
2412   *
2413   * @param reconnectToTheBestRS Whether broker will automatically switch
2414   *                             to the best suitable RS.
2415   * @param reconnectOnFailure   Whether broker will automatically reconnect
2416   *                             on failure.
2417   * @param returnOnTopoChange   Whether broker should return TopologyMsg
2418   *                             received.
2419   * @return the received message
2420   *
2421   * @throws SocketTimeoutException if the timeout set by setSoTimeout
2422   *         has expired
2423   */
2424  ReplicationMsg receive(boolean reconnectToTheBestRS,
2425      boolean reconnectOnFailure, boolean returnOnTopoChange)
2426    throws SocketTimeoutException
2427  {
2428    while (!shutdown)
2429    {
2430      ConnectedRS rs = connectedRS.get();
2431      if (reconnectOnFailure && !rs.isConnected())
2432      {
2433        // infinite try to reconnect
2434        reStart(null, true);
2435        continue;
2436      }
2437
2438      // Save session information for later in case we need it for log messages
2439      // after the session has been closed and/or failed.
2440      if (rs.session == null)
2441      {
2442        // Must be shutting down.
2443        break;
2444      }
2445
2446      final int serverId = getServerId();
2447      final DN baseDN = getBaseDN();
2448      final int previousRsServerID = rs.getServerId();
2449      try
2450      {
2451        ReplicationMsg msg = rs.session.receive();
2452        if (msg instanceof UpdateMsg)
2453        {
2454          synchronized (this)
2455          {
2456            rcvWindow--;
2457          }
2458        }
2459        if (msg instanceof WindowMsg)
2460        {
2461          final WindowMsg windowMsg = (WindowMsg) msg;
2462          sendWindow.release(windowMsg.getNumAck());
2463        }
2464        else if (msg instanceof TopologyMsg)
2465        {
2466          final TopologyMsg topoMsg = (TopologyMsg) msg;
2467          receiveTopo(topoMsg, getRsServerId());
2468          if (reconnectToTheBestRS)
2469          {
2470            // Reset wait time before next computation of best server
2471            mustRunBestServerCheckingAlgorithm = 0;
2472          }
2473
2474          // Caller wants to check what's changed
2475          if (returnOnTopoChange)
2476          {
2477            return msg;
2478          }
2479        }
2480        else if (msg instanceof StopMsg)
2481        {
2482          // RS performs a proper disconnection
2483          logger.warn(WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED, previousRsServerID, rs.replicationServer,
2484              serverId, baseDN);
2485
2486          // Try to find a suitable RS
2487          reStart(rs.session, true);
2488        }
2489        else if (msg instanceof MonitorMsg)
2490        {
2491          // This is the response to a MonitorRequest that was sent earlier or
2492          // the regular message of the monitoring publisher of the RS.
2493          MonitorMsg monitorMsg = (MonitorMsg) msg;
2494
2495          // Extract and store replicas ServerStates
2496          final Map<Integer, ServerState> newReplicaStates = new HashMap<>();
2497          for (int srvId : toIterable(monitorMsg.ldapIterator()))
2498          {
2499            newReplicaStates.put(srvId, monitorMsg.getLDAPServerState(srvId));
2500          }
2501          replicaStates = newReplicaStates;
2502
2503          // Notify the sender that the response was received.
2504          synchronized (monitorResponse)
2505          {
2506            monitorResponse.set(true);
2507            monitorResponse.notify();
2508          }
2509
2510          // Update the replication servers ServerStates with new received info
2511          Map<Integer, ReplicationServerInfo> rsInfos = topology.get().rsInfos;
2512          for (int srvId : toIterable(monitorMsg.rsIterator()))
2513          {
2514            final ReplicationServerInfo rsInfo = rsInfos.get(srvId);
2515            if (rsInfo != null)
2516            {
2517              rsInfo.update(monitorMsg.getRSServerState(srvId));
2518            }
2519          }
2520
2521          /*
2522          Now if it is allowed, compute the best replication server to see if
2523          it is still the one we are currently connected to. If not,
2524          disconnect properly and let the connection algorithm re-connect to
2525          best replication server
2526          */
2527          if (reconnectToTheBestRS)
2528          {
2529            mustRunBestServerCheckingAlgorithm++;
2530            if (mustRunBestServerCheckingAlgorithm == 2)
2531            {
2532              // Stable topology (no topo msg since few seconds): proceed with
2533              // best server checking.
2534              final RSEvaluations evals = computeBestReplicationServer(
2535                  false, previousRsServerID, state,
2536                  rsInfos, serverId, getGroupId(), getGenerationID());
2537              final ReplicationServerInfo bestServerInfo = evals.getBestRS();
2538              if (previousRsServerID != -1
2539                  && (bestServerInfo == null
2540                      || bestServerInfo.getServerId() != previousRsServerID))
2541              {
2542                // The best replication server is no more the one we are
2543                // currently using. Disconnect properly then reconnect.
2544                LocalizableMessage message;
2545                if (bestServerInfo == null)
2546                {
2547                  message = NOTE_LOAD_BALANCE_REPLICATION_SERVER.get(
2548                      serverId, previousRsServerID, rs.replicationServer, baseDN);
2549                }
2550                else
2551                {
2552                  final int bestRsServerId = bestServerInfo.getServerId();
2553                  message = NOTE_NEW_BEST_REPLICATION_SERVER.get(
2554                      serverId, previousRsServerID, rs.replicationServer, bestRsServerId, baseDN,
2555                      evals.getEvaluation(previousRsServerID),
2556                      evals.getEvaluation(bestRsServerId));
2557                }
2558                logger.info(message);
2559                if (logger.isTraceEnabled())
2560                {
2561                  debugInfo("best replication servers evaluation results: " + evals);
2562                }
2563                reStart(true);
2564              }
2565
2566              // Reset wait time before next computation of best server
2567              mustRunBestServerCheckingAlgorithm = 0;
2568            }
2569          }
2570        }
2571        else
2572        {
2573          return msg;
2574        }
2575      }
2576      catch (SocketTimeoutException e)
2577      {
2578        throw e;
2579      }
2580      catch (Exception e)
2581      {
2582        logger.traceException(e);
2583
2584        if (!shutdown)
2585        {
2586          if (rs.session == null || !rs.session.closeInitiated())
2587          {
2588            // We did not initiate the close on our side, log an error message.
2589            logger.error(WARN_REPLICATION_SERVER_BADLY_DISCONNECTED,
2590                serverId, baseDN, previousRsServerID, rs.replicationServer);
2591          }
2592
2593          if (!reconnectOnFailure)
2594          {
2595            break; // does not seem necessary to explicitly disconnect ..
2596          }
2597
2598          reStart(rs.session, true);
2599        }
2600      }
2601    } // while !shutdown
2602    return null;
2603  }
2604
2605  /**
2606   * Gets the States of all the Replicas currently in the Topology. When this
2607   * method is called, a Monitoring message will be sent to the Replication
2608   * Server to which this domain is currently connected so that it computes a
2609   * table containing information about all Directory Servers in the topology.
2610   * This Computation involves communications will all the servers currently
2611   * connected and
2612   *
2613   * @return The States of all Replicas in the topology (except us)
2614   */
2615  public Map<Integer, ServerState> getReplicaStates()
2616  {
2617    monitorResponse.set(false);
2618
2619    // publish Monitor Request LocalizableMessage to the Replication Server
2620    publish(new MonitorRequestMsg(getServerId(), getRsServerId()));
2621
2622    // wait for Response up to 10 seconds.
2623    try
2624    {
2625      synchronized (monitorResponse)
2626      {
2627        if (!monitorResponse.get())
2628        {
2629          monitorResponse.wait(10000);
2630        }
2631      }
2632    } catch (InterruptedException e)
2633    {
2634      Thread.currentThread().interrupt();
2635    }
2636    return replicaStates;
2637  }
2638
2639  /**
2640   * This method allows to do the necessary computing for the window
2641   * management after treatment by the worker threads.
2642   *
2643   * This should be called once the replay thread have done their job
2644   * and the window can be open again.
2645   */
2646  public synchronized void updateWindowAfterReplay()
2647  {
2648    try
2649    {
2650      updateDoneCount++;
2651      final Session session = connectedRS.get().session;
2652      if (updateDoneCount >= halfRcvWindow && session != null)
2653      {
2654        session.publish(new WindowMsg(updateDoneCount));
2655        rcvWindow += updateDoneCount;
2656        updateDoneCount = 0;
2657      }
2658    } catch (IOException e)
2659    {
2660      // Any error on the socket will be handled by the thread calling receive()
2661      // just ignore.
2662    }
2663  }
2664
2665  /** Stop the server. */
2666  public void stop()
2667  {
2668    if (logger.isTraceEnabled() && !shutdown)
2669    {
2670      debugInfo("is stopping and will close the connection to RS(" + getRsServerId() + ")");
2671    }
2672
2673    synchronized (startStopLock)
2674    {
2675      if (shutdown)
2676      {
2677        return;
2678      }
2679      domain.publishReplicaOfflineMsg();
2680      shutdown = true;
2681      setConnectedRS(ConnectedRS.stopped());
2682      stopRSHeartBeatMonitoring();
2683      stopChangeTimeHeartBeatPublishing();
2684      deregisterReplicationMonitor();
2685    }
2686  }
2687
2688  /**
2689   * Set a timeout value.
2690   * With this option set to a non-zero value, calls to the receive() method
2691   * block for only this amount of time after which a
2692   * java.net.SocketTimeoutException is raised.
2693   * The Broker is valid and usable even after such an Exception is raised.
2694   *
2695   * @param timeout the specified timeout, in milliseconds.
2696   * @throws SocketException if there is an error in the underlying protocol,
2697   *         such as a TCP error.
2698   */
2699  public void setSoTimeout(int timeout) throws SocketException
2700  {
2701    this.timeout = timeout;
2702    final Session session = connectedRS.get().session;
2703    if (session != null)
2704    {
2705      session.setSoTimeout(timeout);
2706    }
2707  }
2708
2709  /**
2710   * Get the name of the replicationServer to which this broker is currently
2711   * connected.
2712   *
2713   * @return the name of the replicationServer to which this domain
2714   *         is currently connected.
2715   */
2716  public String getReplicationServer()
2717  {
2718    return connectedRS.get().replicationServer;
2719  }
2720
2721  /**
2722   * Get the maximum receive window size.
2723   *
2724   * @return The maximum receive window size.
2725   */
2726  public int getMaxRcvWindow()
2727  {
2728    return config.getWindowSize();
2729  }
2730
2731  /**
2732   * Get the current receive window size.
2733   *
2734   * @return The current receive window size.
2735   */
2736  public int getCurrentRcvWindow()
2737  {
2738    return rcvWindow;
2739  }
2740
2741  /**
2742   * Get the maximum send window size.
2743   *
2744   * @return The maximum send window size.
2745   */
2746  public int getMaxSendWindow()
2747  {
2748    return maxSendWindow;
2749  }
2750
2751  /**
2752   * Get the current send window size.
2753   *
2754   * @return The current send window size.
2755   */
2756  public int getCurrentSendWindow()
2757  {
2758    if (isConnected())
2759    {
2760      return sendWindow.availablePermits();
2761    }
2762    return 0;
2763  }
2764
2765  /**
2766   * Get the number of times the connection was lost.
2767   * @return The number of times the connection was lost.
2768   */
2769  public int getNumLostConnections()
2770  {
2771    return numLostConnections;
2772  }
2773
2774  /**
2775   * Change some configuration parameters.
2776   *
2777   * @param newConfig  The new config to use.
2778   * @return                    A boolean indicating if the changes
2779   *                            requires to restart the service.
2780   */
2781  boolean changeConfig(ReplicationDomainCfg newConfig)
2782  {
2783    // These parameters needs to be renegotiated with the ReplicationServer
2784    // so if they have changed, that requires restarting the session with
2785    // the ReplicationServer.
2786    // A new session is necessary only when information regarding
2787    // the connection is modified
2788    boolean needToRestartSession =
2789        !newConfig.getReplicationServer().equals(config.getReplicationServer())
2790        || newConfig.getWindowSize() != config.getWindowSize()
2791        || newConfig.getHeartbeatInterval() != config.getHeartbeatInterval()
2792        || newConfig.getGroupId() != config.getGroupId();
2793
2794    this.config = newConfig;
2795    this.rcvWindow = newConfig.getWindowSize();
2796    this.halfRcvWindow = this.rcvWindow / 2;
2797
2798    return needToRestartSession;
2799  }
2800
2801  /**
2802   * Get the version of the replication protocol.
2803   * @return The version of the replication protocol.
2804   */
2805  public short getProtocolVersion()
2806  {
2807    final Session session = connectedRS.get().session;
2808    if (session != null)
2809    {
2810      return session.getProtocolVersion();
2811    }
2812    return ProtocolVersion.getCurrentVersion();
2813  }
2814
2815  /**
2816   * Check if the broker is connected to a ReplicationServer and therefore
2817   * ready to received and send Replication Messages.
2818   *
2819   * @return true if the server is connected, false if not.
2820   */
2821  public boolean isConnected()
2822  {
2823    return connectedRS.get().isConnected();
2824  }
2825
2826  /**
2827   * Determine whether the connection to the replication server is encrypted.
2828   * @return true if the connection is encrypted, false otherwise.
2829   */
2830  public boolean isSessionEncrypted()
2831  {
2832    final Session session = connectedRS.get().session;
2833    return session != null ? session.isEncrypted() : false;
2834  }
2835
2836  /**
2837   * Signals the RS we just entered a new status.
2838   * @param newStatus The status the local DS just entered
2839   */
2840  public void signalStatusChange(ServerStatus newStatus)
2841  {
2842    try
2843    {
2844      connectedRS.get().session.publish(
2845          new ChangeStatusMsg(ServerStatus.INVALID_STATUS, newStatus));
2846    } catch (IOException ex)
2847    {
2848      logger.error(ERR_EXCEPTION_SENDING_CS, getBaseDN(), getServerId(),
2849          ex.getLocalizedMessage() + " " + stackTraceToSingleLineString(ex));
2850    }
2851  }
2852
2853  /**
2854   * Gets the info for DSs in the topology (except us).
2855   * @return The info for DSs in the topology (except us)
2856   */
2857  public Map<Integer, DSInfo> getReplicaInfos()
2858  {
2859    return topology.get().replicaInfos;
2860  }
2861
2862  /**
2863   * Gets the info for RSs in the topology (except the one we are connected
2864   * to).
2865   * @return The info for RSs in the topology (except the one we are connected
2866   * to)
2867   */
2868  public List<RSInfo> getRsInfos()
2869  {
2870    return toRSInfos(topology.get().rsInfos);
2871  }
2872
2873  private List<RSInfo> toRSInfos(Map<Integer, ReplicationServerInfo> rsInfos)
2874  {
2875    final List<RSInfo> result = new ArrayList<>();
2876    for (ReplicationServerInfo rsInfo : rsInfos.values())
2877    {
2878      result.add(rsInfo.toRSInfo());
2879    }
2880    return result;
2881  }
2882
2883  /**
2884   * Processes an incoming TopologyMsg.
2885   * Updates the structures for the local view of the topology.
2886   *
2887   * @param topoMsg
2888   *          The topology information received from RS.
2889   * @param rsServerId
2890   *          the serverId to use for the connectedDS
2891   */
2892  private void receiveTopo(TopologyMsg topoMsg, int rsServerId)
2893  {
2894    final Topology newTopo = computeNewTopology(topoMsg, rsServerId);
2895    for (DSInfo dsInfo : newTopo.replicaInfos.values())
2896    {
2897      domain.setEclIncludes(dsInfo.getDsId(), dsInfo.getEclIncludes(), dsInfo
2898          .getEclIncludesForDeletes());
2899    }
2900  }
2901
2902  private Topology computeNewTopology(TopologyMsg topoMsg, int rsServerId)
2903  {
2904    Topology oldTopo;
2905    Topology newTopo;
2906    do
2907    {
2908      oldTopo = topology.get();
2909      newTopo = new Topology(topoMsg, getServerId(), rsServerId,
2910              getReplicationServerUrls(), oldTopo.rsInfos);
2911    }
2912    while (!topology.compareAndSet(oldTopo, newTopo));
2913
2914    if (logger.isTraceEnabled())
2915    {
2916      final StringBuilder sb = topologyChange(rsServerId, oldTopo, newTopo);
2917      sb.append(" received TopologyMsg=").append(topoMsg);
2918      debugInfo(sb);
2919    }
2920    return newTopo;
2921  }
2922
2923  /**
2924   * Contains the last known state of the replication topology.
2925   */
2926  static final class Topology
2927  {
2928
2929    /**
2930     * The RS's serverId that this DS was connected to when this topology state
2931     * was computed.
2932     */
2933    private final int rsServerId;
2934    /**
2935     * Info for other DSs.
2936     * <p>
2937     * Warning: does not contain info for us (for our server id)
2938     */
2939    final Map<Integer, DSInfo> replicaInfos;
2940    /**
2941     * The map of replication server info initialized at connection time and
2942     * regularly updated. This is used to decide to which best suitable
2943     * replication server one wants to connect. Key: replication server id
2944     * Value: replication server info for the matching replication server id
2945     */
2946    final Map<Integer, ReplicationServerInfo> rsInfos;
2947
2948    private Topology()
2949    {
2950      this.rsServerId = -1;
2951      this.replicaInfos = Collections.emptyMap();
2952      this.rsInfos = Collections.emptyMap();
2953    }
2954
2955    /**
2956     * Constructor to use when only the RSInfos need to be recomputed.
2957     *
2958     * @param dsInfosToKeep
2959     *          the DSInfos that will be stored as is
2960     * @param newRSInfos
2961     *          the new RSInfos from which to compute the new topology
2962     * @param dsServerId
2963     *          the DS serverId
2964     * @param rsServerId
2965     *          the current connected RS serverId
2966     * @param configuredReplicationServerUrls
2967     *          the configured replication server URLs
2968     * @param previousRsInfos
2969     *          the RSInfos computed in the previous Topology object
2970     */
2971    Topology(Map<Integer, DSInfo> dsInfosToKeep, List<RSInfo> newRSInfos,
2972        int dsServerId, int rsServerId,
2973        Set<String> configuredReplicationServerUrls,
2974        Map<Integer, ReplicationServerInfo> previousRsInfos)
2975    {
2976      this.rsServerId = rsServerId;
2977      this.replicaInfos = dsInfosToKeep == null
2978          ? Collections.<Integer, DSInfo>emptyMap() : dsInfosToKeep;
2979      this.rsInfos = computeRSInfos(dsServerId, newRSInfos,
2980          previousRsInfos, configuredReplicationServerUrls);
2981    }
2982
2983    /**
2984     * Constructor to use when a new TopologyMsg has been received.
2985     *
2986     * @param topoMsg
2987     *          the topology message containing the new DSInfos and RSInfos from
2988     *          which to compute the new topology
2989     * @param dsServerId
2990     *          the DS serverId
2991     * @param rsServerId
2992     *          the current connected RS serverId
2993     * @param configuredReplicationServerUrls
2994     *          the configured replication server URLs
2995     * @param previousRsInfos
2996     *          the RSInfos computed in the previous Topology object
2997     */
2998    Topology(TopologyMsg topoMsg, int dsServerId,
2999        int rsServerId, Set<String> configuredReplicationServerUrls,
3000        Map<Integer, ReplicationServerInfo> previousRsInfos)
3001    {
3002      this.rsServerId = rsServerId;
3003      this.replicaInfos = removeThisDs(topoMsg.getReplicaInfos(), dsServerId);
3004      this.rsInfos = computeRSInfos(dsServerId, topoMsg.getRsInfos(),
3005          previousRsInfos, configuredReplicationServerUrls);
3006    }
3007
3008    private Map<Integer, DSInfo> removeThisDs(Map<Integer, DSInfo> dsInfos,
3009        int dsServerId)
3010    {
3011      final Map<Integer, DSInfo> copy = new HashMap<>(dsInfos);
3012      copy.remove(dsServerId);
3013      return Collections.unmodifiableMap(copy);
3014    }
3015
3016    private Map<Integer, ReplicationServerInfo> computeRSInfos(
3017        int dsServerId, List<RSInfo> newRsInfos,
3018        Map<Integer, ReplicationServerInfo> previousRsInfos,
3019        Set<String> configuredReplicationServerUrls)
3020    {
3021      final Map<Integer, ReplicationServerInfo> results = new HashMap<>(previousRsInfos);
3022
3023      // Update replication server info list with the received topology info
3024      final Set<Integer> rssToKeep = new HashSet<>();
3025      for (RSInfo newRSInfo : newRsInfos)
3026      {
3027        final int rsId = newRSInfo.getId();
3028        rssToKeep.add(rsId); // Mark this server as still existing
3029        Set<Integer> connectedDSs =
3030            computeDSsConnectedTo(rsId, dsServerId);
3031        ReplicationServerInfo rsInfo = results.get(rsId);
3032        if (rsInfo == null)
3033        {
3034          // New replication server, create info for it add it to the list
3035          rsInfo = new ReplicationServerInfo(newRSInfo, connectedDSs);
3036          setLocallyConfiguredFlag(rsInfo, configuredReplicationServerUrls);
3037          results.put(rsId, rsInfo);
3038        }
3039        else
3040        {
3041          // Update the existing info for the replication server
3042          rsInfo.update(newRSInfo, connectedDSs);
3043        }
3044      }
3045
3046      // Remove any replication server that may have disappeared from the
3047      // topology
3048      results.keySet().retainAll(rssToKeep);
3049
3050      return Collections.unmodifiableMap(results);
3051    }
3052
3053    /** Computes the list of DSs connected to a particular RS. */
3054    private Set<Integer> computeDSsConnectedTo(int rsId, int dsServerId)
3055    {
3056      final Set<Integer> connectedDSs = new HashSet<>();
3057      if (rsServerId == rsId)
3058      {
3059        /*
3060         * If we are computing connected DSs for the RS we are connected to, we
3061         * should count the local DS as the DSInfo of the local DS is not sent
3062         * by the replication server in the topology message. We must count
3063         * ourselves as a connected server.
3064         */
3065        connectedDSs.add(dsServerId);
3066      }
3067
3068      for (DSInfo dsInfo : replicaInfos.values())
3069      {
3070        if (dsInfo.getRsId() == rsId)
3071        {
3072          connectedDSs.add(dsInfo.getDsId());
3073        }
3074      }
3075
3076      return connectedDSs;
3077    }
3078
3079    /**
3080     * Sets the locally configured flag for the passed ReplicationServerInfo
3081     * object, analyzing the local configuration.
3082     *
3083     * @param rsInfo
3084     *          the Replication server to check and update
3085     * @param configuredReplicationServerUrls
3086     */
3087    private void setLocallyConfiguredFlag(ReplicationServerInfo rsInfo,
3088        Set<String> configuredReplicationServerUrls)
3089    {
3090      // Determine if the passed ReplicationServerInfo has a URL that is present
3091      // in the locally configured replication servers
3092      String rsUrl = rsInfo.getServerURL();
3093      if (rsUrl == null)
3094      {
3095        // The ReplicationServerInfo has been generated from a server with
3096        // no URL in TopologyMsg (i.e: with replication protocol version < 4):
3097        // ignore this server as we do not know how to connect to it
3098        rsInfo.setLocallyConfigured(false);
3099        return;
3100      }
3101      for (String serverUrl : configuredReplicationServerUrls)
3102      {
3103        if (isSameReplicationServerUrl(serverUrl, rsUrl))
3104        {
3105          // This RS is locally configured, mark this
3106          rsInfo.setLocallyConfigured(true);
3107          rsInfo.setServerURL(serverUrl);
3108          return;
3109        }
3110      }
3111      rsInfo.setLocallyConfigured(false);
3112    }
3113
3114    /** {@inheritDoc} */
3115    @Override
3116    public boolean equals(Object obj)
3117    {
3118      if (this == obj)
3119      {
3120        return true;
3121      }
3122      if (obj == null || getClass() != obj.getClass())
3123      {
3124        return false;
3125      }
3126      final Topology other = (Topology) obj;
3127      return rsServerId == other.rsServerId
3128          && Objects.equals(replicaInfos, other.replicaInfos)
3129          && Objects.equals(rsInfos, other.rsInfos)
3130          && urlsEqual1(replicaInfos, other.replicaInfos)
3131          && urlsEqual2(rsInfos, other.rsInfos);
3132    }
3133
3134    private boolean urlsEqual1(Map<Integer, DSInfo> replicaInfos1,
3135        Map<Integer, DSInfo> replicaInfos2)
3136    {
3137      for (Entry<Integer, DSInfo> entry : replicaInfos1.entrySet())
3138      {
3139        DSInfo dsInfo = replicaInfos2.get(entry.getKey());
3140        if (!Objects.equals(entry.getValue().getDsUrl(), dsInfo.getDsUrl()))
3141        {
3142          return false;
3143        }
3144      }
3145      return true;
3146    }
3147
3148    private boolean urlsEqual2(Map<Integer, ReplicationServerInfo> rsInfos1,
3149        Map<Integer, ReplicationServerInfo> rsInfos2)
3150    {
3151      for (Entry<Integer, ReplicationServerInfo> entry : rsInfos1.entrySet())
3152      {
3153        ReplicationServerInfo rsInfo = rsInfos2.get(entry.getKey());
3154        if (!Objects.equals(entry.getValue().getServerURL(), rsInfo.getServerURL()))
3155        {
3156          return false;
3157        }
3158      }
3159      return true;
3160    }
3161
3162    /** {@inheritDoc} */
3163    @Override
3164    public int hashCode()
3165    {
3166      final int prime = 31;
3167      int result = 1;
3168      result = prime * result + rsServerId;
3169      result = prime * result
3170          + (replicaInfos == null ? 0 : replicaInfos.hashCode());
3171      result = prime * result + (rsInfos == null ? 0 : rsInfos.hashCode());
3172      return result;
3173    }
3174
3175    /** {@inheritDoc} */
3176    @Override
3177    public String toString()
3178    {
3179      return getClass().getSimpleName()
3180          + " rsServerId=" + rsServerId
3181          + ", replicaInfos=" + replicaInfos.values()
3182          + ", rsInfos=" + rsInfos.values();
3183    }
3184  }
3185
3186  /**
3187   * Check if the broker could not find any Replication Server and therefore
3188   * connection attempt failed.
3189   *
3190   * @return true if the server could not connect to any Replication Server.
3191   */
3192  boolean hasConnectionError()
3193  {
3194    return connectionError;
3195  }
3196
3197  /**
3198   * Starts publishing to the RS the current timestamp used in this server.
3199   */
3200  private void startChangeTimeHeartBeatPublishing(ConnectedRS rs)
3201  {
3202    // Start a CSN heartbeat thread.
3203    long changeTimeHeartbeatInterval = config.getChangetimeHeartbeatInterval();
3204    if (changeTimeHeartbeatInterval > 0)
3205    {
3206      final String threadName = "Replica DS(" + getServerId()
3207              + ") change time heartbeat publisher for domain \"" + getBaseDN()
3208              + "\" to RS(" + rs.getServerId() + ") at " + rs.replicationServer;
3209
3210      ctHeartbeatPublisherThread = new CTHeartbeatPublisherThread(
3211          threadName, rs.session, changeTimeHeartbeatInterval, getServerId());
3212      ctHeartbeatPublisherThread.start();
3213    }
3214    else
3215    {
3216      if (logger.isTraceEnabled())
3217      {
3218        debugInfo("is not configured to send CSN heartbeat interval");
3219      }
3220    }
3221  }
3222
3223  /**
3224   * Stops publishing to the RS the current timestamp used in this server.
3225   */
3226  private synchronized void stopChangeTimeHeartBeatPublishing()
3227  {
3228    if (ctHeartbeatPublisherThread != null)
3229    {
3230      ctHeartbeatPublisherThread.shutdown();
3231      ctHeartbeatPublisherThread = null;
3232    }
3233  }
3234
3235  /**
3236   * Set the connectRequiresRecovery to the provided value.
3237   * This flag is used to indicate if a recovery of Update is necessary
3238   * after a reconnection to a RS.
3239   * It is the responsibility of the ReplicationDomain to set it during the
3240   * sessionInitiated phase.
3241   *
3242   * @param b the new value of the connectRequiresRecovery.
3243   */
3244  public void setRecoveryRequired(boolean b)
3245  {
3246    connectRequiresRecovery = b;
3247  }
3248
3249  /**
3250   * Returns whether the broker is shutting down.
3251   * @return whether the broker is shutting down.
3252   */
3253  boolean shuttingDown()
3254  {
3255    return shutdown;
3256  }
3257
3258  /**
3259   * Returns the local address of this replication domain, or the empty string
3260   * if it is not yet connected.
3261   *
3262   * @return The local address.
3263   */
3264  String getLocalUrl()
3265  {
3266    final Session session = connectedRS.get().session;
3267    return session != null ? session.getLocalUrl() : "";
3268  }
3269
3270  /**
3271   * Returns the replication monitor instance name associated with this broker.
3272   *
3273   * @return The replication monitor instance name.
3274   */
3275  String getReplicationMonitorInstanceName()
3276  {
3277    // Only invoked by replication domain so always non-null.
3278    return monitor.getMonitorInstanceName();
3279  }
3280
3281  private ConnectedRS setConnectedRS(final ConnectedRS newRS)
3282  {
3283    final ConnectedRS oldRS = connectedRS.getAndSet(newRS);
3284    if (!oldRS.equals(newRS) && oldRS.session != null)
3285    {
3286      // monitor name is changing, deregister before registering again
3287      deregisterReplicationMonitor();
3288      oldRS.session.close();
3289      registerReplicationMonitor();
3290    }
3291    return newRS;
3292  }
3293
3294  /**
3295   * Must be invoked each time the session changes because, the monitor name is
3296   * dynamically created with the session name, while monitor registration is
3297   * static.
3298   *
3299   * @see #monitor
3300   */
3301  private void registerReplicationMonitor()
3302  {
3303    // The monitor should not be registered if this is a unit test
3304    // because the replication domain is null.
3305    if (monitor != null)
3306    {
3307      DirectoryServer.registerMonitorProvider(monitor);
3308    }
3309  }
3310
3311  private void deregisterReplicationMonitor()
3312  {
3313    // The monitor should not be deregistered if this is a unit test
3314    // because the replication domain is null.
3315    if (monitor != null)
3316    {
3317      DirectoryServer.deregisterMonitorProvider(monitor);
3318    }
3319  }
3320
3321  /** {@inheritDoc} */
3322  @Override
3323  public String toString()
3324  {
3325    final StringBuilder sb = new StringBuilder();
3326    sb.append(getClass().getSimpleName())
3327      .append(" \"").append(getBaseDN()).append(" ")
3328      .append(getServerId()).append("\",")
3329      .append(" groupId=").append(getGroupId())
3330      .append(", genId=").append(getGenerationID())
3331      .append(", ");
3332    connectedRS.get().toString(sb);
3333    return sb.toString();
3334  }
3335
3336  private void debugInfo(CharSequence message)
3337  {
3338    logger.trace(getClass().getSimpleName() + " for baseDN=" + getBaseDN()
3339        + " and serverId=" + getServerId() + ": " + message);
3340  }
3341}