如何使用 Java 在 Apache Beam 中按键减少并获得此输出键=值

how to reduce by key in Apache Beam using Java and get this output key=value

这里是菜鸟!我正在尝试在 Java 中学习 Apache Beam,但我一直没有进步! 假设我有一个这种格式的文件:

957149WC,Kyle,10,Accounts,1-01-2019
241316NX,Kumiko,10,Accounts,1-01-2019
796656IE,Kyle,10,Accounts,1-01-2019
331593PS,Beryl,20,HR,1-01-2019
560447WH,Olga,20,HR,1-01-2019

我想知道如何计算包含“帐户”的行中的人数并获得此输出:

Kyle=2
Kumiko=1

Mostafa 如果你想在 Java 中使用 Apache Beam,首先在你的 pom.xml 文件中添加这两个依赖项:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>2.25.0</version>
</dependency>

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>2.25.0</version>
</dependency>

然后为了达到你想要的输出你可以做这样的事情(它并不完美但给了你想法)

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
p
        .apply(TextIO.read().from("input.txt"))
        .apply(MapElements.into(TypeDescriptors.lists(TypeDescriptors.strings()))
                .via(l -> Arrays.asList(l.split(","))))
        .apply(Filter.by(element -> element.get(3).equals("Accounts")))
        .apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
                .via(element -> KV.of(element.get(1), 1)))
        .apply(Combine.perKey((a, b) -> a + b))
        .apply(MapElements.into(TypeDescriptors.strings()).via(element -> element.getKey() + "=" + element.getValue()))
        .apply(TextIO.write().to("data/output").withNumShards(1).withSuffix(".txt"));

p.run().waitUntilFinish();

这导致:

Kyle=2
Kumiko=1