解析 Dataflow 中的属性 SQL
Parsing attributes in Dataflow SQL
给定一个 Pub/Sub 主题,BigQuery 允许使用 Dataflow SQL 语法将数据流式传输到 table。
假设您 post 此消息 {"a": 1, "b": 2, "c": 3}
到一个主题。在 BigQuery 中,使用数据流引擎,您需要将 my_topic
架构定义为
步骤1
event_timestamp: TIMESTAMP
a: INT64
b: INT64
c: INT64
然后使用该命令创建 Dataflow 流式传输作业,以便将每条消息流式传输到目标 BigQuery table。
步骤2
gcloud dataflow sql query 'SELECT * FROM pubsub.topic.my_project.my_topic' \
--job-name my_job --region europe-west1 --bigquery-write-disposition write-append \
--bigquery-project my_project --bigquery-dataset staging --bigquery-table my_topic
gcloud pubsub topics publish my_topic --message='{"a": 1, "b": 2, "c": 3}'
bq query --nouse_legacy_sql \
'SELECT * FROM my_project.staging.my_topic ORDER BY event_timestamp DESC LIMIT 10'
+---------------------+-----+-----+-----+
| event_timestamp | a | b | c |
+---------------------+-----+-----+-----+
| 2020-10-28 14:21:40 | 1 | 2 | 3 |
在 第 2 步 我也想 --attribute="origin=gcloud,username=gcp"
发送到 Pub/Sub 主题。是否可以在步骤 1 定义架构,以便它自动写入 table?
我一直在尝试不同的东西:
attributes: STRUCT
在架构中,在 this Beam extensions documentation 之后,但我得到的只是 JSON 数据流中的解析错误
gcloud pubsub topics publish my_topic --message='{"a": 1, "b": 2}' --attribute='c=3'
希望消息像 piece of code 中那样被压扁,但我在结果 table 中得到 c
的 NULL
值。
谢谢。
Pub/Sub 属性属于 MAP
类型,但这不是 Dataflow SQL 的 supported types 类型之一。有关于添加支持的讨论,但我不知道它的状态。
如果属性很重要,我建议使用 ReadFromPubSub
创建自定义管道
给定一个 Pub/Sub 主题,BigQuery 允许使用 Dataflow SQL 语法将数据流式传输到 table。
假设您 post 此消息 {"a": 1, "b": 2, "c": 3}
到一个主题。在 BigQuery 中,使用数据流引擎,您需要将 my_topic
架构定义为
步骤1
event_timestamp: TIMESTAMP
a: INT64
b: INT64
c: INT64
然后使用该命令创建 Dataflow 流式传输作业,以便将每条消息流式传输到目标 BigQuery table。
步骤2
gcloud dataflow sql query 'SELECT * FROM pubsub.topic.my_project.my_topic' \
--job-name my_job --region europe-west1 --bigquery-write-disposition write-append \
--bigquery-project my_project --bigquery-dataset staging --bigquery-table my_topic
gcloud pubsub topics publish my_topic --message='{"a": 1, "b": 2, "c": 3}'
bq query --nouse_legacy_sql \
'SELECT * FROM my_project.staging.my_topic ORDER BY event_timestamp DESC LIMIT 10'
+---------------------+-----+-----+-----+
| event_timestamp | a | b | c |
+---------------------+-----+-----+-----+
| 2020-10-28 14:21:40 | 1 | 2 | 3 |
在 第 2 步 我也想 --attribute="origin=gcloud,username=gcp"
发送到 Pub/Sub 主题。是否可以在步骤 1 定义架构,以便它自动写入 table?
我一直在尝试不同的东西:
attributes: STRUCT
在架构中,在 this Beam extensions documentation 之后,但我得到的只是 JSON 数据流中的解析错误gcloud pubsub topics publish my_topic --message='{"a": 1, "b": 2}' --attribute='c=3'
希望消息像 piece of code 中那样被压扁,但我在结果 table 中得到c
的NULL
值。
谢谢。
Pub/Sub 属性属于 MAP
类型,但这不是 Dataflow SQL 的 supported types 类型之一。有关于添加支持的讨论,但我不知道它的状态。
如果属性很重要,我建议使用 ReadFromPubSub
创建自定义管道