我应该担心 datastoreRpcErrors 吗?
Should I have concern about datastoreRpcErrors?
当我 运行 写入 google 云数据存储的数据流作业时,有时我看到指标显示我有一个或两个 datastoreRpcErrors
:
由于这些数据存储写入通常包含一批键,我想知道在RpcError 的情况下,是否会自动重试。如果没有,处理这些情况的好方法是什么?
我认为你应该首先确定发生了哪种错误,以便查看你的选择。
但是,在官方数据存储文档中,列出了所有可能的 errors and their error codes 。幸运的是,他们为每个人提供了建议的行动。
我的建议是您执行他们的建议,如果它们对您无效,请查看替代方案
tl;dr:默认情况下 datastoreRpcErrors
将自动使用 5 次重试。
我在beam python sdk中挖掘了datastoreio
的代码。看起来最终的实体突变是通过 DatastoreWriteFn()
.
批量刷新的
# Flush the current batch of mutations to Cloud Datastore.
_, latency_ms = helper.write_mutations(
self._datastore, self._project, self._mutations,
self._throttler, self._update_rpc_stats,
throttle_delay=_Mutate._WRITE_BATCH_TARGET_LATENCY_MS/1000)
RPCError 被 helper
中的 write_mutations
中的这段代码捕获; commit
方法有一个装饰器 @retry.with_exponential_backoff
;并且默认重试次数设置为5; retry_on_rpc_error
定义了触发重试的具体 RPCError
和 SocketError
原因。
for mutation in mutations:
commit_request.mutations.add().CopyFrom(mutation)
@retry.with_exponential_backoff(num_retries=5,
retry_filter=retry_on_rpc_error)
def commit(request):
# Client-side throttling.
while throttler.throttle_request(time.time()*1000):
try:
response = datastore.commit(request)
...
except (RPCError, SocketError):
if rpc_stats_callback:
rpc_stats_callback(errors=1)
raise
...
当我 运行 写入 google 云数据存储的数据流作业时,有时我看到指标显示我有一个或两个 datastoreRpcErrors
:
由于这些数据存储写入通常包含一批键,我想知道在RpcError 的情况下,是否会自动重试。如果没有,处理这些情况的好方法是什么?
我认为你应该首先确定发生了哪种错误,以便查看你的选择。
但是,在官方数据存储文档中,列出了所有可能的 errors and their error codes 。幸运的是,他们为每个人提供了建议的行动。
我的建议是您执行他们的建议,如果它们对您无效,请查看替代方案
tl;dr:默认情况下 datastoreRpcErrors
将自动使用 5 次重试。
我在beam python sdk中挖掘了datastoreio
的代码。看起来最终的实体突变是通过 DatastoreWriteFn()
.
# Flush the current batch of mutations to Cloud Datastore.
_, latency_ms = helper.write_mutations(
self._datastore, self._project, self._mutations,
self._throttler, self._update_rpc_stats,
throttle_delay=_Mutate._WRITE_BATCH_TARGET_LATENCY_MS/1000)
RPCError 被 helper
中的 write_mutations
中的这段代码捕获; commit
方法有一个装饰器 @retry.with_exponential_backoff
;并且默认重试次数设置为5; retry_on_rpc_error
定义了触发重试的具体 RPCError
和 SocketError
原因。
for mutation in mutations:
commit_request.mutations.add().CopyFrom(mutation)
@retry.with_exponential_backoff(num_retries=5,
retry_filter=retry_on_rpc_error)
def commit(request):
# Client-side throttling.
while throttler.throttle_request(time.time()*1000):
try:
response = datastore.commit(request)
...
except (RPCError, SocketError):
if rpc_stats_callback:
rpc_stats_callback(errors=1)
raise
...