Apache Beam 代码中插入语法错误

Syntax error insert in Apache Beam code

我正在编写用于测试的 Apache Beam 代码。请参考下面的代码。我创建了示例 SimpleFunction 并应用代码并尝试编译。

    public static void main(String[] args) throws IOException {
        System.out.println("Test Log");

        PipelineOptions options = PipelineOptionsFactory.create();
        //options.setRunner(SparkRunner.class);
        options.setRunner(SparkRunner.class);

        Pipeline p = Pipeline.create(options);

        p.apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
            // withCompression can be omitted - by default compression is detected from the filename.
            .apply(FileIO.readMatches())
            .apply(MapElements
                // uses imports from TypeDescriptors
                .via(
                    new SimpleFunction <ReadableFile, KV<String,String>>() {

                        private static final long serialVersionUID = -7867677L;

                        @SuppressWarnings("unused")
                        public KV<String,String> createKV(ReadableFile f) {
                            String temp = null;
                            try{
                            temp = f.readFullyAsUTF8String();
                            }catch(IOException e){

                            }
                            return KV.of(f.getMetadata().resourceId().toString(), temp);
                        }

                        @Override
                        public String apply(ReadableFile element, KV<String, String> input) {
                            StringBuilder str = new StringBuilder();
                            return str.toString();
                        }
                    }

            ))
            .apply(FileIO.write());
        p.run();
    }

但编译器正在抛出 syntax error at public String apply(ReadableFile

我试过但没有成功解决这个问题,有人可以指导我吗?

SimpleFunction<InputT, OutputT>取值InputT,returns取值OutputT。本例中apply的签名为OutputT apply(InputT input);,见here.

对于您的类型,SimpleFunction 必须如下所示:

new SimpleFunction <ReadableFile, KV<String,String>>() {
   ...
   @Override
   public KV<String,String> apply(ReadableFile input) {
      ...
   }
}

例如看使用方法here.

在您的情况下,您需要更多关于 readMatches() 的逻辑,请参阅 here for example how it is applied to parse Avros, and this 是该代码中 PTransform 的实现细节。