将 As HTTP 连接到 As stream

connect Akka HTTP to Akka stream

我想使用 Akka HTTP 构建一个连接到现有接收器(使用 Kafka 反应流)的 REST 服务,但我不知道如何将 HTTP 流链接到 Akka 流接收器...

我应该选择使用流的低级 Akka HTTP API 吗?

我的要求是:

这是我的代码当前代码

// flow to split group of lines into lines
  val splitLines = Flow[String].mapConcat(_.split("\n").toList)

// sink to produce kafka records in kafka
val kafkaSink = Flow[String]
    .map(new ProducerRecord[Array[Byte], String](topic, _))
    .toMat(Producer.plainSink(ProducerSettings(system,new ByteArraySerializer, new StringSerializer)))(Keep.right)

val routes = {
    path("ingest") {
      post {
        logger.info("starting ingestion")
        entity(as[GenericEvent]) { eventIngest =>
          ????      
        }~
        entity(as[GenericEventList]) { eventIngestList =>
          ????
        }
      }
    }
  }

Http(actorSystem).bindAndHandle(routes, config.httpInterface, config.httpPort)

有几种方法可以解决这个问题。一个建议可能是将数据直接从您的请求实体流式传输到您的 kafka 接收器中。 extractDataBytes 指令可以帮助您做到这一点(更多信息 here)。

按照下面的示例尝试一些操作。我添加了一个 ??? 流程,以允许您将特定于案例的转换正确地 split/transform 您的请求实体字节。您可以使用 Framing.delimiter 之类的方法来拆分实体字节流(更多信息 here)。

  (extractDataBytes & extractMaterializer) { (byteSrc, mat) =>
    val f = byteSrc.via(???).runWith(kafkaSink)(mat)
    onComplete(f){
      case Success(value) => complete(s"OK")
      case Failure(ex)    => complete((StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}"))
    }
  }

或者,如果您想将您的实体解组为某个域对象,您可以执行类似

的操作
  (entity(as[Event]) & extractMaterializer) { (event, mat) =>
    val f = Source.single(event).via(???).runWith(kafkaSink)(mat)
    onComplete(f){
      case Success(value) => complete(s"OK")
      case Failure(ex)    => complete((StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}"))
    }
  }

关于你的最后一个问题,如果 Kafka 背压,你的流将永远不会完成。您应该期望服务器在配置的请求超时后返回 500(引用下面的文档):

A default request timeout is applied globally to all routes and can be configured using the akka.http.server.request-timeout setting (which defaults to 20 seconds).