过滤kafka消息的最佳方法应该是什么

What should be the best way to filter the kafka message

我正在使用来自包含区号的 kafka 主题的数据。我必须仅为某些区号过滤数据。谁能建议解决这个问题的最佳方法。

这是我的侦听器代码。最好的做法是将数据解析为对象(因为我将有效负载映射到 TEST 对象)并根据我需要过滤的值过滤数据,或者 kafka 是否提供了我可以使用此过滤过程的任何其他库.

Kafka 监听器方法

@Service
public class Listener{

    @KafkaListener(topics = "#{@topicName}")
        public void listen(String payload) throws IOException {

            LOGGER.info("received payload from topic='{}'", payload);
            ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

            TEST test = objectMapper.readValue(payload,TEST.class);

        }
}

我的卡夫卡配置class:

@Configuration
public class Config {


    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, applicationConfiguration.getKafkaBootStrap());
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, applicationConfiguration.getKafkaKeyDeserializer());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, applicationConfiguration.getKafkaValueDeserializer());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, applicationConfiguration.getKafkaGroupId());
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, applicationConfiguration.getKafkaAutoOffsetReset());
        return properties;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }
    @Bean
    public Listener receiver() {
        return new Listener();
    }

}

你做的很好
如果您的负载除了区号之外还有很多数据并且您担心解析时间过长,您可以在将区号添加为 header 进行整个解析之前过滤消息以测试 object。

更高版本的 Kafka(0.11 之后)提供自定义 headers (KIP-82)

如果你想自己实现它(或者如果你使用旧版本的 Kafka),你可以将 header 添加到你的消息负载中,假设是消息的前 4 个字节,它们将代表区号,可以在解析过程之前非常快速地提取出来。
新消息负载:

([header-4-bytes],[original-payload-n-bytes])

因此,根据 header 创建您的过滤器,如果您发现这是您需要的区号,请根据消息的其余部分创建您的测试 object(删除第一个4 个字节删除 header)。

Kafka 不提供任何可以帮助您的过滤选项,尽管它能够在您的 Producer 中发送键控消息,因此如果您的键是区号,Kafka 保证所有具有相同区号的消息都会发送到相同的分区,如果使用得当,可能会对你的性能有所帮助。
生产者还可以将消息发送到特定分区,因此如果您知道您有固定的区号,您还可以定义分区号等于唯一区号计数的主题,并将每个区号发送到不同的分区,然后使用您的消费者仅访问具有您要查找的区号的分区,但在大多数情况下可能有点矫枉过正。

参见 Filtering Messages

The Spring for Apache Kafka project also provides some assistance by means of the FilteringMessageListenerAdapter class, which can wrap your MessageListener. This class takes an implementation of RecordFilterStrategy in which you implement the filter method to signal that a message is a duplicate and should be discarded. This has an additional property called ackDiscarded, which indicates whether the adapter should acknowledge the discarded record. It is false by default.

When you use @KafkaListener, set the RecordFilterStrategy (and optionally ackDiscarded) on the container factory so that the listener is wrapped in the appropriate filtering adapter.

/**
 * Set the record filter strategy.
 * @param recordFilterStrategy the strategy.
 */
public void setRecordFilterStrategy(RecordFilterStrategy<? super K, ? super V> recordFilterStrategy) {
    this.recordFilterStrategy = recordFilterStrategy;
}
/**
 * Implementations of this interface can signal that a record about
 * to be delivered to a message listener should be discarded instead
 * of being delivered.
 *
 * @param <K> the key type.
 * @param <V> the value type.
 *
 * @author Gary Russell
 *
 */
public interface RecordFilterStrategy<K, V> {

    /**
     * Return true if the record should be discarded.
     * @param consumerRecord the record.
     * @return true to discard.
     */
    boolean filter(ConsumerRecord<K, V> consumerRecord);

}