Writing R data frames returned from SparkR:::map
I'm using SparkR:::map, and my function returns a large R data frame for each input row, each of the same shape. I want to write these data frames out as Parquet files without 'collect'ing them. Can I map write.df over my output list? Can I have the worker tasks write the Parquet instead?
I now have a working example. I'm happy with it, except that I did not expect reduce to implicitly 'collect', since I want to write the resulting DF out as Parquet.
Also, I'm not convinced that :::map actually does anything in parallel. Do I always need to call 'parallelise' as well?
#! /usr/bin/Rscript
library(SparkR, lib.loc="/opt/spark-1.5.1-bin-without-hadoop/R/lib")
source("jdbc-utils.R")
options(stringsAsFactors = FALSE)
# I dislike having these here but when I move them into main(), it breaks - the sqlContext drops.
assign("sc", sparkR.init(master = "spark://poc-master-1:7077",
sparkHome = "/opt/spark-1.5.1-bin-without-hadoop/",
appName = "Peter Spark test",
list(spark.executor.memory="4G")), envir = .GlobalEnv)
assign("sqlContext", sparkRSQL.init(sc), envir =.GlobalEnv)
#### MAP function ####
run.model <- function(v) {
  x <- v$xs[1]
  y <- v$ys[1]
  startTime <- format(Sys.time(), "%F %T")
  xs <- c(1:x)
  endTime <- format(Sys.time(), "%F %T")
  hostname <- system("hostname", intern = TRUE)
  xys <- data.frame(xs, y, startTime, endTime, hostname, stringsAsFactors = FALSE)
  return(xys)
}
# HERE BE THE SCRIPT BIT
main <- function() {
  # Make unique identifiers for each run
  xs <- c(1:365)
  ys <- c(1:1)
  xys <- data.frame(xs, ys, stringsAsFactors = FALSE)
  # Convert to Spark dataframe for mapping
  sqlContext <- get("sqlContext", envir = .GlobalEnv)
  xys.sdf <- createDataFrame(sqlContext, xys)
  # Let Spark do what Spark does
  output.list <- SparkR:::map(xys.sdf, run.model)
  # Reduce gives us a single R dataframe, which may not be what we want.
  output.redux <- SparkR:::reduce(output.list, rbind)
  # Or you can have it as a list of data frames.
  output.col <- collect(output.list)
  return(NULL)
}
Assuming your data looks more or less like this:
rdd <- SparkR:::parallelize(sc, 1:5)
dfs <- SparkR:::map(rdd, function(x) mtcars[(x * 5):((x + 1) * 5), ])
and all columns have supported types, you can convert it to a row-wise format:
rows <- SparkR:::flatMap(dfs, function(x) {
  data <- as.list(x)
  args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
  do.call(mapply, append(args, data))
})
call createDataFrame:
sdf <- createDataFrame(sqlContext, rows)
head(sdf)
## mpg cyl disp hp drat wt qsec vs am gear carb
## 1 18.7 8 360.0 175 3.15 3.44 17.02 0 0 3 2
## 2 18.1 6 225.0 105 2.76 3.46 20.22 1 0 3 1
## 3 14.3 8 360.0 245 3.21 3.57 15.84 0 0 3 4
## 4 24.4 4 146.7 62 3.69 3.19 20.00 1 0 4 2
## 5 22.8 4 140.8 95 3.92 3.15 22.90 1 0 4 2
## 6 19.2 6 167.6 123 3.92 3.44 18.30 1 0 4 4
printSchema(sdf)
## root
## |-- mpg: double (nullable = true)
## |-- cyl: double (nullable = true)
## |-- disp: double (nullable = true)
## |-- hp: double (nullable = true)
## |-- drat: double (nullable = true)
## |-- wt: double (nullable = true)
## |-- qsec: double (nullable = true)
## |-- vs: double (nullable = true)
## |-- am: double (nullable = true)
## |-- gear: double (nullable = true)
## |-- carb: double (nullable = true)
and simply use write.df / saveDF.
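For example, a minimal sketch of the Parquet write from the driver, assuming the SparkR 1.5.x write.df(df, path, source, mode) signature; the output path is only a placeholder:
# Write the Spark DataFrame as Parquet; "/tmp/mtcars.parquet" is a hypothetical path.
write.df(sdf, path = "/tmp/mtcars.parquet", source = "parquet", mode = "overwrite")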
The problem is that you shouldn't be using the internal API in the first place. One of the reasons it was removed in the initial release is that it is not robust enough to be used directly. Not to mention that it is still unclear whether it will be supported or even available in the future. Just saying...