001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
010 * or http://forgerock.org/license/CDDLv1.0.html.
011 * See the License for the specific language governing permissions
012 * and limitations under the License.
013 *
014 * When distributing Covered Code, include this CDDL HEADER in each
015 * file and include the License file at legal-notices/CDDLv1_0.txt.
016 * If applicable, add the following below this CDDL HEADER, with the
017 * fields enclosed by brackets "[]" replaced with your own identifying
018 * information:
019 *      Portions Copyright [yyyy] [name of copyright owner]
020 *
021 * CDDL HEADER END
022 *
023 *
024 *      Copyright 2008 Sun Microsystems, Inc.
025 *      Portions Copyright 2014-2015 ForgeRock AS
026 */
027package org.opends.server.protocols.internal;
028
029
030
031import java.io.IOException;
032import java.io.InputStream;
033import java.util.concurrent.ArrayBlockingQueue;
034
035import org.forgerock.opendj.io.ASN1;
036import org.forgerock.opendj.io.ASN1Writer;
037import org.opends.server.protocols.ldap.LDAPMessage;
038import org.forgerock.opendj.ldap.ByteSequenceReader;
039import org.forgerock.opendj.ldap.ByteStringBuilder;
040
041
042/**
043 * This class provides an implementation of a
044 * {@code java.io.InputStream} that can be used to facilitate internal
045 * communication with the Directory Server.  On the backend, this
046 * input stream will be populated by ASN.1 elements encoded from LDAP
047 * messages created from internal operation responses.
048 */
049@org.opends.server.types.PublicAPI(
050     stability=org.opends.server.types.StabilityLevel.UNCOMMITTED,
051     mayInstantiate=false,
052     mayExtend=false,
053     mayInvoke=true)
054public final class InternalLDAPInputStream
055       extends InputStream
056{
057  /**
058   * The queue of LDAP messages providing the data to be made
059   * available to the client.
060   */
061  private final ArrayBlockingQueue<LDAPMessage> messageQueue;
062
063  /** Indicates whether this stream has been closed. */
064  private boolean closed;
065
066  /** The byte buffer with partial data to be written to the client. */
067  private final ByteStringBuilder messageBuffer;
068
069  /** The byte buffer reader. */
070  private final ByteSequenceReader messageReader;
071
072  /** The byte buffer writer. */
073  private final ASN1Writer writer;
074
075  /** The internal LDAP socket serviced by this input stream. */
076  private final InternalLDAPSocket socket;
077
078
079
080  /**
081   * Creates a new internal LDAP input stream that will service the
082   * provided internal LDAP socket.
083   *
084   * @param  socket  The internal LDAP socket serviced by this
085   *                 internal LDAP input stream.
086   */
087  public InternalLDAPInputStream(InternalLDAPSocket socket)
088  {
089    this.socket = socket;
090    this.messageQueue = new ArrayBlockingQueue<>(10);
091    this.messageBuffer = new ByteStringBuilder();
092    this.messageReader = messageBuffer.asReader();
093    this.writer = ASN1.getWriter(messageBuffer);
094    this.closed = false;
095  }
096
097
098
099  /**
100   * Adds the provided LDAP message to the set of messages to be
101   * returned to the client.  Note that this may block if there is
102   * already a significant backlog of messages to be returned.
103   *
104   * @param  message  The message to add to the set of messages to be
105   *                  returned to the client.
106   */
107  @org.opends.server.types.PublicAPI(
108       stability=org.opends.server.types.StabilityLevel.PRIVATE,
109       mayInstantiate=false,
110       mayExtend=false,
111       mayInvoke=false)
112  void addLDAPMessage(LDAPMessage message)
113  {
114    // If the stream is closed, then simply drop the message.
115    if (closed)
116    {
117      return;
118    }
119
120    try
121    {
122      messageQueue.put(message);
123      return;
124    }
125    catch (Exception e)
126    {
127      // This shouldn't happen, but if it does then try three more
128      // times before giving up and dropping the message.
129      for (int i=0; i < 3; i++)
130      {
131        try
132        {
133          messageQueue.put(message);
134          break;
135        } catch (Exception e2) {}
136      }
137
138      return;
139    }
140  }
141
142
143
144  /**
145   * Retrieves the number of bytes that can be read (or skipped over)
146   * from this input stream without blocking.
147   *
148   * @return  The number of bytes that can be read (or skipped over)
149   *          from this input stream without blocking.
150   * @throws IOException if an I/O error occurs.
151   */
152  @Override
153  public synchronized int available() throws IOException
154  {
155    if (messageReader.remaining() < 1)
156    {
157      LDAPMessage message = messageQueue.poll();
158      if (message == null || message instanceof NullLDAPMessage)
159      {
160        if (message != null)
161        {
162          messageQueue.clear();
163          closed = true;
164        }
165
166        return 0;
167      }
168      else
169      {
170        messageBuffer.clear();
171        messageReader.rewind();
172        message.write(writer);
173      }
174    }
175
176    return messageReader.remaining();
177  }
178
179
180
181  /**
182   * Closes this input stream.  This will add a special marker
183   * element to the message queue indicating that the end of the
184   * stream has been reached.  If the queue is full, then it will be
185   * cleared before adding the marker element.
186   */
187  @Override
188  public void close()
189  {
190    socket.close();
191  }
192
193
194
195  /**
196   * Closes this input stream through an internal mechanism that will
197   * not cause an infinite recursion loop by trying to also close the
198   * input stream.
199   */
200  @org.opends.server.types.PublicAPI(
201       stability=org.opends.server.types.StabilityLevel.PRIVATE,
202       mayInstantiate=false,
203       mayExtend=false,
204       mayInvoke=false)
205  void closeInternal()
206  {
207    if (closed)
208    {
209      return;
210    }
211
212    closed = true;
213    NullLDAPMessage nullMessage = new NullLDAPMessage();
214
215    while (! messageQueue.offer(nullMessage))
216    {
217      messageQueue.clear();
218    }
219  }
220
221
222
223  /**
224   * Marks the current position in the input stream.  This will not
225   * have any effect, as this input stream implementation does not
226   * support marking.
227   *
228   * @param  readLimit  The maximum limit of bytes that can be read
229   *                    before the mark position becomes invalid.
230   */
231  @Override
232  public void mark(int readLimit)
233  {
234    // No implementation is required.
235  }
236
237
238
239  /**
240   * Indicates whether this input stream implementation supports the
241   * use of the {@code mark} and {@code reset} methods.  This
242   * implementation does not support that functionality.
243   *
244   * @return  {@code false} because this implementation does not
245   *          support the use of the {@code mark} and {@code reset}
246   *          methods.
247   */
248  @Override
249  public boolean markSupported()
250  {
251    return false;
252  }
253
254
255
256  /**
257   * Reads the next byte of data from the input stream, blocking if
258   * necessary until there is data available.
259   *
260   * @return  The next byte of data read from the input stream, or -1
261   *          if the end of the input stream has been reached.
262   *
263   * @throws  IOException  If a problem occurs while trying to read
264   *                       data from the stream.
265   */
266  @Override
267  public synchronized int read()
268         throws IOException
269  {
270    if (messageReader.remaining() < 1)
271    {
272      LDAPMessage message;
273      try
274      {
275        message = messageQueue.take();
276      }
277      catch(InterruptedException ie)
278      {
279        // Probably because a shutdown was started. EOF
280        message = new NullLDAPMessage();
281      }
282
283      if (message == null || message instanceof NullLDAPMessage)
284      {
285        if (message instanceof NullLDAPMessage)
286        {
287          messageQueue.clear();
288          closed = true;
289          return -1;
290        }
291
292        return 0;
293      }
294      else
295      {
296        messageBuffer.clear();
297        messageReader.rewind();
298        message.write(writer);
299      }
300    }
301
302    return 0xFF & messageReader.readByte();
303  }
304
305
306
307  /**
308   * Reads some number of bytes from the input stream, blocking if
309   * necessary until there is data available, and adds them to the
310   * provided array starting at position 0.
311   *
312   * @param  b  The array to which the data is to be written.
313   *
314   * @return  The number of bytes actually written into the
315   *          provided array, or -1 if the end of the stream has been
316   *          reached.
317   *
318   * @throws  IOException  If a problem occurs while trying to read
319   *                       data from the stream.
320   */
321  @Override
322  public int read(byte[] b)
323         throws IOException
324  {
325    return read(b, 0, b.length);
326  }
327
328
329
330  /**
331   * Reads some number of bytes from the input stream, blocking if
332   * necessary until there is data available, and adds them to the
333   * provided array starting at the specified position.
334   *
335   * @param  b    The array to which the data is to be written.
336   * @param  off  The offset in the array at which to start writing
337   *              data.
338   * @param  len  The maximum number of bytes that may be added to the
339   *              array.
340   *
341   * @return  The number of bytes actually written into the
342   *          provided array, or -1 if the end of the stream has been
343   *          reached.
344   *
345   * @throws  IOException  If a problem occurs while trying to read
346   *                       data from the stream.
347   */
348  @Override
349  public synchronized int read(byte[] b, int off, int len)
350         throws IOException
351  {
352    if (messageReader.remaining() < 1)
353    {
354      LDAPMessage message;
355      try
356      {
357        message = messageQueue.take();
358      }
359      catch(InterruptedException ie)
360      {
361        // Probably because a shutdown was started. EOF
362        message = new NullLDAPMessage();
363      }
364
365      if (message == null || message instanceof NullLDAPMessage)
366      {
367        if (message instanceof NullLDAPMessage)
368        {
369          messageQueue.clear();
370          closed = true;
371          return -1;
372        }
373
374        return 0;
375      }
376      else
377      {
378        messageBuffer.clear();
379        messageReader.rewind();
380        message.write(writer);
381      }
382    }
383
384    int actualLen = Math.min(len, messageReader.remaining());
385    messageReader.readBytes(b, off, actualLen);
386    return actualLen;
387  }
388
389
390
391  /**
392   * Repositions this stream to the position at the time that the
393   * {@code mark} method was called on this stream.  This will not
394   * have any effect, as this input stream implementation does not
395   * support marking.
396   */
397  @Override
398  public void reset()
399  {
400    // No implementation is required.
401  }
402
403
404
405  /**
406   * Skips over and discards up to the specified number of bytes of
407   * data from this input stream.  This implementation will always
408   * skip the requested number of bytes unless the end of the stream
409   * is reached.
410   *
411   * @param  n  The maximum number of bytes to skip.
412   *
413   * @return  The number of bytes actually skipped.
414   *
415   * @throws  IOException  If a problem occurs while trying to read
416   *                       data from the input stream.
417   */
418  @Override
419  public synchronized long skip(long n)
420         throws IOException
421  {
422    byte[] b;
423    if (n > 8192)
424    {
425      b = new byte[8192];
426    }
427    else
428    {
429      b = new byte[(int) n];
430    }
431
432    long totalBytesRead = 0L;
433    while (totalBytesRead < n)
434    {
435      int maxLen = (int) Math.min((n - totalBytesRead), b.length);
436
437      int bytesRead = read(b, 0, maxLen);
438      if (bytesRead < 0)
439      {
440        if (totalBytesRead > 0)
441        {
442          return totalBytesRead;
443        }
444        else
445        {
446          return bytesRead;
447        }
448      }
449      else
450      {
451        totalBytesRead += bytesRead;
452      }
453    }
454
455    return totalBytesRead;
456  }
457
458
459
460  /**
461   * Retrieves a string representation of this internal LDAP socket.
462   *
463   * @return  A string representation of this internal LDAP socket.
464   */
465  @Override
466  public String toString()
467  {
468    return getClass().getSimpleName();
469  }
470}