运行 作为并行流

running akka stream in parallel

我有一个流

  1. 侦听 HTTP post 接收事件列表
  2. mapconcat 流元素中的事件列表
  3. 转换kafka记录中的事件
  4. 用reactive kafka(akka stream kafka producer sink)生成记录

这里是简化代码

// flow to split group of lines into lines
  val splitLines = Flow[List[Evt]].mapConcat(list=>list)

// sink to produce kafka records in kafka
val kafkaSink: Sink[Evt, Future[Done]] = Flow[Evt]
    .map(evt=> new ProducerRecord[Array[Byte], String](evt.eventType, evt.value))
    .toMat(Producer.plainSink(kafka))(Keep.right)

val routes = {
    path("ingest") {
      post {
        (entity(as[List[ReactiveEvent]]) & extractMaterializer) { (eventIngestList,mat) =>
            val ingest= Source.single(eventIngestList).via(splitLines).runWith(kafkaSink)(mat)
            val result = onComplete(ingest){
              case Success(value) => complete(s"OK")
              case Failure(ex)    => complete((StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}"))
            }
            complete("eventList ingested: " + result)
          }
      }
    }
  }

你能强调一下什么是并行 运行 什么是顺序吗?

我认为 mapConcat 将流中的事件顺序化,那么我如何并行化流以便在 mapConcat 之后并行处理每个步骤?

一个简单的 mapAsyncUnordered 就足够了吗?或者我应该使用带有 Balance and Merge 的 GraphDSL 吗?

在你的情况下,我认为它将是连续的。在开始将数据推送到 Kafka 之前,您还会收到整个请求。我会使用 extractDataBytes 指令给你 src: Source[ByteString, Any]。然后我会像

一样处理它
src
  .via(Framing.delimiter(ByteString("\n"), 1024 /* Max size of line */ , allowTruncation = true).map(_.utf8String))
  .mapConcat { line =>
    line.split(",")
  }.async
  .runWith(kafkaSink)(mat)