Spark 结构化流应用程序从多个 Kafka 主题读取
Spark structured streaming app reading from multiple Kafka topics
我有一个 Spark 结构化流应用程序 (v2.3.2),它需要从多个 Kafka 主题中读取,做一些相对简单的处理(主要是聚合和一些连接)并将结果发布到其他一些卡夫卡主题。因此在同一个应用程序中处理多个流。
我想知道从资源的角度(内存、执行程序、线程、Kafka 侦听器等)是否会有所不同,如果我只设置 1 个订阅多个主题的直接 readStream,然后将流拆分为选择,每个主题 1 个 readStream。
类似
df = spark.readStream.format("kafka").option("subscribe", "t1,t2,t3")
...
t1df = df.select(...).where("topic = 't1'")...
t2df = df.select(...).where("topic = 't2'")...
对比
t1df = spark.readStream.format("kafka").option("subscribe", "t1")
t2df = spark.readStream.format("kafka").option("subscribe", "t2")
其中一个比另一个多"efficient"吗?我找不到任何关于这是否有所作为的文档。
谢谢!
每个动作都需要完整的沿袭执行。你最好把它分成三个独立的 kafka 读取。否则你将阅读每个主题 N 次,其中 N 是写入次数。
我真的不建议这样做,但如果你想将所有主题都放在同一个阅读中,那么这样做:
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.persist()
batchDF.filter().write.format(...).save(...) // location 1
batchDF.filter().write.format(...).save(...) // location 2
batchDF.unpersist()
}
从资源(内存和核心)的角度来看,如果您 运行将其作为集群上的多个流(多个驱动器-执行器),将会有所不同。
对于第一种情况,你提到了-
df = spark.readStream.format("kafka").option("subscribe", "t1,t2,t3")...
t1df = df.select(...).where("topic = 't1'")...
t2df = df.select(...).where("topic = 't2'")...
考虑到您在上面提供的驱动程序和 2 个执行程序。
第二种情况-
t1df = spark.readStream.format("kafka").option("subscribe", "t1")
t2df = spark.readStream.format("kafka").option("subscribe", "t2")
您可以运行将它们作为不同的流 - 2 个驱动程序和 2 个执行程序(每个执行程序 1 个)。
在第二种情况下,需要更多的内存和内核来提供额外的驱动程序。
我有一个 Spark 结构化流应用程序 (v2.3.2),它需要从多个 Kafka 主题中读取,做一些相对简单的处理(主要是聚合和一些连接)并将结果发布到其他一些卡夫卡主题。因此在同一个应用程序中处理多个流。
我想知道从资源的角度(内存、执行程序、线程、Kafka 侦听器等)是否会有所不同,如果我只设置 1 个订阅多个主题的直接 readStream,然后将流拆分为选择,每个主题 1 个 readStream。
类似
df = spark.readStream.format("kafka").option("subscribe", "t1,t2,t3")
...
t1df = df.select(...).where("topic = 't1'")...
t2df = df.select(...).where("topic = 't2'")...
对比
t1df = spark.readStream.format("kafka").option("subscribe", "t1")
t2df = spark.readStream.format("kafka").option("subscribe", "t2")
其中一个比另一个多"efficient"吗?我找不到任何关于这是否有所作为的文档。
谢谢!
每个动作都需要完整的沿袭执行。你最好把它分成三个独立的 kafka 读取。否则你将阅读每个主题 N 次,其中 N 是写入次数。
我真的不建议这样做,但如果你想将所有主题都放在同一个阅读中,那么这样做:
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.persist()
batchDF.filter().write.format(...).save(...) // location 1
batchDF.filter().write.format(...).save(...) // location 2
batchDF.unpersist()
}
从资源(内存和核心)的角度来看,如果您 运行将其作为集群上的多个流(多个驱动器-执行器),将会有所不同。
对于第一种情况,你提到了-
df = spark.readStream.format("kafka").option("subscribe", "t1,t2,t3")...
t1df = df.select(...).where("topic = 't1'")...
t2df = df.select(...).where("topic = 't2'")...
考虑到您在上面提供的驱动程序和 2 个执行程序。
第二种情况-
t1df = spark.readStream.format("kafka").option("subscribe", "t1")
t2df = spark.readStream.format("kafka").option("subscribe", "t2")
您可以运行将它们作为不同的流 - 2 个驱动程序和 2 个执行程序(每个执行程序 1 个)。 在第二种情况下,需要更多的内存和内核来提供额外的驱动程序。