芹菜共享数据
Celery shared data
我已经使用 Django 通道在 Django 应用程序中实现了 websocket,现在前端通过 websocket 发送一些数据,我希望当前的 运行 celery 任务能够读取它。我尝试创建共享内存静态对象,但无法正常工作。
SimulationInputs.add(simulation_id=simulation.id, init_data=init_inputs)
return InteractiveSimulationTask.delay_or_fail(
simulation_id=simulation.id
)
class SimulationData:
data = ''
class SimulationInputs:
data = None
@classmethod
def init_manager(cls, manager):
manager = Manager()
cls.data = manager.dict()
@classmethod
def add(cls, simulation_id, init_data):
cls.data[simulation_id] = init_data
@classmethod
def write(cls, simulation_id, simulation_data):
if cls.data.get(simulation_id):
cls.data[simulation_id] = simulation_data
@classmethod
def read(cls, simulation_id, simulation_data):
simulation_data.data = cls.data.get(simulation_id)
# manage.y
if __name__ == "__main__":
SimulationInputs.init_manager()
class InteractiveSimulationTask(JobtasticTask):
def calculate_result(self, simulation_id, **kwargs):
while True:
SimulationInputs.read(simulation_id=self.simulation.id, simulation_data=simulation_data)
任务总是抛出异常cls.data.get(simulation_id): NoneObjectType has no method get
我需要在 celery 任务和主进程之间共享数据。
有什么提示吗?
由于您使用的是芹菜,您可能有 redis
或其他一些可用的内存存储。考虑将其用作间接层,即 read
和 write
方法使用 simulation_id
作为 simulation data
的键
我认为您遇到的问题是由于 python class
的生命周期造成的。在 init_manager
中,当您分配给 cls.data
时,您将覆盖 class
的 属性,而不是实例的 属性。正如错误消息所证明的那样,这并没有按照您的意愿进行:cls.data 将变为 None
。
我想你想要的是 "Singleton Pattern"。您希望只有一个 SimulationInputs
对象可以 read/write 每个 ID 的数据。此讨论可以帮助您 implementing a singleton in python
我得出的结论是Django 和celery 不应该共享内存,因为它们在diff 上。 process,它们是不同的程序,所以它们应该通过套接字或消息系统进行通信。我通过使用 redis Pub/Sub https://redis.io/topics/pubsub.
解决了我的问题
我已经使用 Django 通道在 Django 应用程序中实现了 websocket,现在前端通过 websocket 发送一些数据,我希望当前的 运行 celery 任务能够读取它。我尝试创建共享内存静态对象,但无法正常工作。
SimulationInputs.add(simulation_id=simulation.id, init_data=init_inputs)
return InteractiveSimulationTask.delay_or_fail(
simulation_id=simulation.id
)
class SimulationData:
data = ''
class SimulationInputs:
data = None
@classmethod
def init_manager(cls, manager):
manager = Manager()
cls.data = manager.dict()
@classmethod
def add(cls, simulation_id, init_data):
cls.data[simulation_id] = init_data
@classmethod
def write(cls, simulation_id, simulation_data):
if cls.data.get(simulation_id):
cls.data[simulation_id] = simulation_data
@classmethod
def read(cls, simulation_id, simulation_data):
simulation_data.data = cls.data.get(simulation_id)
# manage.y
if __name__ == "__main__":
SimulationInputs.init_manager()
class InteractiveSimulationTask(JobtasticTask):
def calculate_result(self, simulation_id, **kwargs):
while True:
SimulationInputs.read(simulation_id=self.simulation.id, simulation_data=simulation_data)
任务总是抛出异常cls.data.get(simulation_id): NoneObjectType has no method get
我需要在 celery 任务和主进程之间共享数据。
有什么提示吗?
由于您使用的是芹菜,您可能有 redis
或其他一些可用的内存存储。考虑将其用作间接层,即 read
和 write
方法使用 simulation_id
作为 simulation data
我认为您遇到的问题是由于 python class
的生命周期造成的。在 init_manager
中,当您分配给 cls.data
时,您将覆盖 class
的 属性,而不是实例的 属性。正如错误消息所证明的那样,这并没有按照您的意愿进行:cls.data 将变为 None
。
我想你想要的是 "Singleton Pattern"。您希望只有一个 SimulationInputs
对象可以 read/write 每个 ID 的数据。此讨论可以帮助您 implementing a singleton in python
我得出的结论是Django 和celery 不应该共享内存,因为它们在diff 上。 process,它们是不同的程序,所以它们应该通过套接字或消息系统进行通信。我通过使用 redis Pub/Sub https://redis.io/topics/pubsub.
解决了我的问题