为什么 UDF 在流式查询中抛出 NotSerializableException?

Why does UDF throw NotSerializableException in streaming queries?

我将 Spark 2.4.3 用于一个结构化流应用程序(从 Event Hub Azure 读取流/将流写入 CosmosDB)。数据有一些转换步骤,其中一个步骤是查找 CosmosDB 以进行一些验证并添加更多字段。

//messagesF13 contains PersonHashCode,....
...
val messagesF14 = messagesF13.withColumn("LookupData", getHData($"PersonHashCode"))
//messagesF14.printSchema()
messagesF14.writeStream.outputMode("append").format("console").option("truncate", false).start().awaitTermination()

getHData的代码复制如下:

case class PersonHolder( id: String, 
                    Person_uid: String,
                    Person_seq: Integer)

val getHData= udf ( (hash256: String) => {
         val queryStmt = s""" SELECT * 
                                FROM c
                                WHERE c.Person_uid ='$hash256'"""
        val readConfig = Config(Map("Endpoint" -> "https://abc-cosmos.documents.azure.com:443/",
                                   "Masterkey" -> "ABCABC==",
                                   "Database" -> "person-data",
                                   "preferredRegions" -> "East US;",
                                   "Collection" -> "tmp-persons", 
                                  "query_custom" -> queryStmt,  
                                  "SamplingRatio" -> "1.0"))

          val coll = spark.sqlContext.read.cosmosDB(readConfig)
          coll.createOrReplaceTempView("c")

          val q3 = queryStmt + " AND c.Person_seq = 0"
          val df3 = spark.sql(q3)

          if (df3.head(1).isEmpty){
              null //None
          }
          else {
              val y = df31.select($"id",$"Person_uid",$"Person_seq")
              val y1 = y.as[PersonHolder].collectAsList
              y1.get(0)
          }
  }
)

它不起作用,(众所周知的)错误是:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task not serializable: java.io.NotSerializableException: com.microsoft.azure.eventhubs.ConnectionStringBuilder

有哪些可能workarounds/solutions可以避免错误? 预先感谢您的一些 links/GitHub code/docs!

It does not work

而且不会。对不起。

用户定义函数 (UDF) 运行 在没有 spark.sqlContext 的执行器上。 sparksqlContext 都未在执行程序上初始化。

one step is to make a lookup into CosmosDB for some validation and adding some more fields.

这是一个经典的连接,尤其是。在 getHData udf:

中使用此代码
val coll = spark.sqlContext.read.cosmosDB(readConfig)

您只需执行以下操作:

val coll = spark.sqlContext.read.cosmosDB(readConfig)
val messagesF14 = messagesF13.join(coll).where(...)