为什么无法在 UDF 内部访问数据框? [Apache Spark 斯卡拉]
Why dataframe cannot be accessed inside UDF ? [Apache Spark Scala]
我目前正在使用 Apache Spark 进行流媒体项目。我有 2 个数据源,第一个是从 Kafka 获取新闻数据。这个数据每次都在更新。第二个,我得到 masterWord 词典。该变量包含单词的数据框和单词的唯一键。
我想处理新闻数据,然后通过将数据匹配到masterWord词典,将其从单词Seq转换为words_id的Seq。但是,在我的 UDF 中访问 masterWord 数据框时遇到问题。当我尝试访问 UDF 中的数据帧时,Spark return 这个错误
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 i
n stage 4.0 (TID 4, localhost, executor driver): java.lang.NullPointerException
为什么无法在 UDF 中访问数据帧?
从另一个数据框获取价值的最佳做法是什么?
这是我的代码
// read data stream from Kafka
val kafka = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", PropertiesLoader.kafkaBrokerUrl)
.option("subscribe", PropertiesLoader.kafkaTopic)
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger", "100")
.load()
// Transform data stream to Dataframe
val kafkaDF = kafka.selectExpr("CAST(value AS STRING)").as[(String)]
.select(from_json($"value", ColsArtifact.rawSchema).as("data"))
.select("data.*")
.withColumn("raw_text", concat(col("title"), lit(" "), col("text"))) // add column aggregate title and text
// read master word dictionary
val readConfig = ReadConfig(Map("uri" -> "mongodb://10.252.37.112/prayuga", "database" -> "prayuga", "collection" -> "master_word_2"))
var masterWord = MongoSpark.load(spark, readConfig)
// call UDF
val aggregateDF = kafkaDF.withColumn("text_aggregate", aggregateMongo(col("text_selected")))
// UDF
val aggregateMongo = udf((content: Seq[String]) => {
masterWord.show()
...
// code for query masterWord whether var content exist or not in masterWord dictionary
})
dataframe 存在于 spark 上下文中,并且仅在驱动程序内部可用
每个任务都会看到数据的一部分(分区)并可以使用它。如果你想让 dataframe 中的数据在 udf 中可用,你必须将它序列化到 master,然后你可以将它广播(或将它作为参数传递,这实际上是相同的)到 udf,在这种情况下是 Spark会将整个事情发送到 udf 运行
的每个实例
如果你想在 UDF 中使用数据帧,你必须创建一个 Broadcast
:
import spark.implicits._
val df_name =Seq("Raphael").toDF("name")
val bc_df_name: Broadcast[DataFrame] = spark.sparkContext.broadcast(df_name)
// use df_name inside udf
val udf_doSomething = udf(() => bc_df_name.value.as[String].first())
Seq(1,2,3)
.toDF("i")
.withColumn("test",udf_doSomething())
.show()
给予
+---+-------+
| i| test|
+---+-------+
| 1|Raphael|
| 2|Raphael|
| 3|Raphael|
+---+-------+
这至少在 local
模式下有效,请确定这是否也适用于集群。无论如何,我不推荐这种方法,更好地转换 (collect
) 驱动程序上的 Scala 数据结构中的数据帧内容(例如 Map
)并广播此变量,或者改用连接。
我目前正在使用 Apache Spark 进行流媒体项目。我有 2 个数据源,第一个是从 Kafka 获取新闻数据。这个数据每次都在更新。第二个,我得到 masterWord 词典。该变量包含单词的数据框和单词的唯一键。
我想处理新闻数据,然后通过将数据匹配到masterWord词典,将其从单词Seq转换为words_id的Seq。但是,在我的 UDF 中访问 masterWord 数据框时遇到问题。当我尝试访问 UDF 中的数据帧时,Spark return 这个错误
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 i n stage 4.0 (TID 4, localhost, executor driver): java.lang.NullPointerException
为什么无法在 UDF 中访问数据帧?
从另一个数据框获取价值的最佳做法是什么?
这是我的代码
// read data stream from Kafka
val kafka = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", PropertiesLoader.kafkaBrokerUrl)
.option("subscribe", PropertiesLoader.kafkaTopic)
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger", "100")
.load()
// Transform data stream to Dataframe
val kafkaDF = kafka.selectExpr("CAST(value AS STRING)").as[(String)]
.select(from_json($"value", ColsArtifact.rawSchema).as("data"))
.select("data.*")
.withColumn("raw_text", concat(col("title"), lit(" "), col("text"))) // add column aggregate title and text
// read master word dictionary
val readConfig = ReadConfig(Map("uri" -> "mongodb://10.252.37.112/prayuga", "database" -> "prayuga", "collection" -> "master_word_2"))
var masterWord = MongoSpark.load(spark, readConfig)
// call UDF
val aggregateDF = kafkaDF.withColumn("text_aggregate", aggregateMongo(col("text_selected")))
// UDF
val aggregateMongo = udf((content: Seq[String]) => {
masterWord.show()
...
// code for query masterWord whether var content exist or not in masterWord dictionary
})
dataframe 存在于 spark 上下文中,并且仅在驱动程序内部可用 每个任务都会看到数据的一部分(分区)并可以使用它。如果你想让 dataframe 中的数据在 udf 中可用,你必须将它序列化到 master,然后你可以将它广播(或将它作为参数传递,这实际上是相同的)到 udf,在这种情况下是 Spark会将整个事情发送到 udf 运行
的每个实例如果你想在 UDF 中使用数据帧,你必须创建一个 Broadcast
:
import spark.implicits._
val df_name =Seq("Raphael").toDF("name")
val bc_df_name: Broadcast[DataFrame] = spark.sparkContext.broadcast(df_name)
// use df_name inside udf
val udf_doSomething = udf(() => bc_df_name.value.as[String].first())
Seq(1,2,3)
.toDF("i")
.withColumn("test",udf_doSomething())
.show()
给予
+---+-------+
| i| test|
+---+-------+
| 1|Raphael|
| 2|Raphael|
| 3|Raphael|
+---+-------+
这至少在 local
模式下有效,请确定这是否也适用于集群。无论如何,我不推荐这种方法,更好地转换 (collect
) 驱动程序上的 Scala 数据结构中的数据帧内容(例如 Map
)并广播此变量,或者改用连接。