如何从 celery 任务中 return django 视图?
how to return a django view from within a celery task?
我需要从 django celery 任务中 return 一个表单。我的任务是从以下 Django 视图调用的:
class MyView(CreateView):
model = MyModel
form_class = MyForm
success_url = '/create/form'
def form_valid(self, form):
form.save()
# I call my task in here
period.delay()
print("Loading task...")
return super(MyView, self).form_valid(form)
我的任务名称是 "period" 它将日期与 objective 进行比较以打开事件,而我的 IF 为真。我的 "event" 是用户必须确认存在的处方集。
我的任务:
from .views import MyAnotherView
# others imports...
"""
in my settings.py, I had to call tha task every minute:
CELERYBEAT_SCHEDULE = {
'add-periodic-events': {
'task': 'myapp.tasks.period',
'schedule': crontab(minute='*'),
}
}
"""
@shared_task(serializer='json')
def period():
event = MyModel.objects.get(id=1) # I limited my model to receive only one object
request = RequestFactory.get('/another/form')
view = MyAnotherView()
week_d = week_day(event.day) # day is a field of my model
event_d = event_day(week_d, event.hour) # hour is a field of my model
conf_d = presence_confirm(event.before_days, event.begin_hour, event_d) # before_days and begin_hour are fields of my model
utc_now = pytz.utc.localize(datetime.utcnow())
n = utc_now.astimezone(pytz.timezone('America/Recife'))
t_str = '{}-{}-{}'.format(n.year, n.month, n.day)
t_hour = ' {}:{}'.format(n.hour, n.minute)
today = t_str + t_hour
today = datetime.strptime(today, '%Y-%m-%d %H:%M')
if (today >= conf_d) and (today < event_d):
# HOW TO CALL MY FORMULARY???
print("Call my formulary")
view.setup(request)
else:
# another thing
我想在调用我的任务后显示的公式,当条件为真时,它将来自以下 django 模型:
class MyAnotherModel(models.Model):
OPTIONS = (
(True, 'Sim'),
(False, 'Não'),
)
player = models.OneToOneField(MyUserModel, primary_key=True, on_delete=models.CASCADE)
confirm = models.BooleanField(choices=OPTIONS, default=False)
modified = models.DateTimeField(auto_now_add=True)
简而言之,我希望在我的条件成立时出现我的处方集。因此,我曾尝试在我的任务中使用 RequestFactory.get 来捕获 URL 并调用视图 (MyAnotherView)。
# ...
request = RequestFactory.get('/another/form')
view = MyAnotherView() # View responsável em instanciar MyAnotherModel
# ...
if (today >= conf_d) and (today < event_d):
view.setup(request)
但是,我收到了 ImportError:无法导入名称 'MyAnotherView'。如果有人可以帮助我,我很感激!
我的问题是检查今天的日期是否在两个日期之间。如果是这样,我必须"show"一个表格。我用芹菜做的这个检查。我的问题是,只要今天的日期在启用表格的日期之间,我该怎么做。
我解决问题如下:
- 我的 celery 任务不在请求周期内 运行,因为它是异步的 - @dirkgroten - 我很快想到在我的 django 模型 (MyModel) 中创建一个新字段:
enable = models.BooleanField(default=False)
- 在我的任务中更新此字段:
# ...
if (today >= conf_d) and (today < event_d):
event.enable = True
event.save()
else:
event.enable = False
event.save()
- Return 此信息作为表单视图中模板中的上下文
# In MyAnotherView
model = MyAnotherModel
fields = ['player', 'confirm']
success_url = '/list/users'
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
obj = MyModel.objects.get(id=1)
context['enable'] = obj.enable
return context
<!--This is the template that shows form-->
{% if enable %}
{{ form }}
{% else %}
<h2> Don't show <h2>
{% endif %}
如果有人认为有更好的方法,他们可以建议我,但我现在会有那个答案。
我需要从 django celery 任务中 return 一个表单。我的任务是从以下 Django 视图调用的:
class MyView(CreateView):
model = MyModel
form_class = MyForm
success_url = '/create/form'
def form_valid(self, form):
form.save()
# I call my task in here
period.delay()
print("Loading task...")
return super(MyView, self).form_valid(form)
我的任务名称是 "period" 它将日期与 objective 进行比较以打开事件,而我的 IF 为真。我的 "event" 是用户必须确认存在的处方集。
我的任务:
from .views import MyAnotherView
# others imports...
"""
in my settings.py, I had to call tha task every minute:
CELERYBEAT_SCHEDULE = {
'add-periodic-events': {
'task': 'myapp.tasks.period',
'schedule': crontab(minute='*'),
}
}
"""
@shared_task(serializer='json')
def period():
event = MyModel.objects.get(id=1) # I limited my model to receive only one object
request = RequestFactory.get('/another/form')
view = MyAnotherView()
week_d = week_day(event.day) # day is a field of my model
event_d = event_day(week_d, event.hour) # hour is a field of my model
conf_d = presence_confirm(event.before_days, event.begin_hour, event_d) # before_days and begin_hour are fields of my model
utc_now = pytz.utc.localize(datetime.utcnow())
n = utc_now.astimezone(pytz.timezone('America/Recife'))
t_str = '{}-{}-{}'.format(n.year, n.month, n.day)
t_hour = ' {}:{}'.format(n.hour, n.minute)
today = t_str + t_hour
today = datetime.strptime(today, '%Y-%m-%d %H:%M')
if (today >= conf_d) and (today < event_d):
# HOW TO CALL MY FORMULARY???
print("Call my formulary")
view.setup(request)
else:
# another thing
我想在调用我的任务后显示的公式,当条件为真时,它将来自以下 django 模型:
class MyAnotherModel(models.Model):
OPTIONS = (
(True, 'Sim'),
(False, 'Não'),
)
player = models.OneToOneField(MyUserModel, primary_key=True, on_delete=models.CASCADE)
confirm = models.BooleanField(choices=OPTIONS, default=False)
modified = models.DateTimeField(auto_now_add=True)
简而言之,我希望在我的条件成立时出现我的处方集。因此,我曾尝试在我的任务中使用 RequestFactory.get 来捕获 URL 并调用视图 (MyAnotherView)。
# ...
request = RequestFactory.get('/another/form')
view = MyAnotherView() # View responsável em instanciar MyAnotherModel
# ...
if (today >= conf_d) and (today < event_d):
view.setup(request)
但是,我收到了 ImportError:无法导入名称 'MyAnotherView'。如果有人可以帮助我,我很感激!
我的问题是检查今天的日期是否在两个日期之间。如果是这样,我必须"show"一个表格。我用芹菜做的这个检查。我的问题是,只要今天的日期在启用表格的日期之间,我该怎么做。 我解决问题如下:
- 我的 celery 任务不在请求周期内 运行,因为它是异步的 - @dirkgroten - 我很快想到在我的 django 模型 (MyModel) 中创建一个新字段:
enable = models.BooleanField(default=False)
- 在我的任务中更新此字段:
# ...
if (today >= conf_d) and (today < event_d):
event.enable = True
event.save()
else:
event.enable = False
event.save()
- Return 此信息作为表单视图中模板中的上下文
# In MyAnotherView
model = MyAnotherModel
fields = ['player', 'confirm']
success_url = '/list/users'
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
obj = MyModel.objects.get(id=1)
context['enable'] = obj.enable
return context
<!--This is the template that shows form-->
{% if enable %}
{{ form }}
{% else %}
<h2> Don't show <h2>
{% endif %}
如果有人认为有更好的方法,他们可以建议我,但我现在会有那个答案。