在 Celery 任务中使用导入变量
Using imported variable in Celery task
我有一个 Flask 应用程序,它使用 SocketIO 与当前在线的用户进行通信。我通过将用户 ID 映射到会话 ID 来跟踪它们,然后我可以使用它与它们通信:
online_users = {'uid...':'sessionid...'}
我在启动应用程序的 run.py
文件中删除它,然后在需要时导入它:
from app import online_users
我正在使用 Celery 和 RabbitMQ 进行任务部署,我需要在任务中使用这个字典。所以我按上面的方式导入它,但是当我使用它时它是空的,即使我知道它已填充。我在阅读 后意识到这是因为每个任务都是异步的并且用空字典启动一个新进程,所以我最好的选择是使用某种数据库或缓存。
我不想 运行 额外的服务,我只需要从 dict 中读取(我不会从任务中写入它)。 cache/database 是我唯一的选择吗?
这取决于你在 dict 中有什么....如果它是你可以序列化为字符串的东西,你可以将它序列化为 Json 并将它作为参数传递给该任务。如果它是一个您无法序列化的对象,那么是的,您需要使用 cache/database。
我遇到了 this discussion 这似乎正是我想要做的事情的解决方案。
Communication through a message queue is now implemented in package python-socketio, through the use of Kombu, which provides a common API to work with several message queues including Redis and RabbitMQ.
据说正式版本很快就会发布,但目前可以使用 additional package.
我有一个 Flask 应用程序,它使用 SocketIO 与当前在线的用户进行通信。我通过将用户 ID 映射到会话 ID 来跟踪它们,然后我可以使用它与它们通信:
online_users = {'uid...':'sessionid...'}
我在启动应用程序的 run.py
文件中删除它,然后在需要时导入它:
from app import online_users
我正在使用 Celery 和 RabbitMQ 进行任务部署,我需要在任务中使用这个字典。所以我按上面的方式导入它,但是当我使用它时它是空的,即使我知道它已填充。我在阅读
我不想 运行 额外的服务,我只需要从 dict 中读取(我不会从任务中写入它)。 cache/database 是我唯一的选择吗?
这取决于你在 dict 中有什么....如果它是你可以序列化为字符串的东西,你可以将它序列化为 Json 并将它作为参数传递给该任务。如果它是一个您无法序列化的对象,那么是的,您需要使用 cache/database。
我遇到了 this discussion 这似乎正是我想要做的事情的解决方案。
Communication through a message queue is now implemented in package python-socketio, through the use of Kombu, which provides a common API to work with several message queues including Redis and RabbitMQ.
据说正式版本很快就会发布,但目前可以使用 additional package.