Monitoring WriteToBigQuery
In my pipeline I use WriteToBigQuery something like this:
| beam.io.WriteToBigQuery(
    'thijs:thijsset.thijstable',
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
This returns a Dict, which the documentation describes as follows:
The beam.io.WriteToBigQuery PTransform returns a dictionary whose
BigQueryWriteFn.FAILED_ROWS entry contains a PCollection of all the
rows that failed to be written.
How can I print this dict or turn it into a PCollection, or how can I print just the FAILED_ROWS?
If I do: | "print" >> beam.Map(print)
then I get: AttributeError: 'dict' object has no attribute 'pipeline'
I must have read a hundred pipelines, but I have never seen anything coming after WriteToBigQuery.
[Edit]
When I finish the pipeline and store the result in a variable, I have the following:
{'FailedRows': <PCollection[WriteToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn).FailedRows] at 0x7f0e0cdcfed0>}
But I do not know how to use this result further down the pipeline, like this:
| beam.io.WriteToBigQuery(
    'thijs:thijsset.thijstable',
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
| ['FailedRows'] from previous step
| "print" >> beam.Map(print)
Dead letters to handle invalid inputs are a common Beam/Dataflow pattern that works with both the Java and Python SDKs, but there are not many examples of it for the latter.
Imagine that we have some dummy input data with 10 good lines and one bad row that does not conform to the table schema:
schema = "index:INTEGER,event:STRING"
data = ['{0},good_line_{1}'.format(i + 1, i + 1) for i in range(10)]
data.append('this is a bad row')
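The pipeline below uses a CsvToDictFn DoFn whose definition is not shown in this post. As an assumption, a minimal sketch that is consistent with the error output further down could look like this (class name and field names come from the snippets here, the body is my own guess):

import apache_beam as beam

class CsvToDictFn(beam.DoFn):
    """Zip the comma-separated values with the schema field names.

    The bad row contains no comma, so the resulting dict only gets an
    'index' key, which makes the BigQuery insert fail validation.
    """
    FIELDS = ['index', 'event']

    def process(self, element):
        yield dict(zip(self.FIELDS, element.split(',')))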
Then, all I do is name the write result (events in this case):
events = (p
    | "Create data" >> beam.Create(data)
    | "CSV to dict" >> beam.ParDo(CsvToDictFn())
    | "Write to BigQuery" >> beam.io.gcp.bigquery.WriteToBigQuery(
        "{0}:dataflow_test.good_lines".format(PROJECT),
        schema=schema,
    )
)
and then access the FAILED_ROWS side output:
(events[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
| "Bad lines" >> beam.io.textio.WriteToText("error_log.txt"))
This works well with the DirectRunner: the good lines are written to BigQuery and the bad ones are saved to a local file:
$ cat error_log.txt-00000-of-00001
('PROJECT_ID:dataflow_test.good_lines', {'index': 'this is a bad row'})
If you run it with the DataflowRunner you will need some additional flags. If you hit the error TypeError: 'PDone' object has no attribute '__getitem__', you need to add --experiments=use_beam_bq_sink to use the new BigQuery sink.
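One way to pass that flag, apart from the command line, is through PipelineOptions in code. This is a sketch with placeholder values; only the experiments entry matters here:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# PROJECT_ID, bucket and region are placeholders; the experiments flag
# enables the new BigQuery sink on older Beam SDK versions.
options = PipelineOptions(
    runner='DataflowRunner',
    project='PROJECT_ID',
    temp_location='gs://YOUR_BUCKET/tmp',
    region='us-central1',
    experiments=['use_beam_bq_sink'],
)

with beam.Pipeline(options=options) as p:
    ...  # same pipeline as above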
If you get a KeyError: 'FailedRows', that is because the new sink defaults to BigQuery load jobs for batch pipelines:
STREAMING_INSERTS, FILE_LOADS, or DEFAULT. An introduction on loading
data to BigQuery: https://cloud.google.com/bigquery/docs/loading-data.
DEFAULT will use STREAMING_INSERTS on Streaming pipelines and
FILE_LOADS on Batch pipelines.
You can override that behavior by specifying method='STREAMING_INSERTS' in WriteToBigQuery:
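For example, the write step from the pipeline above could be adjusted like this (a sketch; only the method argument is new):

| "Write to BigQuery" >> beam.io.gcp.bigquery.WriteToBigQuery(
    "{0}:dataflow_test.good_lines".format(PROJECT),
    schema=schema,
    method='STREAMING_INSERTS',
)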
Full code for both the DirectRunner and DataflowRunner versions here.