创建源时将 Future[Done] 指定为物化值
Specifying Future[Done] as materialized value when creating a source
我正在尝试编写一些代码作为更大的集成测试的一部分。
我有一个函数可以创建类型为 Source[KinesisRecord, Future[Done]
的 KinesisSource
:
protected def createKinesisSource(config: KinesisClientLibConfiguration): Source[KinesisRecord, Future[Done]]#Repr[MyFile.KinesisFlow] = {
KinesisSource(config).mapAsync(1)(processRecord)
}
protected def processRecord(record: KinesisRecord): Future[KinesisFlow] = {
Future { validateRecord(record) }
}
我想从一个文件创建一个相同类型的假源。我有(请原谅代码,我是 scala 的新手):
val records = scala.io.Source.fromFile("testFile.txt").getLines.toList
.map(i => {
new KinesisRecord( ByteString.fromString(i), "x", None, "x", None, Instant.now(), "x")
})
Source(records).mapAsync(1)(processRecord)
这给了我一个 Source[KinesisRecord, NotUsed]
。我怎样才能将其更改为 Source[KinesisRecord, Future[Done]]
?我知道第二个参数是物化值 (),但我不确定如何在不实际应用 run
函数的情况下指定该值。
您可以使用 mapMaterializedValue(_ => Future.successful(Done))
或 Future.never
但是如果物化值对您的集成测试有影响,这将影响它们,因为 Future 将在流物化后立即完成或永远不会完成。
我正在尝试编写一些代码作为更大的集成测试的一部分。
我有一个函数可以创建类型为 Source[KinesisRecord, Future[Done]
的 KinesisSource
:
protected def createKinesisSource(config: KinesisClientLibConfiguration): Source[KinesisRecord, Future[Done]]#Repr[MyFile.KinesisFlow] = {
KinesisSource(config).mapAsync(1)(processRecord)
}
protected def processRecord(record: KinesisRecord): Future[KinesisFlow] = {
Future { validateRecord(record) }
}
我想从一个文件创建一个相同类型的假源。我有(请原谅代码,我是 scala 的新手):
val records = scala.io.Source.fromFile("testFile.txt").getLines.toList
.map(i => {
new KinesisRecord( ByteString.fromString(i), "x", None, "x", None, Instant.now(), "x")
})
Source(records).mapAsync(1)(processRecord)
这给了我一个 Source[KinesisRecord, NotUsed]
。我怎样才能将其更改为 Source[KinesisRecord, Future[Done]]
?我知道第二个参数是物化值 (run
函数的情况下指定该值。
您可以使用 mapMaterializedValue(_ => Future.successful(Done))
或 Future.never
但是如果物化值对您的集成测试有影响,这将影响它们,因为 Future 将在流物化后立即完成或永远不会完成。