Pipeline Thread
+----------+      +----------+
| thread A | ---> | thread B |
+----------+      +----------+
 (produce)         (consume)
[produceData.java]
import java.io.*;
/**
 * Base class for the producing end of a thread pipeline.
 *
 * <p>Constructing an instance stores the target stream and immediately starts
 * a new thread which calls {@link #dataProduction()} over and over until that
 * method returns {@code false}.
 *
 * <p>NOTE(review): the worker thread is started from inside this constructor,
 * so it can begin running before a subclass constructor has finished. Code
 * that runs on the worker thread should rely only on state that is already
 * assigned when the thread is started, such as {@link #os}.
 */
public abstract class produceData implements Runnable {
    /** Stream handed to the constructor; assigned before the worker thread starts. */
    OutputStream os;

    /**
     * Remembers the output stream and launches the worker thread.
     *
     * @param os stream the producer is expected to write to
     */
    public produceData(OutputStream os) {
        this.os = os;
        new Thread(this).start();
    }

    /**
     * Performs one round of production.
     *
     * @return {@code true} to be called again, {@code false} to end the worker thread
     */
    public abstract boolean dataProduction();

    /** Worker loop: keeps invoking {@link #dataProduction()} while it reports {@code true}. */
    @Override
    public void run() {
        boolean more;
        do {
            more = dataProduction();
        } while (more);
    }
}
[consumeData.java]
import java.io.*;
/**
 * Base class for the consuming end of a thread pipeline.
 *
 * <p>Constructing an instance stores the source stream and immediately starts
 * a new thread which calls {@link #dataConsumption()} repeatedly for as long
 * as that method returns {@code true}.
 *
 * <p>NOTE(review): the worker thread is started from inside this constructor,
 * so it can begin running before a subclass constructor has finished. Code
 * that runs on the worker thread should rely only on state that is already
 * assigned when the thread is started, such as {@link #is}.
 */
public abstract class consumeData implements Runnable {
    /** Stream handed to the constructor; assigned before the worker thread starts. */
    InputStream is;

    /**
     * Remembers the input stream and launches the worker thread.
     *
     * @param is stream the consumer is expected to read from
     */
    public consumeData(InputStream is) {
        this.is = is;
        Thread worker = new Thread(this);
        worker.start();
    }

    /**
     * Performs one round of consumption.
     *
     * @return {@code true} to be called again, {@code false} to end the worker thread
     */
    public abstract boolean dataConsumption();

    /** Worker loop: stops the first time {@link #dataConsumption()} returns {@code false}. */
    @Override
    public void run() {
        for (;;) {
            if (!dataConsumption()) {
                return;
            }
        }
    }
}
|
[sendProduction.java]
import java.io.*;
public class sendProduction extends produceData {
OutputStream output;
makeProduction(OutputStream os) {
super(os);
this.output = os;
}
public boolean dataProduction() {
byte[] j = new byte[1];
boolean done = false;
java.util.Random r = new java.util.Random();
while(!done) {
try {
j[0] = (byte)(Math.abs(r.nextInt()) % 255);
System.out.print(".");
output.write(j);
}
catch (Exception e) {
e.printStackTrace();
return true;
}
}
return done;
}
}
|
[receiveProduction.java]
import java.io.*;
public class receiveProduction extends consumeData {
InputStream input;
receiveProduction(InputStream is) {
super(is);
this.input = is;
}
public boolean dataConsumption() {
int i = 0;
try {
for (;;) {
i = input.read();
System.out.println(" " + i);
}
}
catch (Exception e) {
e.printStackTrace();
}
return true;
}
}
[testThread.java]
import java.io.*;
/**
 * Demo driver for the two-stage pipeline: connects a piped output stream to a
 * piped input stream, then creates one producer and one consumer, each of
 * which starts its own thread on construction.
 */
public class testThread {
    /**
     * Wires the pipe and starts the producer and the consumer.
     *
     * @param a command-line arguments (unused)
     */
    public static void main(String a[]) {
        try {
            PipedOutputStream os = new PipedOutputStream();
            PipedInputStream is = new PipedInputStream();
            os.connect(is);
            new sendProduction(os);
            new receiveProduction(is);
        } catch (Exception e) {
            // Report setup failures instead of silently swallowing them.
            e.printStackTrace();
        }
    }
}
|
Pipeline Thread
+----------+        +----------+
| thread A | --+--> | thread B |
+----------+   |    +----------+
 (produce)     |     (consume)
               |
               |    +----------+
               +--> | thread C |
                    +----------+
                     (consume)
|
[AtomicInputStream.java]
import java.io.*;
/**
 * Piped input stream that hands out data in fixed-size units ("atoms").
 *
 * <p>{@link #atomicRead(byte[])} is synchronized on the stream, so when
 * several threads share one instance each successful call returns one
 * complete atom that no other caller of {@code atomicRead} can see part of.
 */
