How can I dynamically add a field in MapElements in apache_beam?
I would appreciate it if someone could help me write Java code for apache_beam (2.13.0).
In Python, you can add a field dynamically with the Map function, which performs a 1-to-1 mapping.
Code
#!/usr/bin/env python
import apache_beam as beam
from apache_beam.io.textio import WriteToText

def addoutput(line):
    return [line, "Its weekend!"]

with beam.Pipeline() as p:
    ( p
      | beam.Create(["blah"])
      | beam.Map(addoutput)
      | WriteToText(file_path_prefix="/tmp/sample")
    )
Result
['blah', 'Its weekend!']
However, when I try to do the same thing in Java, I get a compile error from Maven.
Code
public class SampleTextIO
{
    static class AddFieldFn extends DoFn<String, String> {
        @ProcessElement
        public void processElement(@Element String word, OutputReceiver<String> receiver) {
            receiver.output(word);
            receiver.output("Its weekend!");
        }
    }

    public static void main ( String[] args ) {
        System.out.println( "Main class for DirectRunner" );

        // Create the pipeline using the default runner (DirectRunner)
        // Interface: PipelineOptions
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline p = Pipeline.create(options);

        // Example PCollection input
        final List<String> LINES = Arrays.asList(
            "blah"
        );

        // Build the PCollection from the in-memory list
        p.apply(Create.of(LINES))
         .apply(MapElements.via(new AddFieldFn()))
         .apply(TextIO.write().to("/tmp/test-out"));

        p.run().waitUntilFinish();
    }
}
Result
[ERROR] /home/ywatanabe/git/google-data-engineer/Data_Science_on_the_Google_Cloud_Platform/Ch04/java/directrunner/src/main/java/com/example/SampleTextIO.java:[43,28] no suitable method found for via(com.example.SampleTextIO.AddFieldFn)
[ERROR] method org.apache.beam.sdk.transforms.MapElements.<InputT,OutputT>via(org.apache.beam.sdk.transforms.InferableFunction<InputT,OutputT>) is not applicable
[ERROR] (cannot infer type-variable(s) InputT,OutputT
[ERROR] (argument mismatch; com.example.SampleTextIO.AddFieldFn cannot be converted to org.apache.beam.sdk.transforms.InferableFunction<InputT,OutputT>))
[ERROR] method org.apache.beam.sdk.transforms.MapElements.<InputT,OutputT>via(org.apache.beam.sdk.transforms.SimpleFunction<InputT,OutputT>) is not applicable
[ERROR] (cannot infer type-variable(s) InputT,OutputT
[ERROR] (argument mismatch; com.example.SampleTextIO.AddFieldFn cannot be converted to org.apache.beam.sdk.transforms.SimpleFunction<InputT,OutputT>))
[ERROR] method org.apache.beam.sdk.transforms.MapElements.via(org.apache.beam.sdk.transforms.ProcessFunction) is not applicable
[ERROR] (argument mismatch; com.example.SampleTextIO.AddFieldFn cannot be converted to org.apache.beam.sdk.transforms.ProcessFunction)
[ERROR] method org.apache.beam.sdk.transforms.MapElements.via(org.apache.beam.sdk.transforms.SerializableFunction) is not applicable
[ERROR] (argument mismatch; com.example.SampleTextIO.AddFieldFn cannot be converted to org.apache.beam.sdk.transforms.SerializableFunction)
[ERROR] method org.apache.beam.sdk.transforms.MapElements.via(org.apache.beam.sdk.transforms.Contextful) is not applicable
[ERROR] (argument mismatch; com.example.SampleTextIO.AddFieldFn cannot be converted to org.apache.beam.sdk.transforms.Contextful)
According to the Javadoc, MapElements supports ProcessFunction, but it did not work in my case.
How can I dynamically add a field in Java the way I do in Python?
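This is because the via method of MapElements requires one of the following: InferableFunction, SimpleFunction, ProcessFunction, SerializableFunction, or Contextful. In your example, AddFieldFn extends DoFn, which is none of these. Also, judging from the comparison with the Python example, you seem to want to output a single list containing two elements rather than produce two separate rows.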
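To make the difference between "one list with two elements" and "two separate rows" concrete, here is a small sketch that uses plain java.util.stream rather than Beam. It is only an analogy: the class name MapVsFlatMap and the methods oneToOne and oneToMany are hypothetical names chosen for illustration, with map standing in for a 1-to-1 transform such as MapElements and flatMap standing in for a DoFn that calls receiver.output twice.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy only: plain Java streams, not the Beam API.
public class MapVsFlatMap {

    // One output per input: each word becomes a single two-element list,
    // which is the shape the Python beam.Map example produces.
    public static List<List<String>> oneToOne(List<String> words) {
        return words.stream()
                .map(word -> Arrays.asList(word, "Its weekend!"))
                .collect(Collectors.toList());
    }

    // Several outputs per input: each word becomes two separate elements,
    // comparable to calling receiver.output(...) twice inside a DoFn.
    public static List<String> oneToMany(List<String> words) {
        return words.stream()
                .flatMap(word -> Stream.of(word, "Its weekend!"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("blah");
        System.out.println(oneToOne(lines));  // [[blah, Its weekend!]]
        System.out.println(oneToMany(lines)); // [blah, Its weekend!]
    }
}
```

With the single input "blah", oneToOne yields one element (a list of two strings), while oneToMany yields two elements. The Python pipeline in the question corresponds to the first shape.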
Three examples of how to emit one list per element with MapElements:
// via ProcessFunction (lambda); into(...) supplies the output type descriptor
PCollection<Void> p1 = p.apply(Create.of(LINES))
    .apply(MapElements.into(TypeDescriptors.lists(TypeDescriptors.strings()))
        .via((String word) -> Arrays.asList(word, "Its weekend!")))
    .apply(ParDo.of(new PrintResultsFn()));

// via in-line SimpleFunction
PCollection<Void> p2 = p.apply(Create.of(LINES))
    .apply(MapElements.via(new SimpleFunction<String, List<String>>() {
        @Override
        public List<String> apply(String word) {
            return Arrays.asList(word, "Its weekend!");
        }
    }))
    .apply(ParDo.of(new PrintResultsFn()));

// via AddFieldFn class
PCollection<Void> p3 = p.apply(Create.of(LINES))
    .apply(MapElements.via(new AddFieldFn()))
    .apply(ParDo.of(new PrintResultsFn()));
where AddFieldFn is:
// define AddFieldFn extending from SimpleFunction and overriding apply method
static class AddFieldFn extends SimpleFunction<String, List<String>> {
    @Override
    public List<String> apply(String word) {
        return Arrays.asList(word, "Its weekend!");
    }
}
and PrintResultsFn, used to verify the rows, is:
// just print the results
static class PrintResultsFn extends DoFn<List<String>, Void> {
    @ProcessElement
    public void processElement(@Element List<String> words) {
        Log.info(Arrays.toString(words.toArray()));
    }
}
This should print the desired output:
Jun 23, 2019 8:00:03 PM com.dataflow.samples.SampleTextIO$PrintResultsFn processElement
INFO: [blah, Its weekend!]
Jun 23, 2019 8:00:03 PM com.dataflow.samples.SampleTextIO$PrintResultsFn processElement
INFO: [blah, Its weekend!]
Jun 23, 2019 8:00:03 PM com.dataflow.samples.SampleTextIO$PrintResultsFn processElement
INFO: [blah, Its weekend!]
Full code here. Tested with the DirectRunner and Java SDK 2.13.0.