使用多播重复 RxJava 管道
repeating RxJava pipeline with multicasting
我是 RxJava 的新手,无法弄清楚如何使用 ConnectableObservable 实现可重复轮询,其中有 2 个订阅者在不同线程上处理事件。
我有一个大致如下所示的管道:
我想在延迟后以与 https://github.com/ReactiveX/RxJava/issues/448
中的解决方案类似的方式重复整个管道
Observable.fromCallable(() -> pollValue())
.repeatWhen(o -> o.concatMap(v -> Observable.timer(20, TimeUnit.SECONDS)));
或Dynamic delay value with repeatWhen().
这适用于普通(不可连接)Observable,但不适用于多播。
代码示例:
有效
final int[] i = {0};
Observable<Integer> integerObservable =
Observable.defer(() -> Observable.fromArray(i[0]++, i[0]++, i[0]++));
integerObservable
.observeOn(Schedulers.newThread())
.collect(StringBuilder::new, (sb, x) -> sb.append(x).append(","))
.map(StringBuilder::toString)
.toObservable().repeatWhen(o -> o.concatMap(v -> Observable.timer(1, TimeUnit.SECONDS)))
.subscribe(System.out::println);
无效:
final int[] i = {0};
ConnectableObservable<Integer> integerObservable =
Observable.defer(() -> Observable.fromArray(i[0]++, i[0]++, i[0]++))
.publish();
integerObservable.observeOn(Schedulers.newThread()).subscribe(System.out::println);
integerObservable
.observeOn(Schedulers.newThread())
.collect(StringBuilder::new, (sb, x) -> sb.append(x).append(","))
.map(StringBuilder::toString)
.toObservable().repeatWhen(o -> o.concatMap(v -> Observable.timer(1, TimeUnit.SECONDS)))
.subscribe(System.out::println);
integerObservable.connect();
仍然不清楚这是否是您要的,但也许这会对您有所帮助
Observable<Integer> integerObservable1;
@Override
public void run(String... args) throws Exception {
Integer[] integers = {1, 2, 3};
Observable<Integer> integerObservable = Observable.fromArray(integers);
integerObservable1 = Observable.zip(integerObservable.observeOn(Schedulers.newThread()).delay(100, TimeUnit.MILLISECONDS), integerObservable.observeOn(Schedulers.newThread()).delay(200, TimeUnit.MILLISECONDS), (integer, integer2) -> {
System.out.println(integer + " " + integer2);
return integer;
})
.doOnComplete(() -> {
integerObservable1.subscribe();
});
integerObservable1.subscribe();
}
第二个例子的问题不在多播。它在收集运算符和 repeatWhen 中。
考虑 repeatWhen 就像 'hook' 您正在连接到 onComplete 方法。此 'hook' 将拦截 onComplete 方法并 "override" 它因此不会被调用,除非该可观察对象在开始重复过程之前第一次完成。
如果没有调用 collect 运算符的 onComplete 方法,则不知道它应该收集多少项目。因此,您必须处理如何收集项目以及将它们存储在流外的位置的逻辑,但这将是一种解决方法。
这是一个例子:
List<String> test = new ArrayList<>();
final String[] currentString = {""};
final int[] i = {0};
ConnectableObservable<Integer> integerObservable =
Observable.defer(() -> Observable.fromArray(i[0]++, i[0]++, i[0]++))
.doOnComplete(() -> {
test.add(currentString[0]);
currentString[0] = "";
})
.repeatWhen(o -> {
return o.concatMap(v ->
Observable.timer(3, TimeUnit.SECONDS)
.doOnComplete(() -> {
test.add(currentString[0]);
currentString[0] = "";
}));
})
.publish();
integerObservable
.observeOn(Schedulers.newThread())
.subscribe(System.out::println);
integerObservable
.observeOn(Schedulers.newThread())
.map((sa) -> {
currentString[0] = currentString[0] + sa;
System.out.println(currentString[0]);
return sa;
})
.subscribe();
在这个例子中,我们使用我们持有计时器的可观察对象的 onComplete 方法来重置我们的状态。当消费者消耗数据的速度比重复的延迟慢时,这个例子没有考虑这个选项(这将导致结果从一个数据链溢出到另一个数据链)。我建议使用其他方式来处理重复部分,例如:
最终整数[] i = {0};
ConnectableObservable<Integer> integerObservable =
Observable.defer(() -> Observable.fromArray(i[0]++, i[0]++, i[0]++)).publish();
Observable b = integerObservable.observeOn(Schedulers.newThread());
Observable a = integerObservable
.observeOn(Schedulers.newThread())
.collect(StringBuilder::new, (sb, x) -> sb.append(x).append(","))
.map(StringBuilder::toString)
.toObservable();
Observable
.interval(1, TimeUnit.SECONDS)
.doOnSubscribe((d) -> {
a.subscribe(System.out::println);
b.subscribe(System.out::println);
integerObservable.connect();
})
.doOnNext((d) -> {
a.subscribe(System.out::println);
b.subscribe(System.out::println);
integerObservable.connect();
})
.doOnComplete(() -> {})
.subscribe();
第一次连接是第一次执行,onNext 每 1 秒调用一次,它会从头开始重新启动整个管道。
希望对您有所帮助。
我是 RxJava 的新手,无法弄清楚如何使用 ConnectableObservable 实现可重复轮询,其中有 2 个订阅者在不同线程上处理事件。
我有一个大致如下所示的管道:
我想在延迟后以与 https://github.com/ReactiveX/RxJava/issues/448
中的解决方案类似的方式重复整个管道Observable.fromCallable(() -> pollValue())
.repeatWhen(o -> o.concatMap(v -> Observable.timer(20, TimeUnit.SECONDS)));
或Dynamic delay value with repeatWhen().
这适用于普通(不可连接)Observable,但不适用于多播。
代码示例:
有效
final int[] i = {0};
Observable<Integer> integerObservable =
Observable.defer(() -> Observable.fromArray(i[0]++, i[0]++, i[0]++));
integerObservable
.observeOn(Schedulers.newThread())
.collect(StringBuilder::new, (sb, x) -> sb.append(x).append(","))
.map(StringBuilder::toString)
.toObservable().repeatWhen(o -> o.concatMap(v -> Observable.timer(1, TimeUnit.SECONDS)))
.subscribe(System.out::println);
无效:
final int[] i = {0};
ConnectableObservable<Integer> integerObservable =
Observable.defer(() -> Observable.fromArray(i[0]++, i[0]++, i[0]++))
.publish();
integerObservable.observeOn(Schedulers.newThread()).subscribe(System.out::println);
integerObservable
.observeOn(Schedulers.newThread())
.collect(StringBuilder::new, (sb, x) -> sb.append(x).append(","))
.map(StringBuilder::toString)
.toObservable().repeatWhen(o -> o.concatMap(v -> Observable.timer(1, TimeUnit.SECONDS)))
.subscribe(System.out::println);
integerObservable.connect();
仍然不清楚这是否是您要的,但也许这会对您有所帮助
Observable<Integer> integerObservable1;
@Override
public void run(String... args) throws Exception {
Integer[] integers = {1, 2, 3};
Observable<Integer> integerObservable = Observable.fromArray(integers);
integerObservable1 = Observable.zip(integerObservable.observeOn(Schedulers.newThread()).delay(100, TimeUnit.MILLISECONDS), integerObservable.observeOn(Schedulers.newThread()).delay(200, TimeUnit.MILLISECONDS), (integer, integer2) -> {
System.out.println(integer + " " + integer2);
return integer;
})
.doOnComplete(() -> {
integerObservable1.subscribe();
});
integerObservable1.subscribe();
}
第二个例子的问题不在多播。它在收集运算符和 repeatWhen 中。
考虑 repeatWhen 就像 'hook' 您正在连接到 onComplete 方法。此 'hook' 将拦截 onComplete 方法并 "override" 它因此不会被调用,除非该可观察对象在开始重复过程之前第一次完成。
如果没有调用 collect 运算符的 onComplete 方法,则不知道它应该收集多少项目。因此,您必须处理如何收集项目以及将它们存储在流外的位置的逻辑,但这将是一种解决方法。
这是一个例子:
List<String> test = new ArrayList<>();
final String[] currentString = {""};
final int[] i = {0};
ConnectableObservable<Integer> integerObservable =
Observable.defer(() -> Observable.fromArray(i[0]++, i[0]++, i[0]++))
.doOnComplete(() -> {
test.add(currentString[0]);
currentString[0] = "";
})
.repeatWhen(o -> {
return o.concatMap(v ->
Observable.timer(3, TimeUnit.SECONDS)
.doOnComplete(() -> {
test.add(currentString[0]);
currentString[0] = "";
}));
})
.publish();
integerObservable
.observeOn(Schedulers.newThread())
.subscribe(System.out::println);
integerObservable
.observeOn(Schedulers.newThread())
.map((sa) -> {
currentString[0] = currentString[0] + sa;
System.out.println(currentString[0]);
return sa;
})
.subscribe();
在这个例子中,我们使用我们持有计时器的可观察对象的 onComplete 方法来重置我们的状态。当消费者消耗数据的速度比重复的延迟慢时,这个例子没有考虑这个选项(这将导致结果从一个数据链溢出到另一个数据链)。我建议使用其他方式来处理重复部分,例如:
最终整数[] i = {0};
ConnectableObservable<Integer> integerObservable =
Observable.defer(() -> Observable.fromArray(i[0]++, i[0]++, i[0]++)).publish();
Observable b = integerObservable.observeOn(Schedulers.newThread());
Observable a = integerObservable
.observeOn(Schedulers.newThread())
.collect(StringBuilder::new, (sb, x) -> sb.append(x).append(","))
.map(StringBuilder::toString)
.toObservable();
Observable
.interval(1, TimeUnit.SECONDS)
.doOnSubscribe((d) -> {
a.subscribe(System.out::println);
b.subscribe(System.out::println);
integerObservable.connect();
})
.doOnNext((d) -> {
a.subscribe(System.out::println);
b.subscribe(System.out::println);
integerObservable.connect();
})
.doOnComplete(() -> {})
.subscribe();
第一次连接是第一次执行,onNext 每 1 秒调用一次,它会从头开始重新启动整个管道。
希望对您有所帮助。