自定义 Spark Structured Streaming Sink 没有并行性
No Parallelism with Custom Spark Structured Streaming Sink
我正在编写自定义 Spark 结构化流式接收器,以将从 Kafka 读取的事件写入 Google BQ(大查询)。下面是我写的代码。
代码正在编译并且运行成功。但是My Sink总是运行只在一个executor中(总是驱动程序运行的地方)。我不明白这里的问题。
这是我的自定义 Big Query Sink 的实现。
package bq
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode
class DefaultSource extends StreamSinkProvider with DataSourceRegister{
override def createSink(
sqlContext: SQLContext,
parameters: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode): Sink = {
new BQSink(sqlContext, parameters, partitionColumns, outputMode)
}
override def shortName(): String = "bq"
}
class BQSink(sqlContext: SQLContext,
parameters: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode) extends Sink {
override def addBatch(batchId: Long, data: DataFrame): Unit = {
val df = data.sparkSession.createDataFrame
(data.sparkSession.sparkContext.parallelize(data.collect()), data.schema)
df.collect().foreach({ row => {
//code that writes the rows to Big Query.
}
}
这是我的驱动程序
// Reading raw events from Kafka
val inputDF = sparkSession.readStream
.format("kafka")
.option("kafka.bootstrap.servers", config.getString("kafkaBrokers"))
.option("subscribe", "topic")
.option("fetchOffset.numRetries", 5)
.option("failOnDataLoss", "false")
.option("startingOffsets", "latest")
.load()
.selectExpr("value")
.as[Array[Byte]];
// Transforming inputDF to OutputDF
val outputDF = inputDF.map(event => transform(event))
// Writing outputDF events to BQ
val query = outputDF.writeStream
.format("bq")
.option("checkpointLocation",config.getString("checkpointLocation"))
.outputMode(OutputMode.Append())
.start()
//Start Streaming
query.awaitTermination()
尽管我的主题有多个分区,但我的自定义接收器仅在单个执行程序中 运行
使用df.collect
将从执行程序收集所有数据到您的驱动程序。因此,您只看到驱动程序向您的接收器发送数据。
您需要执行 df.foreachPartition
并使用可在您的执行程序上访问的 BQ 生成器。您可能 运行 遇到“任务不可序列化”问题,但您可以查看 以了解如何在 Scala Spark 中解决此问题。
我正在编写自定义 Spark 结构化流式接收器,以将从 Kafka 读取的事件写入 Google BQ(大查询)。下面是我写的代码。
代码正在编译并且运行成功。但是My Sink总是运行只在一个executor中(总是驱动程序运行的地方)。我不明白这里的问题。
这是我的自定义 Big Query Sink 的实现。
package bq
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode
class DefaultSource extends StreamSinkProvider with DataSourceRegister{
override def createSink(
sqlContext: SQLContext,
parameters: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode): Sink = {
new BQSink(sqlContext, parameters, partitionColumns, outputMode)
}
override def shortName(): String = "bq"
}
class BQSink(sqlContext: SQLContext,
parameters: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode) extends Sink {
override def addBatch(batchId: Long, data: DataFrame): Unit = {
val df = data.sparkSession.createDataFrame
(data.sparkSession.sparkContext.parallelize(data.collect()), data.schema)
df.collect().foreach({ row => {
//code that writes the rows to Big Query.
}
}
这是我的驱动程序
// Reading raw events from Kafka
val inputDF = sparkSession.readStream
.format("kafka")
.option("kafka.bootstrap.servers", config.getString("kafkaBrokers"))
.option("subscribe", "topic")
.option("fetchOffset.numRetries", 5)
.option("failOnDataLoss", "false")
.option("startingOffsets", "latest")
.load()
.selectExpr("value")
.as[Array[Byte]];
// Transforming inputDF to OutputDF
val outputDF = inputDF.map(event => transform(event))
// Writing outputDF events to BQ
val query = outputDF.writeStream
.format("bq")
.option("checkpointLocation",config.getString("checkpointLocation"))
.outputMode(OutputMode.Append())
.start()
//Start Streaming
query.awaitTermination()
尽管我的主题有多个分区,但我的自定义接收器仅在单个执行程序中 运行
使用df.collect
将从执行程序收集所有数据到您的驱动程序。因此,您只看到驱动程序向您的接收器发送数据。
您需要执行 df.foreachPartition
并使用可在您的执行程序上访问的 BQ 生成器。您可能 运行 遇到“任务不可序列化”问题,但您可以查看