将 As HTTP 连接到 As stream
connect Akka HTTP to Akka stream
我想使用 Akka HTTP 构建一个连接到现有接收器(使用 Kafka 反应流)的 REST 服务,但我不知道如何将 HTTP 流链接到 Akka 流接收器...
我应该选择使用流的低级 Akka HTTP API 吗?
我的要求是:
- 整个流程的背压
- kafka sink确认所有事件时的200响应码
- 500 背压过高时?可能吗 ?
这是我的代码当前代码
// flow to split group of lines into lines
val splitLines = Flow[String].mapConcat(_.split("\n").toList)
// sink to produce kafka records in kafka
val kafkaSink = Flow[String]
.map(new ProducerRecord[Array[Byte], String](topic, _))
.toMat(Producer.plainSink(ProducerSettings(system,new ByteArraySerializer, new StringSerializer)))(Keep.right)
val routes = {
path("ingest") {
post {
logger.info("starting ingestion")
entity(as[GenericEvent]) { eventIngest =>
????
}~
entity(as[GenericEventList]) { eventIngestList =>
????
}
}
}
}
Http(actorSystem).bindAndHandle(routes, config.httpInterface, config.httpPort)
有几种方法可以解决这个问题。一个建议可能是将数据直接从您的请求实体流式传输到您的 kafka 接收器中。 extractDataBytes
指令可以帮助您做到这一点(更多信息 here)。
按照下面的示例尝试一些操作。我添加了一个 ???
流程,以允许您将特定于案例的转换正确地 split/transform 您的请求实体字节。您可以使用 Framing.delimiter
之类的方法来拆分实体字节流(更多信息 here)。
(extractDataBytes & extractMaterializer) { (byteSrc, mat) =>
val f = byteSrc.via(???).runWith(kafkaSink)(mat)
onComplete(f){
case Success(value) => complete(s"OK")
case Failure(ex) => complete((StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}"))
}
}
或者,如果您想将您的实体解组为某个域对象,您可以执行类似
的操作
(entity(as[Event]) & extractMaterializer) { (event, mat) =>
val f = Source.single(event).via(???).runWith(kafkaSink)(mat)
onComplete(f){
case Success(value) => complete(s"OK")
case Failure(ex) => complete((StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}"))
}
}
关于你的最后一个问题,如果 Kafka 背压,你的流将永远不会完成。您应该期望服务器在配置的请求超时后返回 500(引用下面的文档):
A default request timeout is applied globally to all routes and can be
configured using the akka.http.server.request-timeout setting (which
defaults to 20 seconds).
我想使用 Akka HTTP 构建一个连接到现有接收器(使用 Kafka 反应流)的 REST 服务,但我不知道如何将 HTTP 流链接到 Akka 流接收器...
我应该选择使用流的低级 Akka HTTP API 吗?
我的要求是:
- 整个流程的背压
- kafka sink确认所有事件时的200响应码
- 500 背压过高时?可能吗 ?
这是我的代码当前代码
// flow to split group of lines into lines
val splitLines = Flow[String].mapConcat(_.split("\n").toList)
// sink to produce kafka records in kafka
val kafkaSink = Flow[String]
.map(new ProducerRecord[Array[Byte], String](topic, _))
.toMat(Producer.plainSink(ProducerSettings(system,new ByteArraySerializer, new StringSerializer)))(Keep.right)
val routes = {
path("ingest") {
post {
logger.info("starting ingestion")
entity(as[GenericEvent]) { eventIngest =>
????
}~
entity(as[GenericEventList]) { eventIngestList =>
????
}
}
}
}
Http(actorSystem).bindAndHandle(routes, config.httpInterface, config.httpPort)
有几种方法可以解决这个问题。一个建议可能是将数据直接从您的请求实体流式传输到您的 kafka 接收器中。 extractDataBytes
指令可以帮助您做到这一点(更多信息 here)。
按照下面的示例尝试一些操作。我添加了一个 ???
流程,以允许您将特定于案例的转换正确地 split/transform 您的请求实体字节。您可以使用 Framing.delimiter
之类的方法来拆分实体字节流(更多信息 here)。
(extractDataBytes & extractMaterializer) { (byteSrc, mat) =>
val f = byteSrc.via(???).runWith(kafkaSink)(mat)
onComplete(f){
case Success(value) => complete(s"OK")
case Failure(ex) => complete((StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}"))
}
}
或者,如果您想将您的实体解组为某个域对象,您可以执行类似
的操作 (entity(as[Event]) & extractMaterializer) { (event, mat) =>
val f = Source.single(event).via(???).runWith(kafkaSink)(mat)
onComplete(f){
case Success(value) => complete(s"OK")
case Failure(ex) => complete((StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}"))
}
}
关于你的最后一个问题,如果 Kafka 背压,你的流将永远不会完成。您应该期望服务器在配置的请求超时后返回 500(引用下面的文档):
A default request timeout is applied globally to all routes and can be configured using the akka.http.server.request-timeout setting (which defaults to 20 seconds).