Kafka Consumer 和 Spark-Kafka-Consumer 的区别
Difference between Kafka Consumer and Spark-Kafka-Consumer
我有一个 kafka 主题,我正在通过 Kafka Producer 发送数据。现在在消费者方面,我有两个选择。
1.使用 KafkaConsumer - 下面是 kafkaConsumer 的代码,它从主题中读取数据并且工作正常。
@EnableKafka
@Configuration
@PropertySource("kaafka.properties")
public class RawEventKafkaConsumer {
private static final Logger logger = LoggerFactory.getLogger(RawEventKafkaConsumer.class);
@Autowired
private DataModelServiceImpl dataModelServiceImpl;
private PolicyExecutor policyExecutor;
public RawEventKafkaConsumer() {
policyExecutor = new PolicyExecutor();
}
@Value("${spring.kafka.topic}")
private String rawEventTopicName;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("${spring.kafka.consumer.bootstrap-servers}")
private String bootStrapServer;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.producer.key-serializer}")
private String keySerializer;
@Value("${spring.kafka.producer.value-serializer}")
private String valueSerializer;
@Value("${spring.kafka.consumer.key-deserializer}")
private String keyDeserializer;
@Value("${spring.kafka.consumer.value-deserializer}")
private String valueDeserializer;
@Bean
public DefaultKafkaConsumerFactory<String, BaseDataModel> rawEventConsumer() {
Map<String, Object> consumerProperties = new HashMap<>();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
return new DefaultKafkaConsumerFactory<>(consumerProperties);
}
@Bean(name="kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, BaseDataModel> kafkaListenerContainerFactory() {
logger.info("kafkaListenerContainerFactory called..");
ConcurrentKafkaListenerContainerFactory<String, BaseDataModel> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(rawEventConsumer());
return factory;
}
@KafkaListener(topics = "rawEventTopic", containerFactory = "kafkaListenerContainerFactory")
public void listen(String baseDataModel) {
ObjectMapper mapper = new ObjectMapper();
BaseDataModel csvDataModel;
try {
csvDataModel = mapper.readValue(baseDataModel, BaseDataModel.class);
//saving the datamodel in elastic search.
//dataModelServiceImpl.save(csvDataModel);
System.out.println("Message received " + csvDataModel.toString());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
2。使用 Spark Stream 消费 kafkaTopic 数据 - 代码如下 -
@Service
public class RawEventSparkStreamConsumer {
private final Logger logger = LoggerFactory.getLogger(RawEventSparkStreamConsumer.class);
@Autowired
private DataModelServiceImpl dataModelServiceImpl;
@Autowired
private JavaStreamingContext streamingContext;
@Autowired
private JavaInputDStream<ConsumerRecord<String, String>> messages;
@PostConstruct
private void sparkRawEventConsumer() {
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(()->{
messages.foreachRDD((rdd) -> {
System.out.println("RDD coming *************************______________________________---------------------.." + rdd.count());
rdd.foreach(record -> {
System.out.println("Data is comming...." + record);
});
});
streamingContext.start();
try {
streamingContext.awaitTermination();
} catch (InterruptedException e) { // TODO Auto-generated catch block
e.printStackTrace();
}
});
}
}
consumer kafka consumer和Spark streaming均成功读取topic数据。现在我有一个问题,如果两者都在做同样的事情(从主题中读取数据)那么
- 两者有什么区别?
- 另外我还面临一个问题,类 kafka consume 和 Spark consumer 都在同一个代码库中,所以如果我同时使用两者,那么 kafkaConsumer 代码将无法正常工作。
谢谢。
简短的回答是,与单个 JVM 中的 Kafka Consumer 相比,您需要一个 Spark 集群 运行 以分布式方式编写 Spark 代码,而您 运行手动对同一应用程序的多个实例进行横向扩展。
换句话说,您 运行 他们会有所不同。 spark-submit
对比 java -jar
。我不相信使用 Spring 会改变
另一个区别是 "plain consumer" 对 Kafka 配置有更多的控制,并且一次获得一条记录。 Spark RDD 可以是很多事件,它们必须都是相同的 "schema" 除非你想要复杂的解析逻辑,这比用 ConsumerRecord
值更难用 RDD 对象编写,这些值会为你提取.
总的来说,我认为将它们结合起来不是一个好主意。
如果他们正在阅读同一个主题,那么 Kafka 消费者协议只能为每个分区分配一个消费者......目前还不清楚你的主题有多少个分区,但这可以解释为什么一个会起作用,但不是另一个
我有一个 kafka 主题,我正在通过 Kafka Producer 发送数据。现在在消费者方面,我有两个选择。
1.使用 KafkaConsumer - 下面是 kafkaConsumer 的代码,它从主题中读取数据并且工作正常。
@EnableKafka
@Configuration
@PropertySource("kaafka.properties")
public class RawEventKafkaConsumer {
private static final Logger logger = LoggerFactory.getLogger(RawEventKafkaConsumer.class);
@Autowired
private DataModelServiceImpl dataModelServiceImpl;
private PolicyExecutor policyExecutor;
public RawEventKafkaConsumer() {
policyExecutor = new PolicyExecutor();
}
@Value("${spring.kafka.topic}")
private String rawEventTopicName;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("${spring.kafka.consumer.bootstrap-servers}")
private String bootStrapServer;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.producer.key-serializer}")
private String keySerializer;
@Value("${spring.kafka.producer.value-serializer}")
private String valueSerializer;
@Value("${spring.kafka.consumer.key-deserializer}")
private String keyDeserializer;
@Value("${spring.kafka.consumer.value-deserializer}")
private String valueDeserializer;
@Bean
public DefaultKafkaConsumerFactory<String, BaseDataModel> rawEventConsumer() {
Map<String, Object> consumerProperties = new HashMap<>();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
return new DefaultKafkaConsumerFactory<>(consumerProperties);
}
@Bean(name="kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, BaseDataModel> kafkaListenerContainerFactory() {
logger.info("kafkaListenerContainerFactory called..");
ConcurrentKafkaListenerContainerFactory<String, BaseDataModel> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(rawEventConsumer());
return factory;
}
@KafkaListener(topics = "rawEventTopic", containerFactory = "kafkaListenerContainerFactory")
public void listen(String baseDataModel) {
ObjectMapper mapper = new ObjectMapper();
BaseDataModel csvDataModel;
try {
csvDataModel = mapper.readValue(baseDataModel, BaseDataModel.class);
//saving the datamodel in elastic search.
//dataModelServiceImpl.save(csvDataModel);
System.out.println("Message received " + csvDataModel.toString());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
2。使用 Spark Stream 消费 kafkaTopic 数据 - 代码如下 -
@Service
public class RawEventSparkStreamConsumer {
private final Logger logger = LoggerFactory.getLogger(RawEventSparkStreamConsumer.class);
@Autowired
private DataModelServiceImpl dataModelServiceImpl;
@Autowired
private JavaStreamingContext streamingContext;
@Autowired
private JavaInputDStream<ConsumerRecord<String, String>> messages;
@PostConstruct
private void sparkRawEventConsumer() {
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(()->{
messages.foreachRDD((rdd) -> {
System.out.println("RDD coming *************************______________________________---------------------.." + rdd.count());
rdd.foreach(record -> {
System.out.println("Data is comming...." + record);
});
});
streamingContext.start();
try {
streamingContext.awaitTermination();
} catch (InterruptedException e) { // TODO Auto-generated catch block
e.printStackTrace();
}
});
}
}
consumer kafka consumer和Spark streaming均成功读取topic数据。现在我有一个问题,如果两者都在做同样的事情(从主题中读取数据)那么
- 两者有什么区别?
- 另外我还面临一个问题,类 kafka consume 和 Spark consumer 都在同一个代码库中,所以如果我同时使用两者,那么 kafkaConsumer 代码将无法正常工作。
谢谢。
简短的回答是,与单个 JVM 中的 Kafka Consumer 相比,您需要一个 Spark 集群 运行 以分布式方式编写 Spark 代码,而您 运行手动对同一应用程序的多个实例进行横向扩展。
换句话说,您 运行 他们会有所不同。 spark-submit
对比 java -jar
。我不相信使用 Spring 会改变
另一个区别是 "plain consumer" 对 Kafka 配置有更多的控制,并且一次获得一条记录。 Spark RDD 可以是很多事件,它们必须都是相同的 "schema" 除非你想要复杂的解析逻辑,这比用 ConsumerRecord
值更难用 RDD 对象编写,这些值会为你提取.
总的来说,我认为将它们结合起来不是一个好主意。
如果他们正在阅读同一个主题,那么 Kafka 消费者协议只能为每个分区分配一个消费者......目前还不清楚你的主题有多少个分区,但这可以解释为什么一个会起作用,但不是另一个