从数据集中的地图按键排序
Order by key from a Map in a Dataset
我想按时间戳对我从 HDFS 检索的一些 avro 文件进行排序。
我的 avro 文件的架构是:
headers : Map[String,String], body : String
现在棘手的部分是时间戳 是地图中 和 key/value 之一。所以我在地图中包含这样的时间戳:
key_1 -> value_1, key_2 -> value_2, timestamp -> 1234567, key_n ->
value_n
请注意,值的类型是字符串。
我创建了一个案例 class 以使用此模式创建我的数据集:
case class Root(headers : Map[String,String], body: String)
创建我的数据集:
val ds = spark
.read
.format("com.databricks.spark.avro")
.load(pathToHDFS)
.as[Root]
我真的不知道如何开始这个问题,因为我只能得到列 headers 和 body。如何获得嵌套值以最终按时间戳排序?
我想做这样的事情:
ds.select("headers").doSomethingToGetTheMapStructure.doSomeConversionStringToTimeStampForTheColumnTimeStamp("timestamp").orderBy("timestamp")
一点点精确:我不想从我的初始数据集中丢失任何数据,只是一个排序操作。
我用的是 Spark 2.3.0.
您可以使用 Scala 的 sortBy,它接受一个函数。我建议您将 val ds 显式声明为 Vector(或其他集合),这样您将在 IntelliJ 中看到适用的函数(如果您使用的是 IntelliJ)并且它肯定会编译。
根据您的代码查看下面的示例:
case class Root(headers : Map[String,String], body: String)
val ds: Vector[Root] = spark
.read
.format("com.databricks.spark.avro")
.load(pathToHDFS)
.as[Root]
val sorted = ds.sortBy(r => r.headers.get("timestamp").map(PROCESSING) ).reverse
编辑:添加了反向(假设您希望它降序)。在作为参数传递的函数内部,您还将处理时间戳。
加载的 Dataset
应该类似于下面的示例数据集:
case class Root(headers : Map[String, String], body: String)
val ds = Seq(
Root(Map("k11"->"v11", "timestamp"->"1554231600", "k12"->"v12"), "body1"),
Root(Map("k21"->"v21", "timestamp"->"1554134400", "k22"->"v22"), "body2")
).toDS
您可以简单地通过 timestamp
键查找 Map
,cast
值为 Long
,然后执行 orderBy
,如下所示:
ds.
withColumn("ts", $"headers"("timestamp").cast("Long")).
orderBy("ts").
show(false)
// +-------------------------------------------------+-----+----------+
// |headers |body |ts |
// +-------------------------------------------------+-----+----------+
// |[k21 -> v21, timestamp -> 1554134400, k22 -> v22]|body2|1554134400|
// |[k11 -> v11, timestamp -> 1554231600, k12 -> v12]|body1|1554231600|
// +-------------------------------------------------+-----+----------+
请注意 $"headers"("timestamp")
与使用 apply
列方法(即 $"headers".apply("timestamp")
)相同。
或者,您也可以使用 getItem
按键访问 Map
,例如:
$"headers".getItem("timestamp")
import org.apache.spark.sql.{Encoders, Encoder, Dataset}
import org.apache.spark.sql.functions.{col, desc}
import java.sql.Timestamp
case class Nested(key_1: String,key_2: String,timestamp: Timestamp,key_n: String)
case class Root(headers:Nested,body:String)
implicit val rootCodec: Encoder[Root] = Encoders.product[Root]
val avroDS:Dataset[Root] = spark.read
.format("com.databricks.spark.avro")
.load(pathToHDFS)
.as[Root]
val sortedDF: DataFrame = avroDS.orderBy(desc(col("timestamp")))
此代码段会直接将您的 Avro 数据转换为 Dataset[Root]
。您将不必依赖于导入 sparksession.implicits
,并且会消除将 timestamp 字段转换为 TimestampType 的步骤。在内部,Spark 的时间戳数据类型是使用 java.sql.Timestamp
.
实现的
我想按时间戳对我从 HDFS 检索的一些 avro 文件进行排序。
我的 avro 文件的架构是:
headers : Map[String,String], body : String
现在棘手的部分是时间戳 是地图中 和 key/value 之一。所以我在地图中包含这样的时间戳:
key_1 -> value_1, key_2 -> value_2, timestamp -> 1234567, key_n -> value_n
请注意,值的类型是字符串。
我创建了一个案例 class 以使用此模式创建我的数据集:
case class Root(headers : Map[String,String], body: String)
创建我的数据集:
val ds = spark
.read
.format("com.databricks.spark.avro")
.load(pathToHDFS)
.as[Root]
我真的不知道如何开始这个问题,因为我只能得到列 headers 和 body。如何获得嵌套值以最终按时间戳排序?
我想做这样的事情:
ds.select("headers").doSomethingToGetTheMapStructure.doSomeConversionStringToTimeStampForTheColumnTimeStamp("timestamp").orderBy("timestamp")
一点点精确:我不想从我的初始数据集中丢失任何数据,只是一个排序操作。
我用的是 Spark 2.3.0.
您可以使用 Scala 的 sortBy,它接受一个函数。我建议您将 val ds 显式声明为 Vector(或其他集合),这样您将在 IntelliJ 中看到适用的函数(如果您使用的是 IntelliJ)并且它肯定会编译。
根据您的代码查看下面的示例:
case class Root(headers : Map[String,String], body: String)
val ds: Vector[Root] = spark
.read
.format("com.databricks.spark.avro")
.load(pathToHDFS)
.as[Root]
val sorted = ds.sortBy(r => r.headers.get("timestamp").map(PROCESSING) ).reverse
编辑:添加了反向(假设您希望它降序)。在作为参数传递的函数内部,您还将处理时间戳。
加载的 Dataset
应该类似于下面的示例数据集:
case class Root(headers : Map[String, String], body: String)
val ds = Seq(
Root(Map("k11"->"v11", "timestamp"->"1554231600", "k12"->"v12"), "body1"),
Root(Map("k21"->"v21", "timestamp"->"1554134400", "k22"->"v22"), "body2")
).toDS
您可以简单地通过 timestamp
键查找 Map
,cast
值为 Long
,然后执行 orderBy
,如下所示:
ds.
withColumn("ts", $"headers"("timestamp").cast("Long")).
orderBy("ts").
show(false)
// +-------------------------------------------------+-----+----------+
// |headers |body |ts |
// +-------------------------------------------------+-----+----------+
// |[k21 -> v21, timestamp -> 1554134400, k22 -> v22]|body2|1554134400|
// |[k11 -> v11, timestamp -> 1554231600, k12 -> v12]|body1|1554231600|
// +-------------------------------------------------+-----+----------+
请注意 $"headers"("timestamp")
与使用 apply
列方法(即 $"headers".apply("timestamp")
)相同。
或者,您也可以使用 getItem
按键访问 Map
,例如:
$"headers".getItem("timestamp")
import org.apache.spark.sql.{Encoders, Encoder, Dataset}
import org.apache.spark.sql.functions.{col, desc}
import java.sql.Timestamp
case class Nested(key_1: String,key_2: String,timestamp: Timestamp,key_n: String)
case class Root(headers:Nested,body:String)
implicit val rootCodec: Encoder[Root] = Encoders.product[Root]
val avroDS:Dataset[Root] = spark.read
.format("com.databricks.spark.avro")
.load(pathToHDFS)
.as[Root]
val sortedDF: DataFrame = avroDS.orderBy(desc(col("timestamp")))
此代码段会直接将您的 Avro 数据转换为 Dataset[Root]
。您将不必依赖于导入 sparksession.implicits
,并且会消除将 timestamp 字段转换为 TimestampType 的步骤。在内部,Spark 的时间戳数据类型是使用 java.sql.Timestamp
.