How to filter MapType field of a Spark Dataframe?
I have a Spark DataFrame in which one field is a MapType. I can read the value of any key of the map column, but when I apply a filter on the value of a specific key, it fails.
val line = List(("Sanjay", Map("one" -> 1, "two" -> 2)), ("Taru", Map("one" -> 10, "two" -> 20)))
I created an RDD from the above list and converted it to a DataFrame (rowrddDF).
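For context, here is a minimal sketch of how such a DataFrame could be built in the Spark 1.x shell. The question does not show this step, so the column names name and data and the use of sqlContext.implicits._ are assumptions:

import sqlContext.implicits._

// "name" is a plain string column; "data" is inferred as MapType(StringType, IntegerType)
val rowrddDF = sc.parallelize(line).toDF("name", "data")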
I am trying to keep the rows whose map value for the key "one" is >= 5, but the attempt below fails in the Spark REPL with the following exception. Please help.

val rowrddDFFinal = rowrddDF.select(rowrddDF("data.one").alias("data")).filter(rowrddDF("data.one").geq(5))
org.apache.spark.sql.AnalysisException: resolved attribute(s) data#1 missing from data#3 in operator !Filter (data#1[one] AS one#4 >= 5);
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:42)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis.apply(CheckAnalysis.scala:121)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis.apply(CheckAnalysis.scala:50)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:98)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:42)
  at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:931)
To access values in an Array or Map column, you can use the Column.getItem method:
rowrddDF
.where($"data".getItem("one").geq(5))
.select($"data".getItem("one").alias("data"))
If you prefer to filter after the select, you can no longer use rowrddDF.apply: the select produces a new DataFrame whose schema contains only the aliased column, so references resolved through rowrddDF no longer exist there, which is exactly what the AnalysisException above complains about. Instead, access the aliased column directly:
rowrddDF
.select($"data".getItem("one").alias("data"))
.filter($"data".geq(5))