RxJava 分组排序数据
RxJava group sorted data
我有一个非常大的数据集,原始数据已经按照将成为关键的内容进行了排序。例如,我有一个 CSV 文件,第一列将作为分组的键。
A,x,x,x
A,x,y,x
A,z,y,y
C,x,s,d
C,t,d,s
B,a,s,a
E,x,x,x
E,t,r,y
这些行被转换为对象并放入列表中并使用 RxJava Flowable
进行流式传输。因为这个 CSV 会很大(大到会导致应用程序崩溃),有没有什么好的方法可以将这些对象转换成地图条目,看起来像这样:
{ 'A': [[x,x,x],[x,y,x],[z,y,y]] }
使用 rxjava2-extras 中的 collectWhile
并指定集合工厂以生成特殊的键控对象:
class Keyed {
final K key;
final List<Value> list;
...
}
K key(Value value) {
...
}
source.compose(
Transformers.
collectWhile(
// factory
() -> new Keyed(),
// add
(keyed, x) -> {
keyed.list.add(x);
return keyed; },
// condition
(keyed, x) ->
keyed.list.isEmpty() ||
key(x).equals(keyed.key)));
我现在正在使用 FlowableTransformers.partialCollect
。
一个例子是
Flowable.fromPublisher(FlowableTransformers.partialCollect(
(Consumer<PartialCollectEmitter<LineData, Integer,
ListBuilder, ListDataModel>>) emitter -> {
// Get or initialize collecting object
ListBuilder lb = emitter.getAccumulator();
if (lb == null) {
lb = new ListBuilder();
emitter.setAccumulator(lb);
}
if (emitter.demand() != 0) {
boolean d = emitter.isComplete();
if (emitter.size() != 0 && !d) {
LineData data = emitter.getItem(0);
emitter.dropItems(1);
// add returns the finished model if the prefix changes
ListDataModel model = lb.add(data);
if (model != null) {
emitter.next(model);
}
} else if (d) {
if (!lb.isEmpty()) {
// clear returns the last model
emitter.next(lb.clear());
}
emitter.complete();
return;
}
}
emitter.setIndex(0);
}, Functions.emptyConsumer(), settings.getReadBufferSize() + 1).apply(
Flowable.fromIterable(file.getFileNameList())
.concatMap(
fileName -> reader
.getLineData(fileName)
.buffer(settings.getReadBufferSize()))
.flatMap(Flowable::fromIterable)))
我有一个非常大的数据集,原始数据已经按照将成为关键的内容进行了排序。例如,我有一个 CSV 文件,第一列将作为分组的键。
A,x,x,x
A,x,y,x
A,z,y,y
C,x,s,d
C,t,d,s
B,a,s,a
E,x,x,x
E,t,r,y
这些行被转换为对象并放入列表中并使用 RxJava Flowable
进行流式传输。因为这个 CSV 会很大(大到会导致应用程序崩溃),有没有什么好的方法可以将这些对象转换成地图条目,看起来像这样:
{ 'A': [[x,x,x],[x,y,x],[z,y,y]] }
使用 rxjava2-extras 中的 collectWhile
并指定集合工厂以生成特殊的键控对象:
class Keyed {
final K key;
final List<Value> list;
...
}
K key(Value value) {
...
}
source.compose(
Transformers.
collectWhile(
// factory
() -> new Keyed(),
// add
(keyed, x) -> {
keyed.list.add(x);
return keyed; },
// condition
(keyed, x) ->
keyed.list.isEmpty() ||
key(x).equals(keyed.key)));
我现在正在使用 FlowableTransformers.partialCollect
。
一个例子是
Flowable.fromPublisher(FlowableTransformers.partialCollect(
(Consumer<PartialCollectEmitter<LineData, Integer,
ListBuilder, ListDataModel>>) emitter -> {
// Get or initialize collecting object
ListBuilder lb = emitter.getAccumulator();
if (lb == null) {
lb = new ListBuilder();
emitter.setAccumulator(lb);
}
if (emitter.demand() != 0) {
boolean d = emitter.isComplete();
if (emitter.size() != 0 && !d) {
LineData data = emitter.getItem(0);
emitter.dropItems(1);
// add returns the finished model if the prefix changes
ListDataModel model = lb.add(data);
if (model != null) {
emitter.next(model);
}
} else if (d) {
if (!lb.isEmpty()) {
// clear returns the last model
emitter.next(lb.clear());
}
emitter.complete();
return;
}
}
emitter.setIndex(0);
}, Functions.emptyConsumer(), settings.getReadBufferSize() + 1).apply(
Flowable.fromIterable(file.getFileNameList())
.concatMap(
fileName -> reader
.getLineData(fileName)
.buffer(settings.getReadBufferSize()))
.flatMap(Flowable::fromIterable)))