如何在 Apache Beam Java SDK 的多个列上使用 aggregateField()?
How to use aggregateField() over multiple columns in Apache Beam Java SDK?
在 Apache Beam Python SDK 中,可以执行以下操作:
input
| GroupBy(account=lambda s: s["account"])
.aggregate_field(lambda x: x["wordsAddup"] - x["wordsSubtract"], sum, 'wordsRead')
我们如何在 Java SDK 中执行类似的操作?奇怪的是,编程指南对此转换有 only examples in Python。
这是我尝试生成 Java 中的等价物:
input.apply(
Group.byFieldNames("account")
.aggregateField(<INSERT EQUIVALENT HERE>, Sum.ofIntegers(), "wordsRead"));
https://beam.apache.org/documentation/programming-guide/#using-schemas 中有一些 Java 示例。 (请注意,您可能需要 select select 或同时具有 Java 和 Python 的 java
选项卡才能看到它们。)
在Java中我不认为aggregateField的第一个参数可以采用任意表达式;它必须是一个字段名。您可以使用为所需表达式添加新字段的投影来进行分组操作。例如
input
.apply(SqlTransform.query(
"SELECT *, wordsAddup - wordsSubtract AS wordsDiff from PCOLLECTION")
.apply(Group.byFieldNames("account")
.aggregateField("wordsDiff", Sum.ofIntegers(), "wordsRead"));
在 Apache Beam Python SDK 中,可以执行以下操作:
input
| GroupBy(account=lambda s: s["account"])
.aggregate_field(lambda x: x["wordsAddup"] - x["wordsSubtract"], sum, 'wordsRead')
我们如何在 Java SDK 中执行类似的操作?奇怪的是,编程指南对此转换有 only examples in Python。
这是我尝试生成 Java 中的等价物:
input.apply(
Group.byFieldNames("account")
.aggregateField(<INSERT EQUIVALENT HERE>, Sum.ofIntegers(), "wordsRead"));
https://beam.apache.org/documentation/programming-guide/#using-schemas 中有一些 Java 示例。 (请注意,您可能需要 select select 或同时具有 Java 和 Python 的 java
选项卡才能看到它们。)
在Java中我不认为aggregateField的第一个参数可以采用任意表达式;它必须是一个字段名。您可以使用为所需表达式添加新字段的投影来进行分组操作。例如
input
.apply(SqlTransform.query(
"SELECT *, wordsAddup - wordsSubtract AS wordsDiff from PCOLLECTION")
.apply(Group.byFieldNames("account")
.aggregateField("wordsDiff", Sum.ofIntegers(), "wordsRead"));