Kotlin 集合的并行操作?
Parallel operations on Kotlin collections?
在 Scala 中,可以很容易地做一个并行映射,forEach 等,使用:
collection.par.map(..)
Kotlin 中是否有等效项?
目前没有。 Kotlin 与 Scala 的官方比较提到:
Things that may be added to Kotlin later:
- Parallel collections
Kotlin 标准库不支持并行操作。但是,由于 Kotlin 使用标准 Java 集合 类,您也可以使用 Java 8 流 API 对 Kotlin 集合执行并行操作。
例如
myCollection.parallelStream()
.map { ... }
.filter { ... }
Kotlin 的标准库还没有官方支持,但是你可以定义一个 extension function 来模仿 par.map
:
fun <T, R> Iterable<T>.pmap(
numThreads: Int = Runtime.getRuntime().availableProcessors() - 2,
exec: ExecutorService = Executors.newFixedThreadPool(numThreads),
transform: (T) -> R): List<R> {
// default size is just an inlined version of kotlin.collections.collectionSizeOrDefault
val defaultSize = if (this is Collection<*>) this.size else 10
val destination = Collections.synchronizedList(ArrayList<R>(defaultSize))
for (item in this) {
exec.submit { destination.add(transform(item)) }
}
exec.shutdown()
exec.awaitTermination(1, TimeUnit.DAYS)
return ArrayList<R>(destination)
}
这是一个简单的用法示例
val result = listOf("foo", "bar").pmap { it+"!" }.filter { it.contains("bar") }
如果需要,它允许通过提供线程数甚至特定 java.util.concurrent.Executor
来调整线程。例如
listOf("foo", "bar").pmap(4, transform = { it + "!" })
请注意,此方法仅允许并行化 map
操作,不会影响任何下游位。例如。第一个示例中的 filter
将 运行 单线程。然而,在许多情况下,只有数据转换(即 map
)需要并行化。此外,将上面的方法扩展到 Kotlin 集合的其他元素会很简单 API.
从 Kotlin 1.1 开始,并行操作也可以用 coroutines 来非常优雅地表达。这是列表中的 pmap
:
fun <A, B>List<A>.pmap(f: suspend (A) -> B): List<B> = runBlocking {
map { async(CommonPool) { f(it) } }.map { it.await() }
}
请注意,协程仍是一项实验性功能。
从 1.2 版本开始,kotlin 添加了一个 stream feature 兼容 JRE8
因此,异步遍历列表可以像下面这样完成:
fun main(args: Array<String>) {
val c = listOf("toto", "tata", "tutu")
c.parallelStream().forEach { println(it) }
}
Kotlin 希望是地道的,但又不想过于综合以至于乍一看难以理解。
通过协程进行并行计算也不例外。他们希望它简单但不隐含一些预构建的方法,允许在需要时分支计算。
你的情况:
collection.map {
async{ produceWith(it) }
}
.forEach {
consume(it.await())
}
请注意,要调用 async
和 await
,您需要在所谓的 Context
中,您不能挂起调用或从非协程上下文启动协程。要输入一个,您可以:
runBlocking { /* your code here */ }
:它将挂起当前线程,直到 lambda returns.
GlobalScope.launch { }
:它将并行执行lambda;如果你的 main
完成执行而你的协程没有坏事会发生,在这种情况下最好使用 runBlocking
.
希望对您有所帮助:)
此解决方案假定您的项目正在使用协程:
implementation( "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.2")
调用parallelTransform
的函数不保留元素的顺序,return一个Flow<R>
,而函数parallelMap
保留顺序,return是一个List<R>
.
为多次调用创建线程池:
val numberOfCores = Runtime.getRuntime().availableProcessors()
val executorDispatcher: ExecutorCoroutineDispatcher =
Executors.newFixedThreadPool(numberOfCores ).asCoroutineDispatcher()
使用该调度程序(并在不再需要时调用 close()
):
inline fun <T, R> Iterable<T>.parallelTransform(
dispatcher: ExecutorDispatcher,
crossinline transform: (T) -> R
): Flow<R> = channelFlow {
val items: Iterable<T> = this@parallelTransform
val channelFlowScope: ProducerScope<R> = this@channelFlow
launch(dispatcher) {
items.forEach {item ->
launch {
channelFlowScope.send(transform(item))
}
}
}
}
如果不关心线程池重用(线程池不便宜),你可以使用这个版本:
inline fun <T, R> Iterable<T>.parallelTransform(
numberOfThreads: Int,
crossinline transform: (T) -> R
): Flow<R> = channelFlow {
val items: Iterable<T> = this@parallelTransform
val channelFlowScope: ProducerScope<R> = this@channelFlow
Executors.newFixedThreadPool(numberOfThreads).asCoroutineDispatcher().use { dispatcher ->
launch( dispatcher ) {
items.forEach { item ->
launch {
channelFlowScope.send(transform(item))
}
}
}
}
}
如果您需要保留元素顺序的版本:
inline fun <T, R> Iterable<T>.parallelMap(
dispatcher: ExecutorDispatcher,
crossinline transform: (T) -> R
): List<R> = runBlocking {
val items: Iterable<T> = this@parallelMap
val result = ConcurrentSkipListMap<Int, R>()
launch(dispatcher) {
items.withIndex().forEach {(index, item) ->
launch {
result[index] = transform(item)
}
}
}
// ConcurrentSkipListMap is a SortedMap
// so the values will be in the right order
result.values.toList()
}
您可以使用此扩展方法:
suspend fun <A, B> Iterable<A>.pmap(f: suspend (A) -> B): List<B> = coroutineScope {
map { async { f(it) } }.awaitAll()
}
有关详细信息,请参阅 Parallel Map in Kotlin
另一种我发现非常优雅的方法是这样的,使用 kotlinx.coroutines 库:
import kotlinx.coroutines.flow.asFlow
suspend fun process(myCollection: Iterable<Foo>) {
myCollection.asFlow()
.map { /* ... */ }
.filter { /* ... */ }
.collect { /* ... perform some side effect ... */ }
}
但是它确实需要额外的依赖; kotlinx.coroutines
不在标准库中。
我发现了这个:
实施'com.github.cvb941:kotlin-parallel-operations:1.3'
详情:
我想出了几个扩展函数:
Iterable<T>
类型的 suspend
扩展函数,它对项目进行并行处理,return 处理每个项目的一些结果。默认情况下,它使用 Dispatchers.IO
调度程序将阻塞任务卸载到共享线程池。必须从协程(包括带有 Dispatchers.Main
调度程序的协程)或另一个 suspend
函数调用。
suspend fun <T, R> Iterable<T>.processInParallel(
dispatcher: CoroutineDispatcher = Dispatchers.IO,
processBlock: suspend (v: T) -> R,
): List<R> = coroutineScope { // or supervisorScope
map {
async(dispatcher) { processBlock(it) }
}.awaitAll()
}
协程调用示例:
val collection = listOf("A", "B", "C", "D", "E")
someCoroutineScope.launch {
val results = collection.processInParallel {
process(it)
}
// use processing results
}
其中 someCoroutineScope
是 CoroutineScope
的实例。
在 CoroutineScope
上启动并忘记扩展功能,return 没有任何结果。它还默认使用 Dispatchers.IO
调度程序。可以使用 CoroutineScope
或从另一个协程调用。
fun <T> CoroutineScope.processInParallelAndForget(
iterable: Iterable<T>,
dispatcher: CoroutineDispatcher = Dispatchers.IO,
processBlock: suspend (v: T) -> Unit
) = iterable.forEach {
launch(dispatcher) { processBlock(it) }
}
调用示例:
someoroutineScope.processInParallelAndForget(collection) {
process(it)
}
// OR from another coroutine:
someCoroutineScope.launch {
processInParallelAndForget(collection) {
process(it)
}
}
2a。在 Iterable<T>
上启动并忘记扩展功能。它与以前几乎相同,但扩展类型不同。 CoroutineScope
必须作为参数传递给函数。
fun <T> Iterable<T>.processInParallelAndForget(
scope: CoroutineScope,
dispatcher: CoroutineDispatcher = Dispatchers.IO,
processBlock: suspend (v: T) -> Unit
) = forEach {
scope.launch(dispatcher) { processBlock(it) }
}
通话中:
collection.processInParallelAndForget(someCoroutineScope) {
process(it)
}
// OR from another coroutine:
someScope.launch {
collection.processInParallelAndForget(this) {
process(it)
}
}
在 Scala 中,可以很容易地做一个并行映射,forEach 等,使用:
collection.par.map(..)
Kotlin 中是否有等效项?
目前没有。 Kotlin 与 Scala 的官方比较提到:
Things that may be added to Kotlin later:
- Parallel collections
Kotlin 标准库不支持并行操作。但是,由于 Kotlin 使用标准 Java 集合 类,您也可以使用 Java 8 流 API 对 Kotlin 集合执行并行操作。
例如
myCollection.parallelStream()
.map { ... }
.filter { ... }
Kotlin 的标准库还没有官方支持,但是你可以定义一个 extension function 来模仿 par.map
:
fun <T, R> Iterable<T>.pmap(
numThreads: Int = Runtime.getRuntime().availableProcessors() - 2,
exec: ExecutorService = Executors.newFixedThreadPool(numThreads),
transform: (T) -> R): List<R> {
// default size is just an inlined version of kotlin.collections.collectionSizeOrDefault
val defaultSize = if (this is Collection<*>) this.size else 10
val destination = Collections.synchronizedList(ArrayList<R>(defaultSize))
for (item in this) {
exec.submit { destination.add(transform(item)) }
}
exec.shutdown()
exec.awaitTermination(1, TimeUnit.DAYS)
return ArrayList<R>(destination)
}
这是一个简单的用法示例
val result = listOf("foo", "bar").pmap { it+"!" }.filter { it.contains("bar") }
如果需要,它允许通过提供线程数甚至特定 java.util.concurrent.Executor
来调整线程。例如
listOf("foo", "bar").pmap(4, transform = { it + "!" })
请注意,此方法仅允许并行化 map
操作,不会影响任何下游位。例如。第一个示例中的 filter
将 运行 单线程。然而,在许多情况下,只有数据转换(即 map
)需要并行化。此外,将上面的方法扩展到 Kotlin 集合的其他元素会很简单 API.
从 Kotlin 1.1 开始,并行操作也可以用 coroutines 来非常优雅地表达。这是列表中的 pmap
:
fun <A, B>List<A>.pmap(f: suspend (A) -> B): List<B> = runBlocking {
map { async(CommonPool) { f(it) } }.map { it.await() }
}
请注意,协程仍是一项实验性功能。
从 1.2 版本开始,kotlin 添加了一个 stream feature 兼容 JRE8
因此,异步遍历列表可以像下面这样完成:
fun main(args: Array<String>) {
val c = listOf("toto", "tata", "tutu")
c.parallelStream().forEach { println(it) }
}
Kotlin 希望是地道的,但又不想过于综合以至于乍一看难以理解。
通过协程进行并行计算也不例外。他们希望它简单但不隐含一些预构建的方法,允许在需要时分支计算。
你的情况:
collection.map {
async{ produceWith(it) }
}
.forEach {
consume(it.await())
}
请注意,要调用 async
和 await
,您需要在所谓的 Context
中,您不能挂起调用或从非协程上下文启动协程。要输入一个,您可以:
runBlocking { /* your code here */ }
:它将挂起当前线程,直到 lambda returns.GlobalScope.launch { }
:它将并行执行lambda;如果你的main
完成执行而你的协程没有坏事会发生,在这种情况下最好使用runBlocking
.
希望对您有所帮助:)
此解决方案假定您的项目正在使用协程:
implementation( "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.2")
调用parallelTransform
的函数不保留元素的顺序,return一个Flow<R>
,而函数parallelMap
保留顺序,return是一个List<R>
.
为多次调用创建线程池:
val numberOfCores = Runtime.getRuntime().availableProcessors()
val executorDispatcher: ExecutorCoroutineDispatcher =
Executors.newFixedThreadPool(numberOfCores ).asCoroutineDispatcher()
使用该调度程序(并在不再需要时调用 close()
):
inline fun <T, R> Iterable<T>.parallelTransform(
dispatcher: ExecutorDispatcher,
crossinline transform: (T) -> R
): Flow<R> = channelFlow {
val items: Iterable<T> = this@parallelTransform
val channelFlowScope: ProducerScope<R> = this@channelFlow
launch(dispatcher) {
items.forEach {item ->
launch {
channelFlowScope.send(transform(item))
}
}
}
}
如果不关心线程池重用(线程池不便宜),你可以使用这个版本:
inline fun <T, R> Iterable<T>.parallelTransform(
numberOfThreads: Int,
crossinline transform: (T) -> R
): Flow<R> = channelFlow {
val items: Iterable<T> = this@parallelTransform
val channelFlowScope: ProducerScope<R> = this@channelFlow
Executors.newFixedThreadPool(numberOfThreads).asCoroutineDispatcher().use { dispatcher ->
launch( dispatcher ) {
items.forEach { item ->
launch {
channelFlowScope.send(transform(item))
}
}
}
}
}
如果您需要保留元素顺序的版本:
inline fun <T, R> Iterable<T>.parallelMap(
dispatcher: ExecutorDispatcher,
crossinline transform: (T) -> R
): List<R> = runBlocking {
val items: Iterable<T> = this@parallelMap
val result = ConcurrentSkipListMap<Int, R>()
launch(dispatcher) {
items.withIndex().forEach {(index, item) ->
launch {
result[index] = transform(item)
}
}
}
// ConcurrentSkipListMap is a SortedMap
// so the values will be in the right order
result.values.toList()
}
您可以使用此扩展方法:
suspend fun <A, B> Iterable<A>.pmap(f: suspend (A) -> B): List<B> = coroutineScope {
map { async { f(it) } }.awaitAll()
}
有关详细信息,请参阅 Parallel Map in Kotlin
另一种我发现非常优雅的方法是这样的,使用 kotlinx.coroutines 库:
import kotlinx.coroutines.flow.asFlow
suspend fun process(myCollection: Iterable<Foo>) {
myCollection.asFlow()
.map { /* ... */ }
.filter { /* ... */ }
.collect { /* ... perform some side effect ... */ }
}
但是它确实需要额外的依赖; kotlinx.coroutines
不在标准库中。
我发现了这个:
实施'com.github.cvb941:kotlin-parallel-operations:1.3'
详情:
我想出了几个扩展函数:
Iterable<T>
类型的suspend
扩展函数,它对项目进行并行处理,return 处理每个项目的一些结果。默认情况下,它使用Dispatchers.IO
调度程序将阻塞任务卸载到共享线程池。必须从协程(包括带有Dispatchers.Main
调度程序的协程)或另一个suspend
函数调用。suspend fun <T, R> Iterable<T>.processInParallel( dispatcher: CoroutineDispatcher = Dispatchers.IO, processBlock: suspend (v: T) -> R, ): List<R> = coroutineScope { // or supervisorScope map { async(dispatcher) { processBlock(it) } }.awaitAll() }
协程调用示例:
val collection = listOf("A", "B", "C", "D", "E") someCoroutineScope.launch { val results = collection.processInParallel { process(it) } // use processing results }
其中 someCoroutineScope
是 CoroutineScope
的实例。
在
CoroutineScope
上启动并忘记扩展功能,return 没有任何结果。它还默认使用Dispatchers.IO
调度程序。可以使用CoroutineScope
或从另一个协程调用。fun <T> CoroutineScope.processInParallelAndForget( iterable: Iterable<T>, dispatcher: CoroutineDispatcher = Dispatchers.IO, processBlock: suspend (v: T) -> Unit ) = iterable.forEach { launch(dispatcher) { processBlock(it) } }
调用示例:
someoroutineScope.processInParallelAndForget(collection) { process(it) } // OR from another coroutine: someCoroutineScope.launch { processInParallelAndForget(collection) { process(it) } }
2a。在 Iterable<T>
上启动并忘记扩展功能。它与以前几乎相同,但扩展类型不同。 CoroutineScope
必须作为参数传递给函数。
fun <T> Iterable<T>.processInParallelAndForget(
scope: CoroutineScope,
dispatcher: CoroutineDispatcher = Dispatchers.IO,
processBlock: suspend (v: T) -> Unit
) = forEach {
scope.launch(dispatcher) { processBlock(it) }
}
通话中:
collection.processInParallelAndForget(someCoroutineScope) {
process(it)
}
// OR from another coroutine:
someScope.launch {
collection.processInParallelAndForget(this) {
process(it)
}
}