001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2006-2008 Sun Microsystems, Inc.
025 *      Portions Copyright 2011-2015 ForgeRock AS
026 */
027package org.opends.server.replication.plugin;
028
029import static org.opends.messages.ReplicationMessages.*;
030import static org.opends.server.util.StaticUtils.*;
031
032import java.util.concurrent.BlockingQueue;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicBoolean;
035
036import org.opends.server.api.DirectoryThread;
037import org.forgerock.i18n.slf4j.LocalizedLogger;
038import org.opends.server.replication.protocol.LDAPUpdateMsg;
039
040/**
041 * Thread that is used to get message from the replication servers (stored
042 * in the updates queue) and replay them in the current server. A configurable
043 * number of this thread is created for the whole MultimasterReplication object
044 * (i.e: these threads are shared across the ReplicationDomain objects for
045 * replaying the updates they receive)
046 */
047public class ReplayThread extends DirectoryThread
048{
049  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
050
051  private final BlockingQueue<UpdateToReplay> updateToReplayQueue;
052  private AtomicBoolean shutdown = new AtomicBoolean(false);
053  private static int count;
054
055  /**
056   * Constructor for the ReplayThread.
057   *
058   * @param updateToReplayQueue The queue of update messages we have to replay
059   */
060  public ReplayThread(BlockingQueue<UpdateToReplay> updateToReplayQueue)
061  {
062     super("Replica replay thread " + count++);
063     this.updateToReplayQueue = updateToReplayQueue;
064  }
065
066  /**
067   * Shutdown this replay thread.
068   */
069  public void shutdown()
070  {
071    shutdown.set(true);
072  }
073
074  /**
075   * Run method for this class.
076   */
077  @Override
078  public void run()
079  {
080    if (logger.isTraceEnabled())
081    {
082      logger.trace("Replication Replay thread starting.");
083    }
084
085    while (!shutdown.get())
086    {
087      try
088      {
089        UpdateToReplay updateToreplay;
090        // Loop getting an updateToReplayQueue from the update message queue and
091        // replaying matching changes
092        while (!shutdown.get() &&
093          ((updateToreplay = updateToReplayQueue.poll(1L,
094          TimeUnit.SECONDS)) != null))
095        {
096          // Find replication domain for that update message
097          LDAPUpdateMsg updateMsg = updateToreplay.getUpdateMessage();
098          LDAPReplicationDomain domain = updateToreplay.getReplicationDomain();
099          domain.replay(updateMsg, shutdown);
100        }
101      } catch (Exception e)
102      {
103        /*
104         * catch all exceptions happening so that the thread never dies even
105         * in case of problems.
106         */
107        logger.error(ERR_EXCEPTION_REPLAYING_REPLICATION_MESSAGE, stackTraceToSingleLineString(e));
108      }
109    }
110    if (logger.isTraceEnabled())
111    {
112      logger.trace("Replication Replay thread stopping.");
113    }
114  }
115}