001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2007-2010 Sun Microsystems, Inc. 025 * Portions Copyright 2011-2015 ForgeRock AS 026 */ 027package org.opends.server.replication.protocol; 028 029import java.util.*; 030import java.util.zip.DataFormatException; 031 032import org.opends.server.replication.common.AssuredMode; 033import org.opends.server.replication.common.DSInfo; 034import org.opends.server.replication.common.RSInfo; 035import org.opends.server.replication.common.ServerStatus; 036 037import static org.opends.server.replication.protocol.ProtocolVersion.*; 038 039/** 040 * This class defines a message that is sent: 041 * - By a RS to the other RSs in the topology, containing: 042 * - the DSs directly connected to the RS in the DS infos 043 * - only this RS in the RS infos 044 * - By a RS to his connected DSs, containing every DSs and RSs he knows. 045 * In that case the message contains: 046 * - every DSs the RS knows except the destinator DS in the DS infos 047 * - every connected RSs (including the sending RS) in the RS infos 048 * 049 * Exchanging these messages allows to have each RS or DS take 050 * appropriate decisions according to the current topology: 051 * - a RS can route a message to a DS 052 * - a DS can decide towards which peer DS send referrals 053 * ... 054 */ 055public class TopologyMsg extends ReplicationMsg 056{ 057 /** Information for the DSs (aka replicas) known in the topology. */ 058 private final Map<Integer, DSInfo> replicaInfos; 059 /** Information for the RSs known in the topology. */ 060 private final List<RSInfo> rsInfos; 061 062 /** 063 * Creates a new changelogInfo message from its encoded form. 064 * 065 * @param in The byte array containing the encoded form of the message. 066 * @param version The protocol version to use to decode the msg. 067 * @throws java.util.zip.DataFormatException If the byte array does not 068 * contain a valid encoded form of the message. 069 */ 070 TopologyMsg(byte[] in, short version) throws DataFormatException 071 { 072 final ByteArrayScanner scanner = new ByteArrayScanner(in); 073 final byte msgType = scanner.nextByte(); 074 if (msgType != MSG_TYPE_TOPOLOGY) 075 { 076 throw new DataFormatException("Input is not a valid " 077 + getClass().getCanonicalName()); 078 } 079 080 // Read the DS info entries, first read number of them 081 int nDsInfo = scanner.nextByte(); 082 final Map<Integer, DSInfo> replicaInfos = new HashMap<>(Math.max(0, nDsInfo)); 083 while (nDsInfo > 0 && !scanner.isEmpty()) 084 { 085 final DSInfo dsInfo = nextDSInfo(scanner, version); 086 replicaInfos.put(dsInfo.getDsId(), dsInfo); 087 nDsInfo--; 088 } 089 090 // Read the RS info entries 091 int nRsInfo = scanner.nextByte(); 092 final List<RSInfo> rsInfos = new ArrayList<>(Math.max(0, nRsInfo)); 093 while (nRsInfo > 0 && !scanner.isEmpty()) 094 { 095 rsInfos.add(nextRSInfo(scanner, version)); 096 nRsInfo--; 097 } 098 099 this.replicaInfos = Collections.unmodifiableMap(replicaInfos); 100 this.rsInfos = Collections.unmodifiableList(rsInfos); 101 } 102 103 private DSInfo nextDSInfo(ByteArrayScanner scanner, short version) 104 throws DataFormatException 105 { 106 final int dsId = scanner.nextIntUTF8(); 107 final String dsUrl = 108 version < REPLICATION_PROTOCOL_V6 ? "" : scanner.nextString(); 109 final int rsId = scanner.nextIntUTF8(); 110 final long generationId = scanner.nextLongUTF8(); 111 final ServerStatus status = ServerStatus.valueOf(scanner.nextByte()); 112 final boolean assuredFlag = scanner.nextBoolean(); 113 final AssuredMode assuredMode = AssuredMode.valueOf(scanner.nextByte()); 114 final byte safeDataLevel = scanner.nextByte(); 115 final byte groupId = scanner.nextByte(); 116 117 final List<String> refUrls = new ArrayList<>(); 118 scanner.nextStrings(refUrls); 119 120 final Set<String> attrs = new HashSet<>(); 121 final Set<String> delattrs = new HashSet<>(); 122 short protocolVersion = -1; 123 if (version >= REPLICATION_PROTOCOL_V4) 124 { 125 scanner.nextStrings(attrs); 126 127 if (version >= REPLICATION_PROTOCOL_V5) 128 { 129 scanner.nextStrings(delattrs); 130 } 131 else 132 { 133 // Default to using the same set of attributes for deletes. 134 delattrs.addAll(attrs); 135 } 136 137 protocolVersion = scanner.nextByte(); 138 } 139 140 return new DSInfo(dsId, dsUrl, rsId, generationId, status, assuredFlag, 141 assuredMode, safeDataLevel, groupId, refUrls, attrs, delattrs, 142 protocolVersion); 143 } 144 145 private RSInfo nextRSInfo(ByteArrayScanner scanner, short version) 146 throws DataFormatException 147 { 148 final int rsId = scanner.nextIntUTF8(); 149 final long generationId = scanner.nextLongUTF8(); 150 final byte groupId = scanner.nextByte(); 151 152 int weight = 1; 153 String serverUrl = null; 154 if (version >= REPLICATION_PROTOCOL_V4) 155 { 156 serverUrl = scanner.nextString(); 157 weight = scanner.nextIntUTF8(); 158 } 159 160 return new RSInfo(rsId, serverUrl, generationId, groupId, weight); 161 } 162 163 /** 164 * Creates a new message of the currently connected servers. 165 * 166 * @param dsInfos The collection of currently connected DS servers ID. 167 * @param rsInfos The list of currently connected RS servers ID. 168 */ 169 public TopologyMsg(Collection<DSInfo> dsInfos, List<RSInfo> rsInfos) 170 { 171 if (dsInfos == null || dsInfos.isEmpty()) 172 { 173 this.replicaInfos = Collections.emptyMap(); 174 } 175 else 176 { 177 Map<Integer, DSInfo> replicas = new HashMap<>(); 178 for (DSInfo dsInfo : dsInfos) 179 { 180 replicas.put(dsInfo.getDsId(), dsInfo); 181 } 182 this.replicaInfos = Collections.unmodifiableMap(replicas); 183 } 184 185 if (rsInfos == null || rsInfos.isEmpty()) 186 { 187 this.rsInfos = Collections.emptyList(); 188 } 189 else 190 { 191 this.rsInfos = 192 Collections.unmodifiableList(new ArrayList<RSInfo>(rsInfos)); 193 } 194 } 195 196 // ============ 197 // Msg encoding 198 // ============ 199 200 /** {@inheritDoc} */ 201 @Override 202 public byte[] getBytes(short version) 203 { 204 /** 205 * Message has the following form: 206 * <pdu type><number of following DSInfo entries>[<DSInfo>]* 207 * <number of following RSInfo entries>[<RSInfo>]* 208 */ 209 final ByteArrayBuilder builder = new ByteArrayBuilder(); 210 builder.appendByte(MSG_TYPE_TOPOLOGY); 211 212 // Put DS infos 213 builder.appendByte(replicaInfos.size()); 214 for (DSInfo dsInfo : replicaInfos.values()) 215 { 216 builder.appendIntUTF8(dsInfo.getDsId()); 217 if (version >= REPLICATION_PROTOCOL_V6) 218 { 219 builder.appendString(dsInfo.getDsUrl()); 220 } 221 builder.appendIntUTF8(dsInfo.getRsId()); 222 builder.appendLongUTF8(dsInfo.getGenerationId()); 223 builder.appendByte(dsInfo.getStatus().getValue()); 224 builder.appendBoolean(dsInfo.isAssured()); 225 builder.appendByte(dsInfo.getAssuredMode().getValue()); 226 builder.appendByte(dsInfo.getSafeDataLevel()); 227 builder.appendByte(dsInfo.getGroupId()); 228 229 builder.appendStrings(dsInfo.getRefUrls()); 230 231 if (version >= REPLICATION_PROTOCOL_V4) 232 { 233 builder.appendStrings(dsInfo.getEclIncludes()); 234 if (version >= REPLICATION_PROTOCOL_V5) 235 { 236 builder.appendStrings(dsInfo.getEclIncludesForDeletes()); 237 } 238 builder.appendByte(dsInfo.getProtocolVersion()); 239 } 240 } 241 242 // Put RS infos 243 builder.appendByte(rsInfos.size()); 244 for (RSInfo rsInfo : rsInfos) 245 { 246 builder.appendIntUTF8(rsInfo.getId()); 247 builder.appendLongUTF8(rsInfo.getGenerationId()); 248 builder.appendByte(rsInfo.getGroupId()); 249 250 if (version >= REPLICATION_PROTOCOL_V4) 251 { 252 builder.appendString(rsInfo.getServerUrl()); 253 builder.appendIntUTF8(rsInfo.getWeight()); 254 } 255 } 256 257 return builder.toByteArray(); 258 } 259 260 /** {@inheritDoc} */ 261 @Override 262 public String toString() 263 { 264 String dsStr = ""; 265 for (DSInfo dsInfo : replicaInfos.values()) 266 { 267 dsStr += dsInfo + "\n----------------------------\n"; 268 } 269 270 String rsStr = ""; 271 for (RSInfo rsInfo : rsInfos) 272 { 273 rsStr += rsInfo + "\n----------------------------\n"; 274 } 275 276 return "TopologyMsg content:" 277 + "\n----------------------------" 278 + "\nCONNECTED DS SERVERS:" 279 + "\n--------------------\n" 280 + dsStr 281 + "CONNECTED RS SERVERS:" 282 + "\n--------------------\n" 283 + rsStr 284 + ("".equals(rsStr) ? "----------------------------\n" : ""); 285 } 286 287 /** 288 * Get the DS infos. 289 * 290 * @return The DS infos 291 */ 292 public Map<Integer, DSInfo> getReplicaInfos() 293 { 294 return replicaInfos; 295 } 296 297 /** 298 * Get the RS infos. 299 * 300 * @return The RS infos 301 */ 302 public List<RSInfo> getRsInfos() 303 { 304 return rsInfos; 305 } 306}