如何在 Spark 中使用多个列表创建地图

How to create Map using multiple lists in Spark

我正在尝试弄清楚如何使用下面的示例条目从 RDD myRDD 访问特定元素:

(600,List((600,111,7,1), (615,111,3,5))
(601,List((622,112,2,1), (615,111,3,5), (456,111,9,12))

我想使用子列表中的第 3 个字段作为 ID 从 Redis 数据库中提取一些数据。例如,在 (600,List((600,111,1,1), (615,111,1,5)) 的情况下,ID 为 73。 在 (601,List((622,112,2,1), (615,111,3,5), (456,111,9,12)) 的情况下,ID 为 239

问题是我不知道如何使用多个 ID 来收集值。在下面给定的代码中,我使用 line._2(3),但它不正确,因为这样我访问子列表而不是这些子列表中的字段。 我应该使用 flatMap 还是类似的?

  val newRDD = myRDD.mapPartitions(iter => {
    val redisPool = new Pool(new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000))
    iter.map({line => (line._1,
      redisPool.withJedisClient { client =>
        val start_date: String = Dress.up(client).hget("id:"+line._2(3),"start_date")
        val end_date: String = Dress.up(client).hget("id:"+line._2(3),"end_date")
        val additionalData = List((start_date,end_date))
        Map(("base_data", line._2), ("additional_data", additionalData))
      })
    })
  })
  newRDD.collect().foreach(println)

如果我们假设 Redis DB 包含一些相关数据,那么结果 newRDD 可能如下:

(600,Map("base_data" -> List((600,111,7,1), (615,111,3,5)), "additional_data" -> List((2014,2015),(2015,2016)))
(601,Map("base_data" -> List((622,112,2,1), (615,111,3,5), (456,111,9,12)), "additional_data" -> List((2010,2015),(2011,2016),(2014,2016)))

要获取 line._2 中每个元组的第三个元素的列表,请使用 line._2.map(_._3)(假设 line 的类型是 (Int, List[(Int, Int, Int, Int)]),就像从您的例如,不涉及像 Any 这样的类型)。总的来说,您的代码看起来应该像

iter.map({ case (first, second) => (first,
  redisPool.withJedisClient { client =>
    val additionalData = second.map { tuple =>
      val start_date: String = Dress.up(client).hget("id:"+tuple._3,"start_date")
      val end_date: String = Dress.up(client).hget("id:"+tuple._3,"end_date")
      (start_date, end_date)
    }
    Map(("base_data", second), ("additional_data", additionalData))
  })
})