将 Akka HTTP 直接连接到 Akka 流

Connecting Akka HTTP directly to an Akka Stream Flow

我已经查看了 https://doc.akka.io/docs/akka-http/current/introduction.html 中关于 Akka HTTP 路由的示例,奇怪的是,对于构建在 Akka Streams 之上的东西,none 示例连接到一个流。

有人可以展示一个创建 Java DSL 流程(请不要使用 Scala),然后将路由直接连接到该流程的简单示例吗?

或者我错过了重点,这是不可能的,但需要路由中的一些 CompletionStage 代码来等待调用流程的粘合代码的结果?

编辑:为了阐明流程,可以执行一些操作,例如将字符串附加到已发布的请求正文。

用akka streams完成一条路由是绝对可以的。它涉及:

  • 网络套接字路由,请参阅文档中的examples,或
  • 分块的 http 响应(因为如果响应是从流中馈送的,您通常不知道响应的大小)。您可以 create a Chunked Entity 来自 akka 流 Source of ByteStrings
  • 如果事先知道响应大小,您也可以使用其他响应类型,请参阅 HttpEntity 文档了解它们的细节

Edit: to clarify the flow can do something like append a string to a posted request body.

Michael 的回答包含很好的链接,所以请阅读一下。 Akka HTTP 在默认情况下始终随其数据流式传输——例如实体。因此,例如要进行同时添加后缀的流式处理 "echo",您可以这样做:

path("test", () ->
  // extract the request entity, it contains the streamed entity as `getDataBytes`
  extractRequestEntity(requestEntity -> {

  // prepare what to add as suffix to the incoming entity stream:
  Source<ByteString, NotUsed> suffixSource = 
    Source.single(ByteString.fromString("\n\nADDS THIS AFTER INCOMING ENTITY"))

  // concat the suffix stream to the incoming entity stream
  Source<ByteString, Object> replySource = requestEntity.getDataBytes()
    .concat(suffixSource);

    // prepare and return the entity:
    HttpEntity.Chunked replyEntity = HttpEntities.create(ContentTypes.TEXT_PLAIN_UTF8, replySource);
    return complete(StatusCodes.OK, replyEntity);
  })
);

话虽如此,有多种方法可以利用流媒体功能,包括 JSON Streaming and more. You should also give the docs page about implications of streaming 帧读取。