Pipeline Thread +----------+ +----------+ | thread A | --- > | thread B | +----------+ +----------+ (produce) (consume) [produceData.java] import java.io.*; public abstract class produceData implements Runnable { OutputStream os; public produceData(OutputStream os) { this.os = os; Thread t = new Thread(this); t.start(); } public abstract boolean dataProduction(); public void run() { while(dataProduction()) ; } } [consumeData.java] import java.io.*; public abstract class consumeData implements Runnable { InputStream is; public consumeData(InputStream is) { this.is = is; Thread t = new Thread(this); t.start(); } public abstract boolean dataConsumption(); public void run(){ while(dataConsumption()); } } |
[sendProduction.java] import java.io.*; public class sendProduction extends produceData { OutputStream output; makeProduction(OutputStream os) { super(os); this.output = os; } public boolean dataProduction() { byte[] j = new byte[1]; boolean done = false; java.util.Random r = new java.util.Random(); while(!done) { try { j[0] = (byte)(Math.abs(r.nextInt()) % 255); System.out.print("."); output.write(j); } catch (Exception e) { e.printStackTrace(); return true; } } return done; } } |
[receiveProduction.java] import java.io.*; public class receiveProduction extends consumeData { InputStream input; receiveProduction(InputStream is) { super(is); this.input = is; } public boolean dataConsumption() { int i = 0; try { for (;;) { i = input.read(); System.out.println(" " + i); } } catch (Exception e) { e.printStackTrace(); } return true; } } [testThread.java] import java.io.*; public class testThread { public static void main(String a[]){ try { PipedOutputStream os = new PipedOutputStream(); PipedInputStream is = new PipedInputStream(); os.connect(is); new sendProduction(os); new receiveProduction(is); } catch (Exception e) {} } } |
Pipeline Thread

+----------+          +----------+
| thread A | ---+---> | thread B |
+----------+    |     +----------+
 (produce)      |      (consume)
                |
                |     +----------+
                +---> | thread C |
                      +----------+
                       (consume)
// [AtomicInputStream.java]
import java.io.*;

/**
 * Piped input stream that can hand out fixed-size groups of bytes ("atoms")
 * so that several consumer threads sharing one pipe each receive whole groups.
 */
public class AtomicInputStream extends PipedInputStream {
    /** Number of bytes that make up one atom. */
    int atom = 0;

    /**
     * Serialises {@link #atomicRead(byte[])} calls. A dedicated lock is used
     * rather than the stream's own monitor: the JDK pipe implementation waits
     * on the stream's monitor while the pipe is empty, and waiting releases
     * that monitor, which would let a second reader start in the middle of an atom.
     */
    private final Object atomLock = new Object();

    /**
     * @param atom number of bytes delivered by each {@link #atomicRead(byte[])}
     */
    AtomicInputStream(int atom) {
        super();
        this.atom = atom;
    }

    /**
     * Reads exactly one atom into the start of {@code x}, blocking until all
     * of its bytes have arrived.
     *
     * @param x destination buffer; must hold at least {@code atom} bytes
     * @return {@code atom} on success, or {@code -1} if the stream ended or any
     *         exception occurred (the exception's stack trace is printed)
     */
    public int atomicRead(byte[] x) {
        synchronized (atomLock) {
            int filled = 0;
            try {
                // A single read may deliver fewer bytes than requested, so keep
                // reading until the whole atom is present.
                while (filled < atom) {
                    int n = read(x, filled, atom - filled);
                    if (n < 0) {
                        return -1;
                    }
                    filled += n;
                }
            } catch (Exception e) {
                // Deliberately broad: any failure, including a null or too-short
                // buffer, is reported to the caller as -1.
                e.printStackTrace();
                return -1;
            }
            return atom;
        }
    }
}

// [sendProduction.java]
import java.io.*;

/**
 * Producer that repeatedly writes the three-byte group {0, 1, 2} to its
 * output stream.
 */
