Beam Python Dataflow Runner 在 apply_WriteToBigQuery 中使用弃用的 BigQuerySink 而不是 WriteToBigQuery
Beam Python Dataflow Runner Uses deprecated BigQuerySink instead of WriteToBigQuery in apply_WriteToBigQuery
对于DataflowRunner内部的实现细节,很多人可能并不关心是用BigQuerySink
还是WriteToBigQuery
但是,在我的例子中,我试图将我的代码部署到数据流模板,并使用 RunTimeValueProvider 作为参数。 WriteToBigQuery
:
支持此行为
class WriteToBigQuery(PTransform):
....
table (str, callable, ValueProvider): The ID of the table, or a callable
that returns it. The ID must contain only letters ``a-z``, ``A-Z``,
numbers ``0-9``, or underscores ``_``. If dataset argument is
:data:`None` then the table argument must contain the entire table
reference specified as: ``'DATASET.TABLE'``
or ``'PROJECT:DATASET.TABLE'``. If it's a callable, it must receive one
argument representing an element to be written to BigQuery, and return
a TableReference, or a string table name as specified above.
Multiple destinations are only supported on Batch pipelines at the
moment.
BigQuerySink不支持:
class BigQuerySink(dataflow_io.NativeSink):
table (str): The ID of the table. The ID must contain only letters
``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. If
**dataset** argument is :data:`None` then the table argument must
contain the entire table reference specified as: ``'DATASET.TABLE'`` or
``'PROJECT:DATASET.TABLE'``.
更有趣的是,代码中的BigQuerySink
自2.11.0以来已被弃用。
@deprecated(since='2.11.0', current="WriteToBigQuery")
然而在 DataFlowRunner 中,当前的代码和注释似乎完全不符合 WriteToBigQuery
是默认 class 的预期,要使用 over BigQuerySink
:
def apply_WriteToBigQuery(self, transform, pcoll, options):
# Make sure this is the WriteToBigQuery class that we expected, and that
# users did not specifically request the new BQ sink by passing experiment
# flag.
# TODO(BEAM-6928): Remove this function for release 2.14.0.
experiments = options.view_as(DebugOptions).experiments or []
if (not isinstance(transform, beam.io.WriteToBigQuery)
or 'use_beam_bq_sink' in experiments):
return self.apply_PTransform(transform, pcoll, options)
if transform.schema == beam.io.gcp.bigquery.SCHEMA_AUTODETECT:
raise RuntimeError(
'Schema auto-detection is not supported on the native sink')
standard_options = options.view_as(StandardOptions)
if standard_options.streaming:
if (transform.write_disposition ==
beam.io.BigQueryDisposition.WRITE_TRUNCATE):
raise RuntimeError('Can not use write truncation mode in streaming')
return self.apply_PTransform(transform, pcoll, options)
else:
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
schema = None
if transform.schema:
schema = parse_table_schema_from_json(json.dumps(transform.schema))
return pcoll | 'WriteToBigQuery' >> beam.io.Write(
beam.io.BigQuerySink(
transform.table_reference.tableId,
transform.table_reference.datasetId,
transform.table_reference.projectId,
schema,
transform.create_disposition,
transform.write_disposition,
kms_key=transform.kms_key))
我的问题有两个:
- 为什么
DataflowRunner
和 io.BigQuery
class 之间的 contract/expectations 不同?
- 在不等待错误修复的情况下,有人对如何强制
DataflowRunner
使用 WriteToBigQuery
而不是 BigQuerySink
有什么建议吗?
WriteToBigQuery
转换有两种不同的写入 BigQuery 的策略:
- 流式插入 BigQuery 端点
- 定期触发文件加载作业(或对批处理管道触发一次)
对于 Python SDK,我们最初只支持流式插入,并且我们有一个运行器本机实现的文件加载,它只适用于数据流(这是 BigQuerySink
)。
对于 Dataflow 上的批处理管道 运行,BigQuerySink
被替换为 - 正如您正确找到的那样。对于所有其他情况,使用了流式插入。
在 Beam 的最新版本中,我们在 SDK 中添加了对文件加载的本地支持 - 实现在 BigQueryBatchFileLoads
.
中
因为我们不想破坏依赖旧行为的用户,所以我们在实验标志后面屏蔽了 BigQueryBatchFileLoads
。 (标志是use_beam_bq_sink
)。
所以:
在未来的版本中,我们将自动使用 BigQueryBatchFileLoads
,但目前,您有两种访问方式:
- 直接在您的管道中使用它(例如
input | BigQueryBatchFileLoads(...)
)。
- 传递选项
--experiments use_beam_bq_sink
,同时使用 WriteToBigQuery
。
希望对您有所帮助!
对于DataflowRunner内部的实现细节,很多人可能并不关心是用BigQuerySink
还是WriteToBigQuery
但是,在我的例子中,我试图将我的代码部署到数据流模板,并使用 RunTimeValueProvider 作为参数。 WriteToBigQuery
:
class WriteToBigQuery(PTransform):
....
table (str, callable, ValueProvider): The ID of the table, or a callable
that returns it. The ID must contain only letters ``a-z``, ``A-Z``,
numbers ``0-9``, or underscores ``_``. If dataset argument is
:data:`None` then the table argument must contain the entire table
reference specified as: ``'DATASET.TABLE'``
or ``'PROJECT:DATASET.TABLE'``. If it's a callable, it must receive one
argument representing an element to be written to BigQuery, and return
a TableReference, or a string table name as specified above.
Multiple destinations are only supported on Batch pipelines at the
moment.
BigQuerySink不支持:
class BigQuerySink(dataflow_io.NativeSink):
table (str): The ID of the table. The ID must contain only letters
``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. If
**dataset** argument is :data:`None` then the table argument must
contain the entire table reference specified as: ``'DATASET.TABLE'`` or
``'PROJECT:DATASET.TABLE'``.
更有趣的是,代码中的BigQuerySink
自2.11.0以来已被弃用。
@deprecated(since='2.11.0', current="WriteToBigQuery")
然而在 DataFlowRunner 中,当前的代码和注释似乎完全不符合 WriteToBigQuery
是默认 class 的预期,要使用 over BigQuerySink
:
def apply_WriteToBigQuery(self, transform, pcoll, options):
# Make sure this is the WriteToBigQuery class that we expected, and that
# users did not specifically request the new BQ sink by passing experiment
# flag.
# TODO(BEAM-6928): Remove this function for release 2.14.0.
experiments = options.view_as(DebugOptions).experiments or []
if (not isinstance(transform, beam.io.WriteToBigQuery)
or 'use_beam_bq_sink' in experiments):
return self.apply_PTransform(transform, pcoll, options)
if transform.schema == beam.io.gcp.bigquery.SCHEMA_AUTODETECT:
raise RuntimeError(
'Schema auto-detection is not supported on the native sink')
standard_options = options.view_as(StandardOptions)
if standard_options.streaming:
if (transform.write_disposition ==
beam.io.BigQueryDisposition.WRITE_TRUNCATE):
raise RuntimeError('Can not use write truncation mode in streaming')
return self.apply_PTransform(transform, pcoll, options)
else:
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
schema = None
if transform.schema:
schema = parse_table_schema_from_json(json.dumps(transform.schema))
return pcoll | 'WriteToBigQuery' >> beam.io.Write(
beam.io.BigQuerySink(
transform.table_reference.tableId,
transform.table_reference.datasetId,
transform.table_reference.projectId,
schema,
transform.create_disposition,
transform.write_disposition,
kms_key=transform.kms_key))
我的问题有两个:
- 为什么
DataflowRunner
和io.BigQuery
class 之间的 contract/expectations 不同? - 在不等待错误修复的情况下,有人对如何强制
DataflowRunner
使用WriteToBigQuery
而不是BigQuerySink
有什么建议吗?
WriteToBigQuery
转换有两种不同的写入 BigQuery 的策略:
- 流式插入 BigQuery 端点
- 定期触发文件加载作业(或对批处理管道触发一次)
对于 Python SDK,我们最初只支持流式插入,并且我们有一个运行器本机实现的文件加载,它只适用于数据流(这是 BigQuerySink
)。
对于 Dataflow 上的批处理管道 运行,BigQuerySink
被替换为 - 正如您正确找到的那样。对于所有其他情况,使用了流式插入。
在 Beam 的最新版本中,我们在 SDK 中添加了对文件加载的本地支持 - 实现在 BigQueryBatchFileLoads
.
因为我们不想破坏依赖旧行为的用户,所以我们在实验标志后面屏蔽了 BigQueryBatchFileLoads
。 (标志是use_beam_bq_sink
)。
所以:
在未来的版本中,我们将自动使用
BigQueryBatchFileLoads
,但目前,您有两种访问方式:- 直接在您的管道中使用它(例如
input | BigQueryBatchFileLoads(...)
)。 - 传递选项
--experiments use_beam_bq_sink
,同时使用WriteToBigQuery
。
- 直接在您的管道中使用它(例如
希望对您有所帮助!