Why is mapping a class A to class B with monix or akka-streams so slow?
I have benchmarked the mapping of a List[ClassA] to a List[ClassB] with monix and akka-streams, but I don't understand why it is so slow.
I tried different ways to do the mapping, and here are the JMH results:
[info] Benchmark                                   Mode  Cnt    Score     Error  Units
[info] MappingBenchmark.akkaLoadBalanceMap           ss   20  742,626 ±  4,853  ms/op
[info] MappingBenchmark.akkaMapAsyncFold             ss   20  480,460 ±  8,493  ms/op
[info] MappingBenchmark.akkaMapAsyncFoldAsync        ss   20  331,398 ± 10,490  ms/op
[info] MappingBenchmark.akkaMapFold                  ss   20  713,500 ±  7,394  ms/op
[info] MappingBenchmark.akkaMapFoldAsync             ss   20  313,275 ±  8,716  ms/op
[info] MappingBenchmark.map                          ss   20    0,567 ±  0,175  ms/op
[info] MappingBenchmark.monixBatchedObservables      ss   20  259,736 ±  5,939  ms/op
[info] MappingBenchmark.monixMapAsyncFoldLeft        ss   20  456,310 ±  5,225  ms/op
[info] MappingBenchmark.monixMapAsyncFoldLeftAsync   ss   20  795,345 ±  5,443  ms/op
[info] MappingBenchmark.monixMapFoldLeft             ss   20  247,172 ±  5,342  ms/op
[info] MappingBenchmark.monixMapFoldLeftAsync        ss   20  478,840 ± 25,249  ms/op
[info] MappingBenchmark.monixTaskGather              ss   20    6,707 ±  2,176  ms/op
[info] MappingBenchmark.parMap                       ss   20    1,257 ±  0,831  ms/op
Here is the code:
package benches

import java.util.concurrent.TimeUnit

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape, UniformFanInShape, UniformFanOutShape}
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Keep, Merge, RunnableGraph, Sink, Source}
import org.openjdk.jmh.annotations._

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration

@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Array(Mode.SingleShotTime))
@Warmup(iterations = 20)
@Measurement(iterations = 20)
@Fork(value = 1, jvmArgs = Array("-server", "-Xmx8g"))
@Threads(1)
class MappingBenchmark {
  import monix.eval._
  import monix.reactive._
  import monix.execution.Scheduler.Implicits.global

  def list: List[ClassA] = (1 to 10000).map(ClassA).toList
  // val l = (1 to 135368).map(Offre).toList

  // ##### SCALA ##### //

  @Benchmark
  def map: List[ClassB] = list.map(o => ClassB(o, o))

  @Benchmark
  def parMap: List[ClassB] = list.par.map(o => ClassB(o, o)).toList

  // ##### MONIX ##### //

  @Benchmark
  def monixTaskGather: List[ClassB] = {
    val task: Task[List[ClassB]] = Task.gatherUnordered(list.map(o => Task(ClassB(o,o))))
    Await.result(task.runAsync, Duration.Inf)
  }

  @Benchmark
  def monixBatchedObservables: List[ClassB] = {
    val task: Task[List[ClassB]] =
      Observable.fromIterable(list)
        .bufferIntrospective(256)
        .flatMap{items =>
          val tasks = items.map(o => Task(ClassB(o,o)))
          val batches = tasks.sliding(10,10).map(b => Task.gatherUnordered(b))
          val aggregate: Task[Iterator[ClassB]] = Task.sequence(batches).map(_.flatten)
          Observable.fromTask(aggregate).flatMap(i => Observable.fromIterator(i))
        }.consumeWith(Consumer.foldLeft(List[ClassB]())(_ :+ _))
    Await.result(task.runAsync, Duration.Inf)
  }

  @Benchmark
  def monixMapFoldLeft: List[ClassB] = {
    val task: Task[List[ClassB]] = Observable.fromIterable(list).map(o => ClassB(o, o)).consumeWith(Consumer.foldLeft(List[ClassB]())(_ :+ _))
    Await.result(task.runAsync, Duration.Inf)
  }

  @Benchmark
  def monixMapFoldLeftAsync: List[ClassB] = {
    val task: Task[List[ClassB]] = Observable.fromIterable(list).map(o => ClassB(o, o)).consumeWith(Consumer.foldLeftAsync(List[ClassB]())((l, o) => Task(l :+ o)))
    Await.result(task.runAsync, Duration.Inf)
  }

  @Benchmark
  def monixMapAsyncFoldLeft: List[ClassB] = {
    val task: Task[List[ClassB]] = Observable.fromIterable(list).mapAsync(4)(o => Task(ClassB(o, o))).consumeWith(Consumer.foldLeft(List[ClassB]())(_ :+ _))
    Await.result(task.runAsync, Duration.Inf)
  }

  @Benchmark
  def monixMapAsyncFoldLeftAsync: List[ClassB] = {
    val task: Task[List[ClassB]] = Observable.fromIterable(list).mapAsync(4)(o => Task(ClassB(o, o))).consumeWith(Consumer.foldLeftAsync(List[ClassB]())((l, o) => Task(l :+ o)))
    Await.result(task.runAsync, Duration.Inf)
  }

  // ##### AKKA-STREAM ##### //

  @Benchmark
  def akkaMapFold: List[ClassB] = {
    val graph: RunnableGraph[Future[List[ClassB]]] = Source(list).map(o => ClassB(o,o)).toMat(Sink.fold(List[ClassB]())(_ :+ _))(Keep.right)
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaMapFoldAsync: List[ClassB] = {
    val graph: RunnableGraph[Future[List[ClassB]]] = Source(list).map(o => ClassB(o,o)).toMat(Sink.foldAsync(List[ClassB]())((l, o) => Future(l :+ o)))(Keep.right)
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaMapAsyncFold: List[ClassB] = {
    def graph: RunnableGraph[Future[List[ClassB]]] = Source(list).mapAsync(4)(o => Future(ClassB(o,o))).async.toMat(Sink.fold(List[ClassB]())(_ :+ _))(Keep.right)
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaMapAsyncFoldAsync: List[ClassB] = {
    def graph: RunnableGraph[Future[List[ClassB]]] = Source(list).mapAsync(4)(o => Future(ClassB(o,o))).async.toMat(Sink.foldAsync(List[ClassB]())((l, o) => Future(l :+ o)))(Keep.right)
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaLoadBalanceMap: List[ClassB] = {
    def graph: RunnableGraph[Future[List[ClassB]]] = {
      val sink: Sink[ClassB, Future[List[ClassB]]] = Sink.fold(List[ClassB]())(_ :+ _)
      RunnableGraph.fromGraph[Future[List[ClassB]]](GraphDSL.create(sink) { implicit builder =>
        sink =>
          import GraphDSL.Implicits._
          val balance: UniformFanOutShape[ClassA, ClassA] = builder.add(Balance[ClassA](4))
          val merge: UniformFanInShape[ClassB, ClassB] = builder.add(Merge[ClassB](4))
          val mapClassB: Flow[ClassA, ClassB, NotUsed] = Flow[ClassA].map(o => ClassB(o,o))
          Source(list) ~> balance
          (1 to 4).foreach{ i =>
            balance ~> mapClassB.async ~> merge
          }
          merge ~> sink
          ClosedShape
      })
    }
    runAkkaGraph(graph)
  }

  // Note: a new ActorSystem is created and terminated inside every measured call.
  private def runAkkaGraph(g:RunnableGraph[Future[List[ClassB]]]): List[ClassB] = {
    implicit val actorSystem = ActorSystem("app")
    implicit val actorMaterializer = ActorMaterializer()
    val eventualBs = g.run()
    val res = Await.result(eventualBs, Duration.Inf)
    actorSystem.terminate()
    res
  }
}

case class ClassA(a:Int)
case class ClassB(o:ClassA, o2:ClassA)
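The benchmark results get even worse when the initial collection is larger.
I would like to know what my mistake is.
Thanks for sharing your knowledge!
Best regards
I have updated the code, and the benchmark is much better than before. The difference comes from the List operator. The first version used append instead of prepend. Since List is a linked list, it has to traverse all of its elements to add a new one at the end. Out of laziness I wanted to use the _ placeholder syntax, but I should not have.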
package benches

import java.util.concurrent.TimeUnit

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape, UniformFanInShape, UniformFanOutShape}
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Keep, Merge, RunnableGraph, Sink, Source}
import org.openjdk.jmh.annotations._

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.collection.immutable.Seq

@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Array(Mode.SingleShotTime))
@Warmup(iterations = 20)
@Measurement(iterations = 20)
@Fork(value = 1, jvmArgs = Array("-server", "-Xmx8g"))
@Threads(1)
class MappingBenchmark {
  import monix.eval._
  import monix.reactive._
  import monix.execution.Scheduler.Implicits.global

  def list: Seq[ClassA] = (1 to 10000).map(ClassA).toList
  // val l = (1 to 135368).map(Offre).toList

  // ##### SCALA ##### //

  // Prepend is O(1) on List; the folded result comes out in reverse order.
  def foldClassB = (l:List[ClassB], o:ClassB) => o +: l

  @Benchmark
  def map: Seq[ClassB] = list.map(o => ClassB(o, o))

  @Benchmark
  def parMap: Seq[ClassB] = list.par.map(o => ClassB(o, o)).toList

  // ##### MONIX ##### //

  @Benchmark
  def monixTaskGather: Seq[ClassB] = {
    val task: Task[Seq[ClassB]] = Task.gatherUnordered(list.map(o => Task(ClassB(o,o))))
    Await.result(task.runAsync, Duration.Inf)
  }

  @Benchmark
  def monixBatchedObservables: Seq[ClassB] = {
    val task: Task[Seq[ClassB]] =
      Observable.fromIterable(list)
        .bufferIntrospective(256)
        .flatMap{items =>
          val tasks = items.map(o => Task(ClassB(o,o)))
          val batches = tasks.sliding(10,10).map(b => Task.gatherUnordered(b))
          val aggregate: Task[Iterator[ClassB]] = Task.sequence(batches).map(_.flatten)
          Observable.fromTask(aggregate).flatMap(i => Observable.fromIterator(i))
        }.consumeWith(Consumer.foldLeft(List[ClassB]())(foldClassB))
    Await.result(task.runAsync, Duration.Inf)
  }

  @Benchmark
  def monixMapFoldLeft: Seq[ClassB] = {
    val task: Task[Seq[ClassB]] = Observable.fromIterable(list).map(o => ClassB(o, o)).consumeWith(Consumer.foldLeft(List[ClassB]())(foldClassB))
    Await.result(task.runAsync, Duration.Inf)
  }

  @Benchmark
  def monixMapFoldLeftAsync: Seq[ClassB] = {
    val task: Task[Seq[ClassB]] = Observable.fromIterable(list).map(o => ClassB(o, o)).consumeWith(Consumer.foldLeftAsync(List[ClassB]())((l, o) => Task(o +: l)))
    Await.result(task.runAsync, Duration.Inf)
  }

  @Benchmark
  def monixMapAsyncFoldLeft: Seq[ClassB] = {
    val task: Task[Seq[ClassB]] = Observable.fromIterable(list).mapAsync(4)(o => Task(ClassB(o, o))).consumeWith(Consumer.foldLeft(List[ClassB]())(foldClassB))
    Await.result(task.runAsync, Duration.Inf)
  }

  @Benchmark
  def monixMapAsyncFoldLeftAsync: Seq[ClassB] = {
    val task: Task[Seq[ClassB]] = Observable.fromIterable(list).mapAsync(4)(o => Task(ClassB(o, o))).consumeWith(Consumer.foldLeftAsync(List[ClassB]())((l, o) => Task(o +: l)))
    Await.result(task.runAsync, Duration.Inf)
  }

  // ##### AKKA-STREAM ##### //

  @Benchmark
  def akkaMapFold: Seq[ClassB] = {
    val graph: RunnableGraph[Future[List[ClassB]]] = Source(list).map(o => ClassB(o,o)).toMat(Sink.fold(List[ClassB]())(foldClassB))(Keep.right)
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaMapFoldAsync: Seq[ClassB] = {
    val graph: RunnableGraph[Future[List[ClassB]]] = Source(list).map(o => ClassB(o,o)).toMat(Sink.foldAsync(List[ClassB]())((l, o) => Future(o +: l)))(Keep.right)
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaMapSeq: Seq[ClassB] = {
    val graph = Source(list).map(o => ClassB(o,o)).toMat(Sink.seq)(Keep.right)
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaMapAsyncFold: Seq[ClassB] = {
    def graph: RunnableGraph[Future[Seq[ClassB]]] = Source(list).mapAsync(4)(o => Future(ClassB(o,o))).async.toMat(Sink.fold(List[ClassB]())(foldClassB))(Keep.right)
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaMapAsyncFoldAsync: Seq[ClassB] = {
    def graph: RunnableGraph[Future[Seq[ClassB]]] = Source(list).mapAsync(4)(o => Future(ClassB(o,o))).async.toMat(Sink.foldAsync(List[ClassB]())((l, o) => Future(o +: l)))(Keep.right)
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaMapAsyncSeq: Seq[ClassB] = {
    val graph = Source(list).mapAsync(4)(o => Future(ClassB(o,o))).toMat(Sink.seq)(Keep.right)
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaLoadBalanceMap: Seq[ClassB] = {
    def graph: RunnableGraph[Future[Seq[ClassB]]] = {
      val sink: Sink[ClassB, Future[Seq[ClassB]]] = Sink.fold(List[ClassB]())(foldClassB)
      RunnableGraph.fromGraph[Future[Seq[ClassB]]](GraphDSL.create(sink) { implicit builder =>
        sink =>
          import GraphDSL.Implicits._
          val balance: UniformFanOutShape[ClassA, ClassA] = builder.add(Balance[ClassA](4))
          val merge: UniformFanInShape[ClassB, ClassB] = builder.add(Merge[ClassB](4))
          val mapClassB: Flow[ClassA, ClassB, NotUsed] = Flow[ClassA].map(o => ClassB(o,o))
          Source(list) ~> balance
          (1 to 4).foreach{ i =>
            balance ~> mapClassB.async ~> merge
          }
          merge ~> sink
          ClosedShape
      })
    }
    runAkkaGraph(graph)
  }

  @Benchmark
  def akkaLoadBalanceMapSeq: Seq[ClassB] = {
    def graph: RunnableGraph[Future[Seq[ClassB]]] = {
      val sink: Sink[ClassB, Future[Seq[ClassB]]] = Sink.seq
      RunnableGraph.fromGraph[Future[Seq[ClassB]]](GraphDSL.create(sink) { implicit builder =>
        sink =>
          import GraphDSL.Implicits._
          val balance: UniformFanOutShape[ClassA, ClassA] = builder.add(Balance[ClassA](4))
          val merge: UniformFanInShape[ClassB, ClassB] = builder.add(Merge[ClassB](4))
          val mapClassB: Flow[ClassA, ClassB, NotUsed] = Flow[ClassA].map(o => ClassB(o,o))
          Source(list) ~> balance
          (1 to 4).foreach{ i =>
            balance ~> mapClassB.async ~> merge
          }
          merge ~> sink
          ClosedShape
      })
    }
    runAkkaGraph(graph)
  }

  // Note: a new ActorSystem is created and terminated inside every measured call.
  private def runAkkaGraph(g:RunnableGraph[Future[Seq[ClassB]]]): Seq[ClassB] = {
    implicit val actorSystem = ActorSystem("app")
    implicit val actorMaterializer = ActorMaterializer()
    val eventualBs = g.run()
    val res = Await.result(eventualBs, Duration.Inf)
    actorSystem.terminate()
    res
  }
}

case class ClassA(a:Int)
case class ClassB(o:ClassA, o2:ClassA)
The results for this updated class are:
[info] Benchmark                                   Mode  Cnt    Score     Error  Units
[info] MappingBenchmark.akkaLoadBalanceMap           ss   20   19,052 ±  3,779  ms/op
[info] MappingBenchmark.akkaLoadBalanceMapSeq        ss   20   16,115 ±  3,232  ms/op
[info] MappingBenchmark.akkaMapAsyncFold             ss   20   20,862 ±  3,127  ms/op
[info] MappingBenchmark.akkaMapAsyncFoldAsync        ss   20   26,994 ±  4,010  ms/op
[info] MappingBenchmark.akkaMapAsyncSeq              ss   20   19,399 ±  7,089  ms/op
[info] MappingBenchmark.akkaMapFold                  ss   20   12,132 ±  4,111  ms/op
[info] MappingBenchmark.akkaMapFoldAsync             ss   20   22,652 ±  3,802  ms/op
[info] MappingBenchmark.akkaMapSeq                   ss   20   10,894 ±  3,114  ms/op
[info] MappingBenchmark.map                          ss   20    0,625 ±  0,193  ms/op
[info] MappingBenchmark.monixBatchedObservables      ss   20    9,175 ±  4,080  ms/op
[info] MappingBenchmark.monixMapAsyncFoldLeft        ss   20   11,724 ±  4,458  ms/op
[info] MappingBenchmark.monixMapAsyncFoldLeftAsync   ss   20   14,174 ±  6,962  ms/op
[info] MappingBenchmark.monixMapFoldLeft             ss   20    1,057 ±  0,960  ms/op
[info] MappingBenchmark.monixMapFoldLeftAsync        ss   20    9,638 ±  4,910  ms/op
[info] MappingBenchmark.monixTaskGather              ss   20    7,065 ±  2,428  ms/op
[info] MappingBenchmark.parMap                       ss   20    1,392 ±  0,923  ms/op
It seems that it is still faster to do the mapping with plain Scala before running the stream, whenever that is possible.
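The append-versus-prepend difference behind the two sets of results can be shown in isolation. The following is a minimal sketch using only the standard library; the object and method names (`ListBuildDemo`, `buildByAppend`, `buildByPrepend`) are made up for illustration and do not appear in the benchmark.

```scala
object ListBuildDemo {
  // Appending copies the whole list on every step: O(n) per element, O(n^2) overall.
  def buildByAppend(n: Int): List[Int] =
    (1 to n).foldLeft(List.empty[Int])((acc, i) => acc :+ i)

  // Prepending is O(1) per element; one reverse at the end restores the input order.
  def buildByPrepend(n: Int): List[Int] =
    (1 to n).foldLeft(List.empty[Int])((acc, i) => i :: acc).reverse

  def main(args: Array[String]): Unit = {
    val n = 10000
    // Both strategies must yield the same list; only the cost differs.
    assert(buildByAppend(n) == buildByPrepend(n))
    println(s"both strategies produce the same $n-element list")
  }
}
```

Note that the updated benchmark prepends without a final `reverse`, so its folded results come out in reverse order; that is harmless for timing purposes but matters if the order of the output is significant.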
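A note on asynchronous processing and parallelism: in general, when you process things in parallel, you end up with quite a lot of CPU-bound overhead for synchronizing the results.
The overhead can in fact be so significant that it nullifies the time you gain from multiple CPU cores working in parallel.
You should also get familiar with Amdahl's Law. Take a look at the numbers: with a parallel portion of 75%, the speedup can never exceed 4x, however many processors you add, and with a parallel portion of 50% it can never exceed 2x.
And this is only the theoretical limit, because you also have shared-memory synchronization between processors, which can get really messy; basically, processors are optimized for sequential execution. Introduce concurrency concerns and you need to force ordering with memory barriers, which nullifies many CPU optimizations. You can thus end up with a negative speedup, as you actually see in your tests.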
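Amdahl's Law can be evaluated directly: for a parallel fraction p and n processors, the theoretical speedup is 1 / ((1 - p) + p / n), which approaches 1 / (1 - p) as n grows. A small sketch, with the names `AmdahlDemo`, `speedup`, and `maxSpeedup` chosen here purely for illustration:

```scala
object AmdahlDemo {
  /** Theoretical speedup for a parallel fraction `p` (between 0 and 1) on `n` processors. */
  def speedup(p: Double, n: Int): Double = 1.0 / ((1.0 - p) + p / n)

  /** Upper bound on the speedup as the processor count grows without limit. */
  def maxSpeedup(p: Double): Double = 1.0 / (1.0 - p)

  def main(args: Array[String]): Unit = {
    // Print how quickly the speedup flattens out toward its limit.
    for (p <- List(0.5, 0.75); n <- List(2, 4, 16, 1024))
      println(f"p=$p%.2f n=$n%4d speedup=${speedup(p, n)}%.2f limit=${maxSpeedup(p)}%.1f")
  }
}
```

This bound ignores synchronization costs entirely, so real measurements can only be worse than what it predicts.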
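So you are testing asynchronous/parallel mapping, but the mapping function does basically nothing; you might as well test with the identity function and it would be almost the same thing. In other words, the test you are running and its results are pretty much useless in practice.
As a side note, this is also why I never liked the idea of "parallel collections". The concept is flawed, because you can only use parallel collections for purely CPU-bound work (i.e. no I/O, no actual async work). Let's say that is fine for doing some calculations, except that: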
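- for many purposes, parallel collections are slower than the normal operators that use a single CPU, and
- if you actually have CPU-bound work and need to use your hardware resources to the max, then "parallel collections" in their current incarnation are the wrong abstraction, because "hardware" these days includes GPUs
In other words, parallel collections do not use hardware resources efficiently, since they totally ignore GPU support, and they are totally inadequate for mixed CPU and I/O tasks, since they lack asynchrony support.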
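I feel the need to mention this because too often people think that rubbing some "parallel" pixie dust on their code will make it run faster, but many times it won't.
Parallelism works great when you have I/O-bound tasks (mixed with CPU-bound tasks, of course), because in that case the CPU overhead is much less significant, since the processing time is going to be dominated by the I/O.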
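As a rough illustration of the I/O-bound case, the sketch below replaces real I/O with a `Thread.sleep`, which is an assumption made only to keep the example self-contained; `IoBoundDemo`, `fakeIo`, and `timed` are hypothetical names. It uses plain `scala.concurrent.Future` rather than Monix or Akka Streams.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object IoBoundDemo {
  // Hypothetical stand-in for an I/O call: it waits without using the CPU.
  def fakeIo(id: Int): Int = {
    blocking(Thread.sleep(100))
    id * 2
  }

  // One call after another: total time is roughly the sum of the waits.
  def sequential(ids: List[Int]): List[Int] = ids.map(fakeIo)

  // All calls started as Futures: the waits can overlap.
  def parallel(ids: List[Int]): List[Int] =
    Await.result(Future.traverse(ids)(id => Future(fakeIo(id))), 10.seconds)

  // Runs `body` and returns its result with the elapsed time in milliseconds.
  def timed[A](body: => A): (A, Long) = {
    val start = System.nanoTime()
    val result = body
    (result, (System.nanoTime() - start) / 1000000)
  }

  def main(args: Array[String]): Unit = {
    val ids = (1 to 8).toList
    val (seqRes, seqMs) = timed(sequential(ids))
    val (parRes, parMs) = timed(parallel(ids))
    assert(seqRes == parRes)
    println(s"sequential: $seqMs ms, parallel: $parMs ms")
  }
}
```

Here the scheduling overhead is small compared with the simulated wait, which is the opposite of the benchmark above, where the mapped function is nearly free.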
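PS: plain mapping over Scala collections should be faster, because it is strict and (depending on the collection type) it uses array-backed buffers and thus doesn't trash CPU caches. Monix's .map has the same overhead as Scala's Iterable.map, or in other words near-zero overhead, but its application is lazy and it introduces some boxing overhead, which we can't get rid of because the JVM doesn't specialize generics.
It's damn fast in practice, though ;-)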
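The strict-versus-lazy distinction can be observed with the standard library alone. The sketch below uses `Iterator.map` as an analogy for lazy application; it is not Monix's implementation, and `LazyMapDemo` is a name invented for this example.

```scala
object LazyMapDemo {
  def main(args: Array[String]): Unit = {
    var calls = 0
    val source = List(1, 2, 3)

    // Strict: List.map applies the function to every element immediately.
    val strict = source.map { x => calls += 1; x * 2 }
    assert(calls == 3)

    // Lazy: Iterator.map only records the transformation; nothing runs yet.
    calls = 0
    val lazyMapped = source.iterator.map { x => calls += 1; x * 2 }
    assert(calls == 0)

    // The function is applied only when the iterator is consumed.
    val forced = lazyMapped.toList
    assert(calls == 3 && forced == strict)
  }
}
```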