将任务添加到 ForEachPartition 后无法序列化
Task not serializable after adding it to ForEachPartition
在 Spark 结构化流(Structured Streaming)中实现 Apache Pulsar Sink 时,我遇到了 Task not serializable(任务不可序列化)异常。
我已经尝试把 PulsarConfig 提取到一个单独的 class 中,并在 .foreachPartition 的 lambda 函数里调用它;对于 JDBC 连接以及我集成到 Spark 结构化流中的其他系统,我通常也是这样做的,如下所示:
PulsarSink Class
class PulsarSink(
sqlContext: SQLContext,
parameters: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode) extends Sink{
/** Publishes one batch to Pulsar: every row of `data` is converted to a JSON string and
  * sent to the topic named by the "topic" entry of `parameters`. A client and a producer
  * are created inside the partition lambda, i.e. once per partition.
  *
  * @param batchId identifier of the batch supplied by the caller; not used in this body
  * @param data    rows of the current batch to publish
  */
override def addBatch(batchId: Long, data: DataFrame): Unit = {
data.toJSON.foreachPartition( partition => {
// NOTE(review): `parameters` is a constructor parameter of the enclosing class, so using it
// inside this lambda makes the closure keep a reference to the enclosing PulsarSink instance
// (the `$outer` field in the reported serialization stack). PulsarSink is not serializable,
// which is what triggers "Task not serializable".
// Despite its name, `pulsarConfig` holds the PulsarClient returned by PulsarConfig.client.
val pulsarConfig = new PulsarConfig(parameters).client
// `.get` on the Option throws if no "topic" entry was supplied.
// NOTE(review): sendTimeout(0, ...) presumably disables the send timeout - confirm against
// the Pulsar client documentation.
val producer = pulsarConfig.newProducer(Schema.STRING)
.topic(parameters.get("topic").get)
.compressionType(CompressionType.LZ4)
.sendTimeout(0, TimeUnit.SECONDS)
.create
// Send each JSON record, then flush whatever the producer still has pending.
partition.foreach(rec => producer.send(rec))
producer.flush()
// NOTE(review): neither the producer nor the client is closed in this block.
})
}
PulsarConfig Class
/** Builds Pulsar clients from a plain string-to-string option map.
  *
  * Recognised keys:
  *  - "broker"  (required) service URL handed to the client builder
  *  - "tlscert" (optional) certificate file, used both as the TLS auth certificate and as
  *    the trust-certs file
  *  - "tlskey"  (optional) key file for TLS authentication
  *
  * TLS authentication is configured only when both "tlscert" and "tlskey" are present.
  *
  * @param parameters option map the client settings are read from
  */
class PulsarConfig(parameters: Map[String, String]) {

  /** Creates a new PulsarClient from `parameters`.
    *
    * @return a freshly built client; the caller owns it
    * @throws NoSuchElementException if the "broker" entry is missing
    */
  def client(): PulsarClient = {
    import scala.collection.JavaConverters._

    // Fail with a descriptive message instead of the bare "None.get" of Option.get.
    val broker = parameters.getOrElse(
      "broker",
      throw new NoSuchElementException("Missing required Pulsar parameter: broker"))

    // Settings shared by the TLS and the non-TLS client.
    // NOTE(review): hostname verification is disabled and insecure TLS connections are
    // allowed, exactly as before - confirm this is intended outside of test setups.
    val baseBuilder = PulsarClient.builder
      .serviceUrl(broker)
      .enableTlsHostnameVerification(false)
      .allowTlsInsecureConnection(true)

    val configuredBuilder = (parameters.get("tlscert"), parameters.get("tlskey")) match {
      case (Some(certFile), Some(keyFile)) =>
        val tlsAuthMap = Map("tlsCertFile" -> certFile, "tlsKeyFile" -> keyFile).asJava
        val tlsAuth: Authentication =
          AuthenticationFactory.create(classOf[AuthenticationTls].getName, tlsAuthMap)
        baseBuilder
          .tlsTrustCertsFilePath(certFile)
          .authentication(tlsAuth)
      case _ =>
        // Without both a certificate and a key, no TLS authentication is configured.
        baseBuilder
    }

    configuredBuilder.build
  }
}
我收到的错误信息如下:
ERROR StreamExecution: Query [id = 12c715c2-2d62-4523-a37a-4555995ccb74, runId = d409c0db-7078-4654-b0ce-96e46dfb322c] terminated with error
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:340)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:330)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition.apply(RDD.scala:925)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition.apply(RDD.scala:924)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
at org.apache.spark.sql.Dataset$$anonfun$foreachPartition.apply$mcV$sp(Dataset.scala:2341)
at org.apache.spark.sql.Dataset$$anonfun$foreachPartition.apply(Dataset.scala:2341)
at org.apache.spark.sql.Dataset$$anonfun$foreachPartition.apply(Dataset.scala:2341)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2828)
at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2340)
at org.apache.spark.datamediation.impl.sink.PulsarSink.addBatch(PulsarSink.scala:20)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch.apply$mcV$sp(StreamExecution.scala:666)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch.apply(StreamExecution.scala:666)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch.apply(StreamExecution.scala:666)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:665)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$$anonfun$apply$mcZ$sp.apply$mcV$sp(StreamExecution.scala:306)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$$anonfun$apply$mcZ$sp.apply(StreamExecution.scala:294)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$$anonfun$apply$mcZ$sp.apply(StreamExecution.scala:294)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches.apply$mcZ$sp(StreamExecution.scala:294)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:290)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon.run(StreamExecution.scala:206)
Caused by: java.io.NotSerializableException: org.apache.spark.datamediation.impl.sink.PulsarSink
Serialization stack:
- object not serializable (class: org.apache.spark.datamediation.impl.sink.PulsarSink, value: org.apache.spark.datamediation.impl.sink.PulsarSink@38813f43)
- field (class: org.apache.spark.datamediation.impl.sink.PulsarSink$$anonfun$addBatch, name: $outer, type: class org.apache.spark.datamediation.impl.sink.PulsarSink)
- object (class org.apache.spark.datamediation.impl.sink.PulsarSink$$anonfun$addBatch, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:337)
... 31 more
"foreachPartition" 中用到的类级别的值(这里是 parameters)可以先赋给方法内的局部变量,这样闭包只会捕获该局部变量,而不会捕获不可序列化的 PulsarSink 实例:
override def addBatch(batchId: Long, data: DataFrame): Unit = {
val parametersLocal = parameters
data.toJSON.foreachPartition( partition => {
val pulsarConfig = new PulsarConfig(parametersLocal).client
在 Spark 结构化流(Structured Streaming)中实现 Apache Pulsar Sink 时,我遇到了 Task not serializable(任务不可序列化)异常。
我已经尝试把 PulsarConfig 提取到一个单独的 class 中,并在 .foreachPartition 的 lambda 函数里调用它;对于 JDBC 连接以及我集成到 Spark 结构化流中的其他系统,我通常也是这样做的,如下所示:
PulsarSink Class
class PulsarSink(
sqlContext: SQLContext,
parameters: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode) extends Sink{
/** Publishes one batch to Pulsar: every row of `data` is converted to a JSON string and
  * sent to the topic named by the "topic" entry of `parameters`. A client and a producer
  * are created inside the partition lambda, i.e. once per partition.
  *
  * @param batchId identifier of the batch supplied by the caller; not used in this body
  * @param data    rows of the current batch to publish
  */
override def addBatch(batchId: Long, data: DataFrame): Unit = {
data.toJSON.foreachPartition( partition => {
// NOTE(review): `parameters` is a constructor parameter of the enclosing class, so using it
// inside this lambda makes the closure keep a reference to the enclosing PulsarSink instance
// (the `$outer` field in the reported serialization stack). PulsarSink is not serializable,
// which is what triggers "Task not serializable".
// Despite its name, `pulsarConfig` holds the PulsarClient returned by PulsarConfig.client.
val pulsarConfig = new PulsarConfig(parameters).client
// `.get` on the Option throws if no "topic" entry was supplied.
// NOTE(review): sendTimeout(0, ...) presumably disables the send timeout - confirm against
// the Pulsar client documentation.
val producer = pulsarConfig.newProducer(Schema.STRING)
.topic(parameters.get("topic").get)
.compressionType(CompressionType.LZ4)
.sendTimeout(0, TimeUnit.SECONDS)
.create
// Send each JSON record, then flush whatever the producer still has pending.
partition.foreach(rec => producer.send(rec))
producer.flush()
// NOTE(review): neither the producer nor the client is closed in this block.
})
}
PulsarConfig Class
/** Builds Pulsar clients from a plain string-to-string option map.
  *
  * Recognised keys:
  *  - "broker"  (required) service URL handed to the client builder
  *  - "tlscert" (optional) certificate file, used both as the TLS auth certificate and as
  *    the trust-certs file
  *  - "tlskey"  (optional) key file for TLS authentication
  *
  * TLS authentication is configured only when both "tlscert" and "tlskey" are present.
  *
  * @param parameters option map the client settings are read from
  */
class PulsarConfig(parameters: Map[String, String]) {

  /** Creates a new PulsarClient from `parameters`.
    *
    * @return a freshly built client; the caller owns it
    * @throws NoSuchElementException if the "broker" entry is missing
    */
  def client(): PulsarClient = {
    import scala.collection.JavaConverters._

    // Fail with a descriptive message instead of the bare "None.get" of Option.get.
    val broker = parameters.getOrElse(
      "broker",
      throw new NoSuchElementException("Missing required Pulsar parameter: broker"))

    // Settings shared by the TLS and the non-TLS client.
    // NOTE(review): hostname verification is disabled and insecure TLS connections are
    // allowed, exactly as before - confirm this is intended outside of test setups.
    val baseBuilder = PulsarClient.builder
      .serviceUrl(broker)
      .enableTlsHostnameVerification(false)
      .allowTlsInsecureConnection(true)

    val configuredBuilder = (parameters.get("tlscert"), parameters.get("tlskey")) match {
      case (Some(certFile), Some(keyFile)) =>
        val tlsAuthMap = Map("tlsCertFile" -> certFile, "tlsKeyFile" -> keyFile).asJava
        val tlsAuth: Authentication =
          AuthenticationFactory.create(classOf[AuthenticationTls].getName, tlsAuthMap)
        baseBuilder
          .tlsTrustCertsFilePath(certFile)
          .authentication(tlsAuth)
      case _ =>
        // Without both a certificate and a key, no TLS authentication is configured.
        baseBuilder
    }

    configuredBuilder.build
  }
}
我收到的错误信息如下:
ERROR StreamExecution: Query [id = 12c715c2-2d62-4523-a37a-4555995ccb74, runId = d409c0db-7078-4654-b0ce-96e46dfb322c] terminated with error
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:340)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:330)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition.apply(RDD.scala:925)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition.apply(RDD.scala:924)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
at org.apache.spark.sql.Dataset$$anonfun$foreachPartition.apply$mcV$sp(Dataset.scala:2341)
at org.apache.spark.sql.Dataset$$anonfun$foreachPartition.apply(Dataset.scala:2341)
at org.apache.spark.sql.Dataset$$anonfun$foreachPartition.apply(Dataset.scala:2341)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2828)
at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2340)
at org.apache.spark.datamediation.impl.sink.PulsarSink.addBatch(PulsarSink.scala:20)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch.apply$mcV$sp(StreamExecution.scala:666)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch.apply(StreamExecution.scala:666)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch.apply(StreamExecution.scala:666)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:665)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$$anonfun$apply$mcZ$sp.apply$mcV$sp(StreamExecution.scala:306)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$$anonfun$apply$mcZ$sp.apply(StreamExecution.scala:294)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$$anonfun$apply$mcZ$sp.apply(StreamExecution.scala:294)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches.apply$mcZ$sp(StreamExecution.scala:294)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:290)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon.run(StreamExecution.scala:206)
Caused by: java.io.NotSerializableException: org.apache.spark.datamediation.impl.sink.PulsarSink
Serialization stack:
- object not serializable (class: org.apache.spark.datamediation.impl.sink.PulsarSink, value: org.apache.spark.datamediation.impl.sink.PulsarSink@38813f43)
- field (class: org.apache.spark.datamediation.impl.sink.PulsarSink$$anonfun$addBatch, name: $outer, type: class org.apache.spark.datamediation.impl.sink.PulsarSink)
- object (class org.apache.spark.datamediation.impl.sink.PulsarSink$$anonfun$addBatch, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:337)
... 31 more
"foreachPartition" 中用到的类级别的值(这里是 parameters)可以先赋给方法内的局部变量,这样闭包只会捕获该局部变量,而不会捕获不可序列化的 PulsarSink 实例:
override def addBatch(batchId: Long, data: DataFrame): Unit = {
val parametersLocal = parameters
data.toJSON.foreachPartition( partition => {
val pulsarConfig = new PulsarConfig(parametersLocal).client