Search in the schema of a DataFrame using PySpark

I have a set of DataFrames, dfs, with different schemas, for example:

      root
       |-- A_id: string (nullable = true)
       |-- b_cd: string (nullable = true)
       |-- c_id: integer (nullable = true)
       |-- d_info: struct (nullable = true)
       |    |-- eid: string (nullable = true)
       |    |-- oid: string (nullable = true)
       |-- l: array (nullable = true)
       |    |-- m: struct (containsNull = true)
       |    |    |-- n: string (nullable = true)
       |    |    |-- o: string (nullable = true)
        ..........

I want to check whether a given field, e.g. "oid", appears somewhere in the schema (here it is nested under the d_info column). How can I search the schemas of a set of DataFrames and tell them apart? Suggestions in either PySpark or Scala are welcome. Thanks.

You can use a recursive function to build a dictionary/map of [node, root-to-node path] entries for a DataFrame's StructType, including nested StructTypes.

import org.apache.spark.sql.types.{ArrayType, StructType}

// Recursively walk the schema, recording each field name together with
// its root-to-node path.
def addPaths(schema: StructType, path: String, paths: scala.collection.mutable.Map[String, String]): Unit = {
  for (field <- schema.fields) {
    val _path = s"$path.${field.name}"
    paths += (field.name -> _path)
    field.dataType match {
      case structType: StructType => addPaths(structType, _path, paths)
      case ArrayType(structType: StructType, _) => addPaths(structType, _path, paths)
      case _ => // leaf type: nothing to recurse into
    }
  }
}

// Build the [node -> path] map for a schema and look up one field name.
// Note: paths(key) throws NoSuchElementException if the field is absent.
def searchSchema(schema: StructType, key: String, path: String): String = {
  val paths = scala.collection.mutable.Map[String, String]()
  addPaths(schema, path, paths)
  paths(key)
}

val df = spark.read.json("nested_data.json")
val path = searchSchema(df.schema, "n", "root") // "root.l.m.n"
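
Since paths(key) throws when the field is absent, a presence check is safer with Option. Here is a minimal sketch built on the same addPaths; findPath is a hypothetical helper name, not part of any Spark API:

// Sketch: Option-based lookup; a missing field yields None instead of an exception.
def findPath(schema: StructType, key: String): Option[String] = {
  val paths = scala.collection.mutable.Map[String, String]()
  addPaths(schema, "root", paths)
  paths.get(key)
}

findPath(df.schema, "oid")  // Some(root.d_info.oid)
findPath(df.schema, "zzz")  // None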

Input and output

Input = {"A_id":"A_id","b_cd":"b_cd","c_id":1,"d_info":{"eid":"eid","oid":"oid"},"l":[{"m":{"n":"n1","o":"01"}},{"m":{"n":"n2","o":"02"}}]}

Output = Map(n -> root.l.m.n, b_cd -> root.b_cd, d_info -> root.d_info, m -> root.l.m, oid -> root.d_info.oid, c_id -> root.c_id, l -> root.l, o -> root.l.m.o, eid -> root.d_info.eid, A_id -> root.A_id)
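
To cover the whole set of DataFrames from the question, you can map the same lookup over the collection. A minimal sketch, assuming the set is held in dfs: Seq[DataFrame] (name taken from the question) and using the findPath variant above:

import org.apache.spark.sql.DataFrame

// Sketch: for each DataFrame in the (assumed) collection dfs, report
// its index and the path of "oid" if that field exists in its schema.
val oidLocations: Seq[(Int, String)] = dfs.zipWithIndex.flatMap {
  case (df, i) => findPath(df.schema, "oid").map(p => (i, p))
}
// e.g. Seq((0, "root.d_info.oid")) -- only DataFrames containing "oid" appear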