如何使用 PySpark 并行 运行 独立转换?
How to run independent transformations in parallel using PySpark?
我正在尝试 运行 2 个函数使用 PySpark 在单个 RDD 上并行执行完全独立的转换。有哪些方法可以做到这一点?
def doXTransforms(sampleRDD):
(X transforms)
def doYTransforms(sampleRDD):
(Y Transforms)
if __name__ == "__main__":
sc = SparkContext(appName="parallelTransforms")
sqlContext = SQLContext(sc)
hive_context = HiveContext(sc)
rows_rdd = hive_context.sql("select * from tables.X_table")
p1 = Process(target=doXTransforms , args=(rows_rdd,))
p1.start()
p2 = Process(target=doYTransforms, args=(rows_rdd,))
p2.start()
p1.join()
p2.join()
sc.stop()
这行不通,我现在明白这行不通了。
但是有没有其他方法可以使这项工作正常进行?具体有没有 python-spark 特定的解决方案?
只需使用线程并确保集群有足够的资源同时处理两个任务。
from threading import Thread
import time
def process(rdd, f):
def delay(x):
time.sleep(1)
return f(x)
return rdd.map(delay).sum()
rdd = sc.parallelize(range(100), int(sc.defaultParallelism / 2))
t1 = Thread(target=process, args=(rdd, lambda x: x * 2))
t2 = Thread(target=process, args=(rdd, lambda x: x + 1))
t1.start(); t2.start()
可以说这在实践中并不经常有用,但在其他方面应该可以正常工作。
您可以进一步将 in-application scheduling 与 FAIR
调度程序和调度程序池一起使用,以更好地控制执行策略。
您也可以尝试 pyspark-asyncactions
(免责声明 - 这个答案的作者也是包的作者)它提供了一组围绕 Spark API 和 concurrent.futures
的包装器:
import asyncactions
import concurrent.futures
f1 = rdd.filter(lambda x: x % 3 == 0).countAsync()
f2 = rdd.filter(lambda x: x % 11 == 0).countAsync()
[x.result() for x in concurrent.futures.as_completed([f1, f2])]
我正在尝试 运行 2 个函数使用 PySpark 在单个 RDD 上并行执行完全独立的转换。有哪些方法可以做到这一点?
def doXTransforms(sampleRDD):
(X transforms)
def doYTransforms(sampleRDD):
(Y Transforms)
if __name__ == "__main__":
sc = SparkContext(appName="parallelTransforms")
sqlContext = SQLContext(sc)
hive_context = HiveContext(sc)
rows_rdd = hive_context.sql("select * from tables.X_table")
p1 = Process(target=doXTransforms , args=(rows_rdd,))
p1.start()
p2 = Process(target=doYTransforms, args=(rows_rdd,))
p2.start()
p1.join()
p2.join()
sc.stop()
这行不通,我现在明白这行不通了。 但是有没有其他方法可以使这项工作正常进行?具体有没有 python-spark 特定的解决方案?
只需使用线程并确保集群有足够的资源同时处理两个任务。
from threading import Thread
import time
def process(rdd, f):
def delay(x):
time.sleep(1)
return f(x)
return rdd.map(delay).sum()
rdd = sc.parallelize(range(100), int(sc.defaultParallelism / 2))
t1 = Thread(target=process, args=(rdd, lambda x: x * 2))
t2 = Thread(target=process, args=(rdd, lambda x: x + 1))
t1.start(); t2.start()
可以说这在实践中并不经常有用,但在其他方面应该可以正常工作。
您可以进一步将 in-application scheduling 与 FAIR
调度程序和调度程序池一起使用,以更好地控制执行策略。
您也可以尝试 pyspark-asyncactions
(免责声明 - 这个答案的作者也是包的作者)它提供了一组围绕 Spark API 和 concurrent.futures
的包装器:
import asyncactions
import concurrent.futures
f1 = rdd.filter(lambda x: x % 3 == 0).countAsync()
f2 = rdd.filter(lambda x: x % 11 == 0).countAsync()
[x.result() for x in concurrent.futures.as_completed([f1, f2])]