运行 pyspark 中的多个函数并行调用
Running Multiple Function calls inside pyspark in parallel
我有一个函数,我在 pyspark 中 运行ning-shell
import pandas as pd
def compute(x):
data = pd.read_csv("/tmp/data_{}.csv".format(x))
# Some Spark processing
# Writes back final output in tmp
我想 运行 在 x 的列表上并行执行此操作。
我试过了 -
x_list=[14,63]
from multiprocessing import Process
for x in x_list:
p = Process(target = compute, args = (x,))
p.start()
脚本到此结束。我希望他们在脚本完成之前完全 运行。
我怎样才能做到这一点?
您必须掌握您启动的每个进程并join()
它们:
from multiprocessing import Process
import pandas as pd
def compute(x):
data = pd.read_csv("/tmp/data_{}.csv".format(x))
# Some Spark processing
# Writes back final output in tmp
x_list = [14,63]
processes = []
for x in x_list:
p = Process(target=compute, args=(x,))
processes.append(p)
p.start()
for p in processes:
p.join()
我有一个函数,我在 pyspark 中 运行ning-shell
import pandas as pd
def compute(x):
data = pd.read_csv("/tmp/data_{}.csv".format(x))
# Some Spark processing
# Writes back final output in tmp
我想 运行 在 x 的列表上并行执行此操作。 我试过了 -
x_list=[14,63]
from multiprocessing import Process
for x in x_list:
p = Process(target = compute, args = (x,))
p.start()
脚本到此结束。我希望他们在脚本完成之前完全 运行。
我怎样才能做到这一点?
您必须掌握您启动的每个进程并join()
它们:
from multiprocessing import Process
import pandas as pd
def compute(x):
data = pd.read_csv("/tmp/data_{}.csv".format(x))
# Some Spark processing
# Writes back final output in tmp
x_list = [14,63]
processes = []
for x in x_list:
p = Process(target=compute, args=(x,))
processes.append(p)
p.start()
for p in processes:
p.join()