使用 Kafka 在 运行 长的 Spark 作业之间进行通信

Using Kafka to communicate between long running Spark jobs

我是 Apache Spark 的新手,需要在我的 Spark 集群上同时 运行 几个长期 运行ning 进程(作业)。通常,这些单独的进程(每个进程都是自己的工作)需要相互通信。暂时,我正在考虑使用 Kafka 作为这些进程之间的代理。所以高层次的工作对工作沟通看起来像:

  1. 作业 #1 执行一些工作并将消息发布到 Kafka 主题
  2. 作业 #2 被设置为同一 Kafka 主题的流式接收器(使用 StreamingContext),一旦消息发布到主题,作业 #2 就会使用它
  3. 作业 #2 现在可以根据它消耗的消息做一些工作

据我所知,流上下文正在阻止 Spark Driver 节点上 运行 的侦听器。这意味着一旦我像这样启动流媒体消费者:

def createKafkaStream(ssc: StreamingContext,
        kafkaTopics: String, brokers: String): DStream[(String, 
        String)] = {
    // some configs here
    KafkaUtils.createDirectStream[String, String, StringDecoder,
        StringDecoder](ssc, props, topicsSet)
}

def consumerHandler(): StreamingContext = {
    val ssc = new StreamingContext(sc, Seconds(10))

    createKafkaStream(ssc, "someTopic", "my-kafka-ip:9092").foreachRDD(rdd => {
        rdd.collect().foreach { msg =>
            // Now do some work as soon as we receive a messsage from the topic
        }
    })

    ssc
}

StreamingContext.getActive.foreach {
    _.stop(stopSparkContext = false)
}

val ssc = StreamingContext.getActiveOrCreate(consumerHandler)
ssc.start()
ssc.awaitTermination()

...现在有 2 个含义:

  1. Driver 现在正在阻塞和侦听要从 Kafka 消费的工作;和
  2. 收到工作(消息)后,它们将发送到任何可用的工作节点以实际执行

所以首先,如果我上面所说的任何内容不正确或具有误导性,请先纠正我!假设我或多或少是正确的,那么我只是想知道根据我的标准是否有更具可扩展性或性能的方法来实现这一目标。同样,我的 Spark 节点上有两个长期 运行nning 作业(作业 #1 和作业 #2)运行ning,其中一个需要能够 'send work to'另一个。有什么想法吗?

From what I can tell, streaming contexts are blocking listeners that run on the Spark Driver node.

A StreamingContext(单数)不是阻塞侦听器。它的工作是为您的流媒体作业创建执行图。

当您开始从 Kafka 读取数据时,您指定要每 10 秒获取一次新记录。从现在开始会发生什么取决于您为 Kafka 使用哪种 Kafka 抽象,通过 KafkaUtils.createStream 的 Receiver 方法或通过 KafkaUtils.createDirectStream.

的 Receiver-less 方法

一般来说,在这两种方法中,数据都是从 Kafka 使用,然后分派给每个 Spark worker 以 并行.

处理

then I'm simply wondering if there is a more scalable or performant way to accomplish this

这种方法具有高度可扩展性。当使用 receiver-less 方法时,每个 Kafka 分区映射到给定 RDD 中的一个 Spark 分区。您可以通过增加 Kafka 中的分区数量或重新分区 Spark 中的数据(使用 DStream.repartition)来提高并行度。我建议测试此设置以确定它是否符合您的性能要求。