如何获取PCollection<String, String>中的元素总数
How to get the total number of elements in a PCollection<String, String>
我想在 apache beam 中获取 PCollection<String, String>
中的元素总数。我想存储此计数以供进一步使用。如何编写相同的 java 代码?
在 Apache Beam 中有一个名为 Count 的转换(这里的 JavaDoc 是 link)。这有一个名为 globally
的方法,其中 returns 一个包含输入 PCollection 中元素数量的 PCollection。您将使用此方法来获取元素的数量。
这是我用来测试的逻辑片段:
private class MyMap extends SimpleFunction < Long, Long > {
public Long apply(Long in ) {
System.out.println("Length is: " + in );
return in;
}
}
public void run(String[] args) {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline p = Pipeline.create(options);
// Create a PCollection from static objects
ArrayList < String > strs = new ArrayList < > ();
strs.add("Neil");
strs.add("John");
strs.add("Bob");
PCollection < String > pc1 = p.apply(Create.of(strs));
PCollection < Long > count = pc1.apply(Count.globally());
count.apply(MapElements.via(new MyMap()));
System.out.println("About to run!");
p.run().waitUntilFinish();
System.out.println("Run complete!");
} // run
当运行时,此代码创建一个包含三个字符串的PCollection。然后我应用 Count.globally()
转换,最后应用一个 Map 来记录包含一个元素的新 PCollection ...长度。
我想在 apache beam 中获取 PCollection<String, String>
中的元素总数。我想存储此计数以供进一步使用。如何编写相同的 java 代码?
在 Apache Beam 中有一个名为 Count 的转换(这里的 JavaDoc 是 link)。这有一个名为 globally
的方法,其中 returns 一个包含输入 PCollection 中元素数量的 PCollection。您将使用此方法来获取元素的数量。
这是我用来测试的逻辑片段:
private class MyMap extends SimpleFunction < Long, Long > {
public Long apply(Long in ) {
System.out.println("Length is: " + in );
return in;
}
}
public void run(String[] args) {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline p = Pipeline.create(options);
// Create a PCollection from static objects
ArrayList < String > strs = new ArrayList < > ();
strs.add("Neil");
strs.add("John");
strs.add("Bob");
PCollection < String > pc1 = p.apply(Create.of(strs));
PCollection < Long > count = pc1.apply(Count.globally());
count.apply(MapElements.via(new MyMap()));
System.out.println("About to run!");
p.run().waitUntilFinish();
System.out.println("Run complete!");
} // run
当运行时,此代码创建一个包含三个字符串的PCollection。然后我应用 Count.globally()
转换,最后应用一个 Map 来记录包含一个元素的新 PCollection ...长度。