如何在 Spark 中使用多个列表创建地图
How to create Map using multiple lists in Spark
我正在尝试弄清楚如何使用下面的示例条目从 RDD myRDD
访问特定元素:
(600,List((600,111,7,1), (615,111,3,5))
(601,List((622,112,2,1), (615,111,3,5), (456,111,9,12))
我想使用子列表中的第 3 个字段作为 ID 从 Redis 数据库中提取一些数据。例如,在 (600,List((600,111,1,1), (615,111,1,5))
的情况下,ID 为 7
和 3
。
在 (601,List((622,112,2,1), (615,111,3,5), (456,111,9,12))
的情况下,ID 为 2
、3
和 9
。
问题是我不知道如何使用多个 ID 来收集值。在下面给定的代码中,我使用 line._2(3)
,但它不正确,因为这样我访问子列表而不是这些子列表中的字段。
我应该使用 flatMap
还是类似的?
val newRDD = myRDD.mapPartitions(iter => {
val redisPool = new Pool(new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000))
iter.map({line => (line._1,
redisPool.withJedisClient { client =>
val start_date: String = Dress.up(client).hget("id:"+line._2(3),"start_date")
val end_date: String = Dress.up(client).hget("id:"+line._2(3),"end_date")
val additionalData = List((start_date,end_date))
Map(("base_data", line._2), ("additional_data", additionalData))
})
})
})
newRDD.collect().foreach(println)
如果我们假设 Redis DB 包含一些相关数据,那么结果 newRDD
可能如下:
(600,Map("base_data" -> List((600,111,7,1), (615,111,3,5)), "additional_data" -> List((2014,2015),(2015,2016)))
(601,Map("base_data" -> List((622,112,2,1), (615,111,3,5), (456,111,9,12)), "additional_data" -> List((2010,2015),(2011,2016),(2014,2016)))
要获取 line._2
中每个元组的第三个元素的列表,请使用 line._2.map(_._3)
(假设 line
的类型是 (Int, List[(Int, Int, Int, Int)])
,就像从您的例如,不涉及像 Any
这样的类型)。总的来说,您的代码看起来应该像
iter.map({ case (first, second) => (first,
redisPool.withJedisClient { client =>
val additionalData = second.map { tuple =>
val start_date: String = Dress.up(client).hget("id:"+tuple._3,"start_date")
val end_date: String = Dress.up(client).hget("id:"+tuple._3,"end_date")
(start_date, end_date)
}
Map(("base_data", second), ("additional_data", additionalData))
})
})
我正在尝试弄清楚如何使用下面的示例条目从 RDD myRDD
访问特定元素:
(600,List((600,111,7,1), (615,111,3,5))
(601,List((622,112,2,1), (615,111,3,5), (456,111,9,12))
我想使用子列表中的第 3 个字段作为 ID 从 Redis 数据库中提取一些数据。例如,在 (600,List((600,111,1,1), (615,111,1,5))
的情况下,ID 为 7
和 3
。
在 (601,List((622,112,2,1), (615,111,3,5), (456,111,9,12))
的情况下,ID 为 2
、3
和 9
。
问题是我不知道如何使用多个 ID 来收集值。在下面给定的代码中,我使用 line._2(3)
,但它不正确,因为这样我访问子列表而不是这些子列表中的字段。
我应该使用 flatMap
还是类似的?
val newRDD = myRDD.mapPartitions(iter => {
val redisPool = new Pool(new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000))
iter.map({line => (line._1,
redisPool.withJedisClient { client =>
val start_date: String = Dress.up(client).hget("id:"+line._2(3),"start_date")
val end_date: String = Dress.up(client).hget("id:"+line._2(3),"end_date")
val additionalData = List((start_date,end_date))
Map(("base_data", line._2), ("additional_data", additionalData))
})
})
})
newRDD.collect().foreach(println)
如果我们假设 Redis DB 包含一些相关数据,那么结果 newRDD
可能如下:
(600,Map("base_data" -> List((600,111,7,1), (615,111,3,5)), "additional_data" -> List((2014,2015),(2015,2016)))
(601,Map("base_data" -> List((622,112,2,1), (615,111,3,5), (456,111,9,12)), "additional_data" -> List((2010,2015),(2011,2016),(2014,2016)))
要获取 line._2
中每个元组的第三个元素的列表,请使用 line._2.map(_._3)
(假设 line
的类型是 (Int, List[(Int, Int, Int, Int)])
,就像从您的例如,不涉及像 Any
这样的类型)。总的来说,您的代码看起来应该像
iter.map({ case (first, second) => (first,
redisPool.withJedisClient { client =>
val additionalData = second.map { tuple =>
val start_date: String = Dress.up(client).hget("id:"+tuple._3,"start_date")
val end_date: String = Dress.up(client).hget("id:"+tuple._3,"end_date")
(start_date, end_date)
}
Map(("base_data", second), ("additional_data", additionalData))
})
})