Jmeter监控Kafka Topic
Monitoring Kafka Topic by Jmeter
我正在尝试通过 jmeter 监控 Kafka 主题。我通过外部工具向 kafka 主题发送了 100 条消息,并且消息在我检查消费者 window 时完美传递。现在想用jmeter监控kafka主题。我研究了使用 kafkameter 并查看了一些有关使用 kafkameter 进行负载测试的教程。但是在我在网上找到的所有资源中,都展示了如何将消息发送到 Kafka 主题,然后监控它,这是我不想做的。我想在从外部工具向该主题发送消息时监视该主题。谁能帮我解决这个问题?
我不知道你所说的 "monitoring" 是什么意思,如果你想接收来自给定 Kafka 主题的消息 - 你可以使用 JSR223 Sampler and Kafka Consumer API 来阅读消息并做任何你想做的事情和他们一起做。
- 将提供 Kafka Consumer API 的库下载到 JMeter Classpath
- 重新启动 JMeter 以选择库
将 JSR223 采样器添加到您的线程组,并放置将与 "interesting" Kafka 主题建立连接的代码,这里是一个简单的示例代码用于传入消息并将它们打印到 jmeter.log 文件
props.put('bootstrap.servers', '192.168.99.100:9092')
props.put('group.id', 'foo')
props.put('enable.auto.commit', 'true')
props.put('auto.commit.interval.ms', '1000')
props.put('session.timeout.ms', '30000')
props.put('key.deserializer',
'org.apache.kafka.common.serialization.StringDeserializer')
props.put('value.deserializer',
'org.apache.kafka.common.serialization.StringDeserializer')
def consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props)
def topic = 'sometopic'
consumer.subscribe(Arrays.asList(topic))
log.info('Subscribed to topic ' + topic)
while (true) {
def records = consumer.poll(100)
records.each { record ->
log.info('Received message: ' + record.value())
}
}
演示:
我正在尝试通过 jmeter 监控 Kafka 主题。我通过外部工具向 kafka 主题发送了 100 条消息,并且消息在我检查消费者 window 时完美传递。现在想用jmeter监控kafka主题。我研究了使用 kafkameter 并查看了一些有关使用 kafkameter 进行负载测试的教程。但是在我在网上找到的所有资源中,都展示了如何将消息发送到 Kafka 主题,然后监控它,这是我不想做的。我想在从外部工具向该主题发送消息时监视该主题。谁能帮我解决这个问题?
我不知道你所说的 "monitoring" 是什么意思,如果你想接收来自给定 Kafka 主题的消息 - 你可以使用 JSR223 Sampler and Kafka Consumer API 来阅读消息并做任何你想做的事情和他们一起做。
- 将提供 Kafka Consumer API 的库下载到 JMeter Classpath
- 重新启动 JMeter 以选择库
将 JSR223 采样器添加到您的线程组,并放置将与 "interesting" Kafka 主题建立连接的代码,这里是一个简单的示例代码用于传入消息并将它们打印到 jmeter.log 文件
props.put('bootstrap.servers', '192.168.99.100:9092') props.put('group.id', 'foo') props.put('enable.auto.commit', 'true') props.put('auto.commit.interval.ms', '1000') props.put('session.timeout.ms', '30000') props.put('key.deserializer', 'org.apache.kafka.common.serialization.StringDeserializer') props.put('value.deserializer', 'org.apache.kafka.common.serialization.StringDeserializer') def consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props) def topic = 'sometopic' consumer.subscribe(Arrays.asList(topic)) log.info('Subscribed to topic ' + topic) while (true) { def records = consumer.poll(100) records.each { record -> log.info('Received message: ' + record.value()) } }
演示: