SparkR dubt 和 Broken pipe 异常

SparkR dubt and Broken pipe exception

您好,我正在使用 yarn 集群在分布式模式下开发 SparkR。

我有两个问题:

1) 如果我制作了一个包含 R 行代码和 SparkR 行代码的脚本,它会只分发 SparkR 代码还是简单的 R 代码?

这是脚本。我读了一个 csv 并只记录了 100k 的第一条记录。 我清理它(使用 R 函数)删除 NA 值并创建一个 SparkR 数据框。
这就是它的作用:foreach Lineset 获取该 LineSet 出现的每个 TimeInterval 并对一些属性(数字属性)求和,然后将它们全部放入矩阵中。

这是包含 R 和 SparkR 代码的脚本。在独立模式下需要 7 小时,在分布式模式下需要 60 小时(被 java.net.SocketException: Broken Pipe 杀死)

LineSmsInt<-fread("/home/sentiment/Scrivania/LineSmsInt.csv")
Short<-LineSmsInt[1:100000,]
Short[is.na(Short)] <- 0
Short$TimeInterval<-Short$TimeInterval/1000
ShortDF<-createDataFrame(sqlContext,Short)
UniqueLineSet<-unique(Short$LINESET)
UniqueTime<-unique(Short$TimeInterval)
UniqueTime<-as.numeric(UniqueTime)
Row<-length(UniqueLineSet)*length(UniqueTime)
IntTemp<-matrix(nrow =Row,ncol=7)
k<-1

colnames(IntTemp)<-c("LINESET","TimeInterval","SmsIN","SmsOut","CallIn","CallOut","Internet")
Sys.time()
for(i in 1:length(UniqueLineSet)){
  SubSetID<-filter(ShortDF,ShortDF$LINESET==UniqueLineSet[i])
  for(j in 1:length(UniqueTime)){
    SubTime<-filter(SubSetID,SubSetID$TimeInterval==UniqueTime[j])       
    IntTemp[k,1]<-UniqueLineSet[i]
    IntTemp[k,2]<-as.numeric(UniqueTime[j])
    k3<-collect(select(SubTime,sum(SubTime$SmsIn)))
    IntTemp[k,3]<-k3[1,1]
    k4<-collect(select(SubTime,sum(SubTime$SmsOut)))
    IntTemp[k,4]<-k4[1,1]
    k5<-collect(select(SubTime,sum(SubTime$CallIn)))
    IntTemp[k,5]<-k5[1,1]
    k6<-collect(select(SubTime,sum(SubTime$CallOut)))
    IntTemp[k,6]<-k6[1,1]
    k7<-collect(select(SubTime,sum(SubTime$Internet)))
    IntTemp[k,7]<-k7[1,1]
    k<-k+1
  }
  print(UniqueLineSet[i])
  print(i)
}

这是脚本 R,唯一改变的是子集函数,当然是普通 R data.frame 而不是 SparkR 数据帧。 单机模式耗时1.30分钟
为什么它在 R 中这么快而在 SparkR 中却这么慢?

for(i in 1:length(UniqueLineSet)){
  SubSetID<-subset.data.frame(LineSmsInt,LINESET==UniqueLineSet[i])
  for(j in 1:length(UniqueTime)){
    SubTime<-subset.data.frame(SubSetID,TimeInterval==UniqueTime[j])
    IntTemp[k,1]<-UniqueLineSet[i]
    IntTemp[k,2]<-as.numeric(UniqueTime[j])
    IntTemp[k,3]<-sum(SubTime$SmsIn,na.rm = TRUE)
    IntTemp[k,4]<-sum(SubTime$SmsOut,na.rm = TRUE)
    IntTemp[k,5]<-sum(SubTime$CallIn,na.rm = TRUE)
    IntTemp[k,6]<-sum(SubTime$CallOut,na.rm = TRUE)
    IntTemp[k,7]<-sum(SubTime$Internet,na.rm=TRUE)
    k<-k+1
  }
  print(UniqueLineSet[i])
  print(i)
}

2) 分布式模式下的第一个脚本被 :

杀死

java.net.SocketException: Broken Pipe

这有时也会出现:

java.net.SocketTimeoutException: Accept timed out

可能是配置不好?建议?

谢谢。

请不要误会,但这并不是一段编写得特别好的代码。使用核心 R 已经很低效了,将 SparkR 添加到等式中会使它变得更糟。

If I made a script that contains R line code and SparkR line code, it will distribute just the SparkR code or simple R too?

除非您使用的是分布式数据结构和对这些结构进行操作的函数,否则它只是在主服务器上的单个线程中执行的普通 R 代码。

Why it's so fast just in R and it's so slowly in SparkR?

对于初学者,您为 LINESETUniqueTime 和列的每个组合执行一个作业。每次 Spark 都扫描所有记录并将数据提取到驱动程序。

此外,使用Spark来处理可以在单机内存中轻松处理的数据根本没有意义。 运行 这种情况下的工作成本通常比实际处理成本高得多。

suggestion?

如果你真的想使用 SparkR,只需 groupByagg:

group_by(Short, Short$LINESET, Short$TimeInterval) %>% agg(
  sum(Short$SmsIn), sum(Short$SmsOut), sum(Short$CallIn),
  sum(Short$CallOut), sum(Short$Internet))

如果您关心缺失的 (LINESET, TimeInterval) 对,请使用 joinunionAll.

填充这些对

在实践中,它会简单地坚持 data.table 并在本地聚合:

Short[, lapply(.SD, sum, na.rm=TRUE), by=.(LINESET, TimeInterval)]