Getting the delta time (minimum value - actual value) of an RDD
I have a cartesian RDD that lets me filter the RDD down to a specific time range, but I need the minimum value in that RDD so I can compute, for each record, the delta time to the earliest entry.
I have a case class that looks like this:
case class auction(id: String, prodID: String, timestamp: Long)
I put two RDDs together, one containing the winning auction and the other containing the auctions that took place around that time, like so:
val specificmessages = allauctions.cartesian(winningauction)
  .filter { case (x, y) => x.timestamp > y.timestamp - 10 &&
                           x.timestamp < y.timestamp + 10 &&
                           x.prodID == y.prodID }
I would like to add a field to specificmessages that holds, for each record, the difference between its timestamp and the auction timestamp with the minimum value.
You can use DataFrames like this:
import org.apache.spark.sql.{functions => f}
import org.apache.spark.sql.expressions.Window
// Convert RDDs to DFs (assumes the SQL implicits, e.g. import spark.implicits._, are in scope for toDF and $)
val allDF = allauctions.toDF
val winDF = winningauction.toDF("winId", "winProdID", "winTimestamp")
// Prepare join conditions
val prodCond = $"prodID" === $"winProdID"
val tsCond = f.abs($"timestamp" - $"winTimestamp") < 10
// Create window
val w = Window
.partitionBy($"id", $"prodID", $"timestamp")
.orderBy($"winTimestamp")
val joined = allDF
.join(winDF, prodCond && tsCond)
  .select($"*", f.first($"winTimestamp").over(w).alias("mintimestamp"))
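If you want the delta itself (minimum value minus actual value, as asked), it can be derived directly from the joined DataFrame above. A minimal sketch, assuming joined from the previous snippet and a hypothetical column name delta:
// Hypothetical follow-up: delta between the minimum winning timestamp and each record's own timestamp
val withDelta = joined.withColumn("delta", $"mintimestamp" - $"timestamp")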
Using plain RDDs:
// Create pair RDDs keyed by prodID
val allPairs = allauctions.map(a => (a.prodID, a))
val winPairs = winningauction.map(a => (a.prodID, a))

allPairs
  .join(winPairs)  // Join by prodID -> RDD[(prodID, (auction, auction))]
  .filter { case (_, (x, y)) => (x.timestamp - y.timestamp).abs < 10 }  // Keep only pairs within the 10-unit window
  .values          // Drop the key -> RDD[(auction, auction)]
  .groupByKey      // Group by the allauctions record -> RDD[(auction, Iterable[auction])]
  .flatMap { case (k, vals) =>
    val minTs = vals.map(_.timestamp).min  // Minimum timestamp among the matching winning auctions
    vals.map(v => (k, v, minTs))
  }  // -> RDD[(auction, auction, Long)]
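To get the actual delta from this result (minimum value minus the record's own timestamp, as in the question), one more map suffices. A minimal sketch, assuming the chain above is assigned to a hypothetical val named withMin:
// Hypothetical follow-up: withMin is the RDD[(auction, auction, Long)] produced above
val deltas = withMin.map { case (record, win, minTs) => (record, win, minTs - record.timestamp) }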