RxJava2 flatMap 创建重复事件
RxJava2 flatMap create duplicate events
我是 RxJava2 的新手,我遇到了一些奇怪的行为,所以很可能我以错误的方式使用了该工具。
这是一个相当大的项目,但我将下面的代码片段分离为最小的可重现代码:
Observable
.interval(333, TimeUnit.MILLISECONDS)
.flatMap(new Function<Long, ObservableSource<Integer>>() {
private Subject<Integer> s = PublishSubject.create();
private int val = 0;
@Override public ObservableSource<Integer> apply(Long aLong) throws Exception {
val++;
s.onNext(val);
return s;
}
})
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
Log.w("value: %s", integer);
}
});
此代码使用 .interval
模拟来自我的 rx 流的事件,并且 flatMap
接收这些事件 "do some processing" 并使用 Subject
将结果推送到流中.
流是一个持续的过程,将有几个事件。
这个最少的代码很愚蠢,因为我只在 apply
回调上推送,但在实际情况中,有几个可能的时刻会发生推送,并且在 apply
与将通过主题发送的金额不同。
我希望通过这段代码看到的是:
value: 2 // 1 got skipped because onNext is called before there's a subscriber.
value: 3
value: 4
value: 5
value: 6 ... etc
我实际得到的是:
value: 2
value: 3
value: 3 // 3 twice
value: 4
value: 4
value: 4 // 4 repeated 3 times
value: 5
value: 5
value: 5
value: 5 // 5 repeated 4 times
value: 6
value: 6
value: 6
value: 6
value: 6 // 6 repeated 5 times
... etc
我也尝试过 Observable<Integer> o = s.share();
和 return 编辑它,或者直接 return s.share();
得到相同的结果。
我有点理解为什么会这样。 ObservableSource
再次订阅 n 次 n 次,因此每个循环都有更多事件。
问题:
我怎样才能达到预期的行为?
(如果我的预期行为不清楚,请在评论中询问更多)
您的 PublishSubject
已订阅 多次,每项来自 interval()。
编辑:您每次都需要传入一个新的 PublishSubject
(如果您想保留 first/last 排放,请切换到 BehaviorSubject
);将其传递给 long-running 进程,并确保在 long-running 进程完成时正确调用其 onComplete
。
编辑
根据最近的评论,我可以想出这种解决方案:
class MyBluetoothClient {
private PublishSubject<BTLEEvent> statusPublishSubject = PublishSubject.create()
public Observable<BTLEEvent> getEventObservable() {
return statusPublishSubject
}
private void publishEvent(BTLEEvent event) {
statusPublishSubject.onNext(event)
}
public void doStuff1() {
// do something that returns:
publishEvent(BTLEEvent.someEvent1)
}
public void doStuff2() {
// do something else that eventually yields
publishEvent(BTLEEvent.someEvent2)
}
}
而你是这样使用的:
MyBluetoothClient client = MyBluetoothClient()
client
.getEventObservable()
.subscribe( /* */ )
///
client.doStuff1()
///
client.doStuff2
原回答
这样可以吗?
Observable
.interval(333, TimeUnit.MILLISECONDS)
.flatMap(new Function<Long, ObservableSource<Integer>>() {
private int val = 0;
@Override public ObservableSource<Integer> apply(Long aLong) throws Exception {
val++;
return Observable.just(val);
}
})
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
Log.w("value: %s", integer);
}
});
这就是我想出的答案。我会将@Tassos 的回答标记为正确,因为他指出了我正确的道路。
首先我需要一个 CachedSubject
(一个在没有观察者的情况下缓存项目并在观察者连接后立即调度它们的主题),这是确保从 apply
内部发出的必要条件真的通过了。 class 主要包含 PublishSubject
.
class CachedSubject<T> extends Subject<T> {
private PublishSubject<T> publishSubject = PublishSubject.create();
private Queue<T> cache = new ConcurrentLinkedQueue<>();
@Override public boolean hasObservers() {
return publishSubject.hasObservers();
}
@Override public boolean hasThrowable() {
return publishSubject.hasThrowable();
}
@Override public boolean hasComplete() {
return publishSubject.hasComplete();
}
@Override public Throwable getThrowable() {
return publishSubject.getThrowable();
}
@Override protected void subscribeActual(Observer<? super T> observer) {
while (cache.size() > 0) {
observer.onNext(cache.remove());
}
publishSubject.subscribeActual(observer);
}
@Override public void onSubscribe(Disposable d) {
publishSubject.onSubscribe(d);
}
@Override public void onNext(T t) {
if (hasObservers()) {
publishSubject.onNext(t);
} else {
cache.add(t);
}
}
@Override public void onError(Throwable e) {
publishSubject.onError(e);
}
@Override public void onComplete() {
publishSubject.onComplete();
}
}
然后我将这个 class 与 switchMap
:
一起使用
Observable
.interval(1000, TimeUnit.MILLISECONDS)
.switchMap(new Function<Long, ObservableSource<Integer>>() {
private Subject<Integer> s = new CachedSubject<>();
private int val = 0;
@Override public ObservableSource<Integer> apply(Long aLong) throws Exception {
val++;
s.onNext(val);
return s;
}
})
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
Log.w("value: %s", integer);
}
});
这有效地允许我在 apply<T t>
方法上接收任意数量的事件并且只有 1 Consumer
订阅它,从它接收所有事件。
我是 RxJava2 的新手,我遇到了一些奇怪的行为,所以很可能我以错误的方式使用了该工具。
这是一个相当大的项目,但我将下面的代码片段分离为最小的可重现代码:
Observable
.interval(333, TimeUnit.MILLISECONDS)
.flatMap(new Function<Long, ObservableSource<Integer>>() {
private Subject<Integer> s = PublishSubject.create();
private int val = 0;
@Override public ObservableSource<Integer> apply(Long aLong) throws Exception {
val++;
s.onNext(val);
return s;
}
})
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
Log.w("value: %s", integer);
}
});
此代码使用 .interval
模拟来自我的 rx 流的事件,并且 flatMap
接收这些事件 "do some processing" 并使用 Subject
将结果推送到流中.
流是一个持续的过程,将有几个事件。
这个最少的代码很愚蠢,因为我只在 apply
回调上推送,但在实际情况中,有几个可能的时刻会发生推送,并且在 apply
与将通过主题发送的金额不同。
我希望通过这段代码看到的是:
value: 2 // 1 got skipped because onNext is called before there's a subscriber.
value: 3
value: 4
value: 5
value: 6 ... etc
我实际得到的是:
value: 2
value: 3
value: 3 // 3 twice
value: 4
value: 4
value: 4 // 4 repeated 3 times
value: 5
value: 5
value: 5
value: 5 // 5 repeated 4 times
value: 6
value: 6
value: 6
value: 6
value: 6 // 6 repeated 5 times
... etc
我也尝试过 Observable<Integer> o = s.share();
和 return 编辑它,或者直接 return s.share();
得到相同的结果。
我有点理解为什么会这样。 ObservableSource
再次订阅 n 次 n 次,因此每个循环都有更多事件。
问题:
我怎样才能达到预期的行为?
(如果我的预期行为不清楚,请在评论中询问更多)
您的 PublishSubject
已订阅 多次,每项来自 interval()。
编辑:您每次都需要传入一个新的 PublishSubject
(如果您想保留 first/last 排放,请切换到 BehaviorSubject
);将其传递给 long-running 进程,并确保在 long-running 进程完成时正确调用其 onComplete
。
编辑
根据最近的评论,我可以想出这种解决方案:
class MyBluetoothClient {
private PublishSubject<BTLEEvent> statusPublishSubject = PublishSubject.create()
public Observable<BTLEEvent> getEventObservable() {
return statusPublishSubject
}
private void publishEvent(BTLEEvent event) {
statusPublishSubject.onNext(event)
}
public void doStuff1() {
// do something that returns:
publishEvent(BTLEEvent.someEvent1)
}
public void doStuff2() {
// do something else that eventually yields
publishEvent(BTLEEvent.someEvent2)
}
}
而你是这样使用的:
MyBluetoothClient client = MyBluetoothClient()
client
.getEventObservable()
.subscribe( /* */ )
///
client.doStuff1()
///
client.doStuff2
原回答
这样可以吗?
Observable
.interval(333, TimeUnit.MILLISECONDS)
.flatMap(new Function<Long, ObservableSource<Integer>>() {
private int val = 0;
@Override public ObservableSource<Integer> apply(Long aLong) throws Exception {
val++;
return Observable.just(val);
}
})
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
Log.w("value: %s", integer);
}
});
这就是我想出的答案。我会将@Tassos 的回答标记为正确,因为他指出了我正确的道路。
首先我需要一个 CachedSubject
(一个在没有观察者的情况下缓存项目并在观察者连接后立即调度它们的主题),这是确保从 apply
内部发出的必要条件真的通过了。 class 主要包含 PublishSubject
.
class CachedSubject<T> extends Subject<T> {
private PublishSubject<T> publishSubject = PublishSubject.create();
private Queue<T> cache = new ConcurrentLinkedQueue<>();
@Override public boolean hasObservers() {
return publishSubject.hasObservers();
}
@Override public boolean hasThrowable() {
return publishSubject.hasThrowable();
}
@Override public boolean hasComplete() {
return publishSubject.hasComplete();
}
@Override public Throwable getThrowable() {
return publishSubject.getThrowable();
}
@Override protected void subscribeActual(Observer<? super T> observer) {
while (cache.size() > 0) {
observer.onNext(cache.remove());
}
publishSubject.subscribeActual(observer);
}
@Override public void onSubscribe(Disposable d) {
publishSubject.onSubscribe(d);
}
@Override public void onNext(T t) {
if (hasObservers()) {
publishSubject.onNext(t);
} else {
cache.add(t);
}
}
@Override public void onError(Throwable e) {
publishSubject.onError(e);
}
@Override public void onComplete() {
publishSubject.onComplete();
}
}
然后我将这个 class 与 switchMap
:
Observable
.interval(1000, TimeUnit.MILLISECONDS)
.switchMap(new Function<Long, ObservableSource<Integer>>() {
private Subject<Integer> s = new CachedSubject<>();
private int val = 0;
@Override public ObservableSource<Integer> apply(Long aLong) throws Exception {
val++;
s.onNext(val);
return s;
}
})
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
Log.w("value: %s", integer);
}
});
这有效地允许我在 apply<T t>
方法上接收任意数量的事件并且只有 1 Consumer
订阅它,从它接收所有事件。