使用 PySpark 写入 HBase Table 时出错

Error while writing to HBase Table using PySpark

我正在尝试使用 pySpark 写入 hbase table。到目前为止,我可以从 hbase 读取数据。但在写入 hbase table.

时出现异常
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import *

properties = {
  "instanceId" : "hbase",
  "zookeepers" : "10-x-x-x.local:2181,10-x-x-x.local:2181,10-x-x-x.local:2181",
  "hbase.columns.mapping" : "KEY_FIELD STRING :key, A STRING c:a, B STRING c:b",
  "hbase.use.hbase.context" : False,
  "hbase.config.resources" : "file:///etc/hbase/conf/hbase-site.xml",
  "hbase.table"  : "t"
}
spark = SparkSession\
        .builder\
        .appName("hbaseWrite")\
        .getOrCreate()

sc = spark.sparkContext

#I am able to read the data successfully.
#df = spark.read.format("org.apache.hadoop.hbase.spark")\
#    .options( **properties)\
#    .load()

data = [("3","DATA 3 A", "DATA 3 B")]
columns = ['KEY_FIELD','A','B']
cSchema = StructType([StructField(columnName, StringType()) for columnName in columns])
df = spark.createDataFrame(data, schema=cSchema)
df.write\
      .options( **properties)\
      .mode('overwrite').format("org.apache.hadoop.hbase.spark").save()

正在执行以下格式的命令:

spark2-submit --master local[*] write_to_hbase.py

Spark版本:2.2.0.cloudera1(我无法更改我的spark版本) HBase版本:1.2.0-cdh5.12.0(但我可以更改我的HBase版本)

注意:我已将 hbase jar 添加到 spark2 jar 文件夹,并将以下依赖 jar 添加到 spark2 jar 文件夹。

  1. spark-core_2.11-1.6.1.jar
  2. htrace-core-3.1.0-incubating.jar
  3. scala-library-2.9.1.jar

错误:

py4j.protocol.Py4JJavaError: An error occurred while calling o70.save.
: java.lang.RuntimeException: org.apache.hadoop.hbase.spark.DefaultSource does not allow create table as select.
        at scala.sys.package$.error(package.scala:27)
        at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:476)

我已经尝试了多种建议,但没有任何效果。这可能是一个重复的问题,但我没有其他选择可以找到答案。

如果您使用 Cloudera distribution,那么 Hard Luck 没有使用 PYSAPRK 写入 HBASE 的官方方法。 Cloudera support Team.

证实了这一点

但是如果您正在使用 Hortonworks 并且您有 spark 2.0 那么下面的 link 应该可以帮助您入门。

Pyspark to Hbase write

通过编译@Aniket Kulkarni

建议的 git 回购 https://github.com/hortonworks-spark/shc and put the shc jar in the spark jar folder. and followed the 解决了这个问题

最终代码看起来像这样,

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import *

properties = {
  "instanceId" : "hbase",
  "zookeepers" : "10-x-x-x.local:2181,10-x-x-x.local:2181,10-x-x-x.local:2181",
  "hbase.columns.mapping" : "KEY_FIELD STRING :key, A STRING c:a, B STRING c:b",
  "hbase.use.hbase.context" : False,
  "hbase.config.resources" : "file:///etc/hbase/conf/hbase-site.xml",
  "hbase.table"  : "test_table"
}
spark = SparkSession.builder\
        .appName("hbaseWrite")\
        .getOrCreate()

sc = spark.sparkContext
catalog = ''.join("""{
    "table":{"namespace":"default", "name":"test_table"}
    "rowkey":"key",
    "columns":{
        "KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
        "A":{"cf":"c", "col":"a", "type":"string"},
        "B":{"cf":"c", "col":"b", "type":"string"}
    }
}""".split())


data = [("3","DATA 3 A", "DATA 3 B")]
columns = ['KEY_FIELD','A','B']
cSchema = StructType([StructField(columnName, StringType()) for columnName in columns])
df = spark.createDataFrame(data, schema=cSchema)
df.write\
      .options(catalog=catalog)\
      .options( **properties)\
      .mode('overwrite').format("org.apache.spark.sql.execution.datasources.hbase").save()