DF insertInto is not persisting all columns for mixed structured data (json, string)
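DataFrame saveAsTable persists all column values correctly, but insertInto does not store all columns. In particular, the JSON data is truncated and the subsequent column is not stored in the Hive table.
Our environment
- Spark 2.2.0
- EMR 5.10.0
- Scala 2.11.8
The sample data is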
a8f11f90-20c9-11e8-b93e-2fc569d27605 efe5bdb3-baac-5d8e-6cae57771c13 Unknown E657F298-2D96-4C7D-8516-E228153FE010 NonDemarcated {"org-id":"efe5bdb3-baac-5d8e-6cae57771c13","nodeid":"N02c00056","parkingzoneid":"E657F298-2D96-4C7D-8516-E228153FE010","site-id":"a8f11f90-20c9-11e8-b93e-2fc569d27605","channel":1,"type":"Park","active":true,"tag":"","configured_date":"2017-10-23 23:29:11.20","vs":[5.0,1.7999999523162842,1.5]}
DF SaveAsTable
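import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.{col, lit, unix_timestamp}

// Hive-enabled session with dynamic partitioning allowed
val spark = SparkSession.builder().appName("Spark SQL Test").
  config("hive.exec.dynamic.partition", "true").
  config("hive.exec.dynamic.partition.mode", "nonstrict").
  enableHiveSupport().getOrCreate()
val zoneStatus = spark.table("zone_status")
// saveAsTable creates dwh_zone_status from the DataFrame schema
zoneStatus.select(col("site-id"), col("org-id"), col("groupid"), col("zid"), col("type"), lit(0), col("config"), unix_timestamp().alias("ts")).
  write.mode(SaveMode.Overwrite).saveAsTable("dwh_zone_status")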
The data is stored correctly in the resulting table:
a8f11f90-20c9-11e8-b93e-2fc569d27605 efe5bdb3-baac-5d8e-6cae57771c13 Unknown E657F298-2D96-4C7D-8516-E228153FE010 NonDemarcated 0 {"org-id":"efe5bdb3-baac-5d8e-6cae57771c13","nodeid":"N02c00056","parkingzoneid":"E657F298-2D96-4C7D-8516-E228153FE010","site-id":"a8f11f90-20c9-11e8-b93e-2fc569d27605","channel":1,"type":"Park","active":true,"tag":"","configured_date":"2017-10-23 23:29:11.20","vs":[5.0,1.7999999523162842,1.5]} 1520453589
DF insertInto
zoneStatus.
select(col("site-id"),col("org-id"), col("groupid"), col("zid"), col("type"), lit(0), col("config"), unix_timestamp().alias("ts")).
write.mode(SaveMode.Overwrite).insertInto("zone_status_insert")
However, insertInto does not persist all of the content. The JSON string is only partially stored, and the subsequent column is not stored.
a8f11f90-20c9-11e8-b93e-2fc569d27605 efe5bdb3-baac-5d8e-6cae57771c13 Unknown E657F298-2D96-4C7D-8516-E228153FE010 NonDemarcated 0 {"org-id":"efe5bdb3-baac-5d8e-6cae57771c13" NULL
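We use the insertInto function throughout our project and ran into this recently while parsing the JSON data to pull out other metrics. We noticed that the config content is not stored completely. We plan to change to saveAsTable, but we could avoid the code change if there is a workaround that can be added to the Spark configuration.
You can use the following alternative approaches to insert the data into the table.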
val zoneStatusDF = zoneStatus.
  select(col("site-id"), col("org-id"), col("groupid"), col("zid"), col("type"), lit(0), col("config"), unix_timestamp().alias("ts"))
// registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView is its replacement
zoneStatusDF.registerTempTable("zone_status_insert")
Or
zoneStatus.sqlContext.sql("create table zone_status_insert as select * from zone_status")
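The cause is that the table schema was created with
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE
The JSON in config itself contains commas, so with ',' as the field delimiter the value is cut at its first comma when the row is read back. That is consistent with the output above, where config ends right after the "org-id" entry and the following ts column is NULL.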
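The delimiter collision can be reproduced without Spark or Hive. The following is only a simplified model, not how the Hive SerDe is implemented: it assumes the reader splits each line on every comma, keeps as many pieces as the schema has columns, and treats non-numeric text in a numeric column as NULL. The object name, helper names, and shortened sample values are made up for illustration.

```scala
import scala.util.Try

object DelimiterCollision {
  // Lay out a row the way a comma-delimited text table would: fields joined by ','.
  def writeRow(fields: Seq[String]): String = fields.mkString(",")

  // Read the row back against a schema of numCols columns: split on every ',',
  // keep the first numCols pieces, and ignore anything left over.
  def readRow(line: String, numCols: Int): Seq[String] =
    line.split(",", -1).toSeq.take(numCols)

  // A numeric column whose text is not a number is modelled as None (NULL).
  def asBigInt(field: String): Option[Long] = Try(field.toLong).toOption

  def main(args: Array[String]): Unit = {
    // Shortened, hypothetical stand-in for the config JSON in the question.
    val config = """{"org-id":"efe5bdb3","nodeid":"N02c00056","channel":1}"""
    val row = Seq("site-1", "org-1", "Unknown", "zone-1", "NonDemarcated", "0", config, "1520453589")

    val parsed = readRow(writeRow(row), row.length)
    println(parsed(6))            // the config column, cut at its first comma
    println(asBigInt(parsed(7)))  // the ts slot now holds JSON text, not a number
  }
}
```

In this model the seventh field keeps only the first JSON entry and the eighth field receives the next JSON fragment instead of the timestamp, which mirrors the truncated config and NULL seen in the insertInto result.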
After removing ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' from the table definition, insertInto saves the full content.
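As a rough sketch of that fix, the target table could be declared without an explicit field delimiter, so the text format falls back to Hive's default separator (Ctrl-A, \u0001), which does not occur in the JSON. The original DDL is not shown in the question, so the table name zone_status_insert_fixed and every column name and type below are assumptions. insertInto matches columns by position rather than by name, so only the column order has to line up with the select.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.{col, lit, unix_timestamp}

object InsertIntoWithoutCommaDelimiter {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("insertInto without comma delimiter")
      .enableHiveSupport()
      .getOrCreate()

    // Hypothetical table and column names; no ROW FORMAT clause, so the
    // default field separator is used instead of ','.
    spark.sql(
      """CREATE TABLE IF NOT EXISTS zone_status_insert_fixed (
        |  site_id STRING, org_id STRING, groupid STRING, zid STRING,
        |  zone_type STRING, flag INT, config STRING, ts BIGINT
        |) STORED AS TEXTFILE""".stripMargin)

    // Same projection as in the question; columns are matched by position.
    spark.table("zone_status")
      .select(col("site-id"), col("org-id"), col("groupid"), col("zid"),
        col("type"), lit(0), col("config"), unix_timestamp().alias("ts"))
      .write.mode(SaveMode.Overwrite)
      .insertInto("zone_status_insert_fixed")
  }
}
```

A columnar format such as Parquet (STORED AS PARQUET) would sidestep delimiter issues entirely, at the cost of the files no longer being plain text.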