Spark on Google Cloud Dataproc 作业在最后阶段失败
Spark on Google Cloud Dataproc job failures on last stages
我在 Dataproc 上使用 Spark 集群,但我的作业在处理结束时失败了。
我的数据源是 Google 云存储上的 csv 格式的文本日志文件(总容量为 3.5TB,5000 个文件)。
处理逻辑如下:
- 将文件读取到 DataFrame(架构 ["timestamp"、"message"]);
- 将所有消息分组为 window 1 秒;
- 将管道 [Tokenizer -> HashingTF] 应用于每个分组的消息以提取单词及其频率以构建特征向量;
- 在 GCS 上保存带有时间线的特征向量。
我遇到的问题是,在一小部分数据(如 10 个文件)上处理效果很好,但是当我 运行 它在所有文件上处理时,它最终失败并出现错误喜欢 "Container killed by YARN for exceeding memory limits. 25.0 GB of 24 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead."
我的集群有 25 个 worker,有 n1-highmem-8 台机器。所以我在谷歌上搜索了这个错误,并将 "spark.yarn.executor.memoryOverhead" 参数增加到 6500MB。
现在我的 spark 作业仍然失败,但出现错误 "Job aborted due to stage failure: Total size of serialized results of 4293 tasks (1920.0 MB) is bigger than spark.driver.maxResultSize (1920.0 MB)"
我是 spark 的新手,我认为我在配置级别或代码中做错了什么。如果你能帮我把这些东西清理干净就好了!
这是我的 spark 任务代码:
import logging
import string
from datetime import datetime
import pyspark
import re
from pyspark.sql import SparkSession
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml import Pipeline
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType, TimestampType, ArrayType
from pyspark.sql import functions as F
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Constants
NOW = datetime.now().strftime("%Y%m%d%H%M%S")
START_DATE = '2016-01-01'
END_DATE = '2016-03-01'
sc = pyspark.SparkContext()
spark = SparkSession\
.builder\
.appName("LogsVectorizer")\
.getOrCreate()
spark.conf.set('spark.sql.shuffle.partitions', 10000)
logger.info("Start log processing at {}...".format(NOW))
# Filenames to read/write locations
logs_fn = 'gs://databucket/csv/*'
vectors_fn = 'gs://databucket/vectors_out_{}'.format(NOW)
pipeline_fn = 'gs://databucket/pipeline_vectors_out_{}'.format(NOW)
model_fn = 'gs://databucket/model_vectors_out_{}'.format(NOW)
# CSV data schema to build DataFrame
schema = StructType([
StructField("timestamp", StringType()),
StructField("message", StringType())])
# Helpers to clean strings in log fields
def cleaning_string(s):
try:
# Remove ids (like: app[2352] -> app)
s = re.sub('\[.*\]', 'IDTAG', s)
if s == '':
s = 'EMPTY'
except Exception as e:
print("Skip string with exception {}".format(e))
return s
def normalize_string(s):
try:
# Remove punctuation
s = re.sub('[{}]'.format(re.escape(string.punctuation)), ' ', s)
# Remove digits
s = re.sub('\d*', '', s)
# Remove extra spaces
s = ' '.join(s.split())
except Exception as e:
print("Skip string with exception {}".format(e))
return s
def line_splitter(line):
line = line.split(',')
timestamp = line[0]
full_message = ' '.join(line[1:])
full_message = normalize_string(cleaning_string(full_message))
return [timestamp, full_message]
# Read line from csv, split to date|message
# Read CSV to DataFrame and clean its fields
logger.info("Read CSV to DF...")
logs_csv = sc.textFile(logs_fn)
logs_csv = logs_csv.map(lambda line: line_splitter(line)).toDF(schema)
# Keep only lines for our date interval
logger.info("Filter by dates...")
logs_csv = logs_csv.filter((logs_csv.timestamp>START_DATE) & (logs_csv.timestamp<END_DATE))
logs_csv = logs_csv.withColumn("timestamp", logs_csv.timestamp.cast("timestamp"))
# Helpers to join messages into window and convert sparse to dense
join_ = F.udf(lambda x: "| ".join(x), StringType())
asDense = F.udf(lambda v: v.toArray().tolist())
# Agg by time window
logger.info("Group log messages by time window...")
logs_csv = logs_csv.groupBy(F.window("timestamp", "1 second"))\
.agg(join_(F.collect_list("message")).alias("messages"))
# Turn message to hashTF
tokenizer = Tokenizer(inputCol="messages", outputCol="message_tokens")
hashingTF = HashingTF(inputCol="message_tokens", outputCol="tokens_counts", numFeatures=1000)
pipeline_tf = Pipeline(stages=[tokenizer, hashingTF])
logger.info("Fit-Transform ML Pipeline...")
model_tf = pipeline_tf.fit(logs_csv)
logs_csv = model_tf.transform(logs_csv)
logger.info("Spase vectors to Dense list...")
logs_csv = logs_csv.sort("window.start").select(["window.start", "tokens_counts"])\
.withColumn("tokens_counts", asDense(logs_csv.tokens_counts))
# Save to disk
# Save Pipeline and Model
logger.info("Save models...")
pipeline_tf.save(pipeline_fn)
model_tf.save(model_fn)
# Save to GCS
logger.info("Save results to GCS...")
logs_csv.write.parquet(vectors_fn)
spark.driver.maxResultSize
是驱动程序大小的问题,它在 Dataproc 中运行在主节点上。
默认情况下,master 内存的 1/4 分配给 Driver,其中 1/2 设置为 spark.driver.maxResultSize
(最大的 RDD Spark 会让你 .collect()
。
我猜 Tokenizer
或 HashingTF
正在移动 "metadata" 通过驱动程序,这是您的键空间的大小。要增加允许的大小,您可以增加 spark.driver.maxResultSize
,但您可能还想增加 spark.driver.memory
and/or,同时使用更大的母版。 Spark's configuration guide 有更多信息。
我在 Dataproc 上使用 Spark 集群,但我的作业在处理结束时失败了。
我的数据源是 Google 云存储上的 csv 格式的文本日志文件(总容量为 3.5TB,5000 个文件)。
处理逻辑如下:
- 将文件读取到 DataFrame(架构 ["timestamp"、"message"]);
- 将所有消息分组为 window 1 秒;
- 将管道 [Tokenizer -> HashingTF] 应用于每个分组的消息以提取单词及其频率以构建特征向量;
- 在 GCS 上保存带有时间线的特征向量。
我遇到的问题是,在一小部分数据(如 10 个文件)上处理效果很好,但是当我 运行 它在所有文件上处理时,它最终失败并出现错误喜欢 "Container killed by YARN for exceeding memory limits. 25.0 GB of 24 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead."
我的集群有 25 个 worker,有 n1-highmem-8 台机器。所以我在谷歌上搜索了这个错误,并将 "spark.yarn.executor.memoryOverhead" 参数增加到 6500MB。
现在我的 spark 作业仍然失败,但出现错误 "Job aborted due to stage failure: Total size of serialized results of 4293 tasks (1920.0 MB) is bigger than spark.driver.maxResultSize (1920.0 MB)"
我是 spark 的新手,我认为我在配置级别或代码中做错了什么。如果你能帮我把这些东西清理干净就好了!
这是我的 spark 任务代码:
import logging
import string
from datetime import datetime
import pyspark
import re
from pyspark.sql import SparkSession
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml import Pipeline
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType, TimestampType, ArrayType
from pyspark.sql import functions as F
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Constants
NOW = datetime.now().strftime("%Y%m%d%H%M%S")
START_DATE = '2016-01-01'
END_DATE = '2016-03-01'
sc = pyspark.SparkContext()
spark = SparkSession\
.builder\
.appName("LogsVectorizer")\
.getOrCreate()
spark.conf.set('spark.sql.shuffle.partitions', 10000)
logger.info("Start log processing at {}...".format(NOW))
# Filenames to read/write locations
logs_fn = 'gs://databucket/csv/*'
vectors_fn = 'gs://databucket/vectors_out_{}'.format(NOW)
pipeline_fn = 'gs://databucket/pipeline_vectors_out_{}'.format(NOW)
model_fn = 'gs://databucket/model_vectors_out_{}'.format(NOW)
# CSV data schema to build DataFrame
schema = StructType([
StructField("timestamp", StringType()),
StructField("message", StringType())])
# Helpers to clean strings in log fields
def cleaning_string(s):
try:
# Remove ids (like: app[2352] -> app)
s = re.sub('\[.*\]', 'IDTAG', s)
if s == '':
s = 'EMPTY'
except Exception as e:
print("Skip string with exception {}".format(e))
return s
def normalize_string(s):
try:
# Remove punctuation
s = re.sub('[{}]'.format(re.escape(string.punctuation)), ' ', s)
# Remove digits
s = re.sub('\d*', '', s)
# Remove extra spaces
s = ' '.join(s.split())
except Exception as e:
print("Skip string with exception {}".format(e))
return s
def line_splitter(line):
line = line.split(',')
timestamp = line[0]
full_message = ' '.join(line[1:])
full_message = normalize_string(cleaning_string(full_message))
return [timestamp, full_message]
# Read line from csv, split to date|message
# Read CSV to DataFrame and clean its fields
logger.info("Read CSV to DF...")
logs_csv = sc.textFile(logs_fn)
logs_csv = logs_csv.map(lambda line: line_splitter(line)).toDF(schema)
# Keep only lines for our date interval
logger.info("Filter by dates...")
logs_csv = logs_csv.filter((logs_csv.timestamp>START_DATE) & (logs_csv.timestamp<END_DATE))
logs_csv = logs_csv.withColumn("timestamp", logs_csv.timestamp.cast("timestamp"))
# Helpers to join messages into window and convert sparse to dense
join_ = F.udf(lambda x: "| ".join(x), StringType())
asDense = F.udf(lambda v: v.toArray().tolist())
# Agg by time window
logger.info("Group log messages by time window...")
logs_csv = logs_csv.groupBy(F.window("timestamp", "1 second"))\
.agg(join_(F.collect_list("message")).alias("messages"))
# Turn message to hashTF
tokenizer = Tokenizer(inputCol="messages", outputCol="message_tokens")
hashingTF = HashingTF(inputCol="message_tokens", outputCol="tokens_counts", numFeatures=1000)
pipeline_tf = Pipeline(stages=[tokenizer, hashingTF])
logger.info("Fit-Transform ML Pipeline...")
model_tf = pipeline_tf.fit(logs_csv)
logs_csv = model_tf.transform(logs_csv)
logger.info("Spase vectors to Dense list...")
logs_csv = logs_csv.sort("window.start").select(["window.start", "tokens_counts"])\
.withColumn("tokens_counts", asDense(logs_csv.tokens_counts))
# Save to disk
# Save Pipeline and Model
logger.info("Save models...")
pipeline_tf.save(pipeline_fn)
model_tf.save(model_fn)
# Save to GCS
logger.info("Save results to GCS...")
logs_csv.write.parquet(vectors_fn)
spark.driver.maxResultSize
是驱动程序大小的问题,它在 Dataproc 中运行在主节点上。
默认情况下,master 内存的 1/4 分配给 Driver,其中 1/2 设置为 spark.driver.maxResultSize
(最大的 RDD Spark 会让你 .collect()
。
我猜 Tokenizer
或 HashingTF
正在移动 "metadata" 通过驱动程序,这是您的键空间的大小。要增加允许的大小,您可以增加 spark.driver.maxResultSize
,但您可能还想增加 spark.driver.memory
and/or,同时使用更大的母版。 Spark's configuration guide 有更多信息。