Spark Streaming - 从按键分组的键值对计算统计信息
Spark Streaming - Calculating stats from key-value pairs grouped by keys
背景:
我正在使用 Spark Streaming 以逗号分隔的键值对的形式从 Kafka 流式传输事件
这是一个事件如何流式传输到我的 spark 应用程序的示例。
Key1=Value1, Key2=Value2, Key3=Value3, Key4=Value4,responseTime=200
Key1=Value5, Key2=Value6, Key3=Value7, Key4=Value8,responseTime=150
Key1=Value9, Key2=Value10, Key3=Value11, Key4=Value12,responseTime=100
输出:
我想针对给定的批次间隔计算按流中不同键分组的不同指标(平均值、计数等),例如
- Key1、Key2 的平均响应时间(responseTime 是每个事件中的键之一)
- 按Key1、Key2计数
我目前的尝试:
val stream = KafkaUtils
.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
val pStream = stream.persist()
val events: DStream[String] = pStream.flatMap(_._2.split(","))
val pairs= events.map(data => data.split("=")).map(array => (array(0), array(1)))
// pairs results in tuples of (Key1, Value1), (Key2, Value2) and so on.
更新 - 03/04
密钥 Key1、Key2...可能在传入流中乱序到达。
感谢您的意见/提示。
一个可能的解决方案是这样的:
创建一个案例 class 代表每条记录,这样我们就不用处理元组了:
case class Record(
key1: String, key2: String, key3: String, key4: String, rt: Double)
使用正则表达式解析记录并删除格式错误的条目:
import scala.util.matching.Regex
val recordPattern = new Regex(
"^Key1=(.*?), ?Key2=(.*?), ?Key3=(.*?), ?Key4=(.*?), ?" ++
"responseTime=(0-9+)$"
)
val records = pStream.map {
case recordPattern(key1, key2, key3, key4, rt) =>
Some(Record(key1, key2, key3, key4, rt.toDouble))
case _ => None
}.flatMap(x => x) // Drop malformed
将数据重塑为键值对:
val pairs = records.map(r => ((r.key1, r.key2), r.rt))
创建分区器并使用StatCounter
聚合统计数据:
import org.apache.spark.util.StatCounter
import org.apache.spark.HashPartitioner
val paritioner: HashPartitioner = ???
pairs.combineByKey[StatCounter](
StatCounter(_), _ merge _, _ merge _, paritioner
)
提取感兴趣的字段:
stats.mapValues(s => (s.count, s.mean))
尽管我强烈建议在上游修复问题,但您也可以对无序数据尝试这样的操作:
val kvPattern = "(\w+)=(\w+)".r
val pairs = pStream.map(line => {
val kvs = kvPattern.findAllMatchIn(line)
.map(m => (m.group(1), m.group(2))).toMap
// This will discard any malformed lines
// (lack of key1, key2, lack or invalid format of responseTime)
Try((
(kvs("Key1"), kvs("Key2")),
kvs("responseTime").toDouble
))
}).flatMap(_.toOption)
像以前一样进行。
背景: 我正在使用 Spark Streaming 以逗号分隔的键值对的形式从 Kafka 流式传输事件 这是一个事件如何流式传输到我的 spark 应用程序的示例。
Key1=Value1, Key2=Value2, Key3=Value3, Key4=Value4,responseTime=200
Key1=Value5, Key2=Value6, Key3=Value7, Key4=Value8,responseTime=150
Key1=Value9, Key2=Value10, Key3=Value11, Key4=Value12,responseTime=100
输出:
我想针对给定的批次间隔计算按流中不同键分组的不同指标(平均值、计数等),例如
- Key1、Key2 的平均响应时间(responseTime 是每个事件中的键之一)
- 按Key1、Key2计数
我目前的尝试:
val stream = KafkaUtils
.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
val pStream = stream.persist()
val events: DStream[String] = pStream.flatMap(_._2.split(","))
val pairs= events.map(data => data.split("=")).map(array => (array(0), array(1)))
// pairs results in tuples of (Key1, Value1), (Key2, Value2) and so on.
更新 - 03/04 密钥 Key1、Key2...可能在传入流中乱序到达。
感谢您的意见/提示。
一个可能的解决方案是这样的:
创建一个案例 class 代表每条记录,这样我们就不用处理元组了:
case class Record( key1: String, key2: String, key3: String, key4: String, rt: Double)
使用正则表达式解析记录并删除格式错误的条目:
import scala.util.matching.Regex val recordPattern = new Regex( "^Key1=(.*?), ?Key2=(.*?), ?Key3=(.*?), ?Key4=(.*?), ?" ++ "responseTime=(0-9+)$" ) val records = pStream.map { case recordPattern(key1, key2, key3, key4, rt) => Some(Record(key1, key2, key3, key4, rt.toDouble)) case _ => None }.flatMap(x => x) // Drop malformed
将数据重塑为键值对:
val pairs = records.map(r => ((r.key1, r.key2), r.rt))
创建分区器并使用
StatCounter
聚合统计数据:import org.apache.spark.util.StatCounter import org.apache.spark.HashPartitioner val paritioner: HashPartitioner = ??? pairs.combineByKey[StatCounter]( StatCounter(_), _ merge _, _ merge _, paritioner )
提取感兴趣的字段:
stats.mapValues(s => (s.count, s.mean))
尽管我强烈建议在上游修复问题,但您也可以对无序数据尝试这样的操作:
val kvPattern = "(\w+)=(\w+)".r
val pairs = pStream.map(line => {
val kvs = kvPattern.findAllMatchIn(line)
.map(m => (m.group(1), m.group(2))).toMap
// This will discard any malformed lines
// (lack of key1, key2, lack or invalid format of responseTime)
Try((
(kvs("Key1"), kvs("Key2")),
kvs("responseTime").toDouble
))
}).flatMap(_.toOption)
像以前一样进行。