如何将结果从一个源流传递到另一个源流
How to pass results from one source stream to another
我有一个处理 Source
和 return 的方法。我正在尝试修改它,但似乎无法 return 同样的事情:
原创
def originalMethod[as: AS, mat: MAT, ec: EC](checkType: String)
: Flow[ByteString, MyValidation[MyClass], NotUsed]{
collectStuff
.map { ts =>
val errors = MyEngine.checkAll(ts.code)
(ts, errors)
}
.map { x =>
x._2
.leftMap(xs => {
addInformation(x._1, xs.toList)
})
.toEither
}
}
我正在使用另一个来源进行修改,并将其结果传递给原始来源,但 return 同样的事情:
def calculate[T: AS: MAT](source: Source[T, NotUsed]): Future[Seq[T]] =
{
source.runWith(Sink.seq)
}
def modifiedMethod[as: AS, mat: MAT, ec: EC](checkType: String, mySource: Source[LoanApplicationRegister, NotUsed])
: Flow[ByteString, MyValidation[MyClass], NotUsed]{
for {
calc <- calculate(mySource)
orig <- collectStuff
.map { ts =>
val errors = MyEngine.checkAll(ts.code, calc)
(ts, errors)
}
.map { x =>
x._2
.leftMap(xs => {
addInformation(x._1, xs.toList)
})
.toEither
}
}
yield {
orig
}
}
但是我遇到了编译错误 Expression of type Future[Nothing] doesn't conform to existing type Flow[ByteString, MyValidation[MyClass]
我如何 return Flow[ByteString, MyValidation[MyClass]
在我的 modifiedMethod
中就像 originalMethod
是
for { calc <- calculate(mySource)}
yield {
collectStuff
.map { ts =>
val errors = MyEngine.checkAll(ts.code, calc)
(ts, errors)
}
.map { x =>
x._2
.leftMap(xs => {
addInformation(x._1, xs.toList)
})
.toEither
}
}
会给你 Future[Flow[ByteString, MyValidation[MyClass], NotUsed]]
而不是 Future[Nothing]
但是如果你想删除 Future
你需要在某个地方为它 Await
(当你调用计算时(然后你不需要 for
)或者在它之后。通常,这不是使用 Futures 的方式
我有一个处理 Source
和 return 的方法。我正在尝试修改它,但似乎无法 return 同样的事情:
原创
def originalMethod[as: AS, mat: MAT, ec: EC](checkType: String)
: Flow[ByteString, MyValidation[MyClass], NotUsed]{
collectStuff
.map { ts =>
val errors = MyEngine.checkAll(ts.code)
(ts, errors)
}
.map { x =>
x._2
.leftMap(xs => {
addInformation(x._1, xs.toList)
})
.toEither
}
}
我正在使用另一个来源进行修改,并将其结果传递给原始来源,但 return 同样的事情:
def calculate[T: AS: MAT](source: Source[T, NotUsed]): Future[Seq[T]] =
{
source.runWith(Sink.seq)
}
def modifiedMethod[as: AS, mat: MAT, ec: EC](checkType: String, mySource: Source[LoanApplicationRegister, NotUsed])
: Flow[ByteString, MyValidation[MyClass], NotUsed]{
for {
calc <- calculate(mySource)
orig <- collectStuff
.map { ts =>
val errors = MyEngine.checkAll(ts.code, calc)
(ts, errors)
}
.map { x =>
x._2
.leftMap(xs => {
addInformation(x._1, xs.toList)
})
.toEither
}
}
yield {
orig
}
}
但是我遇到了编译错误 Expression of type Future[Nothing] doesn't conform to existing type Flow[ByteString, MyValidation[MyClass]
我如何 return Flow[ByteString, MyValidation[MyClass]
在我的 modifiedMethod
中就像 originalMethod
是
for { calc <- calculate(mySource)}
yield {
collectStuff
.map { ts =>
val errors = MyEngine.checkAll(ts.code, calc)
(ts, errors)
}
.map { x =>
x._2
.leftMap(xs => {
addInformation(x._1, xs.toList)
})
.toEither
}
}
会给你 Future[Flow[ByteString, MyValidation[MyClass], NotUsed]]
而不是 Future[Nothing]
但是如果你想删除 Future
你需要在某个地方为它 Await
(当你调用计算时(然后你不需要 for
)或者在它之后。通常,这不是使用 Futures 的方式