将数据写入管道中的 Google 个 Firestore

Write data into Google Firestore in pipeline

我想通过带有 Apache Beam 的 Dataflow runner 将数据从 Cloud BigQuery 读取到 Cloud Datastore。从 documentation 开始,尚不支持 Firestore。我自己写 class 来做。

class UpdateIntoFireStore(beam.DoFn):

    def process(self, element):
        try:
            cred = credentials.Certificate({
              "..."
            })

            firebase_admin.initialize_app(cred, {
            'projectId': '...',
            })
        except ValueError:
            pass
        db = firestore.client()
        doc_ref = db.collection(u'poi')
        doc_ref.add(element)

流水线如下:

job = ( p  | 'Read from BigQuery' >> Read(BigQuerySource(query="SELECT * FROM ...", use_standard_sql=True))
           | 'Update to Firestore' >> beam.ParDo(UpdateIntoFireStore()))

这个方法好吗?我担心并行处理对 Cloud Firestore 上这些写操作的影响。

这与从数据流进行外部调用完全一样。从技术上讲,这将起作用。但是,有几件事需要注意。

  1. 无法保证单个元素将被处理多少次,因此您可能会在 firestore 中获得同一元素的多个条目。
  2. 您将对每个元素单独调用 firestore,并且没有 firestore 的缓存 clients/connections。

使用 start_bundle 定义您的客户端。

start_bundle - Called before a bundle of elements is processed on a worker. Elements to be processed are split into bundles and distributed to workers. Before a worker calls process() on the first element of its bundle, it calls this method.

更好的方法:

class FirestoreDoFn(beam.DoFn):

def __init__(self):
    super(FirestoreDoFn, self).__init__()

def start_bundle(self):
    self.firestore_client = GoogleServices(
        crendential_path="<cred-path-in-here>"
    ).init_firestore(
        project_id="<your-project-id>",
        collection_id="<collection-id>"
    )

def process(self, element, *args, **kwargs):
    logging.info(element)
    # response = self.firestore_client.save()
    # logging.info("response: {}".format(response))
    return {"status":"ok"}