public class AtomicInputStream extends PipedInputStream {
    /** Number of bytes delivered by one {@link #atomicRead(byte[])} call. */
    int atom = 0;

    /**
     * Creates an unconnected stream with the given atom size.
     *
     * @param atom number of bytes that make up one atom
     */
    AtomicInputStream(int atom) {
        super();
        this.atom = atom;
    }

    /**
     * Reads exactly one atom into the start of {@code x}.
     *
     * <p>A single {@code read(byte[], int, int)} call may return fewer bytes
     * than requested, so the read is repeated until the whole atom has arrived.
     *
     * @param x destination buffer; must hold at least {@code atom} bytes
     * @return {@code atom} when a full atom was stored in {@code x}; {@code -1}
     *         if the stream ended before a full atom was available or if the
     *         read failed (the failure's stack trace is printed)
     */
    public synchronized int atomicRead(byte[] x) {
        int filled = 0;
        try {
            // do/while: the first read always runs, so a null or too-short
            // buffer still fails in read() and is reported as -1.
            do {
                int got = read(x, filled, atom - filled);
                if (got < 0) {
                    // Writer closed the pipe in the middle of (or before) an atom.
                    return -1;
                }
                filled += got;
            } while (filled < atom);
        } catch (Exception e) {
            e.printStackTrace();
            return -1;
        }
        return atom;
    }
}
[sendProduction.java]
import java.io.*;
public class sendProduction extends produceData {
OutputStream os;
sendProduction(OutputStream os) {
super(os);
this.os = os;
}
public boolean dataProduction() {
byte[] j = new byte[3];
boolean done = false;
j[0] = 0 ; j[1] = 1; j[2] = 2;
java.util.Random r = new java.util.Random();
while(!done) {
try {
os.write(j, 0, 3);
}
catch (Exception e) {
e.printStackTrace();
return true;
}
}
return done;
}
}
[receiveProduction.java]
import java.io.*;
public class receiveProduction extends consumeData {
AtomicInputStream as;
receiveProduction(AtomicInputStream as) {
super(as);
this.as = as;
}
public boolean dataConsumption() {
byte [] i = new byte[3];
try {
for (;;) {
as.read(i);
System.out.println
(Thread.currentThread().getName()+": " +
i[0] + " " + i[1] + " " + i[2]);
}
}
catch (Exception e) {
e.printStackTrace();
}
return true;
}
}
[testThread.java]
import java.io.*;
/**
 * Demo driver for the fan-out pipeline: one producer feeds a piped stream
 * with an atom size of 3, and three consumers share the reading end.
 */
public class testThread {
    /**
     * Connects the pipe, then creates the producer followed by three
     * consumers; each of them starts its own thread on construction.
     *
     * @param a command-line arguments (unused)
     */
    public static void main(String a[]) {
        try {
            PipedOutputStream sink = new PipedOutputStream();
            AtomicInputStream source = new AtomicInputStream(3);
            sink.connect(source);

            new sendProduction(sink);
            for (int consumer = 0; consumer < 3; consumer++) {
                new receiveProduction(source);
            }
        } catch (Exception failure) {
            failure.printStackTrace();
        }
    }
}
|
[testThread.java]
/**
 * Demonstrates {@code Thread.join}: two counting threads run concurrently and
 * the main thread waits for both before printing its own final line.
 */
public class testThread implements Runnable {
    /** Exclusive upper bound of the counter printed by {@link #run()}. */
    int i;

    /**
     * @param i how many numbers (0 .. i-1) the task prints
     */
    testThread(int i) {
        this.i = i;
    }

    /** Prints the current thread's name followed by a space and the given text. */
    private static void report(String text) {
        System.out.println(Thread.currentThread().getName() + " " + text);
    }

    /** Prints the numbers 0 .. i-1, one per line, then a FINISHED line. */
    public void run() {
        int count = 0;
        while (count < i) {
            report(String.valueOf(count));
            count++;
        }
        report("FINISHED");
    }

    /**
     * Starts a 50-step and a 75-step counter thread, waits for both to end,
     * then prints "Main FINISHED".
     *
     * @param a command-line arguments (unused)
     */
    public static void main(String a[]) {
        try {
            Thread[] workers = {
                new Thread(new testThread(50), "Test thread 1"),
                new Thread(new testThread(75), "Test thread 2")
            };
            for (int k = 0; k < workers.length; k++) {
                workers[k].start();
            }
            for (int k = 0; k < workers.length; k++) {
                workers[k].join();
            }
            System.out.println("Main FINISHED");
        } catch (Exception problem) {
            problem.printStackTrace();
        }
    }
}
|