001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * Copyright 2014-2015 ForgeRock AS 024 */ 025package org.opends.server.replication.service; 026 027import java.util.concurrent.ConcurrentSkipListSet; 028import java.util.concurrent.atomic.AtomicLong; 029 030import org.opends.server.types.DN; 031 032/** 033 * Class useful for the case where DS/RS instances are collocated inside the 034 * same JVM. It synchronizes the shutdown of the DS and RS sides. 035 * <p> 036 * More specifically, it ensures a ReplicaOfflineMsg sent by the DS is 037 * relayed/forwarded by the collocated RS to the other RSs in the topology 038 * before the whole process shuts down. 039 * 040 * @since OPENDJ-1453 041 */ 042public class DSRSShutdownSync 043{ 044 private static final ConcurrentSkipListSet<DN> replicaOfflineMsgs = new ConcurrentSkipListSet<>(); 045 private static AtomicLong stopInstanceTimestamp = new AtomicLong(); 046 047 /** 048 * Message has been sent. 049 * 050 * @param baseDN 051 * the domain for which the message has been sent 052 */ 053 public void replicaOfflineMsgSent(DN baseDN) 054 { 055 stopInstanceTimestamp.compareAndSet(0, System.currentTimeMillis()); 056 replicaOfflineMsgs.add(baseDN); 057 } 058 059 /** 060 * Message has been forwarded. 061 * 062 * @param baseDN 063 * the domain for which the message has been sent 064 */ 065 public void replicaOfflineMsgForwarded(DN baseDN) 066 { 067 replicaOfflineMsgs.remove(baseDN); 068 } 069 070 /** 071 * Whether a ReplicationServer ServerReader or ServerWriter can proceed with 072 * shutdown. 073 * 074 * @param baseDN 075 * the baseDN of the ServerReader or ServerWriter . 076 * @return true if the caller can shutdown, false otherwise 077 */ 078 public boolean canShutdown(DN baseDN) 079 { 080 return !replicaOfflineMsgs.contains(baseDN) 081 || System.currentTimeMillis() - stopInstanceTimestamp.get() > 5000; 082 } 083}