如何将函数应用于 SparkR 中的每一行?
How to apply a function to each row in SparkR?
我有一个 CSV 格式的文件,其中包含 table 列 "id"、"timestamp"、"action"、"value" 和 "location".
我想对 table 的每一行应用一个函数,我已经在 R 中编写了如下代码:
user <- read.csv(file_path,sep = ";")
num <- nrow(user)
curLocation <- "1"
for(i in 1:num) {
row <- user[i,]
if(user$action != "power")
curLocation <- row$value
user[i,"location"] <- curLocation
}
R 脚本工作正常,现在我想应用它 SparkR。但是,我无法直接访问 SparkR 中的第 i 行,也找不到任何函数来操作 SparkR documentation.
中的每一行
我应该使用哪种方法才能达到与R脚本相同的效果?
此外,根据@chateaur 的建议,我尝试使用 dapply 函数进行编码,如下所示:
curLocation <- "1"
schema <- structType(structField("Sequence","integer"), structField("ID","integer"), structField("Timestamp","timestamp"), structField("Action","string"), structField("Value","string"), structField("Location","string"))
setLocation <- function(row, curLoc) {
if(row$Action != "power|battery|level"){
curLoc <- row$Value
}
row$Location <- curLoc
}
bw <- dapply(user, function(row) { setLocation(row, curLocation)}, schema)
head(bw)
然后我得到一个错误:
我查看了警告消息条件的长度 > 1,并且只使用第一个元素,我发现了一些东西。这让我想知道 dapply 函数 中的 row 参数是否代表我的数据框的 整个分区 而不是 一行 ?也许 dapply 功能不是一个理想的解决方案?
后来按照@chateaur的建议修改了函数。我没有使用 dapply,而是使用了 dapplyCollect,这节省了我指定模式的工作。有效!
changeLocation <- function(partitionnedDf) {
nrows <- nrow(partitionnedDf)
curLocation <- "1"
for(i in 1:nrows){
row <- partitionnedDf[i,]
if(row$action != "power") {
curLocation <- row$value
}
partitionnedDf[i,"location"] <- curLocation
}
partitionnedDf
}
bw <- dapplyCollect(user, changeLocation)
Scorpion775,
您应该分享您的 sparkR 代码。不要忘记,数据在 R 和 sparkR 中的处理方式不同。
发件人:http://spark.apache.org/docs/latest/sparkr.html、
df <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "NA")
然后你可以在这里查看 dapply 函数:https://spark.apache.org/docs/2.1.0/api/R/dapply.html
这是一个工作示例:
changeLocation <- function(partitionnedDf) {
nrows <- nrow(partitionnedDf)
curLocation <- as.integer(1)
# Loop over each row of the partitionned data frame
for(i in 1:nrows){
row <- partitionnedDf[i,]
if(row[1] != "power") {
curLocation <- row[2]
}
partitionnedDf[i,3] <- curLocation
}
# Return modified data frame
partitionnedDf
}
# Load data
df <- read.df("data.csv", "csv", header="false", inferSchema = "true")
head(collect(df))
# Define schema of dataframe
schema <- structType(structField("action", "string"), structField("value", "integer"),
structField("location", "integer"))
# Change location of each row
df2 <- dapply(df, changeLocation, schema)
head(df2)
我有一个 CSV 格式的文件,其中包含 table 列 "id"、"timestamp"、"action"、"value" 和 "location". 我想对 table 的每一行应用一个函数,我已经在 R 中编写了如下代码:
user <- read.csv(file_path,sep = ";")
num <- nrow(user)
curLocation <- "1"
for(i in 1:num) {
row <- user[i,]
if(user$action != "power")
curLocation <- row$value
user[i,"location"] <- curLocation
}
R 脚本工作正常,现在我想应用它 SparkR。但是,我无法直接访问 SparkR 中的第 i 行,也找不到任何函数来操作 SparkR documentation.
中的每一行我应该使用哪种方法才能达到与R脚本相同的效果?
此外,根据@chateaur 的建议,我尝试使用 dapply 函数进行编码,如下所示:
curLocation <- "1"
schema <- structType(structField("Sequence","integer"), structField("ID","integer"), structField("Timestamp","timestamp"), structField("Action","string"), structField("Value","string"), structField("Location","string"))
setLocation <- function(row, curLoc) {
if(row$Action != "power|battery|level"){
curLoc <- row$Value
}
row$Location <- curLoc
}
bw <- dapply(user, function(row) { setLocation(row, curLocation)}, schema)
head(bw)
然后我得到一个错误:
我查看了警告消息条件的长度 > 1,并且只使用第一个元素,我发现了一些东西。这让我想知道 dapply 函数 中的 row 参数是否代表我的数据框的 整个分区 而不是 一行 ?也许 dapply 功能不是一个理想的解决方案?
后来按照@chateaur的建议修改了函数。我没有使用 dapply,而是使用了 dapplyCollect,这节省了我指定模式的工作。有效!
changeLocation <- function(partitionnedDf) {
nrows <- nrow(partitionnedDf)
curLocation <- "1"
for(i in 1:nrows){
row <- partitionnedDf[i,]
if(row$action != "power") {
curLocation <- row$value
}
partitionnedDf[i,"location"] <- curLocation
}
partitionnedDf
}
bw <- dapplyCollect(user, changeLocation)
Scorpion775,
您应该分享您的 sparkR 代码。不要忘记,数据在 R 和 sparkR 中的处理方式不同。
发件人:http://spark.apache.org/docs/latest/sparkr.html、
df <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "NA")
然后你可以在这里查看 dapply 函数:https://spark.apache.org/docs/2.1.0/api/R/dapply.html
这是一个工作示例:
changeLocation <- function(partitionnedDf) {
nrows <- nrow(partitionnedDf)
curLocation <- as.integer(1)
# Loop over each row of the partitionned data frame
for(i in 1:nrows){
row <- partitionnedDf[i,]
if(row[1] != "power") {
curLocation <- row[2]
}
partitionnedDf[i,3] <- curLocation
}
# Return modified data frame
partitionnedDf
}
# Load data
df <- read.df("data.csv", "csv", header="false", inferSchema = "true")
head(collect(df))
# Define schema of dataframe
schema <- structType(structField("action", "string"), structField("value", "integer"),
structField("location", "integer"))
# Change location of each row
df2 <- dapply(df, changeLocation, schema)
head(df2)