了解 Kafka 消息字节大小

Understanding Kafka Message Byte Size

如何获取Kafka中单条记录的大小?

关于我为什么需要这个的一些说明。

这似乎不是 ConsumerRecord 或 RecordMetadata 上公开的 serializedValueSize 类。我真的不明白这个 属性 的价值,因为它与对消费者有用的消息大小不匹配。如果不是这个,serializedValueSize 的用途是什么?

我试图让我的 Kafka java 应用程序表现得像 "min.poll.records" 如果它存在以补充 "max.poll.records"。我必须这样做,因为这是必需的 :)。假设给定主题上的所有消息都具有相同的大小(在这种情况下是正确的),这应该可以从消费者端设置 fetch.min.bytes 等于要批处理的消息数量乘以每个消息的字节大小留言。

这个存在:

https://kafka.apache.org/documentation/#consumerapi

max.poll.records

The maximum number of records returned in a single call to poll().

这不存在,但这是我想要的行为:

min.poll.records

The minimum number of records returned in a single call to poll(). If not enough records are available before the time specified in fetch.max.wait.ms elapses, then the records are returned anyway, and as such, this is not an absolute minimum.

这是我目前的发现:

System.out.println(myRecordMetadata.serializedValueSize());
// 76
# producer
batch.size=1

# consumer

# Expected this to work:
# 76 * 2 = 152
max.partition.fetch.bytes=152

# Actually works:
# 292 = ??? magic ???
max.partition.fetch.bytes=292

我预计将 max.partition.fetch.bytes 设置为 serializedValueSize 给定的字节数的倍数将使 Kafka 消费者从轮询中接收最多该数量的记录。相反,max.partition.fetch.bytes 值需要更高才能发生这种情况。

原回答

我不太熟悉 serializedValueSize 方法,但根据文档,这只是该消息中存储的值的大小。这将小于总消息大小(即使使用 null 键),因为消息还包含不属于值的元数据(例如时间戳)。

至于你的问题:与其通过处理消息大小和限制消费者的吞吐量来直接控制轮询,不如缓冲传入的消息直到有足够的可用消息或所需的超时(你提到 fetch.max.wait.ms 但您可以手动指定一个)已经过去了?

public static <K, V> List<ConsumerRecord<K, V>>
    minPoll(KafkaConsumer<K, V> consumer, Duration timeout, int minRecords) {
  List<ConsumerRecord<K, V>> acc = new ArrayList<>();
  long pollTimeout = Duration.ofMillis(timeout.toMillis()/10);
  long start = System.nanoTime();
  do {
    ConsumerRecords<K, V> records = consumer.poll(pollTimeout);
    for(ConsumerRecord<K, V> record : records)
      acc.add(record);
  } while(acc.size() < minRecords &&
          System.nanoTime() - start < timeout.toNanos());
  return acc;
}

调用 consumer.poll 时的 timeout.toMillis()/10 超时是任意的。您应该选择一个足够小的持续时间,这样即使我们等待的时间长于指定的超时时间(此处:长 10%)也没关系。

编辑:请注意,这可能 return 一个大于 max.poll.records 的列表(最大值为 max.poll.records + minRecords - 1)。如果您还需要强制执行此严格的上限,请使用该方法外部的另一个缓冲区来临时存储多余的记录(这可能会更快但不允许混合使用 minPoll 和普通的 poll 方法)或简单地丢弃它们并使用 consumerseek 方法回溯。

更新问题的答案

因此,问题不在于控制由 poll 方法 return 编辑的消息数量,而在于如何获取单个记录的大小。不幸的是,我认为不经历很多麻烦是不可能的。问题是对此没有真正的(恒定的)答案,甚至一个大概的答案也将取决于 Kafka 版本或不同的 Kafka 协议版本。

首先,我不完全确定 max.partition.fetch.bytes 到底控制了什么(如:协议开销是否也是其中的一部分?)。让我说明一下我的意思:当消费者发送一个获取请求时,获取响应由以下字段组成:

  1. 节流时间(4 字节)
  2. 主题响应数组(数组长度为 4 个字节 + 数组中的数据大小)。

主题响应依次包含

  1. 主题名称(字符串长度 2 个字节 + 字符串大小)
  2. 分区响应数组(数组长度为 4 个字节 + 数组中的数据大小)。

然后分区响应有

  1. 分区 ID(4 字节)
  2. 错误代码(2 字节)
  3. 高水印(8 字节)
  4. 最后稳定偏移量(8 字节)
  5. 日志起始偏移量(8 字节)
  6. 中止事务数组(数组长度为 4 个字节 + 数组中的数据)
  7. 记录集。

所有这些都可以在 FetchResponse.java file. A record set in turn consists of record batches, which contain records. I'm not gonna list all that comprises a record batch (you can see it here) 中找到。只要说开销是 61 字节就足够了。最后,批处理中单个记录的大小有点棘手,因为它使用了 varint 和 varlong 字段。它包含

  1. Body 大小(1-5 字节)
  2. 属性(1 字节)
  3. 时间戳增量(1-10 字节)
  4. 偏移增量(1-5 字节)
  5. 关键字节数组(1-5字节+关键数据大小)
  6. 值字节数组(1-5 个字节 + 值数据大小)
  7. Headers(1-5 字节 + headers 数据大小)。

源代码是 here。如您所见,您不能简单地将 292 字节除以二来获得记录大小,因为一些开销是恒定的并且与记录数无关。

更糟糕的是,记录没有固定大小,即使它们的键和值(和 headers)是固定的,因为时间戳和偏移量存储为与批处理时间戳和偏移量的差异,使用可变长度数据类型。此外,这只是在撰写本文时最新协议版本的情况。对于旧版本,答案将再次不同,谁知道未来版本会发生什么。