基于列列表对数据进行分组和聚合

Grouping and Aggregating data based on List of columns

我有一个 JSON 数组,其中包含多个 JSON 对象,每个 JSON 对象代表一行数据。 (像 SQL 行)

示例:

[{
    "col1": "c1",
    "col2": "r1",
    "col3": 12121
}, {
    "col1": "c1",
    "col2": "r1",
    "col3": 1321
}, {
    "col1": "c1",
    "col2": "r2",
    "col3": 4342
}, {
    "col1": "c1",
    "col2": "r2",
    "col3": 4532
}]

一个包含分组依据的列的列表:

示例:

["col1","col2"]

最后,必须应用聚合,MINMAXSUMAVG 以及必须应用聚合的列:

预期输出:聚合为 SUM

[{
    "col1": "c1",
    "col2": "r1",
    "col3": 13442
},{
    "col1": "c1",
    "col2": "r2",
    "col3": 8874
}]

What I have tried so far:

我想将当前与以前的列列表进行比较,每当我看到值发生变化时,我都会对其进行汇总。但是这种方法看起来效率太低了。我正在考虑使用 Java Streams,但我很不擅长。任何帮助将不胜感激。

 if (agg.equalsIgnoreCase("MIN")) {
        Number min = data.getJSONObject(0).getNumber(column);
        for (int i = 0; i < data.length(); i++) {
            JSONObject jsonObject = data.getJSONObject(i);
            if (i > 1) {
            }
        }
    }

根据您要处理的数据量,一种不依赖流的简单方法是使用 Map。散列聚合列值以生成映射键,并根据聚合列的值更新映射值。

我在这里创建了一个 Operation 接口,可以为每个操作(求和、最大值、最小值等)实现。

例如

interface Operation {
    Long update(Long currentAggregate, int nextValue);
}

class Sum implements Operation {
    @Override
    public Long update(Long currentAggregate, int nextValue) {
        return currentAggregate + nextValue;
    }
}

JSONArray aggregate(JSONArray array, String[] columns, String aggregateColumn, Operation op) {
    Map<String, Long> aggregates = new HashMap<>();
    for (int i = 0; i < array.size(); ++i) {
        JSONObject obj = array.getJsonObject(i);
        String key = getKey(obj, columns);
        Long current = aggregates.get(key);
        aggregates.put(key, op.update(current, obj.getInt(aggregateColumn)));
    }
    // Then split the map key back out to columns values (or use a more sophisticated 
    // object in place of 'aggregates' that also stores the column values explicitly) and 
    // return a JSONArray with values for the 'aggregateColumn' taken from 'aggregates'.
    // ...
}

String getKey(JSONObject obj, String[] columns) {
    // Assumes no column names include "_".
    StringBuilder builder = new StringBuilder();
    for (int i = 0; i < columns.length; ++i)
        builder.append(obj.getString(columns[i])).append("_");
    return builder.toString();
}

您必须先分解问题,然后才能意识到这并不难,尤其是当有可以满足您需求的工具时,您甚至不需要自己实施某些东西。流在这里完全是一个不错的选择,因为 Java 8 Streams API 允许通过键对流式元素进行分组,并将这些组作为下游进行处理,例如聚合操作。

假设您有一个 JSON 源生成一个巨大的数据集:对于您的示例,它仍然可以表示为 Stream<JSONObject>。我使用你的文件以流式方式读取它,生成了可供分析的数据流(我敢打赌我的拆分器实现并不完美,但它似乎有效):

public static <T> Stream<T> asStream(final JSONTokener jsonTokener) {
    return StreamSupport.stream(new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, Spliterator.IMMUTABLE | Spliterator.NONNULL | Spliterator.ORDERED) {
        private Status status = Status.BEFORE_ARRAY;

        @Override
        public boolean tryAdvance(final Consumer<? super T> action) {
            for ( ; ; ) {
                switch ( status ) {
                case BEFORE_ARRAY:
                    jsonTokener.next('[');
                    status = Status.IN_ARRAY;
                    continue;
                case IN_ARRAY:
                    switch ( jsonTokener.nextClean() ) {
                    case ',':
                        return true;
                    case ']':
                        status = Status.AFTER_ARRAY;
                        return false;
                    default:
                        jsonTokener.back();
                        @SuppressWarnings("unchecked")
                        final T value = (T) jsonTokener.nextValue();
                        action.accept(value);
                        continue; // or return true?
                    }
                case AFTER_ARRAY:
                    throw new IllegalStateException();
                default:
                    throw new AssertionError(status);
                }
            }
        }
    }, false);
}

private enum Status {

    BEFORE_ARRAY,
    IN_ARRAY,
    AFTER_ARRAY

}

它所做的只是将一些 JSON 令牌流转换为 某些东西的流(因为 org.json 对象模型不建议使用公共基础 class). .如果您已经缓冲了 JSONArray,可以使用此处的内容对其进行流式传输:Convert Iterable to Stream using Java 8 JDK

接下来,仅使用上面解析的流中的分组收集器:

final Collector<JSONObject, ?, Map<List<String>, Double>> collector = Collectors.groupingBy(
        // your groups for (col1. col2)
        row -> List.of(row.getString("col1"), row.getString("col2")),
        // your aggregating SUM for col3
        Collectors.summingDouble(row -> row.getDouble("col3"))
);
Assertions.assertEquals(
        Map.of(List.of("c1", "r2"), 8874.0, List.of("c1", "r1"), 13442.0),
        JsonStreams.<JSONObject>asStream(new JSONTokener(reader))
                .collect(collector)
);

SUM 就这些了。 AVG 结果可以通过使用 Collectors.averagingDouble.

来完成