在流中维护状态

Maintaining state within a stream

我的用户数据流负载很大。我想通过它的 id 来确定这是否是一个新用户。为了减少对数据库的调用,我宁愿在以前的用户的记忆中维护一个状态。

val users = mutable.set[String]()
//init the state from db
user = db.getAllUsersIds()
val source: Source[User, NotUsed]
val dbSink: Sink[User, NotUsed] //goes to db
//if the user is added to the set it will return true
val usersFilter = Flow[User].filter(user => users.add(user.id))

现在我可以创建图表了

source ~> usersFilter ~> dbSink

我的问题是可变状态是共享的且不安全。是否有在流程中维护状态的选项?

有两种方法可以做到这一点。

如果您正在获取记录流并且您想要对流进行重复数据删除(因为一些 ID 已被处理)。你可以做到

http://janschulte.com/2016/03/08/deduplicate-akka-stream/

另一种方法是通过数据库查找来检查 ID 是否已经存在。

val alreadyExists : Flow[User, NotUsed] = {
  // build a cache of known ids
  val knownIdList = ... // query database and get list of IDs
  Flow[User].filterNot(user => knownIdList.contains(user.id))
}