Pyspark - Dataframe foreach 函数不适用于多个 workers/parallelize
Pyspark - Dataframe foreach function not works on multiple workers/parallelize
我正在 运行在 EC2 上建立一个 Spark StandAlone 集群,其中有 1 个主节点和 2 个从节点。集群正在运行。我有一个从 s3 加载数据的 python 应用程序。代码如下:
spark = SparkSession.builder.appName("Example").getOrCreate()
df = spark.read.csv("s3n://bucket-name/file-name.csv", header=True, mode="DROPMALFORMED")
然后我在 df 上应用 .foreach(func)
对 df 的每一行做一些工作:
def test_func(row):
row = modify(row)
row.save() # just an example
df.foreach(test_func)
我已经阅读了文档,他们说 .foreach()
已经针对 distributed/parallel 处理进行了优化。但是,test_func 仅在 1 个节点上 运行ning,请参阅下面的日志:(任务 3 是 .foreach(test_func)
)
INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 3, 1xx.xxx.xxx.xx2, partition 0, PROCESS_LOCAL, 17460 bytes)
INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 3 on executor id: 0 hostname: 1xx.xxx.xxx.xx2
有没有办法将这个test_func
分发给集群中的多个nodes/workers?非常感谢您的帮助。提前谢谢你。
******更新******
我已经提高了数据,但仍然只有 1 个任务分配给 1 个工作人员,运行 函数需要花费很多时间。
这就是我 运行 应用程序
的方式
./bin/spark-submit --packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.2 --master spark://ip-xxx-xxx-xxx-xxx.us-west-2.compute.internal:7077 examples/src/main/python/test.py --executor-memory 5G --deploy-mode cluster
另一件事是即使我设置了 --executor-memory 5G
但工人只使用 1Gb Ram。谁能帮我解决这个问题?我已经被困在这几天了。非常感谢您。
来自@LostInOverflow:
This code doesn't explain single task, if that is really the case.
There could be simply not enough data for more.
这是正确的。在我将数据增加到几万条记录后,任务被拆分并分配给所有执行者。
我正在 运行在 EC2 上建立一个 Spark StandAlone 集群,其中有 1 个主节点和 2 个从节点。集群正在运行。我有一个从 s3 加载数据的 python 应用程序。代码如下:
spark = SparkSession.builder.appName("Example").getOrCreate()
df = spark.read.csv("s3n://bucket-name/file-name.csv", header=True, mode="DROPMALFORMED")
然后我在 df 上应用 .foreach(func)
对 df 的每一行做一些工作:
def test_func(row):
row = modify(row)
row.save() # just an example
df.foreach(test_func)
我已经阅读了文档,他们说 .foreach()
已经针对 distributed/parallel 处理进行了优化。但是,test_func 仅在 1 个节点上 运行ning,请参阅下面的日志:(任务 3 是 .foreach(test_func)
)
INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 3, 1xx.xxx.xxx.xx2, partition 0, PROCESS_LOCAL, 17460 bytes)
INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 3 on executor id: 0 hostname: 1xx.xxx.xxx.xx2
有没有办法将这个test_func
分发给集群中的多个nodes/workers?非常感谢您的帮助。提前谢谢你。
******更新******
我已经提高了数据,但仍然只有 1 个任务分配给 1 个工作人员,运行 函数需要花费很多时间。 这就是我 运行 应用程序
的方式./bin/spark-submit --packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.2 --master spark://ip-xxx-xxx-xxx-xxx.us-west-2.compute.internal:7077 examples/src/main/python/test.py --executor-memory 5G --deploy-mode cluster
另一件事是即使我设置了 --executor-memory 5G
但工人只使用 1Gb Ram。谁能帮我解决这个问题?我已经被困在这几天了。非常感谢您。
来自@LostInOverflow:
This code doesn't explain single task, if that is really the case. There could be simply not enough data for more.
这是正确的。在我将数据增加到几万条记录后,任务被拆分并分配给所有执行者。