RxJava2 的 delay(rx.functions.Func1) 没有按顺序发出项目
RxJava2's delay(rx.functions.Func1) is not emitting items in order
我正在使用 delay
:
的这个签名
public final <U> Observable<T> delay(Func1<? super T,? extends Observable<U>> itemDelay)
我正在使用 Func1
到 return 一个 Observable
,它充当一种 "trigger"。我的目标是延迟项目直到外部异步操作完成。该操作完成后,我想发出所有已延迟的项目和所有未来项目按顺序。
下面是一些示例代码,展示了我正在尝试做的事情:
import java.util.concurrent.atomic.AtomicBoolean;
import io.reactivex.Observable;
import io.reactivex.ObservableTransformer;
import io.reactivex.functions.Consumer;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.ReplaySubject;
public class Example {
private ReplaySubject<Object> delayTrigger = ReplaySubject.create(); // (1)
public void main() {
System.out.println("============ MAIN ============");
SourceThread sourceThread = new SourceThread();
sourceThread.start();
sourceThread.stream
.compose(doOnFirst(integer -> startAsyncOperation())) // (2)
.delay(integer -> delayTrigger) // (3)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe((Integer integer)
-> System.out.println("onNext: " + integer));
}
private void startAsyncOperation() {
System.out.println(">>>>>>> long async operation started");
SomeOtherThread someOtherThread = new SomeOtherThread();
someOtherThread.start();
}
private void onAsyncOperationComplete() {
System.out.println("<<<<<<< long async operation completed");
delayTrigger.onNext(new Object()); // (4)
}
/**
* From
*/
private <T> ObservableTransformer<T, T> doOnFirst(Consumer<? super T> consumer) {
return observableTransformer -> Observable.defer(() -> {
final AtomicBoolean first = new AtomicBoolean(true);
return observableTransformer.doOnNext(t -> {
if (first.compareAndSet(true, false)) {
consumer.accept(t);
}
});
});
}
/**
* Some thread to simulate a some time delayed source.
* This is not really part of the problem,
* we just need a time delayed source on another thread
*/
private final class SourceThread extends Thread {
private ReplaySubject<Integer> stream = ReplaySubject.create();
@Override
public void run() {
super.run();
for (int i = 0; i < 100; i++) {
stream.onNext(i);
System.out.println("Source emits item: " + i);
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
private final class SomeOtherThread extends Thread {
@Override
public void run() {
super.run();
try {
Thread.sleep(1000);
onAsyncOperationComplete();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
在 (1) 我创建一个 ReplaySubject
作为我的触发器,在 (2) 我开始异步操作,在 (3) 我延迟直到触发器发出一些东西;最后在 (4) 处,当异步操作完成时,我将一些东西放入触发器流中。
这在大多数情况下工作正常,除了当异步操作完成时,从 delay
编辑的流 return 出现故障。
I/System.out: Source emits item: 46
I/System.out: Source emits item: 47
I/System.out: <<<<<<< long async operation completed
I/System.out: Source emits item: 48
I/System.out: onNext: 0
I/System.out: onNext: 48 <---- problem here!!!
I/System.out: onNext: 1
I/System.out: onNext: 2
I/System.out: onNext: 3
项目 48 在项目 1 - 47 之前从 delay
发出。项目 49 也将无序发出。这将一直持续到第 1-47 项被发出,然后流被清理。但是有很大一部分未订购的商品。有什么办法可以解决这个问题吗?我使用 delay
正确吗?这是延迟错误吗?
这只是一个示例,供参考。在我的 "real" 问题中,一旦发出的项目出现故障(即编号不正确),我无法重新排序它们。
delay
运算符没有顺序保证,因为通常情况下,项目 #1 的内部源发出信号的时间可能晚于项目 #2 的另一个内部源。任何异步信号都可能会打乱排序,即使来自诸如已终止的 ReplaySubject
.
之类的源也是如此
我假设你想预取主源但不让它在外部信号之前通过,对吧?在这种情况下,您可以使用 concatArrayEager
,其中第一个源的完成会触发预取的第二个源的发射:
PublishSubject<Integer> delayer = PublishSubject.create();
Observable.concatArrayEager(
delayer,
sourceThread.stream
)
// somewhere async
delayer.onComplete();
我正在使用 delay
:
public final <U> Observable<T> delay(Func1<? super T,? extends Observable<U>> itemDelay)
我正在使用 Func1
到 return 一个 Observable
,它充当一种 "trigger"。我的目标是延迟项目直到外部异步操作完成。该操作完成后,我想发出所有已延迟的项目和所有未来项目按顺序。
下面是一些示例代码,展示了我正在尝试做的事情:
import java.util.concurrent.atomic.AtomicBoolean;
import io.reactivex.Observable;
import io.reactivex.ObservableTransformer;
import io.reactivex.functions.Consumer;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.ReplaySubject;
public class Example {
private ReplaySubject<Object> delayTrigger = ReplaySubject.create(); // (1)
public void main() {
System.out.println("============ MAIN ============");
SourceThread sourceThread = new SourceThread();
sourceThread.start();
sourceThread.stream
.compose(doOnFirst(integer -> startAsyncOperation())) // (2)
.delay(integer -> delayTrigger) // (3)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe((Integer integer)
-> System.out.println("onNext: " + integer));
}
private void startAsyncOperation() {
System.out.println(">>>>>>> long async operation started");
SomeOtherThread someOtherThread = new SomeOtherThread();
someOtherThread.start();
}
private void onAsyncOperationComplete() {
System.out.println("<<<<<<< long async operation completed");
delayTrigger.onNext(new Object()); // (4)
}
/**
* From
*/
private <T> ObservableTransformer<T, T> doOnFirst(Consumer<? super T> consumer) {
return observableTransformer -> Observable.defer(() -> {
final AtomicBoolean first = new AtomicBoolean(true);
return observableTransformer.doOnNext(t -> {
if (first.compareAndSet(true, false)) {
consumer.accept(t);
}
});
});
}
/**
* Some thread to simulate a some time delayed source.
* This is not really part of the problem,
* we just need a time delayed source on another thread
*/
private final class SourceThread extends Thread {
private ReplaySubject<Integer> stream = ReplaySubject.create();
@Override
public void run() {
super.run();
for (int i = 0; i < 100; i++) {
stream.onNext(i);
System.out.println("Source emits item: " + i);
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
private final class SomeOtherThread extends Thread {
@Override
public void run() {
super.run();
try {
Thread.sleep(1000);
onAsyncOperationComplete();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
在 (1) 我创建一个 ReplaySubject
作为我的触发器,在 (2) 我开始异步操作,在 (3) 我延迟直到触发器发出一些东西;最后在 (4) 处,当异步操作完成时,我将一些东西放入触发器流中。
这在大多数情况下工作正常,除了当异步操作完成时,从 delay
编辑的流 return 出现故障。
I/System.out: Source emits item: 46
I/System.out: Source emits item: 47
I/System.out: <<<<<<< long async operation completed
I/System.out: Source emits item: 48
I/System.out: onNext: 0
I/System.out: onNext: 48 <---- problem here!!!
I/System.out: onNext: 1
I/System.out: onNext: 2
I/System.out: onNext: 3
项目 48 在项目 1 - 47 之前从 delay
发出。项目 49 也将无序发出。这将一直持续到第 1-47 项被发出,然后流被清理。但是有很大一部分未订购的商品。有什么办法可以解决这个问题吗?我使用 delay
正确吗?这是延迟错误吗?
这只是一个示例,供参考。在我的 "real" 问题中,一旦发出的项目出现故障(即编号不正确),我无法重新排序它们。
delay
运算符没有顺序保证,因为通常情况下,项目 #1 的内部源发出信号的时间可能晚于项目 #2 的另一个内部源。任何异步信号都可能会打乱排序,即使来自诸如已终止的 ReplaySubject
.
我假设你想预取主源但不让它在外部信号之前通过,对吧?在这种情况下,您可以使用 concatArrayEager
,其中第一个源的完成会触发预取的第二个源的发射:
PublishSubject<Integer> delayer = PublishSubject.create();
Observable.concatArrayEager(
delayer,
sourceThread.stream
)
// somewhere async
delayer.onComplete();