在 Spark 中调用 Scala UDF 时如何将 BinaryType 转换为 Array[Byte]?
How can I convert BinaryType to Array[Byte] when calling Scala UDF in Spark?
我在 Scala 中编写了以下 UDF:
import java.io.{ByteArrayOutputStream, ByteArrayInputStream}
import java.util.zip.{GZIPInputStream}
def Decompress(compressed: Array[Byte]): String = {
val inputStream = new GZIPInputStream(new ByteArrayInputStream(compressed))
val output = scala.io.Source.fromInputStream(inputStream).mkString
return output
}
val decompressUdf = (compressed: Array[Byte]) => {
Decompress(compressed)
}
spark.udf.register("Decompress", decompressUdf)
然后我尝试使用以下内容调用 UDF:
val sessionsRawDF =
sessionRawDF
.withColumn("WebsiteSession", decompressUdf(sessionRawDF("body")))
.select(
current_timestamp().alias("ingesttime"),
current_timestamp().cast("date").alias("p_ingestdate"),
col("partition"),
col("enqueuedTime"),
col("WebsiteSession").alias("Json")
)
当我 运行 执行此操作时,出现以下错误:
command-130062350733681:9: error: type mismatch;
found: org.apache.spark.sql.Column
required: Array[Byte]
decompressUdf(col("WebsiteSession")).alias("Json")
在这种情况下,我的印象是 Spark 会隐式获取值并从 spark 类型变为 Array[Byte]。
有人能帮我理解这是怎么回事吗,我已经为此苦苦挣扎了一段时间,不确定还能尝试什么。
您需要先将 Scala 函数转换为 Spark UDF,然后才能将其注册为 UDF。例如,
val decompressUdf = udf(Decompress _)
spark.udf.register("Decompress", decompressUdf)
事实上,如果您只是在 DataFrame 中使用它,则无需注册 UDF API。您可以简单地 运行 第一行并使用 decompressUdf
。仅当您想在 SQL.
中使用 UDF 时才需要注册
我在 Scala 中编写了以下 UDF:
import java.io.{ByteArrayOutputStream, ByteArrayInputStream}
import java.util.zip.{GZIPInputStream}
def Decompress(compressed: Array[Byte]): String = {
val inputStream = new GZIPInputStream(new ByteArrayInputStream(compressed))
val output = scala.io.Source.fromInputStream(inputStream).mkString
return output
}
val decompressUdf = (compressed: Array[Byte]) => {
Decompress(compressed)
}
spark.udf.register("Decompress", decompressUdf)
然后我尝试使用以下内容调用 UDF:
val sessionsRawDF =
sessionRawDF
.withColumn("WebsiteSession", decompressUdf(sessionRawDF("body")))
.select(
current_timestamp().alias("ingesttime"),
current_timestamp().cast("date").alias("p_ingestdate"),
col("partition"),
col("enqueuedTime"),
col("WebsiteSession").alias("Json")
)
当我 运行 执行此操作时,出现以下错误:
command-130062350733681:9: error: type mismatch;
found: org.apache.spark.sql.Column required: Array[Byte] decompressUdf(col("WebsiteSession")).alias("Json")
在这种情况下,我的印象是 Spark 会隐式获取值并从 spark 类型变为 Array[Byte]。
有人能帮我理解这是怎么回事吗,我已经为此苦苦挣扎了一段时间,不确定还能尝试什么。
您需要先将 Scala 函数转换为 Spark UDF,然后才能将其注册为 UDF。例如,
val decompressUdf = udf(Decompress _)
spark.udf.register("Decompress", decompressUdf)
事实上,如果您只是在 DataFrame 中使用它,则无需注册 UDF API。您可以简单地 运行 第一行并使用 decompressUdf
。仅当您想在 SQL.