为什么无法在 UDF 内部访问数据框? [Apache Spark 斯卡拉]

Why dataframe cannot be accessed inside UDF ? [Apache Spark Scala]

我目前正在使用 Apache Spark 进行流媒体项目。我有 2 个数据源,第一个是从 Kafka 获取新闻数据。这个数据每次都在更新。第二个,我得到 masterWord 词典。该变量包含单词的数据框和单词的唯一键。

我想处理新闻数据,然后通过将数据匹配到masterWord词典,将其从单词Seq转换为words_id的Seq。但是,在我的 UDF 中访问 masterWord 数据框时遇到问题。当我尝试访问 UDF 中的数据帧时,Spark return 这个错误

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 i n stage 4.0 (TID 4, localhost, executor driver): java.lang.NullPointerException

为什么无法在 UDF 中访问数据帧?

从另一个数据框获取价值的最佳做法是什么?

这是我的代码

// read data stream from Kafka
val kafka = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", PropertiesLoader.kafkaBrokerUrl)
  .option("subscribe", PropertiesLoader.kafkaTopic)
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", "100")
  .load()

// Transform data stream to Dataframe
val kafkaDF = kafka.selectExpr("CAST(value AS STRING)").as[(String)]
  .select(from_json($"value", ColsArtifact.rawSchema).as("data"))
  .select("data.*")
  .withColumn("raw_text", concat(col("title"), lit(" "), col("text"))) // add column aggregate title and text

// read master word dictionary
val readConfig = ReadConfig(Map("uri" -> "mongodb://10.252.37.112/prayuga", "database" -> "prayuga", "collection" -> "master_word_2"))
var masterWord = MongoSpark.load(spark, readConfig)

// call UDF
val aggregateDF = kafkaDF.withColumn("text_aggregate", aggregateMongo(col("text_selected")))

// UDF
val aggregateMongo = udf((content: Seq[String]) => {
  masterWord.show()
  ...
  // code for query masterWord whether var content exist or not in masterWord dictionary
})

dataframe 存在于 spark 上下文中,并且仅在驱动程序内部可用 每个任务都会看到数据的一部分(分区)并可以使用它。如果你想让 dataframe 中的数据在 udf 中可用,你必须将它序列化到 master,然后你可以将它广播(或将它作为参数传递,这实际上是相同的)到 udf,在这种情况下是 Spark会将整个事情发送到 udf 运行

的每个实例

如果你想在 UDF 中使用数据帧,你必须创建一个 Broadcast :

import spark.implicits._

val df_name =Seq("Raphael").toDF("name")

val bc_df_name: Broadcast[DataFrame] = spark.sparkContext.broadcast(df_name)

// use df_name inside udf
val udf_doSomething = udf(() => bc_df_name.value.as[String].first())

Seq(1,2,3)
  .toDF("i")
  .withColumn("test",udf_doSomething())
  .show()

给予

+---+-------+
|  i|   test|
+---+-------+
|  1|Raphael|
|  2|Raphael|
|  3|Raphael|
+---+-------+

这至少在 local 模式下有效,请确定这是否也适用于集群。无论如何,我不推荐这种方法,更好地转换 (collect) 驱动程序上的 Scala 数据结构中的数据帧内容(例如 Map)并广播此变量,或者改用连接。