Heroku 上的 Django Celery 任务导致内存使用率过高

Django Celery task on Heroku causes high memory usage

我在 Heroku 上有一个 celery 任务连接到外部 API 并检索一些数据,存储在数据库中并重复数百次。很快(大约 10 次循环后)Heroku 开始警告内存使用率过高。有什么想法吗?

tasks.py

@app.task
def retrieve_details():
    for p in PObj.objects.filter(some_condition=True):
        p.fetch()

models.py

def fetch(self):
    v_data = self.service.getV(**dict(
        Number=self.v.number
    ))
    response = self.map_response(v_data)

    for key in ["some_key","some_other_key",]:
        setattr(self.v, key, response.get(key))

    self.v.save()

Heroky 日志

2017-01-01 10:26:25.634
132 <45>1 2017-01-01T10:26:25.457411+00:00 heroku run.5891 - - Error R14 (Memory quota exceeded)

Go to the log: https://api.heroku.com/myapps/xxx@heroku.com/addons/logentries

You are receiving this email because your Logentries alarm "Memory quota exceeded"
has been triggered.

In context:
2017-01-01 10:26:25.568 131 <45>1 2017-01-01T10:26:25.457354+00:00 heroku run.5891 - - Process running mem=595M(116.2%)
2017-01-01 10:26:25.634 132 <45>1 2017-01-01T10:26:25.457411+00:00 heroku run.5891 - - Error R14 (Memory quota exceeded)

您基本上是将一堆数据加载到内存中的 Python 字典中。这将导致大量内存开销,尤其是当您从本地数据库中抓取大量对象时。

您真的需要将所有这些对象存储在字典中吗?

大多数人为这样的事情做的是:

  • 一次从数据库中检索一个对象。
  • 处理该项目(执行您需要的任何逻辑)。
  • 重复。

这样一来,您在任何给定时间都只会在内存中存储一​​个对象,从而大大减少内存占用。

如果我是你,我会想办法将我的逻辑移动到数据库查询中,或者只是单独处理每个项目。

为了扩展名副其实的 rdegges 思想,以下是我过去在使用 celery/python 时使用的两种策略来帮助减少内存占用:(1) 启动子任务,每个子任务只处理一个对象and/or (2) 使用发电机。

  1. 启动每个只处理一个对象的子任务:

    @app.task
    def retrieve_details():
        qs = PObj.objects.filter(some_condition=True)
        for p in qs.values_list('id', flat=True):
            do_fetch.delay(p)
    
    @app.task
    def do_fetch(n_id):
        p = PObj.objects.get(id=n_id)
        p.fetch()
    

    现在您可以调整 celery,使其在处理 N 个 PObj(任务)后终止进程,以使用 --max-tasks-per-child.

  2. 保持低内存占用
  3. 使用生成器:您也可以尝试使用生成器,这样您就可以(理论上)在调用 fetch

    后丢弃 PObj
    def ps_of_interest(chunk=10):
        n = chunk
        start = 0
        while n == chunk:
            some_ps = list(PObj.objects.filter(some_condition=True)[start:start + n])
            n = len(some_ps)
            start += chunk
            for p in some_ps:
                yield p
    
    @app.task
    def retrieve_details():
        for p in ps_of_interest():
            p.fetch()
    

为了我的钱,我会选择选项 #1。