Write data into Google Firestore in a pipeline
I want to read data from Cloud BigQuery and write it into Cloud Firestore using the Dataflow runner with Apache Beam. According to the documentation, Firestore is not supported as a sink yet, so I wrote my own DoFn class to do it:
import apache_beam as beam
import firebase_admin
from firebase_admin import credentials, firestore

class UpdateIntoFireStore(beam.DoFn):
    def process(self, element):
        try:
            cred = credentials.Certificate({
                "..."
            })
            firebase_admin.initialize_app(cred, {
                'projectId': '...',
            })
        except ValueError:
            # The app was already initialized on this worker.
            pass
        db = firestore.client()
        doc_ref = db.collection(u'poi')
        doc_ref.add(element)
The pipeline looks like this:
job = (p
       | 'Read from BigQuery' >> Read(BigQuerySource(query="SELECT * FROM ...", use_standard_sql=True))
       | 'Update to Firestore' >> beam.ParDo(UpdateIntoFireStore()))
Is this approach OK? I am worried about the effect of parallel processing on these write operations to Cloud Firestore.
This is just like making any other external call from Dataflow. Technically it will work, but there are a couple of things to be aware of:
- There is no guarantee of how many times a single element will be processed, so you may end up with multiple entries for the same element in Firestore (see the idempotent-write sketch below).
- You are making a separate call to Firestore for every element, with no caching of Firestore clients/connections.
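One way to mitigate the first point is to make the writes idempotent: derive a deterministic document ID from the element and use set() instead of add(), so a retried element overwrites the same document rather than creating a duplicate. A minimal sketch, assuming each element is a dict and using a content hash as the ID (the hashing scheme is just an illustration):

import hashlib

def idempotent_write(db, element):
    # Hash the element's contents into a stable document ID so that Beam
    # retries overwrite the same Firestore document instead of add()-ing
    # a new auto-ID document each time.
    doc_id = hashlib.sha1(repr(sorted(element.items())).encode('utf-8')).hexdigest()
    db.collection(u'poi').document(doc_id).set(element)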
Use start_bundle to set up your client:
start_bundle
- Called before a bundle of elements is processed on a worker. Elements to be processed are split into bundles and distributed to workers. Before a worker calls process() on the first element of its bundle, it calls this method.
A better approach:
import logging

import apache_beam as beam

class FirestoreDoFn(beam.DoFn):
    def __init__(self):
        super(FirestoreDoFn, self).__init__()

    def start_bundle(self):
        # GoogleServices is a placeholder for your own wrapper that builds
        # a Firestore client from a credential file; it is created once per
        # bundle instead of once per element.
        self.firestore_client = GoogleServices(
            credential_path="<cred-path-in-here>"
        ).init_firestore(
            project_id="<your-project-id>",
            collection_id="<collection-id>"
        )

    def process(self, element, *args, **kwargs):
        logging.info(element)
        # response = self.firestore_client.save(element)
        # logging.info("response: {}".format(response))
        yield {"status": "ok"}