如何查询spark streaming中的特殊列

how to query the special columns in spark streaming

我在 spark streaming 中从 kafka 得到了一个 json,:

{"name":"name","value1":"value1"}
{"name":"name","value1":"value1","value2":"value2"}

读取并获取其架构:

     val df = spark.read.json(rdd.map(_._2))
     df.printSchema() //shows
      //--name
      //--values1
      // --values2
    df.createOrReplaceTempView("df")
    spark.sql("select name,values2 from df")

但它会输出:

  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$$anonfun$apply.applyOrElse(CheckAnalysis.scala:77)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$$anonfun$apply.applyOrElse(CheckAnalysis.scala:74)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp.apply(TreeNode.scala:310)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp.apply(TreeNode.scala:310)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:309)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp(QueryPlan.scala:282)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform(QueryPlan.scala:292)
  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform.apply(QueryPlan.scala:296)
  at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)

顺便说一句,它在 spark 中正常,但在流式传输中失败。 有人知道吗?

看来,您的代码在语法上不正确,因此,Catalyst 优化器,在分析 您的查询时,无法使用 Catalog.

解析所有引用

.json() 方法采用 pathRDD[String];不是表达式。

对您的代码稍作更改,我想会解决此错误。请在下面找到。

1.使用 Spark Sql AST

您似乎在使用 AST(因为您将其注册为 TempView)。将您的代码 spark.read.json("select name,values2 from df") 更改为以下内容。

    spark.sql("select name,values2 from df")

2。使用 Spark Sql DSL

无需创建 TempView,您也可以实现相同的目的(我更喜欢这个,因为 DSL 中的代码本质上更清晰)。

 df.select("name","values2")

在这种情况下,只需跳过代码 df.createOrReplaceTempView("df"),只需调用上述代码即可。

希望,这有帮助。