Spark RDD:填充规则时间序列
Spark RDD: filling inregular time series
所以我有一个包含不规则时间序列数据的 RDD:
1,<值 1>
4、<值4>
6、<值6>
..等等
我需要将其填充到常规时间序列中:
1,<值 1>
2、<值 1>
3、<值 1>
4、<值4>
5、<值 4>
6、<值6>
..等等
到目前为止,我已经创建了一个包含 1,2,3,4,5,6,.. 的 RDD,然后将其 leftOuterJoin'ed 到原始 RDD,这给了我:
1,<值 1>
2、
3、
4、<值4>
5、
6、<值6>
..等等
所以我面临的问题是用先前非空行的值填充那些 2、3、5。
我更愿意在 RDD 级别上执行此操作而不使用 sparkSQL,这当然是不得已的选择。进入 Scala Array 级别并不是很吸引人,因为出于性能问题,我更愿意将其保持在 RDD 级别。
谢谢
没有初始的相对简单的解决方案join
。让我们从虚拟数据和辅助函数开始:
val rdd = sc.parallelize(Seq(
(3L, 1.0), (1L, 4.0), (5L, 3.6), (7L, 0.2), (8L, 0.0)))
def fillTimePoints(xs: Array[(Long, Double)]) = xs match {
case Array((xTime, xValue), (yTime, _)) => {
val diff = yTime - xTime
if (diff == 0) Seq((xTime, xValue))
else (xTime, xValue) +: (1 until diff.toInt)
.map(_.toLong)
.map(i => (i + xTime, xValue))
}
case _ => Seq.empty[(Long, Double)]
}
现在剩下的就是在排序的 RDD 上滑动:
import org.apache.spark.mllib.rdd.RDDFunctions._
rdd.sortBy(_._1).sliding(2).flatMap(fillTimePoints).collect
// Array[(Long, Double)] = Array((1,4.0), (2,4.0), (3,1.0),
// (4,1.0), (5,3.6), (6,3.6), (7,0.2))
备注:
sliding
是开发者 API 的一部分。其 class 中的大多数方法在最近的版本中已被弃用。仍然可以从头开始编写代码,但现在它应该可以工作,
您可能更喜欢使用 RangePartitioner
后跟 repartitionAndSortWithinPartitions
而不是排序。然后,您可以使用 mapPartitions
将 preservePartitioning
设置为 true 来应用局部滑动,最后填补空白(再次 preservePartitioning
)。它需要更多的工作,但你得到的输出分区 RangePartitioner
有用的东西。
所以我有一个包含不规则时间序列数据的 RDD:
1,<值 1>
4、<值4>
6、<值6>
..等等
我需要将其填充到常规时间序列中:
1,<值 1>
2、<值 1>
3、<值 1>
4、<值4>
5、<值 4>
6、<值6>
..等等
到目前为止,我已经创建了一个包含 1,2,3,4,5,6,.. 的 RDD,然后将其 leftOuterJoin'ed 到原始 RDD,这给了我:
1,<值 1>
2、
3、
4、<值4>
5、
6、<值6>
..等等
所以我面临的问题是用先前非空行的值填充那些 2、3、5。
我更愿意在 RDD 级别上执行此操作而不使用 sparkSQL,这当然是不得已的选择。进入 Scala Array 级别并不是很吸引人,因为出于性能问题,我更愿意将其保持在 RDD 级别。
谢谢
没有初始的相对简单的解决方案join
。让我们从虚拟数据和辅助函数开始:
val rdd = sc.parallelize(Seq(
(3L, 1.0), (1L, 4.0), (5L, 3.6), (7L, 0.2), (8L, 0.0)))
def fillTimePoints(xs: Array[(Long, Double)]) = xs match {
case Array((xTime, xValue), (yTime, _)) => {
val diff = yTime - xTime
if (diff == 0) Seq((xTime, xValue))
else (xTime, xValue) +: (1 until diff.toInt)
.map(_.toLong)
.map(i => (i + xTime, xValue))
}
case _ => Seq.empty[(Long, Double)]
}
现在剩下的就是在排序的 RDD 上滑动:
import org.apache.spark.mllib.rdd.RDDFunctions._
rdd.sortBy(_._1).sliding(2).flatMap(fillTimePoints).collect
// Array[(Long, Double)] = Array((1,4.0), (2,4.0), (3,1.0),
// (4,1.0), (5,3.6), (6,3.6), (7,0.2))
备注:
sliding
是开发者 API 的一部分。其 class 中的大多数方法在最近的版本中已被弃用。仍然可以从头开始编写代码,但现在它应该可以工作,您可能更喜欢使用
RangePartitioner
后跟repartitionAndSortWithinPartitions
而不是排序。然后,您可以使用mapPartitions
将preservePartitioning
设置为 true 来应用局部滑动,最后填补空白(再次preservePartitioning
)。它需要更多的工作,但你得到的输出分区RangePartitioner
有用的东西。