如何将一个 RDD 拆分为两个或多个 RDD?
How do I split an RDD into two or more RDDs?
我正在寻找一种将 RDD 拆分为两个或更多 RDD 的方法。我见过的最接近的是 Scala Spark: Split collection into several RDD? ,它仍然是单个 RDD。
如果您熟悉 SAS,则如下所示:
data work.split1, work.split2;
set work.preSplit;
if (condition1)
output work.split1
else if (condition2)
output work.split2
run;
这导致了两个不同的数据集。必须立即坚持才能获得我想要的结果...
不可能从单个转换中产生多个 RDD*。如果要拆分 RDD,则必须为每个拆分条件应用 filter
。例如:
def even(x): return x % 2 == 0
def odd(x): return not even(x)
rdd = sc.parallelize(range(20))
rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))
如果你只有一个二元条件并且计算成本很高,你可能更喜欢这样的东西:
kv_rdd = rdd.map(lambda x: (x, odd(x)))
kv_rdd.cache()
rdd_odd = kv_rdd.filter(lambda kv: kv[1]).keys()
rdd_even = kv_rdd.filter(lambda kv: not kv[1]).keys()
这意味着只有一个谓词计算,但需要额外传递所有数据。
重要的是要注意,只要输入 RDD 被正确缓存并且没有关于数据分布的额外假设,就重复过滤器和嵌套 if- 的 for 循环之间的时间复杂度而言,没有显着差异否则。
对于 N 个元素和 M 个条件,您必须执行的操作数显然与 N 乘以 M 成正比。在 for 循环的情况下,它应该更接近 (N + MN) / 2 并且重复过滤器正好是 NM但归根结底,它就是 O(NM)。您可以看到我与 Jason Lenderman 的讨论**,了解一些优缺点。
在非常高的层面上,您应该考虑两件事:
Spark 转换是惰性的,直到您执行一个操作,您的 RDD 才会具体化
为什么重要?回到我的例子:
rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))
如果以后我决定我只需要 rdd_odd
那么就没有理由实现 rdd_even
.
如果您查看 SAS 示例来计算 work.split2
,您需要具体化输入数据和 work.split1
。
RDD 提供声明式 API。当您使用 filter
或 map
时,完全取决于 Spark 引擎如何执行此操作。只要传递给转换的函数没有副作用,它就会为优化整个管道创造多种可能性。
归根结底,这个案例并没有特别到足以证明其自身转变的合理性。
这个带有过滤器模式的映射实际上是在核心Spark中使用的。请参阅我对 randomSplit
方法的 and a relevant part 的回答。
如果唯一的目标是实现输入拆分,可以使用 partitionBy
子句 DataFrameWriter
文本输出格式:
def makePairs(row: T): (String, String) = ???
data
.map(makePairs).toDF("key", "value")
.write.partitionBy($"key").format("text").save(...)
* Spark 中只有 3 种基本类型的转换:
- RDD[T] => RDD[T]
- RDD[T] => RDD[U]
- (RDD[T], RDD[U]) => RDD[W]
其中 T、U、W 可以是原子类型或 products / tuples (K, V). Any other operation has to be expressed using some combination of the above. You can check the original RDD paper 以获得更多详细信息。
** https://chat.whosebug.com/rooms/91928/discussion-between-zero323-and-jason-lenderman
*** 另见 Scala Spark: Split collection into several RDD?
如果你使用 randomSplit API call 拆分一个 RDD,你会得到一个 RDD 数组。
如果要返回5个RDD,传入5个权重值。
例如
val sourceRDD = val sourceRDD = sc.parallelize(1 to 100, 4)
val seedValue = 5
val splitRDD = sourceRDD.randomSplit(Array(1.0,1.0,1.0,1.0,1.0), seedValue)
splitRDD(1).collect()
res7: Array[Int] = Array(1, 6, 11, 12, 20, 29, 40, 62, 64, 75, 77, 83, 94, 96, 100)
一种方法是使用自定义分区程序根据您的过滤条件对数据进行分区。这可以通过扩展 Partitioner
并实现类似于 RangePartitioner
.
的东西来实现
然后可以使用映射分区从分区的 RDD 构造多个 RDD,而无需读取所有数据。
val filtered = partitioned.mapPartitions { iter => {
new Iterator[Int](){
override def hasNext: Boolean = {
if(rangeOfPartitionsToKeep.contains(TaskContext.get().partitionId)) {
false
} else {
iter.hasNext
}
}
override def next():Int = iter.next()
}
请注意,过滤后的 RDD 中的分区数将与分区后的 RDD 中的分区数相同,因此应该使用合并来减少它并删除空分区。
正如上面提到的其他 post 作者一样,没有单一的原生 RDD 转换来拆分 RDD,但是这里有一些 "multiplex" 操作可以有效地模拟各种 "splitting" 在RDDs上,没有多次读取:
http://silex.freevariable.com/latest/api/#com.redhat.et.silex.rdd.multiplex.MuxRDDFunctions
随机拆分的一些具体方法:
http://silex.freevariable.com/latest/api/#com.redhat.et.silex.sample.split.SplitSampleRDDFunctions
方法可从开源 silex 项目获得:
https://github.com/willb/silex
一个博客 post 解释它们的工作原理:
http://erikerlandson.github.io/blog/2016/02/08/efficient-multiplexing-for-spark-rdds/
def muxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[U],
persist: StorageLevel): Seq[RDD[U]] = {
val mux = self.mapPartitionsWithIndex { case (id, itr) =>
Iterator.single(f(id, itr))
}.persist(persist)
Vector.tabulate(n) { j => mux.mapPartitions { itr => Iterator.single(itr.next()(j)) } }
}
def flatMuxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[TraversableOnce[U]],
persist: StorageLevel): Seq[RDD[U]] = {
val mux = self.mapPartitionsWithIndex { case (id, itr) =>
Iterator.single(f(id, itr))
}.persist(persist)
Vector.tabulate(n) { j => mux.mapPartitions { itr => itr.next()(j).toIterator } }
}
如其他地方所述,这些方法确实涉及内存换取速度的权衡,因为它们通过计算整个分区结果来操作 "eagerly" 而不是 "lazily." 因此,这些方法是可能的运行 进入大分区上的内存问题,而更传统的惰性转换不会。
我正在寻找一种将 RDD 拆分为两个或更多 RDD 的方法。我见过的最接近的是 Scala Spark: Split collection into several RDD? ,它仍然是单个 RDD。
如果您熟悉 SAS,则如下所示:
data work.split1, work.split2;
set work.preSplit;
if (condition1)
output work.split1
else if (condition2)
output work.split2
run;
这导致了两个不同的数据集。必须立即坚持才能获得我想要的结果...
不可能从单个转换中产生多个 RDD*。如果要拆分 RDD,则必须为每个拆分条件应用 filter
。例如:
def even(x): return x % 2 == 0
def odd(x): return not even(x)
rdd = sc.parallelize(range(20))
rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))
如果你只有一个二元条件并且计算成本很高,你可能更喜欢这样的东西:
kv_rdd = rdd.map(lambda x: (x, odd(x)))
kv_rdd.cache()
rdd_odd = kv_rdd.filter(lambda kv: kv[1]).keys()
rdd_even = kv_rdd.filter(lambda kv: not kv[1]).keys()
这意味着只有一个谓词计算,但需要额外传递所有数据。
重要的是要注意,只要输入 RDD 被正确缓存并且没有关于数据分布的额外假设,就重复过滤器和嵌套 if- 的 for 循环之间的时间复杂度而言,没有显着差异否则。
对于 N 个元素和 M 个条件,您必须执行的操作数显然与 N 乘以 M 成正比。在 for 循环的情况下,它应该更接近 (N + MN) / 2 并且重复过滤器正好是 NM但归根结底,它就是 O(NM)。您可以看到我与 Jason Lenderman 的讨论**,了解一些优缺点。
在非常高的层面上,您应该考虑两件事:
Spark 转换是惰性的,直到您执行一个操作,您的 RDD 才会具体化
为什么重要?回到我的例子:
rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))
如果以后我决定我只需要
rdd_odd
那么就没有理由实现rdd_even
.如果您查看 SAS 示例来计算
work.split2
,您需要具体化输入数据和work.split1
。RDD 提供声明式 API。当您使用
filter
或map
时,完全取决于 Spark 引擎如何执行此操作。只要传递给转换的函数没有副作用,它就会为优化整个管道创造多种可能性。
归根结底,这个案例并没有特别到足以证明其自身转变的合理性。
这个带有过滤器模式的映射实际上是在核心Spark中使用的。请参阅我对 randomSplit
方法的
如果唯一的目标是实现输入拆分,可以使用 partitionBy
子句 DataFrameWriter
文本输出格式:
def makePairs(row: T): (String, String) = ???
data
.map(makePairs).toDF("key", "value")
.write.partitionBy($"key").format("text").save(...)
* Spark 中只有 3 种基本类型的转换:
- RDD[T] => RDD[T]
- RDD[T] => RDD[U]
- (RDD[T], RDD[U]) => RDD[W]
其中 T、U、W 可以是原子类型或 products / tuples (K, V). Any other operation has to be expressed using some combination of the above. You can check the original RDD paper 以获得更多详细信息。
** https://chat.whosebug.com/rooms/91928/discussion-between-zero323-and-jason-lenderman
*** 另见 Scala Spark: Split collection into several RDD?
如果你使用 randomSplit API call 拆分一个 RDD,你会得到一个 RDD 数组。
如果要返回5个RDD,传入5个权重值。
例如
val sourceRDD = val sourceRDD = sc.parallelize(1 to 100, 4)
val seedValue = 5
val splitRDD = sourceRDD.randomSplit(Array(1.0,1.0,1.0,1.0,1.0), seedValue)
splitRDD(1).collect()
res7: Array[Int] = Array(1, 6, 11, 12, 20, 29, 40, 62, 64, 75, 77, 83, 94, 96, 100)
一种方法是使用自定义分区程序根据您的过滤条件对数据进行分区。这可以通过扩展 Partitioner
并实现类似于 RangePartitioner
.
然后可以使用映射分区从分区的 RDD 构造多个 RDD,而无需读取所有数据。
val filtered = partitioned.mapPartitions { iter => {
new Iterator[Int](){
override def hasNext: Boolean = {
if(rangeOfPartitionsToKeep.contains(TaskContext.get().partitionId)) {
false
} else {
iter.hasNext
}
}
override def next():Int = iter.next()
}
请注意,过滤后的 RDD 中的分区数将与分区后的 RDD 中的分区数相同,因此应该使用合并来减少它并删除空分区。
正如上面提到的其他 post 作者一样,没有单一的原生 RDD 转换来拆分 RDD,但是这里有一些 "multiplex" 操作可以有效地模拟各种 "splitting" 在RDDs上,没有多次读取:
http://silex.freevariable.com/latest/api/#com.redhat.et.silex.rdd.multiplex.MuxRDDFunctions
随机拆分的一些具体方法:
http://silex.freevariable.com/latest/api/#com.redhat.et.silex.sample.split.SplitSampleRDDFunctions
方法可从开源 silex 项目获得:
https://github.com/willb/silex
一个博客 post 解释它们的工作原理:
http://erikerlandson.github.io/blog/2016/02/08/efficient-multiplexing-for-spark-rdds/
def muxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[U],
persist: StorageLevel): Seq[RDD[U]] = {
val mux = self.mapPartitionsWithIndex { case (id, itr) =>
Iterator.single(f(id, itr))
}.persist(persist)
Vector.tabulate(n) { j => mux.mapPartitions { itr => Iterator.single(itr.next()(j)) } }
}
def flatMuxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[TraversableOnce[U]],
persist: StorageLevel): Seq[RDD[U]] = {
val mux = self.mapPartitionsWithIndex { case (id, itr) =>
Iterator.single(f(id, itr))
}.persist(persist)
Vector.tabulate(n) { j => mux.mapPartitions { itr => itr.next()(j).toIterator } }
}
如其他地方所述,这些方法确实涉及内存换取速度的权衡,因为它们通过计算整个分区结果来操作 "eagerly" 而不是 "lazily." 因此,这些方法是可能的运行 进入大分区上的内存问题,而更传统的惰性转换不会。