无法将 Streamz Kafka Stream 转换为 Dask Stream
Unable to convert Streamz Kafka Stream to Dask Stream
我无法将 Streamz 流转换为使用 Kafka source.PFB 代码生成的 Dask Stream
from streamz import Stream
from streamz.dataframe import Random
from streamz.dataframe import DataFrame
import json
from dask.distributed import Client
client = Client()
source = Stream.from_kafka(['logs'],
{'bootstrap.servers': 'kafkaXX:9092',
'group.id': 'streamz'})
source.scatter().map(json.loads).buffer(8).gather().sink(print)
source.start()
我收到这条错误消息
ValueError: Two different event loops active
kafka源码,如果没有另外说明,会在一个线程中启动自己的事件循环。对 Client()
的调用也执行此操作。要将循环从一个传递到另一个,你可以做
Stream.from_kafka(..., loop=client.loop)
请注意,对 .scatter()
的调用也需要显式访问事件循环,但由于这是 dask-specific,它知道使用您激活的任何客户端的循环。
我无法将 Streamz 流转换为使用 Kafka source.PFB 代码生成的 Dask Stream
from streamz import Stream
from streamz.dataframe import Random
from streamz.dataframe import DataFrame
import json
from dask.distributed import Client
client = Client()
source = Stream.from_kafka(['logs'],
{'bootstrap.servers': 'kafkaXX:9092',
'group.id': 'streamz'})
source.scatter().map(json.loads).buffer(8).gather().sink(print)
source.start()
我收到这条错误消息
ValueError: Two different event loops active
kafka源码,如果没有另外说明,会在一个线程中启动自己的事件循环。对 Client()
的调用也执行此操作。要将循环从一个传递到另一个,你可以做
Stream.from_kafka(..., loop=client.loop)
请注意,对 .scatter()
的调用也需要显式访问事件循环,但由于这是 dask-specific,它知道使用您激活的任何客户端的循环。