Celery raise error while passing my queryset obj as parameter
I'm trying to run a periodic task, so I'm using Celery with Django 1.8, Django Rest Framework, and Postgres as the database. When I try to send my obj to the task, I get TypeError: foreign_model_obj is not JSON serializable. How can I pass my queryset object to my task?
views.py:
class MyModelCreateApiView(generics.CreateAPIView):
    queryset = MyModel.objects.all()
    serializer_class = MyModelSerializer
    authentication_classes = (TokenAuthentication,)

    def create(self, request, *args, **kwargs):
        data = dict()
        data['foreign_model_id'] = kwargs['pk']
        foreign_model_obj = MyForeignModel.objects.get(id=data['foreign_model_id'])
        obj = MyModel.objects.create(**data)
        result = serialize_query(MyModel, {"id": obj.id})
        local_time = foreign_model_obj.time
        my_celery_task.apply_async([foreign_model_obj], eta=local_time)
        return Response(result)
tasks.py:
@celery_app.task(name="my_celery_task")
def my_first_celery_task(mymodel_obj):
    # ... updating obj attributes
    mymodel_obj.save()
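You only need to send the instance's id and retrieve the object inside the task.
Passing the instance itself is bad practice, because the object can be changed in the meantime, especially since, judging by your code, you are scheduling the task to run later (with an eta).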
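views.py:
from rest_framework import generics
from rest_framework.authentication import TokenAuthentication
from rest_framework.response import Response

from .models import MyForeignModel, MyModel  # assumed app layout
from .serializers import MyModelSerializer   # assumed app layout
from .tasks import my_celery_task


class MyModelCreateApiView(generics.CreateAPIView):
    queryset = MyModel.objects.all()
    serializer_class = MyModelSerializer
    authentication_classes = (TokenAuthentication,)

    def create(self, request, *args, **kwargs):
        data = dict()
        data['foreign_model_id'] = kwargs['pk']
        foreign_model_obj = MyForeignModel.objects.get(id=data['foreign_model_id'])
        obj = MyModel.objects.create(**data)
        result = serialize_query(MyModel, {"id": obj.id})  # the asker's own helper (not shown)
        local_time = foreign_model_obj.time
        my_celery_task.apply_async([obj.id], eta=local_time)  # send only the obj id
        return Response(result)
tasks.py:
from .models import MyModel  # assumed app layout; celery_app is your project's Celery instance

@celery_app.task(name="my_celery_task")
def my_celery_task(mymodel_obj_id):
    mymodel_obj = MyModel.objects.get(id=mymodel_obj_id)  # re-fetch the object here
    # ... updating obj attributes
    mymodel_obj.save()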
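A minimal, framework-free sketch of why the ID-based version works. It assumes a plain dict (the hypothetical FAKE_DB) stands in for the database and json.dumps/json.loads stand in for a JSON task serializer; the real broker round-trip is not modelled. The model-like object fails with the same kind of TypeError as in the question, while its integer id serializes fine and the worker re-fetches the row when it runs:

```python
import json


class Record:
    """Hypothetical stand-in for a Django model instance."""

    def __init__(self, pk, status):
        self.id = pk
        self.status = status


# Hypothetical in-memory "database" keyed by primary key.
FAKE_DB = {1: Record(1, "new")}


def enqueue(args):
    """Serialize task arguments roughly as a JSON task serializer would."""
    return json.dumps(args)


def worker(message):
    """Decode the message, then re-fetch the row by id at run time."""
    (record_id,) = json.loads(message)
    record = FAKE_DB[record_id]  # plays the role of MyModel.objects.get(id=...)
    record.status = "processed"  # "... updating obj attributes"
    return record


error = None
try:
    enqueue([FAKE_DB[1]])  # passing the object itself, as in the question
except TypeError as exc:
    error = str(exc)

message = enqueue([FAKE_DB[1].id])  # passing only the id
print(message)                 # [1]
print(worker(message).status)  # processed
```

The message carries nothing but a primary key, so whatever the worker reads is the row as it exists when the task executes.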
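You can switch the serializer to pickle, but passing a queryset as an argument is not recommended. From the Celery documentation:
"Another gotcha is Django model objects. They shouldn't be passed on as arguments to tasks. It's almost always better to re-fetch the object from the database when the task is running instead, as using old data may lead to race conditions."
http://docs.celeryproject.org/en/latest/userguide/tasks.html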
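To illustrate the race condition the quoted passage warns about, here is a stdlib-only sketch that assumes pickle stands in for Celery's pickle serializer and a dict (the hypothetical FAKE_DB) stands in for the database. A pickled row is a snapshot taken at enqueue time, so an update made before the task runs is invisible to it, while a lookup by id sees it. (If you do switch serializers anyway, the relevant settings are CELERY_TASK_SERIALIZER and CELERY_ACCEPT_CONTENT in Celery 3.x, or task_serializer and accept_content in 4.x and later; apply_async also takes a per-call serializer argument.)

```python
import pickle

# Hypothetical database: primary key -> current row (a plain dict).
FAKE_DB = {1: {"id": 1, "balance": 100}}

# Enqueue time: one message carries a pickled snapshot of the row,
# the other carries only the primary key.
snapshot_message = pickle.dumps(FAKE_DB[1])
id_message = 1

# Before the task runs, another request updates the row.
FAKE_DB[1] = {"id": 1, "balance": 40}

# Run time: the unpickled snapshot still holds the old value...
stale = pickle.loads(snapshot_message)
# ...while re-fetching by id returns the current row.
fresh = FAKE_DB[id_message]

print(stale["balance"], fresh["balance"])  # 100 40
```

If the task then saved the stale snapshot back, it would silently overwrite the newer value.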
Actually, IMHO the best approach is to take the picklable part of the queryset (its query attribute) and rebuild the queryset inside the task (https://docs.djangoproject.com/en/1.9/ref/models/querysets/):
import base64
import pickle

qs = MyModel.objects.filter(a__in=[1, 2, 3])  # whatever you want here...
# Pickle only the query, not the results; base64 turns the pickled bytes into
# plain ASCII text so a JSON serializer can also carry it (pickle.dumps returns bytes on Python 3).
querystr = base64.b64encode(pickle.dumps(qs.query)).decode("ascii")
my_celery_task.apply_async([querystr], eta=local_time)  # args must be a list/tuple; send only the string
Task:
import base64
import pickle

@celery_app.task(name="my_celery_task")
def my_celery_task(querystr):
    my_model_objs = MyModel.objects.all()
    my_model_objs.query = pickle.loads(base64.b64decode(querystr))  # restore the queryset
    # ... updating obj attributes
    item = my_model_objs[0]
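I think this is the best approach because the query is executed inside the task (possibly for the first time), which avoids all sorts of timing problems, and it does not also have to be executed in the caller (so the database is not queried twice).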
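The same idea without Django, as a minimal sketch: a hypothetical ROWS list stands in for the table and a plain dict of filter criteria stands in for qs.query. The criteria are pickled and base64-encoded at enqueue time and only evaluated inside the task, so a matching row inserted between enqueue and run time is still picked up. Two caveats: only unpickle messages from a trusted broker, since pickle.loads can execute arbitrary code, and the Django docs note that pickled querysets are only valid for the Django version that created them, so the web process and the worker should run the same version.

```python
import base64
import pickle

# Hypothetical table: a list of row dicts.
ROWS = [{"id": 1, "a": 1}, {"id": 2, "a": 5}]


def encode_query(criteria):
    """Pickle the query description and turn it into ASCII text."""
    return base64.b64encode(pickle.dumps(criteria)).decode("ascii")


def run_task(querystr):
    """Rebuild the query in the worker and evaluate it only now."""
    criteria = pickle.loads(base64.b64decode(querystr))
    allowed = criteria["a__in"]
    return [row["id"] for row in ROWS if row["a"] in allowed]


# Caller: describe the query, but do not execute it.
querystr = encode_query({"a__in": [1, 2, 3]})

# A matching row is inserted before the task runs.
ROWS.append({"id": 3, "a": 2})

print(run_task(querystr))  # [1, 3]
```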