在 Azure Databricks 的 Spark-SQL 中创建用户定义(非临时)函数
Creating User Defined (not temporary) Function in Spark-SQL for Azure Databricks
也许这很愚蠢,我是一名 Microsoft SQL / C# 开发人员,之前从未真正使用过任何其他 IDE / 编写的 JAVA / SCALA。
我正在将一些 Azure SQL 查询迁移到 Azure Databricks 解决方案。
似乎没有等效于 TSQL DATEDIFF_BIG 函数 (https://docs.microsoft.com/en-us/sql/t-sql/functions/datediff-transact-sql?view=sql-server-2017)
您找到的解决方案是 - 编写您自己的 UDF。
我已经在 SCALA Notebook 中完成了(见下文)——这对于临时功能来说效果很好。 (https://docs.databricks.com/spark/latest/spark-sql/language-manual/create-function.html)
这是我找到的最有用的示例 https://github.com/johnmuller87/spark-udf。
有相当多的临时函数示例,但是 none 我找到了非 JAVA / SCALA 开发人员的永久函数。
我安装了 SBT(Windows - https://www.scala-sbt.org/1.x/docs/Installing-sbt-on-Windows.html 的最新版本)
我还安装了 Intellj
我 运行 为 IBAN 示例构建了 SBT,但是在将 JAR 上传到我的 Clusterd 之后,我无法获得 SQL 功能,并且功能注册正常工作。
CREATE FUNCTION ValidateIBAN AS 'com.ing.wbaa.spark.udf.ValidateIBAN' USING JAR 'spark_udf_assembly_0_2_0' --without extension
SELECT ValidateIBAN('NL20INGB0001234567')
错误总是
"Error in SQL statement: AnalysisException: No handler for UDF/UDAF/UDTF 'com.ing.wbaa.spark.udf.ValidateIBAN'; line 1 pos 7"
//import org.apache.spark.sql.types._ // include the Spark Types to define our schema
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.functions.udf
import java.time.temporal.ChronoUnit;
// Define function to calculate local time offset
def getTimestampDifference(interval: java.lang.String, date1: java.sql.Timestamp, date2: java.sql.Timestamp) : java.lang.Long = {
//https://docs.oracle.com/javase/8/docs/api/java/sql/Timestamp.html
//https://spark.apache.org/docs/2.4.0/sql-reference.html
//https://alvinalexander.com/scala/how-to-use-scala-match-expression-like-switch-case-statement
interval match
{
case "NANOSECOND"=> return ChronoUnit.NANOS.between(date1.toInstant(), date2.toInstant());
case "MICROSECOND"=> return ChronoUnit.MICROS.between(date1.toInstant(), date2.toInstant());
case "MILLISECOND"=> return ChronoUnit.MILLIS.between(date1.toInstant(), date2.toInstant()); // date2.getTime() - date1.getTime();
case "SECOND"=> return ChronoUnit.SECONDS.between(date1.toInstant(), date2.toInstant());
case "MINUTE"=> return ChronoUnit.MINUTES.between(date1.toInstant(), date2.toInstant());
case "HOUR"=> return ChronoUnit.HOURS.between(date1.toInstant(), date2.toInstant());
case "DAY"=> return ChronoUnit.DAYS.between(date1.toInstant(), date2.toInstant());
case "WEEK"=> return ChronoUnit.WEEKS.between(date1.toInstant(), date2.toInstant());
case "MONTH"=> return ChronoUnit.MONTHS.between(date1.toInstant(), date2.toInstant());
case "YEAR"=> return ChronoUnit.YEARS.between(date1.toInstant(), date2.toInstant());
}
}
spark.udf.register("DATETIMEDIFF", udf(getTimestampDifference(_:java.lang.String, _:java.sql.Timestamp,_:java.sql.Timestamp),LongType))
实际上我需要的是 - 如何将 SCALA Notebook 转换为 SQL 函数以便我可以永久使用它 SQL 查看
Azure Databricks 集群版本 5.4(包括 Apache Spark 2.4.3、Scala 2.11)
- 实施什么Class
- 实现什么方法(在 c# 中重写)- 关于 HIVE 或 SPARK 也有不同的文章
- 如何设置 SBT Built 或任何其他方式在 Java 存档中编译它,这样我就可以成功地创建和 运行 SQL 函数(在 SQL 仅,不在 pyhton 代码中,也不在 scala 代码中 - 在 SQL Notebook 中)
感谢您的帮助
Spark 不会为您提供任何持续超过单个 spark 会话( 或 Databricks 术语中的集群生命周期)的永久功能。如果您需要 long-running spark 会话(仅 SQL 部分),您可以考虑将这些 UDF 添加到 Hive 并从 Spark 调用它们。否则(考虑瞬态集群)每次启动集群时都需要 re-add 它。
您的 UDF 的代码是 non-optimal:不处理 empty/null 值/它会抛出异常
对于基本(标准)spark UDF,请参阅https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-udfs.html不需要真正的接口(与 Hive 不同)
关于:
SQL 函数(仅在 SQL 中)/ SBT:
如果您真的需要它(对于这个简单的用例)https://github.com/geoHeil/sparkSimpleProjectTemplate.g8 可以作为您的示例。
但对于此代码,不需要额外的依赖项。创建一个文本/Scala 文件就足够了,其中包含函数所需的 < 100 行代码。
然后可以使用 API 即通过 https://docs.databricks.com/user-guide/dev-tools/databricks-cli.html 和一些脚本在集群创建时调用此文件(Notebook?),因此表现得像永久的。
此外:
始终考虑使用 spark 本机(催化剂优化)功能。 常规 datediff 可能已经做了很多您的 datediff-big 需要完成的事情以及减去普通时间戳类型的列。
如果我通过简要浏览它而正确理解它,则只缺少将输出格式化为所需的粒度(即,将从 t-SQL 函数开箱即用)并且可以通过将其与不同的函数嵌套来完成喜欢:
- 年
- 天
- 周
- 或手动划分返回的差值
您引用的 Databricks 中的 CREATE FUNCTION 语句实际上是一个 Hive 命令,而不是 Spark,它期望 UDF class 是一个 Hive UDF。
这也是您收到 "No handler for UDF/UDAF/UDTF" 错误的原因。您链接的示例实现了 Spark UDF,而您需要实现的是 Hive UDF.
要创建一个 Hive UDF,您需要实现一个扩展 class org.apache.hadoop.hive.ql.exec.UDF 的 class 并实现一个名为 evaluate 的函数。在你的例子中,整个 class 应该是这样的:
class GetTimestampDifference extends UDF {
def evaluate(interval: java.lang.String, date1: java.sql.Timestamp, date2: java.sql.Timestamp) : java.lang.Long = {
//https://docs.oracle.com/javase/8/docs/api/java/sql/Timestamp.html
//https://spark.apache.org/docs/2.4.0/sql-reference.html
//https://alvinalexander.com/scala/how-to-use-scala-match-expression-like-switch-case-statement
interval match
{
case "NANOSECOND"=> return ChronoUnit.NANOS.between(date1.toInstant(), date2.toInstant());
case "MICROSECOND"=> return ChronoUnit.MICROS.between(date1.toInstant(), date2.toInstant());
case "MILLISECOND"=> return ChronoUnit.MILLIS.between(date1.toInstant(), date2.toInstant()); // date2.getTime() - date1.getTime();
case "SECOND"=> return ChronoUnit.SECONDS.between(date1.toInstant(), date2.toInstant());
case "MINUTE"=> return ChronoUnit.MINUTES.between(date1.toInstant(), date2.toInstant());
case "HOUR"=> return ChronoUnit.HOURS.between(date1.toInstant(), date2.toInstant());
case "DAY"=> return ChronoUnit.DAYS.between(date1.toInstant(), date2.toInstant());
case "WEEK"=> return ChronoUnit.WEEKS.between(date1.toInstant(), date2.toInstant());
case "MONTH"=> return ChronoUnit.MONTHS.between(date1.toInstant(), date2.toInstant());
case "YEAR"=> return ChronoUnit.YEARS.between(date1.toInstant(), date2.toInstant());
}
}
}
然后您需要将其编译为 JAR 文件,将其复制到 databricks 文件系统的某处并使用与之前相同的命令创建永久函数(假设您保留 IBAN 示例的命名空间):
CREATE FUNCTION GetTimestampDifference AS 'com.ing.wbaa.spark.udf.GetTimestampDifference' USING JAR '[path to your jar in dbfs]'
SELECT GetTimestampDifference ("MILLISECOND",cast("2019-07-08 16:07:03.246" as timestamp), cast("2019-07-08 16:07:03.248" as timestamp))
假设您仍在修改开始时使用的 IBAN 示例项目,为了创建 jar 文件,您必须将以下包依赖项添加到 build.sbt 文件:
"org.apache.spark" %% "spark-hive" % "2.4.3"
也许这很愚蠢,我是一名 Microsoft SQL / C# 开发人员,之前从未真正使用过任何其他 IDE / 编写的 JAVA / SCALA。 我正在将一些 Azure SQL 查询迁移到 Azure Databricks 解决方案。
似乎没有等效于 TSQL DATEDIFF_BIG 函数 (https://docs.microsoft.com/en-us/sql/t-sql/functions/datediff-transact-sql?view=sql-server-2017)
您找到的解决方案是 - 编写您自己的 UDF。
我已经在 SCALA Notebook 中完成了(见下文)——这对于临时功能来说效果很好。 (https://docs.databricks.com/spark/latest/spark-sql/language-manual/create-function.html)
这是我找到的最有用的示例 https://github.com/johnmuller87/spark-udf。
有相当多的临时函数示例,但是 none 我找到了非 JAVA / SCALA 开发人员的永久函数。
我安装了 SBT(Windows - https://www.scala-sbt.org/1.x/docs/Installing-sbt-on-Windows.html 的最新版本) 我还安装了 Intellj
我 运行 为 IBAN 示例构建了 SBT,但是在将 JAR 上传到我的 Clusterd 之后,我无法获得 SQL 功能,并且功能注册正常工作。
CREATE FUNCTION ValidateIBAN AS 'com.ing.wbaa.spark.udf.ValidateIBAN' USING JAR 'spark_udf_assembly_0_2_0' --without extension
SELECT ValidateIBAN('NL20INGB0001234567')
错误总是 "Error in SQL statement: AnalysisException: No handler for UDF/UDAF/UDTF 'com.ing.wbaa.spark.udf.ValidateIBAN'; line 1 pos 7"
//import org.apache.spark.sql.types._ // include the Spark Types to define our schema
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.functions.udf
import java.time.temporal.ChronoUnit;
// Define function to calculate local time offset
def getTimestampDifference(interval: java.lang.String, date1: java.sql.Timestamp, date2: java.sql.Timestamp) : java.lang.Long = {
//https://docs.oracle.com/javase/8/docs/api/java/sql/Timestamp.html
//https://spark.apache.org/docs/2.4.0/sql-reference.html
//https://alvinalexander.com/scala/how-to-use-scala-match-expression-like-switch-case-statement
interval match
{
case "NANOSECOND"=> return ChronoUnit.NANOS.between(date1.toInstant(), date2.toInstant());
case "MICROSECOND"=> return ChronoUnit.MICROS.between(date1.toInstant(), date2.toInstant());
case "MILLISECOND"=> return ChronoUnit.MILLIS.between(date1.toInstant(), date2.toInstant()); // date2.getTime() - date1.getTime();
case "SECOND"=> return ChronoUnit.SECONDS.between(date1.toInstant(), date2.toInstant());
case "MINUTE"=> return ChronoUnit.MINUTES.between(date1.toInstant(), date2.toInstant());
case "HOUR"=> return ChronoUnit.HOURS.between(date1.toInstant(), date2.toInstant());
case "DAY"=> return ChronoUnit.DAYS.between(date1.toInstant(), date2.toInstant());
case "WEEK"=> return ChronoUnit.WEEKS.between(date1.toInstant(), date2.toInstant());
case "MONTH"=> return ChronoUnit.MONTHS.between(date1.toInstant(), date2.toInstant());
case "YEAR"=> return ChronoUnit.YEARS.between(date1.toInstant(), date2.toInstant());
}
}
spark.udf.register("DATETIMEDIFF", udf(getTimestampDifference(_:java.lang.String, _:java.sql.Timestamp,_:java.sql.Timestamp),LongType))
实际上我需要的是 - 如何将 SCALA Notebook 转换为 SQL 函数以便我可以永久使用它 SQL 查看 Azure Databricks 集群版本 5.4(包括 Apache Spark 2.4.3、Scala 2.11)
- 实施什么Class
- 实现什么方法(在 c# 中重写)- 关于 HIVE 或 SPARK 也有不同的文章
- 如何设置 SBT Built 或任何其他方式在 Java 存档中编译它,这样我就可以成功地创建和 运行 SQL 函数(在 SQL 仅,不在 pyhton 代码中,也不在 scala 代码中 - 在 SQL Notebook 中)
感谢您的帮助
Spark 不会为您提供任何持续超过单个 spark 会话(
您的 UDF 的代码是 non-optimal:不处理 empty/null 值/它会抛出异常
对于基本(标准)spark UDF,请参阅https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-udfs.html不需要真正的接口(与 Hive 不同)
关于: SQL 函数(仅在 SQL 中)/ SBT:
如果您真的需要它(对于这个简单的用例)https://github.com/geoHeil/sparkSimpleProjectTemplate.g8 可以作为您的示例。
但对于此代码,不需要额外的依赖项。创建一个文本/Scala 文件就足够了,其中包含函数所需的 < 100 行代码。 然后可以使用 API 即通过 https://docs.databricks.com/user-guide/dev-tools/databricks-cli.html 和一些脚本在集群创建时调用此文件(Notebook?),因此表现得像永久的。
此外:
始终考虑使用 spark 本机(催化剂优化)功能。
- 年
- 天
- 周
- 或手动划分返回的差值
您引用的 Databricks 中的 CREATE FUNCTION 语句实际上是一个 Hive 命令,而不是 Spark,它期望 UDF class 是一个 Hive UDF。
这也是您收到 "No handler for UDF/UDAF/UDTF" 错误的原因。您链接的示例实现了 Spark UDF,而您需要实现的是 Hive UDF.
要创建一个 Hive UDF,您需要实现一个扩展 class org.apache.hadoop.hive.ql.exec.UDF 的 class 并实现一个名为 evaluate 的函数。在你的例子中,整个 class 应该是这样的:
class GetTimestampDifference extends UDF {
def evaluate(interval: java.lang.String, date1: java.sql.Timestamp, date2: java.sql.Timestamp) : java.lang.Long = {
//https://docs.oracle.com/javase/8/docs/api/java/sql/Timestamp.html
//https://spark.apache.org/docs/2.4.0/sql-reference.html
//https://alvinalexander.com/scala/how-to-use-scala-match-expression-like-switch-case-statement
interval match
{
case "NANOSECOND"=> return ChronoUnit.NANOS.between(date1.toInstant(), date2.toInstant());
case "MICROSECOND"=> return ChronoUnit.MICROS.between(date1.toInstant(), date2.toInstant());
case "MILLISECOND"=> return ChronoUnit.MILLIS.between(date1.toInstant(), date2.toInstant()); // date2.getTime() - date1.getTime();
case "SECOND"=> return ChronoUnit.SECONDS.between(date1.toInstant(), date2.toInstant());
case "MINUTE"=> return ChronoUnit.MINUTES.between(date1.toInstant(), date2.toInstant());
case "HOUR"=> return ChronoUnit.HOURS.between(date1.toInstant(), date2.toInstant());
case "DAY"=> return ChronoUnit.DAYS.between(date1.toInstant(), date2.toInstant());
case "WEEK"=> return ChronoUnit.WEEKS.between(date1.toInstant(), date2.toInstant());
case "MONTH"=> return ChronoUnit.MONTHS.between(date1.toInstant(), date2.toInstant());
case "YEAR"=> return ChronoUnit.YEARS.between(date1.toInstant(), date2.toInstant());
}
}
}
然后您需要将其编译为 JAR 文件,将其复制到 databricks 文件系统的某处并使用与之前相同的命令创建永久函数(假设您保留 IBAN 示例的命名空间):
CREATE FUNCTION GetTimestampDifference AS 'com.ing.wbaa.spark.udf.GetTimestampDifference' USING JAR '[path to your jar in dbfs]'
SELECT GetTimestampDifference ("MILLISECOND",cast("2019-07-08 16:07:03.246" as timestamp), cast("2019-07-08 16:07:03.248" as timestamp))
假设您仍在修改开始时使用的 IBAN 示例项目,为了创建 jar 文件,您必须将以下包依赖项添加到 build.sbt 文件:
"org.apache.spark" %% "spark-hive" % "2.4.3"