001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2006-2009 Sun Microsystems, Inc.
025 *      Portions Copyright 2011-2015 ForgeRock AS
026 */
027package org.opends.server.replication.protocol;
028
029import static org.opends.server.util.StaticUtils.*;
030
031import java.io.*;
032import java.net.Socket;
033import java.net.SocketException;
034import java.util.concurrent.CountDownLatch;
035import java.util.concurrent.LinkedBlockingQueue;
036import java.util.concurrent.TimeUnit;
037import java.util.concurrent.atomic.AtomicBoolean;
038import java.util.concurrent.locks.Lock;
039import java.util.concurrent.locks.ReentrantLock;
040import java.util.zip.DataFormatException;
041
042import javax.net.ssl.SSLSocket;
043
044import org.opends.server.api.DirectoryThread;
045import org.forgerock.i18n.slf4j.LocalizedLogger;
046import org.opends.server.util.StaticUtils;
047
048/**
049 * This class defines a replication session using TLS.
050 */
051public final class Session extends DirectoryThread implements Closeable
052{
053  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
054
055  private final Socket plainSocket;
056  private final SSLSocket secureSocket;
057  private final InputStream plainInput;
058  private final OutputStream plainOutput;
059  private final byte[] rcvLengthBuf = new byte[8];
060  private final String readableRemoteAddress;
061  private final String remoteAddress;
062  private final String localUrl;
063
064  /**
065   * The time the last message published to this session.
066   */
067  private volatile long lastPublishTime;
068
069  /**
070   * The time the last message was received on this session.
071   */
072  private volatile long lastReceiveTime;
073
074  /**
075   * Close and error guarded by stateLock: use a different lock to publish since
076   * publishing can block, and we don't want to block while closing failed
077   * connections.
078   */
079  private final Object stateLock = new Object();
080  private volatile boolean closeInitiated;
081  private Throwable sessionError;
082
083  /**
084   * Publish guarded by publishLock: use a full lock here so that we can
085   * optionally publish StopMsg during close.
086   */
087  private final Lock publishLock = new ReentrantLock();
088
089  /**
090   * These do not need synchronization because they are only modified during the
091   * initial single threaded handshake.
092   */
093  private short protocolVersion = ProtocolVersion.getCurrentVersion();
094  /** Initially encrypted. */
095  private boolean isEncrypted = true;
096
097  /**
098   * Use a buffered input stream to avoid too many system calls.
099   */
100  private BufferedInputStream input;
101
102  /**
103   * Use a buffered output stream in order to combine message length and content
104   * into a single TCP packet if possible.
105   */
106  private BufferedOutputStream output;
107
108  private final LinkedBlockingQueue<byte[]> sendQueue = new LinkedBlockingQueue<>(4000);
109  private AtomicBoolean isRunning = new AtomicBoolean(false);
110  private final CountDownLatch latch = new CountDownLatch(1);
111
112  /**
113   * Creates a new Session.
114   *
115   * @param socket
116   *          The regular Socket on which the SocketSession will be based.
117   * @param secureSocket
118   *          The secure Socket on which the SocketSession will be based.
119   * @throws IOException
120   *           When an IException happens on the socket.
121   */
122  public Session(final Socket socket,
123                 final SSLSocket secureSocket) throws IOException
124  {
125    super("Replication Session from "+ socket.getLocalSocketAddress() +
126        " to " + socket.getRemoteSocketAddress());
127    if (logger.isTraceEnabled())
128    {
129      logger.trace(
130          "Creating Session from %s to %s in %s",
131          socket.getLocalSocketAddress(),
132          socket.getRemoteSocketAddress(),
133          stackTraceToSingleLineString(new Exception()));
134    }
135
136    this.plainSocket = socket;
137    this.secureSocket = secureSocket;
138    this.plainInput = plainSocket.getInputStream();
139    this.plainOutput = plainSocket.getOutputStream();
140    this.input = new BufferedInputStream(secureSocket.getInputStream());
141    this.output = new BufferedOutputStream(secureSocket.getOutputStream());
142    this.readableRemoteAddress = plainSocket.getRemoteSocketAddress()
143        .toString();
144    this.remoteAddress = plainSocket.getInetAddress().getHostAddress();
145    this.localUrl = plainSocket.getLocalAddress().getHostName() + ":"
146        + plainSocket.getLocalPort();
147  }
148
149
150
151  /**
152   * This method is called when the session with the remote must be closed.
153   * This object won't be used anymore after this method is called.
154   */
155  @Override
156  public void close()
157  {
158    Throwable localSessionError;
159
160    synchronized (stateLock)
161    {
162      if (closeInitiated)
163      {
164        return;
165      }
166
167      localSessionError = sessionError;
168      closeInitiated = true;
169    }
170
171    try {
172      interrupt();
173      join();
174    }
175    catch (InterruptedException e) {
176      Thread.currentThread().interrupt();
177    }
178
179    // Perform close outside of critical section.
180    if (logger.isTraceEnabled())
181    {
182      if (localSessionError == null)
183      {
184        logger.trace(
185            "Closing Session from %s to %s in %s",
186            plainSocket.getLocalSocketAddress(),
187            plainSocket.getRemoteSocketAddress(),
188            stackTraceToSingleLineString(new Exception()));
189      }
190      else
191      {
192        logger.traceException(localSessionError,
193            "Aborting Session from %s to %s in %s due to the following error",
194            plainSocket.getLocalSocketAddress(),
195            plainSocket.getRemoteSocketAddress(),
196            stackTraceToSingleLineString(new Exception()));
197      }
198    }
199
200    // V4 protocol introduces a StopMsg to properly end communications.
201    if (localSessionError == null
202        && protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
203    {
204      try
205      {
206        publish(new StopMsg());
207      }
208      catch (final IOException ignored)
209      {
210        // Ignore errors on close.
211      }
212    }
213
214    StaticUtils.close(plainSocket, secureSocket);
215  }
216
217
218
219  /**
220   * This methods allows to determine if the session close was initiated
221   * on this Session.
222   *
223   * @return A boolean allowing to determine if the session close was initiated
224   * on this Session.
225   */
226  public boolean closeInitiated()
227  {
228    synchronized (stateLock)
229    {
230      return closeInitiated;
231    }
232  }
233
234
235
236  /**
237   * Gets the time the last replication message was published on this
238   * session.
239   * @return The timestamp in milliseconds of the last message published.
240   */
241  public long getLastPublishTime()
242  {
243    return lastPublishTime;
244  }
245
246
247
248  /**
249   * Gets the time the last replication message was received on this
250   * session.
251   * @return The timestamp in milliseconds of the last message received.
252   */
253  public long getLastReceiveTime()
254  {
255    if (lastReceiveTime == 0)
256    {
257      return System.currentTimeMillis();
258    }
259    return lastReceiveTime;
260  }
261
262
263
264  /**
265   * Retrieve the local URL in the form host:port.
266   *
267   * @return The local URL.
268   */
269  public String getLocalUrl()
270  {
271    return localUrl;
272  }
273
274
275
276  /**
277   * Retrieve the human readable address of the remote server.
278   *
279   * @return The human readable address of the remote server.
280   */
281  public String getReadableRemoteAddress()
282  {
283    return readableRemoteAddress;
284  }
285
286
287
288  /**
289   * Retrieve the IP address of the remote server.
290   *
291   * @return The IP address of the remote server.
292   */
293  public String getRemoteAddress()
294  {
295    return remoteAddress;
296  }
297
298
299
300  /**
301   * Determine whether the session is using a security layer.
302   * @return true if the connection is encrypted, false otherwise.
303   */
304  public boolean isEncrypted()
305  {
306    return isEncrypted;
307  }
308
309
310
311  /**
312   * Sends a replication message to the remote peer.
313   *
314   * @param msg
315   *          The message to be sent.
316   * @throws IOException
317   *           If an IO error occurred.
318   */
319  public void publish(final ReplicationMsg msg) throws IOException
320  {
321    final byte[] buffer = msg.getBytes(protocolVersion);
322    if (buffer == null)
323    {
324      // skip anything that cannot be encoded for this peer.
325      return;
326    }
327    if (isRunning.get())
328    {
329      while (!closeInitiated)
330      {
331        try
332        {
333          // Avoid blocking forever so that we can check for session closure.
334          if (sendQueue.offer(buffer, 100, TimeUnit.MILLISECONDS))
335          {
336            return;
337          }
338        }
339        catch (final InterruptedException e)
340        {
341          setSessionError(e);
342          throw new IOException(e.getMessage());
343        }
344      }
345    }
346    else
347    {
348      send(buffer);
349    }
350  }
351
352  /** Sends a replication message already encoded to the socket.
353   *
354   * @param buffer
355   *          the encoded buffer
356   * @throws IOException if the message could not be sent
357   */
358  private void send(final byte[] buffer) throws IOException
359  {
360    final String str = String.format("%08x", buffer.length);
361    final byte[] sendLengthBuf = str.getBytes();
362
363    publishLock.lock();
364    try
365    {
366      /*
367       * The buffered output stream ensures that the message is usually sent as
368       * a single TCP packet.
369       */
370      output.write(sendLengthBuf);
371      output.write(buffer);
372      output.flush();
373    } catch (final IOException e) {
374      setSessionError(e);
375      throw e;
376    }
377    finally
378    {
379      publishLock.unlock();
380    }
381
382    lastPublishTime = System.currentTimeMillis();
383  }
384
385
386
387  /**
388   * Attempt to receive a ReplicationMsg.
389   * This method should block the calling thread until a
390   * ReplicationMsg is available or until an error condition.
391   *
392   * This method can only be called by a single thread and therefore does not
393   * need to implement any replication.
394   *
395   * @return The ReplicationMsg that was received.
396   * @throws IOException When error happened during IO process.
397   * @throws DataFormatException When the data received is not formatted as a
398   *         ReplicationMsg.
399   * @throws NotSupportedOldVersionPDUException If the received PDU is part of
400   * an old protocol version and we do not support it.
401   */
402  public ReplicationMsg receive() throws IOException,
403      DataFormatException, NotSupportedOldVersionPDUException
404  {
405    try
406    {
407      /*
408       * Let's start the stop-watch before waiting on read for the heartbeat
409       * check to be operational.
410       */
411      lastReceiveTime = System.currentTimeMillis();
412
413      // Read the first 8 bytes containing the packet length.
414      read(rcvLengthBuf);
415      final int totalLength = Integer.parseInt(new String(rcvLengthBuf), 16);
416
417      try
418      {
419        final byte[] buffer = new byte[totalLength];
420        read(buffer);
421
422        /*
423         * We do not want the heartbeat to close the session when we are
424         * processing a message even a time consuming one.
425         */
426        lastReceiveTime = 0;
427        return ReplicationMsg.generateMsg(buffer, protocolVersion);
428      }
429      catch (final OutOfMemoryError e)
430      {
431        throw new IOException("Packet too large, can't allocate "
432            + totalLength + " bytes.");
433      }
434    }
435    catch (final IOException | DataFormatException | NotSupportedOldVersionPDUException | RuntimeException e)
436    {
437      setSessionError(e);
438      throw e;
439    }
440  }
441
442  private void read(byte[] buffer) throws IOException
443  {
444    final int totalLength = buffer.length;
445    int length = 0;
446    while (length < totalLength)
447    {
448      final int read = input.read(buffer, length, totalLength - length);
449      if (read == -1)
450      {
451        lastReceiveTime = 0;
452        throw new IOException("no more data");
453      }
454      length += read;
455    }
456  }
457
458  /**
459   * This method is called at the establishment of the session and can
460   * be used to record the version of the protocol that is currently used.
461   *
462   * @param version The version of the protocol that is currently used.
463   */
464  public void setProtocolVersion(final short version)
465  {
466    protocolVersion = version;
467  }
468
469
470  /**
471   * Returns the version of the protocol that is currently used.
472   *
473   * @return The version of the protocol that is currently used.
474   */
475  public short getProtocolVersion()
476  {
477    return protocolVersion;
478  }
479
480
481
482  /**
483   * Set a timeout value.
484   * With this option set to a non-zero value, calls to the receive() method
485   * block for only this amount of time after which a
486   * java.net.SocketTimeoutException is raised.
487   * The Broker is valid and usable even after such an Exception is raised.
488   *
489   * @param timeout the specified timeout, in milliseconds.
490   * @throws SocketException if there is an error in the underlying protocol,
491   *         such as a TCP error.
492   */
493  public void setSoTimeout(final int timeout) throws SocketException
494  {
495    plainSocket.setSoTimeout(timeout);
496  }
497
498
499
500  /**
501   * Stop using the security layer, if there is any.
502   */
503  public void stopEncryption()
504  {
505    /*
506     * The secure socket has been configured not to auto close the underlying
507     * plain socket. We should close it here and properly tear down the SSL
508     * session, but this is not compatible with the existing protocol.
509     */
510    if (false)
511    {
512      StaticUtils.close(secureSocket);
513    }
514
515    input = new BufferedInputStream(plainInput);
516    output = new BufferedOutputStream(plainOutput);
517    isEncrypted = false;
518  }
519
520
521
522  private void setSessionError(final Exception e)
523  {
524    synchronized (stateLock)
525    {
526      if (sessionError == null)
527      {
528        sessionError = e;
529      }
530    }
531  }
532
533  /**
534   * Run method for the Session.
535   * Loops waiting for buffers from the queue and sends them when available.
536   */
537  @Override
538  public void run()
539  {
540    isRunning.set(true);
541    latch.countDown();
542    if (logger.isTraceEnabled())
543    {
544      logger.trace(getName() + " starting.");
545    }
546    boolean needClosing = false;
547    while (!closeInitiated)
548    {
549      byte[] buffer;
550      try
551      {
552        buffer = sendQueue.take();
553      }
554      catch (InterruptedException ie)
555      {
556        break;
557      }
558      try
559      {
560        send(buffer);
561      }
562      catch (IOException e)
563      {
564        setSessionError(e);
565        needClosing = true;
566      }
567    }
568    isRunning.set(false);
569    if (needClosing)
570    {
571      close();
572    }
573    if (logger.isTraceEnabled())
574    {
575      logger.trace(getName() + " stopped.");
576    }
577  }
578
579  /**
580   * This method can be called to wait until the session thread is
581   * properly started.
582   * @throws InterruptedException when interrupted
583   */
584  public void waitForStartup() throws InterruptedException
585  {
586    latch.await();
587  }
588}