spark scala:将 DataFrame 或 Dataset 转换为单个逗号分隔的字符串
spark scala : Convert DataFrame OR Dataset to single comma separated string
下面是将打印一列 DataSet[Row]:
的 spark scala 代码
import org.apache.spark.sql.{Dataset, Row, SparkSession}
val spark: SparkSession = SparkSession.builder()
.appName("Spark DataValidation")
.config("SPARK_MAJOR_VERSION", "2").enableHiveSupport()
.getOrCreate()
val kafkaPath:String="hdfs:///landing/APPLICATION/*"
val targetPath:String="hdfs://datacompare/3"
val pk:String = "APPLICATION_ID"
val pkValues = spark
.read
.json(kafkaPath)
.select("message.data.*")
.select(pk)
.distinct()
pkValues.show()
关于代码的输出:
+--------------+
|APPLICATION_ID|
+--------------+
| 388|
| 447|
| 346|
| 861|
| 361|
| 557|
| 482|
| 518|
| 432|
| 422|
| 533|
| 733|
| 472|
| 457|
| 387|
| 394|
| 786|
| 458|
+--------------+
问题:
如何将此数据框转换为逗号分隔的字符串变量?
预期输出:
val data:String= "388,447,346,861,361,557,482,518,432,422,533,733,472,457,387,394,786,458"
请建议如何将 DataFrame[Row] 或 Dataset 转换为一个 String 。
使用collect_list:
import org.apache.spark.sql.functions._
val data = pkValues.select(collect_list(col(pk))) // collect to one row
.as[Array[Long]] // set encoder, so you will have strongly-typed Dataset
.take(1)(0) // get the first row - result will be Array[Long]
.mkString(",") // and join all values
但是,执行所有行的收集或获取是一个非常糟糕的主意。相反,您可能希望将 pkValues 保存在 .write
?或者将其作为其他函数的参数,以保持分布式计算
编辑:刚刚注意到,@SCouto 在我之后发布了其他答案。收集也将是正确的,使用 collect_list 函数你有一个优势 - 如果你愿意,你可以轻松地进行分组,即将键分组为偶数和奇数。这取决于您喜欢哪种解决方案,使用 collect 更简单或更长的一行,但更强大
我认为这不是个好主意,因为 dataFrame 是分布式对象并且可能非常庞大。 Collect
会将所有的数据都带到驱动中,所以这种操作要慎重。
以下是您可以使用数据帧执行的操作(两个选项):
df.select("APPLICATION_ID").rdd.map(r => r(0)).collect.mkString(",")
df.select("APPLICATION_ID").collect.mkString(",")
只有 3 行的测试数据帧的结果:
String = 388,447,346
编辑:使用 DataSet,您可以直接执行以下操作:
ds.collect.mkString(",")
下面是将打印一列 DataSet[Row]:
的 spark scala 代码import org.apache.spark.sql.{Dataset, Row, SparkSession}
val spark: SparkSession = SparkSession.builder()
.appName("Spark DataValidation")
.config("SPARK_MAJOR_VERSION", "2").enableHiveSupport()
.getOrCreate()
val kafkaPath:String="hdfs:///landing/APPLICATION/*"
val targetPath:String="hdfs://datacompare/3"
val pk:String = "APPLICATION_ID"
val pkValues = spark
.read
.json(kafkaPath)
.select("message.data.*")
.select(pk)
.distinct()
pkValues.show()
关于代码的输出:
+--------------+
|APPLICATION_ID|
+--------------+
| 388|
| 447|
| 346|
| 861|
| 361|
| 557|
| 482|
| 518|
| 432|
| 422|
| 533|
| 733|
| 472|
| 457|
| 387|
| 394|
| 786|
| 458|
+--------------+
问题:
如何将此数据框转换为逗号分隔的字符串变量?
预期输出:
val data:String= "388,447,346,861,361,557,482,518,432,422,533,733,472,457,387,394,786,458"
请建议如何将 DataFrame[Row] 或 Dataset 转换为一个 String 。
使用collect_list:
import org.apache.spark.sql.functions._
val data = pkValues.select(collect_list(col(pk))) // collect to one row
.as[Array[Long]] // set encoder, so you will have strongly-typed Dataset
.take(1)(0) // get the first row - result will be Array[Long]
.mkString(",") // and join all values
但是,执行所有行的收集或获取是一个非常糟糕的主意。相反,您可能希望将 pkValues 保存在 .write
?或者将其作为其他函数的参数,以保持分布式计算
编辑:刚刚注意到,@SCouto 在我之后发布了其他答案。收集也将是正确的,使用 collect_list 函数你有一个优势 - 如果你愿意,你可以轻松地进行分组,即将键分组为偶数和奇数。这取决于您喜欢哪种解决方案,使用 collect 更简单或更长的一行,但更强大
我认为这不是个好主意,因为 dataFrame 是分布式对象并且可能非常庞大。 Collect
会将所有的数据都带到驱动中,所以这种操作要慎重。
以下是您可以使用数据帧执行的操作(两个选项):
df.select("APPLICATION_ID").rdd.map(r => r(0)).collect.mkString(",")
df.select("APPLICATION_ID").collect.mkString(",")
只有 3 行的测试数据帧的结果:
String = 388,447,346
编辑:使用 DataSet,您可以直接执行以下操作:
ds.collect.mkString(",")