Monitoring WriteToBigQuery

In my pipeline, I use WriteToBigQuery something like this:

| beam.io.WriteToBigQuery(
    'thijs:thijsset.thijstable',
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)

This returns a dict, which the documentation describes as follows:

The beam.io.WriteToBigQuery PTransform returns a dictionary whose BigQueryWriteFn.FAILED_ROWS entry contains a PCollection of all the rows that failed to be written.

How do I print this dict and turn it into a PCollection, or how do I print just the FAILED_ROWS?

If I do: | "print" >> beam.Map(print)

then I get: AttributeError: 'dict' object has no attribute 'pipeline'

I must have read a hundred pipelines, but I have never seen anything after WriteToBigQuery.

[Edit] When I finish the pipeline and store the result in a variable, I have the following:

{'FailedRows': <PCollection[WriteToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn).FailedRows] at 0x7f0e0cdcfed0>}

But I do not know how to use this result in the pipeline, like this:

| beam.io.WriteToBigQuery(
    'thijs:thijsset.thijstable',
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
| ['FailedRows'] from previous step
| "print" >> beam.Map(print)

Dead letters to handle invalid inputs are a common Beam/Dataflow usage, and they work with both the Java and Python SDKs, but there are not many examples of the latter.

Imagine that we have some dummy input data with 10 good lines and a bad one that does not conform to the table schema:

schema = "index:INTEGER,event:STRING"

data = ['{0},good_line_{1}'.format(i + 1, i + 1) for i in range(10)]
data.append('this is a bad row')

Then, what I do is name the write result (events in this example):

events = (p
    | "Create data" >> beam.Create(data)
    | "CSV to dict" >> beam.ParDo(CsvToDictFn())
    | "Write to BigQuery" >> beam.io.gcp.bigquery.WriteToBigQuery(
        "{0}:dataflow_test.good_lines".format(PROJECT),
        schema=schema,
    )
 )
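The CsvToDictFn used above is not shown here; a minimal sketch (the exact implementation is an assumption, chosen so that a malformed line still reaches BigQuery and fails there, matching the error output shown below) could be:

import apache_beam as beam

class CsvToDictFn(beam.DoFn):
    """Parse an 'index,event' CSV line into a dict keyed by the schema fields."""
    def process(self, element):
        # zip() silently drops missing fields, so the bad row becomes
        # {'index': 'this is a bad row'} and is rejected by BigQuery
        # (index is an INTEGER column) rather than by this ParDo.
        yield dict(zip(('index', 'event'), element.split(',')))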

and then access the FAILED_ROWS side output:

(events[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
    | "Bad lines" >> beam.io.textio.WriteToText("error_log.txt"))

This works well with the DirectRunner, writing the good lines to BigQuery and saving the bad ones to a local file:

$ cat error_log.txt-00000-of-00001 
('PROJECT_ID:dataflow_test.good_lines', {'index': 'this is a bad row'})

If you run it with the DataflowRunner you will need some additional flags. If you hit TypeError: 'PDone' object has no attribute '__getitem__', you need to add --experiments=use_beam_bq_sink to use the new BigQuery sink.
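For example, the experiment can be passed when building the pipeline options. This is only a sketch; the project, region and temp_location values are placeholders:

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    runner='DataflowRunner',
    project='PROJECT_ID',          # placeholder
    region='us-central1',          # placeholder
    temp_location='gs://BUCKET/tmp',  # placeholder
    experiments=['use_beam_bq_sink'],
)

with beam.Pipeline(options=options) as p:
    ...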

If you get KeyError: 'FailedRows', it's because the new sink defaults to BigQuery load jobs for batch pipelines:

STREAMING_INSERTS, FILE_LOADS, or DEFAULT. An introduction on loading data to BigQuery: https://cloud.google.com/bigquery/docs/loading-data. DEFAULT will use STREAMING_INSERTS on Streaming pipelines and FILE_LOADS on Batch pipelines.

You can override that behavior by specifying method='STREAMING_INSERTS' in WriteToBigQuery:
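Applied to the snippet above, only the method argument changes; everything else repeats the earlier example:

events = (p
    | "Create data" >> beam.Create(data)
    | "CSV to dict" >> beam.ParDo(CsvToDictFn())
    | "Write to BigQuery" >> beam.io.gcp.bigquery.WriteToBigQuery(
        "{0}:dataflow_test.good_lines".format(PROJECT),
        schema=schema,
        method='STREAMING_INSERTS',
    )
)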

Full code for both the DirectRunner and the DataflowRunner here.