001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2006-2008 Sun Microsystems, Inc. 025 * Portions Copyright 2011-2015 ForgeRock AS 026 */ 027package org.opends.server.replication.plugin; 028 029import static org.opends.messages.ReplicationMessages.*; 030import static org.opends.server.util.StaticUtils.*; 031 032import java.util.concurrent.BlockingQueue; 033import java.util.concurrent.TimeUnit; 034import java.util.concurrent.atomic.AtomicBoolean; 035 036import org.opends.server.api.DirectoryThread; 037import org.forgerock.i18n.slf4j.LocalizedLogger; 038import org.opends.server.replication.protocol.LDAPUpdateMsg; 039 040/** 041 * Thread that is used to get message from the replication servers (stored 042 * in the updates queue) and replay them in the current server. A configurable 043 * number of this thread is created for the whole MultimasterReplication object 044 * (i.e: these threads are shared across the ReplicationDomain objects for 045 * replaying the updates they receive) 046 */ 047public class ReplayThread extends DirectoryThread 048{ 049 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 050 051 private final BlockingQueue<UpdateToReplay> updateToReplayQueue; 052 private AtomicBoolean shutdown = new AtomicBoolean(false); 053 private static int count; 054 055 /** 056 * Constructor for the ReplayThread. 057 * 058 * @param updateToReplayQueue The queue of update messages we have to replay 059 */ 060 public ReplayThread(BlockingQueue<UpdateToReplay> updateToReplayQueue) 061 { 062 super("Replica replay thread " + count++); 063 this.updateToReplayQueue = updateToReplayQueue; 064 } 065 066 /** 067 * Shutdown this replay thread. 068 */ 069 public void shutdown() 070 { 071 shutdown.set(true); 072 } 073 074 /** 075 * Run method for this class. 076 */ 077 @Override 078 public void run() 079 { 080 if (logger.isTraceEnabled()) 081 { 082 logger.trace("Replication Replay thread starting."); 083 } 084 085 while (!shutdown.get()) 086 { 087 try 088 { 089 UpdateToReplay updateToreplay; 090 // Loop getting an updateToReplayQueue from the update message queue and 091 // replaying matching changes 092 while (!shutdown.get() && 093 ((updateToreplay = updateToReplayQueue.poll(1L, 094 TimeUnit.SECONDS)) != null)) 095 { 096 // Find replication domain for that update message 097 LDAPUpdateMsg updateMsg = updateToreplay.getUpdateMessage(); 098 LDAPReplicationDomain domain = updateToreplay.getReplicationDomain(); 099 domain.replay(updateMsg, shutdown); 100 } 101 } catch (Exception e) 102 { 103 /* 104 * catch all exceptions happening so that the thread never dies even 105 * in case of problems. 106 */ 107 logger.error(ERR_EXCEPTION_REPLAYING_REPLICATION_MESSAGE, stackTraceToSingleLineString(e)); 108 } 109 } 110 if (logger.isTraceEnabled()) 111 { 112 logger.trace("Replication Replay thread stopping."); 113 } 114 } 115}