AWS EMR 上的 Spark 2.2.0 写入 Parquet 会丢弃行

Spark 2.2.0 on AWS EMR writing to Parquet drops rows

所以我遇到一个问题,当写入分区的 Parquet 文件时,DataFrame 中的某些行会被删除。

这是我的步骤:

  1. 使用指定架构从 S3 读取 CSV 数据文件
  2. 按 'date' 列(日期类型)分区
  3. mode=append
  4. 写成 Parquet

阅读的第一步按预期工作,没有解析问题。对于质量检查,我执行以下操作:

对于 date='2012-11-22' 的特定分区,对 CSV 文件、加载的 DataFrame 和 parquet 文件执行计数。

下面是一些使用 pyspark 重现的代码:

logs_df = spark.read.csv('s3://../logs_2012/', multiLine=True, schema=get_schema()')
logs_df.filter(logs_df.date=='2012-11-22').count() # results in 5000
logs_df.write.partitionBy('date').parquet('s3://.../logs_2012_parquet/', mode='append')
par_df = spark.read.parquet('s3://.../logs_2012_parquet/')
par_df.filter(par_df.date=='2012-11-22').count() # results in 4999, always the same record that is omitted

我也试过写入HDFS,结果是一样的。这发生在多个分区上。 default/null 分区中没有记录。 logs_df 以上准确无误。

我尝试的第二个实验是编写一个未分区的镶木地板文件。上面代码的唯一区别是省略了 partitionBy():

logs_df.write.parquet('s3://.../logs_2012_parquet/', mode='append')

加载此镶木地板集和 运行 如上所述的计数为 date='2012-11-22' 和其他日期产生了 5000 的正确结果。将模式设置为 overwrite 或不设置(使用默认值)会导致相同的数据丢失。

我的环境是:

我非常感谢修复或解决方法或使用 Spark 转换为 parquet 文件的其他方法。

谢谢,

编辑:我无法重现第二个实验。所以假设分区和未分区在写入 Parquet 或 JSON.

时似乎都删除了记录

所以谜团肯定在模式定义中。然而,出乎意料的是它不是日期或时间戳。它实际上是布尔值。

我已经从 Redshift 中导出了 CSV,它将 bool 写为 tf。当我检查推断模式时,这些字段被标记为字符串类型。在 CSV 文件中使用 truefalse 进行的简单测试将它们识别为布尔值。

所以我原以为日期和时间戳解析会像往常一样出错,但它是布尔值。吸取教训。

感谢指点