Celery task calling itself after task succeeds without celerybeat
I want to call a Celery task again every 30 minutes, but only after the current run has finished; sometimes a run takes longer than expected because it downloads files from a remote server, so I don't want to use Celery Beat. And self.retry() is only useful when an error occurs. Here is my task:
import requests
from celery import shared_task

session = requests.Session()

@shared_task(name="download_big", bind=True, acks_late=True, autoretry_for=(Exception, requests.exceptions.RequestException), retry_kwargs={"max_retries": 4, "countdown": 3})
def download_big(self):
    my_file = session.get("https://example.com/hello.mp4")
    if my_file.status_code == requests.codes["OK"]:
        with open("hello.mp4", "wb") as f:
            f.write(my_file.content)
    else:
        self.retry()
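What I really want is the task re-queueing itself once a run has finished, roughly like the single-task sketch below (just an illustration, assuming the task simply calls apply_async(countdown=...) on itself at the end; the name download_and_reschedule is made up for the example):

@shared_task(name="download_and_reschedule", bind=True, acks_late=True)
def download_and_reschedule(self):
    my_file = session.get("https://example.com/hello.mp4")
    if my_file.status_code == requests.codes["OK"]:
        with open("hello.mp4", "wb") as f:
            f.write(my_file.content)
    else:
        self.retry()
    # queue the next run 30 minutes after this one has completed;
    # self.retry() raises, so this line is only reached when the download succeeded
    download_and_reschedule.apply_async(countdown=1800)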
Update:
OK, I have changed the structure to:
from celery import group

@shared_task(name="download_big", bind=True, acks_late=True, autoretry_for=(Exception, requests.exceptions.RequestException), retry_kwargs={"max_retries": 4, "countdown": 3})
def download_big(self, url, name):
    my_file = session.get(url)
    if my_file.status_code == requests.codes["OK"]:
        with open(name, "wb") as f:
            f.write(my_file.content)
    else:
        self.retry()

@shared_task(name="download_all", bind=True, acks_late=True, autoretry_for=(Exception, requests.exceptions.RequestException), retry_kwargs={"max_retries": 4, "countdown": 3})
def download_all(self):
    my_list = [...]  # bunch of (name, url) pairs
    jobs = []
    for name, url in my_list:
        jobs.append(download_big.si(url, name))
    group(jobs)()
So with this structure I call download_all instead of download_big, so the files are downloaded in parallel, and once all of the group's tasks have finished it needs to call itself again 30 minutes later.
You could try using a chord, which runs a group of tasks and, when they have all completed, runs a callback that you can use to reschedule.
For example:
from celery import chord

@shared_task(name="download_big", bind=True, acks_late=True, autoretry_for=(Exception, requests.exceptions.RequestException), retry_kwargs={"max_retries": 4, "countdown": 3})
def download_big(self, url, name):
    my_file = session.get(url)
    if my_file.status_code == requests.codes["OK"]:
        with open(name, "wb") as f:
            f.write(my_file.content)
    else:
        self.retry()

@shared_task(name="download_all", bind=True, acks_late=True, autoretry_for=(Exception, requests.exceptions.RequestException), retry_kwargs={"max_retries": 4, "countdown": 3})
def download_all(self):
    my_list = [...]  # bunch of (name, url) pairs
    jobs = []
    for name, url in my_list:
        jobs.append(download_big.si(url, name))
    # Run the group and, once every task has completed, reschedule
    # download_all to run again 30 minutes (1800 seconds) later
    chord(jobs)(download_all.si().set(countdown=1800))
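With this in place you should only need to kick the cycle off once; every later run is scheduled by the chord callback 30 minutes after the previous group has finished. For example (assuming the tasks above are registered with a running worker and a result backend is configured, which chords require):

# start the cycle once; subsequent runs are scheduled by the chord callback
download_all.apply_async()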