在使用 django_celery_beat 设置的 Django 视图和使用 Redis 的 Cachine 中使用 Celery 周期性任务输出

Use Celery periodic tasks output in Django views set up with django_celery_beat and Cachine with Redis

我正在尝试使用 Celery 在我的一个模型上执行一个相当耗时的算法。 目前在我的 home.tasks.py 我有:

@shared_task(bind=True)
def get_hot_posts():
    return Post.objects.get_hot()
    
    
@shared_task(bind=True)
def get_top_posts():
    pass

在我的 Post 对象模型管理器中我有:

def get_hot(self):
    
    qs = (
        self.get_queryset()
        .select_related("author")
    )
    
    qs_list = list(qs)
    sorted_post = sorted(qs_list, key=lambda p: p.hot(), reverse=True)
    
    return sorted_post

其中return是热帖列表对象

我使用了django_celery_beat来设置周期性任务。我已经在 settings.py

中配置了
CELERY_BEAT_SCHEDULE = {
    'update-hot-posts': {
        'task':'get_hot_posts',
        'schedule': 3600.0
    },
    'update-top-posts': {
        'task':'get_top_posts',
        'schedule': 86400
    }
}

如果我可以在 Celery 任务中对我的模型执行任何功能,我不会这样做,但我的目的是每 1 小时计算一次热门帖子,然后在我的一个视图中简单地使用它。我怎样才能做到这一点,我无法找到如何获取该任务的输出并在我的视图中使用它以便在我的模板中呈现它。

提前致谢!

编辑

我正在缓存结果:

settings.py:

CACHES = {
    "default": {
        "BACKEND": "django_redis.cache.RedisCache",
        "LOCATION": "redis://127.0.0.1:6379/1",
        "OPTIONS": {
            "CLIENT_CLASS": "django_redis.client.DefaultClient",
            "IGNORE_EXCEPTIONS": True,
            
        }
    }
}

CACHE_TTL = getattr(设置, 'CACHE_TTL', DEFAULT_TIMEOUT)

@shared_task(bind=True)
def get_hot_posts():
    hot_posts = Post.objects.get_hot()
    cache.set("hot_posts", hot_posts, timeout=CACHE_TTL)

但是,当访问我视图中的对象时 return None,我的任务似乎没有工作。

@login_required
def hot_posts(request):
    posts = cache.get("hot_posts")
    context = { 'posts':posts, 'hot_active':'-active'}
    return render(request, 'home/homepage/home.html', context)

如何检查我的任务是否正常 运行ning?它实际上正在运行并缓存查询集功能。

编辑settings.py中的配置:

BROKER_URL = 'redis://localhost:6379'
BROKER_TRANSPORT = 'redis'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_BEAT_SCHEDULE = {
    'update-hot-posts': {
        'task':'get_hot_posts',
        'schedule': 3600.0
    },
    'update-top-posts': {
        'task':'get_top_posts',
        'schedule': 86400.0
    },
    'tester': {
        'task':'tester',
        'schedule': 60.0
    }
}

当我进入我的视图和cache.get returns None 时,我没有看到结果,我认为我的任务不是 运行ning 但我不能找出原因。

这就是当我 运行 我的工人时发生的事情:

celery -A register worker -E --loglevel=info

 -------------- celery@apples-MacBook-Pro-2.local v4.4.6 (cliffs)
--- ***** ----- 
-- ******* ---- Darwin-16.7.0-x86_64-i386-64bit 2020-07-06 01:46:36
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         register:0x10f3da050
- ** ---------- .> transport:   redis://localhost:6379//
- ** ---------- .> results:     redis://localhost:6379/
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: ON
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . home.tasks.get_hot_posts
  . home.tasks.get_top_posts
  . home.tasks.tester

[2020-07-06 01:46:38,449: INFO/MainProcess] Connected to redis://localhost:6379//
[2020-07-06 01:46:38,500: INFO/MainProcess] mingle: searching for neighbors
[2020-07-06 01:46:39,592: INFO/MainProcess] mingle: all alone
[2020-07-06 01:46:39,650: INFO/MainProcess] celery@apples-MacBook-Pro-2.local ready.

也用于启动节拍我使用:

  celery -A register beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler

我的建议是改变模型并使其可标记。也许是这样的:https://django-taggit.readthedocs.io/

完成后,您可以修改计算热门帖子的 celery 作业。计算出新的热门帖子后,您可以从所有现有帖子中删除所有“热门”标签,然后用“热门”标签标记新的热门帖子。

然后您的视图代码可以简单地过滤带有热门标签的帖子。

编辑

如果您想确保您的代码确实在执行,可以使用一些扩展来实现。例如,django-celery-results 后端将在数据库中存储您的 @shared_task returns(通常是 JSON 如果这是您的消息编码)的任何数据以及时间戳,甚至可能是输入参数.这样您就可以看到 if/that 您的任务正在 运行ning 中。

https://docs.celeryproject.org/en/stable/django/first-steps-with-django.html#django-celery-results-using-the-django-orm-cache-as-a-result-backend

您也可以考虑 django-celery-beat 以确保您有一个很好的视觉方式来通过 django admin 查看作业计划

https://docs.celeryproject.org/en/stable/django/first-steps-with-django.html#django-celery-beat-database-backed-periodic-tasks-with-admin-interface

编辑 2

如果您要使用数据库调度程序(强烈推荐!),那么您需要登录到管理员并在您想要的计划中添加您的任务。

https://pinoylearnpython.com/wp-content/uploads/2019/04/Django-Celery-Beat-on-Admin-Site-Pinoy-Learn-Python-1024x718.jpg

编辑 3

在你的settings.py

CELERY_BEAT_SCHEDULE = {
    'update-hot-posts': {
        'task':'get_hot_posts',
        'schedule': 3600.0
    },
    'update-top-posts': {
        'task':'get_top_posts',
        'schedule': 86400.0
    },
    'tester': {
        'task':'tester',
        'schedule': 60.0
    }
}

那里的第三个任务称为 tester,它应该每 60 秒 运行。我在你的任务中没有看到这一点。因为你试图安排一个没有在任何地方定义的任务,因为 @shared_task 芹菜变得混乱并给你关于 tester.

的错误消息