完整的 Kafka 流缓存作为内部操作会导致什么?
What does a full Kafka stream cache cause as internal operation?
设置缓存时我们设置了大小和提交间隔?我理解的是commit interval过了就调用commit,但是缓存满了触发什么操作。它是否也会触发提交,导致 kafka 流应用程序在其指标中将其重新编码为提交操作?或者它只是导致前向操作驱逐最旧的记录?
我的目标是能够监控我的 kafka 流应用程序并了解我看到的指标?
kafka 流缓存(记录缓存)用于内部缓存和压缩您使用 StreamsBuilder.table()
或 StreamsBuilder#globalTable()
创建的 KTable 的输出记录,以及作为 aggregate
。它在将 KTable 的输出记录写入底层状态存储 (RocksDb) 和下游处理器之前缓冲它们。
处理器 API 在写入状态存储之前使用此缓存缓冲输出记录,但不用于下游处理器。
但是缓存满了会触发什么操作?
当记录缓存已满(设置 cache.max.bytes.buffering
)时,缓冲区会将一些输出记录(默认为 LRU 缓存,因此一些最旧的输出记录)刷新到底层状态存储和下游处理器。可以查看一个可视化的例子here.
是否也触发提交?或者它只是导致前向操作驱逐最旧的记录?
我查看了内部代码,它只刷新最旧的记录缓存,它将输出记录写入状态存储并转发到下游处理器。它不会触发提交,这又会刷新生产者,因此您在状态存储中的记录不会生成到内部 kafka 变更日志主题 util 流线程已提交。
设置缓存时我们设置了大小和提交间隔?我理解的是commit interval过了就调用commit,但是缓存满了触发什么操作。它是否也会触发提交,导致 kafka 流应用程序在其指标中将其重新编码为提交操作?或者它只是导致前向操作驱逐最旧的记录?
我的目标是能够监控我的 kafka 流应用程序并了解我看到的指标?
kafka 流缓存(记录缓存)用于内部缓存和压缩您使用 StreamsBuilder.table()
或 StreamsBuilder#globalTable()
创建的 KTable 的输出记录,以及作为 aggregate
。它在将 KTable 的输出记录写入底层状态存储 (RocksDb) 和下游处理器之前缓冲它们。
处理器 API 在写入状态存储之前使用此缓存缓冲输出记录,但不用于下游处理器。
但是缓存满了会触发什么操作?
当记录缓存已满(设置
cache.max.bytes.buffering
)时,缓冲区会将一些输出记录(默认为 LRU 缓存,因此一些最旧的输出记录)刷新到底层状态存储和下游处理器。可以查看一个可视化的例子here.是否也触发提交?或者它只是导致前向操作驱逐最旧的记录?
我查看了内部代码,它只刷新最旧的记录缓存,它将输出记录写入状态存储并转发到下游处理器。它不会触发提交,这又会刷新生产者,因此您在状态存储中的记录不会生成到内部 kafka 变更日志主题 util 流线程已提交。