`distinctUntilChanged` 一旦调用 onNext 如何允许相同的项目
`distinctUntilChanged` how to allow the same item once onNext is called
我有一个不断发出项目的可观察对象,我需要处理每个项目(处理函数需要一些时间)。因此,同时在处理一个项目时,如果另一个项目发出相同的值,
我可以忽略它,因为同样的事情已经在进行中。但是一旦当前项目被处理(并调用 onNext)。以后如果有同样的请求,我应该允许。
我使用了 distinctUntildChanged
运算符,但我可以看到,如果当前项目与最后一个项目相同,即使最后一个项目完成处理并调用 onNext
.[=20 也不允许=]
我有一个示例来演示这个问题
我有一个class
class User {
String id;
String name;
public User(String id, String name) {
this.id = id;
this.name = name;
}
@Override
public boolean equals(Object obj) {
User obj1 = (User) obj;
return id.equals(obj1.id);
}
@Override
public String toString() {
return name;
}
}
还有一个可观察的(主题)
Subject<User> mSubject = PublishSubject.create();
而我的 Observable 链是
mSubject
.doOnNext(i -> Log.d(TAG, "emitted: " + i))
.observeOn(Schedulers.io())
.distinctUntilChanged()
.map(user -> {
Log.d(TAG, "processing " + user);
Thread.sleep(5000); // processing takes 5 seconds
return user;
}).subscribe(user -> Log.d(TAG, "onNext: " + user.name));
我发出这样的值
for (int i = 0; i < 20; i++) {
Thread.sleep(1000);
mSubject.onNext(new User(String.valueOf(1), "User " + i)); // all `User`s have same id
}
结果是
emitted: User 0
processing User 0
emitted: User 1
emitted: User 2
emitted: User 3
emitted: User 4
onNext: User 0
emitted: User 5
emitted: User 6
emitted: User 7
emitted: User 8
emitted: User 9
emitted: User 10
emitted: User 11
emitted: User 12
emitted: User 13
emitted: User 14
emitted: User 15
emitted: User 16
emitted: User 17
emitted: User 18
emitted: User 19
这里所有的对象都是一样的(equals 方法检查 id)。如您所见,第一次花费了 user0
,处理时间为 5 秒,在此期间我可以忽略传入的项目,
但是在那之后 onNext: User 0
我应该允许相同的用户请求,但是 distinctUntilChanged
不允许因为它把最后一个值等于同一个用户,我该怎么做?
希望我的问题很清楚。
您可以使用 groupBy()
运算符将请求分隔为 User
。在每个 observable 中,您可以安排只处理最新的发射。
mSubject
.doOnNext(i -> Log.d(TAG, "emitted: " + i))
.observeOn(Schedulers.io())
.groupBy( user -> user )
.flatMap( userObserverable -> userObservable
.onBackpressureDrop()
.map(user -> {
Log.d(TAG, "processing " + user);
Thread.sleep(5000); // processing takes 5 seconds
return user;
})
.subscribe(user -> Log.d(TAG, "onNext: " + user.name));
groupBy()
运算符为每个用户创建一个可观察对象。新用户将为他们创建一个新的可观察对象。每个用户都将在其自己的可观察对象上发出,如果下游不接受排放,onBackpressureDrop()
将丢弃该用户。
所以你可以用Flowable
和右边的BackpressureStrategy
来实现。问题是您在执行 observeOn
时没有设置缓冲区大小。你可以试试这个(尽管是 Kotlin):
Observable.interval(100, TimeUnit.MILLISECONDS)
.doOnNext { println("emitting $it") }
.toFlowable(BackpressureStrategy.LATEST)
.observeOn(Schedulers.io(), false,1)
.subscribeOn(Schedulers.io())
.subscribe {
println("consuming $it")
Thread.sleep(500)
}
输出将如下所示:
emitting 0
consuming 0
emitting 1
emitting 2
emitting 3
emitting 4
emitting 5
consuming 5
emitting 6
emitting 7
emitting 8
emitting 9
emitting 10
consuming 10
emitting 11
emitting 12
emitting 13
emitting 14
当您调用 observeOn(Scheduler)
时,如果我没记错的话,背压的默认缓冲区大小应该是 128。
您可以尝试将上述示例中的缓冲区大小更改为 3。您将得到:
emitting 0
consuming 0
emitting 1
...
emitting 5
consuming 1
emitting 6
...
emitting 10
consuming 2
emitting 11
...
emitting 15
consuming 15
emitting 16
...
emitting 20
consuming 16
emitting 21
...
我有一个不断发出项目的可观察对象,我需要处理每个项目(处理函数需要一些时间)。因此,同时在处理一个项目时,如果另一个项目发出相同的值,
我可以忽略它,因为同样的事情已经在进行中。但是一旦当前项目被处理(并调用 onNext)。以后如果有同样的请求,我应该允许。
我使用了 distinctUntildChanged
运算符,但我可以看到,如果当前项目与最后一个项目相同,即使最后一个项目完成处理并调用 onNext
.[=20 也不允许=]
我有一个示例来演示这个问题
我有一个class
class User {
String id;
String name;
public User(String id, String name) {
this.id = id;
this.name = name;
}
@Override
public boolean equals(Object obj) {
User obj1 = (User) obj;
return id.equals(obj1.id);
}
@Override
public String toString() {
return name;
}
}
还有一个可观察的(主题)
Subject<User> mSubject = PublishSubject.create();
而我的 Observable 链是
mSubject
.doOnNext(i -> Log.d(TAG, "emitted: " + i))
.observeOn(Schedulers.io())
.distinctUntilChanged()
.map(user -> {
Log.d(TAG, "processing " + user);
Thread.sleep(5000); // processing takes 5 seconds
return user;
}).subscribe(user -> Log.d(TAG, "onNext: " + user.name));
我发出这样的值
for (int i = 0; i < 20; i++) {
Thread.sleep(1000);
mSubject.onNext(new User(String.valueOf(1), "User " + i)); // all `User`s have same id
}
结果是
emitted: User 0
processing User 0
emitted: User 1
emitted: User 2
emitted: User 3
emitted: User 4
onNext: User 0
emitted: User 5
emitted: User 6
emitted: User 7
emitted: User 8
emitted: User 9
emitted: User 10
emitted: User 11
emitted: User 12
emitted: User 13
emitted: User 14
emitted: User 15
emitted: User 16
emitted: User 17
emitted: User 18
emitted: User 19
这里所有的对象都是一样的(equals 方法检查 id)。如您所见,第一次花费了 user0
,处理时间为 5 秒,在此期间我可以忽略传入的项目,
但是在那之后 onNext: User 0
我应该允许相同的用户请求,但是 distinctUntilChanged
不允许因为它把最后一个值等于同一个用户,我该怎么做?
希望我的问题很清楚。
您可以使用 groupBy()
运算符将请求分隔为 User
。在每个 observable 中,您可以安排只处理最新的发射。
mSubject
.doOnNext(i -> Log.d(TAG, "emitted: " + i))
.observeOn(Schedulers.io())
.groupBy( user -> user )
.flatMap( userObserverable -> userObservable
.onBackpressureDrop()
.map(user -> {
Log.d(TAG, "processing " + user);
Thread.sleep(5000); // processing takes 5 seconds
return user;
})
.subscribe(user -> Log.d(TAG, "onNext: " + user.name));
groupBy()
运算符为每个用户创建一个可观察对象。新用户将为他们创建一个新的可观察对象。每个用户都将在其自己的可观察对象上发出,如果下游不接受排放,onBackpressureDrop()
将丢弃该用户。
所以你可以用Flowable
和右边的BackpressureStrategy
来实现。问题是您在执行 observeOn
时没有设置缓冲区大小。你可以试试这个(尽管是 Kotlin):
Observable.interval(100, TimeUnit.MILLISECONDS)
.doOnNext { println("emitting $it") }
.toFlowable(BackpressureStrategy.LATEST)
.observeOn(Schedulers.io(), false,1)
.subscribeOn(Schedulers.io())
.subscribe {
println("consuming $it")
Thread.sleep(500)
}
输出将如下所示:
emitting 0
consuming 0
emitting 1
emitting 2
emitting 3
emitting 4
emitting 5
consuming 5
emitting 6
emitting 7
emitting 8
emitting 9
emitting 10
consuming 10
emitting 11
emitting 12
emitting 13
emitting 14
当您调用 observeOn(Scheduler)
时,如果我没记错的话,背压的默认缓冲区大小应该是 128。
您可以尝试将上述示例中的缓冲区大小更改为 3。您将得到:
emitting 0
consuming 0
emitting 1
...
emitting 5
consuming 1
emitting 6
...
emitting 10
consuming 2
emitting 11
...
emitting 15
consuming 15
emitting 16
...
emitting 20
consuming 16
emitting 21
...