由于 BlockingQueue 的消费者(我认为是在另一个线程上),主线程挂起
Main thread hangs because of BlockingQueue's consumer (which I thought was on another thread)
我有一个阻塞测试(第一块代码)。我有几个元素可以一起工作。我有一个阻塞队列,我将事件放入其中,然后我有一个消费者将它们取走并将它们发送到 Amazon Kinesis。我很确定我的测试被阻塞了,因为队列阻塞了我的消费者,即使我认为它是 运行ning 在一个单独的线程上。
// Test.java
@Test
public void testWhileLoop() throws InterruptedException {
ArrayBlockingQueue<Event> testQ = new ArrayBlockingQueue<Event>(1024);
// mockKinesis is a mock at the class level.
KPLPoster kpl = new KPLPoster("TestStream", mockKinesis, testQ);
Event event = new Event("TestMessage", "TestPartition");
ListenableFuture<UserRecordResult> fakeReturn = Mockito.mock(ListenableFuture.class);
final AtomicInteger numberOfWhileLoops = new AtomicInteger();
Mockito.doAnswer(invocation -> {
numberOfWhileLoops.incrementAndGet();
return fakeReturn;
})
.when(mockKinesis)
.addUserRecord("TestStream", "TestPartition", ByteBuffer.wrap("TestMessage".getBytes()));
kpl.run(); // Hangs here
for(int i = 100; i > 0; i--){
testQ.put(event);
}
kpl.stop();
kpl = null;
assert(numberOfWhileLoops.toString()).equals("100");
}
这是我的 KPLPoster 继承的 BaseKinesisPoster 的 运行 方法。需要注意的是,BaseKinesisPoster 实现了 Runnable 接口。
//BaseKinesisPoster.java
@Override
public void run() {
shutdown = false;
while (!shutdown && !(Thread.currentThread().isInterrupted())) {
try {
this.runOnce();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}catch (Exception e){
e.printStackTrace();
}
}
}
最后,这是我的 KPLPoster(扩展了 BaseKinesisPoster)相关 runOnce()
方法的一部分。
// KPLPoster.java
@Override
protected void runOnce() throws Exception {
Event event = inputQueue.take();
//other stuff in my method
}
如何确保阻塞队列消费者不会阻塞我的 test/main 线程?
当你打电话时
Thread.run();
它调用调用的方法。没有什么特别的事情发生,方法是 运行 在当前线程中。
当你打电话时
Thread.start();
这将启动线程,该线程又会在该新线程中调用 运行()。
顺便说一句,Thread.stop()
会在 Java 中抛出一个 UnsupportedOperationException
8。你不应该使用它。你应该让它自然结束。
我有一个阻塞测试(第一块代码)。我有几个元素可以一起工作。我有一个阻塞队列,我将事件放入其中,然后我有一个消费者将它们取走并将它们发送到 Amazon Kinesis。我很确定我的测试被阻塞了,因为队列阻塞了我的消费者,即使我认为它是 运行ning 在一个单独的线程上。
// Test.java
@Test
public void testWhileLoop() throws InterruptedException {
ArrayBlockingQueue<Event> testQ = new ArrayBlockingQueue<Event>(1024);
// mockKinesis is a mock at the class level.
KPLPoster kpl = new KPLPoster("TestStream", mockKinesis, testQ);
Event event = new Event("TestMessage", "TestPartition");
ListenableFuture<UserRecordResult> fakeReturn = Mockito.mock(ListenableFuture.class);
final AtomicInteger numberOfWhileLoops = new AtomicInteger();
Mockito.doAnswer(invocation -> {
numberOfWhileLoops.incrementAndGet();
return fakeReturn;
})
.when(mockKinesis)
.addUserRecord("TestStream", "TestPartition", ByteBuffer.wrap("TestMessage".getBytes()));
kpl.run(); // Hangs here
for(int i = 100; i > 0; i--){
testQ.put(event);
}
kpl.stop();
kpl = null;
assert(numberOfWhileLoops.toString()).equals("100");
}
这是我的 KPLPoster 继承的 BaseKinesisPoster 的 运行 方法。需要注意的是,BaseKinesisPoster 实现了 Runnable 接口。
//BaseKinesisPoster.java
@Override
public void run() {
shutdown = false;
while (!shutdown && !(Thread.currentThread().isInterrupted())) {
try {
this.runOnce();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}catch (Exception e){
e.printStackTrace();
}
}
}
最后,这是我的 KPLPoster(扩展了 BaseKinesisPoster)相关 runOnce()
方法的一部分。
// KPLPoster.java
@Override
protected void runOnce() throws Exception {
Event event = inputQueue.take();
//other stuff in my method
}
如何确保阻塞队列消费者不会阻塞我的 test/main 线程?
当你打电话时
Thread.run();
它调用调用的方法。没有什么特别的事情发生,方法是 运行 在当前线程中。
当你打电话时
Thread.start();
这将启动线程,该线程又会在该新线程中调用 运行()。
顺便说一句,Thread.stop()
会在 Java 中抛出一个 UnsupportedOperationException
8。你不应该使用它。你应该让它自然结束。