如何使用 Java 在单个循环中从 Kafka 读取多条记录

How to read multiple records from Kafka in a single cycle using Java

我必须创建一个 Kafka topics 的消费者,它不断地监听并将数据推送到 Database

这里的要求是:- 如果您碰巧在一个循环中从 Kafka 读取了多个记录,请尝试将其作为单个调用而不是多个推送到数据库中。

public static void kafkaConsumer(String topicName, String groupId, String autoOffsetReset,
        String enableAutoCommit, String kafkaServers, String acks, String retries, String lingerMS,
        String bufferMemory) throws Exception {

    ObjectMapper mapper = new ObjectMapper();

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
            getKafkaParams(groupId, kafkaServers, autoOffsetReset, enableAutoCommit));

    consumer.subscribe(Arrays.asList(topicName));
    logger.info("subscibed to the topic {}", topicName);
    cluster = Cluster.builder().addContactPoints(CASSANDRA_IPS.split(",")).build();
    session = cluster.connect(KEYSPACE);

    try {

        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {

                    Model model= mapper.readValue(record.value(), Model.class);

                try {
                        boolean flag = insertIntoDB(session, model);
                        if (flag) {
                            logger.info("************ Data Persisted Successfully ***************");
                        } else {
                            logger.info("******* Data Persition Failed *************");
                        }
                    } catch (Exception ex) {
                        logger.error("Exception while persisting data into DB", ex);
                    }
                }
            } catch (Exception ex) {
                logger.error("Exception while reading data from kafka", ex);
            }
        }
    } finally {
        consumer.close();
    }
}

Mysql INSERT 支持一次插入多行。像这样:

INSERT INTO tbl_name (a,b,c) VALUES(1,2,3),(4,5,6),(7,8,9);

所以你可以先将记录保存到一个数组中,当数组大小等于BATCH_SIZE时,你可以将它传递给你的insertIntoDb方法。然后清空数组,继续循环

您还可以将一次投票中的所有消息放入数组中,然后将其传递给 insertIntoDb。

但是如果消息数太大,Mysql 会抱怨数据包太大,所以在这种情况下使用指定的 BATCH_SIZE 会更好。

您还可以为消费者指定 "max.poll.records" 配置以限制一次投票中的消息数。

Cassandra 中的类似内容:

PreparedStatement ps = session.prepare("INSERT INTO messages (user_id,msg_id, title, body) VALUES (?, ?, ?, ?)");
BatchStatement batch = new BatchStatement();
batch.add(ps.bind(uid, mid1, title1, body1));
batch.add(ps.bind(uid, mid2, title2, body2));
batch.add(ps.bind(uid, mid3, title3, body3));
session.execute(batch);