如何从一个地方获取多个主题的数据进行处理?
How to get data from multiple topics in one place for processing?
我有一个要求,我必须从 3 个 kafka 主题获取消息作为流数据,然后根据这 3 个主题数据之间的连接生成结果。请建议我使用 Direct Stream for Scala 的好方法。
谢谢
如果不同topic的数据是一样的,并且你消费数据的处理逻辑是一样的,你可以在同一个stream中从不同的topic消费,做聚合。如果不同主题的处理逻辑不同,则指定concurrentThreads为4,然后在4个流之间做聚合。您可以查看 spark structured streaming 文档以了解如何使用多个主题。
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
<--- your aggregation logic here --->
我有一个要求,我必须从 3 个 kafka 主题获取消息作为流数据,然后根据这 3 个主题数据之间的连接生成结果。请建议我使用 Direct Stream for Scala 的好方法。 谢谢
如果不同topic的数据是一样的,并且你消费数据的处理逻辑是一样的,你可以在同一个stream中从不同的topic消费,做聚合。如果不同主题的处理逻辑不同,则指定concurrentThreads为4,然后在4个流之间做聚合。您可以查看 spark structured streaming 文档以了解如何使用多个主题。
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
<--- your aggregation logic here --->