如何使用结构化流的 writestream 重新分区写入文件?
How can write files with repartition with structured streaming's writestream?
我有一个结构化的流代码,可以从 Kafka 读取数据并转储到 HDFS。在转储数据时,我根据三列对数据进行分区。我面临的问题是在批处理过程中生成了许多小文件。我想在每个 partitionBy 的批处理中只生成一个文件。我不确定如何在这种情况下应用重新分区,因为它似乎不起作用。
query = df.selectExpr("CAST(value as STRING)") \
.repartition(1) \
.writeStream.partitionBy('host', 'dt', 'h') \ ==> repartition(1) is not working here
.format("parquet") \
.outputMode("append") \
.option("checkpointLocation", self.checkpoint_location) \
.option('path', self.hdfs_path) \
.option('failOnDataLoss', 'false') \
.option("startingOffset", "earliest") \
.trigger(processingTime='2 seconds').start()
我不想编写另一个清理作业来从路径读取数据、重新分区并在每个分区中存储具有所需文件数的数据。
我有运行一些使用重新分区的测试,它似乎对我有用。
我创建了一个测试 Kafka 主题,它有字符串格式的数据 id-value
。然后在流代码中,我在 -
上拆分 value
并使用 partitionBy('id')
写入数据以模仿您的代码行为。我正在使用 kafka broker 0.10 和 spark 版本 2.4.3.
见下面的代码:
from pyspark.sql.functions import col, split
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "partitionTestTopic") \
.option("startingOffsets", "earliest") \
.load()
使用repartition(1)
:
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.withColumn('id', split(col('value'), '-').getItem(0)) \
.repartition(1) \
.writeStream.partitionBy('id') \
.format("parquet") \
.outputMode("append") \
.option('path', 'test-1/data') \
.option("checkpointLocation", "test-1/checkpoint") \
.trigger(processingTime='20 seconds') \
.start()
输出:
├── id=1
│ └── part-00000-9812bd07-3c0f-442e-a80c-5c09553f20e8.c000.snappy.parquet
├── id=2
│ └── part-00000-522e99b6-3900-4702-baf7-2c55819b775c.c000.snappy.parquet
├── id=3
│ └── part-00000-5ed9bef0-4941-451f-884e-8e94a351323f.c000.snappy.parquet
使用repartition(3)
:
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.withColumn('id', split(col('value'), '-').getItem(0)) \
.repartition(3) \
.writeStream.partitionBy('id') \
.format("parquet") \
.outputMode("append") \
.option('path', 'test-3/data') \
.option("checkpointLocation", "test-3/checkpoint") \
.trigger(processingTime='20 seconds') \
.start()
输出:
├── id=1
│ ├── part-00000-ed6ed5dd-b376-40a2-9920-d0cb36c7120f.c000.snappy.parquet
│ ├── part-00001-fa64e597-a4e1-4ac2-967f-5ea8aae96c13.c000.snappy.parquet
│ └── part-00002-0e0feab8-57d8-4bd2-a94f-0206ff90f16e.c000.snappy.parquet
├── id=2
│ ├── part-00000-c417dac5-271f-4356-b577-ff6c9f45792e.c000.snappy.parquet
│ ├── part-00001-7c90eb8a-986a-4602-a386-50f8d6d85e77.c000.snappy.parquet
│ └── part-00002-0e59e779-84e8-4fcf-ad62-ef3f4dbaccd5.c000.snappy.parquet
├── id=3
│ ├── part-00000-8a555649-1141-42fe-9cb5-0acf0efc5997.c000.snappy.parquet
│ ├── part-00001-ce4aaa50-e41b-4f5f-837c-661459b747b8.c000.snappy.parquet
│ └── part-00002-9f95261e-bd4c-4f1e-bce2-f8ab3b8b01ec.c000.snappy.parquet
您提到您正在使用 10-minute
批处理,因此您也应该相应地更新触发间隔。现在是 2 seconds
但应该是 trigger(processingTime='10 minutes')
(参考 class ProcessingTime(Trigger)
here )。这可能就是您收到太多小文件的原因。
如果您将 reaprtition(1)
与 10 minute batch
一起使用,将会出现大量数据混洗,并且只有一个核心(每个 host, dt and h
)最终会写入所有数据,您赢了能够在一定程度上使用火花并行性。还有一个缺点是分区比 128MB
大。由于您没有在查询中进行任何聚合,因此您应该 decrease your batch size
并使用 repartition(1)
这样您就可以获得可接受的分区大小。
我有一个结构化的流代码,可以从 Kafka 读取数据并转储到 HDFS。在转储数据时,我根据三列对数据进行分区。我面临的问题是在批处理过程中生成了许多小文件。我想在每个 partitionBy 的批处理中只生成一个文件。我不确定如何在这种情况下应用重新分区,因为它似乎不起作用。
query = df.selectExpr("CAST(value as STRING)") \
.repartition(1) \
.writeStream.partitionBy('host', 'dt', 'h') \ ==> repartition(1) is not working here
.format("parquet") \
.outputMode("append") \
.option("checkpointLocation", self.checkpoint_location) \
.option('path', self.hdfs_path) \
.option('failOnDataLoss', 'false') \
.option("startingOffset", "earliest") \
.trigger(processingTime='2 seconds').start()
我不想编写另一个清理作业来从路径读取数据、重新分区并在每个分区中存储具有所需文件数的数据。
我有运行一些使用重新分区的测试,它似乎对我有用。
我创建了一个测试 Kafka 主题,它有字符串格式的数据 id-value
。然后在流代码中,我在 -
上拆分 value
并使用 partitionBy('id')
写入数据以模仿您的代码行为。我正在使用 kafka broker 0.10 和 spark 版本 2.4.3.
见下面的代码:
from pyspark.sql.functions import col, split
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "partitionTestTopic") \
.option("startingOffsets", "earliest") \
.load()
使用repartition(1)
:
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.withColumn('id', split(col('value'), '-').getItem(0)) \
.repartition(1) \
.writeStream.partitionBy('id') \
.format("parquet") \
.outputMode("append") \
.option('path', 'test-1/data') \
.option("checkpointLocation", "test-1/checkpoint") \
.trigger(processingTime='20 seconds') \
.start()
输出:
├── id=1
│ └── part-00000-9812bd07-3c0f-442e-a80c-5c09553f20e8.c000.snappy.parquet
├── id=2
│ └── part-00000-522e99b6-3900-4702-baf7-2c55819b775c.c000.snappy.parquet
├── id=3
│ └── part-00000-5ed9bef0-4941-451f-884e-8e94a351323f.c000.snappy.parquet
使用repartition(3)
:
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.withColumn('id', split(col('value'), '-').getItem(0)) \
.repartition(3) \
.writeStream.partitionBy('id') \
.format("parquet") \
.outputMode("append") \
.option('path', 'test-3/data') \
.option("checkpointLocation", "test-3/checkpoint") \
.trigger(processingTime='20 seconds') \
.start()
输出:
├── id=1
│ ├── part-00000-ed6ed5dd-b376-40a2-9920-d0cb36c7120f.c000.snappy.parquet
│ ├── part-00001-fa64e597-a4e1-4ac2-967f-5ea8aae96c13.c000.snappy.parquet
│ └── part-00002-0e0feab8-57d8-4bd2-a94f-0206ff90f16e.c000.snappy.parquet
├── id=2
│ ├── part-00000-c417dac5-271f-4356-b577-ff6c9f45792e.c000.snappy.parquet
│ ├── part-00001-7c90eb8a-986a-4602-a386-50f8d6d85e77.c000.snappy.parquet
│ └── part-00002-0e59e779-84e8-4fcf-ad62-ef3f4dbaccd5.c000.snappy.parquet
├── id=3
│ ├── part-00000-8a555649-1141-42fe-9cb5-0acf0efc5997.c000.snappy.parquet
│ ├── part-00001-ce4aaa50-e41b-4f5f-837c-661459b747b8.c000.snappy.parquet
│ └── part-00002-9f95261e-bd4c-4f1e-bce2-f8ab3b8b01ec.c000.snappy.parquet
您提到您正在使用 10-minute
批处理,因此您也应该相应地更新触发间隔。现在是 2 seconds
但应该是 trigger(processingTime='10 minutes')
(参考 class ProcessingTime(Trigger)
here )。这可能就是您收到太多小文件的原因。
如果您将 reaprtition(1)
与 10 minute batch
一起使用,将会出现大量数据混洗,并且只有一个核心(每个 host, dt and h
)最终会写入所有数据,您赢了能够在一定程度上使用火花并行性。还有一个缺点是分区比 128MB
大。由于您没有在查询中进行任何聚合,因此您应该 decrease your batch size
并使用 repartition(1)
这样您就可以获得可接受的分区大小。