Spark 2.2 Thrift Server error: NumberFormatException on DataFrame when querying a Hive table
I have Hortonworks HDP 2.6.3 running Spark2 (v2.2). My test case is simple:
Create a Hive table with some random values. Hive runs on port 10000.
Start the Spark Thrift Server on port 10016.
Run pyspark and query the Hive table through port 10016.
However, I cannot get the data from Spark because of a NumberFormatException.
Here is my test case:
- Create a Hive table with sample rows:
beeline> !connect jdbc:hive2://localhost:10000/default hive hive
create table test1 (id int, desc varchar(40));
insert into table test1 values (1,"aa"),(2,"bb");
- Run the Spark Thrift Server:
su - spark -c '/usr/hdp/2.6.3.0-235/spark2/sbin/start-thriftserver.sh --master yarn-client --executor-memory 512m --hiveconf hive.server2.thrift.port=10016'
- Run pyspark as the spark user:
su - spark -c 'pyspark'
Enter the following code:
df = sqlContext.read.format("jdbc").options(driver="org.apache.hive.jdbc.HiveDriver", url="jdbc:hive2://localhost:10016/default", dbtable="test1", user="hive", password="hive").load()
df.select("*").show()
I get this error:
17/12/15 08:04:13 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
java.sql.SQLException: Cannot convert column 1 to integerjava.lang.NumberFormatException: For input string: "id"
    at org.apache.hive.jdbc.HiveBaseResultSet.getInt(HiveBaseResultSet.java:351)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter.apply(JdbcUtils.scala:394)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter.apply(JdbcUtils.scala:393)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon.getNext(JdbcUtils.scala:330)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon.getNext(JdbcUtils.scala:312)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:395)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:234)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:228)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:827)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:827)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NumberFormatException: For input string: "id"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:580)
    at java.lang.Integer.valueOf(Integer.java:766)
    at org.apache.hive.jdbc.HiveBaseResultSet.getInt(HiveBaseResultSet.java:346)
    ... 23 more
17/12/15 08:04:13 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2, localhost, executor driver): java.sql.SQLException: Cannot convert column 1 to integerjava.lang.NumberFormatException: For input string: "id"
    at org.apache.hive.jdbc.HiveBaseResultSet.getInt(HiveBaseResultSet.java:351)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter.apply(JdbcUtils.scala:394)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter.apply(JdbcUtils.scala:393)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon.getNext(JdbcUtils.scala:330)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon.getNext(JdbcUtils.scala:312)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:395)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:234)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:228)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:827)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:827)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NumberFormatException: For input string: "id"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:580)
    at java.lang.Integer.valueOf(Integer.java:766)
    at org.apache.hive.jdbc.HiveBaseResultSet.getInt(HiveBaseResultSet.java:346)
    ... 23 more
17/12/15 08:04:14 ERROR TaskSetManager: Task 0 in stage 2.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/dataframe.py", line 336, in show
    print(self._jdf.showString(n, 20))
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o75.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost, executor driver): java.sql.SQLException: Cannot convert column 1 to integerjava.lang.NumberFormatException: For input string: "id"
    at org.apache.hive.jdbc.HiveBaseResultSet.getInt(HiveBaseResultSet.java:351)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter.apply(JdbcUtils.scala:394)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter.apply(JdbcUtils.scala:393)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon.getNext(JdbcUtils.scala:330)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon.getNext(JdbcUtils.scala:312)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:395)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:234)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:228)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:827)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:827)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NumberFormatException: For input string: "id"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:580)
    at java.lang.Integer.valueOf(Integer.java:766)
    at org.apache.hive.jdbc.HiveBaseResultSet.getInt(HiveBaseResultSet.java:346)
    ... 23 more
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1505)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1504)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:814)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
    at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:336)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2854)
    at org.apache.spark.sql.Dataset$$anonfun$head.apply(Dataset.scala:2154)
    at org.apache.spark.sql.Dataset$$anonfun$head.apply(Dataset.scala:2154)
    at org.apache.spark.sql.Dataset$$anonfun.apply(Dataset.scala:2838)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2837)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2154)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2367)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:245)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLException: Cannot convert column 1 to integerjava.lang.NumberFormatException: For input string: "id"
    at org.apache.hive.jdbc.HiveBaseResultSet.getInt(HiveBaseResultSet.java:351)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter.apply(JdbcUtils.scala:394)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter.apply(JdbcUtils.scala:393)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon.getNext(JdbcUtils.scala:330)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon.getNext(JdbcUtils.scala:312)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:395)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:234)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:228)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:827)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:827)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more
Caused by: java.lang.NumberFormatException: For input string: "id"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:580)
    at java.lang.Integer.valueOf(Integer.java:766)
    at org.apache.hive.jdbc.HiveBaseResultSet.getInt(HiveBaseResultSet.java:346)
    ... 23 more
I suspected it was related to the id column, so I changed to:
df.select("desc").show()
Then I got this strange result:
+----+
|desc|
+----+
|desc|
|desc|
+----+
- If I go back to beeline and query through port 10016, everything works fine:
beeline> !connect jdbc:hive2://localhost:10016/default hive hive
select * from test1;
+-----+-------+--+
| id | desc |
+-----+-------+--+
| 1 | aa |
| 2 | bb |
+-----+-------+--+
- If I switch to port 10000 in pyspark, the same problem persists.
Can you help me understand why this happens and how I can fetch the rows through Spark?
Update 1
I followed @Achyuth's advice in both cases below, but they still don't work.
Case 1
Beeline:
create table test4 (id String, desc String);
insert into table test4 values ("1","aa"),("2","bb");
select * from test4;
Pyspark:
>>> df = sqlContext.read.format("jdbc").options(driver="org.apache.hive.jdbc.HiveDriver", url="jdbc:hive2://localhost:10016/default", dbtable="test4",user="hive", password="hive").option("fetchsize", "10").load()
>>> df.select("*").show()
+---+----+
| id|desc|
+---+----+
| id|desc|
| id|desc|
+---+----+
For some reason, it returned the column names as the row values?!
Case 2
Beeline:
create table test5 (id int, desc varchar(40)) STORED AS ORC;
insert into table test5 values (1,"aa"),(2,"bb");
select * from test5;
Pyspark:
Still the same error: Caused by: java.lang.NumberFormatException: For input string: "id"
Update 2
Create a table and insert values through Hive port 10000, then query it. This works fine through beeline:
beeline> !connect jdbc:hive2://localhost:10000/default hive hive
Connecting to jdbc:hive2://localhost:10000/default
Connected to: Apache Hive (version 1.2.1000.2.5.3.0-37)
Driver: Hive JDBC (version 1.2.1000.2.5.3.0-37)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://localhost:10000/default> create table test2 (id String, desc String) STORED AS ORC;
No rows affected (0.3 seconds)
0: jdbc:hive2://localhost:10000/default> insert into table test2 values ("1","aa"),("2","bb");
INFO : Session is already open
INFO : Dag name: insert into table tes..."1","aa"),("2","bb")(Stage-1)
INFO : Tez session was closed. Reopening...
INFO : Session re-established.
INFO :
INFO : Status: Running (Executing on YARN cluster with App id application_1514019042819_0006)
INFO : Map 1: -/-
INFO : Map 1: 0/1
INFO : Map 1: 0(+1)/1
INFO : Map 1: 1/1
INFO : Loading data to table default.test2 from webhdfs://demo.myapp.local:40070/apps/hive/warehouse/test2/.hive-staging_hive_2017-12-23_04-29-54_569_601147868480753216-3/-ext-10000
INFO : Table default.test2 stats: [numFiles=1, numRows=2, totalSize=317, rawDataSize=342]
No rows affected (15.414 seconds)
0: jdbc:hive2://localhost:10000/default> select * from table2;
Error: Error while compiling statement: FAILED: SemanticException [Error 10001]: Line 1:14 Table not found 'table2' (state=42S02,code=10001)
0: jdbc:hive2://localhost:10000/default> select * from test2;
+-----------+-------------+--+
| test2.id | test2.desc |
+-----------+-------------+--+
| 1 | aa |
| 2 | bb |
+-----------+-------------+--+
2 rows selected (0.364 seconds)
Also through beeline, I can do the same thing against the Spark Thrift Server on port 10016 and it works fine:
beeline> !connect jdbc:hive2://localhost:10016/default hive hive
Connecting to jdbc:hive2://localhost:10016/default
1: jdbc:hive2://localhost:10016/default> create table test3 (id String, desc String) STORED AS ORC;
+---------+--+
| Result |
+---------+--+
+---------+--+
No rows selected (1.234 seconds)
1: jdbc:hive2://localhost:10016/default> insert into table test3 values ("1","aa"),("2","bb");
+---------+--+
| Result |
+---------+--+
+---------+--+
No rows selected (9.111 seconds)
1: jdbc:hive2://localhost:10016/default> select * from test3;
+-----+-------+--+
| id | desc |
+-----+-------+--+
| 1 | aa |
| 2 | bb |
+-----+-------+--+
2 rows selected (3.387 seconds)
That means Spark and the Thrift Server work fine. But with pyspark I ran into the same problem, except this time the result is empty:
>>> df = sqlContext.read.format("jdbc").options(driver="org.apache.hive.jdbc.HiveDriver", url="jdbc:hive2://localhost:10016/default", dbtable="test3",user="hive", password="hive").load()
>>> df.select("*").show()
+---+----+
| id|desc|
+---+----+
+---+----+
Update 3
describe extended test3;
# Detailed Table Information | CatalogTable(
Table: `default`.`test3`
Owner: hive
Created: Sat Dec 23 04:37:14 PST 2017
Last Access: Wed Dec 31 16:00:00 PST 1969
Type: MANAGED
Schema: [`id` string, `desc` string]
Properties: [totalSize=620, numFiles=2, transient_lastDdlTime=1514032656, STATS_GENERATED_VIA_STATS_TASK=true]
Storage(Location: webhdfs://demo.myapp.local:40070/apps/hive/warehouse/test3, InputFormat: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat, OutputFormat: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat, Serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde, Properties: [serialization.format=1]))
show create table test3;
CREATE TABLE `test3`(`id` string, `desc` string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
WITH SERDEPROPERTIES (
'serialization.format' = '1'
)
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
TBLPROPERTIES (
'totalSize' = '620',
'numFiles' = '2',
'transient_lastDdlTime' = '1514032656',
'STATS_GENERATED_VIA_STATS_TASK' = 'true'
)
su - spark -c 'hdfs dfs -cat webhdfs://demo.myapp.local:40070/apps/hive/warehouse/test3/part-00000'
Even though you create the Hive table with specific data types, the underlying data in the table is stored in string format when it is inserted.
So when Spark tries to read the data, it uses the metastore to find the data types. The column is an int in the Hive metastore but a string in the file, and that throws the cast exception.
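You can see both halves of that mismatch from pyspark itself. Here is a minimal sketch (reusing the connection options from the question): load() succeeds because it only touches metadata, and the failure only appears once rows are actually fetched.

# Sketch: the schema Spark derives from the JDBC metadata should report
# id as an integer, even though the fetched values turn out to be strings.
df = sqlContext.read.format("jdbc").options(
    driver="org.apache.hive.jdbc.HiveDriver",
    url="jdbc:hive2://localhost:10016/default",
    dbtable="test1",
    user="hive",
    password="hive").load()

df.printSchema()
# Expected something like:
# root
#  |-- id: integer (nullable = true)
#  |-- desc: string (nullable = true)

df.show()  # this is the step that raises the NumberFormatException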
Solution
Create the table with string columns, and reading it from Spark will work:
create table test1 (id String, desc String);
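If the integer type matters downstream, one follow-up (my own sketch, not part of the original suggestion) is to cast the string column back on the Spark side after the fetch:

from pyspark.sql.functions import col

# Read the all-string table over JDBC, then restore the intended type.
df = sqlContext.read.format("jdbc").options(
    driver="org.apache.hive.jdbc.HiveDriver",
    url="jdbc:hive2://localhost:10016/default",
    dbtable="test1",
    user="hive",
    password="hive").load()

# Values that are not valid integers become null instead of raising.
typed = df.withColumn("id", col("id").cast("int"))
typed.printSchema()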
If you want to keep the data types, specify one of the file formats when creating the table, such as ORC or Parquet, and then insert into it. You can then read the file from Spark without the exception:
create table test1 (id int, desc varchar(40)) STORED AS ORC;
Now, the question is why Hive is able to read it while Spark is not. Hive has good conversion options available that Spark does not.
I solved this only after hours of searching the internet!!!
Since I'm still a rookie in Scala/Hive, I can't give a great explanation. But I fixed it by adding some external jars (Hive JDBC) provided by AWS, and I also changed the driver option to "com.amazon.hive.jdbc41.HS2Driver".
Go to the following link to download the drivers and check the sample code:
http://awssupportdatasvcs.com/bootstrap-actions/Simba/latest/
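For what it's worth, a rough sketch of how that combination is wired up in pyspark. The jar path is a placeholder for wherever the downloaded driver lands, and I'm assuming the same jdbc:hive2 URL works with the Amazon driver:

# Start pyspark with the downloaded jars on the classpath, e.g.:
#   pyspark --jars /path/to/HiveJDBC41.jar   (path is hypothetical)
df = sqlContext.read.format("jdbc").options(
    driver="com.amazon.hive.jdbc41.HS2Driver",
    url="jdbc:hive2://localhost:10016/default",
    dbtable="test1",
    user="hive",
    password="hive").load()
df.show()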