Spark 中的案例陈述
Case statements in Spark
我正在编写 Spark 代码,我需要将 (String,(String,String))
类型的 RDD 转换为 ((String,String),String
)。
我有以下输入文本文件:
Language,Language-code,TotalViewsInThatLang
English,en,10965376,"Main_Page",2938355
Russian,ru,1925718,"%D0%97%D0%B0%D0%B3%D0%BB,915495
Spanish,es,1010810,"Wikipedia:Portada",13603
我创建了一个RDD如下:
val line = sc.textFile(inputFile)
val nrdd = line.map(x=>(x.split(",")(0),(x.split(",")(1),x.split(",")(2))))
nrdd: org.apache.spark.rdd.RDD[(String, (String, String))] = MapPartitionsRDD[2] at map at <console>:26
据此我想使用 case
函数创建类型为 ((String,String),String)
的 RDD。
如何使用 map
中的 case
语句执行此操作?
编辑
当我尝试使用 case 函数时出现以下错误:
scala> val frdd = nrdd.map( {case(x,(y,z))=>((x,y),z))})
<console>:1: error: ';' expected but ')' found.
val frdd = nrdd.map({case(x,(y,z))=>((x,y),z))})
^
除非我误解了你的问题,否则你想要这个:
val list: List[((String, String), String)] = List((("a1", "b1"), "c1"), (("a2", "b2"), "c2"))
val res = list.map { case ((a, b), c) => (a, (b, c)) }
println(res) // List((a1,(b1,c1)), (a2,(b2,c2)))
由于您的 RDD 是 Pair RDD,您可以使用 Keyed RDD
中的 swap
。
示例代码:
val keyRDD = sc.parallelize(List((("a1", "b1"), "c1"), (("a2", "b2"), "c2")), 2)
val swappedRDD = keyRDD.map(_.swap)
swappedRDD.foreach(x => println(x))
我正在编写 Spark 代码,我需要将 (String,(String,String))
类型的 RDD 转换为 ((String,String),String
)。
我有以下输入文本文件:
Language,Language-code,TotalViewsInThatLang
English,en,10965376,"Main_Page",2938355
Russian,ru,1925718,"%D0%97%D0%B0%D0%B3%D0%BB,915495
Spanish,es,1010810,"Wikipedia:Portada",13603
我创建了一个RDD如下:
val line = sc.textFile(inputFile)
val nrdd = line.map(x=>(x.split(",")(0),(x.split(",")(1),x.split(",")(2))))
nrdd: org.apache.spark.rdd.RDD[(String, (String, String))] = MapPartitionsRDD[2] at map at <console>:26
据此我想使用 case
函数创建类型为 ((String,String),String)
的 RDD。
如何使用 map
中的 case
语句执行此操作?
编辑
当我尝试使用 case 函数时出现以下错误:
scala> val frdd = nrdd.map( {case(x,(y,z))=>((x,y),z))})
<console>:1: error: ';' expected but ')' found.
val frdd = nrdd.map({case(x,(y,z))=>((x,y),z))})
^
除非我误解了你的问题,否则你想要这个:
val list: List[((String, String), String)] = List((("a1", "b1"), "c1"), (("a2", "b2"), "c2"))
val res = list.map { case ((a, b), c) => (a, (b, c)) }
println(res) // List((a1,(b1,c1)), (a2,(b2,c2)))
由于您的 RDD 是 Pair RDD,您可以使用 Keyed RDD
中的 swap
。
示例代码:
val keyRDD = sc.parallelize(List((("a1", "b1"), "c1"), (("a2", "b2"), "c2")), 2)
val swappedRDD = keyRDD.map(_.swap)
swappedRDD.foreach(x => println(x))