将两个 Seq 列之间的相关性计算到第三列的正确方法

The proper way to compute correlation between two Seq columns into a third column

我有一个 DataFrame,其中每行有 3 列:

ID:Long, ratings1:Seq[Double], ratings2:Seq[Double]

对于每一行,我需要计算这些向量之间的相关性。

我想出了以下解决方案,它似乎效率低下(不像 Jarrod Roberson 提到的那样工作),因为我必须为每个 Seq 创建 RDD:

val similarities = ratingPairs.map(row => {
      val ratings1 = sc.parallelize(row.getAs[Seq[Double]]("ratings1"))
      val ratings2 = sc.parallelize(row.getAs[Seq[Double]]("ratings2"))
      val corr:Double = Statistics.corr(ratings1, ratings2)

      Similarity(row.getAs[Long]("ID"), corr)
    })

有没有办法正确计算这种相关性?

假设您有数组的相关函数:

def correlation(arr1: Array[Double], arr2: Array[Double]): Double

(对于完全独立于 Spark 的该功能的潜在实现,您可以提出单独的问题或在线搜索,有一些足够接近的资源,例如 this implementation)。

现在,剩下要做的就是用 UDF 包装这个函数并使用它:

import org.apache.spark.sql.functions._
import spark.implicits._

val corrUdf = udf {
  (arr1: Seq[Double], arr2: Seq[Double]) => correlation(arr1.toArray, arr2.toArray)
}

val result = df.select($"ID", corrUdf($"ratings1", $"ratings2") as "correlation")