001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2014-2015 ForgeRock AS. 025 */ 026package org.opends.server.backends; 027 028import static org.opends.messages.BackendMessages.*; 029import static org.opends.messages.ReplicationMessages.*; 030import static org.opends.server.config.ConfigConstants.*; 031import static org.opends.server.replication.plugin.MultimasterReplication.*; 032import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; 033import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; 034import static org.opends.server.util.LDIFWriter.*; 035import static org.opends.server.util.ServerConstants.*; 036import static org.opends.server.util.StaticUtils.*; 037 038import java.text.SimpleDateFormat; 039import java.util.Collection; 040import java.util.Collections; 041import java.util.Date; 042import java.util.Iterator; 043import java.util.LinkedHashMap; 044import java.util.List; 045import java.util.Map; 046import java.util.Set; 047import java.util.TimeZone; 048import java.util.concurrent.ConcurrentLinkedQueue; 049import java.util.concurrent.ConcurrentSkipListMap; 050import java.util.concurrent.atomic.AtomicReference; 051 052import org.forgerock.i18n.LocalizableMessage; 053import org.forgerock.i18n.slf4j.LocalizedLogger; 054import org.forgerock.opendj.config.server.ConfigException; 055import org.forgerock.opendj.ldap.ByteString; 056import org.forgerock.opendj.ldap.ConditionResult; 057import org.forgerock.opendj.ldap.ModificationType; 058import org.forgerock.opendj.ldap.ResultCode; 059import org.forgerock.opendj.ldap.SearchScope; 060import org.opends.server.admin.Configuration; 061import org.opends.server.api.Backend; 062import org.opends.server.config.ConfigConstants; 063import org.opends.server.controls.EntryChangelogNotificationControl; 064import org.opends.server.controls.ExternalChangelogRequestControl; 065import org.opends.server.core.AddOperation; 066import org.opends.server.core.DeleteOperation; 067import org.opends.server.core.DirectoryServer; 068import org.opends.server.core.ModifyDNOperation; 069import org.opends.server.core.ModifyOperation; 070import org.opends.server.core.PersistentSearch; 071import org.opends.server.core.SearchOperation; 072import org.opends.server.core.ServerContext; 073import org.opends.server.replication.common.CSN; 074import org.opends.server.replication.common.MultiDomainServerState; 075import org.opends.server.replication.common.ServerState; 076import org.opends.server.replication.protocol.AddMsg; 077import org.opends.server.replication.protocol.DeleteMsg; 078import org.opends.server.replication.protocol.LDAPUpdateMsg; 079import org.opends.server.replication.protocol.ModifyCommonMsg; 080import org.opends.server.replication.protocol.ModifyDNMsg; 081import org.opends.server.replication.protocol.UpdateMsg; 082import org.opends.server.replication.server.ReplicationServer; 083import org.opends.server.replication.server.ReplicationServerDomain; 084import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB; 085import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; 086import org.opends.server.replication.server.changelog.api.ChangelogDB; 087import org.opends.server.replication.server.changelog.api.ChangelogException; 088import org.opends.server.replication.server.changelog.api.DBCursor; 089import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions; 090import org.opends.server.replication.server.changelog.api.ReplicaId; 091import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; 092import org.opends.server.replication.server.changelog.file.ECLEnabledDomainPredicate; 093import org.opends.server.replication.server.changelog.file.ECLMultiDomainDBCursor; 094import org.opends.server.replication.server.changelog.file.MultiDomainDBCursor; 095import org.opends.server.types.Attribute; 096import org.opends.server.types.AttributeType; 097import org.opends.server.types.Attributes; 098import org.opends.server.types.BackupConfig; 099import org.opends.server.types.BackupDirectory; 100import org.opends.server.types.CanceledOperationException; 101import org.opends.server.types.Control; 102import org.opends.server.types.DN; 103import org.opends.server.types.DirectoryException; 104import org.opends.server.types.Entry; 105import org.opends.server.types.FilterType; 106import org.opends.server.types.IndexType; 107import org.opends.server.types.InitializationException; 108import org.opends.server.types.LDIFExportConfig; 109import org.opends.server.types.LDIFImportConfig; 110import org.opends.server.types.LDIFImportResult; 111import org.opends.server.types.Modification; 112import org.opends.server.types.ObjectClass; 113import org.opends.server.types.Privilege; 114import org.opends.server.types.RDN; 115import org.opends.server.types.RawAttribute; 116import org.opends.server.types.RestoreConfig; 117import org.opends.server.types.SearchFilter; 118import org.opends.server.types.WritabilityMode; 119import org.opends.server.util.StaticUtils; 120 121/** 122 * A backend that provides access to the changelog, i.e. the "cn=changelog" 123 * suffix. It is a read-only backend that is created by a 124 * {@link ReplicationServer} and is not configurable. 125 * <p> 126 * There are two modes to search the changelog: 127 * <ul> 128 * <li>Cookie mode: when a "ECL Cookie Exchange Control" is provided with the 129 * request. The cookie provided in the control is used to retrieve entries from 130 * the ReplicaDBs. The <code>changeNumber</code> attribute is not returned with 131 * the entries.</li> 132 * <li>Change number mode: when no "ECL Cookie Exchange Control" is provided 133 * with the request. The entries are retrieved using the ChangeNumberIndexDB and 134 * their attributes are set with the information from the ReplicasDBs. The 135 * <code>changeNumber</code> attribute value is set from the content of 136 * ChangeNumberIndexDB.</li> 137 * </ul> 138 * <h3>Searches flow</h3> 139 * <p> 140 * Here is the flow of searches within the changelog backend APIs: 141 * <ul> 142 * <li>Normal searches only go through: 143 * <ol> 144 * <li>{@link ChangelogBackend#search(SearchOperation)} (once, single threaded)</li> 145 * </ol> 146 * </li> 147 * <li>Persistent searches with <code>changesOnly=false</code> go through: 148 * <ol> 149 * <li>{@link ChangelogBackend#registerPersistentSearch(PersistentSearch)} 150 * (once, single threaded),</li> 151 * <li> 152 * {@link ChangelogBackend#search(SearchOperation)} (once, single threaded)</li> 153 * <li>{@link ChangelogBackend#notify*EntryAdded()} (multiple times, multi 154 * threaded)</li> 155 * </ol> 156 * </li> 157 * <li>Persistent searches with <code>changesOnly=true</code> go through: 158 * <ol> 159 * <li>{@link ChangelogBackend#registerPersistentSearch(PersistentSearch)} 160 * (once, single threaded)</li> 161 * <li> 162 * {@link ChangelogBackend#notify*EntryAdded()} (multiple times, multi 163 * threaded)</li> 164 * </ol> 165 * </li> 166 * </ul> 167 * 168 * @see ReplicationServer 169 */ 170public class ChangelogBackend extends Backend<Configuration> 171{ 172 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 173 174 /** The id of this backend. */ 175 public static final String BACKEND_ID = "changelog"; 176 177 private static final long CHANGE_NUMBER_FOR_EMPTY_CURSOR = 0L; 178 179 private static final String CHANGE_NUMBER_ATTR = "changeNumber"; 180 private static final String CHANGE_NUMBER_ATTR_LC = CHANGE_NUMBER_ATTR.toLowerCase(); 181 private static final String ENTRY_SENDER_ATTACHMENT = OID_ECL_COOKIE_EXCHANGE_CONTROL + ".entrySender"; 182 183 /** The set of objectclasses that will be used in root entry. */ 184 private static final Map<ObjectClass, String> 185 CHANGELOG_ROOT_OBJECT_CLASSES = new LinkedHashMap<>(2); 186 static 187 { 188 CHANGELOG_ROOT_OBJECT_CLASSES.put(DirectoryServer.getObjectClass(OC_TOP, true), OC_TOP); 189 CHANGELOG_ROOT_OBJECT_CLASSES.put(DirectoryServer.getObjectClass("container", true), "container"); 190 } 191 192 /** The set of objectclasses that will be used in ECL entries. */ 193 private static final Map<ObjectClass, String> 194 CHANGELOG_ENTRY_OBJECT_CLASSES = new LinkedHashMap<>(2); 195 static 196 { 197 CHANGELOG_ENTRY_OBJECT_CLASSES.put(DirectoryServer.getObjectClass(OC_TOP, true), OC_TOP); 198 CHANGELOG_ENTRY_OBJECT_CLASSES.put(DirectoryServer.getObjectClass(OC_CHANGELOG_ENTRY, true), OC_CHANGELOG_ENTRY); 199 } 200 201 /** The attribute type for the "creatorsName" attribute. */ 202 private static final AttributeType CREATORS_NAME_TYPE = 203 DirectoryServer.getAttributeTypeOrDefault(OP_ATTR_CREATORS_NAME_LC); 204 /** The attribute type for the "modifiersName" attribute. */ 205 private static final AttributeType MODIFIERS_NAME_TYPE = 206 DirectoryServer.getAttributeTypeOrDefault(OP_ATTR_MODIFIERS_NAME_LC); 207 208 /** The base DN for the external change log. */ 209 public static final DN CHANGELOG_BASE_DN; 210 211 static 212 { 213 try 214 { 215 CHANGELOG_BASE_DN = DN.valueOf(DN_EXTERNAL_CHANGELOG_ROOT); 216 } 217 catch (DirectoryException e) 218 { 219 throw new RuntimeException(e); 220 } 221 } 222 223 /** The set of base DNs for this backend. */ 224 private DN[] baseDNs; 225 /** The set of supported controls for this backend. */ 226 private final Set<String> supportedControls = Collections.singleton(OID_ECL_COOKIE_EXCHANGE_CONTROL); 227 /** Whether the base changelog entry has subordinates. */ 228 private Boolean baseEntryHasSubordinates; 229 230 /** The replication server on which the changelog is read. */ 231 private final ReplicationServer replicationServer; 232 private final ECLEnabledDomainPredicate domainPredicate; 233 234 /** The set of cookie-based persistent searches registered with this backend. */ 235 private final ConcurrentLinkedQueue<PersistentSearch> cookieBasedPersistentSearches = new ConcurrentLinkedQueue<>(); 236 /** The set of change number-based persistent searches registered with this backend. */ 237 private final ConcurrentLinkedQueue<PersistentSearch> changeNumberBasedPersistentSearches = 238 new ConcurrentLinkedQueue<>(); 239 240 /** 241 * Creates a new backend with the provided replication server. 242 * 243 * @param replicationServer 244 * The replication server on which the changes are read. 245 * @param domainPredicate 246 * Returns whether a domain is enabled for the external changelog. 247 */ 248 public ChangelogBackend(final ReplicationServer replicationServer, final ECLEnabledDomainPredicate domainPredicate) 249 { 250 this.replicationServer = replicationServer; 251 this.domainPredicate = domainPredicate; 252 setBackendID(BACKEND_ID); 253 setWritabilityMode(WritabilityMode.DISABLED); 254 setPrivateBackend(true); 255 } 256 257 private ChangelogDB getChangelogDB() 258 { 259 return replicationServer.getChangelogDB(); 260 } 261 262 /** 263 * Returns the ChangelogBackend configured for "cn=changelog" in this directory server. 264 * 265 * @return the ChangelogBackend configured for "cn=changelog" in this directory server 266 * @deprecated instead inject the required object where needed 267 */ 268 @Deprecated 269 public static ChangelogBackend getInstance() 270 { 271 return (ChangelogBackend) DirectoryServer.getBackend(CHANGELOG_BASE_DN); 272 } 273 274 @Override 275 public void configureBackend(final Configuration config, ServerContext serverContext) throws ConfigException 276 { 277 throw new UnsupportedOperationException("The changelog backend is not configurable"); 278 } 279 280 @Override 281 public void openBackend() throws InitializationException 282 { 283 baseDNs = new DN[] { CHANGELOG_BASE_DN }; 284 285 try 286 { 287 DirectoryServer.registerBaseDN(CHANGELOG_BASE_DN, this, true); 288 } 289 catch (final DirectoryException e) 290 { 291 throw new InitializationException( 292 ERR_BACKEND_CANNOT_REGISTER_BASEDN.get(DN_EXTERNAL_CHANGELOG_ROOT, getExceptionMessage(e)), e); 293 } 294 } 295 296 @Override 297 public void closeBackend() 298 { 299 try 300 { 301 DirectoryServer.deregisterBaseDN(CHANGELOG_BASE_DN); 302 } 303 catch (final DirectoryException e) 304 { 305 logger.traceException(e); 306 } 307 } 308 309 @Override 310 public DN[] getBaseDNs() 311 { 312 return baseDNs; 313 } 314 315 @Override 316 public boolean isIndexed(final AttributeType attributeType, final IndexType indexType) 317 { 318 return true; 319 } 320 321 @Override 322 public Entry getEntry(final DN entryDN) throws DirectoryException 323 { 324 if (entryDN == null) 325 { 326 throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), 327 ERR_BACKEND_GET_ENTRY_NULL.get(getBackendID())); 328 } 329 throw new RuntimeException("Not implemented"); 330 } 331 332 @Override 333 public ConditionResult hasSubordinates(final DN entryDN) throws DirectoryException 334 { 335 if (CHANGELOG_BASE_DN.equals(entryDN)) 336 { 337 final Boolean hasSubs = baseChangelogHasSubordinates(); 338 if (hasSubs == null) 339 { 340 return ConditionResult.UNDEFINED; 341 } 342 return ConditionResult.valueOf(hasSubs); 343 } 344 return ConditionResult.FALSE; 345 } 346 347 private Boolean baseChangelogHasSubordinates() throws DirectoryException 348 { 349 if (baseEntryHasSubordinates == null) 350 { 351 // compute its value 352 try 353 { 354 final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB(); 355 CursorOptions options = new CursorOptions(GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY); 356 try (final MultiDomainDBCursor cursor = 357 replicationDomainDB.getCursorFrom(new MultiDomainServerState(), options, getExcludedBaseDNs())) 358 { 359 baseEntryHasSubordinates = cursor.next(); 360 } 361 } 362 catch (ChangelogException e) 363 { 364 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_CHANGELOG_BACKEND_ATTRIBUTE.get( 365 "hasSubordinates", DN_EXTERNAL_CHANGELOG_ROOT, stackTraceToSingleLineString(e))); 366 } 367 } 368 return baseEntryHasSubordinates; 369 } 370 371 @Override 372 public long getNumberOfEntriesInBaseDN(final DN baseDN) throws DirectoryException 373 { 374 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_NUM_SUBORDINATES_NOT_SUPPORTED.get()); 375 } 376 377 @Override 378 public long getNumberOfChildren(final DN parentDN) throws DirectoryException 379 { 380 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_NUM_SUBORDINATES_NOT_SUPPORTED.get()); 381 } 382 383 /** 384 * Notifies persistent searches of this backend that a new cookie entry was added to it. 385 * <p> 386 * Note: This method correspond to the "persistent search" phase. 387 * It is executed multiple times per persistent search, multi-threaded, until the persistent search is cancelled. 388 * <p> 389 * This method must only be called after the provided data have been persisted to disk. 390 * 391 * @param baseDN 392 * the baseDN of the newly added entry. 393 * @param updateMsg 394 * the update message of the newly added entry 395 * @throws ChangelogException 396 * If a problem occurs while notifying of the newly added entry. 397 */ 398 public void notifyCookieEntryAdded(DN baseDN, UpdateMsg updateMsg) throws ChangelogException 399 { 400 if (!(updateMsg instanceof LDAPUpdateMsg)) 401 { 402 return; 403 } 404 405 try 406 { 407 for (PersistentSearch pSearch : cookieBasedPersistentSearches) 408 { 409 final SearchOperation searchOp = pSearch.getSearchOperation(); 410 final CookieEntrySender entrySender = searchOp.getAttachment(ENTRY_SENDER_ATTACHMENT); 411 entrySender.persistentSearchSendEntry(baseDN, updateMsg); 412 } 413 } 414 catch (DirectoryException e) 415 { 416 throw new ChangelogException(e.getMessageObject(), e); 417 } 418 } 419 420 /** 421 * Notifies persistent searches of this backend that a new change number entry was added to it. 422 * <p> 423 * Note: This method correspond to the "persistent search" phase. 424 * It is executed multiple times per persistent search, multi-threaded, until the persistent search is cancelled. 425 * <p> 426 * This method must only be called after the provided data have been persisted to disk. 427 * 428 * @param baseDN 429 * the baseDN of the newly added entry. 430 * @param changeNumber 431 * the change number of the newly added entry. It will be greater 432 * than zero for entries added to the change number index and less 433 * than or equal to zero for entries added to any replica DB 434 * @param cookieString 435 * a string representing the cookie of the newly added entry. 436 * This is only meaningful for entries added to the change number index 437 * @param updateMsg 438 * the update message of the newly added entry 439 * @throws ChangelogException 440 * If a problem occurs while notifying of the newly added entry. 441 */ 442 public void notifyChangeNumberEntryAdded(DN baseDN, long changeNumber, String cookieString, UpdateMsg updateMsg) 443 throws ChangelogException 444 { 445 if (!(updateMsg instanceof LDAPUpdateMsg) 446 || changeNumberBasedPersistentSearches.isEmpty()) 447 { 448 return; 449 } 450 451 try 452 { 453 // changeNumber entry can be shared with multiple persistent searches 454 final Entry changeNumberEntry = createEntryFromMsg(baseDN, changeNumber, cookieString, updateMsg); 455 for (PersistentSearch pSearch : changeNumberBasedPersistentSearches) 456 { 457 final SearchOperation searchOp = pSearch.getSearchOperation(); 458 final ChangeNumberEntrySender entrySender = searchOp.getAttachment(ENTRY_SENDER_ATTACHMENT); 459 entrySender.persistentSearchSendEntry(changeNumber, changeNumberEntry); 460 } 461 } 462 catch (DirectoryException e) 463 { 464 throw new ChangelogException(e.getMessageObject(), e); 465 } 466 } 467 468 private boolean isCookieBased(final SearchOperation searchOp) 469 { 470 for (Control c : searchOp.getRequestControls()) 471 { 472 if (OID_ECL_COOKIE_EXCHANGE_CONTROL.equals(c.getOID())) 473 { 474 return true; 475 } 476 } 477 return false; 478 } 479 480 @Override 481 public void addEntry(Entry entry, AddOperation addOperation) 482 throws DirectoryException, CanceledOperationException 483 { 484 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, 485 ERR_BACKEND_ADD_NOT_SUPPORTED.get(String.valueOf(entry.getName()), getBackendID())); 486 } 487 488 @Override 489 public void deleteEntry(DN entryDN, DeleteOperation deleteOperation) 490 throws DirectoryException, CanceledOperationException 491 { 492 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, 493 ERR_BACKEND_DELETE_NOT_SUPPORTED.get(String.valueOf(entryDN), getBackendID())); 494 } 495 496 @Override 497 public void replaceEntry(Entry oldEntry, Entry newEntry, 498 ModifyOperation modifyOperation) throws DirectoryException, 499 CanceledOperationException 500 { 501 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, 502 ERR_BACKEND_MODIFY_NOT_SUPPORTED.get(String.valueOf(newEntry.getName()), getBackendID())); 503 } 504 505 @Override 506 public void renameEntry(DN currentDN, Entry entry, 507 ModifyDNOperation modifyDNOperation) throws DirectoryException, 508 CanceledOperationException 509 { 510 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, 511 ERR_BACKEND_MODIFY_DN_NOT_SUPPORTED.get(String.valueOf(currentDN), getBackendID())); 512 } 513 514 /** 515 * {@inheritDoc} 516 * <p> 517 * Runs the "initial search" phase (as opposed to a "persistent search" 518 * phase). The "initial search" phase is the only search run by normal 519 * searches, but it is also run by persistent searches with 520 * <code>changesOnly=false</code>. Persistent searches with 521 * <code>changesOnly=true</code> never execute this code. 522 * <p> 523 * Note: this method is executed only once per persistent search, single 524 * threaded. 525 */ 526 @Override 527 public void search(final SearchOperation searchOperation) throws DirectoryException 528 { 529 checkChangelogReadPrivilege(searchOperation); 530 531 final Set<DN> excludedBaseDNs = getExcludedBaseDNs(); 532 final MultiDomainServerState cookie = getCookieFromControl(searchOperation, excludedBaseDNs); 533 534 final ChangeNumberRange range = optimizeSearch(searchOperation.getBaseDN(), searchOperation.getFilter()); 535 try 536 { 537 final boolean isPersistentSearch = isPersistentSearch(searchOperation); 538 if (cookie != null) 539 { 540 initialSearchFromCookie( 541 getCookieEntrySender(SearchPhase.INITIAL, searchOperation, cookie, excludedBaseDNs, isPersistentSearch)); 542 } 543 else 544 { 545 initialSearchFromChangeNumber( 546 getChangeNumberEntrySender(SearchPhase.INITIAL, searchOperation, range, isPersistentSearch)); 547 } 548 } 549 catch (ChangelogException e) 550 { 551 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_CHANGELOG_BACKEND_SEARCH.get( 552 searchOperation.getBaseDN(), searchOperation.getFilter(), stackTraceToSingleLineString(e))); 553 } 554 } 555 556 private MultiDomainServerState getCookieFromControl(final SearchOperation searchOperation, Set<DN> excludedBaseDNs) 557 throws DirectoryException 558 { 559 final ExternalChangelogRequestControl eclRequestControl = 560 searchOperation.getRequestControl(ExternalChangelogRequestControl.DECODER); 561 if (eclRequestControl != null) 562 { 563 final MultiDomainServerState cookie = eclRequestControl.getCookie(); 564 validateProvidedCookie(cookie, excludedBaseDNs); 565 return cookie; 566 } 567 return null; 568 } 569 570 @Override 571 public Set<String> getSupportedControls() 572 { 573 return supportedControls; 574 } 575 576 @Override 577 public Set<String> getSupportedFeatures() 578 { 579 return Collections.emptySet(); 580 } 581 582 @Override 583 public boolean supports(BackendOperation backendOperation) 584 { 585 return false; 586 } 587 588 @Override 589 public void exportLDIF(final LDIFExportConfig exportConfig) 590 throws DirectoryException 591 { 592 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, 593 ERR_BACKEND_IMPORT_AND_EXPORT_NOT_SUPPORTED.get(getBackendID())); 594 } 595 596 @Override 597 public LDIFImportResult importLDIF(LDIFImportConfig importConfig, ServerContext serverContext) 598 throws DirectoryException 599 { 600 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, 601 ERR_BACKEND_IMPORT_AND_EXPORT_NOT_SUPPORTED.get(getBackendID())); 602 } 603 604 @Override 605 public void createBackup(BackupConfig backupConfig) throws DirectoryException 606 { 607 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, 608 ERR_BACKEND_BACKUP_AND_RESTORE_NOT_SUPPORTED.get(getBackendID())); 609 } 610 611 @Override 612 public void removeBackup(BackupDirectory backupDirectory, String backupID) throws DirectoryException 613 { 614 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, 615 ERR_BACKEND_BACKUP_AND_RESTORE_NOT_SUPPORTED.get(getBackendID())); 616 } 617 618 @Override 619 public void restoreBackup(RestoreConfig restoreConfig) throws DirectoryException 620 { 621 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, 622 ERR_BACKEND_BACKUP_AND_RESTORE_NOT_SUPPORTED.get(getBackendID())); 623 } 624 625 @Override 626 public long getEntryCount() 627 { 628 try 629 { 630 return getNumberOfEntriesInBaseDN(CHANGELOG_BASE_DN) + 1; 631 } 632 catch (DirectoryException e) 633 { 634 logger.traceException(e); 635 return -1; 636 } 637 } 638 639 /** 640 * Represent the change number range targeted by a search operation. 641 * <p> 642 * This class should be visible for tests. 643 */ 644 static final class ChangeNumberRange 645 { 646 private long lowerBound = -1; 647 private long upperBound = -1; 648 649 /** 650 * Returns the lowest change number to retrieve (inclusive). 651 * 652 * @return the lowest change number 653 */ 654 long getLowerBound() 655 { 656 return lowerBound; 657 } 658 659 /** 660 * Returns the highest change number to retrieve (inclusive). 661 * 662 * @return the highest change number 663 */ 664 long getUpperBound() 665 { 666 return upperBound; 667 } 668 } 669 670 /** 671 * Returns the set of DNs to exclude from the search. 672 * 673 * @return the DNs corresponding to domains to exclude from the search. 674 * @throws DirectoryException 675 * If a DN can't be decoded. 676 */ 677 private static Set<DN> getExcludedBaseDNs() throws DirectoryException 678 { 679 return getExcludedChangelogDomains(); 680 } 681 682 /** 683 * Optimize the search parameters by analyzing the DN and filter. 684 * It also performs validation on some search parameters 685 * for both cookie and change number based changelogs. 686 * 687 * @param baseDN the provided search baseDN. 688 * @param userFilter the provided search filter. 689 * @return the optimized change number range 690 * @throws DirectoryException when an exception occurs. 691 */ 692 ChangeNumberRange optimizeSearch(final DN baseDN, final SearchFilter userFilter) throws DirectoryException 693 { 694 SearchFilter equalityFilter = null; 695 switch (baseDN.size()) 696 { 697 case 1: 698 // "cn=changelog" : use user-provided search filter. 699 break; 700 case 2: 701 // It is probably "changeNumber=xxx,cn=changelog", use equality filter 702 // But it also could be "<service-id>,cn=changelog" so need to check on attribute 703 equalityFilter = buildSearchFilterFrom(baseDN, CHANGE_NUMBER_ATTR_LC, CHANGE_NUMBER_ATTR); 704 break; 705 default: 706 // "replicationCSN=xxx,<service-id>,cn=changelog" : use equality filter 707 equalityFilter = buildSearchFilterFrom(baseDN, "replicationcsn", "replicationCSN"); 708 break; 709 } 710 711 return optimizeSearchUsingFilter(equalityFilter != null ? equalityFilter : userFilter); 712 } 713 714 /** 715 * Build a search filter from given DN and attribute. 716 * 717 * @return the search filter or {@code null} if attribute is not present in 718 * the provided DN 719 */ 720 private SearchFilter buildSearchFilterFrom(final DN baseDN, final String lowerCaseAttr, final String upperCaseAttr) 721 { 722 final RDN rdn = baseDN.rdn(); 723 AttributeType attrType = DirectoryServer.getAttributeTypeOrDefault(lowerCaseAttr, upperCaseAttr); 724 final ByteString attrValue = rdn.getAttributeValue(attrType); 725 if (attrValue != null) 726 { 727 return SearchFilter.createEqualityFilter(attrType, attrValue); 728 } 729 return null; 730 } 731 732 private ChangeNumberRange optimizeSearchUsingFilter(final SearchFilter filter) throws DirectoryException 733 { 734 final ChangeNumberRange range = new ChangeNumberRange(); 735 if (filter == null) 736 { 737 return range; 738 } 739 740 if (matches(filter, FilterType.GREATER_OR_EQUAL, CHANGE_NUMBER_ATTR)) 741 { 742 range.lowerBound = decodeChangeNumber(filter.getAssertionValue()); 743 } 744 else if (matches(filter, FilterType.LESS_OR_EQUAL, CHANGE_NUMBER_ATTR)) 745 { 746 range.upperBound = decodeChangeNumber(filter.getAssertionValue()); 747 } 748 else if (matches(filter, FilterType.EQUALITY, CHANGE_NUMBER_ATTR)) 749 { 750 final long number = decodeChangeNumber(filter.getAssertionValue()); 751 range.lowerBound = number; 752 range.upperBound = number; 753 } 754 else if (matches(filter, FilterType.EQUALITY, "replicationcsn")) 755 { 756 // == exact CSN 757 // validate provided CSN is correct 758 new CSN(filter.getAssertionValue().toString()); 759 } 760 else if (filter.getFilterType() == FilterType.AND) 761 { 762 // TODO: it looks like it could be generalized to N components, not only two 763 final Collection<SearchFilter> components = filter.getFilterComponents(); 764 final SearchFilter filters[] = components.toArray(new SearchFilter[0]); 765 long upper1 = -1; 766 long lower1 = -1; 767 long upper2 = -1; 768 long lower2 = -1; 769 if (filters.length > 0) 770 { 771 ChangeNumberRange range1 = optimizeSearchUsingFilter(filters[0]); 772 upper1 = range1.upperBound; 773 lower1 = range1.lowerBound; 774 } 775 if (filters.length > 1) 776 { 777 ChangeNumberRange range2 = optimizeSearchUsingFilter(filters[1]); 778 upper2 = range2.upperBound; 779 lower2 = range2.lowerBound; 780 } 781 if (upper1 == -1) 782 { 783 range.upperBound = upper2; 784 } 785 else if (upper2 == -1) 786 { 787 range.upperBound = upper1; 788 } 789 else 790 { 791 range.upperBound = Math.min(upper1, upper2); 792 } 793 794 range.lowerBound = Math.max(lower1, lower2); 795 } 796 return range; 797 } 798 799 private static long decodeChangeNumber(final ByteString assertionValue) 800 throws DirectoryException 801 { 802 try 803 { 804 return Long.decode(assertionValue.toString()); 805 } 806 catch (NumberFormatException e) 807 { 808 throw new DirectoryException(ResultCode.INVALID_ATTRIBUTE_SYNTAX, 809 LocalizableMessage.raw("Could not convert value '%s' to long", assertionValue)); 810 } 811 } 812 813 private boolean matches(SearchFilter filter, FilterType filterType, String primaryName) 814 { 815 return filter.getFilterType() == filterType 816 && filter.getAttributeType() != null 817 && filter.getAttributeType().getPrimaryName().equalsIgnoreCase(primaryName); 818 } 819 820 /** Search the changelog when a cookie control is provided. */ 821 private void initialSearchFromCookie(final CookieEntrySender entrySender) 822 throws DirectoryException, ChangelogException 823 { 824 if (!sendBaseChangelogEntry(entrySender.searchOp)) 825 { // only return the base entry: stop here 826 return; 827 } 828 829 final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB(); 830 CursorOptions options = new CursorOptions(GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY); 831 try (final MultiDomainDBCursor cursor = 832 replicationDomainDB.getCursorFrom(entrySender.cookie, options, entrySender.excludedBaseDNs); 833 ECLMultiDomainDBCursor replicaUpdatesCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor)) 834 { 835 if (sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor)) 836 { 837 entrySender.transitioningToPersistentSearchPhase(); 838 sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor); 839 } 840 } 841 finally 842 { 843 entrySender.finalizeInitialSearch(); 844 } 845 } 846 847 private CookieEntrySender getCookieEntrySender(SearchPhase startPhase, final SearchOperation searchOperation, 848 MultiDomainServerState cookie, Set<DN> excludedBaseDNs, boolean isPersistentSearch) 849 { 850 if (isPersistentSearch && SearchPhase.INITIAL.equals(startPhase)) 851 { 852 return searchOperation.getAttachment(ENTRY_SENDER_ATTACHMENT); 853 } 854 return new CookieEntrySender(searchOperation, startPhase, cookie, excludedBaseDNs); 855 } 856 857 private boolean sendCookieEntriesFromCursor(final CookieEntrySender entrySender, 858 final ECLMultiDomainDBCursor replicaUpdatesCursor) throws ChangelogException, DirectoryException 859 { 860 boolean continueSearch = true; 861 while (continueSearch && replicaUpdatesCursor.next()) 862 { 863 final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord(); 864 final DN domainBaseDN = replicaUpdatesCursor.getData(); 865 continueSearch = entrySender.initialSearchSendEntry(updateMsg, domainBaseDN); 866 } 867 return continueSearch; 868 } 869 870 private boolean isPersistentSearch(SearchOperation op) 871 { 872 for (PersistentSearch pSearch : getPersistentSearches()) 873 { 874 if (op == pSearch.getSearchOperation()) 875 { 876 return true; 877 } 878 } 879 return false; 880 } 881 882 @Override 883 public void registerPersistentSearch(PersistentSearch pSearch) throws DirectoryException 884 { 885 initializePersistentSearch(pSearch); 886 887 if (isCookieBased(pSearch.getSearchOperation())) 888 { 889 cookieBasedPersistentSearches.add(pSearch); 890 } 891 else 892 { 893 changeNumberBasedPersistentSearches.add(pSearch); 894 } 895 super.registerPersistentSearch(pSearch); 896 } 897 898 private void initializePersistentSearch(PersistentSearch pSearch) throws DirectoryException 899 { 900 final SearchOperation searchOp = pSearch.getSearchOperation(); 901 902 // Validation must be done during registration for changes only persistent searches. 903 // Otherwise, when there is an initial search phase, 904 // validation is performed by the search() method. 905 if (pSearch.isChangesOnly()) 906 { 907 checkChangelogReadPrivilege(searchOp); 908 } 909 final ChangeNumberRange range = optimizeSearch(searchOp.getBaseDN(), searchOp.getFilter()); 910 911 final SearchPhase startPhase = pSearch.isChangesOnly() ? SearchPhase.PERSISTENT : SearchPhase.INITIAL; 912 if (isCookieBased(searchOp)) 913 { 914 final Set<DN> excludedBaseDNs = getExcludedBaseDNs(); 915 final MultiDomainServerState cookie = getCookie(pSearch.isChangesOnly(), searchOp, excludedBaseDNs); 916 searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, 917 new CookieEntrySender(searchOp, startPhase, cookie, excludedBaseDNs)); 918 } 919 else 920 { 921 searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, 922 new ChangeNumberEntrySender(searchOp, startPhase, range)); 923 } 924 } 925 926 private MultiDomainServerState getCookie(boolean isChangesOnly, SearchOperation searchOp, Set<DN> excludedBaseDNs) 927 throws DirectoryException 928 { 929 if (isChangesOnly) 930 { 931 // this changesOnly persistent search will not go through #initialSearch() 932 // so we must initialize the cookie here 933 return getNewestCookie(searchOp); 934 } 935 return getCookieFromControl(searchOp, excludedBaseDNs); 936 } 937 938 private MultiDomainServerState getNewestCookie(SearchOperation searchOp) 939 { 940 if (!isCookieBased(searchOp)) 941 { 942 return null; 943 } 944 945 final MultiDomainServerState cookie = new MultiDomainServerState(); 946 for (final Iterator<ReplicationServerDomain> it = 947 replicationServer.getDomainIterator(); it.hasNext();) 948 { 949 final DN baseDN = it.next().getBaseDN(); 950 final ServerState state = getChangelogDB().getReplicationDomainDB().getDomainNewestCSNs(baseDN); 951 cookie.update(baseDN, state); 952 } 953 return cookie; 954 } 955 956 /** 957 * Validates the cookie contained in search parameters by checking its content 958 * with the actual replication server state. 959 * 960 * @throws DirectoryException 961 * If the state is not valid 962 */ 963 private void validateProvidedCookie(final MultiDomainServerState cookie, Set<DN> excludedBaseDNs) 964 throws DirectoryException 965 { 966 if (cookie != null && !cookie.isEmpty()) 967 { 968 replicationServer.validateCookie(cookie, excludedBaseDNs); 969 } 970 } 971 972 /** Search the changelog using change number(s). */ 973 private void initialSearchFromChangeNumber(final ChangeNumberEntrySender entrySender) 974 throws ChangelogException, DirectoryException 975 { 976 if (!sendBaseChangelogEntry(entrySender.searchOp)) 977 { // only return the base entry: stop here 978 return; 979 } 980 981 final AtomicReference<MultiDomainDBCursor> replicaUpdatesCursor = new AtomicReference<>(); 982 try (DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor = getCNIndexDBCursor(entrySender.lowestChangeNumber)) 983 { 984 final MultiDomainServerState cookie = new MultiDomainServerState(); 985 986 if (sendChangeNumberEntriesFromCursors(entrySender, cnIndexDBCursor, replicaUpdatesCursor, cookie)) 987 { 988 entrySender.transitioningToPersistentSearchPhase(); 989 sendChangeNumberEntriesFromCursors(entrySender, cnIndexDBCursor, replicaUpdatesCursor, cookie); 990 } 991 } 992 finally 993 { 994 entrySender.finalizeInitialSearch(); 995 StaticUtils.close(replicaUpdatesCursor.get()); 996 } 997 } 998 999 private ChangeNumberEntrySender getChangeNumberEntrySender(SearchPhase startPhase, 1000 final SearchOperation searchOperation, ChangeNumberRange range, boolean isPersistentSearch) 1001 { 1002 if (isPersistentSearch && SearchPhase.INITIAL.equals(startPhase)) 1003 { 1004 return searchOperation.getAttachment(ENTRY_SENDER_ATTACHMENT); 1005 } 1006 return new ChangeNumberEntrySender(searchOperation, SearchPhase.INITIAL, range); 1007 } 1008 1009 private boolean sendChangeNumberEntriesFromCursors(final ChangeNumberEntrySender entrySender, 1010 DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor, AtomicReference<MultiDomainDBCursor> replicaUpdatesCursor, 1011 MultiDomainServerState cookie) throws ChangelogException, DirectoryException 1012 { 1013 boolean continueSearch = true; 1014 while (continueSearch && cnIndexDBCursor.next()) 1015 { 1016 // Handle the current cnIndex record 1017 final ChangeNumberIndexRecord cnIndexRecord = cnIndexDBCursor.getRecord(); 1018 if (replicaUpdatesCursor.get() == null) 1019 { 1020 replicaUpdatesCursor.set(initializeReplicaUpdatesCursor(cnIndexRecord)); 1021 initializeCookieForChangeNumberMode(cookie, cnIndexRecord); 1022 } 1023 else 1024 { 1025 cookie.update(cnIndexRecord.getBaseDN(), cnIndexRecord.getCSN()); 1026 } 1027 continueSearch = entrySender.changeNumberIsInRange(cnIndexRecord.getChangeNumber()); 1028 if (continueSearch) 1029 { 1030 final UpdateMsg updateMsg = findReplicaUpdateMessage(replicaUpdatesCursor.get(), cnIndexRecord.getCSN()); 1031 if (updateMsg != null) 1032 { 1033 continueSearch = entrySender.initialSearchSendEntry(cnIndexRecord, updateMsg, cookie); 1034 replicaUpdatesCursor.get().next(); 1035 } 1036 } 1037 } 1038 return continueSearch; 1039 } 1040 1041 /** Initialize the provided cookie from the provided change number index record. */ 1042 private void initializeCookieForChangeNumberMode( 1043 MultiDomainServerState cookie, final ChangeNumberIndexRecord cnIndexRecord) throws ChangelogException 1044 { 1045 // Initialize the multi domain cursor only from the change number index record. 1046 // The cookie is always empty at this stage. 1047 CursorOptions options = new CursorOptions(LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, cnIndexRecord.getCSN()); 1048 MultiDomainServerState unused = new MultiDomainServerState(); 1049 MultiDomainDBCursor cursor = getChangelogDB().getReplicationDomainDB().getCursorFrom(unused, options); 1050 try (ECLMultiDomainDBCursor eclCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor)) 1051 { 1052 updateCookieToMediumConsistencyPoint(cookie, eclCursor, cnIndexRecord); 1053 } 1054 } 1055 1056 /** 1057 * Rebuilds the changelogcookie starting at the newest change number index record. 1058 * <p> 1059 * It updates the provided cookie with the changes from the provided ECL cursor, 1060 * up to (and including) the provided change number index record. 1061 * <p> 1062 * Therefore, after calling this method, the cursor is positioned 1063 * to the change immediately following the provided change number index record. 1064 * 1065 * @param cookie the cookie to update 1066 * @param cursor the cursor where to read changes from 1067 * @param cnIndexRecord the change number index record to go right after 1068 * @throws ChangelogException if any problem occurs 1069 */ 1070 public static void updateCookieToMediumConsistencyPoint( 1071 MultiDomainServerState cookie, ECLMultiDomainDBCursor cursor, ChangeNumberIndexRecord cnIndexRecord) 1072 throws ChangelogException 1073 { 1074 if (cnIndexRecord == null) 1075 { 1076 return; 1077 } 1078 1079 while (cursor.next()) 1080 { 1081 UpdateMsg updateMsg = cursor.getRecord(); 1082 if (updateMsg.getCSN().compareTo(cnIndexRecord.getCSN()) > 0) 1083 { 1084 break; 1085 } 1086 cookie.update(cursor.getData(), updateMsg.getCSN()); 1087 } 1088 } 1089 1090 private MultiDomainDBCursor initializeReplicaUpdatesCursor( 1091 final ChangeNumberIndexRecord cnIndexRecord) throws ChangelogException 1092 { 1093 final MultiDomainServerState state = new MultiDomainServerState(); 1094 state.update(cnIndexRecord.getBaseDN(), cnIndexRecord.getCSN()); 1095 1096 // No need for ECLMultiDomainDBCursor in this case 1097 // as updateMsg will be matched with cnIndexRecord 1098 CursorOptions options = new CursorOptions(GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY); 1099 final MultiDomainDBCursor replicaUpdatesCursor = 1100 getChangelogDB().getReplicationDomainDB().getCursorFrom(state, options); 1101 replicaUpdatesCursor.next(); 1102 return replicaUpdatesCursor; 1103 } 1104 1105 /** 1106 * Returns the replica update message corresponding to the provided 1107 * cnIndexRecord. 1108 * 1109 * @return the update message, which may be {@code null} if the update message 1110 * could not be found because it was purged or because corresponding 1111 * baseDN was removed from the changelog 1112 * @throws DirectoryException 1113 * If inconsistency is detected between the available update 1114 * messages and the provided cnIndexRecord 1115 */ 1116 private UpdateMsg findReplicaUpdateMessage(final MultiDomainDBCursor replicaUpdatesCursor, CSN csn) 1117 throws ChangelogException, DirectoryException 1118 { 1119 while (true) 1120 { 1121 final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord(); 1122 final int compareIndexWithUpdateMsg = csn.compareTo(updateMsg.getCSN()); 1123 if (compareIndexWithUpdateMsg < 0) { 1124 // Either update message has been purged or baseDN has been removed from changelogDB, 1125 // ignore current index record and go to the next one 1126 return null; 1127 } 1128 else if (compareIndexWithUpdateMsg == 0) 1129 { 1130 // Found the matching update message 1131 return updateMsg; 1132 } 1133 // Case compareIndexWithUpdateMsg > 0 : the update message has not bean reached yet 1134 if (!replicaUpdatesCursor.next()) 1135 { 1136 // Should never happen, as it means some messages have disappeared 1137 // TODO : put the correct I18N message 1138 throw new DirectoryException(ResultCode.OPERATIONS_ERROR, 1139 LocalizableMessage.raw("Could not find replica update message matching index record. " + 1140 "No more replica update messages with a csn newer than " + updateMsg.getCSN() + " exist.")); 1141 } 1142 } 1143 } 1144 1145 /** Returns a cursor on CNIndexDB for the provided first change number. */ 1146 private DBCursor<ChangeNumberIndexRecord> getCNIndexDBCursor( 1147 final long firstChangeNumber) throws ChangelogException 1148 { 1149 final ChangeNumberIndexDB cnIndexDB = getChangelogDB().getChangeNumberIndexDB(); 1150 long changeNumberToUse = firstChangeNumber; 1151 if (changeNumberToUse <= 1) 1152 { 1153 final ChangeNumberIndexRecord oldestRecord = cnIndexDB.getOldestRecord(); 1154 changeNumberToUse = oldestRecord == null ? CHANGE_NUMBER_FOR_EMPTY_CURSOR : oldestRecord.getChangeNumber(); 1155 } 1156 return cnIndexDB.getCursorFrom(changeNumberToUse); 1157 } 1158 1159 /** Creates a changelog entry. */ 1160 private static Entry createEntryFromMsg(final DN baseDN, final long changeNumber, final String cookie, 1161 final UpdateMsg msg) throws DirectoryException 1162 { 1163 if (msg instanceof AddMsg) 1164 { 1165 return createAddMsg(baseDN, changeNumber, cookie, msg); 1166 } 1167 else if (msg instanceof ModifyCommonMsg) 1168 { 1169 return createModifyMsg(baseDN, changeNumber, cookie, msg); 1170 } 1171 else if (msg instanceof DeleteMsg) 1172 { 1173 final DeleteMsg delMsg = (DeleteMsg) msg; 1174 return createChangelogEntry(baseDN, changeNumber, cookie, delMsg, null, "delete", delMsg.getInitiatorsName()); 1175 } 1176 throw new DirectoryException(ResultCode.OPERATIONS_ERROR, 1177 LocalizableMessage.raw("Unexpected message type when trying to create changelog entry for dn %s : %s", baseDN, 1178 msg.getClass())); 1179 } 1180 1181 /** 1182 * Creates an entry from an add message. 1183 * <p> 1184 * Map addMsg to an LDIF string for the 'changes' attribute, and pull out 1185 * change initiators name if available which is contained in the creatorsName 1186 * attribute. 1187 */ 1188 private static Entry createAddMsg(final DN baseDN, final long changeNumber, final String cookie, final UpdateMsg msg) 1189 throws DirectoryException 1190 { 1191 final AddMsg addMsg = (AddMsg) msg; 1192 String changeInitiatorsName = null; 1193 String ldifChanges = null; 1194 try 1195 { 1196 final StringBuilder builder = new StringBuilder(256); 1197 for (Attribute attr : addMsg.getAttributes()) 1198 { 1199 if (attr.getAttributeType().equals(CREATORS_NAME_TYPE) && !attr.isEmpty()) 1200 { 1201 // This attribute is not multi-valued. 1202 changeInitiatorsName = attr.iterator().next().toString(); 1203 } 1204 final String attrName = attr.getNameWithOptions(); 1205 for (ByteString value : attr) 1206 { 1207 builder.append(attrName); 1208 appendLDIFSeparatorAndValue(builder, value); 1209 builder.append('\n'); 1210 } 1211 } 1212 ldifChanges = builder.toString(); 1213 } 1214 catch (Exception e) 1215 { 1216 logEncodingMessageError("add", addMsg.getDN(), e); 1217 } 1218 1219 return createChangelogEntry(baseDN, changeNumber, cookie, addMsg, ldifChanges, "add", changeInitiatorsName); 1220 } 1221 1222 /** 1223 * Creates an entry from a modify message. 1224 * <p> 1225 * Map the modifyMsg to an LDIF string for the 'changes' attribute, and pull 1226 * out change initiators name if available which is contained in the 1227 * modifiersName attribute. 1228 */ 1229 private static Entry createModifyMsg(final DN baseDN, final long changeNumber, final String cookie, 1230 final UpdateMsg msg) throws DirectoryException 1231 { 1232 final ModifyCommonMsg modifyMsg = (ModifyCommonMsg) msg; 1233 String changeInitiatorsName = null; 1234 String ldifChanges = null; 1235 try 1236 { 1237 final StringBuilder builder = new StringBuilder(128); 1238 for (Modification mod : modifyMsg.getMods()) 1239 { 1240 final Attribute attr = mod.getAttribute(); 1241 if (mod.getModificationType() == ModificationType.REPLACE 1242 && attr.getAttributeType().equals(MODIFIERS_NAME_TYPE) 1243 && !attr.isEmpty()) 1244 { 1245 // This attribute is not multi-valued. 1246 changeInitiatorsName = attr.iterator().next().toString(); 1247 } 1248 final String attrName = attr.getNameWithOptions(); 1249 builder.append(mod.getModificationType()); 1250 builder.append(": "); 1251 builder.append(attrName); 1252 builder.append('\n'); 1253 1254 for (ByteString value : attr) 1255 { 1256 builder.append(attrName); 1257 appendLDIFSeparatorAndValue(builder, value); 1258 builder.append('\n'); 1259 } 1260 builder.append("-\n"); 1261 } 1262 ldifChanges = builder.toString(); 1263 } 1264 catch (Exception e) 1265 { 1266 logEncodingMessageError("modify", modifyMsg.getDN(), e); 1267 } 1268 1269 final boolean isModifyDNMsg = modifyMsg instanceof ModifyDNMsg; 1270 final Entry entry = createChangelogEntry(baseDN, changeNumber, cookie, modifyMsg, ldifChanges, 1271 isModifyDNMsg ? "modrdn" : "modify", changeInitiatorsName); 1272 1273 if (isModifyDNMsg) 1274 { 1275 final ModifyDNMsg modDNMsg = (ModifyDNMsg) modifyMsg; 1276 addAttribute(entry, "newrdn", modDNMsg.getNewRDN()); 1277 if (modDNMsg.getNewSuperior() != null) 1278 { 1279 addAttribute(entry, "newsuperior", modDNMsg.getNewSuperior()); 1280 } 1281 addAttribute(entry, "deleteoldrdn", String.valueOf(modDNMsg.deleteOldRdn())); 1282 } 1283 return entry; 1284 } 1285 1286 /** 1287 * Log an encoding message error. 1288 * 1289 * @param messageType 1290 * String identifying type of message. Should be "add" or "modify". 1291 * @param entryDN 1292 * DN of original entry 1293 */ 1294 private static void logEncodingMessageError(String messageType, DN entryDN, Exception exception) 1295 { 1296 logger.traceException(exception); 1297 logger.error(LocalizableMessage.raw( 1298 "An exception was encountered while trying to encode a replication " + messageType + " message for entry \"" 1299 + entryDN + "\" into an External Change Log entry: " + exception.getMessage())); 1300 } 1301 1302 private void checkChangelogReadPrivilege(SearchOperation searchOp) throws DirectoryException 1303 { 1304 if (!searchOp.getClientConnection().hasPrivilege(Privilege.CHANGELOG_READ, searchOp)) 1305 { 1306 throw new DirectoryException(ResultCode.INSUFFICIENT_ACCESS_RIGHTS, 1307 NOTE_SEARCH_CHANGELOG_INSUFFICIENT_PRIVILEGES.get()); 1308 } 1309 } 1310 1311 /** 1312 * Create a changelog entry from a set of provided information. This is the part of 1313 * entry creation common to all types of msgs (ADD, DEL, MOD, MODDN). 1314 */ 1315 private static Entry createChangelogEntry(final DN baseDN, final long changeNumber, final String cookie, 1316 final LDAPUpdateMsg msg, final String ldifChanges, final String changeType, 1317 final String changeInitiatorsName) throws DirectoryException 1318 { 1319 final CSN csn = msg.getCSN(); 1320 String dnString; 1321 if (changeNumber > 0) 1322 { 1323 // change number mode 1324 dnString = "changeNumber=" + changeNumber + "," + DN_EXTERNAL_CHANGELOG_ROOT; 1325 } 1326 else 1327 { 1328 // Cookie mode 1329 dnString = "replicationCSN=" + csn + "," + baseDN + "," + DN_EXTERNAL_CHANGELOG_ROOT; 1330 } 1331 1332 final Map<AttributeType, List<Attribute>> userAttrs = new LinkedHashMap<>(); 1333 final Map<AttributeType, List<Attribute>> opAttrs = new LinkedHashMap<>(); 1334 1335 // Operational standard attributes 1336 addAttributeByType(ATTR_SUBSCHEMA_SUBENTRY_LC, ATTR_SUBSCHEMA_SUBENTRY_LC, 1337 ConfigConstants.DN_DEFAULT_SCHEMA_ROOT, userAttrs, opAttrs); 1338 addAttributeByType("numsubordinates", "numSubordinates", "0", userAttrs, opAttrs); 1339 addAttributeByType("hassubordinates", "hasSubordinates", "false", userAttrs, opAttrs); 1340 addAttributeByType("entrydn", "entryDN", dnString, userAttrs, opAttrs); 1341 1342 // REQUIRED attributes 1343 if (changeNumber > 0) 1344 { 1345 addAttributeByType("changenumber", "changeNumber", String.valueOf(changeNumber), userAttrs, opAttrs); 1346 } 1347 SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT_GMT_TIME); 1348 dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); // ?? 1349 final String format = dateFormat.format(new Date(csn.getTime())); 1350 addAttributeByType("changetime", "changeTime", format, userAttrs, opAttrs); 1351 addAttributeByType("changetype", "changeType", changeType, userAttrs, opAttrs); 1352 addAttributeByType("targetdn", "targetDN", msg.getDN().toString(), userAttrs, opAttrs); 1353 1354 // NON REQUESTED attributes 1355 addAttributeByType("replicationcsn", "replicationCSN", csn.toString(), userAttrs, opAttrs); 1356 addAttributeByType("replicaidentifier", "replicaIdentifier", Integer.toString(csn.getServerId()), 1357 userAttrs, opAttrs); 1358 1359 if (ldifChanges != null) 1360 { 1361 addAttributeByType("changes", "changes", ldifChanges, userAttrs, opAttrs); 1362 } 1363 if (changeInitiatorsName != null) 1364 { 1365 addAttributeByType("changeinitiatorsname", "changeInitiatorsName", changeInitiatorsName, userAttrs, opAttrs); 1366 } 1367 1368 final String targetUUID = msg.getEntryUUID(); 1369 if (targetUUID != null) 1370 { 1371 addAttributeByType("targetentryuuid", "targetEntryUUID", targetUUID, userAttrs, opAttrs); 1372 } 1373 final String cookie2 = cookie != null ? cookie : ""; 1374 addAttributeByType("changelogcookie", "changeLogCookie", cookie2, userAttrs, opAttrs); 1375 1376 final List<RawAttribute> includedAttributes = msg.getEclIncludes(); 1377 if (includedAttributes != null && !includedAttributes.isEmpty()) 1378 { 1379 final StringBuilder builder = new StringBuilder(256); 1380 for (final RawAttribute includedAttribute : includedAttributes) 1381 { 1382 final String name = includedAttribute.getAttributeType(); 1383 for (final ByteString value : includedAttribute.getValues()) 1384 { 1385 builder.append(name); 1386 appendLDIFSeparatorAndValue(builder, value); 1387 builder.append('\n'); 1388 } 1389 } 1390 final String includedAttributesLDIF = builder.toString(); 1391 addAttributeByType("includedattributes", "includedAttributes", includedAttributesLDIF, userAttrs, opAttrs); 1392 } 1393 1394 return new Entry(DN.valueOf(dnString), CHANGELOG_ENTRY_OBJECT_CLASSES, userAttrs, opAttrs); 1395 } 1396 1397 /** 1398 * Sends the entry if it matches the base, scope and filter of the current search operation. 1399 * It will also send the base changelog entry if it needs to be sent and was not sent before. 1400 * 1401 * @return {@code true} if search should continue, {@code false} otherwise 1402 */ 1403 private static boolean sendEntryIfMatches(SearchOperation searchOp, Entry entry, String cookie) 1404 throws DirectoryException 1405 { 1406 if (matchBaseAndScopeAndFilter(searchOp, entry)) 1407 { 1408 return searchOp.returnEntry(entry, getControls(cookie)); 1409 } 1410 // maybe the next entry will match? 1411 return true; 1412 } 1413 1414 /** Indicates if the provided entry matches the filter, base and scope. */ 1415 private static boolean matchBaseAndScopeAndFilter(SearchOperation searchOp, Entry entry) throws DirectoryException 1416 { 1417 return entry.matchesBaseAndScope(searchOp.getBaseDN(), searchOp.getScope()) 1418 && searchOp.getFilter().matchesEntry(entry); 1419 } 1420 1421 private static List<Control> getControls(String cookie) 1422 { 1423 if (cookie != null) 1424 { 1425 final Control c = new EntryChangelogNotificationControl(true, cookie); 1426 return Collections.singletonList(c); 1427 } 1428 return Collections.emptyList(); 1429 } 1430 1431 /** 1432 * Create and returns the base changelog entry to the underlying search operation. 1433 * <p> 1434 * "initial search" phase must return the base entry immediately. 1435 * 1436 * @return {@code true} if search should continue, {@code false} otherwise 1437 */ 1438 private boolean sendBaseChangelogEntry(SearchOperation searchOp) throws DirectoryException 1439 { 1440 final DN baseDN = searchOp.getBaseDN(); 1441 final SearchFilter filter = searchOp.getFilter(); 1442 final SearchScope scope = searchOp.getScope(); 1443 1444 if (ChangelogBackend.CHANGELOG_BASE_DN.matchesBaseAndScope(baseDN, scope)) 1445 { 1446 final Entry entry = buildBaseChangelogEntry(); 1447 if (filter.matchesEntry(entry) && !searchOp.returnEntry(entry, null)) 1448 { 1449 // Abandon, size limit reached. 1450 return false; 1451 } 1452 } 1453 return !baseDN.equals(ChangelogBackend.CHANGELOG_BASE_DN) 1454 || !scope.equals(SearchScope.BASE_OBJECT); 1455 } 1456 1457 private Entry buildBaseChangelogEntry() throws DirectoryException 1458 { 1459 final String hasSubordinatesStr = Boolean.toString(baseChangelogHasSubordinates()); 1460 1461 final Map<AttributeType, List<Attribute>> userAttrs = new LinkedHashMap<>(); 1462 final Map<AttributeType, List<Attribute>> operationalAttrs = new LinkedHashMap<>(); 1463 1464 // We never return the numSubordinates attribute for the base changelog entry 1465 // and there is a very good reason for that: 1466 // - Either we compute it before sending the entries, 1467 // -- then we risk returning more entries if new entries come in after we computed numSubordinates 1468 // -- or we risk returning less entries if purge kicks in after we computed numSubordinates 1469 // - Or we accumulate all the entries that must be returned before sending them => OutOfMemoryError 1470 1471 addAttributeByUppercaseName(ATTR_COMMON_NAME, ATTR_COMMON_NAME, BACKEND_ID, userAttrs, operationalAttrs); 1472 addAttributeByUppercaseName(ATTR_SUBSCHEMA_SUBENTRY_LC, ATTR_SUBSCHEMA_SUBENTRY, 1473 ConfigConstants.DN_DEFAULT_SCHEMA_ROOT, userAttrs, operationalAttrs); 1474 addAttributeByUppercaseName("hassubordinates", "hasSubordinates", hasSubordinatesStr, userAttrs, operationalAttrs); 1475 addAttributeByUppercaseName("entrydn", "entryDN", DN_EXTERNAL_CHANGELOG_ROOT, userAttrs, operationalAttrs); 1476 return new Entry(CHANGELOG_BASE_DN, CHANGELOG_ROOT_OBJECT_CLASSES, userAttrs, operationalAttrs); 1477 } 1478 1479 private static void addAttribute(final Entry e, final String attrType, final String attrValue) 1480 { 1481 e.addAttribute(Attributes.create(attrType, attrValue), null); 1482 } 1483 1484 private static void addAttributeByType(String attrNameLowercase, 1485 String attrNameUppercase, String attrValue, 1486 Map<AttributeType, List<Attribute>> userAttrs, 1487 Map<AttributeType, List<Attribute>> operationalAttrs) 1488 { 1489 addAttribute(attrNameLowercase, attrNameUppercase, attrValue, userAttrs, operationalAttrs, true); 1490 } 1491 1492 private static void addAttributeByUppercaseName(String attrNameLowercase, 1493 String attrNameUppercase, String attrValue, 1494 Map<AttributeType, List<Attribute>> userAttrs, 1495 Map<AttributeType, List<Attribute>> operationalAttrs) 1496 { 1497 addAttribute(attrNameLowercase, attrNameUppercase, attrValue, userAttrs, operationalAttrs, false); 1498 } 1499 1500 private static void addAttribute(final String attrNameLowercase, 1501 final String attrNameUppercase, final String attrValue, 1502 final Map<AttributeType, List<Attribute>> userAttrs, 1503 final Map<AttributeType, List<Attribute>> operationalAttrs, final boolean addByType) 1504 { 1505 AttributeType attrType = DirectoryServer.getAttributeTypeOrDefault(attrNameLowercase, attrNameUppercase); 1506 final Attribute a = addByType 1507 ? Attributes.create(attrType, attrValue) 1508 : Attributes.create(attrNameUppercase, attrValue); 1509 final List<Attribute> attrList = Collections.singletonList(a); 1510 if (attrType.isOperational()) 1511 { 1512 operationalAttrs.put(attrType, attrList); 1513 } 1514 else 1515 { 1516 userAttrs.put(attrType, attrList); 1517 } 1518 } 1519 1520 /** Describes the current search phase. */ 1521 private enum SearchPhase 1522 { 1523 /** 1524 * "Initial search" phase. The "initial search" phase is running 1525 * concurrently. All update notifications are ignored. 1526 */ 1527 INITIAL, 1528 /** 1529 * Transitioning from the "initial search" phase to the "persistent search" 1530 * phase. "Initial search" phase has finished reading from the DB. It now 1531 * verifies if any more updates have been persisted to the DB since stopping 1532 * and send them. All update notifications are blocked. 1533 */ 1534 TRANSITIONING, 1535 /** 1536 * "Persistent search" phase. "Initial search" phase has completed. All 1537 * update notifications are published. 1538 */ 1539 PERSISTENT; 1540 } 1541 1542 /** 1543 * Contains data to ensure that the same change is not sent twice to clients 1544 * because of race conditions between the "initial search" phase and the 1545 * "persistent search" phase. 1546 */ 1547 private static class SendEntryData<K extends Comparable<K>> 1548 { 1549 private final AtomicReference<SearchPhase> searchPhase = new AtomicReference<>(SearchPhase.INITIAL); 1550 private final Object transitioningLock = new Object(); 1551 private volatile K lastKeySentByInitialSearch; 1552 1553 private SendEntryData(SearchPhase startPhase) 1554 { 1555 searchPhase.set(startPhase); 1556 } 1557 1558 private void finalizeInitialSearch() 1559 { 1560 searchPhase.set(SearchPhase.PERSISTENT); 1561 synchronized (transitioningLock) 1562 { // initial search phase has completed, release all persistent searches 1563 transitioningLock.notifyAll(); 1564 } 1565 } 1566 1567 public void transitioningToPersistentSearchPhase() 1568 { 1569 searchPhase.set(SearchPhase.TRANSITIONING); 1570 } 1571 1572 private void initialSearchSendsEntry(final K key) 1573 { 1574 lastKeySentByInitialSearch = key; 1575 } 1576 1577 private boolean persistentSearchCanSendEntry(K key) 1578 { 1579 final SearchPhase stateValue = searchPhase.get(); 1580 switch (stateValue) 1581 { 1582 case INITIAL: 1583 return false; 1584 case TRANSITIONING: 1585 synchronized (transitioningLock) 1586 { 1587 while (SearchPhase.TRANSITIONING.equals(searchPhase.get())) 1588 { 1589 // "initial search" phase is over, and is now verifying whether new 1590 // changes have been published to the DB. 1591 // Wait for this check to complete 1592 try 1593 { 1594 transitioningLock.wait(); 1595 } 1596 catch (InterruptedException e) 1597 { 1598 Thread.currentThread().interrupt(); 1599 // Shutdown must have been called. Stop sending entries. 1600 return false; 1601 } 1602 } 1603 } 1604 return key.compareTo(lastKeySentByInitialSearch) > 0; 1605 case PERSISTENT: 1606 return true; 1607 default: 1608 throw new RuntimeException("Not implemented for " + stateValue); 1609 } 1610 } 1611 } 1612 1613 /** Sends entries to clients for change number searches. */ 1614 private static class ChangeNumberEntrySender 1615 { 1616 private final SearchOperation searchOp; 1617 private final long lowestChangeNumber; 1618 private final long highestChangeNumber; 1619 private final SendEntryData<Long> sendEntryData; 1620 1621 private ChangeNumberEntrySender(SearchOperation searchOp, SearchPhase startPhase, ChangeNumberRange range) 1622 { 1623 this.searchOp = searchOp; 1624 this.sendEntryData = new SendEntryData<>(startPhase); 1625 this.lowestChangeNumber = range.lowerBound; 1626 this.highestChangeNumber = range.upperBound; 1627 } 1628 1629 /** 1630 * Indicates if provided change number is compatible with last change 1631 * number. 1632 * 1633 * @param changeNumber 1634 * The change number to test. 1635 * @return {@code true} if and only if the provided change number is in the 1636 * range of the last change number. 1637 */ 1638 boolean changeNumberIsInRange(long changeNumber) 1639 { 1640 return highestChangeNumber == -1 || changeNumber <= highestChangeNumber; 1641 } 1642 1643 private void finalizeInitialSearch() 1644 { 1645 sendEntryData.finalizeInitialSearch(); 1646 } 1647 1648 private void transitioningToPersistentSearchPhase() 1649 { 1650 sendEntryData.transitioningToPersistentSearchPhase(); 1651 } 1652 1653 /** 1654 * @return {@code true} if search should continue, {@code false} otherwise 1655 */ 1656 private boolean initialSearchSendEntry(ChangeNumberIndexRecord cnIndexRecord, UpdateMsg updateMsg, 1657 MultiDomainServerState cookie) throws DirectoryException 1658 { 1659 final DN baseDN = cnIndexRecord.getBaseDN(); 1660 sendEntryData.initialSearchSendsEntry(cnIndexRecord.getChangeNumber()); 1661 final Entry entry = createEntryFromMsg(baseDN, cnIndexRecord.getChangeNumber(), cookie.toString(), updateMsg); 1662 return sendEntryIfMatches(searchOp, entry, null); 1663 } 1664 1665 private void persistentSearchSendEntry(long changeNumber, Entry entry) throws DirectoryException 1666 { 1667 if (sendEntryData.persistentSearchCanSendEntry(changeNumber)) 1668 { 1669 sendEntryIfMatches(searchOp, entry, null); 1670 } 1671 } 1672 } 1673 1674 /** Sends entries to clients for cookie-based searches. */ 1675 private static class CookieEntrySender { 1676 private final SearchOperation searchOp; 1677 private final SearchPhase startPhase; 1678 private final Set<DN> excludedBaseDNs; 1679 private final MultiDomainServerState cookie; 1680 private final ConcurrentSkipListMap<ReplicaId, SendEntryData<CSN>> replicaIdToSendEntryData = 1681 new ConcurrentSkipListMap<>(); 1682 1683 private CookieEntrySender(SearchOperation searchOp, SearchPhase startPhase, MultiDomainServerState cookie, 1684 Set<DN> excludedBaseDNs) 1685 { 1686 this.searchOp = searchOp; 1687 this.startPhase = startPhase; 1688 this.cookie = cookie; 1689 this.excludedBaseDNs = excludedBaseDNs; 1690 } 1691 1692 private void finalizeInitialSearch() 1693 { 1694 for (SendEntryData<CSN> sendEntryData : replicaIdToSendEntryData.values()) 1695 { 1696 sendEntryData.finalizeInitialSearch(); 1697 } 1698 } 1699 1700 private void transitioningToPersistentSearchPhase() 1701 { 1702 for (SendEntryData<CSN> sendEntryData : replicaIdToSendEntryData.values()) 1703 { 1704 sendEntryData.transitioningToPersistentSearchPhase(); 1705 } 1706 } 1707 1708 private SendEntryData<CSN> getSendEntryData(DN baseDN, CSN csn) 1709 { 1710 final ReplicaId replicaId = ReplicaId.of(baseDN, csn.getServerId()); 1711 SendEntryData<CSN> data = replicaIdToSendEntryData.get(replicaId); 1712 if (data == null) 1713 { 1714 final SendEntryData<CSN> newData = new SendEntryData<>(startPhase); 1715 data = replicaIdToSendEntryData.putIfAbsent(replicaId, newData); 1716 return data == null ? newData : data; 1717 } 1718 return data; 1719 } 1720 1721 private boolean initialSearchSendEntry(final UpdateMsg updateMsg, final DN baseDN) throws DirectoryException 1722 { 1723 final CSN csn = updateMsg.getCSN(); 1724 final SendEntryData<CSN> sendEntryData = getSendEntryData(baseDN, csn); 1725 sendEntryData.initialSearchSendsEntry(csn); 1726 final String cookieString = updateCookie(baseDN, updateMsg.getCSN()); 1727 final Entry entry = createEntryFromMsg(baseDN, 0, cookieString, updateMsg); 1728 return sendEntryIfMatches(searchOp, entry, cookieString); 1729 } 1730 1731 private void persistentSearchSendEntry(DN baseDN, UpdateMsg updateMsg) 1732 throws DirectoryException 1733 { 1734 final CSN csn = updateMsg.getCSN(); 1735 final SendEntryData<CSN> sendEntryData = getSendEntryData(baseDN, csn); 1736 if (sendEntryData.persistentSearchCanSendEntry(csn)) 1737 { 1738 // multi threaded case: wait for the "initial search" phase to set the cookie 1739 final String cookieString = updateCookie(baseDN, updateMsg.getCSN()); 1740 final Entry cookieEntry = createEntryFromMsg(baseDN, 0, cookieString, updateMsg); 1741 // FIXME JNR use this instead of previous line: 1742 // entry.replaceAttribute(Attributes.create("changelogcookie", cookieString)); 1743 sendEntryIfMatches(searchOp, cookieEntry, cookieString); 1744 } 1745 } 1746 1747 private String updateCookie(DN baseDN, final CSN csn) 1748 { 1749 synchronized (cookie) 1750 { // forbid concurrent updates to the cookie 1751 cookie.update(baseDN, csn); 1752 return cookie.toString(); 1753 } 1754 } 1755 } 1756}