MapReduce 函数
MapReduce functions
我正在使用 scala IDE 在 eclipse IDE 上开发一个 spark 项目
我想就这个 MapReduce 问题寻求帮助
地图函数:
- 删除列 'sport' 和 'bourse'
- 删除任何包含 'NULL'
的行
- 添加一个新的列持续时间周期。这必须根据学生的周期取值:许可证(3 年)、硕士(3 年)、Ingeniorat(5 年)和博士学位(3 年)
减速器:
- 根据年级、周期和专业将所有学生加起来。
我的输入是
matricule,dateins,cycle,specialite,bourse,sport
0000000001,1999-11-22,硕士,IC,非,非
0000000002,2014-02-01,Null,IC,Null,Oui
0000000003,2006-09-07,Null,Null,Oui,Oui
0000000004,2008-12-11,Master,IC,Oui,Oui
0000000005,2006-06-07,Master,SI,Non,Oui
0000000006,1996-11-16,Ingeniorat,SI,Null,Null
等等。
这是我开始的代码。我删除了列 'sport' 'bourse' 并提取了年份
val sc = new SparkContext(conf)
val x = sc.textFile("/home/amel/one")
val re = x.map(_.split(",")).foreach(r => println(r(1).dropRight(6), r(2),r(3)))
这是我得到的结果
(2000,许可证,Isil)
(2001,硕士,SSI)
我想要的结果是:
年周期持续时间专业 Nbr-students
(2000,驾照,3年,Isil,400)
(2001,硕士,3年,SSI,120)
// 我希望 'Nbr-students' 列是每年根据周期和专业的学生人数。
我假设您只想要年份 - 如果您不想要年份,请将 cols(1).split("-")(0)
更改为 cols(1)
。
首先,我使用您的示例数据伪造了一些数据:
val x = sc.parallelize(Array(
"001,2000-12-22,License,Isil,no,yes",
"002,2001-11-30,Master,SSI,no,no",
"003,2001-11-30,Master,SSI,no,no",
"004,2001-11-30,Master,SSI,no,no",
"005,2000-12-22,License,Isil,no,yes"
))
接下来我做了一些RDD转换。首先,我删除并创建了必要的列,然后我向每一行添加了 1 的计数。最后,我 reduceByKey 来计算所有具有相同信息的行:
val re = x.map(row => {
val cols = row.split(",")
val cycle = cols(2)
val years = cycle match {
case "License" => "3 years"
case "Master" => "3 years"
case "Ingeniorat" => "5 years"
case "Doctorate" => "3 years"
case _ => "other"
}
(cols(1).split("-")(0) + "," + years + "," + cycle + "," + cols(3), 1)
}).reduceByKey(_ + _)
re.collect.foreach(println)
(2000,3 years,License,Isil,2)
(2001,3 years,Master,SSI,3)
我正在使用 scala IDE 在 eclipse IDE 上开发一个 spark 项目
我想就这个 MapReduce 问题寻求帮助
地图函数:
- 删除列 'sport' 和 'bourse'
- 删除任何包含 'NULL' 的行
- 添加一个新的列持续时间周期。这必须根据学生的周期取值:许可证(3 年)、硕士(3 年)、Ingeniorat(5 年)和博士学位(3 年)
减速器:
- 根据年级、周期和专业将所有学生加起来。
我的输入是
matricule,dateins,cycle,specialite,bourse,sport
0000000001,1999-11-22,硕士,IC,非,非
0000000002,2014-02-01,Null,IC,Null,Oui
0000000003,2006-09-07,Null,Null,Oui,Oui
0000000004,2008-12-11,Master,IC,Oui,Oui
0000000005,2006-06-07,Master,SI,Non,Oui
0000000006,1996-11-16,Ingeniorat,SI,Null,Null
等等。
这是我开始的代码。我删除了列 'sport' 'bourse' 并提取了年份
val sc = new SparkContext(conf)
val x = sc.textFile("/home/amel/one")
val re = x.map(_.split(",")).foreach(r => println(r(1).dropRight(6), r(2),r(3)))
这是我得到的结果
(2000,许可证,Isil)
(2001,硕士,SSI)
我想要的结果是:
年周期持续时间专业 Nbr-students
(2000,驾照,3年,Isil,400)
(2001,硕士,3年,SSI,120)
// 我希望 'Nbr-students' 列是每年根据周期和专业的学生人数。
我假设您只想要年份 - 如果您不想要年份,请将 cols(1).split("-")(0)
更改为 cols(1)
。
首先,我使用您的示例数据伪造了一些数据:
val x = sc.parallelize(Array(
"001,2000-12-22,License,Isil,no,yes",
"002,2001-11-30,Master,SSI,no,no",
"003,2001-11-30,Master,SSI,no,no",
"004,2001-11-30,Master,SSI,no,no",
"005,2000-12-22,License,Isil,no,yes"
))
接下来我做了一些RDD转换。首先,我删除并创建了必要的列,然后我向每一行添加了 1 的计数。最后,我 reduceByKey 来计算所有具有相同信息的行:
val re = x.map(row => {
val cols = row.split(",")
val cycle = cols(2)
val years = cycle match {
case "License" => "3 years"
case "Master" => "3 years"
case "Ingeniorat" => "5 years"
case "Doctorate" => "3 years"
case _ => "other"
}
(cols(1).split("-")(0) + "," + years + "," + cycle + "," + cols(3), 1)
}).reduceByKey(_ + _)
re.collect.foreach(println)
(2000,3 years,License,Isil,2)
(2001,3 years,Master,SSI,3)