001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2009 Sun Microsystems, Inc. 025 * Portions Copyright 2013-2015 ForgeRock AS. 026 */ 027package org.opends.server.replication.protocol; 028 029import java.io.IOException; 030import java.util.HashMap; 031import java.util.Iterator; 032import java.util.Map; 033import java.util.Map.Entry; 034import java.util.zip.DataFormatException; 035 036import org.forgerock.opendj.io.ASN1Reader; 037import org.forgerock.opendj.io.ASN1Writer; 038import org.opends.server.replication.common.CSN; 039import org.opends.server.replication.common.ServerState; 040 041/** 042 * This message is part of the replication protocol. 043 * RS1 sends a MonitorRequestMessage to RS2 to requests its monitoring 044 * information. 045 * When RS2 receives a MonitorRequestMessage from RS1, RS2 responds with a 046 * MonitorMsg. 047 */ 048public class MonitorMsg extends ReplicationMsg 049{ 050 /** 051 * The destination server or servers of this message. 052 */ 053 private final int destination; 054 055 /** 056 * The serverID of the server that sends this message. 057 */ 058 private final int senderID; 059 060 061 062 /** 063 * Data structure to manage the state and the approximation of the data of the 064 * first missing change for each LDAP server connected to a Replication 065 * Server. 066 */ 067 private static class ServerData 068 { 069 private ServerState state; 070 private long approxFirstMissingDate; 071 } 072 073 /** 074 * Data structure to manage the state of this replication server 075 * and the state information for the servers connected to it. 076 */ 077 private static class SubTopoMonitorData 078 { 079 /** This replication server DbState. */ 080 private ServerState replServerDbState; 081 /** The data related to the LDAP servers connected to this RS. */ 082 private final Map<Integer, ServerData> ldapStates = new HashMap<>(); 083 /** The data related to the RS servers connected to this RS. */ 084 private final Map<Integer, ServerData> rsStates = new HashMap<>(); 085 } 086 087 private final SubTopoMonitorData data = new SubTopoMonitorData(); 088 089 /** 090 * Creates a new MonitorMsg. 091 * 092 * @param sender The sender of this message. 093 * @param destination The destination of this message. 094 */ 095 public MonitorMsg(int sender, int destination) 096 { 097 this.senderID = sender; 098 this.destination = destination; 099 } 100 101 /** 102 * Sets the state of the replication server. 103 * @param state The state. 104 */ 105 public void setReplServerDbState(ServerState state) 106 { 107 data.replServerDbState = state; 108 } 109 110 /** 111 * Sets the information of an LDAP server. 112 * @param serverId The serverID. 113 * @param state The server state. 114 * @param approxFirstMissingDate The approximation of the date 115 * of the older missing change. null when none. 116 * @param isLDAPServer Specifies whether the server is a DS or a RS 117 */ 118 public void setServerState(int serverId, ServerState state, 119 long approxFirstMissingDate, boolean isLDAPServer) 120 { 121 final ServerData sd = new ServerData(); 122 sd.state = state; 123 sd.approxFirstMissingDate = approxFirstMissingDate; 124 if (isLDAPServer) 125 { 126 data.ldapStates.put(serverId, sd); 127 } 128 else 129 { 130 data.rsStates.put(serverId, sd); 131 } 132 } 133 134 /** 135 * Get the server state for the LDAP server with the provided serverId. 136 * @param serverId The provided serverId. 137 * @return The state. 138 */ 139 public ServerState getLDAPServerState(int serverId) 140 { 141 return data.ldapStates.get(serverId).state; 142 } 143 144 /** 145 * Get the server state for the RS server with the provided serverId. 146 * @param serverId The provided serverId. 147 * @return The state. 148 */ 149 public ServerState getRSServerState(int serverId) 150 { 151 return data.rsStates.get(serverId).state; 152 } 153 154 /** 155 * Get the approximation of the date of the older missing change for the 156 * LDAP Server with the provided server Id. 157 * @param serverId The provided serverId. 158 * @return The approximated state. 159 */ 160 public long getLDAPApproxFirstMissingDate(int serverId) 161 { 162 return data.ldapStates.get(serverId).approxFirstMissingDate; 163 } 164 165 /** 166 * Get the approximation of the date of the older missing change for the 167 * RS Server with the provided server Id. 168 * @param serverId The provided serverId. 169 * @return The approximated state. 170 */ 171 public long getRSApproxFirstMissingDate(int serverId) 172 { 173 return data.rsStates.get(serverId).approxFirstMissingDate; 174 } 175 176 /** 177 * Creates a new EntryMessage from its encoded form. 178 * 179 * @param in The byte array containing the encoded form of the message. 180 * @param version The version of the protocol to use to decode the msg. 181 * @throws DataFormatException If the byte array does not contain a valid 182 * encoded form of the ServerStartMessage. 183 */ 184 MonitorMsg(byte[] in, short version) throws DataFormatException 185 { 186 final ByteArrayScanner scanner = new ByteArrayScanner(in); 187 if (scanner.nextByte() != MSG_TYPE_REPL_SERVER_MONITOR) 188 { 189 throw new DataFormatException("input is not a valid " 190 + getClass().getCanonicalName()); 191 } 192 193 if (version == ProtocolVersion.REPLICATION_PROTOCOL_V1) 194 { 195 this.senderID = scanner.nextIntUTF8(); 196 this.destination = scanner.nextIntUTF8(); 197 } 198 else if (version <= ProtocolVersion.REPLICATION_PROTOCOL_V3) 199 { 200 this.senderID = scanner.nextShort(); 201 this.destination = scanner.nextShort(); 202 } 203 else 204 { 205 this.senderID = scanner.nextInt(); 206 this.destination = scanner.nextInt(); 207 } 208 209 ASN1Reader asn1Reader = scanner.getASN1Reader(); 210 try 211 { 212 asn1Reader.readStartSequence(); 213 // loop on the servers 214 while(asn1Reader.hasNextElement()) 215 { 216 ServerState newState = new ServerState(); 217 int serverId = 0; 218 long outime = 0; 219 boolean isLDAPServer = false; 220 221 asn1Reader.readStartSequence(); 222 // loop on the list of CSN of the state 223 while(asn1Reader.hasNextElement()) 224 { 225 CSN csn; 226 if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V7) 227 { 228 csn = CSN.valueOf(asn1Reader.readOctetString()); 229 } 230 else 231 { 232 csn = CSN.valueOf(asn1Reader.readOctetStringAsString()); 233 } 234 235 if (data.replServerDbState != null && serverId == 0) 236 { 237 // we are on the first CSN that is a fake CSN to store the serverId 238 // and the older update time 239 serverId = csn.getServerId(); 240 outime = csn.getTime(); 241 isLDAPServer = csn.getSeqnum() > 0; 242 } 243 else 244 { 245 // we are on a normal CSN 246 newState.update(csn); 247 } 248 } 249 asn1Reader.readEndSequence(); 250 251 if (data.replServerDbState == null) 252 { 253 // the first state is the replication state 254 data.replServerDbState = newState; 255 } 256 else 257 { 258 // the next states are the server states 259 setServerState(serverId, newState, outime, isLDAPServer); 260 } 261 } 262 asn1Reader.readEndSequence(); 263 } catch(Exception e) 264 { /* do nothing */ 265 } 266 } 267 268 /** {@inheritDoc} */ 269 @Override 270 public byte[] getBytes(short protocolVersion) 271 { 272 try 273 { 274 final ByteArrayBuilder builder = new ByteArrayBuilder(); 275 builder.appendByte(MSG_TYPE_REPL_SERVER_MONITOR); 276 append(builder, senderID, protocolVersion); 277 append(builder, destination, protocolVersion); 278 279 /* Put the serverStates ... */ 280 ASN1Writer writer = builder.getASN1Writer(); 281 writer.writeStartSequence(); 282 { 283 /* first put the Replication Server state */ 284 writer.writeStartSequence(); 285 { 286 data.replServerDbState.writeTo(writer, protocolVersion); 287 } 288 writer.writeEndSequence(); 289 290 // then the DS + RS server data 291 writeServerStates(protocolVersion, writer, false /* DS */); 292 writeServerStates(protocolVersion, writer, true /* RS */); 293 } 294 writer.writeEndSequence(); 295 296 if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1) 297 { 298 // legacy coding mistake 299 builder.appendByte(0); 300 } 301 return builder.toByteArray(); 302 } 303 catch (Exception e) 304 { 305 return null; 306 } 307 } 308 309 private void append(final ByteArrayBuilder builder, int data, 310 short protocolVersion) 311 { 312 if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1) 313 { 314 builder.appendIntUTF8(data); 315 } 316 else if (protocolVersion <= ProtocolVersion.REPLICATION_PROTOCOL_V3) 317 { 318 builder.appendShort(data); 319 } 320 else // protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4 321 { 322 builder.appendInt(data); 323 } 324 } 325 326 private void writeServerStates(short protocolVersion, ASN1Writer writer, 327 boolean writeRSStates) throws IOException 328 { 329 final Map<Integer, ServerData> servers = 330 writeRSStates ? data.rsStates : data.ldapStates; 331 final int seqNum = writeRSStates ? 0 : 1; 332 for (Map.Entry<Integer, ServerData> server : servers.entrySet()) 333 { 334 writer.writeStartSequence(); 335 { 336 /* 337 * A fake CSN helps storing the LDAP server ID. The sequence number will 338 * be used to differentiate between an LDAP server (1) or an RS (0). 339 */ 340 CSN csn = new CSN( 341 server.getValue().approxFirstMissingDate, seqNum, 342 server.getKey()); 343 if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7) 344 { 345 writer.writeOctetString(csn.toByteString()); 346 } 347 else 348 { 349 writer.writeOctetString(csn.toString()); 350 } 351 352 // the CSNs that make the state 353 server.getValue().state.writeTo(writer, protocolVersion); 354 } 355 writer.writeEndSequence(); 356 } 357 } 358 359 /** 360 * Get the state of the replication server that sent this message. 361 * @return The state. 362 */ 363 public ServerState getReplServerDbState() 364 { 365 return data.replServerDbState; 366 } 367 368 /** 369 * Returns an iterator on the serverId of the connected LDAP servers. 370 * @return The iterator. 371 */ 372 public Iterator<Integer> ldapIterator() 373 { 374 return data.ldapStates.keySet().iterator(); 375 } 376 377 /** 378 * Returns an iterator on the serverId of the connected RS servers. 379 * @return The iterator. 380 */ 381 public Iterator<Integer> rsIterator() 382 { 383 return data.rsStates.keySet().iterator(); 384 } 385 386 /** 387 * Get the destination. 388 * 389 * @return the destination 390 */ 391 public int getDestination() 392 { 393 return destination; 394 } 395 396 /** 397 * Get the server ID of the server that sent this message. 398 * 399 * @return the server id 400 */ 401 public int getSenderID() 402 { 403 return senderID; 404 } 405 406 /** {@inheritDoc} */ 407 @Override 408 public String toString() 409 { 410 final StringBuilder stateS = new StringBuilder("\nRState:["); 411 stateS.append(data.replServerDbState); 412 stateS.append("]"); 413 414 stateS.append("\nLDAPStates:["); 415 for (Entry<Integer, ServerData> entry : data.ldapStates.entrySet()) 416 { 417 ServerData sd = entry.getValue(); 418 stateS.append("\n[LSstate(").append(entry.getKey()).append(")=") 419 .append(sd.state).append("]").append(" afmd=") 420 .append(sd.approxFirstMissingDate).append("]"); 421 } 422 423 stateS.append("\nRSStates:["); 424 for (Entry<Integer, ServerData> entry : data.rsStates.entrySet()) 425 { 426 final ServerData sd = entry.getValue(); 427 stateS.append("\n[RSState(").append(entry.getKey()).append(")=") 428 .append(sd.state).append("]").append(" afmd=") 429 .append(sd.approxFirstMissingDate).append("]"); 430 } 431 return getClass().getCanonicalName() + 432 "[ sender=" + this.senderID + 433 " destination=" + this.destination + 434 " data=[" + stateS + "]" + 435 "]"; 436 } 437}