Multiple readers for InputStream in Java


I have an InputStream from which I'm reading characters. I would like multiple readers to access this InputStream. It seems that a reasonable way to achieve this is to write incoming data to a StringBuffer or StringBuilder, and have the multiple readers read that. Unfortunately, StringBufferInputStream is deprecated. StringReader reads a String, not a mutable object that is continuously being updated. What are my options? Write my own?

Author: dgorur, 2011-02-17

7 answers

An input stream works like this: once you have read a portion of it, it is gone forever. You can't go back and re-read it. What you could do is something like this:

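import java.io.*;
import java.util.*;

class InputStreamSplitter {
  private final BufferedReader reader;
  private final List<Listener> listeners = new ArrayList<>();

  InputStreamSplitter(InputStream toReadFrom) {
    // BufferedReader provides readLine(); a bare InputStreamReader does not.
    this.reader = new BufferedReader(new InputStreamReader(toReadFrom));
  }
  void addListener(Listener l) {
    this.listeners.add(l);
  }
  void work() throws IOException {
    String line;
    // Read each line exactly once and hand it to every listener.
    while ((line = this.reader.readLine()) != null) {
      for (Listener l : this.listeners) {
        l.processLine(line);
      }
    }
  }
}

interface Listener {
  void processLine(String line);
}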

Have all interested parties implement Listener and add them to the InputStreamSplitter.
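For illustration, here is a minimal self-contained sketch of the listener idea with two listeners attached. The names LineFanOut, LineListener and LineFanOutDemo are hypothetical and chosen only for this example; it also assumes the input is line-oriented UTF-8 text.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback interface: one call per line read from the source.
interface LineListener {
    void processLine(String line);
}

// Hypothetical fan-out class: reads the source once, notifies all listeners.
class LineFanOut {
    private final BufferedReader reader;
    private final List<LineListener> listeners = new ArrayList<>();

    LineFanOut(InputStream source) {
        this.reader = new BufferedReader(
                new InputStreamReader(source, StandardCharsets.UTF_8));
    }

    void addListener(LineListener l) {
        listeners.add(l);
    }

    // Consumes the whole stream; each line is delivered to every listener in order.
    void work() {
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                for (LineListener l : listeners) {
                    l.processLine(line);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

class LineFanOutDemo {
    // Attaches two listeners and returns what they recorded, in call order.
    static List<String> run(String text) {
        List<String> seen = new ArrayList<>();
        LineFanOut fanOut = new LineFanOut(
                new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8)));
        fanOut.addListener(line -> seen.add("A:" + line));
        fanOut.addListener(line -> seen.add("B:" + line.length()));
        fanOut.work();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run("one\nthree\n")); // prints [A:one, B:3, A:three, B:5]
    }
}
```

Note that this is a push model: the listeners cannot read at their own pace, they simply receive each line as the single reading loop produces it.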

 4
Author: iluxa, 2011-02-17 21:06:05

Note: My other answer is more general (and better in my opinion).

As noted by @dimo414, the answer below requires the first reader to always be ahead of the second reader. If this is indeed the case for you, then this answer may still be preferable since it builds upon standard classes.


To create two readers that read independently from the same source, you'll have to make sure they don't consume data from the same stream.

This can be achieved by combining TeeInputStream from Apache Commons with a PipedInputStream and a PipedOutputStream as follows:

import java.io.*;
import org.apache.commons.io.input.TeeInputStream;
class Test {
    public static void main(String[] args) throws IOException {

        // Create the source input stream.
        InputStream is = new FileInputStream("filename.txt");

        // Create a piped input stream for one of the readers.
        PipedInputStream in = new PipedInputStream();

        // Create a tee-splitter for the other reader.
        TeeInputStream tee = new TeeInputStream(is, new PipedOutputStream(in));

        // Create the two buffered readers.
        BufferedReader br1 = new BufferedReader(new InputStreamReader(tee));
        BufferedReader br2 = new BufferedReader(new InputStreamReader(in));

        // Do some interleaved reads from them.
        System.out.println("One line from br1:");
        System.out.println(br1.readLine());
        System.out.println();

        System.out.println("Two lines from br2:");
        System.out.println(br2.readLine());
        System.out.println(br2.readLine());
        System.out.println();

        System.out.println("One line from br1:");
        System.out.println(br1.readLine());
        System.out.println();
    }
}

Output:

One line from br1:
Line1: Lorem ipsum dolor sit amet,      <-- reading from start

Two lines from br2:
Line1: Lorem ipsum dolor sit amet,      <-- reading from start
Line2: consectetur adipisicing elit,

One line from br1:
Line2: consectetur adipisicing elit,    <-- resumes on line 2
 14
Author: aioobe, 2017-05-23 11:54:43

As you've probably noted, once you've read a byte from an input stream, it's gone forever (unless you've saved it somewhere yourself).

The solution below saves the bytes until all subscribing input streams have read them.

It works as follows:

// Create a SplittableInputStream from the originalStream
SplittableInputStream is  = new SplittableInputStream(originalStream);

// Fork this to get more input streams reading independently from originalStream
SplittableInputStream is2 = is.split();
SplittableInputStream is3 = is.split();

Each time is is split(), it yields a new InputStream that will read the bytes from the point where is was split.

The SplittableInputStream looks as follows (copy'n'paste away!):

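import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class SplittableInputStream extends InputStream {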

    // Almost an input stream: The read-method takes an id.
    static class MultiplexedSource {

        static int MIN_BUF = 4096;

        // Underlying source
        private InputStream source;

        // Read positions of each SplittableInputStream
        private List<Integer> readPositions = new ArrayList<>();

        // Data to be read by the SplittableInputStreams
        int[] buffer = new int[MIN_BUF];

        // Last valid position in buffer
        int writePosition = 0;

        public MultiplexedSource(InputStream source) {
            this.source = source;
        }

        // Add a multiplexed reader. Return new reader id.
        int addSource(int splitId) {
            readPositions.add(splitId == -1 ? 0 : readPositions.get(splitId));
            return readPositions.size() - 1;
        }

        // Make room for more data (and drop data that has been read by
        // all readers)
        private void readjustBuffer() {
            int from = Collections.min(readPositions);
            int to = Collections.max(readPositions);
            int newLength = Math.max((to - from) * 2, MIN_BUF);
            int[] newBuf = new int[newLength];
            System.arraycopy(buffer, from, newBuf, 0, to - from);
            for (int i = 0; i < readPositions.size(); i++)
                readPositions.set(i, readPositions.get(i) - from);
            writePosition -= from;
            buffer = newBuf;
        }

        // Read and advance position for given reader
        public int read(int readerId) throws IOException {

            // Enough data in buffer?
            if (readPositions.get(readerId) >= writePosition) {
                readjustBuffer();
                buffer[writePosition++] = source.read();
            }

            int pos = readPositions.get(readerId);
            int b = buffer[pos];
            if (b != -1)
                readPositions.set(readerId, pos + 1);
            return b;
        }
    }

    // Non-root fields
    MultiplexedSource multiSource;
    int myId;

    // Public constructor: Used for first SplittableInputStream
    public SplittableInputStream(InputStream source) {
        multiSource = new MultiplexedSource(source);
        myId = multiSource.addSource(-1);
    }

    // Private constructor: Used in split()
    private SplittableInputStream(MultiplexedSource multiSource, int splitId) {
        this.multiSource = multiSource;
        myId = multiSource.addSource(splitId);
    }

    // Returns a new InputStream that will read bytes from this position
    // onwards.
    public SplittableInputStream split() {
        return new SplittableInputStream(multiSource, myId);
    }

    @Override
    public int read() throws IOException {
        return multiSource.read(myId);
    }
}

Finally, a demo:

String str = "Lorem ipsum\ndolor sit\namet\n";
InputStream is = new ByteArrayInputStream(str.getBytes("UTF-8"));

// Create the two buffered readers.
SplittableInputStream is1 = new SplittableInputStream(is);
SplittableInputStream is2 = is1.split();

BufferedReader br1 = new BufferedReader(new InputStreamReader(is1));
BufferedReader br2 = new BufferedReader(new InputStreamReader(is2));

// Do some interleaved reads from them.
System.out.println("One line from br1:");
System.out.println(br1.readLine());
System.out.println();

System.out.println("Two lines from br2:");
System.out.println(br2.readLine());
System.out.println(br2.readLine());
System.out.println();

System.out.println("One line from br1:");
System.out.println(br1.readLine());
System.out.println();

Output:

One line from br1:
Lorem ipsum

Two lines from br2:
Lorem ipsum
dolor sit

One line from br1:
dolor sit
 6
Author: aioobe, 2015-05-15 14:27:32

Use TeeInputStream to copy all the bytes read from the InputStream to a secondary OutputStream, e.g. a ByteArrayOutputStream.
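A rough sketch of that idea, using only the JDK so it can be run without Commons IO: RecordingInputStream below is a hypothetical stand-in written for this example, not the library class. With Commons IO on the classpath you would presumably use new TeeInputStream(source, copy) instead.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical JDK-only stand-in for a tee input stream: every byte that is
// read through it is also written to the branch output stream.
// Assumption: callers do not use skip() or mark()/reset(), which this sketch
// does not account for.
class RecordingInputStream extends FilterInputStream {
    private final OutputStream branch;

    RecordingInputStream(InputStream in, OutputStream branch) {
        super(in);
        this.branch = branch;
    }

    @Override
    public int read() throws IOException {
        int b = super.read();
        if (b != -1) {
            branch.write(b);
        }
        return b;
    }

    @Override
    public int read(byte[] buf, int off, int len) throws IOException {
        int n = super.read(buf, off, len);
        if (n > 0) {
            branch.write(buf, off, n);
        }
        return n;
    }
}

class RecordingDemo {
    // Returns { lines seen by the first reader joined with '|', recorded copy }.
    static String[] run(String text) {
        try {
            ByteArrayOutputStream copy = new ByteArrayOutputStream();
            InputStream first = new RecordingInputStream(
                    new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8)), copy);
            BufferedReader r1 = new BufferedReader(
                    new InputStreamReader(first, StandardCharsets.UTF_8));
            StringBuilder seen = new StringBuilder();
            String line;
            while ((line = r1.readLine()) != null) {
                seen.append(line).append('|');
            }
            // Whatever the first reader consumed is now available for a second pass.
            String recorded = new String(copy.toByteArray(), StandardCharsets.UTF_8);
            return new String[] { seen.toString(), recorded };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String[] r = run("a\nb\n");
        System.out.println(r[0]);
        System.out.print(r[1]);
    }
}
```

The recorded copy only ever contains what the first reader has already pulled through, so a second reader built on it can never get ahead of the first.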

 5
Author: Tomasz Nurkiewicz, 2011-02-17 21:50:30

Instead of using StringWriter/StringBufferInputStream, write your original InputStream to a ByteArrayOutputStream. Once you've finished reading from the original InputStream, pass the byte array returned by ByteArrayOutputStream.toByteArray to a ByteArrayInputStream. Use this InputStream as the InputStream of choice to pass around to other things that need to read from it.

Essentially, all you'd be doing here is storing the contents of the original InputStream in an in-memory byte[] cache, as you originally tried to do with StringWriter/StringBufferInputStream.
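Something along these lines, as a minimal sketch: the class and method names (ReplayableBytes, readFully, run) are made up for the example, and it assumes the source is finite and small enough to hold in memory.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class ReplayableBytes {
    // Drains the stream into a byte array (the in-memory cache).
    static byte[] readFully(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        int n;
        while ((n = in.read(chunk)) != -1) {
            out.write(chunk, 0, n);
        }
        return out.toByteArray();
    }

    // Opens a fresh reader over the cached bytes; each call starts at byte 0.
    static BufferedReader newReader(byte[] cached) {
        return new BufferedReader(new InputStreamReader(
                new ByteArrayInputStream(cached), StandardCharsets.UTF_8));
    }

    // Interleaves reads from two independent readers over the same cache.
    static String[] run(String text) {
        try {
            InputStream original =
                    new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8));
            byte[] cached = readFully(original);
            BufferedReader r1 = newReader(cached);
            BufferedReader r2 = newReader(cached);
            String a = r1.readLine();
            String b = r2.readLine();
            String c = r2.readLine();
            String d = r1.readLine();
            return new String[] { a, b, c, d };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        for (String s : run("Lorem\nipsum\n")) {
            System.out.println(s);
        }
    }
}
```

Since the original stream is drained before any reader starts, this does not help if the source is continuously updated or never ends.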

 2
Author: whaley, 2011-02-17 21:10:11

Here is another way to read from two streams independently, without presuming one is ahead of the other, but with standard classes. However, it eagerly reads the underlying input stream in the background, which may be undesirable, depending on your application.

public static void main(String[] args) throws IOException {
  // Create the source input stream.
  InputStream is = new ByteArrayInputStream("line1\nline2\nline3".getBytes());

  // Create a piped input stream for each reader;
  PipedInputStream in1 = new PipedInputStream();
  PipedInputStream in2 = new PipedInputStream();

  // Start copying the input stream to both piped input streams.
  startCopy(is, new TeeOutputStream(
      new PipedOutputStream(in1), new PipedOutputStream(in2)));

  // Create the two buffered readers.
  BufferedReader br1 = new BufferedReader(new InputStreamReader(in1));
  BufferedReader br2 = new BufferedReader(new InputStreamReader(in2));

  // Do some interleaved reads from them.
  // ...
}

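private static void startCopy(InputStream in, OutputStream out) {
  (new Thread() {
    public void run() {
      // Close the output once copying is done so that the piped readers
      // see end-of-stream rather than a dead write end.
      try (OutputStream o = out) {
        IOUtils.copy(in, o);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }).start();
}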
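For anyone who wants to try the eager-copy approach without Commons IO, here is a JDK-only sketch of the same idea. EagerFanOut and its methods are hypothetical names for this example, and it assumes the input is small: each PipedInputStream has a default 1024-byte buffer, so the copying thread blocks whenever one reader falls that far behind.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class EagerFanOut {
    // Copies everything from `in` to both outputs on a background thread,
    // then closes the outputs so both readers reach end-of-stream.
    static Thread startCopy(InputStream in, OutputStream a, OutputStream b) {
        Thread t = new Thread(() -> {
            try (OutputStream outA = a; OutputStream outB = b) {
                byte[] chunk = new byte[256];
                int n;
                while ((n = in.read(chunk)) != -1) {
                    outA.write(chunk, 0, n);
                    outB.write(chunk, 0, n);
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        t.start();
        return t;
    }

    // Reads the same two lines through two pipes, in interleaved order.
    static String[] run(String text) {
        try {
            PipedInputStream in1 = new PipedInputStream();
            PipedInputStream in2 = new PipedInputStream();
            startCopy(new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8)),
                    new PipedOutputStream(in1), new PipedOutputStream(in2));
            BufferedReader br1 = new BufferedReader(
                    new InputStreamReader(in1, StandardCharsets.UTF_8));
            BufferedReader br2 = new BufferedReader(
                    new InputStreamReader(in2, StandardCharsets.UTF_8));
            // Neither reader has to stay ahead of the other.
            String a = br2.readLine();
            String b = br1.readLine();
            String c = br1.readLine();
            String d = br2.readLine();
            return new String[] { a, b, c, d };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        for (String s : run("line1\nline2\n")) {
            System.out.println(s);
        }
    }
}
```

If one reader may lag by more than the pipe size, a larger buffer can be requested with the PipedInputStream(int pipeSize) constructor, at the cost of more memory.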
 1
Author: progressnerd, 2015-05-17 08:38:35

While looking for a way to have one output stream send bytes to two or more different input streams, I found this thread. Unfortunately, the solutions pointed to PipedOutputStream and PipedInputStream, so I ended up writing a PipedOutputStream extension. Here it is. The example is in the extension's "main" method.

/**
 * Extension of {@link PipedOutputStream} that allows connecting more than one {@link PipedInputStream}
 */
public class PipedOutputStreamEx extends PipedOutputStream {

    /**
     * 
     */
    public PipedOutputStreamEx() {
        // TODO Auto-generated constructor stub
    }

    /* REMIND: identification of the read and write sides needs to be
       more sophisticated.  Either using thread groups (but what about
       pipes within a thread?) or using finalization (but it may be a
       long time until the next GC). */
    private PipedInputStreamEx[] sinks=null;

    public synchronized void connect(PipedInputStreamEx... pIns) throws IOException {
        for (PipedInputStreamEx snk : pIns) {
            if (snk == null) {
                throw new NullPointerException();
            } else if (sinks != null || snk.connected) {
                throw new IOException("Already connected");
            }
            snk.in = -1;
            snk.out = 0;
            snk.connected = true;
        }

        this.sinks = pIns;
    }


    /**
     * Writes the specified <code>byte</code> to the piped output stream. 
     * <p>
     * Implements the <code>write</code> method of <code>OutputStream</code>.
     *
     * @param      b   the <code>byte</code> to be written.
     * @exception IOException if the pipe is <a href=#BROKEN> broken</a>,
     *      {@link #connect(java.io.PipedInputStream) unconnected},
     *      closed, or if an I/O error occurs.
     */
    public void write(int b)  throws IOException {
        if (this.sinks == null) {
            throw new IOException("Pipe(s) not connected");
        }
        for (PipedInputStreamEx sink : this.sinks) {
            sink.receive(b);
        }
    }

    /**
     * Writes <code>len</code> bytes from the specified byte array 
     * starting at offset <code>off</code> to this piped output stream. 
     * This method blocks until all the bytes are written to the output
     * stream.
     *
     * @param      b     the data.
     * @param      off   the start offset in the data.
     * @param      len   the number of bytes to write.
     * @exception IOException if the pipe is <a href=#BROKEN> broken</a>,
     *          {@link #connect(java.io.PipedInputStream) unconnected},
     *      closed, or if an I/O error occurs.
     */
    public void write(byte b[], int off, int len) throws IOException {
        if (sinks == null) {
            throw new IOException("Pipe not connected");
        } else if (b == null) {
            throw new NullPointerException();
        } else if ((off < 0) || (off > b.length) || (len < 0) ||
                ((off + len) > b.length) || ((off + len) < 0)) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return;
        } 
        for (PipedInputStreamEx sink : this.sinks) {
            sink.receive(b, off, len);
        }
    }

    /**
     * Flushes this output stream and forces any buffered output bytes 
     * to be written out. 
     * This will notify any readers that bytes are waiting in the pipe.
     *
     * @exception IOException if an I/O error occurs.
     */
    public synchronized void flush() throws IOException {
        if (sinks != null) {
            for (PipedInputStreamEx sink : this.sinks) {
                synchronized (sink) {
                    sink.notifyAll();
                }
            }
        }
    }

    /**
     * Closes this piped output stream and releases any system resources 
     * associated with this stream. This stream may no longer be used for 
     * writing bytes.
     *
     * @exception  IOException  if an I/O error occurs.
     */
    public void close()  throws IOException {
        if (sinks != null) {
            for (PipedInputStreamEx sink : this.sinks) {
                sink.receivedLast();
            }
        }
    }


    /**
     * Test of this extension of {@link PipedOutputStream}
     * @param args
     * @throws InterruptedException 
     * @throws IOException 
     */
    public static void main(String[] args) throws InterruptedException, IOException {
        final PipedOutputStreamEx pOut = new PipedOutputStreamEx();
        final PipedInputStreamEx pInHash = new PipedInputStreamEx();
        final PipedInputStreamEx pInConsole = new PipedInputStreamEx();

        pOut.connect(pInHash, pInConsole);

        Thread escreve = new Thread("Escrevendo") {
            @Override
            public void run() {
                String[] paraGravar = new String[]{
                        "linha1 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha2 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha3 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha4 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha5 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha6 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha7 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha8 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha9 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha10 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha11 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha12 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha13 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha14 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha15 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha16 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha17 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha18 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha19 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                        , "linha20 0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789\n"
                };
                for (String s :paraGravar) {
                    try {
                        pOut.write(s.getBytes("ISO-8859-1") );
                        Thread.sleep(100);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
                try {
                    pOut.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }   
        };

        Thread le1 = new Thread("Le1 - hash") {
            @Override
            public void run() {
                try {
                    System.out.println("HASH: "+HashUtil.getHashCRC(pInHash,true));                         
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };

        Thread le2 = new Thread("Le2 - escreve no console") {
            @Override
            public void run() {
                BufferedReader bIn = new BufferedReader(new InputStreamReader(pInConsole));
                String s;
                try {
                    while ( (s=bIn.readLine())!=null) {
                        Thread.sleep(700); // test simulating a slow reader...
                        System.out.println(s);
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        };


        escreve.start();
        le1.start();
        le2.start();

        escreve.join();
        le1.join();
        le2.join();

        pInHash.close();
        pInConsole.close();
    }
}

Here is the PipedInputStreamEx code. Unfortunately, I had to copy all the JDK code to get access to the "connected", "in" and "out" properties.

/**
 * Extension of {@link PipedInputStream} that allows connecting more than one of these to a {@link PipedOutputStream}
 * Since the ancestor class has package-private properties, we had to copy the inherited code :/
 */
public class PipedInputStreamEx extends PipedInputStream {

    @Override
    public void connect(PipedOutputStream src) throws IOException {
        throw new IOException("conecte usando PipedOutputStream.connect()");
    }

//----------------------------------------------------------------------------------------------------------
//----------------------------------------------------------------------------------------------------------
//----------------------------------------------------------------------------------------------------------
//--------- START of code from the inherited class (some methods commented out...) -------------------------
//----------------------------------------------------------------------------------------------------------

    boolean closedByWriter = false;
    volatile boolean closedByReader = false;
    boolean connected = false;

    /* REMIND: identification of the read and write sides needs to be
       more sophisticated.  Either using thread groups (but what about
       pipes within a thread?) or using finalization (but it may be a
       long time until the next GC). */
    Thread readSide;
    Thread writeSide;

    private static final int DEFAULT_PIPE_SIZE = 1024;

    /**
     * The default size of the pipe's circular input buffer.
     * @since   JDK1.1
     */
    // This used to be a constant before the pipe size was allowed
    // to change. This field will continue to be maintained
    // for backward compatibility. 
    protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE;

    /**
     * The circular buffer into which incoming data is placed.
     * @since   JDK1.1
     */
    protected byte buffer[];

    /**
     * The index of the position in the circular buffer at which the
     * next byte of data will be stored when received from the connected
     * piped output stream. <code>in&lt;0</code> implies the buffer is empty,
     * <code>in==out</code> implies the buffer is full
     * @since   JDK1.1
     */
    protected int in = -1;

    /**
     * The index of the position in the circular buffer at which the next
     * byte of data will be read by this piped input stream.
     * @since   JDK1.1
     */
    protected int out = 0;

//    /**
//     * Creates a <code>PipedInputStream</code> so
//     * that it is connected to the piped output
//     * stream <code>src</code>. Data bytes written
//     * to <code>src</code> will then be  available
//     * as input from this stream.
//     *
//     * @param      src   the stream to connect to.
//     * @exception  IOException  if an I/O error occurs.
//     */
//    public PipedInputStream(PipedOutputStream src) throws IOException {
//        this(src, DEFAULT_PIPE_SIZE);
//    }
//
//    /**
//     * Creates a <code>PipedInputStream</code> so that it is
//     * connected to the piped output stream
//     * <code>src</code> and uses the specified pipe size for
//     * the pipe's buffer.
//     * Data bytes written to <code>src</code> will then
//     * be available as input from this stream.
//     *
//     * @param      src   the stream to connect to.
//     * @param      pipeSize the size of the pipe's buffer.
//     * @exception  IOException  if an I/O error occurs.
//     * @exception  IllegalArgumentException if <code>pipeSize <= 0</code>.
//     * @since    1.6
//     */
//    public PipedInputStream(PipedOutputStream src, int pipeSize)
//            throws IOException {
//   initPipe(pipeSize);
//   connect(src);
//    }

    /**
     * Creates a <code>PipedInputStream</code> so
     * that it is not yet {@linkplain #connect(java.io.PipedOutputStream)
     * connected}.
     * It must be {@linkplain java.io.PipedOutputStream#connect(
     * java.io.PipedInputStream) connected} to a
     * <code>PipedOutputStream</code> before being used.
     */
    public PipedInputStreamEx() {
    initPipe(DEFAULT_PIPE_SIZE);
    }

    /**
     * Creates a <code>PipedInputStream</code> so that it is not yet
     * {@linkplain #connect(java.io.PipedOutputStream) connected} and
     * uses the specified pipe size for the pipe's buffer.
     * It must be {@linkplain java.io.PipedOutputStream#connect(
     * java.io.PipedInputStream)
     * connected} to a <code>PipedOutputStream</code> before being used.
     *
     * @param      pipeSize the size of the pipe's buffer.
     * @exception  IllegalArgumentException if <code>pipeSize <= 0</code>.
     * @since      1.6
     */
    public PipedInputStreamEx(int pipeSize) {
    initPipe(pipeSize);
    }

    private void initPipe(int pipeSize) {
         if (pipeSize <= 0) {
            throw new IllegalArgumentException("Pipe Size <= 0");
         }
         buffer = new byte[pipeSize];
    }

//    /**
//     * Causes this piped input stream to be connected
//     * to the piped  output stream <code>src</code>.
//     * If this object is already connected to some
//     * other piped output  stream, an <code>IOException</code>
//     * is thrown.
//     * <p>
//     * If <code>src</code> is an
//     * unconnected piped output stream and <code>snk</code>
//     * is an unconnected piped input stream, they
//     * may be connected by either the call:
//     * <p>
//     * <pre><code>snk.connect(src)</code> </pre>
//     * <p>
//     * or the call:
//     * <p>
//     * <pre><code>src.connect(snk)</code> </pre>
//     * <p>
//     * The two
//     * calls have the same effect.
//     *
//     * @param      src   The piped output stream to connect to.
//     * @exception  IOException  if an I/O error occurs.
//     */
//    public void connect(PipedOutputStream src) throws IOException {
//  src.connect(this);
//    }

    /**
     * Receives a byte of data.  This method will block if no input is
     * available.
     * @param b the byte being received
     * @exception IOException If the pipe is <a href=#BROKEN> <code>broken</code></a>,
     *      {@link #connect(java.io.PipedOutputStream) unconnected},
     *      closed, or if an I/O error occurs.
     * @since     JDK1.1
     */
    protected synchronized void receive(int b) throws IOException {
        checkStateForReceive();
        writeSide = Thread.currentThread();
        if (in == out)
            awaitSpace();
    if (in < 0) {
        in = 0;
        out = 0;
    }
    buffer[in++] = (byte)(b & 0xFF);
    if (in >= buffer.length) {
        in = 0;
    }
    }

    /**
     * Receives data into an array of bytes.  This method will
     * block until some input is available.
     * @param b the buffer into which the data is received
     * @param off the start offset of the data
     * @param len the maximum number of bytes received
     * @exception IOException If the pipe is <a href=#BROKEN> broken</a>,
     *       {@link #connect(java.io.PipedOutputStream) unconnected},
     *       closed,or if an I/O error occurs.
     */
    synchronized void receive(byte b[], int off, int len)  throws IOException {
        checkStateForReceive();
        writeSide = Thread.currentThread();
        int bytesToTransfer = len;
        while (bytesToTransfer > 0) {
            if (in == out)
                awaitSpace();
            int nextTransferAmount = 0;
            if (out < in) {
                nextTransferAmount = buffer.length - in;
            } else if (in < out) {
                if (in == -1) {
                    in = out = 0;
                    nextTransferAmount = buffer.length - in;
                } else {
                    nextTransferAmount = out - in;
                }
            }
            if (nextTransferAmount > bytesToTransfer)
                nextTransferAmount = bytesToTransfer;
            assert(nextTransferAmount > 0);
            System.arraycopy(b, off, buffer, in, nextTransferAmount);
            bytesToTransfer -= nextTransferAmount;
            off += nextTransferAmount;
            in += nextTransferAmount;
            if (in >= buffer.length) {
                in = 0;
            }
        }
    }

    private void checkStateForReceive() throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByWriter || closedByReader) {
        throw new IOException("Pipe closed");
    } else if (readSide != null && !readSide.isAlive()) {
            throw new IOException("Read end dead");
        }
    }

    private void awaitSpace() throws IOException {
    while (in == out) {
        checkStateForReceive();

        /* full: kick any waiting readers */
        notifyAll();
        try {
            wait(1000);
        } catch (InterruptedException ex) {
        throw new java.io.InterruptedIOException();
        }
    }
    }

    /**
     * Notifies all waiting threads that the last byte of data has been
     * received.
     */
    synchronized void receivedLast() {
    closedByWriter = true;
    notifyAll();
    }

    /**
     * Reads the next byte of data from this piped input stream. The
     * value byte is returned as an <code>int</code> in the range
     * <code>0</code> to <code>255</code>. 
     * This method blocks until input data is available, the end of the
     * stream is detected, or an exception is thrown.
     *
     * @return     the next byte of data, or <code>-1</code> if the end of the
     *             stream is reached.
     * @exception  IOException  if the pipe is
     *       {@link #connect(java.io.PipedOutputStream) unconnected},
     *       <a href=#BROKEN> <code>broken</code></a>, closed,
     *       or if an I/O error occurs.
     */
    public synchronized int read()  throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByReader) {
        throw new IOException("Pipe closed");
    } else if (writeSide != null && !writeSide.isAlive()
                   && !closedByWriter && (in < 0)) {
            throw new IOException("Write end dead");
    }

        readSide = Thread.currentThread();
    int trials = 2;
    while (in < 0) {
        if (closedByWriter) {
        /* closed by writer, return EOF */
        return -1;
        }
        if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
        throw new IOException("Pipe broken");
        }
            /* might be a writer waiting */
        notifyAll();
        try {
            wait(1000);
        } catch (InterruptedException ex) {
        throw new java.io.InterruptedIOException();
        }
    }
    int ret = buffer[out++] & 0xFF;
    if (out >= buffer.length) {
        out = 0;
    }
    if (in == out) {
            /* now empty */
        in = -1;
    }

    return ret;
    }

    /**
     * Reads up to <code>len</code> bytes of data from this piped input
     * stream into an array of bytes. Less than <code>len</code> bytes
     * will be read if the end of the data stream is reached or if 
     * <code>len</code> exceeds the pipe's buffer size.
     * If <code>len </code> is zero, then no bytes are read and 0 is returned; 
     * otherwise, the method blocks until at least 1 byte of input is 
     * available, end of the stream has been detected, or an exception is
     * thrown.
     *
     * @param      b     the buffer into which the data is read.
     * @param      off   the start offset in the destination array <code>b</code>
     * @param      len   the maximum number of bytes read.
     * @return     the total number of bytes read into the buffer, or
     *             <code>-1</code> if there is no more data because the end of
     *             the stream has been reached.
     * @exception  NullPointerException If <code>b</code> is <code>null</code>.
     * @exception  IndexOutOfBoundsException If <code>off</code> is negative, 
     * <code>len</code> is negative, or <code>len</code> is greater than 
     * <code>b.length - off</code>
     * @exception  IOException if the pipe is <a href=#BROKEN> <code>broken</code></a>,
     *       {@link #connect(java.io.PipedOutputStream) unconnected},
     *       closed, or if an I/O error occurs.
     */
    public synchronized int read(byte b[], int off, int len)  throws IOException {
    if (b == null) {
        throw new NullPointerException();
    } else if (off < 0 || len < 0 || len > b.length - off) {
        throw new IndexOutOfBoundsException();
    } else if (len == 0) {
        return 0;
    }

        /* possibly wait on the first character */
    int c = read();
    if (c < 0) {
        return -1;
    }
    b[off] = (byte) c;
    int rlen = 1;
    while ((in >= 0) && (len > 1)) {

        int available; 

        if (in > out) {
        available = Math.min((buffer.length - out), (in - out));
        } else {
        available = buffer.length - out;
        }

        // A byte is read beforehand outside the loop
        if (available > (len - 1)) {
        available = len - 1;
        }
        System.arraycopy(buffer, out, b, off + rlen, available);
        out += available;
        rlen += available; 
        len -= available;

        if (out >= buffer.length) {
        out = 0;
        }
        if (in == out) {
                /* now empty */
        in = -1;
        }
    }
    return rlen;
    }

    /**
     * Returns the number of bytes that can be read from this input
     * stream without blocking.
     *
     * @return the number of bytes that can be read from this input stream
     *         without blocking, or {@code 0} if this input stream has been
     *         closed by invoking its {@link #close()} method, or if the pipe
     *         is {@link #connect(java.io.PipedOutputStream) unconnected}, or
     *      <a href=#BROKEN> <code>broken</code></a>.
     *
     * @exception  IOException  if an I/O error occurs.
     * @since   JDK1.0.2
     */
    public synchronized int available() throws IOException {
    if(in < 0)
        return 0;
    else if(in == out)
        return buffer.length;
    else if (in > out)
        return in - out;
    else
        return in + buffer.length - out;
    }

    /**
     * Closes this piped input stream and releases any system resources
     * associated with the stream.
     *
     * @exception  IOException  if an I/O error occurs.
     */
    public void close()  throws IOException {
    closedByReader = true;
        synchronized (this) {
            in = -1;
        }
    }

//----------------------------------------------------------------------------------------------------------
//--------- END of code from the inherited class -----------------------------------------------------------
//----------------------------------------------------------------------------------------------------------


}
 0
Author: Lúcio Iglezias Pacheco, 2018-02-26 17:42:11