解析 Dataflow 中的属性 SQL

Parsing attributes in Dataflow SQL

给定一个 Pub/Sub 主题,BigQuery 允许使用 Dataflow SQL 语法将数据流式传输到 table。

假设您 post 此消息 {"a": 1, "b": 2, "c": 3} 到一个主题。在 BigQuery 中,使用数据流引擎,您需要将 my_topic 架构定义为

步骤1

event_timestamp: TIMESTAMP
a: INT64
b: INT64
c: INT64

然后使用该命令创建 Dataflow 流式传输作业,以便将每条消息流式传输到目标 BigQuery table。

步骤2

gcloud dataflow sql query 'SELECT * FROM pubsub.topic.my_project.my_topic' \
  --job-name my_job --region europe-west1 --bigquery-write-disposition write-append \
  --bigquery-project my_project --bigquery-dataset staging --bigquery-table my_topic

gcloud pubsub topics publish my_topic --message='{"a": 1, "b": 2, "c": 3}'
​
bq query --nouse_legacy_sql \
  'SELECT * FROM my_project.staging.my_topic ORDER BY event_timestamp DESC LIMIT 10'

+---------------------+-----+-----+-----+
|   event_timestamp   |  a  |  b  |  c  |
+---------------------+-----+-----+-----+
| 2020-10-28 14:21:40 |  1  |  2  |  3  |

第 2 步 我也想 --attribute="origin=gcloud,username=gcp" 发送到 Pub/Sub 主题。是否可以在步骤 1 定义架构,以便它自动写入 table?

我一直在尝试不同的东西:

谢谢。

Pub/Sub 属性属于 MAP 类型,但这不是 Dataflow SQL 的 supported types 类型之一。有关于添加支持的讨论,但我不知道它的状态。

如果属性很重要,我建议使用 ReadFromPubSub

创建自定义管道