spark dag 中的意外排序
Unexpected sort in a spark dag
我写了下面的代码,我想从 kafka 读取并写入按年、月、日和小时分区的 parquet 文件。
在 dag 中,我看到了一个排序操作(下图)。
这个排序操作是要在一个执行器内部排序还是在执行器之间移动数据?
我不希望这项工作需要在执行者之间洗牌数据,因为单个执行者可以从 kafka 读取并在文件夹中写入一个独立的镶木地板文件。
private val year = date_format($"timestamp", "yyyy").alias("year")
private val month = date_format($"timestamp", "MM").alias("month")
private val day = date_format($"timestamp", "dd").alias("day")
private val hour = date_format($"timestamp", "HH").alias("hour")
val source = spark.readStream
.format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
.load()
// transformation
.select(year, month, day, hour, $"key", $"value",
$"topic", $"partition".alias("partition_int"), $"offset", $"timestamp".cast("long").alias("timestamp_ms"),
$"timestampType".alias("timestamp_type"))
.writeStream
.trigger(Trigger.ProcessingTime(1000 * 60 * 3))
.foreachBatch((batchDF: DataFrame, batchId: Long) => {
batchDF
.write
.mode("append")
.partitionBy("year", "month", "day", "hour")
.parquet(tableRoot)
}).option("checkpointLocation", checkpointLocation)
.start().awaitTermination()
这只是每个任务的一种排序。 partitionBy 没有洗牌。
我写了下面的代码,我想从 kafka 读取并写入按年、月、日和小时分区的 parquet 文件。 在 dag 中,我看到了一个排序操作(下图)。 这个排序操作是要在一个执行器内部排序还是在执行器之间移动数据? 我不希望这项工作需要在执行者之间洗牌数据,因为单个执行者可以从 kafka 读取并在文件夹中写入一个独立的镶木地板文件。
private val year = date_format($"timestamp", "yyyy").alias("year")
private val month = date_format($"timestamp", "MM").alias("month")
private val day = date_format($"timestamp", "dd").alias("day")
private val hour = date_format($"timestamp", "HH").alias("hour")
val source = spark.readStream
.format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
.load()
// transformation
.select(year, month, day, hour, $"key", $"value",
$"topic", $"partition".alias("partition_int"), $"offset", $"timestamp".cast("long").alias("timestamp_ms"),
$"timestampType".alias("timestamp_type"))
.writeStream
.trigger(Trigger.ProcessingTime(1000 * 60 * 3))
.foreachBatch((batchDF: DataFrame, batchId: Long) => {
batchDF
.write
.mode("append")
.partitionBy("year", "month", "day", "hour")
.parquet(tableRoot)
}).option("checkpointLocation", checkpointLocation)
.start().awaitTermination()
这只是每个任务的一种排序。 partitionBy 没有洗牌。