Kafka Streams: why do a KTable and its associated store only get updated every 30 seconds?
I have this simple KTable definition, which creates a state store:
KTable<String, JsonNode> table = kStreamBuilder.<String, JsonNode>table(ORDERS_TOPIC, ORDERS_STORE);
table.print();
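I publish messages to ORDERS_TOPIC, but the store only seems to be updated every 30 seconds. Here is the log, which includes a message about committing because the 30000 ms commit interval has elapsed: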
2017-07-25 23:53:15.465 DEBUG 17540 --- [ StreamThread-1] o.a.k.c.consumer.internals.Fetcher : Sending fetch for partitions [orders-0] to broker EXPRF026.SUMINISTRADOR:9092 (id: 0 rack: null)
2017-07-25 23:53:15.567 INFO 17540 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing all tasks because the commit interval 30000ms has elapsed
2017-07-25 23:53:15.567 INFO 17540 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 0_0
2017-07-25 23:53:15.567 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.processor.internals.StreamTask : task [0_0] Committing its state
2017-07-25 23:53:15.567 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.p.i.ProcessorStateManager : task [0_0] Flushing all stores registered in the state manager
f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec
{"uid":"string","productId":0,"orderId":"f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec","name":"OrderPlaced","state":"PENDING_PRODUCT_RESERVATION"}
[KTABLE-SOURCE-0000000001]: f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec , ({"uid":"string","productId":0,"orderId":"f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec","name":"OrderPlaced","state":"PENDING_PRODUCT_RESERVATION"}<-null)
2017-07-25 23:53:15.569 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.state.internals.ThreadCache : Thread order-service-streams-16941f70-87b3-45f4-88de-309e4fd22748-StreamThread-1 cache stats on flush: #puts=1, #gets=1, #evicts=0, #flushes=1
2017-07-25 23:53:15.576 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.p.internals.RecordCollectorImpl : task [0_0] Flushing producer
I found that the property controlling this is commit.interval.ms:
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
Why is the default 30000 ms (that sounds long), and what are the implications of changing it to 10 ms?
If I use a KStream instead of a KTable...
KStream<String, JsonNode> kStream = kStreamBuilder.stream(ORDERS_TOPIC);
kStream.print();
...I can see the messages immediately, without waiting those 30000 ms. Why the difference?
This is related to memory management, specifically the KTable caches: http://docs.confluent.io/current/streams/developer-guide.html#memory-management
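The KTable is actually updated all the time: if you use "Interactive Queries" to access the underlying state store, you can get each update immediately. However, the KTable cache buffers updates to reduce downstream load, and every time a commit is triggered this cache must be flushed downstream to avoid losing data in case of failure. That is why table.print() only shows output when the commit interval elapses. If your cache is small, you may also see downstream records earlier, whenever a key is evicted from the cache. A KStream has no such cache, so kStream.print() shows each record as soon as it is processed.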
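To make the buffering behavior concrete, here is a toy model in plain Java. It is only a sketch of the idea described above, not Kafka Streams' actual cache implementation: the class name BufferedTableSketch, the entry-count limit (the real cache is sized in bytes), and the "key=value" downstream format are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a cache sitting between a table and its downstream; NOT the real Kafka Streams code. */
public class BufferedTableSketch {
    // What a query against the store sees: updated on every put.
    private final Map<String, String> store = new LinkedHashMap<>();
    // Updates not yet forwarded downstream: at most one (the latest) per key.
    private final LinkedHashMap<String, String> dirty = new LinkedHashMap<>();
    // Records that downstream operators (such as print()) have received.
    private final List<String> downstream = new ArrayList<>();
    private final int maxDirtyEntries;

    public BufferedTableSketch(int maxDirtyEntries) {
        this.maxDirtyEntries = maxDirtyEntries;
    }

    public void put(String key, String value) {
        store.put(key, value);
        dirty.remove(key);       // drop the older pending update for this key
        dirty.put(key, value);   // re-insert so the key becomes the newest entry
        if (dirty.size() > maxDirtyEntries) {
            // Cache is full: evict the oldest pending entry and forward it downstream.
            Map.Entry<String, String> eldest = dirty.entrySet().iterator().next();
            String evictedKey = eldest.getKey();
            String evictedValue = eldest.getValue();
            dirty.remove(evictedKey);
            downstream.add(evictedKey + "=" + evictedValue);
        }
    }

    /** Models a commit: every pending update is flushed downstream. */
    public void commit() {
        for (Map.Entry<String, String> e : dirty.entrySet()) {
            downstream.add(e.getKey() + "=" + e.getValue());
        }
        dirty.clear();
    }

    public String get(String key) {
        return store.get(key);
    }

    public List<String> downstream() {
        return downstream;
    }

    public static void main(String[] args) {
        BufferedTableSketch table = new BufferedTableSketch(10);
        table.put("order-1", "PENDING");
        table.put("order-1", "RESERVED");
        System.out.println(table.get("order-1"));  // RESERVED (the query sees the latest value)
        System.out.println(table.downstream());    // [] (nothing forwarded before the commit)
        table.commit();
        System.out.println(table.downstream());    // [order-1=RESERVED] (only the latest value per key)
    }
}
```

In this model, a limit of 0 forwards every update immediately, which is roughly what disabling the cache does.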
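Regarding the commit interval: it is generally set to a relatively large value to reduce the commit load on the brokers. Lowering it to 10 ms means committing, and flushing the cache, far more often.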
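If lower latency matters more than reduced load, the two settings discussed here can be adjusted together. The following is a minimal sketch that builds the overrides with plain string keys so that it runs without the Kafka libraries on the classpath. The class and method names are hypothetical, and the values 1000 and 0 are example choices, not recommendations. The key cache.max.bytes.buffering is the cache size setting in Kafka Streams versions of the era of this question; newer releases may use a different name, so check the documentation for your version.

```java
import java.util.Properties;

/** Hypothetical helper that collects latency-related Kafka Streams overrides. */
public class StreamsLatencySketch {

    /**
     * @param commitIntervalMs how often tasks commit (and flush their caches), in milliseconds
     * @param cacheBytes       total record cache size in bytes; 0 disables caching
     */
    public static Properties latencyOverrides(long commitIntervalMs, long cacheBytes) {
        Properties props = new Properties();
        // Same setting as StreamsConfig.COMMIT_INTERVAL_MS_CONFIG in the question.
        props.put("commit.interval.ms", Long.toString(commitIntervalMs));
        // Cache size setting (assumed key name for older Kafka Streams versions).
        props.put("cache.max.bytes.buffering", Long.toString(cacheBytes));
        return props;
    }

    public static void main(String[] args) {
        // Example: commit once per second and disable the cache so each update is forwarded.
        Properties props = latencyOverrides(1000L, 0L);
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

These entries would be merged into the same props object that the question passes to its streams configuration. Disabling the cache makes a KTable forward every update, at the cost of more downstream records and more writes.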