如何在 R 中并行化分组 mutate/summarise

How to parallelize a grouped mutate/summarise in R

在 tidy R 中,如何并行化分组的 summarize(或 mutate)函数调用? 对 iris 数据集的转换说明了我的问题。

我创建了一个简单的函数 - 它需要两个数值向量作为参数。它 returns 一个包含 2 列小标题的列表。

 library(tidyverse)
 geoMaxMean <- function(pLen, pWid){
    list(
      tibble(maxLen = max(pLen), 
             geoMean = sqrt(max(pLen) * max(pWid))))}

将其应用于虹膜

 gIris <- iris %>% 
    as_tibble() %>% 
    group_by(Species) %>% 
    summarise(Cols2 = geoMaxMean(Petal.Length, Petal.Width)) %>% 
    unnest(Cols2)

给出了预期的结果。

Species     maxLen      geoMean
setosa      1.9         1.067708
versicolor  5.1         3.029851
virginica   6.9         4.153312

如何并行化 geoMaxMean 调用?我试图用 lappplyforeach 重新调用,但我一直没弄明白。

我是 运行 RStudio Pro 上的 R 3.4.4。

这里有一段代码可以使用 pbmcapply 包来完成。 mcapply 包也可以正常工作,并且功能相同,但这样你会得到一个进度条,这很方便。

library(tidyverse)
library(magrittr)
library(pbmcapply)

allSpecies <- 
  iris %>%
  pull(Species) %>%
  unique 

geoMaxMean <- 
  function(species, data){
    data <- data[data$Species == species,]
    pLen <- data$Petal.Length
    pWid <-  data$Petal.Width
    rm(data)

    out <- 
      tibble(maxLen = max(pLen), 
             geoMean = sqrt(max(pLen) * max(pWid))
             )
    return(out)
}

nCores <- 
  detectCores() %>%
  subtract(2)

gIris <-
  allSpecies %>%
  as.list %>%
  pbmclapply(geoMaxMean,
             data = iris,
             mc.cores = nCores
             ) %>%
  bind_rows %>%
  tibble("Species" = allSpecies, .)

这里的主要区别在于,您必须重新考虑要输入并行化 apply 函数的函数中包含的内容。您的原始代码片段将所有计算分配给一个函数,然后尝试对所有内容进行分组。如果您将函数设计为将数据拆分为一个子组,然后执行您的计算,通过使用所有分组标签的列表作为输入列表到 pbmclapply 中可以很容易地并行化,并且只需将您的数据作为函数的参数,而不是输入。

希望对您有所帮助。

您也可以使用 dplyr::group_nestfuturefurrr:future_map_dfr

(以防万一,我使用的是 dplyr 1.0.7、furrr 0.2.3、tidyr 1.1.2 和 future1.21.0)

首先,您使用 group_nest 将组放在一起,然后拆分以进行并行化(例如,通过 worker_id,如下所示)。然后你 运行 在每个分离的工作组上, future_map_dfr 自动重新组合成 tibbledataframe (例如相当于 运行ning bind_rows 最后):

library(tidyverse)

geoMaxMean <- function(pLen, pWid) {
  list(
    tibble(maxLen = max(pLen), 
           geoMean = sqrt(max(pLen) * max(pWid))))
  }

n_workers <- 4
# Setup parallelization
future::plan(future::multisession, workers=n_workers)

gIris <- iris %>% 
  as_tibble() %>% 
  group_by(Species) %>% 
  summarise(Cols2 = geoMaxMean(Petal.Length, Petal.Width)) %>% 
  unnest(Cols2)

gIris_parallel <- iris %>% 
  group_nest(Species, .key="grouped_data") %>% 
  dplyr::mutate(.worker_id = sample(1:n_workers, replace=T, size=nrow(.))) %>% 
  dplyr::group_split(.worker_id, .keep=F) %>% 
  furrr::future_map_dfr(
    function(.data) tidyr::unnest(.data, grouped_data) %>% 
      group_by(Species) %>% 
      summarise(Cols2 = geoMaxMean(Petal.Length, Petal.Width)) %>% 
      unnest(Cols2)
  )

顺便说一句,请注意 运行ning summarise 在 returns tibble 自动解包列的函数上,并消除了对虚拟变量 Col2 的需要:

geoMaxMean_to_tibble <- function(pLen, pWid) {
    tibble(maxLen = max(pLen), 
           geoMean = sqrt(max(pLen) * max(pWid)))
  }

gIris <- iris %>% 
  as_tibble() %>% 
  group_by(Species) %>% 
  summarise(geoMaxMean_to_tibble(Petal.Length, Petal.Width))
  # No need to call unnest