Kotlin 集合的并行操作?

Parallel operations on Kotlin collections?

在 Scala 中,可以很容易地做一个并行映射,forEach 等,使用:

collection.par.map(..)

Kotlin 中是否有等效项?

目前没有。 Kotlin 与 Scala 的官方比较提到:

Things that may be added to Kotlin later:

  • Parallel collections

Kotlin 标准库不支持并行操作。但是,由于 Kotlin 使用标准 Java 集合 类,您也可以使用 Java 8 流 API 对 Kotlin 集合执行并行操作。

例如

myCollection.parallelStream()
        .map { ... }
        .filter { ... }

Kotlin 的标准库还没有官方支持,但是你可以定义一个 extension function 来模仿 par.map:

fun <T, R> Iterable<T>.pmap(
          numThreads: Int = Runtime.getRuntime().availableProcessors() - 2, 
          exec: ExecutorService = Executors.newFixedThreadPool(numThreads),
          transform: (T) -> R): List<R> {

    // default size is just an inlined version of kotlin.collections.collectionSizeOrDefault
    val defaultSize = if (this is Collection<*>) this.size else 10
    val destination = Collections.synchronizedList(ArrayList<R>(defaultSize))

    for (item in this) {
        exec.submit { destination.add(transform(item)) }
    }

    exec.shutdown()
    exec.awaitTermination(1, TimeUnit.DAYS)

    return ArrayList<R>(destination)
}

(github source)

这是一个简单的用法示例

val result = listOf("foo", "bar").pmap { it+"!" }.filter { it.contains("bar") }

如果需要,它允许通过提供线程数甚至特定 java.util.concurrent.Executor 来调整线程。例如

listOf("foo", "bar").pmap(4, transform = { it + "!" })

请注意,此方法仅允许并行化 map 操作,不会影响任何下游位。例如。第一个示例中的 filter 将 运行 单线程。然而,在许多情况下,只有数据转换(即 map)需要并行化。此外,将上面的方法扩展到 Kotlin 集合的其他元素会很简单 API.

从 Kotlin 1.1 开始,并行操作也可以用 coroutines 来非常优雅地表达。这是列表中的 pmap

fun <A, B>List<A>.pmap(f: suspend (A) -> B): List<B> = runBlocking {
    map { async(CommonPool) { f(it) } }.map { it.await() }
}

请注意,协程仍是一项实验性功能。

从 1.2 版本开始,kotlin 添加了一个 stream feature 兼容 JRE8

因此,异步遍历列表可以像下面这样完成:

fun main(args: Array<String>) {
  val c = listOf("toto", "tata", "tutu")
  c.parallelStream().forEach { println(it) }
}

Kotlin 希望是地道的,但又不想过于综合以至于乍一看难以理解。

通过协程进行并行计算也不例外。他们希望它简单但不隐含一些预构建的方法,允许在需要时分支计算。

你的情况:

collection.map { 
        async{ produceWith(it) } 
    }
    .forEach { 
        consume(it.await()) 
    }

请注意,要调用 asyncawait,您需要在所谓的 Context 中,您不能挂起调用或从非协程上下文启动协程。要输入一个,您可以:

  • runBlocking { /* your code here */ }:它将挂起当前线程,直到 lambda returns.
  • GlobalScope.launch { }:它将并行执行lambda;如果你的 main 完成执行而你的协程没有坏事会发生,在这种情况下最好使用 runBlocking.

希望对您有所帮助:)

此解决方案假定您的项目正在使用协程:

implementation( "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.2")

调用parallelTransform的函数不保留元素的顺序,return一个Flow<R>,而函数parallelMap保留顺序,return是一个List<R>.

为多次调用创建线程池:

val numberOfCores = Runtime.getRuntime().availableProcessors()
val executorDispatcher: ExecutorCoroutineDispatcher =
    Executors.newFixedThreadPool(numberOfCores ).asCoroutineDispatcher()

使用该调度程序(并在不再需要时调用 close()):

inline fun <T, R> Iterable<T>.parallelTransform(
    dispatcher: ExecutorDispatcher,
    crossinline transform: (T) -> R
): Flow<R> = channelFlow {

    val items: Iterable<T> = this@parallelTransform
    val channelFlowScope: ProducerScope<R> = this@channelFlow

    launch(dispatcher) {
        items.forEach {item ->
            launch {
                channelFlowScope.send(transform(item))
            }
        }
    }
}

如果不关心线程池重用(线程池不便宜),你可以使用这个版本:

