Threads



Pipe the output of a thread to the input of another one

We use the PipedOutputStream/PipedInputStream duo. When these streams are connected together what is written in the PipedOutputStream can be read in PipedInputStream. That connection acts like a queue (FIFO).
    Pipeline Thread

      +----------+         +----------+
      | thread A |  --- >  | thread B |
      +----------+         +----------+
       (produce)           (consume)

[produceData.java]
import java.io.*;

public abstract class produceData implements Runnable {
  OutputStream os;

  public produceData(OutputStream os) {
    this.os = os;
    Thread t = new Thread(this);
    t.start();
    }

  public abstract boolean dataProduction();

  public void run() {
    while(dataProduction()) ;
    }
  }

[consumeData.java]
import java.io.*;

public abstract class consumeData implements Runnable {
  InputStream is;

  public consumeData(InputStream is) {
    this.is = is;
    Thread t = new Thread(this);
    t.start();
    }

  public abstract boolean dataConsumption();

  public void run(){
    while(dataConsumption());
    }
  }

Next we implement the methods to prepare/send the data.

[sendProduction.java]
import java.io.*;

public class sendProduction extends produceData {
   OutputStream output;

   makeProduction(OutputStream os) {
     super(os);
     this.output = os;
     }

   public boolean dataProduction() {
     byte[] j = new byte[1];
     boolean done = false;
     java.util.Random r = new java.util.Random();
	  while(!done) {
       try {
         j[0] = (byte)(Math.abs(r.nextInt()) % 255);
         System.out.print(".");
         output.write(j);
         }
       catch (Exception e) {
         e.printStackTrace();
         return true;
         }
       }
     return done;
     }
   }

We implement the method to receive and process the data.

[receiveProduction.java]
import java.io.*;

public class receiveProduction extends consumeData {
  InputStream input;

  receiveProduction(InputStream is) {
     super(is);
     this.input = is;
     }

   public boolean dataConsumption() {
     int i = 0;
     try {
       for (;;) {
          i = input.read();
          System.out.println("  " + i);
          }
       }
     catch (Exception e) {
       e.printStackTrace();
       }
     return true;
     }
   }

[testThread.java]
import java.io.*;

public class testThread {
  public static void main(String a[]){
    try {
       PipedOutputStream os = new PipedOutputStream();
       PipedInputStream is  = new PipedInputStream();
       os.connect(is);
       new sendProduction(os);
       new receiveProduction(is);
       }
    catch (Exception e) {}
    }
  }


Pipe the output of a thread to the input of other threads

The idea is to make the READ operation ATOMIC. Once the READING is started for one thread, it cannot be interrupted by another one.
   Pipeline Thread

      +----------+         +----------+
      | thread A |  ---- > | thread B |
      +----------+    |    +----------+
       (produce)      |     (consume)
                      |
                      |
                      |    +----------+
                      +- > | thread C |
                           +----------+
                            (consume)

The produceData.class and consumeData.class are the same as the previous JAVA How-to.
[AtomicInputStream.java]
import java.io.*;

public class AtomicInputStream extends PipedInputStream {
  int atom = 0;

  AtomicInputStream(int atom) {
    super();
    this.atom = atom;
    }
  public synchronized int atomicRead(byte[] x)	{
    try {
      read(x, 0, atom);
      }
    catch (Exception e) {
      e.printStackTrace();
      return -1;
      }
    return atom;
    }
  }

[sendProduction.java]
import java.io.*;

public class sendProduction extends produceData {
  OutputStream os;

  sendProduction(OutputStream os) {
  super(os);
  this.os = os;
  }

  public boolean dataProduction() {
    byte[] j = new byte[3];
    boolean done = false;
    j[0] = 0 ; j[1] = 1; j[2] = 2;
    java.util.Random r = new java.util.Random();
    while(!done) {
      try {
        os.write(j, 0, 3);
        }
      catch (Exception e) {
        e.printStackTrace();
        return true;
        }
      }
    return done;
    }
  }

[receiveProduction.java]
import java.io.*;

public class receiveProduction extends consumeData {
  AtomicInputStream as;

  receiveProduction(AtomicInputStream as) {
    super(as);
    this.as = as;
    }

  public boolean dataConsumption() {
    byte [] i = new byte[3];
    try {
      for (;;) {
        as.read(i);
        System.out.println
          (Thread.currentThread().getName()+": " +
           i[0] + " " + i[1] + " " + i[2]);
        }
      }
    catch (Exception e) {
      e.printStackTrace();
      }
    return true;
    }
  }

[testThread.java]
import java.io.*;

public class testThread {
   public static void main(String a[]){
     try {
       PipedOutputStream os = new PipedOutputStream();
       AtomicInputStream as = new AtomicInputStream(3);
       os.connect(as);
       new sendProduction(os);
       new receiveProduction(as);
       new receiveProduction(as);
       new receiveProduction(as);
       }
     catch (Exception e) {
       e.printStackTrace();
       }
     }
   }

That's OK for the situation One Producer and Many consumers.
To support many producers, simply create a AtomicOutputStream class with a synchronized atomicWrite method in it.

Wait the for the completion of a thread

[testThread.java] 
public class testThread implements Runnable {
  int i;
  testThread(int i) {
    super();
    this.i = i;
    }

  public void run() {
    for (int j=0; j < i; j++) {
      System.out.println
       (Thread.currentThread().getName() + " " + j);
      }
    System.out.println
     (Thread.currentThread().getName() + " FINISHED");
    }

  public static void main(String a[]) {
    try {
      testThread tt1 = new testThread(50);
      testThread tt2 = new testThread(75);
      Thread t1 = new Thread(tt1,"Test thread 1");
      Thread t2 = new Thread(tt2,"Test thread 2");
      t1.start();
      t2.start();
      t1.join();
      t2.join();
      System.out.println("Main FINISHED");
      }
    catch (Exception e) {
      e.printStackTrace();
      }
    }
  }