Azure Databricks Scala:如何替换遵循相应层次结构的行
Azure Databricks Scala : How to replace rows following a respective hirarchy
考虑以下数据集:
我想获得
如您所见,基本思路是遵循列 ACTUAL_ID 指示的路径,直到它为空(如果它还没有)
我尝试在传递完整初始数据帧的地方使用 udf,递归会找到我想要的,但似乎无法将数据帧传递给 UDF。我也研究过替换一行的值,但似乎不可能。
我最近的尝试:
def calculateLatestImdate(df: DataFrame, lookupId: String) : String = {
var foundId = df.filter($"ID" === lookupId).select($"ACTUAL_ID").first.getAs[String]("ID");
if (foundId == "" || foundId == null)
{
lookupId
}
else
{
calculateLatestImdate(df, foundId);
}
}
val calculateLatestImdateUdf = udf((df:DataFrame, s:String) => {
calculateLatestImdate(df,s)
})
val df = sc.parallelize(Seq(("1", "", "A"), ("2", "3", "B"), ("3", "6", "C"), ("4", "5", "D"), ("5", "", "E"), ("6", "", "F"))).toDF("ID","ACTUAL_ID", "DATA")
val finalDf = df.withColumn("FINAL_ID", when(isEmpty($"ACTUAL_ID"), $"ID").otherwise(calculateLatestImdateUdf(df, $"ACTUAL_ID")))
相信我已经找到了问题的答案。
def calculateLatestId(df: DataFrame) : DataFrame = {
var joinedDf = df.as("df1").join(df.as("df2"), $"df1.ACTUAL_ID" === $"df2.ID", "outer").withColumn("FINAL_ID", when($"df2.ID".isNull, $"df1.ID").when($"df2.ACTUAL_ID".isNotNull, $"df2.ACTUAL_ID").otherwise($"df2.ID")).select($"df1.*", $"FINAL_ID").filter($"df1.ID".isNotNull)
val differentIds = joinedDf.filter($"df1.ACTUAL_ID" =!= $"FINAL_ID")
joinedDf = joinedDf.withColumn("ACTUAL_ID", $"FINAL_ID").drop($"FINAL_ID")
if(differentIds.count > 0)
{
calculateLatestId(joinedDf)
}
else
{
joinedDf = joinedDf.as("df1").join(joinedDf.as("df2"), $"df1.ACTUAL_ID" === $"df2.ID", "inner").select($"df1.ID", $"df2.*").drop($"df2.ID")
joinedDf
}
}
我相信可以通过某种方式提高性能,可能是通过减少每次迭代后的行数并在最后进行某种连接 + 清理。
对我来说,这看起来有点像图形问题,所以我使用 Scala 和 graphframes 得出了答案。它利用了图框的 connectedComponents 算法和 outDegrees
方法。根据您的示例数据,我假设每棵树的末端都是唯一的,但需要检查此假设。我很想知道更多数据的性能如何,但让我知道您对解决方案的看法。
完整脚本:
// NB graphframes had to be installed separately with the right Scala version
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._
// Create the test data
// Vertices dataframe
val v2 = sqlContext.createDataFrame(List(
( 1, 0, "A" ), ( 2, 3, "B" ), ( 3, 6, "C" ),
( 4, 5, "D" ), ( 5, 0, "E" ), ( 6, 0, "F" )
)).toDF("id", "actual_id", "data")
// Edge dataframe
val e2 = sqlContext.createDataFrame(List(
(2, 3, "is linked to"),
(3, 6, "is linked to"),
(4, 5, "is linked to")
)).toDF("src", "dst", "relationship")
// Create the graph frame
val g2 = GraphFrame(v2, e2)
print(g2)
// The connected components adds a component id to each 'group'
sc.setCheckpointDir("/tmp/graphframes-example-connected-components")
val components = g2.connectedComponents.run() // doesn't work on Spark 1.4
display(components)
// "end" of tree nodes have no outDegree, so add that in to the component df
val endOfTree = components.join(g2.outDegrees, Seq("id"), "left")
.select("component", "data")
.where("outDegree is null")
endOfTree.show()
components.as("c").join(endOfTree.as("t"), $"c.component" === $"t.component")
.select($"c.id", $"c.component", $"t.data")
.orderBy("id")
.show()
我的结果:
如果您的数据已经在数据框中,只需 select
和 where
过滤器即可轻松从原始数据框生成边缘数据框,例如
// Create the GraphFrame from the dataframe
val v2 = df
val e2 = df
.select("id", "actual_id")
.withColumn("rel", lit("is linked to"))
.where("actual_id > 0")
.toDF("src", "dst", "rel")
val g2 = GraphFrame(v2, e2)
print(g2)
g2.vertices.show()
g2.edges.show()
考虑以下数据集:
我想获得
如您所见,基本思路是遵循列 ACTUAL_ID 指示的路径,直到它为空(如果它还没有)
我尝试在传递完整初始数据帧的地方使用 udf,递归会找到我想要的,但似乎无法将数据帧传递给 UDF。我也研究过替换一行的值,但似乎不可能。
我最近的尝试:
def calculateLatestImdate(df: DataFrame, lookupId: String) : String = {
var foundId = df.filter($"ID" === lookupId).select($"ACTUAL_ID").first.getAs[String]("ID");
if (foundId == "" || foundId == null)
{
lookupId
}
else
{
calculateLatestImdate(df, foundId);
}
}
val calculateLatestImdateUdf = udf((df:DataFrame, s:String) => {
calculateLatestImdate(df,s)
})
val df = sc.parallelize(Seq(("1", "", "A"), ("2", "3", "B"), ("3", "6", "C"), ("4", "5", "D"), ("5", "", "E"), ("6", "", "F"))).toDF("ID","ACTUAL_ID", "DATA")
val finalDf = df.withColumn("FINAL_ID", when(isEmpty($"ACTUAL_ID"), $"ID").otherwise(calculateLatestImdateUdf(df, $"ACTUAL_ID")))
相信我已经找到了问题的答案。
def calculateLatestId(df: DataFrame) : DataFrame = {
var joinedDf = df.as("df1").join(df.as("df2"), $"df1.ACTUAL_ID" === $"df2.ID", "outer").withColumn("FINAL_ID", when($"df2.ID".isNull, $"df1.ID").when($"df2.ACTUAL_ID".isNotNull, $"df2.ACTUAL_ID").otherwise($"df2.ID")).select($"df1.*", $"FINAL_ID").filter($"df1.ID".isNotNull)
val differentIds = joinedDf.filter($"df1.ACTUAL_ID" =!= $"FINAL_ID")
joinedDf = joinedDf.withColumn("ACTUAL_ID", $"FINAL_ID").drop($"FINAL_ID")
if(differentIds.count > 0)
{
calculateLatestId(joinedDf)
}
else
{
joinedDf = joinedDf.as("df1").join(joinedDf.as("df2"), $"df1.ACTUAL_ID" === $"df2.ID", "inner").select($"df1.ID", $"df2.*").drop($"df2.ID")
joinedDf
}
}
我相信可以通过某种方式提高性能,可能是通过减少每次迭代后的行数并在最后进行某种连接 + 清理。
对我来说,这看起来有点像图形问题,所以我使用 Scala 和 graphframes 得出了答案。它利用了图框的 connectedComponents 算法和 outDegrees
方法。根据您的示例数据,我假设每棵树的末端都是唯一的,但需要检查此假设。我很想知道更多数据的性能如何,但让我知道您对解决方案的看法。
完整脚本:
// NB graphframes had to be installed separately with the right Scala version
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._
// Create the test data
// Vertices dataframe
val v2 = sqlContext.createDataFrame(List(
( 1, 0, "A" ), ( 2, 3, "B" ), ( 3, 6, "C" ),
( 4, 5, "D" ), ( 5, 0, "E" ), ( 6, 0, "F" )
)).toDF("id", "actual_id", "data")
// Edge dataframe
val e2 = sqlContext.createDataFrame(List(
(2, 3, "is linked to"),
(3, 6, "is linked to"),
(4, 5, "is linked to")
)).toDF("src", "dst", "relationship")
// Create the graph frame
val g2 = GraphFrame(v2, e2)
print(g2)
// The connected components adds a component id to each 'group'
sc.setCheckpointDir("/tmp/graphframes-example-connected-components")
val components = g2.connectedComponents.run() // doesn't work on Spark 1.4
display(components)
// "end" of tree nodes have no outDegree, so add that in to the component df
val endOfTree = components.join(g2.outDegrees, Seq("id"), "left")
.select("component", "data")
.where("outDegree is null")
endOfTree.show()
components.as("c").join(endOfTree.as("t"), $"c.component" === $"t.component")
.select($"c.id", $"c.component", $"t.data")
.orderBy("id")
.show()
我的结果:
如果您的数据已经在数据框中,只需 select
和 where
过滤器即可轻松从原始数据框生成边缘数据框,例如
// Create the GraphFrame from the dataframe
val v2 = df
val e2 = df
.select("id", "actual_id")
.withColumn("rel", lit("is linked to"))
.where("actual_id > 0")
.toDF("src", "dst", "rel")
val g2 = GraphFrame(v2, e2)
print(g2)
g2.vertices.show()
g2.edges.show()