不适用于使用 Apache Beam 的 ParDo 和 DoFn

Apply not applicable with ParDo and DoFn using Apache Beam

我正在实施 Pub/Sub 到 BigQuery 管道。它看起来类似于 How to create read transform using ParDo and DoFn in Apache Beam,但在这里,我已经创建了一个 PCollection。

我正在按照 Apache Beam documentation 中的描述实施 ParDo 操作,使用以下管道准备 table 行:

static class convertToTableRowFn extends DoFn<PubsubMessage, TableRow> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        PubsubMessage message = c.element();
        // Retrieve data from message
        String rawData = message.getData();
        Instant timestamp = new Instant(new Date());
        // Prepare TableRow
        TableRow row = new TableRow().set("message", rawData).set("ts_reception", timestamp);
        c.output(row);
    }
}

// Read input from Pub/Sub
pipeline.apply("Read from Pub/Sub",PubsubIO.readMessagesWithAttributes().fromTopic(topicPath))
        .apply("Prepare raw data for insertion", ParDo.of(new convertToTableRowFn()))
        .apply("Insert in Big Query", BigQueryIO.writeTableRows().to(BQTable));

我在 gist.

中找到了 DoFn 函数

我不断收到以下错误:

The method apply(String, PTransform<? super PCollection<PubsubMessage>,OutputT>) in the type PCollection<PubsubMessage> is not applicable for the arguments (String, ParDo.SingleOutput<PubsubMessage,TableRow>)

我一直认为 ParDo/DoFn 操作是元素方面的 PTransform 操作,我错了吗?我在 Python 中从未遇到过此类错误,所以我对为什么会这样感到有点困惑。

你说得对,ParDos 是逐元素变换,你的方法看起来是正确的。

您看到的是编译错误。当 java 编译器推断出的 apply() 方法的参数类型与实际输入的类型不匹配时,就会发生这种情况,例如convertToTableRowFn.

从您看到的错误看来,java 推断 apply() 的第二个参数是 PTransform<? super PCollection<PubsubMessage>,OutputT> 类型,而您正在传递 [=] 的子类15=] 代替(你的 convertToTableRowFn)。查看 SingleOutput 的定义,您的 convertToTableRowFn 基本上是 PTransform<PCollection<? extends PubsubMessage>, PCollection<TableRow>>。并且 java 未能在 apply 中使用它,它期望 PTransform<? super PCollection<PubsubMessage>,OutputT>.

看起来可疑的是 java 没有将 OutputT 推断为 PCollection<TableRow>。如果您有其他错误,它会失败的原因之一。您确定您也没有其他错误吗?

例如,查看 convertToTableRowFn 你正在调用 message.getData(),当我尝试这样做时它不存在并且在那里编译失败。在我的例子中,我需要做这样的事情:rawData = new String(message.getPayload(), Charset.defaultCharset()).to(BQTable)) 还需要一个字符串(例如,表示 BQ table 名称的字符串)作为参数,并且您正在传递一些未知符号 BQTable (也许它存在于您的程序中的某个地方,并且这在你的情况下不是问题)。

在我修复这两个错误后,您的代码为我编译,apply() 被完全推断并且类型兼容。