Why does a UDF throw NotSerializableException in streaming queries?
I am using Spark 2.4.3 for a Structured Streaming application (reading a stream from Azure Event Hubs / writing the stream to Cosmos DB). The data goes through several transformation steps; one step is a lookup into Cosmos DB for some validation and to add a few more fields.
//messagesF13 contains PersonHashCode,....
...
val messagesF14 = messagesF13.withColumn("LookupData", getHData($"PersonHashCode"))
//messagesF14.printSchema()
messagesF14.writeStream.outputMode("append").format("console").option("truncate", false).start().awaitTermination()
The code of getHData is copied below:
case class PersonHolder(id: String,
                        Person_uid: String,
                        Person_seq: Integer)

val getHData = udf((hash256: String) => {
  val queryStmt = s"""SELECT *
                      FROM c
                      WHERE c.Person_uid = '$hash256'"""
  val readConfig = Config(Map(
    "Endpoint"         -> "https://abc-cosmos.documents.azure.com:443/",
    "Masterkey"        -> "ABCABC==",
    "Database"         -> "person-data",
    "preferredRegions" -> "East US;",
    "Collection"       -> "tmp-persons",
    "query_custom"     -> queryStmt,
    "SamplingRatio"    -> "1.0"))
  // Read the collection, register it as a temp view, and narrow the
  // query to Person_seq = 0.
  val coll = spark.sqlContext.read.cosmosDB(readConfig)
  coll.createOrReplaceTempView("c")
  val q3  = queryStmt + " AND c.Person_seq = 0"
  val df3 = spark.sql(q3)
  if (df3.head(1).isEmpty) {
    null // no match found
  } else {
    val y  = df3.select($"id", $"Person_uid", $"Person_seq")
    val y1 = y.as[PersonHolder].collectAsList
    y1.get(0)
  }
})
It does not work, and the (well-known) error is:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure:
Task not serializable: java.io.NotSerializableException: com.microsoft.azure.eventhubs.ConnectionStringBuilder
What are possible workarounds/solutions to avoid this error? Thanks in advance for any links/GitHub code/docs!
It does not work
And it won't. Sorry.
User-defined functions (UDFs) run on executors, where there is no spark.sqlContext: neither spark nor sqlContext is initialized on the executors.
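The "Task not serializable" part of the failure can be reproduced in isolation. A minimal, hypothetical sketch (the class name is illustrative, standing in for the ConnectionStringBuilder in your stack trace):

import org.apache.spark.sql.functions.udf

// A driver-side object whose class does not extend Serializable.
class ConnectionHolder {
  def describe(): String = "driver-side connection state"
}
val conn = new ConnectionHolder

// The closure captures `conn`, so Spark must serialize it to ship the
// udf to the executors; that serialization is what fails with
// java.io.NotSerializableException when the query runs.
val bad = udf((s: String) => conn.describe() + s)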
one step is to make a lookup into CosmosDB for some validation and adding some more fields.
That is a classic join, especially given this code inside the getHData udf:
val coll = spark.sqlContext.read.cosmosDB(readConfig)
You should simply do the following instead:
val coll = spark.sqlContext.read.cosmosDB(readConfig)
val messagesF14 = messagesF13.join(coll).where(...)
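Expanding that into a minimal sketch: the connection values, column names, and the Person_seq = 0 filter are taken from the question's udf, the imports follow the azure-cosmosdb-spark connector's usual pattern, and the join condition PersonHashCode = Person_uid is an assumption based on the udf's query.

import org.apache.spark.sql.functions.col
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.config.Config

// Read the lookup collection once, on the driver, outside any udf.
val readConfig = Config(Map(
  "Endpoint"   -> "https://abc-cosmos.documents.azure.com:443/",
  "Masterkey"  -> "ABCABC==",
  "Database"   -> "person-data",
  "Collection" -> "tmp-persons"))
val coll = spark.sqlContext.read.cosmosDB(readConfig)

// Keep only the rows the udf was looking for (Person_seq = 0)
// and the columns it returned.
val lookup = coll
  .filter(col("Person_seq") === 0)
  .select(col("id"), col("Person_uid"), col("Person_seq"))

// Stream-static join, assuming PersonHashCode is matched against
// Person_uid as in the udf's query. A left outer join keeps the input
// rows that have no match (where the udf returned null).
val messagesF14 = messagesF13.join(
  lookup,
  messagesF13("PersonHashCode") === lookup("Person_uid"),
  "left_outer")

Stream-static joins (inner, and left outer with the stream on the left) are supported in Spark 2.4, so this plugs into your existing writeStream pipeline without capturing any driver-side objects in a closure.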