Not able to poll / fetch all records from a Kafka topic
I am trying to poll data from a specific topic. For example, Kafka is receiving about 100 records/s, but most of the time the poll does not fetch all of the records.
I set the timeout to 5000 ms and call this method every 100 ms.
Note: I am also subscribing to the specific topic.
@Scheduled(fixedDelayString = "100")
public void pollRecords() {
    ConsumerRecords<String, String> records =
            leadConsumer.poll(Duration.ofMillis(5000));
    // ... process records
}
How can I fetch all of the data from Kafka?
The maximum number of records returned by a single poll() is specified by the max.poll.records consumer config parameter (default: 500). In addition, there are other consumer config parameters that limit the maximum amount of data returned from the server (fetch.max.bytes and max.partition.fetch.bytes).
On the broker side, there is another size limit as well, called message.max.bytes.
So you should set these parameters appropriately to fetch more messages per poll.
From the Kafka documentation (link):
max.poll.records: The maximum number of records returned in a single
call to poll(). (default: 500)
fetch.max.bytes: The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the
consumer, and if the first record batch in the first non-empty
partition of the fetch is larger than this value, the record batch
will still be returned to ensure that the consumer can make progress.
As such, this is not an absolute maximum. The maximum record batch size
accepted by the broker is defined via message.max.bytes (broker
config) or max.message.bytes (topic config). Note that the consumer
performs multiple fetches in parallel. (default: 52428800)
message.max.bytes: The largest record batch size allowed by Kafka. If this is increased and there are consumers older than 0.10.2, the
consumers' fetch size must also be increased so that they can
fetch record batches this large. In the latest message format version,
records are always grouped into batches for efficiency. In previous
message format versions, uncompressed records are not grouped into
batches and this limit only applies to a single record in that
case. This can be set per topic with the topic level max.message.bytes
config. (default: 1000012)
max.partition.fetch.bytes: The maximum amount of data per-partition the server will return. Records are fetched in batches
by the consumer. If the first record batch in the first non-empty
partition of the fetch is larger than this limit, the batch will still
be returned to ensure that the consumer can make progress. The maximum
record batch size accepted by the broker is defined via
message.max.bytes (broker config) or max.message.bytes (topic config).
See fetch.max.bytes for limiting the consumer request size. (default: 1048576)
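To make this concrete, here is a minimal sketch of a consumer configuration that raises these limits. The broker address, group id, and the specific values are assumptions for illustration; tune them to your environment. These Properties would then be passed to the KafkaConsumer constructor.

```java
import java.util.Properties;

public class ConsumerTuning {

    // Builds the consumer configuration that would be passed to
    // new KafkaConsumer<String, String>(props).
    static Properties tunedConsumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("group.id", "lead-consumer-group");       // assumed group id
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        // Raise the per-poll record cap (default 500) so a single poll()
        // can return more records.
        props.put("max.poll.records", "2000");
        // Byte limits on fetched data; raise these if record batches are large.
        props.put("fetch.max.bytes", "52428800");           // the default, shown explicitly
        props.put("max.partition.fetch.bytes", "10485760"); // 10 MB per partition (default: 1048576)
        return props;
    }

    public static void main(String[] args) {
        Properties props = tunedConsumerProps();
        System.out.println("max.poll.records = " + props.getProperty("max.poll.records"));
    }
}
```

Note that if you raise max.poll.records, the processing of each batch must still finish within max.poll.interval.ms; otherwise the consumer is considered failed and the group rebalances.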