`distinctUntilChanged` 一旦调用 onNext 如何允许相同的项目

`distinctUntilChanged` how to allow the same item once onNext is called

我有一个不断发出项目的可观察对象,我需要处理每个项目(处理函数需要一些时间)。因此,同时在处理一个项目时,如果另一个项目发出相同的值, 我可以忽略它,因为同样的事情已经在进行中。但是一旦当前项目被处理(并调用 onNext)。以后如果有同样的请求,我应该允许。 我使用了 distinctUntildChanged 运算符,但我可以看到,如果当前项目与最后一个项目相同,即使最后一个项目完成处理并调用 onNext.[=20 也不允许=]

我有一个示例来演示这个问题

我有一个class

class User {
    String id;
    String name;

    public User(String id, String name) {
        this.id = id;
        this.name = name;
    }

    @Override
    public boolean equals(Object obj) {
        User obj1 = (User) obj;
        return id.equals(obj1.id);
    }

    @Override
    public String toString() {
        return name;
    }
}

还有一个可观察的(主题)

Subject<User> mSubject = PublishSubject.create();

而我的 Observable 链是

 mSubject
.doOnNext(i -> Log.d(TAG, "emitted: " + i))
.observeOn(Schedulers.io())
.distinctUntilChanged()
.map(user -> {
    Log.d(TAG, "processing " + user);
    Thread.sleep(5000); // processing takes 5 seconds
    return user;
}).subscribe(user -> Log.d(TAG, "onNext: " + user.name));    

我发出这样的值

    for (int i = 0; i < 20; i++) {
        Thread.sleep(1000);            
        mSubject.onNext(new User(String.valueOf(1), "User " + i)); // all `User`s have same id
    }

结果是

emitted: User 0
processing User 0
emitted: User 1
emitted: User 2
emitted: User 3
emitted: User 4
onNext: User 0
emitted: User 5
emitted: User 6
emitted: User 7
emitted: User 8
emitted: User 9
emitted: User 10
emitted: User 11
emitted: User 12
emitted: User 13
emitted: User 14
emitted: User 15
emitted: User 16
emitted: User 17
emitted: User 18
emitted: User 19

这里所有的对象都是一样的(equals 方法检查 id)。如您所见,第一次花费了 user0,处理时间为 5 秒,在此期间我可以忽略传入的项目, 但是在那之后 onNext: User 0 我应该允许相同的用户请求,但是 distinctUntilChanged 不允许因为它把最后一个值等于同一个用户,我该怎么做? 希望我的问题很清楚。

您可以使用 groupBy() 运算符将请求分隔为 User。在每个 observable 中,您可以安排只处理最新的发射。

mSubject
  .doOnNext(i -> Log.d(TAG, "emitted: " + i))
  .observeOn(Schedulers.io())
  .groupBy( user -> user )
  .flatMap( userObserverable -> userObservable
                                  .onBackpressureDrop()
                                  .map(user -> {
                                    Log.d(TAG, "processing " + user);
                                    Thread.sleep(5000); // processing takes 5 seconds
                                    return user;
                                  })
  .subscribe(user -> Log.d(TAG, "onNext: " + user.name));

groupBy() 运算符为每个用户创建一个可观察对象。新用户将为他们创建一个新的可观察对象。每个用户都将在其自己的可观察对象上发出,如果下游不接受排放,onBackpressureDrop() 将丢弃该用户。

所以你可以用Flowable和右边的BackpressureStrategy来实现。问题是您在执行 observeOn 时没有设置缓冲区大小。你可以试试这个(尽管是 Kotlin):

Observable.interval(100, TimeUnit.MILLISECONDS)
    .doOnNext { println("emitting $it") }
    .toFlowable(BackpressureStrategy.LATEST)
    .observeOn(Schedulers.io(), false,1)
    .subscribeOn(Schedulers.io())
    .subscribe {
        println("consuming $it")
        Thread.sleep(500)
    }

输出将如下所示:

emitting 0
consuming 0
emitting 1
emitting 2
emitting 3
emitting 4
emitting 5
consuming 5
emitting 6
emitting 7
emitting 8
emitting 9
emitting 10
consuming 10
emitting 11
emitting 12
emitting 13
emitting 14

当您调用 observeOn(Scheduler) 时,如果我没记错的话,背压的默认缓冲区大小应该是 128。

您可以尝试将上述示例中的缓冲区大小更改为 3。您将得到:

emitting 0
consuming 0
emitting 1
...
emitting 5
consuming 1
emitting 6
...
emitting 10
consuming 2
emitting 11
...
emitting 15
consuming 15
emitting 16
...
emitting 20
consuming 16
emitting 21
...