Unable to query Hive after adding the SQL Server JAR to PySpark's classpath

Hive is set up correctly, and I can query it with spark.sql after entering the REPL via pyspark. I want to read a table from SQL Server and save it into Hive. If I start the REPL passing a JDBC jar, e.g. pyspark --driver-class-path sqljdbc4.jar --jars sqljdbc4.jar, I can read from SQL Server, but then Spark can no longer access Hive: any query against an existing Hive table fails with an LZO codec error (see below).

I would like to know how to query/pull data from the external SQL Server table and then save it into an existing Hive table.
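For reference, this is roughly the flow being attempted; it is only a sketch, and the host, database, table names, and credentials below are placeholders:

    # Read a table from SQL Server over JDBC (all connection details are placeholders)
    sqlserver_df = (spark.read
        .format("jdbc")
        .option("url", "jdbc:sqlserver://sqlserver-host:1433;databaseName=source_db")
        .option("dbtable", "dbo.source_table")
        .option("user", "username")
        .option("password", "password")
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
        .load())

    # Append into an existing Hive table (columns must line up positionally)
    sqlserver_df.write.insertInto("db.existing_hive_table")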

    spark.sql("select max(product_id) from table").show()
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 287, in show
        print(self._jdf.showString(n, truncate))
      File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1133, in __call__
      File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
        return f(*a, **kw)
      File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 319, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling o95.showString.
    : org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
    Exchange SinglePartition
    +- *HashAggregate(keys=[], functions=[partial_max(product_id#35)], output=[max#45])
       +- HiveTableScan [product_id#35], MetastoreRelation db, table

        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
        at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:114)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:115)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:115)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:136)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
        at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:233)
        at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:138)
        at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:361)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:115)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:115)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:136)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
        at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:323)
        at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
        at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute.apply(Dataset.scala:2193)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
        at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546)
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute(Dataset.scala:2192)
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199)
        at org.apache.spark.sql.Dataset$$anonfun$head.apply(Dataset.scala:1935)
        at org.apache.spark.sql.Dataset$$anonfun$head.apply(Dataset.scala:1934)
        at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2576)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:1934)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2149)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:280)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.RuntimeException: Error in configuring object
        at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:112)
        at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:78)
        at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:136)
        at org.apache.spark.rdd.HadoopRDD.getInputFormat(HadoopRDD.scala:185)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:198)
        at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:248)
        at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:246)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:248)
        at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:246)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:248)
        at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:246)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:248)
        at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:246)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:248)
        at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:246)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:248)
        at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:246)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
        at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:91)
        at org.apache.spark.sql.execution.exchange.ShuffleExchange$.prepareShuffleDependency(ShuffleExchange.scala:263)
        at org.apache.spark.sql.execution.exchange.ShuffleExchange.prepareShuffleDependency(ShuffleExchange.scala:86)
        at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute.apply(ShuffleExchange.scala:123)
        at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute.apply(ShuffleExchange.scala:114)
        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
        ... 41 more
    Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109)
        ... 80 more
    Caused by: java.lang.IllegalArgumentException: Compression codec com.hadoop.compression.lzo.LzoCodec not found.
        at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:139)
        at org.apache.hadoop.io.compress.CompressionCodecFactory.<init>(CompressionCodecFactory.java:179)
        at org.apache.hadoop.mapred.TextInputFormat.configure(TextInputFormat.java:45)
        ... 85 more
    Caused by: java.lang.ClassNotFoundException: Class com.hadoop.compression.lzo.LzoCodec not found
        at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2101)
        at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:132)
        ... 87 more

The --driver-class-path flag replaces whatever value is configured in your defaults; it does not append to it. You most likely have the LZO jar on the classpath in your spark-defaults.conf file, but that setting is being ignored because the flag overrides it. You should either:

1) include the entire driver classpath (the LZO jar as well as the SQL Server JDBC jar) in your --driver-class-path setting, or

2) add the SQL Server JDBC jar to the classpath in the spark.driver.extraClassPath setting of your spark-defaults.conf file, as sketched below.
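For option 2, the spark-defaults.conf entry would look something like the following; the jar paths are placeholders, and the key point is to keep the existing entries (e.g. the LZO jar) and append the JDBC jar using the platform's path separator:

    # spark-defaults.conf (example paths only -- keep your existing classpath entries)
    spark.driver.extraClassPath  /path/to/hadoop-lzo.jar:/path/to/sqljdbc4.jar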

Also, as @Tim pointed out in the comments, you still need to supply the jar on the command line with the --jars flag.
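Putting the pieces of option 1 together, a launch command might look like this; the jar names and locations are assumptions and will depend on your distribution:

    pyspark \
      --driver-class-path /path/to/hadoop-lzo.jar:/path/to/sqljdbc4.jar \
      --jars /path/to/sqljdbc4.jar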