Rx - 按条件将流分成段(列表)
Rx - Divide stream into segments (lists) by condition
我有一个 RX 生产者,它创建一个字符串流(真实流的简化版本):
A1 A2 A3 B1 B2 C1 C2 C3 C4 C5 C6....
川流不息,井然有序。所以在以 A
运行 开头的字符串出来后, B
开始。当B
运行出来时,C
开始...当Z
运行出来时,我们移动到AA1
等。有一个未知数A
、B
等,但通常每个字母有 10-30 个实例。
我正在寻找一种方法将此流分成所有 A 的块:A1 A2 A3
、所有 B 的:B1 B2
、所有 C 的:C1 C2 C3 C4 C5 C6
等。每个块可以要么是一个可观察的(我将把它变成一个列表),要么只是一个列表。
我使用 RxJava 尝试了几种不同的方法,但都失败了。无效的部分包括:
分组依据:由于流是无止境的,因此每个字母的可观察对象不会完成。所以当 A 的 运行 出来而 B 开始时,A 的 Observable 没有完成。因此,可观察对象的数量不断增加。
Window/Buffer with distinctUntilChanged - 我在原始流上使用 "distinctUntilChanged" 输出每组的第一项(第一项A,第一个 B 等)。然后我将该流用作 window
的输入或 "buffer" 运算符用作 windows/buffers 之间的边界。那没有用,我得到的只是空列表。
使用 RX 的正确解决方案是什么?
我更喜欢 Java 解决方案,但也非常欢迎可以轻松转换为 Java 的其他 RX 实现中的解决方案。
这是我解决这个问题的方法:
Observable<String> source = Observable.from(
"A1", "A2", "A3", "B1", "B2", "B3", "C1", "D1");
Observable<List<String>> output = Observable.defer(() -> {
List<String> buffer = new ArrayList<>();
return
Observable.concat(
source.concatMap(new Function<String, Observable<List<String>>>() {
String lastKey;
@Override
public Observable<List<String>> apply(String t) {
String key = t.substring(0, 1);
if (lastKey != null && !key.equals(lastKey)) {
List<String> b = new ArrayList<>(buffer);
buffer.clear();
buffer.add(t);
lastKey = key;
return Observable.just(b);
}
lastKey = key;
buffer.add(t);
return Observable.empty();
}
}),
Observable.just(1)
.flatMap(v -> {
if (buffer.isEmpty()) {
return Observable.empty();
}
return Observable.just(buffer);
})
);
}
);
output.subscribe(System.out::println);
这是它的工作原理:
- 我使用 defer 是因为我们需要每个订阅者缓冲区,而不是全局缓冲区
- 如果源恰好是有限的,我将缓冲与最后一个缓冲区的发射连接起来
- 我使用 concatMap 并添加到缓冲区,直到键改变,直到那时,我发出空的 Observables。密钥更改后,我会发出缓冲区的内容并开始一个新的内容。
您可以使用 rxjava-extras .toListWhile
:
Observable<String> source =
Observable.just("A1", "A2", "A3", "B1", "B2", "B3", "C1", "D1");
source.compose(Transformers.<String> toListWhile(
(list, t) -> list.isEmpty()
|| list.get(0).charAt(0) == t.charAt(0)))
.forEach(System.out::println);
它做了@akarnokd 在幕后所做的事情并且经过了单元测试。
在查看了 akarnokd's and Dave 的回答后,我通过实现一个名为 BufferWhile
的自定义 Rx 运算符提出了自己的解决方案。它似乎和其他解决方案一样有效(如果我错了请纠正我),但它似乎更直接:
public class RxBufferWhileOperator<T, U> implements Operator<List<T>, T>{
private final Func1<? super T, ? extends U> keyGenerator;
public RxBufferWhileOperator(Func1<? super T, ? extends U> keyGenerator) {
this.keyGenerator = keyGenerator;
}
@Override
public Subscriber<? super T> call(final Subscriber<? super List<T>> s) {
return new Subscriber<T>(s) {
private ArrayList<T> buffer = null;
private U currentKey = null;
@Override
public void onCompleted() {
submitAndClearBuffer();
s.onCompleted();
}
@Override
public void onError(Throwable e) {
submitAndClearBuffer(); //Optional, remove if submitting partial buffers doesn't make sense in your case
s.onError(e);
}
@Override
public void onNext(T t) {
if (currentKey == null || !currentKey.equals(keyGenerator.call(t))) {
currentKey = keyGenerator.call(t);
submitAndClearBuffer();
buffer.add(t);
} else {
buffer.add(t);
request(1); // Request additional T since we "swallowed" the incoming result without calling subsequent subscribers
}
}
private void submitAndClearBuffer() {
if (buffer != null && buffer.size() > 0) {
s.onNext(buffer);
}
buffer = new ArrayList<>();
}
};
}
}
我可以使用 lift
将此运算符应用于原始可观察对象,并获得一个发出字符串列表的可观察对象。
假设我们有一个 string
的 source
流和一个为每个 string
提取密钥的函数 key
,例如:
IObservable<string> source = ...;
Func<string, string> key = s => new string(s.TakeWhile(char.IsLetter).ToArray());
然后我们可以使用 Buffer
和自定义结束选择器。
var query = source.Publish(o => o.Buffer(() =>
{
var keys = o.Select(key);
return Observable
.CombineLatest(
keys.Take(1),
keys.Skip(1),
(a, b) => a != b)
.Where(x => x);
}));
当缓冲区中的第一项和我们考虑添加到缓冲区的当前项不具有相同的键时,每个缓冲区结束。
我有一个 RX 生产者,它创建一个字符串流(真实流的简化版本):
A1 A2 A3 B1 B2 C1 C2 C3 C4 C5 C6....
川流不息,井然有序。所以在以 A
运行 开头的字符串出来后, B
开始。当B
运行出来时,C
开始...当Z
运行出来时,我们移动到AA1
等。有一个未知数A
、B
等,但通常每个字母有 10-30 个实例。
我正在寻找一种方法将此流分成所有 A 的块:A1 A2 A3
、所有 B 的:B1 B2
、所有 C 的:C1 C2 C3 C4 C5 C6
等。每个块可以要么是一个可观察的(我将把它变成一个列表),要么只是一个列表。
我使用 RxJava 尝试了几种不同的方法,但都失败了。无效的部分包括:
分组依据:由于流是无止境的,因此每个字母的可观察对象不会完成。所以当 A 的 运行 出来而 B 开始时,A 的 Observable 没有完成。因此,可观察对象的数量不断增加。
Window/Buffer with distinctUntilChanged - 我在原始流上使用 "distinctUntilChanged" 输出每组的第一项(第一项A,第一个 B 等)。然后我将该流用作
window
的输入或 "buffer" 运算符用作 windows/buffers 之间的边界。那没有用,我得到的只是空列表。
使用 RX 的正确解决方案是什么? 我更喜欢 Java 解决方案,但也非常欢迎可以轻松转换为 Java 的其他 RX 实现中的解决方案。
这是我解决这个问题的方法:
Observable<String> source = Observable.from(
"A1", "A2", "A3", "B1", "B2", "B3", "C1", "D1");
Observable<List<String>> output = Observable.defer(() -> {
List<String> buffer = new ArrayList<>();
return
Observable.concat(
source.concatMap(new Function<String, Observable<List<String>>>() {
String lastKey;
@Override
public Observable<List<String>> apply(String t) {
String key = t.substring(0, 1);
if (lastKey != null && !key.equals(lastKey)) {
List<String> b = new ArrayList<>(buffer);
buffer.clear();
buffer.add(t);
lastKey = key;
return Observable.just(b);
}
lastKey = key;
buffer.add(t);
return Observable.empty();
}
}),
Observable.just(1)
.flatMap(v -> {
if (buffer.isEmpty()) {
return Observable.empty();
}
return Observable.just(buffer);
})
);
}
);
output.subscribe(System.out::println);
这是它的工作原理:
- 我使用 defer 是因为我们需要每个订阅者缓冲区,而不是全局缓冲区
- 如果源恰好是有限的,我将缓冲与最后一个缓冲区的发射连接起来
- 我使用 concatMap 并添加到缓冲区,直到键改变,直到那时,我发出空的 Observables。密钥更改后,我会发出缓冲区的内容并开始一个新的内容。
您可以使用 rxjava-extras .toListWhile
:
Observable<String> source =
Observable.just("A1", "A2", "A3", "B1", "B2", "B3", "C1", "D1");
source.compose(Transformers.<String> toListWhile(
(list, t) -> list.isEmpty()
|| list.get(0).charAt(0) == t.charAt(0)))
.forEach(System.out::println);
它做了@akarnokd 在幕后所做的事情并且经过了单元测试。
在查看了 akarnokd's and Dave 的回答后,我通过实现一个名为 BufferWhile
的自定义 Rx 运算符提出了自己的解决方案。它似乎和其他解决方案一样有效(如果我错了请纠正我),但它似乎更直接:
public class RxBufferWhileOperator<T, U> implements Operator<List<T>, T>{
private final Func1<? super T, ? extends U> keyGenerator;
public RxBufferWhileOperator(Func1<? super T, ? extends U> keyGenerator) {
this.keyGenerator = keyGenerator;
}
@Override
public Subscriber<? super T> call(final Subscriber<? super List<T>> s) {
return new Subscriber<T>(s) {
private ArrayList<T> buffer = null;
private U currentKey = null;
@Override
public void onCompleted() {
submitAndClearBuffer();
s.onCompleted();
}
@Override
public void onError(Throwable e) {
submitAndClearBuffer(); //Optional, remove if submitting partial buffers doesn't make sense in your case
s.onError(e);
}
@Override
public void onNext(T t) {
if (currentKey == null || !currentKey.equals(keyGenerator.call(t))) {
currentKey = keyGenerator.call(t);
submitAndClearBuffer();
buffer.add(t);
} else {
buffer.add(t);
request(1); // Request additional T since we "swallowed" the incoming result without calling subsequent subscribers
}
}
private void submitAndClearBuffer() {
if (buffer != null && buffer.size() > 0) {
s.onNext(buffer);
}
buffer = new ArrayList<>();
}
};
}
}
我可以使用 lift
将此运算符应用于原始可观察对象,并获得一个发出字符串列表的可观察对象。
假设我们有一个 string
的 source
流和一个为每个 string
提取密钥的函数 key
,例如:
IObservable<string> source = ...;
Func<string, string> key = s => new string(s.TakeWhile(char.IsLetter).ToArray());
然后我们可以使用 Buffer
和自定义结束选择器。
var query = source.Publish(o => o.Buffer(() =>
{
var keys = o.Select(key);
return Observable
.CombineLatest(
keys.Take(1),
keys.Skip(1),
(a, b) => a != b)
.Where(x => x);
}));
当缓冲区中的第一项和我们考虑添加到缓冲区的当前项不具有相同的键时,每个缓冲区结束。