001/* 002 * CDDL HEADER START 003 * 004 * The contents of this file are subject to the terms of the 005 * Common Development and Distribution License, Version 1.0 only 006 * (the "License"). You may not use this file except in compliance 007 * with the License. 008 * 009 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt 010 * or http://forgerock.org/license/CDDLv1.0.html. 011 * See the License for the specific language governing permissions 012 * and limitations under the License. 013 * 014 * When distributing Covered Code, include this CDDL HEADER in each 015 * file and include the License file at legal-notices/CDDLv1_0.txt. 016 * If applicable, add the following below this CDDL HEADER, with the 017 * fields enclosed by brackets "[]" replaced with your own identifying 018 * information: 019 * Portions Copyright [yyyy] [name of copyright owner] 020 * 021 * CDDL HEADER END 022 * 023 * 024 * Copyright 2008 Sun Microsystems, Inc. 025 * Portions Copyright 2014-2015 ForgeRock AS 026 */ 027package org.opends.server.protocols.internal; 028 029 030 031import java.io.IOException; 032import java.io.InputStream; 033import java.util.concurrent.ArrayBlockingQueue; 034 035import org.forgerock.opendj.io.ASN1; 036import org.forgerock.opendj.io.ASN1Writer; 037import org.opends.server.protocols.ldap.LDAPMessage; 038import org.forgerock.opendj.ldap.ByteSequenceReader; 039import org.forgerock.opendj.ldap.ByteStringBuilder; 040 041 042/** 043 * This class provides an implementation of a 044 * {@code java.io.InputStream} that can be used to facilitate internal 045 * communication with the Directory Server. On the backend, this 046 * input stream will be populated by ASN.1 elements encoded from LDAP 047 * messages created from internal operation responses. 048 */ 049@org.opends.server.types.PublicAPI( 050 stability=org.opends.server.types.StabilityLevel.UNCOMMITTED, 051 mayInstantiate=false, 052 mayExtend=false, 053 mayInvoke=true) 054public final class InternalLDAPInputStream 055 extends InputStream 056{ 057 /** 058 * The queue of LDAP messages providing the data to be made 059 * available to the client. 060 */ 061 private final ArrayBlockingQueue<LDAPMessage> messageQueue; 062 063 /** Indicates whether this stream has been closed. */ 064 private boolean closed; 065 066 /** The byte buffer with partial data to be written to the client. */ 067 private final ByteStringBuilder messageBuffer; 068 069 /** The byte buffer reader. */ 070 private final ByteSequenceReader messageReader; 071 072 /** The byte buffer writer. */ 073 private final ASN1Writer writer; 074 075 /** The internal LDAP socket serviced by this input stream. */ 076 private final InternalLDAPSocket socket; 077 078 079 080 /** 081 * Creates a new internal LDAP input stream that will service the 082 * provided internal LDAP socket. 083 * 084 * @param socket The internal LDAP socket serviced by this 085 * internal LDAP input stream. 086 */ 087 public InternalLDAPInputStream(InternalLDAPSocket socket) 088 { 089 this.socket = socket; 090 this.messageQueue = new ArrayBlockingQueue<>(10); 091 this.messageBuffer = new ByteStringBuilder(); 092 this.messageReader = messageBuffer.asReader(); 093 this.writer = ASN1.getWriter(messageBuffer); 094 this.closed = false; 095 } 096 097 098 099 /** 100 * Adds the provided LDAP message to the set of messages to be 101 * returned to the client. Note that this may block if there is 102 * already a significant backlog of messages to be returned. 103 * 104 * @param message The message to add to the set of messages to be 105 * returned to the client. 106 */ 107 @org.opends.server.types.PublicAPI( 108 stability=org.opends.server.types.StabilityLevel.PRIVATE, 109 mayInstantiate=false, 110 mayExtend=false, 111 mayInvoke=false) 112 void addLDAPMessage(LDAPMessage message) 113 { 114 // If the stream is closed, then simply drop the message. 115 if (closed) 116 { 117 return; 118 } 119 120 try 121 { 122 messageQueue.put(message); 123 return; 124 } 125 catch (Exception e) 126 { 127 // This shouldn't happen, but if it does then try three more 128 // times before giving up and dropping the message. 129 for (int i=0; i < 3; i++) 130 { 131 try 132 { 133 messageQueue.put(message); 134 break; 135 } catch (Exception e2) {} 136 } 137 138 return; 139 } 140 } 141 142 143 144 /** 145 * Retrieves the number of bytes that can be read (or skipped over) 146 * from this input stream without blocking. 147 * 148 * @return The number of bytes that can be read (or skipped over) 149 * from this input stream without blocking. 150 * @throws IOException if an I/O error occurs. 151 */ 152 @Override 153 public synchronized int available() throws IOException 154 { 155 if (messageReader.remaining() < 1) 156 { 157 LDAPMessage message = messageQueue.poll(); 158 if (message == null || message instanceof NullLDAPMessage) 159 { 160 if (message != null) 161 { 162 messageQueue.clear(); 163 closed = true; 164 } 165 166 return 0; 167 } 168 else 169 { 170 messageBuffer.clear(); 171 messageReader.rewind(); 172 message.write(writer); 173 } 174 } 175 176 return messageReader.remaining(); 177 } 178 179 180 181 /** 182 * Closes this input stream. This will add a special marker 183 * element to the message queue indicating that the end of the 184 * stream has been reached. If the queue is full, then it will be 185 * cleared before adding the marker element. 186 */ 187 @Override 188 public void close() 189 { 190 socket.close(); 191 } 192 193 194 195 /** 196 * Closes this input stream through an internal mechanism that will 197 * not cause an infinite recursion loop by trying to also close the 198 * input stream. 199 */ 200 @org.opends.server.types.PublicAPI( 201 stability=org.opends.server.types.StabilityLevel.PRIVATE, 202 mayInstantiate=false, 203 mayExtend=false, 204 mayInvoke=false) 205 void closeInternal() 206 { 207 if (closed) 208 { 209 return; 210 } 211 212 closed = true; 213 NullLDAPMessage nullMessage = new NullLDAPMessage(); 214 215 while (! messageQueue.offer(nullMessage)) 216 { 217 messageQueue.clear(); 218 } 219 } 220 221 222 223 /** 224 * Marks the current position in the input stream. This will not 225 * have any effect, as this input stream implementation does not 226 * support marking. 227 * 228 * @param readLimit The maximum limit of bytes that can be read 229 * before the mark position becomes invalid. 230 */ 231 @Override 232 public void mark(int readLimit) 233 { 234 // No implementation is required. 235 } 236 237 238 239 /** 240 * Indicates whether this input stream implementation supports the 241 * use of the {@code mark} and {@code reset} methods. This 242 * implementation does not support that functionality. 243 * 244 * @return {@code false} because this implementation does not 245 * support the use of the {@code mark} and {@code reset} 246 * methods. 247 */ 248 @Override 249 public boolean markSupported() 250 { 251 return false; 252 } 253 254 255 256 /** 257 * Reads the next byte of data from the input stream, blocking if 258 * necessary until there is data available. 259 * 260 * @return The next byte of data read from the input stream, or -1 261 * if the end of the input stream has been reached. 262 * 263 * @throws IOException If a problem occurs while trying to read 264 * data from the stream. 265 */ 266 @Override 267 public synchronized int read() 268 throws IOException 269 { 270 if (messageReader.remaining() < 1) 271 { 272 LDAPMessage message; 273 try 274 { 275 message = messageQueue.take(); 276 } 277 catch(InterruptedException ie) 278 { 279 // Probably because a shutdown was started. EOF 280 message = new NullLDAPMessage(); 281 } 282 283 if (message == null || message instanceof NullLDAPMessage) 284 { 285 if (message instanceof NullLDAPMessage) 286 { 287 messageQueue.clear(); 288 closed = true; 289 return -1; 290 } 291 292 return 0; 293 } 294 else 295 { 296 messageBuffer.clear(); 297 messageReader.rewind(); 298 message.write(writer); 299 } 300 } 301 302 return 0xFF & messageReader.readByte(); 303 } 304 305 306 307 /** 308 * Reads some number of bytes from the input stream, blocking if 309 * necessary until there is data available, and adds them to the 310 * provided array starting at position 0. 311 * 312 * @param b The array to which the data is to be written. 313 * 314 * @return The number of bytes actually written into the 315 * provided array, or -1 if the end of the stream has been 316 * reached. 317 * 318 * @throws IOException If a problem occurs while trying to read 319 * data from the stream. 320 */ 321 @Override 322 public int read(byte[] b) 323 throws IOException 324 { 325 return read(b, 0, b.length); 326 } 327 328 329 330 /** 331 * Reads some number of bytes from the input stream, blocking if 332 * necessary until there is data available, and adds them to the 333 * provided array starting at the specified position. 334 * 335 * @param b The array to which the data is to be written. 336 * @param off The offset in the array at which to start writing 337 * data. 338 * @param len The maximum number of bytes that may be added to the 339 * array. 340 * 341 * @return The number of bytes actually written into the 342 * provided array, or -1 if the end of the stream has been 343 * reached. 344 * 345 * @throws IOException If a problem occurs while trying to read 346 * data from the stream. 347 */ 348 @Override 349 public synchronized int read(byte[] b, int off, int len) 350 throws IOException 351 { 352 if (messageReader.remaining() < 1) 353 { 354 LDAPMessage message; 355 try 356 { 357 message = messageQueue.take(); 358 } 359 catch(InterruptedException ie) 360 { 361 // Probably because a shutdown was started. EOF 362 message = new NullLDAPMessage(); 363 } 364 365 if (message == null || message instanceof NullLDAPMessage) 366 { 367 if (message instanceof NullLDAPMessage) 368 { 369 messageQueue.clear(); 370 closed = true; 371 return -1; 372 } 373 374 return 0; 375 } 376 else 377 { 378 messageBuffer.clear(); 379 messageReader.rewind(); 380 message.write(writer); 381 } 382 } 383 384 int actualLen = Math.min(len, messageReader.remaining()); 385 messageReader.readBytes(b, off, actualLen); 386 return actualLen; 387 } 388 389 390 391 /** 392 * Repositions this stream to the position at the time that the 393 * {@code mark} method was called on this stream. This will not 394 * have any effect, as this input stream implementation does not 395 * support marking. 396 */ 397 @Override 398 public void reset() 399 { 400 // No implementation is required. 401 } 402 403 404 405 /** 406 * Skips over and discards up to the specified number of bytes of 407 * data from this input stream. This implementation will always 408 * skip the requested number of bytes unless the end of the stream 409 * is reached. 410 * 411 * @param n The maximum number of bytes to skip. 412 * 413 * @return The number of bytes actually skipped. 414 * 415 * @throws IOException If a problem occurs while trying to read 416 * data from the input stream. 417 */ 418 @Override 419 public synchronized long skip(long n) 420 throws IOException 421 { 422 byte[] b; 423 if (n > 8192) 424 { 425 b = new byte[8192]; 426 } 427 else 428 { 429 b = new byte[(int) n]; 430 } 431 432 long totalBytesRead = 0L; 433 while (totalBytesRead < n) 434 { 435 int maxLen = (int) Math.min((n - totalBytesRead), b.length); 436 437 int bytesRead = read(b, 0, maxLen); 438 if (bytesRead < 0) 439 { 440 if (totalBytesRead > 0) 441 { 442 return totalBytesRead; 443 } 444 else 445 { 446 return bytesRead; 447 } 448 } 449 else 450 { 451 totalBytesRead += bytesRead; 452 } 453 } 454 455 return totalBytesRead; 456 } 457 458 459 460 /** 461 * Retrieves a string representation of this internal LDAP socket. 462 * 463 * @return A string representation of this internal LDAP socket. 464 */ 465 @Override 466 public String toString() 467 { 468 return getClass().getSimpleName(); 469 } 470}