Dask 不清理 docker 容器中的上下文

Dask doesn't clean up context in docker container

我们有一个 Dask 管道,其中我们基本上使用 LocalCluster 作为进程池。即我们用 LocalCluster(processes=True, threads_per_worker=1) 启动集群。像这样:

dask_cluster = LocalCluster(processes=True, threads_per_worker=1)
    with Client(dask_cluster) as dask_client:
        exit_code = run_processing(input_file, dask_client, db_state).value

我们的工作流和任务并行化在 运行 本地时效果很好。但是,当我们将代码复制到 Docker 容器(基于 centos)时,处理完成并且我们 有时 在容器退出时出现以下错误:

Traceback (most recent call last):^M
  File "/opt/rh/rh-python36/root/usr/lib64/python3.6/multiprocessing/queues.py", line 240, in _feed^M
    send_bytes(obj)^M
  File "/opt/rh/rh-python36/root/usr/lib64/python3.6/multiprocessing/connection.py", line 200, in send_bytes^M
    self._send_bytes(m[offset:offset + size])^M
  File "/opt/rh/rh-python36/root/usr/lib64/python3.6/multiprocessing/connection.py", line 404, in _send_bytes^M
    self._send(header + buf)^M
  File "/opt/rh/rh-python36/root/usr/lib64/python3.6/multiprocessing/connection.py", line 368, in _send^M
    n = write(self._handle, buf)^M
BrokenPipeError: [Errno 32] Broken pipe^M

此外,我们收到此错误的多个实例,这让我认为错误来自废弃的工作进程。我们目前的工作理论是,这在某种程度上与 "Docker zombie reaping problem" 有关,但如果不从完全不同的 docker 图像开始,我们不知道如何修复它,我们不想这样做。

有没有办法只使用 Dask cluster/client 清理方法来解决这个问题?

您应该将集群创建为上下文管理器。它实际上是启动进程的东西,而不是客户端。

with LocalCluster(...):
    ...