Spark SQL insertInto() 分区键失败
Spark SQL insertInto() failing for partition key
我正在尝试从 S3 加载数据,对其进行转换,然后插入带分区的配置单元 table。
首先我开始使用 creation_date (bigint) 作为分区键并且它运行良好,但是现在当我尝试使用 creation_month 分区键插入相同的数据时它失败了。
这是代码
var hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)
var df = hiveCtx.read.json("s3n://spark-feedstore/2016/1/*")
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.SaveMode
hiveCtx.sql("SET hive.exec.dynamic.partition = true")
hiveCtx.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
df.persist(StorageLevel.MEMORY_AND_DISK)
df.registerTempTable("posts")
第一个table的架构
[external_id,string,]
[tags,array<string>,]
[creation_date,bigint,]
[video_url,string,]
# Partition Information
creation_date bigint
第 2 个架构 table
[external_id,string,]
[tags,array<string>,]
[creation_date,bigint,]
[video_url,string,]
[creation_month,date,]
# Partition Information
creation_month bigint
插入第一个 table using 没问题。
var udf = hiveCtx .sql("select externalId as external_id, first(sourceMap['tags']) as tags, first(sourceMap['creation_date']) as creation_date,
first(sourceMap['video_url']) as video_url
from posts group by externalId")
udf.write.mode(SaveMode.Append).partitionBy("creation_date").insertInto("posts_1")
但是插入第二个 table 会出错。
var udf = hiveCtx .sql("select externalId as external_id, first(sourceMap['brand_hashtags']) as brand_hashtags, first(sourceMap['creation_date']) as creation_date,
first(sourceMap['video_url']) as video_url, trunc(from_unixtime(first(sourceMap['creation_date']) / 1000), 'MONTH') 作为 creation_month 来自按 externalId 分组的帖子")
udf.write.mode(SaveMode.Append).partitionBy("creation_month").insertInto("posts_2")
错误:
org.apache.spark.sql.AnalysisException: cannot resolve 'cast(creation_date as array<string>)' due to data type mismatch: cannot cast LongType to ArrayType(StringType,true);
我不确定当我们添加另一个字段时会发生什么变化 creation_month。两个 table 的架构的每个方面似乎都完全相同。
我明白了。
这是按列的顺序排列的。
字段顺序是
external_id, tag, video_url, creation_date
但是在 select 查询中我有
external_id, creation_date, tag, video_url
因此 Hive 试图将 creation_date 转换为数组
我正在尝试从 S3 加载数据,对其进行转换,然后插入带分区的配置单元 table。
首先我开始使用 creation_date (bigint) 作为分区键并且它运行良好,但是现在当我尝试使用 creation_month 分区键插入相同的数据时它失败了。
这是代码
var hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)
var df = hiveCtx.read.json("s3n://spark-feedstore/2016/1/*")
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.SaveMode
hiveCtx.sql("SET hive.exec.dynamic.partition = true")
hiveCtx.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
df.persist(StorageLevel.MEMORY_AND_DISK)
df.registerTempTable("posts")
第一个table的架构
[external_id,string,]
[tags,array<string>,]
[creation_date,bigint,]
[video_url,string,]
# Partition Information
creation_date bigint
第 2 个架构 table
[external_id,string,]
[tags,array<string>,]
[creation_date,bigint,]
[video_url,string,]
[creation_month,date,]
# Partition Information
creation_month bigint
插入第一个 table using 没问题。
var udf = hiveCtx .sql("select externalId as external_id, first(sourceMap['tags']) as tags, first(sourceMap['creation_date']) as creation_date,
first(sourceMap['video_url']) as video_url
from posts group by externalId")
udf.write.mode(SaveMode.Append).partitionBy("creation_date").insertInto("posts_1")
但是插入第二个 table 会出错。
var udf = hiveCtx .sql("select externalId as external_id, first(sourceMap['brand_hashtags']) as brand_hashtags, first(sourceMap['creation_date']) as creation_date,
first(sourceMap['video_url']) as video_url, trunc(from_unixtime(first(sourceMap['creation_date']) / 1000), 'MONTH') 作为 creation_month 来自按 externalId 分组的帖子")
udf.write.mode(SaveMode.Append).partitionBy("creation_month").insertInto("posts_2")
错误:
org.apache.spark.sql.AnalysisException: cannot resolve 'cast(creation_date as array<string>)' due to data type mismatch: cannot cast LongType to ArrayType(StringType,true);
我不确定当我们添加另一个字段时会发生什么变化 creation_month。两个 table 的架构的每个方面似乎都完全相同。
我明白了。 这是按列的顺序排列的。
字段顺序是
external_id, tag, video_url, creation_date
但是在 select 查询中我有
external_id, creation_date, tag, video_url
因此 Hive 试图将 creation_date 转换为数组