如何订阅合并的无限流量?
How to subscribe to merged infinite flowables?
我有几个无限的 Flowable(从 BlockingQueues 获取数据)。我合并它们并订阅我的自定义订阅者。我不明白为什么我只能从单个输入 Flowable 接收消息。
这是我的代码:
<T> void example() {
List<BlockingQueue<T>> queues = createQueues();
List<Flowable<T>> allFlowables = queues.stream()
.map(this::createFlowable)
.collect(Collectors.toList());
FlowableScan.merge(allFlowables)
.subscribe(new DefaultSubscriber<T>() {
@Override
protected void onStart() {
System.out.println("Start!");
request(1);
}
@Override
public void onNext(T message) {
System.out.println(message);
request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done!");
}
});
}
<T> Flowable<T> createFlowable(BlockingQueue<T> queue) {
return Flowable.generate(emitter -> {
T msg = takeFromQueue(q); // blocking
emitter.onNext(msg);
});
}
我只看到来自单个队列的消息,我错过了什么?
我试过调度程序,但没有帮助。
如何修复以上代码以从所有输入队列中消费?
因为您阻塞了第一个队列为所有源提供服务的唯一线程。您必须引入异步,例如在 createFlowable
.
中应用 .subscribeOn(Schedulers.io())
<T> Flowable<T> createFlowable(BlockingQueue<T> queue) {
return Flowable.generate(emitter -> {
T msg = takeFromQueue(q); // blocking
emitter.onNext(msg);
}).subscribeOn(Schedulers.io()); // <----------------------------------
}
我有几个无限的 Flowable(从 BlockingQueues 获取数据)。我合并它们并订阅我的自定义订阅者。我不明白为什么我只能从单个输入 Flowable 接收消息。
这是我的代码:
<T> void example() {
List<BlockingQueue<T>> queues = createQueues();
List<Flowable<T>> allFlowables = queues.stream()
.map(this::createFlowable)
.collect(Collectors.toList());
FlowableScan.merge(allFlowables)
.subscribe(new DefaultSubscriber<T>() {
@Override
protected void onStart() {
System.out.println("Start!");
request(1);
}
@Override
public void onNext(T message) {
System.out.println(message);
request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done!");
}
});
}
<T> Flowable<T> createFlowable(BlockingQueue<T> queue) {
return Flowable.generate(emitter -> {
T msg = takeFromQueue(q); // blocking
emitter.onNext(msg);
});
}
我只看到来自单个队列的消息,我错过了什么? 我试过调度程序,但没有帮助。 如何修复以上代码以从所有输入队列中消费?
因为您阻塞了第一个队列为所有源提供服务的唯一线程。您必须引入异步,例如在 createFlowable
.
.subscribeOn(Schedulers.io())
<T> Flowable<T> createFlowable(BlockingQueue<T> queue) {
return Flowable.generate(emitter -> {
T msg = takeFromQueue(q); // blocking
emitter.onNext(msg);
}).subscribeOn(Schedulers.io()); // <----------------------------------
}