如何从 KStream 获取 KafkaStream 的状态

How to get state of KafkaStream from a KStream

我有一个应用程序使用 mysql binlog 生成的数据流。这是我代码的相关部分。

KStream<GenericRecord,mysql.x.Envelope> xStream = builder.stream(sourceTopicName,
                    Consumed.with(XSerde.getGenericKeySerde(), XSerde.getEnvelopeSerde()));

出于某些健康检查目的,我需要能够获取 KafkaStream 的状态。是运行,等待关机... 我找不到 KafkaStream 和 KStream 之间的任何关系。

您在构建 StreamsBuilder 并使用它启动应用程序后获得它。健康是针对整个应用程序,而不是单个流,换句话说,一个 KafkaStreams 实例负责监视一个或多个 KStream,或 KTable

final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);