如何使用 Java 在 Apache Beam 中按键减少并获得此输出键=值
how to reduce by key in Apache Beam using Java and get this output key=value
这里是菜鸟!我正在尝试在 Java 中学习 Apache Beam,但我一直没有进步!
假设我有一个这种格式的文件:
957149WC,Kyle,10,Accounts,1-01-2019
241316NX,Kumiko,10,Accounts,1-01-2019
796656IE,Kyle,10,Accounts,1-01-2019
331593PS,Beryl,20,HR,1-01-2019
560447WH,Olga,20,HR,1-01-2019
我想知道如何计算包含“帐户”的行中的人数并获得此输出:
Kyle=2
Kumiko=1
Mostafa 如果你想在 Java 中使用 Apache Beam,首先在你的 pom.xml 文件中添加这两个依赖项:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>2.25.0</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>2.25.0</version>
</dependency>
然后为了达到你想要的输出你可以做这样的事情(它并不完美但给了你想法)
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
p
.apply(TextIO.read().from("input.txt"))
.apply(MapElements.into(TypeDescriptors.lists(TypeDescriptors.strings()))
.via(l -> Arrays.asList(l.split(","))))
.apply(Filter.by(element -> element.get(3).equals("Accounts")))
.apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
.via(element -> KV.of(element.get(1), 1)))
.apply(Combine.perKey((a, b) -> a + b))
.apply(MapElements.into(TypeDescriptors.strings()).via(element -> element.getKey() + "=" + element.getValue()))
.apply(TextIO.write().to("data/output").withNumShards(1).withSuffix(".txt"));
p.run().waitUntilFinish();
这导致:
Kyle=2
Kumiko=1
这里是菜鸟!我正在尝试在 Java 中学习 Apache Beam,但我一直没有进步! 假设我有一个这种格式的文件:
957149WC,Kyle,10,Accounts,1-01-2019
241316NX,Kumiko,10,Accounts,1-01-2019
796656IE,Kyle,10,Accounts,1-01-2019
331593PS,Beryl,20,HR,1-01-2019
560447WH,Olga,20,HR,1-01-2019
我想知道如何计算包含“帐户”的行中的人数并获得此输出:
Kyle=2
Kumiko=1
Mostafa 如果你想在 Java 中使用 Apache Beam,首先在你的 pom.xml 文件中添加这两个依赖项:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>2.25.0</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>2.25.0</version>
</dependency>
然后为了达到你想要的输出你可以做这样的事情(它并不完美但给了你想法)
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
p
.apply(TextIO.read().from("input.txt"))
.apply(MapElements.into(TypeDescriptors.lists(TypeDescriptors.strings()))
.via(l -> Arrays.asList(l.split(","))))
.apply(Filter.by(element -> element.get(3).equals("Accounts")))
.apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
.via(element -> KV.of(element.get(1), 1)))
.apply(Combine.perKey((a, b) -> a + b))
.apply(MapElements.into(TypeDescriptors.strings()).via(element -> element.getKey() + "=" + element.getValue()))
.apply(TextIO.write().to("data/output").withNumShards(1).withSuffix(".txt"));
p.run().waitUntilFinish();
这导致:
Kyle=2
Kumiko=1