Azure Databricks Scala: How to replace rows following a respective hierarchy

Consider the following dataset:

I would like to obtain:

As you can see, the basic idea is to follow the path indicated by the ACTUAL_ID column until it is empty (if it isn't already).

I tried using a UDF to which I pass the complete initial DataFrame, with the recursion finding what I want, but it seems impossible to pass a DataFrame to a UDF. I also looked into replacing the value of a single row, but that doesn't seem possible either.

My latest attempt:

def calculateLatestImdate(df: DataFrame, lookupId: String) : String = {
  // After select($"ACTUAL_ID") the only column is ACTUAL_ID, so read that name
  val foundId = df.filter($"ID" === lookupId).select($"ACTUAL_ID").first.getAs[String]("ACTUAL_ID");
  if (foundId == "" || foundId == null)
  {
    lookupId
  }
  else
  {
    calculateLatestImdate(df, foundId);
  }
}

val calculateLatestImdateUdf = udf((df:DataFrame, s:String) => {
  calculateLatestImdate(df,s)
})

val df = sc.parallelize(Seq(("1", "", "A"), ("2", "3", "B"), ("3", "6", "C"), ("4", "5", "D"), ("5", "", "E"), ("6", "", "F"))).toDF("ID","ACTUAL_ID", "DATA")

val finalDf = df.withColumn("FINAL_ID", when($"ACTUAL_ID".isNull || $"ACTUAL_ID" === "", $"ID").otherwise(calculateLatestImdateUdf(df, $"ACTUAL_ID")))
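Since a DataFrame cannot be passed to a UDF, one workaround (my own sketch, not from the thread) is to first collect the ID -> ACTUAL_ID pairs to the driver, e.g. via `df.select("ID", "ACTUAL_ID").collect()`, and let the UDF close over a plain Scala Map. The chain-following then becomes ordinary recursion; the map below mirrors the sample data:

```scala
// Sketch, assuming the ID -> ACTUAL_ID pairs are small enough to collect to
// the driver; an empty string marks the end of a chain (mirrors the sample data).
val actualId: Map[String, String] = Map(
  "1" -> "", "2" -> "3", "3" -> "6",
  "4" -> "5", "5" -> "", "6" -> ""
)

// Follow ACTUAL_ID until it is empty and return the last ID on the path.
@annotation.tailrec
def resolve(id: String): String = actualId.getOrElse(id, "") match {
  case ""   => id          // end of the chain: this ID is final
  case next => resolve(next)
}

// resolve("2") follows 2 -> 3 -> 6 and returns "6"
```

A UDF closing over this map (`udf(resolve _)`) could then populate FINAL_ID, though this only scales while the lookup table fits on the driver.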

I believe I have found the answer to my question.

def calculateLatestId(df: DataFrame) : DataFrame = {
  var joinedDf = df.as("df1")
    .join(df.as("df2"), $"df1.ACTUAL_ID" === $"df2.ID", "outer")
    .withColumn("FINAL_ID",
      when($"df2.ID".isNull, $"df1.ID")
        .when($"df2.ACTUAL_ID".isNotNull, $"df2.ACTUAL_ID")
        .otherwise($"df2.ID"))
    .select($"df1.*", $"FINAL_ID")
    .filter($"df1.ID".isNotNull)

  val differentIds = joinedDf.filter($"df1.ACTUAL_ID" =!= $"FINAL_ID")

  joinedDf = joinedDf.withColumn("ACTUAL_ID", $"FINAL_ID").drop($"FINAL_ID")
  
  if(differentIds.count > 0)
  {
    calculateLatestId(joinedDf)
  }
  else
  {
    joinedDf = joinedDf.as("df1")
      .join(joinedDf.as("df2"), $"df1.ACTUAL_ID" === $"df2.ID", "inner")
      .select($"df1.ID", $"df2.*")
      .drop($"df2.ID")
    joinedDf
  }
}

I believe performance could be improved somehow, probably by reducing the number of rows after each iteration and doing some kind of join + clean-up at the end.
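One possible shape for that optimisation (my own sketch, in plain Scala rather than Spark): on each pass, replace every ACTUAL_ID with its target's ACTUAL_ID ("pointer jumping"), so remaining chain lengths roughly halve per pass and only O(log n) passes — each the analogue of one self-join — are needed instead of one per chain link:

```scala
// One pointer-jumping pass over a plain Map: each pointer hops one step
// further if its target itself still points somewhere.
def shorten(m: Map[String, String]): Map[String, String] =
  m.map { case (id, next) =>
    val hop = if (next.nonEmpty && m.getOrElse(next, "").nonEmpty) m(next) else next
    id -> hop
  }

// Mirrors the sample data; empty string marks the end of a chain.
var pointers = Map("1" -> "", "2" -> "3", "3" -> "6",
                   "4" -> "5", "5" -> "", "6" -> "")

// Iterate to a fixed point; afterwards every non-empty pointer names the
// end of its chain: 2 -> 6, 3 -> 6, 4 -> 5.
while (pointers != shorten(pointers)) pointers = shorten(pointers)
```

In Spark terms each `shorten` pass would be a self-join of the mapping with itself, so long chains converge in logarithmically many joins rather than linearly many.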

This looks a bit like a graph problem to me, so I came up with an answer using Scala and graphframes. It makes use of GraphFrame's connectedComponents algorithm and the outDegrees method. Based on your sample data I have assumed the end of each tree is unique, but that assumption needs checking. I would be interested to see how it performs on more data, but let me know what you think of the solution.

Full script:

// NB graphframes had to be installed separately with the right Scala version 
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._


// Create the test data

// Vertices dataframe
val v2 = sqlContext.createDataFrame(List(
  ( 1, 0, "A" ), ( 2, 3, "B" ), ( 3, 6, "C" ),
  ( 4, 5, "D" ), ( 5, 0, "E" ), ( 6, 0, "F" )
)).toDF("id", "actual_id", "data")

// Edge dataframe
val e2 = sqlContext.createDataFrame(List(
  (2, 3, "is linked to"),
  (3, 6, "is linked to"),
  (4, 5, "is linked to")
)).toDF("src", "dst", "relationship")


// Create the graph frame
val g2 = GraphFrame(v2, e2)
print(g2)


// The connected components adds a component id to each 'group'
sc.setCheckpointDir("/tmp/graphframes-example-connected-components")

val components = g2.connectedComponents.run() // doesn't work on Spark 1.4
display(components)




// "end" of tree nodes have no outDegree, so add that in to the component df
val endOfTree = components.join(g2.outDegrees, Seq("id"), "left")
  .select("component", "data")
  .where("outDegree is null")

endOfTree.show()


components.as("c").join(endOfTree.as("t"), $"c.component" === $"t.component")
  .select($"c.id", $"c.component", $"t.data")
  .orderBy("id")
  .show()

My results:

If your data is already in a DataFrame, it is easy to generate the edge DataFrame from the original one with just a select and a where filter, e.g.

// Create the GraphFrame from the dataframe
val v2 = df

val e2 = df
  .select("id", "actual_id")
  .withColumn("rel", lit("is linked to"))
  .where("actual_id > 0")
  .toDF("src", "dst", "rel")

val g2 = GraphFrame(v2, e2)
print(g2)

g2.vertices.show()
g2.edges.show()