将 Spark 数据帧保存到按日期分区的 HDFS
Save Spark dataframe to HDFS partitioned by date
我需要将 Spark 数据帧中的数据以 Avro 格式写入 HDFS。挑战在于数据应每天保存,因此目录将如下所示:tablename/2019-08-12、tablename/2019-08-13 等等。
我只有一个时间戳字段,我需要从中提取创建目录名称的日期。
我建立了一个有两个问题的方法:
1) 从时间戳中提取日期有困难
3)在大型数据集(以后会更大)上,性能会很差,因为启动了很多任务。
那么我该如何 change/improve 这种方法呢?
这是我使用的代码(dataDF是输入数据):
val uniqueDates = dataDF.select("update_database_time").distinct.
collect.map(elem => elem.getTimestamp(0).getDate)
uniqueDates.map(date => {
val resultDF = dataDF.where(to_date(dataDF.col("update_database_time")) <=> date)
val pathToSave = s"${dataDir}/${tableNameValue}/${date}"
dataDF.write
.format("avro")
.option("avroSchema", SchemaRegistry.getSchema(
schemaRegistryConfig.url,
schemaRegistryConfig.dataSchemaSubject,
schemaRegistryConfig.dataSchemaVersion))
.save(s"${hdfsURL}${pathToSave}")
resultDF
})
.reduce(_.union(_))
如果你能接受像
这样的目录结构
tablename/date=2019-08-12
tablename/date=2019-08-13
相反,DataFrameWriter.partitionBy
就可以了。例如
val df =
Seq((Timestamp.valueOf("2019-06-01 12:00:00"), 1),
(Timestamp.valueOf("2019-06-01 12:00:01"), 2),
(Timestamp.valueOf("2019-06-02 12:00:00"), 3)).toDF("time", "foo")
df.withColumn("date", to_date($"time"))
.write
.partitionBy("date")
.format("avro")
.save("/tmp/foo")
产生以下结构
find /tmp/foo
/tmp/foo
/tmp/foo/._SUCCESS.crc
/tmp/foo/date=2019-06-01
/tmp/foo/date=2019-06-01/.part-00000-2a7a63f2-7038-4aec-8f76-87077f91a415.c000.avro.crc
/tmp/foo/date=2019-06-01/part-00000-2a7a63f2-7038-4aec-8f76-87077f91a415.c000.avro
/tmp/foo/date=2019-06-01/.part-00001-2a7a63f2-7038-4aec-8f76-87077f91a415.c000.avro.crc
/tmp/foo/date=2019-06-01/part-00001-2a7a63f2-7038-4aec-8f76-87077f91a415.c000.avro
/tmp/foo/_SUCCESS
/tmp/foo/date=2019-06-02
/tmp/foo/date=2019-06-02/part-00002-2a7a63f2-7038-4aec-8f76-87077f91a415.c000.avro
/tmp/foo/date=2019-06-02/.part-00002-2a7a63f2-7038-4aec-8f76-87077f91a415.c000.avro.crc
我需要将 Spark 数据帧中的数据以 Avro 格式写入 HDFS。挑战在于数据应每天保存,因此目录将如下所示:tablename/2019-08-12、tablename/2019-08-13 等等。 我只有一个时间戳字段,我需要从中提取创建目录名称的日期。 我建立了一个有两个问题的方法: 1) 从时间戳中提取日期有困难 3)在大型数据集(以后会更大)上,性能会很差,因为启动了很多任务。 那么我该如何 change/improve 这种方法呢?
这是我使用的代码(dataDF是输入数据):
val uniqueDates = dataDF.select("update_database_time").distinct.
collect.map(elem => elem.getTimestamp(0).getDate)
uniqueDates.map(date => {
val resultDF = dataDF.where(to_date(dataDF.col("update_database_time")) <=> date)
val pathToSave = s"${dataDir}/${tableNameValue}/${date}"
dataDF.write
.format("avro")
.option("avroSchema", SchemaRegistry.getSchema(
schemaRegistryConfig.url,
schemaRegistryConfig.dataSchemaSubject,
schemaRegistryConfig.dataSchemaVersion))
.save(s"${hdfsURL}${pathToSave}")
resultDF
})
.reduce(_.union(_))
如果你能接受像
这样的目录结构tablename/date=2019-08-12
tablename/date=2019-08-13
相反,DataFrameWriter.partitionBy
就可以了。例如
val df =
Seq((Timestamp.valueOf("2019-06-01 12:00:00"), 1),
(Timestamp.valueOf("2019-06-01 12:00:01"), 2),
(Timestamp.valueOf("2019-06-02 12:00:00"), 3)).toDF("time", "foo")
df.withColumn("date", to_date($"time"))
.write
.partitionBy("date")
.format("avro")
.save("/tmp/foo")
产生以下结构
find /tmp/foo
/tmp/foo
/tmp/foo/._SUCCESS.crc
/tmp/foo/date=2019-06-01
/tmp/foo/date=2019-06-01/.part-00000-2a7a63f2-7038-4aec-8f76-87077f91a415.c000.avro.crc
/tmp/foo/date=2019-06-01/part-00000-2a7a63f2-7038-4aec-8f76-87077f91a415.c000.avro
/tmp/foo/date=2019-06-01/.part-00001-2a7a63f2-7038-4aec-8f76-87077f91a415.c000.avro.crc
/tmp/foo/date=2019-06-01/part-00001-2a7a63f2-7038-4aec-8f76-87077f91a415.c000.avro
/tmp/foo/_SUCCESS
/tmp/foo/date=2019-06-02
/tmp/foo/date=2019-06-02/part-00002-2a7a63f2-7038-4aec-8f76-87077f91a415.c000.avro
/tmp/foo/date=2019-06-02/.part-00002-2a7a63f2-7038-4aec-8f76-87077f91a415.c000.avro.crc