基于列列表对数据进行分组和聚合
Grouping and Aggregating data based on List of columns
我有一个 JSON 数组,其中包含多个 JSON 对象,每个 JSON 对象代表一行数据。 (像 SQL 行)
示例:
[{
"col1": "c1",
"col2": "r1",
"col3": 12121
}, {
"col1": "c1",
"col2": "r1",
"col3": 1321
}, {
"col1": "c1",
"col2": "r2",
"col3": 4342
}, {
"col1": "c1",
"col2": "r2",
"col3": 4532
}]
一个包含分组依据的列的列表:
示例:
["col1","col2"]
最后,必须应用聚合,MIN
、MAX
、SUM
、AVG
以及必须应用聚合的列:
预期输出:聚合为 SUM
[{
"col1": "c1",
"col2": "r1",
"col3": 13442
},{
"col1": "c1",
"col2": "r2",
"col3": 8874
}]
What I have tried so far:
我想将当前与以前的列列表进行比较,每当我看到值发生变化时,我都会对其进行汇总。但是这种方法看起来效率太低了。我正在考虑使用 Java Streams,但我很不擅长。任何帮助将不胜感激。
if (agg.equalsIgnoreCase("MIN")) {
Number min = data.getJSONObject(0).getNumber(column);
for (int i = 0; i < data.length(); i++) {
JSONObject jsonObject = data.getJSONObject(i);
if (i > 1) {
}
}
}
根据您要处理的数据量,一种不依赖流的简单方法是使用 Map
。散列聚合列值以生成映射键,并根据聚合列的值更新映射值。
我在这里创建了一个 Operation
接口,可以为每个操作(求和、最大值、最小值等)实现。
例如
interface Operation {
Long update(Long currentAggregate, int nextValue);
}
class Sum implements Operation {
@Override
public Long update(Long currentAggregate, int nextValue) {
return currentAggregate + nextValue;
}
}
JSONArray aggregate(JSONArray array, String[] columns, String aggregateColumn, Operation op) {
Map<String, Long> aggregates = new HashMap<>();
for (int i = 0; i < array.size(); ++i) {
JSONObject obj = array.getJsonObject(i);
String key = getKey(obj, columns);
Long current = aggregates.get(key);
aggregates.put(key, op.update(current, obj.getInt(aggregateColumn)));
}
// Then split the map key back out to columns values (or use a more sophisticated
// object in place of 'aggregates' that also stores the column values explicitly) and
// return a JSONArray with values for the 'aggregateColumn' taken from 'aggregates'.
// ...
}
String getKey(JSONObject obj, String[] columns) {
// Assumes no column names include "_".
StringBuilder builder = new StringBuilder();
for (int i = 0; i < columns.length; ++i)
builder.append(obj.getString(columns[i])).append("_");
return builder.toString();
}
您必须先分解问题,然后才能意识到这并不难,尤其是当有可以满足您需求的工具时,您甚至不需要自己实施某些东西。流在这里完全是一个不错的选择,因为 Java 8 Streams API 允许通过键对流式元素进行分组,并将这些组作为下游进行处理,例如聚合操作。
假设您有一个 JSON 源生成一个巨大的数据集:对于您的示例,它仍然可以表示为 Stream<JSONObject>
。我使用你的文件以流式方式读取它,生成了可供分析的数据流(我敢打赌我的拆分器实现并不完美,但它似乎有效):
public static <T> Stream<T> asStream(final JSONTokener jsonTokener) {
return StreamSupport.stream(new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, Spliterator.IMMUTABLE | Spliterator.NONNULL | Spliterator.ORDERED) {
private Status status = Status.BEFORE_ARRAY;
@Override
public boolean tryAdvance(final Consumer<? super T> action) {
for ( ; ; ) {
switch ( status ) {
case BEFORE_ARRAY:
jsonTokener.next('[');
status = Status.IN_ARRAY;
continue;
case IN_ARRAY:
switch ( jsonTokener.nextClean() ) {
case ',':
return true;
case ']':
status = Status.AFTER_ARRAY;
return false;
default:
jsonTokener.back();
@SuppressWarnings("unchecked")
final T value = (T) jsonTokener.nextValue();
action.accept(value);
continue; // or return true?
}
case AFTER_ARRAY:
throw new IllegalStateException();
default:
throw new AssertionError(status);
}
}
}
}, false);
}
private enum Status {
BEFORE_ARRAY,
IN_ARRAY,
AFTER_ARRAY
}
它所做的只是将一些 JSON 令牌流转换为 某些东西的流(因为 org.json 对象模型不建议使用公共基础 class).
.如果您已经缓冲了 JSONArray
,可以使用此处的内容对其进行流式传输:Convert Iterable to Stream using Java 8 JDK
接下来,仅使用上面解析的流中的分组收集器:
final Collector<JSONObject, ?, Map<List<String>, Double>> collector = Collectors.groupingBy(
// your groups for (col1. col2)
row -> List.of(row.getString("col1"), row.getString("col2")),
// your aggregating SUM for col3
Collectors.summingDouble(row -> row.getDouble("col3"))
);
Assertions.assertEquals(
Map.of(List.of("c1", "r2"), 8874.0, List.of("c1", "r1"), 13442.0),
JsonStreams.<JSONObject>asStream(new JSONTokener(reader))
.collect(collector)
);
SUM
就这些了。 AVG
结果可以通过使用 Collectors.averagingDouble
.
来完成
我有一个 JSON 数组,其中包含多个 JSON 对象,每个 JSON 对象代表一行数据。 (像 SQL 行)
示例:
[{
"col1": "c1",
"col2": "r1",
"col3": 12121
}, {
"col1": "c1",
"col2": "r1",
"col3": 1321
}, {
"col1": "c1",
"col2": "r2",
"col3": 4342
}, {
"col1": "c1",
"col2": "r2",
"col3": 4532
}]
一个包含分组依据的列的列表:
示例:
["col1","col2"]
最后,必须应用聚合,MIN
、MAX
、SUM
、AVG
以及必须应用聚合的列:
预期输出:聚合为 SUM
[{
"col1": "c1",
"col2": "r1",
"col3": 13442
},{
"col1": "c1",
"col2": "r2",
"col3": 8874
}]
What I have tried so far:
我想将当前与以前的列列表进行比较,每当我看到值发生变化时,我都会对其进行汇总。但是这种方法看起来效率太低了。我正在考虑使用 Java Streams,但我很不擅长。任何帮助将不胜感激。
if (agg.equalsIgnoreCase("MIN")) {
Number min = data.getJSONObject(0).getNumber(column);
for (int i = 0; i < data.length(); i++) {
JSONObject jsonObject = data.getJSONObject(i);
if (i > 1) {
}
}
}
根据您要处理的数据量,一种不依赖流的简单方法是使用 Map
。散列聚合列值以生成映射键,并根据聚合列的值更新映射值。
我在这里创建了一个 Operation
接口,可以为每个操作(求和、最大值、最小值等)实现。
例如
interface Operation {
Long update(Long currentAggregate, int nextValue);
}
class Sum implements Operation {
@Override
public Long update(Long currentAggregate, int nextValue) {
return currentAggregate + nextValue;
}
}
JSONArray aggregate(JSONArray array, String[] columns, String aggregateColumn, Operation op) {
Map<String, Long> aggregates = new HashMap<>();
for (int i = 0; i < array.size(); ++i) {
JSONObject obj = array.getJsonObject(i);
String key = getKey(obj, columns);
Long current = aggregates.get(key);
aggregates.put(key, op.update(current, obj.getInt(aggregateColumn)));
}
// Then split the map key back out to columns values (or use a more sophisticated
// object in place of 'aggregates' that also stores the column values explicitly) and
// return a JSONArray with values for the 'aggregateColumn' taken from 'aggregates'.
// ...
}
String getKey(JSONObject obj, String[] columns) {
// Assumes no column names include "_".
StringBuilder builder = new StringBuilder();
for (int i = 0; i < columns.length; ++i)
builder.append(obj.getString(columns[i])).append("_");
return builder.toString();
}
您必须先分解问题,然后才能意识到这并不难,尤其是当有可以满足您需求的工具时,您甚至不需要自己实施某些东西。流在这里完全是一个不错的选择,因为 Java 8 Streams API 允许通过键对流式元素进行分组,并将这些组作为下游进行处理,例如聚合操作。
假设您有一个 JSON 源生成一个巨大的数据集:对于您的示例,它仍然可以表示为 Stream<JSONObject>
。我使用你的文件以流式方式读取它,生成了可供分析的数据流(我敢打赌我的拆分器实现并不完美,但它似乎有效):
public static <T> Stream<T> asStream(final JSONTokener jsonTokener) {
return StreamSupport.stream(new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, Spliterator.IMMUTABLE | Spliterator.NONNULL | Spliterator.ORDERED) {
private Status status = Status.BEFORE_ARRAY;
@Override
public boolean tryAdvance(final Consumer<? super T> action) {
for ( ; ; ) {
switch ( status ) {
case BEFORE_ARRAY:
jsonTokener.next('[');
status = Status.IN_ARRAY;
continue;
case IN_ARRAY:
switch ( jsonTokener.nextClean() ) {
case ',':
return true;
case ']':
status = Status.AFTER_ARRAY;
return false;
default:
jsonTokener.back();
@SuppressWarnings("unchecked")
final T value = (T) jsonTokener.nextValue();
action.accept(value);
continue; // or return true?
}
case AFTER_ARRAY:
throw new IllegalStateException();
default:
throw new AssertionError(status);
}
}
}
}, false);
}
private enum Status {
BEFORE_ARRAY,
IN_ARRAY,
AFTER_ARRAY
}
它所做的只是将一些 JSON 令牌流转换为 某些东西的流(因为 org.json 对象模型不建议使用公共基础 class).
.如果您已经缓冲了 JSONArray
,可以使用此处的内容对其进行流式传输:Convert Iterable to Stream using Java 8 JDK
接下来,仅使用上面解析的流中的分组收集器:
final Collector<JSONObject, ?, Map<List<String>, Double>> collector = Collectors.groupingBy(
// your groups for (col1. col2)
row -> List.of(row.getString("col1"), row.getString("col2")),
// your aggregating SUM for col3
Collectors.summingDouble(row -> row.getDouble("col3"))
);
Assertions.assertEquals(
Map.of(List.of("c1", "r2"), 8874.0, List.of("c1", "r1"), 13442.0),
JsonStreams.<JSONObject>asStream(new JSONTokener(reader))
.collect(collector)
);
SUM
就这些了。 AVG
结果可以通过使用 Collectors.averagingDouble
.