使用 Beam 读取记录时重命名列?
Rename Column when reading record using Beam?
我正在尝试使用现有的 google 示例代码 (PubSubToBigQuery.java)
来解析 StackDriver
日志消息并将它们推送到 BigQuery
.
问题是 SD 日志字段的名称之一是 "@type"
,BigQuery 不接受 table。所以我在 BigQuery
中用不同的字段名称 (mytest) 创建了 table。
现在当我 运行 PubSubToBigQuery.java
显然我收到错误消息
"@type" field not found.
如何在我的 Beam 代码中将列名称从 "@type"
重命名为 "mytype"
?
如果您只想将 Stackdriver 日志原封不动地放入 BigQuery,您可以使用 Stackdriver 的内置导出功能并创建一个到 BigQuery 的接收器:https://cloud.google.com/logging/docs/export/
如果导出对您来说不可行,您可以修改 Beam 中的转换逻辑。
在这种情况下 PubSubToBigQuery.java BigQueryIO 使用 TableRow PCollection 作为输入将消息写入 BigQuery。 PubsubMessageToTableRow PTransform 通过一些错误处理将 PubsubMessage 转换为 TableRow。
您可以添加带有自定义 DoFn 的 ParDo,这会更改创建的 TableRow 中的列名称。流程元素方法可能如下所示:
@ProcessElement
public void processElement(@Element TableRow row, OutputReceiver<TableRow> outputReceiver) {
TableRow clone = row.clone();
Object value = clone.get("@type");
clone.remove("@type");
clone.set("mytype", value);
outputReceiver.output(clone);
}
如果您使用我链接的未更改的 PubSubToBigQuery.java,您可以在代码中第 323 行附近的某个地方的 jsonToTableRowOut.get(TRANSFORM_OUT)
PCollection 上应用此 ParDo。
我正在尝试使用现有的 google 示例代码
(PubSubToBigQuery.java)
来解析StackDriver
日志消息并将它们推送到BigQuery
.问题是 SD 日志字段的名称之一是
"@type"
,BigQuery 不接受 table。所以我在BigQuery
中用不同的字段名称 (mytest) 创建了 table。现在当我 运行
PubSubToBigQuery.java
显然我收到错误消息"@type" field not found.
如何在我的 Beam 代码中将列名称从
"@type"
重命名为"mytype"
?
如果您只想将 Stackdriver 日志原封不动地放入 BigQuery,您可以使用 Stackdriver 的内置导出功能并创建一个到 BigQuery 的接收器:https://cloud.google.com/logging/docs/export/
如果导出对您来说不可行,您可以修改 Beam 中的转换逻辑。
在这种情况下 PubSubToBigQuery.java BigQueryIO 使用 TableRow PCollection 作为输入将消息写入 BigQuery。 PubsubMessageToTableRow PTransform 通过一些错误处理将 PubsubMessage 转换为 TableRow。 您可以添加带有自定义 DoFn 的 ParDo,这会更改创建的 TableRow 中的列名称。流程元素方法可能如下所示:
@ProcessElement
public void processElement(@Element TableRow row, OutputReceiver<TableRow> outputReceiver) {
TableRow clone = row.clone();
Object value = clone.get("@type");
clone.remove("@type");
clone.set("mytype", value);
outputReceiver.output(clone);
}
如果您使用我链接的未更改的 PubSubToBigQuery.java,您可以在代码中第 323 行附近的某个地方的 jsonToTableRowOut.get(TRANSFORM_OUT)
PCollection 上应用此 ParDo。