Beam Python Dataflow Runner 在 apply_WriteToBigQuery 中使用弃用的 BigQuerySink 而不是 WriteToBigQuery

Beam Python Dataflow Runner Uses deprecated BigQuerySink instead of WriteToBigQuery in apply_WriteToBigQuery

对于DataflowRunner内部的实现细节,很多人可能并不关心是用BigQuerySink还是WriteToBigQuery

但是,在我的例子中,我试图将我的代码部署到数据流模板,并使用 RunTimeValueProvider 作为参数。 WriteToBigQuery:

支持此行为
class WriteToBigQuery(PTransform):
....

 table (str, callable, ValueProvider): The ID of the table, or a callable
         that returns it. The ID must contain only letters ``a-z``, ``A-Z``,
         numbers ``0-9``, or underscores ``_``. If dataset argument is
         :data:`None` then the table argument must contain the entire table
         reference specified as: ``'DATASET.TABLE'``
         or ``'PROJECT:DATASET.TABLE'``. If it's a callable, it must receive one
         argument representing an element to be written to BigQuery, and return
         a TableReference, or a string table name as specified above.
         Multiple destinations are only supported on Batch pipelines at the
         moment.

BigQuerySink支持:

class BigQuerySink(dataflow_io.NativeSink):
      table (str): The ID of the table. The ID must contain only letters
        ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. If
        **dataset** argument is :data:`None` then the table argument must
        contain the entire table reference specified as: ``'DATASET.TABLE'`` or
        ``'PROJECT:DATASET.TABLE'``.

更有趣的是,代码中的BigQuerySink自2.11.0以来已被弃用。

@deprecated(since='2.11.0', current="WriteToBigQuery")

然而在 DataFlowRunner 中,当前的代码和注释似乎完全不符合 WriteToBigQuery 是默认 class 的预期,要使用 over BigQuerySink:

  def apply_WriteToBigQuery(self, transform, pcoll, options):
    # Make sure this is the WriteToBigQuery class that we expected, and that
    # users did not specifically request the new BQ sink by passing experiment
    # flag.

    # TODO(BEAM-6928): Remove this function for release 2.14.0.
    experiments = options.view_as(DebugOptions).experiments or []
    if (not isinstance(transform, beam.io.WriteToBigQuery)
        or 'use_beam_bq_sink' in experiments):
      return self.apply_PTransform(transform, pcoll, options)
    if transform.schema == beam.io.gcp.bigquery.SCHEMA_AUTODETECT:
      raise RuntimeError(
          'Schema auto-detection is not supported on the native sink')
    standard_options = options.view_as(StandardOptions)
    if standard_options.streaming:
      if (transform.write_disposition ==
          beam.io.BigQueryDisposition.WRITE_TRUNCATE):
        raise RuntimeError('Can not use write truncation mode in streaming')
      return self.apply_PTransform(transform, pcoll, options)
    else:
      from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
      schema = None
      if transform.schema:
        schema = parse_table_schema_from_json(json.dumps(transform.schema))
      return pcoll  | 'WriteToBigQuery' >> beam.io.Write(
          beam.io.BigQuerySink(
              transform.table_reference.tableId,
              transform.table_reference.datasetId,
              transform.table_reference.projectId,
              schema,
              transform.create_disposition,
              transform.write_disposition,
              kms_key=transform.kms_key))

我的问题有两个:

  1. 为什么 DataflowRunnerio.BigQuery class 之间的 contract/expectations 不同?
  2. 在不等待错误修复的情况下,有人对如何强制 DataflowRunner 使用 WriteToBigQuery 而不是 BigQuerySink 有什么建议吗?

WriteToBigQuery 转换有两种不同的写入 BigQuery 的策略:

  • 流式插入 BigQuery 端点
  • 定期触发文件加载作业(或对批处理管道触发一次)

对于 Python SDK,我们最初只支持流式插入,并且我们有一个运行器本机实现的文件加载,它只适用于数据流(这是 BigQuerySink)。

对于 Dataflow 上的批处理管道 运行,BigQuerySink 被替换为 - 正如您正确找到的那样。对于所有其他情况,使用了流式插入。

在 Beam 的最新版本中,我们在 SDK 中添加了对文件加载的本地支持 - 实现在 BigQueryBatchFileLoads.

因为我们不想破坏依赖旧行为的用户,所以我们在实验标志后面屏蔽了 BigQueryBatchFileLoads。 (标志是use_beam_bq_sink)。

所以:

  • 在未来的版本中,我们将自动使用 BigQueryBatchFileLoads,但目前,您有两种访问方式:

    1. 直接在您的管道中使用它(例如 input | BigQueryBatchFileLoads(...))。
    2. 传递选项 --experiments use_beam_bq_sink,同时使用 WriteToBigQuery

希望对您有所帮助!