Using WriteToBigQuery FILE_LOADS in a streaming pipeline just creates a LOT of temporary tables (python SDK)
I have a streaming pipeline that gets messages from pub/sub, parses them, and writes them to BigQuery. The challenge is that each message goes to a different event table based on the event attribute in the message, and the messages are not ordered.
This means (I believe) that the WriteToBigQuery method cannot batch the writes efficiently; I see it essentially writing one message at a time, so it runs far too slowly. I also tried adding a 60-second window plus a GroupByKey/FlatMap to try to regroup them, but that made little difference in speed.
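For reference, the windowing/grouping attempt looked roughly like this (a simplified sketch; parsed_records and the keying by event name are placeholders for my actual code):

import apache_beam as beam
from apache_beam.transforms import window

batched = (parsed_records  # placeholder for the PCollection of parsed messages
    | 'Window60s' >> beam.WindowInto(window.FixedWindows(60))
    | 'KeyByEvent' >> beam.Map(lambda record: (record["event"], record))
    | 'GroupByEvent' >> beam.GroupByKey()
    | 'ReEmit' >> beam.FlatMap(lambda kv: kv[1])  # flatten back to individual records per event
)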
Using the FILE_LOADS method of WriteToBigQuery with a triggering frequency of 60+ seconds, it appears to work: load jobs are submitted and (at least sometimes) succeed, and I see the data land in the correct tables. However, the temporary tables that get created are never deleted, so I end up with hundreds of tables (with names like beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_NAME_STEP_756_37417blahblahblah)... which is obviously not sustainable.
Writing via STREAMING_INSERTS works fine, just more slowly; FILE_LOADS was an attempt to improve efficiency.
If someone can help me figure out why the temporary tables are not being deleted, I think that would give me a working pipeline. I have tried longer triggering frequencies (up to 1 hour), but the same behavior occurs.
Here is my main pipeline - again, I have no problems with the rest of it; it is just provided for context.
events, non_events = (p
    | 'ReadData' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription).with_output_types(bytes)
    | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
    | 'Parse JSON to Dict' >> beam.Map(lambda line: json.loads(line))
    | 'FilterOutNonEvents' >> beam.ParDo(FilterOutNonEvents()).with_outputs('MAIN_OUT', 'non_events')
)
parsed, missing_tables, _ = (events
    | 'ParseDict' >> beam.ParDo(ParseDict()).with_outputs('MAIN_OUT', 'missing_tables', 'ignore')
)
results, conversion_errors = (parsed
    | 'ConvertDataTypes' >> beam.ParDo(ConvertDataTypes()).with_outputs('MAIN_OUT', 'error_data')
)
final = (results
    | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
        # destination table is chosen per record from its "event" attribute
        table=lambda record: '{project}:{dataset}.{table}'.format(
            project=known_args.project,
            dataset=known_args.dataset,
            table=parse_event_to_dataset_name(patterns, record["event"])),
        # schema is looked up from a pre-loaded dict of "name:TYPE" strings
        schema=lambda tbl: {'fields': [{'name': c.split(':')[0], 'type': c.split(':')[1]}
                                       for c in schema_json[tbl.split('.')[-1]].split(',')]},
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        method='FILE_LOADS',
        triggering_frequency=60)
)
The table arg is determined by the event attribute of the message, and the schema arg is just a reformatted slice of a global variable (read from GCS at startup; again, no problems with this when using streaming_inserts).
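To illustrate what the schema callable does (the entry below is a made-up example, not my real schema), each schema_json value is just a comma-separated string of name:TYPE pairs that gets expanded into a BigQuery schema dict:

schema_json = {'my_event_table': 'user_id:STRING,event_time:TIMESTAMP,value:FLOAT'}  # made-up entry

tbl = 'my-project:my_dataset.my_event_table'
schema = {'fields': [{'name': c.split(':')[0], 'type': c.split(':')[1]}
                     for c in schema_json[tbl.split('.')[-1]].split(',')]}
# schema == {'fields': [{'name': 'user_id', 'type': 'STRING'},
#                       {'name': 'event_time', 'type': 'TIMESTAMP'},
#                       {'name': 'value', 'type': 'FLOAT'}]}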
Thanks to anyone who can help! This has been a real headache (I'm new to beam/dataflow).
When using FILE_LOADS with multiple partitions and/or dynamic destinations, the behavior should be as follows:
'''
2. Multiple partitions and/or Dynamic Destinations:
When there are multiple partitions of files destined for a single
destination or when Dynamic Destinations are used, multiple load jobs
need to be triggered for each partition/destination. Load Jobs are
triggered to temporary tables, and those are later copied to the actual
appropriate destination table. This ensures atomicity when only some
of the load jobs would fail but not other. If any of them fails, then
copy jobs are not triggered.
'''
In the code it also appears that, after the load jobs, Beam should wait for them to finish, then copy the data from the temp tables and delete them; however, when used with a streaming pipeline it does not seem to complete these steps. In my reproduction using the DirectRunner it did not even get to the CopyJob. I suggest reporting it to the Apache Beam team here.
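As a stop-gap, the leftover temporary tables can be cleaned up with the BigQuery client; a rough sketch (the project and dataset names are placeholders):

from google.cloud import bigquery

client = bigquery.Client(project='your-project')  # placeholder project
for table in client.list_tables('your-project.your_dataset'):  # placeholder dataset
    # temp tables created by Beam's load jobs use this prefix (as seen in the question)
    if table.table_id.startswith('beam_bq_job_LOAD'):
        client.delete_table(table.reference, not_found_ok=True)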
That said, for your use case I would reconsider the load-jobs approach, since you could hit the quotas for load and copy jobs fairly quickly; streaming inserts may fit this scenario better and may give better performance than triggering load jobs every 60+ seconds.
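For reference, switching back to streaming inserts is roughly your existing write step with the method changed (same table/schema callables as in the question; triggering_frequency applies to file loads, so it is omitted here):

final = (results
    | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
        table=lambda record: '{project}:{dataset}.{table}'.format(
            project=known_args.project,
            dataset=known_args.dataset,
            table=parse_event_to_dataset_name(patterns, record["event"])),
        schema=lambda tbl: {'fields': [{'name': c.split(':')[0], 'type': c.split(':')[1]}
                                       for c in schema_json[tbl.split('.')[-1]].split(',')]},
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        # streaming inserts instead of load jobs
        method='STREAMING_INSERTS')
)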