如何将Http管理到Akka Stream中并将消息发送到Kafka?
How to manage Http into Akka Stream and send the message to Kafka?
我从 Akka Streams 开始,我想构建一个服务器作为 Stream,接收 Http.IncomingConnection
并将接收到的消息作为 plainSink 发送给 Kafka。
我声明我的来源如下:
val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
Http().bind(interface = "localhost", port = "8080")
然后,我想从 HttpRequest 的正文中提取消息 (String),最后将其发送给 Kafka。流程如下所示:
val bindingFuture: Future[Http.ServerBinding] = serverSource
.map(???) //Here, I need to extract the message
.map(message => new ProducerRecord[String, String](topic, message.result(2 seconds)))
.runWith(akka.kafka.scaladsl.Producer.plainSink(producerSettings))
但是,我不知道如何提取消息。我想做这样的事情:
val requestHandler: HttpRequest => HttpResponse = {
case HttpRequest(POST, Uri.Path("/publish"), _, _, _) => {
HttpResponse(202, entity = "Message sent to Kafka!")
}
case r: HttpRequest =>
r.discardEntityBytes() // important to drain incoming HTTP Entity stream
HttpResponse(404, entity = "Unknown resource!")
}
但是,使用 connection handleWithSyncHandler requestHandler
我无法让消息跟随流处理。而且,我还想在 /publish
URI 或 return 404 下的其他情况下在流中获得任何请求。
可以这样做吗?
改用指令
Routing DSL 比尝试手动处理 HttpRequest
更容易使用:
val route : Route =
post {
path("publish") {
extractRequestEntity { entity =>
onComplete(entity.toStrict(10.seconds).map(_.data.utf8String){ message =>
Producer.plainSink(producerSettings)(
new ProducerRecord[String, String](topic, message.result(2 seconds))
)
complete(StatusCodes.OK)
}
}
}
}
现在可以传入以处理传入请求:
Http().bindAndHandle(
route,
"localhost",
8080
)
我从 Akka Streams 开始,我想构建一个服务器作为 Stream,接收 Http.IncomingConnection
并将接收到的消息作为 plainSink 发送给 Kafka。
我声明我的来源如下:
val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
Http().bind(interface = "localhost", port = "8080")
然后,我想从 HttpRequest 的正文中提取消息 (String),最后将其发送给 Kafka。流程如下所示:
val bindingFuture: Future[Http.ServerBinding] = serverSource
.map(???) //Here, I need to extract the message
.map(message => new ProducerRecord[String, String](topic, message.result(2 seconds)))
.runWith(akka.kafka.scaladsl.Producer.plainSink(producerSettings))
但是,我不知道如何提取消息。我想做这样的事情:
val requestHandler: HttpRequest => HttpResponse = {
case HttpRequest(POST, Uri.Path("/publish"), _, _, _) => {
HttpResponse(202, entity = "Message sent to Kafka!")
}
case r: HttpRequest =>
r.discardEntityBytes() // important to drain incoming HTTP Entity stream
HttpResponse(404, entity = "Unknown resource!")
}
但是,使用 connection handleWithSyncHandler requestHandler
我无法让消息跟随流处理。而且,我还想在 /publish
URI 或 return 404 下的其他情况下在流中获得任何请求。
可以这样做吗?
改用指令
Routing DSL 比尝试手动处理 HttpRequest
更容易使用:
val route : Route =
post {
path("publish") {
extractRequestEntity { entity =>
onComplete(entity.toStrict(10.seconds).map(_.data.utf8String){ message =>
Producer.plainSink(producerSettings)(
new ProducerRecord[String, String](topic, message.result(2 seconds))
)
complete(StatusCodes.OK)
}
}
}
}
现在可以传入以处理传入请求:
Http().bindAndHandle(
route,
"localhost",
8080
)