Avoiding or handling "BadRequestError: The requested query has expired."?

Avoiding or handling "BadRequestError: The requested query has expired."?

我正在使用链式延迟任务和查询游标循环访问 App Engine 中的数据。 Python 2.7,使用 db(不是 ndb)。例如

def loop_assets(cursor = None):

  try:

     assets = models.Asset.all().order('-size')

     if cursor:
        assets.with_cursor(cursor)

     for asset in assets.run():

        if asset.is_special():
           asset.yay = True
           asset.put()

  except db.Timeout:
     cursor = assets.cursor()
     deferred.defer(loop_assets, cursor = cursor,  _countdown = 3, _target = version, _retry_options = dont_retry)
     return

此 运行 总计约 75 分钟(每个任务约 1 分钟),然后引发此异常:

BadRequestError: The requested query has expired. Please restart it with the last cursor to read more results.

正在阅读 the docs,唯一声明的原因是:

New App Engine releases may change internal implementation details, invalidating cursors that depend on them. If an application attempts to use a cursor that is no longer valid, the Datastore raises a BadRequestError exception.

所以也许这就是发生的事情,但我第一次尝试这种技术时遇到了 'change in internal implementation'(除非它们经常发生),这似乎是巧合。

还有其他解释吗? 有没有办法重新设计我的代码来避免这种情况?

如果没有,我认为唯一的解决方案是标记哪些资产已被处理,然后在查询中添加一个额外的过滤器以排除那些,然后在每次结束时手动重新启动进程。

作为参考,this question 问了类似的问题,但接受的答案是 'use cursors',我已经在这样做了,所以这不可能是同一个问题。

你可能想看看AppEngine MapReduce

MapReduce is a programming model for processing large amounts of data in a parallel and distributed fashion. It is useful for large, long-running jobs that cannot be handled within the scope of a single request, tasks like:

  • Analyzing application logs
  • Aggregating related data from external sources
  • Transforming data from one format to another
  • Exporting data for external analysis

问这个问题的时候,我运行有过一次代码,遇到过一次BadRequestError。然后我再次 运行 它,并且它在没有 BadRequestError 的情况下完成,总共 运行ning 约 6 小时。所以在这一点上我会说这个问题最好的 'solution' 是让代码幂等(这样它就可以重试)然后添加一些代码来自动重试。

在我的特定情况下,还可以调整查询,以便在游标 'expires' 的情况下,查询可以重新启动 w/o 游标停止的地方。有效地将查询更改为:

assets = models.Asset.all().order('-size').filter('size <', last_seen_size)

其中 last_seen_size 是从每个任务传递给下一个任务的值。