Google Cloud Datastore:计算处于特定状态的所有实体
Google Cloud Datastore: Counting all entities in a certain state
背景
我需要向大约 100 万台设备发送大量通知,我正在使用 Google Cloud Functions 构建它。
在当前设置中,我将每个设备令牌作为 PubSub 消息排队:
- 在 DataStore 中存储待处理通知,用于跟踪重试和成功状态
- 尝试发送通知
- 将通知标记为成功或失败(如果重试次数足够多但尚未通过)
这或多或少工作得很好,我从中获得了不错的性能,每秒处理 1.5K 令牌。
问题
我想跟踪整个工作的当前进度。鉴于我知道我希望处理多少通知,我希望能够报告类似 x/1_000_000 已处理的内容,然后当失败+成功的总和与我想要处理的一样多时认为它已完成.
DataStore 文档建议不要 运行 对实体本身进行计数,因为它不会高效,这一点我可以确认。我按照他们的 sharded counter 示例文档实现了一个计数器,我在最后包含了它。
我看到的问题是它既非常慢又很容易返回 409 Contention error
s 这使得我的函数调用重试这不是理想的,因为计数本身对过程不是必需的并且每个通知的重试预算有限。在实践中,最失败的事情是增加在过程结束时发生的计数器,这会增加通知读取的负载以检查它们在重试时的状态,这意味着我最终得到一个小于实际成功通知的计数器.
我 运行 一个使用 wrk 的快速基准测试,似乎可以通过递增计数器获得大约 400 RPS,平均延迟为 250 毫秒。与每个通知执行大约 3 个 DataStore 查询的通知逻辑本身相比,这是相当慢的,并且可能比递增计数器更复杂。当添加到争用错误时,我最终得到了一个我认为不稳定的实现。据我所知,Datastore 通常会在持续大量使用的情况下自动扩展,但使用此服务的模式非常罕见,而且对于整批令牌,因此不会有任何以前的流量来扩展它。
问题
- 我是否遗漏了一些可以改进以降低速度的计数器实现?
- 我应该考虑采用其他方法来获得我想要的东西吗?
代码
与datastore交互的代码
DATASTORE_READ_BATCH_SIZE = 100
class Counter():
kind = "counter"
shards = 2000
@staticmethod
def _key(namespace, shard):
return hashlib.sha1(":".join([str(namespace), str(shard)]).encode('utf-8')).hexdigest()
@staticmethod
def count(namespace):
keys = []
total = 0
for shard in range(Counter.shards):
if len(keys) == DATASTORE_READ_BATCH_SIZE:
counters = client.get_multi(keys)
total = total + sum([int(c["count"]) for c in counters])
keys = []
keys.append(client.key(Counter.kind, Counter._key(namespace, shard)))
if len(keys) != 0:
counters = client.get_multi(keys)
total = total + sum([int(c["count"]) for c in counters])
return total
@staticmethod
def increment(namespace):
key = client.key(Counter.kind, Counter._key(namespace, random.randint(0, Counter.shards - 1)))
with client.transaction():
entity = client.get(key)
if entity is None:
entity = datastore.Entity(key=key)
entity.update({
"count": 0,
})
entity.update({
"count": entity["count"] + 1,
})
client.put(entity)
这是从 Google 云函数中调用的
from flask import abort, jsonify, make_response
from src.notify import FCM, APNS
from src.lib.datastore import Counter
def counter(request):
args = request.args
if args.get("platform"):
Counter.increment(args["platform"])
return
return jsonify({
FCM: Counter.count(FCM),
APNS: Counter.count(APNS)
})
这用于递增和读取计数,并按平台拆分为 iOS 和 Android。
最后我放弃了计数器并开始在 BigQuery 中保存通知的状态。定价仍然合理,因为它仍然是每次使用,而且数据插入的流版本似乎足够快,在实践中不会给我带来任何问题。
有了这个,我可以使用一个简单的 sql 查询来计算与批处理作业匹配的所有实体。对于所有实体,这最终需要大约 3 秒的时间,与替代方案相比,这对我来说是可以接受的性能,因为这仅供内部使用。
背景
我需要向大约 100 万台设备发送大量通知,我正在使用 Google Cloud Functions 构建它。
在当前设置中,我将每个设备令牌作为 PubSub 消息排队:
- 在 DataStore 中存储待处理通知,用于跟踪重试和成功状态
- 尝试发送通知
- 将通知标记为成功或失败(如果重试次数足够多但尚未通过)
这或多或少工作得很好,我从中获得了不错的性能,每秒处理 1.5K 令牌。
问题
我想跟踪整个工作的当前进度。鉴于我知道我希望处理多少通知,我希望能够报告类似 x/1_000_000 已处理的内容,然后当失败+成功的总和与我想要处理的一样多时认为它已完成.
DataStore 文档建议不要 运行 对实体本身进行计数,因为它不会高效,这一点我可以确认。我按照他们的 sharded counter 示例文档实现了一个计数器,我在最后包含了它。
我看到的问题是它既非常慢又很容易返回 409 Contention error
s 这使得我的函数调用重试这不是理想的,因为计数本身对过程不是必需的并且每个通知的重试预算有限。在实践中,最失败的事情是增加在过程结束时发生的计数器,这会增加通知读取的负载以检查它们在重试时的状态,这意味着我最终得到一个小于实际成功通知的计数器.
我 运行 一个使用 wrk 的快速基准测试,似乎可以通过递增计数器获得大约 400 RPS,平均延迟为 250 毫秒。与每个通知执行大约 3 个 DataStore 查询的通知逻辑本身相比,这是相当慢的,并且可能比递增计数器更复杂。当添加到争用错误时,我最终得到了一个我认为不稳定的实现。据我所知,Datastore 通常会在持续大量使用的情况下自动扩展,但使用此服务的模式非常罕见,而且对于整批令牌,因此不会有任何以前的流量来扩展它。
问题
- 我是否遗漏了一些可以改进以降低速度的计数器实现?
- 我应该考虑采用其他方法来获得我想要的东西吗?
代码
与datastore交互的代码
DATASTORE_READ_BATCH_SIZE = 100
class Counter():
kind = "counter"
shards = 2000
@staticmethod
def _key(namespace, shard):
return hashlib.sha1(":".join([str(namespace), str(shard)]).encode('utf-8')).hexdigest()
@staticmethod
def count(namespace):
keys = []
total = 0
for shard in range(Counter.shards):
if len(keys) == DATASTORE_READ_BATCH_SIZE:
counters = client.get_multi(keys)
total = total + sum([int(c["count"]) for c in counters])
keys = []
keys.append(client.key(Counter.kind, Counter._key(namespace, shard)))
if len(keys) != 0:
counters = client.get_multi(keys)
total = total + sum([int(c["count"]) for c in counters])
return total
@staticmethod
def increment(namespace):
key = client.key(Counter.kind, Counter._key(namespace, random.randint(0, Counter.shards - 1)))
with client.transaction():
entity = client.get(key)
if entity is None:
entity = datastore.Entity(key=key)
entity.update({
"count": 0,
})
entity.update({
"count": entity["count"] + 1,
})
client.put(entity)
这是从 Google 云函数中调用的
from flask import abort, jsonify, make_response
from src.notify import FCM, APNS
from src.lib.datastore import Counter
def counter(request):
args = request.args
if args.get("platform"):
Counter.increment(args["platform"])
return
return jsonify({
FCM: Counter.count(FCM),
APNS: Counter.count(APNS)
})
这用于递增和读取计数,并按平台拆分为 iOS 和 Android。
最后我放弃了计数器并开始在 BigQuery 中保存通知的状态。定价仍然合理,因为它仍然是每次使用,而且数据插入的流版本似乎足够快,在实践中不会给我带来任何问题。
有了这个,我可以使用一个简单的 sql 查询来计算与批处理作业匹配的所有实体。对于所有实体,这最终需要大约 3 秒的时间,与替代方案相比,这对我来说是可以接受的性能,因为这仅供内部使用。