如何将结果从一个源流传递到另一个源流

How to pass results from one source stream to another

我有一个处理 Source 和 return 的方法。我正在尝试修改它,但似乎无法 return 同样的事情:

原创

def originalMethod[as: AS, mat: MAT, ec: EC](checkType: String) 
: Flow[ByteString, MyValidation[MyClass], NotUsed]{
      collectStuff
      .map { ts =>
        val errors = MyEngine.checkAll(ts.code)
        (ts, errors)
      }
      .map { x =>
        x._2
          .leftMap(xs => {
            addInformation(x._1, xs.toList)
          })
          .toEither
      }
}

我正在使用另一个来源进行修改,并将其结果传递给原始来源,但 return 同样的事情:

def calculate[T: AS: MAT](source: Source[T, NotUsed]): Future[Seq[T]] = 
{
 source.runWith(Sink.seq)
}


def modifiedMethod[as: AS, mat: MAT, ec: EC](checkType: String, mySource: Source[LoanApplicationRegister, NotUsed]) 
: Flow[ByteString, MyValidation[MyClass], NotUsed]{
  for {
    calc <- calculate(mySource)
    orig <-  collectStuff
        .map { ts =>
          val errors = MyEngine.checkAll(ts.code, calc)
          (ts, errors)
        }
        .map { x =>
          x._2
            .leftMap(xs => {
              addInformation(x._1, xs.toList)
            })
            .toEither
        }
  }
  yield {
    orig
  }
}

但是我遇到了编译错误 Expression of type Future[Nothing] doesn't conform to existing type Flow[ByteString, MyValidation[MyClass]

我如何 return Flow[ByteString, MyValidation[MyClass] 在我的 modifiedMethod 中就像 originalMethod

  for { calc <- calculate(mySource)}
  yield {
    collectStuff
        .map { ts =>
          val errors = MyEngine.checkAll(ts.code, calc)
          (ts, errors)
        }
        .map { x =>
          x._2
            .leftMap(xs => {
              addInformation(x._1, xs.toList)
            })
            .toEither
        }
  }

会给你 Future[Flow[ByteString, MyValidation[MyClass], NotUsed]] 而不是 Future[Nothing] 但是如果你想删除 Future 你需要在某个地方为它 Await (当你调用计算时(然后你不需要 for )或者在它之后。通常,这不是使用 Futures 的方式