如何构造一个 Dask 应用程序来处理来自队列的固定数量的输入?
How to structure a Dask application that processes a fixed number of inputs from a queue?
我们需要执行以下操作。给定一个将提供已知数量消息的 Redis 通道:
对于从频道消费的每条消息:
- 从 Redis
获取 JSON 文档
- 解析JSON文档,提取结果对象列表
聚合所有结果对象以生成单个结果
我们希望将第 1 步和第 2 步分配给许多工作人员,并避免将所有结果收集到内存中。我们还想显示两个步骤的进度条。
但是,我们找不到构建应用程序的好方法,以便我们可以看到进度并保持工作在系统中移动而不会因不合时宜的时间而阻塞。
例如,在第 1 步中,如果我们从 Redis 通道读取到队列中,那么我们可以将队列传递给 Dask,在这种情况下,我们会在每条消息传入时开始处理它,而无需等待所有消息。但是,如果我们使用队列,我们将看不到显示进度的方法(大概是因为队列通常具有未知大小?)
如果我们从 Redis 通道收集到一个列表并将其传递给 Dask 那么我们可以看到进度,但是我们必须等待来自 Redis 的所有消息才能开始处理第一个消息。
是否有解决此类问题的推荐方法?
如果您的 Redis 通道是并发访问安全的,那么您可能会提交许多期货以从通道中提取元素。这些将 运行 在不同的机器上。
from dask.distributed import Client, progress
client = Client(...)
futures = [client.submit(pull_from_redis_channel, ..., pure=False) for _ in range(n_items)]
futures2 = client.map(process, futures)
progress(futures2)
我们需要执行以下操作。给定一个将提供已知数量消息的 Redis 通道:
对于从频道消费的每条消息:
- 从 Redis 获取 JSON 文档
- 解析JSON文档,提取结果对象列表
聚合所有结果对象以生成单个结果
我们希望将第 1 步和第 2 步分配给许多工作人员,并避免将所有结果收集到内存中。我们还想显示两个步骤的进度条。
但是,我们找不到构建应用程序的好方法,以便我们可以看到进度并保持工作在系统中移动而不会因不合时宜的时间而阻塞。
例如,在第 1 步中,如果我们从 Redis 通道读取到队列中,那么我们可以将队列传递给 Dask,在这种情况下,我们会在每条消息传入时开始处理它,而无需等待所有消息。但是,如果我们使用队列,我们将看不到显示进度的方法(大概是因为队列通常具有未知大小?)
如果我们从 Redis 通道收集到一个列表并将其传递给 Dask 那么我们可以看到进度,但是我们必须等待来自 Redis 的所有消息才能开始处理第一个消息。
是否有解决此类问题的推荐方法?
如果您的 Redis 通道是并发访问安全的,那么您可能会提交许多期货以从通道中提取元素。这些将 运行 在不同的机器上。
from dask.distributed import Client, progress
client = Client(...)
futures = [client.submit(pull_from_redis_channel, ..., pure=False) for _ in range(n_items)]
futures2 = client.map(process, futures)
progress(futures2)