Warning on Dataproc while using partitionBy on a dataframe

I am trying to write the contents of a dataframe to Google Cloud Storage using PySpark on Dataproc. The write succeeds, but the log I have pasted below contains a lot of warning messages. Am I missing some setting when creating the cluster or in the PySpark program, or is this a Google-side issue?

Note: the dataframe written to Google Storage is larger than 120 GB uncompressed, but I noticed the same warnings even when processing about 1 GB of uncompressed data. It is a simple dataframe with 50 columns that is read, has a few transformations applied, and is written to disk.

The dataframe write statement is as follows:

df.write.partitionBy("dt").format('csv').mode("overwrite").options(delimiter="|").save("gs://bucket/tbl/")

Warning messages from the PySpark log:

18/04/01 19:58:28 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 182.0 in stage 3.0 (TID 68943, admg-tellrd-w-20.c.syw-analytics-repo-dev.internal, executor 219): org.apache.spark.SparkException: Task failed while writing rows
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$$anonfun$apply$mcV$sp.apply(FileFormatWriter.scala:191)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$$anonfun$apply$mcV$sp.apply(FileFormatWriter.scala:190)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Error closing the output.
        at com.univocity.parsers.common.AbstractWriter.close(AbstractWriter.java:861)
        at org.apache.spark.sql.execution.datasources.csv.UnivocityGenerator.close(UnivocityGenerator.scala:86)
        at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.close(CSVFileFormat.scala:141)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$DynamicPartitionWriteTask.releaseResources(FileFormatWriter.scala:475)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$DynamicPartitionWriteTask$$anonfun$execute.apply(FileFormatWriter.scala:450)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$DynamicPartitionWriteTask$$anonfun$execute.apply(FileFormatWriter.scala:440)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at org.apache.spark.sql.catalyst.util.AbstractScalaRowIterator.foreach(AbstractScalaRowIterator.scala:26)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$DynamicPartitionWriteTask.execute(FileFormatWriter.scala:440)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask.apply(FileFormatWriter.scala:258)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask.apply(FileFormatWriter.scala:256)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261)
        ... 8 more
Caused by: java.io.IOException: com.google.api.client.googleapis.json.GoogleJsonResponseException: 410 Gone
{
  "code" : 500,
  "errors" : [ {
    "domain" : "global",
    "message" : "Backend Error",
    "reason" : "backendError"
  } ],
  "message" : "Backend Error"
}
        at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.waitForCompletionAndThrowIfUploadFailed(AbstractGoogleAsyncWriteChannel.java:432)
        at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.close(AbstractGoogleAsyncWriteChannel.java:287)
        at java.nio.channels.Channels.close(Channels.java:178)
        at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
        at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.close(GoogleHadoopOutputStream.java:126)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
        at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
        at sun.nio.cs.StreamEncoder.implClose(StreamEncoder.java:320)
        at sun.nio.cs.StreamEncoder.close(StreamEncoder.java:149)
        at java.io.OutputStreamWriter.close(OutputStreamWriter.java:233)
        at com.univocity.parsers.common.AbstractWriter.close(AbstractWriter.java:857)
        ... 20 more
Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 410 Gone
{
  "code" : 500,
  "errors" : [ {
    "domain" : "global",
    "message" : "Backend Error",
    "reason" : "backendError"
  } ],
  "message" : "Backend Error"
}
        at com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:145)
        at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
        at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:432)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
        at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:358)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        ... 3 more

(This is not an answer to the question, but it does not fit in a comment. It relates to the thread about calling repartition(...) before write.partitionBy.)

Without repartition(...), this will take forever on GCS. Under the hood, when you call write.partitionBy(...), each Spark task writes one file per output partition, sequentially. That is already slow on HDFS, and it is even slower on GCS because of its higher latency. If creating each file takes 500 ms, writing 2300 partition files takes ~20 minutes per task.

If you shuffle the data, you introduce another ("reduce") stage whose tasks each end up with all of the data for a given output partition. So instead of writing 2300 * previous-stage-tasks files, you only write 2300 files. This is what you want, especially when you have many output partitions.

You may want to experiment with the number of partitions (i.e. reducer tasks) that repartition(...) produces. By default it is 200, but you may want to raise it. Each reducer ends up with a subset of the 2300 output partitions and writes each of its output files sequentially. Again assuming 500 ms per file, that is 2300/200 ≈ 12 files ≈ 6 seconds per task. With more reducers you get more parallelism, so each task takes less time, but you should size the number of reducers to the cluster (e.g. 4x the number of vcores).

Also, you may want to raise spark.executor.cores to 4 (--properties spark.executor.cores=4), since this will be fairly IO-bound.
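A minimal sketch of what that looks like, applied to the write statement from the question. The partition count of 800 and the job file name are only illustrative placeholders; size the count to roughly 4x the vcores in your cluster:

# Shuffle by "dt" first so each reduce task holds a subset of the output
# partitions and only writes those files. 800 is an illustrative count.
(df
 .repartition(800, "dt")
 .write
 .partitionBy("dt")
 .format("csv")
 .mode("overwrite")
 .options(delimiter="|")
 .save("gs://bucket/tbl/"))

# Submit with more cores per executor, since the job is IO-bound, e.g.:
# gcloud dataproc jobs submit pyspark your_job.py --properties spark.executor.cores=4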

This is not an answer to the question either, but the code flow for the existing requirement.

col1        col2        col3   col4        col5
asd234qsds  2014-01-02  23.99  2014-01-02  Y
2343fsdf55  2014-01-03  22.56  2014-01-03  Y
123fdfr555  2014-01-04  34.23  2014-01-04  N
2343fsdf5f  2014-01-05  45.33  2014-01-05  N
asd234qsds  2014-01-02  27.99  2014-01-07  Y

Please note: the first and last rows have the same key, but only the last row is kept by the window function; for the key (asd234qsds, 2014-01-02), the row with col4 = 2014-01-07 and col3 = 27.99 survives. My actual data has 51 columns and the window function is over 9 of them. I am not sure whether compressed input adds any overhead to this process.

from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col
import pyspark.sql.functions as func
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
delim = "|"  # field delimiter used throughout

lines1 = sc.textFile("gs://incrmental_file.txt*")  # uncompressed data, 210 KB
part1 = lines1.map(lambda l: l.split("|"))
df = part1.map(lambda c: Row(col1=c[0], col2=c[1], col3=c[2], col4=c[3], col5=c[4]))
schema_df = spark.createDataFrame(df)
schema_df.createOrReplaceTempView("df")
schema_incr_tbl = spark.sql("""select col1,col2,col3,col4,col5 from df""")

lines2 = sc.textFile("gs://hist_files.gz*")  # full year of compressed data, 38 GiB
part2 = lines2.map(lambda l: l.split("|"))
df2 = part2.map(lambda c: Row(col1=c[0], col2=c[1], col3=c[2], col4=c[3], col5=c[4]))
schema_df2 = spark.createDataFrame(df2)
schema_df2.createOrReplaceTempView("df2")
schema_hist_tbl = spark.sql("""select col1,col2,col3,col4,col5 from df2""")

# Keep only the most recent row (by col4) for each (col1, col2) key.
union_fn = schema_hist_tbl.union(schema_incr_tbl)
w = Window.partitionBy("col1", "col2").orderBy(col("col4").desc())
union_result = union_fn.withColumn("row_num", func.row_number().over(w)).where(col("row_num") == 1).drop("row_num").drop("col4")
union_result.createOrReplaceTempView("merged_tbl")
schema_merged_tbl = spark.sql("""select col1,col2,col3,col5,col5 as col6 from merged_tbl""")

schema_merged_tbl.write.partitionBy("col6").format('csv').mode("overwrite").options(delimiter=delim, codec="org.apache.hadoop.io.compress.GzipCodec").save("hdfs_merge_path")

I tried your code and it is indeed slow -- for me it took more than 8 minutes.

I got a significant speedup (down to under 5 minutes) by reading the CSV files with DataFrames instead of RDDs. This avoids shipping all of the data back and forth between the JVM and Python. Here is the code I used:

from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark import *

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

schema = StructType([
  StructField("col1", StringType(), True),
  StructField("col2", StringType(), True),
  StructField("col3", StringType(), True),
  StructField("col4", StringType(), True),
  StructField("col5", StringType(), True)])

schema_df = spark.read.schema(schema).option("delimiter", "|").csv("gs://path/to/incremental_file.txt*")
schema_df.createOrReplaceTempView("df")

schema_df2 = spark.read.schema(schema).option("delimiter", "|").csv("gs://path/to/hist-file*.gz")
schema_df2.createOrReplaceTempView("df2")   

union_fn = schema_df2.union(schema_df)
w = Window.partitionBy("col1","col2").orderBy(col("col4").desc())
union_result = union_fn.withColumn("row_num", row_number().over(w)).where(col("row_num") == 1).drop("row_num").drop("col4")
union_result.createOrReplaceTempView("merged_tbl")
schema_merged_tbl = spark.sql("""select col1,col2,col3,col5 as col6 from merged_tbl""")
schema_merged_tbl.write.partitionBy("col6").format('csv').mode("overwrite").options(delimiter='|',codec="org.apache.hadoop.io.compress.GzipCodec").save("hdfs_merge_path")
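
If the final write is slow for the same reason as in the question, the advice from the earlier answer can be applied here as well -- a sketch, with 800 again only an illustrative partition count:

# Repartition by the output partition column before the partitioned write,
# so each reduce task writes only its own subset of the col6 files.
schema_merged_tbl.repartition(800, "col6").write.partitionBy("col6").format('csv').mode("overwrite").options(delimiter='|', codec="org.apache.hadoop.io.compress.GzipCodec").save("hdfs_merge_path")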