如何 运行 Celery 只调度多次,直到再次调用任务?
How to run Celery schedule only a number of times and quite until the task is called again?
我正在使用 django + celery 任务调度程序 运行 每月安排一次任务。但我只想让这个任务 运行 只持续几个月,例如 3 个月或 6 个月或 9 个月..
我如何才能阻止 worker 执行进一步的任务,然后在再次调用该任务时重新启动?
这是我的任务
@task(name="add_profit")
def count():
portfolios = Portfolio.objects.filter(status='ACTIVE')
if portfolios.exists():
for portfolio in portfolios:
user = portfolio.user
#calculates portfolio profit
amount = portfolio.amount * 0.1
if portfolio.duration == '3 Months':
PortfolioProfit.objects.create(user=user, amount=amount)
user.useraccount.account_balance += amount
user.useraccount.save()
这是我的芹菜任务时间表
app.conf.beat_schedule = {
# Executes 1st day of every Month.
'every-minute': {
'task': 'add_profit',
# crontab can be changes to change Schedule
# http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html
'schedule': crontab(0, 0, day_of_month = 1),
},
}
选项 1:
您每个月都使用 celery crontab 条目开始任务,并在任务中添加测试:如果当前日期不在特定范围内,您就退出处理。
这有一点开销,但每月一次的视线开销应该是可以接受的。
@task(name="add_profit")
def count():
today = datetime.datetime.now()
if today > datetime.datetime(2020,1, 1):
return
# the remaining part of your task follows here
选项 2:
您执行一次(有或没有 for 循环)一个小代码片段,它为相关月份安排任务。
http://docs.celeryproject.org/en/latest/userguide/calling.html#eta-and-countdown
在下面的示例中,我只是展示了为接下来的三天安排任务的想法:
today = datetime.utcnow()
for delta in range(1, 4):
task.apply_async(args=[arg1, arg2, ...), eta=today + timedelta(days=delta))
这里的任务将按照您想要的频率执行。
但是,如果您将服务器迁移到不同的位置,如果您重置 rabbitmq(或您拥有的任何代理),要安排的任务将会丢失/消失
做出新的回答。我想保留上一个记录,因为它可能会帮助其他遇到不同问题的人。对于您的问题,我认为您只需每月无条件地调用 celery 任务即可。
我认为最简单的方法是更改存储在数据库中的数据,这样您就可以识别活跃的投资组合,存储第一次的日期和最后一次的日期,您会自动添加一个值。
现在,每月 celery 任务将识别 first_date <= 今天 <= last_date 的用户,如果条件为真,则添加该值。
好的。非常感谢@gelonida 的意见。我已经能够实现我的目标。
我创建了到期日(3 个月、6 个月等,取决于用户的选择)的数据库输入,它是使用投资组合创建日期和未来某个时间计算的,如下所示:
portfolio.expiry_date = timezone.now() + timedelta(days = 93) # for 3months
然后在我的任务中我做了这个
from django.utils import timezone
@task(name="add_profit")
def count():
portfolios = Portfolio.objects.filter(status='ACTIVE')
current_datetime = timezone.now()
if portfolios.exists():
for portfolio in portfolios:
if current_datetime > portfolio.expiry_date:
portfolio.status = 'COMPLETED'
portfolio.save()
return
else:
user = portfolio.user
amount = portfolio.amount * 0.1
PortfolioProfit.objects.create(user=user, amount=amount)
user.useraccount.account_balance += amount
user.useraccount.save()
这对我来说非常有效。
我正在使用 django + celery 任务调度程序 运行 每月安排一次任务。但我只想让这个任务 运行 只持续几个月,例如 3 个月或 6 个月或 9 个月..
我如何才能阻止 worker 执行进一步的任务,然后在再次调用该任务时重新启动?
这是我的任务
@task(name="add_profit")
def count():
portfolios = Portfolio.objects.filter(status='ACTIVE')
if portfolios.exists():
for portfolio in portfolios:
user = portfolio.user
#calculates portfolio profit
amount = portfolio.amount * 0.1
if portfolio.duration == '3 Months':
PortfolioProfit.objects.create(user=user, amount=amount)
user.useraccount.account_balance += amount
user.useraccount.save()
这是我的芹菜任务时间表
app.conf.beat_schedule = {
# Executes 1st day of every Month.
'every-minute': {
'task': 'add_profit',
# crontab can be changes to change Schedule
# http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html
'schedule': crontab(0, 0, day_of_month = 1),
},
}
选项 1: 您每个月都使用 celery crontab 条目开始任务,并在任务中添加测试:如果当前日期不在特定范围内,您就退出处理。
这有一点开销,但每月一次的视线开销应该是可以接受的。
@task(name="add_profit")
def count():
today = datetime.datetime.now()
if today > datetime.datetime(2020,1, 1):
return
# the remaining part of your task follows here
选项 2: 您执行一次(有或没有 for 循环)一个小代码片段,它为相关月份安排任务。
http://docs.celeryproject.org/en/latest/userguide/calling.html#eta-and-countdown
在下面的示例中,我只是展示了为接下来的三天安排任务的想法:
today = datetime.utcnow()
for delta in range(1, 4):
task.apply_async(args=[arg1, arg2, ...), eta=today + timedelta(days=delta))
这里的任务将按照您想要的频率执行。
但是,如果您将服务器迁移到不同的位置,如果您重置 rabbitmq(或您拥有的任何代理),要安排的任务将会丢失/消失
做出新的回答。我想保留上一个记录,因为它可能会帮助其他遇到不同问题的人。对于您的问题,我认为您只需每月无条件地调用 celery 任务即可。
我认为最简单的方法是更改存储在数据库中的数据,这样您就可以识别活跃的投资组合,存储第一次的日期和最后一次的日期,您会自动添加一个值。
现在,每月 celery 任务将识别 first_date <= 今天 <= last_date 的用户,如果条件为真,则添加该值。
好的。非常感谢@gelonida 的意见。我已经能够实现我的目标。
我创建了到期日(3 个月、6 个月等,取决于用户的选择)的数据库输入,它是使用投资组合创建日期和未来某个时间计算的,如下所示:
portfolio.expiry_date = timezone.now() + timedelta(days = 93) # for 3months
然后在我的任务中我做了这个
from django.utils import timezone
@task(name="add_profit")
def count():
portfolios = Portfolio.objects.filter(status='ACTIVE')
current_datetime = timezone.now()
if portfolios.exists():
for portfolio in portfolios:
if current_datetime > portfolio.expiry_date:
portfolio.status = 'COMPLETED'
portfolio.save()
return
else:
user = portfolio.user
amount = portfolio.amount * 0.1
PortfolioProfit.objects.create(user=user, amount=amount)
user.useraccount.account_balance += amount
user.useraccount.save()
这对我来说非常有效。