django.core.cache.lock 在 Celery 任务中不起作用

django.core.cache.lock doesn't work in a Celery task

我有以下 (tasks.py):

from celery import shared_task
from django.core.cache import cache

@shared_task
def test(param1: str) -> None:
    with cache.lock("lock-1"):
        print("hello")

当我执行 test.delay() 时,没有打印任何内容,这让我相信 cache.lock("lock-1") 有问题。

我使用 Redis 作为我的缓存和 Celery 后端,这是在 settings.py 中配置的。

这里有什么问题?如果django.core.cache不能用作锁定机制(确保一次只有一个test运行,可以用什么代替?)谢谢!

找出原因 - 原来我没有阅读文档。

添加这个有效:

# https://docs.celeryproject.org/en/latest/tutorials/task-cookbook.html
@contextmanager
def redis_lock(lock_id):
    timeout_at = time.monotonic() + LOCK_EXPIRE - 3
    # cache.add fails if the key already exists
    # Second value is arbitrary
    status = cache.add(lock_id, "lock", timeout=LOCK_EXPIRE)
    try:
        yield status
    finally:
        # memcache delete is very slow, but we have to use it to take
        # advantage of using add() for atomic locking
        if time.monotonic() < timeout_at and status:
            # don't release the lock if we exceeded the timeout
            # to lessen the chance of releasing an expired lock
            # owned by someone else
            # also don't release the lock if we didn't acquire it
            cache.delete(lock_id)

那么,锁语句就变成了

with redis_lock("lock-1"):