将自定义函数的输出从默认的 StringType 转换为 pyspark 中的 mapType
Convert the output of a custom function from the default StringType to mapType in pyspark
我是 运行 一个嵌套的 pyspark SQL 查询。在子查询中,我使用自定义函数 return 父查询将使用 explode 的字典来扩展该字典结果。
问题是尽管我 return 字典,但主查询会将此列类型视为 stringType 并且 explode 不会工作。
def ff(k,vList):return dict([(k+v,v) for v in vList])
df2 = sqlContext.createDataFrame([Row(a=1, b=1),Row(a=1, b=2)])
df2.registerTempTable("ttt2")
sqlContext.registerFunction('ff',ff)
spark.sql("select a ,(bb) from (select a,ff(a,collect_list(b)) as bb from ttt2 group by a)").show()
+---+----------+
| a| bb|
+---+----------+
| 1|{2=1, 3=2}|
+---+----------+
但是当我在主查询中使用 explode 时
spark.sql("select a ,explode(bb) from (select a,ff(a,collect_list(b)) as bb from ttt2 group by a)").show()
AnalysisException: u"cannot resolve 'explode(__auto_generated_subquery_name.`bb`)' due to data type mismatch: input to function explode should be array or map type, not string; line 1 pos 10;\n'Project [a#178L, unresolvedalias(explode(bb#294), None)]\n+- SubqueryAlias __auto_generated_subquery_name\n +- Aggregate [a#178L], [a#178L, ff(a#178L, collect_list(b#179L, 0, 0)) AS bb#294]\n +- SubqueryAlias ttt2\n +- LogicalRDD [a#178L, b#179L, mapfield#180], false\n"
如何将函数的输出转换为 mapType 或 ArrayType?
您需要为用户定义的函数指定 return 类型。默认情况下,registerFunction()
会将 return 类型设置为 string
。如果
你输入 help(sqlContext.registerFunction)
,你会看到:
registerFunction(self, name, f, returnType=StringType)
...
In addition to a name and the function itself, the return type can be optionally specified.
When the return type is not given it default to a string and conversion will automatically
be done. For any other return type, the produced object must match the specified type.
对于您的情况,您需要执行以下操作:
from pyspark.sql.types import *
sqlContext.registerFunction('ff',ff,returnType=MapType(StringType(),IntegerType()))
spark.sql(
"select a,bb from (select a,ff(a,collect_list(b)) as bb from ttt2 group by a)"
).show()
#+---+-------------------+
#| a| bb|
#+---+-------------------+
#| 1|Map(2 -> 1, 3 -> 2)|
#+---+-------------------+
spark.sql(
"select a,explode(bb) from (select a,ff(a,collect_list(b)) as bb from ttt2 group by a)"
).show()
#+---+---+-----+
#| a|key|value|
#+---+---+-----+
#| 1| 2| 1|
#| 1| 3| 2|
#+---+---+-----+
这里我使用 MapType(StringType(), IntegerType())
来指定它是一个字符串(键)到整数(值)的映射。你可能想根据你的实际数据修改这些。
我是 运行 一个嵌套的 pyspark SQL 查询。在子查询中,我使用自定义函数 return 父查询将使用 explode 的字典来扩展该字典结果。
问题是尽管我 return 字典,但主查询会将此列类型视为 stringType 并且 explode 不会工作。
def ff(k,vList):return dict([(k+v,v) for v in vList])
df2 = sqlContext.createDataFrame([Row(a=1, b=1),Row(a=1, b=2)])
df2.registerTempTable("ttt2")
sqlContext.registerFunction('ff',ff)
spark.sql("select a ,(bb) from (select a,ff(a,collect_list(b)) as bb from ttt2 group by a)").show()
+---+----------+
| a| bb|
+---+----------+
| 1|{2=1, 3=2}|
+---+----------+
但是当我在主查询中使用 explode 时
spark.sql("select a ,explode(bb) from (select a,ff(a,collect_list(b)) as bb from ttt2 group by a)").show()
AnalysisException: u"cannot resolve 'explode(__auto_generated_subquery_name.`bb`)' due to data type mismatch: input to function explode should be array or map type, not string; line 1 pos 10;\n'Project [a#178L, unresolvedalias(explode(bb#294), None)]\n+- SubqueryAlias __auto_generated_subquery_name\n +- Aggregate [a#178L], [a#178L, ff(a#178L, collect_list(b#179L, 0, 0)) AS bb#294]\n +- SubqueryAlias ttt2\n +- LogicalRDD [a#178L, b#179L, mapfield#180], false\n"
如何将函数的输出转换为 mapType 或 ArrayType?
您需要为用户定义的函数指定 return 类型。默认情况下,registerFunction()
会将 return 类型设置为 string
。如果
你输入 help(sqlContext.registerFunction)
,你会看到:
registerFunction(self, name, f, returnType=StringType)
...
In addition to a name and the function itself, the return type can be optionally specified. When the return type is not given it default to a string and conversion will automatically be done. For any other return type, the produced object must match the specified type.
对于您的情况,您需要执行以下操作:
from pyspark.sql.types import *
sqlContext.registerFunction('ff',ff,returnType=MapType(StringType(),IntegerType()))
spark.sql(
"select a,bb from (select a,ff(a,collect_list(b)) as bb from ttt2 group by a)"
).show()
#+---+-------------------+
#| a| bb|
#+---+-------------------+
#| 1|Map(2 -> 1, 3 -> 2)|
#+---+-------------------+
spark.sql(
"select a,explode(bb) from (select a,ff(a,collect_list(b)) as bb from ttt2 group by a)"
).show()
#+---+---+-----+
#| a|key|value|
#+---+---+-----+
#| 1| 2| 1|
#| 1| 3| 2|
#+---+---+-----+
这里我使用 MapType(StringType(), IntegerType())
来指定它是一个字符串(键)到整数(值)的映射。你可能想根据你的实际数据修改这些。