SCALA - 如何在结构中调用函数?

SCALA - How could a function be called in a struct?

我构建了一个数据框,它有一个字符串列,它实际上是一个结构,变成了一个 JSON 字符串。

val df = df_helper.select(
                     lit("some data").as("id"),
                     to_json(
                          struct(
                              col("id"),
                              col("type"),
                              col("path")
                          )
                     )).as("content")

我还构建了一个函数,它将标识符 id:String 作为参数并吐出一个字符串列表。

def buildHierarchy(id:String) : List[String] = {

  val check_df = hierarchy_df.select(
    $"parent_id",
    $"id"
  ).where($"id" === id)

  val pathArray = List(id)
  val parentString = check_df.select($"parent_id").first.getString(0)

  if (parentString == null) {
    return pathArray
  }
  else {
    val pathList = buildHierarchy(parentString)
    val finalList: List[String] = pathList ++ pathArray
    return finalList
  }
}

我想调用此函数并将 path 列替换为函数的结果。这可能吗,或者有解决方法吗?

提前致谢!

在以下博客的帮助下 post 我建立了层次结构:

https://www.qubole.com/blog/processing-hierarchical-data-using-spark-graphx-pregel-api/

我在最终数据框中包含了所有必要的信息,我需要为结构的每个元素建立详细信息,以及每个元素的层次结构路径。然后作为一个新专栏,我为每个元素创建了一个包含详细信息的结构。

val content_df = hierarchy_df.select($"path")
                                    .withColumn("content", 
                                           struct( col("id"),
                                                   col("type"),
                                                   col("path")
                                            ))

我分解了路径以到达其中的每个标识符,但保留了位置顺序,以加入路径中每个级别的内容。

val exploded_df = content_df.select(
                         $"*",posexplode($"path"), 
                         $"pos".as("exploded_level"),
                         $"col".as("exploded_id"))

然后最后,将内容与路径连接起来,将内容聚合成一个路径,其中包含每个级别的所有内容。

val level_content_df = exploded_df.as("e")
           .join(content_df.as("ouc"), col("e.col") === col("ouc.id"), "left")
                      .select($"e.*", $"ouc.content".as("content"))

val pathFull_df = level_content_df
            .groupBy(col("id").as("id"))
            .agg(sort_array(collect_list("content")).as("pathFull"))

最后我又把所有的东西放在一起了:)

val content_with_path_df = content_df.as("ou")
              .join(pathFull_df.as("opf"), col("ou.id") === col("opf.id"), "left")
              .select($"ou.*", $"opf.pathFull".as("pathFull"))

如果有什么不明白的地方,请随时联系我,我花了一段时间才弄明白! :D