Create an Apache Beam Pipeline that reads from Google Pub/Sub

I'm trying to create a streaming pipeline with apache-beam that reads sentences from Google Pub/Sub and writes the words to a BigQuery table.

I'm using apache-beam version 0.6.0.

Following the examples, I did this:

public class StreamingWordExtract {

/**
 * A DoFn that tokenizes lines of text into individual words.
 */
static class ExtractWords extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        String[] words = ((String) c.element()).split("[^a-zA-Z']+");
        for (String word : words) {
            if (!word.isEmpty()) {
                c.output(word);
            }
        }
    }
}

/**
 * A DoFn that uppercases a word.
 */
static class Uppercase extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        c.output(c.element().toUpperCase());
    }
}


/**
 * A DoFn that converts a String into a BigQuery TableRow.
 */
static class StringToRowConverter extends DoFn<String, TableRow> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        c.output(new TableRow().set("string_field", c.element()));
    }

    static TableSchema getSchema() {
        return new TableSchema().setFields(new ArrayList<TableFieldSchema>() {
            // Compose the list of TableFieldSchema from tableSchema.
            {
                add(new TableFieldSchema().setName("string_field").setType("STRING"));
            }
        });
    }

}

private interface StreamingWordExtractOptions extends ExampleBigQueryTableOptions, ExamplePubsubTopicOptions {
    @Description("Input file to inject to Pub/Sub topic")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInputFile();

    void setInputFile(String value);
}

public static void main(String[] args) {
    StreamingWordExtractOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(StreamingWordExtractOptions.class);

    options.setBigQuerySchema(StringToRowConverter.getSchema());

    Pipeline p = Pipeline.create(options);

    String tableSpec = new StringBuilder()
            .append(options.getProject()).append(":")
            .append(options.getBigQueryDataset()).append(".")
            .append(options.getBigQueryTable())
            .toString();

    p.apply(PubsubIO.read().topic(options.getPubsubTopic()))
            .apply(ParDo.of(new ExtractWords()))
            .apply(ParDo.of(new StringToRowConverter()))
            .apply(BigQueryIO.Write.to(tableSpec)
                    .withSchema(StringToRowConverter.getSchema())
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    PipelineResult result = p.run();


}
}

I have an error near:

apply(ParDo.of(new ExtractWords()))

because the preceding apply doesn't return a String but an Object.

I think the problem is the type returned from PubsubIO.read().topic(options.getPubsubTopic()). The type is PTransform<PBegin, PCollection<T>> instead of PTransform<PBegin, PCollection<String>>.

What is the correct way to read from Google Pub/Sub with apache-beam?

You've hit a recent backwards-incompatible change in Beam -- sorry about that!

Starting with Apache Beam version 0.5.0, PubsubIO.Read and PubsubIO.Write need to be instantiated using PubsubIO.<T>read() and PubsubIO.<T>write() instead of the static factory methods such as PubsubIO.Read.topic(String).

Read requires a coder to be specified for the output type via .withCoder(Coder).

Write requires a coder to be specified for the input type, or a format function to be specified via .withAttributes(SimpleFunction<T, PubsubMessage>).
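Applied to the pipeline in the question, the read step would look something like this (a sketch against the 0.5.0+ API; the option names and downstream transforms are carried over unchanged from the question's code):

```java
// Read UTF-8 strings from the Pub/Sub topic. Parameterizing read() with
// <String> and supplying StringUtf8Coder fixes the element type, so the
// downstream ParDo receives a PCollection<String> instead of PCollection<Object>.
p.apply(PubsubIO.<String>read()
        .topic(options.getPubsubTopic())
        .withCoder(StringUtf8Coder.of()))
        .apply(ParDo.of(new ExtractWords()))
        .apply(ParDo.of(new StringToRowConverter()))
        .apply(BigQueryIO.Write.to(tableSpec)
                .withSchema(StringToRowConverter.getSchema())
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
```

Since the BigQuery sink receives TableRow elements (which have their own coder), only the Pub/Sub read needs the explicit .withCoder(...) call here.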