GCP 数据流 - WriteToBigQuery() 期间出现 NoneType 错误

GCP Dataflow - NoneType error during WriteToBigQuery()

我正在尝试使用 beam 将 csv 文件中的数据从 GCS 传输到 BQ,但是当我调用 WriteToBigQuery 时出现 NoneType 错误。错误信息:

AttributeError: 'NoneType' object has no attribute 'items' [while running 'Write to BQ/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)']

我的管道代码:

import apache_beam as beam
from apache_beam.pipeline import PipelineOptions
from apache_beam.io.textio import ReadFromText


options = {
    'project': project,
    'region': region,
    'temp_location': bucket
    'staging_location': bucket
    'setup_file': './setup.py'
}


class Split(beam.DoFn):
    def process(self, element):
        n, cc = element.split(",")
        return [{
            'n': int(n.strip('"')),
            'connection_country': str(cc.strip()),
        }]


pipeline_options = beam.pipeline.PipelineOptions(flags=[], **options)

with beam.Pipeline(options=pipeline_options) as pipeline:
    (pipeline
        | 'Read from GCS' >> ReadFromText('file_path*', skip_header_lines=1)
        | 'parse input' >> beam.ParDo(Split())
        | 'print' >> beam.Map(print)
        | 'Write to BQ' >> beam.io.WriteToBigQuery(
            'from_gcs', 'demo', schema='n:INTEGER, connection_country:STRING',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
        )

我的 csv 看起来像这样:

在 print() 阶段的光束摘录如下所示:

感谢任何帮助!

您收到该错误是因为 print 函数 没有 return 任何东西,所以没有元素进入 WriteToBQ 步骤。您可以使用以下方法修复它:

def print_fn(element):
    print(element)
    return element

{..}
        | 'print' >> beam.Map(print_fn) # Note that now I'm referencing to the fn
        | 'Write to BQ' >> beam.io.WriteToBigQuery(
{..}

此外,如果您要在 Dataflow 中 运行 这个,print 不会出现,但您可以使用 logging.info()