Python celery configuration returning exchange errors

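I'm using the overall configuration below to set up Celery. The thing to note is that I'm trying to add more queues, and that is when I ran into this problem. I'm new to Celery, so please excuse any mistakes, and let me know if you need any more details from my side.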

Command used:

celery --app=my_app worker -Q a_queue,b_queue,c_queue,d_queue,e_queue,f_queue,g_queue,h_queue,i_queue,j_queue,k_queue,l_queue,m_queue,n_queue,o_queue,p_queue,q_queue,r_queue,s_queue --loglevel=INFO -Ofair

BROKER_URL = f'redis://{REDIS_CONFIGURATION}:6379/{DB_SETTINGS}'

app.conf.task_routes = ([
    # SABRE queues
    ('x.a', {'queue': 'a_queue'}),
    ('x.b', {'queue': 'b_queue'}),
    ('x.c', {'queue': 'c_queue'}),

    ('xa.a', {'queue': 'd_queue'}),
    ('xb.b', {'queue': 'e_queue'}),
    ('xb.c', {'queue': 'f_queue'}),

    ('xc.a', {'queue': 'g_queue'}),
    ('xd.b', {'queue': 'h_queue'}),
    ('xe.c', {'queue': 'i_queue'}),

    ('xf.a', {'queue': 'j_queue'}),
    ('xg.b', {'queue': 'k_queue'}),
    ('xh.c', {'queue': 'l_queue'}),

    ('xi.a', {'queue': 'm_queue'}),
    ('xj.b', {'queue': 'n_queue'}),
    ('xk.c', {'queue': 'o_queue'}),

    ('xl.a', {'queue': 'p_queue'}),
    ('xm.b', {'queue': 'q_queue'}),
    ('xm.c', {'queue': 'r_queue'}),

    ('providers.custom_tasks.*', {'queue': 's_queue'})
],)

app.conf.broker_url = BROKER_URL

I get the following error:

   File "/usr/local/lib/python3.6/site-packages/celery/worker/worker.py", line 203, in start
     self.blueprint.start(self)
   File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
     step.start(parent)
   File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py", line 370, in start
     return self.obj.start()
   File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 318, in start
     blueprint.start(self)
   File "/usr/local/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
     step.start(parent)
   File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/tasks.py", line 37, in start
     c.connection, on_decode_error=c.on_decode_error,
   File "/usr/local/lib/python3.6/site-packages/celery/app/amqp.py", line 302, in TaskConsumer
     **kw
   File "/usr/local/lib/python3.6/site-packages/kombu/messaging.py", line 386, in __init__
     self.revive(self.channel)
   File "/usr/local/lib/python3.6/site-packages/kombu/messaging.py", line 407, in revive
     self.declare()
   File "/usr/local/lib/python3.6/site-packages/kombu/messaging.py", line 420, in declare
     queue.declare()
   File "/usr/local/lib/python3.6/site-packages/kombu/entity.py", line 604, in declare
     self._create_exchange(nowait=nowait, channel=channel)
   File "/usr/local/lib/python3.6/site-packages/kombu/entity.py", line 611, in _create_exchange
     self.exchange.declare(nowait=nowait, channel=channel)
   File "/usr/local/lib/python3.6/site-packages/kombu/entity.py", line 182, in declare
     return (channel or self.channel).exchange_declare(
   File "/usr/local/lib/python3.6/site-packages/kombu/abstract.py", line 119, in channel
     type(self).__name__))
kombu.exceptions.NotBoundError: Can't call method on Exchange not bound to a channel

I'm using Redis as the broker and getting this exchange error.

This may be related to a known issue in kombu. If you are on an older version, try updating kombu to 4.1.0 or later; the problem may go away.
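To see whether the installed kombu is older than 4.1.0, a quick check along these lines might help. This is only a sketch: `parse_version` and `kombu_needs_upgrade` are hypothetical helper names made up for illustration, not part of Celery or kombu, and the comparison only looks at the numeric part of the version string.

```python
# Hypothetical helpers for illustration; they are not part of Celery or kombu.

def parse_version(text):
    """Turn a version string such as '4.1.0' into a tuple like (4, 1, 0).

    Only the leading digits of each dot-separated piece are used, so a
    suffix such as 'rc1' is ignored. Short versions are padded with zeros.
    """
    parts = []
    for piece in text.split("."):
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        if not digits:
            break
        parts.append(int(digits))
    while len(parts) < 3:
        parts.append(0)
    return tuple(parts)


def kombu_needs_upgrade(installed, minimum="4.1.0"):
    """Return True when the installed version is lower than the minimum."""
    return parse_version(installed) < parse_version(minimum)


if __name__ == "__main__":
    try:
        import kombu
    except ImportError:
        print("kombu is not installed in this environment")
    else:
        print("kombu", kombu.__version__,
              "needs upgrade:", kombu_needs_upgrade(kombu.__version__))
```

If it reports that an upgrade is needed, something like `pip install --upgrade "kombu>=4.1.0"` in the same environment as the worker should bring it up to date. Restart the worker afterwards and check that the installed Celery version is compatible with the new kombu.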