如何将 spark 中的 for 循环与 scala 并行化?
How can I parallelize a for loop in spark with scala?
例如,我们有一个包含 2000 个股票代码在过去 3 年的收盘价的 parquet 文件,我们要计算每个代码的 5 天移动平均线。
所以我创建了一个 spark SQLContext,然后
val marketData = sqlcontext.sql("select DATE, SYMBOL, PRICE from stockdata order by DATE").cache()
获取交易品种列表,
val symbols = marketData.select("SYMBOL").distinct().collect()
这里是 for 循环:
for (symbol <- symbols) {
marketData.filter(symbol).rdd.sliding(5).map(...calculating the avg...).save()
}
显然,在 spark 上执行 for 循环很慢,并且每个小结果的 save()
也会减慢过程(我尝试在 for 循环外定义一个 var result
并联合所有输出一起进行 IO 操作,但我得到了 Whosebug 异常),那么如何并行化 for 循环并优化 IO 操作?
你写的程序运行在驱动程序("master")spark节点中。如果您在并行结构 (RDD) 上运行,则此程序中的表达式只能并行化。
试试这个:
marketdata.rdd.map(symbolize).reduceByKey{ case (symbol, days) => days.sliding(5).map(makeAvg) }.foreach{ case (symbol,averages) => averages.save() }
其中 symbolize
取一行符号 x day 和 returns 一个元组 (symbol, day)。
对于答案的第一部分,我不同意 Carlos 的观点。该程序在驱动程序中没有 运行 ("master")。
循环按顺序执行 运行,但对于每个符号执行:
marketData.filter(symbol).rdd.sliding(5).map(...calculating the avg...).save()
是并行完成的,因为 markedData
是一个 Spark DataFrame 并且它是分布式的。
例如,我们有一个包含 2000 个股票代码在过去 3 年的收盘价的 parquet 文件,我们要计算每个代码的 5 天移动平均线。
所以我创建了一个 spark SQLContext,然后
val marketData = sqlcontext.sql("select DATE, SYMBOL, PRICE from stockdata order by DATE").cache()
获取交易品种列表,
val symbols = marketData.select("SYMBOL").distinct().collect()
这里是 for 循环:
for (symbol <- symbols) {
marketData.filter(symbol).rdd.sliding(5).map(...calculating the avg...).save()
}
显然,在 spark 上执行 for 循环很慢,并且每个小结果的 save()
也会减慢过程(我尝试在 for 循环外定义一个 var result
并联合所有输出一起进行 IO 操作,但我得到了 Whosebug 异常),那么如何并行化 for 循环并优化 IO 操作?
你写的程序运行在驱动程序("master")spark节点中。如果您在并行结构 (RDD) 上运行,则此程序中的表达式只能并行化。
试试这个:
marketdata.rdd.map(symbolize).reduceByKey{ case (symbol, days) => days.sliding(5).map(makeAvg) }.foreach{ case (symbol,averages) => averages.save() }
其中 symbolize
取一行符号 x day 和 returns 一个元组 (symbol, day)。
对于答案的第一部分,我不同意 Carlos 的观点。该程序在驱动程序中没有 运行 ("master")。
循环按顺序执行 运行,但对于每个符号执行:
marketData.filter(symbol).rdd.sliding(5).map(...calculating the avg...).save()
是并行完成的,因为 markedData
是一个 Spark DataFrame 并且它是分布式的。