如何使 pyspark 作业在多个节点上正确并行化并避免内存问题?
How to make a pyspark job properly parallelizable on multiple nodes and avoid memory issues?
我目前正在从事一项 PySpark 工作 (Spark 2.2.0),该工作旨在基于一组文档训练 Latent Dirichlet Allocation 模型。输入文档以 CSV 文件的形式提供,位于 Google Cloud Storage。
以下代码在单个节点上成功 运行 Google Cloud Dataproc 集群(4vCPUs / 15GB 内存)和一小部分文档(~6500),生成的主题数量少 (10) 和迭代次数少 (100)。
但是,其他尝试使用更大的文档集或更高的主题数或迭代数值会很快导致内存问题和作业失败。
此外,在将此作业提交到 4 节点集群时,我可以看到只有一个工作节点在实际工作(30% CPU 使用率),让我认为代码没有针对并行处理。
代码
conf = pyspark.SparkConf().setAppName("lda-training")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
# CSV schema declaration
csv_schema = StructType([StructField("doc_id", StringType(), True), # id of the document
StructField("cleaned_content", StringType(), True)]) # cleaned text content (used for LDA)
# Step 1: Load CSV
doc_df = spark.read.csv(path="gs://...", encoding="UTF-8", schema=csv_schema)
print("{} document(s) loaded".format(doc_df.count()))
# This prints "25000 document(s) loaded"
print("{} partitions".format(doc_df.rdd.getNumPartitions()))
# This prints "1"
# Step 2: Extracting words
extract_words = functions.udf(lambda row: split_row(row), ArrayType(StringType()))
doc_df = doc_df.withColumn("words", extract_words(doc_df["cleaned_content"]))
# Step 3: Generate count vectors (BOW) for each document
count_vectorizer = CountVectorizer(inputCol="words", outputCol="features")
vectorizer_model = count_vectorizer.fit(doc_df)
dataset = vectorizer_model.transform(doc_df)
# Instantiate LDA model
lda = LDA(k=100, # number of topics
optimizer="online", # 'online' or 'em'
maxIter=100,
featuresCol="features",
topicConcentration=0.01, # beta
optimizeDocConcentration=True, # alpha
learningOffset=2.0,
learningDecay=0.8,
checkpointInterval=10,
keepLastCheckpoint=True)
# Step 4: Train LDA model on corpus (this is the long part of the job)
lda_model = lda.fit(dataset)
# Save LDA model to Cloud Storage
lda_model.write().overwrite().save("gs://...")
以下是遇到的警告和错误消息的示例:
WARN org.apache.spark.scheduler.TaskSetManager: Stage 7 contains a task of very large size (3791 KB). The maximum recommended task size is 100 KB.
WARN org.apache.spark.scheduler.TaskSetManager: Stage 612 contains a task of very large size (142292 KB). The maximum recommended task size is 100 KB.
WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Container killed by YARN for exceeding memory limits. 6.1 GB of 6 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 303.0 (TID 302, cluster-lda-w-1.c.cognitive-search-engine-dev.internal, executor 3): ExecutorLostFailure (executor 3 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 6.1 GB of 6 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
ERROR org.apache.spark.scheduler.cluster.YarnScheduler: Lost executor 3 on cluster-lda-w-1.c.cognitive-search-engine-dev.internal: Container killed by YARN for exceeding memory limits. 6.1 GB of 6 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
问题
- 是否可以对代码本身进行任何优化以确保其可扩展性?
- 我们如何让 Spark 将作业分布到所有工作节点上并希望避免内存问题?
如果您的输入数据很小,即使您的管道最终对小数据进行密集计算,那么基于大小的分区将导致分区太少而无法扩展。由于您的 getNumPartitions()
打印 1
,这表明 Spark 将最多使用 1 个执行程序核心来处理该数据,这就是为什么您只看到一个工作节点在工作。
您可以尝试更改初始 spark.read.csv
行以在末尾包含 repartition
:
doc_df = spark.read.csv(path="gs://...", ...).repartition(32)
然后您可以通过在后面的行中看到 getNumPartitions()
打印 32
来验证它是否如您所愿。
我目前正在从事一项 PySpark 工作 (Spark 2.2.0),该工作旨在基于一组文档训练 Latent Dirichlet Allocation 模型。输入文档以 CSV 文件的形式提供,位于 Google Cloud Storage。
以下代码在单个节点上成功 运行 Google Cloud Dataproc 集群(4vCPUs / 15GB 内存)和一小部分文档(~6500),生成的主题数量少 (10) 和迭代次数少 (100)。 但是,其他尝试使用更大的文档集或更高的主题数或迭代数值会很快导致内存问题和作业失败。
此外,在将此作业提交到 4 节点集群时,我可以看到只有一个工作节点在实际工作(30% CPU 使用率),让我认为代码没有针对并行处理。
代码
conf = pyspark.SparkConf().setAppName("lda-training")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
# CSV schema declaration
csv_schema = StructType([StructField("doc_id", StringType(), True), # id of the document
StructField("cleaned_content", StringType(), True)]) # cleaned text content (used for LDA)
# Step 1: Load CSV
doc_df = spark.read.csv(path="gs://...", encoding="UTF-8", schema=csv_schema)
print("{} document(s) loaded".format(doc_df.count()))
# This prints "25000 document(s) loaded"
print("{} partitions".format(doc_df.rdd.getNumPartitions()))
# This prints "1"
# Step 2: Extracting words
extract_words = functions.udf(lambda row: split_row(row), ArrayType(StringType()))
doc_df = doc_df.withColumn("words", extract_words(doc_df["cleaned_content"]))
# Step 3: Generate count vectors (BOW) for each document
count_vectorizer = CountVectorizer(inputCol="words", outputCol="features")
vectorizer_model = count_vectorizer.fit(doc_df)
dataset = vectorizer_model.transform(doc_df)
# Instantiate LDA model
lda = LDA(k=100, # number of topics
optimizer="online", # 'online' or 'em'
maxIter=100,
featuresCol="features",
topicConcentration=0.01, # beta
optimizeDocConcentration=True, # alpha
learningOffset=2.0,
learningDecay=0.8,
checkpointInterval=10,
keepLastCheckpoint=True)
# Step 4: Train LDA model on corpus (this is the long part of the job)
lda_model = lda.fit(dataset)
# Save LDA model to Cloud Storage
lda_model.write().overwrite().save("gs://...")
以下是遇到的警告和错误消息的示例:
WARN org.apache.spark.scheduler.TaskSetManager: Stage 7 contains a task of very large size (3791 KB). The maximum recommended task size is 100 KB.
WARN org.apache.spark.scheduler.TaskSetManager: Stage 612 contains a task of very large size (142292 KB). The maximum recommended task size is 100 KB.
WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Container killed by YARN for exceeding memory limits. 6.1 GB of 6 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 303.0 (TID 302, cluster-lda-w-1.c.cognitive-search-engine-dev.internal, executor 3): ExecutorLostFailure (executor 3 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 6.1 GB of 6 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
ERROR org.apache.spark.scheduler.cluster.YarnScheduler: Lost executor 3 on cluster-lda-w-1.c.cognitive-search-engine-dev.internal: Container killed by YARN for exceeding memory limits. 6.1 GB of 6 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
问题
- 是否可以对代码本身进行任何优化以确保其可扩展性?
- 我们如何让 Spark 将作业分布到所有工作节点上并希望避免内存问题?
如果您的输入数据很小,即使您的管道最终对小数据进行密集计算,那么基于大小的分区将导致分区太少而无法扩展。由于您的 getNumPartitions()
打印 1
,这表明 Spark 将最多使用 1 个执行程序核心来处理该数据,这就是为什么您只看到一个工作节点在工作。
您可以尝试更改初始 spark.read.csv
行以在末尾包含 repartition
:
doc_df = spark.read.csv(path="gs://...", ...).repartition(32)
然后您可以通过在后面的行中看到 getNumPartitions()
打印 32
来验证它是否如您所愿。