001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2014-2015 ForgeRock AS.
025 */
026package org.opends.server.backends;
027
028import static org.opends.messages.BackendMessages.*;
029import static org.opends.messages.ReplicationMessages.*;
030import static org.opends.server.config.ConfigConstants.*;
031import static org.opends.server.replication.plugin.MultimasterReplication.*;
032import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
033import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
034import static org.opends.server.util.LDIFWriter.*;
035import static org.opends.server.util.ServerConstants.*;
036import static org.opends.server.util.StaticUtils.*;
037
038import java.text.SimpleDateFormat;
039import java.util.Collection;
040import java.util.Collections;
041import java.util.Date;
042import java.util.Iterator;
043import java.util.LinkedHashMap;
044import java.util.List;
045import java.util.Map;
046import java.util.Set;
047import java.util.TimeZone;
048import java.util.concurrent.ConcurrentLinkedQueue;
049import java.util.concurrent.ConcurrentSkipListMap;
050import java.util.concurrent.atomic.AtomicReference;
051
052import org.forgerock.i18n.LocalizableMessage;
053import org.forgerock.i18n.slf4j.LocalizedLogger;
054import org.forgerock.opendj.config.server.ConfigException;
055import org.forgerock.opendj.ldap.ByteString;
056import org.forgerock.opendj.ldap.ConditionResult;
057import org.forgerock.opendj.ldap.ModificationType;
058import org.forgerock.opendj.ldap.ResultCode;
059import org.forgerock.opendj.ldap.SearchScope;
060import org.opends.server.admin.Configuration;
061import org.opends.server.api.Backend;
062import org.opends.server.config.ConfigConstants;
063import org.opends.server.controls.EntryChangelogNotificationControl;
064import org.opends.server.controls.ExternalChangelogRequestControl;
065import org.opends.server.core.AddOperation;
066import org.opends.server.core.DeleteOperation;
067import org.opends.server.core.DirectoryServer;
068import org.opends.server.core.ModifyDNOperation;
069import org.opends.server.core.ModifyOperation;
070import org.opends.server.core.PersistentSearch;
071import org.opends.server.core.SearchOperation;
072import org.opends.server.core.ServerContext;
073import org.opends.server.replication.common.CSN;
074import org.opends.server.replication.common.MultiDomainServerState;
075import org.opends.server.replication.common.ServerState;
076import org.opends.server.replication.protocol.AddMsg;
077import org.opends.server.replication.protocol.DeleteMsg;
078import org.opends.server.replication.protocol.LDAPUpdateMsg;
079import org.opends.server.replication.protocol.ModifyCommonMsg;
080import org.opends.server.replication.protocol.ModifyDNMsg;
081import org.opends.server.replication.protocol.UpdateMsg;
082import org.opends.server.replication.server.ReplicationServer;
083import org.opends.server.replication.server.ReplicationServerDomain;
084import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
085import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
086import org.opends.server.replication.server.changelog.api.ChangelogDB;
087import org.opends.server.replication.server.changelog.api.ChangelogException;
088import org.opends.server.replication.server.changelog.api.DBCursor;
089import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions;
090import org.opends.server.replication.server.changelog.api.ReplicaId;
091import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
092import org.opends.server.replication.server.changelog.file.ECLEnabledDomainPredicate;
093import org.opends.server.replication.server.changelog.file.ECLMultiDomainDBCursor;
094import org.opends.server.replication.server.changelog.file.MultiDomainDBCursor;
095import org.opends.server.types.Attribute;
096import org.opends.server.types.AttributeType;
097import org.opends.server.types.Attributes;
098import org.opends.server.types.BackupConfig;
099import org.opends.server.types.BackupDirectory;
100import org.opends.server.types.CanceledOperationException;
101import org.opends.server.types.Control;
102import org.opends.server.types.DN;
103import org.opends.server.types.DirectoryException;
104import org.opends.server.types.Entry;
105import org.opends.server.types.FilterType;
106import org.opends.server.types.IndexType;
107import org.opends.server.types.InitializationException;
108import org.opends.server.types.LDIFExportConfig;
109import org.opends.server.types.LDIFImportConfig;
110import org.opends.server.types.LDIFImportResult;
111import org.opends.server.types.Modification;
112import org.opends.server.types.ObjectClass;
113import org.opends.server.types.Privilege;
114import org.opends.server.types.RDN;
115import org.opends.server.types.RawAttribute;
116import org.opends.server.types.RestoreConfig;
117import org.opends.server.types.SearchFilter;
118import org.opends.server.types.WritabilityMode;
119import org.opends.server.util.StaticUtils;
120
121/**
122 * A backend that provides access to the changelog, i.e. the "cn=changelog"
123 * suffix. It is a read-only backend that is created by a
124 * {@link ReplicationServer} and is not configurable.
125 * <p>
126 * There are two modes to search the changelog:
127 * <ul>
128 * <li>Cookie mode: when a "ECL Cookie Exchange Control" is provided with the
129 * request. The cookie provided in the control is used to retrieve entries from
130 * the ReplicaDBs. The <code>changeNumber</code> attribute is not returned with
131 * the entries.</li>
132 * <li>Change number mode: when no "ECL Cookie Exchange Control" is provided
133 * with the request. The entries are retrieved using the ChangeNumberIndexDB and
 * their attributes are set with the information from the ReplicaDBs. The
135 * <code>changeNumber</code> attribute value is set from the content of
136 * ChangeNumberIndexDB.</li>
137 * </ul>
138 * <h3>Searches flow</h3>
139 * <p>
140 * Here is the flow of searches within the changelog backend APIs:
141 * <ul>
142 * <li>Normal searches only go through:
143 * <ol>
144 * <li>{@link ChangelogBackend#search(SearchOperation)} (once, single threaded)</li>
145 * </ol>
146 * </li>
147 * <li>Persistent searches with <code>changesOnly=false</code> go through:
148 * <ol>
149 * <li>{@link ChangelogBackend#registerPersistentSearch(PersistentSearch)}
150 * (once, single threaded),</li>
151 * <li>
152 * {@link ChangelogBackend#search(SearchOperation)} (once, single threaded)</li>
 * <li>{@code ChangelogBackend#notify*EntryAdded()} (multiple times, multi
154 * threaded)</li>
155 * </ol>
156 * </li>
157 * <li>Persistent searches with <code>changesOnly=true</code> go through:
158 * <ol>
159 * <li>{@link ChangelogBackend#registerPersistentSearch(PersistentSearch)}
160 * (once, single threaded)</li>
161 * <li>
 * {@code ChangelogBackend#notify*EntryAdded()} (multiple times, multi
163 * threaded)</li>
164 * </ol>
165 * </li>
166 * </ul>
167 *
168 * @see ReplicationServer
169 */
170public class ChangelogBackend extends Backend<Configuration>
171{
172  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
173
174  /** The id of this backend. */
175  public static final String BACKEND_ID = "changelog";
176
177  private static final long CHANGE_NUMBER_FOR_EMPTY_CURSOR = 0L;
178
179  private static final String CHANGE_NUMBER_ATTR = "changeNumber";
180  private static final String CHANGE_NUMBER_ATTR_LC = CHANGE_NUMBER_ATTR.toLowerCase();
181  private static final String ENTRY_SENDER_ATTACHMENT = OID_ECL_COOKIE_EXCHANGE_CONTROL + ".entrySender";
182
183  /** The set of objectclasses that will be used in root entry. */
184  private static final Map<ObjectClass, String>
185    CHANGELOG_ROOT_OBJECT_CLASSES = new LinkedHashMap<>(2);
186  static
187  {
188    CHANGELOG_ROOT_OBJECT_CLASSES.put(DirectoryServer.getObjectClass(OC_TOP, true), OC_TOP);
189    CHANGELOG_ROOT_OBJECT_CLASSES.put(DirectoryServer.getObjectClass("container", true), "container");
190  }
191
192  /** The set of objectclasses that will be used in ECL entries. */
193  private static final Map<ObjectClass, String>
194    CHANGELOG_ENTRY_OBJECT_CLASSES = new LinkedHashMap<>(2);
195  static
196  {
197    CHANGELOG_ENTRY_OBJECT_CLASSES.put(DirectoryServer.getObjectClass(OC_TOP, true), OC_TOP);
198    CHANGELOG_ENTRY_OBJECT_CLASSES.put(DirectoryServer.getObjectClass(OC_CHANGELOG_ENTRY, true), OC_CHANGELOG_ENTRY);
199  }
200
201  /** The attribute type for the "creatorsName" attribute. */
202  private static final AttributeType CREATORS_NAME_TYPE =
203      DirectoryServer.getAttributeTypeOrDefault(OP_ATTR_CREATORS_NAME_LC);
204  /** The attribute type for the "modifiersName" attribute. */
205  private static final AttributeType MODIFIERS_NAME_TYPE =
206      DirectoryServer.getAttributeTypeOrDefault(OP_ATTR_MODIFIERS_NAME_LC);
207
208  /** The base DN for the external change log. */
209  public static final DN CHANGELOG_BASE_DN;
210
211  static
212  {
213    try
214    {
215      CHANGELOG_BASE_DN = DN.valueOf(DN_EXTERNAL_CHANGELOG_ROOT);
216    }
217    catch (DirectoryException e)
218    {
219      throw new RuntimeException(e);
220    }
221  }
222
223  /** The set of base DNs for this backend. */
224  private DN[] baseDNs;
225  /** The set of supported controls for this backend. */
226  private final Set<String> supportedControls = Collections.singleton(OID_ECL_COOKIE_EXCHANGE_CONTROL);
227  /** Whether the base changelog entry has subordinates. */
228  private Boolean baseEntryHasSubordinates;
229
230  /** The replication server on which the changelog is read. */
231  private final ReplicationServer replicationServer;
232  private final ECLEnabledDomainPredicate domainPredicate;
233
234  /** The set of cookie-based persistent searches registered with this backend. */
235  private final ConcurrentLinkedQueue<PersistentSearch> cookieBasedPersistentSearches = new ConcurrentLinkedQueue<>();
236  /** The set of change number-based persistent searches registered with this backend. */
237  private final ConcurrentLinkedQueue<PersistentSearch> changeNumberBasedPersistentSearches =
238      new ConcurrentLinkedQueue<>();
239
240  /**
241   * Creates a new backend with the provided replication server.
242   *
243   * @param replicationServer
244   *          The replication server on which the changes are read.
245   * @param domainPredicate
246   *          Returns whether a domain is enabled for the external changelog.
247   */
248  public ChangelogBackend(final ReplicationServer replicationServer, final ECLEnabledDomainPredicate domainPredicate)
249  {
250    this.replicationServer = replicationServer;
251    this.domainPredicate = domainPredicate;
252    setBackendID(BACKEND_ID);
253    setWritabilityMode(WritabilityMode.DISABLED);
254    setPrivateBackend(true);
255  }
256
257  private ChangelogDB getChangelogDB()
258  {
259    return replicationServer.getChangelogDB();
260  }
261
262  /**
263   * Returns the ChangelogBackend configured for "cn=changelog" in this directory server.
264   *
265   * @return the ChangelogBackend configured for "cn=changelog" in this directory server
266   * @deprecated instead inject the required object where needed
267   */
268  @Deprecated
269  public static ChangelogBackend getInstance()
270  {
271    return (ChangelogBackend) DirectoryServer.getBackend(CHANGELOG_BASE_DN);
272  }
273
274  @Override
275  public void configureBackend(final Configuration config, ServerContext serverContext) throws ConfigException
276  {
277    throw new UnsupportedOperationException("The changelog backend is not configurable");
278  }
279
280  @Override
281  public void openBackend() throws InitializationException
282  {
283    baseDNs = new DN[] { CHANGELOG_BASE_DN };
284
285    try
286    {
287      DirectoryServer.registerBaseDN(CHANGELOG_BASE_DN, this, true);
288    }
289    catch (final DirectoryException e)
290    {
291      throw new InitializationException(
292          ERR_BACKEND_CANNOT_REGISTER_BASEDN.get(DN_EXTERNAL_CHANGELOG_ROOT, getExceptionMessage(e)), e);
293    }
294  }
295
296  @Override
297  public void closeBackend()
298  {
299    try
300    {
301      DirectoryServer.deregisterBaseDN(CHANGELOG_BASE_DN);
302    }
303    catch (final DirectoryException e)
304    {
305      logger.traceException(e);
306    }
307  }
308
309  @Override
310  public DN[] getBaseDNs()
311  {
312    return baseDNs;
313  }
314
315  @Override
316  public boolean isIndexed(final AttributeType attributeType, final IndexType indexType)
317  {
318    return true;
319  }
320
321  @Override
322  public Entry getEntry(final DN entryDN) throws DirectoryException
323  {
324    if (entryDN == null)
325    {
326      throw new DirectoryException(DirectoryServer.getServerErrorResultCode(),
327          ERR_BACKEND_GET_ENTRY_NULL.get(getBackendID()));
328    }
329    throw new RuntimeException("Not implemented");
330  }
331
332  @Override
333  public ConditionResult hasSubordinates(final DN entryDN) throws DirectoryException
334  {
335    if (CHANGELOG_BASE_DN.equals(entryDN))
336    {
337      final Boolean hasSubs = baseChangelogHasSubordinates();
338      if (hasSubs == null)
339      {
340        return ConditionResult.UNDEFINED;
341      }
342      return ConditionResult.valueOf(hasSubs);
343    }
344    return ConditionResult.FALSE;
345  }
346
347  private Boolean baseChangelogHasSubordinates() throws DirectoryException
348  {
349    if (baseEntryHasSubordinates == null)
350    {
351      // compute its value
352      try
353      {
354        final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB();
355        CursorOptions options = new CursorOptions(GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY);
356        try (final MultiDomainDBCursor cursor =
357            replicationDomainDB.getCursorFrom(new MultiDomainServerState(), options, getExcludedBaseDNs()))
358        {
359          baseEntryHasSubordinates = cursor.next();
360        }
361      }
362      catch (ChangelogException e)
363      {
364        throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_CHANGELOG_BACKEND_ATTRIBUTE.get(
365            "hasSubordinates", DN_EXTERNAL_CHANGELOG_ROOT, stackTraceToSingleLineString(e)));
366      }
367    }
368    return baseEntryHasSubordinates;
369  }
370
371  @Override
372  public long getNumberOfEntriesInBaseDN(final DN baseDN) throws DirectoryException
373  {
374    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_NUM_SUBORDINATES_NOT_SUPPORTED.get());
375  }
376
377  @Override
378  public long getNumberOfChildren(final DN parentDN) throws DirectoryException
379  {
380    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_NUM_SUBORDINATES_NOT_SUPPORTED.get());
381  }
382
383  /**
384   * Notifies persistent searches of this backend that a new cookie entry was added to it.
385   * <p>
386   * Note: This method correspond to the "persistent search" phase.
387   * It is executed multiple times per persistent search, multi-threaded, until the persistent search is cancelled.
388   * <p>
389   * This method must only be called after the provided data have been persisted to disk.
390   *
391   * @param baseDN
392   *          the baseDN of the newly added entry.
393   * @param updateMsg
394   *          the update message of the newly added entry
395   * @throws ChangelogException
396   *           If a problem occurs while notifying of the newly added entry.
397   */
398  public void notifyCookieEntryAdded(DN baseDN, UpdateMsg updateMsg) throws ChangelogException
399  {
400    if (!(updateMsg instanceof LDAPUpdateMsg))
401    {
402      return;
403    }
404
405    try
406    {
407      for (PersistentSearch pSearch : cookieBasedPersistentSearches)
408      {
409        final SearchOperation searchOp = pSearch.getSearchOperation();
410        final CookieEntrySender entrySender = searchOp.getAttachment(ENTRY_SENDER_ATTACHMENT);
411        entrySender.persistentSearchSendEntry(baseDN, updateMsg);
412      }
413    }
414    catch (DirectoryException e)
415    {
416      throw new ChangelogException(e.getMessageObject(), e);
417    }
418  }
419
420  /**
421   * Notifies persistent searches of this backend that a new change number entry was added to it.
422   * <p>
423   * Note: This method correspond to the "persistent search" phase.
424   * It is executed multiple times per persistent search, multi-threaded, until the persistent search is cancelled.
425   * <p>
426   * This method must only be called after the provided data have been persisted to disk.
427   *
428   * @param baseDN
429   *          the baseDN of the newly added entry.
430   * @param changeNumber
431   *          the change number of the newly added entry. It will be greater
432   *          than zero for entries added to the change number index and less
433   *          than or equal to zero for entries added to any replica DB
434   * @param cookieString
435   *          a string representing the cookie of the newly added entry.
436   *          This is only meaningful for entries added to the change number index
437   * @param updateMsg
438   *          the update message of the newly added entry
439   * @throws ChangelogException
440   *           If a problem occurs while notifying of the newly added entry.
441   */
442  public void notifyChangeNumberEntryAdded(DN baseDN, long changeNumber, String cookieString, UpdateMsg updateMsg)
443      throws ChangelogException
444  {
445    if (!(updateMsg instanceof LDAPUpdateMsg)
446        || changeNumberBasedPersistentSearches.isEmpty())
447    {
448      return;
449    }
450
451    try
452    {
453      // changeNumber entry can be shared with multiple persistent searches
454      final Entry changeNumberEntry = createEntryFromMsg(baseDN, changeNumber, cookieString, updateMsg);
455      for (PersistentSearch pSearch : changeNumberBasedPersistentSearches)
456      {
457        final SearchOperation searchOp = pSearch.getSearchOperation();
458        final ChangeNumberEntrySender entrySender = searchOp.getAttachment(ENTRY_SENDER_ATTACHMENT);
459        entrySender.persistentSearchSendEntry(changeNumber, changeNumberEntry);
460      }
461    }
462    catch (DirectoryException e)
463    {
464      throw new ChangelogException(e.getMessageObject(), e);
465    }
466  }
467
468  private boolean isCookieBased(final SearchOperation searchOp)
469  {
470    for (Control c : searchOp.getRequestControls())
471    {
472      if (OID_ECL_COOKIE_EXCHANGE_CONTROL.equals(c.getOID()))
473      {
474        return true;
475      }
476    }
477    return false;
478  }
479
480  @Override
481  public void addEntry(Entry entry, AddOperation addOperation)
482      throws DirectoryException, CanceledOperationException
483  {
484    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
485        ERR_BACKEND_ADD_NOT_SUPPORTED.get(String.valueOf(entry.getName()), getBackendID()));
486  }
487
488  @Override
489  public void deleteEntry(DN entryDN, DeleteOperation deleteOperation)
490      throws DirectoryException, CanceledOperationException
491  {
492    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
493        ERR_BACKEND_DELETE_NOT_SUPPORTED.get(String.valueOf(entryDN), getBackendID()));
494  }
495
496  @Override
497  public void replaceEntry(Entry oldEntry, Entry newEntry,
498      ModifyOperation modifyOperation) throws DirectoryException,
499      CanceledOperationException
500  {
501    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
502        ERR_BACKEND_MODIFY_NOT_SUPPORTED.get(String.valueOf(newEntry.getName()), getBackendID()));
503  }
504
505  @Override
506  public void renameEntry(DN currentDN, Entry entry,
507      ModifyDNOperation modifyDNOperation) throws DirectoryException,
508      CanceledOperationException
509  {
510    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
511        ERR_BACKEND_MODIFY_DN_NOT_SUPPORTED.get(String.valueOf(currentDN), getBackendID()));
512  }
513
514  /**
515   * {@inheritDoc}
516   * <p>
517   * Runs the "initial search" phase (as opposed to a "persistent search"
518   * phase). The "initial search" phase is the only search run by normal
519   * searches, but it is also run by persistent searches with
520   * <code>changesOnly=false</code>. Persistent searches with
521   * <code>changesOnly=true</code> never execute this code.
522   * <p>
523   * Note: this method is executed only once per persistent search, single
524   * threaded.
525   */
526  @Override
527  public void search(final SearchOperation searchOperation) throws DirectoryException
528  {
529    checkChangelogReadPrivilege(searchOperation);
530
531    final Set<DN> excludedBaseDNs = getExcludedBaseDNs();
532    final MultiDomainServerState cookie = getCookieFromControl(searchOperation, excludedBaseDNs);
533
534    final ChangeNumberRange range = optimizeSearch(searchOperation.getBaseDN(), searchOperation.getFilter());
535    try
536    {
537      final boolean isPersistentSearch = isPersistentSearch(searchOperation);
538      if (cookie != null)
539      {
540        initialSearchFromCookie(
541            getCookieEntrySender(SearchPhase.INITIAL, searchOperation, cookie, excludedBaseDNs, isPersistentSearch));
542      }
543      else
544      {
545        initialSearchFromChangeNumber(
546            getChangeNumberEntrySender(SearchPhase.INITIAL, searchOperation, range, isPersistentSearch));
547      }
548    }
549    catch (ChangelogException e)
550    {
551      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_CHANGELOG_BACKEND_SEARCH.get(
552          searchOperation.getBaseDN(), searchOperation.getFilter(), stackTraceToSingleLineString(e)));
553    }
554  }
555
556  private MultiDomainServerState getCookieFromControl(final SearchOperation searchOperation, Set<DN> excludedBaseDNs)
557      throws DirectoryException
558  {
559    final ExternalChangelogRequestControl eclRequestControl =
560        searchOperation.getRequestControl(ExternalChangelogRequestControl.DECODER);
561    if (eclRequestControl != null)
562    {
563      final MultiDomainServerState cookie = eclRequestControl.getCookie();
564      validateProvidedCookie(cookie, excludedBaseDNs);
565      return cookie;
566    }
567    return null;
568  }
569
570  @Override
571  public Set<String> getSupportedControls()
572  {
573    return supportedControls;
574  }
575
576  @Override
577  public Set<String> getSupportedFeatures()
578  {
579    return Collections.emptySet();
580  }
581
582  @Override
583  public boolean supports(BackendOperation backendOperation)
584  {
585    return false;
586  }
587
588  @Override
589  public void exportLDIF(final LDIFExportConfig exportConfig)
590      throws DirectoryException
591  {
592    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
593        ERR_BACKEND_IMPORT_AND_EXPORT_NOT_SUPPORTED.get(getBackendID()));
594  }
595
596  @Override
597  public LDIFImportResult importLDIF(LDIFImportConfig importConfig, ServerContext serverContext)
598      throws DirectoryException
599  {
600    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
601        ERR_BACKEND_IMPORT_AND_EXPORT_NOT_SUPPORTED.get(getBackendID()));
602  }
603
604  @Override
605  public void createBackup(BackupConfig backupConfig) throws DirectoryException
606  {
607    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
608        ERR_BACKEND_BACKUP_AND_RESTORE_NOT_SUPPORTED.get(getBackendID()));
609  }
610
611  @Override
612  public void removeBackup(BackupDirectory backupDirectory, String backupID) throws DirectoryException
613  {
614      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
615          ERR_BACKEND_BACKUP_AND_RESTORE_NOT_SUPPORTED.get(getBackendID()));
616  }
617
618  @Override
619  public void restoreBackup(RestoreConfig restoreConfig) throws DirectoryException
620  {
621      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
622          ERR_BACKEND_BACKUP_AND_RESTORE_NOT_SUPPORTED.get(getBackendID()));
623  }
624
625  @Override
626  public long getEntryCount()
627  {
628    try
629    {
630      return getNumberOfEntriesInBaseDN(CHANGELOG_BASE_DN) + 1;
631    }
632    catch (DirectoryException e)
633    {
634      logger.traceException(e);
635      return -1;
636    }
637  }
638
639  /**
640   * Represent the change number range targeted by a search operation.
641   * <p>
642   * This class should be visible for tests.
643   */
644  static final class ChangeNumberRange
645  {
646    private long lowerBound = -1;
647    private long upperBound = -1;
648
649    /**
650     * Returns the lowest change number to retrieve (inclusive).
651     *
652     * @return the lowest change number
653     */
654    long getLowerBound()
655    {
656      return lowerBound;
657    }
658
659    /**
660     * Returns the highest change number to retrieve (inclusive).
661     *
662     * @return the highest change number
663     */
664    long getUpperBound()
665    {
666      return upperBound;
667    }
668  }
669
670  /**
671   * Returns the set of DNs to exclude from the search.
672   *
673   * @return the DNs corresponding to domains to exclude from the search.
674   * @throws DirectoryException
675   *           If a DN can't be decoded.
676   */
677  private static Set<DN> getExcludedBaseDNs() throws DirectoryException
678  {
679    return getExcludedChangelogDomains();
680  }
681
682  /**
683   * Optimize the search parameters by analyzing the DN and filter.
684   * It also performs validation on some search parameters
685   * for both cookie and change number based changelogs.
686   *
687   * @param baseDN the provided search baseDN.
688   * @param userFilter the provided search filter.
689   * @return the optimized change number range
690   * @throws DirectoryException when an exception occurs.
691   */
692  ChangeNumberRange optimizeSearch(final DN baseDN, final SearchFilter userFilter) throws DirectoryException
693  {
694    SearchFilter equalityFilter = null;
695    switch (baseDN.size())
696    {
697    case 1:
698      // "cn=changelog" : use user-provided search filter.
699      break;
700    case 2:
701      // It is probably "changeNumber=xxx,cn=changelog", use equality filter
702      // But it also could be "<service-id>,cn=changelog" so need to check on attribute
703      equalityFilter = buildSearchFilterFrom(baseDN, CHANGE_NUMBER_ATTR_LC, CHANGE_NUMBER_ATTR);
704      break;
705    default:
706      // "replicationCSN=xxx,<service-id>,cn=changelog" : use equality filter
707      equalityFilter = buildSearchFilterFrom(baseDN, "replicationcsn", "replicationCSN");
708      break;
709    }
710
711    return optimizeSearchUsingFilter(equalityFilter != null ? equalityFilter : userFilter);
712  }
713
714  /**
715   * Build a search filter from given DN and attribute.
716   *
717   * @return the search filter or {@code null} if attribute is not present in
718   *         the provided DN
719   */
720  private SearchFilter buildSearchFilterFrom(final DN baseDN, final String lowerCaseAttr, final String upperCaseAttr)
721  {
722    final RDN rdn = baseDN.rdn();
723    AttributeType attrType = DirectoryServer.getAttributeTypeOrDefault(lowerCaseAttr, upperCaseAttr);
724    final ByteString attrValue = rdn.getAttributeValue(attrType);
725    if (attrValue != null)
726    {
727      return SearchFilter.createEqualityFilter(attrType, attrValue);
728    }
729    return null;
730  }
731
732  private ChangeNumberRange optimizeSearchUsingFilter(final SearchFilter filter) throws DirectoryException
733  {
734    final ChangeNumberRange range = new ChangeNumberRange();
735    if (filter == null)
736    {
737      return range;
738    }
739
740    if (matches(filter, FilterType.GREATER_OR_EQUAL, CHANGE_NUMBER_ATTR))
741    {
742      range.lowerBound = decodeChangeNumber(filter.getAssertionValue());
743    }
744    else if (matches(filter, FilterType.LESS_OR_EQUAL, CHANGE_NUMBER_ATTR))
745    {
746      range.upperBound = decodeChangeNumber(filter.getAssertionValue());
747    }
748    else if (matches(filter, FilterType.EQUALITY, CHANGE_NUMBER_ATTR))
749    {
750      final long number = decodeChangeNumber(filter.getAssertionValue());
751      range.lowerBound = number;
752      range.upperBound = number;
753    }
754    else if (matches(filter, FilterType.EQUALITY, "replicationcsn"))
755    {
756      // == exact CSN
757      // validate provided CSN is correct
758      new CSN(filter.getAssertionValue().toString());
759    }
760    else if (filter.getFilterType() == FilterType.AND)
761    {
762      // TODO: it looks like it could be generalized to N components, not only two
763      final Collection<SearchFilter> components = filter.getFilterComponents();
764      final SearchFilter filters[] = components.toArray(new SearchFilter[0]);
765      long upper1 = -1;
766      long lower1 = -1;
767      long upper2 = -1;
768      long lower2 = -1;
769      if (filters.length > 0)
770      {
771        ChangeNumberRange range1 = optimizeSearchUsingFilter(filters[0]);
772        upper1 = range1.upperBound;
773        lower1 = range1.lowerBound;
774      }
775      if (filters.length > 1)
776      {
777        ChangeNumberRange range2 = optimizeSearchUsingFilter(filters[1]);
778        upper2 = range2.upperBound;
779        lower2 = range2.lowerBound;
780      }
781      if (upper1 == -1)
782      {
783        range.upperBound = upper2;
784      }
785      else if (upper2 == -1)
786      {
787        range.upperBound = upper1;
788      }
789      else
790      {
791        range.upperBound = Math.min(upper1, upper2);
792      }
793
794      range.lowerBound = Math.max(lower1, lower2);
795    }
796    return range;
797  }
798
799  private static long decodeChangeNumber(final ByteString assertionValue)
800      throws DirectoryException
801  {
802    try
803    {
804      return Long.decode(assertionValue.toString());
805    }
806    catch (NumberFormatException e)
807    {
808      throw new DirectoryException(ResultCode.INVALID_ATTRIBUTE_SYNTAX,
809          LocalizableMessage.raw("Could not convert value '%s' to long", assertionValue));
810    }
811  }
812
813  private boolean matches(SearchFilter filter, FilterType filterType, String primaryName)
814  {
815    return filter.getFilterType() == filterType
816           && filter.getAttributeType() != null
817           && filter.getAttributeType().getPrimaryName().equalsIgnoreCase(primaryName);
818  }
819
820  /** Search the changelog when a cookie control is provided. */
821  private void initialSearchFromCookie(final CookieEntrySender entrySender)
822      throws DirectoryException, ChangelogException
823  {
824    if (!sendBaseChangelogEntry(entrySender.searchOp))
825    { // only return the base entry: stop here
826      return;
827    }
828
829    final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB();
830    CursorOptions options = new CursorOptions(GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY);
831    try (final MultiDomainDBCursor cursor =
832        replicationDomainDB.getCursorFrom(entrySender.cookie, options, entrySender.excludedBaseDNs);
833        ECLMultiDomainDBCursor replicaUpdatesCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor))
834    {
835      if (sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor))
836      {
837        entrySender.transitioningToPersistentSearchPhase();
838        sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor);
839      }
840    }
841    finally
842    {
843      entrySender.finalizeInitialSearch();
844    }
845  }
846
847  private CookieEntrySender getCookieEntrySender(SearchPhase startPhase, final SearchOperation searchOperation,
848      MultiDomainServerState cookie, Set<DN> excludedBaseDNs, boolean isPersistentSearch)
849  {
850    if (isPersistentSearch && SearchPhase.INITIAL.equals(startPhase))
851    {
852      return searchOperation.getAttachment(ENTRY_SENDER_ATTACHMENT);
853    }
854    return new CookieEntrySender(searchOperation, startPhase, cookie, excludedBaseDNs);
855  }
856
857  private boolean sendCookieEntriesFromCursor(final CookieEntrySender entrySender,
858      final ECLMultiDomainDBCursor replicaUpdatesCursor) throws ChangelogException, DirectoryException
859  {
860    boolean continueSearch = true;
861    while (continueSearch && replicaUpdatesCursor.next())
862    {
863      final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord();
864      final DN domainBaseDN = replicaUpdatesCursor.getData();
865      continueSearch = entrySender.initialSearchSendEntry(updateMsg, domainBaseDN);
866    }
867    return continueSearch;
868  }
869
870  private boolean isPersistentSearch(SearchOperation op)
871  {
872    for (PersistentSearch pSearch : getPersistentSearches())
873    {
874      if (op == pSearch.getSearchOperation())
875      {
876        return true;
877      }
878    }
879    return false;
880  }
881
882  @Override
883  public void registerPersistentSearch(PersistentSearch pSearch) throws DirectoryException
884  {
885    initializePersistentSearch(pSearch);
886
887    if (isCookieBased(pSearch.getSearchOperation()))
888    {
889      cookieBasedPersistentSearches.add(pSearch);
890    }
891    else
892    {
893      changeNumberBasedPersistentSearches.add(pSearch);
894    }
895    super.registerPersistentSearch(pSearch);
896  }
897
898  private void initializePersistentSearch(PersistentSearch pSearch) throws DirectoryException
899  {
900    final SearchOperation searchOp = pSearch.getSearchOperation();
901
902    // Validation must be done during registration for changes only persistent searches.
903    // Otherwise, when there is an initial search phase,
904    // validation is performed by the search() method.
905    if (pSearch.isChangesOnly())
906    {
907      checkChangelogReadPrivilege(searchOp);
908    }
909    final ChangeNumberRange range = optimizeSearch(searchOp.getBaseDN(), searchOp.getFilter());
910
911    final SearchPhase startPhase = pSearch.isChangesOnly() ? SearchPhase.PERSISTENT : SearchPhase.INITIAL;
912    if (isCookieBased(searchOp))
913    {
914      final Set<DN> excludedBaseDNs = getExcludedBaseDNs();
915      final MultiDomainServerState cookie = getCookie(pSearch.isChangesOnly(), searchOp, excludedBaseDNs);
916      searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT,
917          new CookieEntrySender(searchOp, startPhase, cookie, excludedBaseDNs));
918    }
919    else
920    {
921      searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT,
922          new ChangeNumberEntrySender(searchOp, startPhase, range));
923    }
924  }
925
926  private MultiDomainServerState getCookie(boolean isChangesOnly, SearchOperation searchOp, Set<DN> excludedBaseDNs)
927      throws DirectoryException
928  {
929    if (isChangesOnly)
930    {
931      // this changesOnly persistent search will not go through #initialSearch()
932      // so we must initialize the cookie here
933      return getNewestCookie(searchOp);
934    }
935    return getCookieFromControl(searchOp, excludedBaseDNs);
936  }
937
938  private MultiDomainServerState getNewestCookie(SearchOperation searchOp)
939  {
940    if (!isCookieBased(searchOp))
941    {
942      return null;
943    }
944
945    final MultiDomainServerState cookie = new MultiDomainServerState();
946    for (final Iterator<ReplicationServerDomain> it =
947        replicationServer.getDomainIterator(); it.hasNext();)
948    {
949      final DN baseDN = it.next().getBaseDN();
950      final ServerState state = getChangelogDB().getReplicationDomainDB().getDomainNewestCSNs(baseDN);
951      cookie.update(baseDN, state);
952    }
953    return cookie;
954  }
955
956  /**
957   * Validates the cookie contained in search parameters by checking its content
958   * with the actual replication server state.
959   *
960   * @throws DirectoryException
961   *           If the state is not valid
962   */
963  private void validateProvidedCookie(final MultiDomainServerState cookie, Set<DN> excludedBaseDNs)
964      throws DirectoryException
965  {
966    if (cookie != null && !cookie.isEmpty())
967    {
968      replicationServer.validateCookie(cookie, excludedBaseDNs);
969    }
970  }
971
972  /** Search the changelog using change number(s). */
973  private void initialSearchFromChangeNumber(final ChangeNumberEntrySender entrySender)
974      throws ChangelogException, DirectoryException
975  {
976    if (!sendBaseChangelogEntry(entrySender.searchOp))
977    { // only return the base entry: stop here
978      return;
979    }
980
981    final AtomicReference<MultiDomainDBCursor> replicaUpdatesCursor = new AtomicReference<>();
982    try (DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor = getCNIndexDBCursor(entrySender.lowestChangeNumber))
983    {
984      final MultiDomainServerState cookie = new MultiDomainServerState();
985
986      if (sendChangeNumberEntriesFromCursors(entrySender, cnIndexDBCursor, replicaUpdatesCursor, cookie))
987      {
988        entrySender.transitioningToPersistentSearchPhase();
989        sendChangeNumberEntriesFromCursors(entrySender, cnIndexDBCursor, replicaUpdatesCursor, cookie);
990      }
991    }
992    finally
993    {
994      entrySender.finalizeInitialSearch();
995      StaticUtils.close(replicaUpdatesCursor.get());
996    }
997  }
998
999  private ChangeNumberEntrySender getChangeNumberEntrySender(SearchPhase startPhase,
1000      final SearchOperation searchOperation, ChangeNumberRange range, boolean isPersistentSearch)
1001  {
1002    if (isPersistentSearch && SearchPhase.INITIAL.equals(startPhase))
1003    {
1004      return searchOperation.getAttachment(ENTRY_SENDER_ATTACHMENT);
1005    }
1006    return new ChangeNumberEntrySender(searchOperation, SearchPhase.INITIAL, range);
1007  }
1008
1009  private boolean sendChangeNumberEntriesFromCursors(final ChangeNumberEntrySender entrySender,
1010      DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor, AtomicReference<MultiDomainDBCursor> replicaUpdatesCursor,
1011      MultiDomainServerState cookie) throws ChangelogException, DirectoryException
1012  {
1013    boolean continueSearch = true;
1014    while (continueSearch && cnIndexDBCursor.next())
1015    {
1016      // Handle the current cnIndex record
1017      final ChangeNumberIndexRecord cnIndexRecord = cnIndexDBCursor.getRecord();
1018      if (replicaUpdatesCursor.get() == null)
1019      {
1020        replicaUpdatesCursor.set(initializeReplicaUpdatesCursor(cnIndexRecord));
1021        initializeCookieForChangeNumberMode(cookie, cnIndexRecord);
1022      }
1023      else
1024      {
1025        cookie.update(cnIndexRecord.getBaseDN(), cnIndexRecord.getCSN());
1026      }
1027      continueSearch = entrySender.changeNumberIsInRange(cnIndexRecord.getChangeNumber());
1028      if (continueSearch)
1029      {
1030        final UpdateMsg updateMsg = findReplicaUpdateMessage(replicaUpdatesCursor.get(), cnIndexRecord.getCSN());
1031        if (updateMsg != null)
1032        {
1033          continueSearch = entrySender.initialSearchSendEntry(cnIndexRecord, updateMsg, cookie);
1034          replicaUpdatesCursor.get().next();
1035        }
1036      }
1037    }
1038    return continueSearch;
1039  }
1040
1041  /** Initialize the provided cookie from the provided change number index record. */
1042  private void initializeCookieForChangeNumberMode(
1043      MultiDomainServerState cookie, final ChangeNumberIndexRecord cnIndexRecord) throws ChangelogException
1044  {
1045    // Initialize the multi domain cursor only from the change number index record.
1046    // The cookie is always empty at this stage.
1047    CursorOptions options = new CursorOptions(LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, cnIndexRecord.getCSN());
1048    MultiDomainServerState unused = new MultiDomainServerState();
1049    MultiDomainDBCursor cursor = getChangelogDB().getReplicationDomainDB().getCursorFrom(unused, options);
1050    try (ECLMultiDomainDBCursor eclCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor))
1051    {
1052      updateCookieToMediumConsistencyPoint(cookie, eclCursor, cnIndexRecord);
1053    }
1054  }
1055
1056  /**
1057   * Rebuilds the changelogcookie starting at the newest change number index record.
1058   * <p>
1059   * It updates the provided cookie with the changes from the provided ECL cursor,
1060   * up to (and including) the provided change number index record.
1061   * <p>
1062   * Therefore, after calling this method, the cursor is positioned
1063   * to the change immediately following the provided change number index record.
1064   *
1065   * @param cookie the cookie to update
1066   * @param cursor the cursor where to read changes from
1067   * @param cnIndexRecord the change number index record to go right after
1068   * @throws ChangelogException if any problem occurs
1069   */
1070  public static void updateCookieToMediumConsistencyPoint(
1071      MultiDomainServerState cookie, ECLMultiDomainDBCursor cursor, ChangeNumberIndexRecord cnIndexRecord)
1072          throws ChangelogException
1073  {
1074    if (cnIndexRecord == null)
1075    {
1076      return;
1077    }
1078
1079    while (cursor.next())
1080    {
1081      UpdateMsg updateMsg = cursor.getRecord();
1082      if (updateMsg.getCSN().compareTo(cnIndexRecord.getCSN()) > 0)
1083      {
1084        break;
1085      }
1086      cookie.update(cursor.getData(), updateMsg.getCSN());
1087    }
1088  }
1089
1090  private MultiDomainDBCursor initializeReplicaUpdatesCursor(
1091      final ChangeNumberIndexRecord cnIndexRecord) throws ChangelogException
1092  {
1093    final MultiDomainServerState state = new MultiDomainServerState();
1094    state.update(cnIndexRecord.getBaseDN(), cnIndexRecord.getCSN());
1095
1096    // No need for ECLMultiDomainDBCursor in this case
1097    // as updateMsg will be matched with cnIndexRecord
1098    CursorOptions options = new CursorOptions(GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY);
1099    final MultiDomainDBCursor replicaUpdatesCursor =
1100        getChangelogDB().getReplicationDomainDB().getCursorFrom(state, options);
1101    replicaUpdatesCursor.next();
1102    return replicaUpdatesCursor;
1103  }
1104
1105  /**
1106   * Returns the replica update message corresponding to the provided
1107   * cnIndexRecord.
1108   *
1109   * @return the update message, which may be {@code null} if the update message
1110   *         could not be found because it was purged or because corresponding
1111   *         baseDN was removed from the changelog
1112   * @throws DirectoryException
1113   *           If inconsistency is detected between the available update
1114   *           messages and the provided cnIndexRecord
1115   */
1116  private UpdateMsg findReplicaUpdateMessage(final MultiDomainDBCursor replicaUpdatesCursor, CSN csn)
1117      throws ChangelogException, DirectoryException
1118  {
1119    while (true)
1120    {
1121      final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord();
1122      final int compareIndexWithUpdateMsg = csn.compareTo(updateMsg.getCSN());
1123      if (compareIndexWithUpdateMsg < 0) {
1124        // Either update message has been purged or baseDN has been removed from changelogDB,
1125        // ignore current index record and go to the next one
1126        return null;
1127      }
1128      else if (compareIndexWithUpdateMsg == 0)
1129      {
1130        // Found the matching update message
1131        return updateMsg;
1132      }
1133      // Case compareIndexWithUpdateMsg > 0 : the update message has not bean reached yet
1134      if (!replicaUpdatesCursor.next())
1135      {
1136        // Should never happen, as it means some messages have disappeared
1137        // TODO : put the correct I18N message
1138        throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
1139            LocalizableMessage.raw("Could not find replica update message matching index record. " +
1140                "No more replica update messages with a csn newer than " + updateMsg.getCSN() + " exist."));
1141      }
1142    }
1143  }
1144
1145  /** Returns a cursor on CNIndexDB for the provided first change number. */
1146  private DBCursor<ChangeNumberIndexRecord> getCNIndexDBCursor(
1147      final long firstChangeNumber) throws ChangelogException
1148  {
1149    final ChangeNumberIndexDB cnIndexDB = getChangelogDB().getChangeNumberIndexDB();
1150    long changeNumberToUse = firstChangeNumber;
1151    if (changeNumberToUse <= 1)
1152    {
1153      final ChangeNumberIndexRecord oldestRecord = cnIndexDB.getOldestRecord();
1154      changeNumberToUse = oldestRecord == null ? CHANGE_NUMBER_FOR_EMPTY_CURSOR : oldestRecord.getChangeNumber();
1155    }
1156    return cnIndexDB.getCursorFrom(changeNumberToUse);
1157  }
1158
1159  /** Creates a changelog entry. */
1160  private static Entry createEntryFromMsg(final DN baseDN, final long changeNumber, final String cookie,
1161      final UpdateMsg msg) throws DirectoryException
1162  {
1163    if (msg instanceof AddMsg)
1164    {
1165      return createAddMsg(baseDN, changeNumber, cookie, msg);
1166    }
1167    else if (msg instanceof ModifyCommonMsg)
1168    {
1169      return createModifyMsg(baseDN, changeNumber, cookie, msg);
1170    }
1171    else if (msg instanceof DeleteMsg)
1172    {
1173      final DeleteMsg delMsg = (DeleteMsg) msg;
1174      return createChangelogEntry(baseDN, changeNumber, cookie, delMsg, null, "delete", delMsg.getInitiatorsName());
1175    }
1176    throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
1177        LocalizableMessage.raw("Unexpected message type when trying to create changelog entry for dn %s : %s", baseDN,
1178            msg.getClass()));
1179  }
1180
1181  /**
1182   * Creates an entry from an add message.
1183   * <p>
1184   * Map addMsg to an LDIF string for the 'changes' attribute, and pull out
1185   * change initiators name if available which is contained in the creatorsName
1186   * attribute.
1187   */
1188  private static Entry createAddMsg(final DN baseDN, final long changeNumber, final String cookie, final UpdateMsg msg)
1189      throws DirectoryException
1190  {
1191    final AddMsg addMsg = (AddMsg) msg;
1192    String changeInitiatorsName = null;
1193    String ldifChanges = null;
1194    try
1195    {
1196      final StringBuilder builder = new StringBuilder(256);
1197      for (Attribute attr : addMsg.getAttributes())
1198      {
1199        if (attr.getAttributeType().equals(CREATORS_NAME_TYPE) && !attr.isEmpty())
1200        {
1201          // This attribute is not multi-valued.
1202          changeInitiatorsName = attr.iterator().next().toString();
1203        }
1204        final String attrName = attr.getNameWithOptions();
1205        for (ByteString value : attr)
1206        {
1207          builder.append(attrName);
1208          appendLDIFSeparatorAndValue(builder, value);
1209          builder.append('\n');
1210        }
1211      }
1212      ldifChanges = builder.toString();
1213    }
1214    catch (Exception e)
1215    {
1216      logEncodingMessageError("add", addMsg.getDN(), e);
1217    }
1218
1219    return createChangelogEntry(baseDN, changeNumber, cookie, addMsg, ldifChanges, "add", changeInitiatorsName);
1220  }
1221
1222  /**
1223   * Creates an entry from a modify message.
1224   * <p>
1225   * Map the modifyMsg to an LDIF string for the 'changes' attribute, and pull
1226   * out change initiators name if available which is contained in the
1227   * modifiersName attribute.
1228   */
1229  private static Entry createModifyMsg(final DN baseDN, final long changeNumber, final String cookie,
1230      final UpdateMsg msg) throws DirectoryException
1231  {
1232    final ModifyCommonMsg modifyMsg = (ModifyCommonMsg) msg;
1233    String changeInitiatorsName = null;
1234    String ldifChanges = null;
1235    try
1236    {
1237      final StringBuilder builder = new StringBuilder(128);
1238      for (Modification mod : modifyMsg.getMods())
1239      {
1240        final Attribute attr = mod.getAttribute();
1241        if (mod.getModificationType() == ModificationType.REPLACE
1242            && attr.getAttributeType().equals(MODIFIERS_NAME_TYPE)
1243            && !attr.isEmpty())
1244        {
1245          // This attribute is not multi-valued.
1246          changeInitiatorsName = attr.iterator().next().toString();
1247        }
1248        final String attrName = attr.getNameWithOptions();
1249        builder.append(mod.getModificationType());
1250        builder.append(": ");
1251        builder.append(attrName);
1252        builder.append('\n');
1253
1254        for (ByteString value : attr)
1255        {
1256          builder.append(attrName);
1257          appendLDIFSeparatorAndValue(builder, value);
1258          builder.append('\n');
1259        }
1260        builder.append("-\n");
1261      }
1262      ldifChanges = builder.toString();
1263    }
1264    catch (Exception e)
1265    {
1266      logEncodingMessageError("modify", modifyMsg.getDN(), e);
1267    }
1268
1269    final boolean isModifyDNMsg = modifyMsg instanceof ModifyDNMsg;
1270    final Entry entry = createChangelogEntry(baseDN, changeNumber, cookie, modifyMsg, ldifChanges,
1271        isModifyDNMsg ? "modrdn" : "modify", changeInitiatorsName);
1272
1273    if (isModifyDNMsg)
1274    {
1275      final ModifyDNMsg modDNMsg = (ModifyDNMsg) modifyMsg;
1276      addAttribute(entry, "newrdn", modDNMsg.getNewRDN());
1277      if (modDNMsg.getNewSuperior() != null)
1278      {
1279        addAttribute(entry, "newsuperior", modDNMsg.getNewSuperior());
1280      }
1281      addAttribute(entry, "deleteoldrdn", String.valueOf(modDNMsg.deleteOldRdn()));
1282    }
1283    return entry;
1284  }
1285
1286  /**
1287   * Log an encoding message error.
1288   *
1289   * @param messageType
1290   *            String identifying type of message. Should be "add" or "modify".
1291   * @param entryDN
1292   *            DN of original entry
1293   */
1294  private static void logEncodingMessageError(String messageType, DN entryDN, Exception exception)
1295  {
1296    logger.traceException(exception);
1297    logger.error(LocalizableMessage.raw(
1298        "An exception was encountered while trying to encode a replication " + messageType + " message for entry \""
1299        + entryDN + "\" into an External Change Log entry: " + exception.getMessage()));
1300  }
1301
1302  private void checkChangelogReadPrivilege(SearchOperation searchOp) throws DirectoryException
1303  {
1304    if (!searchOp.getClientConnection().hasPrivilege(Privilege.CHANGELOG_READ, searchOp))
1305    {
1306      throw new DirectoryException(ResultCode.INSUFFICIENT_ACCESS_RIGHTS,
1307          NOTE_SEARCH_CHANGELOG_INSUFFICIENT_PRIVILEGES.get());
1308    }
1309  }
1310
1311  /**
1312   * Create a changelog entry from a set of provided information. This is the part of
1313   * entry creation common to all types of msgs (ADD, DEL, MOD, MODDN).
1314   */
1315  private static Entry createChangelogEntry(final DN baseDN, final long changeNumber, final String cookie,
1316      final LDAPUpdateMsg msg, final String ldifChanges, final String changeType,
1317      final String changeInitiatorsName) throws DirectoryException
1318  {
1319    final CSN csn = msg.getCSN();
1320    String dnString;
1321    if (changeNumber > 0)
1322    {
1323      // change number mode
1324      dnString = "changeNumber=" + changeNumber + "," + DN_EXTERNAL_CHANGELOG_ROOT;
1325    }
1326    else
1327    {
1328      // Cookie mode
1329      dnString = "replicationCSN=" + csn + "," + baseDN + "," + DN_EXTERNAL_CHANGELOG_ROOT;
1330    }
1331
1332    final Map<AttributeType, List<Attribute>> userAttrs = new LinkedHashMap<>();
1333    final Map<AttributeType, List<Attribute>> opAttrs = new LinkedHashMap<>();
1334
1335    // Operational standard attributes
1336    addAttributeByType(ATTR_SUBSCHEMA_SUBENTRY_LC, ATTR_SUBSCHEMA_SUBENTRY_LC,
1337        ConfigConstants.DN_DEFAULT_SCHEMA_ROOT, userAttrs, opAttrs);
1338    addAttributeByType("numsubordinates", "numSubordinates", "0", userAttrs, opAttrs);
1339    addAttributeByType("hassubordinates", "hasSubordinates", "false", userAttrs, opAttrs);
1340    addAttributeByType("entrydn", "entryDN", dnString, userAttrs, opAttrs);
1341
1342    // REQUIRED attributes
1343    if (changeNumber > 0)
1344    {
1345      addAttributeByType("changenumber", "changeNumber", String.valueOf(changeNumber), userAttrs, opAttrs);
1346    }
1347    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT_GMT_TIME);
1348    dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); // ??
1349    final String format = dateFormat.format(new Date(csn.getTime()));
1350    addAttributeByType("changetime", "changeTime", format, userAttrs, opAttrs);
1351    addAttributeByType("changetype", "changeType", changeType, userAttrs, opAttrs);
1352    addAttributeByType("targetdn", "targetDN", msg.getDN().toString(), userAttrs, opAttrs);
1353
1354    // NON REQUESTED attributes
1355    addAttributeByType("replicationcsn", "replicationCSN", csn.toString(), userAttrs, opAttrs);
1356    addAttributeByType("replicaidentifier", "replicaIdentifier", Integer.toString(csn.getServerId()),
1357        userAttrs, opAttrs);
1358
1359    if (ldifChanges != null)
1360    {
1361      addAttributeByType("changes", "changes", ldifChanges, userAttrs, opAttrs);
1362    }
1363    if (changeInitiatorsName != null)
1364    {
1365      addAttributeByType("changeinitiatorsname", "changeInitiatorsName", changeInitiatorsName, userAttrs, opAttrs);
1366    }
1367
1368    final String targetUUID = msg.getEntryUUID();
1369    if (targetUUID != null)
1370    {
1371      addAttributeByType("targetentryuuid", "targetEntryUUID", targetUUID, userAttrs, opAttrs);
1372    }
1373    final String cookie2 = cookie != null ? cookie : "";
1374    addAttributeByType("changelogcookie", "changeLogCookie", cookie2, userAttrs, opAttrs);
1375
1376    final List<RawAttribute> includedAttributes = msg.getEclIncludes();
1377    if (includedAttributes != null && !includedAttributes.isEmpty())
1378    {
1379      final StringBuilder builder = new StringBuilder(256);
1380      for (final RawAttribute includedAttribute : includedAttributes)
1381      {
1382        final String name = includedAttribute.getAttributeType();
1383        for (final ByteString value : includedAttribute.getValues())
1384        {
1385          builder.append(name);
1386          appendLDIFSeparatorAndValue(builder, value);
1387          builder.append('\n');
1388        }
1389      }
1390      final String includedAttributesLDIF = builder.toString();
1391      addAttributeByType("includedattributes", "includedAttributes", includedAttributesLDIF, userAttrs, opAttrs);
1392    }
1393
1394    return new Entry(DN.valueOf(dnString), CHANGELOG_ENTRY_OBJECT_CLASSES, userAttrs, opAttrs);
1395  }
1396
1397  /**
1398   * Sends the entry if it matches the base, scope and filter of the current search operation.
1399   * It will also send the base changelog entry if it needs to be sent and was not sent before.
1400   *
1401   * @return {@code true} if search should continue, {@code false} otherwise
1402   */
1403  private static boolean sendEntryIfMatches(SearchOperation searchOp, Entry entry, String cookie)
1404      throws DirectoryException
1405  {
1406    if (matchBaseAndScopeAndFilter(searchOp, entry))
1407    {
1408      return searchOp.returnEntry(entry, getControls(cookie));
1409    }
1410    // maybe the next entry will match?
1411    return true;
1412  }
1413
1414  /** Indicates if the provided entry matches the filter, base and scope. */
1415  private static boolean matchBaseAndScopeAndFilter(SearchOperation searchOp, Entry entry) throws DirectoryException
1416  {
1417    return entry.matchesBaseAndScope(searchOp.getBaseDN(), searchOp.getScope())
1418        && searchOp.getFilter().matchesEntry(entry);
1419  }
1420
1421  private static List<Control> getControls(String cookie)
1422  {
1423    if (cookie != null)
1424    {
1425      final Control c = new EntryChangelogNotificationControl(true, cookie);
1426      return Collections.singletonList(c);
1427    }
1428    return Collections.emptyList();
1429  }
1430
1431  /**
1432   * Create and returns the base changelog entry to the underlying search operation.
1433   * <p>
1434   * "initial search" phase must return the base entry immediately.
1435   *
1436   * @return {@code true} if search should continue, {@code false} otherwise
1437   */
1438  private boolean sendBaseChangelogEntry(SearchOperation searchOp) throws DirectoryException
1439  {
1440    final DN baseDN = searchOp.getBaseDN();
1441    final SearchFilter filter = searchOp.getFilter();
1442    final SearchScope scope = searchOp.getScope();
1443
1444    if (ChangelogBackend.CHANGELOG_BASE_DN.matchesBaseAndScope(baseDN, scope))
1445    {
1446      final Entry entry = buildBaseChangelogEntry();
1447      if (filter.matchesEntry(entry) && !searchOp.returnEntry(entry, null))
1448      {
1449        // Abandon, size limit reached.
1450        return false;
1451      }
1452    }
1453    return !baseDN.equals(ChangelogBackend.CHANGELOG_BASE_DN)
1454        || !scope.equals(SearchScope.BASE_OBJECT);
1455  }
1456
1457  private Entry buildBaseChangelogEntry() throws DirectoryException
1458  {
1459    final String hasSubordinatesStr = Boolean.toString(baseChangelogHasSubordinates());
1460
1461    final Map<AttributeType, List<Attribute>> userAttrs = new LinkedHashMap<>();
1462    final Map<AttributeType, List<Attribute>> operationalAttrs = new LinkedHashMap<>();
1463
1464    // We never return the numSubordinates attribute for the base changelog entry
1465    // and there is a very good reason for that:
1466    // - Either we compute it before sending the entries,
1467    // -- then we risk returning more entries if new entries come in after we computed numSubordinates
1468    // --   or we risk returning less entries if purge kicks in      after we computed numSubordinates
1469    // - Or we accumulate all the entries that must be returned before sending them => OutOfMemoryError
1470
1471    addAttributeByUppercaseName(ATTR_COMMON_NAME, ATTR_COMMON_NAME, BACKEND_ID, userAttrs, operationalAttrs);
1472    addAttributeByUppercaseName(ATTR_SUBSCHEMA_SUBENTRY_LC, ATTR_SUBSCHEMA_SUBENTRY,
1473        ConfigConstants.DN_DEFAULT_SCHEMA_ROOT, userAttrs, operationalAttrs);
1474    addAttributeByUppercaseName("hassubordinates", "hasSubordinates", hasSubordinatesStr, userAttrs, operationalAttrs);
1475    addAttributeByUppercaseName("entrydn", "entryDN", DN_EXTERNAL_CHANGELOG_ROOT, userAttrs, operationalAttrs);
1476    return new Entry(CHANGELOG_BASE_DN, CHANGELOG_ROOT_OBJECT_CLASSES, userAttrs, operationalAttrs);
1477  }
1478
1479  private static void addAttribute(final Entry e, final String attrType, final String attrValue)
1480  {
1481    e.addAttribute(Attributes.create(attrType, attrValue), null);
1482  }
1483
1484  private static void addAttributeByType(String attrNameLowercase,
1485      String attrNameUppercase, String attrValue,
1486      Map<AttributeType, List<Attribute>> userAttrs,
1487      Map<AttributeType, List<Attribute>> operationalAttrs)
1488  {
1489    addAttribute(attrNameLowercase, attrNameUppercase, attrValue, userAttrs, operationalAttrs, true);
1490  }
1491
1492  private static void addAttributeByUppercaseName(String attrNameLowercase,
1493      String attrNameUppercase,  String attrValue,
1494      Map<AttributeType, List<Attribute>> userAttrs,
1495      Map<AttributeType, List<Attribute>> operationalAttrs)
1496  {
1497    addAttribute(attrNameLowercase, attrNameUppercase, attrValue, userAttrs, operationalAttrs, false);
1498  }
1499
1500  private static void addAttribute(final String attrNameLowercase,
1501      final String attrNameUppercase, final String attrValue,
1502      final Map<AttributeType, List<Attribute>> userAttrs,
1503      final Map<AttributeType, List<Attribute>> operationalAttrs, final boolean addByType)
1504  {
1505    AttributeType attrType = DirectoryServer.getAttributeTypeOrDefault(attrNameLowercase, attrNameUppercase);
1506    final Attribute a = addByType
1507        ? Attributes.create(attrType, attrValue)
1508        : Attributes.create(attrNameUppercase, attrValue);
1509    final List<Attribute> attrList = Collections.singletonList(a);
1510    if (attrType.isOperational())
1511    {
1512      operationalAttrs.put(attrType, attrList);
1513    }
1514    else
1515    {
1516      userAttrs.put(attrType, attrList);
1517    }
1518  }
1519
1520  /** Describes the current search phase. */
1521  private enum SearchPhase
1522  {
1523    /**
1524     * "Initial search" phase. The "initial search" phase is running
1525     * concurrently. All update notifications are ignored.
1526     */
1527    INITIAL,
1528    /**
1529     * Transitioning from the "initial search" phase to the "persistent search"
1530     * phase. "Initial search" phase has finished reading from the DB. It now
1531     * verifies if any more updates have been persisted to the DB since stopping
1532     * and send them. All update notifications are blocked.
1533     */
1534    TRANSITIONING,
1535    /**
1536     * "Persistent search" phase. "Initial search" phase has completed. All
1537     * update notifications are published.
1538     */
1539    PERSISTENT;
1540  }
1541
1542  /**
1543   * Contains data to ensure that the same change is not sent twice to clients
1544   * because of race conditions between the "initial search" phase and the
1545   * "persistent search" phase.
1546   */
1547  private static class SendEntryData<K extends Comparable<K>>
1548  {
1549    private final AtomicReference<SearchPhase> searchPhase = new AtomicReference<>(SearchPhase.INITIAL);
1550    private final Object transitioningLock = new Object();
1551    private volatile K lastKeySentByInitialSearch;
1552
1553    private SendEntryData(SearchPhase startPhase)
1554    {
1555      searchPhase.set(startPhase);
1556    }
1557
1558    private void finalizeInitialSearch()
1559    {
1560      searchPhase.set(SearchPhase.PERSISTENT);
1561      synchronized (transitioningLock)
1562      { // initial search phase has completed, release all persistent searches
1563        transitioningLock.notifyAll();
1564      }
1565    }
1566
1567    public void transitioningToPersistentSearchPhase()
1568    {
1569      searchPhase.set(SearchPhase.TRANSITIONING);
1570    }
1571
1572    private void initialSearchSendsEntry(final K key)
1573    {
1574      lastKeySentByInitialSearch = key;
1575    }
1576
1577    private boolean persistentSearchCanSendEntry(K key)
1578    {
1579      final SearchPhase stateValue = searchPhase.get();
1580      switch (stateValue)
1581      {
1582      case INITIAL:
1583        return false;
1584      case TRANSITIONING:
1585        synchronized (transitioningLock)
1586        {
1587          while (SearchPhase.TRANSITIONING.equals(searchPhase.get()))
1588          {
1589            // "initial search" phase is over, and is now verifying whether new
1590            // changes have been published to the DB.
1591            // Wait for this check to complete
1592            try
1593            {
1594              transitioningLock.wait();
1595            }
1596            catch (InterruptedException e)
1597            {
1598              Thread.currentThread().interrupt();
1599              // Shutdown must have been called. Stop sending entries.
1600              return false;
1601            }
1602          }
1603        }
1604        return key.compareTo(lastKeySentByInitialSearch) > 0;
1605      case PERSISTENT:
1606        return true;
1607      default:
1608        throw new RuntimeException("Not implemented for " + stateValue);
1609      }
1610    }
1611  }
1612
1613  /** Sends entries to clients for change number searches. */
1614  private static class ChangeNumberEntrySender
1615  {
1616    private final SearchOperation searchOp;
1617    private final long lowestChangeNumber;
1618    private final long highestChangeNumber;
1619    private final SendEntryData<Long> sendEntryData;
1620
1621    private ChangeNumberEntrySender(SearchOperation searchOp, SearchPhase startPhase, ChangeNumberRange range)
1622    {
1623      this.searchOp = searchOp;
1624      this.sendEntryData = new SendEntryData<>(startPhase);
1625      this.lowestChangeNumber = range.lowerBound;
1626      this.highestChangeNumber = range.upperBound;
1627    }
1628
1629    /**
1630     * Indicates if provided change number is compatible with last change
1631     * number.
1632     *
1633     * @param changeNumber
1634     *          The change number to test.
1635     * @return {@code true} if and only if the provided change number is in the
1636     *         range of the last change number.
1637     */
1638    boolean changeNumberIsInRange(long changeNumber)
1639    {
1640      return highestChangeNumber == -1 || changeNumber <= highestChangeNumber;
1641    }
1642
1643    private void finalizeInitialSearch()
1644    {
1645      sendEntryData.finalizeInitialSearch();
1646    }
1647
1648    private void transitioningToPersistentSearchPhase()
1649    {
1650      sendEntryData.transitioningToPersistentSearchPhase();
1651    }
1652
1653    /**
1654     * @return {@code true} if search should continue, {@code false} otherwise
1655     */
1656    private boolean initialSearchSendEntry(ChangeNumberIndexRecord cnIndexRecord, UpdateMsg updateMsg,
1657        MultiDomainServerState cookie) throws DirectoryException
1658    {
1659      final DN baseDN = cnIndexRecord.getBaseDN();
1660      sendEntryData.initialSearchSendsEntry(cnIndexRecord.getChangeNumber());
1661      final Entry entry = createEntryFromMsg(baseDN, cnIndexRecord.getChangeNumber(), cookie.toString(), updateMsg);
1662      return sendEntryIfMatches(searchOp, entry, null);
1663    }
1664
1665    private void persistentSearchSendEntry(long changeNumber, Entry entry) throws DirectoryException
1666    {
1667      if (sendEntryData.persistentSearchCanSendEntry(changeNumber))
1668      {
1669        sendEntryIfMatches(searchOp, entry, null);
1670      }
1671    }
1672  }
1673
1674  /** Sends entries to clients for cookie-based searches. */
1675  private static class CookieEntrySender {
1676    private final SearchOperation searchOp;
1677    private final SearchPhase startPhase;
1678    private final Set<DN> excludedBaseDNs;
1679    private final MultiDomainServerState cookie;
1680    private final ConcurrentSkipListMap<ReplicaId, SendEntryData<CSN>> replicaIdToSendEntryData =
1681        new ConcurrentSkipListMap<>();
1682
1683    private CookieEntrySender(SearchOperation searchOp, SearchPhase startPhase, MultiDomainServerState cookie,
1684        Set<DN> excludedBaseDNs)
1685    {
1686      this.searchOp = searchOp;
1687      this.startPhase = startPhase;
1688      this.cookie = cookie;
1689      this.excludedBaseDNs = excludedBaseDNs;
1690    }
1691
1692    private void finalizeInitialSearch()
1693    {
1694      for (SendEntryData<CSN> sendEntryData : replicaIdToSendEntryData.values())
1695      {
1696        sendEntryData.finalizeInitialSearch();
1697      }
1698    }
1699
1700    private void transitioningToPersistentSearchPhase()
1701    {
1702      for (SendEntryData<CSN> sendEntryData : replicaIdToSendEntryData.values())
1703      {
1704        sendEntryData.transitioningToPersistentSearchPhase();
1705      }
1706    }
1707
1708    private SendEntryData<CSN> getSendEntryData(DN baseDN, CSN csn)
1709    {
1710      final ReplicaId replicaId = ReplicaId.of(baseDN, csn.getServerId());
1711      SendEntryData<CSN> data = replicaIdToSendEntryData.get(replicaId);
1712      if (data == null)
1713      {
1714        final SendEntryData<CSN> newData = new SendEntryData<>(startPhase);
1715        data = replicaIdToSendEntryData.putIfAbsent(replicaId, newData);
1716        return data == null ? newData : data;
1717      }
1718      return data;
1719    }
1720
1721    private boolean initialSearchSendEntry(final UpdateMsg updateMsg, final DN baseDN) throws DirectoryException
1722    {
1723      final CSN csn = updateMsg.getCSN();
1724      final SendEntryData<CSN> sendEntryData = getSendEntryData(baseDN, csn);
1725      sendEntryData.initialSearchSendsEntry(csn);
1726      final String cookieString = updateCookie(baseDN, updateMsg.getCSN());
1727      final Entry entry = createEntryFromMsg(baseDN, 0, cookieString, updateMsg);
1728      return sendEntryIfMatches(searchOp, entry, cookieString);
1729    }
1730
1731    private void persistentSearchSendEntry(DN baseDN, UpdateMsg updateMsg)
1732        throws DirectoryException
1733    {
1734      final CSN csn = updateMsg.getCSN();
1735      final SendEntryData<CSN> sendEntryData = getSendEntryData(baseDN, csn);
1736      if (sendEntryData.persistentSearchCanSendEntry(csn))
1737      {
1738        // multi threaded case: wait for the "initial search" phase to set the cookie
1739        final String cookieString = updateCookie(baseDN, updateMsg.getCSN());
1740        final Entry cookieEntry = createEntryFromMsg(baseDN, 0, cookieString, updateMsg);
1741        // FIXME JNR use this instead of previous line:
1742        // entry.replaceAttribute(Attributes.create("changelogcookie", cookieString));
1743        sendEntryIfMatches(searchOp, cookieEntry, cookieString);
1744      }
1745    }
1746
1747    private String updateCookie(DN baseDN, final CSN csn)
1748    {
1749      synchronized (cookie)
1750      { // forbid concurrent updates to the cookie
1751        cookie.update(baseDN, csn);
1752        return cookie.toString();
1753      }
1754    }
1755  }
1756}