001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2007-2010 Sun Microsystems, Inc.
025 *      Portions Copyright 2011-2015 ForgeRock AS
026 */
027package org.opends.server.replication.protocol;
028
029import java.util.*;
030import java.util.zip.DataFormatException;
031
032import org.opends.server.replication.common.AssuredMode;
033import org.opends.server.replication.common.DSInfo;
034import org.opends.server.replication.common.RSInfo;
035import org.opends.server.replication.common.ServerStatus;
036
037import static org.opends.server.replication.protocol.ProtocolVersion.*;
038
039/**
040 * This class defines a message that is sent:
041 * - By a RS to the other RSs in the topology, containing:
042 *   - the DSs directly connected to the RS in the DS infos
043 *   - only this RS in the RS infos
044 * - By a RS to his connected DSs, containing every DSs and RSs he knows.
045 * In that case the message contains:
046 *   - every DSs the RS knows except the destinator DS in the DS infos
047 *   - every connected RSs (including the sending RS) in the RS infos
048 *
049 * Exchanging these messages allows to have each RS or DS take
050 * appropriate decisions according to the current topology:
051 * - a RS can route a message to a DS
052 * - a DS can decide towards which peer DS send referrals
053 * ...
054 */
055public class TopologyMsg extends ReplicationMsg
056{
057  /** Information for the DSs (aka replicas) known in the topology. */
058  private final Map<Integer, DSInfo> replicaInfos;
059  /** Information for the RSs known in the topology. */
060  private final List<RSInfo> rsInfos;
061
062  /**
063   * Creates a new changelogInfo message from its encoded form.
064   *
065   * @param in The byte array containing the encoded form of the message.
066   * @param version The protocol version to use to decode the msg.
067   * @throws java.util.zip.DataFormatException If the byte array does not
068   * contain a valid encoded form of the message.
069   */
070  TopologyMsg(byte[] in, short version) throws DataFormatException
071  {
072    final ByteArrayScanner scanner = new ByteArrayScanner(in);
073    final byte msgType = scanner.nextByte();
074    if (msgType != MSG_TYPE_TOPOLOGY)
075    {
076      throw new DataFormatException("Input is not a valid "
077          + getClass().getCanonicalName());
078    }
079
080    // Read the DS info entries, first read number of them
081    int nDsInfo = scanner.nextByte();
082    final Map<Integer, DSInfo> replicaInfos = new HashMap<>(Math.max(0, nDsInfo));
083    while (nDsInfo > 0 && !scanner.isEmpty())
084    {
085      final DSInfo dsInfo = nextDSInfo(scanner, version);
086      replicaInfos.put(dsInfo.getDsId(), dsInfo);
087      nDsInfo--;
088    }
089
090    // Read the RS info entries
091    int nRsInfo = scanner.nextByte();
092    final List<RSInfo> rsInfos = new ArrayList<>(Math.max(0, nRsInfo));
093    while (nRsInfo > 0 && !scanner.isEmpty())
094    {
095      rsInfos.add(nextRSInfo(scanner, version));
096      nRsInfo--;
097    }
098
099    this.replicaInfos = Collections.unmodifiableMap(replicaInfos);
100    this.rsInfos = Collections.unmodifiableList(rsInfos);
101  }
102
103  private DSInfo nextDSInfo(ByteArrayScanner scanner, short version)
104      throws DataFormatException
105  {
106    final int dsId = scanner.nextIntUTF8();
107    final String dsUrl =
108        version < REPLICATION_PROTOCOL_V6 ? "" : scanner.nextString();
109    final int rsId = scanner.nextIntUTF8();
110    final long generationId = scanner.nextLongUTF8();
111    final ServerStatus status = ServerStatus.valueOf(scanner.nextByte());
112    final boolean assuredFlag = scanner.nextBoolean();
113    final AssuredMode assuredMode = AssuredMode.valueOf(scanner.nextByte());
114    final byte safeDataLevel = scanner.nextByte();
115    final byte groupId = scanner.nextByte();
116
117    final List<String> refUrls = new ArrayList<>();
118    scanner.nextStrings(refUrls);
119
120    final Set<String> attrs = new HashSet<>();
121    final Set<String> delattrs = new HashSet<>();
122    short protocolVersion = -1;
123    if (version >= REPLICATION_PROTOCOL_V4)
124    {
125      scanner.nextStrings(attrs);
126
127      if (version >= REPLICATION_PROTOCOL_V5)
128      {
129        scanner.nextStrings(delattrs);
130      }
131      else
132      {
133        // Default to using the same set of attributes for deletes.
134        delattrs.addAll(attrs);
135      }
136
137      protocolVersion = scanner.nextByte();
138    }
139
140    return new DSInfo(dsId, dsUrl, rsId, generationId, status, assuredFlag,
141        assuredMode, safeDataLevel, groupId, refUrls, attrs, delattrs,
142        protocolVersion);
143  }
144
145  private RSInfo nextRSInfo(ByteArrayScanner scanner, short version)
146      throws DataFormatException
147  {
148    final int rsId = scanner.nextIntUTF8();
149    final long generationId = scanner.nextLongUTF8();
150    final byte groupId = scanner.nextByte();
151
152    int weight = 1;
153    String serverUrl = null;
154    if (version >= REPLICATION_PROTOCOL_V4)
155    {
156      serverUrl = scanner.nextString();
157      weight = scanner.nextIntUTF8();
158    }
159
160    return new RSInfo(rsId, serverUrl, generationId, groupId, weight);
161  }
162
163  /**
164   * Creates a new message of the currently connected servers.
165   *
166   * @param dsInfos The collection of currently connected DS servers ID.
167   * @param rsInfos The list of currently connected RS servers ID.
168   */
169  public TopologyMsg(Collection<DSInfo> dsInfos, List<RSInfo> rsInfos)
170  {
171    if (dsInfos == null || dsInfos.isEmpty())
172    {
173      this.replicaInfos = Collections.emptyMap();
174    }
175    else
176    {
177      Map<Integer, DSInfo> replicas = new HashMap<>();
178      for (DSInfo dsInfo : dsInfos)
179      {
180        replicas.put(dsInfo.getDsId(), dsInfo);
181      }
182      this.replicaInfos = Collections.unmodifiableMap(replicas);
183    }
184
185    if (rsInfos == null || rsInfos.isEmpty())
186    {
187      this.rsInfos = Collections.emptyList();
188    }
189    else
190    {
191      this.rsInfos =
192          Collections.unmodifiableList(new ArrayList<RSInfo>(rsInfos));
193    }
194  }
195
196  // ============
197  // Msg encoding
198  // ============
199
200  /** {@inheritDoc} */
201  @Override
202  public byte[] getBytes(short version)
203  {
204    /**
205     * Message has the following form:
206     * <pdu type><number of following DSInfo entries>[<DSInfo>]*
207     * <number of following RSInfo entries>[<RSInfo>]*
208     */
209    final ByteArrayBuilder builder = new ByteArrayBuilder();
210    builder.appendByte(MSG_TYPE_TOPOLOGY);
211
212    // Put DS infos
213    builder.appendByte(replicaInfos.size());
214    for (DSInfo dsInfo : replicaInfos.values())
215    {
216      builder.appendIntUTF8(dsInfo.getDsId());
217      if (version >= REPLICATION_PROTOCOL_V6)
218      {
219        builder.appendString(dsInfo.getDsUrl());
220      }
221      builder.appendIntUTF8(dsInfo.getRsId());
222      builder.appendLongUTF8(dsInfo.getGenerationId());
223      builder.appendByte(dsInfo.getStatus().getValue());
224      builder.appendBoolean(dsInfo.isAssured());
225      builder.appendByte(dsInfo.getAssuredMode().getValue());
226      builder.appendByte(dsInfo.getSafeDataLevel());
227      builder.appendByte(dsInfo.getGroupId());
228
229      builder.appendStrings(dsInfo.getRefUrls());
230
231      if (version >= REPLICATION_PROTOCOL_V4)
232      {
233        builder.appendStrings(dsInfo.getEclIncludes());
234        if (version >= REPLICATION_PROTOCOL_V5)
235        {
236          builder.appendStrings(dsInfo.getEclIncludesForDeletes());
237        }
238        builder.appendByte(dsInfo.getProtocolVersion());
239      }
240    }
241
242    // Put RS infos
243    builder.appendByte(rsInfos.size());
244    for (RSInfo rsInfo : rsInfos)
245    {
246      builder.appendIntUTF8(rsInfo.getId());
247      builder.appendLongUTF8(rsInfo.getGenerationId());
248      builder.appendByte(rsInfo.getGroupId());
249
250      if (version >= REPLICATION_PROTOCOL_V4)
251      {
252        builder.appendString(rsInfo.getServerUrl());
253        builder.appendIntUTF8(rsInfo.getWeight());
254      }
255    }
256
257    return builder.toByteArray();
258  }
259
260  /** {@inheritDoc} */
261  @Override
262  public String toString()
263  {
264    String dsStr = "";
265    for (DSInfo dsInfo : replicaInfos.values())
266    {
267      dsStr += dsInfo + "\n----------------------------\n";
268    }
269
270    String rsStr = "";
271    for (RSInfo rsInfo : rsInfos)
272    {
273      rsStr += rsInfo + "\n----------------------------\n";
274    }
275
276    return "TopologyMsg content:"
277      + "\n----------------------------"
278      + "\nCONNECTED DS SERVERS:"
279      + "\n--------------------\n"
280      + dsStr
281      + "CONNECTED RS SERVERS:"
282      + "\n--------------------\n"
283      + rsStr
284      + ("".equals(rsStr) ? "----------------------------\n" : "");
285  }
286
287  /**
288   * Get the DS infos.
289   *
290   * @return The DS infos
291   */
292  public Map<Integer, DSInfo> getReplicaInfos()
293  {
294    return replicaInfos;
295  }
296
297  /**
298   * Get the RS infos.
299   *
300   * @return The RS infos
301   */
302  public List<RSInfo> getRsInfos()
303  {
304    return rsInfos;
305  }
306}