在 Azure Databricks 的 Spark-SQL 中创建用户定义(非临时)函数

Creating User Defined (not temporary) Function in Spark-SQL for Azure Databricks

也许这很愚蠢,我是一名 Microsoft SQL / C# 开发人员,之前从未真正使用过任何其他 IDE / 编写的 JAVA / SCALA。 我正在将一些 Azure SQL 查询迁移到 Azure Databricks 解决方案。

似乎没有等效于 TSQL DATEDIFF_BIG 函数 (https://docs.microsoft.com/en-us/sql/t-sql/functions/datediff-transact-sql?view=sql-server-2017)

您找到的解决方案是 - 编写您自己的 UDF。

我已经在 SCALA Notebook 中完成了(见下文)——这对于临时功能来说效果很好。 (https://docs.databricks.com/spark/latest/spark-sql/language-manual/create-function.html)

这是我找到的最有用的示例 https://github.com/johnmuller87/spark-udf

有相当多的临时函数示例,但是 none 我找到了非 JAVA / SCALA 开发人员的永久函数。

我安装了 SBT(Windows - https://www.scala-sbt.org/1.x/docs/Installing-sbt-on-Windows.html 的最新版本) 我还安装了 Intellj

我 运行 为 IBAN 示例构建了 SBT,但是在将 JAR 上传到我的 Clusterd 之后,我无法获得 SQL 功能,并且功能注册正常工作。

CREATE FUNCTION ValidateIBAN AS 'com.ing.wbaa.spark.udf.ValidateIBAN' USING JAR 'spark_udf_assembly_0_2_0' --without extension

SELECT ValidateIBAN('NL20INGB0001234567')

错误总是 "Error in SQL statement: AnalysisException: No handler for UDF/UDAF/UDTF 'com.ing.wbaa.spark.udf.ValidateIBAN'; line 1 pos 7"

//import org.apache.spark.sql.types._                         // include the Spark Types to define our schema
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.functions.udf
import java.time.temporal.ChronoUnit;

// Define function to calculate local time offset
def getTimestampDifference(interval: java.lang.String, date1: java.sql.Timestamp, date2: java.sql.Timestamp) : java.lang.Long = {

  //https://docs.oracle.com/javase/8/docs/api/java/sql/Timestamp.html
  //https://spark.apache.org/docs/2.4.0/sql-reference.html
  //https://alvinalexander.com/scala/how-to-use-scala-match-expression-like-switch-case-statement

  interval match
  {
    case "NANOSECOND"=> return ChronoUnit.NANOS.between(date1.toInstant(), date2.toInstant());
    case "MICROSECOND"=> return ChronoUnit.MICROS.between(date1.toInstant(), date2.toInstant());
    case "MILLISECOND"=> return ChronoUnit.MILLIS.between(date1.toInstant(), date2.toInstant()); // date2.getTime() - date1.getTime();
    case "SECOND"=> return ChronoUnit.SECONDS.between(date1.toInstant(), date2.toInstant());
    case "MINUTE"=> return ChronoUnit.MINUTES.between(date1.toInstant(), date2.toInstant());
    case "HOUR"=> return ChronoUnit.HOURS.between(date1.toInstant(), date2.toInstant());
    case "DAY"=> return ChronoUnit.DAYS.between(date1.toInstant(), date2.toInstant());
    case "WEEK"=> return ChronoUnit.WEEKS.between(date1.toInstant(), date2.toInstant());
    case "MONTH"=> return ChronoUnit.MONTHS.between(date1.toInstant(), date2.toInstant());
    case "YEAR"=> return ChronoUnit.YEARS.between(date1.toInstant(), date2.toInstant());
  }
}

spark.udf.register("DATETIMEDIFF", udf(getTimestampDifference(_:java.lang.String, _:java.sql.Timestamp,_:java.sql.Timestamp),LongType))

实际上我需要的是 - 如何将 SCALA Notebook 转换为 SQL 函数以便我可以永久使用它 SQL 查看 Azure Databricks 集群版本 5.4(包括 Apache Spark 2.4.3、Scala 2.11)

感谢您的帮助

Spark 不会为您提供任何持续超过单个 spark 会话( 或 Databricks 术语中的集群生命周期)的永久功能。如果您需要 long-running spark 会话(仅 SQL 部分),您可以考虑将这些 UDF 添加到 Hive 并从 Spark 调用它们。否则(考虑瞬态集群)每次启动集群时都需要 re-add 它。

您的 UDF 的代码是 non-optimal:不处理 empty/null 值/它会抛出异常

对于基本(标准)spark UDF,请参阅https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-udfs.html不需要真正的接口(与 Hive 不同)

关于: SQL 函数(仅在 SQL 中)/ SBT:

如果您真的需要它(对于这个简单的用例)https://github.com/geoHeil/sparkSimpleProjectTemplate.g8 可以作为您的示例。

但对于此代码,不需要额外的依赖项。创建一个文本/Scala 文件就足够了,其中包含函数所需的 < 100 行代码。 然后可以使用 API 即通过 https://docs.databricks.com/user-guide/dev-tools/databricks-cli.html 和一些脚本在集群创建时调用此文件(Notebook?),因此表现得像永久的。

此外: 始终考虑使用 spark 本机(催化剂优化)功能。 常规 datediff 可能已经做了很多您的 datediff-big 需要完成的事情以及减去普通时间戳类型的列。 如果我通过简要浏览它而正确理解它,则只缺少将输出格式化为所需的粒度(即,将从 t-SQL 函数开箱即用)并且可以通过将其与不同的函数嵌套来完成喜欢:

  • 或手动划分返回的差值

您引用的 Databricks 中的 CREATE FUNCTION 语句实际上是一个 Hive 命令,而不是 Spark,它期望 UDF class 是一个 Hive UDF。

这也是您收到 "No handler for UDF/UDAF/UDTF" 错误的原因。您链接的示例实现了 Spark UDF,而您需要实现的是 Hive UDF.

要创建一个 Hive UDF,您需要实现一个扩展 class org.apache.hadoop.hive.ql.exec.UDF 的 class 并实现一个名为 evaluate 的函数。在你的例子中,整个 class 应该是这样的:

class GetTimestampDifference extends UDF {

  def evaluate(interval: java.lang.String, date1: java.sql.Timestamp, date2: java.sql.Timestamp) : java.lang.Long = {

  //https://docs.oracle.com/javase/8/docs/api/java/sql/Timestamp.html
  //https://spark.apache.org/docs/2.4.0/sql-reference.html
  //https://alvinalexander.com/scala/how-to-use-scala-match-expression-like-switch-case-statement

  interval match
  {
    case "NANOSECOND"=> return ChronoUnit.NANOS.between(date1.toInstant(), date2.toInstant());
    case "MICROSECOND"=> return ChronoUnit.MICROS.between(date1.toInstant(), date2.toInstant());
    case "MILLISECOND"=> return ChronoUnit.MILLIS.between(date1.toInstant(), date2.toInstant()); // date2.getTime() - date1.getTime();
    case "SECOND"=> return ChronoUnit.SECONDS.between(date1.toInstant(), date2.toInstant());
    case "MINUTE"=> return ChronoUnit.MINUTES.between(date1.toInstant(), date2.toInstant());
    case "HOUR"=> return ChronoUnit.HOURS.between(date1.toInstant(), date2.toInstant());
    case "DAY"=> return ChronoUnit.DAYS.between(date1.toInstant(), date2.toInstant());
    case "WEEK"=> return ChronoUnit.WEEKS.between(date1.toInstant(), date2.toInstant());
    case "MONTH"=> return ChronoUnit.MONTHS.between(date1.toInstant(), date2.toInstant());
    case "YEAR"=> return ChronoUnit.YEARS.between(date1.toInstant(), date2.toInstant());
  }
}

}

然后您需要将其编译为 JAR 文件,将其复制到 databricks 文件系统的某处并使用与之前相同的命令创建永久函数(假设您保留 IBAN 示例的命名空间):

CREATE FUNCTION GetTimestampDifference AS 'com.ing.wbaa.spark.udf.GetTimestampDifference' USING JAR '[path to your jar in dbfs]'

SELECT GetTimestampDifference ("MILLISECOND",cast("2019-07-08 16:07:03.246" as timestamp), cast("2019-07-08 16:07:03.248" as timestamp))

假设您仍在修改开始时使用的 IBAN 示例项目,为了创建 jar 文件,您必须将以下包依赖项添加到 build.sbt 文件:

"org.apache.spark" %% "spark-hive" % "2.4.3"