为什么对 kafka 主题的查询流连接会花费这么长时间?
Why could streaming join of queries over kafka topics take so long?
我正在使用 Spark Structured Streaming 并加入来自 Kafka 主题的两个流。
我注意到流式查询每条记录大约需要 15 秒。在下面的截图中,阶段 id 2 需要 15 秒。为什么会这样?
代码如下:
val kafkaTopic1 = "demo2"
val kafkaTopic2 = "demo3"
val bootstrapServer = "localhost:9092"
val spark = SparkSession
.builder
.master("local")
.getOrCreate
import spark.implicits._
val df1 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServer)
.option("subscribe", kafkaTopic1)
.option("failOnDataLoss", false)
.load
val df2 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServer)
.option("subscribe", kafkaTopic2)
.option("failOnDataLoss", false)
.load
val order_details = df1
.withColumn(...)
.select(...)
val invoice_details = df2
.withColumn(...)
.where(...)
order_details
.join(invoice_details)
.where(order_details.col("s_order_id") === invoice_details.col("order_id"))
.select(...)
.writeStream
.format("console")
.option("truncate", false)
.start
.awaitTermination()
代码方面一切正常。唯一的问题是加入这两个流的时间。如何优化此查询?
鉴于主 URL,即 .master("local")
,执行时间很可能不令人满意。至少将其更改为 local[*]
,您应该会更快地找到连接。
我正在使用 Spark Structured Streaming 并加入来自 Kafka 主题的两个流。
我注意到流式查询每条记录大约需要 15 秒。在下面的截图中,阶段 id 2 需要 15 秒。为什么会这样?
代码如下:
val kafkaTopic1 = "demo2"
val kafkaTopic2 = "demo3"
val bootstrapServer = "localhost:9092"
val spark = SparkSession
.builder
.master("local")
.getOrCreate
import spark.implicits._
val df1 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServer)
.option("subscribe", kafkaTopic1)
.option("failOnDataLoss", false)
.load
val df2 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServer)
.option("subscribe", kafkaTopic2)
.option("failOnDataLoss", false)
.load
val order_details = df1
.withColumn(...)
.select(...)
val invoice_details = df2
.withColumn(...)
.where(...)
order_details
.join(invoice_details)
.where(order_details.col("s_order_id") === invoice_details.col("order_id"))
.select(...)
.writeStream
.format("console")
.option("truncate", false)
.start
.awaitTermination()
代码方面一切正常。唯一的问题是加入这两个流的时间。如何优化此查询?
鉴于主 URL,即 .master("local")
,执行时间很可能不令人满意。至少将其更改为 local[*]
,您应该会更快地找到连接。