Rx - 按条件将流分成段(列表)

Rx - Divide stream into segments (lists) by condition

我有一个 RX 生产者,它创建一个字符串流(真实流的简化版本):

A1 A2 A3 B1 B2 C1 C2 C3 C4 C5 C6....

川流不息,井然有序。所以在以 A 运行 开头的字符串出来后, B 开始。当B 运行出来时,C开始...当Z 运行出来时,我们移动到AA1等。有一个未知数AB 等,但通常每个字母有 10-30 个实例。

我正在寻找一种方法将此流分成所有 A 的块:A1 A2 A3、所有 B 的:B1 B2、所有 C 的:C1 C2 C3 C4 C5 C6 等。每个块可以要么是一个可观察的(我将把它变成一个列表),要么只是一个列表。

我使用 RxJava 尝试了几种不同的方法,但都失败了。无效的部分包括:

使用 RX 的正确解决方案是什么? 我更喜欢 Java 解决方案,但也非常欢迎可以轻松转换为 Java 的其他 RX 实现中的解决方案。

这是我解决这个问题的方法:

Observable<String> source = Observable.from(
        "A1", "A2", "A3", "B1", "B2", "B3", "C1", "D1");

Observable<List<String>> output = Observable.defer(() -> {
    List<String> buffer = new ArrayList<>();
    return 
            Observable.concat(
                source.concatMap(new Function<String, Observable<List<String>>>() {
                    String lastKey;
                    @Override
                    public Observable<List<String>> apply(String t) {
                        String key = t.substring(0, 1);
                        if (lastKey != null && !key.equals(lastKey)) {
                            List<String> b = new ArrayList<>(buffer);
                            buffer.clear();
                            buffer.add(t);
                            lastKey = key;
                            return Observable.just(b);
                        }
                        lastKey = key;
                        buffer.add(t);
                        return Observable.empty();
                    }
                }),
                Observable.just(1)
                .flatMap(v -> {
                    if (buffer.isEmpty()) {
                        return Observable.empty();
                    }
                    return Observable.just(buffer);
                })
            );
    }
);

output.subscribe(System.out::println);

这是它的工作原理:

  • 我使用 defer 是因为我们需要每个订阅者缓冲区,而不是全局缓冲区
  • 如果源恰好是有限的,我将缓冲与最后一个缓冲区的发射连接起来
  • 我使用 concatMap 并添加到缓冲区,直到键改变,直到那时,我发出空的 Observables。密钥更改后,我会发出缓冲区的内容并开始一个新的内容。

您可以使用 rxjava-extras .toListWhile:

Observable<String> source = 
    Observable.just("A1", "A2", "A3", "B1", "B2", "B3", "C1", "D1");
source.compose(Transformers.<String> toListWhile(
            (list, t) -> list.isEmpty() 
                         || list.get(0).charAt(0) == t.charAt(0)))
      .forEach(System.out::println);

它做了@akarnokd 在幕后所做的事情并且经过了单元测试。

在查看了 akarnokd's and Dave 的回答后,我通过实现一个名为 BufferWhile 的自定义 Rx 运算符提出了自己的解决方案。它似乎和其他解决方案一样有效(如果我错了请纠正我),但它似乎更直接:

public class RxBufferWhileOperator<T, U> implements Operator<List<T>, T>{

    private final Func1<? super T, ? extends U> keyGenerator;

    public RxBufferWhileOperator(Func1<? super T, ? extends U> keyGenerator) {
        this.keyGenerator = keyGenerator;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super List<T>> s) {
        return new Subscriber<T>(s) {

            private ArrayList<T> buffer = null;
            private U currentKey = null;

            @Override
            public void onCompleted() {
                submitAndClearBuffer();
                s.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                submitAndClearBuffer(); //Optional, remove if submitting partial buffers doesn't make sense in your case
                s.onError(e);
            }

            @Override
            public void onNext(T t) {
                if (currentKey == null || !currentKey.equals(keyGenerator.call(t))) {
                    currentKey = keyGenerator.call(t);
                    submitAndClearBuffer();
                    buffer.add(t);
                } else {
                    buffer.add(t);
                    request(1); // Request additional T since we "swallowed" the incoming result without calling subsequent subscribers
                }
            }

            private void submitAndClearBuffer() {
                if (buffer != null && buffer.size() > 0) {
                    s.onNext(buffer);
                }
                buffer = new ArrayList<>();
            }
        };
    }
}

我可以使用 lift 将此运算符应用于原始可观察对象,并获得一个发出字符串列表的可观察对象。

假设我们有一个 stringsource 流和一个为每个 string 提取密钥的函数 key,例如:

IObservable<string> source = ...;
Func<string, string> key = s => new string(s.TakeWhile(char.IsLetter).ToArray());

然后我们可以使用 Buffer 和自定义结束选择器。

var query = source.Publish(o => o.Buffer(() =>
{
    var keys = o.Select(key);
    return Observable
        .CombineLatest(
            keys.Take(1),
            keys.Skip(1),
            (a, b) => a != b)
        .Where(x => x);
}));

当缓冲区中的第一项和我们考虑添加到缓冲区的当前项不具有相同的键时,每个缓冲区结束。