如何使用 Spark SQL 获取 MapType 的 ArrayType 的最大值?

How do I get the maximum of an ArrayType of MapTypes using Spark SQL?

我有以下 Spark DataFrame:

df = sql.createDataFrame([
        (1, [
                {'name': 'john', 'score': '0.8'},
                {'name': 'johnson', 'score': '0.9'},
            ]),
        (2, [
                {'name': 'jane', 'score': '0.9'},
                {'name': 'janine', 'score': '0.4'},
            ]),
        (3, [
                {'name': 'sarah', 'score': '0.2'},
                {'name': 'sara', 'score': '0.9'},
            ]),
    ], schema=['id', 'names'])

Spark 正确推断模式:

root
 |-- id: long (nullable = true)
 |-- names: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)

对于每一行,我想 select 得分最高的名字。我可以使用 Python UDF 执行此操作,如下所示:

import pyspark.sql.types as T
import pyspark.sql.functions as F

def top_name(names):
    return sorted(names, key=lambda d: d['score'], reverse=True)[0]['name']

top_name_udf = F.udf(top_name, T.StringType())

df.withColumn('top_name', top_name_udf('names')) \
    .select('id', 'top_name') \
    .show(truncate=False)

如你所愿,你得到:

+---+--------+
|id |top_name|
+---+--------+
|1  |johnson |
|2  |jane    |
|3  |sara    |
+---+--------+

如何使用 Spark SQL 执行此操作?是否可以在没有 Python UDF 的情况下 执行此操作,以便数据不会在 Python 和 Java 之间序列化? 1


1 不幸的是,我是 运行 Spark 1.5,无法在 Spark 2.1 中使用 registerJavaFunction

使用 sqlContext.registerFunction 方法将您的函数(不是 udf)注册到 sql。同时将您的 df 注册为 sql table.

sqlContext.registerDataFrameAsTable(df, "names_df")

sqlContext.registerFunction("top_name", top_name,T.StringType())

sqlContext.sql("SELECT top_name(names) as top_name from names_df").collect()

> [Row(top_name=u'johnson'), Row(top_name=u'jane'), Row(top_name=u'sara')]