RxJava2如何观察UDP包?
RxJava2 how to observe UDP packets?
我刚刚开始使用 RxJava2,想知道如何正确实现 UDP 可观察对象。
我已经有了一些工作代码,但我认为可能存在一些问题:请参阅下面源代码注释中的 4 个问题。
我还在 GitHub RxJava2_Udp 上发布了代码:欢迎评论、问题和拉取请求。
class UdpObservable {
private static class UdpThread extends Thread {
private final int portNo;
private final int bufferSizeInBytes;
private final ObservableEmitter<DatagramPacket> emitter;
private DatagramSocket udpSocket;
private UdpThread(@NonNull ObservableEmitter<DatagramPacket> emitter
, int portNo, int bufferSizeInBytes) {
this.emitter = emitter;
this.portNo = portNo;
this.bufferSizeInBytes = bufferSizeInBytes;
}
@Override
public void run() {
try {
// we don't want to create the DatagramSocket in the constructor, because this
// might raise an Exception that the observer wants to handle
udpSocket = new DatagramSocket(portNo);
try {
/* QUESTION 1:
Do I really need to check isInterrupted() and emitter.isDisposed()?
When the thread is interrupted an interrupted exception will
be raised anyway and the emitter is being disposed (this is what
caused the interruption)
*/
while (!isInterrupted() && !emitter.isDisposed()) {
byte[] rcvBuffer = new byte[bufferSizeInBytes];
DatagramPacket datagramPacket = new DatagramPacket(rcvBuffer, rcvBuffer.length);
udpSocket.receive(datagramPacket);
// QUESTION 1a: same as QUESTION 1 above
if (!isInterrupted() && !emitter.isDisposed()) {
emitter.onNext(datagramPacket);
}
}
} finally {
closeUdpSocket();
}
} catch (Throwable th) {
// the thread will only be interrupted when the observer has unsubscribed:
// so we need not report it
if (!isInterrupted()) {
if (!emitter.isDisposed()) {
emitter.onError(th);
} else {
// QUESTION 2: is this the correct way to handle errors, when the emitter
// is already disposed?
RxJavaPlugins.onError(th);
}
}
}
}
private void closeUdpSocket() {
if (!udpSocket.isClosed()) {
udpSocket.close();
}
}
@Override
public void interrupt() {
super.interrupt();
// QUESTION 3: this is called from an external thread, right, so
// how can we correctly synchronize the access to udpSocket?
closeUdpSocket();
}
}
/**
* creates an Observable that will emit all UDP datagrams of a UDP port.
* <p>
* This will be an infinite stream that ends when the observer unsubscribes, or when an error
* occurs. The observer does not handle backpressure.
* </p>
*/
public static Observable<DatagramPacket> create(final int portNo, final int bufferSizeInBytes) {
return Observable.create(
new ObservableOnSubscribe<DatagramPacket>() {
@Override
public void subscribe(ObservableEmitter<DatagramPacket> emitter) throws Exception {
final UdpThread udpThread = new UdpThread(emitter, portNo, bufferSizeInBytes);
/* QUESTION 4: Is this the right way to handle unsubscription?
*/
emitter.setCancellable(new Cancellable() {
@Override
public void cancel() throws Exception {
udpThread.interrupt();
}
});
udpThread.start();
}
}
);
}
}
- 一般来说,我认为这不是创建它的正确方法,你不应该自己创建线程,因为 RxJava 和它
Schedulers
应该为你做。
考虑到根据您的 Scheduler
策略,在 ObservableOnSubscribe
处执行的代码将 运行 在一个线程中执行,因此您不需要自己构建它。只需在 create. 中执行 ude while-loop
- 你不需要调用
Thread.interrupt()
方法,RxJava 会在你处理(取消订阅)Observable
时为你完成。 (当然在 while 循环之前设置 cancelable
)
关于您的问题:
你不需要检查中断因为异常会
如果你正在等待 io 操作,你也不需要
检查处置情况,因为 onNext()
会为您完成并将
不发出取消订阅。
您可以再次调用 onError
,发射器将负责检查 Observable
是否已取消订阅。
- 前面说过,Thread应该是没有的,但是对于资源清理,可以使用
emitter.setCancellable
的方法。 (关闭流),这发生在您的代码 运行s. 的同一线程上
- 之前回答过,Thread.interrput() 会被 RxJava 用 dispose/unsubscribe 引发,资源清理应该去
emitter.setCancellable
方法
我刚刚开始使用 RxJava2,想知道如何正确实现 UDP 可观察对象。
我已经有了一些工作代码,但我认为可能存在一些问题:请参阅下面源代码注释中的 4 个问题。
我还在 GitHub RxJava2_Udp 上发布了代码:欢迎评论、问题和拉取请求。
class UdpObservable {
private static class UdpThread extends Thread {
private final int portNo;
private final int bufferSizeInBytes;
private final ObservableEmitter<DatagramPacket> emitter;
private DatagramSocket udpSocket;
private UdpThread(@NonNull ObservableEmitter<DatagramPacket> emitter
, int portNo, int bufferSizeInBytes) {
this.emitter = emitter;
this.portNo = portNo;
this.bufferSizeInBytes = bufferSizeInBytes;
}
@Override
public void run() {
try {
// we don't want to create the DatagramSocket in the constructor, because this
// might raise an Exception that the observer wants to handle
udpSocket = new DatagramSocket(portNo);
try {
/* QUESTION 1:
Do I really need to check isInterrupted() and emitter.isDisposed()?
When the thread is interrupted an interrupted exception will
be raised anyway and the emitter is being disposed (this is what
caused the interruption)
*/
while (!isInterrupted() && !emitter.isDisposed()) {
byte[] rcvBuffer = new byte[bufferSizeInBytes];
DatagramPacket datagramPacket = new DatagramPacket(rcvBuffer, rcvBuffer.length);
udpSocket.receive(datagramPacket);
// QUESTION 1a: same as QUESTION 1 above
if (!isInterrupted() && !emitter.isDisposed()) {
emitter.onNext(datagramPacket);
}
}
} finally {
closeUdpSocket();
}
} catch (Throwable th) {
// the thread will only be interrupted when the observer has unsubscribed:
// so we need not report it
if (!isInterrupted()) {
if (!emitter.isDisposed()) {
emitter.onError(th);
} else {
// QUESTION 2: is this the correct way to handle errors, when the emitter
// is already disposed?
RxJavaPlugins.onError(th);
}
}
}
}
private void closeUdpSocket() {
if (!udpSocket.isClosed()) {
udpSocket.close();
}
}
@Override
public void interrupt() {
super.interrupt();
// QUESTION 3: this is called from an external thread, right, so
// how can we correctly synchronize the access to udpSocket?
closeUdpSocket();
}
}
/**
* creates an Observable that will emit all UDP datagrams of a UDP port.
* <p>
* This will be an infinite stream that ends when the observer unsubscribes, or when an error
* occurs. The observer does not handle backpressure.
* </p>
*/
public static Observable<DatagramPacket> create(final int portNo, final int bufferSizeInBytes) {
return Observable.create(
new ObservableOnSubscribe<DatagramPacket>() {
@Override
public void subscribe(ObservableEmitter<DatagramPacket> emitter) throws Exception {
final UdpThread udpThread = new UdpThread(emitter, portNo, bufferSizeInBytes);
/* QUESTION 4: Is this the right way to handle unsubscription?
*/
emitter.setCancellable(new Cancellable() {
@Override
public void cancel() throws Exception {
udpThread.interrupt();
}
});
udpThread.start();
}
}
);
}
}
- 一般来说,我认为这不是创建它的正确方法,你不应该自己创建线程,因为 RxJava 和它
Schedulers
应该为你做。
考虑到根据您的Scheduler
策略,在ObservableOnSubscribe
处执行的代码将 运行 在一个线程中执行,因此您不需要自己构建它。只需在 create. 中执行 ude while-loop
- 你不需要调用
Thread.interrupt()
方法,RxJava 会在你处理(取消订阅)Observable
时为你完成。 (当然在 while 循环之前设置cancelable
)
关于您的问题:
你不需要检查中断因为异常会 如果你正在等待 io 操作,你也不需要 检查处置情况,因为
onNext()
会为您完成并将 不发出取消订阅。您可以再次调用
onError
,发射器将负责检查Observable
是否已取消订阅。- 前面说过,Thread应该是没有的,但是对于资源清理,可以使用
emitter.setCancellable
的方法。 (关闭流),这发生在您的代码 运行s. 的同一线程上
- 之前回答过,Thread.interrput() 会被 RxJava 用 dispose/unsubscribe 引发,资源清理应该去
emitter.setCancellable
方法