运行 作为并行流
running akka stream in parallel
我有一个流
- 侦听 HTTP post 接收事件列表
- mapconcat 流元素中的事件列表
- 转换kafka记录中的事件
- 用reactive kafka(akka stream kafka producer sink)生成记录
这里是简化代码
// flow to split group of lines into lines
val splitLines = Flow[List[Evt]].mapConcat(list=>list)
// sink to produce kafka records in kafka
val kafkaSink: Sink[Evt, Future[Done]] = Flow[Evt]
.map(evt=> new ProducerRecord[Array[Byte], String](evt.eventType, evt.value))
.toMat(Producer.plainSink(kafka))(Keep.right)
val routes = {
path("ingest") {
post {
(entity(as[List[ReactiveEvent]]) & extractMaterializer) { (eventIngestList,mat) =>
val ingest= Source.single(eventIngestList).via(splitLines).runWith(kafkaSink)(mat)
val result = onComplete(ingest){
case Success(value) => complete(s"OK")
case Failure(ex) => complete((StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}"))
}
complete("eventList ingested: " + result)
}
}
}
}
你能强调一下什么是并行 运行 什么是顺序吗?
我认为 mapConcat 将流中的事件顺序化,那么我如何并行化流以便在 mapConcat 之后并行处理每个步骤?
一个简单的 mapAsyncUnordered 就足够了吗?或者我应该使用带有 Balance and Merge 的 GraphDSL 吗?
在你的情况下,我认为它将是连续的。在开始将数据推送到 Kafka 之前,您还会收到整个请求。我会使用 extractDataBytes
指令给你 src: Source[ByteString, Any]
。然后我会像
一样处理它
src
.via(Framing.delimiter(ByteString("\n"), 1024 /* Max size of line */ , allowTruncation = true).map(_.utf8String))
.mapConcat { line =>
line.split(",")
}.async
.runWith(kafkaSink)(mat)
我有一个流
- 侦听 HTTP post 接收事件列表
- mapconcat 流元素中的事件列表
- 转换kafka记录中的事件
- 用reactive kafka(akka stream kafka producer sink)生成记录
这里是简化代码
// flow to split group of lines into lines
val splitLines = Flow[List[Evt]].mapConcat(list=>list)
// sink to produce kafka records in kafka
val kafkaSink: Sink[Evt, Future[Done]] = Flow[Evt]
.map(evt=> new ProducerRecord[Array[Byte], String](evt.eventType, evt.value))
.toMat(Producer.plainSink(kafka))(Keep.right)
val routes = {
path("ingest") {
post {
(entity(as[List[ReactiveEvent]]) & extractMaterializer) { (eventIngestList,mat) =>
val ingest= Source.single(eventIngestList).via(splitLines).runWith(kafkaSink)(mat)
val result = onComplete(ingest){
case Success(value) => complete(s"OK")
case Failure(ex) => complete((StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}"))
}
complete("eventList ingested: " + result)
}
}
}
}
你能强调一下什么是并行 运行 什么是顺序吗?
我认为 mapConcat 将流中的事件顺序化,那么我如何并行化流以便在 mapConcat 之后并行处理每个步骤?
一个简单的 mapAsyncUnordered 就足够了吗?或者我应该使用带有 Balance and Merge 的 GraphDSL 吗?
在你的情况下,我认为它将是连续的。在开始将数据推送到 Kafka 之前,您还会收到整个请求。我会使用 extractDataBytes
指令给你 src: Source[ByteString, Any]
。然后我会像
src
.via(Framing.delimiter(ByteString("\n"), 1024 /* Max size of line */ , allowTruncation = true).map(_.utf8String))
.mapConcat { line =>
line.split(",")
}.async
.runWith(kafkaSink)(mat)