从列表行键创建 Spark DataFrame
Create Spark DataFrame from list row keys
我有一个 HBase 行键列表或 Array[Row]
,我想使用这些 RowKeys 从 HBase 获取的行中创建一个 Spark DataFrame
。
我在想类似的事情:
def getDataFrameFromList(spark: SparkSession, rList : Array[Row]): DataFrame = {
val conf = HBaseConfiguration.create()
val mlRows : List[RDD[String]] = new ArrayList[RDD[String]]
conf.set("hbase.zookeeper.quorum", "dev.server")
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set("zookeeper.znode.parent","/hbase-unsecure")
conf.set(TableInputFormat.INPUT_TABLE, "hbase_tbl1")
rList.foreach( r => {
var rStr = r.toString()
conf.set(TableInputFormat.SCAN_ROW_START, rStr)
conf.set(TableInputFormat.SCAN_ROW_STOP, rStr + "_")
// read one row
val recsRdd = readHBaseRdd(spark, conf)
mlRows.append(recsRdd)
})
// This works, but it is only one row
//val resourcesDf = spark.read.json(recsRdd)
var resourcesDf = <Code here to convert List[RDD[String]] to DataFrame>
//resourcesDf
spark.emptyDataFrame
}
我可以在 for 循环中做 recsRdd.collect()
并将其转换为字符串并将 json 附加到 ArrayList[String
但我不确定它是否有效,调用 collect()
在这样的 for 循环中。
readHBaseRdd
正在使用 newAPIHadoopRDD
从 HBase
获取数据
def readHBaseRdd(spark: SparkSession, conf: Configuration) = {
val hBaseRDD = spark.sparkContext.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result])
hBaseRDD.map {
case (_: ImmutableBytesWritable, value: Result) =>
Bytes.toString(value.getValue(Bytes.toBytes("cf"),
Bytes.toBytes("jsonCol")))
}
}
}
使用 spark.union([mainRdd, recsRdd])
而不是列表或 RDD (mlRows)
为什么只从 HBase 读取一行?间隔尽量大
始终避免调用 collect()
,只为 debug/tests 调用。
我有一个 HBase 行键列表或 Array[Row]
,我想使用这些 RowKeys 从 HBase 获取的行中创建一个 Spark DataFrame
。
我在想类似的事情:
def getDataFrameFromList(spark: SparkSession, rList : Array[Row]): DataFrame = {
val conf = HBaseConfiguration.create()
val mlRows : List[RDD[String]] = new ArrayList[RDD[String]]
conf.set("hbase.zookeeper.quorum", "dev.server")
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set("zookeeper.znode.parent","/hbase-unsecure")
conf.set(TableInputFormat.INPUT_TABLE, "hbase_tbl1")
rList.foreach( r => {
var rStr = r.toString()
conf.set(TableInputFormat.SCAN_ROW_START, rStr)
conf.set(TableInputFormat.SCAN_ROW_STOP, rStr + "_")
// read one row
val recsRdd = readHBaseRdd(spark, conf)
mlRows.append(recsRdd)
})
// This works, but it is only one row
//val resourcesDf = spark.read.json(recsRdd)
var resourcesDf = <Code here to convert List[RDD[String]] to DataFrame>
//resourcesDf
spark.emptyDataFrame
}
我可以在 for 循环中做 recsRdd.collect()
并将其转换为字符串并将 json 附加到 ArrayList[String
但我不确定它是否有效,调用 collect()
在这样的 for 循环中。
readHBaseRdd
正在使用 newAPIHadoopRDD
从 HBase
def readHBaseRdd(spark: SparkSession, conf: Configuration) = {
val hBaseRDD = spark.sparkContext.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result])
hBaseRDD.map {
case (_: ImmutableBytesWritable, value: Result) =>
Bytes.toString(value.getValue(Bytes.toBytes("cf"),
Bytes.toBytes("jsonCol")))
}
}
}
使用 spark.union([mainRdd, recsRdd])
而不是列表或 RDD (mlRows)
为什么只从 HBase 读取一行?间隔尽量大
始终避免调用 collect()
,只为 debug/tests 调用。