001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2006-2009 Sun Microsystems, Inc. 025 * Portions Copyright 2011-2015 ForgeRock AS 026 */ 027package org.opends.server.replication.protocol; 028 029import static org.opends.server.util.StaticUtils.*; 030 031import java.io.*; 032import java.net.Socket; 033import java.net.SocketException; 034import java.util.concurrent.CountDownLatch; 035import java.util.concurrent.LinkedBlockingQueue; 036import java.util.concurrent.TimeUnit; 037import java.util.concurrent.atomic.AtomicBoolean; 038import java.util.concurrent.locks.Lock; 039import java.util.concurrent.locks.ReentrantLock; 040import java.util.zip.DataFormatException; 041 042import javax.net.ssl.SSLSocket; 043 044import org.opends.server.api.DirectoryThread; 045import org.forgerock.i18n.slf4j.LocalizedLogger; 046import org.opends.server.util.StaticUtils; 047 048/** 049 * This class defines a replication session using TLS. 050 */ 051public final class Session extends DirectoryThread implements Closeable 052{ 053 private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); 054 055 private final Socket plainSocket; 056 private final SSLSocket secureSocket; 057 private final InputStream plainInput; 058 private final OutputStream plainOutput; 059 private final byte[] rcvLengthBuf = new byte[8]; 060 private final String readableRemoteAddress; 061 private final String remoteAddress; 062 private final String localUrl; 063 064 /** 065 * The time the last message published to this session. 066 */ 067 private volatile long lastPublishTime; 068 069 /** 070 * The time the last message was received on this session. 071 */ 072 private volatile long lastReceiveTime; 073 074 /** 075 * Close and error guarded by stateLock: use a different lock to publish since 076 * publishing can block, and we don't want to block while closing failed 077 * connections. 078 */ 079 private final Object stateLock = new Object(); 080 private volatile boolean closeInitiated; 081 private Throwable sessionError; 082 083 /** 084 * Publish guarded by publishLock: use a full lock here so that we can 085 * optionally publish StopMsg during close. 086 */ 087 private final Lock publishLock = new ReentrantLock(); 088 089 /** 090 * These do not need synchronization because they are only modified during the 091 * initial single threaded handshake. 092 */ 093 private short protocolVersion = ProtocolVersion.getCurrentVersion(); 094 /** Initially encrypted. */ 095 private boolean isEncrypted = true; 096 097 /** 098 * Use a buffered input stream to avoid too many system calls. 099 */ 100 private BufferedInputStream input; 101 102 /** 103 * Use a buffered output stream in order to combine message length and content 104 * into a single TCP packet if possible. 105 */ 106 private BufferedOutputStream output; 107 108 private final LinkedBlockingQueue<byte[]> sendQueue = new LinkedBlockingQueue<>(4000); 109 private AtomicBoolean isRunning = new AtomicBoolean(false); 110 private final CountDownLatch latch = new CountDownLatch(1); 111 112 /** 113 * Creates a new Session. 114 * 115 * @param socket 116 * The regular Socket on which the SocketSession will be based. 117 * @param secureSocket 118 * The secure Socket on which the SocketSession will be based. 119 * @throws IOException 120 * When an IException happens on the socket. 121 */ 122 public Session(final Socket socket, 123 final SSLSocket secureSocket) throws IOException 124 { 125 super("Replication Session from "+ socket.getLocalSocketAddress() + 126 " to " + socket.getRemoteSocketAddress()); 127 if (logger.isTraceEnabled()) 128 { 129 logger.trace( 130 "Creating Session from %s to %s in %s", 131 socket.getLocalSocketAddress(), 132 socket.getRemoteSocketAddress(), 133 stackTraceToSingleLineString(new Exception())); 134 } 135 136 this.plainSocket = socket; 137 this.secureSocket = secureSocket; 138 this.plainInput = plainSocket.getInputStream(); 139 this.plainOutput = plainSocket.getOutputStream(); 140 this.input = new BufferedInputStream(secureSocket.getInputStream()); 141 this.output = new BufferedOutputStream(secureSocket.getOutputStream()); 142 this.readableRemoteAddress = plainSocket.getRemoteSocketAddress() 143 .toString(); 144 this.remoteAddress = plainSocket.getInetAddress().getHostAddress(); 145 this.localUrl = plainSocket.getLocalAddress().getHostName() + ":" 146 + plainSocket.getLocalPort(); 147 } 148 149 150 151 /** 152 * This method is called when the session with the remote must be closed. 153 * This object won't be used anymore after this method is called. 154 */ 155 @Override 156 public void close() 157 { 158 Throwable localSessionError; 159 160 synchronized (stateLock) 161 { 162 if (closeInitiated) 163 { 164 return; 165 } 166 167 localSessionError = sessionError; 168 closeInitiated = true; 169 } 170 171 try { 172 interrupt(); 173 join(); 174 } 175 catch (InterruptedException e) { 176 Thread.currentThread().interrupt(); 177 } 178 179 // Perform close outside of critical section. 180 if (logger.isTraceEnabled()) 181 { 182 if (localSessionError == null) 183 { 184 logger.trace( 185 "Closing Session from %s to %s in %s", 186 plainSocket.getLocalSocketAddress(), 187 plainSocket.getRemoteSocketAddress(), 188 stackTraceToSingleLineString(new Exception())); 189 } 190 else 191 { 192 logger.traceException(localSessionError, 193 "Aborting Session from %s to %s in %s due to the following error", 194 plainSocket.getLocalSocketAddress(), 195 plainSocket.getRemoteSocketAddress(), 196 stackTraceToSingleLineString(new Exception())); 197 } 198 } 199 200 // V4 protocol introduces a StopMsg to properly end communications. 201 if (localSessionError == null 202 && protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4) 203 { 204 try 205 { 206 publish(new StopMsg()); 207 } 208 catch (final IOException ignored) 209 { 210 // Ignore errors on close. 211 } 212 } 213 214 StaticUtils.close(plainSocket, secureSocket); 215 } 216 217 218 219 /** 220 * This methods allows to determine if the session close was initiated 221 * on this Session. 222 * 223 * @return A boolean allowing to determine if the session close was initiated 224 * on this Session. 225 */ 226 public boolean closeInitiated() 227 { 228 synchronized (stateLock) 229 { 230 return closeInitiated; 231 } 232 } 233 234 235 236 /** 237 * Gets the time the last replication message was published on this 238 * session. 239 * @return The timestamp in milliseconds of the last message published. 240 */ 241 public long getLastPublishTime() 242 { 243 return lastPublishTime; 244 } 245 246 247 248 /** 249 * Gets the time the last replication message was received on this 250 * session. 251 * @return The timestamp in milliseconds of the last message received. 252 */ 253 public long getLastReceiveTime() 254 { 255 if (lastReceiveTime == 0) 256 { 257 return System.currentTimeMillis(); 258 } 259 return lastReceiveTime; 260 } 261 262 263 264 /** 265 * Retrieve the local URL in the form host:port. 266 * 267 * @return The local URL. 268 */ 269 public String getLocalUrl() 270 { 271 return localUrl; 272 } 273 274 275 276 /** 277 * Retrieve the human readable address of the remote server. 278 * 279 * @return The human readable address of the remote server. 280 */ 281 public String getReadableRemoteAddress() 282 { 283 return readableRemoteAddress; 284 } 285 286 287 288 /** 289 * Retrieve the IP address of the remote server. 290 * 291 * @return The IP address of the remote server. 292 */ 293 public String getRemoteAddress() 294 { 295 return remoteAddress; 296 } 297 298 299 300 /** 301 * Determine whether the session is using a security layer. 302 * @return true if the connection is encrypted, false otherwise. 303 */ 304 public boolean isEncrypted() 305 { 306 return isEncrypted; 307 } 308 309 310 311 /** 312 * Sends a replication message to the remote peer. 313 * 314 * @param msg 315 * The message to be sent. 316 * @throws IOException 317 * If an IO error occurred. 318 */ 319 public void publish(final ReplicationMsg msg) throws IOException 320 { 321 final byte[] buffer = msg.getBytes(protocolVersion); 322 if (buffer == null) 323 { 324 // skip anything that cannot be encoded for this peer. 325 return; 326 } 327 if (isRunning.get()) 328 { 329 while (!closeInitiated) 330 { 331 try 332 { 333 // Avoid blocking forever so that we can check for session closure. 334 if (sendQueue.offer(buffer, 100, TimeUnit.MILLISECONDS)) 335 { 336 return; 337 } 338 } 339 catch (final InterruptedException e) 340 { 341 setSessionError(e); 342 throw new IOException(e.getMessage()); 343 } 344 } 345 } 346 else 347 { 348 send(buffer); 349 } 350 } 351 352 /** Sends a replication message already encoded to the socket. 353 * 354 * @param buffer 355 * the encoded buffer 356 * @throws IOException if the message could not be sent 357 */ 358 private void send(final byte[] buffer) throws IOException 359 { 360 final String str = String.format("%08x", buffer.length); 361 final byte[] sendLengthBuf = str.getBytes(); 362 363 publishLock.lock(); 364 try 365 { 366 /* 367 * The buffered output stream ensures that the message is usually sent as 368 * a single TCP packet. 369 */ 370 output.write(sendLengthBuf); 371 output.write(buffer); 372 output.flush(); 373 } catch (final IOException e) { 374 setSessionError(e); 375 throw e; 376 } 377 finally 378 { 379 publishLock.unlock(); 380 } 381 382 lastPublishTime = System.currentTimeMillis(); 383 } 384 385 386 387 /** 388 * Attempt to receive a ReplicationMsg. 389 * This method should block the calling thread until a 390 * ReplicationMsg is available or until an error condition. 391 * 392 * This method can only be called by a single thread and therefore does not 393 * need to implement any replication. 394 * 395 * @return The ReplicationMsg that was received. 396 * @throws IOException When error happened during IO process. 397 * @throws DataFormatException When the data received is not formatted as a 398 * ReplicationMsg. 399 * @throws NotSupportedOldVersionPDUException If the received PDU is part of 400 * an old protocol version and we do not support it. 401 */ 402 public ReplicationMsg receive() throws IOException, 403 DataFormatException, NotSupportedOldVersionPDUException 404 { 405 try 406 { 407 /* 408 * Let's start the stop-watch before waiting on read for the heartbeat 409 * check to be operational. 410 */ 411 lastReceiveTime = System.currentTimeMillis(); 412 413 // Read the first 8 bytes containing the packet length. 414 read(rcvLengthBuf); 415 final int totalLength = Integer.parseInt(new String(rcvLengthBuf), 16); 416 417 try 418 { 419 final byte[] buffer = new byte[totalLength]; 420 read(buffer); 421 422 /* 423 * We do not want the heartbeat to close the session when we are 424 * processing a message even a time consuming one. 425 */ 426 lastReceiveTime = 0; 427 return ReplicationMsg.generateMsg(buffer, protocolVersion); 428 } 429 catch (final OutOfMemoryError e) 430 { 431 throw new IOException("Packet too large, can't allocate " 432 + totalLength + " bytes."); 433 } 434 } 435 catch (final IOException | DataFormatException | NotSupportedOldVersionPDUException | RuntimeException e) 436 { 437 setSessionError(e); 438 throw e; 439 } 440 } 441 442 private void read(byte[] buffer) throws IOException 443 { 444 final int totalLength = buffer.length; 445 int length = 0; 446 while (length < totalLength) 447 { 448 final int read = input.read(buffer, length, totalLength - length); 449 if (read == -1) 450 { 451 lastReceiveTime = 0; 452 throw new IOException("no more data"); 453 } 454 length += read; 455 } 456 } 457 458 /** 459 * This method is called at the establishment of the session and can 460 * be used to record the version of the protocol that is currently used. 461 * 462 * @param version The version of the protocol that is currently used. 463 */ 464 public void setProtocolVersion(final short version) 465 { 466 protocolVersion = version; 467 } 468 469 470 /** 471 * Returns the version of the protocol that is currently used. 472 * 473 * @return The version of the protocol that is currently used. 474 */ 475 public short getProtocolVersion() 476 { 477 return protocolVersion; 478 } 479 480 481 482 /** 483 * Set a timeout value. 484 * With this option set to a non-zero value, calls to the receive() method 485 * block for only this amount of time after which a 486 * java.net.SocketTimeoutException is raised. 487 * The Broker is valid and usable even after such an Exception is raised. 488 * 489 * @param timeout the specified timeout, in milliseconds. 490 * @throws SocketException if there is an error in the underlying protocol, 491 * such as a TCP error. 492 */ 493 public void setSoTimeout(final int timeout) throws SocketException 494 { 495 plainSocket.setSoTimeout(timeout); 496 } 497 498 499 500 /** 501 * Stop using the security layer, if there is any. 502 */ 503 public void stopEncryption() 504 { 505 /* 506 * The secure socket has been configured not to auto close the underlying 507 * plain socket. We should close it here and properly tear down the SSL 508 * session, but this is not compatible with the existing protocol. 509 */ 510 if (false) 511 { 512 StaticUtils.close(secureSocket); 513 } 514 515 input = new BufferedInputStream(plainInput); 516 output = new BufferedOutputStream(plainOutput); 517 isEncrypted = false; 518 } 519 520 521 522 private void setSessionError(final Exception e) 523 { 524 synchronized (stateLock) 525 { 526 if (sessionError == null) 527 { 528 sessionError = e; 529 } 530 } 531 } 532 533 /** 534 * Run method for the Session. 535 * Loops waiting for buffers from the queue and sends them when available. 536 */ 537 @Override 538 public void run() 539 { 540 isRunning.set(true); 541 latch.countDown(); 542 if (logger.isTraceEnabled()) 543 { 544 logger.trace(getName() + " starting."); 545 } 546 boolean needClosing = false; 547 while (!closeInitiated) 548 { 549 byte[] buffer; 550 try 551 { 552 buffer = sendQueue.take(); 553 } 554 catch (InterruptedException ie) 555 { 556 break; 557 } 558 try 559 { 560 send(buffer); 561 } 562 catch (IOException e) 563 { 564 setSessionError(e); 565 needClosing = true; 566 } 567 } 568 isRunning.set(false); 569 if (needClosing) 570 { 571 close(); 572 } 573 if (logger.isTraceEnabled()) 574 { 575 logger.trace(getName() + " stopped."); 576 } 577 } 578 579 /** 580 * This method can be called to wait until the session thread is 581 * properly started. 582 * @throws InterruptedException when interrupted 583 */ 584 public void waitForStartup() throws InterruptedException 585 { 586 latch.await(); 587 } 588}