Celery 3.1.19 creating too many threads for each worker causing the server to overload with 100% CPU unable to create more threads

Use case: Using celery with the Django framework (version 1.6) to schedule tasks that mostly write to the database. I have only one custom queue, and the celery beat scheduler puts tasks into it. I created one celery worker listening on this queue with a concurrency of 8.

Problem: each of the 8 separate worker processes keeps creating threads that are never reclaimed (my guess). This results in far too many threads (I have seen counts as high as 20k threads). Within 4-5 hours the thread count reaches 10k!
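Thread growth like this can be confirmed from inside the worker itself. A minimal diagnostic sketch using only the standard library (the helper name is mine, not part of the project):

```python
import threading

def log_thread_count(logger=None):
    """Return the number of live threads in this process, optionally logging it."""
    count = threading.active_count()
    if logger is not None:
        logger.warning("live threads: %d", count)
    return count
```

Called periodically from a task, a steadily climbing number confirms that threads are being created but never reclaimed.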

The error I see: can't start new thread.

The Python traceback showing who starts the new thread gives me this: a call to Django's save creates a new thread. Here "adgroup" is a Django model object:

[2015-12-03 18:40:17,133: WARNING/Worker-3] adgroup.save(update_fields=['bids_today', 'impressions_today', 'spent_today', 'last_metric_update_time'])
[2015-12-03 18:40:17,887: WARNING/Worker-3] File "/home/ec2-user/venv/local/lib/python2.7/dist-packages/django/db/models/base.py", line 545, in save
[2015-12-03 18:40:17,887: WARNING/Worker-3] force_update=force_update, update_fields=update_fields)
[2015-12-03 18:40:18,715: WARNING/Worker-3] File "/home/ec2-user/venv/local/lib/python2.7/dist-packages/django/db/models/base.py", line 582, in save_base
[2015-12-03 18:40:18,716: WARNING/Worker-3] update_fields=update_fields, raw=raw, using=using)
[2015-12-03 18:40:18,716: WARNING/Worker-3] File "/home/ec2-user/venv/local/lib/python2.7/dist-packages/django/dispatch/dispatcher.py", line 185, in send
[2015-12-03 18:40:18,716: WARNING/Worker-3] response = receiver(signal=self, sender=sender, **named)
[2015-12-03 18:40:19,300: INFO/MainProcess] Task ExtendTV.celery_tasks.stats_collector.collectAdGroupMetricsTask[2ae52b3d-77b9-46d3-93ac-d7fad9b96382] succeeded in 26.486441362s: None
[2015-12-03 18:40:19,395: WARNING/Worker-3] File "/home/ec2-user/venv/local/lib/python2.7/dist-packages/haystack/signals.py", line 48, in handle_save
[2015-12-03 18:40:19,593: WARNING/Worker-3] index.update_object(instance, using=using)
[2015-12-03 18:40:19,593: WARNING/Worker-3] File "/home/ec2-user/venv/local/lib/python2.7/dist-packages/haystack/indexes.py", line 274, in update_object
[2015-12-03 18:40:19,593: WARNING/Worker-3] backend.update(self, [instance])
[2015-12-03 18:40:19,593: WARNING/Worker-3] File "/home/ec2-user/venv/local/lib/python2.7/dist-packages/haystack/backends/whoosh_backend.py", line 208, in update
[2015-12-03 18:40:20,515: WARNING/Worker-3] writer.commit()
[2015-12-03 18:40:20,516: WARNING/Worker-3] File "/home/ec2-user/venv/local/lib/python2.7/dist-packages/whoosh/writing.py", line 1043, in commit
[2015-12-03 18:40:21,318: WARNING/Worker-3] self.start()
[2015-12-03 18:40:21,642: WARNING/Worker-3] File "/usr/lib64/python2.7/threading.py", line 748, in start
[2015-12-03 18:40:22,340: WARNING/Worker-3] _start_new_thread(self.__bootstrap, ())
[2015-12-03 18:40:22,340: WARNING/Worker-3] error: can't start new thread

Misc info: as the picture shows, memory is within the normal range. This "thread problem" did not exist in the earlier celery 3.0.x versions; however, memory usage there got quite high.

The Celery command I use to create the worker:

celery -A ProjectName worker -l DEBUG -Q ExampleQueueName

The Celery settings I use:

CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
CELERY_DEFAULT_ROUTING_KEY = 'default'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
CELERYD_PREFETCH_MULTIPLIER = 128
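For context, the beat-scheduled task is routed to the single custom queue. A sketch of how that routing might look with `CELERY_ROUTES` (the task and queue names are taken from the logs and command above; the exact routing actually used is an assumption):

```python
# Hypothetical routing config -- the actual project may route differently.
CELERY_ROUTES = {
    'ExtendTV.celery_tasks.stats_collector.collectAdGroupMetricsTask': {
        'queue': 'ExampleQueueName',
    },
}
```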

Other relevant setup: using rabbitmq 3.5.4 as the message broker.

Update:

def collectAdGroupMetricsTask(*args, **kwargs):
    try:
        adgroup = AdGroup.objects.get(id=kwargs.get("adgroupID"))
        collectAdGroupMetrics(adgroup)
    except Exception as e:
        logger.error("Could not retrieve AdGroup for collectAdGroupMetrics. " + str(e))
    return

def collectAdGroupMetrics(adgroup, currDate=None):
    Value1=function1_making_another_db_call()
    Value2=function2_making_another_db_call()
    adgroup.fieldname1 = Value1
    adgroup.fieldname2 = Value2
    adgroup.save(update_fields=['fieldname1', 'fieldname2'])

Example of worker process having lots of threads.

The problem was with whoosh (a python package): it was trying to acquire a write lock and kept waiting, which caused so many threads to be created. Hence I removed whoosh from the list of installed apps in Django. I also used the maxtasksperchild config in celery to keep memory from growing continuously.
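In Celery 3.1 the worker recycling mentioned above can be configured in the Django settings; a sketch (the value 100 is an arbitrary example, not the value actually used):

```python
# Recycle each pool worker process after it has executed 100 tasks,
# so any leaked memory or threads are reclaimed when the child exits.
CELERYD_MAX_TASKS_PER_CHILD = 100
```

The same limit can also be passed on the command line with `--maxtasksperchild=100`.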

  • First, installed the gevent package in the python virtual environment.
  • Next, made some changes to the command used to run celery.
  • Finally, appended the argument --pool gevent. By default, celery uses the 'prefork' pool, which seems to have some bug.
  • After switching to gevent, celery's process count dropped to the number of child (concurrency) processes.
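Putting the steps above together, the worker command from the question would become something like this (a sketch; it assumes the same virtualenv and queue name as earlier):

```shell
pip install gevent
celery -A ProjectName worker -l DEBUG -Q ExampleQueueName --pool gevent
```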