如何查询spark streaming中的特殊列
how to query the special columns in spark streaming
我在 spark streaming 中从 kafka 得到了一个 json,:
{"name":"name","value1":"value1"}
{"name":"name","value1":"value1","value2":"value2"}
读取并获取其架构:
val df = spark.read.json(rdd.map(_._2))
df.printSchema() //shows
//--name
//--values1
// --values2
df.createOrReplaceTempView("df")
spark.sql("select name,values2 from df")
但它会输出:
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$$anonfun$apply.applyOrElse(CheckAnalysis.scala:77)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$$anonfun$apply.applyOrElse(CheckAnalysis.scala:74)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp.apply(TreeNode.scala:310)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp.apply(TreeNode.scala:310)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:309)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp(QueryPlan.scala:282)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform(QueryPlan.scala:292)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform.apply(QueryPlan.scala:296)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
顺便说一句,它在 spark 中正常,但在流式传输中失败。
有人知道吗?
看来,您的代码在语法上不正确,因此,Catalyst 优化器,在分析 您的查询时,无法使用 Catalog.
解析所有引用
.json()
方法采用 path 或 RDD[String];不是表达式。
对您的代码稍作更改,我想会解决此错误。请在下面找到。
1.使用 Spark Sql AST
您似乎在使用 AST(因为您将其注册为 TempView)。将您的代码 spark.read.json("select name,values2 from df") 更改为以下内容。
spark.sql("select name,values2 from df")
2。使用 Spark Sql DSL
无需创建 TempView,您也可以实现相同的目的(我更喜欢这个,因为 DSL 中的代码本质上更清晰)。
df.select("name","values2")
在这种情况下,只需跳过代码 df.createOrReplaceTempView("df")
,只需调用上述代码即可。
希望,这有帮助。
我在 spark streaming 中从 kafka 得到了一个 json,:
{"name":"name","value1":"value1"}
{"name":"name","value1":"value1","value2":"value2"}
读取并获取其架构:
val df = spark.read.json(rdd.map(_._2))
df.printSchema() //shows
//--name
//--values1
// --values2
df.createOrReplaceTempView("df")
spark.sql("select name,values2 from df")
但它会输出:
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$$anonfun$apply.applyOrElse(CheckAnalysis.scala:77) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$$anonfun$apply.applyOrElse(CheckAnalysis.scala:74) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp.apply(TreeNode.scala:310) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp.apply(TreeNode.scala:310) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:309) at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp(QueryPlan.scala:282) at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform(QueryPlan.scala:292) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform.apply(QueryPlan.scala:296) at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
顺便说一句,它在 spark 中正常,但在流式传输中失败。 有人知道吗?
看来,您的代码在语法上不正确,因此,Catalyst 优化器,在分析 您的查询时,无法使用 Catalog.
解析所有引用.json()
方法采用 path 或 RDD[String];不是表达式。
对您的代码稍作更改,我想会解决此错误。请在下面找到。
1.使用 Spark Sql AST
您似乎在使用 AST(因为您将其注册为 TempView)。将您的代码 spark.read.json("select name,values2 from df") 更改为以下内容。
spark.sql("select name,values2 from df")
2。使用 Spark Sql DSL
无需创建 TempView,您也可以实现相同的目的(我更喜欢这个,因为 DSL 中的代码本质上更清晰)。
df.select("name","values2")
在这种情况下,只需跳过代码 df.createOrReplaceTempView("df")
,只需调用上述代码即可。
希望,这有帮助。