001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2006-2010 Sun Microsystems, Inc. 025 * Portions Copyright 2011-2015 ForgeRock AS 026 */ 027package org.opends.server.replication.service; 028 029import java.io.IOException; 030import java.math.BigDecimal; 031import java.math.MathContext; 032import java.math.RoundingMode; 033import java.net.*; 034import java.util.*; 035import java.util.Map.Entry; 036import java.util.concurrent.ConcurrentSkipListMap; 037import java.util.concurrent.Semaphore; 038import java.util.concurrent.TimeUnit; 039import java.util.concurrent.atomic.AtomicBoolean; 040import java.util.concurrent.atomic.AtomicReference; 041 042import org.forgerock.i18n.LocalizableMessage; 043import org.forgerock.i18n.slf4j.LocalizedLogger; 044import org.forgerock.util.Utils; 045import org.opends.server.admin.std.server.ReplicationDomainCfg; 046import org.opends.server.core.DirectoryServer; 047import org.opends.server.replication.common.*; 048import org.opends.server.replication.plugin.MultimasterReplication; 049import org.opends.server.replication.protocol.*; 050import org.opends.server.types.DN; 
051import org.opends.server.types.HostPort; 052 053import static org.opends.messages.ReplicationMessages.*; 054import static org.opends.server.replication.protocol.ProtocolVersion.*; 055import static org.opends.server.replication.server.ReplicationServer.*; 056import static org.opends.server.util.StaticUtils.*; 057 058/** 059 * The broker for Multi-master Replication. 060 */ 061public class ReplicationBroker 062{ 063 064 /** 065 * Immutable class containing information about whether the broker is 066 * connected to an RS and data associated to this connected RS. 067 */ 068 // @Immutable 069 private static final class ConnectedRS 070 { 071 072 private static final ConnectedRS NO_CONNECTED_RS = new ConnectedRS( 073 NO_CONNECTED_SERVER); 074 075 /** The info of the RS we are connected to. */ 076 private final ReplicationServerInfo rsInfo; 077 /** Contains a connected session to the RS if any exist, null otherwise. */ 078 private final Session session; 079 private final String replicationServer; 080 081 private ConnectedRS(String replicationServer) 082 { 083 this.rsInfo = null; 084 this.session = null; 085 this.replicationServer = replicationServer; 086 } 087 088 private ConnectedRS(ReplicationServerInfo rsInfo, Session session) 089 { 090 this.rsInfo = rsInfo; 091 this.session = session; 092 this.replicationServer = session != null ? 093 session.getReadableRemoteAddress() 094 : NO_CONNECTED_SERVER; 095 } 096 097 private static ConnectedRS stopped() 098 { 099 return new ConnectedRS("stopped"); 100 } 101 102 private static ConnectedRS noConnectedRS() 103 { 104 return NO_CONNECTED_RS; 105 } 106 107 public int getServerId() 108 { 109 return rsInfo != null ? rsInfo.getServerId() : -1; 110 } 111 112 private byte getGroupId() 113 { 114 return rsInfo != null ? 
rsInfo.getGroupId() : -1; 115 } 116 117 private boolean isConnected() 118 { 119 return session != null; 120 } 121 122 /** {@inheritDoc} */ 123 @Override 124 public String toString() 125 { 126 final StringBuilder sb = new StringBuilder(); 127 toString(sb); 128 return sb.toString(); 129 } 130 131 public void toString(StringBuilder sb) 132 { 133 sb.append("connected=").append(isConnected()).append(", "); 134 if (!isConnected()) 135 { 136 sb.append("no connectedRS"); 137 } 138 else 139 { 140 sb.append("connectedRS(serverId=").append(rsInfo.getServerId()) 141 .append(", serverUrl=").append(rsInfo.getServerURL()) 142 .append(", groupId=").append(rsInfo.getGroupId()) 143 .append(")"); 144 } 145 } 146 147 } 148 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 149 private volatile boolean shutdown; 150 private final Object startStopLock = new Object(); 151 private volatile ReplicationDomainCfg config; 152 /** 153 * String reported under CSN=monitor when there is no connected RS. 154 */ 155 static final String NO_CONNECTED_SERVER = "Not connected"; 156 private final ServerState state; 157 private Semaphore sendWindow; 158 private int maxSendWindow; 159 private int rcvWindow = 100; 160 private int halfRcvWindow = rcvWindow / 2; 161 private int timeout; 162 private final ReplSessionSecurity replSessionSecurity; 163 /** 164 * The RS this DS is currently connected to. 165 * <p> 166 * Always use {@link #setConnectedRS(ConnectedRS)} to set a new 167 * connected RS. 168 */ 169 // @NotNull // for the reference 170 private final AtomicReference<ConnectedRS> connectedRS = new AtomicReference<>(ConnectedRS.noConnectedRS()); 171 /** Our replication domain. */ 172 private final ReplicationDomain domain; 173 /** 174 * This object is used as a conditional event to be notified about 175 * the reception of monitor information from the Replication Server. 
176 */ 177 private final AtomicBoolean monitorResponse = new AtomicBoolean(false); 178 /** 179 * A Map containing the ServerStates of all the replicas in the topology 180 * as seen by the ReplicationServer the last time it was polled or the last 181 * time it published monitoring information. 182 */ 183 private Map<Integer, ServerState> replicaStates = new HashMap<>(); 184 /** A thread to monitor heartbeats on the session. */ 185 private HeartbeatMonitor heartbeatMonitor; 186 /** The number of times the connection was lost. */ 187 private int numLostConnections; 188 /** 189 * When the broker cannot connect to any replication server 190 * it log an error and keeps continuing every second. 191 * This boolean is set when the first failure happens and is used 192 * to avoid repeating the error message for further failure to connect 193 * and to know that it is necessary to print a new message when the broker 194 * finally succeed to connect. 195 */ 196 private volatile boolean connectionError; 197 private final Object connectPhaseLock = new Object(); 198 /** 199 * The thread that publishes messages to the RS containing the current 200 * change time of this DS. 201 */ 202 private CTHeartbeatPublisherThread ctHeartbeatPublisherThread; 203 /* 204 * Properties for the last topology info received from the network. 205 */ 206 /** Contains the last known state of the replication topology. */ 207 private final AtomicReference<Topology> topology = new AtomicReference<>(new Topology()); 208 /** <pre>@GuardedBy("this")</pre>. */ 209 private volatile int updateDoneCount; 210 private volatile boolean connectRequiresRecovery; 211 212 /** 213 * This integer defines when the best replication server checking algorithm 214 * should be engaged. 215 * Every time a monitoring message (each monitoring publisher period) is 216 * received, it is incremented. When it reaches 2, we run the checking 217 * algorithm to see if we must reconnect to another best replication server. 
218 * Then we reset the value to 0. But when a topology message is received, the 219 * integer is reset to 0. This ensures that we wait at least one monitoring 220 * publisher period before running the algorithm, but also that we wait at 221 * least for a monitoring period after the last received topology message 222 * (topology stabilization). 223 */ 224 private int mustRunBestServerCheckingAlgorithm; 225 226 /** 227 * The monitor provider for this replication domain. 228 * <p> 229 * The name of the monitor includes the local address and must therefore be 230 * re-registered every time the session is re-established or destroyed. The 231 * monitor provider can only be created (i.e. non-null) if there is a 232 * replication domain, which is not the case in unit tests. 233 */ 234 private final ReplicationMonitor monitor; 235 236 /** 237 * Creates a new ReplicationServer Broker for a particular ReplicationDomain. 238 * 239 * @param replicationDomain The replication domain that is creating us. 240 * @param state The ServerState that should be used by this broker 241 * when negotiating the session with the replicationServer. 242 * @param config The configuration to use. 243 * @param replSessionSecurity The session security configuration. 244 */ 245 public ReplicationBroker(ReplicationDomain replicationDomain, 246 ServerState state, ReplicationDomainCfg config, 247 ReplSessionSecurity replSessionSecurity) 248 { 249 this.domain = replicationDomain; 250 this.state = state; 251 this.config = config; 252 this.replSessionSecurity = replSessionSecurity; 253 this.rcvWindow = getMaxRcvWindow(); 254 this.halfRcvWindow = rcvWindow / 2; 255 this.shutdown = true; 256 257 /* 258 * Only create a monitor if there is a replication domain (this is not the 259 * case in some unit tests). 260 */ 261 this.monitor = replicationDomain != null ? new ReplicationMonitor( 262 replicationDomain) : null; 263 registerReplicationMonitor(); 264 } 265 266 /** 267 * Start the ReplicationBroker. 
268 */ 269 public void start() 270 { 271 synchronized (startStopLock) 272 { 273 if (!shutdown) 274 { 275 return; 276 } 277 shutdown = false; 278 this.rcvWindow = getMaxRcvWindow(); 279 connectAsDataServer(); 280 } 281 } 282 283 /** 284 * Gets the group id of the RS we are connected to. 285 * @return The group id of the RS we are connected to 286 */ 287 public byte getRsGroupId() 288 { 289 return connectedRS.get().getGroupId(); 290 } 291 292 /** 293 * Gets the server id of the RS we are connected to. 294 * @return The server id of the RS we are connected to 295 */ 296 public int getRsServerId() 297 { 298 return connectedRS.get().getServerId(); 299 } 300 301 /** 302 * Gets the server id. 303 * @return The server id 304 */ 305 public int getServerId() 306 { 307 return config.getServerId(); 308 } 309 310 private DN getBaseDN() 311 { 312 return config.getBaseDN(); 313 } 314 315 private Set<String> getReplicationServerUrls() 316 { 317 return config.getReplicationServer(); 318 } 319 320 private byte getGroupId() 321 { 322 return (byte) config.getGroupId(); 323 } 324 325 /** 326 * Gets the server id. 327 * @return The server id 328 */ 329 private long getGenerationID() 330 { 331 return domain.getGenerationID(); 332 } 333 334 /** 335 * Set the generation id - for test purpose. 336 * @param generationID The generation id 337 */ 338 public void setGenerationID(long generationID) 339 { 340 domain.setGenerationID(generationID); 341 } 342 343 /** 344 * Compares 2 replication servers addresses and returns true if they both 345 * represent the same replication server instance. 346 * @param rs1Url Replication server 1 address 347 * @param rs2Url Replication server 2 address 348 * @return True if both replication server addresses represent the same 349 * replication server instance, false otherwise. 
350 */ 351 private static boolean isSameReplicationServerUrl(String rs1Url, 352 String rs2Url) 353 { 354 try 355 { 356 final HostPort hp1 = HostPort.valueOf(rs1Url); 357 final HostPort hp2 = HostPort.valueOf(rs2Url); 358 return hp1.isEquivalentTo(hp2); 359 } 360 catch (RuntimeException ex) 361 { 362 // Not a RS url or not a valid port number: should not happen 363 return false; 364 } 365 } 366 367 /** 368 * Bag class for keeping info we get from a replication server in order to 369 * compute the best one to connect to. This is in fact a wrapper to a 370 * ReplServerStartMsg (V3) or a ReplServerStartDSMsg (V4). This can also be 371 * updated with a info coming from received topology messages or monitoring 372 * messages. 373 */ 374 static class ReplicationServerInfo 375 { 376 private RSInfo rsInfo; 377 private final short protocolVersion; 378 private final DN baseDN; 379 private final int windowSize; 380 // @NotNull 381 private final ServerState serverState; 382 private final boolean sslEncryption; 383 private final int degradedStatusThreshold; 384 /** Keeps the 0 value if created with a ReplServerStartMsg. */ 385 private int connectedDSNumber; 386 // @NotNull 387 private Set<Integer> connectedDSs; 388 /** 389 * Is this RS locally configured? (the RS is recognized as a usable server). 390 */ 391 private boolean locallyConfigured = true; 392 393 /** 394 * Create a new instance of ReplicationServerInfo wrapping the passed 395 * message. 396 * @param msg LocalizableMessage to wrap. 397 * @param newServerURL Override serverURL. 398 * @return The new instance wrapping the passed message. 399 * @throws IllegalArgumentException If the passed message has an unexpected 400 * type. 
401 */ 402 private static ReplicationServerInfo newInstance( 403 ReplicationMsg msg, String newServerURL) throws IllegalArgumentException 404 { 405 final ReplicationServerInfo rsInfo = newInstance(msg); 406 rsInfo.setServerURL(newServerURL); 407 return rsInfo; 408 } 409 410 /** 411 * Create a new instance of ReplicationServerInfo wrapping the passed 412 * message. 413 * @param msg LocalizableMessage to wrap. 414 * @return The new instance wrapping the passed message. 415 * @throws IllegalArgumentException If the passed message has an unexpected 416 * type. 417 */ 418 static ReplicationServerInfo newInstance(ReplicationMsg msg) 419 throws IllegalArgumentException 420 { 421 if (msg instanceof ReplServerStartMsg) 422 { 423 // RS uses protocol V3 or lower 424 return new ReplicationServerInfo((ReplServerStartMsg) msg); 425 } 426 else if (msg instanceof ReplServerStartDSMsg) 427 { 428 // RS uses protocol V4 or higher 429 return new ReplicationServerInfo((ReplServerStartDSMsg) msg); 430 } 431 432 // Unsupported message type: should not happen 433 throw new IllegalArgumentException("Unexpected PDU type: " 434 + msg.getClass().getName() + ":\n" + msg); 435 } 436 437 /** 438 * Constructs a ReplicationServerInfo object wrapping a 439 * {@link ReplServerStartMsg}. 440 * 441 * @param msg 442 * The {@link ReplServerStartMsg} this object will wrap. 443 */ 444 private ReplicationServerInfo(ReplServerStartMsg msg) 445 { 446 this.protocolVersion = msg.getVersion(); 447 this.rsInfo = new RSInfo(msg.getServerId(), msg.getServerURL(), 448 msg.getGenerationId(), msg.getGroupId(), 1); 449 this.baseDN = msg.getBaseDN(); 450 this.windowSize = msg.getWindowSize(); 451 final ServerState ss = msg.getServerState(); 452 this.serverState = ss != null ? 
ss : new ServerState(); 453 this.sslEncryption = msg.getSSLEncryption(); 454 this.degradedStatusThreshold = msg.getDegradedStatusThreshold(); 455 } 456 457 /** 458 * Constructs a ReplicationServerInfo object wrapping a 459 * {@link ReplServerStartDSMsg}. 460 * 461 * @param msg 462 * The {@link ReplServerStartDSMsg} this object will wrap. 463 */ 464 private ReplicationServerInfo(ReplServerStartDSMsg msg) 465 { 466 this.rsInfo = new RSInfo(msg.getServerId(), msg.getServerURL(), 467 msg.getGenerationId(), msg.getGroupId(), msg.getWeight()); 468 this.protocolVersion = msg.getVersion(); 469 this.baseDN = msg.getBaseDN(); 470 this.windowSize = msg.getWindowSize(); 471 final ServerState ss = msg.getServerState(); 472 this.serverState = ss != null ? ss : new ServerState(); 473 this.sslEncryption = msg.getSSLEncryption(); 474 this.degradedStatusThreshold = msg.getDegradedStatusThreshold(); 475 this.connectedDSNumber = msg.getConnectedDSNumber(); 476 } 477 478 /** 479 * Constructs a new replication server info with the passed RSInfo internal 480 * values and the passed connected DSs. 481 * 482 * @param rsInfo 483 * The RSinfo to use for the update 484 * @param connectedDSs 485 * The new connected DSs 486 */ 487 ReplicationServerInfo(RSInfo rsInfo, Set<Integer> connectedDSs) 488 { 489 this.rsInfo = 490 new RSInfo(rsInfo.getId(), rsInfo.getServerUrl(), rsInfo 491 .getGenerationId(), rsInfo.getGroupId(), rsInfo.getWeight()); 492 this.protocolVersion = 0; 493 this.baseDN = null; 494 this.windowSize = 0; 495 this.connectedDSs = connectedDSs; 496 this.connectedDSNumber = connectedDSs.size(); 497 this.sslEncryption = false; 498 this.degradedStatusThreshold = -1; 499 this.serverState = new ServerState(); 500 } 501 502 /** 503 * Get the server state. 504 * @return The server state 505 */ 506 public ServerState getServerState() 507 { 508 return serverState; 509 } 510 511 /** 512 * Get the group id. 
513 * @return The group id 514 */ 515 public byte getGroupId() 516 { 517 return rsInfo.getGroupId(); 518 } 519 520 /** 521 * Get the server protocol version. 522 * @return the protocolVersion 523 */ 524 public short getProtocolVersion() 525 { 526 return protocolVersion; 527 } 528 529 /** 530 * Get the generation id. 531 * @return the generationId 532 */ 533 public long getGenerationId() 534 { 535 return rsInfo.getGenerationId(); 536 } 537 538 /** 539 * Get the server id. 540 * @return the serverId 541 */ 542 public int getServerId() 543 { 544 return rsInfo.getId(); 545 } 546 547 /** 548 * Get the server URL. 549 * @return the serverURL 550 */ 551 public String getServerURL() 552 { 553 return rsInfo.getServerUrl(); 554 } 555 556 /** 557 * Get the base DN. 558 * 559 * @return the base DN 560 */ 561 public DN getBaseDN() 562 { 563 return baseDN; 564 } 565 566 /** 567 * Get the window size. 568 * @return the windowSize 569 */ 570 public int getWindowSize() 571 { 572 return windowSize; 573 } 574 575 /** 576 * Get the ssl encryption. 577 * @return the sslEncryption 578 */ 579 public boolean isSslEncryption() 580 { 581 return sslEncryption; 582 } 583 584 /** 585 * Get the degraded status threshold. 586 * @return the degradedStatusThreshold 587 */ 588 public int getDegradedStatusThreshold() 589 { 590 return degradedStatusThreshold; 591 } 592 593 /** 594 * Get the weight. 595 * @return the weight. Null if this object is a wrapper for 596 * a ReplServerStartMsg. 597 */ 598 public int getWeight() 599 { 600 return rsInfo.getWeight(); 601 } 602 603 /** 604 * Get the connected DS number. 605 * @return the connectedDSNumber. Null if this object is a wrapper for 606 * a ReplServerStartMsg. 607 */ 608 public int getConnectedDSNumber() 609 { 610 return connectedDSNumber; 611 } 612 613 /** 614 * Converts the object to a RSInfo object. 615 * @return The RSInfo object matching this object. 
616 */ 617 RSInfo toRSInfo() 618 { 619 return rsInfo; 620 } 621 622 /** 623 * Updates replication server info with the passed RSInfo internal values 624 * and the passed connected DSs. 625 * @param rsInfo The RSinfo to use for the update 626 * @param connectedDSs The new connected DSs 627 */ 628 private void update(RSInfo rsInfo, Set<Integer> connectedDSs) 629 { 630 this.rsInfo = new RSInfo(this.rsInfo.getId(), this.rsInfo.getServerUrl(), 631 rsInfo.getGenerationId(), rsInfo.getGroupId(), rsInfo.getWeight()); 632 this.connectedDSs = connectedDSs; 633 this.connectedDSNumber = connectedDSs.size(); 634 } 635 636 private void setServerURL(String newServerURL) 637 { 638 rsInfo = new RSInfo(rsInfo.getId(), newServerURL, 639 rsInfo.getGenerationId(), rsInfo.getGroupId(), rsInfo.getWeight()); 640 } 641 642 /** 643 * Updates replication server info with the passed server state. 644 * @param serverState The ServerState to use for the update 645 */ 646 private void update(ServerState serverState) 647 { 648 this.serverState.update(serverState); 649 } 650 651 /** 652 * Get the getConnectedDSs. 653 * @return the getConnectedDSs 654 */ 655 public Set<Integer> getConnectedDSs() 656 { 657 return connectedDSs; 658 } 659 660 /** 661 * Gets the locally configured status for this RS. 662 * @return the locallyConfigured 663 */ 664 public boolean isLocallyConfigured() 665 { 666 return locallyConfigured; 667 } 668 669 /** 670 * Sets the locally configured status for this RS. 671 * @param locallyConfigured the locallyConfigured to set 672 */ 673 public void setLocallyConfigured(boolean locallyConfigured) 674 { 675 this.locallyConfigured = locallyConfigured; 676 } 677 678 /** 679 * Returns a string representation of this object. 680 * @return A string representation of this object. 
681 */ 682 @Override 683 public String toString() 684 { 685 return "ReplServerInfo Url:" + getServerURL() 686 + " ServerId:" + getServerId() 687 + " GroupId:" + getGroupId() 688 + " connectedDSs:" + connectedDSs; 689 } 690 } 691 692 /** 693 * Contacts all replication servers to get information from them and being 694 * able to choose the more suitable. 695 * @return the collected information. 696 */ 697 private Map<Integer, ReplicationServerInfo> collectReplicationServersInfo() 698 { 699 final Map<Integer, ReplicationServerInfo> rsInfos = new ConcurrentSkipListMap<>(); 700 701 for (String serverUrl : getReplicationServerUrls()) 702 { 703 // Connect to server + get and store info about it 704 final ConnectedRS rs = performPhaseOneHandshake(serverUrl, false); 705 final ReplicationServerInfo rsInfo = rs.rsInfo; 706 if (rsInfo != null) 707 { 708 rsInfos.put(rsInfo.getServerId(), rsInfo); 709 } 710 } 711 712 return rsInfos; 713 } 714 715 /** 716 * Connect to a ReplicationServer. 717 * 718 * Handshake sequences between a DS and a RS is divided into 2 logical 719 * consecutive phases (phase 1 and phase 2). DS always initiates connection 720 * and always sends first message: 721 * 722 * DS<->RS: 723 * ------- 724 * 725 * phase 1: 726 * DS --- ServerStartMsg ---> RS 727 * DS <--- ReplServerStartDSMsg --- RS 728 * phase 2: 729 * DS --- StartSessionMsg ---> RS 730 * DS <--- TopologyMsg --- RS 731 * 732 * Before performing a full handshake sequence, DS searches for best suitable 733 * RS by making only phase 1 handshake to every RS he knows then closing 734 * connection. This allows to gather information on available RSs and then 735 * decide with which RS the full handshake (phase 1 then phase 2) will be 736 * finally performed. 737 * 738 * @throws NumberFormatException address was invalid 739 */ 740 private void connectAsDataServer() 741 { 742 /* 743 * If a first connect or a connection failure occur, we go through here. 
744 * force status machine to NOT_CONNECTED_STATUS so that monitoring can see 745 * that we are not connected. 746 */ 747 domain.toNotConnectedStatus(); 748 749 /* 750 Stop any existing heartbeat monitor and changeTime publisher 751 from a previous session. 752 */ 753 stopRSHeartBeatMonitoring(); 754 stopChangeTimeHeartBeatPublishing(); 755 mustRunBestServerCheckingAlgorithm = 0; 756 757 synchronized (connectPhaseLock) 758 { 759 final int serverId = getServerId(); 760 final DN baseDN = getBaseDN(); 761 762 /* 763 * Connect to each replication server and get their ServerState then find 764 * out which one is the best to connect to. 765 */ 766 if (logger.isTraceEnabled()) 767 { 768 debugInfo("phase 1 : will perform PhaseOneH with each RS in order to elect the preferred one"); 769 } 770 771 // Get info from every available replication servers 772 Map<Integer, ReplicationServerInfo> rsInfos = 773 collectReplicationServersInfo(); 774 computeNewTopology(toRSInfos(rsInfos)); 775 776 if (rsInfos.isEmpty()) 777 { 778 setConnectedRS(ConnectedRS.noConnectedRS()); 779 } 780 else 781 { 782 // At least one server answered, find the best one. 
783 RSEvaluations evals = computeBestReplicationServer(true, -1, state, 784 rsInfos, serverId, getGroupId(), getGenerationID()); 785 786 // Best found, now initialize connection to this one (handshake phase 1) 787 if (logger.isTraceEnabled()) 788 { 789 debugInfo("phase 2 : will perform PhaseOneH with the preferred RS=" + evals.getBestRS()); 790 } 791 792 final ConnectedRS electedRS = performPhaseOneHandshake( 793 evals.getBestRS().getServerURL(), true); 794 final ReplicationServerInfo electedRsInfo = electedRS.rsInfo; 795 if (electedRsInfo != null) 796 { 797 /* 798 Update replication server info with potentially more up to date 799 data (server state for instance may have changed) 800 */ 801 rsInfos.put(electedRsInfo.getServerId(), electedRsInfo); 802 803 // Handshake phase 1 exchange went well 804 805 // Compute in which status we are starting the session to tell the RS 806 final ServerStatus initStatus = computeInitialServerStatus( 807 electedRsInfo.getGenerationId(), electedRsInfo.getServerState(), 808 electedRsInfo.getDegradedStatusThreshold(), getGenerationID()); 809 810 // Perform session start (handshake phase 2) 811 final TopologyMsg topologyMsg = 812 performPhaseTwoHandshake(electedRS, initStatus); 813 814 if (topologyMsg != null) // Handshake phase 2 exchange went well 815 { 816 connectToReplicationServer(electedRS, initStatus, topologyMsg); 817 } // Could perform handshake phase 2 with best 818 } // Could perform handshake phase 1 with best 819 } 820 821 // connectedRS has been updated by calls above, reload it 822 final ConnectedRS rs = connectedRS.get(); 823 if (rs.isConnected()) 824 { 825 connectPhaseLock.notify(); 826 827 final long rsGenId = rs.rsInfo.getGenerationId(); 828 final int rsServerId = rs.rsInfo.getServerId(); 829 if (rsGenId == getGenerationID() || rsGenId == -1) 830 { 831 logger.info(NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG, serverId, rsServerId, baseDN, 832 rs.replicationServer, getGenerationID()); 833 } 834 else 835 { 836 
logger.warn(WARN_NOW_FOUND_BAD_GENERATION_CHANGELOG, serverId, rsServerId, baseDN, 837 rs.replicationServer, getGenerationID(), rsGenId); 838 } 839 } 840 else 841 { 842 // This server could not find any replicationServer. 843 // It's going to start in degraded mode. Log a message. 844 if (!connectionError) 845 { 846 connectionError = true; 847 connectPhaseLock.notify(); 848 849 if (!rsInfos.isEmpty()) 850 { 851 logger.warn(WARN_COULD_NOT_FIND_CHANGELOG, serverId, baseDN, 852 Utils.joinAsString(", ", rsInfos.keySet())); 853 } 854 else 855 { 856 logger.warn(WARN_NO_AVAILABLE_CHANGELOGS, serverId, baseDN); 857 } 858 } 859 } 860 } 861 } 862 863 private void computeNewTopology(List<RSInfo> newRSInfos) 864 { 865 final int rsServerId = getRsServerId(); 866 867 Topology oldTopo; 868 Topology newTopo; 869 do 870 { 871 oldTopo = topology.get(); 872 newTopo = new Topology(oldTopo.replicaInfos, newRSInfos, getServerId(), 873 rsServerId, getReplicationServerUrls(), oldTopo.rsInfos); 874 } 875 while (!topology.compareAndSet(oldTopo, newTopo)); 876 877 if (logger.isTraceEnabled()) 878 { 879 debugInfo(topologyChange(rsServerId, oldTopo, newTopo)); 880 } 881 } 882 883 private StringBuilder topologyChange(int rsServerId, Topology oldTopo, 884 Topology newTopo) 885 { 886 final StringBuilder sb = new StringBuilder(); 887 sb.append("rsServerId=").append(rsServerId); 888 if (newTopo.equals(oldTopo)) 889 { 890 sb.append(", unchangedTopology=").append(newTopo); 891 } 892 else 893 { 894 sb.append(", oldTopology=").append(oldTopo); 895 sb.append(", newTopology=").append(newTopo); 896 } 897 return sb; 898 } 899 900 /** 901 * Connects to a replication server. 
902 * 903 * @param rs 904 * the Replication Server to connect to 905 * @param initStatus 906 * The status to enter the state machine with 907 * @param topologyMsg 908 * the message containing the topology information 909 */ 910 private void connectToReplicationServer(ConnectedRS rs, 911 ServerStatus initStatus, TopologyMsg topologyMsg) 912 { 913 final DN baseDN = getBaseDN(); 914 final ReplicationServerInfo rsInfo = rs.rsInfo; 915 916 boolean connectCompleted = false; 917 try 918 { 919 maxSendWindow = rsInfo.getWindowSize(); 920 921 receiveTopo(topologyMsg, rs.getServerId()); 922 923 /* 924 Log a message to let the administrator know that the failure was resolved. 925 Wake up all the thread that were waiting on the window 926 on the previous connection. 927 */ 928 connectionError = false; 929 if (sendWindow != null) 930 { 931 /* 932 * Fix (hack) for OPENDJ-401: we want to ensure that no threads holding 933 * this semaphore will get blocked when they acquire it. However, we 934 * also need to make sure that we don't overflow the semaphore by 935 * releasing too many permits. 936 */ 937 final int MAX_PERMITS = Integer.MAX_VALUE >>> 2; 938 if (sendWindow.availablePermits() < MAX_PERMITS) 939 { 940 /* 941 * At least 2^29 acquisitions would need to occur for this to be 942 * insufficient. In addition, at least 2^30 releases would need to 943 * occur for this to potentially overflow. Hopefully this is unlikely 944 * to happen. 945 */ 946 sendWindow.release(MAX_PERMITS); 947 } 948 } 949 sendWindow = new Semaphore(maxSendWindow); 950 rcvWindow = getMaxRcvWindow(); 951 952 domain.sessionInitiated(initStatus, rsInfo.getServerState()); 953 954 final byte groupId = getGroupId(); 955 if (rs.getGroupId() != groupId) 956 { 957 /* 958 Connected to replication server with wrong group id: 959 warn user and start heartbeat monitor to recover when a server 960 with the right group id shows up. 
961 */ 962 logger.warn(WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID, 963 groupId, rs.getServerId(), rsInfo.getServerURL(), rs.getGroupId(), baseDN, getServerId()); 964 } 965 startRSHeartBeatMonitoring(rs); 966 if (rsInfo.getProtocolVersion() >= 967 ProtocolVersion.REPLICATION_PROTOCOL_V3) 968 { 969 startChangeTimeHeartBeatPublishing(rs); 970 } 971 connectCompleted = true; 972 } 973 catch (Exception e) 974 { 975 logger.error(ERR_COMPUTING_FAKE_OPS, baseDN, rsInfo.getServerURL(), 976 e.getLocalizedMessage() + " " + stackTraceToSingleLineString(e)); 977 } 978 finally 979 { 980 if (!connectCompleted) 981 { 982 setConnectedRS(ConnectedRS.noConnectedRS()); 983 } 984 } 985 } 986 987 /** 988 * Determines the status we are starting with according to our state and the 989 * RS state. 990 * 991 * @param rsGenId The generation id of the RS 992 * @param rsState The server state of the RS 993 * @param degradedStatusThreshold The degraded status threshold of the RS 994 * @param dsGenId The local generation id 995 * @return The initial status 996 */ 997 private ServerStatus computeInitialServerStatus(long rsGenId, 998 ServerState rsState, int degradedStatusThreshold, long dsGenId) 999 { 1000 if (rsGenId == -1) 1001 { 1002 // RS has no generation id 1003 return ServerStatus.NORMAL_STATUS; 1004 } 1005 else if (rsGenId != dsGenId) 1006 { 1007 // DS and RS do not have same generation id 1008 return ServerStatus.BAD_GEN_ID_STATUS; 1009 } 1010 else 1011 { 1012 /* 1013 DS and RS have same generation id 1014 1015 Determine if we are late or not to replay changes. RS uses a 1016 threshold value for pending changes to be replayed by a DS to 1017 determine if the DS is in normal status or in degraded status. 
1018 Let's compare the local and remote server state using this threshold 1019 value to determine if we are late or not 1020 */ 1021 1022 int nChanges = ServerState.diffChanges(rsState, state); 1023 if (logger.isTraceEnabled()) 1024 { 1025 debugInfo("computed " + nChanges + " changes late."); 1026 } 1027 1028 /* 1029 Check status to know if it is relevant to change the status. Do not 1030 take RSD lock to test. If we attempt to change the status whereas 1031 we are in a status that do not allows that, this will be noticed by 1032 the changeStatusFromStatusAnalyzer method. This allows to take the 1033 lock roughly only when needed versus every sleep time timeout. 1034 */ 1035 if (degradedStatusThreshold > 0 && nChanges >= degradedStatusThreshold) 1036 { 1037 return ServerStatus.DEGRADED_STATUS; 1038 } 1039 // degradedStatusThreshold value of '0' means no degrading system used 1040 // (no threshold): force normal status 1041 return ServerStatus.NORMAL_STATUS; 1042 } 1043 } 1044 1045 1046 1047 /** 1048 * Connect to the provided server performing the first phase handshake (start 1049 * messages exchange) and return the reply message from the replication 1050 * server, wrapped in a ReplicationServerInfo object. 1051 * 1052 * @param serverURL 1053 * Server to connect to. 1054 * @param keepSession 1055 * Do we keep session opened or not after handshake. Use true if want 1056 * to perform handshake phase 2 with the same session and keep the 1057 * session to create as the current one. 1058 * @return The answer from the server . Null if could not get an answer. 1059 */ 1060 private ConnectedRS performPhaseOneHandshake(String serverURL, boolean keepSession) 1061 { 1062 Session newSession = null; 1063 Socket socket = null; 1064 boolean hasConnected = false; 1065 LocalizableMessage errorMessage = null; 1066 1067 try 1068 { 1069 // Open a socket connection to the next candidate. 
1070 socket = new Socket(); 1071 socket.setReceiveBufferSize(1000000); 1072 socket.setTcpNoDelay(true); 1073 if (config.getSourceAddress() != null) 1074 { 1075 InetSocketAddress local = new InetSocketAddress(config.getSourceAddress(), 0); 1076 socket.bind(local); 1077 } 1078 int timeoutMS = MultimasterReplication.getConnectionTimeoutMS(); 1079 socket.connect(HostPort.valueOf(serverURL).toInetSocketAddress(), timeoutMS); 1080 newSession = replSessionSecurity.createClientSession(socket, timeoutMS); 1081 boolean isSslEncryption = replSessionSecurity.isSslEncryption(); 1082 1083 // Send our ServerStartMsg. 1084 final HostPort hp = new HostPort( 1085 socket.getLocalAddress().getHostName(), socket.getLocalPort()); 1086 final String url = hp.toString(); 1087 final StartMsg serverStartMsg = new ServerStartMsg(getServerId(), url, getBaseDN(), 1088 getMaxRcvWindow(), config.getHeartbeatInterval(), state, 1089 getGenerationID(), isSslEncryption, getGroupId()); 1090 newSession.publish(serverStartMsg); 1091 1092 // Read the ReplServerStartMsg or ReplServerStartDSMsg that should 1093 // come back. 1094 ReplicationMsg msg = newSession.receive(); 1095 if (logger.isTraceEnabled()) 1096 { 1097 debugInfo("RB HANDSHAKE SENT:\n" + serverStartMsg + "\nAND RECEIVED:\n" 1098 + msg); 1099 } 1100 1101 // Wrap received message in a server info object 1102 final ReplicationServerInfo replServerInfo = 1103 ReplicationServerInfo.newInstance(msg, serverURL); 1104 1105 // Sanity check 1106 final DN repDN = replServerInfo.getBaseDN(); 1107 if (!getBaseDN().equals(repDN)) 1108 { 1109 errorMessage = ERR_DS_DN_DOES_NOT_MATCH.get(repDN, getBaseDN()); 1110 return setConnectedRS(ConnectedRS.noConnectedRS()); 1111 } 1112 1113 /* 1114 * We have sent our own protocol version to the replication server. The 1115 * replication server will use the same one (or an older one if it is an 1116 * old replication server). 
1117 */ 1118 newSession.setProtocolVersion( 1119 getCompatibleVersion(replServerInfo.getProtocolVersion())); 1120 1121 if (!isSslEncryption) 1122 { 1123 newSession.stopEncryption(); 1124 } 1125 1126 hasConnected = true; 1127 1128 if (keepSession) 1129 { 1130 // cannot store it yet, 1131 // only store after a successful phase two handshake 1132 return new ConnectedRS(replServerInfo, newSession); 1133 } 1134 return new ConnectedRS(replServerInfo, null); 1135 } 1136 catch (ConnectException e) 1137 { 1138 logger.traceException(e); 1139 errorMessage = WARN_NO_CHANGELOG_SERVER_LISTENING.get(getServerId(), serverURL, getBaseDN()); 1140 } 1141 catch (SocketTimeoutException e) 1142 { 1143 logger.traceException(e); 1144 errorMessage = WARN_TIMEOUT_CONNECTING_TO_RS.get(getServerId(), serverURL, getBaseDN()); 1145 } 1146 catch (Exception e) 1147 { 1148 logger.traceException(e); 1149 errorMessage = WARN_EXCEPTION_STARTING_SESSION_PHASE.get( 1150 getServerId(), serverURL, getBaseDN(), stackTraceToSingleLineString(e)); 1151 } 1152 finally 1153 { 1154 if (!hasConnected || !keepSession) 1155 { 1156 close(newSession); 1157 close(socket); 1158 } 1159 1160 if (!hasConnected && errorMessage != null && !connectionError) 1161 { 1162 // There was no server waiting on this host:port 1163 // Log a notice and will try the next replicationServer in the list 1164 if (keepSession) // Log error message only for final connection 1165 { 1166 // log the error message only once to avoid overflowing the error log 1167 logger.error(errorMessage); 1168 } 1169 1170 logger.trace(errorMessage); 1171 } 1172 } 1173 return setConnectedRS(ConnectedRS.noConnectedRS()); 1174 } 1175 1176 /** 1177 * Performs the second phase handshake (send StartSessionMsg and receive 1178 * TopologyMsg messages exchange) and return the reply message from the 1179 * replication server. 1180 * 1181 * @param electedRS Server we are connecting with. 
1182 * @param initStatus The status we are starting with 1183 * @return The ReplServerStartMsg the server replied. Null if could not 1184 * get an answer. 1185 */ 1186 private TopologyMsg performPhaseTwoHandshake(ConnectedRS electedRS, 1187 ServerStatus initStatus) 1188 { 1189 try 1190 { 1191 // Send our StartSessionMsg. 1192 final StartSessionMsg startSessionMsg; 1193 startSessionMsg = new StartSessionMsg( 1194 initStatus, 1195 domain.getRefUrls(), 1196 domain.isAssured(), 1197 domain.getAssuredMode(), 1198 domain.getAssuredSdLevel()); 1199 startSessionMsg.setEclIncludes( 1200 domain.getEclIncludes(domain.getServerId()), 1201 domain.getEclIncludesForDeletes(domain.getServerId())); 1202 final Session session = electedRS.session; 1203 session.publish(startSessionMsg); 1204 1205 // Read the TopologyMsg that should come back. 1206 final TopologyMsg topologyMsg = (TopologyMsg) session.receive(); 1207 1208 if (logger.isTraceEnabled()) 1209 { 1210 debugInfo("RB HANDSHAKE SENT:\n" + startSessionMsg 1211 + "\nAND RECEIVED:\n" + topologyMsg); 1212 } 1213 1214 // Alright set the timeout to the desired value 1215 session.setSoTimeout(timeout); 1216 setConnectedRS(electedRS); 1217 return topologyMsg; 1218 } 1219 catch (Exception e) 1220 { 1221 logger.error(WARN_EXCEPTION_STARTING_SESSION_PHASE, 1222 getServerId(), electedRS.rsInfo.getServerURL(), getBaseDN(), stackTraceToSingleLineString(e)); 1223 1224 setConnectedRS(ConnectedRS.noConnectedRS()); 1225 return null; 1226 } 1227 } 1228 1229 /** 1230 * Class holding evaluation results for electing the best replication server 1231 * for the local directory server. 1232 */ 1233 static class RSEvaluations 1234 { 1235 private final int localServerId; 1236 private Map<Integer, ReplicationServerInfo> bestRSs; 1237 private final Map<Integer, LocalizableMessage> rsEvals = new HashMap<>(); 1238 1239 /** 1240 * Ctor. 
1241 * 1242 * @param localServerId 1243 * the serverId for the local directory server 1244 * @param rsInfos 1245 * a Map of serverId => {@link ReplicationServerInfo} with all the 1246 * candidate replication servers 1247 */ 1248 RSEvaluations(int localServerId, 1249 Map<Integer, ReplicationServerInfo> rsInfos) 1250 { 1251 this.localServerId = localServerId; 1252 this.bestRSs = rsInfos; 1253 } 1254 1255 private boolean keepBest(LocalEvaluation eval) 1256 { 1257 if (eval.hasAcceptedAny()) 1258 { 1259 bestRSs = eval.getAccepted(); 1260 rsEvals.putAll(eval.getRejected()); 1261 return true; 1262 } 1263 return false; 1264 } 1265 1266 /** 1267 * Sets the elected best replication server, rejecting all the other 1268 * replication servers with the supplied evaluation. 1269 * 1270 * @param bestRsId 1271 * the serverId of the elected replication server 1272 * @param rejectedRSsEval 1273 * the evaluation for all the rejected replication servers 1274 */ 1275 private void setBestRS(int bestRsId, LocalizableMessage rejectedRSsEval) 1276 { 1277 for (Iterator<Entry<Integer, ReplicationServerInfo>> it = 1278 this.bestRSs.entrySet().iterator(); it.hasNext();) 1279 { 1280 final Entry<Integer, ReplicationServerInfo> entry = it.next(); 1281 final Integer rsId = entry.getKey(); 1282 final ReplicationServerInfo rsInfo = entry.getValue(); 1283 if (rsInfo.getServerId() != bestRsId) 1284 { 1285 it.remove(); 1286 } 1287 rsEvals.put(rsId, rejectedRSsEval); 1288 } 1289 } 1290 1291 private void discardAll(LocalizableMessage eval) 1292 { 1293 for (Integer rsId : bestRSs.keySet()) 1294 { 1295 rsEvals.put(rsId, eval); 1296 } 1297 } 1298 1299 private boolean foundBestRS() 1300 { 1301 return bestRSs.size() == 1; 1302 } 1303 1304 /** 1305 * Returns the {@link ReplicationServerInfo} for the best replication 1306 * server. 
1307 * 1308 * @return the {@link ReplicationServerInfo} for the best replication server 1309 */ 1310 ReplicationServerInfo getBestRS() 1311 { 1312 if (foundBestRS()) 1313 { 1314 return bestRSs.values().iterator().next(); 1315 } 1316 return null; 1317 } 1318 1319 /** 1320 * Returns the evaluations for all the candidate replication servers. 1321 * 1322 * @return a Map of serverId => LocalizableMessage containing the evaluation for each 1323 * candidate replication servers. 1324 */ 1325 Map<Integer, LocalizableMessage> getEvaluations() 1326 { 1327 if (foundBestRS()) 1328 { 1329 final Integer bestRSServerId = getBestRS().getServerId(); 1330 if (rsEvals.get(bestRSServerId) == null) 1331 { 1332 final LocalizableMessage eval = NOTE_BEST_RS.get(bestRSServerId, localServerId); 1333 rsEvals.put(bestRSServerId, eval); 1334 } 1335 } 1336 return Collections.unmodifiableMap(rsEvals); 1337 } 1338 1339 /** 1340 * Returns the evaluation for the supplied replication server Id. 1341 * <p> 1342 * Note: "unknown RS" message is returned if the supplied replication server 1343 * was not part of the candidate replication servers. 
1344 * 1345 * @param rsServerId 1346 * the supplied replication server Id 1347 * @return the evaluation {@link LocalizableMessage} for the supplied replication 1348 * server Id 1349 */ 1350 private LocalizableMessage getEvaluation(int rsServerId) 1351 { 1352 final LocalizableMessage evaluation = getEvaluations().get(rsServerId); 1353 if (evaluation != null) 1354 { 1355 return evaluation; 1356 } 1357 return NOTE_UNKNOWN_RS.get(rsServerId, localServerId); 1358 } 1359 1360 /** {@inheritDoc} */ 1361 @Override 1362 public String toString() 1363 { 1364 return "Current best replication server Ids: " + bestRSs.keySet() 1365 + ", Evaluation of connected replication servers" 1366 + " (ServerId => Evaluation): " + rsEvals.keySet() 1367 + ", Any replication server not appearing here" 1368 + " could not be contacted."; 1369 } 1370 } 1371 1372 /** 1373 * Evaluation local to one filter. 1374 */ 1375 private static class LocalEvaluation 1376 { 1377 private final Map<Integer, ReplicationServerInfo> accepted = new HashMap<>(); 1378 private final Map<ReplicationServerInfo, LocalizableMessage> rsEvals = new HashMap<>(); 1379 1380 private void accept(Integer rsId, ReplicationServerInfo rsInfo) 1381 { 1382 // forget previous eval, including undoing reject 1383 this.rsEvals.remove(rsInfo); 1384 this.accepted.put(rsId, rsInfo); 1385 } 1386 1387 private void reject(ReplicationServerInfo rsInfo, LocalizableMessage reason) 1388 { 1389 this.accepted.remove(rsInfo.getServerId()); // undo accept 1390 this.rsEvals.put(rsInfo, reason); 1391 } 1392 1393 private Map<Integer, ReplicationServerInfo> getAccepted() 1394 { 1395 return accepted; 1396 } 1397 1398 private ReplicationServerInfo[] getAcceptedRSInfos() 1399 { 1400 return accepted.values().toArray( 1401 new ReplicationServerInfo[accepted.size()]); 1402 } 1403 1404 public Map<Integer, LocalizableMessage> getRejected() 1405 { 1406 final Map<Integer, LocalizableMessage> result = new HashMap<>(); 1407 for (Entry<ReplicationServerInfo, 
LocalizableMessage> entry : rsEvals.entrySet()) 1408 { 1409 result.put(entry.getKey().getServerId(), entry.getValue()); 1410 } 1411 return result; 1412 } 1413 1414 private boolean hasAcceptedAny() 1415 { 1416 return !accepted.isEmpty(); 1417 } 1418 1419 } 1420 1421 /** 1422 * Returns the replication server that best fits our need so that we can 1423 * connect to it or determine if we must disconnect from current one to 1424 * re-connect to best server. 1425 * <p> 1426 * Note: this method is static for test purpose (access from unit tests) 1427 * 1428 * @param firstConnection True if we run this method for the very first 1429 * connection of the broker. False if we run this method to determine if the 1430 * replication server we are currently connected to is still the best or not. 1431 * @param rsServerId The id of the replication server we are currently 1432 * connected to. Only used when firstConnection is false. 1433 * @param myState The local server state. 1434 * @param rsInfos The list of available replication servers and their 1435 * associated information (choice will be made among them). 1436 * @param localServerId The server id for the suffix we are working for. 1437 * @param groupId The groupId we prefer being connected to if possible 1438 * @param generationId The generation id we are using 1439 * @return The computed best replication server. If the returned value is 1440 * null, the best replication server is undetermined but the local server must 1441 * disconnect (so the best replication server is another one than the current 1442 * one). Null can only be returned when firstConnection is false. 
1443 */ 1444 static RSEvaluations computeBestReplicationServer( 1445 boolean firstConnection, int rsServerId, ServerState myState, 1446 Map<Integer, ReplicationServerInfo> rsInfos, int localServerId, 1447 byte groupId, long generationId) 1448 { 1449 final RSEvaluations evals = new RSEvaluations(localServerId, rsInfos); 1450 // Shortcut, if only one server, this is the best 1451 if (evals.foundBestRS()) 1452 { 1453 return evals; 1454 } 1455 1456 /** 1457 * Apply some filtering criteria to determine the best servers list from 1458 * the available ones. The ordered list of criteria is (from more important 1459 * to less important): 1460 * - replication server has the same group id as the local DS one 1461 * - replication server has the same generation id as the local DS one 1462 * - replication server is up to date regarding changes generated by the 1463 * local DS 1464 * - replication server in the same VM as local DS one 1465 */ 1466 /* 1467 The list of best replication servers is filtered with each criteria. At 1468 each criteria, the list is replaced with the filtered one if there 1469 are some servers from the filtering, otherwise, the list is left as is 1470 and the new filtering for the next criteria is applied and so on. 1471 1472 Use only servers locally configured: those are servers declared in 1473 the local configuration. When the current method is called, for 1474 sure, at least one server from the list is locally configured 1475 */ 1476 filterServersLocallyConfigured(evals, localServerId); 1477 // Some servers with same group id ? 1478 filterServersWithSameGroupId(evals, localServerId, groupId); 1479 // Some servers with same generation id ? 
1480 final boolean rssWithSameGenerationIdExist = 1481 filterServersWithSameGenerationId(evals, localServerId, generationId); 1482 if (rssWithSameGenerationIdExist) 1483 { 1484 // If some servers with the right generation id this is useful to 1485 // run the local DS change criteria 1486 filterServersWithAllLocalDSChanges(evals, myState, localServerId); 1487 } 1488 // Some servers in the local VM or local host? 1489 filterServersOnSameHost(evals, localServerId); 1490 1491 if (evals.foundBestRS()) 1492 { 1493 return evals; 1494 } 1495 1496 /** 1497 * Now apply the choice based on the weight to the best servers list 1498 */ 1499 if (firstConnection) 1500 { 1501 // We are not connected to a server yet 1502 computeBestServerForWeight(evals, -1, -1); 1503 } 1504 else 1505 { 1506 /* 1507 * We are already connected to a RS: compute the best RS as far as the 1508 * weights is concerned. If this is another one, some DS must disconnect. 1509 */ 1510 computeBestServerForWeight(evals, rsServerId, localServerId); 1511 } 1512 return evals; 1513 } 1514 1515 /** 1516 * Creates a new list that contains only replication servers that are locally 1517 * configured. 1518 * @param evals The evaluation object 1519 */ 1520 private static void filterServersLocallyConfigured(RSEvaluations evals, 1521 int localServerId) 1522 { 1523 final LocalEvaluation eval = new LocalEvaluation(); 1524 for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet()) 1525 { 1526 final Integer rsId = entry.getKey(); 1527 final ReplicationServerInfo rsInfo = entry.getValue(); 1528 if (rsInfo.isLocallyConfigured()) 1529 { 1530 eval.accept(rsId, rsInfo); 1531 } 1532 else 1533 { 1534 eval.reject(rsInfo, 1535 NOTE_RS_NOT_LOCALLY_CONFIGURED.get(rsId, localServerId)); 1536 } 1537 } 1538 evals.keepBest(eval); 1539 } 1540 1541 /** 1542 * Creates a new list that contains only replication servers that have the 1543 * passed group id, from a passed replication server list. 
1544 * @param evals The evaluation object 1545 * @param groupId The group id that must match 1546 */ 1547 private static void filterServersWithSameGroupId(RSEvaluations evals, 1548 int localServerId, byte groupId) 1549 { 1550 final LocalEvaluation eval = new LocalEvaluation(); 1551 for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet()) 1552 { 1553 final Integer rsId = entry.getKey(); 1554 final ReplicationServerInfo rsInfo = entry.getValue(); 1555 if (rsInfo.getGroupId() == groupId) 1556 { 1557 eval.accept(rsId, rsInfo); 1558 } 1559 else 1560 { 1561 eval.reject(rsInfo, NOTE_RS_HAS_DIFFERENT_GROUP_ID_THAN_DS.get( 1562 rsId, rsInfo.getGroupId(), localServerId, groupId)); 1563 } 1564 } 1565 evals.keepBest(eval); 1566 } 1567 1568 /** 1569 * Creates a new list that contains only replication servers that have the 1570 * provided generation id, from a provided replication server list. 1571 * When the selected replication servers have no change (empty serverState) 1572 * then the 'empty'(generationId==-1) replication servers are also included 1573 * in the result list. 
1574 * 1575 * @param evals The evaluation object 1576 * @param generationId The generation id that must match 1577 * @return whether some replication server passed the filter 1578 */ 1579 private static boolean filterServersWithSameGenerationId( 1580 RSEvaluations evals, long localServerId, long generationId) 1581 { 1582 final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs; 1583 final LocalEvaluation eval = new LocalEvaluation(); 1584 boolean emptyState = true; 1585 1586 for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet()) 1587 { 1588 final Integer rsId = entry.getKey(); 1589 final ReplicationServerInfo rsInfo = entry.getValue(); 1590 if (rsInfo.getGenerationId() == generationId) 1591 { 1592 eval.accept(rsId, rsInfo); 1593 if (!rsInfo.serverState.isEmpty()) 1594 { 1595 emptyState = false; 1596 } 1597 } 1598 else if (rsInfo.getGenerationId() == -1) 1599 { 1600 eval.reject(rsInfo, NOTE_RS_HAS_NO_GENERATION_ID.get(rsId, 1601 generationId, localServerId)); 1602 } 1603 else 1604 { 1605 eval.reject(rsInfo, NOTE_RS_HAS_DIFFERENT_GENERATION_ID_THAN_DS.get( 1606 rsId, rsInfo.getGenerationId(), localServerId, generationId)); 1607 } 1608 } 1609 1610 if (emptyState) 1611 { 1612 // If the RS with a generationId have all an empty state, 1613 // then the 'empty'(genId=-1) RSes are also candidate 1614 for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet()) 1615 { 1616 ReplicationServerInfo rsInfo = entry.getValue(); 1617 if (rsInfo.getGenerationId() == -1) 1618 { 1619 // will undo the reject of previously rejected RSs 1620 eval.accept(entry.getKey(), rsInfo); 1621 } 1622 } 1623 } 1624 1625 return evals.keepBest(eval); 1626 } 1627 1628 /** 1629 * Creates a new list that contains only replication servers that have the 1630 * latest changes from the passed DS, from a passed replication server list. 
1631 * @param evals The evaluation object 1632 * @param localState The state of the local DS 1633 * @param localServerId The server id to consider for the changes 1634 */ 1635 private static void filterServersWithAllLocalDSChanges( 1636 RSEvaluations evals, ServerState localState, int localServerId) 1637 { 1638 // Extract the CSN of the latest change generated by the local server 1639 final CSN localCSN = getCSN(localState, localServerId); 1640 1641 /** 1642 * Find replication servers that are up to date (or more up to date than us, 1643 * if for instance we failed and restarted, having sent some changes to the 1644 * RS but without having time to store our own state) regarding our own 1645 * server id. If some servers are more up to date, prefer this list but take 1646 * only the latest CSN. 1647 */ 1648 final LocalEvaluation mostUpToDateEval = new LocalEvaluation(); 1649 boolean foundRSMoreUpToDateThanLocalDS = false; 1650 CSN latestRsCSN = null; 1651 for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet()) 1652 { 1653 final Integer rsId = entry.getKey(); 1654 final ReplicationServerInfo rsInfo = entry.getValue(); 1655 final CSN rsCSN = getCSN(rsInfo.getServerState(), localServerId); 1656 1657 // Has this replication server the latest local change ? 
1658 if (rsCSN.isOlderThan(localCSN)) 1659 { 1660 mostUpToDateEval.reject(rsInfo, NOTE_RS_LATER_THAN_LOCAL_DS.get( 1661 rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI())); 1662 } 1663 else if (rsCSN.equals(localCSN)) 1664 { 1665 // This replication server has exactly the latest change from the 1666 // local server 1667 if (!foundRSMoreUpToDateThanLocalDS) 1668 { 1669 mostUpToDateEval.accept(rsId, rsInfo); 1670 } 1671 else 1672 { 1673 mostUpToDateEval.reject(rsInfo, 1674 NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get( 1675 rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI())); 1676 } 1677 } 1678 else if (rsCSN.isNewerThan(localCSN)) 1679 { 1680 // This replication server is even more up to date than the local server 1681 if (latestRsCSN == null) 1682 { 1683 foundRSMoreUpToDateThanLocalDS = true; 1684 // all previous results are now outdated, reject them all 1685 rejectAllWithRSIsLaterThanBestRS(mostUpToDateEval, localServerId, 1686 localCSN); 1687 // Initialize the latest CSN 1688 latestRsCSN = rsCSN; 1689 } 1690 1691 if (rsCSN.equals(latestRsCSN)) 1692 { 1693 mostUpToDateEval.accept(rsId, rsInfo); 1694 } 1695 else if (rsCSN.isNewerThan(latestRsCSN)) 1696 { 1697 // This RS is even more up to date, reject all previously accepted RSs 1698 // and store this new RS 1699 rejectAllWithRSIsLaterThanBestRS(mostUpToDateEval, localServerId, 1700 localCSN); 1701 mostUpToDateEval.accept(rsId, rsInfo); 1702 latestRsCSN = rsCSN; 1703 } 1704 else 1705 { 1706 mostUpToDateEval.reject(rsInfo, 1707 NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get( 1708 rsId, rsCSN.toStringUI(), localServerId, localCSN.toStringUI())); 1709 } 1710 } 1711 } 1712 evals.keepBest(mostUpToDateEval); 1713 } 1714 1715 private static CSN getCSN(ServerState state, int serverId) 1716 { 1717 final CSN csn = state.getCSN(serverId); 1718 if (csn != null) 1719 { 1720 return csn; 1721 } 1722 return new CSN(0, 0, serverId); 1723 } 1724 1725 private static void 
rejectAllWithRSIsLaterThanBestRS( 1726 final LocalEvaluation eval, int localServerId, CSN localCSN) 1727 { 1728 for (ReplicationServerInfo rsInfo : eval.getAcceptedRSInfos()) 1729 { 1730 final String rsCSN = 1731 getCSN(rsInfo.getServerState(), localServerId).toStringUI(); 1732 final LocalizableMessage reason = 1733 NOTE_RS_LATER_THAN_ANOTHER_RS_MORE_UP_TO_DATE_THAN_LOCAL_DS.get( 1734 rsInfo.getServerId(), rsCSN, localServerId, localCSN.toStringUI()); 1735 eval.reject(rsInfo, reason); 1736 } 1737 } 1738 1739 /** 1740 * Creates a new list that contains only replication servers that are on the 1741 * same host as the local DS, from a passed replication server list. This 1742 * method will gives priority to any replication server which is in the same 1743 * VM as this DS. 1744 * 1745 * @param evals The evaluation object 1746 */ 1747 private static void filterServersOnSameHost(RSEvaluations evals, 1748 int localServerId) 1749 { 1750 /* 1751 * Initially look for all servers on the same host. If we find one in the 1752 * same VM, then narrow the search. 1753 */ 1754 boolean foundRSInSameVM = false; 1755 final LocalEvaluation eval = new LocalEvaluation(); 1756 for (Entry<Integer, ReplicationServerInfo> entry : evals.bestRSs.entrySet()) 1757 { 1758 final Integer rsId = entry.getKey(); 1759 final ReplicationServerInfo rsInfo = entry.getValue(); 1760 final HostPort hp = HostPort.valueOf(rsInfo.getServerURL()); 1761 if (hp.isLocalAddress()) 1762 { 1763 if (isLocalReplicationServerPort(hp.getPort())) 1764 { 1765 if (!foundRSInSameVM) 1766 { 1767 // An RS in the same VM will always have priority. 1768 // Narrow the search to only include servers in this VM. 
1769 rejectAllWithRSOnDifferentVMThanDS(eval, localServerId); 1770 foundRSInSameVM = true; 1771 } 1772 eval.accept(rsId, rsInfo); 1773 } 1774 else if (!foundRSInSameVM) 1775 { 1776 // OK, accept RSs on the same machine because we have not found an RS 1777 // in the same VM yet 1778 eval.accept(rsId, rsInfo); 1779 } 1780 else 1781 { 1782 // Skip: we have found some RSs in the same VM, but this RS is not. 1783 eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_VM_THAN_DS.get(rsId, 1784 localServerId)); 1785 } 1786 } 1787 else 1788 { 1789 eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_HOST_THAN_DS.get(rsId, 1790 localServerId)); 1791 } 1792 } 1793 evals.keepBest(eval); 1794 } 1795 1796 private static void rejectAllWithRSOnDifferentVMThanDS(LocalEvaluation eval, 1797 int localServerId) 1798 { 1799 for (ReplicationServerInfo rsInfo : eval.getAcceptedRSInfos()) 1800 { 1801 eval.reject(rsInfo, NOTE_RS_ON_DIFFERENT_VM_THAN_DS.get( 1802 rsInfo.getServerId(), localServerId)); 1803 } 1804 } 1805 1806 /** 1807 * Computes the best replication server the local server should be connected 1808 * to so that the load is correctly spread across the topology, following the 1809 * weights guidance. 1810 * Warning: This method is expected to be called with at least 2 servers in 1811 * bestServers 1812 * Note: this method is static for test purpose (access from unit tests) 1813 * @param evals The evaluation object 1814 * @param currentRsServerId The replication server the local server is 1815 * currently connected to. -1 if the local server is not yet connected 1816 * to any replication server. 1817 * @param localServerId The server id of the local server. 
This is not used 1818 * when it is not connected to a replication server 1819 * (currentRsServerId = -1) 1820 */ 1821 static void computeBestServerForWeight(RSEvaluations evals, 1822 int currentRsServerId, int localServerId) 1823 { 1824 final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs; 1825 /* 1826 * - Compute the load goal of each RS, deducing it from the weights affected 1827 * to them. 1828 * - Compute the current load of each RS, deducing it from the DSs 1829 * currently connected to them. 1830 * - Compute the differences between the load goals and the current loads of 1831 * the RSs. 1832 */ 1833 // Sum of the weights 1834 int sumOfWeights = 0; 1835 // Sum of the connected DSs 1836 int sumOfConnectedDSs = 0; 1837 for (ReplicationServerInfo rsInfo : bestServers.values()) 1838 { 1839 sumOfWeights += rsInfo.getWeight(); 1840 sumOfConnectedDSs += rsInfo.getConnectedDSNumber(); 1841 } 1842 1843 // Distance (difference) of the current loads to the load goals of each RS: 1844 // key:server id, value: distance 1845 Map<Integer, BigDecimal> loadDistances = new HashMap<>(); 1846 // Precision for the operations (number of digits after the dot) 1847 final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP); 1848 for (Entry<Integer, ReplicationServerInfo> entry : bestServers.entrySet()) 1849 { 1850 final Integer rsId = entry.getKey(); 1851 final ReplicationServerInfo rsInfo = entry.getValue(); 1852 1853 // load goal = rs weight / sum of weights 1854 BigDecimal loadGoalBd = BigDecimal.valueOf(rsInfo.getWeight()).divide( 1855 BigDecimal.valueOf(sumOfWeights), mathContext); 1856 BigDecimal currentLoadBd = BigDecimal.ZERO; 1857 if (sumOfConnectedDSs != 0) 1858 { 1859 // current load = number of connected DSs / total number of DSs 1860 int connectedDSs = rsInfo.getConnectedDSNumber(); 1861 currentLoadBd = BigDecimal.valueOf(connectedDSs).divide( 1862 BigDecimal.valueOf(sumOfConnectedDSs), mathContext); 1863 } 1864 // load distance = load 
goal - current load 1865 BigDecimal loadDistanceBd = 1866 loadGoalBd.subtract(currentLoadBd, mathContext); 1867 loadDistances.put(rsId, loadDistanceBd); 1868 } 1869 1870 if (currentRsServerId == -1) 1871 { 1872 // The local server is not connected yet, find best server to connect to, 1873 // taking the weights into account. 1874 computeBestServerWhenNotConnected(evals, loadDistances, localServerId); 1875 } 1876 else 1877 { 1878 // The local server is currently connected to a RS, let's see if it must 1879 // disconnect or not, taking the weights into account. 1880 computeBestServerWhenConnected(evals, loadDistances, localServerId, 1881 currentRsServerId, sumOfWeights, sumOfConnectedDSs); 1882 } 1883 } 1884 1885 private static void computeBestServerWhenNotConnected(RSEvaluations evals, 1886 Map<Integer, BigDecimal> loadDistances, int localServerId) 1887 { 1888 final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs; 1889 /* 1890 * Find the server with the current highest distance to its load goal and 1891 * choose it. 
Make an exception if every server is correctly balanced, 1892 * that is every current load distances are equal to 0, in that case, 1893 * choose the server with the highest weight 1894 */ 1895 int bestRsId = 0; // If all server equal, return the first one 1896 float highestDistance = Float.NEGATIVE_INFINITY; 1897 boolean allRsWithZeroDistance = true; 1898 int highestWeightRsId = -1; 1899 int highestWeight = -1; 1900 for (Integer rsId : bestServers.keySet()) 1901 { 1902 float loadDistance = loadDistances.get(rsId).floatValue(); 1903 if (loadDistance > highestDistance) 1904 { 1905 // This server is far more from its balance point 1906 bestRsId = rsId; 1907 highestDistance = loadDistance; 1908 } 1909 if (loadDistance != 0) 1910 { 1911 allRsWithZeroDistance = false; 1912 } 1913 int weight = bestServers.get(rsId).getWeight(); 1914 if (weight > highestWeight) 1915 { 1916 // This server has a higher weight 1917 highestWeightRsId = rsId; 1918 highestWeight = weight; 1919 } 1920 } 1921 // All servers with a 0 distance ? 
1922 if (allRsWithZeroDistance) 1923 { 1924 // Choose server with the highest weight 1925 bestRsId = highestWeightRsId; 1926 } 1927 evals.setBestRS(bestRsId, NOTE_BIGGEST_WEIGHT_RS.get(localServerId, 1928 bestRsId)); 1929 } 1930 1931 private static void computeBestServerWhenConnected(RSEvaluations evals, 1932 Map<Integer, BigDecimal> loadDistances, int localServerId, 1933 int currentRsServerId, int sumOfWeights, int sumOfConnectedDSs) 1934 { 1935 final Map<Integer, ReplicationServerInfo> bestServers = evals.bestRSs; 1936 final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP); 1937 float currentLoadDistance = 1938 loadDistances.get(currentRsServerId).floatValue(); 1939 if (currentLoadDistance < 0) 1940 { 1941 /* 1942 Too much DSs connected to the current RS, compared with its load 1943 goal: 1944 Determine the potential number of DSs to disconnect from the current 1945 RS and see if the local DS is part of them: the DSs that must 1946 disconnect are those with the lowest server id. 1947 Compute the sum of the distances of the load goals of the other RSs 1948 */ 1949 BigDecimal sumOfLoadDistancesOfOtherRSsBd = BigDecimal.ZERO; 1950 for (Integer rsId : bestServers.keySet()) 1951 { 1952 if (rsId != currentRsServerId) 1953 { 1954 sumOfLoadDistancesOfOtherRSsBd = sumOfLoadDistancesOfOtherRSsBd.add( 1955 loadDistances.get(rsId), mathContext); 1956 } 1957 } 1958 1959 if (sumOfLoadDistancesOfOtherRSsBd.floatValue() > 0) 1960 { 1961 /* 1962 The average distance of the other RSs shows a lack of DSs. 1963 Compute the number of DSs to disconnect from the current RS, 1964 rounding to the nearest integer number. Do only this if there is 1965 no risk of yoyo effect: when the exact balance cannot be 1966 established due to the current number of DSs connected, do not 1967 disconnect a DS. 
A simple example where the balance cannot be 1968 reached is: 1969 - RS1 has weight 1 and 2 DSs 1970 - RS2 has weight 1 and 1 DS 1971 => disconnecting a DS from RS1 to reconnect it to RS2 would have no 1972 sense as this would lead to the reverse situation. In that case, 1973 the perfect balance cannot be reached and we must stick to the 1974 current situation, otherwise the DS would keep move between the 2 1975 RSs 1976 */ 1977 float notRoundedOverloadingDSsNumber = sumOfLoadDistancesOfOtherRSsBd. 1978 multiply(BigDecimal.valueOf(sumOfConnectedDSs), mathContext) 1979 .floatValue(); 1980 int overloadingDSsNumber = Math.round(notRoundedOverloadingDSsNumber); 1981 1982 // Avoid yoyo effect 1983 if (overloadingDSsNumber == 1) 1984 { 1985 // What would be the new load distance for the current RS if 1986 // we disconnect some DSs ? 1987 ReplicationServerInfo currentReplicationServerInfo = 1988 bestServers.get(currentRsServerId); 1989 1990 int currentRsWeight = currentReplicationServerInfo.getWeight(); 1991 BigDecimal currentRsWeightBd = BigDecimal.valueOf(currentRsWeight); 1992 BigDecimal sumOfWeightsBd = BigDecimal.valueOf(sumOfWeights); 1993 BigDecimal currentRsLoadGoalBd = 1994 currentRsWeightBd.divide(sumOfWeightsBd, mathContext); 1995 BigDecimal potentialCurrentRsNewLoadBd = BigDecimal.ZERO; 1996 if (sumOfConnectedDSs != 0) 1997 { 1998 int connectedDSs = currentReplicationServerInfo. 1999 getConnectedDSNumber(); 2000 BigDecimal potentialNewConnectedDSsBd = 2001 BigDecimal.valueOf(connectedDSs - 1); 2002 BigDecimal sumOfConnectedDSsBd = 2003 BigDecimal.valueOf(sumOfConnectedDSs); 2004 potentialCurrentRsNewLoadBd = 2005 potentialNewConnectedDSsBd.divide(sumOfConnectedDSsBd, 2006 mathContext); 2007 } 2008 BigDecimal potentialCurrentRsNewLoadDistanceBd = 2009 currentRsLoadGoalBd.subtract(potentialCurrentRsNewLoadBd, 2010 mathContext); 2011 2012 // What would be the new load distance for the other RSs ? 
2013 BigDecimal additionalDsLoadBd = 2014 BigDecimal.ONE.divide( 2015 BigDecimal.valueOf(sumOfConnectedDSs), mathContext); 2016 BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBd = 2017 sumOfLoadDistancesOfOtherRSsBd.subtract(additionalDsLoadBd, 2018 mathContext); 2019 2020 /* 2021 Now compare both values: we must not disconnect the DS if this 2022 is for going in a situation where the load distance of the other 2023 RSs is the opposite of the future load distance of the local RS 2024 or we would evaluate that we should disconnect just after being 2025 arrived on the new RS. But we should disconnect if we reach the 2026 perfect balance (both values are 0). 2027 */ 2028 if (mustAvoidYoyoEffect(potentialCurrentRsNewLoadDistanceBd, 2029 potentialNewSumOfLoadDistancesOfOtherRSsBd)) 2030 { 2031 // Avoid the yoyo effect, and keep the local DS connected to its 2032 // current RS 2033 evals.setBestRS(currentRsServerId, 2034 NOTE_AVOID_YOYO_EFFECT.get(localServerId, currentRsServerId)); 2035 return; 2036 } 2037 } 2038 2039 ReplicationServerInfo currentRsInfo = 2040 bestServers.get(currentRsServerId); 2041 if (isServerOverloadingRS(localServerId, currentRsInfo, 2042 overloadingDSsNumber)) 2043 { 2044 // The local server is part of the DSs to disconnect 2045 evals.discardAll(NOTE_DISCONNECT_DS_FROM_OVERLOADED_RS.get( 2046 localServerId, currentRsServerId)); 2047 } 2048 else 2049 { 2050 // The local server is not part of the servers to disconnect from the 2051 // current RS. 2052 evals.setBestRS(currentRsServerId, 2053 NOTE_DO_NOT_DISCONNECT_DS_FROM_OVERLOADED_RS.get(localServerId, 2054 currentRsServerId)); 2055 } 2056 } else { 2057 // The average distance of the other RSs does not show a lack of DSs: 2058 // no need to disconnect any DS from the current RS. 
2059 evals.setBestRS(currentRsServerId, 2060 NOTE_NO_NEED_TO_REBALANCE_DSS_BETWEEN_RSS.get(localServerId, 2061 currentRsServerId)); 2062 } 2063 } else { 2064 // The RS load goal is reached or there are not enough DSs connected to 2065 // it to reach it: do not disconnect from this RS and return rsInfo for 2066 // this RS 2067 evals.setBestRS(currentRsServerId, 2068 NOTE_DO_NOT_DISCONNECT_DS_FROM_ACCEPTABLE_LOAD_RS.get(localServerId, 2069 currentRsServerId)); 2070 } 2071 } 2072 2073 private static boolean mustAvoidYoyoEffect(BigDecimal rsNewLoadDistance, 2074 BigDecimal otherRSsNewSumOfLoadDistances) 2075 { 2076 final MathContext roundCtx = new MathContext(6, RoundingMode.DOWN); 2077 final BigDecimal rsLoadDistance = rsNewLoadDistance.round(roundCtx); 2078 final BigDecimal otherRSsSumOfLoadDistances = 2079 otherRSsNewSumOfLoadDistances.round(roundCtx); 2080 2081 return rsLoadDistance.compareTo(BigDecimal.ZERO) != 0 2082 && rsLoadDistance.compareTo(otherRSsSumOfLoadDistances.negate()) == 0; 2083 } 2084 2085 /** 2086 * Returns whether the local DS is overloading the RS. 2087 * <p> 2088 * There are an "overloadingDSsNumber" of DS overloading the RS. The list of 2089 * DSs connected to this RS is ordered by serverId to use a consistent 2090 * ordering across all nodes in the topology. The serverIds which index in the 2091 * List are lower than "overloadingDSsNumber" will be evicted first. 2092 * <p> 2093 * This ordering is unfair since nodes with the lower serverIds will be 2094 * evicted more often than nodes with higher serverIds. However, it is a 2095 * consistent and reliable ordering applicable anywhere in the topology. 
2096 */ 2097 private static boolean isServerOverloadingRS(int localServerId, 2098 ReplicationServerInfo currentRsInfo, int overloadingDSsNumber) 2099 { 2100 List<Integer> serversConnectedToCurrentRS = new ArrayList<>(currentRsInfo.getConnectedDSs()); 2101 Collections.sort(serversConnectedToCurrentRS); 2102 2103 final int idx = serversConnectedToCurrentRS.indexOf(localServerId); 2104 return idx != -1 && idx < overloadingDSsNumber; 2105 } 2106 2107 /** 2108 * Start the heartbeat monitor thread. 2109 */ 2110 private void startRSHeartBeatMonitoring(ConnectedRS rs) 2111 { 2112 final long heartbeatInterval = config.getHeartbeatInterval(); 2113 if (heartbeatInterval > 0) 2114 { 2115 heartbeatMonitor = new HeartbeatMonitor(getServerId(), rs.getServerId(), 2116 getBaseDN().toString(), rs.session, heartbeatInterval); 2117 heartbeatMonitor.start(); 2118 } 2119 } 2120 2121 /** 2122 * Stop the heartbeat monitor thread. 2123 */ 2124 private synchronized void stopRSHeartBeatMonitoring() 2125 { 2126 if (heartbeatMonitor != null) 2127 { 2128 heartbeatMonitor.shutdown(); 2129 heartbeatMonitor = null; 2130 } 2131 } 2132 2133 /** 2134 * Restart the ReplicationBroker. 2135 * @param infiniteTry the socket which failed 2136 */ 2137 public void reStart(boolean infiniteTry) 2138 { 2139 reStart(connectedRS.get().session, infiniteTry); 2140 } 2141 2142 /** 2143 * Restart the ReplicationServer broker after a failure. 
2144 * 2145 * @param failingSession the socket which failed 2146 * @param infiniteTry the socket which failed 2147 */ 2148 private void reStart(Session failingSession, boolean infiniteTry) 2149 { 2150 if (failingSession != null) 2151 { 2152 failingSession.close(); 2153 numLostConnections++; 2154 } 2155 2156 ConnectedRS rs = connectedRS.get(); 2157 if (failingSession == rs.session && !rs.equals(ConnectedRS.noConnectedRS())) 2158 { 2159 rs = setConnectedRS(ConnectedRS.noConnectedRS()); 2160 } 2161 2162 while (true) 2163 { 2164 // Synchronize inside the loop in order to allow shutdown. 2165 synchronized (startStopLock) 2166 { 2167 if (rs.isConnected() || shutdown) 2168 { 2169 break; 2170 } 2171 2172 try 2173 { 2174 connectAsDataServer(); 2175 rs = connectedRS.get(); 2176 } 2177 catch (Exception e) 2178 { 2179 logger.error(NOTE_EXCEPTION_RESTARTING_SESSION, 2180 getBaseDN(), e.getLocalizedMessage() + " " + stackTraceToSingleLineString(e)); 2181 } 2182 2183 if (rs.isConnected() || !infiniteTry) 2184 { 2185 break; 2186 } 2187 } 2188 try 2189 { 2190 Thread.sleep(500); 2191 } 2192 catch (InterruptedException ignored) 2193 { 2194 // ignore 2195 } 2196 } 2197 2198 if (logger.isTraceEnabled()) 2199 { 2200 debugInfo("end restart : connected=" + rs.isConnected() + " with RS(" 2201 + rs.getServerId() + ") genId=" + getGenerationID()); 2202 } 2203 } 2204 2205 /** 2206 * Publish a message to the other servers. 2207 * @param msg the message to publish 2208 */ 2209 public void publish(ReplicationMsg msg) 2210 { 2211 publish(msg, false, true); 2212 } 2213 2214 /** 2215 * Publish a message to the other servers. 2216 * @param msg The message to publish. 2217 * @param retryOnFailure Whether reconnect should automatically be done. 2218 * @return Whether publish succeeded. 2219 */ 2220 boolean publish(ReplicationMsg msg, boolean retryOnFailure) 2221 { 2222 return publish(msg, false, retryOnFailure); 2223 } 2224 2225 /** 2226 * Publish a recovery message to the other servers. 
2227 * @param msg the message to publish 2228 */ 2229 public void publishRecovery(ReplicationMsg msg) 2230 { 2231 publish(msg, true, true); 2232 } 2233 2234 /** 2235 * Publish a message to the other servers. 2236 * @param msg the message to publish 2237 * @param recoveryMsg the message is a recovery LocalizableMessage 2238 * @param retryOnFailure whether retry should be done on failure 2239 * @return whether the message was successfully sent. 2240 */ 2241 private boolean publish(ReplicationMsg msg, boolean recoveryMsg, 2242 boolean retryOnFailure) 2243 { 2244 boolean done = false; 2245 2246 while (!done && !shutdown) 2247 { 2248 if (connectionError) 2249 { 2250 /* 2251 It was not possible to connect to any replication server. 2252 Since the operation was already processed, we have no other 2253 choice than to return without sending the ReplicationMsg 2254 and relying on the resend procedure of the connect phase to 2255 fix the problem when we finally connect. 2256 */ 2257 2258 if (logger.isTraceEnabled()) 2259 { 2260 debugInfo("publish(): Publishing a message is not possible due to" 2261 + " existing connection error."); 2262 } 2263 2264 return false; 2265 } 2266 2267 try 2268 { 2269 /* 2270 save the session at the time when we acquire the 2271 sendwindow credit so that we can make sure later 2272 that the session did not change in between. 2273 This is necessary to make sure that we don't publish a message 2274 on a session with a credit that was acquired from a previous 2275 session. 2276 */ 2277 Session currentSession; 2278 Semaphore currentWindowSemaphore; 2279 synchronized (connectPhaseLock) 2280 { 2281 currentSession = connectedRS.get().session; 2282 currentWindowSemaphore = sendWindow; 2283 } 2284 2285 /* 2286 If the Replication domain has decided that there is a need to 2287 recover some changes then it is not allowed to send this 2288 change but it will be the responsibility of the recovery thread to 2289 do it. 
2290 */ 2291 if (!recoveryMsg & connectRequiresRecovery) 2292 { 2293 return false; 2294 } 2295 2296 boolean credit; 2297 if (msg instanceof UpdateMsg) 2298 { 2299 /* 2300 Acquiring the window credit must be done outside of the 2301 connectPhaseLock because it can be blocking and we don't 2302 want to hold off reconnection in case the connection dropped. 2303 */ 2304 credit = 2305 currentWindowSemaphore.tryAcquire(500, TimeUnit.MILLISECONDS); 2306 } 2307 else 2308 { 2309 credit = true; 2310 } 2311 2312 if (credit) 2313 { 2314 synchronized (connectPhaseLock) 2315 { 2316 /* 2317 session may have been set to null in the connection phase 2318 when restarting the broker for example. 2319 Check the session. If it has changed, some disconnection or 2320 reconnection happened and we need to restart from scratch. 2321 */ 2322 final Session session = connectedRS.get().session; 2323 if (session != null && session == currentSession) 2324 { 2325 session.publish(msg); 2326 done = true; 2327 } 2328 } 2329 } 2330 if (!credit && currentWindowSemaphore.availablePermits() == 0) 2331 { 2332 synchronized (connectPhaseLock) 2333 { 2334 /* 2335 the window is still closed. 2336 Send a WindowProbeMsg message to wake up the receiver in case the 2337 window update message was lost somehow... 2338 then loop to check again if connection was closed. 2339 */ 2340 Session session = connectedRS.get().session; 2341 if (session != null) 2342 { 2343 session.publish(new WindowProbeMsg()); 2344 } 2345 } 2346 } 2347 } 2348 catch (IOException e) 2349 { 2350 if (logger.isTraceEnabled()) 2351 { 2352 debugInfo("publish(): IOException caught: " 2353 + stackTraceToSingleLineString(e)); 2354 } 2355 if (!retryOnFailure) 2356 { 2357 return false; 2358 } 2359 2360 // The receive threads should handle reconnection or 2361 // mark this broker in error. Just retry. 
2362 synchronized (connectPhaseLock) 2363 { 2364 try 2365 { 2366 connectPhaseLock.wait(100); 2367 } 2368 catch (InterruptedException ignored) 2369 { 2370 if (logger.isTraceEnabled()) 2371 { 2372 debugInfo("publish(): InterruptedException caught 1: " 2373 + stackTraceToSingleLineString(ignored)); 2374 } 2375 } 2376 } 2377 } 2378 catch (InterruptedException ignored) 2379 { 2380 // just loop. 2381 if (logger.isTraceEnabled()) 2382 { 2383 debugInfo("publish(): InterruptedException caught 2: " 2384 + stackTraceToSingleLineString(ignored)); 2385 } 2386 } 2387 } 2388 return true; 2389 } 2390 2391 /** 2392 * Receive a message. 2393 * This method is not thread-safe and should either always be 2394 * called in a single thread or protected by a locking mechanism 2395 * before being called. This is a wrapper to the method with a boolean version 2396 * so that we do not have to modify existing tests. 2397 * 2398 * @return the received message 2399 * @throws SocketTimeoutException if the timeout set by setSoTimeout 2400 * has expired 2401 */ 2402 public ReplicationMsg receive() throws SocketTimeoutException 2403 { 2404 return receive(false, true, false); 2405 } 2406 2407 /** 2408 * Receive a message. 2409 * This method is not thread-safe and should either always be 2410 * called in a single thread or protected by a locking mechanism 2411 * before being called. 2412 * 2413 * @param reconnectToTheBestRS Whether broker will automatically switch 2414 * to the best suitable RS. 2415 * @param reconnectOnFailure Whether broker will automatically reconnect 2416 * on failure. 2417 * @param returnOnTopoChange Whether broker should return TopologyMsg 2418 * received. 
2419 * @return the received message 2420 * 2421 * @throws SocketTimeoutException if the timeout set by setSoTimeout 2422 * has expired 2423 */ 2424 ReplicationMsg receive(boolean reconnectToTheBestRS, 2425 boolean reconnectOnFailure, boolean returnOnTopoChange) 2426 throws SocketTimeoutException 2427 { 2428 while (!shutdown) 2429 { 2430 ConnectedRS rs = connectedRS.get(); 2431 if (reconnectOnFailure && !rs.isConnected()) 2432 { 2433 // infinite try to reconnect 2434 reStart(null, true); 2435 continue; 2436 } 2437 2438 // Save session information for later in case we need it for log messages 2439 // after the session has been closed and/or failed. 2440 if (rs.session == null) 2441 { 2442 // Must be shutting down. 2443 break; 2444 } 2445 2446 final int serverId = getServerId(); 2447 final DN baseDN = getBaseDN(); 2448 final int previousRsServerID = rs.getServerId(); 2449 try 2450 { 2451 ReplicationMsg msg = rs.session.receive(); 2452 if (msg instanceof UpdateMsg) 2453 { 2454 synchronized (this) 2455 { 2456 rcvWindow--; 2457 } 2458 } 2459 if (msg instanceof WindowMsg) 2460 { 2461 final WindowMsg windowMsg = (WindowMsg) msg; 2462 sendWindow.release(windowMsg.getNumAck()); 2463 } 2464 else if (msg instanceof TopologyMsg) 2465 { 2466 final TopologyMsg topoMsg = (TopologyMsg) msg; 2467 receiveTopo(topoMsg, getRsServerId()); 2468 if (reconnectToTheBestRS) 2469 { 2470 // Reset wait time before next computation of best server 2471 mustRunBestServerCheckingAlgorithm = 0; 2472 } 2473 2474 // Caller wants to check what's changed 2475 if (returnOnTopoChange) 2476 { 2477 return msg; 2478 } 2479 } 2480 else if (msg instanceof StopMsg) 2481 { 2482 // RS performs a proper disconnection 2483 logger.warn(WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED, previousRsServerID, rs.replicationServer, 2484 serverId, baseDN); 2485 2486 // Try to find a suitable RS 2487 reStart(rs.session, true); 2488 } 2489 else if (msg instanceof MonitorMsg) 2490 { 2491 // This is the response to a 
MonitorRequest that was sent earlier or 2492 // the regular message of the monitoring publisher of the RS. 2493 MonitorMsg monitorMsg = (MonitorMsg) msg; 2494 2495 // Extract and store replicas ServerStates 2496 final Map<Integer, ServerState> newReplicaStates = new HashMap<>(); 2497 for (int srvId : toIterable(monitorMsg.ldapIterator())) 2498 { 2499 newReplicaStates.put(srvId, monitorMsg.getLDAPServerState(srvId)); 2500 } 2501 replicaStates = newReplicaStates; 2502 2503 // Notify the sender that the response was received. 2504 synchronized (monitorResponse) 2505 { 2506 monitorResponse.set(true); 2507 monitorResponse.notify(); 2508 } 2509 2510 // Update the replication servers ServerStates with new received info 2511 Map<Integer, ReplicationServerInfo> rsInfos = topology.get().rsInfos; 2512 for (int srvId : toIterable(monitorMsg.rsIterator())) 2513 { 2514 final ReplicationServerInfo rsInfo = rsInfos.get(srvId); 2515 if (rsInfo != null) 2516 { 2517 rsInfo.update(monitorMsg.getRSServerState(srvId)); 2518 } 2519 } 2520 2521 /* 2522 Now if it is allowed, compute the best replication server to see if 2523 it is still the one we are currently connected to. If not, 2524 disconnect properly and let the connection algorithm re-connect to 2525 best replication server 2526 */ 2527 if (reconnectToTheBestRS) 2528 { 2529 mustRunBestServerCheckingAlgorithm++; 2530 if (mustRunBestServerCheckingAlgorithm == 2) 2531 { 2532 // Stable topology (no topo msg since few seconds): proceed with 2533 // best server checking. 2534 final RSEvaluations evals = computeBestReplicationServer( 2535 false, previousRsServerID, state, 2536 rsInfos, serverId, getGroupId(), getGenerationID()); 2537 final ReplicationServerInfo bestServerInfo = evals.getBestRS(); 2538 if (previousRsServerID != -1 2539 && (bestServerInfo == null 2540 || bestServerInfo.getServerId() != previousRsServerID)) 2541 { 2542 // The best replication server is no more the one we are 2543 // currently using. 
Disconnect properly then reconnect. 2544 LocalizableMessage message; 2545 if (bestServerInfo == null) 2546 { 2547 message = NOTE_LOAD_BALANCE_REPLICATION_SERVER.get( 2548 serverId, previousRsServerID, rs.replicationServer, baseDN); 2549 } 2550 else 2551 { 2552 final int bestRsServerId = bestServerInfo.getServerId(); 2553 message = NOTE_NEW_BEST_REPLICATION_SERVER.get( 2554 serverId, previousRsServerID, rs.replicationServer, bestRsServerId, baseDN, 2555 evals.getEvaluation(previousRsServerID), 2556 evals.getEvaluation(bestRsServerId)); 2557 } 2558 logger.info(message); 2559 if (logger.isTraceEnabled()) 2560 { 2561 debugInfo("best replication servers evaluation results: " + evals); 2562 } 2563 reStart(true); 2564 } 2565 2566 // Reset wait time before next computation of best server 2567 mustRunBestServerCheckingAlgorithm = 0; 2568 } 2569 } 2570 } 2571 else 2572 { 2573 return msg; 2574 } 2575 } 2576 catch (SocketTimeoutException e) 2577 { 2578 throw e; 2579 } 2580 catch (Exception e) 2581 { 2582 logger.traceException(e); 2583 2584 if (!shutdown) 2585 { 2586 if (rs.session == null || !rs.session.closeInitiated()) 2587 { 2588 // We did not initiate the close on our side, log an error message. 2589 logger.error(WARN_REPLICATION_SERVER_BADLY_DISCONNECTED, 2590 serverId, baseDN, previousRsServerID, rs.replicationServer); 2591 } 2592 2593 if (!reconnectOnFailure) 2594 { 2595 break; // does not seem necessary to explicitly disconnect .. 2596 } 2597 2598 reStart(rs.session, true); 2599 } 2600 } 2601 } // while !shutdown 2602 return null; 2603 } 2604 2605 /** 2606 * Gets the States of all the Replicas currently in the Topology. When this 2607 * method is called, a Monitoring message will be sent to the Replication 2608 * Server to which this domain is currently connected so that it computes a 2609 * table containing information about all Directory Servers in the topology. 
2610 * This Computation involves communications will all the servers currently 2611 * connected and 2612 * 2613 * @return The States of all Replicas in the topology (except us) 2614 */ 2615 public Map<Integer, ServerState> getReplicaStates() 2616 { 2617 monitorResponse.set(false); 2618 2619 // publish Monitor Request LocalizableMessage to the Replication Server 2620 publish(new MonitorRequestMsg(getServerId(), getRsServerId())); 2621 2622 // wait for Response up to 10 seconds. 2623 try 2624 { 2625 synchronized (monitorResponse) 2626 { 2627 if (!monitorResponse.get()) 2628 { 2629 monitorResponse.wait(10000); 2630 } 2631 } 2632 } catch (InterruptedException e) 2633 { 2634 Thread.currentThread().interrupt(); 2635 } 2636 return replicaStates; 2637 } 2638 2639 /** 2640 * This method allows to do the necessary computing for the window 2641 * management after treatment by the worker threads. 2642 * 2643 * This should be called once the replay thread have done their job 2644 * and the window can be open again. 2645 */ 2646 public synchronized void updateWindowAfterReplay() 2647 { 2648 try 2649 { 2650 updateDoneCount++; 2651 final Session session = connectedRS.get().session; 2652 if (updateDoneCount >= halfRcvWindow && session != null) 2653 { 2654 session.publish(new WindowMsg(updateDoneCount)); 2655 rcvWindow += updateDoneCount; 2656 updateDoneCount = 0; 2657 } 2658 } catch (IOException e) 2659 { 2660 // Any error on the socket will be handled by the thread calling receive() 2661 // just ignore. 2662 } 2663 } 2664 2665 /** Stop the server. 
*/ 2666 public void stop() 2667 { 2668 if (logger.isTraceEnabled() && !shutdown) 2669 { 2670 debugInfo("is stopping and will close the connection to RS(" + getRsServerId() + ")"); 2671 } 2672 2673 synchronized (startStopLock) 2674 { 2675 if (shutdown) 2676 { 2677 return; 2678 } 2679 domain.publishReplicaOfflineMsg(); 2680 shutdown = true; 2681 setConnectedRS(ConnectedRS.stopped()); 2682 stopRSHeartBeatMonitoring(); 2683 stopChangeTimeHeartBeatPublishing(); 2684 deregisterReplicationMonitor(); 2685 } 2686 } 2687 2688 /** 2689 * Set a timeout value. 2690 * With this option set to a non-zero value, calls to the receive() method 2691 * block for only this amount of time after which a 2692 * java.net.SocketTimeoutException is raised. 2693 * The Broker is valid and usable even after such an Exception is raised. 2694 * 2695 * @param timeout the specified timeout, in milliseconds. 2696 * @throws SocketException if there is an error in the underlying protocol, 2697 * such as a TCP error. 2698 */ 2699 public void setSoTimeout(int timeout) throws SocketException 2700 { 2701 this.timeout = timeout; 2702 final Session session = connectedRS.get().session; 2703 if (session != null) 2704 { 2705 session.setSoTimeout(timeout); 2706 } 2707 } 2708 2709 /** 2710 * Get the name of the replicationServer to which this broker is currently 2711 * connected. 2712 * 2713 * @return the name of the replicationServer to which this domain 2714 * is currently connected. 2715 */ 2716 public String getReplicationServer() 2717 { 2718 return connectedRS.get().replicationServer; 2719 } 2720 2721 /** 2722 * Get the maximum receive window size. 2723 * 2724 * @return The maximum receive window size. 2725 */ 2726 public int getMaxRcvWindow() 2727 { 2728 return config.getWindowSize(); 2729 } 2730 2731 /** 2732 * Get the current receive window size. 2733 * 2734 * @return The current receive window size. 
2735 */ 2736 public int getCurrentRcvWindow() 2737 { 2738 return rcvWindow; 2739 } 2740 2741 /** 2742 * Get the maximum send window size. 2743 * 2744 * @return The maximum send window size. 2745 */ 2746 public int getMaxSendWindow() 2747 { 2748 return maxSendWindow; 2749 } 2750 2751 /** 2752 * Get the current send window size. 2753 * 2754 * @return The current send window size. 2755 */ 2756 public int getCurrentSendWindow() 2757 { 2758 if (isConnected()) 2759 { 2760 return sendWindow.availablePermits(); 2761 } 2762 return 0; 2763 } 2764 2765 /** 2766 * Get the number of times the connection was lost. 2767 * @return The number of times the connection was lost. 2768 */ 2769 public int getNumLostConnections() 2770 { 2771 return numLostConnections; 2772 } 2773 2774 /** 2775 * Change some configuration parameters. 2776 * 2777 * @param newConfig The new config to use. 2778 * @return A boolean indicating if the changes 2779 * requires to restart the service. 2780 */ 2781 boolean changeConfig(ReplicationDomainCfg newConfig) 2782 { 2783 // These parameters needs to be renegotiated with the ReplicationServer 2784 // so if they have changed, that requires restarting the session with 2785 // the ReplicationServer. 2786 // A new session is necessary only when information regarding 2787 // the connection is modified 2788 boolean needToRestartSession = 2789 !newConfig.getReplicationServer().equals(config.getReplicationServer()) 2790 || newConfig.getWindowSize() != config.getWindowSize() 2791 || newConfig.getHeartbeatInterval() != config.getHeartbeatInterval() 2792 || newConfig.getGroupId() != config.getGroupId(); 2793 2794 this.config = newConfig; 2795 this.rcvWindow = newConfig.getWindowSize(); 2796 this.halfRcvWindow = this.rcvWindow / 2; 2797 2798 return needToRestartSession; 2799 } 2800 2801 /** 2802 * Get the version of the replication protocol. 2803 * @return The version of the replication protocol. 
2804 */ 2805 public short getProtocolVersion() 2806 { 2807 final Session session = connectedRS.get().session; 2808 if (session != null) 2809 { 2810 return session.getProtocolVersion(); 2811 } 2812 return ProtocolVersion.getCurrentVersion(); 2813 } 2814 2815 /** 2816 * Check if the broker is connected to a ReplicationServer and therefore 2817 * ready to received and send Replication Messages. 2818 * 2819 * @return true if the server is connected, false if not. 2820 */ 2821 public boolean isConnected() 2822 { 2823 return connectedRS.get().isConnected(); 2824 } 2825 2826 /** 2827 * Determine whether the connection to the replication server is encrypted. 2828 * @return true if the connection is encrypted, false otherwise. 2829 */ 2830 public boolean isSessionEncrypted() 2831 { 2832 final Session session = connectedRS.get().session; 2833 return session != null ? session.isEncrypted() : false; 2834 } 2835 2836 /** 2837 * Signals the RS we just entered a new status. 2838 * @param newStatus The status the local DS just entered 2839 */ 2840 public void signalStatusChange(ServerStatus newStatus) 2841 { 2842 try 2843 { 2844 connectedRS.get().session.publish( 2845 new ChangeStatusMsg(ServerStatus.INVALID_STATUS, newStatus)); 2846 } catch (IOException ex) 2847 { 2848 logger.error(ERR_EXCEPTION_SENDING_CS, getBaseDN(), getServerId(), 2849 ex.getLocalizedMessage() + " " + stackTraceToSingleLineString(ex)); 2850 } 2851 } 2852 2853 /** 2854 * Gets the info for DSs in the topology (except us). 2855 * @return The info for DSs in the topology (except us) 2856 */ 2857 public Map<Integer, DSInfo> getReplicaInfos() 2858 { 2859 return topology.get().replicaInfos; 2860 } 2861 2862 /** 2863 * Gets the info for RSs in the topology (except the one we are connected 2864 * to). 
2865 * @return The info for RSs in the topology (except the one we are connected 2866 * to) 2867 */ 2868 public List<RSInfo> getRsInfos() 2869 { 2870 return toRSInfos(topology.get().rsInfos); 2871 } 2872 2873 private List<RSInfo> toRSInfos(Map<Integer, ReplicationServerInfo> rsInfos) 2874 { 2875 final List<RSInfo> result = new ArrayList<>(); 2876 for (ReplicationServerInfo rsInfo : rsInfos.values()) 2877 { 2878 result.add(rsInfo.toRSInfo()); 2879 } 2880 return result; 2881 } 2882 2883 /** 2884 * Processes an incoming TopologyMsg. 2885 * Updates the structures for the local view of the topology. 2886 * 2887 * @param topoMsg 2888 * The topology information received from RS. 2889 * @param rsServerId 2890 * the serverId to use for the connectedDS 2891 */ 2892 private void receiveTopo(TopologyMsg topoMsg, int rsServerId) 2893 { 2894 final Topology newTopo = computeNewTopology(topoMsg, rsServerId); 2895 for (DSInfo dsInfo : newTopo.replicaInfos.values()) 2896 { 2897 domain.setEclIncludes(dsInfo.getDsId(), dsInfo.getEclIncludes(), dsInfo 2898 .getEclIncludesForDeletes()); 2899 } 2900 } 2901 2902 private Topology computeNewTopology(TopologyMsg topoMsg, int rsServerId) 2903 { 2904 Topology oldTopo; 2905 Topology newTopo; 2906 do 2907 { 2908 oldTopo = topology.get(); 2909 newTopo = new Topology(topoMsg, getServerId(), rsServerId, 2910 getReplicationServerUrls(), oldTopo.rsInfos); 2911 } 2912 while (!topology.compareAndSet(oldTopo, newTopo)); 2913 2914 if (logger.isTraceEnabled()) 2915 { 2916 final StringBuilder sb = topologyChange(rsServerId, oldTopo, newTopo); 2917 sb.append(" received TopologyMsg=").append(topoMsg); 2918 debugInfo(sb); 2919 } 2920 return newTopo; 2921 } 2922 2923 /** 2924 * Contains the last known state of the replication topology. 2925 */ 2926 static final class Topology 2927 { 2928 2929 /** 2930 * The RS's serverId that this DS was connected to when this topology state 2931 * was computed. 
*/
private final int rsServerId;
/**
 * Info for other DSs.
 * <p>
 * Warning: does not contain info for us (for our server id)
 */
final Map<Integer, DSInfo> replicaInfos;
/**
 * The map of replication server info initialized at connection time and
 * regularly updated. This is used to decide to which best suitable
 * replication server one wants to connect. Key: replication server id
 * Value: replication server info for the matching replication server id
 */
final Map<Integer, ReplicationServerInfo> rsInfos;

/** Builds an empty topology, not attached to any RS (server id -1). */
private Topology()
{
  this.rsServerId = -1;
  this.replicaInfos = Collections.emptyMap();
  this.rsInfos = Collections.emptyMap();
}

/**
 * Constructor to use when only the RSInfos need to be recomputed.
 *
 * @param dsInfosToKeep
 *          the DSInfos that will be stored as is; null is replaced by an
 *          empty map
 * @param newRSInfos
 *          the new RSInfos from which to compute the new topology
 * @param dsServerId
 *          the DS serverId
 * @param rsServerId
 *          the current connected RS serverId
 * @param configuredReplicationServerUrls
 *          the configured replication server URLs
 * @param previousRsInfos
 *          the RSInfos computed in the previous Topology object
 */
Topology(Map<Integer, DSInfo> dsInfosToKeep, List<RSInfo> newRSInfos,
    int dsServerId, int rsServerId,
    Set<String> configuredReplicationServerUrls,
    Map<Integer, ReplicationServerInfo> previousRsInfos)
{
  // rsServerId and replicaInfos must be set first: computeRSInfos() reads
  // them through computeDSsConnectedTo().
  this.rsServerId = rsServerId;
  if (dsInfosToKeep != null)
  {
    this.replicaInfos = dsInfosToKeep;
  }
  else
  {
    this.replicaInfos = Collections.emptyMap();
  }
  this.rsInfos = computeRSInfos(dsServerId, newRSInfos,
      previousRsInfos, configuredReplicationServerUrls);
}

/**
 * Constructor to use when a new TopologyMsg has been received.
 *
 * @param topoMsg
 *          the topology message containing the new DSInfos and RSInfos from
 *          which to compute the new topology
 * @param dsServerId
 *          the DS serverId
 * @param rsServerId
 *          the current connected RS serverId
 * @param configuredReplicationServerUrls
 *          the configured replication server URLs
 * @param previousRsInfos
 *          the RSInfos computed in the previous Topology object
 */
Topology(TopologyMsg topoMsg, int dsServerId,
    int rsServerId, Set<String> configuredReplicationServerUrls,
    Map<Integer, ReplicationServerInfo> previousRsInfos)
{
  // rsServerId and replicaInfos must be set first: computeRSInfos() reads
  // them through computeDSsConnectedTo().
  this.rsServerId = rsServerId;
  this.replicaInfos = removeThisDs(topoMsg.getReplicaInfos(), dsServerId);
  this.rsInfos = computeRSInfos(dsServerId, topoMsg.getRsInfos(),
      previousRsInfos, configuredReplicationServerUrls);
}

/**
 * Returns an unmodifiable copy of the passed DSInfos without the entry of
 * the local DS.
 */
private Map<Integer, DSInfo> removeThisDs(Map<Integer, DSInfo> dsInfos,
    int dsServerId)
{
  final Map<Integer, DSInfo> others = new HashMap<>(dsInfos);
  others.remove(dsServerId);
  return Collections.unmodifiableMap(others);
}

/**
 * Computes the RS infos of this topology from the previous ones and the
 * newly received ones: known RSs are updated in place, unknown ones are
 * created and RSs which are no longer listed are dropped.
 *
 * @return an unmodifiable map of the RS infos, keyed by RS server id
 */
private Map<Integer, ReplicationServerInfo> computeRSInfos(
    int dsServerId, List<RSInfo> newRsInfos,
    Map<Integer, ReplicationServerInfo> previousRsInfos,
    Set<String> configuredReplicationServerUrls)
{
  final Map<Integer, ReplicationServerInfo> merged =
      new HashMap<>(previousRsInfos);

  // Update replication server info list with the received topology info
  final Set<Integer> stillPresentRsIds = new HashSet<>();
  for (RSInfo received : newRsInfos)
  {
    final int rsId = received.getId();
    stillPresentRsIds.add(rsId); // Mark this server as still existing
    final Set<Integer> connectedDSs =
        computeDSsConnectedTo(rsId, dsServerId);

    final ReplicationServerInfo known = merged.get(rsId);
    if (known != null)
    {
      // Update the existing info for the replication server
      known.update(received, connectedDSs);
      continue;
    }

    // New replication server, create info for it add it to the list
    final ReplicationServerInfo created =
        new ReplicationServerInfo(received, connectedDSs);
    setLocallyConfiguredFlag(created, configuredReplicationServerUrls);
    merged.put(rsId, created);
  }

  // Remove any replication server that may have disappeared from the
  // topology
  merged.keySet().retainAll(stillPresentRsIds);

  return Collections.unmodifiableMap(merged);
}

/** Computes the list of DSs connected to a particular RS. */
private Set<Integer> computeDSsConnectedTo(int rsId, int dsServerId)
{
  final Set<Integer> dsIds = new HashSet<>();
  if (rsId == rsServerId)
  {
    /*
     * If we are computing connected DSs for the RS we are connected to, we
     * should count the local DS as the DSInfo of the local DS is not sent
     * by the replication server in the topology message. We must count
     * ourselves as a connected server.
     */
    dsIds.add(dsServerId);
  }

  for (DSInfo replica : replicaInfos.values())
  {
    if (replica.getRsId() == rsId)
    {
      dsIds.add(replica.getDsId());
    }
  }

  return dsIds;
}

/**
 * Sets the locally configured flag for the passed ReplicationServerInfo
 * object, analyzing the local configuration.
 * <p>
 * When a configured URL matches, it also replaces the URL stored in the
 * passed object.
 *
 * @param rsInfo
 *          the Replication server to check and update
 * @param configuredReplicationServerUrls
 *          the replication server URLs found in the local configuration
 */
private void setLocallyConfiguredFlag(ReplicationServerInfo rsInfo,
    Set<String> configuredReplicationServerUrls)
{
  // A null URL means the info was generated from a server with no URL in
  // the TopologyMsg (i.e: with replication protocol version < 4): we do not
  // know how to connect to it, so it cannot match any configured URL.
  final String rsUrl = rsInfo.getServerURL();
  if (rsUrl != null)
  {
    for (String configuredUrl : configuredReplicationServerUrls)
    {
      if (isSameReplicationServerUrl(configuredUrl, rsUrl))
      {
        // This RS is locally configured, mark this
        rsInfo.setLocallyConfigured(true);
        rsInfo.setServerURL(configuredUrl);
        return;
      }
    }
  }
  rsInfo.setLocallyConfigured(false);
}

/** {@inheritDoc} */
@Override
public boolean equals(Object obj)
{
  if (obj == this)
  {
    return true;
  }
  if (obj == null || obj.getClass() != getClass())
  {
    return false;
  }

  final Topology that = (Topology) obj;
  if (rsServerId != that.rsServerId)
  {
    return false;
  }
  return Objects.equals(replicaInfos, that.replicaInfos)
      && Objects.equals(rsInfos, that.rsInfos)
      && urlsEqual1(replicaInfos, that.replicaInfos)
      && urlsEqual2(rsInfos, that.rsInfos);
}

/**
 * Tells whether each DSInfo of the first map has the same DS URL as the
 * DSInfo stored under the same key in the second map.
 * <p>
 * Every key of the first map is expected to exist in the second one:
 * equals() only calls this method after having checked that both maps are
 * equal.
 */
private boolean urlsEqual1(Map<Integer, DSInfo> replicaInfos1,
    Map<Integer, DSInfo> replicaInfos2)
{
  for (Entry<Integer, DSInfo> first : replicaInfos1.entrySet())
  {
    final DSInfo second = replicaInfos2.get(first.getKey());
    final boolean sameUrl =
        Objects.equals(first.getValue().getDsUrl(), second.getDsUrl());
    if (!sameUrl)
    {
      return false;
    }
  }
  return true;
}

3148 private boolean urlsEqual2(Map<Integer, ReplicationServerInfo> rsInfos1, 3149 Map<Integer, ReplicationServerInfo> rsInfos2) 3150 { 3151 for (Entry<Integer, ReplicationServerInfo> entry : rsInfos1.entrySet()) 3152 { 3153 ReplicationServerInfo rsInfo = rsInfos2.get(entry.getKey()); 3154 if (!Objects.equals(entry.getValue().getServerURL(), rsInfo.getServerURL())) 3155 { 3156 return false; 3157 } 3158 } 3159 return true; 3160 } 3161 3162 /** {@inheritDoc} */ 3163 @Override 3164 public int hashCode() 3165 { 3166 final int prime = 31; 3167 int result = 1; 3168 result = prime * result + rsServerId; 3169 result = prime * result 3170 + (replicaInfos == null ? 0 : replicaInfos.hashCode()); 3171 result = prime * result + (rsInfos == null ? 0 : rsInfos.hashCode()); 3172 return result; 3173 } 3174 3175 /** {@inheritDoc} */ 3176 @Override 3177 public String toString() 3178 { 3179 return getClass().getSimpleName() 3180 + " rsServerId=" + rsServerId 3181 + ", replicaInfos=" + replicaInfos.values() 3182 + ", rsInfos=" + rsInfos.values(); 3183 } 3184 } 3185 3186 /** 3187 * Check if the broker could not find any Replication Server and therefore 3188 * connection attempt failed. 3189 * 3190 * @return true if the server could not connect to any Replication Server. 3191 */ 3192 boolean hasConnectionError() 3193 { 3194 return connectionError; 3195 } 3196 3197 /** 3198 * Starts publishing to the RS the current timestamp used in this server. 3199 */ 3200 private void startChangeTimeHeartBeatPublishing(ConnectedRS rs) 3201 { 3202 // Start a CSN heartbeat thread. 
3203 long changeTimeHeartbeatInterval = config.getChangetimeHeartbeatInterval(); 3204 if (changeTimeHeartbeatInterval > 0) 3205 { 3206 final String threadName = "Replica DS(" + getServerId() 3207 + ") change time heartbeat publisher for domain \"" + getBaseDN() 3208 + "\" to RS(" + rs.getServerId() + ") at " + rs.replicationServer; 3209 3210 ctHeartbeatPublisherThread = new CTHeartbeatPublisherThread( 3211 threadName, rs.session, changeTimeHeartbeatInterval, getServerId()); 3212 ctHeartbeatPublisherThread.start(); 3213 } 3214 else 3215 { 3216 if (logger.isTraceEnabled()) 3217 { 3218 debugInfo("is not configured to send CSN heartbeat interval"); 3219 } 3220 } 3221 } 3222 3223 /** 3224 * Stops publishing to the RS the current timestamp used in this server. 3225 */ 3226 private synchronized void stopChangeTimeHeartBeatPublishing() 3227 { 3228 if (ctHeartbeatPublisherThread != null) 3229 { 3230 ctHeartbeatPublisherThread.shutdown(); 3231 ctHeartbeatPublisherThread = null; 3232 } 3233 } 3234 3235 /** 3236 * Set the connectRequiresRecovery to the provided value. 3237 * This flag is used to indicate if a recovery of Update is necessary 3238 * after a reconnection to a RS. 3239 * It is the responsibility of the ReplicationDomain to set it during the 3240 * sessionInitiated phase. 3241 * 3242 * @param b the new value of the connectRequiresRecovery. 3243 */ 3244 public void setRecoveryRequired(boolean b) 3245 { 3246 connectRequiresRecovery = b; 3247 } 3248 3249 /** 3250 * Returns whether the broker is shutting down. 3251 * @return whether the broker is shutting down. 3252 */ 3253 boolean shuttingDown() 3254 { 3255 return shutdown; 3256 } 3257 3258 /** 3259 * Returns the local address of this replication domain, or the empty string 3260 * if it is not yet connected. 3261 * 3262 * @return The local address. 3263 */ 3264 String getLocalUrl() 3265 { 3266 final Session session = connectedRS.get().session; 3267 return session != null ? 
session.getLocalUrl() : ""; 3268 } 3269 3270 /** 3271 * Returns the replication monitor instance name associated with this broker. 3272 * 3273 * @return The replication monitor instance name. 3274 */ 3275 String getReplicationMonitorInstanceName() 3276 { 3277 // Only invoked by replication domain so always non-null. 3278 return monitor.getMonitorInstanceName(); 3279 } 3280 3281 private ConnectedRS setConnectedRS(final ConnectedRS newRS) 3282 { 3283 final ConnectedRS oldRS = connectedRS.getAndSet(newRS); 3284 if (!oldRS.equals(newRS) && oldRS.session != null) 3285 { 3286 // monitor name is changing, deregister before registering again 3287 deregisterReplicationMonitor(); 3288 oldRS.session.close(); 3289 registerReplicationMonitor(); 3290 } 3291 return newRS; 3292 } 3293 3294 /** 3295 * Must be invoked each time the session changes because, the monitor name is 3296 * dynamically created with the session name, while monitor registration is 3297 * static. 3298 * 3299 * @see #monitor 3300 */ 3301 private void registerReplicationMonitor() 3302 { 3303 // The monitor should not be registered if this is a unit test 3304 // because the replication domain is null. 3305 if (monitor != null) 3306 { 3307 DirectoryServer.registerMonitorProvider(monitor); 3308 } 3309 } 3310 3311 private void deregisterReplicationMonitor() 3312 { 3313 // The monitor should not be deregistered if this is a unit test 3314 // because the replication domain is null. 
3315 if (monitor != null) 3316 { 3317 DirectoryServer.deregisterMonitorProvider(monitor); 3318 } 3319 } 3320 3321 /** {@inheritDoc} */ 3322 @Override 3323 public String toString() 3324 { 3325 final StringBuilder sb = new StringBuilder(); 3326 sb.append(getClass().getSimpleName()) 3327 .append(" \"").append(getBaseDN()).append(" ") 3328 .append(getServerId()).append("\",") 3329 .append(" groupId=").append(getGroupId()) 3330 .append(", genId=").append(getGenerationID()) 3331 .append(", "); 3332 connectedRS.get().toString(sb); 3333 return sb.toString(); 3334 } 3335 3336 private void debugInfo(CharSequence message) 3337 { 3338 logger.trace(getClass().getSimpleName() + " for baseDN=" + getBaseDN() 3339 + " and serverId=" + getServerId() + ": " + message); 3340 } 3341}