用于发布数据的 Akka 流需要一个图形而不是一个流入的流

Akka stream for posting data expects a graph not a flow in via

我正在尝试创建一个示例 akka 流,它采用 CSV 文件,将其更改为 XML(使用具有 toXml 函数的现有对象),然后将其发布到端点。我创建的代码如下所示:-

val poolClientFlow =
  Http().cachedHostConnectionPool[Thing]("localhost",5000)

val file = new File("./example.csv")

val uploadPipeline =
  FileIO.fromFile(file)
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024))
    .map(_.utf8String)
    .map(_.split(","))
    .map(t => Thing(t(0),t(1).toInt,t(2).toInt) )
    .map(_.toXml)
    .map(_.toString)
    .map(ByteString(_))
    .map(d =>HttpRequest(method=HttpMethods.POST,uri=s"/import",entity = d))
    .via(poolClientFlow)
    .runForeach(x => System.out.println(x.toString()))

但是它没有编译为对 .via(poolClientFlow) 的调用,因为它发现了一个 akka.stream.scaladsl.Flow[(akka.http.scaladsl.model.HttpRequest, com.cogpp.exp.Thing),(scala.util.Try[akka.http.scaladsl.model.HttpResponse], 但这个版本的 via 需要一个 akka.stream.Graph[akka.stream.FlowShape[akka.http.scaladsl.model.HttpRequest,?],?].

我想我没有正确构建我的 poolClientFlow,但我看不出我所做的与我在其他示例代码中看到的有什么区别。有人可以帮忙吗?

来自 cachedHostConnectionPool[T] 的流采用 (HttpRequest,T) 的元组,这使您可以在最终获得结果 (Try[HttpResponse], T) 时保留请求的某些上下文。如果您不需要,只需传入 Unit ().

我不确定是否有 API 接受请求。

要编译您的示例,您可以..

.map(d => (HttpRequest(method=HttpMethods.POST,uri=s"/import",entity = d), d))
.via(Http().cachedHostConnectionPool[ByteString]("localhost",5000))

此外,如果我在您所在的位置,那段代码中不会有那么多 .map。您的序列化不会阻塞,也不需要所有这些步骤之间的背压。我有一个纯函数 def write(t: Thing): HttpRequest。但这没什么大不了的...