无法将 Streamz Kafka Stream 转换为 Dask Stream

Unable to convert Streamz Kafka Stream to Dask Stream

我无法将 Streamz 流转换为使用 Kafka source.PFB 代码生成的 Dask Stream

from streamz import Stream
from streamz.dataframe import Random
from streamz.dataframe import DataFrame
import json
from dask.distributed import Client
client = Client()
source = Stream.from_kafka(['logs'],
       {'bootstrap.servers': 'kafkaXX:9092',
        'group.id': 'streamz'}) 
source.scatter().map(json.loads).buffer(8).gather().sink(print)
source.start()

我收到这条错误消息

ValueError: Two different event loops active

kafka源码,如果没有另外说明,会在一个线程中启动自己的事件循环。对 Client() 的调用也执行此操作。要将循环从一个传递到另一个,你可以做

Stream.from_kafka(..., loop=client.loop)

请注意,对 .scatter() 的调用也需要显式访问事件循环,但由于这是 dask-specific,它知道使用您激活的任何客户端的循环。