Pyspark - Dataframe foreach 函数不适用于多个 workers/parallelize

Pyspark - Dataframe foreach function not works on multiple workers/parallelize

我正在 运行在 EC2 上建立一个 Spark StandAlone 集群,其中有 1 个主节点和 2 个从节点。集群正在运行。我有一个从 s3 加载数据的 python 应用程序。代码如下:

spark = SparkSession.builder.appName("Example").getOrCreate()
df = spark.read.csv("s3n://bucket-name/file-name.csv", header=True, mode="DROPMALFORMED")

然后我在 df 上应用 .foreach(func) 对 df 的每一行做一些工作:

def test_func(row):
    row = modify(row)
    row.save() # just an example

df.foreach(test_func)

我已经阅读了文档,他们说 .foreach() 已经针对 distributed/parallel 处理进行了优化。但是,test_func 仅在 1 个节点上 运行ning,请参阅下面的日志:(任务 3 是 .foreach(test_func)

INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 3, 1xx.xxx.xxx.xx2, partition 0, PROCESS_LOCAL, 17460 bytes)
INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 3 on executor id: 0 hostname: 1xx.xxx.xxx.xx2

有没有办法将这个test_func分发给集群中的多个nodes/workers?非常感谢您的帮助。提前谢谢你。

******更新******

我已经提高了数据,但仍然只有 1 个任务分配给 1 个工作人员,运行 函数需要花费很多时间。 这就是我 运行 应用程序

的方式
./bin/spark-submit --packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.2 --master spark://ip-xxx-xxx-xxx-xxx.us-west-2.compute.internal:7077 examples/src/main/python/test.py --executor-memory 5G --deploy-mode cluster

另一件事是即使我设置了 --executor-memory 5G 但工人只使用 1Gb Ram。谁能帮我解决这个问题?我已经被困在这几天了。非常感谢您。

来自@LostInOverflow:

This code doesn't explain single task, if that is really the case. There could be simply not enough data for more.

这是正确的。在我将数据增加到几万条记录后,任务被拆分并分配给所有执行者。