onComplete 重新提交自身的 RxJava 2 Observable
RxJava 2 Observable that onComplete resubmits itself
我是 RxJava 的新手。我正在尝试创建一个可观察对象,当它完成时它将重新开始,直到我调用处置,但一段时间后我遇到了 OutofMemory 错误,下面是我正在尝试做的事情的简化示例
public void start() throws RuntimeException {
log.info("\t * Starting {} Managed Service...", getClass().getSimpleName());
try {
executeObserve();
log.info("\t * Starting {} Managed Service...OK!", getClass().getSimpleName());
} catch (Exception e) {
log.info("Managed Service {} FAILED! Reason is {} ", getClass().getSimpleName(), e.getMessage(), e);
}
}
start在初始化阶段调用一次,executeObserve如下(简化形式..)。请注意,在 onComplete I "resubmit" executeObserve
public void executeObserve() throws RuntimeException {
Observable<Book> booksObserve
= manager.getAsObservable();
booksObserve
.map(Book::getAllOrders)
.flatMap(Observable::fromIterable)
.toList()
.subscribeOn(Schedulers.io())
.subscribe(collectedISBN ->
Observable.fromIterable(collectedISBN)
.buffer(10)
// ...some more steps here...
.toList()
.toObservable()
// resubmit
.doOnComplete(this::executeObserve)
.subscribe(validISBN -> {
// do something with the valid ones
})
)
);
}
我的猜测是,如果我想重新提交我的任务但找不到任何文档,这不是可行的方法。
booksObserve 实现如下
public Observable<Book> getAsObservable() {
return Observable.create(e -> {
try (CloseableResultSet<Book> rs = (CloseableResultSet<Book>) datasource.retrieveAll())) {
for (Book r : rs) {
e.onNext(r);
}
e.onComplete();
} catch (Exception ex) {
e.onError(ex);
}
});
}
在我们调用 dispose 或等效函数之前不断重新提交操作的正确方法是什么?我正在使用 RxJava 2
您已经创建了无限递归,循环将创建越来越多的资源,有时它会因 OutOfMemory/Stack 溢出异常而崩溃。
为了重复Observable
的工作你应该使用repeat()
运算符,它会在收到onComplete()
时重新订阅Observable
。
除此之外,对您的代码的一些一般性评论:
- 为什么要将第二个
Observable
嵌套在订阅者中?你正在打破链条,你可以继续链条而不是在订阅者处创建新的 Observable
。
此外,似乎(假设 Observable.fromIterable(collectedBets)
使用与 onNext()
o.w 一起获得的 collectedISBN
。它是从哪里来的? ) 你正在将所有项目收集到一个列表中,然后使用 from iterable 再次将其扁平化,所以看起来你可以继续流,类似的东西:
booksObserve
.map(Book::getAllOrders)
.flatMap(Observable::fromIterable)
.buffer(10)
// ...some more steps here...
.toList()
.toObservable()
// resubmit
.doOnComplete(this::executeObserve)
.subscribeOn(Schedulers.io())
.subscribe(validISBN -> {
// do something with the valid ones
});
无论如何,对于嵌套的 Observable
,repeat()
运算符只会重复嵌套的一个,而不是整个流(这是您想要的)未连接到它。
继续我的问题,@yosriz 建议的 repeat
是正确的方法,下面的简单片段演示了将在每次重复时调用可观察源
Observable<Integer> recursiveObservable = Observable.create(emitter -> {
System.out.println("Calling to emit data");
Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 0).forEach(emitter::onNext);
emitter.onComplete();
});
recursiveObservable
.buffer(2)
.repeat()
.subscribe(integers -> {
System.out.println(integers);
TimeUnit.SECONDS.sleep(1);
});
我是 RxJava 的新手。我正在尝试创建一个可观察对象,当它完成时它将重新开始,直到我调用处置,但一段时间后我遇到了 OutofMemory 错误,下面是我正在尝试做的事情的简化示例
public void start() throws RuntimeException {
log.info("\t * Starting {} Managed Service...", getClass().getSimpleName());
try {
executeObserve();
log.info("\t * Starting {} Managed Service...OK!", getClass().getSimpleName());
} catch (Exception e) {
log.info("Managed Service {} FAILED! Reason is {} ", getClass().getSimpleName(), e.getMessage(), e);
}
}
start在初始化阶段调用一次,executeObserve如下(简化形式..)。请注意,在 onComplete I "resubmit" executeObserve
public void executeObserve() throws RuntimeException {
Observable<Book> booksObserve
= manager.getAsObservable();
booksObserve
.map(Book::getAllOrders)
.flatMap(Observable::fromIterable)
.toList()
.subscribeOn(Schedulers.io())
.subscribe(collectedISBN ->
Observable.fromIterable(collectedISBN)
.buffer(10)
// ...some more steps here...
.toList()
.toObservable()
// resubmit
.doOnComplete(this::executeObserve)
.subscribe(validISBN -> {
// do something with the valid ones
})
)
);
}
我的猜测是,如果我想重新提交我的任务但找不到任何文档,这不是可行的方法。
booksObserve 实现如下
public Observable<Book> getAsObservable() {
return Observable.create(e -> {
try (CloseableResultSet<Book> rs = (CloseableResultSet<Book>) datasource.retrieveAll())) {
for (Book r : rs) {
e.onNext(r);
}
e.onComplete();
} catch (Exception ex) {
e.onError(ex);
}
});
}
在我们调用 dispose 或等效函数之前不断重新提交操作的正确方法是什么?我正在使用 RxJava 2
您已经创建了无限递归,循环将创建越来越多的资源,有时它会因 OutOfMemory/Stack 溢出异常而崩溃。
为了重复Observable
的工作你应该使用repeat()
运算符,它会在收到onComplete()
时重新订阅Observable
。
除此之外,对您的代码的一些一般性评论:
- 为什么要将第二个
Observable
嵌套在订阅者中?你正在打破链条,你可以继续链条而不是在订阅者处创建新的Observable
。 此外,似乎(假设 O
bservable.fromIterable(collectedBets)
使用与onNext()
o.w 一起获得的collectedISBN
。它是从哪里来的? ) 你正在将所有项目收集到一个列表中,然后使用 from iterable 再次将其扁平化,所以看起来你可以继续流,类似的东西:booksObserve .map(Book::getAllOrders) .flatMap(Observable::fromIterable) .buffer(10) // ...some more steps here... .toList() .toObservable() // resubmit .doOnComplete(this::executeObserve) .subscribeOn(Schedulers.io()) .subscribe(validISBN -> { // do something with the valid ones });
无论如何,对于嵌套的
Observable
,repeat()
运算符只会重复嵌套的一个,而不是整个流(这是您想要的)未连接到它。
继续我的问题,@yosriz 建议的 repeat
是正确的方法,下面的简单片段演示了将在每次重复时调用可观察源
Observable<Integer> recursiveObservable = Observable.create(emitter -> {
System.out.println("Calling to emit data");
Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 0).forEach(emitter::onNext);
emitter.onComplete();
});
recursiveObservable
.buffer(2)
.repeat()
.subscribe(integers -> {
System.out.println(integers);
TimeUnit.SECONDS.sleep(1);
});