如何使从线程抛出的未经检查的异常传播到主线程?

How can I make an unchecked exception thrown from a thread to be propagated to main thread?

我无法找到一种方法(无论是通过 SO 还是调试我的代码)允许从线程抛出的异常传播到主线程。我已经尝试使用 Thread.setUncaughtExceptionHandler()CompletableFuture.exceptionally().handle()、...

要点是,使用这些机制(在我调试时),实际处理是在工作线程上执行的,而不是主线程 - 我无法设法将其发送到主线程。

总的来说,我正在编写一个测试,如果在工作线程中引发异常,它永远不会到达测试 运行 的主线程,从而使测试甚至通过当出现问题时。

我需要异常引发异步;我迫不及待地等待未来完成,因为我需要立即 return 一个没有阻塞的流(一个 PipedStream),从主要部分开始。

我得到的唯一提示是控制台错误(并且仅当我使用传统的 Thread + uncaughtExceptionHandler 方法时,当我尝试使用 CompletableFuture 时根本没有日志):

Exception: com.example.exception.MyException thrown from the UncaughtExceptionHandler in thread "Thread-5"

或者如果我没有定义异常处理程序的话:

Exception in thread "Thread-5" com.example.MyException: Exception message

我提供一些代码:

try {
    @SuppressWarnings("squid:S2095") final PipedOutputStream pos = new PipedOutputStream();
    final PipedInputStream pis = new PipedInputStream(pos);

    CompletableFuture.runAsync(pipeDecryptorRunnable(inputStream, pos));

    return pis;

} catch (IOException e) {
    throw new CryptographyException(e.getMessage(), e);
}

里面pipeDecryptorRunnable是一个解密数据的CipherStream。那里抛出异常。但是我无法在主线程中捕获它并且变得不可见。来自此方法的 pisstream returning 用于读取,工作线程在数据被读取时即时解密数据 pis.

编辑: UncaughtExceptionHandler 正如类似问题中的答案所暗示的那样,不适用于我的场景,因为处理程序代码是由工作线程调用的,而不是主线程。

感谢@RalfKleberhoff 的提示,我找到了我正在寻找的解决方案。事实上,要实现所需的行为,需要线程间通信机制。鉴于我已经在使用 PipedStreams,我可以利用它来实现目标——我还考虑了某种涉及事件总线的解决方案,以从一个线程到另一个线程发送信号或进行通信(main thread/worker线程)我认为一些事件总线库也可以实现它。

所以回到管道流,我有一个 pis 在主线程中读取,它连接 pos 在工作线程中写入。因此,当 worker 中出现异常时,我需要主线程注意到这一点。

要实现这一点,您可以扩展 PipedOutputStream class,添加一个方法以在发生异常时向连接的管道发出信号。同样,您需要扩展已连接的 PipedInputStream 以在异常时发出信号,存储异常并重写读取方法以检查是否首先发生异常,在这种情况下,抛出包裹在IOException 读取方法。

这里是代码:

/**
 * This piped stream class allows to signal Exception between threads, allowing an exception produced in the writing
 * thread to reach the reading thread.
 *
 * @author Gerard on 10/11/2017.
 */
public class ExceptionAwarePipedOutputStream extends PipedOutputStream {

    private final ExceptionAwarePipedInputStream sink;

    public ExceptionAwarePipedOutputStream(ExceptionAwarePipedInputStream sink) throws IOException {
        super(sink);
        this.sink = sink;
    }

    /**
     * Signals connected {@link ExceptionAwarePipedInputStream} that an exception ocurred allowing to propagate it
     * across respective threads. This works as inter thread communication mechanism. So it allows to the reading thread
     * notice that an exception was thrown in the writing thread.
     *
     * @param exception The exception to propagate.
     */
    public void signalException(Throwable exception) {
        sink.signalException(exception);
    }
}

·

/**
 * This piped stream class allows to signal Exception between threads, allowing an exception produced in the writing
 * thread to reach the reading thread.
 *
 * @author Gerard on 10/11/2017.
 */
public class ExceptionAwarePipedInputStream extends PipedInputStream {

    private volatile Throwable exception;

    void signalException(Throwable exception) {
        this.exception = exception;
    }

    @Override
    public int read(byte[] b) throws IOException {
        final int read = super.read(b);
        checkException();
        return read;
    }

    @Override
    public synchronized int read() throws IOException {
        final int read = super.read();
        checkException();
        return read;
    }

    @Override
    public synchronized int read(byte[] b, int off, int len) throws IOException {
        final int read = super.read(b, off, len);
        checkException();
        return read;
    }

    private void checkException() throws IOException {
        if (exception != null) {
            throw new IOException(exception.getMessage(), exception);
        }
    }
}

客户代码:

public InputStream decrypt(InputStream inputStream) {

    assert supportedStreamModes.contains(mode) : "Unsupported cipher mode for stream decryption " + mode;

    @SuppressWarnings("squid:S2095") final ExceptionAwarePipedInputStream pis = new ExceptionAwarePipedInputStream();
    final ExceptionAwarePipedOutputStream pos = newConnectedPipedOutputStream(pis);
    final Cipher decryptor = newDecryptorInitialized(inputStream);

    CompletableFuture.runAsync(
        pipeDecryptorRunnable(inputStream, pos, decryptor));

    return pis;
}

private ExceptionAwarePipedOutputStream newConnectedPipedOutputStream(ExceptionAwarePipedInputStream pis) {
    try {
        return new ExceptionAwarePipedOutputStream(pis);
    } catch (IOException e) {
        throw new CryptographyException(e.getMessage(), e);
    }
}

以及异常处理部分(通知线程信号):

private Runnable pipeDecryptorRunnable(InputStream inputStream, ExceptionAwarePipedOutputStream pos, Cipher decryptor) {
    return () -> {
        try {

            // do stuff... and write to pos

        } catch (Exception e) {
            // Signaling any (checked or unchecked) exception
            pos.signalException(new CryptographyException(e.getMessage(), e));
        } finally {
            closePipedStream(pos);
        }
    };
}