Spark RDD:填充规则时间序列

Spark RDD: filling inregular time series

所以我有一个包含不规则时间序列数据的 RDD:

1,<值 1>
4、<值4>
6、<值6>
..等等

我需要将其填充到常规时间序列中:

1,<值 1>
2、<值 1>
3、<值 1>
4、<值4>
5、<值 4>
6、<值6>
..等等

到目前为止,我已经创建了一个包含 1,2,3,4,5,6,.. 的 RDD,然后将其 leftOuterJoin'ed 到原始 RDD,这给了我:

1,<值 1>
2、
3、
4、<值4>
5、
6、<值6>
..等等

所以我面临的问题是用先前非空行的值填充那些 2、3、5。

我更愿意在 RDD 级别上执行此操作而不使用 sparkSQL,这当然是不得已的选择。进入 Scala Array 级别并不是很吸引人,因为出于性能问题,我更愿意将其保持在 RDD 级别。

谢谢

没有初始的相对简单的解决方案join。让我们从虚拟数据和辅助函数开始:

val rdd = sc.parallelize(Seq(
    (3L, 1.0), (1L, 4.0), (5L, 3.6), (7L, 0.2), (8L, 0.0)))

def fillTimePoints(xs: Array[(Long, Double)]) = xs match {
  case Array((xTime, xValue), (yTime, _)) => {
    val diff = yTime - xTime

    if (diff == 0) Seq((xTime, xValue))
    else (xTime, xValue) +: (1 until diff.toInt)
      .map(_.toLong)
      .map(i => (i + xTime, xValue))
  }

  case _ => Seq.empty[(Long, Double)]
}

现在剩下的就是在排序的 RDD 上滑动:

import org.apache.spark.mllib.rdd.RDDFunctions._

rdd.sortBy(_._1).sliding(2).flatMap(fillTimePoints).collect

//  Array[(Long, Double)] = Array((1,4.0), (2,4.0), (3,1.0), 
//    (4,1.0), (5,3.6), (6,3.6), (7,0.2))

备注:

  • sliding 是开发者 API 的一部分。其 class 中的大多数方法在最近的版本中已被弃用。仍然可以从头开始编写代码,但现在它应该可以工作,

  • 您可能更喜欢使用 RangePartitioner 后跟 repartitionAndSortWithinPartitions 而不是排序。然后,您可以使用 mapPartitionspreservePartitioning 设置为 true 来应用局部滑动,最后填补空白(再次 preservePartitioning)。它需要更多的工作,但你得到的输出分区 RangePartitioner 有用的东西。