如何获取流水线的输出并对Cloud Firestore进行读写

How to obtain the output of the pipeline and perform read&write to Cloud Firestore

我正在使用 Apache Beam 从 Pub/Sub 获取日志,其中包含网页浏览流量的信息。每个页面都包含唯一 ID,当一个页面浏览流量日志来自 Pub/Sub 时,Cloud Dataflow 将以恒定的窗口方式收集它们并对其进行计数。在组合器的最后,我们会得到这样的东西:

12345, 2
12456, 1
15213, 1
...

据我所知,ParDo 是一种用于通用并行处理的 Beam 变换。组合后,我希望实现一个转换,将查询​​写入 Cloud Firestore 以获取现有的页面浏览 ID,获取当前浏览量,对其执行添加并执行写入操作以从组合输出中逐个更新浏览量,如图所示多于。有什么建议吗?

下面是我目前为止的 UpdateViewCount 代码。当我得到查询时,似乎不可能有一个 for 循环来获取查询(这将只有一行查询,因为综合浏览量是唯一的)

class UpdateIntoFireStore(beam.DoFn):
    def process(self, element):
        listingid, count = element
        doc_ref = db.collection('listings').where('listingid', u'==', '12345')
        try:
            docs = doc_ref.get()
            for doc in docs:
                print doc
        except NotFound:
            print(u'No such document!')

我解决了。不需要循环来检索数据,我应该检索带有文档名称的特定 ID。

doc_ref = db.collection(u'listings').document(listingid)
try:
    doc = doc_ref.get()
    doc_dict = doc.to_dict()
    self.cur_count = doc_dict[u'count']
    doc_ref.update({
        u'count': self.cur_count + count
    })
except NotFound:
    doc_ref.set({'count': count})