如何使用 Java 在单个循环中从 Kafka 读取多条记录
How to read multiple records from Kafka in a single cycle using Java
我必须创建一个 Kafka topics
的消费者,它不断地监听并将数据推送到 Database
。
这里的要求是:-
如果您碰巧在一个循环中从 Kafka 读取了多个记录,请尝试将其作为单个调用而不是多个推送到数据库中。
public static void kafkaConsumer(String topicName, String groupId, String autoOffsetReset,
String enableAutoCommit, String kafkaServers, String acks, String retries, String lingerMS,
String bufferMemory) throws Exception {
ObjectMapper mapper = new ObjectMapper();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
getKafkaParams(groupId, kafkaServers, autoOffsetReset, enableAutoCommit));
consumer.subscribe(Arrays.asList(topicName));
logger.info("subscibed to the topic {}", topicName);
cluster = Cluster.builder().addContactPoints(CASSANDRA_IPS.split(",")).build();
session = cluster.connect(KEYSPACE);
try {
while (true) {
try {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
Model model= mapper.readValue(record.value(), Model.class);
try {
boolean flag = insertIntoDB(session, model);
if (flag) {
logger.info("************ Data Persisted Successfully ***************");
} else {
logger.info("******* Data Persition Failed *************");
}
} catch (Exception ex) {
logger.error("Exception while persisting data into DB", ex);
}
}
} catch (Exception ex) {
logger.error("Exception while reading data from kafka", ex);
}
}
} finally {
consumer.close();
}
}
Mysql INSERT 支持一次插入多行。像这样:
INSERT INTO tbl_name (a,b,c) VALUES(1,2,3),(4,5,6),(7,8,9);
所以你可以先将记录保存到一个数组中,当数组大小等于BATCH_SIZE时,你可以将它传递给你的insertIntoDb
方法。然后清空数组,继续循环
您还可以将一次投票中的所有消息放入数组中,然后将其传递给 insertIntoDb。
但是如果消息数太大,Mysql 会抱怨数据包太大,所以在这种情况下使用指定的 BATCH_SIZE 会更好。
您还可以为消费者指定 "max.poll.records" 配置以限制一次投票中的消息数。
Cassandra 中的类似内容:
PreparedStatement ps = session.prepare("INSERT INTO messages (user_id,msg_id, title, body) VALUES (?, ?, ?, ?)");
BatchStatement batch = new BatchStatement();
batch.add(ps.bind(uid, mid1, title1, body1));
batch.add(ps.bind(uid, mid2, title2, body2));
batch.add(ps.bind(uid, mid3, title3, body3));
session.execute(batch);
我必须创建一个 Kafka topics
的消费者,它不断地监听并将数据推送到 Database
。
这里的要求是:- 如果您碰巧在一个循环中从 Kafka 读取了多个记录,请尝试将其作为单个调用而不是多个推送到数据库中。
public static void kafkaConsumer(String topicName, String groupId, String autoOffsetReset,
String enableAutoCommit, String kafkaServers, String acks, String retries, String lingerMS,
String bufferMemory) throws Exception {
ObjectMapper mapper = new ObjectMapper();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
getKafkaParams(groupId, kafkaServers, autoOffsetReset, enableAutoCommit));
consumer.subscribe(Arrays.asList(topicName));
logger.info("subscibed to the topic {}", topicName);
cluster = Cluster.builder().addContactPoints(CASSANDRA_IPS.split(",")).build();
session = cluster.connect(KEYSPACE);
try {
while (true) {
try {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
Model model= mapper.readValue(record.value(), Model.class);
try {
boolean flag = insertIntoDB(session, model);
if (flag) {
logger.info("************ Data Persisted Successfully ***************");
} else {
logger.info("******* Data Persition Failed *************");
}
} catch (Exception ex) {
logger.error("Exception while persisting data into DB", ex);
}
}
} catch (Exception ex) {
logger.error("Exception while reading data from kafka", ex);
}
}
} finally {
consumer.close();
}
}
Mysql INSERT 支持一次插入多行。像这样:
INSERT INTO tbl_name (a,b,c) VALUES(1,2,3),(4,5,6),(7,8,9);
所以你可以先将记录保存到一个数组中,当数组大小等于BATCH_SIZE时,你可以将它传递给你的insertIntoDb
方法。然后清空数组,继续循环
您还可以将一次投票中的所有消息放入数组中,然后将其传递给 insertIntoDb。
但是如果消息数太大,Mysql 会抱怨数据包太大,所以在这种情况下使用指定的 BATCH_SIZE 会更好。
您还可以为消费者指定 "max.poll.records" 配置以限制一次投票中的消息数。
Cassandra 中的类似内容:
PreparedStatement ps = session.prepare("INSERT INTO messages (user_id,msg_id, title, body) VALUES (?, ?, ?, ?)");
BatchStatement batch = new BatchStatement();
batch.add(ps.bind(uid, mid1, title1, body1));
batch.add(ps.bind(uid, mid2, title2, body2));
batch.add(ps.bind(uid, mid3, title3, body3));
session.execute(batch);