RxJava- Group、Emit 和 Zip Sorted "Chunks" with a common 属性?
RxJava- Group, Emit, and Zip Sorted "Chunks" with a common property?
我和我的同事 运行 经常遇到挑战,我希望响应式编程能够解决它。不过,它可能需要我自己实现 Operator
或 Transformer
。
我想获取任何 Observable<T>
发出 T
的项目,但我希望操作员将它们分组在 T
的映射上并将每个分组作为 List<T>
,或者更好的一些通用累加器,就像 Java 8 流中的 Collector
。
但这是我认为 groupBy()
无法做到的棘手部分。我想通过这个运算符获取两个 Observables,并假设发出的项目在 属性 上排序(传入数据将从排序的 SQL 查询发出并映射到 T
对象).运算符将连续累积项目,直到 属性 更改,然后它发出该组并移动到下一个。这样我就可以从每个 Observable 中获取每组数据,压缩并处理这两个块,然后将它们扔掉并继续下一个。这样我就可以保持半缓冲状态并保持低内存使用率。
因此,如果我在 PARTITION_ID
上进行排序、分组和压缩,这就是我想要完成的视觉效果。
我这样做只是因为我可能有两个查询,每个查询都超过一百万条记录,而且我需要并排进行复杂的比较。我没有足够的内存来一次从两侧导入所有数据,但我可以将其范围缩小到每个排序的 属性 值并将其分成几批。在每批之后,GC 将丢弃它,Operator 可以继续处理下一批。
这是我目前的代码,但我不太清楚如何继续,因为我不想在批处理完成之前发出任何东西。我该怎么做?
public final class SortedPartitioner<T,P,C,R> implements Transformer<T,R> {
private final Function<T,P> mappedPartitionProperty;
private final Supplier<C> acculatorSupplier;
private final BiConsumer<T,R> accumulator;
private final Function<C,R> finalResult;
private SortedPartitioner(Function<T, P> mappedPartitionProperty, Supplier<C> acculatorSupplier,
BiConsumer<T, R> accumulator, Function<C, R> finalResult) {
this.mappedPartitionProperty = mappedPartitionProperty;
this.acculatorSupplier = acculatorSupplier;
this.accumulator = accumulator;
this.finalResult = finalResult;
}
public static <T,P,C,R> SortedPartitioner<T,P,C,R> of(
Function<T,P> mappedPartitionProperty,
Supplier<C> accumulatorSupplier,
BiConsumer<T,R> accumulator,
Function<C,R> finalResult) {
return new SortedPartitioner<>(mappedPartitionProperty, accumulatorSupplier, accumulator, finalResult);
}
@Override
public Observable<R> call(Observable<T> t) {
return null;
}
}
这是一个棘手的问题,但我也经常遇到。
诀窍是使用 materialize
、scan
和 flatMap
。 scan
累积具有相同 partitionId 和下一个不同值(如果存在)的值列表。 materialize
是必需的,因为我们需要知道源何时完成,以便我们可以发出剩余的不同值(如果存在)。 flatMap
获取列表和值并在值存在时发出列表(我们刚刚切换到一个新的 partitionId)并在流完成时发出值(剩余的)。
下面是一个单元测试,它从列表 1, 1, 2, 2, 2, 3
发出列表 {1, 1}, {2, 2, 2}, {3}
。
对于您的用例,您只需将此技术应用于两个源并将它们压缩在一起。
代码:
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.junit.Test;
import rx.Observable;
public class StateMachineExampleTest {
@Test
public void testForWhosebug() {
Observable<Integer> a = Observable.just(1, 1, 2, 2, 2, 3);
State<Integer> initial = new State<Integer>(Collections.emptyList(), Optional.empty(),
false);
List<List<Integer>> lists = a.materialize()
// accumulate lists and uses onCompleted notification to emit
// left overs when source completes
.scan(initial,
(state, notification) -> {
if (notification.isOnCompleted()) {
return new State<>(null, state.value, true);
} else if (notification.isOnError())
throw new RuntimeException(notification.getThrowable());
else if (state.list.size() == 0) {
return new State<>(Arrays.asList(notification.getValue()), Optional
.empty(), false);
} else if (partitionId(notification.getValue()) == partitionId(state.list
.get(0))) {
List<Integer> list = new ArrayList<>();
list.addAll(state.list);
list.add(notification.getValue());
return new State<>(list, Optional.empty(), false);
} else if (state.value.isPresent()) {
if (partitionId(state.value.get()) == partitionId(notification
.getValue())) {
return new State<>(Arrays.asList(state.value.get(),
notification.getValue()), Optional.empty(), false);
} else {
return new State<>(Arrays.asList(state.value.get()), Optional
.of(notification.getValue()), false);
}
} else {
return new State<>(state.list,
Optional.of(notification.getValue()), false);
}
})
// emit lists from state
.flatMap(state -> {
if (state.completed) {
if (state.value.isPresent())
return Observable.just(Arrays.asList(state.value.get()));
else
return Observable.empty();
} else if (state.value.isPresent()) {
return Observable.just(state.list);
} else {
return Observable.empty();
}
})
// get as a list of lists to check
.toList().toBlocking().single();
assertEquals(Arrays.asList(Arrays.asList(1, 1), Arrays.asList(2, 2, 2), Arrays.asList(3)),
lists);
}
private static int partitionId(Integer n) {
return n;
}
private static final class State<T> {
final List<T> list;
final Optional<T> value;
final boolean completed;
State(List<T> list, Optional<T> value, boolean completed) {
this.list = list;
this.value = value;
this.completed = completed;
}
}
}
请记住,此代码是快速编写的,可能有漏洞。请务必对您改编的此代码进行完整的单元测试。
给你的额外注意是,因为我们使用背压支持运算符 materialize
、scan
和 flatMap
,结果转换也支持背压,因此可以安全地与 zip
.
你的另一个答案使用了 Maven Central 上的库,而且更短。
将此依赖项添加到您的 pom.xml
。
<dependency>
<groupId>com.github.davidmoten</groupId>
<artifactId>rxjava-extras</artifactId>
<version>0.5.13</version>
</dependency>
对于具有相同 partition_id
的项目进行分组,请执行以下操作:
import com.github.davidmoten.rx.Transformers;
Observable<List<Item>> grouped = items.compose(
Transformers.toListWhile(
(list, item) -> list.isEmpty() || list.get(0).partitionId == item.partitionId));
此方法的测试非常全面(另请参阅 Transformers.collectWhile
了解列表以外的数据结构),但您可以在 github.
上自行查看源代码
然后继续zip
。
我和我的同事 运行 经常遇到挑战,我希望响应式编程能够解决它。不过,它可能需要我自己实现 Operator
或 Transformer
。
我想获取任何 Observable<T>
发出 T
的项目,但我希望操作员将它们分组在 T
的映射上并将每个分组作为 List<T>
,或者更好的一些通用累加器,就像 Java 8 流中的 Collector
。
但这是我认为 groupBy()
无法做到的棘手部分。我想通过这个运算符获取两个 Observables,并假设发出的项目在 属性 上排序(传入数据将从排序的 SQL 查询发出并映射到 T
对象).运算符将连续累积项目,直到 属性 更改,然后它发出该组并移动到下一个。这样我就可以从每个 Observable 中获取每组数据,压缩并处理这两个块,然后将它们扔掉并继续下一个。这样我就可以保持半缓冲状态并保持低内存使用率。
因此,如果我在 PARTITION_ID
上进行排序、分组和压缩,这就是我想要完成的视觉效果。
我这样做只是因为我可能有两个查询,每个查询都超过一百万条记录,而且我需要并排进行复杂的比较。我没有足够的内存来一次从两侧导入所有数据,但我可以将其范围缩小到每个排序的 属性 值并将其分成几批。在每批之后,GC 将丢弃它,Operator 可以继续处理下一批。
这是我目前的代码,但我不太清楚如何继续,因为我不想在批处理完成之前发出任何东西。我该怎么做?
public final class SortedPartitioner<T,P,C,R> implements Transformer<T,R> {
private final Function<T,P> mappedPartitionProperty;
private final Supplier<C> acculatorSupplier;
private final BiConsumer<T,R> accumulator;
private final Function<C,R> finalResult;
private SortedPartitioner(Function<T, P> mappedPartitionProperty, Supplier<C> acculatorSupplier,
BiConsumer<T, R> accumulator, Function<C, R> finalResult) {
this.mappedPartitionProperty = mappedPartitionProperty;
this.acculatorSupplier = acculatorSupplier;
this.accumulator = accumulator;
this.finalResult = finalResult;
}
public static <T,P,C,R> SortedPartitioner<T,P,C,R> of(
Function<T,P> mappedPartitionProperty,
Supplier<C> accumulatorSupplier,
BiConsumer<T,R> accumulator,
Function<C,R> finalResult) {
return new SortedPartitioner<>(mappedPartitionProperty, accumulatorSupplier, accumulator, finalResult);
}
@Override
public Observable<R> call(Observable<T> t) {
return null;
}
}
这是一个棘手的问题,但我也经常遇到。
诀窍是使用 materialize
、scan
和 flatMap
。 scan
累积具有相同 partitionId 和下一个不同值(如果存在)的值列表。 materialize
是必需的,因为我们需要知道源何时完成,以便我们可以发出剩余的不同值(如果存在)。 flatMap
获取列表和值并在值存在时发出列表(我们刚刚切换到一个新的 partitionId)并在流完成时发出值(剩余的)。
下面是一个单元测试,它从列表 1, 1, 2, 2, 2, 3
发出列表 {1, 1}, {2, 2, 2}, {3}
。
对于您的用例,您只需将此技术应用于两个源并将它们压缩在一起。
代码:
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.junit.Test;
import rx.Observable;
public class StateMachineExampleTest {
@Test
public void testForWhosebug() {
Observable<Integer> a = Observable.just(1, 1, 2, 2, 2, 3);
State<Integer> initial = new State<Integer>(Collections.emptyList(), Optional.empty(),
false);
List<List<Integer>> lists = a.materialize()
// accumulate lists and uses onCompleted notification to emit
// left overs when source completes
.scan(initial,
(state, notification) -> {
if (notification.isOnCompleted()) {
return new State<>(null, state.value, true);
} else if (notification.isOnError())
throw new RuntimeException(notification.getThrowable());
else if (state.list.size() == 0) {
return new State<>(Arrays.asList(notification.getValue()), Optional
.empty(), false);
} else if (partitionId(notification.getValue()) == partitionId(state.list
.get(0))) {
List<Integer> list = new ArrayList<>();
list.addAll(state.list);
list.add(notification.getValue());
return new State<>(list, Optional.empty(), false);
} else if (state.value.isPresent()) {
if (partitionId(state.value.get()) == partitionId(notification
.getValue())) {
return new State<>(Arrays.asList(state.value.get(),
notification.getValue()), Optional.empty(), false);
} else {
return new State<>(Arrays.asList(state.value.get()), Optional
.of(notification.getValue()), false);
}
} else {
return new State<>(state.list,
Optional.of(notification.getValue()), false);
}
})
// emit lists from state
.flatMap(state -> {
if (state.completed) {
if (state.value.isPresent())
return Observable.just(Arrays.asList(state.value.get()));
else
return Observable.empty();
} else if (state.value.isPresent()) {
return Observable.just(state.list);
} else {
return Observable.empty();
}
})
// get as a list of lists to check
.toList().toBlocking().single();
assertEquals(Arrays.asList(Arrays.asList(1, 1), Arrays.asList(2, 2, 2), Arrays.asList(3)),
lists);
}
private static int partitionId(Integer n) {
return n;
}
private static final class State<T> {
final List<T> list;
final Optional<T> value;
final boolean completed;
State(List<T> list, Optional<T> value, boolean completed) {
this.list = list;
this.value = value;
this.completed = completed;
}
}
}
请记住,此代码是快速编写的,可能有漏洞。请务必对您改编的此代码进行完整的单元测试。
给你的额外注意是,因为我们使用背压支持运算符 materialize
、scan
和 flatMap
,结果转换也支持背压,因此可以安全地与 zip
.
你的另一个答案使用了 Maven Central 上的库,而且更短。
将此依赖项添加到您的 pom.xml
。
<dependency>
<groupId>com.github.davidmoten</groupId>
<artifactId>rxjava-extras</artifactId>
<version>0.5.13</version>
</dependency>
对于具有相同 partition_id
的项目进行分组,请执行以下操作:
import com.github.davidmoten.rx.Transformers;
Observable<List<Item>> grouped = items.compose(
Transformers.toListWhile(
(list, item) -> list.isEmpty() || list.get(0).partitionId == item.partitionId));
此方法的测试非常全面(另请参阅 Transformers.collectWhile
了解列表以外的数据结构),但您可以在 github.
然后继续zip
。