从 HBase 检索数据并将其格式化为 Scala Dataframe
Retrieve and format data from HBase to scala Dataframe
我正在尝试从 hbase table 获取数据到 apache spark 环境中,但我不知道如何格式化它。谁能帮帮我。
case class systems( rowkey: String, iacp: Option[String], temp: Option[String])
type Record = (String, Option[String], Option[String])
val hBaseRDD_iacp = sc.hbaseTable[Record]("test_table").select("iacp","temp").inColumnFamily("test_fam")
scala> hBaseRDD_iacp.map(x => systems(x._1,x._2,x._3)).toDF().show()
+--------------+-----------------+--------------------+
| rowkey| iacp| temp|
+--------------+-----------------+--------------------+
| ab7|0.051,0.052,0.055| 17.326,17.344,17.21|
| k6c| 0.056,NA,0.054|17.277,17.283,17.256|
| ad| NA,23.0| 24.0,23.6|
+--------------+-----------------+--------------------+
然而,我实际上想要它的格式如下。每个逗号分隔值都在新行中,每个 NA 被 null 值替换。 iacp 和 temp 列中的值应该是 float 类型。每行可以有不同数量的逗号分隔值。
提前致谢!
+--------------+-----------------+--------------------+
| rowkey| iacp| temp|
+--------------+-----------------+--------------------+
| ab7| 0.051| 17.326|
| ab7| 0.052| 17.344|
| ab7| 0.055| 17.21|
| k6c| 0.056| 17.277|
| k6c| null| 17.283|
| k6c| 0.054| 17.256|
| ad| null| 24.0|
| ad| 23| 26.0|
+--------------+-----------------+--------------------+
您的 hBaseRDD_iacp.map(x => systems(x._1, x._2, x._3)).toDF
代码行应生成一个等效于以下内容的 DataFrame:
val df = Seq(
("ab7", Some("0.051,0.052,0.055"), Some("17.326,17.344,17.21")),
("k6c", Some("0.056,NA,0.054"), Some("17.277,17.283,17.256")),
("ad", Some("NA,23.0"), Some("24.0,23.6"))
).toDF("rowkey", "iacp", "temp")
要将数据集转换为想要的结果,您可以应用一个 UDF,将 iacp
和 temp
CSV 字符串的元素配对,生成一个 (Option[Double], Option[Double])
数组,即然后explode
-ed,如下图:
import org.apache.spark.sql.functions._
import spark.implicits._
def pairUpCSV = udf{ (s1: String, s2: String) =>
import scala.util.Try
def toNumericArr(csv: String) = csv.split(",").map{
case s if Try(s.toDouble).isSuccess => Some(s)
case _ => None
}
toNumericArr(s1).zipAll(toNumericArr(s2), None, None)
}
df.
withColumn("csv_pairs", pairUpCSV($"iacp", $"temp")).
withColumn("csv_pair", explode($"csv_pairs")).
select($"rowkey", $"csv_pair._1".as("iacp"), $"csv_pair._2".as("temp")).
show(false)
// +------+-----+------+
// |rowkey|iacp |temp |
// +------+-----+------+
// |ab7 |0.051|17.326|
// |ab7 |0.052|17.344|
// |ab7 |0.055|17.21 |
// |k6c |0.056|17.277|
// |k6c |null |17.283|
// |k6c |0.054|17.256|
// |ad |null |24.0 |
// |ad |23.0 |23.6 |
// +------+-----+------+
请注意,值 NA
属于方法 toNumericArr
中的默认情况,因此不会作为单独的情况单独列出。此外,UDF 中使用 zipAll
(而不是 zip
)来涵盖 iacp
和 temp
CSV 字符串具有不同元素大小的情况。
我正在尝试从 hbase table 获取数据到 apache spark 环境中,但我不知道如何格式化它。谁能帮帮我。
case class systems( rowkey: String, iacp: Option[String], temp: Option[String])
type Record = (String, Option[String], Option[String])
val hBaseRDD_iacp = sc.hbaseTable[Record]("test_table").select("iacp","temp").inColumnFamily("test_fam")
scala> hBaseRDD_iacp.map(x => systems(x._1,x._2,x._3)).toDF().show()
+--------------+-----------------+--------------------+
| rowkey| iacp| temp|
+--------------+-----------------+--------------------+
| ab7|0.051,0.052,0.055| 17.326,17.344,17.21|
| k6c| 0.056,NA,0.054|17.277,17.283,17.256|
| ad| NA,23.0| 24.0,23.6|
+--------------+-----------------+--------------------+
然而,我实际上想要它的格式如下。每个逗号分隔值都在新行中,每个 NA 被 null 值替换。 iacp 和 temp 列中的值应该是 float 类型。每行可以有不同数量的逗号分隔值。
提前致谢!
+--------------+-----------------+--------------------+
| rowkey| iacp| temp|
+--------------+-----------------+--------------------+
| ab7| 0.051| 17.326|
| ab7| 0.052| 17.344|
| ab7| 0.055| 17.21|
| k6c| 0.056| 17.277|
| k6c| null| 17.283|
| k6c| 0.054| 17.256|
| ad| null| 24.0|
| ad| 23| 26.0|
+--------------+-----------------+--------------------+
您的 hBaseRDD_iacp.map(x => systems(x._1, x._2, x._3)).toDF
代码行应生成一个等效于以下内容的 DataFrame:
val df = Seq(
("ab7", Some("0.051,0.052,0.055"), Some("17.326,17.344,17.21")),
("k6c", Some("0.056,NA,0.054"), Some("17.277,17.283,17.256")),
("ad", Some("NA,23.0"), Some("24.0,23.6"))
).toDF("rowkey", "iacp", "temp")
要将数据集转换为想要的结果,您可以应用一个 UDF,将 iacp
和 temp
CSV 字符串的元素配对,生成一个 (Option[Double], Option[Double])
数组,即然后explode
-ed,如下图:
import org.apache.spark.sql.functions._
import spark.implicits._
def pairUpCSV = udf{ (s1: String, s2: String) =>
import scala.util.Try
def toNumericArr(csv: String) = csv.split(",").map{
case s if Try(s.toDouble).isSuccess => Some(s)
case _ => None
}
toNumericArr(s1).zipAll(toNumericArr(s2), None, None)
}
df.
withColumn("csv_pairs", pairUpCSV($"iacp", $"temp")).
withColumn("csv_pair", explode($"csv_pairs")).
select($"rowkey", $"csv_pair._1".as("iacp"), $"csv_pair._2".as("temp")).
show(false)
// +------+-----+------+
// |rowkey|iacp |temp |
// +------+-----+------+
// |ab7 |0.051|17.326|
// |ab7 |0.052|17.344|
// |ab7 |0.055|17.21 |
// |k6c |0.056|17.277|
// |k6c |null |17.283|
// |k6c |0.054|17.256|
// |ad |null |24.0 |
// |ad |23.0 |23.6 |
// +------+-----+------+
请注意,值 NA
属于方法 toNumericArr
中的默认情况,因此不会作为单独的情况单独列出。此外,UDF 中使用 zipAll
(而不是 zip
)来涵盖 iacp
和 temp
CSV 字符串具有不同元素大小的情况。