如何获取流水线的输出并对Cloud Firestore进行读写
How to obtain the output of the pipeline and perform read&write to Cloud Firestore
我正在使用 Apache Beam 从 Pub/Sub 获取日志,其中包含网页浏览流量的信息。每个页面都包含唯一 ID,当一个页面浏览流量日志来自 Pub/Sub 时,Cloud Dataflow 将以恒定的窗口方式收集它们并对其进行计数。在组合器的最后,我们会得到这样的东西:
12345, 2
12456, 1
15213, 1
...
据我所知,ParDo 是一种用于通用并行处理的 Beam 变换。组合后,我希望实现一个转换,将查询写入 Cloud Firestore 以获取现有的页面浏览 ID,获取当前浏览量,对其执行添加并执行写入操作以从组合输出中逐个更新浏览量,如图所示多于。有什么建议吗?
下面是我目前为止的 UpdateViewCount 代码。当我得到查询时,似乎不可能有一个 for 循环来获取查询(这将只有一行查询,因为综合浏览量是唯一的)
class UpdateIntoFireStore(beam.DoFn):
def process(self, element):
listingid, count = element
doc_ref = db.collection('listings').where('listingid', u'==', '12345')
try:
docs = doc_ref.get()
for doc in docs:
print doc
except NotFound:
print(u'No such document!')
我解决了。不需要循环来检索数据,我应该检索带有文档名称的特定 ID。
doc_ref = db.collection(u'listings').document(listingid)
try:
doc = doc_ref.get()
doc_dict = doc.to_dict()
self.cur_count = doc_dict[u'count']
doc_ref.update({
u'count': self.cur_count + count
})
except NotFound:
doc_ref.set({'count': count})
我正在使用 Apache Beam 从 Pub/Sub 获取日志,其中包含网页浏览流量的信息。每个页面都包含唯一 ID,当一个页面浏览流量日志来自 Pub/Sub 时,Cloud Dataflow 将以恒定的窗口方式收集它们并对其进行计数。在组合器的最后,我们会得到这样的东西:
12345, 2
12456, 1
15213, 1
...
据我所知,ParDo 是一种用于通用并行处理的 Beam 变换。组合后,我希望实现一个转换,将查询写入 Cloud Firestore 以获取现有的页面浏览 ID,获取当前浏览量,对其执行添加并执行写入操作以从组合输出中逐个更新浏览量,如图所示多于。有什么建议吗?
下面是我目前为止的 UpdateViewCount 代码。当我得到查询时,似乎不可能有一个 for 循环来获取查询(这将只有一行查询,因为综合浏览量是唯一的)
class UpdateIntoFireStore(beam.DoFn):
def process(self, element):
listingid, count = element
doc_ref = db.collection('listings').where('listingid', u'==', '12345')
try:
docs = doc_ref.get()
for doc in docs:
print doc
except NotFound:
print(u'No such document!')
我解决了。不需要循环来检索数据,我应该检索带有文档名称的特定 ID。
doc_ref = db.collection(u'listings').document(listingid)
try:
doc = doc_ref.get()
doc_dict = doc.to_dict()
self.cur_count = doc_dict[u'count']
doc_ref.update({
u'count': self.cur_count + count
})
except NotFound:
doc_ref.set({'count': count})