Google Cloud Datastore:计算处于特定状态的所有实体

Google Cloud Datastore: Counting all entities in a certain state

背景

我需要向大约 100 万台设备发送大量通知,我正在使用 Google Cloud Functions 构建它。

在当前设置中,我将每个设备令牌作为 PubSub 消息排队:

这或多或少工作得很好,我从中获得了不错的性能,每秒处理 1.5K 令牌。

问题

我想跟踪整个工作的当前进度。鉴于我知道我希望处理多少通知,我希望能够报告类似 x/1_000_000 已处理的内容,然后当失败+成功的总和与我想要处理的一样多时认为它已完成.

DataStore 文档建议不要 运行 对实体本身进行计数,因为它不会高效,这一点我可以确认。我按照他们的 sharded counter 示例文档实现了一个计数器,我在最后包含了它。

我看到的问题是它既非常慢又很容易返回 409 Contention errors 这使得我的函数调用重试这不是理想的,因为计数本身对过程不是必需的并且每个通知的重试预算有限。在实践中,最失败的事情是增加在过程结束时发生的计数器,这会增加通知读取的负载以检查它们在重试时的状态,这意味着我最终得到一个小于实际成功通知的计数器.

我 运行 一个使用 wrk 的快速基准测试,似乎可以通过递增计数器获得大约 400 RPS,平均延迟为 250 毫秒。与每个通知执行大约 3 个 DataStore 查询的通知逻辑本身相比,这是相当慢的,并且可能比递增计数器更复杂。当添加到争用错误时,我最终得到了一个我认为不稳定的实现。据我所知,Datastore 通常会在持续大量使用的情况下自动扩展,但使用此服务的模式非常罕见,而且对于整批令牌,因此不会有任何以前的流量来扩展它。

问题

代码

与datastore交互的代码

DATASTORE_READ_BATCH_SIZE = 100

class Counter():
    kind = "counter"
    shards = 2000

    @staticmethod
    def _key(namespace, shard):
        return hashlib.sha1(":".join([str(namespace), str(shard)]).encode('utf-8')).hexdigest()

    @staticmethod
    def count(namespace):
        keys = []
        total = 0
        for shard in range(Counter.shards):
            if len(keys) == DATASTORE_READ_BATCH_SIZE:
                counters = client.get_multi(keys)
                total = total + sum([int(c["count"]) for c in counters])
                keys = []
            keys.append(client.key(Counter.kind, Counter._key(namespace, shard)))

        if len(keys) != 0:
            counters = client.get_multi(keys)
            total = total + sum([int(c["count"]) for c in counters])

        return total

    @staticmethod
    def increment(namespace):
        key = client.key(Counter.kind, Counter._key(namespace, random.randint(0, Counter.shards - 1)))
        with client.transaction():
            entity = client.get(key)
            if entity is None:
                entity = datastore.Entity(key=key)
                entity.update({
                    "count": 0,
                })
            entity.update({
                "count": entity["count"] + 1,
            })
            client.put(entity)

这是从 Google 云函数中调用的

from flask import abort, jsonify, make_response
from src.notify import FCM, APNS
from src.lib.datastore import Counter

def counter(request):
    args = request.args

    if args.get("platform"):
        Counter.increment(args["platform"])
        return

    return jsonify({
        FCM: Counter.count(FCM),
        APNS: Counter.count(APNS)
    })

这用于递增和读取计数,并按平台拆分为 iOS 和 Android。

最后我放弃了计数器并开始在 BigQuery 中保存通知的状态。定价仍然合理,因为它仍然是每次使用,而且数据插入的流版本似乎足够快,在实践中不会给我带来任何问题。

有了这个,我可以使用一个简单的 sql 查询来计算与批处理作业匹配的所有实体。对于所有实体,这最终需要大约 3 秒的时间,与替代方案相比,这对我来说是可以接受的性能,因为这仅供内部使用。