001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *      Copyright 2014-2015 ForgeRock AS
024 */
025package org.opends.server.replication.service;
026
027import java.util.concurrent.ConcurrentSkipListSet;
028import java.util.concurrent.atomic.AtomicLong;
029
030import org.opends.server.types.DN;
031
032/**
033 * Class useful for the case where DS/RS instances are collocated inside the
034 * same JVM. It synchronizes the shutdown of the DS and RS sides.
035 * <p>
036 * More specifically, it ensures a ReplicaOfflineMsg sent by the DS is
037 * relayed/forwarded by the collocated RS to the other RSs in the topology
038 * before the whole process shuts down.
039 *
040 * @since OPENDJ-1453
041 */
042public class DSRSShutdownSync
043{
044  private static final ConcurrentSkipListSet<DN> replicaOfflineMsgs = new ConcurrentSkipListSet<>();
045  private static AtomicLong stopInstanceTimestamp = new AtomicLong();
046
047  /**
048   * Message has been sent.
049   *
050   * @param baseDN
051   *          the domain for which the message has been sent
052   */
053  public void replicaOfflineMsgSent(DN baseDN)
054  {
055    stopInstanceTimestamp.compareAndSet(0, System.currentTimeMillis());
056    replicaOfflineMsgs.add(baseDN);
057  }
058
059  /**
060   * Message has been forwarded.
061   *
062   * @param baseDN
063   *          the domain for which the message has been sent
064   */
065  public void replicaOfflineMsgForwarded(DN baseDN)
066  {
067    replicaOfflineMsgs.remove(baseDN);
068  }
069
070  /**
071   * Whether a ReplicationServer ServerReader or ServerWriter can proceed with
072   * shutdown.
073   *
074   * @param baseDN
075   *          the baseDN of the ServerReader or ServerWriter .
076   * @return true if the caller can shutdown, false otherwise
077   */
078  public boolean canShutdown(DN baseDN)
079  {
080    return !replicaOfflineMsgs.contains(baseDN)
081        || System.currentTimeMillis() - stopInstanceTimestamp.get() > 5000;
082  }
083}