001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2009 Sun Microsystems, Inc.
025 *      Portions Copyright 2013-2015 ForgeRock AS.
026 */
027package org.opends.server.replication.protocol;
028
029import java.io.IOException;
030import java.util.HashMap;
031import java.util.Iterator;
032import java.util.Map;
033import java.util.Map.Entry;
034import java.util.zip.DataFormatException;
035
036import org.forgerock.opendj.io.ASN1Reader;
037import org.forgerock.opendj.io.ASN1Writer;
038import org.opends.server.replication.common.CSN;
039import org.opends.server.replication.common.ServerState;
040
041/**
042 * This message is part of the replication protocol.
043 * RS1 sends a MonitorRequestMessage to RS2 to requests its monitoring
044 * information.
045 * When RS2 receives a MonitorRequestMessage from RS1, RS2 responds with a
046 * MonitorMsg.
047 */
048public class MonitorMsg extends ReplicationMsg
049{
050  /**
051   * The destination server or servers of this message.
052   */
053  private final int destination;
054
055  /**
056   * The serverID of the server that sends this message.
057   */
058  private final int senderID;
059
060
061
062  /**
063   * Data structure to manage the state and the approximation of the data of the
064   * first missing change for each LDAP server connected to a Replication
065   * Server.
066   */
067  private static class ServerData
068  {
069    private ServerState state;
070    private long approxFirstMissingDate;
071  }
072
073  /**
074   * Data structure to manage the state of this replication server
075   * and the state information for the servers connected to it.
076   */
077  private static class SubTopoMonitorData
078  {
079    /** This replication server DbState. */
080    private ServerState replServerDbState;
081    /** The data related to the LDAP servers connected to this RS. */
082    private final Map<Integer, ServerData> ldapStates = new HashMap<>();
083    /** The data related to the RS servers connected to this RS. */
084    private final Map<Integer, ServerData> rsStates = new HashMap<>();
085  }
086
087  private final SubTopoMonitorData data = new SubTopoMonitorData();
088
089  /**
090   * Creates a new MonitorMsg.
091   *
092   * @param sender The sender of this message.
093   * @param destination The destination of this message.
094   */
095  public MonitorMsg(int sender, int destination)
096  {
097    this.senderID = sender;
098    this.destination = destination;
099  }
100
101  /**
102   * Sets the state of the replication server.
103   * @param state The state.
104   */
105  public void setReplServerDbState(ServerState state)
106  {
107    data.replServerDbState = state;
108  }
109
110  /**
111   * Sets the information of an LDAP server.
112   * @param serverId The serverID.
113   * @param state The server state.
114   * @param approxFirstMissingDate  The approximation of the date
115   * of the older missing change. null when none.
116   * @param isLDAPServer Specifies whether the server is a DS or a RS
117   */
118  public void setServerState(int serverId, ServerState state,
119      long approxFirstMissingDate, boolean isLDAPServer)
120  {
121    final ServerData sd = new ServerData();
122    sd.state = state;
123    sd.approxFirstMissingDate = approxFirstMissingDate;
124    if (isLDAPServer)
125    {
126      data.ldapStates.put(serverId, sd);
127    }
128    else
129    {
130      data.rsStates.put(serverId, sd);
131    }
132  }
133
134  /**
135   * Get the server state for the LDAP server with the provided serverId.
136   * @param serverId The provided serverId.
137   * @return The state.
138   */
139  public ServerState getLDAPServerState(int serverId)
140  {
141    return data.ldapStates.get(serverId).state;
142  }
143
144  /**
145   * Get the server state for the RS server with the provided serverId.
146   * @param serverId The provided serverId.
147   * @return The state.
148   */
149  public ServerState getRSServerState(int serverId)
150  {
151    return data.rsStates.get(serverId).state;
152  }
153
154  /**
155   * Get the approximation of the date of the older missing change for the
156   * LDAP Server with the provided server Id.
157   * @param serverId The provided serverId.
158   * @return The approximated state.
159   */
160  public long getLDAPApproxFirstMissingDate(int serverId)
161  {
162    return data.ldapStates.get(serverId).approxFirstMissingDate;
163  }
164
165  /**
166   * Get the approximation of the date of the older missing change for the
167   * RS Server with the provided server Id.
168   * @param serverId The provided serverId.
169   * @return The approximated state.
170   */
171  public long getRSApproxFirstMissingDate(int serverId)
172  {
173    return data.rsStates.get(serverId).approxFirstMissingDate;
174  }
175
176  /**
177   * Creates a new EntryMessage from its encoded form.
178   *
179   * @param in       The byte array containing the encoded form of the message.
180   * @param version  The version of the protocol to use to decode the msg.
181   * @throws DataFormatException If the byte array does not contain a valid
182   *                             encoded form of the ServerStartMessage.
183   */
184  MonitorMsg(byte[] in, short version) throws DataFormatException
185  {
186    final ByteArrayScanner scanner = new ByteArrayScanner(in);
187    if (scanner.nextByte() != MSG_TYPE_REPL_SERVER_MONITOR)
188    {
189      throw new DataFormatException("input is not a valid "
190          + getClass().getCanonicalName());
191    }
192
193    if (version == ProtocolVersion.REPLICATION_PROTOCOL_V1)
194    {
195      this.senderID = scanner.nextIntUTF8();
196      this.destination = scanner.nextIntUTF8();
197    }
198    else if (version <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
199    {
200      this.senderID = scanner.nextShort();
201      this.destination = scanner.nextShort();
202    }
203    else
204    {
205      this.senderID = scanner.nextInt();
206      this.destination = scanner.nextInt();
207    }
208
209    ASN1Reader asn1Reader = scanner.getASN1Reader();
210    try
211    {
212      asn1Reader.readStartSequence();
213      // loop on the servers
214      while(asn1Reader.hasNextElement())
215      {
216        ServerState newState = new ServerState();
217        int serverId = 0;
218        long outime = 0;
219        boolean isLDAPServer = false;
220
221        asn1Reader.readStartSequence();
222        // loop on the list of CSN of the state
223        while(asn1Reader.hasNextElement())
224        {
225          CSN csn;
226          if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
227          {
228            csn = CSN.valueOf(asn1Reader.readOctetString());
229          }
230          else
231          {
232            csn = CSN.valueOf(asn1Reader.readOctetStringAsString());
233          }
234
235          if (data.replServerDbState != null && serverId == 0)
236          {
237            // we are on the first CSN that is a fake CSN to store the serverId
238            // and the older update time
239            serverId = csn.getServerId();
240            outime = csn.getTime();
241            isLDAPServer = csn.getSeqnum() > 0;
242          }
243          else
244          {
245            // we are on a normal CSN
246            newState.update(csn);
247          }
248        }
249        asn1Reader.readEndSequence();
250
251        if (data.replServerDbState == null)
252        {
253          // the first state is the replication state
254          data.replServerDbState = newState;
255        }
256        else
257        {
258          // the next states are the server states
259          setServerState(serverId, newState, outime, isLDAPServer);
260        }
261      }
262      asn1Reader.readEndSequence();
263    } catch(Exception e)
264    { /* do nothing */
265    }
266  }
267
268  /** {@inheritDoc} */
269  @Override
270  public byte[] getBytes(short protocolVersion)
271  {
272    try
273    {
274      final ByteArrayBuilder builder = new ByteArrayBuilder();
275      builder.appendByte(MSG_TYPE_REPL_SERVER_MONITOR);
276      append(builder, senderID, protocolVersion);
277      append(builder, destination, protocolVersion);
278
279      /* Put the serverStates ... */
280      ASN1Writer writer = builder.getASN1Writer();
281      writer.writeStartSequence();
282      {
283        /* first put the Replication Server state */
284        writer.writeStartSequence();
285        {
286          data.replServerDbState.writeTo(writer, protocolVersion);
287        }
288        writer.writeEndSequence();
289
290        // then the DS + RS server data
291        writeServerStates(protocolVersion, writer, false /* DS */);
292        writeServerStates(protocolVersion, writer, true /* RS */);
293      }
294      writer.writeEndSequence();
295
296      if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
297      {
298        // legacy coding mistake
299        builder.appendByte(0);
300      }
301      return builder.toByteArray();
302    }
303    catch (Exception e)
304    {
305      return null;
306    }
307  }
308
309  private void append(final ByteArrayBuilder builder, int data,
310      short protocolVersion)
311  {
312    if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
313    {
314      builder.appendIntUTF8(data);
315    }
316    else if (protocolVersion <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
317    {
318      builder.appendShort(data);
319    }
320    else // protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4
321    {
322      builder.appendInt(data);
323    }
324  }
325
326  private void writeServerStates(short protocolVersion, ASN1Writer writer,
327      boolean writeRSStates) throws IOException
328  {
329    final Map<Integer, ServerData> servers =
330        writeRSStates ? data.rsStates : data.ldapStates;
331    final int seqNum = writeRSStates ? 0 : 1;
332    for (Map.Entry<Integer, ServerData> server : servers.entrySet())
333    {
334      writer.writeStartSequence();
335      {
336        /*
337         * A fake CSN helps storing the LDAP server ID. The sequence number will
338         * be used to differentiate between an LDAP server (1) or an RS (0).
339         */
340        CSN csn = new CSN(
341            server.getValue().approxFirstMissingDate, seqNum,
342            server.getKey());
343        if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
344        {
345          writer.writeOctetString(csn.toByteString());
346        }
347        else
348        {
349          writer.writeOctetString(csn.toString());
350        }
351
352        // the CSNs that make the state
353        server.getValue().state.writeTo(writer, protocolVersion);
354      }
355      writer.writeEndSequence();
356    }
357  }
358
359  /**
360   * Get the state of the replication server that sent this message.
361   * @return The state.
362   */
363  public ServerState getReplServerDbState()
364  {
365    return data.replServerDbState;
366  }
367
368  /**
369   * Returns an iterator on the serverId of the connected LDAP servers.
370   * @return The iterator.
371   */
372  public Iterator<Integer> ldapIterator()
373  {
374    return data.ldapStates.keySet().iterator();
375  }
376
377  /**
378   * Returns an iterator on the serverId of the connected RS servers.
379   * @return The iterator.
380   */
381  public Iterator<Integer> rsIterator()
382  {
383    return data.rsStates.keySet().iterator();
384  }
385
386  /**
387   * Get the destination.
388   *
389   * @return the destination
390   */
391  public int getDestination()
392  {
393    return destination;
394  }
395
396  /**
397   * Get the server ID of the server that sent this message.
398   *
399   * @return the server id
400   */
401  public int getSenderID()
402  {
403    return senderID;
404  }
405
406  /** {@inheritDoc} */
407  @Override
408  public String toString()
409  {
410    final StringBuilder stateS = new StringBuilder("\nRState:[");
411    stateS.append(data.replServerDbState);
412    stateS.append("]");
413
414    stateS.append("\nLDAPStates:[");
415    for (Entry<Integer, ServerData> entry : data.ldapStates.entrySet())
416    {
417      ServerData sd = entry.getValue();
418      stateS.append("\n[LSstate(").append(entry.getKey()).append(")=")
419            .append(sd.state).append("]").append(" afmd=")
420            .append(sd.approxFirstMissingDate).append("]");
421    }
422
423    stateS.append("\nRSStates:[");
424    for (Entry<Integer, ServerData> entry : data.rsStates.entrySet())
425    {
426      final ServerData sd = entry.getValue();
427      stateS.append("\n[RSState(").append(entry.getKey()).append(")=")
428            .append(sd.state).append("]").append(" afmd=")
429            .append(sd.approxFirstMissingDate).append("]");
430    }
431    return getClass().getCanonicalName() +
432    "[ sender=" + this.senderID +
433    " destination=" + this.destination +
434    " data=[" + stateS + "]" +
435    "]";
436  }
437}