RxJava:如何在取消订阅时中断线程?
RxJava: How to interrupt thread on unsubscribe?
我使用 Observable.create()
创建一个可观察对象以在调度程序上执行一些工作(例如 Schedulers.io()
然后 return AndroidSchedulers.mainThread()
上的结果。
val subscription = observable<T> {
try {
// perform action synchronously
it.onNext(action.invoke(context, args))
it.onCompleted()
} catch (t: Exception) {
it.onError(t)
}
}.subscribeOn(scheduler)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
{
// handle result here
result.set(it)
},
{
// handle error here
errorHandler.handleTaskError(model, this, it)
},
{
// notify completed
model.completeTask(this)
}
)
action.invoke()
里面的操作是同步的,可能是阻塞IO操作。当用户决定取消它时,我取消订阅可观察的:subscription.unsubscribe()
但是,I/O 操作没有被中断。有没有rx-javaAPI中断运行?
据我所知,使用 RxJava 是不可能的。
您可以在可观察对象之外保留对 action
对象的引用,并在 unsubscribe
调用时中断它。
当您调用yourSubscription.unsubscribe();
时,Rx会调用您的退订代码。
此取消订阅代码将是 Subscription
class,当您创建 Observable
.[=18= 时,您可以 add
到 subscriber
]
Observable<Object> obs = Observable.create(subscriber -> {
subscriber.add(new Subscription() {
@Override
public void unsubscribe() {
// perform unsubscription
}
@Override
public boolean isUnsubscribed() {
return false;
}
});
subscriber.onNext(/**...*/);
subscriber.onCompleted();
}
所以在 unsubscribe
方法中,如果有办法的话,你可以中断你的工作。
请注意,当您取消订阅一个 Observable 或一个 Observable 完成时,将调用取消订阅方法。 (它会自行退订)
编辑 : 考虑vladimir mironov评论
RxJava 调度程序使用 Furure.cancel(true) 来中断 运行 操作,因此如果您有中断敏感操作(例如 Thread.sleep()
),您应该会看到线程中断,尽管您对此无能为力,因为此时整个下游链已被取消订阅,并且没有更多事件可以到达您的订阅者。
如果您的操作对中断不敏感并且没有提供任何中断它的标准方法(例如在套接字上调用 close()
,或在 [=21= 上调用 cancel()
] 声明),RxJava 对此无能为力。
如果您可以异步调用一些 close() 方法,请考虑使用 using()
,它让您指定一个清理回调,可以在取消订阅时关闭您的资源。
我使用 Observable.create()
创建一个可观察对象以在调度程序上执行一些工作(例如 Schedulers.io()
然后 return AndroidSchedulers.mainThread()
上的结果。
val subscription = observable<T> {
try {
// perform action synchronously
it.onNext(action.invoke(context, args))
it.onCompleted()
} catch (t: Exception) {
it.onError(t)
}
}.subscribeOn(scheduler)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
{
// handle result here
result.set(it)
},
{
// handle error here
errorHandler.handleTaskError(model, this, it)
},
{
// notify completed
model.completeTask(this)
}
)
action.invoke()
里面的操作是同步的,可能是阻塞IO操作。当用户决定取消它时,我取消订阅可观察的:subscription.unsubscribe()
但是,I/O 操作没有被中断。有没有rx-javaAPI中断运行?
据我所知,使用 RxJava 是不可能的。
您可以在可观察对象之外保留对 action
对象的引用,并在 unsubscribe
调用时中断它。
当您调用yourSubscription.unsubscribe();
时,Rx会调用您的退订代码。
此取消订阅代码将是 Subscription
class,当您创建 Observable
.[=18= 时,您可以 add
到 subscriber
]
Observable<Object> obs = Observable.create(subscriber -> {
subscriber.add(new Subscription() {
@Override
public void unsubscribe() {
// perform unsubscription
}
@Override
public boolean isUnsubscribed() {
return false;
}
});
subscriber.onNext(/**...*/);
subscriber.onCompleted();
}
所以在 unsubscribe
方法中,如果有办法的话,你可以中断你的工作。
请注意,当您取消订阅一个 Observable 或一个 Observable 完成时,将调用取消订阅方法。 (它会自行退订)
编辑 : 考虑vladimir mironov评论
RxJava 调度程序使用 Furure.cancel(true) 来中断 运行 操作,因此如果您有中断敏感操作(例如 Thread.sleep()
),您应该会看到线程中断,尽管您对此无能为力,因为此时整个下游链已被取消订阅,并且没有更多事件可以到达您的订阅者。
如果您的操作对中断不敏感并且没有提供任何中断它的标准方法(例如在套接字上调用 close()
,或在 [=21= 上调用 cancel()
] 声明),RxJava 对此无能为力。
如果您可以异步调用一些 close() 方法,请考虑使用 using()
,它让您指定一个清理回调,可以在取消订阅时关闭您的资源。