What is the best possible way of interacting with HBase using PySpark?
I am using PySpark [Spark 2.3.1] with HBase 1.2.1, and I would like to know what the best way of accessing HBase from PySpark is.
I did some initial searching and found that there are a few options available, such as using shc-core:1.1.1-2.1-s_2.11.jar. This can do it, but wherever I tried to look for examples, the code is mostly written in Scala, or the examples are Scala-based as well. I tried implementing basic code in PySpark:
from pyspark import SparkContext
from pyspark.sql import SQLContext

def main():
    sc = SparkContext()
    sqlc = SQLContext(sc)
    data_source_format = 'org.apache.spark.sql.execution.datasources.hbase'
    # SHC catalog: maps the row key and column "colname" of column family "d"
    # in HBase table "firsttable" to the DataFrame columns firstcol/secondcol
    catalog = ''.join("""{
        "table":{"namespace":"default", "name":"firsttable"},
        "rowkey":"key",
        "columns":{
            "firstcol":{"cf":"rowkey", "col":"key", "type":"string"},
            "secondcol":{"cf":"d", "col":"colname", "type":"string"}
        }
    }""".split())
    df = sqlc.read.options(catalog=catalog).format(data_source_format).load()
    df.select("secondcol").show()

# entry point for PySpark application
if __name__ == '__main__':
    main()
and ran it using:
spark-submit --master yarn-client --files /opt/hbase-1.1.2/conf/hbase-site.xml --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --jars /home/ubuntu/hbase-spark-2.0.0-alpha4.jar HbaseMain2.py
It returned an empty output:
+---------+
|secondcol|
+---------+
+---------+
I am not sure what I am doing wrong, nor what the best approach for this would be.
Any references would be greatly appreciated.
Regards
Finally, using SHC, I was able to connect to HBase-1.2.1 with Spark-2.3.1 using PySpark code. Here is what worked for me:
All my Hadoop [namenode, datanode, nodemanager, resourcemanager] and HBase [HMaster, HRegionServer, HQuorumPeer] daemons were up and running on my EC2 instance.
I placed the emp.csv file at the HDFS location /test/emp.csv, with the following data:
key,empId,empName,empWeight
1,"E007","Bhupesh",115.10
2,"E008","Chauhan",110.23
3,"E009","Prithvi",90.0
4,"E0010","Raj",80.0
5,"E0011","Chauhan",100.0
I created the readwriteHBase.py file with the following lines of code [it reads the emp.csv file from HDFS, first creates the tblEmployee table in HBase, pushes the data into tblEmployee, then reads some data back from the same table and displays it on the console]:
from pyspark.sql import SparkSession

def main():
    spark = SparkSession.builder.master("yarn-client").appName("HelloSpark").getOrCreate()
    dataSourceFormat = "org.apache.spark.sql.execution.datasources.hbase"

    # SHC catalog used for writing: target table, row key and column-family mappings
    writeCatalog = ''.join("""{
        "table":{"namespace":"default", "name":"tblEmployee", "tableCoder":"PrimitiveType"},
        "rowkey":"key",
        "columns":{
            "key":{"cf":"rowkey", "col":"key", "type":"int"},
            "empId":{"cf":"personal", "col":"empId", "type":"string"},
            "empName":{"cf":"personal", "col":"empName", "type":"string"},
            "empWeight":{"cf":"personal", "col":"empWeight", "type":"double"}
        }
    }""".split())

    # Read the CSV file from HDFS, letting Spark infer the column types
    writeDF = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/test/emp.csv")
    writeDF.show()
    print("csv file read")

    # Write the DataFrame to HBase; newtable=5 creates tblEmployee with 5 regions
    writeDF.write.options(catalog=writeCatalog, newtable=5).format(dataSourceFormat).save()
    print("csv file written to HBase")

    # Catalog used for reading back: only key, empId and empName are mapped
    readCatalog = ''.join("""{
        "table":{"namespace":"default", "name":"tblEmployee"},
        "rowkey":"key",
        "columns":{
            "key":{"cf":"rowkey", "col":"key", "type":"int"},
            "empId":{"cf":"personal", "col":"empId", "type":"string"},
            "empName":{"cf":"personal", "col":"empName", "type":"string"}
        }
    }""".split())

    print("going to read data from Hbase table")
    readDF = spark.read.options(catalog=readCatalog).format(dataSourceFormat).load()
    print("data read from HBase table")
    readDF.select("empId", "empName").show()
    readDF.show()

# entry point for PySpark application
if __name__ == '__main__':
    main()
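An aside of my own (not from the original answer): the ''.join("""...""".split()) idiom above only serves to collapse the catalog JSON into a single-line string. Building the catalog as a Python dict and serializing it with json.dumps is an equivalent, slightly less fragile alternative, since SHC just receives a JSON string either way. A minimal sketch:

import json

# The same write catalog as above, expressed as a dict and serialized to the
# JSON string that is passed to the "catalog" option
writeCatalogDict = {
    "table": {"namespace": "default", "name": "tblEmployee", "tableCoder": "PrimitiveType"},
    "rowkey": "key",
    "columns": {
        "key": {"cf": "rowkey", "col": "key", "type": "int"},
        "empId": {"cf": "personal", "col": "empId", "type": "string"},
        "empName": {"cf": "personal", "col": "empName", "type": "string"},
        "empWeight": {"cf": "personal", "col": "empWeight", "type": "double"}
    }
}
writeCatalog = json.dumps(writeCatalogDict)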
I ran this script on the VM console using the command:
spark-submit --master yarn-client --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories http://nexus-private.hortonworks.com/nexus/content/repositories/IN-QA/ readwriteHBase.py
Intermediate result: after reading the CSV file:
+---+-----+-------+---------+
|key|empId|empName|empWeight|
+---+-----+-------+---------+
| 1| E007|Bhupesh| 115.1|
| 2| E008|Chauhan| 110.23|
| 3| E009|Prithvi| 90.0|
| 4|E0010| Raj| 80.0|
| 5|E0011|Chauhan| 100.0|
+---+-----+-------+---------+
Final output: after reading the data from the HBase table:
+-----+-------+
|empId|empName|
+-----+-------+
| E007|Bhupesh|
| E008|Chauhan|
| E009|Prithvi|
|E0010| Raj|
|E0011|Chauhan|
+-----+-------+
Note: while creating the HBase table and inserting data into it, it expects the NumberOfRegions to be greater than 3, hence I added options(catalog=writeCatalog, newtable=5) while writing the data to HBase.
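As a small extension of my own (not part of the original run), the DataFrame returned by the read supports the usual Spark SQL operations, so the result can be filtered before showing it. A sketch, assuming the same tblEmployee table and that the SHC package is on the classpath (the app name is just illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("readHBaseWithFilter").getOrCreate()

# Same read catalog as in readwriteHBase.py, collapsed to a one-line JSON string
readCatalog = ''.join("""{
    "table":{"namespace":"default", "name":"tblEmployee"},
    "rowkey":"key",
    "columns":{
        "key":{"cf":"rowkey", "col":"key", "type":"int"},
        "empId":{"cf":"personal", "col":"empId", "type":"string"},
        "empName":{"cf":"personal", "col":"empName", "type":"string"}
    }
}""".split())

readDF = spark.read.options(catalog=readCatalog).format("org.apache.spark.sql.execution.datasources.hbase").load()

# Ordinary DataFrame filters work on the result, e.g. only the rows where empName is "Chauhan"
readDF.filter(readDF.empName == "Chauhan").select("empId", "empName").show()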