在此流式查询中 'partitionBy' 之前的 'coalesce' 有什么影响?

What is the effect of 'coalesce' before 'partitionBy' in this streaming query?

我有一个从 Kafka 主题(两个分区)接收数据的流式查询 (Spark Structured Streaming),如下所示:

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "172.29.57.25:9092,172.29.57.30:9092")
  .option("subscribe", "mytopic")
  .load()
  .select(from_json(col("value").cast("string"), schema).as("record")).select("record.*")

我想通过date/hour执行一个简单的聚合和分区,并保存到HDFS中的Parquet文件,像这样:

val aggregationQuery = df.withColumn("ROP", from_unixtime((col("attributes.START_TIME")/1000), "yyyy-MM-dd HH:mm").cast(TimestampType))
.withColumn("date", to_date(col("ROP")))
.withColumn("hour", hour(col("ROP")))
.withColumn("timestamp", current_timestamp())
.withWatermark("timestamp", "0 minutes")
.groupBy(window(col("timestamp"), "10 seconds"), col("date"), col("hour"))
.agg(count("attributes.RECORDID").as("NumRecords"))
.coalesce(2)

输出到 Parquet:

aggregationQuery.writeStream
.format("parquet")
.trigger(Trigger.ProcessingTime("10 seconds"))
.partitionBy("date", "hour")
.option("path", "hdfs://cloudera-cluster:8020/user/spark/proyecto1")
.option("checkpointLocation", "hdfs://cloudera-cluster:8020/user/spark/checkpointfolder")
.outputMode("append")
.start()

因此,我得到了类似于此示例的文件夹结构:

         user/spark/proyecto1/date=2015-08-18/hour=20

在每个文件夹中,我在流式传输过程中为每个触发器获取 2 个 Parquet 文件。

我想了解 'coalesce' 和 'partitionBy' 操作对我的数据做了什么,以及与此特定组合相关的任何风险。

顺便说一下,我的集群中只有 2 个节点。

  • coalesce 将整个 Pipeline 的并行度降低到 2。由于它不引入分析障碍,它会向后传播,因此在实践中最好将其替换为 repartition
  • partitionBy 创建一个您看到的目录结构,其值在路径中编码。它从叶文件中删除相应的列。因为日期和时间的基数较低,所以在这种情况下没有特别的风险。

结合这两个创建观察到的目录结构,并将每个叶目录中的文件数限制为最多两个。

合并:它减少了 partitions.In 这种情况下的数量,如果 n 是默认的分区数,它将它减少到 2.It 将每个节点中的所有分区盲目地组合成一个导致 2.This 的分区可能是您在文件夹中获得 2 个文件的原因。

当你使用 partition by 时,它会根据你在 column.Much 中的值创建 n 个分区,就像每个唯一键进入各自的 partition.If 没有正确使用你可能最终有大量分区会在双节点集群中产生开销