Glue 作业将多个分区写入同一个文件
Glue Job Writes Multiple Partitions to the Same File
我正在尝试编写一个粘合作业,使用文件的 csv 的每一行将多个 csv 文件转换为单独的 json 文件。作业完成后,s3 中会显示正确数量的文件,但有些是空的,有些在同一个文件中有多个 json 个对象。
应用映射后,这就是我创建分区和写入文件的方式:
numEntities = applyMapping1.toDF().count()
partitions = applymapping1.repartition(numEntities)
partitions.toDF().write.mode("ignore").format("json").option("header", "true").save("s3://location/test")
使用这个,一些文件被创建为一个 json 文件,一个接一个地有 2 个对象,有些是正确的,有些是空的。
有什么方法可以确保每个分区都创建一个单独的文件,只包含其数据?
我认为 repartition
后面的 Partitioner 不完全 符合您的意图:
它根据您的要求创建了尽可能多的分区 - 到目前为止一切顺利。但它并没有将行分配到每个分区中的一个分区中。这可能是由于 HashPartitioner 中的逻辑为多个行计算了相同的哈希值。
作为 repartition.save...
的替代方法,您可以使用 foreachPartition
然后遍历每一行,将其保存到文件(例如在 /tmp
下)并将其上传到 S3。在这样做之前我不会 repartition
因为将从 foreachPartition
执行的 UDF 相当昂贵,所以你应该尽量减少 UDF 调用的次数。
这是一个对我有用的例子。不过,它是用 Scala 编写的:
dynamicFrame.
repartition(1).
toDF().
foreachPartition(p => {
val out = new BufferedOutputStream(new GZIPOutputStream(new FileOutputStream("/tmp/temp.xsv.gz")))
p.foreach(r => {
val row = ...
out.write(row)
})
val s3 = AmazonS3ClientBuilder.standard().withRegion(Regions.EU_CENTRAL_1).build()
val tm = TransferManagerBuilder.standard().withS3Client(s3).build()
val rq = new PutObjectRequest(bucket, key, new File("/tmp/temp.xsv.gz"))
tm.upload(rq).waitForCompletion()
})
好的,看起来我成功了。根据 rowing-ghoul 的回答,我最终使用 foreach 来处理数据,但由于 spark 的工作原理,我不得不在之后将数据发送到 s3。我还必须使用累加器将 json 字符串存储在 foreach 中。
class ArrayAccumulator(AccumulatorParam):
def zero(self, value):
return []
def addInPlace(self, val1, val2):
val1.extend(val2)
return val1
jsonAccumulator = sc.accumulator([], ArrayAccumulator())
def write_to_json(row):
# Process json
jsonAccumulator += [json]
mappedDF = applymapping1.toDF()
mappedDF.foreach(write_to_json)
count = 0
for x in jsonAccumulator.value:
s3.Object('bucket-name', 'test/' + str(count) + '.json').put(Body=x)
count += 1
我正在尝试编写一个粘合作业,使用文件的 csv 的每一行将多个 csv 文件转换为单独的 json 文件。作业完成后,s3 中会显示正确数量的文件,但有些是空的,有些在同一个文件中有多个 json 个对象。
应用映射后,这就是我创建分区和写入文件的方式:
numEntities = applyMapping1.toDF().count()
partitions = applymapping1.repartition(numEntities)
partitions.toDF().write.mode("ignore").format("json").option("header", "true").save("s3://location/test")
使用这个,一些文件被创建为一个 json 文件,一个接一个地有 2 个对象,有些是正确的,有些是空的。
有什么方法可以确保每个分区都创建一个单独的文件,只包含其数据?
我认为 repartition
后面的 Partitioner 不完全 符合您的意图:
它根据您的要求创建了尽可能多的分区 - 到目前为止一切顺利。但它并没有将行分配到每个分区中的一个分区中。这可能是由于 HashPartitioner 中的逻辑为多个行计算了相同的哈希值。
作为 repartition.save...
的替代方法,您可以使用 foreachPartition
然后遍历每一行,将其保存到文件(例如在 /tmp
下)并将其上传到 S3。在这样做之前我不会 repartition
因为将从 foreachPartition
执行的 UDF 相当昂贵,所以你应该尽量减少 UDF 调用的次数。
这是一个对我有用的例子。不过,它是用 Scala 编写的:
dynamicFrame.
repartition(1).
toDF().
foreachPartition(p => {
val out = new BufferedOutputStream(new GZIPOutputStream(new FileOutputStream("/tmp/temp.xsv.gz")))
p.foreach(r => {
val row = ...
out.write(row)
})
val s3 = AmazonS3ClientBuilder.standard().withRegion(Regions.EU_CENTRAL_1).build()
val tm = TransferManagerBuilder.standard().withS3Client(s3).build()
val rq = new PutObjectRequest(bucket, key, new File("/tmp/temp.xsv.gz"))
tm.upload(rq).waitForCompletion()
})
好的,看起来我成功了。根据 rowing-ghoul 的回答,我最终使用 foreach 来处理数据,但由于 spark 的工作原理,我不得不在之后将数据发送到 s3。我还必须使用累加器将 json 字符串存储在 foreach 中。
class ArrayAccumulator(AccumulatorParam):
def zero(self, value):
return []
def addInPlace(self, val1, val2):
val1.extend(val2)
return val1
jsonAccumulator = sc.accumulator([], ArrayAccumulator())
def write_to_json(row):
# Process json
jsonAccumulator += [json]
mappedDF = applymapping1.toDF()
mappedDF.foreach(write_to_json)
count = 0
for x in jsonAccumulator.value:
s3.Object('bucket-name', 'test/' + str(count) + '.json').put(Body=x)
count += 1