为什么 Celery 周期性任务只触发一次函数

Why does Celery periodic tasks fire a function only once

我构建了一个小型网络抓取器功能,可以从网络上获取一些数据并将其填充到我的数据库中,效果很好。 现在我想使用 Celery 周期性任务每 20 秒定期触发这个函数。 我浏览了文档,一切似乎都为开发而设置(使用 redis 作为代理)。

这是我在 project/stocksapp 中的 tasks.py 文件,其中我定期触发的函数是:

# Celery imports

from celery.task.schedules import crontab
from celery.decorators import periodic_task
from celery.utils.log import get_task_logger
from datetime import timedelta

logger = get_task_logger(__name__)

# periodic functions

@periodic_task(
    run_every=(timedelta(seconds=20)),
    name="getStocksDataDax",
    ignore_result=True
)
def getStocksDataDax():
    print("fired")

现在当我启动 worker 时,该函数似乎被触发一次且仅一次(数据库被填充)。但在那之后,该函数不再被触发,尽管控制台提示它:

C:\Users\Jonas\Desktop\CFD\CFD>celery -A CFD beat -l info
celery beat v4.4.2 (cliffs) is starting.
__    -    ... __   -        _
LocalTime -> 2020-05-15 23:06:29
Configuration ->
    . broker -> redis://localhost:6379/0
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%INFO
    . maxinterval -> 5.00 minutes (300s)
[2020-05-15 23:06:29,990: INFO/MainProcess] beat: Starting...

[2020-05-15 23:06:30,024: INFO/MainProcess] Scheduler: Sending due task getStocksDataDax (getStocksDataDax)
[2020-05-15 23:06:50,015: INFO/MainProcess] Scheduler: Sending due task getStocksDataDax (getStocksDataDax)
[2020-05-15 23:07:10,015: INFO/MainProcess] Scheduler: Sending due task getStocksDataDax (getStocksDataDax)
[2020-05-15 23:07:30,015: INFO/MainProcess] Scheduler: Sending due task getStocksDataDax (getStocksDataDax)
[2020-05-15 23:07:50,015: INFO/MainProcess] Scheduler: Sending due task getStocksDataDax (getStocksDataDax)
[2020-05-15 23:08:10,016: INFO/MainProcess] Scheduler: Sending due task getStocksDataDax (getStocksDataDax)
[2020-05-15 23:08:30,016: INFO/MainProcess] Scheduler: Sending due task getStocksDataDax (getStocksDataDax)
[2020-05-15 23:08:50,016: INFO/MainProcess] Scheduler: Sending due task getStocksDataDax (getStocksDataDax)

project/project/celery.py

from __future__ import absolute_import, unicode_literals

import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'CFD.settings')

app = Celery('CFD',
             broker='redis://localhost:6379/0',
             backend='amqp://',
             include=['CFD.tasks'])

app.conf.broker_transport_options = {'visibility_timeout': 3600}
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

函数本身总共运行大约 1 秒。

在此设置中,要让 worker/celery 按预期每 20 秒触发一次该功能,基本上哪里会出现问题?

celery -A CFD beat -l info 只启动 Celery 节拍进程。您应该有一个单独的 Celery 工作进程 - 在不同的终端 运行 类似 celery -A CFD worker -c 8 -O fair -l info.