SparkR dubt 和 Broken pipe 异常
SparkR dubt and Broken pipe exception
您好,我正在使用 yarn 集群在分布式模式下开发 SparkR。
我有两个问题:
1) 如果我制作了一个包含 R 行代码和 SparkR 行代码的脚本,它会只分发 SparkR 代码还是简单的 R 代码?
这是脚本。我读了一个 csv 并只记录了 100k 的第一条记录。
我清理它(使用 R 函数)删除 NA 值并创建一个 SparkR 数据框。
这就是它的作用:foreach Lineset 获取该 LineSet 出现的每个 TimeInterval 并对一些属性(数字属性)求和,然后将它们全部放入矩阵中。
这是包含 R 和 SparkR 代码的脚本。在独立模式下需要 7 小时,在分布式模式下需要 60 小时(被 java.net.SocketException
: Broken Pipe 杀死)
LineSmsInt<-fread("/home/sentiment/Scrivania/LineSmsInt.csv")
Short<-LineSmsInt[1:100000,]
Short[is.na(Short)] <- 0
Short$TimeInterval<-Short$TimeInterval/1000
ShortDF<-createDataFrame(sqlContext,Short)
UniqueLineSet<-unique(Short$LINESET)
UniqueTime<-unique(Short$TimeInterval)
UniqueTime<-as.numeric(UniqueTime)
Row<-length(UniqueLineSet)*length(UniqueTime)
IntTemp<-matrix(nrow =Row,ncol=7)
k<-1
colnames(IntTemp)<-c("LINESET","TimeInterval","SmsIN","SmsOut","CallIn","CallOut","Internet")
Sys.time()
for(i in 1:length(UniqueLineSet)){
SubSetID<-filter(ShortDF,ShortDF$LINESET==UniqueLineSet[i])
for(j in 1:length(UniqueTime)){
SubTime<-filter(SubSetID,SubSetID$TimeInterval==UniqueTime[j])
IntTemp[k,1]<-UniqueLineSet[i]
IntTemp[k,2]<-as.numeric(UniqueTime[j])
k3<-collect(select(SubTime,sum(SubTime$SmsIn)))
IntTemp[k,3]<-k3[1,1]
k4<-collect(select(SubTime,sum(SubTime$SmsOut)))
IntTemp[k,4]<-k4[1,1]
k5<-collect(select(SubTime,sum(SubTime$CallIn)))
IntTemp[k,5]<-k5[1,1]
k6<-collect(select(SubTime,sum(SubTime$CallOut)))
IntTemp[k,6]<-k6[1,1]
k7<-collect(select(SubTime,sum(SubTime$Internet)))
IntTemp[k,7]<-k7[1,1]
k<-k+1
}
print(UniqueLineSet[i])
print(i)
}
这是脚本 R,唯一改变的是子集函数,当然是普通 R data.frame 而不是 SparkR 数据帧。
单机模式耗时1.30分钟
为什么它在 R 中这么快而在 SparkR 中却这么慢?
for(i in 1:length(UniqueLineSet)){
SubSetID<-subset.data.frame(LineSmsInt,LINESET==UniqueLineSet[i])
for(j in 1:length(UniqueTime)){
SubTime<-subset.data.frame(SubSetID,TimeInterval==UniqueTime[j])
IntTemp[k,1]<-UniqueLineSet[i]
IntTemp[k,2]<-as.numeric(UniqueTime[j])
IntTemp[k,3]<-sum(SubTime$SmsIn,na.rm = TRUE)
IntTemp[k,4]<-sum(SubTime$SmsOut,na.rm = TRUE)
IntTemp[k,5]<-sum(SubTime$CallIn,na.rm = TRUE)
IntTemp[k,6]<-sum(SubTime$CallOut,na.rm = TRUE)
IntTemp[k,7]<-sum(SubTime$Internet,na.rm=TRUE)
k<-k+1
}
print(UniqueLineSet[i])
print(i)
}
2) 分布式模式下的第一个脚本被 :
杀死
java.net.SocketException: Broken Pipe
这有时也会出现:
java.net.SocketTimeoutException: Accept timed out
可能是配置不好?建议?
谢谢。
请不要误会,但这并不是一段编写得特别好的代码。使用核心 R 已经很低效了,将 SparkR 添加到等式中会使它变得更糟。
If I made a script that contains R line code and SparkR line code, it will distribute just the SparkR code or simple R too?
除非您使用的是分布式数据结构和对这些结构进行操作的函数,否则它只是在主服务器上的单个线程中执行的普通 R 代码。
Why it's so fast just in R and it's so slowly in SparkR?
对于初学者,您为 LINESET
、UniqueTime
和列的每个组合执行一个作业。每次 Spark 都扫描所有记录并将数据提取到驱动程序。
此外,使用Spark来处理可以在单机内存中轻松处理的数据根本没有意义。 运行 这种情况下的工作成本通常比实际处理成本高得多。
suggestion?
如果你真的想使用 SparkR,只需 groupBy
和 agg
:
group_by(Short, Short$LINESET, Short$TimeInterval) %>% agg(
sum(Short$SmsIn), sum(Short$SmsOut), sum(Short$CallIn),
sum(Short$CallOut), sum(Short$Internet))
如果您关心缺失的 (LINESET
, TimeInterval
) 对,请使用 join
或 unionAll
.
填充这些对
在实践中,它会简单地坚持 data.table
并在本地聚合:
Short[, lapply(.SD, sum, na.rm=TRUE), by=.(LINESET, TimeInterval)]
您好,我正在使用 yarn 集群在分布式模式下开发 SparkR。
我有两个问题:
1) 如果我制作了一个包含 R 行代码和 SparkR 行代码的脚本,它会只分发 SparkR 代码还是简单的 R 代码?
这是脚本。我读了一个 csv 并只记录了 100k 的第一条记录。
我清理它(使用 R 函数)删除 NA 值并创建一个 SparkR 数据框。
这就是它的作用:foreach Lineset 获取该 LineSet 出现的每个 TimeInterval 并对一些属性(数字属性)求和,然后将它们全部放入矩阵中。
这是包含 R 和 SparkR 代码的脚本。在独立模式下需要 7 小时,在分布式模式下需要 60 小时(被 java.net.SocketException
: Broken Pipe 杀死)
LineSmsInt<-fread("/home/sentiment/Scrivania/LineSmsInt.csv")
Short<-LineSmsInt[1:100000,]
Short[is.na(Short)] <- 0
Short$TimeInterval<-Short$TimeInterval/1000
ShortDF<-createDataFrame(sqlContext,Short)
UniqueLineSet<-unique(Short$LINESET)
UniqueTime<-unique(Short$TimeInterval)
UniqueTime<-as.numeric(UniqueTime)
Row<-length(UniqueLineSet)*length(UniqueTime)
IntTemp<-matrix(nrow =Row,ncol=7)
k<-1
colnames(IntTemp)<-c("LINESET","TimeInterval","SmsIN","SmsOut","CallIn","CallOut","Internet")
Sys.time()
for(i in 1:length(UniqueLineSet)){
SubSetID<-filter(ShortDF,ShortDF$LINESET==UniqueLineSet[i])
for(j in 1:length(UniqueTime)){
SubTime<-filter(SubSetID,SubSetID$TimeInterval==UniqueTime[j])
IntTemp[k,1]<-UniqueLineSet[i]
IntTemp[k,2]<-as.numeric(UniqueTime[j])
k3<-collect(select(SubTime,sum(SubTime$SmsIn)))
IntTemp[k,3]<-k3[1,1]
k4<-collect(select(SubTime,sum(SubTime$SmsOut)))
IntTemp[k,4]<-k4[1,1]
k5<-collect(select(SubTime,sum(SubTime$CallIn)))
IntTemp[k,5]<-k5[1,1]
k6<-collect(select(SubTime,sum(SubTime$CallOut)))
IntTemp[k,6]<-k6[1,1]
k7<-collect(select(SubTime,sum(SubTime$Internet)))
IntTemp[k,7]<-k7[1,1]
k<-k+1
}
print(UniqueLineSet[i])
print(i)
}
这是脚本 R,唯一改变的是子集函数,当然是普通 R data.frame 而不是 SparkR 数据帧。
单机模式耗时1.30分钟
为什么它在 R 中这么快而在 SparkR 中却这么慢?
for(i in 1:length(UniqueLineSet)){
SubSetID<-subset.data.frame(LineSmsInt,LINESET==UniqueLineSet[i])
for(j in 1:length(UniqueTime)){
SubTime<-subset.data.frame(SubSetID,TimeInterval==UniqueTime[j])
IntTemp[k,1]<-UniqueLineSet[i]
IntTemp[k,2]<-as.numeric(UniqueTime[j])
IntTemp[k,3]<-sum(SubTime$SmsIn,na.rm = TRUE)
IntTemp[k,4]<-sum(SubTime$SmsOut,na.rm = TRUE)
IntTemp[k,5]<-sum(SubTime$CallIn,na.rm = TRUE)
IntTemp[k,6]<-sum(SubTime$CallOut,na.rm = TRUE)
IntTemp[k,7]<-sum(SubTime$Internet,na.rm=TRUE)
k<-k+1
}
print(UniqueLineSet[i])
print(i)
}
2) 分布式模式下的第一个脚本被 :
杀死java.net.SocketException: Broken Pipe
这有时也会出现:
java.net.SocketTimeoutException: Accept timed out
可能是配置不好?建议?
谢谢。
请不要误会,但这并不是一段编写得特别好的代码。使用核心 R 已经很低效了,将 SparkR 添加到等式中会使它变得更糟。
If I made a script that contains R line code and SparkR line code, it will distribute just the SparkR code or simple R too?
除非您使用的是分布式数据结构和对这些结构进行操作的函数,否则它只是在主服务器上的单个线程中执行的普通 R 代码。
Why it's so fast just in R and it's so slowly in SparkR?
对于初学者,您为 LINESET
、UniqueTime
和列的每个组合执行一个作业。每次 Spark 都扫描所有记录并将数据提取到驱动程序。
此外,使用Spark来处理可以在单机内存中轻松处理的数据根本没有意义。 运行 这种情况下的工作成本通常比实际处理成本高得多。
suggestion?
如果你真的想使用 SparkR,只需 groupBy
和 agg
:
group_by(Short, Short$LINESET, Short$TimeInterval) %>% agg(
sum(Short$SmsIn), sum(Short$SmsOut), sum(Short$CallIn),
sum(Short$CallOut), sum(Short$Internet))
如果您关心缺失的 (LINESET
, TimeInterval
) 对,请使用 join
或 unionAll
.
在实践中,它会简单地坚持 data.table
并在本地聚合:
Short[, lapply(.SD, sum, na.rm=TRUE), by=.(LINESET, TimeInterval)]