Source for java.io.PipedInputStream

   1: /* PipedInputStream.java -- Read portion of piped streams.
   2:    Copyright (C) 1998, 1999, 2000, 2001, 2003, 2005  Free Software Foundation, Inc.
   3: 
   4: This file is part of GNU Classpath.
   5: 
   6: GNU Classpath is free software; you can redistribute it and/or modify
   7: it under the terms of the GNU General Public License as published by
   8: the Free Software Foundation; either version 2, or (at your option)
   9: any later version.
  10: 
  11: GNU Classpath is distributed in the hope that it will be useful, but
  12: WITHOUT ANY WARRANTY; without even the implied warranty of
  13: MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  14: General Public License for more details.
  15: 
  16: You should have received a copy of the GNU General Public License
  17: along with GNU Classpath; see the file COPYING.  If not, write to the
  18: Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
  19: 02110-1301 USA.
  20: 
  21: Linking this library statically or dynamically with other modules is
  22: making a combined work based on this library.  Thus, the terms and
  23: conditions of the GNU General Public License cover the whole
  24: combination.
  25: 
  26: As a special exception, the copyright holders of this library give you
  27: permission to link this library with independent modules to produce an
  28: executable, regardless of the license terms of these independent
  29: modules, and to copy and distribute the resulting executable under
  30: terms of your choice, provided that you also meet, for each linked
  31: independent module, the terms and conditions of the license of that
  32: module.  An independent module is a module which is not derived from
  33: or based on this library.  If you modify this library, you may extend
  34: this exception to your version of the library, but you are not
  35: obligated to do so.  If you do not wish to do so, delete this
  36: exception statement from your version. */
  37: 
  38: package java.io;
  39: 
  40: // NOTE: This implementation is very similar to that of PipedReader.  If you
  41: // fix a bug in here, chances are you should make a similar change to the
  42: // PipedReader code.
  43: 
  44: /**
  45:   * An input stream that reads its bytes from an output stream
  46:   * to which it is connected.
  47:   * <p>
  48:   * Data is read and written to an internal buffer.  It is highly recommended
  49:   * that the <code>PipedInputStream</code> and connected
  50:   * <code>PipedOutputStream</code>
  51:   * be part of different threads.  If they are not, the read and write
  52:   * operations could deadlock their thread.
  53:   *
  54:   * @specnote The JDK implementation appears to have some undocumented
  55:   *           functionality where it keeps track of what thread is writing
  56:   *           to pipe and throws an IOException if that thread subsequently
  57:   *           dies. This behaviour seems dubious and unreliable - we don't
  58:   *           implement it.
  59:   *
  60:   * @author Aaron M. Renn (arenn@urbanophile.com)
  61:   */
  62: public class PipedInputStream extends InputStream
  63: {
  64:   /** PipedOutputStream to which this is connected. Null only if this
  65:     * InputStream hasn't been connected yet. */
  66:   PipedOutputStream source;
  67: 
  68:   /** Set to true if close() has been called on this InputStream. */
  69:   boolean closed;
  70: 
  71: 
  72:   /**
  73:    * The size of the internal buffer used for input/output.
  74:    */
  75:   /* The "Constant Field Values" Javadoc of the Sun J2SE 1.4
  76:    * specifies 1024.
  77:    */
  78:   protected static final int PIPE_SIZE = 1024;
  79: 
  80: 
  81:   /**
  82:     * This is the internal circular buffer used for storing bytes written
  83:     * to the pipe and from which bytes are read by this stream
  84:     */
  85:   protected byte[] buffer = null;
  86: 
  87:   /**
  88:     * The index into buffer where the next byte from the connected
  89:     * <code>PipedOutputStream</code> will be written. If this variable is
  90:     * equal to <code>out</code>, then the buffer is full. If set to < 0,
  91:     * the buffer is empty.
  92:     */
  93:   protected int in = -1;
  94: 
  95:   /**
  96:     * This index into the buffer where bytes will be read from.
  97:     */
  98:   protected int out = 0;
  99: 
 100:   /** Buffer used to implement single-argument read/receive */
 101:   private byte[] read_buf = new byte[1];
 102: 
 103:   /**
 104:     * Creates a new <code>PipedInputStream</code> that is not connected to a
 105:     * <code>PipedOutputStream</code>.  It must be connected before bytes can
 106:     * be read from this stream.
 107:     */
 108:   public PipedInputStream()
 109:   {
 110:     this(PIPE_SIZE);
 111:   }
 112: 
 113:   /**
 114:    * Creates a new <code>PipedInputStream</code> of the given size that is not
 115:    * connected to a <code>PipedOutputStream</code>.
 116:    * It must be connected before bytes can be read from this stream.
 117:    *
 118:    * @since 1.6
 119:    * @since IllegalArgumentException If pipeSize <= 0.
 120:    */
 121:   public PipedInputStream(int pipeSize) throws IllegalArgumentException
 122:   {
 123:     if (pipeSize <= 0)
 124:       throw new IllegalArgumentException("pipeSize must be > 0");
 125: 
 126:     this.buffer = new byte[pipeSize];
 127:   }
 128: 
 129:   /**
 130:     * This constructor creates a new <code>PipedInputStream</code> and connects
 131:     * it to the passed in <code>PipedOutputStream</code>. The stream is then
 132:     * ready for reading.
 133:     *
 134:     * @param source The <code>PipedOutputStream</code> to connect this
 135:     * stream to
 136:     *
 137:     * @exception IOException If <code>source</code> is already connected.
 138:     */
 139:   public PipedInputStream(PipedOutputStream source) throws IOException
 140:   {
 141:     this();
 142:     connect(source);
 143:   }
 144: 
 145:   /**
 146:    * This constructor creates a new <code>PipedInputStream</code> of the given
 147:    * size and connects it to the passed in <code>PipedOutputStream</code>.
 148:    * The stream is then ready for reading.
 149:    *
 150:    * @param source The <code>PipedOutputStream</code> to connect this
 151:    * stream to
 152:    *
 153:    * @since 1.6
 154:    * @exception IOException If <code>source</code> is already connected.
 155:    */
 156:  public PipedInputStream(PipedOutputStream source, int pipeSize)
 157:    throws IOException
 158:  {
 159:    this(pipeSize);
 160:    connect(source);
 161:  }
 162: 
 163:   /**
 164:     * This method connects this stream to the passed in
 165:     * <code>PipedOutputStream</code>.
 166:     * This stream is then ready for reading.  If this stream is already
 167:     * connected or has been previously closed, then an exception is thrown
 168:     *
 169:     * @param source The <code>PipedOutputStream</code> to connect this stream to
 170:     *
 171:     * @exception IOException If this PipedInputStream or <code>source</code>
 172:     *                        has been connected already.
 173:     */
 174:   public void connect(PipedOutputStream source) throws IOException
 175:   {
 176:     // The JDK (1.3) does not appear to check for a previously closed
 177:     // connection here.
 178: 
 179:     if (this.source != null || source.sink != null)
 180:       throw new IOException ("Already connected");
 181: 
 182:     source.sink = this;
 183:     this.source = source;
 184:   }
 185: 
 186:   /**
 187:   * This method receives a byte of input from the source PipedOutputStream.
 188:   * If the internal circular buffer is full, this method blocks.
 189:   *
 190:   * @param val The byte to write to this stream
 191:   *
 192:   * @exception IOException if error occurs
 193:   * @specnote Weird. This method must be some sort of accident.
 194:   */
 195:   protected synchronized void receive(int val) throws IOException
 196:   {
 197:     read_buf[0] = (byte) (val & 0xff);
 198:     receive (read_buf, 0, 1);
 199:   }
 200: 
 201:   /**
 202:     * This method is used by the connected <code>PipedOutputStream</code> to
 203:     * write bytes into the buffer.
 204:     *
 205:     * @param buf The array containing bytes to write to this stream
 206:     * @param offset The offset into the array to start writing from
 207:     * @param len The number of bytes to write.
 208:     *
 209:     * @exception IOException If an error occurs
 210:     * @specnote This code should be in PipedOutputStream.write, but we
 211:     *           put it here in order to support that bizarre recieve(int)
 212:     *           method.
 213:     */
 214:   synchronized void receive(byte[] buf, int offset, int len)
 215:     throws IOException
 216:   {
 217:     if (closed)
 218:       throw new IOException ("Pipe closed");
 219: 
 220:     int bufpos = offset;
 221:     int copylen;
 222: 
 223:     while (len > 0)
 224:       {
 225:         try
 226:           {
 227:             while (in == out)
 228:               {
 229:                 // The pipe is full. Wake up any readers and wait for them.
 230:                 notifyAll();
 231:                 wait();
 232:                 // The pipe could have been closed while we were waiting.
 233:                 if (closed)
 234:                   throw new IOException ("Pipe closed");
 235:               }
 236:           }
 237:         catch (InterruptedException ix)
 238:           {
 239:             throw new InterruptedIOException ();
 240:           }
 241: 
 242:         if (in < 0) // The pipe is empty.
 243:           in = 0;
 244: 
 245:         // Figure out how many bytes from buf can be copied without
 246:         // overrunning out or going past the length of the buffer.
 247:         if (in < out)
 248:           copylen = Math.min (len, out - in);
 249:         else
 250:           copylen = Math.min (len, buffer.length - in);
 251: 
 252:         // Copy bytes until the pipe is filled, wrapping if necessary.
 253:         System.arraycopy(buf, bufpos, buffer, in, copylen);
 254:         len -= copylen;
 255:         bufpos += copylen;
 256:         in += copylen;
 257:         if (in == buffer.length)
 258:           in = 0;
 259:       }
 260:     // Notify readers that new data is in the pipe.
 261:     notifyAll();
 262:   }
 263: 
 264:   /**
 265:     * This method reads one byte from the stream.
 266:     * -1 is returned to indicated that no bytes can be read
 267:     * because the end of the stream was reached.  If the stream is already
 268:     * closed, a -1 will again be returned to indicate the end of the stream.
 269:     *
 270:     * <p>This method will block if no byte is available to be read.</p>
 271:     *
 272:     * @return the value of the read byte value, or -1 of the end of the stream
 273:     * was reached
 274:     *
 275:     * @throws IOException if an error occured
 276:     */
 277:   public int read() throws IOException
 278:   {
 279:     // Method operates by calling the multibyte overloaded read method
 280:     // Note that read_buf is an internal instance variable.  I allocate it
 281:     // there to avoid constant reallocation overhead for applications that
 282:     // call this method in a loop at the cost of some unneeded overhead
 283:     // if this method is never called.
 284: 
 285:     int r = read(read_buf, 0, 1);
 286:     return r != -1 ? (read_buf[0] & 0xff) : -1;
 287:   }
 288: 
 289:   /**
 290:     * This method reads bytes from the stream into a caller supplied buffer.
 291:     * It starts storing bytes at position <code>offset</code> into the
 292:     * buffer and
 293:     * reads a maximum of <code>len</code> bytes.  Note that this method
 294:     * can actually
 295:     * read fewer than <code>len</code> bytes.  The actual number of bytes
 296:     * read is
 297:     * returned.  A -1 is returned to indicated that no bytes can be read
 298:     * because the end of the stream was reached - ie close() was called on the
 299:     * connected PipedOutputStream.
 300:     * <p>
 301:     * This method will block if no bytes are available to be read.
 302:     *
 303:     * @param buf The buffer into which bytes will be stored
 304:     * @param offset The index into the buffer at which to start writing.
 305:     * @param len The maximum number of bytes to read.
 306:     *
 307:     * @exception IOException If <code>close()</code> was called on this Piped
 308:     *                        InputStream.
 309:     */
 310:   public synchronized int read(byte[] buf, int offset, int len)
 311:     throws IOException
 312:   {
 313:     if (source == null)
 314:       throw new IOException ("Not connected");
 315:     if (closed)
 316:       throw new IOException ("Pipe closed");
 317: 
 318:     // Don't block if nothing was requested.
 319:     if (len == 0)
 320:       return 0;
 321: 
 322:     // If the buffer is empty, wait until there is something in the pipe
 323:     // to read.
 324:     try
 325:       {
 326:         while (in < 0)
 327:           {
 328:             if (source.closed)
 329:               return -1;
 330:             wait();
 331:           }
 332:       }
 333:     catch (InterruptedException ix)
 334:       {
 335:         throw new InterruptedIOException();
 336:       }
 337: 
 338:     int total = 0;
 339:     int copylen;
 340: 
 341:     while (true)
 342:       {
 343:         // Figure out how many bytes from the pipe can be copied without
 344:         // overrunning in or going past the length of buf.
 345:         if (out < in)
 346:           copylen = Math.min (len, in - out);
 347:         else
 348:           copylen = Math.min (len, buffer.length - out);
 349: 
 350:         System.arraycopy (buffer, out, buf, offset, copylen);
 351:         offset += copylen;
 352:         len -= copylen;
 353:         out += copylen;
 354:         total += copylen;
 355: 
 356:         if (out == buffer.length)
 357:           out = 0;
 358: 
 359:         if (out == in)
 360:           {
 361:             // Pipe is now empty.
 362:             in = -1;
 363:             out = 0;
 364:           }
 365: 
 366:         // If output buffer is filled or the pipe is empty, we're done.
 367:         if (len == 0 || in == -1)
 368:           {
 369:             // Notify any waiting outputstream that there is now space
 370:             // to write.
 371:             notifyAll();
 372:             return total;
 373:           }
 374:       }
 375:   }
 376: 
 377:   /**
 378:     * This method returns the number of bytes that can be read from this stream
 379:     * before blocking could occur.  This is the number of bytes that are
 380:     * currently unread in the internal circular buffer.  Note that once this
 381:     * many additional bytes are read, the stream may block on a subsequent
 382:     * read, but it not guaranteed to block.
 383:     *
 384:     * @return The number of bytes that can be read before blocking might occur
 385:     *
 386:     * @exception IOException If an error occurs
 387:     */
 388:   public synchronized int available() throws IOException
 389:   {
 390:     // The JDK 1.3 implementation does not appear to check for the closed or
 391:     // unconnected stream conditions here.
 392: 
 393:     if (in < 0)
 394:       return 0;
 395:     else if (out < in)
 396:       return in - out;
 397:     else
 398:       return (buffer.length - out) + in;
 399:   }
 400: 
 401:   /**
 402:   * This methods closes the stream so that no more data can be read
 403:   * from it.
 404:   *
 405:   * @exception IOException If an error occurs
 406:   */
 407:   public synchronized void close() throws IOException
 408:   {
 409:     closed = true;
 410:     // Wake any thread which may be in receive() waiting to write data.
 411:     notifyAll();
 412:   }
 413: }