inline fun <T, R> Iterable<T>.parallelTransform(
    numberOfThreads: Int,
    crossinline transform: (T) -> R
): Flow<R> = channelFlow {

    val items: Iterable<T> = this@parallelTransform
    val channelFlowScope: ProducerScope<R> = this@channelFlow

    Executors.newFixedThreadPool(numberOfThreads).asCoroutineDispatcher().use { dispatcher ->
        launch( dispatcher ) {
            items.forEach { item ->
                launch {
                    channelFlowScope.send(transform(item))
                }
            }
        }
    }
}

如果您需要保留元素顺序的版本:

inline fun <T, R> Iterable<T>.parallelMap(
    dispatcher: ExecutorDispatcher,
    crossinline transform: (T) -> R
): List<R> = runBlocking {

    val items: Iterable<T> = this@parallelMap
    val result = ConcurrentSkipListMap<Int, R>()

    launch(dispatcher) {
        items.withIndex().forEach {(index, item) ->
            launch {
                result[index] = transform(item)
            }
        }
    }

    // ConcurrentSkipListMap is a SortedMap
    // so the values will be in the right order
    result.values.toList()
}

您可以使用此扩展方法:

suspend fun <A, B> Iterable<A>.pmap(f: suspend (A) -> B): List<B> = coroutineScope {
    map { async { f(it) } }.awaitAll()
}

有关详细信息,请参阅 Parallel Map in Kotlin

另一种我发现非常优雅的方法是这样的,使用 kotlinx.coroutines 库:

import kotlinx.coroutines.flow.asFlow

suspend fun process(myCollection: Iterable<Foo>) {
    myCollection.asFlow()
        .map { /* ... */ }
        .filter { /* ... */ }
        .collect { /* ... perform some side effect ... */ }
}

但是它确实需要额外的依赖; kotlinx.coroutines 不在标准库中。

我发现了这个:

实施'com.github.cvb941:kotlin-parallel-operations:1.3'

详情:

https://github.com/cvb941/kotlin-parallel-operations

我想出了几个扩展函数:

  1. Iterable<T> 类型的 suspend 扩展函数,它对项目进行并行处理,return 处理每个项目的一些结果。默认情况下,它使用 Dispatchers.IO 调度程序将阻塞任务卸载到共享线程池。必须从协程(包括带有 Dispatchers.Main 调度程序的协程)或另一个 suspend 函数调用。

    suspend fun <T, R> Iterable<T>.processInParallel(
        dispatcher: CoroutineDispatcher = Dispatchers.IO,
        processBlock: suspend (v: T) -> R,
    ): List<R> = coroutineScope { // or supervisorScope
        map {
            async(dispatcher) { processBlock(it) }
        }.awaitAll()
    }
    

    协程调用示例:

    val collection = listOf("A", "B", "C", "D", "E")
    
    someCoroutineScope.launch {
        val results = collection.processInParallel {
            process(it)
        }
        // use processing results
    }
    

其中 someCoroutineScopeCoroutineScope 的实例。

  1. CoroutineScope 上启动并忘记扩展功能,return 没有任何结果。它还默认使用 Dispatchers.IO 调度程序。可以使用 CoroutineScope 或从另一个协程调用。

    fun <T> CoroutineScope.processInParallelAndForget(
        iterable: Iterable<T>,
        dispatcher: CoroutineDispatcher = Dispatchers.IO,
        processBlock: suspend (v: T) -> Unit
    ) = iterable.forEach {
        launch(dispatcher) { processBlock(it) }
    }
    

    调用示例:

    someoroutineScope.processInParallelAndForget(collection) {
        process(it)
    }
    
    // OR from another coroutine:
    
    someCoroutineScope.launch {
        processInParallelAndForget(collection) {
            process(it)
        }
    }
    

2a。在 Iterable<T> 上启动并忘记扩展功能。它与以前几乎相同,但扩展类型不同。 CoroutineScope 必须作为参数传递给函数。

fun <T> Iterable<T>.processInParallelAndForget(
    scope: CoroutineScope,
    dispatcher: CoroutineDispatcher = Dispatchers.IO,
    processBlock: suspend (v: T) -> Unit
) = forEach {
    scope.launch(dispatcher) { processBlock(it) }
}

通话中:

collection.processInParallelAndForget(someCoroutineScope) {
    process(it)
}

// OR from another coroutine:

someScope.launch {
    collection.processInParallelAndForget(this) {
        process(it)
    }
}