How to use groupBy in SparkR to count while keeping other columns as they are?

In SparkR, I need to count the number of times each value of the column STORE occurs in the DataFrame df3:

> df3 <- select(df_1, "DC_NAME", "STORE", "ITEM_DESC", "ITEM")

> head(df3)
 DC_NAME STORE                    ITEM_DESC      ITEM
1  Kerala  1216 Nambisan Ghee 200 Ml Pet Jar 100050222
2  Kerala  1216 Nambisan Ghee 100 ml Pet Jar 100149022
3  Kerala  1216  Nambisan Ghee 50 ml Pet Jar 100149024
4  Kerala  1219 Nambisan Ghee 500 Ml Pet Jar 100050210
5  Kerala  1219 Nambisan Ghee 200 Ml Pet Jar 100050222
6  Kerala  1219  Nambisan Ghee 50 ml Pet Jar 100149024

To count the number of times each STORE occurs, I used the code

df3_count <- groupBy(df3, "STORE") %>% count()
head(df3_count)
STORE count
1  1216     3
2  1219     3
3  3154     1
4  3049     3
5  1990     3
6  3107     4

But I need the result in this form, with the DC_NAME, ITEM_DESC, and ITEM columns kept as well. Is there code for this?

DC_NAME STORE                    ITEM_DESC      ITEM    count
1  Kerala  1216 Nambisan Ghee 200 Ml Pet Jar 100050222   3
2  Kerala  1216 Nambisan Ghee 100 ml Pet Jar 100149022   3
3  Kerala  1216  Nambisan Ghee 50 ml Pet Jar 100149024   3
4  Kerala  1219 Nambisan Ghee 500 Ml Pet Jar 100050210   3
5  Kerala  1219 Nambisan Ghee 200 Ml Pet Jar 100050222   3
6  Kerala  1219  Nambisan Ghee 50 ml Pet Jar 100149024   3
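One direct way to get this shape is to compute the per-store counts and join them back onto the original columns. A minimal sketch, assuming the SparkR 1.x API used above; `counts`, `joined`, and `result` are illustrative names:

    # Per-store counts; count(groupBy(...)) yields the columns STORE and count
    counts <- count(groupBy(df3, "STORE"))

    # Join the counts back onto the original rows by STORE, then select
    # the original columns plus the new count, avoiding a duplicate STORE column
    joined <- join(df3, counts, df3$STORE == counts$STORE, "inner")
    result <- select(joined, df3$DC_NAME, df3$STORE, df3$ITEM_DESC,
                     df3$ITEM, counts$count)

    head(result)

This costs a shuffle for the join, which the answer below avoids.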

If you want to avoid a join, you can use a window function with an unbounded frame. Assuming your data has the following structure:

    df <- structure(list(DC_NAME = structure(c(1L, 1L, 1L, 1L, 1L, 1L),
        .Label = " Kerala ", class = "factor"),
        STORE = c(1216L, 1216L, 1216L, 1219L, 1219L, 1219L),
        ITEM_DESC = structure(c(2L, 1L, 4L, 3L, 2L, 4L),
        .Label = c(" Nambisan Ghee 100 ml Pet Jar",
        " Nambisan Ghee 200 Ml Pet Jar", " Nambisan Ghee 500 Ml Pet Jar",
        "  Nambisan Ghee 50 ml Pet Jar"), class = "factor"),
        ITEM = c(100050222L, 100149022L, 100149024L,
        100050210L, 100050222L, 100149024L)),
        .Names = c("DC_NAME", "STORE", "ITEM_DESC", "ITEM"),
        class = "data.frame", row.names = c("1 ", "2 ", "3 ", "4 ", "5 ", "6 "))
  • Create a Spark DataFrame using a Hive context:

    hiveContext <- sparkRHive.init(sc)
    sdf <- createDataFrame(hiveContext, df)
    
  • Register it as a temporary table:

    registerTempTable(sdf, "sdf")
    
  • Prepare the query:

    query <- "SELECT *, SUM(1) OVER (
        PARTITION BY STORE
        ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
    ) AS count FROM sdf"
    
  • Execute it with the sql function:

    sql(hiveContext, query)
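
On Spark 2.x the same result can be obtained without a separate Hive context, since window functions became part of the native SQL engine: sparkR.session() replaces sparkRHive.init(), createDataFrame() no longer takes a context argument, and the window can be expressed through the DataFrame API. A rough sketch, assuming a SparkR 2.x installation (windowPartitionBy, over, and withColumn as provided there):

    library(SparkR)
    sparkR.session()

    sdf <- createDataFrame(df)

    # With no ORDER BY in the window spec, the frame defaults to the
    # whole partition, i.e. UNBOUNDED PRECEDING to UNBOUNDED FOLLOWING
    ws  <- windowPartitionBy("STORE")
    out <- withColumn(sdf, "count", over(count(sdf$STORE), ws))

    head(out)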