repeatWhen() 没有完成
repeatWhen() does not complete
我 运行 遇到了让 repeatWhen()
与 Completable
一起工作的问题。下面的代码块正在工作。订阅的 Action
在重复后结束 运行。
completable
.subscribeOn(Schedulers.trampoline()) //for debug purposes; have also omitted completely
.repeat(2)
.observeOn(Schedulers.trampoline()) //for debug purposes; have also omitted completely
.subscribe(new Action() {
@Override
public void run() throws Exception {
//runs
}
}
但是,当我尝试以下操作时
completable
.subscribeOn(Schedulers.trampoline()) //for debug purposes; have also omitted completely
.repeatWhen(new MyRetry(2))
.observeOn(Schedulers.trampoline()) //for debug purposes; have also omitted completely
.subscribe(new Action() {
@Override
public void run() throws Exception {
//doesn't run
}
}
MyRetry
定义如下:
class MyRetry implements Function<Flowable<Object>, Publisher<?>> {
private final int maxRetryCount;
private int retryCount;
/**
* @param maxRetryCount The number of times to repeat
*/
public MyRetry(final int maxRetryCount) {
this.maxRetryCount = maxRetryCount;
retryCount = 0;
}
@Override
public Publisher<?> apply(final Flowable<Object> objectFlowable) throws Exception {
return objectFlowable.flatMap(new Function<Object, Publisher<?>>() {
@Override
public Publisher<?> apply(final Object o) throws Exception {
if (retryCount < maxRetryCount) {
retryCount++;
return Flowable.just(o);
} else {
return Flowable.empty();
}
}
});
}
}
订阅的Action
在重复完成后不再运行,但是Completable
的原始订阅重复了两次。这让我相信我返回 Flowable.empty()
可能不正确,但我找不到关于这个主题的信息,而且我对 RxJava 还很陌生。我也有基础设施限制,让我仍然停留在 Java 7 in Android.
如果您在 MyRetry
class:
中将 flatMap()
更改为 takeUntil()
,初始订阅者将收到 onCompleted()
事件
@Override
public Publisher<?> apply(final Flowable<Object> objectFlowable) throws Exception {
return objectFlowable.takeWhile(o -> retryCount++ < maxRetryCount);
}
此问题已在 this 中讨论
线程.
我 运行 遇到了让 repeatWhen()
与 Completable
一起工作的问题。下面的代码块正在工作。订阅的 Action
在重复后结束 运行。
completable
.subscribeOn(Schedulers.trampoline()) //for debug purposes; have also omitted completely
.repeat(2)
.observeOn(Schedulers.trampoline()) //for debug purposes; have also omitted completely
.subscribe(new Action() {
@Override
public void run() throws Exception {
//runs
}
}
但是,当我尝试以下操作时
completable
.subscribeOn(Schedulers.trampoline()) //for debug purposes; have also omitted completely
.repeatWhen(new MyRetry(2))
.observeOn(Schedulers.trampoline()) //for debug purposes; have also omitted completely
.subscribe(new Action() {
@Override
public void run() throws Exception {
//doesn't run
}
}
MyRetry
定义如下:
class MyRetry implements Function<Flowable<Object>, Publisher<?>> {
private final int maxRetryCount;
private int retryCount;
/**
* @param maxRetryCount The number of times to repeat
*/
public MyRetry(final int maxRetryCount) {
this.maxRetryCount = maxRetryCount;
retryCount = 0;
}
@Override
public Publisher<?> apply(final Flowable<Object> objectFlowable) throws Exception {
return objectFlowable.flatMap(new Function<Object, Publisher<?>>() {
@Override
public Publisher<?> apply(final Object o) throws Exception {
if (retryCount < maxRetryCount) {
retryCount++;
return Flowable.just(o);
} else {
return Flowable.empty();
}
}
});
}
}
订阅的Action
在重复完成后不再运行,但是Completable
的原始订阅重复了两次。这让我相信我返回 Flowable.empty()
可能不正确,但我找不到关于这个主题的信息,而且我对 RxJava 还很陌生。我也有基础设施限制,让我仍然停留在 Java 7 in Android.
如果您在 MyRetry
class:
flatMap()
更改为 takeUntil()
,初始订阅者将收到 onCompleted()
事件
@Override
public Publisher<?> apply(final Flowable<Object> objectFlowable) throws Exception {
return objectFlowable.takeWhile(o -> retryCount++ < maxRetryCount);
}
此问题已在 this 中讨论 线程.