从 Python 中的 Apache Beam Dataflow 连接到 Google Cloud BigQuery 时出现类型错误?
TypeError when connecting to Google Cloud BigQuery from Apache Beam Dataflow in Python?
当尝试在 apache beam 的 google 云数据流中初始化 python BigQuery Client() 时,出现类型错误:
TypeError('__init__() takes 2 positional arguments but 3 were given')
我正在使用 Python 3.7 和 apache beam 数据流,我必须初始化客户端并手动写入 BigQuery 而不是使用 ptransform,因为我想使用动态 table 名称通过运行时参数传递。
我试过将项目和凭据传递给客户,但它似乎没有做任何事情。此外,如果我使用 google-cloud-bigquery==1.11.2 而不是 1.13.0,它工作正常,在 apache beam 之外使用 1.13.0 也完全正常。
我显然删除了一些代码,但这实际上是引发错误的原因
class SaveObjectsBigQuery(beam.DoFn):
def process(self, element, *args, **kwargs):
# Establish BigQuery client
client = bigquery.Client(project=project)
def run():
pipeline_options = PipelineOptions()
# GoogleCloud options object
cloud_options = pipeline_options.view_as(GoogleCloudOptions)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
_data = (p
| "Create" >> beam.Create(["Start"])
)
save_data_bigquery = _data | "Save to BigQuery" >> beam.ParDo(SaveObjectsBigQuery())
在 google-cloud-bigquery 的早期版本中,这工作正常,我可以使用运行时参数和 insert_rows_json 创建一个 table 没有任何问题。显然使用 WriteToBigquery Ptransform 是理想的,但由于需要动态命名 bigquery tables.
,所以这是不可能的
编辑:
我更新了代码以尝试取出运行时值提供程序和 lambda 函数,尽管两者都收到了类似的错误:
`AttributeError: 'function/RuntimeValueProvider' 对象没有属性 'tableId'
我实际上是在尝试使用运行时值提供程序启动数据流模板以使用 WriteToBigQuery PTransform 动态命名 bigquery table。
save_data_bigquery = _data | WriteToBigQuery(
project=project,
dataset="campaign_contact",
table=value_provider.RuntimeValueProvider(option_name="table", default_value=None, value_type=str),
schema="id:STRING",
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=BigQueryDisposition.WRITE_APPEND
)
save_data_bigquery = _data | WriteToBigQuery(
table=lambda table: f"{project}:dataset.{runtime_options.table}",
schema="id:STRING",
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=BigQueryDisposition.WRITE_APPEND
)
从 Beam 2.12 开始,您可以使用 WriteToBigQuery
转换来动态分配目的地。我建议您尝试一下:)
查看 Beam 代码库中的 this test,其中显示了一个示例。
当尝试在 apache beam 的 google 云数据流中初始化 python BigQuery Client() 时,出现类型错误:
TypeError('__init__() takes 2 positional arguments but 3 were given')
我正在使用 Python 3.7 和 apache beam 数据流,我必须初始化客户端并手动写入 BigQuery 而不是使用 ptransform,因为我想使用动态 table 名称通过运行时参数传递。
我试过将项目和凭据传递给客户,但它似乎没有做任何事情。此外,如果我使用 google-cloud-bigquery==1.11.2 而不是 1.13.0,它工作正常,在 apache beam 之外使用 1.13.0 也完全正常。
我显然删除了一些代码,但这实际上是引发错误的原因
class SaveObjectsBigQuery(beam.DoFn):
def process(self, element, *args, **kwargs):
# Establish BigQuery client
client = bigquery.Client(project=project)
def run():
pipeline_options = PipelineOptions()
# GoogleCloud options object
cloud_options = pipeline_options.view_as(GoogleCloudOptions)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
_data = (p
| "Create" >> beam.Create(["Start"])
)
save_data_bigquery = _data | "Save to BigQuery" >> beam.ParDo(SaveObjectsBigQuery())
在 google-cloud-bigquery 的早期版本中,这工作正常,我可以使用运行时参数和 insert_rows_json 创建一个 table 没有任何问题。显然使用 WriteToBigquery Ptransform 是理想的,但由于需要动态命名 bigquery tables.
,所以这是不可能的编辑:
我更新了代码以尝试取出运行时值提供程序和 lambda 函数,尽管两者都收到了类似的错误:
`AttributeError: 'function/RuntimeValueProvider' 对象没有属性 'tableId'
我实际上是在尝试使用运行时值提供程序启动数据流模板以使用 WriteToBigQuery PTransform 动态命名 bigquery table。
save_data_bigquery = _data | WriteToBigQuery(
project=project,
dataset="campaign_contact",
table=value_provider.RuntimeValueProvider(option_name="table", default_value=None, value_type=str),
schema="id:STRING",
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=BigQueryDisposition.WRITE_APPEND
)
save_data_bigquery = _data | WriteToBigQuery(
table=lambda table: f"{project}:dataset.{runtime_options.table}",
schema="id:STRING",
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=BigQueryDisposition.WRITE_APPEND
)
从 Beam 2.12 开始,您可以使用 WriteToBigQuery
转换来动态分配目的地。我建议您尝试一下:)
查看 Beam 代码库中的 this test,其中显示了一个示例。