由于 BlockingQueue 的消费者(我认为是在另一个线程上),主线程挂起

Main thread hangs because of BlockingQueue's consumer (which I thought was on another thread)

我有一个阻塞测试(第一块代码)。我有几个元素可以一起工作。我有一个阻塞队列,我将事件放入其中,然后我有一个消费者将它们取走并将它们发送到 Amazon Kinesis。我很确定我的测试被阻塞了,因为队列阻塞了我的消费者,即使我认为它是 运行ning 在一个单独的线程上。

// Test.java
@Test
public void testWhileLoop() throws InterruptedException {
    ArrayBlockingQueue<Event> testQ = new ArrayBlockingQueue<Event>(1024);
    // mockKinesis is a mock at the class level.
    KPLPoster kpl = new KPLPoster("TestStream", mockKinesis, testQ);
    Event event = new Event("TestMessage", "TestPartition");
    ListenableFuture<UserRecordResult> fakeReturn = Mockito.mock(ListenableFuture.class);

    final AtomicInteger numberOfWhileLoops = new AtomicInteger();

    Mockito.doAnswer(invocation -> {
        numberOfWhileLoops.incrementAndGet();
        return fakeReturn;
    })
    .when(mockKinesis)
    .addUserRecord("TestStream", "TestPartition", ByteBuffer.wrap("TestMessage".getBytes()));

    kpl.run(); // Hangs here

    for(int i = 100; i > 0; i--){
        testQ.put(event);
    }

    kpl.stop();
    kpl = null;

    assert(numberOfWhileLoops.toString()).equals("100");
}

这是我的 KPLPoster 继承的 BaseKinesisPoster 的 运行 方法。需要注意的是,BaseKinesisPoster 实现了 Runnable 接口。

//BaseKinesisPoster.java
@Override
public void run() {
    shutdown = false;
    while (!shutdown && !(Thread.currentThread().isInterrupted())) {
        try {
            this.runOnce();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

最后,这是我的 KPLPoster(扩展了 BaseKinesisPoster)相关 runOnce() 方法的一部分。

// KPLPoster.java
@Override
protected void runOnce() throws Exception {
    Event event = inputQueue.take();
    //other stuff in my method
}

如何确保阻塞队列消费者不会阻塞我的 test/main 线程?

当你打电话时

Thread.run();

它调用调用的方法。没有什么特别的事情发生,方法是 运行 在当前线程中。

当你打电话时

Thread.start();

这将启动线程,该线程又会在该新线程中调用 运行()。

顺便说一句,Thread.stop() 会在 Java 中抛出一个 UnsupportedOperationException 8。你不应该使用它。你应该让它自然结束。