Working with Dates in Spark
I have a requirement where I need to parse a CSV file, identify the records between specific dates, and find the total and average sale amount per salesperson per ProductCategory over that period. Here is the CSV file structure:
SalesPersonId, SalesPersonName, SaleDate, SaleAmount, ProductCategory
Please help with this. Looking for a solution in Scala.
What I have tried:
Used SimpleDateFormat as described below:
val format = new java.text.SimpleDateFormat("MM/dd/yyyy")
and created an RDD with the following code:
val onlyHouseLoan = readFile.map(line => (line.split(",")(0), line.split(",")(2), line.split(",")(3).toLong, format.parse(line.split(",")(4).toString())))
However, when I tried using Calendar on top of the highlighted expression, I got a NumberFormatException.
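As an aside, here is a minimal sketch of parsing one line against the stated schema. Two things stand out in the snippet above: SaleDate is column index 2 and ProductCategory is index 4, so passing index 4 to format.parse attempts to parse a category string such as "TV" as a date; and if the header row is still present in the file, cols(3).toLong on the literal text "SaleAmount" would throw exactly a NumberFormatException. The helper name parseLine below is hypothetical, not from the original post:

import java.text.SimpleDateFormat
import java.util.Date

// Hypothetical helper: split each line once and pick columns by the schema
// SalesPersonId, SalesPersonName, SaleDate, SaleAmount, ProductCategory.
def parseLine(line: String): (String, Date, Long, String) = {
  val format = new SimpleDateFormat("MM/dd/yyyy") // SimpleDateFormat is not thread-safe, so build it per call
  val cols = line.split(",")
  (cols(0), format.parse(cols(2)), cols(3).toLong, cols(4))
}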
So, just quickly creating an RDD in the CSV file format you described:
val list = sc.parallelize(List(("1","Timothy","04/02/2015","100","TV"), ("1","Timothy","04/03/2015","10","Book"), ("1","Timothy","04/03/2015","20","Book"), ("1","Timothy","04/05/2015","10","Book"),("2","Ursula","04/02/2015","100","TV")))
and then running
import java.time.LocalDate
import java.time.format.DateTimeFormatter

val startDate = LocalDate.of(2015, 1, 4) // exclusive lower bound, 2015-01-04
val endDate = LocalDate.of(2015, 4, 5)   // exclusive upper bound, 2015-04-05
val result = list
  // keep only records whose SaleDate falls strictly between startDate and endDate
  .filter { case (_, _, date, _, _) =>
    val localDate = LocalDate.parse(date, DateTimeFormatter.ofPattern("MM/dd/yyyy"))
    localDate.isAfter(startDate) && localDate.isBefore(endDate)
  }
  // key by (SalesPersonId, ProductCategory); carry (amount, count) pairs for averaging
  .map { case (id, _, _, amount, category) => ((id, category), (amount.toDouble, 1)) }
  // sum the amounts and the counts per key
  .reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))
  // compute the average and regroup per salesperson
  .map { case ((id, category), (total, sales)) => (id, List((category, total, total / sales))) }
  .reduceByKey(_ ++ _)
will give you
(1,List((Book,30.0,15.0), (TV,100.0,100.0)))
(2,List((TV,100.0,100.0)))
in the format (SalesPersonId, [(ProductCategory, TotalSaleAmount, AvgSaleAmount)]). Note that the 04/05/2015 Book sale is excluded because isBefore(endDate) is an exclusive bound, which is why Book totals 30.0 rather than 40.0. Is that what you were looking for?
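As a side note, and not part of the original answer: if you are on Spark SQL, the same filter-and-aggregate can be sketched with the DataFrame API. This assumes a SparkSession named spark, a header row with the column names from the question, and a placeholder path "sales.csv":

import org.apache.spark.sql.functions._

// Read the CSV, parse SaleDate, and make SaleAmount numeric ("sales.csv" is a placeholder path).
val df = spark.read.option("header", "true").csv("sales.csv")
  .withColumn("SaleDate", to_date(col("SaleDate"), "MM/dd/yyyy"))
  .withColumn("SaleAmount", col("SaleAmount").cast("double"))

// Same exclusive date bounds as above, then total and average per salesperson and category.
val summary = df
  .filter(col("SaleDate") > lit("2015-01-04") && col("SaleDate") < lit("2015-04-05"))
  .groupBy("SalesPersonId", "ProductCategory")
  .agg(sum("SaleAmount").as("TotalSaleAmount"), avg("SaleAmount").as("AvgSaleAmount"))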