001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2006-2010 Sun Microsystems, Inc. 025 * Portions Copyright 2011-2015 ForgeRock AS 026 */ 027package org.opends.server.replication.plugin; 028 029import static org.forgerock.opendj.ldap.ResultCode.*; 030import static org.opends.messages.ReplicationMessages.*; 031import static org.opends.messages.ToolMessages.*; 032import static org.opends.server.protocols.internal.InternalClientConnection.*; 033import static org.opends.server.protocols.internal.Requests.*; 034import static org.opends.server.replication.plugin.EntryHistorical.*; 035import static org.opends.server.replication.protocol.OperationContext.*; 036import static org.opends.server.replication.service.ReplicationMonitor.*; 037import static org.opends.server.util.CollectionUtils.*; 038import static org.opends.server.util.ServerConstants.*; 039import static org.opends.server.util.StaticUtils.*; 040 041import java.io.File; 042import java.io.InputStream; 043import java.io.OutputStream; 044import java.io.StringReader; 045import java.util.*; 046import java.util.concurrent.BlockingQueue; 047import java.util.concurrent.TimeUnit; 048import java.util.concurrent.TimeoutException; 049import java.util.concurrent.atomic.AtomicBoolean; 050import java.util.concurrent.atomic.AtomicInteger; 051import java.util.concurrent.atomic.AtomicReference; 052import java.util.zip.DataFormatException; 053 054import org.forgerock.i18n.LocalizableMessage; 055import org.forgerock.i18n.slf4j.LocalizedLogger; 056import org.forgerock.opendj.config.server.ConfigChangeResult; 057import org.forgerock.opendj.config.server.ConfigException; 058import org.forgerock.opendj.ldap.ByteString; 059import org.forgerock.opendj.ldap.DecodeException; 060import org.forgerock.opendj.ldap.ModificationType; 061import org.forgerock.opendj.ldap.ResultCode; 062import org.forgerock.opendj.ldap.SearchScope; 063import org.opends.server.admin.server.ConfigurationChangeListener; 064import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.IsolationPolicy; 065import org.opends.server.admin.std.server.ExternalChangelogDomainCfg; 066import org.opends.server.admin.std.server.ReplicationDomainCfg; 067import org.opends.server.api.DirectoryThread; 068import org.opends.server.api.SynchronizationProvider; 069import org.opends.server.api.AlertGenerator; 070import org.opends.server.api.Backend; 071import org.opends.server.api.Backend.BackendOperation; 072import org.opends.server.api.BackendInitializationListener; 073import org.opends.server.api.ServerShutdownListener; 074import org.opends.server.backends.task.Task; 075import org.opends.server.core.*; 076import org.opends.server.protocols.internal.InternalClientConnection; 077import org.opends.server.protocols.internal.InternalSearchListener; 078import org.opends.server.protocols.internal.InternalSearchOperation; 079import org.opends.server.protocols.internal.Requests; 080import org.opends.server.protocols.internal.SearchRequest; 081import org.opends.server.protocols.ldap.LDAPAttribute; 082import org.opends.server.protocols.ldap.LDAPControl; 083import org.opends.server.protocols.ldap.LDAPFilter; 084import org.opends.server.protocols.ldap.LDAPModification; 085import org.opends.server.replication.common.CSN; 086import org.opends.server.replication.common.ServerState; 087import org.opends.server.replication.common.ServerStatus; 088import org.opends.server.replication.common.StatusMachineEvent; 089import org.opends.server.replication.protocol.*; 090import org.opends.server.replication.service.DSRSShutdownSync; 091import org.opends.server.replication.service.ReplicationBroker; 092import org.opends.server.replication.service.ReplicationDomain; 093import org.opends.server.tasks.PurgeConflictsHistoricalTask; 094import org.opends.server.tasks.TaskUtils; 095import org.opends.server.types.*; 096import org.opends.server.types.operation.*; 097import org.opends.server.util.LDIFReader; 098import org.opends.server.util.TimeThread; 099import org.opends.server.workflowelement.localbackend.LocalBackendModifyOperation; 100 101/** 102 * This class implements the bulk part of the Directory Server side 103 * of the replication code. 104 * It contains the root method for publishing a change, 105 * processing a change received from the replicationServer service, 106 * handle conflict resolution, 107 * handle protocol messages from the replicationServer. 108 * <p> 109 * FIXME Move this class to org.opends.server.replication.service 110 * or the equivalent package once this code is moved to a maven module. 111 */ 112public final class LDAPReplicationDomain extends ReplicationDomain 113 implements ConfigurationChangeListener<ReplicationDomainCfg>, 114 AlertGenerator, BackendInitializationListener, ServerShutdownListener 115{ 116 /** 117 * Set of attributes that will return all the user attributes and the 118 * replication related operational attributes when used in a search operation. 119 */ 120 private static final Set<String> USER_AND_REPL_OPERATIONAL_ATTRS = 121 newHashSet(HISTORICAL_ATTRIBUTE_NAME, ENTRYUUID_ATTRIBUTE_NAME, "*"); 122 123 /** 124 * Initializing replication for the domain initiates backend finalization/initialization 125 * This flag prevents the Replication Domain to disable/enable itself when 126 * it is the event initiator 127 */ 128 private boolean ignoreBackendInitializationEvent; 129 130 private volatile boolean serverShutdownRequested; 131 132 @Override 133 public String getShutdownListenerName() { 134 return "LDAPReplicationDomain " + getBaseDN(); 135 } 136 137 @Override 138 public void processServerShutdown(LocalizableMessage reason) { 139 serverShutdownRequested = true; 140 } 141 142 143 /** 144 * This class is used in the session establishment phase 145 * when no Replication Server with all the local changes has been found 146 * and we therefore need to recover them. 147 * A search is then performed on the database using this 148 * internalSearchListener. 149 */ 150 private class ScanSearchListener implements InternalSearchListener 151 { 152 private final CSN startCSN; 153 private final CSN endCSN; 154 155 public ScanSearchListener(CSN startCSN, CSN endCSN) 156 { 157 this.startCSN = startCSN; 158 this.endCSN = endCSN; 159 } 160 161 @Override 162 public void handleInternalSearchEntry( 163 InternalSearchOperation searchOperation, SearchResultEntry searchEntry) 164 throws DirectoryException 165 { 166 // Build the list of Operations that happened on this entry after startCSN 167 // and before endCSN and add them to the replayOperations list 168 Iterable<FakeOperation> updates = 169 EntryHistorical.generateFakeOperations(searchEntry); 170 171 for (FakeOperation op : updates) 172 { 173 CSN csn = op.getCSN(); 174 if (csn.isNewerThan(startCSN) && csn.isOlderThan(endCSN)) 175 { 176 synchronized (replayOperations) 177 { 178 replayOperations.put(csn, op); 179 } 180 } 181 } 182 } 183 184 @Override 185 public void handleInternalSearchReference( 186 InternalSearchOperation searchOperation, 187 SearchResultReference searchReference) throws DirectoryException 188 { 189 // Nothing to do. 190 } 191 } 192 193 @Override 194 public void performBackendPreInitializationProcessing(Backend<?> backend) { 195 // Nothing to do 196 } 197 198 @Override 199 public void performBackendPostFinalizationProcessing(Backend<?> backend) { 200 // Nothing to do 201 } 202 203 @Override 204 public void performBackendPostInitializationProcessing(Backend<?> backend) { 205 if (!ignoreBackendInitializationEvent 206 && getBackend().getBackendID().equals(backend.getBackendID())) { 207 enable(); 208 } 209 } 210 211 @Override 212 public void performBackendPreFinalizationProcessing(Backend<?> backend) { 213 // Do not disable itself during a shutdown 214 // And ignore the event if this replica is the event trigger (e.g. importing). 215 if (!ignoreBackendInitializationEvent 216 && !serverShutdownRequested 217 && getBackend().getBackendID().equals(backend.getBackendID())) { 218 disable(); 219 } 220 } 221 222 /** The fully-qualified name of this class. */ 223 private static final String CLASS_NAME = LDAPReplicationDomain.class.getName(); 224 225 /** 226 * The attribute used to mark conflicting entries. 227 * The value of this attribute should be the dn that this entry was 228 * supposed to have when it was marked as conflicting. 229 */ 230 public static final String DS_SYNC_CONFLICT = "ds-sync-conflict"; 231 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 232 233 private final DSRSShutdownSync dsrsShutdownSync; 234 /** 235 * The update to replay message queue where the listener thread is going to 236 * push incoming update messages. 237 */ 238 private final BlockingQueue<UpdateToReplay> updateToReplayQueue; 239 /** The number of naming conflicts successfully resolved. */ 240 private final AtomicInteger numResolvedNamingConflicts = new AtomicInteger(); 241 /** The number of modify conflicts successfully resolved. */ 242 private final AtomicInteger numResolvedModifyConflicts = new AtomicInteger(); 243 /** The number of unresolved naming conflicts. */ 244 private final AtomicInteger numUnresolvedNamingConflicts = 245 new AtomicInteger(); 246 /** The number of updates replayed successfully by the replication. */ 247 private final AtomicInteger numReplayedPostOpCalled = new AtomicInteger(); 248 249 private final PersistentServerState state; 250 private volatile boolean generationIdSavedStatus; 251 252 /** 253 * This object is used to store the list of update currently being 254 * done on the local database. 255 * Is is useful to make sure that the local operations are sent in a 256 * correct order to the replication server and that the ServerState 257 * is not updated too early. 258 */ 259 private final PendingChanges pendingChanges; 260 private final AtomicReference<RSUpdater> rsUpdater = new AtomicReference<>(null); 261 262 /** 263 * It contain the updates that were done on other servers, transmitted by the 264 * replication server and that are currently replayed. 265 * <p> 266 * It is useful to make sure that dependencies between operations are 267 * correctly fulfilled and to make sure that the ServerState is not updated 268 * too early. 269 */ 270 private final RemotePendingChanges remotePendingChanges; 271 private boolean solveConflictFlag = true; 272 273 private final InternalClientConnection conn = getRootConnection(); 274 private final AtomicBoolean shutdown = new AtomicBoolean(); 275 private volatile boolean disabled; 276 private volatile boolean stateSavingDisabled; 277 278 /** 279 * This list is used to temporary store operations that needs to be replayed 280 * at session establishment time. 281 */ 282 private final SortedMap<CSN, FakeOperation> replayOperations = new TreeMap<>(); 283 284 private ExternalChangelogDomain eclDomain; 285 286 /** A boolean indicating if the thread used to save the persistentServerState is terminated. */ 287 private volatile boolean done = true; 288 289 private final ServerStateFlush flushThread; 290 291 /** The attribute name used to store the generation id in the backend. */ 292 private static final String REPLICATION_GENERATION_ID = "ds-sync-generation-id"; 293 /** The attribute name used to store the fractional include configuration in the backend. */ 294 static final String REPLICATION_FRACTIONAL_INCLUDE = "ds-sync-fractional-include"; 295 /** The attribute name used to store the fractional exclude configuration in the backend. */ 296 static final String REPLICATION_FRACTIONAL_EXCLUDE = "ds-sync-fractional-exclude"; 297 298 /** 299 * Fractional replication variables. 300 */ 301 302 /** Holds the fractional configuration for this domain, if any. */ 303 private final FractionalConfig fractionalConfig; 304 305 /** The list of attributes that cannot be used in fractional replication configuration. */ 306 private static final String[] FRACTIONAL_PROHIBITED_ATTRIBUTES = new String[] 307 { 308 "objectClass", 309 "2.5.4.0" // objectClass OID 310 }; 311 312 /** 313 * When true, this flag is used to force the domain status to be put in bad 314 * data set just after the connection to the replication server. 315 * This must be used when fractional replication is enabled with a 316 * configuration different from the previous one (or at the very first 317 * fractional usage time) : after connection, a ChangeStatusMsg is sent 318 * requesting the bad data set status. Then none of the update messages 319 * received from the replication server are taken into account until the 320 * backend is synchronized with brand new data set compliant with the new 321 * fractional configuration (i.e with compliant fractional configuration in 322 * domain root entry). 323 */ 324 private boolean forceBadDataSet; 325 326 /** 327 * The message id to be used when an import is stopped with error by 328 * the fractional replication ldif import plugin. 329 */ 330 private int importErrorMessageId = -1; 331 /** LocalizableMessage type for ERR_FULL_UPDATE_IMPORT_FRACTIONAL_BAD_REMOTE. */ 332 static final int IMPORT_ERROR_MESSAGE_BAD_REMOTE = 1; 333 /** LocalizableMessage type for ERR_FULL_UPDATE_IMPORT_FRACTIONAL_REMOTE_IS_FRACTIONAL. */ 334 static final int IMPORT_ERROR_MESSAGE_REMOTE_IS_FRACTIONAL = 2; 335 336 /* 337 * Definitions for the return codes of the 338 * fractionalFilterOperation(PreOperationModifyOperation 339 * modifyOperation, boolean performFiltering) method 340 */ 341 /** 342 * The operation contains attributes subject to fractional filtering according 343 * to the fractional configuration. 344 */ 345 private static final int FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES = 1; 346 /** 347 * The operation contains no attributes subject to fractional filtering 348 * according to the fractional configuration. 349 */ 350 private static final int FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES = 2; 351 /** The operation should become a no-op. */ 352 private static final int FRACTIONAL_BECOME_NO_OP = 3; 353 354 /** 355 * The last CSN purged in this domain. Allows to have a continuous purging 356 * process from one purge processing (task run) to the next one. Values 0 when 357 * the server starts. 358 */ 359 private CSN lastCSNPurgedFromHist = new CSN(0,0,0); 360 361 /** 362 * The thread that periodically saves the ServerState of this 363 * LDAPReplicationDomain in the database. 364 */ 365 private class ServerStateFlush extends DirectoryThread 366 { 367 protected ServerStateFlush() 368 { 369 super("Replica DS(" + getServerId() + ") state checkpointer for domain \"" + getBaseDN() + "\""); 370 } 371 372 @Override 373 public void run() 374 { 375 done = false; 376 377 while (!isShutdownInitiated()) 378 { 379 try 380 { 381 synchronized (this) 382 { 383 wait(1000); 384 if (!disabled && !stateSavingDisabled) 385 { 386 // save the ServerState 387 state.save(); 388 } 389 } 390 } 391 catch (InterruptedException e) 392 { 393 // Thread interrupted: check for shutdown. 394 Thread.currentThread().interrupt(); 395 } 396 } 397 state.save(); 398 399 done = true; 400 } 401 } 402 403 /** 404 * The thread that is responsible to update the RS to which this domain is 405 * connected in case it is late and there is no RS which is up to date. 406 */ 407 private class RSUpdater extends DirectoryThread 408 { 409 private final CSN startCSN; 410 411 protected RSUpdater(CSN replServerMaxCSN) 412 { 413 super("Replica DS(" + getServerId() + ") missing change publisher for domain \"" + getBaseDN() + "\""); 414 this.startCSN = replServerMaxCSN; 415 } 416 417 @Override 418 public void run() 419 { 420 // Replication server is missing some of our changes: 421 // let's send them to him. 422 logger.trace(DEBUG_GOING_TO_SEARCH_FOR_CHANGES); 423 424 /* 425 * Get all the changes that have not been seen by this 426 * replication server and publish them. 427 */ 428 try 429 { 430 if (buildAndPublishMissingChanges(startCSN, broker)) 431 { 432 logger.trace(DEBUG_CHANGES_SENT); 433 synchronized(replayOperations) 434 { 435 replayOperations.clear(); 436 } 437 } 438 else 439 { 440 /* 441 * An error happened trying to search for the updates 442 * This server will start accepting again new updates but 443 * some inconsistencies will stay between servers. 444 * Log an error for the repair tool 445 * that will need to re-synchronize the servers. 446 */ 447 logger.error(ERR_CANNOT_RECOVER_CHANGES, getBaseDN()); 448 } 449 } 450 catch (Exception e) 451 { 452 /* 453 * An error happened trying to search for the updates 454 * This server will start accepting again new updates but 455 * some inconsistencies will stay between servers. 456 * Log an error for the repair tool 457 * that will need to re-synchronize the servers. 458 */ 459 logger.error(ERR_CANNOT_RECOVER_CHANGES, getBaseDN()); 460 } 461 finally 462 { 463 broker.setRecoveryRequired(false); 464 // RSUpdater thread has finished its work, let's remove it from memory 465 // so another RSUpdater thread can be started if needed. 466 rsUpdater.compareAndSet(this, null); 467 } 468 } 469 } 470 471 /** 472 * Creates a new ReplicationDomain using configuration from configEntry. 473 * 474 * @param configuration The configuration of this ReplicationDomain. 475 * @param updateToReplayQueue The queue for update messages to replay. 476 * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances. 477 * @throws ConfigException In case of invalid configuration. 478 */ 479 LDAPReplicationDomain(ReplicationDomainCfg configuration, 480 BlockingQueue<UpdateToReplay> updateToReplayQueue, 481 DSRSShutdownSync dsrsShutdownSync) throws ConfigException 482 { 483 super(configuration, -1); 484 485 this.updateToReplayQueue = updateToReplayQueue; 486 this.dsrsShutdownSync = dsrsShutdownSync; 487 488 // Get assured configuration 489 readAssuredConfig(configuration, false); 490 491 // Get fractional configuration 492 fractionalConfig = new FractionalConfig(getBaseDN()); 493 readFractionalConfig(configuration, false); 494 storeECLConfiguration(configuration); 495 solveConflictFlag = isSolveConflict(configuration); 496 497 Backend<?> backend = getBackend(); 498 if (backend == null) 499 { 500 throw new ConfigException(ERR_SEARCHING_DOMAIN_BACKEND.get(getBaseDN())); 501 } 502 503 try 504 { 505 generationId = loadGenerationId(); 506 } 507 catch (DirectoryException e) 508 { 509 logger.error(ERR_LOADING_GENERATION_ID, getBaseDN(), stackTraceToSingleLineString(e)); 510 } 511 512 /* 513 * Create a new Persistent Server State that will be used to store 514 * the last CSN seen from all LDAP servers in the topology. 515 */ 516 state = new PersistentServerState(getBaseDN(), getServerId(), 517 getServerState()); 518 flushThread = new ServerStateFlush(); 519 520 /* 521 * CSNGenerator is used to create new unique CSNs for each operation done on 522 * this replication domain. 523 * 524 * The generator time is adjusted to the time of the last CSN received from 525 * remote other servers. 526 */ 527 pendingChanges = new PendingChanges(getGenerator(), this); 528 remotePendingChanges = new RemotePendingChanges(getServerState()); 529 530 // listen for changes on the configuration 531 configuration.addChangeListener(this); 532 533 // register as an AlertGenerator 534 DirectoryServer.registerAlertGenerator(this); 535 536 DirectoryServer.registerBackendInitializationListener(this); 537 DirectoryServer.registerShutdownListener(this); 538 539 startPublishService(); 540 } 541 542 /** 543 * Modify conflicts are solved for all suffixes but the schema suffix because 544 * we don't want to store extra information in the schema ldif files. This has 545 * no negative impact because the changes on schema should not produce 546 * conflicts. 547 */ 548 private boolean isSolveConflict(ReplicationDomainCfg cfg) 549 { 550 return !getBaseDN().equals(DirectoryServer.getSchemaDN()) 551 && cfg.isSolveConflicts(); 552 } 553 554 /** 555 * Sets the error message id to be used when online import is stopped with 556 * error by the fractional replication ldif import plugin. 557 * @param importErrorMessageId The message to use. 558 */ 559 void setImportErrorMessageId(int importErrorMessageId) 560 { 561 this.importErrorMessageId = importErrorMessageId; 562 } 563 564 /** 565 * This flag is used by the fractional replication ldif import plugin to stop 566 * the (online) import process if a fractional configuration inconsistency is 567 * detected by it. 568 * 569 * @return true if the online import currently in progress should continue, 570 * false otherwise. 571 */ 572 private boolean isFollowImport() 573 { 574 return importErrorMessageId == -1; 575 } 576 577 /** 578 * Gets and stores the fractional replication configuration parameters. 579 * @param configuration The configuration object 580 * @param allowReconnection Tells if one must reconnect if significant changes 581 * occurred 582 */ 583 private void readFractionalConfig(ReplicationDomainCfg configuration, 584 boolean allowReconnection) 585 { 586 // Read the configuration entry 587 FractionalConfig newFractionalConfig; 588 try 589 { 590 newFractionalConfig = FractionalConfig.toFractionalConfig(configuration); 591 } 592 catch(ConfigException e) 593 { 594 // Should not happen as normally already called without problem in 595 // isConfigurationChangeAcceptable or isConfigurationAcceptable 596 // if we come up to this method 597 logger.info(NOTE_ERR_FRACTIONAL, getBaseDN(), stackTraceToSingleLineString(e)); 598 return; 599 } 600 601 /** 602 * Is there any change in fractional configuration ? 603 */ 604 605 // Compute current configuration 606 boolean needReconnection; 607 try 608 { 609 needReconnection = !FractionalConfig. 610 isFractionalConfigEquivalent(fractionalConfig, newFractionalConfig); 611 } 612 catch (ConfigException e) 613 { 614 // Should not happen 615 logger.info(NOTE_ERR_FRACTIONAL, getBaseDN(), stackTraceToSingleLineString(e)); 616 return; 617 } 618 619 // Disable service if configuration changed 620 final boolean needRestart = needReconnection && allowReconnection; 621 if (needRestart) 622 { 623 disableService(); 624 } 625 // Set new configuration 626 int newFractionalMode = newFractionalConfig.fractionalConfigToInt(); 627 fractionalConfig.setFractional(newFractionalMode != 628 FractionalConfig.NOT_FRACTIONAL); 629 if (fractionalConfig.isFractional()) 630 { 631 // Set new fractional configuration values 632 fractionalConfig.setFractionalExclusive( 633 newFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL); 634 fractionalConfig.setFractionalSpecificClassesAttributes( 635 newFractionalConfig.getFractionalSpecificClassesAttributes()); 636 fractionalConfig.setFractionalAllClassesAttributes( 637 newFractionalConfig.fractionalAllClassesAttributes); 638 } else 639 { 640 // Reset default values 641 fractionalConfig.setFractionalExclusive(true); 642 fractionalConfig.setFractionalSpecificClassesAttributes( 643 new HashMap<String, Set<String>>()); 644 fractionalConfig.setFractionalAllClassesAttributes(new HashSet<String>()); 645 } 646 647 // Reconnect if required 648 if (needRestart) 649 { 650 enableService(); 651 } 652 } 653 654 /** 655 * Return true if the fractional configuration stored in the domain root 656 * entry of the backend is equivalent to the fractional configuration stored 657 * in the local variables. 658 */ 659 private boolean isBackendFractionalConfigConsistent() 660 { 661 // Read config stored in domain root entry 662 if (logger.isTraceEnabled()) 663 { 664 logger.trace("Attempt to read the potential fractional config in domain root entry " + getBaseDN()); 665 } 666 667 // Search the domain root entry that is used to save the generation id 668 SearchRequest request = newSearchRequest(getBaseDN(), SearchScope.BASE_OBJECT) 669 .addAttribute(REPLICATION_GENERATION_ID, REPLICATION_FRACTIONAL_EXCLUDE, REPLICATION_FRACTIONAL_INCLUDE); 670 InternalSearchOperation search = conn.processSearch(request); 671 672 if (search.getResultCode() != ResultCode.SUCCESS 673 && search.getResultCode() != ResultCode.NO_SUCH_OBJECT) 674 { 675 String errorMsg = search.getResultCode().getName() + " " + search.getErrorMessage(); 676 logger.error(ERR_SEARCHING_GENERATION_ID, errorMsg, getBaseDN()); 677 return false; 678 } 679 680 SearchResultEntry resultEntry = findReplicationSearchResultEntry(search); 681 if (resultEntry == null) 682 { 683 /* 684 * The backend is probably empty: if there is some fractional 685 * configuration in memory, we do not let the domain being connected, 686 * otherwise, it's ok 687 */ 688 return !fractionalConfig.isFractional(); 689 } 690 691 // Now extract fractional configuration if any 692 Iterator<String> exclIt = 693 getAttributeValueIterator(resultEntry, REPLICATION_FRACTIONAL_EXCLUDE); 694 Iterator<String> inclIt = 695 getAttributeValueIterator(resultEntry, REPLICATION_FRACTIONAL_INCLUDE); 696 697 // Compare backend and local fractional configuration 698 return isFractionalConfigConsistent(fractionalConfig, exclIt, inclIt); 699 } 700 701 private SearchResultEntry findReplicationSearchResultEntry( 702 InternalSearchOperation searchOperation) 703 { 704 final SearchResultEntry resultEntry = getFirstResult(searchOperation); 705 if (resultEntry != null) 706 { 707 AttributeType synchronizationGenIDType = DirectoryServer.getAttributeTypeOrNull(REPLICATION_GENERATION_ID); 708 List<Attribute> attrs = resultEntry.getAttribute(synchronizationGenIDType); 709 if (attrs != null) 710 { 711 Attribute attr = attrs.get(0); 712 if (attr.size() == 1) 713 { 714 return resultEntry; 715 } 716 if (attr.size() > 1) 717 { 718 String errorMsg = "#Values=" + attr.size() + " Must be exactly 1 in entry " + resultEntry.toLDIFString(); 719 logger.error(ERR_LOADING_GENERATION_ID, getBaseDN(), errorMsg); 720 } 721 } 722 } 723 return null; 724 } 725 726 private Iterator<String> getAttributeValueIterator(SearchResultEntry resultEntry, String attrName) 727 { 728 AttributeType attrType = DirectoryServer.getAttributeTypeOrNull(attrName); 729 List<Attribute> exclAttrs = resultEntry.getAttribute(attrType); 730 if (exclAttrs != null) 731 { 732 Attribute exclAttr = exclAttrs.get(0); 733 if (exclAttr != null) 734 { 735 return new AttributeValueStringIterator(exclAttr.iterator()); 736 } 737 } 738 return null; 739 } 740 741 /** 742 * Return true if the fractional configuration passed as fractional 743 * configuration attribute values is equivalent to the fractional 744 * configuration stored in the local variables. 745 * @param fractionalConfig The local fractional configuration 746 * @param exclIt Fractional exclude mode configuration attribute values to 747 * analyze. 748 * @param inclIt Fractional include mode configuration attribute values to 749 * analyze. 750 * @return True if the fractional configuration passed as fractional 751 * configuration attribute values is equivalent to the fractional 752 * configuration stored in the local variables. 753 */ 754 static boolean isFractionalConfigConsistent( 755 FractionalConfig fractionalConfig, Iterator<String> exclIt, 756 Iterator<String> inclIt) 757 { 758 /* 759 * Parse fractional configuration stored in passed fractional configuration 760 * attributes values 761 */ 762 763 Map<String, Set<String>> storedFractionalSpecificClassesAttributes = new HashMap<>(); 764 Set<String> storedFractionalAllClassesAttributes = new HashSet<>(); 765 766 int storedFractionalMode; 767 try 768 { 769 storedFractionalMode = FractionalConfig.parseFractionalConfig(exclIt, 770 inclIt, storedFractionalSpecificClassesAttributes, 771 storedFractionalAllClassesAttributes); 772 } catch (ConfigException e) 773 { 774 // Should not happen as configuration in domain root entry is flushed 775 // from valid configuration in local variables 776 logger.info(NOTE_ERR_FRACTIONAL, fractionalConfig.getBaseDn(), stackTraceToSingleLineString(e)); 777 return false; 778 } 779 780 FractionalConfig storedFractionalConfig = new FractionalConfig( 781 fractionalConfig.getBaseDn()); 782 storedFractionalConfig.setFractional(storedFractionalMode != 783 FractionalConfig.NOT_FRACTIONAL); 784 // Set stored fractional configuration values 785 if (storedFractionalConfig.isFractional()) 786 { 787 storedFractionalConfig.setFractionalExclusive( 788 storedFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL); 789 } 790 storedFractionalConfig.setFractionalSpecificClassesAttributes( 791 storedFractionalSpecificClassesAttributes); 792 storedFractionalConfig.setFractionalAllClassesAttributes( 793 storedFractionalAllClassesAttributes); 794 795 /* 796 * Compare configuration stored in passed fractional configuration 797 * attributes with local variable one 798 */ 799 try 800 { 801 return FractionalConfig. 802 isFractionalConfigEquivalent(fractionalConfig, storedFractionalConfig); 803 } catch (ConfigException e) 804 { 805 // Should not happen as configuration in domain root entry is flushed 806 // from valid configuration in local variables so both should have already 807 // been checked 808 logger.info(NOTE_ERR_FRACTIONAL, fractionalConfig.getBaseDn(), stackTraceToSingleLineString(e)); 809 return false; 810 } 811 } 812 813 /** 814 * Utility class to have get a string iterator from an AtributeValue iterator. 815 * Assuming the attribute values are strings. 816 */ 817 static class AttributeValueStringIterator implements Iterator<String> 818 { 819 private final Iterator<ByteString> attrValIt; 820 821 /** 822 * Creates a new AttributeValueStringIterator object. 823 * @param attrValIt The underlying attribute iterator to use, assuming 824 * internal values are strings. 825 */ 826 AttributeValueStringIterator(Iterator<ByteString> attrValIt) 827 { 828 this.attrValIt = attrValIt; 829 } 830 831 @Override 832 public boolean hasNext() 833 { 834 return attrValIt.hasNext(); 835 } 836 837 @Override 838 public String next() 839 { 840 return attrValIt.next().toString(); 841 } 842 843 // Should not be needed anyway 844 @Override 845 public void remove() 846 { 847 attrValIt.remove(); 848 } 849 } 850 851 /** 852 * Compare 2 attribute collections and returns true if they are equivalent. 853 * 854 * @param attributes1 855 * First attribute collection to compare. 856 * @param attributes2 857 * Second attribute collection to compare. 858 * @return True if both attribute collection are equivalent. 859 * @throws ConfigException 860 * If some attributes could not be retrieved from the schema. 861 */ 862 private static boolean areAttributesEquivalent( 863 Collection<String> attributes1, Collection<String> attributes2) 864 throws ConfigException 865 { 866 // Compare all classes attributes 867 if (attributes1.size() != attributes2.size()) 868 { 869 return false; 870 } 871 872 // Check consistency of all classes attributes 873 Schema schema = DirectoryServer.getSchema(); 874 /* 875 * For each attribute in attributes1, check there is the matching 876 * one in attributes2. 877 */ 878 for (String attrName1 : attributes1) 879 { 880 // Get attribute from attributes1 881 AttributeType attributeType1 = schema.getAttributeType(attrName1); 882 if (attributeType1 == null) 883 { 884 throw new ConfigException( 885 NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_ATTRIBUTE_TYPE.get(attrName1)); 886 } 887 // Look for matching one in attributes2 888 boolean foundAttribute = false; 889 for (String attrName2 : attributes2) 890 { 891 AttributeType attributeType2 = schema.getAttributeType(attrName2); 892 if (attributeType2 == null) 893 { 894 throw new ConfigException( 895 NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_ATTRIBUTE_TYPE.get(attrName2)); 896 } 897 if (attributeType1.equals(attributeType2)) 898 { 899 foundAttribute = true; 900 break; 901 } 902 } 903 // Found matching attribute ? 904 if (!foundAttribute) 905 { 906 return false; 907 } 908 } 909 910 return true; 911 } 912 913 /** 914 * Check that the passed fractional configuration is acceptable 915 * regarding configuration syntax, schema constraints... 916 * Throws an exception if the configuration is not acceptable. 917 * @param configuration The configuration to analyze. 918 * @throws org.opends.server.config.ConfigException if the configuration is 919 * not acceptable. 920 */ 921 private static void isFractionalConfigAcceptable( 922 ReplicationDomainCfg configuration) throws ConfigException 923 { 924 /* 925 * Parse fractional configuration 926 */ 927 928 // Read the configuration entry 929 FractionalConfig newFractionalConfig = FractionalConfig.toFractionalConfig( 930 configuration); 931 932 if (!newFractionalConfig.isFractional()) 933 { 934 // Nothing to check 935 return; 936 } 937 938 // Prepare variables to be filled with config 939 Map<String, Set<String>> newFractionalSpecificClassesAttributes = 940 newFractionalConfig.getFractionalSpecificClassesAttributes(); 941 Set<String> newFractionalAllClassesAttributes = 942 newFractionalConfig.getFractionalAllClassesAttributes(); 943 944 /* 945 * Check attributes consistency : we only allow to filter MAY (optional) 946 * attributes of a class : to be compliant with the schema, no MUST 947 * (mandatory) attribute can be filtered by fractional replication. 948 */ 949 950 // Check consistency of specific classes attributes 951 Schema schema = DirectoryServer.getSchema(); 952 int fractionalMode = newFractionalConfig.fractionalConfigToInt(); 953 for (String className : newFractionalSpecificClassesAttributes.keySet()) 954 { 955 // Does the class exist ? 956 ObjectClass fractionalClass = schema.getObjectClass( 957 className.toLowerCase()); 958 if (fractionalClass == null) 959 { 960 throw new ConfigException( 961 NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_OBJECT_CLASS.get(className)); 962 } 963 964 boolean isExtensibleObjectClass = 965 "extensibleObject".equalsIgnoreCase(className); 966 967 Set<String> attributes = 968 newFractionalSpecificClassesAttributes.get(className); 969 970 for (String attrName : attributes) 971 { 972 // Not a prohibited attribute ? 973 if (isFractionalProhibitedAttr(attrName)) 974 { 975 throw new ConfigException( 976 NOTE_ERR_FRACTIONAL_CONFIG_PROHIBITED_ATTRIBUTE.get(attrName)); 977 } 978 979 // Does the attribute exist ? 980 AttributeType attributeType = schema.getAttributeType(attrName); 981 if (attributeType != null) 982 { 983 // No more checking for the extensibleObject class 984 if (!isExtensibleObjectClass 985 && fractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL 986 // Exclusive mode : the attribute must be optional 987 && !fractionalClass.isOptional(attributeType)) 988 { 989 throw new ConfigException( 990 NOTE_ERR_FRACTIONAL_CONFIG_NOT_OPTIONAL_ATTRIBUTE.get(attrName, 991 className)); 992 } 993 } 994 else 995 { 996 throw new ConfigException( 997 NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_ATTRIBUTE_TYPE.get(attrName)); 998 } 999 } 1000 } 1001 1002 // Check consistency of all classes attributes 1003 for (String attrName : newFractionalAllClassesAttributes) 1004 { 1005 // Not a prohibited attribute ? 1006 if (isFractionalProhibitedAttr(attrName)) 1007 { 1008 throw new ConfigException( 1009 NOTE_ERR_FRACTIONAL_CONFIG_PROHIBITED_ATTRIBUTE.get(attrName)); 1010 } 1011 1012 // Does the attribute exist ? 1013 if (schema.getAttributeType(attrName) == null) 1014 { 1015 throw new ConfigException( 1016 NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_ATTRIBUTE_TYPE.get(attrName)); 1017 } 1018 } 1019 } 1020 1021 /** 1022 * Test if the passed attribute is not allowed to be used in configuration of 1023 * fractional replication. 1024 * @param attr Attribute to test. 1025 * @return true if the attribute is prohibited. 1026 */ 1027 private static boolean isFractionalProhibitedAttr(String attr) 1028 { 1029 for (String forbiddenAttr : FRACTIONAL_PROHIBITED_ATTRIBUTES) 1030 { 1031 if (forbiddenAttr.equalsIgnoreCase(attr)) 1032 { 1033 return true; 1034 } 1035 } 1036 return false; 1037 } 1038 1039 /** 1040 * If fractional replication is enabled, this analyzes the operation and 1041 * suppresses the forbidden attributes in it so that they are not added in 1042 * the local backend. 1043 * 1044 * @param addOperation The operation to modify based on fractional 1045 * replication configuration 1046 * @param performFiltering Tells if the effective attribute filtering should 1047 * be performed or if the call is just to analyze if there are some 1048 * attributes filtered by fractional configuration 1049 * @return true if the operation contains some attributes subject to filtering 1050 * by the fractional configuration 1051 */ 1052 private boolean fractionalFilterOperation( 1053 PreOperationAddOperation addOperation, boolean performFiltering) 1054 { 1055 return fractionalRemoveAttributesFromEntry(fractionalConfig, 1056 addOperation.getEntryDN().rdn(), addOperation.getObjectClasses(), 1057 addOperation.getUserAttributes(), performFiltering); 1058 } 1059 1060 /** 1061 * If fractional replication is enabled, this analyzes the operation and 1062 * suppresses the forbidden attributes in it so that they are not added in 1063 * the local backend. 1064 * 1065 * @param modifyDNOperation The operation to modify based on fractional 1066 * replication configuration 1067 * @param performFiltering Tells if the effective modifications should 1068 * be performed or if the call is just to analyze if there are some 1069 * inconsistency with fractional configuration 1070 * @return true if the operation is inconsistent with fractional 1071 * configuration 1072 */ 1073 private boolean fractionalFilterOperation( 1074 PreOperationModifyDNOperation modifyDNOperation, boolean performFiltering) 1075 { 1076 // Quick exit if not called for analyze and 1077 if (performFiltering && modifyDNOperation.deleteOldRDN()) 1078 { 1079 // The core will remove any occurrence of attribute that was part of the 1080 // old RDN, nothing more to do. 1081 return true; // Will not be used as analyze was not requested 1082 } 1083 1084 // Create a list of filtered attributes for this entry 1085 Entry concernedEntry = modifyDNOperation.getOriginalEntry(); 1086 Set<String> fractionalConcernedAttributes = 1087 createFractionalConcernedAttrList(fractionalConfig, 1088 concernedEntry.getObjectClasses().keySet()); 1089 1090 boolean fractionalExclusive = fractionalConfig.isFractionalExclusive(); 1091 if (fractionalExclusive && fractionalConcernedAttributes.isEmpty()) 1092 { 1093 // No attributes to filter 1094 return false; 1095 } 1096 1097 /* 1098 * Analyze the old and new rdn to see if they are some attributes to be 1099 * removed: if the oldRDN contains some forbidden attributes (for instance 1100 * it is possible if the entry was created with an add operation and the 1101 * RDN used contains a forbidden attribute: in this case the attribute value 1102 * has been kept to be consistent with the dn of the entry.) that are no 1103 * more part of the new RDN, we must remove any attribute of this type by 1104 * putting a modification to delete the attribute. 1105 */ 1106 1107 boolean inconsistentOperation = false; 1108 RDN rdn = modifyDNOperation.getEntryDN().rdn(); 1109 RDN newRdn = modifyDNOperation.getNewRDN(); 1110 1111 // Go through each attribute of the old RDN 1112 for (int i=0 ; i<rdn.getNumValues() ; i++) 1113 { 1114 AttributeType attributeType = rdn.getAttributeType(i); 1115 // Is it present in the fractional attributes established list ? 1116 boolean foundAttribute = 1117 exists(fractionalConcernedAttributes, attributeType); 1118 if (canRemoveAttribute(fractionalExclusive, foundAttribute) 1119 && !newRdn.hasAttributeType(attributeType) 1120 && !modifyDNOperation.deleteOldRDN()) 1121 { 1122 /* 1123 * A forbidden attribute is in the old RDN and no more in the new RDN, 1124 * and it has not been requested to remove attributes from old RDN: 1125 * let's remove the attribute from the entry to stay consistent with 1126 * fractional configuration 1127 */ 1128 Modification modification = new Modification(ModificationType.DELETE, 1129 Attributes.empty(attributeType)); 1130 modifyDNOperation.addModification(modification); 1131 inconsistentOperation = true; 1132 } 1133 } 1134 1135 return inconsistentOperation; 1136 } 1137 1138 private boolean exists(Set<String> attrNames, AttributeType attrTypeToFind) 1139 { 1140 for (String attrName : attrNames) 1141 { 1142 if (DirectoryServer.getAttributeTypeOrNull(attrName).equals(attrTypeToFind)) 1143 { 1144 return true; 1145 } 1146 } 1147 return false; 1148 } 1149 1150 /** 1151 * Remove attributes from an entry, according to the passed fractional 1152 * configuration. The entry is represented by the 2 passed parameters. 1153 * The attributes to be removed are removed using the remove method on the 1154 * passed iterator for the attributes in the entry. 1155 * @param fractionalConfig The fractional configuration to use 1156 * @param entryRdn The rdn of the entry to add 1157 * @param classes The object classes representing the entry to modify 1158 * @param attributesMap The map of attributes/values to be potentially removed 1159 * from the entry. 1160 * @param performFiltering Tells if the effective attribute filtering should 1161 * be performed or if the call is just an analyze to see if there are some 1162 * attributes filtered by fractional configuration 1163 * @return true if the operation contains some attributes subject to filtering 1164 * by the fractional configuration 1165 */ 1166 static boolean fractionalRemoveAttributesFromEntry( 1167 FractionalConfig fractionalConfig, RDN entryRdn, 1168 Map<ObjectClass,String> classes, Map<AttributeType, 1169 List<Attribute>> attributesMap, boolean performFiltering) 1170 { 1171 boolean hasSomeAttributesToFilter = false; 1172 /* 1173 * Prepare a list of attributes to be included/excluded according to the 1174 * fractional replication configuration 1175 */ 1176 1177 Set<String> fractionalConcernedAttributes = 1178 createFractionalConcernedAttrList(fractionalConfig, classes.keySet()); 1179 boolean fractionalExclusive = fractionalConfig.isFractionalExclusive(); 1180 if (fractionalExclusive && fractionalConcernedAttributes.isEmpty()) 1181 { 1182 return false; // No attributes to filter 1183 } 1184 1185 // Prepare list of object classes of the added entry 1186 Set<ObjectClass> entryClasses = classes.keySet(); 1187 1188 /* 1189 * Go through the user attributes and remove those that match filtered one 1190 * - exclude mode : remove only attributes that are in 1191 * fractionalConcernedAttributes 1192 * - include mode : remove any attribute that is not in 1193 * fractionalConcernedAttributes 1194 */ 1195 List<List<Attribute>> newRdnAttrLists = new ArrayList<>(); 1196 List<AttributeType> rdnAttrTypes = new ArrayList<>(); 1197 final Set<AttributeType> attrTypes = attributesMap.keySet(); 1198 for (Iterator<AttributeType> iter = attrTypes.iterator(); iter.hasNext();) 1199 { 1200 AttributeType attributeType = iter.next(); 1201 1202 // Only optional attributes may be removed 1203 if (isMandatoryAttribute(entryClasses, attributeType) 1204 // Do not remove an attribute if it is a prohibited one 1205 || isFractionalProhibited(attributeType) 1206 || !canRemoveAttribute(attributeType, fractionalExclusive, 1207 fractionalConcernedAttributes)) 1208 { 1209 continue; 1210 } 1211 1212 if (!performFiltering) 1213 { 1214 // The call was just to check : at least one attribute to filter 1215 // found, return immediately the answer; 1216 return true; 1217 } 1218 1219 // Do not remove an attribute/value that is part of the RDN of the 1220 // entry as it is forbidden 1221 if (entryRdn.hasAttributeType(attributeType)) 1222 { 1223 /* 1224 We must remove all values of the attributes map for this 1225 attribute type but the one that has the value which is in the RDN 1226 of the entry. In fact the (underlying )attribute list does not 1227 support remove so we have to create a new list, keeping only the 1228 attribute value which is the same as in the RDN 1229 */ 1230 ByteString rdnAttributeValue = 1231 entryRdn.getAttributeValue(attributeType); 1232 List<Attribute> attrList = attributesMap.get(attributeType); 1233 ByteString sameAttrValue = null; 1234 // Locate the attribute value identical to the one in the RDN 1235 for (Attribute attr : attrList) 1236 { 1237 if (attr.contains(rdnAttributeValue)) 1238 { 1239 for (ByteString attrValue : attr) { 1240 if (rdnAttributeValue.equals(attrValue)) { 1241 // Keep the value we want 1242 sameAttrValue = attrValue; 1243 } else { 1244 hasSomeAttributesToFilter = true; 1245 } 1246 } 1247 } 1248 else 1249 { 1250 hasSomeAttributesToFilter = true; 1251 } 1252 } 1253 // Recreate the attribute list with only the RDN attribute value 1254 if (sameAttrValue != null) 1255 // Paranoia check: should never be the case as we should always 1256 // find the attribute/value pair matching the pair in the RDN 1257 { 1258 // Construct and store new attribute list 1259 newRdnAttrLists.add(Attributes.createAsList(attributeType, sameAttrValue)); 1260 /* 1261 Store matching attribute type 1262 The mapping will be done using object from rdnAttrTypes as key 1263 and object from newRdnAttrLists (at same index) as value in 1264 the user attribute map to be modified 1265 */ 1266 rdnAttrTypes.add(attributeType); 1267 } 1268 } 1269 else 1270 { 1271 // Found an attribute to remove, remove it from the list. 1272 iter.remove(); 1273 hasSomeAttributesToFilter = true; 1274 } 1275 } 1276 // Now overwrite the attribute values for the attribute types present in the 1277 // RDN, if there are some filtered attributes in the RDN 1278 for (int index = 0 ; index < rdnAttrTypes.size() ; index++) 1279 { 1280 attributesMap.put(rdnAttrTypes.get(index), newRdnAttrLists.get(index)); 1281 } 1282 return hasSomeAttributesToFilter; 1283 } 1284 1285 private static boolean isMandatoryAttribute(Set<ObjectClass> entryClasses, AttributeType attributeType) 1286 { 1287 for (ObjectClass objectClass : entryClasses) 1288 { 1289 if (objectClass.isRequired(attributeType)) 1290 { 1291 return true; 1292 } 1293 } 1294 return false; 1295 } 1296 1297 private static boolean isFractionalProhibited(AttributeType attrType) 1298 { 1299 String attributeName = attrType.getPrimaryName(); 1300 return (attributeName != null && isFractionalProhibitedAttr(attributeName)) 1301 || isFractionalProhibitedAttr(attrType.getOID()); 1302 } 1303 1304 private static boolean canRemoveAttribute(AttributeType attributeType, 1305 boolean fractionalExclusive, Set<String> fractionalConcernedAttributes) 1306 { 1307 String attributeName = attributeType.getPrimaryName(); 1308 String attributeOid = attributeType.getOID(); 1309 1310 // Is the current attribute part of the established list ? 1311 boolean foundAttribute = 1312 contains(fractionalConcernedAttributes, attributeName, attributeOid); 1313 // Now remove the attribute or modification if: 1314 // - exclusive mode and attribute is in configuration 1315 // - inclusive mode and attribute is not in configuration 1316 return canRemoveAttribute(fractionalExclusive, foundAttribute); 1317 } 1318 1319 private static boolean canRemoveAttribute(boolean fractionalExclusive, 1320 boolean foundAttribute) 1321 { 1322 return (foundAttribute && fractionalExclusive) 1323 || (!foundAttribute && !fractionalExclusive); 1324 } 1325 1326 private static boolean contains(Set<String> attrNames, String attrName, 1327 String attrOID) 1328 { 1329 return attrNames.contains(attrOID) 1330 || (attrName != null && attrNames.contains(attrName.toLowerCase())); 1331 } 1332 1333 /** 1334 * Prepares a list of attributes of interest for the fractional feature. 1335 * @param fractionalConfig The fractional configuration to use 1336 * @param entryObjectClasses The object classes of an entry on which an 1337 * operation is going to be performed. 1338 * @return The list of attributes of the entry to be excluded/included 1339 * when the operation will be performed. 1340 */ 1341 private static Set<String> createFractionalConcernedAttrList( 1342 FractionalConfig fractionalConfig, Set<ObjectClass> entryObjectClasses) 1343 { 1344 /* 1345 * Is the concerned entry of a type concerned by fractional replication 1346 * configuration ? If yes, add the matching attribute names to a set of 1347 * attributes to take into account for filtering 1348 * (inclusive or exclusive mode). 1349 * Using a Set to avoid duplicate attributes (from 2 inheriting classes for 1350 * instance) 1351 */ 1352 Set<String> fractionalConcernedAttributes = new HashSet<>(); 1353 1354 // Get object classes the entry matches 1355 Set<String> fractionalAllClassesAttributes = 1356 fractionalConfig.getFractionalAllClassesAttributes(); 1357 Map<String, Set<String>> fractionalSpecificClassesAttributes = 1358 fractionalConfig.getFractionalSpecificClassesAttributes(); 1359 1360 Set<String> fractionalClasses = 1361 fractionalSpecificClassesAttributes.keySet(); 1362 for (ObjectClass entryObjectClass : entryObjectClasses) 1363 { 1364 for(String fractionalClass : fractionalClasses) 1365 { 1366 if (entryObjectClass.hasNameOrOID(fractionalClass.toLowerCase())) 1367 { 1368 fractionalConcernedAttributes.addAll( 1369 fractionalSpecificClassesAttributes.get(fractionalClass)); 1370 } 1371 } 1372 } 1373 1374 // Add to the set any attribute which is class independent 1375 fractionalConcernedAttributes.addAll(fractionalAllClassesAttributes); 1376 1377 return fractionalConcernedAttributes; 1378 } 1379 1380 /** 1381 * If fractional replication is enabled, this analyzes the operation and 1382 * suppresses the forbidden attributes in it so that they are not added/ 1383 * deleted/modified in the local backend. 1384 * 1385 * @param modifyOperation The operation to modify based on fractional 1386 * replication configuration 1387 * @param performFiltering Tells if the effective attribute filtering should 1388 * be performed or if the call is just to analyze if there are some 1389 * attributes filtered by fractional configuration 1390 * @return FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES, 1391 * FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES or FRACTIONAL_BECOME_NO_OP 1392 */ 1393 private int fractionalFilterOperation(PreOperationModifyOperation 1394 modifyOperation, boolean performFiltering) 1395 { 1396 /* 1397 * Prepare a list of attributes to be included/excluded according to the 1398 * fractional replication configuration 1399 */ 1400 1401 Entry modifiedEntry = modifyOperation.getCurrentEntry(); 1402 Set<String> fractionalConcernedAttributes = 1403 createFractionalConcernedAttrList(fractionalConfig, 1404 modifiedEntry.getObjectClasses().keySet()); 1405 boolean fractionalExclusive = fractionalConfig.isFractionalExclusive(); 1406 if (fractionalExclusive && fractionalConcernedAttributes.isEmpty()) 1407 { 1408 // No attributes to filter 1409 return FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES; 1410 } 1411 1412 // Prepare list of object classes of the modified entry 1413 DN entryToModifyDn = modifyOperation.getEntryDN(); 1414 Entry entryToModify; 1415 try 1416 { 1417 entryToModify = DirectoryServer.getEntry(entryToModifyDn); 1418 } 1419 catch(DirectoryException e) 1420 { 1421 logger.info(NOTE_ERR_FRACTIONAL, getBaseDN(), stackTraceToSingleLineString(e)); 1422 return FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES; 1423 } 1424 Set<ObjectClass> entryClasses = entryToModify.getObjectClasses().keySet(); 1425 1426 /* 1427 * Now go through the attribute modifications and filter the mods according 1428 * to the fractional configuration (using the just established concerned 1429 * attributes list): 1430 * - delete attributes: remove them if regarding a filtered attribute 1431 * - add attributes: remove them if regarding a filtered attribute 1432 * - modify attributes: remove them if regarding a filtered attribute 1433 */ 1434 1435 int result = FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES; 1436 List<Modification> mods = modifyOperation.getModifications(); 1437 Iterator<Modification> modsIt = mods.iterator(); 1438 while (modsIt.hasNext()) 1439 { 1440 Modification mod = modsIt.next(); 1441 Attribute attr = mod.getAttribute(); 1442 AttributeType attrType = attr.getAttributeType(); 1443 // Fractional replication ignores operational attributes 1444 if (attrType.isOperational() 1445 || isMandatoryAttribute(entryClasses, attrType) 1446 || isFractionalProhibited(attrType) 1447 || !canRemoveAttribute(attrType, fractionalExclusive, 1448 fractionalConcernedAttributes)) 1449 { 1450 continue; 1451 } 1452 1453 if (!performFiltering) 1454 { 1455 // The call was just to check : at least one attribute to filter 1456 // found, return immediately the answer; 1457 return FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES; 1458 } 1459 1460 // Found a modification to remove, remove it from the list. 1461 modsIt.remove(); 1462 result = FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES; 1463 if (mods.isEmpty()) 1464 { 1465 // This operation must become a no-op as no more modification in it 1466 return FRACTIONAL_BECOME_NO_OP; 1467 } 1468 } 1469 1470 return result; 1471 } 1472 1473 /** 1474 * This is overwritten to allow stopping the (online) import process by the 1475 * fractional ldif import plugin when it detects that the (imported) remote 1476 * data set is not consistent with the local fractional configuration. 1477 * {@inheritDoc} 1478 */ 1479 @Override 1480 protected byte[] receiveEntryBytes() 1481 { 1482 if (isFollowImport()) 1483 { 1484 // Ok, next entry is allowed to be received 1485 return super.receiveEntryBytes(); 1486 } 1487 1488 // Fractional ldif import plugin detected inconsistency between local and 1489 // remote server fractional configuration and is stopping the import 1490 // process: 1491 // This is an error termination during the import 1492 // The error is stored and the import is ended by returning null 1493 final ImportExportContext ieCtx = getImportExportContext(); 1494 LocalizableMessage msg = null; 1495 switch (importErrorMessageId) 1496 { 1497 case IMPORT_ERROR_MESSAGE_BAD_REMOTE: 1498 msg = NOTE_ERR_FULL_UPDATE_IMPORT_FRACTIONAL_BAD_REMOTE.get(getBaseDN(), ieCtx.getImportSource()); 1499 break; 1500 case IMPORT_ERROR_MESSAGE_REMOTE_IS_FRACTIONAL: 1501 msg = NOTE_ERR_FULL_UPDATE_IMPORT_FRACTIONAL_REMOTE_IS_FRACTIONAL.get(getBaseDN(), ieCtx.getImportSource()); 1502 break; 1503 } 1504 ieCtx.setException(new DirectoryException(UNWILLING_TO_PERFORM, msg)); 1505 return null; 1506 } 1507 1508 /** 1509 * This is overwritten to allow stopping the (online) export process if the 1510 * local domain is fractional and the destination is all other servers: 1511 * This make no sense to have only fractional servers in a replicated 1512 * topology. This prevents from administrator manipulation error that would 1513 * lead to whole topology data corruption. 1514 * {@inheritDoc} 1515 */ 1516 @Override 1517 protected void initializeRemote(int target, int requestorID, 1518 Task initTask, int initWindow) throws DirectoryException 1519 { 1520 if (target == RoutableMsg.ALL_SERVERS && fractionalConfig.isFractional()) 1521 { 1522 LocalizableMessage msg = NOTE_ERR_FRACTIONAL_FORBIDDEN_FULL_UPDATE_FRACTIONAL.get(getBaseDN(), getServerId()); 1523 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, msg); 1524 } 1525 1526 super.initializeRemote(target, requestorID, initTask, initWindow); 1527 } 1528 1529 /** 1530 * Implement the handleConflictResolution phase of the deleteOperation. 1531 * 1532 * @param deleteOperation The deleteOperation. 1533 * @return A SynchronizationProviderResult indicating if the operation 1534 * can continue. 1535 */ 1536 SynchronizationProviderResult handleConflictResolution( 1537 PreOperationDeleteOperation deleteOperation) 1538 { 1539 if (!deleteOperation.isSynchronizationOperation() && !brokerIsConnected()) 1540 { 1541 LocalizableMessage msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(getBaseDN()); 1542 return new SynchronizationProviderResult.StopProcessing( 1543 ResultCode.UNWILLING_TO_PERFORM, msg); 1544 } 1545 1546 DeleteContext ctx = 1547 (DeleteContext) deleteOperation.getAttachment(SYNCHROCONTEXT); 1548 Entry deletedEntry = deleteOperation.getEntryToDelete(); 1549 1550 if (ctx != null) 1551 { 1552 /* 1553 * This is a replication operation 1554 * Check that the modified entry has the same entryuuid 1555 * as it was in the original message. 1556 */ 1557 String operationEntryUUID = ctx.getEntryUUID(); 1558 String deletedEntryUUID = getEntryUUID(deletedEntry); 1559 if (!operationEntryUUID.equals(deletedEntryUUID)) 1560 { 1561 /* 1562 * The changes entry is not the same entry as the one on 1563 * the original change was performed. 1564 * Probably the original entry was renamed and replaced with 1565 * another entry. 1566 * We must not let the change proceed, return a negative 1567 * result and set the result code to NO_SUCH_OBJECT. 1568 * When the operation will return, the thread that started the operation 1569 * will try to find the correct entry and restart a new operation. 1570 */ 1571 return new SynchronizationProviderResult.StopProcessing( 1572 ResultCode.NO_SUCH_OBJECT, null); 1573 } 1574 } 1575 else 1576 { 1577 // There is no replication context attached to the operation 1578 // so this is not a replication operation. 1579 CSN csn = generateCSN(deleteOperation); 1580 String modifiedEntryUUID = getEntryUUID(deletedEntry); 1581 ctx = new DeleteContext(csn, modifiedEntryUUID); 1582 deleteOperation.setAttachment(SYNCHROCONTEXT, ctx); 1583 1584 synchronized (replayOperations) 1585 { 1586 int size = replayOperations.size(); 1587 if (size >= 10000) 1588 { 1589 replayOperations.remove(replayOperations.firstKey()); 1590 } 1591 FakeOperation op = new FakeDelOperation( 1592 deleteOperation.getEntryDN(), csn, modifiedEntryUUID); 1593 replayOperations.put(csn, op); 1594 } 1595 } 1596 1597 return new SynchronizationProviderResult.ContinueProcessing(); 1598 } 1599 1600 /** 1601 * Implement the handleConflictResolution phase of the addOperation. 1602 * 1603 * @param addOperation The AddOperation. 1604 * @return A SynchronizationProviderResult indicating if the operation 1605 * can continue. 1606 */ 1607 SynchronizationProviderResult handleConflictResolution( 1608 PreOperationAddOperation addOperation) 1609 { 1610 if (!addOperation.isSynchronizationOperation() && !brokerIsConnected()) 1611 { 1612 LocalizableMessage msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(getBaseDN()); 1613 return new SynchronizationProviderResult.StopProcessing( 1614 ResultCode.UNWILLING_TO_PERFORM, msg); 1615 } 1616 1617 if (fractionalConfig.isFractional()) 1618 { 1619 if (addOperation.isSynchronizationOperation()) 1620 { 1621 /* 1622 * Filter attributes here for fractional replication. If fractional 1623 * replication is enabled, we analyze the operation to suppress the 1624 * forbidden attributes in it so that they are not added in the local 1625 * backend. This must be called before any other plugin is called, to 1626 * keep coherency across plugin calls. 1627 */ 1628 fractionalFilterOperation(addOperation, true); 1629 } 1630 else 1631 { 1632 /* 1633 * Direct access from an LDAP client : if some attributes are to be 1634 * removed according to the fractional configuration, simply forbid 1635 * the operation 1636 */ 1637 if (fractionalFilterOperation(addOperation, false)) 1638 { 1639 LocalizableMessage msg = NOTE_ERR_FRACTIONAL_FORBIDDEN_OPERATION.get(getBaseDN(), addOperation); 1640 return new SynchronizationProviderResult.StopProcessing( 1641 ResultCode.UNWILLING_TO_PERFORM, msg); 1642 } 1643 } 1644 } 1645 1646 if (addOperation.isSynchronizationOperation()) 1647 { 1648 AddContext ctx = (AddContext) addOperation.getAttachment(SYNCHROCONTEXT); 1649 /* 1650 * If an entry with the same entry uniqueID already exist then 1651 * this operation has already been replayed in the past. 1652 */ 1653 String uuid = ctx.getEntryUUID(); 1654 if (findEntryDN(uuid) != null) 1655 { 1656 return new SynchronizationProviderResult.StopProcessing( 1657 ResultCode.NO_OPERATION, null); 1658 } 1659 1660 /* The parent entry may have been renamed here since the change was done 1661 * on the first server, and another entry have taken the former dn 1662 * of the parent entry 1663 */ 1664 1665 String parentEntryUUID = ctx.getParentEntryUUID(); 1666 // root entry have no parent, there is no need to check for it. 1667 if (parentEntryUUID != null) 1668 { 1669 // There is a potential of perfs improvement here 1670 // if we could avoid the following parent entry retrieval 1671 DN parentDnFromCtx = findEntryDN(ctx.getParentEntryUUID()); 1672 if (parentDnFromCtx == null) 1673 { 1674 // The parent does not exist with the specified unique id 1675 // stop the operation with NO_SUCH_OBJECT and let the 1676 // conflict resolution or the dependency resolution solve this. 1677 return new SynchronizationProviderResult.StopProcessing( 1678 ResultCode.NO_SUCH_OBJECT, null); 1679 } 1680 1681 DN entryDN = addOperation.getEntryDN(); 1682 DN parentDnFromEntryDn = entryDN.getParentDNInSuffix(); 1683 if (parentDnFromEntryDn != null 1684 && !parentDnFromCtx.equals(parentDnFromEntryDn)) 1685 { 1686 // parentEntry has been renamed 1687 // replication name conflict resolution is expected to fix that 1688 // later in the flow 1689 return new SynchronizationProviderResult.StopProcessing( 1690 ResultCode.NO_SUCH_OBJECT, null); 1691 } 1692 } 1693 } 1694 return new SynchronizationProviderResult.ContinueProcessing(); 1695 } 1696 1697 /** 1698 * Check that the broker associated to this ReplicationDomain has found 1699 * a Replication Server and that this LDAP server is therefore able to 1700 * process operations. 1701 * If not, set the ResultCode, the response message, 1702 * interrupt the operation, and return false 1703 * 1704 * @return true when it OK to process the Operation, false otherwise. 1705 * When false is returned the resultCode and the response message 1706 * is also set in the Operation. 1707 */ 1708 private boolean brokerIsConnected() 1709 { 1710 final IsolationPolicy isolationPolicy = config.getIsolationPolicy(); 1711 if (isolationPolicy.equals(IsolationPolicy.ACCEPT_ALL_UPDATES)) 1712 { 1713 // this policy imply that we always accept updates. 1714 return true; 1715 } 1716 if (isolationPolicy.equals(IsolationPolicy.REJECT_ALL_UPDATES)) 1717 { 1718 // this isolation policy specifies that the updates are denied 1719 // when the broker had problems during the connection phase 1720 // Updates are still accepted if the broker is currently connecting.. 1721 return !hasConnectionError(); 1722 } 1723 // we should never get there as the only possible policies are 1724 // ACCEPT_ALL_UPDATES and REJECT_ALL_UPDATES 1725 return true; 1726 } 1727 1728 /** 1729 * Implement the handleConflictResolution phase of the ModifyDNOperation. 1730 * 1731 * @param modifyDNOperation The ModifyDNOperation. 1732 * @return A SynchronizationProviderResult indicating if the operation 1733 * can continue. 1734 */ 1735 SynchronizationProviderResult handleConflictResolution( 1736 PreOperationModifyDNOperation modifyDNOperation) 1737 { 1738 if (!modifyDNOperation.isSynchronizationOperation() && !brokerIsConnected()) 1739 { 1740 LocalizableMessage msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(getBaseDN()); 1741 return new SynchronizationProviderResult.StopProcessing( 1742 ResultCode.UNWILLING_TO_PERFORM, msg); 1743 } 1744 1745 if (fractionalConfig.isFractional()) 1746 { 1747 if (modifyDNOperation.isSynchronizationOperation()) 1748 { 1749 /* 1750 * Filter operation here for fractional replication. If fractional 1751 * replication is enabled, we analyze the operation and modify it if 1752 * necessary to stay consistent with what is defined in fractional 1753 * configuration. 1754 */ 1755 fractionalFilterOperation(modifyDNOperation, true); 1756 } 1757 else 1758 { 1759 /* 1760 * Direct access from an LDAP client : something is inconsistent with 1761 * the fractional configuration, forbid the operation. 1762 */ 1763 if (fractionalFilterOperation(modifyDNOperation, false)) 1764 { 1765 LocalizableMessage msg = NOTE_ERR_FRACTIONAL_FORBIDDEN_OPERATION.get(getBaseDN(), modifyDNOperation); 1766 return new SynchronizationProviderResult.StopProcessing( 1767 ResultCode.UNWILLING_TO_PERFORM, msg); 1768 } 1769 } 1770 } 1771 1772 ModifyDnContext ctx = 1773 (ModifyDnContext) modifyDNOperation.getAttachment(SYNCHROCONTEXT); 1774 if (ctx != null) 1775 { 1776 /* 1777 * This is a replication operation 1778 * Check that the modified entry has the same entryuuid 1779 * as was in the original message. 1780 */ 1781 final String modifiedEntryUUID = 1782 getEntryUUID(modifyDNOperation.getOriginalEntry()); 1783 if (!modifiedEntryUUID.equals(ctx.getEntryUUID())) 1784 { 1785 /* 1786 * The modified entry is not the same entry as the one on 1787 * the original change was performed. 1788 * Probably the original entry was renamed and replaced with 1789 * another entry. 1790 * We must not let the change proceed, return a negative 1791 * result and set the result code to NO_SUCH_OBJECT. 1792 * When the operation will return, the thread that started the operation 1793 * will try to find the correct entry and restart a new operation. 1794 */ 1795 return new SynchronizationProviderResult.StopProcessing( 1796 ResultCode.NO_SUCH_OBJECT, null); 1797 } 1798 1799 if (modifyDNOperation.getNewSuperior() != null) 1800 { 1801 /* 1802 * Also check that the current id of the 1803 * parent is the same as when the operation was performed. 1804 */ 1805 String newParentId = findEntryUUID(modifyDNOperation.getNewSuperior()); 1806 if (newParentId != null && ctx.getNewSuperiorEntryUUID() != null 1807 && !newParentId.equals(ctx.getNewSuperiorEntryUUID())) 1808 { 1809 return new SynchronizationProviderResult.StopProcessing( 1810 ResultCode.NO_SUCH_OBJECT, null); 1811 } 1812 } 1813 1814 /* 1815 * If the object has been renamed more recently than this 1816 * operation, cancel the operation. 1817 */ 1818 EntryHistorical hist = EntryHistorical.newInstanceFromEntry( 1819 modifyDNOperation.getOriginalEntry()); 1820 if (hist.addedOrRenamedAfter(ctx.getCSN())) 1821 { 1822 return new SynchronizationProviderResult.StopProcessing( 1823 ResultCode.NO_OPERATION, null); 1824 } 1825 } 1826 else 1827 { 1828 // There is no replication context attached to the operation 1829 // so this is not a replication operation. 1830 CSN csn = generateCSN(modifyDNOperation); 1831 String newParentId = null; 1832 if (modifyDNOperation.getNewSuperior() != null) 1833 { 1834 newParentId = findEntryUUID(modifyDNOperation.getNewSuperior()); 1835 } 1836 1837 Entry modifiedEntry = modifyDNOperation.getOriginalEntry(); 1838 String modifiedEntryUUID = getEntryUUID(modifiedEntry); 1839 ctx = new ModifyDnContext(csn, modifiedEntryUUID, newParentId); 1840 modifyDNOperation.setAttachment(SYNCHROCONTEXT, ctx); 1841 } 1842 return new SynchronizationProviderResult.ContinueProcessing(); 1843 } 1844 1845 /** 1846 * Handle the conflict resolution. 1847 * Called by the core server after locking the entry and before 1848 * starting the actual modification. 1849 * @param modifyOperation the operation 1850 * @return code indicating is operation must proceed 1851 */ 1852 SynchronizationProviderResult handleConflictResolution( 1853 PreOperationModifyOperation modifyOperation) 1854 { 1855 if (!modifyOperation.isSynchronizationOperation() && !brokerIsConnected()) 1856 { 1857 LocalizableMessage msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(getBaseDN()); 1858 return new SynchronizationProviderResult.StopProcessing( 1859 ResultCode.UNWILLING_TO_PERFORM, msg); 1860 } 1861 1862 if (fractionalConfig.isFractional()) 1863 { 1864 if (modifyOperation.isSynchronizationOperation()) 1865 { 1866 /* 1867 * Filter attributes here for fractional replication. If fractional 1868 * replication is enabled, we analyze the operation and modify it so 1869 * that no forbidden attribute is added/modified/deleted in the local 1870 * backend. This must be called before any other plugin is called, to 1871 * keep coherency across plugin calls. 1872 */ 1873 if (fractionalFilterOperation(modifyOperation, true) == 1874 FRACTIONAL_BECOME_NO_OP) 1875 { 1876 // Every modifications filtered in this operation: the operation 1877 // becomes a no-op 1878 return new SynchronizationProviderResult.StopProcessing( 1879 ResultCode.NO_OPERATION, null); 1880 } 1881 } 1882 else 1883 { 1884 /* 1885 * Direct access from an LDAP client : if some attributes are to be 1886 * removed according to the fractional configuration, simply forbid 1887 * the operation 1888 */ 1889 switch(fractionalFilterOperation(modifyOperation, false)) 1890 { 1891 case FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES: 1892 // Ok, let the operation happen 1893 break; 1894 case FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES: 1895 // Some attributes not compliant with fractional configuration : 1896 // forbid the operation 1897 LocalizableMessage msg = NOTE_ERR_FRACTIONAL_FORBIDDEN_OPERATION.get(getBaseDN(), modifyOperation); 1898 return new SynchronizationProviderResult.StopProcessing( 1899 ResultCode.UNWILLING_TO_PERFORM, msg); 1900 } 1901 } 1902 } 1903 1904 ModifyContext ctx = 1905 (ModifyContext) modifyOperation.getAttachment(SYNCHROCONTEXT); 1906 1907 Entry modifiedEntry = modifyOperation.getModifiedEntry(); 1908 if (ctx == null) 1909 { 1910 // No replication ctx attached => not a replicated operation 1911 // - create a ctx with : CSN, entryUUID 1912 // - attach the context to the op 1913 1914 CSN csn = generateCSN(modifyOperation); 1915 ctx = new ModifyContext(csn, getEntryUUID(modifiedEntry)); 1916 1917 modifyOperation.setAttachment(SYNCHROCONTEXT, ctx); 1918 } 1919 else 1920 { 1921 // Replication ctx attached => this is a replicated operation being 1922 // replayed here, it is necessary to 1923 // - check if the entry has been renamed 1924 // - check for conflicts 1925 String modifiedEntryUUID = ctx.getEntryUUID(); 1926 String currentEntryUUID = getEntryUUID(modifiedEntry); 1927 if (currentEntryUUID != null 1928 && !currentEntryUUID.equals(modifiedEntryUUID)) 1929 { 1930 /* 1931 * The current modified entry is not the same entry as the one on 1932 * the original modification was performed. 1933 * Probably the original entry was renamed and replaced with 1934 * another entry. 1935 * We must not let the modification proceed, return a negative 1936 * result and set the result code to NO_SUCH_OBJECT. 1937 * When the operation will return, the thread that started the 1938 * operation will try to find the correct entry and restart a new 1939 * operation. 1940 */ 1941 return new SynchronizationProviderResult.StopProcessing( 1942 ResultCode.NO_SUCH_OBJECT, null); 1943 } 1944 1945 // Solve the conflicts between modify operations 1946 EntryHistorical historicalInformation = 1947 EntryHistorical.newInstanceFromEntry(modifiedEntry); 1948 modifyOperation.setAttachment(EntryHistorical.HISTORICAL, 1949 historicalInformation); 1950 1951 if (historicalInformation.replayOperation(modifyOperation, modifiedEntry)) 1952 { 1953 numResolvedModifyConflicts.incrementAndGet(); 1954 } 1955 } 1956 return new SynchronizationProviderResult.ContinueProcessing(); 1957 } 1958 1959 /** 1960 * The preOperation phase for the add Operation. 1961 * Its job is to generate the replication context associated to the 1962 * operation. It is necessary to do it in this phase because contrary to 1963 * the other operations, the entry UUID is not set when the handleConflict 1964 * phase is called. 1965 * 1966 * @param addOperation The Add Operation. 1967 */ 1968 void doPreOperation(PreOperationAddOperation addOperation) 1969 { 1970 final CSN csn = generateCSN(addOperation); 1971 final String entryUUID = getEntryUUID(addOperation); 1972 final AddContext ctx = new AddContext(csn, entryUUID, 1973 findEntryUUID(addOperation.getEntryDN().getParentDNInSuffix())); 1974 addOperation.setAttachment(SYNCHROCONTEXT, ctx); 1975 } 1976 1977 @Override 1978 public void publishReplicaOfflineMsg() 1979 { 1980 pendingChanges.putReplicaOfflineMsg(); 1981 dsrsShutdownSync.replicaOfflineMsgSent(getBaseDN()); 1982 } 1983 1984 /** 1985 * Check if an operation must be synchronized. 1986 * Also update the list of pending changes and the server RUV 1987 * @param op the operation 1988 */ 1989 void synchronize(PostOperationOperation op) 1990 { 1991 ResultCode result = op.getResultCode(); 1992 // Note that a failed non-replication operation might not have a change 1993 // number. 1994 CSN curCSN = OperationContext.getCSN(op); 1995 if (curCSN != null && config.isLogChangenumber()) 1996 { 1997 op.addAdditionalLogItem(AdditionalLogItem.unquotedKeyValue(getClass(), 1998 "replicationCSN", curCSN)); 1999 } 2000 2001 if (result == ResultCode.SUCCESS) 2002 { 2003 if (op.isSynchronizationOperation()) 2004 { // Replaying a sync operation 2005 numReplayedPostOpCalled.incrementAndGet(); 2006 try 2007 { 2008 remotePendingChanges.commit(curCSN); 2009 } 2010 catch (NoSuchElementException e) 2011 { 2012 logger.error(ERR_OPERATION_NOT_FOUND_IN_PENDING, op, curCSN); 2013 return; 2014 } 2015 } 2016 else 2017 { 2018 // Generate a replication message for a successful non-replication 2019 // operation. 2020 LDAPUpdateMsg msg = LDAPUpdateMsg.generateMsg(op); 2021 2022 if (msg == null) 2023 { 2024 /* 2025 * This is an operation type that we do not know about 2026 * It should never happen. 2027 */ 2028 pendingChanges.remove(curCSN); 2029 logger.error(ERR_UNKNOWN_TYPE, op.getOperationType()); 2030 return; 2031 } 2032 2033 addEntryAttributesForCL(msg,op); 2034 2035 // If assured replication is configured, this will prepare blocking 2036 // mechanism. If assured replication is disabled, this returns 2037 // immediately 2038 prepareWaitForAckIfAssuredEnabled(msg); 2039 try 2040 { 2041 msg.encode(); 2042 pendingChanges.commitAndPushCommittedChanges(curCSN, msg); 2043 } 2044 catch (NoSuchElementException e) 2045 { 2046 logger.error(ERR_OPERATION_NOT_FOUND_IN_PENDING, op, curCSN); 2047 return; 2048 } 2049 // If assured replication is enabled, this will wait for the matching 2050 // ack or time out. If assured replication is disabled, this returns 2051 // immediately 2052 try 2053 { 2054 waitForAckIfAssuredEnabled(msg); 2055 } catch (TimeoutException ex) 2056 { 2057 // This exception may only be raised if assured replication is enabled 2058 logger.info(NOTE_DS_ACK_TIMEOUT, getBaseDN(), getAssuredTimeout(), msg); 2059 } 2060 } 2061 2062 /* 2063 * If the operation is a DELETE on the base entry of the suffix 2064 * that is replicated, the generation is now lost because the 2065 * DB is empty. We need to save it again the next time we add an entry. 2066 */ 2067 if (OperationType.DELETE.equals(op.getOperationType()) 2068 && ((PostOperationDeleteOperation) op) 2069 .getEntryDN().equals(getBaseDN())) 2070 { 2071 generationIdSavedStatus = false; 2072 } 2073 2074 if (!generationIdSavedStatus) 2075 { 2076 saveGenerationId(generationId); 2077 } 2078 } 2079 else if (!op.isSynchronizationOperation() && curCSN != null) 2080 { 2081 // Remove an unsuccessful non-replication operation from the pending 2082 // changes list. 2083 pendingChanges.remove(curCSN); 2084 pendingChanges.pushCommittedChanges(); 2085 } 2086 2087 checkForClearedConflict(op); 2088 } 2089 2090 /** 2091 * Check if the operation that just happened has cleared a conflict : 2092 * Clearing a conflict happens if the operation has free a DN that 2093 * for which an other entry was in conflict. 2094 * Steps: 2095 * - get the DN freed by a DELETE or MODRDN op 2096 * - search for entries put in the conflict space (dn=entryUUID'+'....) 2097 * because the expected DN was not available (ds-sync-conflict=expected DN) 2098 * - retain the entry with the oldest conflict 2099 * - rename this entry with the freedDN as it was expected originally 2100 */ 2101 private void checkForClearedConflict(PostOperationOperation op) 2102 { 2103 OperationType type = op.getOperationType(); 2104 if (op.getResultCode() != ResultCode.SUCCESS) 2105 { 2106 // those operations cannot have cleared a conflict 2107 return; 2108 } 2109 2110 DN freedDN; 2111 if (type == OperationType.DELETE) 2112 { 2113 freedDN = ((PostOperationDeleteOperation) op).getEntryDN(); 2114 } 2115 else if (type == OperationType.MODIFY_DN) 2116 { 2117 freedDN = ((PostOperationModifyDNOperation) op).getEntryDN(); 2118 } 2119 else 2120 { 2121 return; 2122 } 2123 2124 SearchFilter filter; 2125 try 2126 { 2127 filter = LDAPFilter.createEqualityFilter(DS_SYNC_CONFLICT, 2128 ByteString.valueOfUtf8(freedDN.toString())).toSearchFilter(); 2129 } 2130 catch (DirectoryException e) 2131 { 2132 // can not happen? 2133 logger.traceException(e); 2134 return; 2135 } 2136 2137 SearchRequest request = newSearchRequest(getBaseDN(), SearchScope.WHOLE_SUBTREE, filter) 2138 .addAttribute(USER_AND_REPL_OPERATIONAL_ATTRS); 2139 InternalSearchOperation searchOp = conn.processSearch(request); 2140 2141 Entry entryToRename = null; 2142 CSN entryToRenameCSN = null; 2143 for (SearchResultEntry entry : searchOp.getSearchEntries()) 2144 { 2145 EntryHistorical history = EntryHistorical.newInstanceFromEntry(entry); 2146 if (entryToRename == null) 2147 { 2148 entryToRename = entry; 2149 entryToRenameCSN = history.getDNDate(); 2150 } 2151 else if (!history.addedOrRenamedAfter(entryToRenameCSN)) 2152 { 2153 // this conflict is older than the previous, keep it. 2154 entryToRename = entry; 2155 entryToRenameCSN = history.getDNDate(); 2156 } 2157 } 2158 2159 if (entryToRename != null) 2160 { 2161 DN entryDN = entryToRename.getName(); 2162 ModifyDNOperation newOp = renameEntry( 2163 entryDN, freedDN.rdn(), freedDN.parent(), false); 2164 2165 ResultCode res = newOp.getResultCode(); 2166 if (res != ResultCode.SUCCESS) 2167 { 2168 logger.error(ERR_COULD_NOT_SOLVE_CONFLICT, entryDN, res); 2169 } 2170 } 2171 } 2172 2173 /** 2174 * Rename an Entry Using a synchronization, non-replicated operation. 2175 * This method should be used instead of the InternalConnection methods 2176 * when the operation that need to be run must be local only and therefore 2177 * not replicated to the RS. 2178 * 2179 * @param targetDN The DN of the entry to rename. 2180 * @param newRDN The new RDN to be used. 2181 * @param parentDN The parentDN to be used. 2182 * @param markConflict A boolean indicating is this entry should be marked 2183 * as a conflicting entry. In such case the 2184 * DS_SYNC_CONFLICT attribute will be added to the entry 2185 * with the value of its original DN. 2186 * If false, the DS_SYNC_CONFLICT attribute will be 2187 * cleared. 2188 * 2189 * @return The operation that was run to rename the entry. 2190 */ 2191 private ModifyDNOperation renameEntry(DN targetDN, RDN newRDN, DN parentDN, 2192 boolean markConflict) 2193 { 2194 ModifyDNOperation newOp = new ModifyDNOperationBasis( 2195 conn, nextOperationID(), nextMessageID(), new ArrayList<Control>(0), 2196 targetDN, newRDN, false, parentDN); 2197 2198 if (markConflict) 2199 { 2200 Attribute attr = Attributes.create(DS_SYNC_CONFLICT, targetDN.toString()); 2201 newOp.addModification(new Modification(ModificationType.REPLACE, attr)); 2202 } 2203 else 2204 { 2205 Attribute attr = Attributes.empty(DS_SYNC_CONFLICT); 2206 newOp.addModification(new Modification(ModificationType.DELETE, attr)); 2207 } 2208 2209 runAsSynchronizedOperation(newOp); 2210 return newOp; 2211 } 2212 2213 private void runAsSynchronizedOperation(Operation op) 2214 { 2215 op.setInternalOperation(true); 2216 op.setSynchronizationOperation(true); 2217 op.setDontSynchronize(true); 2218 op.run(); 2219 } 2220 2221 /** Delete this ReplicationDomain. */ 2222 void delete() 2223 { 2224 shutdown(); 2225 removeECLDomainCfg(); 2226 } 2227 2228 /** Shutdown this ReplicationDomain. */ 2229 public void shutdown() 2230 { 2231 if (shutdown.compareAndSet(false, true)) 2232 { 2233 final RSUpdater rsUpdater = this.rsUpdater.get(); 2234 if (rsUpdater != null) 2235 { 2236 rsUpdater.initiateShutdown(); 2237 } 2238 2239 // stop the thread in charge of flushing the ServerState. 2240 if (flushThread != null) 2241 { 2242 flushThread.initiateShutdown(); 2243 synchronized (flushThread) 2244 { 2245 flushThread.notify(); 2246 } 2247 } 2248 2249 DirectoryServer.deregisterAlertGenerator(this); 2250 DirectoryServer.deregisterBackendInitializationListener(this); 2251 DirectoryServer.deregisterShutdownListener(this); 2252 2253 // stop the ReplicationDomain 2254 disableService(); 2255 } 2256 2257 // wait for completion of the ServerStateFlush thread. 2258 try 2259 { 2260 while (!done) 2261 { 2262 Thread.sleep(50); 2263 } 2264 } catch (InterruptedException e) 2265 { 2266 Thread.currentThread().interrupt(); 2267 } 2268 } 2269 2270 /** 2271 * Create and replay a synchronized Operation from an UpdateMsg. 2272 * 2273 * @param msg 2274 * The UpdateMsg to be replayed. 2275 * @param shutdown 2276 * whether the server initiated shutdown 2277 */ 2278 void replay(LDAPUpdateMsg msg, AtomicBoolean shutdown) 2279 { 2280 // Try replay the operation, then flush (replaying) any pending operation 2281 // whose dependency has been replayed until no more left. 2282 do 2283 { 2284 Operation op = null; // the last operation on which replay was attempted 2285 boolean dependency = false; 2286 String replayErrorMsg = null; 2287 CSN csn = null; 2288 try 2289 { 2290 // The next operation for which to attempt replay. 2291 // This local variable allow to keep error messages in the "op" local 2292 // variable until the next loop iteration starts. 2293 // "op" is already initialized to the next Operation because of the 2294 // error handling paths. 2295 Operation nextOp = op = msg.createOperation(conn); 2296 dependency = remotePendingChanges.checkDependencies(op, msg); 2297 2298 boolean replayDone = false; 2299 int retryCount = 10; 2300 while (!dependency && !replayDone && retryCount-- > 0) 2301 { 2302 if (shutdown.get()) 2303 { 2304 // shutdown initiated, let's leave 2305 return; 2306 } 2307 // Try replay the operation 2308 op = nextOp; 2309 op.setInternalOperation(true); 2310 op.setSynchronizationOperation(true); 2311 2312 // Always add the ManageDSAIT control so that updates to referrals 2313 // are processed locally. 2314 op.addRequestControl(new LDAPControl(OID_MANAGE_DSAIT_CONTROL)); 2315 2316 csn = OperationContext.getCSN(op); 2317 op.run(); 2318 2319 ResultCode result = op.getResultCode(); 2320 2321 if (result != ResultCode.SUCCESS) 2322 { 2323 if (result == ResultCode.NO_OPERATION) 2324 { 2325 // Pre-operation conflict resolution detected that the operation 2326 // was a no-op. For example, an add which has already been 2327 // replayed, or a modify DN operation on an entry which has been 2328 // renamed by a more recent modify DN. 2329 replayDone = true; 2330 } 2331 else if (result == ResultCode.BUSY) 2332 { 2333 /* 2334 * We probably could not get a lock (OPENDJ-885). Give the server 2335 * another chance to process this operation immediately. 2336 */ 2337 Thread.yield(); 2338 continue; 2339 } 2340 else if (result == ResultCode.UNAVAILABLE) 2341 { 2342 /* 2343 * It can happen when a rebuild is performed or the backend is 2344 * offline (OPENDJ-49). Give the server another chance to process 2345 * this operation after some time. 2346 */ 2347 Thread.sleep(50); 2348 continue; 2349 } 2350 else if (op instanceof ModifyOperation) 2351 { 2352 ModifyOperation castOp = (ModifyOperation) op; 2353 dependency = remotePendingChanges.checkDependencies(castOp); 2354 ModifyMsg modifyMsg = (ModifyMsg) msg; 2355 replayDone = solveNamingConflict(castOp, modifyMsg); 2356 } 2357 else if (op instanceof DeleteOperation) 2358 { 2359 DeleteOperation castOp = (DeleteOperation) op; 2360 dependency = remotePendingChanges.checkDependencies(castOp); 2361 replayDone = solveNamingConflict(castOp, msg); 2362 } 2363 else if (op instanceof AddOperation) 2364 { 2365 AddOperation castOp = (AddOperation) op; 2366 AddMsg addMsg = (AddMsg) msg; 2367 dependency = remotePendingChanges.checkDependencies(castOp); 2368 replayDone = solveNamingConflict(castOp, addMsg); 2369 } 2370 else if (op instanceof ModifyDNOperation) 2371 { 2372 ModifyDNOperation castOp = (ModifyDNOperation) op; 2373 replayDone = solveNamingConflict(castOp, msg); 2374 } 2375 else 2376 { 2377 replayDone = true; // unknown type of operation ?! 2378 } 2379 2380 if (replayDone) 2381 { 2382 // the update became a dummy update and the result 2383 // of the conflict resolution phase is to do nothing. 2384 // however we still need to push this change to the serverState 2385 updateError(csn); 2386 } 2387 else 2388 { 2389 /* 2390 * Create a new operation reflecting the new state of the 2391 * UpdateMsg after conflict resolution modified it. 2392 * Note: When msg is a DeleteMsg, the DeleteOperation is properly 2393 * created with subtreeDelete request control when needed. 2394 */ 2395 nextOp = msg.createOperation(conn); 2396 } 2397 } 2398 else 2399 { 2400 replayDone = true; 2401 } 2402 } 2403 2404 if (!replayDone && !dependency) 2405 { 2406 // Continue with the next change but the servers could now become 2407 // inconsistent. 2408 // Let the repair tool know about this. 2409 final LocalizableMessage message = ERR_LOOP_REPLAYING_OPERATION.get( 2410 op, op.getErrorMessage()); 2411 logger.error(message); 2412 numUnresolvedNamingConflicts.incrementAndGet(); 2413 replayErrorMsg = message.toString(); 2414 updateError(csn); 2415 } 2416 } catch (DecodeException | LDAPException | DataFormatException e) 2417 { 2418 replayErrorMsg = logDecodingOperationError(msg, e); 2419 } catch (Exception e) 2420 { 2421 if (csn != null) 2422 { 2423 /* 2424 * An Exception happened during the replay process. 2425 * Continue with the next change but the servers will now start 2426 * to be inconsistent. 2427 * Let the repair tool know about this. 2428 */ 2429 LocalizableMessage message = 2430 ERR_EXCEPTION_REPLAYING_OPERATION.get( 2431 stackTraceToSingleLineString(e), op); 2432 logger.error(message); 2433 replayErrorMsg = message.toString(); 2434 updateError(csn); 2435 } else 2436 { 2437 replayErrorMsg = logDecodingOperationError(msg, e); 2438 } 2439 } finally 2440 { 2441 if (!dependency) 2442 { 2443 processUpdateDone(msg, replayErrorMsg); 2444 } 2445 } 2446 2447 // Now replay any pending update that had a dependency and whose 2448 // dependency has been replayed, do that until no more updates of that 2449 // type left... 2450 msg = remotePendingChanges.getNextUpdate(); 2451 } while (msg != null); 2452 } 2453 2454 private String logDecodingOperationError(LDAPUpdateMsg msg, Exception e) 2455 { 2456 LocalizableMessage message = 2457 ERR_EXCEPTION_DECODING_OPERATION.get(msg + " " + stackTraceToSingleLineString(e)); 2458 logger.error(message); 2459 return message.toString(); 2460 } 2461 2462 /** 2463 * This method is called when an error happens while replaying 2464 * an operation. 2465 * It is necessary because the postOperation does not always get 2466 * called when error or Exceptions happen during the operation replay. 2467 * 2468 * @param csn the CSN of the operation with error. 2469 */ 2470 private void updateError(CSN csn) 2471 { 2472 try 2473 { 2474 remotePendingChanges.commit(csn); 2475 } 2476 catch (NoSuchElementException e) 2477 { 2478 // A failure occurred after the change had been removed from the pending 2479 // changes table. 2480 if (logger.isTraceEnabled()) 2481 { 2482 logger.trace( 2483 "LDAPReplicationDomain.updateError: Unable to find remote " 2484 + "pending change for CSN %s", csn); 2485 } 2486 } 2487 } 2488 2489 /** 2490 * Generate a new CSN and insert it in the pending list. 2491 * 2492 * @param operation 2493 * The operation for which the CSN must be generated. 2494 * @return The new CSN. 2495 */ 2496 private CSN generateCSN(PluginOperation operation) 2497 { 2498 return pendingChanges.putLocalOperation(operation); 2499 } 2500 2501 /** 2502 * Find the Unique Id of the entry with the provided DN by doing a 2503 * search of the entry and extracting its entryUUID from its attributes. 2504 * 2505 * @param dn The dn of the entry for which the unique Id is searched. 2506 * 2507 * @return The unique Id of the entry with the provided DN. 2508 */ 2509 static String findEntryUUID(DN dn) 2510 { 2511 if (dn == null) 2512 { 2513 return null; 2514 } 2515 final SearchRequest request = newSearchRequest(dn, SearchScope.BASE_OBJECT) 2516 .addAttribute(ENTRYUUID_ATTRIBUTE_NAME); 2517 final InternalSearchOperation search = getRootConnection().processSearch(request); 2518 final SearchResultEntry resultEntry = getFirstResult(search); 2519 if (resultEntry != null) 2520 { 2521 return getEntryUUID(resultEntry); 2522 } 2523 return null; 2524 } 2525 2526 private static SearchResultEntry getFirstResult(InternalSearchOperation search) 2527 { 2528 if (search.getResultCode() == ResultCode.SUCCESS) 2529 { 2530 final LinkedList<SearchResultEntry> results = search.getSearchEntries(); 2531 if (!results.isEmpty()) 2532 { 2533 return results.getFirst(); 2534 } 2535 } 2536 return null; 2537 } 2538 2539 /** 2540 * Find the current DN of an entry from its entry UUID. 2541 * 2542 * @param uuid the Entry Unique ID. 2543 * @return The current DN of the entry or null if there is no entry with 2544 * the specified UUID. 2545 */ 2546 private DN findEntryDN(String uuid) 2547 { 2548 try 2549 { 2550 final SearchRequest request = newSearchRequest(getBaseDN(), SearchScope.WHOLE_SUBTREE, "entryuuid=" + uuid); 2551 InternalSearchOperation search = conn.processSearch(request); 2552 final SearchResultEntry resultEntry = getFirstResult(search); 2553 if (resultEntry != null) 2554 { 2555 return resultEntry.getName(); 2556 } 2557 } 2558 catch (DirectoryException e) 2559 { 2560 // never happens because the filter is always valid. 2561 } 2562 return null; 2563 } 2564 2565 /** 2566 * Solve a conflict detected when replaying a modify operation. 2567 * 2568 * @param op The operation that triggered the conflict detection. 2569 * @param msg The operation that triggered the conflict detection. 2570 * @return true if the process is completed, false if it must continue.. 2571 */ 2572 private boolean solveNamingConflict(ModifyOperation op, ModifyMsg msg) 2573 { 2574 ResultCode result = op.getResultCode(); 2575 ModifyContext ctx = (ModifyContext) op.getAttachment(SYNCHROCONTEXT); 2576 String entryUUID = ctx.getEntryUUID(); 2577 2578 if (result == ResultCode.NO_SUCH_OBJECT) 2579 { 2580 /* 2581 * The operation is a modification but 2582 * the entry has been renamed on a different master in the same time. 2583 * search if the entry has been renamed, and return the new dn 2584 * of the entry. 2585 */ 2586 DN newDN = findEntryDN(entryUUID); 2587 if (newDN != null) 2588 { 2589 // There is an entry with the same unique id as this modify operation 2590 // replay the modify using the current dn of this entry. 2591 msg.setDN(newDN); 2592 numResolvedNamingConflicts.incrementAndGet(); 2593 return false; 2594 } 2595 else 2596 { 2597 // This entry does not exist anymore. 2598 // It has probably been deleted, stop the processing of this operation 2599 numResolvedNamingConflicts.incrementAndGet(); 2600 return true; 2601 } 2602 } 2603 else if (result == ResultCode.NOT_ALLOWED_ON_RDN) 2604 { 2605 DN currentDN = findEntryDN(entryUUID); 2606 RDN currentRDN; 2607 if (currentDN != null) 2608 { 2609 currentRDN = currentDN.rdn(); 2610 } 2611 else 2612 { 2613 // The entry does not exist anymore. 2614 numResolvedNamingConflicts.incrementAndGet(); 2615 return true; 2616 } 2617 2618 // The modify operation is trying to delete the value that is 2619 // currently used in the RDN. We need to alter the modify so that it does 2620 // not remove the current RDN value(s). 2621 2622 List<Modification> mods = op.getModifications(); 2623 for (Modification mod : mods) 2624 { 2625 AttributeType modAttrType = mod.getAttribute().getAttributeType(); 2626 if ((mod.getModificationType() == ModificationType.DELETE 2627 || mod.getModificationType() == ModificationType.REPLACE) 2628 && currentRDN.hasAttributeType(modAttrType)) 2629 { 2630 // the attribute can't be deleted because it is used in the RDN, 2631 // turn this operation is a replace with the current RDN value(s); 2632 mod.setModificationType(ModificationType.REPLACE); 2633 Attribute newAttribute = mod.getAttribute(); 2634 AttributeBuilder attrBuilder = new AttributeBuilder(newAttribute); 2635 attrBuilder.add(currentRDN.getAttributeValue(modAttrType)); 2636 mod.setAttribute(attrBuilder.toAttribute()); 2637 } 2638 } 2639 msg.setMods(mods); 2640 numResolvedNamingConflicts.incrementAndGet(); 2641 return false; 2642 } 2643 else 2644 { 2645 // The other type of errors can not be caused by naming conflicts. 2646 // Log a message for the repair tool. 2647 logger.error(ERR_ERROR_REPLAYING_OPERATION, 2648 op, ctx.getCSN(), result, op.getErrorMessage()); 2649 return true; 2650 } 2651 } 2652 2653 /** 2654 * Solve a conflict detected when replaying a delete operation. 2655 * 2656 * @param op The operation that triggered the conflict detection. 2657 * @param msg The operation that triggered the conflict detection. 2658 * @return true if the process is completed, false if it must continue.. 2659 */ 2660 private boolean solveNamingConflict(DeleteOperation op, LDAPUpdateMsg msg) 2661 { 2662 ResultCode result = op.getResultCode(); 2663 DeleteContext ctx = (DeleteContext) op.getAttachment(SYNCHROCONTEXT); 2664 String entryUUID = ctx.getEntryUUID(); 2665 2666 if (result == ResultCode.NO_SUCH_OBJECT) 2667 { 2668 /* Find if the entry is still in the database. */ 2669 DN currentDN = findEntryDN(entryUUID); 2670 if (currentDN == null) 2671 { 2672 /* 2673 * The entry has already been deleted, either because this delete 2674 * has already been replayed or because another concurrent delete 2675 * has already done the job. 2676 * In any case, there is nothing more to do. 2677 */ 2678 numResolvedNamingConflicts.incrementAndGet(); 2679 return true; 2680 } 2681 else 2682 { 2683 // This entry has been renamed, replay the delete using its new DN. 2684 msg.setDN(currentDN); 2685 numResolvedNamingConflicts.incrementAndGet(); 2686 return false; 2687 } 2688 } 2689 else if (result == ResultCode.NOT_ALLOWED_ON_NONLEAF) 2690 { 2691 /* 2692 * This may happen when we replay a DELETE done on a master 2693 * but children of this entry have been added on another master. 2694 * 2695 * Rename all the children by adding entryuuid in dn and delete this entry. 2696 * 2697 * The action taken here must be consistent with the actions 2698 * done in the solveNamingConflict(AddOperation) method 2699 * when we are adding an entry whose parent entry has already been deleted. 2700 */ 2701 if (findAndRenameChild(op.getEntryDN(), op)) 2702 { 2703 numUnresolvedNamingConflicts.incrementAndGet(); 2704 } 2705 2706 return false; 2707 } 2708 else 2709 { 2710 // The other type of errors can not be caused by naming conflicts. 2711 // Log a message for the repair tool. 2712 logger.error(ERR_ERROR_REPLAYING_OPERATION, 2713 op, ctx.getCSN(), result, op.getErrorMessage()); 2714 return true; 2715 } 2716 } 2717 2718/** 2719 * Solve a conflict detected when replaying a Modify DN operation. 2720 * 2721 * @param op The operation that triggered the conflict detection. 2722 * @param msg The operation that triggered the conflict detection. 2723 * @return true if the process is completed, false if it must continue. 2724 * @throws Exception When the operation is not valid. 2725 */ 2726private boolean solveNamingConflict(ModifyDNOperation op, LDAPUpdateMsg msg) 2727 throws Exception 2728{ 2729 ResultCode result = op.getResultCode(); 2730 ModifyDnContext ctx = (ModifyDnContext) op.getAttachment(SYNCHROCONTEXT); 2731 String entryUUID = ctx.getEntryUUID(); 2732 String newSuperiorID = ctx.getNewSuperiorEntryUUID(); 2733 2734 /* 2735 * four possible cases : 2736 * - the modified entry has been renamed 2737 * - the new parent has been renamed 2738 * - the operation is replayed for the second time. 2739 * - the entry has been deleted 2740 * action : 2741 * - change the target dn and the new parent dn and 2742 * restart the operation, 2743 * - don't do anything if the operation is replayed. 2744 */ 2745 2746 // get the current DN of this entry in the database. 2747 DN currentDN = findEntryDN(entryUUID); 2748 2749 // Construct the new DN to use for the entry. 2750 DN entryDN = op.getEntryDN(); 2751 DN newSuperior; 2752 RDN newRDN = op.getNewRDN(); 2753 2754 if (newSuperiorID != null) 2755 { 2756 newSuperior = findEntryDN(newSuperiorID); 2757 } 2758 else 2759 { 2760 newSuperior = entryDN.parent(); 2761 } 2762 2763 //If we could not find the new parent entry, we missed this entry 2764 // earlier or it has disappeared from the database 2765 // Log this information for the repair tool and mark the entry 2766 // as conflicting. 2767 // stop the processing. 2768 if (newSuperior == null) 2769 { 2770 markConflictEntry(op, currentDN, currentDN.parent().child(newRDN)); 2771 numUnresolvedNamingConflicts.incrementAndGet(); 2772 return true; 2773 } 2774 2775 DN newDN = newSuperior.child(newRDN); 2776 2777 if (currentDN == null) 2778 { 2779 // The entry targeted by the Modify DN is not in the database 2780 // anymore. 2781 // This is a conflict between a delete and this modify DN. 2782 // The entry has been deleted, we can safely assume 2783 // that the operation is completed. 2784 numResolvedNamingConflicts.incrementAndGet(); 2785 return true; 2786 } 2787 2788 // if the newDN and the current DN match then the operation 2789 // is a no-op (this was probably a second replay) 2790 // don't do anything. 2791 if (newDN.equals(currentDN)) 2792 { 2793 numResolvedNamingConflicts.incrementAndGet(); 2794 return true; 2795 } 2796 2797 if (result == ResultCode.NO_SUCH_OBJECT 2798 || result == ResultCode.UNWILLING_TO_PERFORM 2799 || result == ResultCode.OBJECTCLASS_VIOLATION) 2800 { 2801 /* 2802 * The entry or it's new parent has not been found 2803 * reconstruct the operation with the DN we just built 2804 */ 2805 ModifyDNMsg modifyDnMsg = (ModifyDNMsg) msg; 2806 modifyDnMsg.setDN(currentDN); 2807 modifyDnMsg.setNewSuperior(newSuperior.toString()); 2808 numResolvedNamingConflicts.incrementAndGet(); 2809 return false; 2810 } 2811 else if (result == ResultCode.ENTRY_ALREADY_EXISTS) 2812 { 2813 /* 2814 * This may happen when two modifyDn operation 2815 * are done on different servers but with the same target DN 2816 * add the conflict object class to the entry 2817 * and rename it using its entryuuid. 2818 */ 2819 ModifyDNMsg modifyDnMsg = (ModifyDNMsg) msg; 2820 markConflictEntry(op, op.getEntryDN(), newDN); 2821 modifyDnMsg.setNewRDN(generateConflictRDN(entryUUID, 2822 modifyDnMsg.getNewRDN())); 2823 modifyDnMsg.setNewSuperior(newSuperior.toString()); 2824 numUnresolvedNamingConflicts.incrementAndGet(); 2825 return false; 2826 } 2827 else 2828 { 2829 // The other type of errors can not be caused by naming conflicts. 2830 // Log a message for the repair tool. 2831 logger.error(ERR_ERROR_REPLAYING_OPERATION, 2832 op, ctx.getCSN(), result, op.getErrorMessage()); 2833 return true; 2834 } 2835} 2836 2837 /** 2838 * Solve a conflict detected when replaying a ADD operation. 2839 * 2840 * @param op The operation that triggered the conflict detection. 2841 * @param msg The message that triggered the conflict detection. 2842 * @return true if the process is completed, false if it must continue. 2843 * @throws Exception When the operation is not valid. 2844 */ 2845 private boolean solveNamingConflict(AddOperation op, AddMsg msg) 2846 throws Exception 2847 { 2848 ResultCode result = op.getResultCode(); 2849 AddContext ctx = (AddContext) op.getAttachment(SYNCHROCONTEXT); 2850 String entryUUID = ctx.getEntryUUID(); 2851 String parentUniqueId = ctx.getParentEntryUUID(); 2852 2853 if (result == ResultCode.NO_SUCH_OBJECT) 2854 { 2855 /* 2856 * This can happen if the parent has been renamed or deleted 2857 * find the parent dn and calculate a new dn for the entry 2858 */ 2859 if (parentUniqueId == null) 2860 { 2861 /* 2862 * This entry is the base dn of the backend. 2863 * It is quite surprising that the operation result be NO_SUCH_OBJECT. 2864 * There is nothing more we can do except log a 2865 * message for the repair tool to look at this problem. 2866 * TODO : Log the message 2867 */ 2868 return true; 2869 } 2870 DN parentDn = findEntryDN(parentUniqueId); 2871 if (parentDn == null) 2872 { 2873 /* 2874 * The parent has been deleted 2875 * rename the entry as a conflicting entry. 2876 * The action taken here must be consistent with the actions 2877 * done when in the solveNamingConflict(DeleteOperation) method 2878 * when we are deleting an entry that have some child entries. 2879 */ 2880 addConflict(msg); 2881 2882 String conflictRDN = 2883 generateConflictRDN(entryUUID, op.getEntryDN().rdn().toString()); 2884 msg.setDN(DN.valueOf(conflictRDN + "," + getBaseDN())); 2885 // reset the parent entryUUID so that the check done is the 2886 // handleConflict phase does not fail. 2887 msg.setParentEntryUUID(null); 2888 numUnresolvedNamingConflicts.incrementAndGet(); 2889 } 2890 else 2891 { 2892 msg.setDN(DN.valueOf(msg.getDN().rdn() + "," + parentDn)); 2893 numResolvedNamingConflicts.incrementAndGet(); 2894 } 2895 return false; 2896 } 2897 else if (result == ResultCode.ENTRY_ALREADY_EXISTS) 2898 { 2899 /* 2900 * This can happen if 2901 * - two adds are done on different servers but with the 2902 * same target DN. 2903 * - the same ADD is being replayed for the second time on this server. 2904 * if the entryUUID already exist, assume this is a replay and 2905 * don't do anything 2906 * if the entry unique id do not exist, generate conflict. 2907 */ 2908 if (findEntryDN(entryUUID) != null) 2909 { 2910 // entry already exist : this is a replay 2911 return true; 2912 } 2913 else 2914 { 2915 addConflict(msg); 2916 String conflictRDN = 2917 generateConflictRDN(entryUUID, msg.getDN().toString()); 2918 msg.setDN(DN.valueOf(conflictRDN)); 2919 numUnresolvedNamingConflicts.incrementAndGet(); 2920 return false; 2921 } 2922 } 2923 else 2924 { 2925 // The other type of errors can not be caused by naming conflicts. 2926 // log a message for the repair tool. 2927 logger.error(ERR_ERROR_REPLAYING_OPERATION, 2928 op, ctx.getCSN(), result, op.getErrorMessage()); 2929 return true; 2930 } 2931 } 2932 2933 /** 2934 * Find all the entries below the provided DN and rename them 2935 * so that they stay below the baseDN of this replicationDomain and 2936 * use the conflicting name and attribute. 2937 * 2938 * @param entryDN The DN of the entry whose child must be renamed. 2939 * @param conflictOp The Operation that generated the conflict. 2940 */ 2941 private boolean findAndRenameChild(DN entryDN, Operation conflictOp) 2942 { 2943 /* 2944 * TODO JNR Ludo thinks that: "Ideally, the operation should verify that the 2945 * entryUUID has not changed or try to use the entryUUID rather than the 2946 * DN.". entryUUID can be obtained from the caller of the current method. 2947 */ 2948 boolean conflict = false; 2949 2950 // Find and rename child entries. 2951 final SearchRequest request = newSearchRequest(entryDN, SearchScope.SINGLE_LEVEL) 2952 .addAttribute(ENTRYUUID_ATTRIBUTE_NAME, HISTORICAL_ATTRIBUTE_NAME); 2953 InternalSearchOperation op = conn.processSearch(request); 2954 if (op.getResultCode() == ResultCode.SUCCESS) 2955 { 2956 for (SearchResultEntry entry : op.getSearchEntries()) 2957 { 2958 /* 2959 * Check the ADD and ModRDN date of the child entry 2960 * (All of them, not only the one that are newer than the DEL op) 2961 * and keep the entry as a conflicting entry. 2962 */ 2963 conflict = true; 2964 renameConflictEntry(conflictOp, entry.getName(), getEntryUUID(entry)); 2965 } 2966 } 2967 else 2968 { 2969 // log error and information for the REPAIR tool. 2970 logger.error(ERR_CANNOT_RENAME_CONFLICT_ENTRY, entryDN, conflictOp, op.getResultCode()); 2971 } 2972 2973 return conflict; 2974 } 2975 2976 /** 2977 * Rename an entry that was conflicting so that it stays below the 2978 * baseDN of the replicationDomain. 2979 * 2980 * @param conflictOp The Operation that caused the conflict. 2981 * @param dn The DN of the entry to be renamed. 2982 * @param entryUUID The uniqueID of the entry to be renamed. 2983 */ 2984 private void renameConflictEntry(Operation conflictOp, DN dn, 2985 String entryUUID) 2986 { 2987 LocalizableMessage alertMessage = NOTE_UNRESOLVED_CONFLICT.get(dn); 2988 DirectoryServer.sendAlertNotification(this, 2989 ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT, alertMessage); 2990 2991 RDN newRDN = generateDeleteConflictDn(entryUUID, dn); 2992 ModifyDNOperation newOp = renameEntry(dn, newRDN, getBaseDN(), true); 2993 2994 if (newOp.getResultCode() != ResultCode.SUCCESS) 2995 { 2996 // log information for the repair tool. 2997 logger.error(ERR_CANNOT_RENAME_CONFLICT_ENTRY, 2998 dn, conflictOp, newOp.getResultCode()); 2999 } 3000 } 3001 3002 /** 3003 * Generate a modification to add the conflict attribute to an entry 3004 * whose Dn is now conflicting with another entry. 3005 * 3006 * @param op The operation causing the conflict. 3007 * @param currentDN The current DN of the operation to mark as conflicting. 3008 * @param conflictDN The newDn on which the conflict happened. 3009 */ 3010 private void markConflictEntry(Operation op, DN currentDN, DN conflictDN) 3011 { 3012 // create new internal modify operation and run it. 3013 Attribute attr = Attributes.create(DS_SYNC_CONFLICT, conflictDN.toString()); 3014 List<Modification> mods = newArrayList(new Modification(ModificationType.REPLACE, attr)); 3015 3016 ModifyOperation newOp = new ModifyOperationBasis( 3017 conn, nextOperationID(), nextMessageID(), new ArrayList<Control>(0), 3018 currentDN, mods); 3019 runAsSynchronizedOperation(newOp); 3020 3021 if (newOp.getResultCode() != ResultCode.SUCCESS) 3022 { 3023 // Log information for the repair tool. 3024 logger.error(ERR_CANNOT_ADD_CONFLICT_ATTRIBUTE, op, newOp.getResultCode()); 3025 } 3026 3027 // Generate an alert to let the administration know that some 3028 // conflict could not be solved. 3029 LocalizableMessage alertMessage = NOTE_UNRESOLVED_CONFLICT.get(conflictDN); 3030 DirectoryServer.sendAlertNotification(this, 3031 ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT, alertMessage); 3032 } 3033 3034 /** 3035 * Add the conflict attribute to an entry that could 3036 * not be added because it is conflicting with another entry. 3037 * 3038 * @param msg The conflicting Add Operation. 3039 * 3040 * @throws DecodeException When an encoding error happened manipulating the 3041 * msg. 3042 */ 3043 private void addConflict(AddMsg msg) throws DecodeException 3044 { 3045 String normalizedDN = msg.getDN().toString(); 3046 3047 // Generate an alert to let the administrator know that some 3048 // conflict could not be solved. 3049 LocalizableMessage alertMessage = NOTE_UNRESOLVED_CONFLICT.get(normalizedDN); 3050 DirectoryServer.sendAlertNotification(this, 3051 ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT, alertMessage); 3052 3053 // Add the conflict attribute 3054 msg.addAttribute(DS_SYNC_CONFLICT, normalizedDN); 3055 } 3056 3057 /** 3058 * Generate the Dn to use for a conflicting entry. 3059 * 3060 * @param entryUUID The unique identifier of the entry involved in the 3061 * conflict. 3062 * @param rdn Original rdn. 3063 * @return The generated RDN for a conflicting entry. 3064 */ 3065 private String generateConflictRDN(String entryUUID, String rdn) 3066 { 3067 return "entryuuid=" + entryUUID + "+" + rdn; 3068 } 3069 3070 /** 3071 * Generate the RDN to use for a conflicting entry whose father was deleted. 3072 * 3073 * @param entryUUID The unique identifier of the entry involved in the 3074 * conflict. 3075 * @param dn The original DN of the entry. 3076 * 3077 * @return The generated RDN for a conflicting entry. 3078 */ 3079 private RDN generateDeleteConflictDn(String entryUUID, DN dn) 3080 { 3081 String newRDN = "entryuuid=" + entryUUID + "+" + dn.rdn(); 3082 try 3083 { 3084 return RDN.decode(newRDN); 3085 } catch (DirectoryException e) 3086 { 3087 // cannot happen 3088 return null; 3089 } 3090 } 3091 3092 /** 3093 * Check if the domain solve conflicts. 3094 * 3095 * @return a boolean indicating if the domain should solve conflicts. 3096 */ 3097 boolean solveConflict() 3098 { 3099 return solveConflictFlag; 3100 } 3101 3102 /** 3103 * Disable the replication on this domain. 3104 * The session to the replication server will be stopped. 3105 * The domain will not be destroyed but call to the pre-operation 3106 * methods will result in failure. 3107 * The listener thread will be destroyed. 3108 * The monitor informations will still be accessible. 3109 */ 3110 public void disable() 3111 { 3112 state.save(); 3113 state.clearInMemory(); 3114 disabled = true; 3115 disableService(); // This will cut the session and wake up the listener 3116 } 3117 3118 /** 3119 * Do what necessary when the data have changed : load state, load 3120 * generation Id. 3121 * If there is no such information check if there is a 3122 * ReplicaUpdateVector entry and translate it into a state 3123 * and generationId. 3124 * @exception DirectoryException Thrown when an error occurs. 3125 */ 3126 private void loadDataState() throws DirectoryException 3127 { 3128 state.clearInMemory(); 3129 state.loadState(); 3130 getGenerator().adjust(state.getMaxCSN(getServerId())); 3131 3132 // Retrieves the generation ID associated with the data imported 3133 generationId = loadGenerationId(); 3134 } 3135 3136 /** 3137 * Enable back the domain after a previous disable. 3138 * The domain will connect back to a replication Server and 3139 * will recreate threads to listen for messages from the Synchronization 3140 * server. 3141 * The generationId will be retrieved or computed if necessary. 3142 * The ServerState will also be read again from the local database. 3143 */ 3144 public void enable() 3145 { 3146 try 3147 { 3148 loadDataState(); 3149 } 3150 catch (Exception e) 3151 { 3152 /* TODO should mark that replicationServer service is 3153 * not available, log an error and retry upon timeout 3154 * should we stop the modifications ? 3155 */ 3156 logger.error(ERR_LOADING_GENERATION_ID, getBaseDN(), stackTraceToSingleLineString(e)); 3157 return; 3158 } 3159 3160 enableService(); 3161 3162 disabled = false; 3163 } 3164 3165 /** 3166 * Compute the data generationId associated with the current data present 3167 * in the backend for this domain. 3168 * @return The computed generationId. 3169 * @throws DirectoryException When an error occurs. 3170 */ 3171 private long computeGenerationId() throws DirectoryException 3172 { 3173 final long genId = exportBackend(null, true); 3174 if (logger.isTraceEnabled()) 3175 { 3176 logger.trace("Computed generationId: generationId=" + genId); 3177 } 3178 return genId; 3179 } 3180 3181 /** 3182 * Run a modify operation to update the entry whose DN is given as 3183 * a parameter with the generationID information. 3184 * 3185 * @param entryDN The DN of the entry to be updated. 3186 * @param generationId The value of the generationID to be saved. 3187 * 3188 * @return A ResultCode indicating if the operation was successful. 3189 */ 3190 private ResultCode runSaveGenerationId(DN entryDN, long generationId) 3191 { 3192 // The generationId is stored in the root entry of the domain. 3193 final ByteString asn1BaseDn = ByteString.valueOfUtf8(entryDN.toString()); 3194 3195 LDAPAttribute attr = new LDAPAttribute(REPLICATION_GENERATION_ID, Long.toString(generationId)); 3196 List<RawModification> mods = new ArrayList<>(1); 3197 mods.add(new LDAPModification(ModificationType.REPLACE, attr)); 3198 3199 ModifyOperation op = new ModifyOperationBasis( 3200 conn, nextOperationID(), nextMessageID(), new ArrayList<Control>(0), 3201 asn1BaseDn, mods); 3202 runAsSynchronizedOperation(op); 3203 return op.getResultCode(); 3204 } 3205 3206 /** 3207 * Stores the value of the generationId. 3208 * @param generationId The value of the generationId. 3209 * @return a ResultCode indicating if the method was successful. 3210 */ 3211 private ResultCode saveGenerationId(long generationId) 3212 { 3213 ResultCode result = runSaveGenerationId(getBaseDN(), generationId); 3214 if (result != ResultCode.SUCCESS) 3215 { 3216 generationIdSavedStatus = false; 3217 if (result == ResultCode.NO_SUCH_OBJECT) 3218 { 3219 // If the base entry does not exist, save the generation 3220 // ID in the config entry 3221 result = runSaveGenerationId(config.dn(), generationId); 3222 } 3223 3224 if (result != ResultCode.SUCCESS) 3225 { 3226 logger.error(ERR_UPDATING_GENERATION_ID, result.getName(), getBaseDN()); 3227 } 3228 } 3229 else 3230 { 3231 generationIdSavedStatus = true; 3232 } 3233 return result; 3234 } 3235 3236 /** 3237 * Load the GenerationId from the root entry of the domain 3238 * from the REPLICATION_GENERATION_ID attribute in database 3239 * to memory, or compute it if not found. 3240 * 3241 * @return generationId The retrieved value of generationId 3242 * @throws DirectoryException When an error occurs. 3243 */ 3244 private long loadGenerationId() throws DirectoryException 3245 { 3246 if (logger.isTraceEnabled()) 3247 { 3248 logger.trace("Attempt to read generation ID from DB " + getBaseDN()); 3249 } 3250 3251 // Search the database entry that is used to periodically save the generation id 3252 final SearchRequest request = newSearchRequest(getBaseDN(), SearchScope.BASE_OBJECT) 3253 .addAttribute(REPLICATION_GENERATION_ID); 3254 InternalSearchOperation search = conn.processSearch(request); 3255 if (search.getResultCode() == ResultCode.NO_SUCH_OBJECT) 3256 { 3257 // if the base entry does not exist look for the generationID 3258 // in the config entry. 3259 request.setName(config.dn()); 3260 search = conn.processSearch(request); 3261 } 3262 3263 boolean found = false; 3264 long aGenerationId = -1; 3265 if (search.getResultCode() != ResultCode.SUCCESS) 3266 { 3267 if (search.getResultCode() != ResultCode.NO_SUCH_OBJECT) 3268 { 3269 String errorMsg = search.getResultCode().getName() + " " + search.getErrorMessage(); 3270 logger.error(ERR_SEARCHING_GENERATION_ID, errorMsg, getBaseDN()); 3271 } 3272 } 3273 else 3274 { 3275 List<SearchResultEntry> result = search.getSearchEntries(); 3276 SearchResultEntry resultEntry = result.get(0); 3277 if (resultEntry != null) 3278 { 3279 AttributeType synchronizationGenIDType = DirectoryServer.getAttributeTypeOrNull(REPLICATION_GENERATION_ID); 3280 List<Attribute> attrs = resultEntry.getAttribute(synchronizationGenIDType); 3281 if (attrs != null) 3282 { 3283 Attribute attr = attrs.get(0); 3284 if (attr.size()>1) 3285 { 3286 String errorMsg = "#Values=" + attr.size() + " Must be exactly 1 in entry " + resultEntry.toLDIFString(); 3287 logger.error(ERR_LOADING_GENERATION_ID, getBaseDN(), errorMsg); 3288 } 3289 else if (attr.size() == 1) 3290 { 3291 found = true; 3292 try 3293 { 3294 aGenerationId = Long.decode(attr.iterator().next().toString()); 3295 } 3296 catch(Exception e) 3297 { 3298 logger.error(ERR_LOADING_GENERATION_ID, getBaseDN(), stackTraceToSingleLineString(e)); 3299 } 3300 } 3301 } 3302 } 3303 } 3304 3305 if (!found) 3306 { 3307 aGenerationId = computeGenerationId(); 3308 saveGenerationId(aGenerationId); 3309 3310 if (logger.isTraceEnabled()) 3311 { 3312 logger.trace("Generation ID created for domain baseDN=" + getBaseDN() + " generationId=" + aGenerationId); 3313 } 3314 } 3315 else 3316 { 3317 generationIdSavedStatus = true; 3318 if (logger.isTraceEnabled()) 3319 { 3320 logger.trace("Generation ID successfully read from domain baseDN=" + getBaseDN() 3321 + " generationId=" + aGenerationId); 3322 } 3323 } 3324 return aGenerationId; 3325 } 3326 3327 /** 3328 * Do whatever is needed when a backup is started. 3329 * We need to make sure that the serverState is correctly save. 3330 */ 3331 void backupStart() 3332 { 3333 state.save(); 3334 } 3335 3336 /** Do whatever is needed when a backup is finished. */ 3337 void backupEnd() 3338 { 3339 // Nothing is needed at the moment 3340 } 3341 3342 /* 3343 * Total Update >> 3344 */ 3345 3346 /** 3347 * This method trigger an export of the replicated data. 3348 * 3349 * @param output The OutputStream where the export should 3350 * be produced. 3351 * @throws DirectoryException When needed. 3352 */ 3353 @Override 3354 protected void exportBackend(OutputStream output) throws DirectoryException 3355 { 3356 exportBackend(output, false); 3357 } 3358 3359 /** 3360 * Export the entries from the backend and/or compute the generation ID. 3361 * The ieContext must have been set before calling. 3362 * 3363 * @param output The OutputStream where the export should 3364 * be produced. 3365 * @param checksumOutput A boolean indicating if this export is 3366 * invoked to perform a checksum only 3367 * 3368 * @return The computed GenerationID. 3369 * 3370 * @throws DirectoryException when an error occurred 3371 */ 3372 private long exportBackend(OutputStream output, boolean checksumOutput) 3373 throws DirectoryException 3374 { 3375 Backend<?> backend = getBackend(); 3376 3377 // Acquire a shared lock for the backend. 3378 try 3379 { 3380 String lockFile = LockFileManager.getBackendLockFileName(backend); 3381 StringBuilder failureReason = new StringBuilder(); 3382 if (! LockFileManager.acquireSharedLock(lockFile, failureReason)) 3383 { 3384 LocalizableMessage message = 3385 ERR_LDIFEXPORT_CANNOT_LOCK_BACKEND.get(backend.getBackendID(), failureReason); 3386 logger.error(message); 3387 throw new DirectoryException(ResultCode.OTHER, message); 3388 } 3389 } 3390 catch (Exception e) 3391 { 3392 LocalizableMessage message = 3393 ERR_LDIFEXPORT_CANNOT_LOCK_BACKEND.get(backend.getBackendID(), 3394 stackTraceToSingleLineString(e)); 3395 logger.error(message); 3396 throw new DirectoryException(ResultCode.OTHER, message); 3397 } 3398 3399 long numberOfEntries = backend.getNumberOfEntriesInBaseDN(getBaseDN()); 3400 long entryCount = Math.min(numberOfEntries, 1000); 3401 OutputStream os; 3402 ReplLDIFOutputStream ros = null; 3403 if (checksumOutput) 3404 { 3405 ros = new ReplLDIFOutputStream(entryCount); 3406 os = ros; 3407 try 3408 { 3409 os.write(Long.toString(numberOfEntries).getBytes()); 3410 } 3411 catch(Exception e) 3412 { 3413 // Should never happen 3414 } 3415 } 3416 else 3417 { 3418 os = output; 3419 } 3420 3421 // baseDN branch is the only one included in the export 3422 LDIFExportConfig exportConfig = new LDIFExportConfig(os); 3423 exportConfig.setIncludeBranches(newArrayList(getBaseDN())); 3424 3425 // For the checksum computing mode, only consider the 'stable' attributes 3426 if (checksumOutput) 3427 { 3428 String includeAttributeStrings[] = { "objectclass", "sn", "cn", "entryuuid" }; 3429 Set<AttributeType> includeAttributes = new HashSet<>(); 3430 for (String attrName : includeAttributeStrings) 3431 { 3432 includeAttributes.add(DirectoryServer.getAttributeTypeOrDefault(attrName)); 3433 } 3434 exportConfig.setIncludeAttributes(includeAttributes); 3435 } 3436 3437 // Launch the export. 3438 long genID = 0; 3439 try 3440 { 3441 backend.exportLDIF(exportConfig); 3442 } 3443 catch (DirectoryException de) 3444 { 3445 if (ros == null || ros.getNumExportedEntries() < entryCount) 3446 { 3447 LocalizableMessage message = ERR_LDIFEXPORT_ERROR_DURING_EXPORT.get(de.getMessageObject()); 3448 logger.error(message); 3449 throw new DirectoryException(ResultCode.OTHER, message); 3450 } 3451 } 3452 catch (Exception e) 3453 { 3454 LocalizableMessage message = ERR_LDIFEXPORT_ERROR_DURING_EXPORT.get(stackTraceToSingleLineString(e)); 3455 logger.error(message); 3456 throw new DirectoryException(ResultCode.OTHER, message); 3457 } 3458 finally 3459 { 3460 // Clean up after the export by closing the export config. 3461 // Will also flush the export and export the remaining entries. 3462 exportConfig.close(); 3463 3464 if (checksumOutput) 3465 { 3466 genID = ros.getChecksumValue(); 3467 } 3468 3469 // Release the shared lock on the backend. 3470 try 3471 { 3472 String lockFile = LockFileManager.getBackendLockFileName(backend); 3473 StringBuilder failureReason = new StringBuilder(); 3474 if (! LockFileManager.releaseLock(lockFile, failureReason)) 3475 { 3476 LocalizableMessage message = 3477 WARN_LDIFEXPORT_CANNOT_UNLOCK_BACKEND.get(backend.getBackendID(), failureReason); 3478 logger.warn(message); 3479 throw new DirectoryException(ResultCode.OTHER, message); 3480 } 3481 } 3482 catch (Exception e) 3483 { 3484 LocalizableMessage message = 3485 WARN_LDIFEXPORT_CANNOT_UNLOCK_BACKEND.get(backend.getBackendID(), 3486 stackTraceToSingleLineString(e)); 3487 logger.warn(message); 3488 throw new DirectoryException(ResultCode.OTHER, message); 3489 } 3490 } 3491 return genID; 3492 } 3493 3494 /** 3495 * Process backend before import. 3496 * 3497 * @param backend 3498 * The backend. 3499 * @throws DirectoryException 3500 * If the backend could not be disabled or locked exclusively. 3501 */ 3502 private void preBackendImport(Backend<?> backend) throws DirectoryException 3503 { 3504 // Stop saving state 3505 stateSavingDisabled = true; 3506 3507 // Prevent the processing of the backend finalisation event as the import will disable the attached backend 3508 ignoreBackendInitializationEvent = true; 3509 3510 // FIXME setBackendEnabled should be part of TaskUtils ? 3511 TaskUtils.disableBackend(backend.getBackendID()); 3512 3513 // Acquire an exclusive lock for the backend. 3514 String lockFile = LockFileManager.getBackendLockFileName(backend); 3515 StringBuilder failureReason = new StringBuilder(); 3516 if (! LockFileManager.acquireExclusiveLock(lockFile, failureReason)) 3517 { 3518 LocalizableMessage message = ERR_INIT_CANNOT_LOCK_BACKEND.get(backend.getBackendID(), failureReason); 3519 logger.error(message); 3520 throw new DirectoryException(ResultCode.OTHER, message); 3521 } 3522 } 3523 3524 /** 3525 * This method triggers an import of the replicated data. 3526 * 3527 * @param input The InputStream from which the data are read. 3528 * @throws DirectoryException When needed. 3529 */ 3530 @Override 3531 protected void importBackend(InputStream input) throws DirectoryException 3532 { 3533 Backend<?> backend = getBackend(); 3534 3535 LDIFImportConfig importConfig = null; 3536 ImportExportContext ieCtx = getImportExportContext(); 3537 try 3538 { 3539 if (!backend.supports(BackendOperation.LDIF_IMPORT)) 3540 { 3541 ieCtx.setExceptionIfNoneSet(new DirectoryException(OTHER, 3542 ERR_INIT_IMPORT_NOT_SUPPORTED.get(backend.getBackendID()))); 3543 return; 3544 } 3545 3546 importConfig = new LDIFImportConfig(input); 3547 importConfig.setIncludeBranches(newLinkedHashSet(getBaseDN())); 3548 importConfig.setAppendToExistingData(false); 3549 importConfig.setSkipDNValidation(true); 3550 // We should not validate schema for replication 3551 importConfig.setValidateSchema(false); 3552 // Allow fractional replication ldif import plugin to be called 3553 importConfig.setInvokeImportPlugins(true); 3554 // Reset the follow import flag and message before starting the import 3555 importErrorMessageId = -1; 3556 3557 // TODO How to deal with rejected entries during the import 3558 File rejectsFile = 3559 getFileForPath("logs" + File.separator + "replInitRejectedEntries"); 3560 importConfig.writeRejectedEntries(rejectsFile.getAbsolutePath(), 3561 ExistingFileBehavior.OVERWRITE); 3562 3563 // Process import 3564 preBackendImport(backend); 3565 backend.importLDIF(importConfig, DirectoryServer.getInstance().getServerContext()); 3566 3567 stateSavingDisabled = false; 3568 } 3569 catch(Exception e) 3570 { 3571 ieCtx.setExceptionIfNoneSet(new DirectoryException(ResultCode.OTHER, 3572 ERR_INIT_IMPORT_FAILURE.get(stackTraceToSingleLineString(e)))); 3573 } 3574 finally 3575 { 3576 try 3577 { 3578 // Cleanup 3579 if (importConfig != null) 3580 { 3581 importConfig.close(); 3582 closeBackendImport(backend); // Re-enable backend 3583 backend = getBackend(); 3584 } 3585 3586 loadDataState(); 3587 3588 if (ieCtx.getException() != null) 3589 { 3590 // When an error occurred during an import, most of times 3591 // the generationId coming in the root entry of the imported data, 3592 // is not valid anymore (partial data in the backend). 3593 generationId = computeGenerationId(); 3594 saveGenerationId(generationId); 3595 } 3596 } 3597 catch (DirectoryException fe) 3598 { 3599 // If we already catch an Exception it's quite possible 3600 // that the loadDataState() and setGenerationId() fail 3601 // so we don't bother about the new Exception. 3602 // However if there was no Exception before we want 3603 // to return this Exception to the task creator. 3604 ieCtx.setExceptionIfNoneSet(new DirectoryException( 3605 ResultCode.OTHER, 3606 ERR_INIT_IMPORT_FAILURE.get(stackTraceToSingleLineString(fe)))); 3607 } 3608 } 3609 3610 if (ieCtx.getException() != null) 3611 { 3612 throw ieCtx.getException(); 3613 } 3614 } 3615 3616 /** 3617 * Make post import operations. 3618 * @param backend The backend implied in the import. 3619 * @exception DirectoryException Thrown when an error occurs. 3620 */ 3621 private void closeBackendImport(Backend<?> backend) throws DirectoryException 3622 { 3623 String lockFile = LockFileManager.getBackendLockFileName(backend); 3624 StringBuilder failureReason = new StringBuilder(); 3625 3626 // Release lock 3627 if (!LockFileManager.releaseLock(lockFile, failureReason)) 3628 { 3629 LocalizableMessage message = 3630 WARN_LDIFIMPORT_CANNOT_UNLOCK_BACKEND.get(backend.getBackendID(), failureReason); 3631 logger.warn(message); 3632 throw new DirectoryException(ResultCode.OTHER, message); 3633 } 3634 3635 TaskUtils.enableBackend(backend.getBackendID()); 3636 3637 // Restore the processing of backend finalization events. 3638 ignoreBackendInitializationEvent = false; 3639 3640 } 3641 3642 /** 3643 * Retrieves a replication domain based on the baseDN. 3644 * 3645 * @param baseDN The baseDN of the domain to retrieve 3646 * @return The domain retrieved 3647 * @throws DirectoryException When an error occurred or no domain 3648 * match the provided baseDN. 3649 */ 3650 public static LDAPReplicationDomain retrievesReplicationDomain(DN baseDN) 3651 throws DirectoryException 3652 { 3653 LDAPReplicationDomain replicationDomain = null; 3654 3655 // Retrieves the domain 3656 for (SynchronizationProvider<?> provider : 3657 DirectoryServer.getSynchronizationProviders()) 3658 { 3659 if (!(provider instanceof MultimasterReplication)) 3660 { 3661 LocalizableMessage message = ERR_INVALID_PROVIDER.get(); 3662 throw new DirectoryException(ResultCode.OTHER, message); 3663 } 3664 3665 // From the domainDN retrieves the replication domain 3666 LDAPReplicationDomain domain = 3667 MultimasterReplication.findDomain(baseDN, null); 3668 if (domain == null) 3669 { 3670 break; 3671 } 3672 if (replicationDomain != null) 3673 { 3674 // Should never happen 3675 LocalizableMessage message = ERR_MULTIPLE_MATCHING_DOMAIN.get(); 3676 throw new DirectoryException(ResultCode.OTHER, message); 3677 } 3678 replicationDomain = domain; 3679 } 3680 3681 if (replicationDomain == null) 3682 { 3683 throw new DirectoryException(ResultCode.OTHER, ERR_NO_MATCHING_DOMAIN.get(baseDN)); 3684 } 3685 return replicationDomain; 3686 } 3687 3688 /** 3689 * Returns the backend associated to this domain. 3690 * @return The associated backend. 3691 */ 3692 private Backend<?> getBackend() 3693 { 3694 return DirectoryServer.getBackend(getBaseDN()); 3695 } 3696 3697 /* 3698 * <<Total Update 3699 */ 3700 3701 /** 3702 * Push the schema modifications contained in the given parameter as a 3703 * modification that would happen on a local server. The modifications are not 3704 * applied to the local schema backend and historical information is not 3705 * updated; but a CSN is generated and the ServerState associated to the 3706 * schema domain is updated. 3707 * 3708 * @param modifications 3709 * The schema modifications to push 3710 */ 3711 void synchronizeSchemaModifications(List<Modification> modifications) 3712 { 3713 ModifyOperation op = new ModifyOperationBasis( 3714 conn, nextOperationID(), nextMessageID(), null, 3715 DirectoryServer.getSchemaDN(), modifications); 3716 3717 final Entry schema; 3718 try 3719 { 3720 schema = DirectoryServer.getEntry(DirectoryServer.getSchemaDN()); 3721 } 3722 catch (DirectoryException e) 3723 { 3724 logger.traceException(e); 3725 logger.error(ERR_BACKEND_SEARCH_ENTRY.get(DirectoryServer.getSchemaDN().toString(), 3726 stackTraceToSingleLineString(e))); 3727 return; 3728 } 3729 3730 LocalBackendModifyOperation localOp = new LocalBackendModifyOperation(op); 3731 CSN csn = generateCSN(localOp); 3732 OperationContext ctx = new ModifyContext(csn, getEntryUUID(schema)); 3733 localOp.setAttachment(SYNCHROCONTEXT, ctx); 3734 localOp.setResultCode(ResultCode.SUCCESS); 3735 synchronize(localOp); 3736 } 3737 3738 /** 3739 * Check if the provided configuration is acceptable for add. 3740 * 3741 * @param configuration The configuration to check. 3742 * @param unacceptableReasons When the configuration is not acceptable, this 3743 * table is use to return the reasons why this 3744 * configuration is not acceptable. 3745 * 3746 * @return true if the configuration is acceptable, false other wise. 3747 */ 3748 static boolean isConfigurationAcceptable(ReplicationDomainCfg configuration, 3749 List<LocalizableMessage> unacceptableReasons) 3750 { 3751 // Check that there is not already a domain with the same DN 3752 final DN dn = configuration.getBaseDN(); 3753 LDAPReplicationDomain domain = MultimasterReplication.findDomain(dn, null); 3754 if (domain != null && domain.getBaseDN().equals(dn)) 3755 { 3756 unacceptableReasons.add(ERR_SYNC_INVALID_DN.get()); 3757 return false; 3758 } 3759 3760 // Check that the base DN is configured as a base-dn of the directory server 3761 if (DirectoryServer.getBackend(dn) == null) 3762 { 3763 unacceptableReasons.add(ERR_UNKNOWN_DN.get(dn)); 3764 return false; 3765 } 3766 3767 // Check fractional configuration 3768 try 3769 { 3770 isFractionalConfigAcceptable(configuration); 3771 } catch (ConfigException e) 3772 { 3773 unacceptableReasons.add(e.getMessageObject()); 3774 return false; 3775 } 3776 3777 return true; 3778 } 3779 3780 @Override 3781 public ConfigChangeResult applyConfigurationChange( 3782 ReplicationDomainCfg configuration) 3783 { 3784 this.config = configuration; 3785 changeConfig(configuration); 3786 3787 // Read assured + fractional configuration and each time reconnect if needed 3788 readAssuredConfig(configuration, true); 3789 readFractionalConfig(configuration, true); 3790 3791 solveConflictFlag = isSolveConflict(configuration); 3792 3793 final ConfigChangeResult ccr = new ConfigChangeResult(); 3794 try 3795 { 3796 storeECLConfiguration(configuration); 3797 } 3798 catch(Exception e) 3799 { 3800 ccr.setResultCode(ResultCode.OTHER); 3801 } 3802 return ccr; 3803 } 3804 3805 @Override 3806 public boolean isConfigurationChangeAcceptable( 3807 ReplicationDomainCfg configuration, List<LocalizableMessage> unacceptableReasons) 3808 { 3809 // Check that a import/export is not in progress 3810 if (ieRunning()) 3811 { 3812 unacceptableReasons.add( 3813 NOTE_ERR_CANNOT_CHANGE_CONFIG_DURING_TOTAL_UPDATE.get()); 3814 return false; 3815 } 3816 3817 // Check fractional configuration 3818 try 3819 { 3820 isFractionalConfigAcceptable(configuration); 3821 return true; 3822 } 3823 catch (ConfigException e) 3824 { 3825 unacceptableReasons.add(e.getMessageObject()); 3826 return false; 3827 } 3828 } 3829 3830 @Override 3831 public Map<String, String> getAlerts() 3832 { 3833 Map<String, String> alerts = new LinkedHashMap<>(); 3834 3835 alerts.put(ALERT_TYPE_REPLICATION_UNRESOLVED_CONFLICT, 3836 ALERT_DESCRIPTION_REPLICATION_UNRESOLVED_CONFLICT); 3837 return alerts; 3838 } 3839 3840 @Override 3841 public String getClassName() 3842 { 3843 return CLASS_NAME; 3844 } 3845 3846 @Override 3847 public DN getComponentEntryDN() 3848 { 3849 return config.dn(); 3850 } 3851 3852 /** Starts the Replication Domain. */ 3853 public void start() 3854 { 3855 // Create the ServerStateFlush thread 3856 flushThread.start(); 3857 3858 startListenService(); 3859 } 3860 3861 /** Remove the configuration of the external changelog from this domain configuration. */ 3862 private void removeECLDomainCfg() 3863 { 3864 try 3865 { 3866 DN eclConfigEntryDN = DN.valueOf("cn=external changeLog," + config.dn()); 3867 if (DirectoryServer.getConfigHandler().entryExists(eclConfigEntryDN)) 3868 { 3869 DirectoryServer.getConfigHandler().deleteEntry(eclConfigEntryDN, null); 3870 } 3871 } 3872 catch(Exception e) 3873 { 3874 logger.traceException(e); 3875 logger.error(ERR_CHECK_CREATE_REPL_BACKEND_FAILED, stackTraceToSingleLineString(e)); 3876 } 3877 } 3878 3879 /** 3880 * Store the provided ECL configuration for the domain. 3881 * @param domCfg The provided configuration. 3882 * @throws ConfigException When an error occurred. 3883 */ 3884 private void storeECLConfiguration(ReplicationDomainCfg domCfg) 3885 throws ConfigException 3886 { 3887 ExternalChangelogDomainCfg eclDomCfg = null; 3888 // create the ecl config if it does not exist 3889 // There may not be any config entry related to this domain in some 3890 // unit test cases 3891 try 3892 { 3893 DN configDn = config.dn(); 3894 if (DirectoryServer.getConfigHandler().entryExists(configDn)) 3895 { 3896 try 3897 { eclDomCfg = domCfg.getExternalChangelogDomain(); 3898 } catch(Exception e) { /* do nothing */ } 3899 // domain with no config entry only when running unit tests 3900 if (eclDomCfg == null) 3901 { 3902 // no ECL config provided hence create a default one 3903 // create the default one 3904 DN eclConfigEntryDN = DN.valueOf("cn=external changelog," + configDn); 3905 if (!DirectoryServer.getConfigHandler().entryExists(eclConfigEntryDN)) 3906 { 3907 // no entry exist yet for the ECL config for this domain 3908 // create it 3909 String ldif = makeLdif( 3910 "dn: cn=external changelog," + configDn, 3911 "objectClass: top", 3912 "objectClass: ds-cfg-external-changelog-domain", 3913 "cn: external changelog", 3914 "ds-cfg-enabled: " + !getBackend().isPrivateBackend()); 3915 LDIFImportConfig ldifImportConfig = new LDIFImportConfig( 3916 new StringReader(ldif)); 3917 // No need to validate schema in replication 3918 ldifImportConfig.setValidateSchema(false); 3919 LDIFReader reader = new LDIFReader(ldifImportConfig); 3920 Entry eclEntry = reader.readEntry(); 3921 DirectoryServer.getConfigHandler().addEntry(eclEntry, null); 3922 ldifImportConfig.close(); 3923 } 3924 } 3925 } 3926 eclDomCfg = domCfg.getExternalChangelogDomain(); 3927 if (eclDomain != null) 3928 { 3929 eclDomain.applyConfigurationChange(eclDomCfg); 3930 } 3931 else 3932 { 3933 // Create the ECL domain object 3934 eclDomain = new ExternalChangelogDomain(this, eclDomCfg); 3935 } 3936 } 3937 catch (Exception e) 3938 { 3939 throw new ConfigException(NOTE_ERR_UNABLE_TO_ENABLE_ECL.get( 3940 "Replication Domain on " + getBaseDN(), stackTraceToSingleLineString(e)), e); 3941 } 3942 } 3943 3944 private static String makeLdif(String... lines) 3945 { 3946 final StringBuilder buffer = new StringBuilder(); 3947 for (String line : lines) { 3948 buffer.append(line).append(EOL); 3949 } 3950 // Append an extra line so we can append LDIF Strings. 3951 buffer.append(EOL); 3952 return buffer.toString(); 3953 } 3954 3955 @Override 3956 public void sessionInitiated(ServerStatus initStatus, ServerState rsState) 3957 { 3958 // Check domain fractional configuration consistency with local 3959 // configuration variables 3960 forceBadDataSet = !isBackendFractionalConfigConsistent(); 3961 3962 super.sessionInitiated(initStatus, rsState); 3963 3964 // Now for bad data set status if needed 3965 if (forceBadDataSet) 3966 { 3967 signalNewStatus(StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT); 3968 logger.info(NOTE_FRACTIONAL_BAD_DATA_SET_NEED_RESYNC, getBaseDN()); 3969 return; // Do not send changes to the replication server 3970 } 3971 3972 try 3973 { 3974 /* 3975 * We must not publish changes to a replicationServer that has 3976 * not seen all our previous changes because this could cause 3977 * some other ldap servers to miss those changes. 3978 * Check that the ReplicationServer has seen all our previous 3979 * changes. 3980 */ 3981 CSN replServerMaxCSN = rsState.getCSN(getServerId()); 3982 3983 // we don't want to update from here (a DS) an empty RS because 3984 // normally the RS should have been updated by other RSes except for 3985 // very last changes lost if the local connection was broken 3986 // ... hence the RS we are connected to should not be empty 3987 // ... or if it is empty, it is due to a voluntary reset 3988 // and we don't want to update it with our changes that could be huge. 3989 if (replServerMaxCSN != null && replServerMaxCSN.getSeqnum() != 0) 3990 { 3991 CSN ourMaxCSN = state.getMaxCSN(getServerId()); 3992 if (ourMaxCSN != null 3993 && !ourMaxCSN.isOlderThanOrEqualTo(replServerMaxCSN)) 3994 { 3995 pendingChanges.setRecovering(true); 3996 broker.setRecoveryRequired(true); 3997 final RSUpdater rsUpdater = new RSUpdater(replServerMaxCSN); 3998 if (this.rsUpdater.compareAndSet(null, rsUpdater)) 3999 { 4000 rsUpdater.start(); 4001 } 4002 } 4003 } 4004 } catch (Exception e) 4005 { 4006 logger.error(ERR_PUBLISHING_FAKE_OPS, getBaseDN(), stackTraceToSingleLineString(e)); 4007 } 4008 } 4009 4010 /** 4011 * Build the list of changes that have been processed by this server after the 4012 * CSN given as a parameter and publish them using the given session. 4013 * 4014 * @param startCSN 4015 * The CSN where we need to start the search 4016 * @param session 4017 * The session to use to publish the changes 4018 * @return A boolean indicating he success of the operation. 4019 * @throws Exception 4020 * if an Exception happens during the search. 4021 */ 4022 boolean buildAndPublishMissingChanges(CSN startCSN, ReplicationBroker session) 4023 throws Exception 4024 { 4025 // Trim the changes in replayOperations that are older than the startCSN. 4026 synchronized (replayOperations) 4027 { 4028 Iterator<CSN> it = replayOperations.keySet().iterator(); 4029 while (it.hasNext()) 4030 { 4031 if (shutdown.get()) 4032 { 4033 return false; 4034 } 4035 if (it.next().isNewerThan(startCSN)) 4036 { 4037 break; 4038 } 4039 it.remove(); 4040 } 4041 } 4042 4043 CSN lastRetrievedChange; 4044 InternalSearchOperation op; 4045 CSN currentStartCSN = startCSN; 4046 do 4047 { 4048 if (shutdown.get()) 4049 { 4050 return false; 4051 } 4052 4053 lastRetrievedChange = null; 4054 // We can't do the search in one go because we need to store the results 4055 // so that we are sure we send the operations in order and because the 4056 // list might be large. 4057 // So we search by interval of 10 seconds and store the results in the 4058 // replayOperations list so that they are sorted before sending them. 4059 long missingChangesDelta = currentStartCSN.getTime() + 10000; 4060 CSN endCSN = new CSN(missingChangesDelta, 0xffffffff, getServerId()); 4061 4062 ScanSearchListener listener = 4063 new ScanSearchListener(currentStartCSN, endCSN); 4064 op = searchForChangedEntries(getBaseDN(), currentStartCSN, endCSN, 4065 listener); 4066 4067 // Publish and remove all the changes from the replayOperations list 4068 // that are older than the endCSN. 4069 final List<FakeOperation> opsToSend = new LinkedList<>(); 4070 synchronized (replayOperations) 4071 { 4072 Iterator<FakeOperation> itOp = replayOperations.values().iterator(); 4073 while (itOp.hasNext()) 4074 { 4075 if (shutdown.get()) 4076 { 4077 return false; 4078 } 4079 FakeOperation fakeOp = itOp.next(); 4080 if (fakeOp.getCSN().isNewerThan(endCSN) // sanity check 4081 || !state.cover(fakeOp.getCSN()) 4082 // do not look for replay operations in the future 4083 || currentStartCSN.isNewerThan(now())) 4084 { 4085 break; 4086 } 4087 4088 lastRetrievedChange = fakeOp.getCSN(); 4089 opsToSend.add(fakeOp); 4090 itOp.remove(); 4091 } 4092 } 4093 4094 for (FakeOperation opToSend : opsToSend) 4095 { 4096 if (shutdown.get()) 4097 { 4098 return false; 4099 } 4100 session.publishRecovery(opToSend.generateMessage()); 4101 } 4102 4103 if (lastRetrievedChange != null) 4104 { 4105 if (logger.isInfoEnabled()) 4106 { 4107 logger.info(LocalizableMessage.raw("publish loop" 4108 + " >=" + currentStartCSN + " <=" + endCSN 4109 + " nentries=" + op.getEntriesSent() 4110 + " result=" + op.getResultCode() 4111 + " lastRetrievedChange=" + lastRetrievedChange)); 4112 } 4113 currentStartCSN = lastRetrievedChange; 4114 } 4115 else 4116 { 4117 if (logger.isInfoEnabled()) 4118 { 4119 logger.info(LocalizableMessage.raw("publish loop" 4120 + " >=" + currentStartCSN + " <=" + endCSN 4121 + " nentries=" + op.getEntriesSent() 4122 + " result=" + op.getResultCode() 4123 + " no changes")); 4124 } 4125 currentStartCSN = endCSN; 4126 } 4127 } while (pendingChanges.recoveryUntil(currentStartCSN) 4128 && op.getResultCode().equals(ResultCode.SUCCESS)); 4129 4130 return op.getResultCode().equals(ResultCode.SUCCESS); 4131 } 4132 4133 private static CSN now() 4134 { 4135 // ensure now() will always come last with isNewerThan() test, 4136 // even though the timestamp, or the timestamp and seqnum would be the same 4137 return new CSN(TimeThread.getTime(), Integer.MAX_VALUE, Integer.MAX_VALUE); 4138 } 4139 4140 /** 4141 * Search for the changes that happened since fromCSN based on the historical 4142 * attribute. The only changes that will be send will be the one generated on 4143 * the serverId provided in fromCSN. 4144 * 4145 * @param baseDN 4146 * the base DN 4147 * @param fromCSN 4148 * The CSN from which we want the changes 4149 * @param lastCSN 4150 * The max CSN that the search should return 4151 * @param resultListener 4152 * The listener that will process the entries returned 4153 * @return the internal search operation 4154 * @throws Exception 4155 * when raised. 4156 */ 4157 private static InternalSearchOperation searchForChangedEntries(DN baseDN, 4158 CSN fromCSN, CSN lastCSN, InternalSearchListener resultListener) 4159 throws Exception 4160 { 4161 String maxValueForId; 4162 if (lastCSN == null) 4163 { 4164 final Integer serverId = fromCSN.getServerId(); 4165 maxValueForId = "ffffffffffffffff" + String.format("%04x", serverId) 4166 + "ffffffff"; 4167 } 4168 else 4169 { 4170 maxValueForId = lastCSN.toString(); 4171 } 4172 4173 String filter = 4174 "(&(" + HISTORICAL_ATTRIBUTE_NAME + ">=dummy:" + fromCSN + ")" + 4175 "(" + HISTORICAL_ATTRIBUTE_NAME + "<=dummy:" + maxValueForId + "))"; 4176 SearchRequest request = Requests.newSearchRequest(baseDN, SearchScope.WHOLE_SUBTREE, filter) 4177 .addAttribute(USER_AND_REPL_OPERATIONAL_ATTRS); 4178 return getRootConnection().processSearch(request, resultListener); 4179 } 4180 4181 /** 4182 * Search for the changes that happened since fromCSN based on the historical 4183 * attribute. The only changes that will be send will be the one generated on 4184 * the serverId provided in fromCSN. 4185 * 4186 * @param baseDN 4187 * the base DN 4188 * @param fromCSN 4189 * The CSN from which we want the changes 4190 * @param resultListener 4191 * that will process the entries returned. 4192 * @return the internal search operation 4193 * @throws Exception 4194 * when raised. 4195 */ 4196 static InternalSearchOperation searchForChangedEntries(DN baseDN, 4197 CSN fromCSN, InternalSearchListener resultListener) throws Exception 4198 { 4199 return searchForChangedEntries(baseDN, fromCSN, null, resultListener); 4200 } 4201 4202 /** 4203 * This method should return the total number of objects in the 4204 * replicated domain. 4205 * This count will be used for reporting. 4206 * 4207 * @throws DirectoryException when needed. 4208 * 4209 * @return The number of objects in the replication domain. 4210 */ 4211 @Override 4212 public long countEntries() throws DirectoryException 4213 { 4214 Backend<?> backend = getBackend(); 4215 if (!backend.supports(BackendOperation.LDIF_EXPORT)) 4216 { 4217 LocalizableMessage msg = ERR_INIT_EXPORT_NOT_SUPPORTED.get(backend.getBackendID()); 4218 logger.error(msg); 4219 throw new DirectoryException(ResultCode.OTHER, msg); 4220 } 4221 4222 return backend.getNumberOfEntriesInBaseDN(getBaseDN()); 4223 } 4224 4225 @Override 4226 public boolean processUpdate(UpdateMsg updateMsg) 4227 { 4228 // Ignore message if fractional configuration is inconsistent and 4229 // we have been passed into bad data set status 4230 if (forceBadDataSet) 4231 { 4232 return false; 4233 } 4234 4235 if (updateMsg instanceof LDAPUpdateMsg) 4236 { 4237 LDAPUpdateMsg msg = (LDAPUpdateMsg) updateMsg; 4238 4239 // Put the UpdateMsg in the RemotePendingChanges list. 4240 if (!remotePendingChanges.putRemoteUpdate(msg)) 4241 { 4242 /* 4243 * Already received this change so ignore it. This may happen if there 4244 * are uncommitted changes in the queue and session failover occurs 4245 * causing a recovery of all changes since the current committed server 4246 * state. See OPENDJ-1115. 4247 */ 4248 if (logger.isTraceEnabled()) 4249 { 4250 logger.trace( 4251 "LDAPReplicationDomain.processUpdate: ignoring " 4252 + "duplicate change %s", msg.getCSN()); 4253 } 4254 return true; 4255 } 4256 4257 // Put update message into the replay queue 4258 // (block until some place in the queue is available) 4259 final UpdateToReplay updateToReplay = new UpdateToReplay(msg, this); 4260 while (!isListenerShuttingDown()) 4261 { 4262 // loop until we can offer to the queue or shutdown was initiated 4263 try 4264 { 4265 if (updateToReplayQueue.offer(updateToReplay, 1, TimeUnit.SECONDS)) 4266 { 4267 // successful offer to the queue, let's exit the loop 4268 break; 4269 } 4270 } 4271 catch (InterruptedException e) 4272 { 4273 // Thread interrupted: check for shutdown. 4274 Thread.currentThread().interrupt(); 4275 } 4276 } 4277 4278 return false; 4279 } 4280 4281 // unknown message type, this should not happen, just ignore it. 4282 return true; 4283 } 4284 4285 /** 4286 * Monitoring information for the LDAPReplicationDomain. 4287 * 4288 * @return Monitoring attributes specific to the LDAPReplicationDomain. 4289 */ 4290 @Override 4291 public Collection<Attribute> getAdditionalMonitoring() 4292 { 4293 List<Attribute> attributes = new ArrayList<>(); 4294 4295 // number of updates in the pending list 4296 addMonitorData(attributes, "pending-updates", pendingChanges.size()); 4297 4298 addMonitorData(attributes, "replayed-updates-ok", 4299 numReplayedPostOpCalled.get()); 4300 addMonitorData(attributes, "resolved-modify-conflicts", 4301 numResolvedModifyConflicts.get()); 4302 addMonitorData(attributes, "resolved-naming-conflicts", 4303 numResolvedNamingConflicts.get()); 4304 addMonitorData(attributes, "unresolved-naming-conflicts", 4305 numUnresolvedNamingConflicts.get()); 4306 addMonitorData(attributes, "remote-pending-changes-size", 4307 remotePendingChanges.getQueueSize()); 4308 4309 return attributes; 4310 } 4311 4312 /** 4313 * Verifies that the given string represents a valid source 4314 * from which this server can be initialized. 4315 * @param sourceString The string representing the source 4316 * @return The source as a integer value 4317 * @throws DirectoryException if the string is not valid 4318 */ 4319 public int decodeSource(String sourceString) throws DirectoryException 4320 { 4321 int source = 0; 4322 try 4323 { 4324 source = Integer.decode(sourceString); 4325 if (source >= -1 && source != getServerId()) 4326 { 4327 // TODO Verifies serverID is in the domain 4328 // We should check here that this is a server implied 4329 // in the current domain. 4330 return source; 4331 } 4332 } 4333 catch (Exception e) 4334 { 4335 LocalizableMessage message = ERR_INVALID_IMPORT_SOURCE.get( 4336 getBaseDN(), getServerId(), sourceString, stackTraceToSingleLineString(e)); 4337 throw new DirectoryException(ResultCode.OTHER, message, e); 4338 } 4339 4340 LocalizableMessage message = ERR_INVALID_IMPORT_SOURCE.get(getBaseDN(), getServerId(), source, ""); 4341 throw new DirectoryException(ResultCode.OTHER, message); 4342 } 4343 4344 /** 4345 * Called by synchronize post op plugin in order to add the entry historical 4346 * attributes to the UpdateMsg. 4347 * @param msg an replication update message 4348 * @param op the operation in progress 4349 */ 4350 private void addEntryAttributesForCL(UpdateMsg msg, 4351 PostOperationOperation op) 4352 { 4353 if (op instanceof PostOperationDeleteOperation) 4354 { 4355 PostOperationDeleteOperation delOp = (PostOperationDeleteOperation) op; 4356 final Set<String> names = getEclIncludesForDeletes(); 4357 Entry entry = delOp.getEntryToDelete(); 4358 final DeleteMsg deleteMsg = (DeleteMsg) msg; 4359 deleteMsg.setEclIncludes(getIncludedAttributes(entry, names)); 4360 4361 // For delete only, add the Authorized DN since it's required in the 4362 // ECL entry but is not part of rest of the message. 4363 DN deleterDN = delOp.getAuthorizationDN(); 4364 if (deleterDN != null) 4365 { 4366 deleteMsg.setInitiatorsName(deleterDN.toString()); 4367 } 4368 } 4369 else if (op instanceof PostOperationModifyOperation) 4370 { 4371 PostOperationModifyOperation modOp = (PostOperationModifyOperation) op; 4372 Set<String> names = getEclIncludes(); 4373 Entry entry = modOp.getCurrentEntry(); 4374 ((ModifyMsg) msg).setEclIncludes(getIncludedAttributes(entry, names)); 4375 } 4376 else if (op instanceof PostOperationModifyDNOperation) 4377 { 4378 PostOperationModifyDNOperation modDNOp = 4379 (PostOperationModifyDNOperation) op; 4380 Set<String> names = getEclIncludes(); 4381 Entry entry = modDNOp.getOriginalEntry(); 4382 ((ModifyDNMsg) msg).setEclIncludes(getIncludedAttributes(entry, names)); 4383 } 4384 else if (op instanceof PostOperationAddOperation) 4385 { 4386 PostOperationAddOperation addOp = (PostOperationAddOperation) op; 4387 Set<String> names = getEclIncludes(); 4388 Entry entry = addOp.getEntryToAdd(); 4389 ((AddMsg) msg).setEclIncludes(getIncludedAttributes(entry, names)); 4390 } 4391 } 4392 4393 private Collection<Attribute> getIncludedAttributes(Entry entry, 4394 Set<String> names) 4395 { 4396 if (names.isEmpty()) 4397 { 4398 // Fast-path. 4399 return Collections.emptySet(); 4400 } 4401 else if (names.size() == 1 && names.contains("*")) 4402 { 4403 // Potential fast-path for delete operations. 4404 List<Attribute> attributes = new LinkedList<>(); 4405 for (List<Attribute> attributeList : entry.getUserAttributes().values()) 4406 { 4407 attributes.addAll(attributeList); 4408 } 4409 Attribute objectClassAttribute = entry.getObjectClassAttribute(); 4410 if (objectClassAttribute != null) 4411 { 4412 attributes.add(objectClassAttribute); 4413 } 4414 return attributes; 4415 } 4416 else 4417 { 4418 // Expand @objectclass references in attribute list if needed. 4419 // We do this now in order to take into account dynamic schema changes. 4420 final Set<String> expandedNames = getExpandedNames(names); 4421 final Entry filteredEntry = 4422 entry.filterEntry(expandedNames, false, false, false); 4423 return filteredEntry.getAttributes(); 4424 } 4425 } 4426 4427 private Set<String> getExpandedNames(Set<String> names) 4428 { 4429 // Only rebuild the attribute set if necessary. 4430 if (!needsExpanding(names)) 4431 { 4432 return names; 4433 } 4434 4435 final Set<String> expandedNames = new HashSet<>(names.size()); 4436 for (String name : names) 4437 { 4438 if (name.startsWith("@")) 4439 { 4440 String ocName = name.substring(1); 4441 ObjectClass objectClass = 4442 DirectoryServer.getObjectClass(toLowerCase(ocName)); 4443 if (objectClass != null) 4444 { 4445 for (AttributeType at : objectClass.getRequiredAttributeChain()) 4446 { 4447 expandedNames.add(at.getNameOrOID()); 4448 } 4449 for (AttributeType at : objectClass.getOptionalAttributeChain()) 4450 { 4451 expandedNames.add(at.getNameOrOID()); 4452 } 4453 } 4454 } 4455 else 4456 { 4457 expandedNames.add(name); 4458 } 4459 } 4460 return expandedNames; 4461 } 4462 4463 private boolean needsExpanding(Set<String> names) 4464 { 4465 for (String name : names) 4466 { 4467 if (name.startsWith("@")) 4468 { 4469 return true; 4470 } 4471 } 4472 return false; 4473 } 4474 4475 /** 4476 * Gets the fractional configuration of this domain. 4477 * @return The fractional configuration of this domain. 4478 */ 4479 FractionalConfig getFractionalConfig() 4480 { 4481 return fractionalConfig; 4482 } 4483 4484 /** 4485 * This bean is a utility class used for holding the parsing 4486 * result of a fractional configuration. It also contains some facility 4487 * methods like fractional configuration comparison... 4488 */ 4489 static class FractionalConfig 4490 { 4491 /** 4492 * Tells if fractional replication is enabled or not (some fractional 4493 * constraints have been put in place). If this is true then 4494 * fractionalExclusive explains the configuration mode and either 4495 * fractionalSpecificClassesAttributes or fractionalAllClassesAttributes or 4496 * both should be filled with something. 4497 */ 4498 private boolean fractional; 4499 4500 /** 4501 * - If true, tells that the configured fractional replication is exclusive: 4502 * Every attributes contained in fractionalSpecificClassesAttributes and 4503 * fractionalAllClassesAttributes should be ignored when replaying operation 4504 * in local backend. 4505 * - If false, tells that the configured fractional replication is 4506 * inclusive: 4507 * Only attributes contained in fractionalSpecificClassesAttributes and 4508 * fractionalAllClassesAttributes should be taken into account in local 4509 * backend. 4510 */ 4511 private boolean fractionalExclusive = true; 4512 4513 /** 4514 * Used in fractional replication: holds attributes of a specific object class. 4515 * - key = object class (name or OID of the class) 4516 * - value = the attributes of that class that should be taken into account 4517 * (inclusive or exclusive fractional replication) (name or OID of the 4518 * attribute) 4519 * When an operation coming from the network is to be locally replayed, if 4520 * the concerned entry has an objectClass attribute equals to 'key': 4521 * - inclusive mode: only the attributes in 'value' will be added/deleted/modified 4522 * - exclusive mode: the attributes in 'value' will not be added/deleted/modified 4523 */ 4524 private Map<String, Set<String>> fractionalSpecificClassesAttributes = new HashMap<>(); 4525 4526 /** 4527 * Used in fractional replication: holds attributes of any object class. 4528 * When an operation coming from the network is to be locally replayed: 4529 * - inclusive mode: only attributes of the matching entry not present in 4530 * fractionalAllClassesAttributes will be added/deleted/modified 4531 * - exclusive mode: attributes of the matching entry present in 4532 * fractionalAllClassesAttributes will not be added/deleted/modified 4533 * The attributes may be in human readable form of OID form. 4534 */ 4535 private Set<String> fractionalAllClassesAttributes = new HashSet<>(); 4536 4537 /** Base DN the fractional configuration is for. */ 4538 private final DN baseDN; 4539 4540 /** 4541 * Constructs a new fractional configuration object. 4542 * @param baseDN The base DN the object is for. 4543 */ 4544 private FractionalConfig(DN baseDN) 4545 { 4546 this.baseDN = baseDN; 4547 } 4548 4549 /** 4550 * Getter for fractional. 4551 * @return True if the configuration has fractional enabled 4552 */ 4553 boolean isFractional() 4554 { 4555 return fractional; 4556 } 4557 4558 /** 4559 * Set the fractional parameter. 4560 * @param fractional The fractional parameter 4561 */ 4562 private void setFractional(boolean fractional) 4563 { 4564 this.fractional = fractional; 4565 } 4566 4567 /** 4568 * Getter for fractionalExclusive. 4569 * @return True if the configuration has fractional exclusive enabled 4570 */ 4571 boolean isFractionalExclusive() 4572 { 4573 return fractionalExclusive; 4574 } 4575 4576 /** 4577 * Set the fractionalExclusive parameter. 4578 * @param fractionalExclusive The fractionalExclusive parameter 4579 */ 4580 private void setFractionalExclusive(boolean fractionalExclusive) 4581 { 4582 this.fractionalExclusive = fractionalExclusive; 4583 } 4584 4585 /** 4586 * Getter for fractionalSpecificClassesAttributes attribute. 4587 * @return The fractionalSpecificClassesAttributes attribute. 4588 */ 4589 Map<String, Set<String>> getFractionalSpecificClassesAttributes() 4590 { 4591 return fractionalSpecificClassesAttributes; 4592 } 4593 4594 /** 4595 * Set the fractionalSpecificClassesAttributes parameter. 4596 * @param fractionalSpecificClassesAttributes The 4597 * fractionalSpecificClassesAttributes parameter to set. 4598 */ 4599 private void setFractionalSpecificClassesAttributes( 4600 Map<String, Set<String>> fractionalSpecificClassesAttributes) 4601 { 4602 this.fractionalSpecificClassesAttributes = 4603 fractionalSpecificClassesAttributes; 4604 } 4605 4606 /** 4607 * Getter for fractionalSpecificClassesAttributes attribute. 4608 * @return The fractionalSpecificClassesAttributes attribute. 4609 */ 4610 Set<String> getFractionalAllClassesAttributes() 4611 { 4612 return fractionalAllClassesAttributes; 4613 } 4614 4615 /** 4616 * Set the fractionalAllClassesAttributes parameter. 4617 * @param fractionalAllClassesAttributes The 4618 * fractionalSpecificClassesAttributes parameter to set. 4619 */ 4620 private void setFractionalAllClassesAttributes( 4621 Set<String> fractionalAllClassesAttributes) 4622 { 4623 this.fractionalAllClassesAttributes = fractionalAllClassesAttributes; 4624 } 4625 4626 /** 4627 * Getter for the base baseDN. 4628 * @return The baseDN attribute. 4629 */ 4630 DN getBaseDn() 4631 { 4632 return baseDN; 4633 } 4634 4635 /** 4636 * Extract the fractional configuration from the passed domain configuration 4637 * entry. 4638 * @param configuration The configuration object 4639 * @return The fractional replication configuration. 4640 * @throws ConfigException If an error occurred. 4641 */ 4642 static FractionalConfig toFractionalConfig( 4643 ReplicationDomainCfg configuration) throws ConfigException 4644 { 4645 // Prepare fractional configuration variables to parse 4646 Iterator<String> exclIt = configuration.getFractionalExclude().iterator(); 4647 Iterator<String> inclIt = configuration.getFractionalInclude().iterator(); 4648 4649 // Get potentially new fractional configuration 4650 Map<String, Set<String>> newFractionalSpecificClassesAttributes = new HashMap<>(); 4651 Set<String> newFractionalAllClassesAttributes = new HashSet<>(); 4652 4653 int newFractionalMode = parseFractionalConfig(exclIt, inclIt, 4654 newFractionalSpecificClassesAttributes, 4655 newFractionalAllClassesAttributes); 4656 4657 // Create matching parsed config object 4658 FractionalConfig result = new FractionalConfig(configuration.getBaseDN()); 4659 switch (newFractionalMode) 4660 { 4661 case NOT_FRACTIONAL: 4662 result.setFractional(false); 4663 result.setFractionalExclusive(true); 4664 break; 4665 case EXCLUSIVE_FRACTIONAL: 4666 case INCLUSIVE_FRACTIONAL: 4667 result.setFractional(true); 4668 result.setFractionalExclusive( 4669 newFractionalMode == EXCLUSIVE_FRACTIONAL); 4670 break; 4671 } 4672 result.setFractionalSpecificClassesAttributes( 4673 newFractionalSpecificClassesAttributes); 4674 result.setFractionalAllClassesAttributes( 4675 newFractionalAllClassesAttributes); 4676 return result; 4677 } 4678 4679 /** 4680 * Parses a fractional replication configuration, filling the empty passed 4681 * variables and returning the used fractional mode. The 2 passed variables 4682 * to fill should be initialized (not null) and empty. 4683 * @param exclIt The list of fractional exclude configuration values (may be 4684 * null) 4685 * @param inclIt The list of fractional include configuration values (may be 4686 * null) 4687 * @param fractionalSpecificClassesAttributes An empty map to be filled with 4688 * what is read from the fractional configuration properties. 4689 * @param fractionalAllClassesAttributes An empty list to be filled with 4690 * what is read from the fractional configuration properties. 4691 * @return the fractional mode deduced from the passed configuration: 4692 * not fractional, exclusive fractional or inclusive fractional 4693 * modes 4694 */ 4695 private static int parseFractionalConfig ( 4696 Iterator<String> exclIt, Iterator<String> inclIt, 4697 Map<String, Set<String>> fractionalSpecificClassesAttributes, 4698 Set<String> fractionalAllClassesAttributes) throws ConfigException 4699 { 4700 // Determine if fractional-exclude or fractional-include property is used: 4701 // only one of them is allowed 4702 int fractionalMode; 4703 Iterator<String> iterator; 4704 if (exclIt != null && exclIt.hasNext()) 4705 { 4706 if (inclIt != null && inclIt.hasNext()) 4707 { 4708 throw new ConfigException( 4709 NOTE_ERR_FRACTIONAL_CONFIG_BOTH_MODES.get()); 4710 } 4711 4712 fractionalMode = EXCLUSIVE_FRACTIONAL; 4713 iterator = exclIt; 4714 } 4715 else 4716 { 4717 if (inclIt != null && inclIt.hasNext()) 4718 { 4719 fractionalMode = INCLUSIVE_FRACTIONAL; 4720 iterator = inclIt; 4721 } 4722 else 4723 { 4724 return NOT_FRACTIONAL; 4725 } 4726 } 4727 4728 while (iterator.hasNext()) 4729 { 4730 // Parse a value with the form class:attr1,attr2... 4731 // or *:attr1,attr2... 4732 String fractCfgStr = iterator.next(); 4733 StringTokenizer st = new StringTokenizer(fractCfgStr, ":"); 4734 int nTokens = st.countTokens(); 4735 if (nTokens < 2) 4736 { 4737 throw new ConfigException(NOTE_ERR_FRACTIONAL_CONFIG_WRONG_FORMAT. 4738 get(fractCfgStr)); 4739 } 4740 // Get the class name 4741 String classNameLower = st.nextToken().toLowerCase(); 4742 boolean allClasses = "*".equals(classNameLower); 4743 // Get the attributes 4744 String attributes = st.nextToken(); 4745 st = new StringTokenizer(attributes, ","); 4746 while (st.hasMoreTokens()) 4747 { 4748 String attrNameLower = st.nextToken().toLowerCase(); 4749 // Store attribute in the appropriate variable 4750 if (allClasses) 4751 { 4752 fractionalAllClassesAttributes.add(attrNameLower); 4753 } 4754 else 4755 { 4756 Set<String> attrList = fractionalSpecificClassesAttributes.get(classNameLower); 4757 if (attrList == null) 4758 { 4759 attrList = new LinkedHashSet<>(); 4760 fractionalSpecificClassesAttributes.put(classNameLower, attrList); 4761 } 4762 attrList.add(attrNameLower); 4763 } 4764 } 4765 } 4766 return fractionalMode; 4767 } 4768 4769 /** Return type of the parseFractionalConfig method. */ 4770 private static final int NOT_FRACTIONAL = 0; 4771 private static final int EXCLUSIVE_FRACTIONAL = 1; 4772 private static final int INCLUSIVE_FRACTIONAL = 2; 4773 4774 /** 4775 * Get an integer representation of the domain fractional configuration. 4776 * @return An integer representation of the domain fractional configuration. 4777 */ 4778 private int fractionalConfigToInt() 4779 { 4780 if (!fractional) 4781 { 4782 return NOT_FRACTIONAL; 4783 } 4784 else if (fractionalExclusive) 4785 { 4786 return EXCLUSIVE_FRACTIONAL; 4787 } 4788 return INCLUSIVE_FRACTIONAL; 4789 } 4790 4791 /** 4792 * Compare 2 fractional replication configurations and returns true if they 4793 * are equivalent. 4794 * @param cfg1 First fractional configuration 4795 * @param cfg2 Second fractional configuration 4796 * @return True if both configurations are equivalent. 4797 * @throws ConfigException If some classes or attributes could not be 4798 * retrieved from the schema. 4799 */ 4800 private static boolean isFractionalConfigEquivalent(FractionalConfig cfg1, 4801 FractionalConfig cfg2) throws ConfigException 4802 { 4803 // Compare base DNs just to be consistent 4804 if (!cfg1.getBaseDn().equals(cfg2.getBaseDn())) 4805 { 4806 return false; 4807 } 4808 4809 // Compare modes 4810 if (cfg1.isFractional() != cfg2.isFractional() 4811 || cfg1.isFractionalExclusive() != cfg2.isFractionalExclusive()) 4812 { 4813 return false; 4814 } 4815 4816 // Compare all classes attributes 4817 Set<String> allClassesAttrs1 = cfg1.getFractionalAllClassesAttributes(); 4818 Set<String> allClassesAttrs2 = cfg2.getFractionalAllClassesAttributes(); 4819 if (!areAttributesEquivalent(allClassesAttrs1, allClassesAttrs2)) 4820 { 4821 return false; 4822 } 4823 4824 // Compare specific classes attributes 4825 Map<String, Set<String>> specificClassesAttrs1 = 4826 cfg1.getFractionalSpecificClassesAttributes(); 4827 Map<String, Set<String>> specificClassesAttrs2 = 4828 cfg2.getFractionalSpecificClassesAttributes(); 4829 if (specificClassesAttrs1.size() != specificClassesAttrs2.size()) 4830 { 4831 return false; 4832 } 4833 4834 /* 4835 * Check consistency of specific classes attributes 4836 * 4837 * For each class in specificClassesAttributes1, check that the attribute 4838 * list is equivalent to specificClassesAttributes2 attribute list 4839 */ 4840 Schema schema = DirectoryServer.getSchema(); 4841 for (String className1 : specificClassesAttrs1.keySet()) 4842 { 4843 // Get class from specificClassesAttributes1 4844 ObjectClass objectClass1 = schema.getObjectClass(className1); 4845 if (objectClass1 == null) 4846 { 4847 throw new ConfigException( 4848 NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_OBJECT_CLASS.get(className1)); 4849 } 4850 4851 // Look for matching one in specificClassesAttributes2 4852 boolean foundClass = false; 4853 for (String className2 : specificClassesAttrs2.keySet()) 4854 { 4855 ObjectClass objectClass2 = schema.getObjectClass(className2); 4856 if (objectClass2 == null) 4857 { 4858 throw new ConfigException( 4859 NOTE_ERR_FRACTIONAL_CONFIG_UNKNOWN_OBJECT_CLASS.get(className2)); 4860 } 4861 if (objectClass1.equals(objectClass2)) 4862 { 4863 foundClass = true; 4864 // Now compare the 2 attribute lists 4865 Set<String> attributes1 = specificClassesAttrs1.get(className1); 4866 Set<String> attributes2 = specificClassesAttrs2.get(className2); 4867 if (!areAttributesEquivalent(attributes1, attributes2)) 4868 { 4869 return false; 4870 } 4871 break; 4872 } 4873 } 4874 // Found matching class ? 4875 if (!foundClass) 4876 { 4877 return false; 4878 } 4879 } 4880 4881 return true; 4882 } 4883 } 4884 4885 /** 4886 * Specifies whether this domain is enabled/disabled regarding the ECL. 4887 * @return enabled/disabled for the ECL. 4888 */ 4889 boolean isECLEnabled() 4890 { 4891 return this.eclDomain.isEnabled(); 4892 } 4893 4894 /** 4895 * Return the minimum time (in ms) that the domain keeps the historical 4896 * information necessary to solve conflicts. 4897 * 4898 * @return the purge delay. 4899 */ 4900 long getHistoricalPurgeDelay() 4901 { 4902 return config.getConflictsHistoricalPurgeDelay() * 60 * 1000; 4903 } 4904 4905 /** 4906 * Check if the operation that just happened has cleared a conflict : Clearing 4907 * a conflict happens if the operation has freed a DN for which another entry 4908 * was in conflict. 4909 * <p> 4910 * Steps: 4911 * <ul> 4912 * <li>get the DN freed by a DELETE or MODRDN op</li> 4913 * <li>search for entries put in the conflict space (dn=entryUUID'+'....) 4914 * because the expected DN was not available (ds-sync-conflict=expected DN) 4915 * </li> 4916 * <li>retain the entry with the oldest conflict</li> 4917 * <li>rename this entry with the freedDN as it was expected originally</li> 4918 * </ul> 4919 * 4920 * @param task 4921 * the task raising this purge. 4922 * @param endDate 4923 * the date to stop this task whether the job is done or not. 4924 * @throws DirectoryException 4925 * when an exception happens. 4926 */ 4927 public void purgeConflictsHistorical(PurgeConflictsHistoricalTask task, 4928 long endDate) throws DirectoryException 4929 { 4930 logger.trace("[PURGE] purgeConflictsHistorical " 4931 + "on domain: " + getBaseDN() 4932 + "endDate:" + new Date(endDate) 4933 + "lastCSNPurgedFromHist: " 4934 + lastCSNPurgedFromHist.toStringUI()); 4935 4936 String filter = "(" + HISTORICAL_ATTRIBUTE_NAME + ">=dummy:" + lastCSNPurgedFromHist + ")"; 4937 SearchRequest request = Requests.newSearchRequest(getBaseDN(), SearchScope.WHOLE_SUBTREE, filter) 4938 .addAttribute(USER_AND_REPL_OPERATIONAL_ATTRS); 4939 InternalSearchOperation searchOp = conn.processSearch(request); 4940 4941 int count = 0; 4942 if (task != null) 4943 { 4944 task.setProgressStats(lastCSNPurgedFromHist, count); 4945 } 4946 4947 for (SearchResultEntry entry : searchOp.getSearchEntries()) 4948 { 4949 long maxTimeToRun = endDate - TimeThread.getTime(); 4950 if (maxTimeToRun < 0) 4951 { 4952 throw new DirectoryException(ResultCode.ADMIN_LIMIT_EXCEEDED, 4953 LocalizableMessage.raw(" end date reached")); 4954 } 4955 4956 EntryHistorical entryHist = EntryHistorical.newInstanceFromEntry(entry); 4957 lastCSNPurgedFromHist = entryHist.getOldestCSN(); 4958 entryHist.setPurgeDelay(getHistoricalPurgeDelay()); 4959 Attribute attr = entryHist.encodeAndPurge(); 4960 count += entryHist.getLastPurgedValuesCount(); 4961 List<Modification> mods = newArrayList(new Modification(ModificationType.REPLACE, attr)); 4962 4963 ModifyOperation newOp = new ModifyOperationBasis( 4964 conn, nextOperationID(), nextMessageID(), new ArrayList<Control>(0), 4965 entry.getName(), mods); 4966 runAsSynchronizedOperation(newOp); 4967 4968 if (newOp.getResultCode() != ResultCode.SUCCESS) 4969 { 4970 // Log information for the repair tool. 4971 logger.error(ERR_CANNOT_ADD_CONFLICT_ATTRIBUTE, newOp, newOp.getResultCode()); 4972 } 4973 else if (task != null) 4974 { 4975 task.setProgressStats(lastCSNPurgedFromHist, count); 4976 } 4977 } 4978 } 4979}