如何先从一个流中获取然后传递给另一个流
How to fetch from one stream first and then pass to another stream
我需要调用 cassandra 来获取日期,然后将获取的日期传递给另一个要将数据插入数据库的流。
def fetchDate: Future[Done] =
readJournal(persistenceKey)
.drop(3)
.take(1)
.map(l => l.mydate)
.runWith(Sink.ignore)
def insertRowsToDb: Future[Done] =
readJournal(somePersistenceKey)
.drop(4)
.take(1)
.map(data => MyClass(data))
.mapAsync(1) { myData =>
for {
insert <- myRepository.insert(data.id, fetchDate) //error here because fetchDate is unavailable
}
}
class MyRepository(tableName: String) {
def insert(id: String, fetchedDate: Long): Future[Int] =
config.db.run {
sqlu"""INSERT INTO #${tableName}
VALUES (
${id},
${fetchedDate}
)
"""
}
问题
- 我怎样才能先执行
fetchDate
然后将它的结果传递给 myRepository.insert
行?
你有两个问题。首先是 fetchDate
甚至没有日期!修复这意味着 运行 一个 Sink
不仅仅是 "ignore":
def fetchDateFut: Future[Date] =
readJournal(persistenceKey)
.drop(3)
.take(1)
.map(l => l.mydate)
.runWith(Sink.last)
然后,您需要 flatMap
您的 Future
将日期纳入范围:
def insertRowsToDb: Future[Done] = fetchDateFut.flatMap { fetchDate: Date =>
readJournal(somePersistenceKey)
.drop(4)
.take(1)
.map(data => MyClass(data))
.mapAsync(1) { myData =>
for {
insert <- myRepository.insert(data.id, fetchDate)
}
}
}
我需要调用 cassandra 来获取日期,然后将获取的日期传递给另一个要将数据插入数据库的流。
def fetchDate: Future[Done] =
readJournal(persistenceKey)
.drop(3)
.take(1)
.map(l => l.mydate)
.runWith(Sink.ignore)
def insertRowsToDb: Future[Done] =
readJournal(somePersistenceKey)
.drop(4)
.take(1)
.map(data => MyClass(data))
.mapAsync(1) { myData =>
for {
insert <- myRepository.insert(data.id, fetchDate) //error here because fetchDate is unavailable
}
}
class MyRepository(tableName: String) {
def insert(id: String, fetchedDate: Long): Future[Int] =
config.db.run {
sqlu"""INSERT INTO #${tableName}
VALUES (
${id},
${fetchedDate}
)
"""
}
问题
- 我怎样才能先执行
fetchDate
然后将它的结果传递给myRepository.insert
行?
你有两个问题。首先是 fetchDate
甚至没有日期!修复这意味着 运行 一个 Sink
不仅仅是 "ignore":
def fetchDateFut: Future[Date] =
readJournal(persistenceKey)
.drop(3)
.take(1)
.map(l => l.mydate)
.runWith(Sink.last)
然后,您需要 flatMap
您的 Future
将日期纳入范围:
def insertRowsToDb: Future[Done] = fetchDateFut.flatMap { fetchDate: Date =>
readJournal(somePersistenceKey)
.drop(4)
.take(1)
.map(data => MyClass(data))
.mapAsync(1) { myData =>
for {
insert <- myRepository.insert(data.id, fetchDate)
}
}
}