Spark:分组 RDD Sql 查询
Spark: Group RDD Sql Query
我有 3 个 RDD 需要加入。
val event1001RDD: schemaRDD = [eventtype,id,location,date1]
[1001,4929102,LOC01,2015-01-20 10:44:39]
[1001,4929103,LOC02,2015-01-20 10:44:39]
[1001,4929104,LOC03,2015-01-20 10:44:39]
val event2009RDD: schemaRDD = [eventtype,id,celltype,date1](未按 id 分组,因为我需要 4 个日期,具体取决于 celltype)
[2009,4929101,R01,2015-01-20 20:44:39]
[2009,4929102,R02,2015-01-20 14:00:00] (RPM)
[2009,4929102,P01,2015-01-20 12:00:00] (PPM)
[2009,4929102,R03,2015-01-20 15:00:00] (RPM)
[2009,4929102,C01,2015-01-20 13:00:00] (RPM)
[2009,4929103,R01,2015-01-20 14:44:39]
[2009,4929105,R01,2015-01-20 12:44:39]
[2009,4929105,V01,2015-01-20 11:44:39]
[2009,4929106,R01,2015-01-20 13:44:39]
val cellLookupRDD: [celltype, cellname](cellname 有 4 个值)
[R01,RPM]
[R02,RPM]
[R03,RPM]
[C01,RPM]
[P01,PPM]
[V01,PPM]
预期结果:[id,1001的位置,1001的日期1,2009的第一个RPM日期,2009的最后一个RPM日期,2009的第一个PPM日期,2009的最后一个PPM日期]
4929101,NULL,NULL,2015-01-20 20:44:39,NULL,NULL,NULL
4929102,LOC01,2015-01-20 10:44:39,2015-01-20 13:00:00,2015-01-20 15:00:00,2015-01-20 12:00:00,NULL
4929103,LOC02,2015-01-20 10:44:39,2015-01-20 14:44:39,NULL,NULL,NULL
4929104,LOC03,2015-01-20 10:44:39,NULL,NULL,NULL,NULL
4929105,NULL,NULL,2015-01-20 12:44:39,NULL,2015-01-20 11:44:39,NULL
4929106,NULL,NULL,2015-01-20 13:44:39,NULL,NULL,NULL
这是我当前的查询(我也将可选的事件类型指示为第一列;但在我之前的事件 2009RDD 中,我选择了一个错误的最小和最大日期,因为我需要通过 cellLookupRDD 确定的四个日期- RPM 和 PPM):
select if(event1001Table.eventtype is not null, event1001Table.eventtype,
event2009Table.eventtype),
if(event1001Table.id is not null, event1001Table.id,
event2009Table.id),
event1001Table.date1, event2009Table.minDate, event2009Table.maxDate
from event1001Table full outer join event2009Table
on event1001Table.id=event2009Table.id")
已编辑以在应用答案后显示结果:
" min(if(l.cn = 'RPM' or l.cn = 'RPM2', r.date1, 'NULL')) as rpmmn, " +
" max(if(l.cn = 'RPM' or l.cn = 'RPM2', r.date1, 'NULL')) as rpmmx, " +
" min(if(l.cn = 'PPM' or l.cn = 'PPM2', r.date1, 'NULL')) as ppmmn, " +
" max(if(l.cn = 'PPM' or l.cn = 'PPM2', r.date1, 'NULL')) as ppmmx " +
[2009,4929102,R01,2015-01-20 13:00:00] min_rpm
[2009,4929102,C01,2015-01-20 14:00:00] max_rpm
---res: [2009,4929102,NULL,NULL,2015-01-20 13:00:00,2015-01-20 14:00:00]
--- CORRECT
[2009,4929102,R01,2015-01-20 13:00:00] min_rpm
[2009,4929102,P01,2015-01-20 14:00:00] min_ppm
---res: [2009,4929102,2015-01-20 13:00:00,NULL,2015-01-20 14:00:00,NULL]
--- INCORRECT (max should be equal to MIN although NULL is preferred if possible but I could just check in the code later on if min=max)
[2009,4929102,R01,2015-01-20 13:00:00] min_rpm
[2009,4929102,C01,2015-01-20 14:00:00] max_rpm
[2009,4929102,P01,2015-01-20 09:00:00] min_ppm
---res: [2009,4929102,2015-01-20 13:00:00,NULL,2015-01-20 09:00:00,NULL]
--- INCORRECT (max is not working)
让我们一步一步来吧。让我们首先构建 2009 部分
event2009RDD.registerTempTable("base2009")
cellLookupRDD.registerTempTable("lookup")
trns2009 = ssc.sql("select eventtype, id, \
min(case when l.cn = 'RPM' then r.date1 else null end) rpmmn, \
max(case when l.cn = 'RPM' then r.date1 else null end) rpmmx, \
min(case when l.cn = 'PPM' then r.date1 else null end) ppmmn, \
max(case when l.cn = 'PPM' then r.date1 else null end) ppmmx, \
from base2009 r inner join lookup l on r.celltype=l.celltype \
group by eventtype,id "
trns2009 .registerTempTable("transformed2009")
现在您可以对 1001 个数据集进行完全外部连接并获得输出。
注意:你不应该
4929101,NULL,NULL,2015-01-20 20:44:39,NULL,NULL,NULL
相反,您应该
4929101,NULL,NULL,2015-01-20 20:44:39,2015-01-20 20:44:39,NULL,NULL
因为,如果 2009 年的事件发生过一次,那么它应该有第一个和最后一个日期。 NULL 应该代表一个从未发生过的事件,比如 id=4929101, celltype=PPM.
请告诉我这是否有效(或无效)。我现在无法访问 spark,但如果需要,今晚应该可以调试。
我有 3 个 RDD 需要加入。
val event1001RDD: schemaRDD = [eventtype,id,location,date1]
[1001,4929102,LOC01,2015-01-20 10:44:39]
[1001,4929103,LOC02,2015-01-20 10:44:39]
[1001,4929104,LOC03,2015-01-20 10:44:39]
val event2009RDD: schemaRDD = [eventtype,id,celltype,date1](未按 id 分组,因为我需要 4 个日期,具体取决于 celltype)
[2009,4929101,R01,2015-01-20 20:44:39]
[2009,4929102,R02,2015-01-20 14:00:00] (RPM)
[2009,4929102,P01,2015-01-20 12:00:00] (PPM)
[2009,4929102,R03,2015-01-20 15:00:00] (RPM)
[2009,4929102,C01,2015-01-20 13:00:00] (RPM)
[2009,4929103,R01,2015-01-20 14:44:39]
[2009,4929105,R01,2015-01-20 12:44:39]
[2009,4929105,V01,2015-01-20 11:44:39]
[2009,4929106,R01,2015-01-20 13:44:39]
val cellLookupRDD: [celltype, cellname](cellname 有 4 个值)
[R01,RPM]
[R02,RPM]
[R03,RPM]
[C01,RPM]
[P01,PPM]
[V01,PPM]
预期结果:[id,1001的位置,1001的日期1,2009的第一个RPM日期,2009的最后一个RPM日期,2009的第一个PPM日期,2009的最后一个PPM日期]
4929101,NULL,NULL,2015-01-20 20:44:39,NULL,NULL,NULL
4929102,LOC01,2015-01-20 10:44:39,2015-01-20 13:00:00,2015-01-20 15:00:00,2015-01-20 12:00:00,NULL
4929103,LOC02,2015-01-20 10:44:39,2015-01-20 14:44:39,NULL,NULL,NULL
4929104,LOC03,2015-01-20 10:44:39,NULL,NULL,NULL,NULL
4929105,NULL,NULL,2015-01-20 12:44:39,NULL,2015-01-20 11:44:39,NULL
4929106,NULL,NULL,2015-01-20 13:44:39,NULL,NULL,NULL
这是我当前的查询(我也将可选的事件类型指示为第一列;但在我之前的事件 2009RDD 中,我选择了一个错误的最小和最大日期,因为我需要通过 cellLookupRDD 确定的四个日期- RPM 和 PPM):
select if(event1001Table.eventtype is not null, event1001Table.eventtype,
event2009Table.eventtype),
if(event1001Table.id is not null, event1001Table.id,
event2009Table.id),
event1001Table.date1, event2009Table.minDate, event2009Table.maxDate
from event1001Table full outer join event2009Table
on event1001Table.id=event2009Table.id")
已编辑以在应用答案后显示结果:
" min(if(l.cn = 'RPM' or l.cn = 'RPM2', r.date1, 'NULL')) as rpmmn, " +
" max(if(l.cn = 'RPM' or l.cn = 'RPM2', r.date1, 'NULL')) as rpmmx, " +
" min(if(l.cn = 'PPM' or l.cn = 'PPM2', r.date1, 'NULL')) as ppmmn, " +
" max(if(l.cn = 'PPM' or l.cn = 'PPM2', r.date1, 'NULL')) as ppmmx " +
[2009,4929102,R01,2015-01-20 13:00:00] min_rpm
[2009,4929102,C01,2015-01-20 14:00:00] max_rpm
---res: [2009,4929102,NULL,NULL,2015-01-20 13:00:00,2015-01-20 14:00:00]
--- CORRECT
[2009,4929102,R01,2015-01-20 13:00:00] min_rpm
[2009,4929102,P01,2015-01-20 14:00:00] min_ppm
---res: [2009,4929102,2015-01-20 13:00:00,NULL,2015-01-20 14:00:00,NULL]
--- INCORRECT (max should be equal to MIN although NULL is preferred if possible but I could just check in the code later on if min=max)
[2009,4929102,R01,2015-01-20 13:00:00] min_rpm
[2009,4929102,C01,2015-01-20 14:00:00] max_rpm
[2009,4929102,P01,2015-01-20 09:00:00] min_ppm
---res: [2009,4929102,2015-01-20 13:00:00,NULL,2015-01-20 09:00:00,NULL]
--- INCORRECT (max is not working)
让我们一步一步来吧。让我们首先构建 2009 部分
event2009RDD.registerTempTable("base2009")
cellLookupRDD.registerTempTable("lookup")
trns2009 = ssc.sql("select eventtype, id, \
min(case when l.cn = 'RPM' then r.date1 else null end) rpmmn, \
max(case when l.cn = 'RPM' then r.date1 else null end) rpmmx, \
min(case when l.cn = 'PPM' then r.date1 else null end) ppmmn, \
max(case when l.cn = 'PPM' then r.date1 else null end) ppmmx, \
from base2009 r inner join lookup l on r.celltype=l.celltype \
group by eventtype,id "
trns2009 .registerTempTable("transformed2009")
现在您可以对 1001 个数据集进行完全外部连接并获得输出。
注意:你不应该
4929101,NULL,NULL,2015-01-20 20:44:39,NULL,NULL,NULL
相反,您应该
4929101,NULL,NULL,2015-01-20 20:44:39,2015-01-20 20:44:39,NULL,NULL
因为,如果 2009 年的事件发生过一次,那么它应该有第一个和最后一个日期。 NULL 应该代表一个从未发生过的事件,比如 id=4929101, celltype=PPM.
请告诉我这是否有效(或无效)。我现在无法访问 spark,但如果需要,今晚应该可以调试。