Spark Streaming:在 UpdateStateByKey 后加入原始流
Spark Streaming: join back to original stream after UpdateStateByKey
我正在 Spark Streaming 中编写一个应用程序,我需要在其中计算双精度值的指数移动平均值并将该平均值添加到行中。
这个平均值是这样计算的:
EMA(t) = EMA(t-1)*0.75 + Value(t)*0.25
每个时间间隔我都有一行每个名字的数据:
(name1-24/04/2015 15:31; Observation(name1; 24/04/2015 15:31; 132.45))
(name2-24/04/2015 15:31; Observation(name2; 24/04/2015 15:31; 20.5))
我的唯一键包含粘贴在一起的名称和时间戳。然后我将名称和时间戳分开,然后是我的双精度值。我将跟踪每个不同名称的指数移动平均线。
我正在使用完美运行的 updateStateByKey() 来执行此操作:
(在此操作期间名称将是关键,因为我需要每个名称的平均值)
case class Observation(name: String, time: Timestamp, outcome: Double)
val outcomeDstream: DStream[(String, Double)] =
parsedstream.map { case (k: String, obs: Observation) => (obs.name, obs.close) }
def updateEMA(newValues: Seq[Double],oldCount: Option[Double]): Option[Double] = {
if (oldCount.isEmpty) newValues(0)
else Some((newValues(0)*0.25) + (oldCount.get*(0.75)))
}
val ema = outcomeDstream.updateStateByKey[Double](updateEMA _)
我遇到的问题是:如果我使用这个函数来跟踪我的指数移动平均线,它将 return me:(name, expMovAvg)。但是我会丢失我的唯一密钥和时间戳。问题是我无法将此 ema-Dstream 与我的原始流加入,因为我的密钥现在只是不唯一的名称。
是否可以在我的 updateStateByKey 转换过程中保留唯一键或时间戳?
如果我正确理解你的问题,而不是在 updateStateByKey
中保留 Option[Double]
作为状态,你可以使用 Option[Observation]
作为状态,名称作为键,这将包含您需要的所有独特数据:
val outcomeDstream: DStream[(String, Observation)] =
parsedstream.map { case (k: String, obs: Observation) => (obs.name, obs) }
def updateEMA(newValues: Seq[Observation],
oldCount: Option[Observation]): Option[Observation] = {
if (oldCount.isEmpty) newValues(0)
else Some((newValues(0).outcome * 0.25) + (oldCount.get.outcome * (0.75)))
}
附带说明一下,如果您使用的是 Spark 1.6.0,请考虑查看 PairDStreamFunctions.mapWithState
. Although having a slightly different semantics (it won't process a key which hasn't received a new value) and still experimental, it is superior in performance。
我正在 Spark Streaming 中编写一个应用程序,我需要在其中计算双精度值的指数移动平均值并将该平均值添加到行中。 这个平均值是这样计算的:
EMA(t) = EMA(t-1)*0.75 + Value(t)*0.25
每个时间间隔我都有一行每个名字的数据:
(name1-24/04/2015 15:31; Observation(name1; 24/04/2015 15:31; 132.45))
(name2-24/04/2015 15:31; Observation(name2; 24/04/2015 15:31; 20.5))
我的唯一键包含粘贴在一起的名称和时间戳。然后我将名称和时间戳分开,然后是我的双精度值。我将跟踪每个不同名称的指数移动平均线。
我正在使用完美运行的 updateStateByKey() 来执行此操作: (在此操作期间名称将是关键,因为我需要每个名称的平均值)
case class Observation(name: String, time: Timestamp, outcome: Double)
val outcomeDstream: DStream[(String, Double)] =
parsedstream.map { case (k: String, obs: Observation) => (obs.name, obs.close) }
def updateEMA(newValues: Seq[Double],oldCount: Option[Double]): Option[Double] = {
if (oldCount.isEmpty) newValues(0)
else Some((newValues(0)*0.25) + (oldCount.get*(0.75)))
}
val ema = outcomeDstream.updateStateByKey[Double](updateEMA _)
我遇到的问题是:如果我使用这个函数来跟踪我的指数移动平均线,它将 return me:(name, expMovAvg)。但是我会丢失我的唯一密钥和时间戳。问题是我无法将此 ema-Dstream 与我的原始流加入,因为我的密钥现在只是不唯一的名称。
是否可以在我的 updateStateByKey 转换过程中保留唯一键或时间戳?
如果我正确理解你的问题,而不是在 updateStateByKey
中保留 Option[Double]
作为状态,你可以使用 Option[Observation]
作为状态,名称作为键,这将包含您需要的所有独特数据:
val outcomeDstream: DStream[(String, Observation)] =
parsedstream.map { case (k: String, obs: Observation) => (obs.name, obs) }
def updateEMA(newValues: Seq[Observation],
oldCount: Option[Observation]): Option[Observation] = {
if (oldCount.isEmpty) newValues(0)
else Some((newValues(0).outcome * 0.25) + (oldCount.get.outcome * (0.75)))
}
附带说明一下,如果您使用的是 Spark 1.6.0,请考虑查看 PairDStreamFunctions.mapWithState
. Although having a slightly different semantics (it won't process a key which hasn't received a new value) and still experimental, it is superior in performance。