How to store a PySpark DataFrame into HBase

I have code that converts PySpark streaming data into a DataFrame. I need to store this DataFrame into HBase. Please help me write the additional code for that.

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession


def getSparkSessionInstance(sparkConf):
    # Lazily create a single SparkSession and reuse it for every micro-batch
    if 'sparkSessionSingletonInstance' not in globals():
        globals()['sparkSessionSingletonInstance'] = SparkSession\
            .builder\
            .config(conf=sparkConf)\
            .getOrCreate()
    return globals()['sparkSessionSingletonInstance']


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: sql_network_wordcount.py <hostname> <port>",
              file=sys.stderr)
        exit(-1)
    host, port = sys.argv[1:]
    sc = SparkContext(appName="PythonSqlNetworkWordCount")
    ssc = StreamingContext(sc, 1)
    lines = ssc.socketTextStream(host, int(port))

    def process(time, rdd):
        print("========= %s =========" % str(time))
        try:
            # Split each incoming line into fields and build a two-column DataFrame
            words = rdd.map(lambda line: line.split(" "))
            spark = getSparkSessionInstance(rdd.context.getConf())
            linesDataFrame = spark.createDataFrame(words, schema=["lat", "lon"])
            linesDataFrame.show()
        except Exception:
            # Empty micro-batches make createDataFrame fail; skip them
            pass

    lines.foreachRDD(process)
    ssc.start()
    ssc.awaitTermination()

You can use the Spark-HBase connector to access HBase from Spark. It provides an API at both the low-level RDD and the DataFrame level.

The connector requires you to define a schema for the HBase table. Below is a sample schema for an HBase table named table1, with the row key as key and columns col1 through col8. Note that the rowkey also has to be defined in detail as a column (col0), which belongs to a specific column family named rowkey.

catalog = '{\
        "table":{"namespace":"default", "name":"table1"},\
        "rowkey":"key",\
        "columns":{\
          "col0":{"cf":"rowkey", "col":"key", "type":"string"},\
          "col1":{"cf":"cf1", "col":"col1", "type":"boolean"},\
          "col2":{"cf":"cf1", "col":"col2", "type":"double"},\
          "col3":{"cf":"cf1", "col":"col3", "type":"float"},\
          "col4":{"cf":"cf1", "col":"col4", "type":"int"},\
          "col5":{"cf":"cf2", "col":"col5", "type":"bigint"},\
          "col6":{"cf":"cf2", "col":"col6", "type":"smallint"},\
          "col7":{"cf":"cf2", "col":"col7", "type":"string"},\
          "col8":{"cf":"cf2", "col":"col8", "type":"tinyint"}\
        }\
      }'
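For instance, a minimal catalog matching the two-column lat/lon DataFrame from the question could look like the sketch below. The table name latlon, the column family cf, and the choice of lat as the row key are all assumptions for illustration, not something fixed by the connector:

# Hypothetical catalog for the streaming DataFrame above. Assumptions:
# table name "latlon", column family "cf", and "lat" doubling as the row key.
latlon_catalog = '{\
        "table":{"namespace":"default", "name":"latlon"},\
        "rowkey":"key",\
        "columns":{\
          "lat":{"cf":"rowkey", "col":"key", "type":"string"},\
          "lon":{"cf":"cf", "col":"lon", "type":"string"}\
        }\
      }'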

Once the catalog is defined according to the schema of your DataFrame, you can write the DataFrame to HBase using:

df.write\
  .options(catalog=catalog)\
  .format("org.apache.spark.sql.execution.datasources.hbase")\
  .save()
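To tie this back to the streaming code in the question, the write can go inside the process function right after the DataFrame is built. The sketch below uses the hypothetical latlon_catalog from above; the newtable option (which asks the connector to create the table with the given number of regions if it does not exist) appears in the SHC examples, but verify it against your connector version:

def process(time, rdd):
    try:
        spark = getSparkSessionInstance(rdd.context.getConf())
        linesDataFrame = spark.createDataFrame(
            rdd.map(lambda line: line.split(" ")), schema=["lat", "lon"])
        # Write each micro-batch to HBase. "newtable" (create the table with
        # 5 regions if missing) is an SHC option; check your connector version.
        linesDataFrame.write\
            .options(catalog=latlon_catalog, newtable="5")\
            .format("org.apache.spark.sql.execution.datasources.hbase")\
            .save()
    except Exception:
        pass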

To read the data back from HBase:

df = spark.read\
    .options(catalog=catalog)\
    .format("org.apache.spark.sql.execution.datasources.hbase")\
    .load()
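The result is an ordinary DataFrame, so the usual API applies. For example, with the column names from the table1 catalog above:

# Standard DataFrame operations work on the loaded data
df.filter(df.col1 == True).select("col0", "col7").show()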

When submitting your Spark application, you need to include the Spark-HBase connector package as shown below.

pyspark --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories http://repo.hortonworks.com/content/groups/public/
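The same flags apply to spark-submit; the script name and arguments below are placeholders:

spark-submit --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 \
  --repositories http://repo.hortonworks.com/content/groups/public/ \
  your_streaming_app.py <hostname> <port>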