使用多线程 RxJava 的反应式拉取
Reactive Pull with muti-threaded RxJava
我正在尝试在 RxJava.
中构建一个 reactive pull 观察者
我的观察者是这样的:
Observable<Command> myObs = Observable.create(s -> {
Command command;
int i = 0;
do {
command = NetworkOperation1.call(i);
logger.info("Init command " + i);
s.onNext(command);
i++;
} while (!command.isLast() && i < MAX);
s.onCompleted();
});
我想在 4 个并发批处理(缓冲区)中处理它,如下所示:
myObs
.buffer(10)
.flatMap(batch -> {
return Observable
.from(batch)
.subscribeOn(Schedulers.io())
.map(c -> {
Intermediate m = NetworkOperation2.call(c));
logger.info("Done intermediate " + m.id);
return m;
}
}, 4);
然后,我需要以不同的大小对结果进行批处理,如下所示:
.buffer(25)
.subscribeOn(Schedulers.newThread())
.subscribe(list ->
logger.info("Finished batch with " + list.size());
问题是 Observable 中的命令 是一次性处理的 ,而我希望它们 在需要时被处理 .
这是发生的事情的日志:(注意所有 1000 个命令都是 运行 一次,而不是根据需要调用)
Init command 0
Init command 1
Init command 2
...
Init command 999
Done intermediate 0
Done intermediate 1
...
Done intermediate 24
Finished batch with 25
Done intermediate 25
Done intermediate 26
...
Done intermediate 49
Finished batch with 25
...
问题:有没有办法暂停观察者的线程,这样它就不会一次发出所有命令或类似的东西?我已经尝试过 request() 运算符,但无法正常工作。
谢谢。
您需要反压感知源和操作员。您使用的运算符支持背压,但您的源不支持。
改为这样做:
myObs = Observable.range(1,1000)
.map(i -> NetworkOperation1.call(i));
Observable.range
支持背压,因此仅在请求时才会发出。
我正在尝试在 RxJava.
中构建一个 reactive pull 观察者我的观察者是这样的:
Observable<Command> myObs = Observable.create(s -> {
Command command;
int i = 0;
do {
command = NetworkOperation1.call(i);
logger.info("Init command " + i);
s.onNext(command);
i++;
} while (!command.isLast() && i < MAX);
s.onCompleted();
});
我想在 4 个并发批处理(缓冲区)中处理它,如下所示:
myObs
.buffer(10)
.flatMap(batch -> {
return Observable
.from(batch)
.subscribeOn(Schedulers.io())
.map(c -> {
Intermediate m = NetworkOperation2.call(c));
logger.info("Done intermediate " + m.id);
return m;
}
}, 4);
然后,我需要以不同的大小对结果进行批处理,如下所示:
.buffer(25)
.subscribeOn(Schedulers.newThread())
.subscribe(list ->
logger.info("Finished batch with " + list.size());
问题是 Observable 中的命令 是一次性处理的 ,而我希望它们 在需要时被处理 .
这是发生的事情的日志:(注意所有 1000 个命令都是 运行 一次,而不是根据需要调用)
Init command 0
Init command 1
Init command 2
...
Init command 999
Done intermediate 0
Done intermediate 1
...
Done intermediate 24
Finished batch with 25
Done intermediate 25
Done intermediate 26
...
Done intermediate 49
Finished batch with 25
...
问题:有没有办法暂停观察者的线程,这样它就不会一次发出所有命令或类似的东西?我已经尝试过 request() 运算符,但无法正常工作。
谢谢。
您需要反压感知源和操作员。您使用的运算符支持背压,但您的源不支持。
改为这样做:
myObs = Observable.range(1,1000)
.map(i -> NetworkOperation1.call(i));
Observable.range
支持背压,因此仅在请求时才会发出。