RxJava- Group、Emit 和 Zip Sorted "Chunks" with a common 属性?

RxJava- Group, Emit, and Zip Sorted "Chunks" with a common property?

我和我的同事 运行 经常遇到挑战,我希望响应式编程能够解决它。不过,它可能需要我自己实现 OperatorTransformer

我想获取任何 Observable<T> 发出 T 的项目,但我希望操作员将它们分组在 T 的映射上并将每个分组作为 List<T>,或者更好的一些通用累加器,就像 Java 8 流中的 Collector

但这是我认为 groupBy() 无法做到的棘手部分。我想通过这个运算符获取两个 Observables,并假设发出的项目在 属性 上排序(传入数据将从排序的 SQL 查询发出并映射到 T 对象).运算符将连续累积项目,直到 属性 更改,然后它发出该组并移动到下一个。这样我就可以从每个 Observable 中获取每组数据,压缩并处理这两个块,然后将它们扔掉并继续下一个。这样我就可以保持半缓冲状态并保持低内存使用率。

因此,如果我在 PARTITION_ID 上进行排序、分组和压缩,这就是我想要完成的视觉效果。

我这样做只是因为我可能有两个查询,每个查询都超过一百万条记录,而且我需要并排进行复杂的比较。我没有足够的内存来一次从两侧导入所有数据,但我可以将其范围缩小到每个排序的 属性 值并将其分成几批。在每批之后,GC 将丢弃它,Operator 可以继续处理下一批。

这是我目前的代码,但我不太清楚如何继续,因为我不想在批处理完成之前发出任何东西。我该怎么做?

public final class  SortedPartitioner<T,P,C,R> implements Transformer<T,R> {

 private final Function<T,P> mappedPartitionProperty;
 private final Supplier<C> acculatorSupplier;
 private final BiConsumer<T,R> accumulator;
 private final Function<C,R> finalResult;


 private SortedPartitioner(Function<T, P> mappedPartitionProperty, Supplier<C> acculatorSupplier,
   BiConsumer<T, R> accumulator, Function<C, R> finalResult) {
  this.mappedPartitionProperty = mappedPartitionProperty;
  this.acculatorSupplier = acculatorSupplier;
  this.accumulator = accumulator;
  this.finalResult = finalResult;
 }
 public static <T,P,C,R> SortedPartitioner<T,P,C,R> of(
   Function<T,P> mappedPartitionProperty, 
   Supplier<C> accumulatorSupplier,
   BiConsumer<T,R> accumulator,
   Function<C,R> finalResult) {

  return new SortedPartitioner<>(mappedPartitionProperty, accumulatorSupplier, accumulator, finalResult);

 }
 @Override
 public Observable<R> call(Observable<T> t) {
  return null;
 }

}

这是一个棘手的问题,但我也经常遇到。

诀窍是使用 materializescanflatMapscan 累积具有相同 partitionId 和下一个不同值(如果存在)的值列表。 materialize 是必需的,因为我们需要知道源何时完成,以便我们可以发出剩余的不同值(如果存在)。 flatMap 获取列表和值并在值存在时发出列表(我们刚刚切换到一个新的 partitionId)并在流完成时发出值(剩余的)。

下面是一个单元测试,它从列表 1, 1, 2, 2, 2, 3 发出列表 {1, 1}, {2, 2, 2}, {3}

对于您的用例,您只需将此技术应用于两个源并将它们压缩在一起。

代码:

import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

import org.junit.Test;

import rx.Observable;

public class StateMachineExampleTest {

    @Test
    public void testForWhosebug() {
        Observable<Integer> a = Observable.just(1, 1, 2, 2, 2, 3);
        State<Integer> initial = new State<Integer>(Collections.emptyList(), Optional.empty(),
                false);
        List<List<Integer>> lists = a.materialize()
                // accumulate lists and uses onCompleted notification to emit
                // left overs when source completes
                .scan(initial,
                        (state, notification) -> {
                            if (notification.isOnCompleted()) {
                                return new State<>(null, state.value, true);
                            } else if (notification.isOnError())
                                throw new RuntimeException(notification.getThrowable());
                            else if (state.list.size() == 0) {
                                return new State<>(Arrays.asList(notification.getValue()), Optional
                                        .empty(), false);
                            } else if (partitionId(notification.getValue()) == partitionId(state.list
                                    .get(0))) {
                                List<Integer> list = new ArrayList<>();
                                list.addAll(state.list);
                                list.add(notification.getValue());
                                return new State<>(list, Optional.empty(), false);
                            } else if (state.value.isPresent()) {
                                if (partitionId(state.value.get()) == partitionId(notification
                                        .getValue())) {
                                    return new State<>(Arrays.asList(state.value.get(),
                                            notification.getValue()), Optional.empty(), false);
                                } else {
                                    return new State<>(Arrays.asList(state.value.get()), Optional
                                            .of(notification.getValue()), false);
                                }
                            } else {
                                return new State<>(state.list,
                                        Optional.of(notification.getValue()), false);
                            }
                        })
                // emit lists from state
                .flatMap(state -> {
                    if (state.completed) {
                        if (state.value.isPresent())
                            return Observable.just(Arrays.asList(state.value.get()));
                        else
                            return Observable.empty();
                    } else if (state.value.isPresent()) {
                        return Observable.just(state.list);
                    } else {
                        return Observable.empty();
                    }
                })
                // get as a list of lists to check
                .toList().toBlocking().single();
        assertEquals(Arrays.asList(Arrays.asList(1, 1), Arrays.asList(2, 2, 2), Arrays.asList(3)),
                lists);
    }

    private static int partitionId(Integer n) {
        return n;
    }

    private static final class State<T> {
        final List<T> list;
        final Optional<T> value;
        final boolean completed;

        State(List<T> list, Optional<T> value, boolean completed) {
            this.list = list;
            this.value = value;
            this.completed = completed;
        }
    }

}

请记住,此代码是快速编写的,可能有漏洞。请务必对您改编的此代码进行完整的单元测试。

给你的额外注意是,因为我们使用背压支持运算符 materializescanflatMap,结果转换也支持背压,因此可以安全地与 zip.

你的另一个答案使用了 Maven Central 上的库,而且更短。

将此依赖项添加到您的 pom.xml

<dependency>
    <groupId>com.github.davidmoten</groupId>
    <artifactId>rxjava-extras</artifactId>
    <version>0.5.13</version>
</dependency>

对于具有相同 partition_id 的项目进行分组,请执行以下操作:

import com.github.davidmoten.rx.Transformers;

Observable<List<Item>> grouped = items.compose(
    Transformers.toListWhile(
        (list, item) -> list.isEmpty() || list.get(0).partitionId == item.partitionId));

此方法的测试非常全面(另请参阅 Transformers.collectWhile 了解列表以外的数据结构),但您可以在 github.

上自行查看源代码

然后继续zip