Spark 流媒体重播
Spark Streaming Replay
我有一个 Spark Streaming 应用程序来分析来自 Kafka 代理的事件。我有如下规则,可以通过组合现有规则生成新规则:
If this event type occurs raise an alert.
If this event type occurs more than 3 times in a 5-minute interval, raise an alert.
同时,我将每个传入数据保存到 Cassandra。我喜欢做的是 运行 这个用于 Cassandra 历史数据的流式应用程序。例如,
<This rule> would have generated <these> alerts for <last week>.
有没有办法在 Spark 中或在路线图中做到这一点?例如,Apache Flink 有事件时间处理。但是将现有代码库迁移到它似乎很难,我想通过重用现有代码来解决这个问题。
这很简单,但有一些注意事项。首先,它有助于从 Kafka 端了解其工作原理。
Kafka 管理所谓的偏移量——Kafka 中的每条消息都有一个相对于其在分区中的位置的偏移量。 (分区是主题的逻辑划分。)分区中的第一条消息的偏移量为 0L
,第二条消息的偏移量为 1L
等等。除此之外,由于日志回滚和可能的主题压缩,0L
并不总是分区中最早的偏移量。
您要做的第一件事是从头开始收集要读取的所有分区的偏移量。这是执行此操作的函数:
def getOffsets(consumer: SimpleConsumer, topic: String, partition: Int) : (Long,Long) = {
val time = kafka.api.OffsetRequest.LatestTime
val reqInfo = Map[TopicAndPartition,PartitionOffsetRequestInfo](
(new TopicAndPartition(topic, partition)) -> (new PartitionOffsetRequestInfo(time, 1000))
)
val req = new kafka.javaapi.OffsetRequest(
reqInfo, kafka.api.OffsetRequest.CurrentVersion, "test"
)
val resp = consumer.getOffsetsBefore(req)
val offsets = resp.offsets(topic, partition)
(offsets(offsets.size - 1), offsets(0))
}
你可以这样称呼它:
val (firstOffset,nextOffset) = getOffsets(consumer, "MyTopicName", 0)
有关从 Kafka 检索偏移量的所有您想知道的信息,read this。至少可以说,这是神秘的。 (例如,当您完全理解 PartitionOffsetRequestInfo
的第二个参数时请告诉我。)
既然您已经有了要查看历史分区的 firstOffset
和 lastOffset
,那么您可以使用 createDirectStream
的 fromOffset
参数,这是输入:fromOffset: Map[TopicAndPartition, Long]
。您可以将 Long
/ 值设置为从 getOffsets()
.
获得的 firstOffset
至于 nextOffset
-- 您可以使用它来确定您何时从处理历史数据转向处理新数据。如果 msg.offset == nextOffset
那么您正在处理分区内的第一条非历史记录。
现在注意事项,直接from the documentation:
- 一旦上下文启动,就不能进行新的流计算
设置或添加到它。
- 上下文一旦停止,就不能
重新启动。
- 在一个 JVM 中只能有一个 StreamingContext 处于活动状态
同时.
- StreamingContext 上的 stop() 也会停止 SparkContext。到
仅停止 StreamingContext,设置 stop() 的可选参数
将 stopSparkContext 调用为 false。
- SparkContext 可以重新用于
创建多个 StreamingContexts,只要上一个
StreamingContext 停止(不停止 SparkContext)
在创建下一个 StreamingContext 之前。
由于这些注意事项,我在 firstOffset
的同时抓取了 nextOffset
-- 所以我可以保持流向上,但将上下文从历史处理更改为当前处理。
我有一个 Spark Streaming 应用程序来分析来自 Kafka 代理的事件。我有如下规则,可以通过组合现有规则生成新规则:
If this event type occurs raise an alert.
If this event type occurs more than 3 times in a 5-minute interval, raise an alert.
同时,我将每个传入数据保存到 Cassandra。我喜欢做的是 运行 这个用于 Cassandra 历史数据的流式应用程序。例如,
<This rule> would have generated <these> alerts for <last week>.
有没有办法在 Spark 中或在路线图中做到这一点?例如,Apache Flink 有事件时间处理。但是将现有代码库迁移到它似乎很难,我想通过重用现有代码来解决这个问题。
这很简单,但有一些注意事项。首先,它有助于从 Kafka 端了解其工作原理。
Kafka 管理所谓的偏移量——Kafka 中的每条消息都有一个相对于其在分区中的位置的偏移量。 (分区是主题的逻辑划分。)分区中的第一条消息的偏移量为 0L
,第二条消息的偏移量为 1L
等等。除此之外,由于日志回滚和可能的主题压缩,0L
并不总是分区中最早的偏移量。
您要做的第一件事是从头开始收集要读取的所有分区的偏移量。这是执行此操作的函数:
def getOffsets(consumer: SimpleConsumer, topic: String, partition: Int) : (Long,Long) = {
val time = kafka.api.OffsetRequest.LatestTime
val reqInfo = Map[TopicAndPartition,PartitionOffsetRequestInfo](
(new TopicAndPartition(topic, partition)) -> (new PartitionOffsetRequestInfo(time, 1000))
)
val req = new kafka.javaapi.OffsetRequest(
reqInfo, kafka.api.OffsetRequest.CurrentVersion, "test"
)
val resp = consumer.getOffsetsBefore(req)
val offsets = resp.offsets(topic, partition)
(offsets(offsets.size - 1), offsets(0))
}
你可以这样称呼它:
val (firstOffset,nextOffset) = getOffsets(consumer, "MyTopicName", 0)
有关从 Kafka 检索偏移量的所有您想知道的信息,read this。至少可以说,这是神秘的。 (例如,当您完全理解 PartitionOffsetRequestInfo
的第二个参数时请告诉我。)
既然您已经有了要查看历史分区的 firstOffset
和 lastOffset
,那么您可以使用 createDirectStream
的 fromOffset
参数,这是输入:fromOffset: Map[TopicAndPartition, Long]
。您可以将 Long
/ 值设置为从 getOffsets()
.
firstOffset
至于 nextOffset
-- 您可以使用它来确定您何时从处理历史数据转向处理新数据。如果 msg.offset == nextOffset
那么您正在处理分区内的第一条非历史记录。
现在注意事项,直接from the documentation:
- 一旦上下文启动,就不能进行新的流计算 设置或添加到它。
- 上下文一旦停止,就不能 重新启动。
- 在一个 JVM 中只能有一个 StreamingContext 处于活动状态 同时.
- StreamingContext 上的 stop() 也会停止 SparkContext。到 仅停止 StreamingContext,设置 stop() 的可选参数 将 stopSparkContext 调用为 false。
- SparkContext 可以重新用于 创建多个 StreamingContexts,只要上一个 StreamingContext 停止(不停止 SparkContext) 在创建下一个 StreamingContext 之前。
由于这些注意事项,我在 firstOffset
的同时抓取了 nextOffset
-- 所以我可以保持流向上,但将上下文从历史处理更改为当前处理。