Recursive method call in Apache Spark
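I'm building a family tree from a database on Apache Spark, using a recursive search to find the ultimate parent (i.e. the person at the top of the family tree) for each person in the database.
It is assumed that the first person returned when searching by id is the correct parent.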
val peopleById = peopleRDD.keyBy(f => f.id)

def findUltimateParentId(personId: String): String = {
  if ((personId == null) || (personId.length() == 0))
    return "-1"
  val personSeq = peopleById.lookup(personId)
  val person = personSeq(0)
  if (person.personId == "0" || person.id == person.parentId) {
    return person.id
  }
  else {
    return findUltimateParentId(person.parentId)
  }
}

val ultimateParentIds = peopleRDD.foreach(f => f.findUltimateParentId(f.parentId))
It gives the following error:
"Caused by: org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063."
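From reading other similar questions, I understand that the problem is that I'm calling findUltimateParentId from within the foreach loop. If I call the method from the shell with a person's id, it returns the correct ultimate parent id.
However, none of the other suggested solutions work for me, or at least I can't see how to implement them in my program. Can anyone help?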
I solved this by using SparkContext.broadcast:
val peopleById = peopleRDD.keyBy(f => f.id)
// Collect the keyed RDD to the driver and ship it to every executor as a read-only map
val broadcastedPeople = sc.broadcast(peopleById.collectAsMap())

def findUltimateParentId(personId: String): String = {
  if ((personId == null) || (personId.length() == 0))
    return "-1"
  // Plain Map lookup on the broadcast value; no RDD operation is involved
  val personOption = broadcastedPeople.value.get(personId)
  if (personOption.isEmpty) {
    return "0"
  }
  val person = personOption.get
  if (person.personId == 0 || person.orgId == person.personId) {
    return person.id
  }
  else {
    return findUltimateParentId(person.parentId)
  }
}

// map rather than foreach: foreach returns Unit, so no ids would be kept
val ultimateParentIds = peopleRDD.map(f => findUltimateParentId(f.parentId))
Works great now!
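The broadcast approach only works while the whole table fits in memory on the driver and on each executor, and the recursive lookup will not terminate if the data contains a parent cycle. As a rough sketch of how the lookup could be hardened, the version below walks a plain in-memory map with a tail-recursive function and a visited set. `PersonRow`, `UltimateParentSketch` and `ultimateParentId` are hypothetical names, not from the post, and treating an empty `parentId` as "this person is the root" is an assumption that differs slightly from the code above. In a Spark job the map would presumably be read from `broadcastedPeople.value`.

```scala
import scala.annotation.tailrec

object UltimateParentSketch {
  // Hypothetical record type; the post does not show its own Person class.
  final case class PersonRow(id: String, parentId: String)

  // Walks up the parent chain in an in-memory map.
  // `seen` holds the ids already visited, so a cycle ends the walk instead of looping forever.
  @tailrec
  def ultimateParentId(people: collection.Map[String, PersonRow],
                       personId: String,
                       seen: Set[String] = Set.empty): String =
    if (personId == null || personId.isEmpty) "-1" // same sentinel as the post
    else people.get(personId) match {
      case None => "0" // unknown id, same sentinel as the post
      case Some(p) if p.parentId == null || p.parentId.isEmpty ||
                      p.parentId == p.id || seen(p.parentId) =>
        p.id // assumption: no parent, self-parent, or a cycle means p is the top
      case Some(p) =>
        ultimateParentId(people, p.parentId, seen + p.id)
    }

  def main(args: Array[String]): Unit = {
    val people = Map(
      "1" -> PersonRow("1", ""),
      "2" -> PersonRow("2", "1"),
      "3" -> PersonRow("3", "2")
    )
    println(ultimateParentId(people, "3")) // prints 1
  }
}
```

Because the function is tail-recursive, a very deep family does not grow the call stack, and because it takes the map as a parameter it can be tested without a SparkContext.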
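If I understand you correctly, here's a solution that works for input of any size (although performance might not be great). It performs N iterations over the RDD, where N is the depth of the "deepest family" in the input (the largest distance from an ancestor to a child):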
import org.apache.spark.rdd.RDD

// representation of input: each person has an ID and an optional parent ID
case class Person(id: Int, parentId: Option[Int])

// representation of result: each person is optionally attached its "ultimate" ancestor,
// or none if it had no parent id in the first place
case class WithAncestor(person: Person, ancestor: Option[Person]) {
  def hasGrandparent: Boolean = ancestor.exists(_.parentId.isDefined)
}

object RecursiveParentLookup {
  // requested method
  def findUltimateParent(rdd: RDD[Person]): RDD[WithAncestor] = {
    // all persons keyed by id (a val, so the cached RDD is reused on every iteration)
    val byId = rdd.keyBy(_.id).cache()

    // recursive function that "climbs" one generation at each iteration
    def climbOneGeneration(persons: RDD[WithAncestor]): RDD[WithAncestor] = {
      val cached = persons.cache()
      // find which persons can climb further up family tree
      val haveGrandparents = cached.filter(_.hasGrandparent)
      if (haveGrandparents.isEmpty()) {
        cached // we're done, return result
      } else {
        val done = cached.filter(!_.hasGrandparent) // these are done, we'll return them as-is
        // for those who can - join with persons to find the grandparent and attach it instead of parent
        val withGrandparents = haveGrandparents
          .keyBy(_.ancestor.get.parentId.get) // grandparent id
          .join(byId)
          .values
          .map({ case (withAncestor, grandparent) => WithAncestor(withAncestor.person, Some(grandparent)) })
        // call this method recursively on the result
        done ++ climbOneGeneration(withGrandparents)
      }
    }

    // call recursive method - start by assuming each person is its own parent, if it has one:
    climbOneGeneration(rdd.map(p => WithAncestor(p, p.parentId.map(_ => p))))
  }
}
Here's a test that should make it easier to see how this works:
/**
  * Example input tree:
  *
  *          1             5
  *          |             |
  *    ----- 2 -----       6
  *    |           |
  *    3           4
  *
  */
val person1 = Person(1, None)
val person2 = Person(2, Some(1))
val person3 = Person(3, Some(2))
val person4 = Person(4, Some(2))
val person5 = Person(5, None)
val person6 = Person(6, Some(5))

test("find ultimate parent") {
  val input = sc.parallelize(Seq(person1, person2, person3, person4, person5, person6))
  val result = RecursiveParentLookup.findUltimateParent(input).collect()
  result should contain theSameElementsAs Seq(
    WithAncestor(person1, None),
    WithAncestor(person2, Some(person1)),
    WithAncestor(person3, Some(person1)),
    WithAncestor(person4, Some(person1)),
    WithAncestor(person5, None),
    WithAncestor(person6, Some(person5))
  )
}
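It should be easy to map your input into these Person objects and to map the output WithAncestor objects into whatever you need. Note that this code assumes that if any person has parentId X, another person with that id actually exists in the input.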
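That assumption matters because the `join` in `climbOneGeneration` is an inner join: a person whose ancestor points at an id that is missing from the input would drop out of the result without any error. One way to check the assumption up front is sketched below on a plain Scala `Seq`, so it can be run without Spark. `DanglingParentCheck` and `danglingParents` are hypothetical names introduced for illustration. On data too large to collect, the same check would have to be expressed with RDD operations rather than a local `Set`.

```scala
object DanglingParentCheck {
  // Same shape as the Person case class used in the answer.
  final case class Person(id: Int, parentId: Option[Int])

  // Returns every person whose parentId refers to an id that is not present in the input.
  def danglingParents(people: Seq[Person]): Seq[Person] = {
    val knownIds = people.map(_.id).toSet
    people.filter(_.parentId.exists(pid => !knownIds.contains(pid)))
  }

  def main(args: Array[String]): Unit = {
    val people = Seq(Person(1, None), Person(2, Some(1)), Person(3, Some(99)))
    println(danglingParents(people)) // prints List(Person(3,Some(99)))
  }
}
```

If the returned sequence is non-empty, those rows can be fixed or filtered out before calling `findUltimateParent`.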