在 Kafka Streams DSL 中通过内部连接获取记录密钥
Get record key with inner join in Kafka Streams DSL
有没有办法从 Kafka Stream DSL 连接的 join
部分传入或获取消息密钥?
我现在有这样的东西:
KStream<String, GenericRecord> completedEventsStream = inputStartKStream.
join(
inputEndKStream,
(leftValue, rightValue) -> customLambda((Record) leftValue, (Record) rightValue),
JoinWindows.of(windowDuration),
Joined.with(stringSerde, genericAvroSerde, genericAvroSerde)
);
但是,传递给 customLambda
的 leftValue
和 rightValue
记录无法访问 kafka 消息密钥,因为那是一个单独的字符串。他们拥有的唯一内容是消息本身,而不是密钥。
有没有办法从 join lambda 内部获取密钥?我可以做的一件事是简单地将消息密钥添加为消息本身的一部分,然后将其作为常规字段访问,但我想知道框架是否提供了直接访问它的方法?
大多数时候key在记录的值中也是可用的,你的应用不是这样吗?
看起来 ValueJoiner
接口有一个 improvement filed as part of KIP-149,但还没有像 KIP 中的其他方法那样完成:ValueTransformer
和 ValueMapper
。
您可以在加入之前添加一个步骤来提取密钥并将其包含在您的消息的值中,然后再使用 ValueMapperWithKey
进行加入。
有没有办法从 Kafka Stream DSL 连接的 join
部分传入或获取消息密钥?
我现在有这样的东西:
KStream<String, GenericRecord> completedEventsStream = inputStartKStream.
join(
inputEndKStream,
(leftValue, rightValue) -> customLambda((Record) leftValue, (Record) rightValue),
JoinWindows.of(windowDuration),
Joined.with(stringSerde, genericAvroSerde, genericAvroSerde)
);
但是,传递给 customLambda
的 leftValue
和 rightValue
记录无法访问 kafka 消息密钥,因为那是一个单独的字符串。他们拥有的唯一内容是消息本身,而不是密钥。
有没有办法从 join lambda 内部获取密钥?我可以做的一件事是简单地将消息密钥添加为消息本身的一部分,然后将其作为常规字段访问,但我想知道框架是否提供了直接访问它的方法?
大多数时候key在记录的值中也是可用的,你的应用不是这样吗?
看起来 ValueJoiner
接口有一个 improvement filed as part of KIP-149,但还没有像 KIP 中的其他方法那样完成:ValueTransformer
和 ValueMapper
。
您可以在加入之前添加一个步骤来提取密钥并将其包含在您的消息的值中,然后再使用 ValueMapperWithKey
进行加入。