无法让 Celery/RabbitMQ 到 运行 我在 Django 中的共享任务

Cant get Celery/RabbitMQ to run my shared task in Django

我有一个已经部署并在生产中运行的应用程序,但这都是由其他人完成的。我现在正在尝试制作环境的本地版本,但我似乎无法让我的本地 Celery/RabbitMQ 真正 运行 完成任务。

该应用程序非常大,因此我不会在这里 post 全部尝试,但我从调试中获得了一些可能有用的线索。一个是这个。当我运行以下功能:

task_id = celery_send_playbook_msg_util.apply_async([brand_user.id, pb['id'], sequence_id, '', False, False, message_id,
                                                   pb['playbook'], event_type == constants.event_types['Abandoned']],
                                                  eta=delivery_datetime, queue='high_priority', priority=8)

print("Celery Task ID: " + str(task_id))

我确实在 return 中获得了 UUID 样式 task_id。这向我表明 Celery Broker 正在 运行ning。我还为 celery broker 尝试了以下配置选项(到目前为止 none 有效)

#BROKER_URL = 'amqp://test:test@192.168.33.10:5672//'
#BROKER_URL = 'amqp://test:test@localhost:5672//'
#BROKER_URL = 'amqp://test:test@localhost//'
BROKER_URL = 'amqp://test:test@192.168.33.10//'

其他线索:

我突然想到,查看我用来启动 worker 的命令的输出可能会有所帮助,所以这里是:

celery -A Python worker --loglevel=debug

 -------------- celery@vagrant v4.2.1 (windowlicker)
---- **** -----
--- * ***  * -- Linux-4.15.0-29-generic-x86_64-with-Ubuntu-18.04-bionic 2019-08-02 20:42:29
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         Python:0x7fa1367f0650
- ** ---------- .> transport:   amqp://test:**@192.168.33.10:5672//
- ** ---------- .> results:     rpc://
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . Python.celery.debug_task
  . Python.celery.send_messages_daily_unreadcount
  . Sensus.tasks.bulk_manual_optin_from_csv_task
  . Sensus.tasks.celery_csv_upload_send_message
  . Sensus.tasks.celery_send_messages_daily_util
  . Sensus.tasks.celery_send_msg_util
  . Sensus.tasks.celery_send_payment_message
  . Sensus.tasks.celery_send_playbook_msg_util
  . Sensus.tasks.consolidate_messages_and_analyze_sentiment
  . Sensus.tasks.scheduled_broadcast_task
  . celery.accumulate
  . celery.backend_cleanup
  . celery.chain
  . celery.chord
  . celery.chord_unlock
  . celery.chunks
  . celery.group
  . celery.map
  . celery.starmap

[2019-08-02 20:42:29,590: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2017 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': 'rabbit@vagrant.vm', 'platform': 'Erlang/OTP', 'version': '3.6.10'}, mechanisms: ['PLAIN', 'AMQPLAIN'], locales: [u'en_US']
[2019-08-02 20:42:29,592: INFO/MainProcess] Connected to amqp://test:**@192.168.33.10:5672//
[2019-08-02 20:42:29,601: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2017 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': 'rabbit@vagrant.vm', 'platform': 'Erlang/OTP', 'version': '3.6.10'}, mechanisms: ['PLAIN', 'AMQPLAIN'], locales: [u'en_US']
[2019-08-02 20:42:29,603: INFO/MainProcess] mingle: searching for neighbors
[2019-08-02 20:42:29,604: DEBUG/MainProcess] using channel_id: 1
[2019-08-02 20:42:29,606: DEBUG/MainProcess] Channel open
[2019-08-02 20:42:29,621: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2017 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': 'rabbit@vagrant.vm', 'platform': 'Erlang/OTP', 'version': '3.6.10'}, mechanisms: ['PLAIN', 'AMQPLAIN'], locales: [u'en_US']
[2019-08-02 20:42:29,623: DEBUG/MainProcess] using channel_id: 1
[2019-08-02 20:42:29,624: DEBUG/MainProcess] Channel open
[2019-08-02 20:42:30,630: INFO/MainProcess] mingle: all alone
[2019-08-02 20:42:30,636: DEBUG/MainProcess] using channel_id: 2
[2019-08-02 20:42:30,637: DEBUG/MainProcess] Channel open
[2019-08-02 20:42:30,641: DEBUG/MainProcess] using channel_id: 3
[2019-08-02 20:42:30,642: DEBUG/MainProcess] Channel open
[2019-08-02 20:42:30,645: DEBUG/MainProcess] using channel_id: 1
[2019-08-02 20:42:30,646: DEBUG/MainProcess] Channel open
[2019-08-02 20:42:30,649: WARNING/MainProcess] /home/vagrant/.local/lib/python2.7/site-packages/celery/fixups/django.py:200: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
  warnings.warn('Using settings.DEBUG leads to a memory leak, never '
[2019-08-02 20:42:30,650: INFO/MainProcess] celery@vagrant ready.
[2019-08-02 20:42:30,651: DEBUG/MainProcess] basic.qos: prefetch_count->4
[2019-08-02 20:42:50,649: DEBUG/MainProcess] heartbeat_tick : for connection bcbf34af62f3488c8bbcee3f18b42621
[2019-08-02 20:42:50,651: DEBUG/MainProcess] heartbeat_tick : Prev sent/recv: None/None, now - 28/58, monotonic - 11221.8028604, last_heartbeat_sent - 11221.8028469, heartbeat int. - 60 for connection bcbf34af62f3488c8bbcee3f18b42621
[2019-08-02 20:43:10,655: DEBUG/MainProcess] heartbeat_tick : for connection bcbf34af62f3488c8bbcee3f18b42621
[2019-08-02 20:43:10,656: DEBUG/MainProcess] heartbeat_tick : Prev sent/recv: 28/58, now - 28/88, monotonic - 11241.8086932, last_heartbeat_sent - 11221.8028469, heartbeat int. - 60 for connection bcbf34af62f3488c8bbcee3f18b42621

所以我想通了。方法如下。

首先我使用以下命令查看队列中的内容:

sudo rabbitmqctl list_queues

这给了我以下输出:

Listing queues
d68c3a7d-ed35-3c79-b571-0d01ccda84ad    1
2753309c-9f03-399c-871d-5b4ffcbea462    0
high_priority   23
8ce8d7e0-0081-3937-80fb-ff238be8f410    1
4ce2ecce-6954-3c07-857a-4221fe613e72    0
celery  0
celery@vagrant.celery.pidbox    0
celeryev.1a7429e0-48b2-4ead-925c-42ee1855247d   0
8127f8e8-073c-3972-a563-829ab207b964    0

我很好奇 'high_priority' 旁边的 23 是什么,我注意到每次我尝试一些应该放在队列中的东西时,它一直在上升。事实证明,在我的应用程序中,当我们将某些东西放入队列时,我们不只是将它放入通用队列,而是将它放入我们命名为 'high_priority' 的队列中。因为我没有注意到这一点,所以我开始让我的工作人员查看一般队列。为了解决这个问题,我在 worker 调用中添加了一个 -Q 选项,如下所示:

celery -A Python worker --loglevel=debug -Q high_priority

这解决了问题