创建源时将 Future[Done] 指定为物化值

Specifying Future[Done] as materialized value when creating a source

我正在尝试编写一些代码作为更大的集成测试的一部分。

我有一个函数可以创建类型为 Source[KinesisRecord, Future[Done]KinesisSource:

protected def createKinesisSource(config: KinesisClientLibConfiguration): Source[KinesisRecord, Future[Done]]#Repr[MyFile.KinesisFlow] = {
  KinesisSource(config).mapAsync(1)(processRecord)
}

protected def processRecord(record: KinesisRecord): Future[KinesisFlow] = {
    Future { validateRecord(record) }
}

我想从一个文件创建一个相同类型的假源。我有(请原谅代码,我是 scala 的新手):

val records = scala.io.Source.fromFile("testFile.txt").getLines.toList
  .map(i => {
    new KinesisRecord( ByteString.fromString(i), "x", None, "x", None, Instant.now(), "x")
  })

Source(records).mapAsync(1)(processRecord)

这给了我一个 Source[KinesisRecord, NotUsed]。我怎样才能将其更改为 Source[KinesisRecord, Future[Done]]?我知道第二个参数是物化值 (),但我不确定如何在不实际应用 run 函数的情况下指定该值。

您可以使用 mapMaterializedValue(_ => Future.successful(Done))Future.never

但是如果物化值对您的集成测试有影响,这将影响它们,因为 Future 将在流物化后立即完成或永远不会完成。