如何使用 Spark SQL 获取 MapType 的 ArrayType 的最大值?
How do I get the maximum of an ArrayType of MapTypes using Spark SQL?
我有以下 Spark DataFrame:
df = sql.createDataFrame([
(1, [
{'name': 'john', 'score': '0.8'},
{'name': 'johnson', 'score': '0.9'},
]),
(2, [
{'name': 'jane', 'score': '0.9'},
{'name': 'janine', 'score': '0.4'},
]),
(3, [
{'name': 'sarah', 'score': '0.2'},
{'name': 'sara', 'score': '0.9'},
]),
], schema=['id', 'names'])
Spark 正确推断模式:
root
|-- id: long (nullable = true)
|-- names: array (nullable = true)
| |-- element: map (containsNull = true)
| | |-- key: string
| | |-- value: string (valueContainsNull = true)
对于每一行,我想 select 得分最高的名字。我可以使用 Python UDF 执行此操作,如下所示:
import pyspark.sql.types as T
import pyspark.sql.functions as F
def top_name(names):
return sorted(names, key=lambda d: d['score'], reverse=True)[0]['name']
top_name_udf = F.udf(top_name, T.StringType())
df.withColumn('top_name', top_name_udf('names')) \
.select('id', 'top_name') \
.show(truncate=False)
如你所愿,你得到:
+---+--------+
|id |top_name|
+---+--------+
|1 |johnson |
|2 |jane |
|3 |sara |
+---+--------+
如何使用 Spark SQL 执行此操作?是否可以在没有 Python UDF 的情况下 执行此操作,以便数据不会在 Python 和 Java 之间序列化? 1
1 不幸的是,我是 运行 Spark 1.5,无法在 Spark 2.1 中使用 registerJavaFunction
。
使用 sqlContext.registerFunction
方法将您的函数(不是 udf)注册到 sql。同时将您的 df 注册为 sql table.
sqlContext.registerDataFrameAsTable(df, "names_df")
sqlContext.registerFunction("top_name", top_name,T.StringType())
sqlContext.sql("SELECT top_name(names) as top_name from names_df").collect()
> [Row(top_name=u'johnson'), Row(top_name=u'jane'), Row(top_name=u'sara')]
我有以下 Spark DataFrame:
df = sql.createDataFrame([
(1, [
{'name': 'john', 'score': '0.8'},
{'name': 'johnson', 'score': '0.9'},
]),
(2, [
{'name': 'jane', 'score': '0.9'},
{'name': 'janine', 'score': '0.4'},
]),
(3, [
{'name': 'sarah', 'score': '0.2'},
{'name': 'sara', 'score': '0.9'},
]),
], schema=['id', 'names'])
Spark 正确推断模式:
root
|-- id: long (nullable = true)
|-- names: array (nullable = true)
| |-- element: map (containsNull = true)
| | |-- key: string
| | |-- value: string (valueContainsNull = true)
对于每一行,我想 select 得分最高的名字。我可以使用 Python UDF 执行此操作,如下所示:
import pyspark.sql.types as T
import pyspark.sql.functions as F
def top_name(names):
return sorted(names, key=lambda d: d['score'], reverse=True)[0]['name']
top_name_udf = F.udf(top_name, T.StringType())
df.withColumn('top_name', top_name_udf('names')) \
.select('id', 'top_name') \
.show(truncate=False)
如你所愿,你得到:
+---+--------+
|id |top_name|
+---+--------+
|1 |johnson |
|2 |jane |
|3 |sara |
+---+--------+
如何使用 Spark SQL 执行此操作?是否可以在没有 Python UDF 的情况下 执行此操作,以便数据不会在 Python 和 Java 之间序列化? 1
1 不幸的是,我是 运行 Spark 1.5,无法在 Spark 2.1 中使用 registerJavaFunction
。
使用 sqlContext.registerFunction
方法将您的函数(不是 udf)注册到 sql。同时将您的 df 注册为 sql table.
sqlContext.registerDataFrameAsTable(df, "names_df")
sqlContext.registerFunction("top_name", top_name,T.StringType())
sqlContext.sql("SELECT top_name(names) as top_name from names_df").collect()
> [Row(top_name=u'johnson'), Row(top_name=u'jane'), Row(top_name=u'sara')]