将两个 Seq 列之间的相关性计算到第三列的正确方法
The proper way to compute correlation between two Seq columns into a third column
我有一个 DataFrame,其中每行有 3 列:
ID:Long, ratings1:Seq[Double], ratings2:Seq[Double]
对于每一行,我需要计算这些向量之间的相关性。
我想出了以下解决方案,它似乎效率低下(不像 Jarrod Roberson 提到的那样工作),因为我必须为每个 Seq 创建 RDD:
val similarities = ratingPairs.map(row => {
val ratings1 = sc.parallelize(row.getAs[Seq[Double]]("ratings1"))
val ratings2 = sc.parallelize(row.getAs[Seq[Double]]("ratings2"))
val corr:Double = Statistics.corr(ratings1, ratings2)
Similarity(row.getAs[Long]("ID"), corr)
})
有没有办法正确计算这种相关性?
假设您有数组的相关函数:
def correlation(arr1: Array[Double], arr2: Array[Double]): Double
(对于完全独立于 Spark 的该功能的潜在实现,您可以提出单独的问题或在线搜索,有一些足够接近的资源,例如 this implementation)。
现在,剩下要做的就是用 UDF 包装这个函数并使用它:
import org.apache.spark.sql.functions._
import spark.implicits._
val corrUdf = udf {
(arr1: Seq[Double], arr2: Seq[Double]) => correlation(arr1.toArray, arr2.toArray)
}
val result = df.select($"ID", corrUdf($"ratings1", $"ratings2") as "correlation")
我有一个 DataFrame,其中每行有 3 列:
ID:Long, ratings1:Seq[Double], ratings2:Seq[Double]
对于每一行,我需要计算这些向量之间的相关性。
我想出了以下解决方案,它似乎效率低下(不像 Jarrod Roberson 提到的那样工作),因为我必须为每个 Seq 创建 RDD:
val similarities = ratingPairs.map(row => {
val ratings1 = sc.parallelize(row.getAs[Seq[Double]]("ratings1"))
val ratings2 = sc.parallelize(row.getAs[Seq[Double]]("ratings2"))
val corr:Double = Statistics.corr(ratings1, ratings2)
Similarity(row.getAs[Long]("ID"), corr)
})
有没有办法正确计算这种相关性?
假设您有数组的相关函数:
def correlation(arr1: Array[Double], arr2: Array[Double]): Double
(对于完全独立于 Spark 的该功能的潜在实现,您可以提出单独的问题或在线搜索,有一些足够接近的资源,例如 this implementation)。
现在,剩下要做的就是用 UDF 包装这个函数并使用它:
import org.apache.spark.sql.functions._
import spark.implicits._
val corrUdf = udf {
(arr1: Seq[Double], arr2: Seq[Double]) => correlation(arr1.toArray, arr2.toArray)
}
val result = df.select($"ID", corrUdf($"ratings1", $"ratings2") as "correlation")