芹菜共享数据

Celery shared data

我已经使用 Django 通道在 Django 应用程序中实现了 websocket,现在前端通过 websocket 发送一些数据,我希望当前的 运行 celery 任务能够读取它。我尝试创建共享内存静态对象,但无法正常工作。

   SimulationInputs.add(simulation_id=simulation.id, init_data=init_inputs)
    return InteractiveSimulationTask.delay_or_fail(
        simulation_id=simulation.id
    )
class SimulationData:
    data = ''


class SimulationInputs:
    data = None

    @classmethod
    def init_manager(cls, manager):
        manager = Manager()
        cls.data = manager.dict()

    @classmethod
    def add(cls, simulation_id, init_data):
        cls.data[simulation_id] = init_data

    @classmethod
    def write(cls, simulation_id, simulation_data):
        if cls.data.get(simulation_id):
            cls.data[simulation_id] = simulation_data

    @classmethod
    def read(cls, simulation_id, simulation_data):
        simulation_data.data = cls.data.get(simulation_id)

# manage.y
if __name__ == "__main__":
    SimulationInputs.init_manager()

class InteractiveSimulationTask(JobtasticTask):
     def calculate_result(self, simulation_id, **kwargs):
         while True:
           SimulationInputs.read(simulation_id=self.simulation.id, simulation_data=simulation_data)   

任务总是抛出异常cls.data.get(simulation_id): NoneObjectType has no method get

我需要在 celery 任务和主进程之间共享数据。

有什么提示吗?

由于您使用的是芹菜,您可能有 redis 或其他一些可用的内存存储。考虑将其用作间接层,即 readwrite 方法使用 simulation_id 作为 simulation data

的键

我认为您遇到的问题是由于 python class 的生命周期造成的。在 init_manager 中,当您分配给 cls.data 时,您将覆盖 class 的 属性,而不是实例的 属性。正如错误消息所证明的那样,这并没有按照您的意愿进行:cls.data 将变为 None

我想你想要的是 "Singleton Pattern"。您希望只有一个 SimulationInputs 对象可以 read/write 每个 ID 的数据。此讨论可以帮助您 implementing a singleton in python

我得出的结论是Django 和celery 不应该共享内存,因为它们在diff 上。 process,它们是不同的程序,所以它们应该通过套接字或消息系统进行通信。我通过使用 redis Pub/Sub https://redis.io/topics/pubsub.

解决了我的问题