用于发布数据的 Akka 流需要一个图形而不是一个流入的流
Akka stream for posting data expects a graph not a flow in via
我正在尝试创建一个示例 akka 流,它采用 CSV 文件,将其更改为 XML(使用具有 toXml 函数的现有对象),然后将其发布到端点。我创建的代码如下所示:-
val poolClientFlow =
Http().cachedHostConnectionPool[Thing]("localhost",5000)
val file = new File("./example.csv")
val uploadPipeline =
FileIO.fromFile(file)
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024))
.map(_.utf8String)
.map(_.split(","))
.map(t => Thing(t(0),t(1).toInt,t(2).toInt) )
.map(_.toXml)
.map(_.toString)
.map(ByteString(_))
.map(d =>HttpRequest(method=HttpMethods.POST,uri=s"/import",entity = d))
.via(poolClientFlow)
.runForeach(x => System.out.println(x.toString()))
但是它没有编译为对 .via(poolClientFlow)
的调用,因为它发现了一个 akka.stream.scaladsl.Flow[(akka.http.scaladsl.model.HttpRequest, com.cogpp.exp.Thing),(scala.util.Try[akka.http.scaladsl.model.HttpResponse],
但这个版本的 via 需要一个 akka.stream.Graph[akka.stream.FlowShape[akka.http.scaladsl.model.HttpRequest,?],?]
.
我想我没有正确构建我的 poolClientFlow
,但我看不出我所做的与我在其他示例代码中看到的有什么区别。有人可以帮忙吗?
来自 cachedHostConnectionPool[T]
的流采用 (HttpRequest,T)
的元组,这使您可以在最终获得结果 (Try[HttpResponse], T)
时保留请求的某些上下文。如果您不需要,只需传入 Unit ()
.
我不确定是否有 API 接受请求。
要编译您的示例,您可以..
.map(d => (HttpRequest(method=HttpMethods.POST,uri=s"/import",entity = d), d))
.via(Http().cachedHostConnectionPool[ByteString]("localhost",5000))
此外,如果我在您所在的位置,那段代码中不会有那么多 .map
。您的序列化不会阻塞,也不需要所有这些步骤之间的背压。我有一个纯函数 def write(t: Thing): HttpRequest
。但这没什么大不了的...
我正在尝试创建一个示例 akka 流,它采用 CSV 文件,将其更改为 XML(使用具有 toXml 函数的现有对象),然后将其发布到端点。我创建的代码如下所示:-
val poolClientFlow =
Http().cachedHostConnectionPool[Thing]("localhost",5000)
val file = new File("./example.csv")
val uploadPipeline =
FileIO.fromFile(file)
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024))
.map(_.utf8String)
.map(_.split(","))
.map(t => Thing(t(0),t(1).toInt,t(2).toInt) )
.map(_.toXml)
.map(_.toString)
.map(ByteString(_))
.map(d =>HttpRequest(method=HttpMethods.POST,uri=s"/import",entity = d))
.via(poolClientFlow)
.runForeach(x => System.out.println(x.toString()))
但是它没有编译为对 .via(poolClientFlow)
的调用,因为它发现了一个 akka.stream.scaladsl.Flow[(akka.http.scaladsl.model.HttpRequest, com.cogpp.exp.Thing),(scala.util.Try[akka.http.scaladsl.model.HttpResponse],
但这个版本的 via 需要一个 akka.stream.Graph[akka.stream.FlowShape[akka.http.scaladsl.model.HttpRequest,?],?]
.
我想我没有正确构建我的 poolClientFlow
,但我看不出我所做的与我在其他示例代码中看到的有什么区别。有人可以帮忙吗?
来自 cachedHostConnectionPool[T]
的流采用 (HttpRequest,T)
的元组,这使您可以在最终获得结果 (Try[HttpResponse], T)
时保留请求的某些上下文。如果您不需要,只需传入 Unit ()
.
我不确定是否有 API 接受请求。
要编译您的示例,您可以..
.map(d => (HttpRequest(method=HttpMethods.POST,uri=s"/import",entity = d), d))
.via(Http().cachedHostConnectionPool[ByteString]("localhost",5000))
此外,如果我在您所在的位置,那段代码中不会有那么多 .map
。您的序列化不会阻塞,也不需要所有这些步骤之间的背压。我有一个纯函数 def write(t: Thing): HttpRequest
。但这没什么大不了的...