在 Spark 中调用 Scala UDF 时如何将 BinaryType 转换为 Array[Byte]?

How can I convert BinaryType to Array[Byte] when calling Scala UDF in Spark?

我在 Scala 中编写了以下 UDF:

import java.io.{ByteArrayOutputStream, ByteArrayInputStream}
import java.util.zip.{GZIPInputStream}

def Decompress(compressed: Array[Byte]): String = {
  val inputStream = new GZIPInputStream(new ByteArrayInputStream(compressed))
  val output = scala.io.Source.fromInputStream(inputStream).mkString
  
  return output
}

val decompressUdf = (compressed: Array[Byte]) => {
  Decompress(compressed)
}

spark.udf.register("Decompress", decompressUdf)

然后我尝试使用以下内容调用 UDF:

val sessionsRawDF =
  sessionRawDF
    .withColumn("WebsiteSession", decompressUdf(sessionRawDF("body")))
    .select(
      current_timestamp().alias("ingesttime"),
      current_timestamp().cast("date").alias("p_ingestdate"),
      col("partition"),
      col("enqueuedTime"),
      col("WebsiteSession").alias("Json")
    )

当我 运行 执行此操作时,出现以下错误:

command-130062350733681:9: error: type mismatch;
found: org.apache.spark.sql.Column required: Array[Byte] decompressUdf(col("WebsiteSession")).alias("Json")

在这种情况下,我的印象是 Spark 会隐式获取值并从 spark 类型变为 Array[Byte]。

有人能帮我理解这是怎么回事吗,我已经为此苦苦挣扎了一段时间,不确定还能尝试什么。

您需要先将 Scala 函数转换为 Spark UDF,然后才能将其注册为 UDF。例如,

val decompressUdf = udf(Decompress _)

spark.udf.register("Decompress", decompressUdf)

事实上,如果您只是在 DataFrame 中使用它,则无需注册 UDF API。您可以简单地 运行 第一行并使用 decompressUdf。仅当您想在 SQL.

中使用 UDF 时才需要注册