如何使从线程抛出的未经检查的异常传播到主线程?
How can I make an unchecked exception thrown from a thread to be propagated to main thread?
我无法找到一种方法(无论是通过 SO 还是调试我的代码)允许从线程抛出的异常传播到主线程。我已经尝试使用 Thread.setUncaughtExceptionHandler()
和 CompletableFuture
:.exceptionally()
、.handle()
、...
要点是,使用这些机制(在我调试时),实际处理是在工作线程上执行的,而不是主线程 - 我无法设法将其发送到主线程。
总的来说,我正在编写一个测试,如果在工作线程中引发异常,它永远不会到达测试 运行 的主线程,从而使测试甚至通过当出现问题时。
我需要异常引发异步;我迫不及待地等待未来完成,因为我需要立即 return 一个没有阻塞的流(一个 PipedStream),从主要部分开始。
我得到的唯一提示是控制台错误(并且仅当我使用传统的 Thread
+ uncaughtExceptionHandler
方法时,当我尝试使用 CompletableFuture
时根本没有日志):
Exception: com.example.exception.MyException thrown from the UncaughtExceptionHandler in thread "Thread-5"
或者如果我没有定义异常处理程序的话:
Exception in thread "Thread-5" com.example.MyException: Exception message
我提供一些代码:
try {
@SuppressWarnings("squid:S2095") final PipedOutputStream pos = new PipedOutputStream();
final PipedInputStream pis = new PipedInputStream(pos);
CompletableFuture.runAsync(pipeDecryptorRunnable(inputStream, pos));
return pis;
} catch (IOException e) {
throw new CryptographyException(e.getMessage(), e);
}
里面pipeDecryptorRunnable
是一个解密数据的CipherStream。那里抛出异常。但是我无法在主线程中捕获它并且变得不可见。来自此方法的 pis
stream returning 用于读取,工作线程在数据被读取时即时解密数据 pis
.
编辑:
UncaughtExceptionHandler
正如类似问题中的答案所暗示的那样,不适用于我的场景,因为处理程序代码是由工作线程调用的,而不是主线程。
感谢@RalfKleberhoff 的提示,我找到了我正在寻找的解决方案。事实上,要实现所需的行为,需要线程间通信机制。鉴于我已经在使用 PipedStream
s,我可以利用它来实现目标——我还考虑了某种涉及事件总线的解决方案,以从一个线程到另一个线程发送信号或进行通信(main thread/worker线程)我认为一些事件总线库也可以实现它。
所以回到管道流,我有一个 pis
在主线程中读取,它连接 pos
在工作线程中写入。因此,当 worker 中出现异常时,我需要主线程注意到这一点。
要实现这一点,您可以扩展 PipedOutputStream
class,添加一个方法以在发生异常时向连接的管道发出信号。同样,您需要扩展已连接的 PipedInputStream
以在异常时发出信号,存储异常并重写读取方法以检查是否首先发生异常,在这种情况下,抛出包裹在IOException
读取方法。
这里是代码:
/**
* This piped stream class allows to signal Exception between threads, allowing an exception produced in the writing
* thread to reach the reading thread.
*
* @author Gerard on 10/11/2017.
*/
public class ExceptionAwarePipedOutputStream extends PipedOutputStream {
private final ExceptionAwarePipedInputStream sink;
public ExceptionAwarePipedOutputStream(ExceptionAwarePipedInputStream sink) throws IOException {
super(sink);
this.sink = sink;
}
/**
* Signals connected {@link ExceptionAwarePipedInputStream} that an exception ocurred allowing to propagate it
* across respective threads. This works as inter thread communication mechanism. So it allows to the reading thread
* notice that an exception was thrown in the writing thread.
*
* @param exception The exception to propagate.
*/
public void signalException(Throwable exception) {
sink.signalException(exception);
}
}
·
/**
* This piped stream class allows to signal Exception between threads, allowing an exception produced in the writing
* thread to reach the reading thread.
*
* @author Gerard on 10/11/2017.
*/
public class ExceptionAwarePipedInputStream extends PipedInputStream {
private volatile Throwable exception;
void signalException(Throwable exception) {
this.exception = exception;
}
@Override
public int read(byte[] b) throws IOException {
final int read = super.read(b);
checkException();
return read;
}
@Override
public synchronized int read() throws IOException {
final int read = super.read();
checkException();
return read;
}
@Override
public synchronized int read(byte[] b, int off, int len) throws IOException {
final int read = super.read(b, off, len);
checkException();
return read;
}
private void checkException() throws IOException {
if (exception != null) {
throw new IOException(exception.getMessage(), exception);
}
}
}
客户代码:
public InputStream decrypt(InputStream inputStream) {
assert supportedStreamModes.contains(mode) : "Unsupported cipher mode for stream decryption " + mode;
@SuppressWarnings("squid:S2095") final ExceptionAwarePipedInputStream pis = new ExceptionAwarePipedInputStream();
final ExceptionAwarePipedOutputStream pos = newConnectedPipedOutputStream(pis);
final Cipher decryptor = newDecryptorInitialized(inputStream);
CompletableFuture.runAsync(
pipeDecryptorRunnable(inputStream, pos, decryptor));
return pis;
}
private ExceptionAwarePipedOutputStream newConnectedPipedOutputStream(ExceptionAwarePipedInputStream pis) {
try {
return new ExceptionAwarePipedOutputStream(pis);
} catch (IOException e) {
throw new CryptographyException(e.getMessage(), e);
}
}
以及异常处理部分(通知线程信号):
private Runnable pipeDecryptorRunnable(InputStream inputStream, ExceptionAwarePipedOutputStream pos, Cipher decryptor) {
return () -> {
try {
// do stuff... and write to pos
} catch (Exception e) {
// Signaling any (checked or unchecked) exception
pos.signalException(new CryptographyException(e.getMessage(), e));
} finally {
closePipedStream(pos);
}
};
}
我无法找到一种方法(无论是通过 SO 还是调试我的代码)允许从线程抛出的异常传播到主线程。我已经尝试使用 Thread.setUncaughtExceptionHandler()
和 CompletableFuture
:.exceptionally()
、.handle()
、...
要点是,使用这些机制(在我调试时),实际处理是在工作线程上执行的,而不是主线程 - 我无法设法将其发送到主线程。
总的来说,我正在编写一个测试,如果在工作线程中引发异常,它永远不会到达测试 运行 的主线程,从而使测试甚至通过当出现问题时。
我需要异常引发异步;我迫不及待地等待未来完成,因为我需要立即 return 一个没有阻塞的流(一个 PipedStream),从主要部分开始。
我得到的唯一提示是控制台错误(并且仅当我使用传统的 Thread
+ uncaughtExceptionHandler
方法时,当我尝试使用 CompletableFuture
时根本没有日志):
Exception: com.example.exception.MyException thrown from the UncaughtExceptionHandler in thread "Thread-5"
或者如果我没有定义异常处理程序的话:
Exception in thread "Thread-5" com.example.MyException: Exception message
我提供一些代码:
try {
@SuppressWarnings("squid:S2095") final PipedOutputStream pos = new PipedOutputStream();
final PipedInputStream pis = new PipedInputStream(pos);
CompletableFuture.runAsync(pipeDecryptorRunnable(inputStream, pos));
return pis;
} catch (IOException e) {
throw new CryptographyException(e.getMessage(), e);
}
里面pipeDecryptorRunnable
是一个解密数据的CipherStream。那里抛出异常。但是我无法在主线程中捕获它并且变得不可见。来自此方法的 pis
stream returning 用于读取,工作线程在数据被读取时即时解密数据 pis
.
编辑:
UncaughtExceptionHandler
正如类似问题中的答案所暗示的那样,不适用于我的场景,因为处理程序代码是由工作线程调用的,而不是主线程。
感谢@RalfKleberhoff 的提示,我找到了我正在寻找的解决方案。事实上,要实现所需的行为,需要线程间通信机制。鉴于我已经在使用 PipedStream
s,我可以利用它来实现目标——我还考虑了某种涉及事件总线的解决方案,以从一个线程到另一个线程发送信号或进行通信(main thread/worker线程)我认为一些事件总线库也可以实现它。
所以回到管道流,我有一个 pis
在主线程中读取,它连接 pos
在工作线程中写入。因此,当 worker 中出现异常时,我需要主线程注意到这一点。
要实现这一点,您可以扩展 PipedOutputStream
class,添加一个方法以在发生异常时向连接的管道发出信号。同样,您需要扩展已连接的 PipedInputStream
以在异常时发出信号,存储异常并重写读取方法以检查是否首先发生异常,在这种情况下,抛出包裹在IOException
读取方法。
这里是代码:
/**
* This piped stream class allows to signal Exception between threads, allowing an exception produced in the writing
* thread to reach the reading thread.
*
* @author Gerard on 10/11/2017.
*/
public class ExceptionAwarePipedOutputStream extends PipedOutputStream {
private final ExceptionAwarePipedInputStream sink;
public ExceptionAwarePipedOutputStream(ExceptionAwarePipedInputStream sink) throws IOException {
super(sink);
this.sink = sink;
}
/**
* Signals connected {@link ExceptionAwarePipedInputStream} that an exception ocurred allowing to propagate it
* across respective threads. This works as inter thread communication mechanism. So it allows to the reading thread
* notice that an exception was thrown in the writing thread.
*
* @param exception The exception to propagate.
*/
public void signalException(Throwable exception) {
sink.signalException(exception);
}
}
·
/**
* This piped stream class allows to signal Exception between threads, allowing an exception produced in the writing
* thread to reach the reading thread.
*
* @author Gerard on 10/11/2017.
*/
public class ExceptionAwarePipedInputStream extends PipedInputStream {
private volatile Throwable exception;
void signalException(Throwable exception) {
this.exception = exception;
}
@Override
public int read(byte[] b) throws IOException {
final int read = super.read(b);
checkException();
return read;
}
@Override
public synchronized int read() throws IOException {
final int read = super.read();
checkException();
return read;
}
@Override
public synchronized int read(byte[] b, int off, int len) throws IOException {
final int read = super.read(b, off, len);
checkException();
return read;
}
private void checkException() throws IOException {
if (exception != null) {
throw new IOException(exception.getMessage(), exception);
}
}
}
客户代码:
public InputStream decrypt(InputStream inputStream) {
assert supportedStreamModes.contains(mode) : "Unsupported cipher mode for stream decryption " + mode;
@SuppressWarnings("squid:S2095") final ExceptionAwarePipedInputStream pis = new ExceptionAwarePipedInputStream();
final ExceptionAwarePipedOutputStream pos = newConnectedPipedOutputStream(pis);
final Cipher decryptor = newDecryptorInitialized(inputStream);
CompletableFuture.runAsync(
pipeDecryptorRunnable(inputStream, pos, decryptor));
return pis;
}
private ExceptionAwarePipedOutputStream newConnectedPipedOutputStream(ExceptionAwarePipedInputStream pis) {
try {
return new ExceptionAwarePipedOutputStream(pis);
} catch (IOException e) {
throw new CryptographyException(e.getMessage(), e);
}
}
以及异常处理部分(通知线程信号):
private Runnable pipeDecryptorRunnable(InputStream inputStream, ExceptionAwarePipedOutputStream pos, Cipher decryptor) {
return () -> {
try {
// do stuff... and write to pos
} catch (Exception e) {
// Signaling any (checked or unchecked) exception
pos.signalException(new CryptographyException(e.getMessage(), e));
} finally {
closePipedStream(pos);
}
};
}