public class sendProduction extends produceData {
    /**
     * Creates the producer; the superclass constructor stores the stream in
     * the inherited {@code os} field and starts the worker thread.
     *
     * @param os stream the groups are written to
     */
    sendProduction(OutputStream os) {
        super(os);
    }

    /**
     * Writes the group over and over until a write fails.
     *
     * @return {@code false} once the stream reports an I/O error, which ends
     *         the worker thread instead of retrying a failing stream forever
     */
    public boolean dataProduction() {
        byte[] j = {0, 1, 2};
        try {
            while (true) {
                // Uses the inherited field, which is assigned before the worker
                // thread starts; a shadowing subclass field would be set too late.
                os.write(j, 0, 3);
            }
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }
}

// [receiveProduction.java]
import java.io.*;

/**
 * Consumer that takes three-byte atoms from a shared {@link AtomicInputStream}
 * and prints each one prefixed with the current thread's name.
 */
public class receiveProduction extends consumeData {
    /**
     * Same stream as the inherited {@code is}; kept so existing references to
     * this field keep working. It is assigned only after {@code super(as)}
     * returns, by which time the worker thread is already running, so the
     * consumption loop does not read it.
     */
    AtomicInputStream as;

    /**
     * Creates the consumer; the superclass constructor starts the worker thread.
     *
     * @param as shared stream the atoms are read from
     */
    receiveProduction(AtomicInputStream as) {
        super(as);
        this.as = as;
    }

    /**
     * Reads and prints atoms until the stream ends or fails.
     *
     * @return {@code false} once {@code atomicRead} reports {@code -1}, which
     *         ends the worker thread
     */
    public boolean dataConsumption() {
        // The inherited field is assigned before the worker thread starts and
        // always holds the AtomicInputStream passed to the constructor.
        AtomicInputStream in = (AtomicInputStream) is;
        byte[] i = new byte[3];
        for (;;) {
            // atomicRead, not plain read: with several consumers on one pipe a
            // plain read could split a group between threads.
            if (in.atomicRead(i) < 0) {
                return false;
            }
            System.out.println(Thread.currentThread().getName() + ": "
                    + i[0] + " " + i[1] + " " + i[2]);
        }
    }
}

// [testThread.java]
import java.io.*;

/**
 * Demo driver: one producer feeding three consumers that share a single
 * {@link AtomicInputStream} with an atom size of three bytes.
 */
public class testThread {
    /**
     * Wires up the pipe and launches the producer and the three consumers.
     *
     * @param a command-line arguments (unused)
     */
    public static void main(String a[]) {
        try {
            PipedOutputStream os = new PipedOutputStream();
            AtomicInputStream as = new AtomicInputStream(3);
            os.connect(as);
            new sendProduction(os);
            new receiveProduction(as);
            new receiveProduction(as);
            new receiveProduction(as);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
// [testThread.java]
/**
 * Demo of starting two threads and waiting for both with {@code join()}.
 * Each thread prints its name followed by a counter, then a FINISHED line.
 */
public class testThread implements Runnable {
    /** Number of counter lines this task prints before finishing. */
    int i;

    /**
     * @param i number of counter lines to print
     */
    testThread(int i) {
        this.i = i;
    }

    /**
     * Prints "&lt;thread name&gt; &lt;j&gt;" for j = 0 .. i-1, followed by
     * "&lt;thread name&gt; FINISHED".
     */
    public void run() {
        String name = Thread.currentThread().getName();
        for (int j = 0; j < i; j++) {
            System.out.println(name + " " + j);
        }
        System.out.println(name + " FINISHED");
    }

    /**
     * Starts two tasks (50 and 75 lines), waits for both, then prints
     * "Main FINISHED".
     *
     * @param a command-line arguments (unused)
     */
    public static void main(String a[]) {
        Thread t1 = new Thread(new testThread(50), "Test thread 1");
        Thread t2 = new Thread(new testThread(75), "Test thread 2");
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
            System.out.println("Main FINISHED");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can see it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}