Two Spark Streaming jobs with the same consumer group ID

I am trying to experiment with consumer groups.

Here is my code snippet:

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import scala.Tuple2;

import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;

public final class App {

    private static final int INTERVAL = 5000;

    public static void main(String[] args) throws Exception {

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "xxx:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("auto.offset.reset", "earliest");
        kafkaParams.put("enable.auto.commit", true);
        kafkaParams.put("auto.commit.interval.ms", "1000");
        kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
        kafkaParams.put("sasl.kerberos.service.name", "kafka");
        kafkaParams.put("retries", "3");
        kafkaParams.put(GROUP_ID_CONFIG, "mygroup");
        kafkaParams.put("request.timeout.ms", "210000");
        kafkaParams.put("session.timeout.ms", "180000");
        kafkaParams.put("heartbeat.interval.ms", "3000");
        Collection<String> topics = Arrays.asList("venkat4");

        SparkConf conf = new SparkConf();
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(INTERVAL));

        // Direct stream subscribed to the topic with the shared consumer group.
        final JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        ssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                );

        // Print the (key, value) pairs of each batch.
        stream.mapToPair(
                new PairFunction<ConsumerRecord<String, String>, String, String>() {
                    @Override
                    public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
                        return new Tuple2<>(record.key(), record.value());
                    }
                }).print();

        ssc.start();
        ssc.awaitTermination();
    }
}

When I run two of these Spark Streaming jobs concurrently, it fails with the following error:

Exception in thread "main" java.lang.IllegalStateException: No current assignment for partition venkat4-1
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:315)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1170)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:197)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:214)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$$anonfun$$anonfun$apply.apply(DStream.scala:341)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$$anonfun$$anonfun$apply.apply(DStream.scala:341)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$$anonfun.apply(DStream.scala:340)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$$anonfun.apply(DStream.scala:340)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute.apply(DStream.scala:335)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute.apply(DStream.scala:333)
    at scala.Option.orElse(Option.scala:289)

Per this https://www.wisdomjobs.com/e-university/apache-kafka-tutorial-1342/apache-kafka-consumer-group-example-19004.html, creating a separate instance of a Kafka consumer with the same group triggers a rebalance of the partitions. I believe the rebalance is not being tolerated by the consumer. How should I fix this?

Below is the command used:

SPARK_KAFKA_VERSION=0.10 spark2-submit --num-executors 2 --master yarn --deploy-mode client --files jaas.conf#jaas.conf,hive.keytab#hive.keytab --driver-java-options "-Djava.security.auth.login.config=./jaas.conf" --class Streaming.App --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" --conf spark.streaming.kafka.consumer.cache.enabled=false 1-1.0-SNAPSHOT.jar

Right now all the partitions are being consumed by a single consumer. If the data ingestion rate is high, that single consumer may be too slow to keep up with the ingestion speed.

Adding more consumers to the same consumer group to consume the data from the topic improves the consumption rate. Spark Streaming with this approach gives 1:1 parallelism between Kafka partitions and Spark partitions, and Spark handles it internally.

If the number of consumers is greater than the number of topic partitions, the extra consumers sit idle and their resources are under-utilized. It is always recommended to keep the number of consumers less than or equal to the partition count.
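As a quick sanity check, here is a small sketch (my own illustration, not from the Spark docs) of looking up a topic's partition count with the plain Kafka consumer API before deciding how many consumers to start; the broker address and helper name are placeholders, and the SASL/Kerberos settings from the question are omitted for brevity.

import java.util.Properties

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

// Hypothetical helper: returns the number of partitions of a topic, so the
// number of consumers started in one group can be kept <= this value.
def partitionCount(bootstrapServers: String, topic: String): Int = {
  val props = new Properties()
  props.put("bootstrap.servers", bootstrapServers)
  props.put("key.deserializer", classOf[StringDeserializer].getName)
  props.put("value.deserializer", classOf[StringDeserializer].getName)
  val consumer = new KafkaConsumer[String, String](props)
  try consumer.partitionsFor(topic).size()   // one PartitionInfo per partition
  finally consumer.close()
}

For example, if partitionCount("xxx:9092", "venkat4") returns 3, at most 3 consumers in that group will do useful work.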

Kafka will rebalance if more processes/threads are added. ZooKeeper can be reconfigured by the Kafka cluster if any consumer or broker fails to send a heartbeat to ZooKeeper.

Kafka rebalances the partition storage whenever a broker fails or a new partition is added to an existing topic. This is Kafka-specific behaviour for balancing the data across partitions among the brokers.

Spark Streaming provides simple 1:1 parallelism between Kafka partitions and Spark partitions. If you do not provide any partition details using ConsumerStrategies.Assign, all partitions of the given topic are consumed.

Kafka assigns the partitions of a topic to the consumer in a group, so that each partition is consumed by exactly one consumer in the group. Kafka guarantees that a message is only ever read by a single consumer in the group.

When you start the second Spark Streaming job, another consumer tries to consume the same partition from the same consumer group ID, so it throws the error.

val alertTopics = Array("testtopic")

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> sparkJobConfig.kafkaBrokers,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> sparkJobConfig.kafkaConsumerGroup,
  "auto.offset.reset" -> "latest"
)

val streamContext = new StreamingContext(sparkContext, Seconds(sparkJobConfig.streamBatchInterval.toLong))

val streamData = KafkaUtils.createDirectStream(streamContext, PreferConsistent, Subscribe[String, String](alertTopics, kafkaParams))

If you want partition-specific Spark jobs, use the following code.

val topicPartitionsList =  List(new TopicPartition("topic",1))

val alertReqStream1 = KafkaUtils.createDirectStream(streamContext, PreferConsistent, ConsumerStrategies.Assign(topicPartitionsList, kafkaParams))

https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html#consumerstrategies

Consumers can join a group by using the same group.id.

val topicPartitionsList =  List(new TopicPartition("topic",3), new TopicPartition("topic",4))

val alertReqStream2 = KafkaUtils.createDirectStream(streamContext, PreferConsistent, ConsumerStrategies.Assign(topicPartitionsList, kafkaParams))

Adding two more consumers means adding them to the same group.id.

Please read the Spark-Kafka integration guide: https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html

Hope this helps.

@Ravikumar Apologies for the delay.

My test was done like this:

a. My topic has 3 partitions.
b. The spark-streaming job was started with 2 executors, and it ran fine.
c. Later I decided to scale it out to another instance by running another spark-streaming job with 1 executor, to cover the 3rd partition, and that is where it failed.

Regarding your statement "When you start the second Spark Streaming job, another consumer tries to consume the same partition from the same consumer group ID, so it throws the error": yes, that is absolutely correct. But why the rebalance cannot be tolerated is the question.

Quoting the documentation you highlighted:

Kafka assigns the partitions of a topic to the consumer in a group, so that each partition is consumed by exactly one consumer in the group. Kafka guarantees that a message is only ever read by a single consumer in the group. Kafka rebalance the partitions storage whenever any broker failure or adding new partition to the existing topic. This is kafka specific how to balance the data across partitions in the brokers. Kafka will re-balance, if more processes/threads are added. The ZooKeeper can be reconfigured by Kafka cluster, if any consumer or broker fails to send heartbeat to ZooKeeper.

That is also what I expected from the spark-streaming job. I tried with a plain Kafka client, which is able to tolerate the rebalance.
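For reference, a rough sketch of the kind of plain Kafka client I mean, assuming the same broker, topic and group as my job above; the rebalance listener callbacks and the printed messages are only for illustration, and the SASL settings are again omitted.

import java.util.{Collection => JCollection, Properties}

import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

object PlainConsumer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "xxx:9092")
    props.put("group.id", "mygroup")
    props.put("key.deserializer", classOf[StringDeserializer].getName)
    props.put("value.deserializer", classOf[StringDeserializer].getName)

    val consumer = new KafkaConsumer[String, String](props)

    // The listener fires when partitions are revoked from / assigned to this
    // consumer, e.g. when a second instance with the same group.id joins;
    // the plain client simply keeps polling with its new assignment.
    consumer.subscribe(java.util.Arrays.asList("venkat4"), new ConsumerRebalanceListener {
      override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit =
        println(s"Revoked: ${partitions.asScala.mkString(", ")}")
      override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit =
        println(s"Assigned: ${partitions.asScala.mkString(", ")}")
    })

    while (true) {
      val records = consumer.poll(1000L)
      records.asScala.foreach(r => println(s"${r.key} -> ${r.value}"))
    }
  }
}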

Your point from the documentation, "The cache is keyed by topicpartition and group.id, so use a separate group.id for each call to createDirectStream", clarifies my question.
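For anyone reading this later, here is a minimal sketch of how I read that recommendation, i.e. a separate group.id per call to createDirectStream; the app name, the group ids and the choice of putting both streams in one context are my own assumptions, not something from the docs.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val ssc = new StreamingContext(new SparkConf().setAppName("two-streams"), Seconds(5))

// Hypothetical helper: same connection settings for every stream, but a
// distinct group.id per call to createDirectStream.
def paramsFor(groupId: String): Map[String, Object] = Map[String, Object](
  "bootstrap.servers" -> "xxx:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "auto.offset.reset" -> "earliest",
  "group.id" -> groupId
)

// Because the cache is keyed by topic-partition and group.id, giving each
// stream its own group.id keeps their cached consumers from colliding.
val stream1 = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("venkat4"), paramsFor("mygroup-a")))
val stream2 = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("venkat4"), paramsFor("mygroup-b")))

Note that with different group.ids each stream independently receives every partition of the topic; the two streams no longer split the partitions between them.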

Also, the PR https://github.com/apache/spark/pull/21038 mentions the following:

"Kafka partitions can be revoked when new consumers joined in the consumer group to rebalance the partitions. But current Spark Kafka connector code makes sure there's no partition revoking scenarios, so trying to get latest offset from revoked partitions will throw exceptions as JIRA mentioned."

Glad to close this thread. Thanks a lot for your reply.