AWS EMR 上的 Spark 2.2.0 写入 Parquet 会丢弃行
Spark 2.2.0 on AWS EMR writing to Parquet drops rows
所以我遇到一个问题,当写入分区的 Parquet 文件时,DataFrame 中的某些行会被删除。
这是我的步骤:
- 使用指定架构从 S3 读取 CSV 数据文件
- 按 'date' 列(日期类型)分区
- 用
mode=append
写成 Parquet
阅读的第一步按预期工作,没有解析问题。对于质量检查,我执行以下操作:
对于 date='2012-11-22'
的特定分区,对 CSV 文件、加载的 DataFrame 和 parquet 文件执行计数。
下面是一些使用 pyspark 重现的代码:
logs_df = spark.read.csv('s3://../logs_2012/', multiLine=True, schema=get_schema()')
logs_df.filter(logs_df.date=='2012-11-22').count() # results in 5000
logs_df.write.partitionBy('date').parquet('s3://.../logs_2012_parquet/', mode='append')
par_df = spark.read.parquet('s3://.../logs_2012_parquet/')
par_df.filter(par_df.date=='2012-11-22').count() # results in 4999, always the same record that is omitted
我也试过写入HDFS,结果是一样的。这发生在多个分区上。 default/null 分区中没有记录。 logs_df
以上准确无误。
我尝试的第二个实验是编写一个未分区的镶木地板文件。上面代码的唯一区别是省略了 partitionBy()
:
logs_df.write.parquet('s3://.../logs_2012_parquet/', mode='append')
加载此镶木地板集和 运行 如上所述的计数为 date='2012-11-22'
和其他日期产生了 5000 的正确结果。将模式设置为 overwrite
或不设置(使用默认值)会导致相同的数据丢失。
我的环境是:
- 电子病历 5.9.0
- Spark 2.2.0
- Hadoop 发行版:Amazon 2.7.3
- 尝试使用 EMRFS 一致视图和不一致视图。然而,大多数测试都是写入 HDFS 以避免任何 S3 一致性问题。
我非常感谢修复或解决方法或使用 Spark 转换为 parquet 文件的其他方法。
谢谢,
编辑:我无法重现第二个实验。所以假设分区和未分区在写入 Parquet 或 JSON.
时似乎都删除了记录
所以谜团肯定在模式定义中。然而,出乎意料的是它不是日期或时间戳。它实际上是布尔值。
我已经从 Redshift 中导出了 CSV,它将 bool 写为 t
和 f
。当我检查推断模式时,这些字段被标记为字符串类型。在 CSV 文件中使用 true
和 false
进行的简单测试将它们识别为布尔值。
所以我原以为日期和时间戳解析会像往常一样出错,但它是布尔值。吸取教训。
感谢指点
所以我遇到一个问题,当写入分区的 Parquet 文件时,DataFrame 中的某些行会被删除。
这是我的步骤:
- 使用指定架构从 S3 读取 CSV 数据文件
- 按 'date' 列(日期类型)分区
- 用
mode=append
写成 Parquet
阅读的第一步按预期工作,没有解析问题。对于质量检查,我执行以下操作:
对于 date='2012-11-22'
的特定分区,对 CSV 文件、加载的 DataFrame 和 parquet 文件执行计数。
下面是一些使用 pyspark 重现的代码:
logs_df = spark.read.csv('s3://../logs_2012/', multiLine=True, schema=get_schema()')
logs_df.filter(logs_df.date=='2012-11-22').count() # results in 5000
logs_df.write.partitionBy('date').parquet('s3://.../logs_2012_parquet/', mode='append')
par_df = spark.read.parquet('s3://.../logs_2012_parquet/')
par_df.filter(par_df.date=='2012-11-22').count() # results in 4999, always the same record that is omitted
我也试过写入HDFS,结果是一样的。这发生在多个分区上。 default/null 分区中没有记录。 logs_df
以上准确无误。
我尝试的第二个实验是编写一个未分区的镶木地板文件。上面代码的唯一区别是省略了 partitionBy()
:
logs_df.write.parquet('s3://.../logs_2012_parquet/', mode='append')
加载此镶木地板集和 运行 如上所述的计数为 date='2012-11-22'
和其他日期产生了 5000 的正确结果。将模式设置为 overwrite
或不设置(使用默认值)会导致相同的数据丢失。
我的环境是:
- 电子病历 5.9.0
- Spark 2.2.0
- Hadoop 发行版:Amazon 2.7.3
- 尝试使用 EMRFS 一致视图和不一致视图。然而,大多数测试都是写入 HDFS 以避免任何 S3 一致性问题。
我非常感谢修复或解决方法或使用 Spark 转换为 parquet 文件的其他方法。
谢谢,
编辑:我无法重现第二个实验。所以假设分区和未分区在写入 Parquet 或 JSON.
时似乎都删除了记录所以谜团肯定在模式定义中。然而,出乎意料的是它不是日期或时间戳。它实际上是布尔值。
我已经从 Redshift 中导出了 CSV,它将 bool 写为 t
和 f
。当我检查推断模式时,这些字段被标记为字符串类型。在 CSV 文件中使用 true
和 false
进行的简单测试将它们识别为布尔值。
所以我原以为日期和时间戳解析会像往常一样出错,但它是布尔值。吸取教训。
感谢指点