使用 Beam 读取记录时重命名列?

Rename Column when reading record using Beam?

如果您只想将 Stackdriver 日志原封不动地放入 BigQuery,您可以使用 Stackdriver 的内置导出功能并创建一个到 BigQuery 的接收器:https://cloud.google.com/logging/docs/export/

如果导出对您来说不可行,您可以修改 Beam 中的转换逻辑。

在这种情况下 PubSubToBigQuery.java BigQueryIO 使用 TableRow PCollection 作为输入将消息写入 BigQuery。 PubsubMessageToTableRow PTransform 通过一些错误处理将 PubsubMessage 转换为 TableRow。 您可以添加带有自定义 DoFn 的 ParDo,这会更改创建的 TableRow 中的列名称。流程元素方法可能如下所示:

 @ProcessElement
 public void processElement(@Element TableRow row, OutputReceiver<TableRow> outputReceiver) {

    TableRow clone = row.clone();
    Object value = clone.get("@type");
    clone.remove("@type");
    clone.set("mytype", value);
    outputReceiver.output(clone);
}

如果您使用我链接的未更改的 PubSubToBigQuery.java,您可以在代码中第 323 行附近的某个地方的 jsonToTableRowOut.get(TRANSFORM_OUT) PCollection 上应用此 ParDo。