001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2008-2010 Sun Microsystems, Inc. 025 * Portions Copyright 2011-2015 ForgeRock AS 026 */ 027package org.opends.server.replication.service; 028 029import static org.opends.messages.ReplicationMessages.*; 030import static org.opends.server.replication.common.AssuredMode.*; 031import static org.opends.server.replication.common.StatusMachine.*; 032import static org.opends.server.util.CollectionUtils.*; 033 034import java.io.BufferedOutputStream; 035import java.io.IOException; 036import java.io.InputStream; 037import java.io.OutputStream; 038import java.net.SocketTimeoutException; 039import java.util.*; 040import java.util.concurrent.ConcurrentHashMap; 041import java.util.concurrent.TimeoutException; 042import java.util.concurrent.atomic.AtomicInteger; 043import java.util.concurrent.atomic.AtomicReference; 044 045import org.forgerock.i18n.LocalizableMessage; 046import org.forgerock.i18n.slf4j.LocalizedLogger; 047import org.forgerock.opendj.config.server.ConfigException; 048import org.forgerock.opendj.ldap.ResultCode; 049import 
org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.AssuredType; 050import org.opends.server.admin.std.server.ReplicationDomainCfg; 051import org.opends.server.api.DirectoryThread; 052import org.opends.server.backends.task.Task; 053import org.opends.server.replication.common.*; 054import org.opends.server.replication.protocol.*; 055import org.opends.server.tasks.InitializeTargetTask; 056import org.opends.server.tasks.InitializeTask; 057import org.opends.server.types.Attribute; 058import org.opends.server.types.DN; 059import org.opends.server.types.DirectoryException; 060 061/** 062 * This class should be used as a base for Replication implementations. 063 * <p> 064 * It is intended that developer in need of a replication mechanism 065 * subclass this class with their own implementation. 066 * <p> 067 * The startup phase of the ReplicationDomain subclass, 068 * should read the list of replication servers from the configuration, 069 * instantiate a {@link ServerState} then start the publish service 070 * by calling {@link #startPublishService()}. 071 * At this point it can start calling the {@link #publish(UpdateMsg)} 072 * method if needed. 073 * <p> 074 * When the startup phase reach the point when the subclass is ready 075 * to handle updates the Replication Domain implementation should call the 076 * {@link #startListenService()} method. 077 * At this point a Listener thread is created on the Replication Service 078 * and which can start receiving updates. 079 * <p> 080 * When updates are received the Replication Service calls the 081 * {@link #processUpdate(UpdateMsg)} method. 082 * ReplicationDomain implementation should implement the appropriate code 083 * for replaying the update on the local repository. 084 * When fully done the subclass must call the 085 * {@link #processUpdateDone(UpdateMsg, String)} method. 086 * This allows to process the update asynchronously if necessary. 
087 * 088 * <p> 089 * To propagate changes to other replica, a ReplicationDomain implementation 090 * must use the {@link #publish(UpdateMsg)} method. 091 * <p> 092 * If the Full Initialization process is needed then implementation 093 * for {@code importBackend(InputStream)} and 094 * {@code exportBackend(OutputStream)} must be 095 * provided. 096 * <p> 097 * Full Initialization of a replica can be triggered by LDAP clients 098 * by creating InitializeTasks or InitializeTargetTask. 099 * Full initialization can also be triggered from the ReplicationDomain 100 * implementation using methods {@link #initializeRemote(int, Task)} 101 * or {@link #initializeFromRemote(int, Task)}. 102 * <p> 103 * At shutdown time, the {@link #disableService()} method should be called to 104 * cleanly stop the replication service. 105 */ 106public abstract class ReplicationDomain 107{ 108 109 /** 110 * Contains all the attributes included for the ECL (External Changelog). 111 */ 112 // @Immutable 113 private static final class ECLIncludes 114 { 115 116 final Map<Integer, Set<String>> includedAttrsByServer; 117 final Set<String> includedAttrsAllServers; 118 119 final Map<Integer, Set<String>> includedAttrsForDeletesByServer; 120 final Set<String> includedAttrsForDeletesAllServers; 121 122 private ECLIncludes( 123 Map<Integer, Set<String>> includedAttrsByServer, 124 Set<String> includedAttrsAllServers, 125 Map<Integer, Set<String>> includedAttrsForDeletesByServer, 126 Set<String> includedAttrsForDeletesAllServers) 127 { 128 this.includedAttrsByServer = includedAttrsByServer; 129 this.includedAttrsAllServers = includedAttrsAllServers; 130 131 this.includedAttrsForDeletesByServer = includedAttrsForDeletesByServer; 132 this.includedAttrsForDeletesAllServers =includedAttrsForDeletesAllServers; 133 } 134 135 @SuppressWarnings("unchecked") 136 public ECLIncludes() 137 { 138 this(Collections.EMPTY_MAP, Collections.EMPTY_SET, Collections.EMPTY_MAP, 139 Collections.EMPTY_SET); 140 } 141 142 /** 
143 * Add attributes to be included in the ECL. 144 * 145 * @param serverId 146 * Server where these attributes are configured. 147 * @param includeAttributes 148 * Attributes to be included with all change records, may include 149 * wild-cards. 150 * @param includeAttributesForDeletes 151 * Additional attributes to be included with delete change records, 152 * may include wild-cards. 153 * @return a new {@link ECLIncludes} object if included attributes have 154 * changed, or the current object otherwise. 155 */ 156 public ECLIncludes addIncludedAttributes(int serverId, 157 Set<String> includeAttributes, Set<String> includeAttributesForDeletes) 158 { 159 boolean configurationChanged = false; 160 161 Set<String> s1 = new HashSet<>(includeAttributes); 162 163 // Combine all+delete attributes. 164 Set<String> s2 = new HashSet<>(s1); 165 s2.addAll(includeAttributesForDeletes); 166 167 Map<Integer,Set<String>> eclIncludesByServer = this.includedAttrsByServer; 168 if (!s1.equals(this.includedAttrsByServer.get(serverId))) 169 { 170 configurationChanged = true; 171 eclIncludesByServer = new HashMap<>(this.includedAttrsByServer); 172 eclIncludesByServer.put(serverId, Collections.unmodifiableSet(s1)); 173 } 174 175 Map<Integer, Set<String>> eclIncludesForDeletesByServer = this.includedAttrsForDeletesByServer; 176 if (!s2.equals(this.includedAttrsForDeletesByServer.get(serverId))) 177 { 178 configurationChanged = true; 179 eclIncludesForDeletesByServer = new HashMap<>(this.includedAttrsForDeletesByServer); 180 eclIncludesForDeletesByServer.put(serverId, Collections.unmodifiableSet(s2)); 181 } 182 183 if (!configurationChanged) 184 { 185 return this; 186 } 187 188 // and rebuild the global list to be ready for usage 189 Set<String> eclIncludesAllServer = new HashSet<>(); 190 for (Set<String> attributes : eclIncludesByServer.values()) 191 { 192 eclIncludesAllServer.addAll(attributes); 193 } 194 195 Set<String> eclIncludesForDeletesAllServer = new HashSet<>(); 196 for 
(Set<String> attributes : eclIncludesForDeletesByServer.values()) 197 { 198 eclIncludesForDeletesAllServer.addAll(attributes); 199 } 200 return new ECLIncludes(eclIncludesByServer, 201 Collections.unmodifiableSet(eclIncludesAllServer), 202 eclIncludesForDeletesByServer, 203 Collections.unmodifiableSet(eclIncludesForDeletesAllServer)); 204 } 205 } 206 207 /** 208 * Current status for this replicated domain. 209 */ 210 private ServerStatus status = ServerStatus.NOT_CONNECTED_STATUS; 211 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 212 213 /** The configuration of the replication domain. */ 214 protected volatile ReplicationDomainCfg config; 215 /** 216 * The assured configuration of the replication domain. It is a duplicate of 217 * {@link #config} because of its update model. 218 * 219 * @see #readAssuredConfig(ReplicationDomainCfg, boolean) 220 */ 221 private volatile ReplicationDomainCfg assuredConfig; 222 223 /** 224 * The ReplicationBroker that is used by this ReplicationDomain to 225 * connect to the ReplicationService. 226 */ 227 protected ReplicationBroker broker; 228 229 /** 230 * This Map is used to store all outgoing assured messages in order 231 * to be able to correlate all the coming back acks to the original 232 * operation. 233 */ 234 private final Map<CSN, UpdateMsg> waitingAckMsgs = new ConcurrentHashMap<>(); 235 /** 236 * The context related to an import or export being processed 237 * Null when none is being processed. 238 */ 239 private final AtomicReference<ImportExportContext> importExportContext = new AtomicReference<>(); 240 241 /** 242 * The Thread waiting for incoming update messages for this domain and pushing 243 * them to the global incoming update message queue for later processing by 244 * replay threads. 245 */ 246 private volatile DirectoryThread listenerThread; 247 248 /** A set of counters used for Monitoring. 
*/
  private AtomicInteger numProcessedUpdates = new AtomicInteger(0);
  private AtomicInteger numRcvdUpdates = new AtomicInteger(0);
  private AtomicInteger numSentUpdates = new AtomicInteger(0);

  /* Assured replication monitoring counters. */

  /** Number of updates sent in Assured Mode, Safe Read. */
  private AtomicInteger assuredSrSentUpdates = new AtomicInteger(0);
  /**
   * Number of updates sent in Assured Mode, Safe Read, that have been
   * successfully acknowledged.
   */
  private AtomicInteger assuredSrAcknowledgedUpdates = new AtomicInteger(0);
  /**
   * Number of updates sent in Assured Mode, Safe Read, that have not been
   * successfully acknowledged (either because of timeout, wrong status or error
   * at replay).
   */
  private AtomicInteger assuredSrNotAcknowledgedUpdates = new AtomicInteger(0);
  /**
   * Number of updates sent in Assured Mode, Safe Read, that have not been
   * successfully acknowledged because of timeout.
   */
  private AtomicInteger assuredSrTimeoutUpdates = new AtomicInteger(0);
  /**
   * Number of updates sent in Assured Mode, Safe Read, that have not been
   * successfully acknowledged because of wrong status.
   */
  private AtomicInteger assuredSrWrongStatusUpdates = new AtomicInteger(0);
  /**
   * Number of updates sent in Assured Mode, Safe Read, that have not been
   * successfully acknowledged because of replay error.
   */
  private AtomicInteger assuredSrReplayErrorUpdates = new AtomicInteger(0);
  /**
   * Multiple values allowed: number of updates sent in Assured Mode, Safe Read,
   * that have not been successfully acknowledged (either because of timeout,
   * wrong status or error at replay) for a particular server (DS or RS).
   * <p>
   * String format: {@code <server id>:<number of failed updates>}
   * <p>
   * Plain {@link HashMap}: increments made through
   * {@code updateAssuredErrorsByServer} synchronize on the map itself.
   */
  private final Map<Integer, Integer> assuredSrServerNotAcknowledgedUpdates = new HashMap<>();
  /** Number of updates received in Assured Mode, Safe Read request. */
  private AtomicInteger assuredSrReceivedUpdates = new AtomicInteger(0);
  /**
   * Number of updates received in Assured Mode, Safe Read request that we have
   * acked without errors.
   */
  private AtomicInteger assuredSrReceivedUpdatesAcked = new AtomicInteger(0);
  /**
   * Number of updates received in Assured Mode, Safe Read request that we have
   * acked with errors.
   */
  private AtomicInteger assuredSrReceivedUpdatesNotAcked = new AtomicInteger(0);
  /** Number of updates sent in Assured Mode, Safe Data. */
  private AtomicInteger assuredSdSentUpdates = new AtomicInteger(0);
  /**
   * Number of updates sent in Assured Mode, Safe Data, that have been
   * successfully acknowledged.
   */
  private AtomicInteger assuredSdAcknowledgedUpdates = new AtomicInteger(0);
  /**
   * Number of updates sent in Assured Mode, Safe Data, that have not been
   * successfully acknowledged because of timeout.
   */
  private AtomicInteger assuredSdTimeoutUpdates = new AtomicInteger(0);
  /**
   * Multiple values allowed: number of updates sent in Assured Mode, Safe Data,
   * that have not been successfully acknowledged because of timeout for a
   * particular RS.
   * <p>
   * String format: {@code <server id>:<number of failed updates>}
   * <p>
   * Plain {@link HashMap}: increments made through
   * {@code updateAssuredErrorsByServer} synchronize on the map itself.
   */
  private final Map<Integer, Integer> assuredSdServerTimeoutUpdates = new HashMap<>();

  /* Status related monitoring fields */

  /**
   * Indicates the date when the status changed. This may be used to indicate
   * the date the session with the current replication server started (when
   * status is NORMAL for instance).
* All the above assured monitoring fields
   * are also reset each time the status is changed.
   */
  private Date lastStatusChangeDate = new Date();

  /**
   * The state maintained by the Concrete Class.
   */
  private final ServerState state;

  /**
   * The generator that will be used to generate {@link CSN}
   * for this domain.
   */
  private final CSNGenerator generator;

  /**
   * The attributes currently included in the ECL, held as a replaceable
   * reference to an {@link ECLIncludes} snapshot. Starts with an empty
   * snapshot.
   */
  private final AtomicReference<ECLIncludes> eclIncludes = new AtomicReference<>(new ECLIncludes());

  /**
   * An object used to protect the initialization of the underlying broker
   * session of this ReplicationDomain.
   */
  private final Object sessionLock = new Object();

  /**
   * The generationId for this replication domain. It is made of a hash of the
   * 1000 first entries for this domain.
   */
  protected volatile long generationId;

  /**
   * Returns the {@link CSNGenerator} that will be used to
   * generate {@link CSN} for this domain.
   *
   * @return The {@link CSNGenerator} that will be used to
   *         generate {@link CSN} for this domain.
   */
  public CSNGenerator getGenerator()
  {
    return generator;
  }

  /**
   * Creates a ReplicationDomain with the provided parameters, starting from a
   * new, empty {@link ServerState}.
   *
   * @param config
   *          The configuration object for this ReplicationDomain
   * @param generationId
   *          the generation of this ReplicationDomain
   */
  public ReplicationDomain(ReplicationDomainCfg config, long generationId)
  {
    this(config, generationId, new ServerState());
  }

  /**
   * Creates a ReplicationDomain with the provided parameters.
(for unit test 386 * purpose only) 387 * 388 * @param config 389 * The configuration object for this ReplicationDomain 390 * @param generationId 391 * the generation of this ReplicationDomain 392 * @param serverState 393 * The serverState to use 394 */ 395 public ReplicationDomain(ReplicationDomainCfg config, long generationId, 396 ServerState serverState) 397 { 398 this.config = config; 399 this.assuredConfig = config; 400 this.generationId = generationId; 401 this.state = serverState; 402 this.generator = new CSNGenerator(getServerId(), state); 403 } 404 405 /** 406 * Set the initial status of the domain and perform necessary initializations. 407 * This method will be called by the Broker each time the ReplicationBroker 408 * establish a new session to a Replication Server. 409 * 410 * Implementations may override this method when they need to perform 411 * additional computing after session establishment. 412 * The default implementation should be sufficient for ReplicationDomains 413 * that don't need to perform additional computing. 414 * 415 * @param initStatus The status to enter the state machine with. 416 * @param rsState The ServerState of the ReplicationServer 417 * with which the session was established. 418 */ 419 public void sessionInitiated(ServerStatus initStatus, ServerState rsState) 420 { 421 // Sanity check: is it a valid initial status? 422 if (!isValidInitialStatus(initStatus)) 423 { 424 logger.error(ERR_DS_INVALID_INIT_STATUS, initStatus, getBaseDN(), getServerId()); 425 } 426 else 427 { 428 status = initStatus; 429 } 430 generator.adjust(state); 431 generator.adjust(rsState); 432 } 433 434 /** 435 * Processes an incoming ChangeStatusMsg. Compute new status according to 436 * given order. Then update domain for being compliant with new status 437 * definition. 
438 * @param csMsg The received status message 439 */ 440 private void receiveChangeStatus(ChangeStatusMsg csMsg) 441 { 442 if (logger.isTraceEnabled()) 443 { 444 logger.trace("Replication domain " + getBaseDN() + 445 " received change status message:\n" + csMsg); 446 } 447 448 ServerStatus reqStatus = csMsg.getRequestedStatus(); 449 450 // Translate requested status to a state machine event 451 StatusMachineEvent event = StatusMachineEvent.statusToEvent(reqStatus); 452 if (event == StatusMachineEvent.INVALID_EVENT) 453 { 454 logger.error(ERR_DS_INVALID_REQUESTED_STATUS, reqStatus, getBaseDN(), getServerId()); 455 return; 456 } 457 458 // Set the new status to the requested one 459 setNewStatus(event); 460 } 461 462 /** 463 * Called when first connection or disconnection detected. 464 */ 465 void toNotConnectedStatus() 466 { 467 // Go into not connected status 468 setNewStatus(StatusMachineEvent.TO_NOT_CONNECTED_STATUS_EVENT); 469 } 470 471 /** 472 * Perform whatever actions are needed to apply properties for being 473 * compliant with new status. Must be called in synchronized section for 474 * status. The new status is already set in status variable. 475 */ 476 private void updateDomainForNewStatus() 477 { 478 switch (status) 479 { 480 case FULL_UPDATE_STATUS: 481 // Signal RS we just entered the full update status 482 broker.signalStatusChange(status); 483 break; 484 case NOT_CONNECTED_STATUS: 485 case NORMAL_STATUS: 486 case DEGRADED_STATUS: 487 case BAD_GEN_ID_STATUS: 488 break; 489 default: 490 if (logger.isTraceEnabled()) 491 { 492 logger.trace("updateDomainForNewStatus: unexpected status: " + status); 493 } 494 } 495 } 496 497 /** 498 * Gets the status for this domain. 499 * @return The status for this domain. 500 */ 501 public ServerStatus getStatus() 502 { 503 return status; 504 } 505 506 /** 507 * Returns the base DN of this ReplicationDomain. All Replication Domain using 508 * this baseDN will be connected through the Replication Service. 
509 * 510 * @return The base DN of this ReplicationDomain 511 */ 512 public DN getBaseDN() 513 { 514 return config.getBaseDN(); 515 } 516 517 /** 518 * Get the server ID. The identifier of this Replication Domain inside the 519 * Replication Service. Each Domain must use a unique ServerID. 520 * 521 * @return The server ID. 522 */ 523 public int getServerId() 524 { 525 return config.getServerId(); 526 } 527 528 /** 529 * Window size used during initialization .. between - the 530 * initializer/exporter DS that listens/waits acknowledges and that slows down 531 * data msg publishing based on the slowest server - and each 532 * initialized/importer DS that publishes acknowledges each WINDOW/2 data msg 533 * received. 534 * 535 * @return the initWindow 536 */ 537 protected int getInitWindow() 538 { 539 return config.getInitializationWindowSize(); 540 } 541 542 /** 543 * Tells if assured replication is enabled for this domain. 544 * @return True if assured replication is enabled for this domain. 545 */ 546 public boolean isAssured() 547 { 548 return AssuredType.SAFE_DATA.equals(assuredConfig.getAssuredType()) 549 || AssuredType.SAFE_READ.equals(assuredConfig.getAssuredType()); 550 } 551 552 /** 553 * Gives the mode for the assured replication of the domain. Only used when 554 * assured is true). 555 * 556 * @return The mode for the assured replication of the domain. 557 */ 558 public AssuredMode getAssuredMode() 559 { 560 switch (assuredConfig.getAssuredType()) 561 { 562 case SAFE_DATA: 563 case NOT_ASSURED: // The assured mode will be ignored in that case anyway 564 return AssuredMode.SAFE_DATA_MODE; 565 case SAFE_READ: 566 return AssuredMode.SAFE_READ_MODE; 567 } 568 return null; // should never happen 569 } 570 571 /** 572 * Gives the assured Safe Data level of the replication of the domain. (used 573 * when assuredMode is SAFE_DATA). 574 * 575 * @return The assured level of the replication of the domain. 
*/
  public byte getAssuredSdLevel()
  {
    // The configured level is narrowed to a byte.
    return (byte) assuredConfig.getAssuredSdLevel();
  }

  /**
   * Gives the assured timeout of the replication of the domain (in ms).
   * @return The assured timeout of the replication of the domain.
   */
  public long getAssuredTimeout()
  {
    return assuredConfig.getAssuredTimeout();
  }

  /**
   * Gets the group id for this domain.
   * @return The group id for this domain.
   */
  public byte getGroupId()
  {
    // The configured group id is narrowed to a byte.
    return (byte) config.getGroupId();
  }

  /**
   * Gets the referrals URLs this domain publishes. Referrals urls to be
   * published to other servers of the topology.
   * <p>
   * TODO: fill that with all currently opened urls if no urls configured
   *
   * @return The referrals URLs this domain publishes.
   */
  public Set<String> getRefUrls()
  {
    return config.getReferralsUrl();
  }

  /**
   * Gets the info for Replicas in the topology (except us).
   * @return The info for Replicas in the topology (except us)
   */
  public Map<Integer, DSInfo> getReplicaInfos()
  {
    return broker.getReplicaInfos();
  }

  /**
   * Returns information about the DS server related to the provided serverId,
   * based on the TopologyMsg we received when the remote replica connected or
   * disconnected. Returns null when no server with the provided serverId is
   * connected.
   *
   * @param dsId The provided serverId of the remote replica
   * @return the info related to this remote server if it is connected,
   *         null if the server is NOT connected.
   */
  private DSInfo getConnectedRemoteDS(int dsId)
  {
    return getReplicaInfos().get(dsId);
  }

  /**
   * Gets the States of all the Replicas currently in the
   * Topology.
640 * When this method is called, a Monitoring message will be sent 641 * to the Replication Server to which this domain is currently connected 642 * so that it computes a table containing information about 643 * all Directory Servers in the topology. 644 * This Computation involves communications will all the servers 645 * currently connected and 646 * 647 * @return The States of all Replicas in the topology (except us) 648 */ 649 public Map<Integer, ServerState> getReplicaStates() 650 { 651 return broker.getReplicaStates(); 652 } 653 654 /** 655 * Gets the info for RSs in the topology (except the one we are connected 656 * to). 657 * @return The info for RSs in the topology (except the one we are connected 658 * to) 659 */ 660 public List<RSInfo> getRsInfos() 661 { 662 return broker.getRsInfos(); 663 } 664 665 666 /** 667 * Gets the server ID of the Replication Server to which the domain 668 * is currently connected. 669 * 670 * @return The server ID of the Replication Server to which the domain 671 * is currently connected. 672 */ 673 public int getRsServerId() 674 { 675 return broker.getRsServerId(); 676 } 677 678 /** 679 * Increment the number of processed updates. 680 */ 681 private void incProcessedUpdates() 682 { 683 numProcessedUpdates.incrementAndGet(); 684 } 685 686 /** 687 * Get the number of updates replayed by the replication. 688 * 689 * @return The number of updates replayed by the replication 690 */ 691 int getNumProcessedUpdates() 692 { 693 if (numProcessedUpdates != null) 694 { 695 return numProcessedUpdates.get(); 696 } 697 return 0; 698 } 699 700 /** 701 * Get the number of updates received by the replication plugin. 702 * 703 * @return the number of updates received 704 */ 705 int getNumRcvdUpdates() 706 { 707 if (numRcvdUpdates != null) 708 { 709 return numRcvdUpdates.get(); 710 } 711 return 0; 712 } 713 714 /** 715 * Get the number of updates sent by the replication plugin. 
*
   * @return the number of updates sent, or 0 when the counter is not set
   */
  int getNumSentUpdates()
  {
    if (numSentUpdates != null)
    {
      return numSentUpdates.get();
    }
    return 0;
  }

  /**
   * Receives an update message from the replicationServer.
   * The other types of messages are processed in an opaque way for the caller.
   * Also responsible for updating the list of pending changes.
   * <p>
   * The method loops on the broker until an {@link UpdateMsg} arrives. Every
   * other message type (acks, initialization requests/targets, errors, status
   * changes, initialization acks) is handled inside the loop and never
   * returned. A {@link SocketTimeoutException} simply triggers another
   * iteration.
   *
   * @return the received message - null if the broker returned no message
   *         (the server is in the shutdown process)
   */
  private UpdateMsg receive()
  {
    UpdateMsg update = null;

    while (update == null)
    {
      // Set when an export request is received; it is served after the
      // try block below, see the comment there.
      InitializeRequestMsg initReqMsg = null;
      ReplicationMsg msg;
      try
      {
        msg = broker.receive(true, true, false);
        if (msg == null)
        {
          // The server is in the shutdown process
          return null;
        }

        if (logger.isTraceEnabled() && !(msg instanceof HeartbeatMsg))
        {
          logger.trace("LocalizableMessage received <" + msg + ">");
        }

        if (msg instanceof AckMsg)
        {
          AckMsg ack = (AckMsg) msg;
          receiveAck(ack);
        }
        else if (msg instanceof InitializeRequestMsg)
        {
          // Another server requests us to provide entries
          // for a total update
          initReqMsg = (InitializeRequestMsg)msg;
        }
        else if (msg instanceof InitializeTargetMsg)
        {
          // Another server is exporting its entries to us
          InitializeTargetMsg initTargetMsg = (InitializeTargetMsg) msg;

          /*
          This must be done while we are still holding the broker lock
          because we are now going to receive a bunch of entries from the
          remote server and we want the import thread to catch them and
          not the ListenerThread.
          */
          initialize(initTargetMsg, initTargetMsg.getSenderID());
        }
        else if (msg instanceof ErrorMsg)
        {
          ErrorMsg errorMsg = (ErrorMsg)msg;
          ImportExportContext ieCtx = importExportContext.get();
          if (ieCtx != null)
          {
            /*
            This is an error termination for the 2 following cases :
            - either during an export
            - or before an import really started
            For example, when we publish a request and the
            replicationServer did not find the import source.

            A remote error during the import will be received in the
            receiveEntryBytes() method.
            */
            if (logger.isTraceEnabled())
            {
              logger.trace(
                  "[IE] processErrorMsg:" + getServerId() +
                  " baseDN: " + getBaseDN() +
                  " Error Msg received: " + errorMsg);
            }

            if (errorMsg.getCreationTime() > ieCtx.startTime)
            {
              // consider only ErrorMsg that relate to the current import/export
              processErrorMsg(errorMsg, ieCtx);
            }
            else
            {
              /*
              Simply log - happens when the ErrorMsg relates to a previous
              attempt of initialization while we have started a new one
              on this side.
              */
              logger.error(ERR_ERROR_MSG_RECEIVED, errorMsg.getDetails());
            }
          }
          else
          {
            // Simply log - happens if import/export has been terminated
            // on our side before receiving this ErrorMsg.
            logger.error(ERR_ERROR_MSG_RECEIVED, errorMsg.getDetails());
          }
        }
        else if (msg instanceof ChangeStatusMsg)
        {
          ChangeStatusMsg csMsg = (ChangeStatusMsg)msg;
          receiveChangeStatus(csMsg);
        }
        else if (msg instanceof UpdateMsg)
        {
          // This ends the loop; the CSN generator is adjusted with the CSN of
          // the received update.
          update = (UpdateMsg) msg;
          generator.adjust(update.getCSN());
        }
        else if (msg instanceof InitializeRcvAckMsg)
        {
          ImportExportContext ieCtx = importExportContext.get();
          if (ieCtx != null)
          {
            InitializeRcvAckMsg ackMsg = (InitializeRcvAckMsg) msg;
            ieCtx.setAckVal(ackMsg.getSenderID(), ackMsg.getNumAck());
          }
          // Trash this msg when no import/export is running/should never happen
        }
      }
      catch (SocketTimeoutException e)
      {
        // just retry
      }
      /*
      Test if we have received an export request message and
      if that's the case handle it now.
      This must be done outside of the portion of code protected
      by the broker lock so that we keep receiving updates
      when we are doing an export and so that a possible
      closure of the socket happening when we are publishing the
      entries to the remote can be handled by the other
      replay threads when they call this method and therefore the
      broker.receive() method.
      */
      if (initReqMsg != null)
      {
        // Do this work in a thread to allow replay thread continue working
        ExportThread exportThread = new ExportThread(
            initReqMsg.getSenderID(), initReqMsg.getInitWindow());
        exportThread.start();
      }
    }

    // Monitoring: count the update, and count it as a Safe Read assured update
    // only when the connected RS is in the same group as this domain.
    numRcvdUpdates.incrementAndGet();
    if (update.isAssured()
        && broker.getRsGroupId() == getGroupId()
        && update.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
    {
      assuredSrReceivedUpdates.incrementAndGet();
    }
    return update;
  }

  /**
   * Updates the passed monitoring list of errors received for assured messages
   * (safe data or safe read, depending of the passed list to update) for a
   * particular server in the list.
This increments the counter of error for the 885 * passed server, or creates an initial value of 1 error for it if the server 886 * is not yet present in the map. 887 * @param errorsByServer map of number of errors per serverID 888 * @param sid the ID of the server which produced an error 889 */ 890 private void updateAssuredErrorsByServer(Map<Integer,Integer> errorsByServer, 891 Integer sid) 892 { 893 synchronized (errorsByServer) 894 { 895 Integer serverErrCount = errorsByServer.get(sid); 896 if (serverErrCount == null) 897 { 898 // Server not present in list, create an entry with an 899 // initial number of errors set to 1 900 errorsByServer.put(sid, 1); 901 } else 902 { 903 // Server already present in list, just increment number of 904 // errors for the server 905 int val = serverErrCount; 906 val++; 907 errorsByServer.put(sid, val); 908 } 909 } 910 } 911 912 /** 913 * Do the necessary processing when an AckMsg is received. 914 * 915 * @param ack The AckMsg that was received. 916 */ 917 private void receiveAck(AckMsg ack) 918 { 919 CSN csn = ack.getCSN(); 920 921 // Remove the message for pending ack list (this may already make the thread 922 // that is waiting for the ack be aware of its reception) 923 UpdateMsg update = waitingAckMsgs.remove(csn); 924 925 // Signal waiting thread ack has been received 926 if (update != null) 927 { 928 synchronized (update) 929 { 930 update.notify(); 931 } 932 933 // Analyze status of embedded in the ack to see if everything went well 934 boolean hasTimeout = ack.hasTimeout(); 935 boolean hasReplayErrors = ack.hasReplayError(); 936 boolean hasWrongStatus = ack.hasWrongStatus(); 937 938 AssuredMode updateAssuredMode = update.getAssuredMode(); 939 940 if ( hasTimeout || hasReplayErrors || hasWrongStatus) 941 { 942 /* 943 Some problems detected: message did not correctly reach every 944 requested servers. 
Log problem 945 */ 946 logger.info(NOTE_DS_RECEIVED_ACK_ERROR, getBaseDN(), getServerId(), update, ack.errorsToString()); 947 948 List<Integer> failedServers = ack.getFailedServers(); 949 950 // Increment assured replication monitoring counters 951 switch (updateAssuredMode) 952 { 953 case SAFE_READ_MODE: 954 assuredSrNotAcknowledgedUpdates.incrementAndGet(); 955 if (hasTimeout) 956 { 957 assuredSrTimeoutUpdates.incrementAndGet(); 958 } 959 if (hasReplayErrors) 960 { 961 assuredSrReplayErrorUpdates.incrementAndGet(); 962 } 963 if (hasWrongStatus) 964 { 965 assuredSrWrongStatusUpdates.incrementAndGet(); 966 } 967 if (failedServers != null) // This should always be the case ! 968 { 969 for(Integer sid : failedServers) 970 { 971 updateAssuredErrorsByServer( 972 assuredSrServerNotAcknowledgedUpdates, sid); 973 } 974 } 975 break; 976 case SAFE_DATA_MODE: 977 // The only possible cause of ack error in safe data mode is timeout 978 if (hasTimeout) // So should always be the case 979 { 980 assuredSdTimeoutUpdates.incrementAndGet(); 981 } 982 if (failedServers != null) // This should always be the case ! 983 { 984 for(Integer sid : failedServers) 985 { 986 updateAssuredErrorsByServer( 987 assuredSdServerTimeoutUpdates, sid); 988 } 989 } 990 break; 991 default: 992 // Should not happen 993 } 994 } else 995 { 996 // Update has been acknowledged without errors 997 // Increment assured replication monitoring counters 998 switch (updateAssuredMode) 999 { 1000 case SAFE_READ_MODE: 1001 assuredSrAcknowledgedUpdates.incrementAndGet(); 1002 break; 1003 case SAFE_DATA_MODE: 1004 assuredSdAcknowledgedUpdates.incrementAndGet(); 1005 break; 1006 default: 1007 // Should not happen 1008 } 1009 } 1010 } 1011 } 1012 1013 1014 /* 1015 * After this point the code is related to the Total Update. 1016 */ 1017 1018 /** 1019 * This thread is launched when we want to export data to another server. 
1020 * 1021 * When a task is created locally (so this local server is the initiator) 1022 * of the export (Example: dsreplication initialize-all), 1023 * this thread is NOT used but the task thread is running the export instead). 1024 */ 1025 private class ExportThread extends DirectoryThread 1026 { 1027 /** Id of server that will be initialized. */ 1028 private final int serverIdToInitialize; 1029 private final int initWindow; 1030 1031 1032 1033 /** 1034 * Constructor for the ExportThread. 1035 * 1036 * @param serverIdToInitialize 1037 * serverId of server that will receive entries 1038 * @param initWindow 1039 * The value of the initialization window for flow control between 1040 * the importer and the exporter. 1041 */ 1042 public ExportThread(int serverIdToInitialize, int initWindow) 1043 { 1044 super("Export thread from serverId=" + getServerId() + " to serverId=" 1045 + serverIdToInitialize); 1046 this.serverIdToInitialize = serverIdToInitialize; 1047 this.initWindow = initWindow; 1048 } 1049 1050 1051 1052 /** 1053 * Run method for this class. 1054 */ 1055 @Override 1056 public void run() 1057 { 1058 if (logger.isTraceEnabled()) 1059 { 1060 logger.trace("[IE] starting " + getName()); 1061 } 1062 try 1063 { 1064 initializeRemote(serverIdToInitialize, serverIdToInitialize, null, 1065 initWindow); 1066 } catch (DirectoryException de) 1067 { 1068 /* 1069 An error message has been sent to the peer 1070 This server is not the initiator of the export so there is 1071 nothing more to do locally. 1072 */ 1073 } 1074 1075 if (logger.isTraceEnabled()) 1076 { 1077 logger.trace("[IE] ending " + getName()); 1078 } 1079 } 1080 } 1081 1082 /** 1083 * This class contains the context related to an import or export launched on 1084 * the domain. 1085 */ 1086 protected class ImportExportContext 1087 { 1088 /** The private task that initiated the operation. */ 1089 private Task initializeTask; 1090 /** The destination in the case of an export. 
*/ 1091 private int exportTarget = RoutableMsg.UNKNOWN_SERVER; 1092 /** The source in the case of an import. */ 1093 private int importSource = RoutableMsg.UNKNOWN_SERVER; 1094 1095 /** The total entry count expected to be processed. */ 1096 private long entryCount; 1097 /** The count for the entry not yet processed. */ 1098 private long entryLeftCount; 1099 1100 /** Exception raised during the initialization. */ 1101 private DirectoryException exception; 1102 1103 /** Whether the context is related to an import or an export. */ 1104 private final boolean importInProgress; 1105 1106 /** Current counter of messages exchanged during the initialization. */ 1107 private int msgCnt; 1108 1109 /** 1110 * Number of connections lost when we start the initialization. Will help 1111 * counting connections lost during initialization, 1112 */ 1113 private int initNumLostConnections; 1114 1115 /** 1116 * Request message sent when this server has the initializeFromRemote task. 1117 */ 1118 private InitializeRequestMsg initReqMsgSent; 1119 1120 /** 1121 * Start time of the initialization process. ErrorMsg timestamped before 1122 * this startTime will be ignored. 1123 */ 1124 private final long startTime; 1125 1126 /** List for replicas (DS) connected to the topology when initialization started. */ 1127 private final Set<Integer> startList = new HashSet<>(0); 1128 1129 /** 1130 * List for replicas (DS) with a failure (disconnected from the topology) 1131 * since the initialization started. 1132 */ 1133 private final Set<Integer> failureList = new HashSet<>(0); 1134 1135 /** 1136 * Flow control during initialization: for each remote server, counter of 1137 * messages received. 1138 */ 1139 private final Map<Integer, Integer> ackVals = new HashMap<>(); 1140 /** ServerId of the slowest server (the one with the smallest non null counter). 
*/ 1141 private int slowestServerId = -1; 1142 1143 private short exporterProtocolVersion = -1; 1144 1145 /** Window used during this initialization. */ 1146 private int initWindow; 1147 1148 /** Number of attempt already done for this initialization. */ 1149 private short attemptCnt; 1150 1151 /** 1152 * Creates a new IEContext. 1153 * 1154 * @param importInProgress true if the IEContext will be used 1155 * for and import, false if the IEContext 1156 * will be used for and export. 1157 */ 1158 private ImportExportContext(boolean importInProgress) 1159 { 1160 this.importInProgress = importInProgress; 1161 this.startTime = System.currentTimeMillis(); 1162 this.attemptCnt = 0; 1163 } 1164 1165 /** 1166 * Returns a boolean indicating if a total update import is currently in 1167 * Progress. 1168 * 1169 * @return A boolean indicating if a total update import is currently in 1170 * Progress. 1171 */ 1172 boolean importInProgress() 1173 { 1174 return importInProgress; 1175 } 1176 1177 /** 1178 * Returns the total number of entries to be processed when a total update 1179 * is in progress. 1180 * 1181 * @return The total number of entries to be processed when a total update 1182 * is in progress. 1183 */ 1184 long getTotalEntryCount() 1185 { 1186 return entryCount; 1187 } 1188 1189 /** 1190 * Returns the number of entries still to be processed when a total update 1191 * is in progress. 1192 * 1193 * @return The number of entries still to be processed when a total update 1194 * is in progress. 1195 */ 1196 long getLeftEntryCount() 1197 { 1198 return entryLeftCount; 1199 } 1200 1201 /** 1202 * Initializes the import/export counters with the provider value. 1203 * @param total Total number of entries to be processed. 1204 * @throws DirectoryException if an error occurred. 
1205 */ 1206 private void initializeCounters(long total) throws DirectoryException 1207 { 1208 entryCount = total; 1209 entryLeftCount = total; 1210 1211 if (initializeTask instanceof InitializeTask) 1212 { 1213 final InitializeTask task = (InitializeTask) initializeTask; 1214 task.setTotal(entryCount); 1215 task.setLeft(entryCount); 1216 } 1217 else if (initializeTask instanceof InitializeTargetTask) 1218 { 1219 final InitializeTargetTask task = (InitializeTargetTask) initializeTask; 1220 task.setTotal(entryCount); 1221 task.setLeft(entryCount); 1222 } 1223 } 1224 1225 /** 1226 * Update the counters of the task for each entry processed during 1227 * an import or export. 1228 * 1229 * @param entriesDone The number of entries that were processed 1230 * since the last time this method was called. 1231 * 1232 * @throws DirectoryException if an error occurred. 1233 */ 1234 private void updateCounters(int entriesDone) throws DirectoryException 1235 { 1236 entryLeftCount -= entriesDone; 1237 1238 if (initializeTask != null) 1239 { 1240 if (initializeTask instanceof InitializeTask) 1241 { 1242 ((InitializeTask)initializeTask).setLeft(entryLeftCount); 1243 } 1244 else if (initializeTask instanceof InitializeTargetTask) 1245 { 1246 ((InitializeTargetTask)initializeTask).setLeft(entryLeftCount); 1247 } 1248 } 1249 } 1250 1251 /** {@inheritDoc} */ 1252 @Override 1253 public String toString() 1254 { 1255 return "[Entry count=" + this.entryCount + 1256 ", Entry left count=" + this.entryLeftCount + "]"; 1257 } 1258 1259 /** 1260 * Gets the server id of the exporting server. 1261 * @return the server id of the exporting server. 1262 */ 1263 public int getExportTarget() 1264 { 1265 return exportTarget; 1266 } 1267 1268 /** 1269 * Gets the server id of the importing server. 1270 * @return the server id of the importing server. 
1271 */ 1272 public int getImportSource() 1273 { 1274 return importSource; 1275 } 1276 1277 /** 1278 * Get the exception that occurred during the import/export. 1279 * @return the exception that occurred during the import/export. 1280 */ 1281 public DirectoryException getException() 1282 { 1283 return exception; 1284 } 1285 1286 /** 1287 * Set the exception that occurred during the import/export. 1288 * @param exception the exception that occurred during the import/export. 1289 */ 1290 public void setException(DirectoryException exception) 1291 { 1292 this.exception = exception; 1293 } 1294 1295 /** 1296 * Only sets the exception that occurred during the import/export if none 1297 * was already set on this object. 1298 * 1299 * @param exception the exception that occurred during the import/export. 1300 */ 1301 public void setExceptionIfNoneSet(DirectoryException exception) 1302 { 1303 if (exception == null) 1304 { 1305 this.exception = exception; 1306 } 1307 } 1308 1309 /** 1310 * Set the id of the EntryMsg acknowledged from a receiver (importer)server. 1311 * (updated via the listener thread) 1312 * @param serverId serverId of the acknowledger/receiver/importer server. 1313 * @param numAck id of the message received. 1314 */ 1315 private void setAckVal(int serverId, int numAck) 1316 { 1317 if (logger.isTraceEnabled()) 1318 { 1319 logger.trace("[IE] setAckVal[" + serverId + "]=" + numAck); 1320 } 1321 1322 this.ackVals.put(serverId, numAck); 1323 1324 // Recompute the server with the minAck returned,means the slowest server. 1325 slowestServerId = serverId; 1326 for (Integer sid : importExportContext.get().ackVals.keySet()) 1327 { 1328 if (this.ackVals.get(sid) < this.ackVals.get(slowestServerId)) 1329 { 1330 slowestServerId = sid; 1331 } 1332 } 1333 } 1334 1335 /** 1336 * Returns the serverId of the server that acknowledged the smallest 1337 * EntryMsg id. 1338 * @return serverId of the server with latest acknowledge. 1339 * 0 when no ack has been received yet. 
1340 */ 1341 public int getSlowestServer() 1342 { 1343 if (logger.isTraceEnabled()) 1344 { 1345 logger.trace("[IE] getSlowestServer" + slowestServerId 1346 + " " + this.ackVals.get(slowestServerId)); 1347 } 1348 1349 return this.slowestServerId; 1350 } 1351 1352 } 1353 1354 /** 1355 * Verifies that the given string represents a valid source 1356 * from which this server can be initialized. 1357 * 1358 * @param targetString The string representing the source 1359 * @return The source as a integer value 1360 * @throws DirectoryException if the string is not valid 1361 */ 1362 public int decodeTarget(String targetString) throws DirectoryException 1363 { 1364 if ("all".equalsIgnoreCase(targetString)) 1365 { 1366 return RoutableMsg.ALL_SERVERS; 1367 } 1368 1369 // So should be a serverID 1370 try 1371 { 1372 int target = Integer.decode(targetString); 1373 if (target >= 0) 1374 { 1375 // FIXME Could we check now that it is a know server in the domain ? 1376 // JNR: Yes please 1377 } 1378 return target; 1379 } 1380 catch (Exception e) 1381 { 1382 ResultCode resultCode = ResultCode.OTHER; 1383 LocalizableMessage message = ERR_INVALID_EXPORT_TARGET.get(); 1384 throw new DirectoryException(resultCode, message, e); 1385 } 1386 } 1387 1388 /** 1389 * Initializes a remote server from this server. 1390 * <p> 1391 * The {@code exportBackend(OutputStream)} will therefore be called 1392 * on this server, and the {@code importBackend(InputStream)} 1393 * will be called on the remote server. 1394 * <p> 1395 * The InputStream and OutputStream given as a parameter to those 1396 * methods will be connected through the replication protocol. 1397 * 1398 * @param target The server-id of the server that should be initialized. 1399 * The target can be discovered using the 1400 * {@link #getReplicaInfos()} method. 1401 * @param initTask The task that triggers this initialization and that should 1402 * be updated with its progress. 
1403 * 1404 * @throws DirectoryException If it was not possible to publish the 1405 * Initialization message to the Topology. 1406 */ 1407 public void initializeRemote(int target, Task initTask) 1408 throws DirectoryException 1409 { 1410 initializeRemote(target, getServerId(), initTask, getInitWindow()); 1411 } 1412 1413 /** 1414 * Process the initialization of some other server or servers in the topology 1415 * specified by the target argument when this initialization specifying the 1416 * server that requests the initialization. 1417 * 1418 * @param serverToInitialize The target server that should be initialized. 1419 * @param serverRunningTheTask The server that initiated the export. It can 1420 * be the serverID of this server, or the serverID of a remote server. 1421 * @param initTask The task in this server that triggers this initialization 1422 * and that should be updated with its progress. Null when the export is done 1423 * following a request coming from a remote server (task is remote). 1424 * @param initWindow The value of the initialization window for flow control 1425 * between the importer and the exporter. 1426 * 1427 * @exception DirectoryException When an error occurs. No exception raised 1428 * means success. 
*/
  protected void initializeRemote(int serverToInitialize,
      int serverRunningTheTask, Task initTask, int initWindow)
  throws DirectoryException
  {
    final ImportExportContext ieCtx = acquireIEContext(false);

    DirectoryException exportRootException = null;

    // The context acquired above is released in the finally block below, so
    // that an unexpected failure cannot leave it held and make every later
    // import/export be rejected as "simultaneous".
    try
    {
      /*
      We manage the list of servers to initialize in order :
      - to test at the end that all expected servers have reconnected
        after their import and with the right genId
      - to update the task with the server(s) where this test failed
      */

      if (serverToInitialize == RoutableMsg.ALL_SERVERS)
      {
        logger.info(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START_ALL,
            countEntries(), getBaseDN(), getServerId());

        ieCtx.startList.addAll(getReplicaInfos().keySet());

        // We manage the list of servers with which a flow control can be enabled
        for (DSInfo dsi : getReplicaInfos().values())
        {
          if (dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
          {
            ieCtx.setAckVal(dsi.getDsId(), 0);
          }
        }
      }
      else
      {
        logger.info(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START, countEntries(),
            getBaseDN(), getServerId(), serverToInitialize);

        ieCtx.startList.add(serverToInitialize);

        // We manage the list of servers with which a flow control can be enabled
        for (DSInfo dsi : getReplicaInfos().values())
        {
          if (dsi.getDsId() == serverToInitialize &&
              dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
          {
            ieCtx.setAckVal(dsi.getDsId(), 0);
          }
        }
      }

      // loop for the case where the exporter is the initiator
      // NOTE(review): with the "++attempt < 2" bound the body runs at most
      // once, so the "continue" of the NewAttempt case below leaves the loop
      // instead of starting a second attempt - confirm this is intended.
      int attempt = 0;
      boolean done = false;
      while (!done && ++attempt < 2) // attempt loop
      {
        try
        {
          ieCtx.exportTarget = serverToInitialize;
          if (initTask != null)
          {
            ieCtx.initializeTask = initTask;
          }
          ieCtx.initializeCounters(countEntries());
          ieCtx.msgCnt = 0;
          ieCtx.initNumLostConnections = broker.getNumLostConnections();
          ieCtx.initWindow = initWindow;

          // Send start message to the peer
          InitializeTargetMsg initTargetMsg = new InitializeTargetMsg(
              getBaseDN(), getServerId(), serverToInitialize,
              serverRunningTheTask, ieCtx.entryCount, initWindow);

          broker.publish(initTargetMsg);

          // Wait for all servers to be ok
          waitForRemoteStartOfInit(ieCtx);

          // Servers that left in the list are those for which we could not test
          // that they have been successfully initialized.
          if (!ieCtx.failureList.isEmpty())
          {
            throw new DirectoryException(
                ResultCode.OTHER,
                ERR_INIT_NO_SUCCESS_START_FROM_SERVERS.get(getBaseDN(), ieCtx.failureList));
          }

          exportBackend(new BufferedOutputStream(new ReplOutputStream(this)));

          // Notify the peer of the success
          broker.publish(
              new DoneMsg(getServerId(), initTargetMsg.getDestination()));
        }
        catch(DirectoryException exportException)
        {
          // Give priority to the first exception raised - stored in the context
          final DirectoryException ieEx = ieCtx.exception;
          exportRootException = ieEx != null ? ieEx : exportException;
        }

        if (logger.isTraceEnabled())
        {
          logger.trace("[IE] In " + broker.getReplicationMonitorInstanceName()
              + " export ends with connected=" + broker.isConnected()
              + " exportRootException=" + exportRootException);
        }

        if (exportRootException != null)
        {
          try
          {
            /*
            Handling the errors during export

            Note: we could have lost the connection and another thread
            the listener one) has already managed to reconnect.
            So we MUST rely on the test broker.isConnected()
            ONLY to do 'wait to be reconnected by another thread'
            (if not yet reconnected already).
            */
            if (!broker.isConnected())
            {
              // We are still disconnected, so we wait for the listener thread
              // to reconnect - wait 10s
              if (logger.isTraceEnabled())
              {
                logger.trace("[IE] Exporter wait for reconnection by the listener thread");
              }
              int att=0;
              while (!broker.shuttingDown()
                  && !broker.isConnected()
                  && ++att < 100)
              {
                try { Thread.sleep(100); }
                catch(Exception e){ /* do nothing */ }
              }
            }

            if (initTask != null
                && broker.isConnected()
                && serverToInitialize != RoutableMsg.ALL_SERVERS)
            {
              /*
              NewAttempt case : In the case where
              - it's not an InitializeAll
              - AND the previous export attempt failed
              - AND we are (now) connected
              - and we own the task and this task is not an InitializeAll
              Let's :
              - sleep to let time to the other peer to reconnect if needed
              - and launch another attempt
              */
              try { Thread.sleep(1000); }
              catch(Exception e){ /* do nothing */ }

              logger.info(NOTE_RESENDING_INIT_TARGET, exportRootException.getLocalizedMessage());
              continue;
            }

            broker.publish(new ErrorMsg(
                serverToInitialize, exportRootException.getMessageObject()));
          }
          catch(Exception e)
          {
            // Ignore the failure raised while proceeding the root failure
          }
        }

        // We are always done for this export ...
        // ... except in the NewAttempt case (see above)
        done = true;

      } // attempt loop

      // Wait for all servers to be ok, and build the failure list
      waitForRemoteEndOfInit(ieCtx);

      // Servers that left in the list are those for which we could not test
      // that they have been successfully initialized.
      if (!ieCtx.failureList.isEmpty() && exportRootException == null)
      {
        exportRootException = new DirectoryException(ResultCode.OTHER,
            ERR_INIT_NO_SUCCESS_END_FROM_SERVERS.get(getGenerationID(), ieCtx.failureList));
      }
    }
    finally
    {
      // Don't forget to release IEcontext acquired at beginning.
      releaseIEContext();
    }

    final String cause = exportRootException == null ? ""
        : exportRootException.getLocalizedMessage();
    if (serverToInitialize == RoutableMsg.ALL_SERVERS)
    {
      logger.info(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END_ALL,
          getBaseDN(), getServerId(), cause);
    }
    else
    {
      logger.info(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END,
          getBaseDN(), getServerId(), serverToInitialize, cause);
    }


    if (exportRootException != null)
    {
      throw exportRootException;
    }
  }

  /**
   * For all remote servers in the start list:
   * - wait until it reports the full update status, polling every 100 ms and
   *   giving up after 1200 waits or when the broker is shutting down,
   * - add to the failureList the servers never seen in that status.
   *
   * @param ieCtx the context of the export in progress.
   */
  private void waitForRemoteStartOfInit(ImportExportContext ieCtx)
  {
    final Set<Integer> replicasWeAreWaitingFor = new HashSet<>(ieCtx.startList);

    if (logger.isTraceEnabled())
    {
      logger.trace("[IE] wait for start replicasWeAreWaitingFor=" + replicasWeAreWaitingFor);
    }

    int waitResultAttempt = 0;
    boolean done;
    do
    {
      done = true;
      for (DSInfo dsi : getReplicaInfos().values())
      {
        if (logger.isTraceEnabled())
        {
          logger.trace(
            "[IE] wait for start dsId " + dsi.getDsId()
            + " " + dsi.getStatus()
            + " " + dsi.getGenerationId()
            + " " + getGenerationID());
        }
        if (ieCtx.startList.contains(dsi.getDsId()))
        {
          if (dsi.getStatus() != ServerStatus.FULL_UPDATE_STATUS)
          {
            // this one is still not doing the Full Update ... retry later
            done = false;
            try
            {
              Thread.sleep(100);
            }
            catch (InterruptedException e)
            {
              // keep the interruption visible to the caller
              Thread.currentThread().interrupt();
            }
            waitResultAttempt++;
            break;
          }
          else
          {
            // this one is ok
            replicasWeAreWaitingFor.remove(dsi.getDsId());
          }
        }
      }
    }
    while (!done && waitResultAttempt < 1200 && !broker.shuttingDown());

    ieCtx.failureList.addAll(replicasWeAreWaitingFor);

    if (logger.isTraceEnabled())
    {
      logger.trace("[IE] wait for start ends with " + ieCtx.failureList);
    }
  }

  /**
   * For all remote servers in the start list, plus the replicas currently
   * known to the domain:
   * - wait it has finished the import and present the expected generationID,
   * - build the failureList.
   *
   * @param ieCtx the context of the export in progress.
   */
  private void waitForRemoteEndOfInit(ImportExportContext ieCtx)
  {
    final Set<Integer> replicasWeAreWaitingFor = new HashSet<>(ieCtx.startList);

    if (logger.isTraceEnabled())
    {
      logger.trace("[IE] wait for end replicasWeAreWaitingFor=" + replicasWeAreWaitingFor);
    }

    /*
    In case some new servers appear during the init, we want them to be
    considered in the processing of sorting the successfully initialized
    and the others
    */
    replicasWeAreWaitingFor.addAll(getReplicaInfos().keySet());

    boolean done;
    do
    {
      done = true;
      // NOTE(review): reconnectWait is reset on every pass of this do-loop and
      // is incremented once per disconnected replica, so it does not measure
      // elapsed seconds - confirm the intended reconnection timeout.
      int reconnectMaxDelayInSec = 10;
      int reconnectWait = 0;
      Iterator<Integer> it = replicasWeAreWaitingFor.iterator();
      while (it.hasNext())
      {
        int serverId = it.next();
        if (ieCtx.failureList.contains(serverId))
        {
          /*
          this server has already been in error during initialization
          don't wait for it
          */
          continue;
        }

        DSInfo dsInfo = getConnectedRemoteDS(serverId);
        if (dsInfo == null)
        {
          /*
          this server is disconnected
          may be for a long time if it crashed or had been stopped
          may be just the time to reconnect after import : should be short
          */
          if (++reconnectWait<reconnectMaxDelayInSec)
          {
            // let's still wait to give a chance to this server to reconnect
            done = false;
          }
          // Else we left enough time to the servers to reconnect
        }
        else
        {
          // this server is connected
          if (dsInfo.getStatus() == ServerStatus.FULL_UPDATE_STATUS)
          {
            // this one is still doing the Full Update ... retry later
            done = false;
            break;
          }

          if (dsInfo.getGenerationId() == getGenerationID())
          { // and with the expected generationId
            // We're done with this server
            it.remove();
          }
        }
      }

      // loop and wait
      if (!done)
      {
        try
        {
          Thread.sleep(1000); // 1sec
        }
        catch (InterruptedException e)
        {
          // keep the interruption visible to the caller
          Thread.currentThread().interrupt();
        }
      }
    }
    while (!done && !broker.shuttingDown()); // infinite wait

    ieCtx.failureList.addAll(replicasWeAreWaitingFor);

    if (logger.isTraceEnabled())
    {
      logger.trace("[IE] wait for end ends with " + ieCtx.failureList);
    }
  }

  /**
   * Get the ServerState maintained by the Concrete class.
   *
   * @return the ServerState maintained by the Concrete class.
   */
  public ServerState getServerState()
  {
    return state;
  }

  /**
   * Acquire and initialize the import/export context, verifying no other
   * import/export is in progress.
1802 */ 1803 private ImportExportContext acquireIEContext(boolean importInProgress) 1804 throws DirectoryException 1805 { 1806 final ImportExportContext ieCtx = new ImportExportContext(importInProgress); 1807 if (!importExportContext.compareAndSet(null, ieCtx)) 1808 { 1809 // Rejects 2 simultaneous exports 1810 LocalizableMessage message = ERR_SIMULTANEOUS_IMPORT_EXPORT_REJECTED.get(); 1811 throw new DirectoryException(ResultCode.OTHER, message); 1812 } 1813 return ieCtx; 1814 } 1815 1816 private void releaseIEContext() 1817 { 1818 importExportContext.set(null); 1819 } 1820 1821 /** 1822 * Processes an error message received while an export is 1823 * on going, or an import will start. 1824 * 1825 * @param errorMsg The error message received. 1826 */ 1827 private void processErrorMsg(ErrorMsg errorMsg, ImportExportContext ieCtx) 1828 { 1829 //Exporting must not be stopped on the first error, if we run initialize-all 1830 if (ieCtx != null && ieCtx.exportTarget != RoutableMsg.ALL_SERVERS) 1831 { 1832 // The ErrorMsg is received while we have started an initialization 1833 ieCtx.setExceptionIfNoneSet(new DirectoryException( 1834 ResultCode.OTHER, errorMsg.getDetails())); 1835 1836 /* 1837 * This can happen : 1838 * - on the first InitReqMsg sent when source in not known for example 1839 * - on the next attempt when source crashed and did not reconnect 1840 * even after the nextInitAttemptDelay 1841 * During the import, the ErrorMsg will be received by receiveEntryBytes 1842 */ 1843 if (ieCtx.initializeTask instanceof InitializeTask) 1844 { 1845 // Update the task that initiated the import 1846 ((InitializeTask) ieCtx.initializeTask) 1847 .updateTaskCompletionState(ieCtx.getException()); 1848 1849 releaseIEContext(); 1850 } 1851 } 1852 } 1853 1854 /** 1855 * Receives bytes related to an entry in the context of an import to 1856 * initialize the domain (called by ReplLDIFInputStream). 1857 * 1858 * @return The bytes. 
Null when the Done or Err message has been received 1859 */ 1860 protected byte[] receiveEntryBytes() 1861 { 1862 ReplicationMsg msg; 1863 while (true) 1864 { 1865 ImportExportContext ieCtx = importExportContext.get(); 1866 try 1867 { 1868 // In the context of the total update, we don't want any automatic 1869 // re-connection done transparently by the broker because of a better 1870 // RS or because of a connection failure. 1871 // We want to be notified of topology change in order to track a 1872 // potential disconnection of the exporter. 1873 msg = broker.receive(false, false, true); 1874 1875 if (logger.isTraceEnabled()) 1876 { 1877 logger.trace("[IE] In " 1878 + broker.getReplicationMonitorInstanceName() 1879 + ", receiveEntryBytes " + msg); 1880 } 1881 1882 if (msg == null) 1883 { 1884 if (broker.shuttingDown()) 1885 { 1886 // The server is in the shutdown process 1887 return null; 1888 } 1889 else 1890 { 1891 // Handle connection issues 1892 ieCtx.setExceptionIfNoneSet(new DirectoryException( 1893 ResultCode.OTHER, ERR_INIT_RS_DISCONNECTION_DURING_IMPORT 1894 .get(broker.getReplicationServer()))); 1895 return null; 1896 } 1897 } 1898 1899 // Check good ordering of msg received 1900 if (msg instanceof EntryMsg) 1901 { 1902 EntryMsg entryMsg = (EntryMsg)msg; 1903 byte[] entryBytes = entryMsg.getEntryBytes(); 1904 ieCtx.updateCounters(countEntryLimits(entryBytes)); 1905 1906 if (ieCtx.exporterProtocolVersion >= 1907 ProtocolVersion.REPLICATION_PROTOCOL_V4) 1908 { 1909 // check the msgCnt of the msg received to check ordering 1910 if (++ieCtx.msgCnt != entryMsg.getMsgId()) 1911 { 1912 ieCtx.setExceptionIfNoneSet(new DirectoryException( 1913 ResultCode.OTHER, ERR_INIT_BAD_MSG_ID_SEQ_DURING_IMPORT.get(ieCtx.msgCnt, entryMsg.getMsgId()))); 1914 return null; 1915 } 1916 1917 // send the ack of flow control mgmt 1918 if ((ieCtx.msgCnt % (ieCtx.initWindow/2)) == 0) 1919 { 1920 final InitializeRcvAckMsg amsg = new InitializeRcvAckMsg( 1921 getServerId(), 
entryMsg.getSenderID(), ieCtx.msgCnt); 1922 broker.publish(amsg, false); 1923 if (logger.isTraceEnabled()) 1924 { 1925 logger.trace("[IE] In " 1926 + broker.getReplicationMonitorInstanceName() 1927 + ", publish InitializeRcvAckMsg" + amsg); 1928 } 1929 } 1930 } 1931 return entryBytes; 1932 } 1933 else if (msg instanceof DoneMsg) 1934 { 1935 /* 1936 This is the normal termination of the import 1937 No error is stored and the import is ended by returning null 1938 */ 1939 return null; 1940 } 1941 else if (msg instanceof ErrorMsg) 1942 { 1943 /* 1944 This is an error termination during the import 1945 The error is stored and the import is ended by returning null 1946 */ 1947 if (ieCtx.getException() == null) 1948 { 1949 ErrorMsg errMsg = (ErrorMsg)msg; 1950 if (errMsg.getCreationTime() > ieCtx.startTime) 1951 { 1952 ieCtx.setException( 1953 new DirectoryException(ResultCode.OTHER,errMsg.getDetails())); 1954 return null; 1955 } 1956 } 1957 } 1958 else 1959 { 1960 // Other messages received during an import are trashed except 1961 // the topologyMsg. 1962 if (msg instanceof TopologyMsg 1963 && getConnectedRemoteDS(ieCtx.importSource) == null) 1964 { 1965 LocalizableMessage errMsg = ERR_INIT_EXPORTER_DISCONNECTION.get( 1966 getBaseDN(), getServerId(), ieCtx.importSource); 1967 ieCtx.setExceptionIfNoneSet(new DirectoryException(ResultCode.OTHER, errMsg)); 1968 return null; 1969 } 1970 } 1971 } 1972 catch(Exception e) 1973 { 1974 ieCtx.setExceptionIfNoneSet(new DirectoryException( 1975 ResultCode.OTHER, 1976 ERR_INIT_IMPORT_FAILURE.get(e.getLocalizedMessage()))); 1977 } 1978 } 1979 } 1980 1981 /** 1982 * Count the number of entries in the provided byte[]. 1983 * This is based on the hypothesis that the entries are separated 1984 * by a "\n\n" String. 1985 * 1986 * @param entryBytes the set of bytes containing one or more entries. 1987 * @return The number of entries in the provided byte[]. 
1988 */ 1989 private int countEntryLimits(byte[] entryBytes) 1990 { 1991 return countEntryLimits(entryBytes, 0, entryBytes.length); 1992 } 1993 1994 /** 1995 * Count the number of entries in the provided byte[]. 1996 * This is based on the hypothesis that the entries are separated 1997 * by a "\n\n" String. 1998 * 1999 * @param entryBytes the set of bytes containing one or more entries. 2000 * @return The number of entries in the provided byte[]. 2001 */ 2002 private int countEntryLimits(byte[] entryBytes, int pos, int length) 2003 { 2004 int entryCount = 0; 2005 int count = 0; 2006 while (count<=length-2) 2007 { 2008 if (entryBytes[pos+count] == '\n' && entryBytes[pos+count+1] == '\n') 2009 { 2010 entryCount++; 2011 count++; 2012 } 2013 count++; 2014 } 2015 return entryCount; 2016 } 2017 2018 /** 2019 * Exports an entry in LDIF format. 2020 * 2021 * @param lDIFEntry The entry to be exported in byte[] form. 2022 * @param pos The starting Position in the array. 2023 * @param length Number of array elements to be copied. 2024 * 2025 * @throws IOException when an error occurred. 2026 */ 2027 void exportLDIFEntry(byte[] lDIFEntry, int pos, int length) 2028 throws IOException 2029 { 2030 if (logger.isTraceEnabled()) 2031 { 2032 logger.trace("[IE] Entering exportLDIFEntry entry=" + Arrays.toString(lDIFEntry)); 2033 } 2034 2035 // build the message 2036 ImportExportContext ieCtx = importExportContext.get(); 2037 EntryMsg entryMessage = new EntryMsg( 2038 getServerId(), ieCtx.getExportTarget(), lDIFEntry, pos, length, 2039 ++ieCtx.msgCnt); 2040 2041 // Waiting the slowest loop 2042 while (!broker.shuttingDown()) 2043 { 2044 /* 2045 If an error was raised - like receiving an ErrorMsg from a remote 2046 server that have been stored by the listener thread in the ieContext, 2047 we just abandon the export by throwing an exception. 
2048 */ 2049 if (ieCtx.getException() != null) 2050 { 2051 throw new IOException(ieCtx.getException().getMessage()); 2052 } 2053 2054 int slowestServerId = ieCtx.getSlowestServer(); 2055 if (getConnectedRemoteDS(slowestServerId) == null) 2056 { 2057 ieCtx.setException(new DirectoryException(ResultCode.OTHER, 2058 ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT.get(ieCtx.getSlowestServer()))); 2059 2060 throw new IOException("IOException with nested DirectoryException", 2061 ieCtx.getException()); 2062 } 2063 2064 int ourLastExportedCnt = ieCtx.msgCnt; 2065 int slowestCnt = ieCtx.ackVals.get(slowestServerId); 2066 2067 if (logger.isTraceEnabled()) 2068 { 2069 logger.trace("[IE] Entering exportLDIFEntry waiting " + 2070 " our=" + ourLastExportedCnt + " slowest=" + slowestCnt); 2071 } 2072 2073 if (ourLastExportedCnt - slowestCnt > ieCtx.initWindow) 2074 { 2075 if (logger.isTraceEnabled()) 2076 { 2077 logger.trace("[IE] Entering exportLDIFEntry waiting"); 2078 } 2079 2080 // our export is too far beyond the slowest importer - let's wait 2081 try { Thread.sleep(100); } 2082 catch(Exception e) { /* do nothing */ } 2083 2084 // process any connection error 2085 if (broker.hasConnectionError() 2086 || broker.getNumLostConnections() != ieCtx.initNumLostConnections) 2087 { 2088 // publish failed - store the error in the ieContext ... 2089 DirectoryException de = new DirectoryException(ResultCode.OTHER, 2090 ERR_INIT_RS_DISCONNECTION_DURING_EXPORT.get(broker.getRsServerId())); 2091 ieCtx.setExceptionIfNoneSet(de); 2092 // .. and abandon the export by throwing an exception. 
2093 throw new IOException(de.getMessage()); 2094 } 2095 } 2096 else 2097 { 2098 if (logger.isTraceEnabled()) 2099 { 2100 logger.trace("[IE] slowest got to us => stop waiting"); 2101 } 2102 break; 2103 } 2104 } // Waiting the slowest loop 2105 2106 if (logger.isTraceEnabled()) 2107 { 2108 logger.trace("[IE] Entering exportLDIFEntry pub entry=" + Arrays.toString(lDIFEntry)); 2109 } 2110 2111 boolean sent = broker.publish(entryMessage, false); 2112 2113 // process any publish error 2114 if (!sent 2115 || broker.hasConnectionError() 2116 || broker.getNumLostConnections() != ieCtx.initNumLostConnections) 2117 { 2118 // publish failed - store the error in the ieContext ... 2119 DirectoryException de = new DirectoryException(ResultCode.OTHER, 2120 ERR_INIT_RS_DISCONNECTION_DURING_EXPORT.get(broker.getRsServerId())); 2121 ieCtx.setExceptionIfNoneSet(de); 2122 // .. and abandon the export by throwing an exception. 2123 throw new IOException(de.getMessage()); 2124 } 2125 2126 // publish succeeded 2127 try 2128 { 2129 ieCtx.updateCounters(countEntryLimits(lDIFEntry, pos, length)); 2130 } 2131 catch (DirectoryException de) 2132 { 2133 ieCtx.setExceptionIfNoneSet(de); 2134 // .. and abandon the export by throwing an exception. 2135 throw new IOException(de.getMessage()); 2136 } 2137 } 2138 2139 /** 2140 * Initializes asynchronously this domain from a remote source server. 2141 * Before returning from this call, for the provided task : 2142 * - the progressing counters are updated during the initialization using 2143 * setTotal() and setLeft(). 2144 * - the end of the initialization using updateTaskCompletionState(). 2145 * <p> 2146 * When this method is called, a request for initialization is sent to the 2147 * remote source server requesting initialization. 2148 * <p> 2149 * 2150 * @param source The server-id of the source from which to initialize. 2151 * The source can be discovered using the 2152 * {@link #getReplicaInfos()} method. 
   *
   * @param initTask The task that launched the initialization
   *                 and should be updated of its progress.
   *
   * @throws DirectoryException If it was not possible to publish the
   *                            Initialization message to the Topology.
   *                            The task state is updated.
   */
  public void initializeFromRemote(int source, Task initTask)
  throws DirectoryException
  {
    if (logger.isTraceEnabled())
    {
      logger.trace("[IE] Entering initializeFromRemote for " + this);
    }

    // Pre-compute the "not connected" error; it is only raised after the
    // request attempt below.
    // NOTE(review): the request is still attempted when the broker is not
    // connected - confirm this is intended.
    LocalizableMessage errMsg = !broker.isConnected()
        ? ERR_INITIALIZATION_FAILED_NOCONN.get(getBaseDN())
        : null;

    /*
    We must not test here whether the remote source is connected to
    the topology by testing if it stands in the replicas list.
    In the case of a re-attempt of initialization, the listener thread is
    running this method directly coming from initialize() method and did
    not process any topology message in between the failure and the
    new attempt.
    */
    try
    {
      /*
      We must immediately acquire a context to store the task inside.
      The context will be used when we (the listener thread) will receive
      the InitializeTargetMsg, process the import, and at the end
      update the task.
      */

      final ImportExportContext ieCtx = acquireIEContext(true);
      ieCtx.initializeTask = initTask;
      ieCtx.attemptCnt = 0;
      // The request is kept in the context so that initialize() can re-send it
      // for a new attempt.
      ieCtx.initReqMsgSent = new InitializeRequestMsg(
          getBaseDN(), getServerId(), source, getInitWindow());
      broker.publish(ieCtx.initReqMsgSent);

      /*
      The normal success processing is now to receive InitTargetMsg then
      entries from the remote server.
      The error cases are :
      - either local error immediately caught below
      - a remote error we will receive as an ErrorMsg
      */
    }
    catch(DirectoryException de)
    {
      // Overrides any "not connected" message computed above.
      errMsg = de.getMessageObject();
    }
    catch(Exception e)
    {
      // Should not happen
      errMsg = LocalizableMessage.raw(e.getLocalizedMessage());
      logger.error(errMsg);
    }

    // When error, update the task and raise the error to the caller
    if (errMsg != null)
    {
      // No need to call here updateTaskCompletionState - will be done
      // by the caller
      // NOTE(review): the context is released even when acquireIEContext()
      // itself failed - verify this cannot release a context owned by another
      // import/export.
      releaseIEContext();
      throw new DirectoryException(ResultCode.OTHER, errMsg);
    }
  }

  /**
   * Processes an InitializeTargetMsg received from a remote server
   * meaning processes an initialization from the entries expected to be
   * received now.
   * <p>
   * The import is run through {@link #importBackend(InputStream)}. Whatever
   * the outcome, the broker is then restarted. When the import failed, the
   * broker is connected again, a local task exists and fewer than two attempts
   * were made, the initialization request is re-sent and this method returns
   * to wait for the next InitializeTargetMsg. Otherwise the exporter is
   * notified of the error (if any), the task is updated and the import/export
   * context is released.
   *
   * @param initTargetMsgReceived The message received from the remote server.
   *
   * @param requesterServerId The serverId of the server that requested the
   *                          initialization meaning the server where the
   *                          task has initially been created (this server,
   *                          or the remote server).
   */
  private void initialize(InitializeTargetMsg initTargetMsgReceived, int requesterServerId)
  {
    if (logger.isTraceEnabled())
    {
      logger.trace("[IE] Entering initialize - domain=" + this);
    }

    InitializeTask initFromTask = null;
    int source = initTargetMsgReceived.getSenderID();
    // Context possibly already acquired by initializeFromRemote() when the
    // import was requested by this server.
    ImportExportContext ieCtx = importExportContext.get();
    try
    {
      // Log starting
      logger.info(NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START, getBaseDN(),
          initTargetMsgReceived.getSenderID(), getServerId());

      // Go into full update status
      setNewStatus(StatusMachineEvent.TO_FULL_UPDATE_STATUS_EVENT);

      // Acquire an import context if not already done (and initialize).
      if (initTargetMsgReceived.getInitiatorID() != getServerId())
      {
        /*
        The initTargetMsgReceived is for an import initiated by the remote server.
        Test and set if no import already in progress
        */
        ieCtx = acquireIEContext(true);
      }

      // Initialize stuff
      ieCtx.importSource = source;
      ieCtx.initializeCounters(initTargetMsgReceived.getEntryCount());
      ieCtx.initWindow = initTargetMsgReceived.getInitWindow();
      ieCtx.exporterProtocolVersion = getProtocolVersion(source);
      initFromTask = (InitializeTask) ieCtx.initializeTask;

      // Launch the import
      importBackend(new ReplInputStream(this));
    }
    catch (DirectoryException e)
    {
      /*
      Store the exception raised. It will be considered if no other exception
      has been previously stored in the context
      */
      ieCtx.setExceptionIfNoneSet(e);
    }
    finally
    {
      if (logger.isTraceEnabled())
      {
        logger.trace("[IE] Domain=" + this
            + " ends import with exception=" + ieCtx.getException()
            + " connected=" + broker.isConnected());
      }

      /*
      It is necessary to restart (reconnect to RS) for different reasons
      - when everything went well, reconnect in order to exchange
      new state, new generation ID
      - when we have connection failure, reconnect to retry a new import
      right here, right now
      we never want retryOnFailure if we fail reconnecting in the restart.
      */
      broker.reStart(false);

      // The attempt counter is only incremented when the three preceding
      // conditions hold (short-circuit evaluation).
      if (ieCtx.getException() != null
          && broker.isConnected()
          && initFromTask != null
          && ++ieCtx.attemptCnt < 2)
      {
        /*
        Worth a new attempt
        since initFromTask is in this server, connection is ok
        */
        try
        {
          /*
          Wait for the exporter to stabilize - eventually reconnect as
          well if it was connected to the same RS than the one we lost ...
          */
          Thread.sleep(1000);

          /*
          Restart the whole import protocol exchange by sending again
          the request
          */
          logger.info(NOTE_RESENDING_INIT_FROM_REMOTE_REQUEST,
              ieCtx.getException().getLocalizedMessage());

          broker.publish(ieCtx.initReqMsgSent);

          // Reset the context state for the new attempt.
          ieCtx.initializeCounters(0);
          ieCtx.exception = null;
          ieCtx.msgCnt = 0;

          // Processing of the received initTargetMsgReceived is done
          // let's wait for the next one
          return;
        }
        catch(Exception e)
        {
          /*
          An error occurs when sending a new request for a new import.
          This error is not stored, preferring to keep the initial one.
          */
          logger.error(ERR_SENDING_NEW_ATTEMPT_INIT_REQUEST,
              e.getLocalizedMessage(), ieCtx.getException().getLocalizedMessage());
        }
      }

      // ===================
      // No new attempt case

      if (logger.isTraceEnabled())
      {
        logger.trace("[IE] Domain=" + this
            + " ends initialization with exception=" + ieCtx.getException()
            + " connected=" + broker.isConnected()
            + " task=" + initFromTask
            + " attempt=" + ieCtx.attemptCnt);
      }

      try
      {
        if (broker.isConnected() && ieCtx.getException() != null)
        {
          // Let's notify the exporter
          ErrorMsg errorMsg = new ErrorMsg(requesterServerId,
              ieCtx.getException().getMessageObject());
          broker.publish(errorMsg);
        }
        /*
        Update the task that initiated the import must be the last thing.
        Particularly, broker.restart() after import success must be done
        before some other operations/tasks to be launched,
        like resetting the generation ID.
        */
        if (initFromTask != null)
        {
          initFromTask.updateTaskCompletionState(ieCtx.getException());
        }
      }
      finally
      {
        // Always log the end of the full update and release the context, even
        // if notifying the exporter or updating the task failed.
        String errorMsg = ieCtx.getException() != null ? ieCtx.getException().getLocalizedMessage() : "";
        logger.info(NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END,
            getBaseDN(), initTargetMsgReceived.getSenderID(), getServerId(), errorMsg);
        releaseIEContext();
      } // finally
    } // finally
  }

  /**
   * Return the protocol version of the DS related to the provided serverId.
   * Returns -1 when the protocol version is not known.
   * @param dsServerId The provided serverId.
   * @return The protocol version.
   */
  private short getProtocolVersion(int dsServerId)
  {
    final DSInfo dsInfo = getReplicaInfos().get(dsServerId);
    if (dsInfo != null)
    {
      return dsInfo.getProtocolVersion();
    }
    // No information known for this server id.
    return -1;
  }

  /**
   * Sets the status to a new value depending of the passed status machine
   * event, then signals the resulting status through the broker.
   * @param event The event that may make the status be changed
   */
  protected void signalNewStatus(StatusMachineEvent event)
  {
    setNewStatus(event);
    broker.signalStatusChange(status);
  }

  /**
   * Computes the status resulting from the provided event and applies it
   * locally. An invalid transition is logged and leaves the status unchanged.
   *
   * @param event The event that may make the status be changed
   */
  private void setNewStatus(StatusMachineEvent event)
  {
    ServerStatus newStatus = StatusMachine.computeNewStatus(status, event);
    if (newStatus == ServerStatus.INVALID_STATUS)
    {
      logger.error(ERR_DS_CANNOT_CHANGE_STATUS, getBaseDN(), getServerId(), status, event);
      return;
    }

    if (newStatus != status)
    {
      // Reset status date
      lastStatusChangeDate = new Date();
      // Reset monitoring counters when moving to the not connected status
      if (newStatus == ServerStatus.NOT_CONNECTED_STATUS)
      {
        resetMonitoringCounters();
      }

      status = newStatus;
      if (logger.isTraceEnabled())
      {
        logger.trace("Replication domain " + getBaseDN()
            + " new status is: " + status);
      }

      // Perform whatever actions are needed to apply properties for being
      // compliant with new status
      updateDomainForNewStatus();
    }
  }

  /**
   * Returns a boolean
indicating if an import or export is currently 2452 * processed. 2453 * 2454 * @return The status 2455 */ 2456 public boolean ieRunning() 2457 { 2458 return importExportContext.get() != null; 2459 } 2460 2461 /** 2462 * Check the value of the Replication Servers generation ID. 2463 * 2464 * @param generationID The expected value of the generation ID. 2465 * 2466 * @throws DirectoryException When the generation ID of the Replication 2467 * Servers is not the expected value. 2468 */ 2469 private void checkGenerationID(long generationID) throws DirectoryException 2470 { 2471 boolean allSet = true; 2472 2473 for (int i = 0; i< 50; i++) 2474 { 2475 allSet = true; 2476 for (RSInfo rsInfo : getRsInfos()) 2477 { 2478 // the 'empty' RSes (generationId==-1) are considered as good citizens 2479 if (rsInfo.getGenerationId() != -1 && 2480 rsInfo.getGenerationId() != generationID) 2481 { 2482 try 2483 { 2484 Thread.sleep(i*100); 2485 } catch (InterruptedException e) 2486 { 2487 Thread.currentThread().interrupt(); 2488 } 2489 allSet = false; 2490 break; 2491 } 2492 } 2493 if (allSet) 2494 { 2495 break; 2496 } 2497 } 2498 if (!allSet) 2499 { 2500 LocalizableMessage message = ERR_RESET_GENERATION_ID_FAILED.get(getBaseDN()); 2501 throw new DirectoryException(ResultCode.OTHER, message); 2502 } 2503 } 2504 2505 /** 2506 * Reset the Replication Log. 2507 * Calling this method will remove all the Replication information that 2508 * was kept on all the Replication Servers currently connected in the 2509 * topology. 2510 * 2511 * @throws DirectoryException If this ReplicationDomain is not currently 2512 * connected to a Replication Server or it 2513 * was not possible to contact it. 2514 */ 2515 void resetReplicationLog() throws DirectoryException 2516 { 2517 // Reset the Generation ID to -1 to clean the ReplicationServers. 
2518 resetGenerationId(-1L); 2519 2520 // check that at least one ReplicationServer did change its generation-id 2521 checkGenerationID(-1); 2522 2523 // Reconnect to the Replication Server so that it adopts our GenerationID. 2524 restartService(); 2525 2526 // wait for the domain to reconnect. 2527 int count = 0; 2528 while (!isConnected() && count < 10) 2529 { 2530 try 2531 { 2532 Thread.sleep(100); 2533 } catch (InterruptedException e) 2534 { 2535 Thread.currentThread().interrupt(); 2536 } 2537 } 2538 2539 resetGenerationId(getGenerationID()); 2540 2541 // check that at least one ReplicationServer did change its generation-id 2542 checkGenerationID(getGenerationID()); 2543 } 2544 2545 /** 2546 * Reset the generationId of this domain in the whole topology. 2547 * A message is sent to the Replication Servers for them to reset 2548 * their change dbs. 2549 * 2550 * @param generationIdNewValue The new value of the generation Id. 2551 * @throws DirectoryException When an error occurs 2552 */ 2553 public void resetGenerationId(Long generationIdNewValue) 2554 throws DirectoryException 2555 { 2556 if (logger.isTraceEnabled()) 2557 { 2558 logger.trace("Server id " + getServerId() + " and domain " 2559 + getBaseDN() + " resetGenerationId " + generationIdNewValue); 2560 } 2561 2562 ResetGenerationIdMsg genIdMessage = 2563 new ResetGenerationIdMsg(getGenId(generationIdNewValue)); 2564 2565 if (!isConnected()) 2566 { 2567 LocalizableMessage message = ERR_RESET_GENERATION_CONN_ERR_ID.get(getBaseDN(), 2568 getServerId(), genIdMessage.getGenerationId()); 2569 throw new DirectoryException(ResultCode.OTHER, message); 2570 } 2571 broker.publish(genIdMessage); 2572 2573 // check that at least one ReplicationServer did change its generation-id 2574 checkGenerationID(getGenId(generationIdNewValue)); 2575 } 2576 2577 private long getGenId(Long generationIdNewValue) 2578 { 2579 if (generationIdNewValue != null) 2580 { 2581 return generationIdNewValue; 2582 } 2583 return 
getGenerationID(); 2584 } 2585 2586 2587 /* 2588 ******** End of The total Update code ********* 2589 */ 2590 2591 /* 2592 ******* Start of Monitoring Code ********** 2593 */ 2594 2595 /** 2596 * Get the maximum receive window size. 2597 * 2598 * @return The maximum receive window size. 2599 */ 2600 int getMaxRcvWindow() 2601 { 2602 if (broker != null) 2603 { 2604 return broker.getMaxRcvWindow(); 2605 } 2606 return 0; 2607 } 2608 2609 /** 2610 * Get the current receive window size. 2611 * 2612 * @return The current receive window size. 2613 */ 2614 int getCurrentRcvWindow() 2615 { 2616 if (broker != null) 2617 { 2618 return broker.getCurrentRcvWindow(); 2619 } 2620 return 0; 2621 } 2622 2623 /** 2624 * Get the maximum send window size. 2625 * 2626 * @return The maximum send window size. 2627 */ 2628 int getMaxSendWindow() 2629 { 2630 if (broker != null) 2631 { 2632 return broker.getMaxSendWindow(); 2633 } 2634 return 0; 2635 } 2636 2637 /** 2638 * Get the current send window size. 2639 * 2640 * @return The current send window size. 2641 */ 2642 int getCurrentSendWindow() 2643 { 2644 if (broker != null) 2645 { 2646 return broker.getCurrentSendWindow(); 2647 } 2648 return 0; 2649 } 2650 2651 /** 2652 * Get the number of times the replication connection was lost. 2653 * @return The number of times the replication connection was lost. 2654 */ 2655 int getNumLostConnections() 2656 { 2657 if (broker != null) 2658 { 2659 return broker.getNumLostConnections(); 2660 } 2661 return 0; 2662 } 2663 2664 /** 2665 * Determine whether the connection to the replication server is encrypted. 2666 * @return true if the connection is encrypted, false otherwise. 2667 */ 2668 boolean isSessionEncrypted() 2669 { 2670 return broker != null && broker.isSessionEncrypted(); 2671 } 2672 2673 /** 2674 * Check if the domain is connected to a ReplicationServer. 2675 * 2676 * @return true if the server is connected, false if not. 
2677 */ 2678 public boolean isConnected() 2679 { 2680 return broker != null && broker.isConnected(); 2681 } 2682 2683 /** 2684 * Check if the domain has a connection error. 2685 * A Connection error happens when the broker could not be created 2686 * or when the broker could not find any ReplicationServer to connect to. 2687 * 2688 * @return true if the domain has a connection error. 2689 */ 2690 public boolean hasConnectionError() 2691 { 2692 return broker == null || broker.hasConnectionError(); 2693 } 2694 2695 /** 2696 * Get the name of the replicationServer to which this domain is currently 2697 * connected. 2698 * 2699 * @return the name of the replicationServer to which this domain 2700 * is currently connected. 2701 */ 2702 public String getReplicationServer() 2703 { 2704 if (broker != null) 2705 { 2706 return broker.getReplicationServer(); 2707 } 2708 return ReplicationBroker.NO_CONNECTED_SERVER; 2709 } 2710 2711 /** 2712 * Gets the number of updates sent in assured safe read mode. 2713 * @return The number of updates sent in assured safe read mode. 2714 */ 2715 public int getAssuredSrSentUpdates() 2716 { 2717 return assuredSrSentUpdates.get(); 2718 } 2719 2720 /** 2721 * Gets the number of updates sent in assured safe read mode that have been 2722 * acknowledged without errors. 2723 * @return The number of updates sent in assured safe read mode that have been 2724 * acknowledged without errors. 2725 */ 2726 public int getAssuredSrAcknowledgedUpdates() 2727 { 2728 return assuredSrAcknowledgedUpdates.get(); 2729 } 2730 2731 /** 2732 * Gets the number of updates sent in assured safe read mode that have not 2733 * been acknowledged. 2734 * @return The number of updates sent in assured safe read mode that have not 2735 * been acknowledged. 
2736 */ 2737 public int getAssuredSrNotAcknowledgedUpdates() 2738 { 2739 return assuredSrNotAcknowledgedUpdates.get(); 2740 } 2741 2742 /** 2743 * Gets the number of updates sent in assured safe read mode that have not 2744 * been acknowledged due to timeout error. 2745 * @return The number of updates sent in assured safe read mode that have not 2746 * been acknowledged due to timeout error. 2747 */ 2748 public int getAssuredSrTimeoutUpdates() 2749 { 2750 return assuredSrTimeoutUpdates.get(); 2751 } 2752 2753 /** 2754 * Gets the number of updates sent in assured safe read mode that have not 2755 * been acknowledged due to wrong status error. 2756 * @return The number of updates sent in assured safe read mode that have not 2757 * been acknowledged due to wrong status error. 2758 */ 2759 public int getAssuredSrWrongStatusUpdates() 2760 { 2761 return assuredSrWrongStatusUpdates.get(); 2762 } 2763 2764 /** 2765 * Gets the number of updates sent in assured safe read mode that have not 2766 * been acknowledged due to replay error. 2767 * @return The number of updates sent in assured safe read mode that have not 2768 * been acknowledged due to replay error. 2769 */ 2770 public int getAssuredSrReplayErrorUpdates() 2771 { 2772 return assuredSrReplayErrorUpdates.get(); 2773 } 2774 2775 /** 2776 * Gets the number of updates sent in assured safe read mode that have not 2777 * been acknowledged per server. 2778 * @return A copy of the map that contains the number of updates sent in 2779 * assured safe read mode that have not been acknowledged per server. 2780 */ 2781 public Map<Integer, Integer> getAssuredSrServerNotAcknowledgedUpdates() 2782 { 2783 synchronized(assuredSrServerNotAcknowledgedUpdates) 2784 { 2785 return new HashMap<>(assuredSrServerNotAcknowledgedUpdates); 2786 } 2787 } 2788 2789 /** 2790 * Gets the number of updates received in assured safe read mode request. 2791 * @return The number of updates received in assured safe read mode request. 
2792 */ 2793 public int getAssuredSrReceivedUpdates() 2794 { 2795 return assuredSrReceivedUpdates.get(); 2796 } 2797 2798 /** 2799 * Gets the number of updates received in assured safe read mode that we acked 2800 * without error (no replay error). 2801 * @return The number of updates received in assured safe read mode that we 2802 * acked without error (no replay error). 2803 */ 2804 public int getAssuredSrReceivedUpdatesAcked() 2805 { 2806 return this.assuredSrReceivedUpdatesAcked.get(); 2807 } 2808 2809 /** 2810 * Gets the number of updates received in assured safe read mode that we did 2811 * not ack due to error (replay error). 2812 * @return The number of updates received in assured safe read mode that we 2813 * did not ack due to error (replay error). 2814 */ 2815 public int getAssuredSrReceivedUpdatesNotAcked() 2816 { 2817 return this.assuredSrReceivedUpdatesNotAcked.get(); 2818 } 2819 2820 /** 2821 * Gets the number of updates sent in assured safe data mode. 2822 * @return The number of updates sent in assured safe data mode. 2823 */ 2824 public int getAssuredSdSentUpdates() 2825 { 2826 return assuredSdSentUpdates.get(); 2827 } 2828 2829 /** 2830 * Gets the number of updates sent in assured safe data mode that have been 2831 * acknowledged without errors. 2832 * @return The number of updates sent in assured safe data mode that have been 2833 * acknowledged without errors. 2834 */ 2835 public int getAssuredSdAcknowledgedUpdates() 2836 { 2837 return assuredSdAcknowledgedUpdates.get(); 2838 } 2839 2840 /** 2841 * Gets the number of updates sent in assured safe data mode that have not 2842 * been acknowledged due to timeout error. 2843 * @return The number of updates sent in assured safe data mode that have not 2844 * been acknowledged due to timeout error. 
2845 */ 2846 public int getAssuredSdTimeoutUpdates() 2847 { 2848 return assuredSdTimeoutUpdates.get(); 2849 } 2850 2851 /** 2852 * Gets the number of updates sent in assured safe data mode that have not 2853 * been acknowledged due to timeout error per server. 2854 * @return A copy of the map that contains the number of updates sent in 2855 * assured safe data mode that have not been acknowledged due to timeout 2856 * error per server. 2857 */ 2858 public Map<Integer, Integer> getAssuredSdServerTimeoutUpdates() 2859 { 2860 synchronized(assuredSdServerTimeoutUpdates) 2861 { 2862 return new HashMap<>(assuredSdServerTimeoutUpdates); 2863 } 2864 } 2865 2866 /** 2867 * Gets the date of the last status change. 2868 * @return The date of the last status change. 2869 */ 2870 public Date getLastStatusChangeDate() 2871 { 2872 return lastStatusChangeDate; 2873 } 2874 2875 /** 2876 * Resets the values of the monitoring counters. 2877 */ 2878 private void resetMonitoringCounters() 2879 { 2880 numProcessedUpdates = new AtomicInteger(0); 2881 numRcvdUpdates = new AtomicInteger(0); 2882 numSentUpdates = new AtomicInteger(0); 2883 2884 assuredSrSentUpdates = new AtomicInteger(0); 2885 assuredSrAcknowledgedUpdates = new AtomicInteger(0); 2886 assuredSrNotAcknowledgedUpdates = new AtomicInteger(0); 2887 assuredSrTimeoutUpdates = new AtomicInteger(0); 2888 assuredSrWrongStatusUpdates = new AtomicInteger(0); 2889 assuredSrReplayErrorUpdates = new AtomicInteger(0); 2890 synchronized (assuredSrServerNotAcknowledgedUpdates) 2891 { 2892 assuredSrServerNotAcknowledgedUpdates.clear(); 2893 } 2894 assuredSrReceivedUpdates = new AtomicInteger(0); 2895 assuredSrReceivedUpdatesAcked = new AtomicInteger(0); 2896 assuredSrReceivedUpdatesNotAcked = new AtomicInteger(0); 2897 assuredSdSentUpdates = new AtomicInteger(0); 2898 assuredSdAcknowledgedUpdates = new AtomicInteger(0); 2899 assuredSdTimeoutUpdates = new AtomicInteger(0); 2900 synchronized (assuredSdServerTimeoutUpdates) 2901 { 2902 
assuredSdServerTimeoutUpdates.clear(); 2903 } 2904 } 2905 2906 /* 2907 ********** End of Monitoring Code ************** 2908 */ 2909 2910 /** 2911 * Start the publish mechanism of the Replication Service. After this method 2912 * has been called, the publish service can be used by calling the 2913 * {@link #publish(UpdateMsg)} method. 2914 * 2915 * @throws ConfigException 2916 * If the DirectoryServer configuration was incorrect. 2917 */ 2918 public void startPublishService() throws ConfigException 2919 { 2920 synchronized (sessionLock) 2921 { 2922 if (broker == null) 2923 { 2924 // create the broker object used to publish and receive changes 2925 broker = new ReplicationBroker( 2926 this, state, config, new ReplSessionSecurity()); 2927 broker.start(); 2928 } 2929 } 2930 } 2931 2932 /** 2933 * Starts the receiver side of the Replication Service. 2934 * <p> 2935 * After this method has been called, the Replication Service will start 2936 * calling the {@link #processUpdate(UpdateMsg)}. 2937 * <p> 2938 * This method must be called once and must be called after the 2939 * {@link #startPublishService()}. 2940 */ 2941 public void startListenService() 2942 { 2943 synchronized (sessionLock) 2944 { 2945 if (listenerThread != null) 2946 { 2947 return; 2948 } 2949 2950 final String threadName = "Replica DS(" + getServerId() + ") listener for domain \"" + getBaseDN() + "\""; 2951 2952 listenerThread = new DirectoryThread(new Runnable() 2953 { 2954 @Override 2955 public void run() 2956 { 2957 if (logger.isTraceEnabled()) 2958 { 2959 logger.trace("Replication Listener thread starting."); 2960 } 2961 2962 // Loop processing any incoming update messages. 2963 while (!listenerThread.isShutdownInitiated()) 2964 { 2965 final UpdateMsg updateMsg = receive(); 2966 if (updateMsg == null) 2967 { 2968 // The server is shutting down. 
2969 listenerThread.initiateShutdown(); 2970 } 2971 else if (processUpdate(updateMsg) 2972 && updateMsg.contributesToDomainState()) 2973 { 2974 /* 2975 * Warning: in synchronous mode, no way to tell the replay of an 2976 * update went wrong Just put null in processUpdateDone so that if 2977 * assured replication is used the ack is sent without error at 2978 * replay flag. 2979 */ 2980 processUpdateDone(updateMsg, null); 2981 state.update(updateMsg.getCSN()); 2982 } 2983 } 2984 2985 if (logger.isTraceEnabled()) 2986 { 2987 logger.trace("Replication Listener thread stopping."); 2988 } 2989 } 2990 }, threadName); 2991 2992 listenerThread.start(); 2993 } 2994 } 2995 2996 /** 2997 * Temporarily disable the Replication Service. 2998 * The Replication Service can be enabled again using 2999 * {@link #enableService()}. 3000 * <p> 3001 * It can be useful to disable the Replication Service when the 3002 * repository where the replicated information is stored becomes 3003 * temporarily unavailable and replicated updates can therefore not 3004 * be replayed during a while. This method is not MT safe. 3005 */ 3006 public void disableService() 3007 { 3008 synchronized (sessionLock) 3009 { 3010 /* 3011 Stop the broker first in order to prevent the listener from 3012 reconnecting - see OPENDJ-457. 3013 */ 3014 if (broker != null) 3015 { 3016 broker.stop(); 3017 } 3018 3019 // Stop the listener thread 3020 if (listenerThread != null) 3021 { 3022 listenerThread.initiateShutdown(); 3023 try 3024 { 3025 listenerThread.join(); 3026 } 3027 catch (InterruptedException e) 3028 { 3029 // Give up waiting. 3030 } 3031 listenerThread = null; 3032 } 3033 } 3034 } 3035 3036 /** 3037 * Returns {@code true} if the listener thread is shutting down or has 3038 * shutdown. 3039 * 3040 * @return {@code true} if the listener thread is shutting down or has 3041 * shutdown. 
3042 */ 3043 protected final boolean isListenerShuttingDown() 3044 { 3045 final DirectoryThread tmp = listenerThread; 3046 return tmp == null || tmp.isShutdownInitiated(); 3047 } 3048 3049 /** 3050 * Restart the Replication service after a {@link #disableService()}. 3051 * <p> 3052 * The Replication Service will restart from the point indicated by the 3053 * {@link ServerState} that was given as a parameter to the 3054 * {@link #startPublishService()} at startup time. 3055 * <p> 3056 * If some data have changed in the repository during the period of time when 3057 * the Replication Service was disabled, this {@link ServerState} should 3058 * therefore be updated by the Replication Domain subclass before calling this 3059 * method. This method is not MT safe. 3060 */ 3061 public void enableService() 3062 { 3063 synchronized (sessionLock) 3064 { 3065 broker.start(); 3066 startListenService(); 3067 } 3068 } 3069 3070 /** 3071 * Change some ReplicationDomain parameters. 3072 * 3073 * @param config 3074 * The new configuration that this domain should now use. 3075 */ 3076 protected void changeConfig(ReplicationDomainCfg config) 3077 { 3078 if (broker != null && broker.changeConfig(config)) 3079 { 3080 restartService(); 3081 } 3082 } 3083 3084 /** 3085 * Applies a configuration change to the attributes which should be included 3086 * in the ECL. 3087 * 3088 * @param includeAttributes 3089 * attributes to be included with all change records. 3090 * @param includeAttributesForDeletes 3091 * additional attributes to be included with delete change records. 
3092 */ 3093 public void changeConfig(Set<String> includeAttributes, 3094 Set<String> includeAttributesForDeletes) 3095 { 3096 final boolean attrsModified = setEclIncludes( 3097 getServerId(), includeAttributes, includeAttributesForDeletes); 3098 if (attrsModified && broker != null) 3099 { 3100 restartService(); 3101 } 3102 } 3103 3104 private void restartService() 3105 { 3106 disableService(); 3107 enableService(); 3108 } 3109 3110 /** 3111 * This method should trigger an export of the replicated data. 3112 * to the provided outputStream. 3113 * When finished the outputStream should be flushed and closed. 3114 * 3115 * @param output The OutputStream where the export should 3116 * be produced. 3117 * @throws DirectoryException When needed. 3118 */ 3119 protected abstract void exportBackend(OutputStream output) 3120 throws DirectoryException; 3121 3122 /** 3123 * This method should trigger an import of the replicated data. 3124 * 3125 * @param input The InputStream from which 3126 * the import should be reading entries. 3127 * 3128 * @throws DirectoryException When needed. 3129 */ 3130 protected abstract void importBackend(InputStream input) 3131 throws DirectoryException; 3132 3133 /** 3134 * This method should return the total number of objects in the 3135 * replicated domain. 3136 * This count will be used for reporting. 3137 * 3138 * @throws DirectoryException when needed. 3139 * 3140 * @return The number of objects in the replication domain. 3141 */ 3142 public abstract long countEntries() throws DirectoryException; 3143 3144 3145 3146 /** 3147 * This method should handle the processing of {@link UpdateMsg} receive from 3148 * remote replication entities. 3149 * <p> 3150 * This method will be called by a single thread and should therefore should 3151 * not be blocking. 3152 * 3153 * @param updateMsg 3154 * The {@link UpdateMsg} that was received. 3155 * @return A boolean indicating if the processing is completed at return time. 
3156 * If <code> true </code> is returned, no further processing is 3157 * necessary. If <code> false </code> is returned, the subclass should 3158 * call the method {@link #processUpdateDone(UpdateMsg, String)} and 3159 * update the ServerState When this processing is complete. 3160 */ 3161 public abstract boolean processUpdate(UpdateMsg updateMsg); 3162 3163 /** 3164 * This method must be called after each call to 3165 * {@link #processUpdate(UpdateMsg)} when the processing of the 3166 * update is completed. 3167 * <p> 3168 * It is useful for implementation needing to process the update in an 3169 * asynchronous way or using several threads, but must be called even by 3170 * implementation doing it in a synchronous, single-threaded way. 3171 * 3172 * @param msg 3173 * The UpdateMsg whose processing was completed. 3174 * @param replayErrorMsg 3175 * if not null, this means an error occurred during the replay of 3176 * this update, and this is the matching human readable message 3177 * describing the problem. 3178 */ 3179 protected void processUpdateDone(UpdateMsg msg, String replayErrorMsg) 3180 { 3181 broker.updateWindowAfterReplay(); 3182 3183 /* 3184 Send an ack if it was requested and the group id is the same of the RS 3185 one. Only Safe Read mode makes sense in DS for returning an ack. 
3186 */ 3187 // Assured feature is supported starting from replication protocol V2 3188 if (msg.isAssured() 3189 && broker.getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V2) 3190 { 3191 if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE) 3192 { 3193 if (broker.getRsGroupId() == getGroupId()) 3194 { 3195 // Send the ack 3196 AckMsg ackMsg = new AckMsg(msg.getCSN()); 3197 if (replayErrorMsg != null) 3198 { 3199 // Mark the error in the ack 3200 // -> replay error occurred 3201 ackMsg.setHasReplayError(true); 3202 // -> replay error occurred in our server 3203 ackMsg.setFailedServers(newArrayList(getServerId())); 3204 } 3205 broker.publish(ackMsg); 3206 if (replayErrorMsg != null) 3207 { 3208 assuredSrReceivedUpdatesNotAcked.incrementAndGet(); 3209 } 3210 else 3211 { 3212 assuredSrReceivedUpdatesAcked.incrementAndGet(); 3213 } 3214 } 3215 } 3216 else if (getAssuredMode() != AssuredMode.SAFE_DATA_MODE) 3217 { 3218 logger.error(ERR_DS_UNKNOWN_ASSURED_MODE, getServerId(), msg.getAssuredMode(), getBaseDN(), msg); 3219 } 3220 // Nothing to do in Assured safe data mode, only RS ack updates. 3221 } 3222 3223 incProcessedUpdates(); 3224 } 3225 3226 /** 3227 * Prepare a message if it is to be sent in assured mode. 3228 * If the assured mode is enabled, this method should be called before 3229 * publish(UpdateMsg msg) method. This will configure the update accordingly 3230 * before it is sent and will prepare the mechanism that will block until the 3231 * matching ack is received. To wait for the ack after publish call, use 3232 * the waitForAckIfAssuredEnabled() method. 3233 * The expected typical usage in a service inheriting from this class is 3234 * the following sequence: 3235 * UpdateMsg msg = xxx; 3236 * prepareWaitForAckIfAssuredEnabled(msg); 3237 * publish(msg); 3238 * waitForAckIfAssuredEnabled(msg); 3239 * 3240 * Note: prepareWaitForAckIfAssuredEnabled and waitForAckIfAssuredEnabled have 3241 * no effect if assured replication is disabled. 
3242 * Note: this mechanism should not be used if using publish(byte[] msg) 3243 * version as usage of these methods is already hidden inside. 3244 * 3245 * @param msg The update message to be sent soon. 3246 */ 3247 protected void prepareWaitForAckIfAssuredEnabled(UpdateMsg msg) 3248 { 3249 /* 3250 * If assured configured, set message accordingly to request an ack in the 3251 * right assured mode. 3252 * No ack requested for a RS with a different group id. 3253 * Assured replication supported for the same locality, 3254 * i.e: a topology working in the same geographical location). 3255 * If we are connected to a RS which is not in our locality, 3256 * no need to ask for an ack. 3257 */ 3258 if (needsAck()) 3259 { 3260 msg.setAssured(true); 3261 msg.setAssuredMode(getAssuredMode()); 3262 if (getAssuredMode() == AssuredMode.SAFE_DATA_MODE) 3263 { 3264 msg.setSafeDataLevel(getAssuredSdLevel()); 3265 } 3266 3267 // Add the assured message to the list of update that are waiting for acks 3268 waitingAckMsgs.put(msg.getCSN(), msg); 3269 } 3270 } 3271 3272 private boolean needsAck() 3273 { 3274 return isAssured() && broker.getRsGroupId() == getGroupId(); 3275 } 3276 3277 /** 3278 * Wait for the processing of an assured message after it has been sent, if 3279 * assured replication is configured, otherwise, do nothing. 3280 * The prepareWaitForAckIfAssuredEnabled method should have been called 3281 * before, see its comment for the full picture. 3282 * 3283 * @param msg The UpdateMsg for which we are waiting for an ack. 3284 * @throws TimeoutException When the configured timeout occurs waiting for the 3285 * ack. 
3286 */ 3287 protected void waitForAckIfAssuredEnabled(UpdateMsg msg) 3288 throws TimeoutException 3289 { 3290 if (needsAck()) 3291 { 3292 // Increment assured replication monitoring counters 3293 switch (getAssuredMode()) 3294 { 3295 case SAFE_READ_MODE: 3296 assuredSrSentUpdates.incrementAndGet(); 3297 break; 3298 case SAFE_DATA_MODE: 3299 assuredSdSentUpdates.incrementAndGet(); 3300 break; 3301 default: 3302 // Should not happen 3303 } 3304 } else 3305 { 3306 // Not assured or bad group id, return immediately 3307 return; 3308 } 3309 3310 // Wait for the ack to be received, timing out if necessary 3311 long startTime = System.currentTimeMillis(); 3312 synchronized (msg) 3313 { 3314 CSN csn = msg.getCSN(); 3315 while (waitingAckMsgs.containsKey(csn)) 3316 { 3317 try 3318 { 3319 /* 3320 WARNING: this timeout may be difficult to optimize: too low, it 3321 may use too much CPU, too high, it may penalize performance... 3322 */ 3323 msg.wait(10); 3324 } catch (InterruptedException e) 3325 { 3326 if (logger.isTraceEnabled()) 3327 { 3328 logger.trace("waitForAck method interrupted for replication " + 3329 "baseDN: " + getBaseDN()); 3330 } 3331 break; 3332 } 3333 // Timeout ? 
3334 if (System.currentTimeMillis() - startTime >= getAssuredTimeout()) 3335 { 3336 /* 3337 Timeout occurred, be sure that ack is not being received and if so, 3338 remove the update from the wait list, log the timeout error and 3339 also update assured monitoring counters 3340 */ 3341 final UpdateMsg update = waitingAckMsgs.remove(csn); 3342 if (update == null) 3343 { 3344 // Ack received just before timeout limit: we can exit 3345 break; 3346 } 3347 3348 // No luck, this is a real timeout 3349 // Increment assured replication monitoring counters 3350 switch (msg.getAssuredMode()) 3351 { 3352 case SAFE_READ_MODE: 3353 assuredSrNotAcknowledgedUpdates.incrementAndGet(); 3354 assuredSrTimeoutUpdates.incrementAndGet(); 3355 // Increment number of errors for our RS 3356 updateAssuredErrorsByServer(assuredSrServerNotAcknowledgedUpdates, 3357 broker.getRsServerId()); 3358 break; 3359 case SAFE_DATA_MODE: 3360 assuredSdTimeoutUpdates.incrementAndGet(); 3361 // Increment number of errors for our RS 3362 updateAssuredErrorsByServer(assuredSdServerTimeoutUpdates, 3363 broker.getRsServerId()); 3364 break; 3365 default: 3366 // Should not happen 3367 } 3368 3369 throw new TimeoutException("No ack received for message csn: " + csn 3370 + " and replication domain: " + getBaseDN() + " after " 3371 + getAssuredTimeout() + " ms."); 3372 } 3373 } 3374 } 3375 } 3376 3377 /** 3378 * Publish an {@link UpdateMsg} to the Replication Service. 3379 * <p> 3380 * The Replication Service will handle the delivery of this {@link UpdateMsg} 3381 * to all the participants of this Replication Domain. These members will be 3382 * receive this {@link UpdateMsg} through a call of the 3383 * {@link #processUpdate(UpdateMsg)} message. 3384 * 3385 * @param msg The UpdateMsg that should be published. 
3386 */ 3387 public void publish(UpdateMsg msg) 3388 { 3389 broker.publish(msg); 3390 if (msg.contributesToDomainState()) 3391 { 3392 state.update(msg.getCSN()); 3393 } 3394 numSentUpdates.incrementAndGet(); 3395 } 3396 3397 /** 3398 * Publishes a replica offline message if all pending changes for current 3399 * replica have been sent out. 3400 */ 3401 public void publishReplicaOfflineMsg() 3402 { 3403 // Here to be overridden 3404 } 3405 3406 /** 3407 * This method should return the generationID to use for this 3408 * ReplicationDomain. 3409 * This method can be called at any time after the ReplicationDomain 3410 * has been started. 3411 * 3412 * @return The GenerationID. 3413 */ 3414 public long getGenerationID() 3415 { 3416 return generationId; 3417 } 3418 3419 /** 3420 * Sets the generationId for this replication domain. 3421 * 3422 * @param generationId 3423 * the generationId to set 3424 */ 3425 public void setGenerationID(long generationId) 3426 { 3427 this.generationId = generationId; 3428 } 3429 3430 /** 3431 * Subclasses should use this method to add additional monitoring information 3432 * in the ReplicationDomain. 3433 * 3434 * @return Additional monitoring attributes that will be added in the 3435 * ReplicationDomain monitoring entry. 3436 */ 3437 public Collection<Attribute> getAdditionalMonitoring() 3438 { 3439 return new ArrayList<>(); 3440 } 3441 3442 /** 3443 * Returns the Import/Export context associated to this ReplicationDomain. 3444 * 3445 * @return the Import/Export context associated to this ReplicationDomain 3446 */ 3447 protected ImportExportContext getImportExportContext() 3448 { 3449 return importExportContext.get(); 3450 } 3451 3452 /** 3453 * Returns the local address of this replication domain, or the empty string 3454 * if it is not yet connected. 3455 * 3456 * @return The local address. 3457 */ 3458 String getLocalUrl() 3459 { 3460 final ReplicationBroker tmp = broker; 3461 return tmp != null ? 
tmp.getLocalUrl() : ""; 3462 } 3463 3464 /** 3465 * Set the attributes configured on a server to be included in the ECL. 3466 * 3467 * @param serverId 3468 * Server where these attributes are configured. 3469 * @param includeAttributes 3470 * Attributes to be included with all change records, may include 3471 * wild-cards. 3472 * @param includeAttributesForDeletes 3473 * Additional attributes to be included with delete change records, 3474 * may include wild-cards. 3475 * @return {@code true} if the set of attributes was modified. 3476 */ 3477 public boolean setEclIncludes(int serverId, 3478 Set<String> includeAttributes, 3479 Set<String> includeAttributesForDeletes) 3480 { 3481 ECLIncludes current; 3482 ECLIncludes updated; 3483 do 3484 { 3485 current = this.eclIncludes.get(); 3486 updated = current.addIncludedAttributes( 3487 serverId, includeAttributes, includeAttributesForDeletes); 3488 } 3489 while (!this.eclIncludes.compareAndSet(current, updated)); 3490 return current != updated; 3491 } 3492 3493 3494 3495 /** 3496 * Get the attributes to include in each change for the ECL. 3497 * 3498 * @return The attributes to include in each change for the ECL. 3499 */ 3500 public Set<String> getEclIncludes() 3501 { 3502 return eclIncludes.get().includedAttrsAllServers; 3503 } 3504 3505 3506 3507 /** 3508 * Get the attributes to include in each delete change for the ECL. 3509 * 3510 * @return The attributes to include in each delete change for the ECL. 3511 */ 3512 public Set<String> getEclIncludesForDeletes() 3513 { 3514 return eclIncludes.get().includedAttrsForDeletesAllServers; 3515 } 3516 3517 3518 3519 /** 3520 * Get the attributes to include in each change for the ECL for a given 3521 * serverId. 3522 * 3523 * @param serverId 3524 * The serverId for which we want the include attributes. 3525 * @return The attributes. 
3526 */ 3527 Set<String> getEclIncludes(int serverId) 3528 { 3529 return eclIncludes.get().includedAttrsByServer.get(serverId); 3530 } 3531 3532 3533 3534 /** 3535 * Get the attributes to include in each change for the ECL for a given 3536 * serverId. 3537 * 3538 * @param serverId 3539 * The serverId for which we want the include attributes. 3540 * @return The attributes. 3541 */ 3542 Set<String> getEclIncludesForDeletes(int serverId) 3543 { 3544 return eclIncludes.get().includedAttrsForDeletesByServer.get(serverId); 3545 } 3546 3547 /** 3548 * Returns the CSN of the last Change that was fully processed by this 3549 * ReplicationDomain. 3550 * 3551 * @return The CSN of the last Change that was fully processed by this 3552 * ReplicationDomain. 3553 */ 3554 public CSN getLastLocalChange() 3555 { 3556 return state.getCSN(getServerId()); 3557 } 3558 3559 /** 3560 * Gets and stores the assured replication configuration parameters. Returns a 3561 * boolean indicating if the passed configuration has changed compared to 3562 * previous values and the changes require a reconnection. 3563 * 3564 * @param config 3565 * The configuration object 3566 * @param allowReconnection 3567 * Tells if one must reconnect if significant changes occurred 3568 */ 3569 protected void readAssuredConfig(ReplicationDomainCfg config, 3570 boolean allowReconnection) 3571 { 3572 // Disconnect if required: changing configuration values before 3573 // disconnection would make assured replication used immediately and 3574 // disconnection could cause some timeouts error. 
3575 if (needReconnection(config) && allowReconnection) 3576 { 3577 disableService(); 3578 3579 assuredConfig = config; 3580 3581 enableService(); 3582 } 3583 } 3584 3585 private boolean needReconnection(ReplicationDomainCfg cfg) 3586 { 3587 final AssuredMode assuredMode = getAssuredMode(); 3588 switch (cfg.getAssuredType()) 3589 { 3590 case NOT_ASSURED: 3591 if (isAssured()) 3592 { 3593 return true; 3594 } 3595 break; 3596 case SAFE_DATA: 3597 if (!isAssured() || assuredMode == SAFE_READ_MODE) 3598 { 3599 return true; 3600 } 3601 break; 3602 case SAFE_READ: 3603 if (!isAssured() || assuredMode == SAFE_DATA_MODE) 3604 { 3605 return true; 3606 } 3607 break; 3608 } 3609 3610 return isAssured() 3611 && assuredMode == SAFE_DATA_MODE 3612 && cfg.getAssuredSdLevel() != getAssuredSdLevel(); 3613 } 3614 3615 /** {@inheritDoc} */ 3616 @Override 3617 public String toString() 3618 { 3619 return getClass().getSimpleName() + " " + getBaseDN() + " " + getServerId(); 3620 } 3621}