附加到镶木地板文件的 EMR Spark 步骤正在覆盖镶木地板文件
EMR Spark step to append to parquet files is overwriting parquet files
Amazon EMR 集群(1 个主节点,2 个节点)上的 Spark 2.4.2 使用 Python 3.6
我正在读取 Amazon s3 中的对象,将它们压缩为镶木地板格式,并将它们添加(追加)到现有的镶木地板数据存储中。当我 运行 我在 pyspark 中的代码 shell 我能够读取/压缩对象并将新的镶木地板文件添加到现有的镶木地板文件中,当我 运行 查询parquet数据,显示所有数据都在parquet文件夹中。但是,当我 运行 我的 EMR 集群上的某个步骤中的代码时,现有的镶木地板文件会被新文件覆盖。同样的查询会显示只有新数据存在,而parquet数据所在的s3文件夹只有新数据。
步骤关键代码如下:
spark = SparkSession.builder \
.appName("myApp") \
.getOrCreate()
df_p = spark.read \
.format('parquet') \
.load(parquet_folder)
the_schema = df_p.schema
df2 = spark.read \
.format('com.databricks.spark.xml') \
.options(rowTag='ApplicationSubmission', \
path=input_folder) \
.schema(the_schema) \
.load(input_folder+'/*.xml')
df2.coalesce(10) \
.write \
.option('compression', 'snappy') \
.option('path', parquet_folder) \
.format('parquet') \
.mode('append') \
.saveAsTable(table_name, mode='append')
我希望这会将 input_folder
中的数据附加到 parquet_folder
中的现有数据,但在 EMR 步骤中执行时会被覆盖。我试过 .saveAsTable
中没有 mode='append'
(在 pyspark shell 中没有必要)。
建议?
我不知道为什么你的方法不起作用,但我使用 .parquet(path)
而不是 .saveAsTable(...)
得到了更好的结果。我不知道这种行为的原因,但我之前没有看到 saveAsTable
用于保存数据对象,因为它在 Hive metastore 中创建了一个 table (这不是 "physical" 数据对象)。
如果您的步骤 运行 通过 Apache Livy,它们的行为可能与在 shell 上的行为不同。如果你确实在使用 Livy,你可以在 Zeppelin notebook 上测试你的代码,在你的代码单元格上表明你应该使用 %livy-pyspark
执行器运行。
Amazon EMR 集群(1 个主节点,2 个节点)上的 Spark 2.4.2 使用 Python 3.6
我正在读取 Amazon s3 中的对象,将它们压缩为镶木地板格式,并将它们添加(追加)到现有的镶木地板数据存储中。当我 运行 我在 pyspark 中的代码 shell 我能够读取/压缩对象并将新的镶木地板文件添加到现有的镶木地板文件中,当我 运行 查询parquet数据,显示所有数据都在parquet文件夹中。但是,当我 运行 我的 EMR 集群上的某个步骤中的代码时,现有的镶木地板文件会被新文件覆盖。同样的查询会显示只有新数据存在,而parquet数据所在的s3文件夹只有新数据。
步骤关键代码如下:
spark = SparkSession.builder \
.appName("myApp") \
.getOrCreate()
df_p = spark.read \
.format('parquet') \
.load(parquet_folder)
the_schema = df_p.schema
df2 = spark.read \
.format('com.databricks.spark.xml') \
.options(rowTag='ApplicationSubmission', \
path=input_folder) \
.schema(the_schema) \
.load(input_folder+'/*.xml')
df2.coalesce(10) \
.write \
.option('compression', 'snappy') \
.option('path', parquet_folder) \
.format('parquet') \
.mode('append') \
.saveAsTable(table_name, mode='append')
我希望这会将 input_folder
中的数据附加到 parquet_folder
中的现有数据,但在 EMR 步骤中执行时会被覆盖。我试过 .saveAsTable
中没有 mode='append'
(在 pyspark shell 中没有必要)。
建议?
我不知道为什么你的方法不起作用,但我使用 .parquet(path)
而不是 .saveAsTable(...)
得到了更好的结果。我不知道这种行为的原因,但我之前没有看到 saveAsTable
用于保存数据对象,因为它在 Hive metastore 中创建了一个 table (这不是 "physical" 数据对象)。
如果您的步骤 运行 通过 Apache Livy,它们的行为可能与在 shell 上的行为不同。如果你确实在使用 Livy,你可以在 Zeppelin notebook 上测试你的代码,在你的代码单元格上表明你应该使用 %livy-pyspark
执行器运行。