在 Spark 中计算逐点互信息
Computing Pointwise Mutual Information in Spark
我正在尝试计算 pointwise mutual information (PMI)。
我有两个 RDD,分别定义为 p(x, y) 和 p(x):
pii: RDD[((String, String), Double)]
pi: RDD[(String, Double)]
我编写的用于从 RDD pii
和 pi
计算 PMI 的任何代码都不是很好。我的方法是首先将 RDD pii
展平并在处理元组元素时加入 pi
两次。
val pmi = pii.map(x => (x._1._1, (x._1._2, x._1, x._2)))
.join(pi).values
.map(x => (x._1._1, (x._1._2, x._1._3, x._2)))
.join(pi).values
.map(x => (x._1._1, computePMI(x._1._2, x._1._3, x._2)))
// pmi: org.apache.spark.rdd.RDD[((String, String), Double)]
...
def computePMI(pab: Double, pa: Double, pb: Double) = {
// handle boundary conditions, etc
log(pab) - log(pa) - log(pb)
}
显然,这很糟糕。有没有更好的(惯用的)方法来做到这一点?
注意:我可以通过将日志概率存储在 pi
和 pii
中来优化日志,但选择以这种方式编写以使问题清晰。
使用 broadcast
将是一个解决方案。
val bcPi = pi.context.broadcast(pi.collectAsMap())
val pmi = pii.map {
case ((x, y), pxy) =>
(x, y) -> computePMI(pxy, bcPi.value.get(x).get, bcPi.value.get(y).get)
}
假设:pi
拥有 pii
中的所有 x
和 y
。
我正在尝试计算 pointwise mutual information (PMI)。
我有两个 RDD,分别定义为 p(x, y) 和 p(x):
pii: RDD[((String, String), Double)]
pi: RDD[(String, Double)]
我编写的用于从 RDD pii
和 pi
计算 PMI 的任何代码都不是很好。我的方法是首先将 RDD pii
展平并在处理元组元素时加入 pi
两次。
val pmi = pii.map(x => (x._1._1, (x._1._2, x._1, x._2)))
.join(pi).values
.map(x => (x._1._1, (x._1._2, x._1._3, x._2)))
.join(pi).values
.map(x => (x._1._1, computePMI(x._1._2, x._1._3, x._2)))
// pmi: org.apache.spark.rdd.RDD[((String, String), Double)]
...
def computePMI(pab: Double, pa: Double, pb: Double) = {
// handle boundary conditions, etc
log(pab) - log(pa) - log(pb)
}
显然,这很糟糕。有没有更好的(惯用的)方法来做到这一点?
注意:我可以通过将日志概率存储在 pi
和 pii
中来优化日志,但选择以这种方式编写以使问题清晰。
使用 broadcast
将是一个解决方案。
val bcPi = pi.context.broadcast(pi.collectAsMap())
val pmi = pii.map {
case ((x, y), pxy) =>
(x, y) -> computePMI(pxy, bcPi.value.get(x).get, bcPi.value.get(y).get)
}
假设:pi
拥有 pii
中的所有 x
和 y
。