线程消耗队列,终止
Thread consuming queue, Terminate
我想知道下面的方法是否正确。
我有在公共 BlockingQueue 上工作的生产者和消费者线程。
生产者是一个嗅探器线程,所以它会自动停止,但我认为消费者会在生产者线程的状态(alive/dead)上循环终止。有什么建议么?谢谢
-)来自主线程:
ArrayBlockingQueue<PcapPacket> queue = new ArrayBlockingQueue<>();
Producer p = new Producer(queue);
Thread t1 =new Thread(p);
t1.start();
Consumer c = new Consumer(queue,t1);
new Thread(c).start();
-)制作人
public void run() {
public void nextPacket(PcapPacket packet, String user) {
try {
queue.put(packet);
} catch (InterruptedException ex) {
}
-) 消费者
public void run() {
while(producer.isAlive()){
try {
//Thread.sleep(50);
packet=queue.take();
轮询生产者的状态是次优的。
首选方法是让生产者在生产者退出期间将一些 'poison pill' 放入队列,并让消费者在收到该药丸后立即结束其循环:
class Producer implements Runnable {
static final Object TIME_TO_STOP = new Object();
private final BlockingQueue<Object> q;
Producer(BlockingQueue<Object> q) {
this.q = q;
}
@Override
public void run() {
try {
while (true) {
q.put(readNextPacket());
}
} finally {
// exception happened
try {
q.put(TIME_TO_STOP);
} catch (InterruptedException e) {
// somehow log failure to stop properly
}
}
}
}
class Consumer implements Runnable {
private final BlockingQueue<Object> q;
Consumer(BlockingQueue<Object> q) {
this.q = q;
}
@Override
public void run() {
while (true) {
Object packet = q.take();
if (packet == Producer.TIME_TO_STOP) {
break;
}
// process packet
}
}
}
我想知道下面的方法是否正确。 我有在公共 BlockingQueue 上工作的生产者和消费者线程。 生产者是一个嗅探器线程,所以它会自动停止,但我认为消费者会在生产者线程的状态(alive/dead)上循环终止。有什么建议么?谢谢
-)来自主线程:
ArrayBlockingQueue<PcapPacket> queue = new ArrayBlockingQueue<>();
Producer p = new Producer(queue);
Thread t1 =new Thread(p);
t1.start();
Consumer c = new Consumer(queue,t1);
new Thread(c).start();
-)制作人
public void run() {
public void nextPacket(PcapPacket packet, String user) {
try {
queue.put(packet);
} catch (InterruptedException ex) {
}
-) 消费者
public void run() {
while(producer.isAlive()){
try {
//Thread.sleep(50);
packet=queue.take();
轮询生产者的状态是次优的。
首选方法是让生产者在生产者退出期间将一些 'poison pill' 放入队列,并让消费者在收到该药丸后立即结束其循环:
class Producer implements Runnable {
static final Object TIME_TO_STOP = new Object();
private final BlockingQueue<Object> q;
Producer(BlockingQueue<Object> q) {
this.q = q;
}
@Override
public void run() {
try {
while (true) {
q.put(readNextPacket());
}
} finally {
// exception happened
try {
q.put(TIME_TO_STOP);
} catch (InterruptedException e) {
// somehow log failure to stop properly
}
}
}
}
class Consumer implements Runnable {
private final BlockingQueue<Object> q;
Consumer(BlockingQueue<Object> q) {
this.q = q;
}
@Override
public void run() {
while (true) {
Object packet = q.take();
if (packet == Producer.TIME_TO_STOP) {
break;
}
// process packet
}
}
}