Glue 作业将多个分区写入同一个文件

Glue Job Writes Multiple Partitions to the Same File

我正在尝试编写一个粘合作业,使用文件的 csv 的每一行将多个 csv 文件转换为单独的 json 文件。作业完成后,s3 中会显示正确数量的文件,但有些是空的,有些在同一个文件中有多个 json 个对象。

应用映射后,这就是我创建分区和写入文件的方式:

numEntities = applyMapping1.toDF().count()
partitions = applymapping1.repartition(numEntities)
partitions.toDF().write.mode("ignore").format("json").option("header", "true").save("s3://location/test")

使用这个,一些文件被创建为一个 json 文件,一个接一个地有 2 个对象,有些是正确的,有些是空的。

有什么方法可以确保每个分区都创建一个单独的文件,只包含其数据?

我认为 repartition 后面的 Partitioner 不完全 符合您的意图:

它根据您的要求创建了尽可能多的分区 - 到目前为止一切顺利。但它并没有将行分配到每个分区中的一个分区中。这可能是由于 HashPartitioner 中的逻辑为多个行计算了相同的哈希值。

作为 repartition.save... 的替代方法,您可以使用 foreachPartition 然后遍历每一行,将其保存到文件(例如在 /tmp 下)并将其上传到 S3。在这样做之前我不会 repartition 因为将从 foreachPartition 执行的 UDF 相当昂贵,所以你应该尽量减少 UDF 调用的次数。

这是一个对我有用的例子。不过,它是用 Scala 编写的:

dynamicFrame.
  repartition(1).
  toDF().
  foreachPartition(p => {
    val out = new BufferedOutputStream(new GZIPOutputStream(new FileOutputStream("/tmp/temp.xsv.gz")))
    p.foreach(r => {
      val row = ...
      out.write(row)
    })
    val s3 = AmazonS3ClientBuilder.standard().withRegion(Regions.EU_CENTRAL_1).build()
    val tm = TransferManagerBuilder.standard().withS3Client(s3).build()
    val rq = new PutObjectRequest(bucket, key, new File("/tmp/temp.xsv.gz"))
    tm.upload(rq).waitForCompletion()
  })

好的,看起来我成功了。根据 rowing-ghoul 的回答,我最终使用 foreach 来处理数据,但由于 spark 的工作原理,我不得不在之后将数据发送到 s3。我还必须使用累加器将 json 字符串存储在 foreach 中。

class ArrayAccumulator(AccumulatorParam):
  def zero(self, value):
    return []
  def addInPlace(self, val1, val2):
    val1.extend(val2)
    return val1
jsonAccumulator = sc.accumulator([], ArrayAccumulator())

def write_to_json(row):
  # Process json
  jsonAccumulator += [json]

mappedDF = applymapping1.toDF()
mappedDF.foreach(write_to_json)

count = 0
for x in jsonAccumulator.value:
  s3.Object('bucket-name', 'test/' + str(count) + '.json').put(Body=x)
  count += 1