如何使用 spark-streaming 计算流中的新元素

How to count new element from stream by using spark-streaming

我已经完成日常计算的实施。这是一些伪代码。 "newUser" 可以呼叫第一个激活的用户。

// Get today log from hbase or somewhere else
val log = getRddFromHbase(todayDate)
// Compute active user
val activeUser = log.map(line => ((line.uid, line.appId), line).reduceByKey(distinctStrategyMethod)
// Get history user from hdfs
val historyUser = loadFromHdfs(path + yesterdayDate)
// Compute new user from active user and historyUser
val newUser = activeUser.subtractByKey(historyUser)
// Get new history user
val newHistoryUser = historyUser.union(newUser)
// Save today history user
saveToHdfs(path + todayDate)

"activeUser" 的计算可以轻松转换为 spark-streaming。这是一些代码:

    val transformedLog = sdkLogDs.map(sdkLog => {
      val time = System.currentTimeMillis()
      val timeToday = ((time - (time + 3600000 * 8) % 86400000) / 1000).toInt
      ((sdkLog.appid, sdkLog.bcode, sdkLog.uid), (sdkLog.channel_no, sdkLog.ctime.toInt, timeToday))
    })
    val activeUser = transformedLog.groupByKeyAndWindow(Seconds(86400), Seconds(60)).mapValues(x => {
      var firstLine = x.head
      x.foreach(line => {
        if (line._2 < firstLine._2) firstLine = line
      })
      firstLine
    })

但是"newUser"和"historyUser"的方法让我很困惑。
我想我的问题可以概括为"how to count new element from stream"。正如我上面的伪代码,"newUser" 是 "activeUser" 的一部分。而且我必须维护一组"historyUser"才能知道哪一部分是"newUser"。

我考虑过一种方法,但我认为它可能无法正常工作:
将历史用户加载为 RDD。 Foreach "activeUser" 的 DStream,发现 "historyUser" 中不存在元素。 这里的一个问题是我什么时候应该更新 "historyUser" 的 RDD 以确保我可以获得 window.[=23 的正确 "newUser" =] 更新 "historyUser" RDD 意味着向其添加 "newUser"。就像我在上面的伪代码中所做的一样。 "historyUser" 在该代码中每天更新一次。 另一个问题是如何从 DStream 执行此更新 RDD 操作。 我认为在 window 幻灯片时更新 "historyUser" 是正确的。但是我还没有找到合适的 API 来做这个。
那么解决这个问题的最佳实践是什么。

updateStateByKey 在这里会有所帮助,因为它允许您设置初始状态(您的历史用户),然后在主流的每个时间间隔更新它。我把一些代码放在一起来解释这个概念

val historyUsers = loadFromHdfs(path + yesterdayDate).map(UserData(...))

case class UserStatusState(isNew: Boolean, values: UserData)

// this will prepare the RDD of already known historical users
// to pass into updateStateByKey as initial state
val initialStateRDD = historyUsers.map(user => UserStatusState(false, user))

// stateful stream
val trackUsers = sdkLogDs.updateStateByKey(updateState, new HashPartitioner(sdkLogDs.ssc.sparkContext.defaultParallelism), true, initialStateRDD)
// only new users
val newUsersStream = trackUsers.filter(_._2.isNew)


def updateState(newValues: Seq[UserData], prevState: Option[UserStatusState]): Option[UserStatusState] = {
  // Group all values for specific user as needed
  val groupedUserData: UserData = newValues.reduce(...)

  // prevState is defined only for users previously seen in the stream
  // or loaded as initial state from historyUsers RDD
  // For new users it is None
  val isNewUser = !prevState.isDefined
  // as you return state here for the user - prevState won't be None on next iterations
  Some(UserStatusState(isNewUser, groupedUserData))

}