Apache Beam 代码中插入语法错误
Syntax error insert in Apache Beam code
我正在编写用于测试的 Apache Beam 代码。请参考下面的代码。我创建了示例 SimpleFunction 并应用代码并尝试编译。
public static void main(String[] args) throws IOException {
System.out.println("Test Log");
PipelineOptions options = PipelineOptionsFactory.create();
//options.setRunner(SparkRunner.class);
options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
p.apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
// withCompression can be omitted - by default compression is detected from the filename.
.apply(FileIO.readMatches())
.apply(MapElements
// uses imports from TypeDescriptors
.via(
new SimpleFunction <ReadableFile, KV<String,String>>() {
private static final long serialVersionUID = -7867677L;
@SuppressWarnings("unused")
public KV<String,String> createKV(ReadableFile f) {
String temp = null;
try{
temp = f.readFullyAsUTF8String();
}catch(IOException e){
}
return KV.of(f.getMetadata().resourceId().toString(), temp);
}
@Override
public String apply(ReadableFile element, KV<String, String> input) {
StringBuilder str = new StringBuilder();
return str.toString();
}
}
))
.apply(FileIO.write());
p.run();
}
但编译器正在抛出 syntax error at public String apply(ReadableFile
我试过但没有成功解决这个问题,有人可以指导我吗?
SimpleFunction<InputT, OutputT>
取值InputT
,returns取值OutputT
。本例中apply
的签名为OutputT apply(InputT input);
,见here.
对于您的类型,SimpleFunction
必须如下所示:
new SimpleFunction <ReadableFile, KV<String,String>>() {
...
@Override
public KV<String,String> apply(ReadableFile input) {
...
}
}
例如看使用方法here.
在您的情况下,您需要更多关于 readMatches()
的逻辑,请参阅 here for example how it is applied to parse Avros, and this 是该代码中 PTransform
的实现细节。
我正在编写用于测试的 Apache Beam 代码。请参考下面的代码。我创建了示例 SimpleFunction 并应用代码并尝试编译。
public static void main(String[] args) throws IOException {
System.out.println("Test Log");
PipelineOptions options = PipelineOptionsFactory.create();
//options.setRunner(SparkRunner.class);
options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
p.apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
// withCompression can be omitted - by default compression is detected from the filename.
.apply(FileIO.readMatches())
.apply(MapElements
// uses imports from TypeDescriptors
.via(
new SimpleFunction <ReadableFile, KV<String,String>>() {
private static final long serialVersionUID = -7867677L;
@SuppressWarnings("unused")
public KV<String,String> createKV(ReadableFile f) {
String temp = null;
try{
temp = f.readFullyAsUTF8String();
}catch(IOException e){
}
return KV.of(f.getMetadata().resourceId().toString(), temp);
}
@Override
public String apply(ReadableFile element, KV<String, String> input) {
StringBuilder str = new StringBuilder();
return str.toString();
}
}
))
.apply(FileIO.write());
p.run();
}
但编译器正在抛出 syntax error at public String apply(ReadableFile
我试过但没有成功解决这个问题,有人可以指导我吗?
SimpleFunction<InputT, OutputT>
取值InputT
,returns取值OutputT
。本例中apply
的签名为OutputT apply(InputT input);
,见here.
对于您的类型,SimpleFunction
必须如下所示:
new SimpleFunction <ReadableFile, KV<String,String>>() {
...
@Override
public KV<String,String> apply(ReadableFile input) {
...
}
}
例如看使用方法here.
在您的情况下,您需要更多关于 readMatches()
的逻辑,请参阅 here for example how it is applied to parse Avros, and this 是该代码中 PTransform
的实现细节。