合并循环中生成的多个RDD
Merge multiple RDD generated in loop
我在 scala 中调用一个函数,它给出一个 RDD[(Long,Long,Double)]
作为它的输出。
def helperfunction(): RDD[(Long, Long, Double)]
我在代码的另一部分循环调用这个函数,我想合并所有生成的 RDD。调用该函数的循环看起来像这样
for (i <- 1 to n){
val tOp = helperfunction()
// merge the generated tOp
}
我想做的事情类似于 Java 中当您想合并字符串时 StringBuilder 会为您做的事情。我看过合并 RDD 的技术,这些技术主要指向使用这样的联合函数
RDD1.union(RDD2)
但这需要在合并之前生成两个 RDD。我虽然初始化了一个 var RDD1 来累积 for 循环之外的结果,但我不确定如何初始化一个 [(Long,Long,Double)]
类型的空白 RDD。我也是从 spark 开始的,所以我什至不确定这是否是解决这个问题的最优雅的方法。
您说得对,这可能不是执行此操作的最佳方法,但我们需要更多信息来了解您在每次调用辅助函数时生成新 RDD 所要实现的目标。
您可以在循环之前定义 1 个 RDD 并为其分配一个 var,然后 运行 通过您的循环。这是一个例子:
val rdd = sc.parallelize(1 to 100)
val rdd_tuple = rdd.map(x => (x.toLong, (x*10).toLong, x.toDouble))
var new_rdd = rdd_tuple
println("Initial RDD count: " + new_rdd.count())
for (i <- 2 to 4) {
new_rdd = new_rdd.union(rdd_tuple)
}
println("New count after loop: " + new_rdd.count())
您可以使用函数式编程范式来实现您想要的,而不是使用 vars :
val rdd = (1 to n).map(x => helperFunction()).reduce(_ union _)
此外,如果您仍然需要创建一个空的 RDD,您可以使用:
val empty = sc.emptyRDD[(long, long, String)]
我在 scala 中调用一个函数,它给出一个 RDD[(Long,Long,Double)]
作为它的输出。
def helperfunction(): RDD[(Long, Long, Double)]
我在代码的另一部分循环调用这个函数,我想合并所有生成的 RDD。调用该函数的循环看起来像这样
for (i <- 1 to n){
val tOp = helperfunction()
// merge the generated tOp
}
我想做的事情类似于 Java 中当您想合并字符串时 StringBuilder 会为您做的事情。我看过合并 RDD 的技术,这些技术主要指向使用这样的联合函数
RDD1.union(RDD2)
但这需要在合并之前生成两个 RDD。我虽然初始化了一个 var RDD1 来累积 for 循环之外的结果,但我不确定如何初始化一个 [(Long,Long,Double)]
类型的空白 RDD。我也是从 spark 开始的,所以我什至不确定这是否是解决这个问题的最优雅的方法。
您说得对,这可能不是执行此操作的最佳方法,但我们需要更多信息来了解您在每次调用辅助函数时生成新 RDD 所要实现的目标。
您可以在循环之前定义 1 个 RDD 并为其分配一个 var,然后 运行 通过您的循环。这是一个例子:
val rdd = sc.parallelize(1 to 100)
val rdd_tuple = rdd.map(x => (x.toLong, (x*10).toLong, x.toDouble))
var new_rdd = rdd_tuple
println("Initial RDD count: " + new_rdd.count())
for (i <- 2 to 4) {
new_rdd = new_rdd.union(rdd_tuple)
}
println("New count after loop: " + new_rdd.count())
您可以使用函数式编程范式来实现您想要的,而不是使用 vars :
val rdd = (1 to n).map(x => helperFunction()).reduce(_ union _)
此外,如果您仍然需要创建一个空的 RDD,您可以使用:
val empty = sc.emptyRDD[(long, long, String)]