001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2008 Sun Microsystems, Inc.
025 *      Portions Copyright 2011-2015 ForgeRock AS
026 */
027package org.opends.server.replication.protocol;
028
029import java.io.IOException;
030
031import org.opends.server.api.DirectoryThread;
032import org.forgerock.i18n.slf4j.LocalizedLogger;
033import org.opends.server.util.StaticUtils;
034
035/**
036 * This thread publishes a {@link HeartbeatMsg} on a given protocol session at
037 * regular intervals when there are no other replication messages being
038 * published.
039 * <p>
040 * These heartbeat messages are sent by a replication server.
041 */
042public class HeartbeatThread extends DirectoryThread
043{
044  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
045
046
047  /**
048   * For test purposes only to simulate loss of heartbeats.
049   */
050  private static volatile boolean heartbeatsDisabled;
051
052  /**
053   * The session on which heartbeats are to be sent.
054   */
055  private final Session session;
056
057
058  /**
059   * The time in milliseconds between heartbeats.
060   */
061  private final long heartbeatInterval;
062
063
064  /**
065   * Set this to stop the thread.
066   */
067  private volatile boolean shutdown;
068  private final Object shutdownLock = new Object();
069
070
071  /**
072   * Create a heartbeat thread.
073   * @param threadName The name of the heartbeat thread.
074   * @param session The session on which heartbeats are to be sent.
075   * @param heartbeatInterval The desired interval between heartbeats in
076   * milliseconds.
077   */
078  public HeartbeatThread(String threadName, Session session,
079                  long heartbeatInterval)
080  {
081    super(threadName);
082    this.session = session;
083    this.heartbeatInterval = heartbeatInterval;
084  }
085
086  /** {@inheritDoc} */
087  @Override
088  public void run()
089  {
090    try
091    {
092      if (logger.isTraceEnabled())
093      {
094        logger.trace("Heartbeat thread is starting, interval is %d",
095                  heartbeatInterval);
096      }
097      HeartbeatMsg heartbeatMessage = new HeartbeatMsg();
098
099      while (!shutdown)
100      {
101        long now = System.currentTimeMillis();
102        if (logger.isTraceEnabled())
103        {
104          logger.trace("Heartbeat thread awoke at %d, last message " +
105              "was sent at %d", now, session.getLastPublishTime());
106        }
107
108        if (now > session.getLastPublishTime() + heartbeatInterval
109            && !heartbeatsDisabled)
110        {
111          if (logger.isTraceEnabled())
112          {
113            logger.trace("Heartbeat sent at %d", now);
114          }
115          session.publish(heartbeatMessage);
116        }
117
118        long sleepTime = session.getLastPublishTime() + heartbeatInterval - now;
119        if (sleepTime <= 0)
120        {
121          sleepTime = heartbeatInterval;
122        }
123
124        if (logger.isTraceEnabled())
125        {
126          logger.trace("Heartbeat thread sleeping for %d", sleepTime);
127        }
128
129        synchronized (shutdownLock)
130        {
131          if (!shutdown)
132          {
133            try
134            {
135              shutdownLock.wait(sleepTime);
136            }
137            catch (InterruptedException e)
138            {
139              // Server shutdown monitor may interrupt slow threads.
140              logger.traceException(e);
141              shutdown = true;
142            }
143          }
144        }
145      }
146    }
147    catch (IOException e)
148    {
149      if (logger.isTraceEnabled())
150      {
151        logger.trace("Heartbeat thread could not send a heartbeat."
152            + StaticUtils.stackTraceToSingleLineString(e));
153      }
154    }
155    finally
156    {
157      if (logger.isTraceEnabled())
158      {
159        logger.trace("Heartbeat thread is exiting.");
160      }
161    }
162  }
163
164
165  /**
166   * Call this method to stop the thread.
167   * This method is blocking until the thread has stopped.
168   */
169  public void shutdown()
170  {
171    synchronized (shutdownLock)
172    {
173      shutdown = true;
174      shutdownLock.notifyAll();
175      if (logger.isTraceEnabled())
176      {
177        logger.trace("Going to notify Heartbeat thread.");
178      }
179    }
180    if (logger.isTraceEnabled())
181    {
182      logger.trace("Returning from Heartbeat shutdown.");
183    }
184  }
185
186
187  /**
188   * For testing purposes only to simulate loss of heartbeats.
189   * @param heartbeatsDisabled Set true to prevent heartbeats from being sent.
190   */
191  public static void setHeartbeatsDisabled(boolean heartbeatsDisabled)
192  {
193    HeartbeatThread.heartbeatsDisabled = heartbeatsDisabled;
194  }
195}