带 Dask 分布式的 Streamz
Streamz with Dask Distributed
基于 streamz documentation,可以通过以下方式利用 dask 分布式集群:
from distributed import Client
client = Client('tcp://localhost:8786') # Connect to scheduler that has distributed workers
from streamz import Stream
source = Stream()
(source.scatter() # scatter local elements to cluster, creating a DaskStream
.map(increment) # map a function remotely
.buffer(5) # allow five futures to stay on the cluster at any time
.gather() # bring results back to local process
.sink(write)) # call write locally
for x in range(10):
source.emit(x)
从概念上讲,尚不清楚为什么我们不必将分发的 dask client
作为参数传递给实例化 Stream()
。更具体地说,Stream()
如何知道要附加到哪个调度程序?
如果你有两个调度程序,它们在不相关的节点上有工作人员,你会怎么做:
from distributed import Client
client_1 = Client('tcp://1.2.3.4:8786')
client_2 = Client('tcp://10.20.30.40:8786')
如何分别为 client_1
和 client_2
创建两个流?
Dask 的基本规则是,如果定义了分布式客户端,则将其用于任何 Dask 计算。如果有多个分布式客户端,则使用最近创建的那个还活着。
Streamz 没有明确让您在 .scatter()
时选择使用哪个客户端,它使用 dask.distributed.default_client()
来选择一个。您可能希望向他们提出问题以允许使用 client=
关键字。该工作流甚至不适合基于上下文的方法。现在,如果你想让多个 streamz 同时处理不同 Dask 集群上的数据,你可能必须操纵 dask.distributed.client._global_clients
.
的状态
基于 streamz documentation,可以通过以下方式利用 dask 分布式集群:
from distributed import Client
client = Client('tcp://localhost:8786') # Connect to scheduler that has distributed workers
from streamz import Stream
source = Stream()
(source.scatter() # scatter local elements to cluster, creating a DaskStream
.map(increment) # map a function remotely
.buffer(5) # allow five futures to stay on the cluster at any time
.gather() # bring results back to local process
.sink(write)) # call write locally
for x in range(10):
source.emit(x)
从概念上讲,尚不清楚为什么我们不必将分发的 dask client
作为参数传递给实例化 Stream()
。更具体地说,Stream()
如何知道要附加到哪个调度程序?
如果你有两个调度程序,它们在不相关的节点上有工作人员,你会怎么做:
from distributed import Client
client_1 = Client('tcp://1.2.3.4:8786')
client_2 = Client('tcp://10.20.30.40:8786')
如何分别为 client_1
和 client_2
创建两个流?
Dask 的基本规则是,如果定义了分布式客户端,则将其用于任何 Dask 计算。如果有多个分布式客户端,则使用最近创建的那个还活着。
Streamz 没有明确让您在 .scatter()
时选择使用哪个客户端,它使用 dask.distributed.default_client()
来选择一个。您可能希望向他们提出问题以允许使用 client=
关键字。该工作流甚至不适合基于上下文的方法。现在,如果你想让多个 streamz 同时处理不同 Dask 集群上的数据,你可能必须操纵 dask.distributed.client._global_clients
.