Spark 流媒体重播

Spark Streaming Replay

我有一个 Spark Streaming 应用程序来分析来自 Kafka 代理的事件。我有如下规则,可以通过组合现有规则生成新规则:

If this event type occurs raise an alert.
If this event type occurs more than 3 times in a 5-minute interval, raise an alert.

同时,我将每个传入数据保存到 Cassandra。我喜欢做的是 运行 这个用于 Cassandra 历史数据的流式应用程序。例如,

<This rule> would have generated <these> alerts for <last week>.

有没有办法在 Spark 中或在路线图中做到这一点?例如,Apache Flink 有事件时间处理。但是将现有代码库迁移到它似乎很难,我想通过重用现有代码来解决这个问题。

这很简单,但有一些注意事项。首先,它有助于从 Kafka 端了解其工作原理。

Kafka 管理所谓的偏移量——Kafka 中的每条消息都有一个相对于其在分区中的位置的偏移量。 (分区是主题的逻辑划分。)分区中的第一条消息的偏移量为 0L,第二条消息的偏移量为 1L 等等。除此之外,由于日志回滚和可能的主题压缩,0L 并不总是分区中最早的偏移量。

您要做的第一件事是从头开始收集要读取的所有分区的偏移量。这是执行此操作的函数:

def getOffsets(consumer: SimpleConsumer, topic: String, partition: Int) : (Long,Long) = {
  val time = kafka.api.OffsetRequest.LatestTime
  val reqInfo = Map[TopicAndPartition,PartitionOffsetRequestInfo](
    (new TopicAndPartition(topic, partition)) -> (new PartitionOffsetRequestInfo(time, 1000))
  )
  val req = new kafka.javaapi.OffsetRequest(
    reqInfo, kafka.api.OffsetRequest.CurrentVersion, "test"
  )
  val resp = consumer.getOffsetsBefore(req)
  val offsets = resp.offsets(topic, partition)
  (offsets(offsets.size - 1), offsets(0))
}

你可以这样称呼它:

val (firstOffset,nextOffset) = getOffsets(consumer, "MyTopicName", 0)

有关从 Kafka 检索偏移量的所有您想知道的信息,read this。至少可以说,这是神秘的。 (例如,当您完全理解 PartitionOffsetRequestInfo 的第二个参数时请告诉我。)

既然您已经有了要查看历史分区的 firstOffsetlastOffset,那么您可以使用 createDirectStreamfromOffset 参数,这是输入:fromOffset: Map[TopicAndPartition, Long]。您可以将 Long / 值设置为从 getOffsets().

获得的 firstOffset

至于 nextOffset -- 您可以使用它来确定您何时从处理历史数据转向处理新数据。如果 msg.offset == nextOffset 那么您正在处理分区内的第一条非历史记录。

现在注意事项,直接from the documentation:

  • 一旦上下文启动,就不能进行新的流计算 设置或添加到它。
  • 上下文一旦停止,就不能 重新启动。
  • 在一个 JVM 中只能有一个 StreamingContext 处于活动状态 同时.
  • StreamingContext 上的 stop() 也会停止 SparkContext。到 仅停止 StreamingContext,设置 stop() 的可选参数 将 stopSparkContext 调用为 false。
  • SparkContext 可以重新用于 创建多个 StreamingContexts,只要上一个 StreamingContext 停止(不停止 SparkContext) 在创建下一个 StreamingContext 之前。

由于这些注意事项,我在 firstOffset 的同时抓取了 nextOffset -- 所以我可以保持流向上,但将上下文从历史处理更改为当前处理。