001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2006-2010 Sun Microsystems, Inc. 025 * Portions Copyright 2011-2016 ForgeRock AS 026 */ 027package org.opends.server.replication.plugin; 028 029import static org.opends.messages.ReplicationMessages.*; 030import static org.opends.server.replication.plugin.ReplicationRepairRequestControl.*; 031import static org.opends.server.util.ServerConstants.*; 032import static org.opends.server.util.StaticUtils.*; 033 034import java.util.ArrayList; 035import java.util.HashSet; 036import java.util.Iterator; 037import java.util.List; 038import java.util.Map; 039import java.util.Set; 040import java.util.concurrent.BlockingQueue; 041import java.util.concurrent.ConcurrentHashMap; 042import java.util.concurrent.LinkedBlockingQueue; 043import java.util.concurrent.atomic.AtomicReference; 044 045import org.forgerock.i18n.LocalizableMessage; 046import org.forgerock.i18n.slf4j.LocalizedLogger; 047import org.forgerock.opendj.config.server.ConfigChangeResult; 048import org.forgerock.opendj.config.server.ConfigException; 049import org.forgerock.opendj.ldap.ResultCode; 050import org.opends.server.admin.server.ConfigurationAddListener; 051import org.opends.server.admin.server.ConfigurationChangeListener; 052import org.opends.server.admin.server.ConfigurationDeleteListener; 053import org.opends.server.admin.std.server.ReplicationDomainCfg; 054import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg; 055import org.opends.server.api.Backend; 056import org.opends.server.api.BackupTaskListener; 057import org.opends.server.api.ExportTaskListener; 058import org.opends.server.api.ImportTaskListener; 059import org.opends.server.api.RestoreTaskListener; 060import org.opends.server.api.SynchronizationProvider; 061import org.opends.server.core.DirectoryServer; 062import org.opends.server.replication.service.DSRSShutdownSync; 063import org.opends.server.types.BackupConfig; 064import org.opends.server.types.Control; 065import org.opends.server.types.DN; 066import org.opends.server.types.DirectoryException; 067import org.opends.server.types.Entry; 068import org.opends.server.types.LDIFExportConfig; 069import org.opends.server.types.LDIFImportConfig; 070import org.opends.server.types.Modification; 071import org.opends.server.types.Operation; 072import org.opends.server.types.RestoreConfig; 073import org.opends.server.types.SynchronizationProviderResult; 074import org.opends.server.types.operation.PluginOperation; 075import org.opends.server.types.operation.PostOperationAddOperation; 076import org.opends.server.types.operation.PostOperationDeleteOperation; 077import org.opends.server.types.operation.PostOperationModifyDNOperation; 078import org.opends.server.types.operation.PostOperationModifyOperation; 079import org.opends.server.types.operation.PostOperationOperation; 080import org.opends.server.types.operation.PreOperationAddOperation; 081import org.opends.server.types.operation.PreOperationDeleteOperation; 082import org.opends.server.types.operation.PreOperationModifyDNOperation; 083import org.opends.server.types.operation.PreOperationModifyOperation; 084import org.opends.server.util.Platform; 085 086/** 087 * This class is used to load the Replication code inside the JVM 088 * and to trigger initialization of the replication. 089 * 090 * It also extends the SynchronizationProvider class in order to have some 091 * replication code running during the operation process 092 * as pre-op, conflictResolution, and post-op. 093 */ 094public class MultimasterReplication 095 extends SynchronizationProvider<ReplicationSynchronizationProviderCfg> 096 implements ConfigurationAddListener<ReplicationDomainCfg>, 097 ConfigurationDeleteListener<ReplicationDomainCfg>, 098 ConfigurationChangeListener 099 <ReplicationSynchronizationProviderCfg>, 100 BackupTaskListener, RestoreTaskListener, ImportTaskListener, 101 ExportTaskListener 102{ 103 104 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 105 106 private ReplicationServerListener replicationServerListener; 107 private static final Map<DN, LDAPReplicationDomain> domains = new ConcurrentHashMap<>(4); 108 private static final DSRSShutdownSync dsrsShutdownSync = new DSRSShutdownSync(); 109 /** The queue of received update messages, to be treated by the ReplayThread threads. */ 110 private static final BlockingQueue<UpdateToReplay> updateToReplayQueue = new LinkedBlockingQueue<>(10000); 111 /** The list of ReplayThread threads. */ 112 private static final List<ReplayThread> replayThreads = new ArrayList<>(); 113 /** The configurable number of replay threads. */ 114 private static int replayThreadNumber = 10; 115 116 /** Enum that symbolizes the state of the multimaster replication. */ 117 private static enum State 118 { 119 STARTING, RUNNING, STOPPING 120 } 121 122 private static final AtomicReference<State> state = new AtomicReference<>(State.STARTING); 123 124 /** The configurable connection/handshake timeout. */ 125 private static volatile int connectionTimeoutMS = 5000; 126 127 /** 128 * Finds the domain for a given DN. 129 * 130 * @param dn The DN for which the domain must be returned. 131 * @param pluginOp An optional operation for which the check is done. 132 * Can be null is the request has no associated operation. 133 * @return The domain for this DN. 134 */ 135 public static LDAPReplicationDomain findDomain(DN dn, PluginOperation pluginOp) 136 { 137 /* 138 * Don't run the special replication code on Operation that are 139 * specifically marked as don't synchronize. 140 */ 141 if (pluginOp instanceof Operation) 142 { 143 final Operation op = (Operation) pluginOp; 144 if (op.dontSynchronize()) 145 { 146 return null; 147 } 148 149 /* 150 * Check if the provided operation is a repair operation and set the 151 * synchronization flags if necessary. 152 * The repair operations are tagged as synchronization operations so 153 * that the core server let the operation modify the entryuuid and 154 * ds-sync-hist attributes. 155 * They are also tagged as dontSynchronize so that the replication code 156 * running later do not generate CSN, solve conflicts and forward the 157 * operation to the replication server. 158 */ 159 final List<Control> controls = op.getRequestControls(); 160 for (Iterator<Control> iter = controls.iterator(); iter.hasNext();) 161 { 162 Control c = iter.next(); 163 if (OID_REPLICATION_REPAIR_CONTROL.equals(c.getOID())) 164 { 165 op.setSynchronizationOperation(true); 166 op.setDontSynchronize(true); 167 /* 168 remove this control from the list of controls since it has now been 169 processed and the local backend will fail if it finds a control that 170 it does not know about and that is marked as critical. 171 */ 172 iter.remove(); 173 return null; 174 } 175 } 176 } 177 178 179 LDAPReplicationDomain domain = null; 180 DN temp = dn; 181 while (domain == null && temp != null) 182 { 183 domain = domains.get(temp); 184 temp = temp.getParentDNInSuffix(); 185 } 186 187 return domain; 188 } 189 190 /** 191 * Creates a new domain from its configEntry, do the 192 * necessary initialization and starts it so that it is 193 * fully operational when this method returns. 194 * @param configuration The entry with the configuration of this domain. 195 * @return The domain created. 196 * @throws ConfigException When the configuration is not valid. 197 */ 198 public static LDAPReplicationDomain createNewDomain( 199 ReplicationDomainCfg configuration) 200 throws ConfigException 201 { 202 try 203 { 204 final LDAPReplicationDomain domain = new LDAPReplicationDomain( 205 configuration, updateToReplayQueue, dsrsShutdownSync); 206 if (domains.isEmpty()) 207 { 208 // Create the threads that will process incoming update messages 209 createReplayThreads(); 210 } 211 212 domains.put(domain.getBaseDN(), domain); 213 return domain; 214 } 215 catch (ConfigException e) 216 { 217 logger.error(ERR_COULD_NOT_START_REPLICATION, configuration.dn(), 218 e.getLocalizedMessage() + " " + stackTraceToSingleLineString(e)); 219 } 220 return null; 221 } 222 223 /** 224 * Creates a new domain from its configEntry, do the necessary initialization 225 * and starts it so that it is fully operational when this method returns. It 226 * is only used for tests so far. 227 * 228 * @param configuration The entry with the configuration of this domain. 229 * @param queue The BlockingQueue that this domain will use. 230 * 231 * @return The domain created. 232 * 233 * @throws ConfigException When the configuration is not valid. 234 */ 235 static LDAPReplicationDomain createNewDomain( 236 ReplicationDomainCfg configuration, 237 BlockingQueue<UpdateToReplay> queue) 238 throws ConfigException 239 { 240 final LDAPReplicationDomain domain = 241 new LDAPReplicationDomain(configuration, queue, dsrsShutdownSync); 242 domains.put(domain.getBaseDN(), domain); 243 return domain; 244 } 245 246 /** 247 * Deletes a domain. 248 * @param dn : the base DN of the domain to delete. 249 */ 250 public static void deleteDomain(DN dn) 251 { 252 LDAPReplicationDomain domain = domains.remove(dn); 253 if (domain != null) 254 { 255 domain.delete(); 256 } 257 258 // No replay threads running if no replication need 259 if (domains.isEmpty()) { 260 stopReplayThreads(); 261 } 262 } 263 264 /** {@inheritDoc} */ 265 @Override 266 public void initializeSynchronizationProvider( 267 ReplicationSynchronizationProviderCfg cfg) throws ConfigException 268 { 269 domains.clear(); 270 replicationServerListener = new ReplicationServerListener(cfg, dsrsShutdownSync); 271 272 // Register as an add and delete listener with the root configuration so we 273 // can be notified if Multimaster domain entries are added or removed. 274 cfg.addReplicationDomainAddListener(this); 275 cfg.addReplicationDomainDeleteListener(this); 276 277 // Register as a root configuration listener so that we can be notified if 278 // number of replay threads is changed and apply changes. 279 cfg.addReplicationChangeListener(this); 280 281 replayThreadNumber = getNumberOfReplayThreadsOrDefault(cfg); 282 connectionTimeoutMS = (int) Math.min(cfg.getConnectionTimeout(), Integer.MAX_VALUE); 283 284 // Create the list of domains that are already defined. 285 for (String name : cfg.listReplicationDomains()) 286 { 287 createNewDomain(cfg.getReplicationDomain(name)); 288 } 289 290 // If any schema changes were made with the server offline, then handle them now. 291 List<Modification> offlineSchemaChanges = 292 DirectoryServer.getOfflineSchemaChanges(); 293 if (offlineSchemaChanges != null && !offlineSchemaChanges.isEmpty()) 294 { 295 processSchemaChange(offlineSchemaChanges); 296 } 297 298 DirectoryServer.registerBackupTaskListener(this); 299 DirectoryServer.registerRestoreTaskListener(this); 300 DirectoryServer.registerExportTaskListener(this); 301 DirectoryServer.registerImportTaskListener(this); 302 303 DirectoryServer.registerSupportedControl( 304 ReplicationRepairRequestControl.OID_REPLICATION_REPAIR_CONTROL); 305 } 306 307 private int getNumberOfReplayThreadsOrDefault(ReplicationSynchronizationProviderCfg cfg) 308 { 309 Integer value = cfg.getNumUpdateReplayThreads(); 310 return value == null ? Platform.computeNumberOfThreads(16, 2.0f) : value; 311 } 312 313 /** 314 * Create the threads that will wait for incoming update messages. 315 */ 316 private static synchronized void createReplayThreads() 317 { 318 replayThreads.clear(); 319 320 for (int i = 0; i < replayThreadNumber; i++) 321 { 322 ReplayThread replayThread = new ReplayThread(updateToReplayQueue); 323 replayThread.start(); 324 replayThreads.add(replayThread); 325 } 326 } 327 328 /** 329 * Stop the threads that are waiting for incoming update messages. 330 */ 331 private static synchronized void stopReplayThreads() 332 { 333 // stop the replay threads 334 for (ReplayThread replayThread : replayThreads) 335 { 336 replayThread.shutdown(); 337 } 338 339 for (ReplayThread replayThread : replayThreads) 340 { 341 try 342 { 343 replayThread.join(); 344 } 345 catch(InterruptedException e) 346 { 347 Thread.currentThread().interrupt(); 348 } 349 } 350 replayThreads.clear(); 351 } 352 353 /** {@inheritDoc} */ 354 @Override 355 public boolean isConfigurationAddAcceptable( 356 ReplicationDomainCfg configuration, List<LocalizableMessage> unacceptableReasons) 357 { 358 return LDAPReplicationDomain.isConfigurationAcceptable( 359 configuration, unacceptableReasons); 360 } 361 362 /** {@inheritDoc} */ 363 @Override 364 public ConfigChangeResult applyConfigurationAdd( 365 ReplicationDomainCfg configuration) 366 { 367 ConfigChangeResult ccr = new ConfigChangeResult(); 368 try 369 { 370 LDAPReplicationDomain rd = createNewDomain(configuration); 371 if (State.RUNNING.equals(state.get())) 372 { 373 rd.start(); 374 if (State.STOPPING.equals(state.get())) { 375 rd.shutdown(); 376 } 377 } 378 } catch (ConfigException e) 379 { 380 // we should never get to this point because the configEntry has 381 // already been validated in isConfigurationAddAcceptable() 382 ccr.setResultCode(ResultCode.CONSTRAINT_VIOLATION); 383 } 384 return ccr; 385 } 386 387 /** {@inheritDoc} */ 388 @Override 389 public void doPostOperation(PostOperationAddOperation addOperation) 390 { 391 DN dn = addOperation.getEntryDN(); 392 genericPostOperation(addOperation, dn); 393 } 394 395 396 /** {@inheritDoc} */ 397 @Override 398 public void doPostOperation(PostOperationDeleteOperation deleteOperation) 399 { 400 DN dn = deleteOperation.getEntryDN(); 401 genericPostOperation(deleteOperation, dn); 402 } 403 404 /** {@inheritDoc} */ 405 @Override 406 public void doPostOperation(PostOperationModifyDNOperation modifyDNOperation) 407 { 408 DN dn = modifyDNOperation.getEntryDN(); 409 genericPostOperation(modifyDNOperation, dn); 410 } 411 412 /** {@inheritDoc} */ 413 @Override 414 public void doPostOperation(PostOperationModifyOperation modifyOperation) 415 { 416 DN dn = modifyOperation.getEntryDN(); 417 genericPostOperation(modifyOperation, dn); 418 } 419 420 /** {@inheritDoc} */ 421 @Override 422 public SynchronizationProviderResult handleConflictResolution( 423 PreOperationModifyOperation modifyOperation) 424 { 425 LDAPReplicationDomain domain = findDomain(modifyOperation.getEntryDN(), modifyOperation); 426 if (domain != null) 427 { 428 return domain.handleConflictResolution(modifyOperation); 429 } 430 return new SynchronizationProviderResult.ContinueProcessing(); 431 } 432 433 /** {@inheritDoc} */ 434 @Override 435 public SynchronizationProviderResult handleConflictResolution( 436 PreOperationAddOperation addOperation) throws DirectoryException 437 { 438 LDAPReplicationDomain domain = findDomain(addOperation.getEntryDN(), addOperation); 439 if (domain != null) 440 { 441 return domain.handleConflictResolution(addOperation); 442 } 443 return new SynchronizationProviderResult.ContinueProcessing(); 444 } 445 446 /** {@inheritDoc} */ 447 @Override 448 public SynchronizationProviderResult handleConflictResolution( 449 PreOperationDeleteOperation deleteOperation) throws DirectoryException 450 { 451 LDAPReplicationDomain domain = findDomain(deleteOperation.getEntryDN(), deleteOperation); 452 if (domain != null) 453 { 454 return domain.handleConflictResolution(deleteOperation); 455 } 456 return new SynchronizationProviderResult.ContinueProcessing(); 457 } 458 459 /** {@inheritDoc} */ 460 @Override 461 public SynchronizationProviderResult handleConflictResolution( 462 PreOperationModifyDNOperation modifyDNOperation) throws DirectoryException 463 { 464 LDAPReplicationDomain domain = findDomain(modifyDNOperation.getEntryDN(), modifyDNOperation); 465 if (domain != null) 466 { 467 return domain.handleConflictResolution(modifyDNOperation); 468 } 469 return new SynchronizationProviderResult.ContinueProcessing(); 470 } 471 472 /** {@inheritDoc} */ 473 @Override 474 public SynchronizationProviderResult 475 doPreOperation(PreOperationModifyOperation modifyOperation) 476 { 477 DN operationDN = modifyOperation.getEntryDN(); 478 LDAPReplicationDomain domain = findDomain(operationDN, modifyOperation); 479 480 if (domain == null || !domain.solveConflict()) 481 { 482 return new SynchronizationProviderResult.ContinueProcessing(); 483 } 484 485 EntryHistorical historicalInformation = (EntryHistorical) 486 modifyOperation.getAttachment(EntryHistorical.HISTORICAL); 487 if (historicalInformation == null) 488 { 489 Entry entry = modifyOperation.getModifiedEntry(); 490 historicalInformation = EntryHistorical.newInstanceFromEntry(entry); 491 modifyOperation.setAttachment(EntryHistorical.HISTORICAL, 492 historicalInformation); 493 } 494 historicalInformation.setPurgeDelay(domain.getHistoricalPurgeDelay()); 495 historicalInformation.setHistoricalAttrToOperation(modifyOperation); 496 497 if (modifyOperation.getModifications().isEmpty()) 498 { 499 /* 500 * This operation becomes a no-op due to conflict resolution 501 * stop the processing and send an OK result 502 */ 503 return new SynchronizationProviderResult.StopProcessing( 504 ResultCode.SUCCESS, null); 505 } 506 507 return new SynchronizationProviderResult.ContinueProcessing(); 508 } 509 510 /** {@inheritDoc} */ 511 @Override 512 public SynchronizationProviderResult doPreOperation( 513 PreOperationDeleteOperation deleteOperation) throws DirectoryException 514 { 515 return new SynchronizationProviderResult.ContinueProcessing(); 516 } 517 518 /** {@inheritDoc} */ 519 @Override 520 public SynchronizationProviderResult doPreOperation( 521 PreOperationModifyDNOperation modifyDNOperation) 522 throws DirectoryException 523 { 524 DN operationDN = modifyDNOperation.getEntryDN(); 525 LDAPReplicationDomain domain = findDomain(operationDN, modifyDNOperation); 526 527 if (domain == null || !domain.solveConflict()) 528 { 529 return new SynchronizationProviderResult.ContinueProcessing(); 530 } 531 532 // The historical object is retrieved from the attachment created 533 // in the HandleConflictResolution phase. 534 EntryHistorical historicalInformation = (EntryHistorical) 535 modifyDNOperation.getAttachment(EntryHistorical.HISTORICAL); 536 if (historicalInformation == null) 537 { 538 // When no Historical attached, create once by loading from the entry 539 // and attach it to the operation 540 Entry entry = modifyDNOperation.getUpdatedEntry(); 541 historicalInformation = EntryHistorical.newInstanceFromEntry(entry); 542 modifyDNOperation.setAttachment(EntryHistorical.HISTORICAL, 543 historicalInformation); 544 } 545 historicalInformation.setPurgeDelay(domain.getHistoricalPurgeDelay()); 546 547 // Add to the operation the historical attribute : "dn:changeNumber:moddn" 548 historicalInformation.setHistoricalAttrToOperation(modifyDNOperation); 549 550 return new SynchronizationProviderResult.ContinueProcessing(); 551 } 552 553 /** {@inheritDoc} */ 554 @Override 555 public SynchronizationProviderResult doPreOperation( 556 PreOperationAddOperation addOperation) 557 { 558 // Check replication domain 559 LDAPReplicationDomain domain = 560 findDomain(addOperation.getEntryDN(), addOperation); 561 if (domain == null) 562 { 563 return new SynchronizationProviderResult.ContinueProcessing(); 564 } 565 566 // For LOCAL op only, generate CSN and attach Context 567 if (!addOperation.isSynchronizationOperation()) 568 { 569 domain.doPreOperation(addOperation); 570 } 571 572 // Add to the operation the historical attribute : "dn:changeNumber:add" 573 EntryHistorical.setHistoricalAttrToOperation(addOperation); 574 575 return new SynchronizationProviderResult.ContinueProcessing(); 576 } 577 578 /** {@inheritDoc} */ 579 @Override 580 public void finalizeSynchronizationProvider() 581 { 582 setState(State.STOPPING); 583 584 for (LDAPReplicationDomain domain : domains.values()) 585 { 586 domain.shutdown(); 587 } 588 domains.clear(); 589 590 stopReplayThreads(); 591 592 if (replicationServerListener != null) 593 { 594 replicationServerListener.shutdown(); 595 } 596 597 DirectoryServer.deregisterBackupTaskListener(this); 598 DirectoryServer.deregisterRestoreTaskListener(this); 599 DirectoryServer.deregisterExportTaskListener(this); 600 DirectoryServer.deregisterImportTaskListener(this); 601 } 602 603 /** 604 * This method is called whenever the server detects a modification 605 * of the schema done by directly modifying the backing files 606 * of the schema backend. 607 * Call the schema Domain if it exists. 608 * 609 * @param modifications The list of modifications that was 610 * applied to the schema. 611 * 612 */ 613 @Override 614 public void processSchemaChange(List<Modification> modifications) 615 { 616 LDAPReplicationDomain domain = findDomain(DirectoryServer.getSchemaDN(), null); 617 if (domain != null) 618 { 619 domain.synchronizeSchemaModifications(modifications); 620 } 621 } 622 623 /** {@inheritDoc} */ 624 @Override 625 public void processBackupBegin(Backend backend, BackupConfig config) 626 { 627 for (DN dn : backend.getBaseDNs()) 628 { 629 LDAPReplicationDomain domain = findDomain(dn, null); 630 if (domain != null) 631 { 632 domain.backupStart(); 633 } 634 } 635 } 636 637 /** {@inheritDoc} */ 638 @Override 639 public void processBackupEnd(Backend backend, BackupConfig config, 640 boolean successful) 641 { 642 for (DN dn : backend.getBaseDNs()) 643 { 644 LDAPReplicationDomain domain = findDomain(dn, null); 645 if (domain != null) 646 { 647 domain.backupEnd(); 648 } 649 } 650 } 651 652 /** {@inheritDoc} */ 653 @Override 654 public void processRestoreBegin(Backend backend, RestoreConfig config) 655 { 656 for (DN dn : backend.getBaseDNs()) 657 { 658 LDAPReplicationDomain domain = findDomain(dn, null); 659 if (domain != null) 660 { 661 domain.disable(); 662 } 663 } 664 } 665 666 /** {@inheritDoc} */ 667 @Override 668 public void processRestoreEnd(Backend backend, RestoreConfig config, 669 boolean successful) 670 { 671 for (DN dn : backend.getBaseDNs()) 672 { 673 LDAPReplicationDomain domain = findDomain(dn, null); 674 if (domain != null) 675 { 676 domain.enable(); 677 } 678 } 679 } 680 681 /** {@inheritDoc} */ 682 @Override 683 public void processImportBegin(Backend backend, LDIFImportConfig config) 684 { 685 for (DN dn : backend.getBaseDNs()) 686 { 687 LDAPReplicationDomain domain = findDomain(dn, null); 688 if (domain != null) 689 { 690 domain.disable(); 691 } 692 } 693 } 694 695 /** {@inheritDoc} */ 696 @Override 697 public void processImportEnd(Backend backend, LDIFImportConfig config, 698 boolean successful) 699 { 700 for (DN dn : backend.getBaseDNs()) 701 { 702 LDAPReplicationDomain domain = findDomain(dn, null); 703 if (domain != null) 704 { 705 domain.enable(); 706 } 707 } 708 } 709 710 /** {@inheritDoc} */ 711 @Override 712 public void processExportBegin(Backend backend, LDIFExportConfig config) 713 { 714 for (DN dn : backend.getBaseDNs()) 715 { 716 LDAPReplicationDomain domain = findDomain(dn, null); 717 if (domain != null) 718 { 719 domain.backupStart(); 720 } 721 } 722 } 723 724 /** {@inheritDoc} */ 725 @Override 726 public void processExportEnd(Backend backend, LDIFExportConfig config, 727 boolean successful) 728 { 729 for (DN dn : backend.getBaseDNs()) 730 { 731 LDAPReplicationDomain domain = findDomain(dn, null); 732 if (domain != null) 733 { 734 domain.backupEnd(); 735 } 736 } 737 } 738 739 /** {@inheritDoc} */ 740 @Override 741 public ConfigChangeResult applyConfigurationDelete( 742 ReplicationDomainCfg configuration) 743 { 744 deleteDomain(configuration.getBaseDN()); 745 746 return new ConfigChangeResult(); 747 } 748 749 /** {@inheritDoc} */ 750 @Override 751 public boolean isConfigurationDeleteAcceptable( 752 ReplicationDomainCfg configuration, List<LocalizableMessage> unacceptableReasons) 753 { 754 return true; 755 } 756 757 /** 758 * Generic code for all the postOperation entry point. 759 * 760 * @param operation The Operation for which the post-operation is called. 761 * @param dn The Dn for which the post-operation is called. 762 */ 763 private void genericPostOperation(PostOperationOperation operation, DN dn) 764 { 765 LDAPReplicationDomain domain = findDomain(dn, operation); 766 if (domain != null) { 767 domain.synchronize(operation); 768 } 769 } 770 771 /** 772 * Returns the replication server listener associated to that Multimaster 773 * Replication. 774 * @return the listener. 775 */ 776 public ReplicationServerListener getReplicationServerListener() 777 { 778 return replicationServerListener; 779 } 780 781 /** {@inheritDoc} */ 782 @Override 783 public boolean isConfigurationChangeAcceptable( 784 ReplicationSynchronizationProviderCfg configuration, 785 List<LocalizableMessage> unacceptableReasons) 786 { 787 return true; 788 } 789 790 @Override 791 public ConfigChangeResult applyConfigurationChange(ReplicationSynchronizationProviderCfg configuration) 792 { 793 794 // Stop threads then restart new number of threads 795 stopReplayThreads(); 796 replayThreadNumber = getNumberOfReplayThreadsOrDefault(configuration); 797 if (!domains.isEmpty()) 798 { 799 createReplayThreads(); 800 } 801 802 connectionTimeoutMS = (int) Math.min(configuration.getConnectionTimeout(), 803 Integer.MAX_VALUE); 804 805 return new ConfigChangeResult(); 806 } 807 808 /** {@inheritDoc} */ 809 @Override 810 public void completeSynchronizationProvider() 811 { 812 for (LDAPReplicationDomain domain : domains.values()) 813 { 814 domain.start(); 815 } 816 setState(State.RUNNING); 817 } 818 819 private void setState(State newState) 820 { 821 state.set(newState); 822 synchronized (state) 823 { 824 state.notifyAll(); 825 } 826 } 827 828 /** 829 * Gets the number of handled domain objects. 830 * @return The number of handled domain objects 831 */ 832 public static int getNumberOfDomains() 833 { 834 return domains.size(); 835 } 836 837 /** 838 * Gets the Set of domain baseDN which are disabled for the external changelog. 839 * 840 * @return The Set of domain baseDNs which are disabled for the external changelog. 841 * @throws DirectoryException 842 * if a problem occurs 843 */ 844 public static Set<DN> getExcludedChangelogDomains() throws DirectoryException 845 { 846 final Set<DN> disabledBaseDNs = new HashSet<>(domains.size() + 1); 847 disabledBaseDNs.add(DN.valueOf(DN_EXTERNAL_CHANGELOG_ROOT)); 848 for (LDAPReplicationDomain domain : domains.values()) 849 { 850 if (!domain.isECLEnabled()) 851 { 852 disabledBaseDNs.add(domain.getBaseDN()); 853 } 854 } 855 return disabledBaseDNs; 856 } 857 858 /** 859 * Returns whether the provided baseDN represents a replication domain enabled 860 * for the external changelog. 861 * 862 * @param baseDN 863 * the replication domain to check 864 * @return true if the provided baseDN is enabled for the external changelog, 865 * false if the provided baseDN is disabled for the external changelog 866 * or unknown to multimaster replication. 867 */ 868 public static boolean isECLEnabledDomain(DN baseDN) 869 { 870 waitForStartup(); 871 // if state is STOPPING, then we need to return from this method 872 final LDAPReplicationDomain domain = domains.get(baseDN); 873 return domain != null && domain.isECLEnabled(); 874 } 875 876 /** 877 * Returns whether the external change-log contains data from at least a domain. 878 * @return whether the external change-log contains data from at least a domain 879 */ 880 public static boolean isECLEnabled() 881 { 882 waitForStartup(); 883 for (LDAPReplicationDomain domain : domains.values()) 884 { 885 if (domain.isECLEnabled()) 886 { 887 return true; 888 } 889 } 890 return false; 891 } 892 893 private static void waitForStartup() 894 { 895 if (State.STARTING.equals(state.get())) 896 { 897 synchronized (state) 898 { 899 while (State.STARTING.equals(state.get())) 900 { 901 try 902 { 903 state.wait(); 904 } 905 catch (InterruptedException ignored) 906 { 907 // loop and check state again 908 } 909 } 910 } 911 } 912 } 913 914 /** 915 * Returns the connection timeout in milli-seconds. 916 * 917 * @return The connection timeout in milli-seconds. 918 */ 919 public static int getConnectionTimeoutMS() 920 { 921 return connectionTimeoutMS; 922 } 923 924}