以有状态的方式在 spark 中处理网络数据包
processing network packets in spark in a stateful manner
我想使用 Spark 来解析网络消息,并以有状态的方式将它们分组为逻辑实体。
问题描述
我们假设每条消息都位于输入数据帧的一行中,如下所示。
| row | time | raw payload |
+-------+------+---------------+
| 1 | 10 | TEXT1; |
| 2 | 20 | TEXT2;TEXT3; |
| 3 | 30 | LONG- |
| 4 | 40 | TEXT1; |
| 5 | 50 | TEXT4;TEXT5;L |
| 6 | 60 | ONG |
| 7 | 70 | -TEX |
| 8 | 80 | T2; |
任务是解析原始负载中的逻辑消息,并在新的输出数据帧中提供它们。在该示例中,有效负载中的每个逻辑消息都以分号(定界符)结尾。
所需的输出数据帧如下所示:
| row | time | message |
+-------+------+---------------+
| 1 | 10 | TEXT1; |
| 2 | 20 | TEXT2; |
| 3 | 20 | TEXT3; |
| 4 | 30 | LONG-TEXT1; |
| 5 | 50 | TEXT4; |
| 6 | 50 | TEXT5; |
| 7 | 50 | LONG-TEXT2; |
请注意,有些消息行不会在结果中产生新行(例如第 4、6、7、8 行),有些甚至会产生多行(例如第 2、5 行)
我的问题:
- 这是 UDAF 的用例吗?如果是这样,例如我应该如何实现
merge
功能?我不知道它的目的是什么。
- 由于消息顺序很重要(我无法在不遵守消息顺序的情况下正确处理 LONGTEXT-1、LONGTEXT-2),我可以告诉 spark 在更高级别(例如,每个日历日的消息)上并行化但不能在一天内并行化(例如时间为 50、60、70、80 的事件需要按顺序处理)。
- 跟进问题:是否可以想象该解决方案不仅可以用于传统的spark,还可以用于spark structured streaming?或者后者是否需要自己的一种状态处理方法?
通常,您可以 运行 通过使用 mapGroupsWithState
of flatMapGroupsWithState
在 spark streaming 上进行任意状态聚合。您可以找到一些示例 here。 None 将保证流的处理将按事件时间排序。
如果您需要强制数据排序,您应该尝试使用window operations on event time。在这种情况下,您需要 运行 无状态操作,但如果每个 window 组中的元素数量足够小,您可以使用 collectList
例如,然后应用 UDF (您可以在其中管理每个列表中每个 window 组的状态。
好的,我同时想出了如何使用 UDAF 来做到这一点。
class TagParser extends UserDefinedAggregateFunction {
override def inputSchema: StructType = StructType(StructField("value", StringType) :: Nil)
override def bufferSchema: StructType = StructType(
StructField("parsed", ArrayType(StringType)) ::
StructField("rest", StringType)
:: Nil)
override def dataType: DataType = ArrayType(StringType)
override def deterministic: Boolean = true
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = IndexedSeq[String]()
buffer(1) = null
}
def doParse(str: String, buffer: MutableAggregationBuffer): Unit = {
buffer(0) = IndexedSeq[String]()
val prevRest = buffer(1)
var idx = -1
val strToParse = if (prevRest != null) prevRest + str else str
do {
val oldIdx = idx;
idx = strToParse.indexOf(';', oldIdx + 1)
if (idx == -1) {
buffer(1) = strToParse.substring(oldIdx + 1)
} else {
val newlyParsed = strToParse.substring(oldIdx + 1, idx)
buffer(0) = buffer(0).asInstanceOf[IndexedSeq[String]] :+ newlyParsed
buffer(1) = null
}
} while (idx != -1)
}
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if (buffer == null) {
return
}
doParse(input.getAs[String](0), buffer)
}
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = throw new UnsupportedOperationException
override def evaluate(buffer: Row): Any = buffer(0)
}
这里有一个演示应用程序,它使用上面的 UDAF 来解决上面的问题:
case class Packet(time: Int, payload: String)
object TagParserApp extends App {
val spark, sc = ... // kept out for brevity
val df = sc.parallelize(List(
Packet(10, "TEXT1;"),
Packet(20, "TEXT2;TEXT3;"),
Packet(30, "LONG-"),
Packet(40, "TEXT1;"),
Packet(50, "TEXT4;TEXT5;L"),
Packet(60, "ONG"),
Packet(70, "-TEX"),
Packet(80, "T2;")
)).toDF()
val tp = new TagParser
val window = Window.rowsBetween(Window.unboundedPreceding, Window.currentRow)
val df2 = df.withColumn("msg", tp.apply(df.col("payload")).over(window))
df2.show()
}
这产生:
+----+-------------+--------------+
|time| payload| msg|
+----+-------------+--------------+
| 10| TEXT1;| [TEXT1]|
| 20| TEXT2;TEXT3;|[TEXT2, TEXT3]|
| 30| LONG-| []|
| 40| TEXT1;| [LONG-TEXT1]|
| 50|TEXT4;TEXT5;L|[TEXT4, TEXT5]|
| 60| ONG| []|
| 70| -TEX| []|
| 80| T2;| [LONG-TEXT2]|
+----+-------------+--------------+
我的主要问题是弄清楚如何实际应用这个 UDAF,即使用这个:
df.withColumn("msg", tp.apply(df.col("payload")).over(window))
我现在唯一需要弄清楚的是并行化的各个方面(我只想在我们不依赖排序的地方发生)但这对我来说是一个单独的问题。
我想使用 Spark 来解析网络消息,并以有状态的方式将它们分组为逻辑实体。
问题描述
我们假设每条消息都位于输入数据帧的一行中,如下所示。
| row | time | raw payload |
+-------+------+---------------+
| 1 | 10 | TEXT1; |
| 2 | 20 | TEXT2;TEXT3; |
| 3 | 30 | LONG- |
| 4 | 40 | TEXT1; |
| 5 | 50 | TEXT4;TEXT5;L |
| 6 | 60 | ONG |
| 7 | 70 | -TEX |
| 8 | 80 | T2; |
任务是解析原始负载中的逻辑消息,并在新的输出数据帧中提供它们。在该示例中,有效负载中的每个逻辑消息都以分号(定界符)结尾。
所需的输出数据帧如下所示:
| row | time | message |
+-------+------+---------------+
| 1 | 10 | TEXT1; |
| 2 | 20 | TEXT2; |
| 3 | 20 | TEXT3; |
| 4 | 30 | LONG-TEXT1; |
| 5 | 50 | TEXT4; |
| 6 | 50 | TEXT5; |
| 7 | 50 | LONG-TEXT2; |
请注意,有些消息行不会在结果中产生新行(例如第 4、6、7、8 行),有些甚至会产生多行(例如第 2、5 行)
我的问题:
- 这是 UDAF 的用例吗?如果是这样,例如我应该如何实现
merge
功能?我不知道它的目的是什么。 - 由于消息顺序很重要(我无法在不遵守消息顺序的情况下正确处理 LONGTEXT-1、LONGTEXT-2),我可以告诉 spark 在更高级别(例如,每个日历日的消息)上并行化但不能在一天内并行化(例如时间为 50、60、70、80 的事件需要按顺序处理)。
- 跟进问题:是否可以想象该解决方案不仅可以用于传统的spark,还可以用于spark structured streaming?或者后者是否需要自己的一种状态处理方法?
通常,您可以 运行 通过使用 mapGroupsWithState
of flatMapGroupsWithState
在 spark streaming 上进行任意状态聚合。您可以找到一些示例 here。 None 将保证流的处理将按事件时间排序。
如果您需要强制数据排序,您应该尝试使用window operations on event time。在这种情况下,您需要 运行 无状态操作,但如果每个 window 组中的元素数量足够小,您可以使用 collectList
例如,然后应用 UDF (您可以在其中管理每个列表中每个 window 组的状态。
好的,我同时想出了如何使用 UDAF 来做到这一点。
class TagParser extends UserDefinedAggregateFunction {
override def inputSchema: StructType = StructType(StructField("value", StringType) :: Nil)
override def bufferSchema: StructType = StructType(
StructField("parsed", ArrayType(StringType)) ::
StructField("rest", StringType)
:: Nil)
override def dataType: DataType = ArrayType(StringType)
override def deterministic: Boolean = true
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = IndexedSeq[String]()
buffer(1) = null
}
def doParse(str: String, buffer: MutableAggregationBuffer): Unit = {
buffer(0) = IndexedSeq[String]()
val prevRest = buffer(1)
var idx = -1
val strToParse = if (prevRest != null) prevRest + str else str
do {
val oldIdx = idx;
idx = strToParse.indexOf(';', oldIdx + 1)
if (idx == -1) {
buffer(1) = strToParse.substring(oldIdx + 1)
} else {
val newlyParsed = strToParse.substring(oldIdx + 1, idx)
buffer(0) = buffer(0).asInstanceOf[IndexedSeq[String]] :+ newlyParsed
buffer(1) = null
}
} while (idx != -1)
}
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if (buffer == null) {
return
}
doParse(input.getAs[String](0), buffer)
}
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = throw new UnsupportedOperationException
override def evaluate(buffer: Row): Any = buffer(0)
}
这里有一个演示应用程序,它使用上面的 UDAF 来解决上面的问题:
case class Packet(time: Int, payload: String)
object TagParserApp extends App {
val spark, sc = ... // kept out for brevity
val df = sc.parallelize(List(
Packet(10, "TEXT1;"),
Packet(20, "TEXT2;TEXT3;"),
Packet(30, "LONG-"),
Packet(40, "TEXT1;"),
Packet(50, "TEXT4;TEXT5;L"),
Packet(60, "ONG"),
Packet(70, "-TEX"),
Packet(80, "T2;")
)).toDF()
val tp = new TagParser
val window = Window.rowsBetween(Window.unboundedPreceding, Window.currentRow)
val df2 = df.withColumn("msg", tp.apply(df.col("payload")).over(window))
df2.show()
}
这产生:
+----+-------------+--------------+
|time| payload| msg|
+----+-------------+--------------+
| 10| TEXT1;| [TEXT1]|
| 20| TEXT2;TEXT3;|[TEXT2, TEXT3]|
| 30| LONG-| []|
| 40| TEXT1;| [LONG-TEXT1]|
| 50|TEXT4;TEXT5;L|[TEXT4, TEXT5]|
| 60| ONG| []|
| 70| -TEX| []|
| 80| T2;| [LONG-TEXT2]|
+----+-------------+--------------+
我的主要问题是弄清楚如何实际应用这个 UDAF,即使用这个:
df.withColumn("msg", tp.apply(df.col("payload")).over(window))
我现在唯一需要弄清楚的是并行化的各个方面(我只想在我们不依赖排序的地方发生)但这对我来说是一个单独的问题。