如何 运行 多个 WriteToBigQuery 在 google 云数据流/apache beam 中并行?
How to run multiple WriteToBigQuery parallel in google cloud dataflow / apache beam?
我想在给定数据的情况下将事件从一堆多个事件中分离出来
{"type": "A", "k1": "v1"}
{"type": "B", "k2": "v2"}
{"type": "C", "k3": "v3"}
我想在 bigquery 中将 type: A
事件分离到 table A
,将 type:B
事件分离到 table B
,type: C
事件 table C
.
这是我通过apache beam
python sdk实现的代码,并将数据写入bigquery
,
A_schema = 'type:string, k1:string'
B_schema = 'type:string, k2:string'
C_schema = 'type:string, k2:string'
class ParseJsonDoFn(beam.DoFn):
A_TYPE = 'tag_A'
B_TYPE = 'tag_B'
C_TYPE = 'tag_C'
def process(self, element):
text_line = element.trip()
data = json.loads(text_line)
if data['type'] == 'A':
yield pvalue.TaggedOutput(self.A_TYPE, data)
elif data['type'] == 'B':
yield pvalue.TaggedOutput(self.B_TYPE, data)
elif data['type'] == 'C':
yield pvalue.TaggedOutput(self.C_TYPE, data)
def run():
parser = argparse.ArgumentParser()
parser.add_argument('--input',
dest='input',
default='data/path/data',
help='Input file to process.')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_args.extend([
'--runner=DirectRunner',
'--project=project-id',
'--job_name=seperate-bi-events-job',
])
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
lines = p | ReadFromText(known_args.input)
multiple_lines = (
lines
| 'ParseJSON' >> (beam.ParDo(ParseJsonDoFn()).with_outputs(
ParseJsonDoFn.A_TYPE,
ParseJsonDoFn.B_TYPE,
ParseJsonDoFn.C_TYPE)))
a_line = multiple_lines.tag_A
b_line = multiple_lines.tag_B
c_line = multiple_lines.tag_C
(a_line
| "output_a" >> beam.io.WriteToBigQuery(
'temp.a',
schema = A_schema,
write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED
))
(b_line
| "output_b" >> beam.io.WriteToBigQuery(
'temp.b',
schema = B_schema,
write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED
))
(c_line
| "output_c" >> beam.io.WriteToBigQuery(
'temp.c',
schema = (C_schema),
write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED
))
p.run().wait_until_finish()
输出:
INFO:root:start <DoOperation output_banner/WriteToBigQuery output_tags=['out']>
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
WARNING:root:Sleeping for 150 seconds before the write as BigQuery inserts can be routed to deleted table for 2 mins after the delete and create.
INFO:root:start <DoOperation output_banner/WriteToBigQuery output_tags=['out']>
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
WARNING:root:Sleeping for 150 seconds before the write as BigQuery inserts can be routed to deleted table for 2 mins after the delete and create.
INFO:root:start <DoOperation output_banner/WriteToBigQuery output_tags=['out']>
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
WARNING:root:Sleeping for 150 seconds before the write as BigQuery inserts can be routed to deleted table for 2 mins after the delete and create.
但是,这里有两个问题
bigquery
中没有数据?
- 从日志来看,代码似乎没有 运行 并行而不是 运行 3 次序列?
我的代码有问题还是我遗漏了什么?
there is no data in bigquery?
您的代码似乎非常完美,因为数据已写入 BigQuery(C_schema
应该是 k3
而不是 k2
)。请记住,您正在流式传输数据,因此如果您单击 Preview
table 按钮,直到流式缓冲区中的数据被提交,您才会看到它。 运行 SELECT *
查询将显示预期结果:
From the logs it seems the codes does NOT run parallel rather than run 3 times sequence?
好的,这很有趣。通过追踪 code 中的 WARNING
消息,我们可以读取以下内容:
# if write_disposition == BigQueryDisposition.WRITE_TRUNCATE we delete
# the table before this point.
if write_disposition == BigQueryDisposition.WRITE_TRUNCATE:
# BigQuery can route data to the old table for 2 mins max so wait
# that much time before creating the table and writing it
logging.warning('Sleeping for 150 seconds before the write as ' +
'BigQuery inserts can be routed to deleted table ' +
'for 2 mins after the delete and create.')
# TODO(BEAM-2673): Remove this sleep by migrating to load api
time.sleep(150)
return created_table
else:
return created_table
阅读 BEAM-2673 and BEAM-2801 后,这似乎是由于使用 Streaming API 和 DirectRunner
的 BigQuery 接收器存在问题。这将导致进程在重新创建 table 时休眠 150 秒,并且这不是并行完成的。
相反,如果我们 运行 它在数据流上(使用 DataflowRunner
,提供暂存和临时存储桶路径以及从 GCS 加载输入数据)这将 运行 三个并行导入作业。请看,在下图中,所有三个都从 22:19:45
开始并在 22:19:56
结束:
我想在给定数据的情况下将事件从一堆多个事件中分离出来
{"type": "A", "k1": "v1"}
{"type": "B", "k2": "v2"}
{"type": "C", "k3": "v3"}
我想在 bigquery 中将 type: A
事件分离到 table A
,将 type:B
事件分离到 table B
,type: C
事件 table C
.
这是我通过apache beam
python sdk实现的代码,并将数据写入bigquery
,
A_schema = 'type:string, k1:string'
B_schema = 'type:string, k2:string'
C_schema = 'type:string, k2:string'
class ParseJsonDoFn(beam.DoFn):
A_TYPE = 'tag_A'
B_TYPE = 'tag_B'
C_TYPE = 'tag_C'
def process(self, element):
text_line = element.trip()
data = json.loads(text_line)
if data['type'] == 'A':
yield pvalue.TaggedOutput(self.A_TYPE, data)
elif data['type'] == 'B':
yield pvalue.TaggedOutput(self.B_TYPE, data)
elif data['type'] == 'C':
yield pvalue.TaggedOutput(self.C_TYPE, data)
def run():
parser = argparse.ArgumentParser()
parser.add_argument('--input',
dest='input',
default='data/path/data',
help='Input file to process.')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_args.extend([
'--runner=DirectRunner',
'--project=project-id',
'--job_name=seperate-bi-events-job',
])
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
lines = p | ReadFromText(known_args.input)
multiple_lines = (
lines
| 'ParseJSON' >> (beam.ParDo(ParseJsonDoFn()).with_outputs(
ParseJsonDoFn.A_TYPE,
ParseJsonDoFn.B_TYPE,
ParseJsonDoFn.C_TYPE)))
a_line = multiple_lines.tag_A
b_line = multiple_lines.tag_B
c_line = multiple_lines.tag_C
(a_line
| "output_a" >> beam.io.WriteToBigQuery(
'temp.a',
schema = A_schema,
write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED
))
(b_line
| "output_b" >> beam.io.WriteToBigQuery(
'temp.b',
schema = B_schema,
write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED
))
(c_line
| "output_c" >> beam.io.WriteToBigQuery(
'temp.c',
schema = (C_schema),
write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED
))
p.run().wait_until_finish()
输出:
INFO:root:start <DoOperation output_banner/WriteToBigQuery output_tags=['out']>
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
WARNING:root:Sleeping for 150 seconds before the write as BigQuery inserts can be routed to deleted table for 2 mins after the delete and create.
INFO:root:start <DoOperation output_banner/WriteToBigQuery output_tags=['out']>
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
WARNING:root:Sleeping for 150 seconds before the write as BigQuery inserts can be routed to deleted table for 2 mins after the delete and create.
INFO:root:start <DoOperation output_banner/WriteToBigQuery output_tags=['out']>
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
WARNING:root:Sleeping for 150 seconds before the write as BigQuery inserts can be routed to deleted table for 2 mins after the delete and create.
但是,这里有两个问题
bigquery
中没有数据?- 从日志来看,代码似乎没有 运行 并行而不是 运行 3 次序列?
我的代码有问题还是我遗漏了什么?
there is no data in bigquery?
您的代码似乎非常完美,因为数据已写入 BigQuery(C_schema
应该是 k3
而不是 k2
)。请记住,您正在流式传输数据,因此如果您单击 Preview
table 按钮,直到流式缓冲区中的数据被提交,您才会看到它。 运行 SELECT *
查询将显示预期结果:
From the logs it seems the codes does NOT run parallel rather than run 3 times sequence?
好的,这很有趣。通过追踪 code 中的 WARNING
消息,我们可以读取以下内容:
# if write_disposition == BigQueryDisposition.WRITE_TRUNCATE we delete
# the table before this point.
if write_disposition == BigQueryDisposition.WRITE_TRUNCATE:
# BigQuery can route data to the old table for 2 mins max so wait
# that much time before creating the table and writing it
logging.warning('Sleeping for 150 seconds before the write as ' +
'BigQuery inserts can be routed to deleted table ' +
'for 2 mins after the delete and create.')
# TODO(BEAM-2673): Remove this sleep by migrating to load api
time.sleep(150)
return created_table
else:
return created_table
阅读 BEAM-2673 and BEAM-2801 后,这似乎是由于使用 Streaming API 和 DirectRunner
的 BigQuery 接收器存在问题。这将导致进程在重新创建 table 时休眠 150 秒,并且这不是并行完成的。
相反,如果我们 运行 它在数据流上(使用 DataflowRunner
,提供暂存和临时存储桶路径以及从 GCS 加载输入数据)这将 运行 三个并行导入作业。请看,在下图中,所有三个都从 22:19:45
开始并在 22:19:56
结束: