Spark 2.1 Hive partition adding issue (ORC format)
I am using pyspark 2.1 to dynamically create partitions from table A into table B. Below are the DDLs:
create table A (
    objid bigint,
    occur_date timestamp)
STORED AS ORC;

create table B (
    objid bigint,
    occur_date timestamp)
PARTITIONED BY (
    occur_date_pt date)
STORED AS ORC;
I then use pyspark code to work out which partitions need to be merged; below is the part of the code that actually does this:
for row in incremental_df.select(partitioned_column).distinct().collect():
    path = '/apps/hive/warehouse/B/' + partitioned_column + '=' + format(row[0])
    old_df = merge_df.where(col(partitioned_column).isin(format(row[0])))
    new_df = incremental_df.where(col(partitioned_column).isin(format(row[0])))
    output_df = old_df.subtract(new_df)
    output_df = output_df.unionAll(new_df)
    output_df.write.option("compression", "none").mode("overwrite").format("orc").save(path)

refresh_metadata_sql = 'MSCK REPAIR TABLE ' + table_name
sqlContext.sql(refresh_metadata_sql)
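For reference, an alternative to rescanning the whole table with MSCK REPAIR TABLE is to register each partition explicitly right after its files are saved. A minimal sketch of that alternative, placed inside the loop body and assuming table_name refers to B:

# Register just the partition that was written, rather than having
# MSCK REPAIR scan every directory under the table.
add_partition_sql = (
    "ALTER TABLE " + table_name +
    " ADD IF NOT EXISTS PARTITION (" + partitioned_column + "='" + format(row[0]) + "')"
)
sqlContext.sql(add_partition_sql)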
When the code executes, I can see the partitions in HDFS:
Found 3 items
drwx------ 307010265 hdfs 0 2017-06-01 10:31 /apps/hive/warehouse/B/occur_date_pt=2017-06-01
drwx------ 307010265 hdfs 0 2017-06-01 10:31 /apps/hive/warehouse/B/occur_date_pt=2017-06-02
drwx------ 307010265 hdfs 0 2017-06-01 10:31 /apps/hive/warehouse/B/occur_date_pt=2017-06-03
But when I try to access the table in Spark, I get an array index out of bounds error:
>>> merge_df = sqlContext.sql('select * from B')
DataFrame[]
>>> merge_df.show()
17/06/01 10:33:13 ERROR Executor: Exception in task 0.0 in stage 200.0 (TID 4827)
java.lang.IndexOutOfBoundsException: toIndex = 3
at java.util.ArrayList.subListRangeCheck(ArrayList.java:1004)
at java.util.ArrayList.subList(ArrayList.java:996)
at org.apache.hadoop.hive.ql.io.orc.RecordReaderFactory.getSchemaOnRead(RecordReaderFactory.java:161)
at org.apache.hadoop.hive.ql.io.orc.RecordReaderFactory.createTreeReader(RecordReaderFactory.java:66)
at org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.<init>(RecordReaderImpl.java:202)
at org.apache.hadoop.hive.ql.io.orc.ReaderImpl.rowsOptions(ReaderImpl.java:539)
at org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger$ReaderPair.<init>(OrcRawRecordMerger.java:183)
at org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger$OriginalReaderPair.<init>(OrcRawRecordMerger.java:226)
at org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.<init>(OrcRawRecordMerger.java:437)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getReader(OrcInputFormat.java:1215)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getRecordReader(OrcInputFormat.java:1113)
at org.apache.spark.rdd.HadoopRDD$$anon.liftedTree1(HadoopRDD.scala:252)
at org.apache.spark.rdd.HadoopRDD$$anon.<init>(HadoopRDD.scala:251)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:211)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:102)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
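One way to narrow this down is to read the ORC files in a single partition directory directly, bypassing the Hive metastore, and compare their schema with the table's two non-partition columns. A minimal diagnostic sketch, using the first path from the listing above:

# Read one partition directory directly (not through the metastore)
# to see the schema actually stored in the ORC files.
written_df = sqlContext.read.orc('/apps/hive/warehouse/B/occur_date_pt=2017-06-01')
written_df.printSchema()
# Table B declares two non-partition columns (objid, occur_date); if a
# third column shows up here, the files and the table schema disagree.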
Any help or pointers to resolve this would be much appreciated.
Posting the comment as an answer for easier reference:
Please make sure the partition column is not included in the dataframe.
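In the code above, output_df still carries occur_date_pt, so every ORC file written to a partition path contains three columns while table B declares only two non-partition ones. That is consistent with the toIndex = 3 in the exception: Hive's ORC reader fails when it tries to map the three-column file schema onto the two-column table schema. A minimal sketch of the adjusted write, assuming the same variables as in the question:

# Drop the partition column before writing: its value is already
# encoded in the directory name (occur_date_pt=...), so the ORC files
# should contain only the table's own columns (objid, occur_date).
output_df.drop(partitioned_column) \
    .write.option("compression", "none") \
    .mode("overwrite").format("orc").save(path)

After rewriting the partitions this way and running MSCK REPAIR TABLE again, select * from B should read without the IndexOutOfBoundsException.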