在 Apache Beam 中导入 Google Firestore Python 客户端

Importing Google Firestore Python client in Apache beam

我正在尝试编写一个小型 DoFn 以将数据从我的数据流管道写入 Cloud Firestore。在本地,一切都按预期工作,但是当尝试 运行 数据流时,一切都崩溃了!

这是我的函数:

class FirestoreWriteDoFn(beam.DoFn):
  def __init__(self):
    super(FirestoreWriteDoFn, self).__init__()

  def start_bundle(self):
    import google.cloud 
    self.db = google.cloud.firestore.Client(project='ag-audience')

  def process(self, element):
    fb_data = {
      'topics': element.get('page_keywords').split(','),
      'title': element.get('page_title')
    }
    logging.info('Inserting into Firebase: %s', fb_data)
    fb_doc = self.db.document('totallyNotBigtable', element.get('key'))
    result = fb_doc.create(fb_data)
    yield result

这是部署它的命令:

$ python pipe/main.py \
  --runner=dataflow \
  --project=ag-audience \
  --region=europe-west1 \
  --machine_type=n1-standard-4 \
  --temp_location=gs://ag-dataflow/tmp \
  --requirements_file requirements.txt \
  --save_main_session \
  --streaming

这是我的 requirements.txt:

google-cloud-firestore>=1.3.0

我试过很多东西: - 在文件顶部全局导入 firestore 模块。 - 以不同方式导入:import x from yimport y。 - 在代码的各个部分导入它。

错误总是一些未定义的东西: NameError: global name 'google' is not defined [while running 'generatedPtransform-480']

编辑:(添加管道代码)

with beam.Pipeline(argv=pipeline_args) as p:

    crawled_features = (p 
      | 'ReadPubsubCrawls' >> ReadFromPubSub(topic=PUBSUB_TOPIC_CRAWLED_FEATURES).with_output_types(bytes)
      | 'DebugLogInput' >> beam.ParDo(LogResults())
      | 'JSONParse2' >> beam.Map(lambda x: json.loads(x))
    )

    firebase_stream = (crawled_features
      | 'WriteFirebase' >> beam.ParDo(FirestoreWriteDoFn())
      | 'LogFirebaseWriteResult' >> beam.ParDo(LogResults())
    )

    bigquery_stream = (crawled_features 
      | 'ShapeRow' >> beam.Map(ShapeBQRow)
      | 'LogShapedBQRow' >> beam.ParDo(LogResults())
      | 'WriteBigQuery' >> beam.io.WriteToBigQuery(
        table=BIGQUERY_TABLE,
        create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )

问题出在 Beam 版本上。在 2.13.0 中可能存在一些错误,但在 2.12.0 中它可以正常工作,基于 Python package errors while running GCP Dataflow。我也亲自验证了。