001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2008 Sun Microsystems, Inc. 025 * Portions Copyright 2011-2015 ForgeRock AS 026 */ 027package org.opends.server.replication.protocol; 028 029import java.io.IOException; 030 031import org.opends.server.api.DirectoryThread; 032import org.forgerock.i18n.slf4j.LocalizedLogger; 033import org.opends.server.util.StaticUtils; 034 035/** 036 * This thread publishes a {@link HeartbeatMsg} on a given protocol session at 037 * regular intervals when there are no other replication messages being 038 * published. 039 * <p> 040 * These heartbeat messages are sent by a replication server. 041 */ 042public class HeartbeatThread extends DirectoryThread 043{ 044 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 045 046 047 /** 048 * For test purposes only to simulate loss of heartbeats. 049 */ 050 private static volatile boolean heartbeatsDisabled; 051 052 /** 053 * The session on which heartbeats are to be sent. 054 */ 055 private final Session session; 056 057 058 /** 059 * The time in milliseconds between heartbeats. 060 */ 061 private final long heartbeatInterval; 062 063 064 /** 065 * Set this to stop the thread. 066 */ 067 private volatile boolean shutdown; 068 private final Object shutdownLock = new Object(); 069 070 071 /** 072 * Create a heartbeat thread. 073 * @param threadName The name of the heartbeat thread. 074 * @param session The session on which heartbeats are to be sent. 075 * @param heartbeatInterval The desired interval between heartbeats in 076 * milliseconds. 077 */ 078 public HeartbeatThread(String threadName, Session session, 079 long heartbeatInterval) 080 { 081 super(threadName); 082 this.session = session; 083 this.heartbeatInterval = heartbeatInterval; 084 } 085 086 /** {@inheritDoc} */ 087 @Override 088 public void run() 089 { 090 try 091 { 092 if (logger.isTraceEnabled()) 093 { 094 logger.trace("Heartbeat thread is starting, interval is %d", 095 heartbeatInterval); 096 } 097 HeartbeatMsg heartbeatMessage = new HeartbeatMsg(); 098 099 while (!shutdown) 100 { 101 long now = System.currentTimeMillis(); 102 if (logger.isTraceEnabled()) 103 { 104 logger.trace("Heartbeat thread awoke at %d, last message " + 105 "was sent at %d", now, session.getLastPublishTime()); 106 } 107 108 if (now > session.getLastPublishTime() + heartbeatInterval 109 && !heartbeatsDisabled) 110 { 111 if (logger.isTraceEnabled()) 112 { 113 logger.trace("Heartbeat sent at %d", now); 114 } 115 session.publish(heartbeatMessage); 116 } 117 118 long sleepTime = session.getLastPublishTime() + heartbeatInterval - now; 119 if (sleepTime <= 0) 120 { 121 sleepTime = heartbeatInterval; 122 } 123 124 if (logger.isTraceEnabled()) 125 { 126 logger.trace("Heartbeat thread sleeping for %d", sleepTime); 127 } 128 129 synchronized (shutdownLock) 130 { 131 if (!shutdown) 132 { 133 try 134 { 135 shutdownLock.wait(sleepTime); 136 } 137 catch (InterruptedException e) 138 { 139 // Server shutdown monitor may interrupt slow threads. 140 logger.traceException(e); 141 shutdown = true; 142 } 143 } 144 } 145 } 146 } 147 catch (IOException e) 148 { 149 if (logger.isTraceEnabled()) 150 { 151 logger.trace("Heartbeat thread could not send a heartbeat." 152 + StaticUtils.stackTraceToSingleLineString(e)); 153 } 154 } 155 finally 156 { 157 if (logger.isTraceEnabled()) 158 { 159 logger.trace("Heartbeat thread is exiting."); 160 } 161 } 162 } 163 164 165 /** 166 * Call this method to stop the thread. 167 * This method is blocking until the thread has stopped. 168 */ 169 public void shutdown() 170 { 171 synchronized (shutdownLock) 172 { 173 shutdown = true; 174 shutdownLock.notifyAll(); 175 if (logger.isTraceEnabled()) 176 { 177 logger.trace("Going to notify Heartbeat thread."); 178 } 179 } 180 if (logger.isTraceEnabled()) 181 { 182 logger.trace("Returning from Heartbeat shutdown."); 183 } 184 } 185 186 187 /** 188 * For testing purposes only to simulate loss of heartbeats. 189 * @param heartbeatsDisabled Set true to prevent heartbeats from being sent. 190 */ 191 public static void setHeartbeatsDisabled(boolean heartbeatsDisabled) 192 { 193 HeartbeatThread.heartbeatsDisabled = heartbeatsDisabled; 194 } 195}