如何从通过 StreamListener 接收到的对象创建 KTable?
How to create a KTable from an Object received through StreamListener?
我想创建一个 KTable(将来会是一个 GlobalKTable)来跟踪使用 Spring Cloud Streams 通过我的 Kafka 代理的条目。
我现在拥有的代码从侦听器的角度来看应该可以正常工作,因为它接收通过 Kafka 传递的数据并提取我创建注册表所需的两个 ID。
最终的目标(简而言之)是在系统中创建一个位置,以便我们可以检查是否正在为该组合(userId 和 appId)处理数据
// Peek on myBindings interface
public interface myBindings {
String INPUT = "input";
String OUTPUT = "output";
@Input(INPUT)
MessageChannel input();
@Output(OUTPUT)
MessageChannel output();
}
//Peek on the service
@StreamListener(target = myBindings.INPUT)
public void listenforMessage(@Payload String jsonAsString) throws IOException {
ObjectNode node = new ObjectMapper().readValue(jsonAsString, ObjectNode.class);
KeyValue kvPair = new KeyValue<>(node.get(Info.USER_ID_KEY), node.get(Info.APP_ID_KEY));
// Here comes the question...
}
检查(或打印)该对后,我们发现数据是正确的。那么问题就是:
如何从中构建 KTable,因为我没有使用 KStreams 并且我的输入对象不同?
我在网上找到的所有示例都使用 KStream/KTables 作为方法的入口点/参数,但是没有使用其他类型入口的示例。
Kafka 新手/Spring 云流。感谢您的帮助!
Kafka Streams 的模型(以及在 Spring Cloud Stream 活页夹中使用的模型)是您需要从 Kafka 主题接收记录并将其作为 KStream
使用, KTable
或 GlobalKTable
。目前,常规 Kafka 绑定器(使用 vanilla Kafka producer/consumer)和 Kafka Streams 绑定器之间没有互操作性,因为这两个绑定器的目标类型完全不同。您可以使用两种单独的 StreamListener
方法,其中一种是普通 Kafka 消费者(使用 Kafka 活页夹),另一种是基于 Kafka Streams 活页夹的消费者。至于您描述的具体用例,我认为您可以将其作为普通的 Kafka Streams 应用程序来处理。
// Peek on myBindings interface
public interface MyBindings {
@Input("input)
KStream<?,? input();
}
@StreamListener
public void listenforMessage(@Input("input") KStream<?, String> input) throws IOException {
input.... //operations on input KStream here.
}
您可以使用 KStream
上可用的函数操作将数据具体化为 KTable
。
我想创建一个 KTable(将来会是一个 GlobalKTable)来跟踪使用 Spring Cloud Streams 通过我的 Kafka 代理的条目。
我现在拥有的代码从侦听器的角度来看应该可以正常工作,因为它接收通过 Kafka 传递的数据并提取我创建注册表所需的两个 ID。
最终的目标(简而言之)是在系统中创建一个位置,以便我们可以检查是否正在为该组合(userId 和 appId)处理数据
// Peek on myBindings interface
public interface myBindings {
String INPUT = "input";
String OUTPUT = "output";
@Input(INPUT)
MessageChannel input();
@Output(OUTPUT)
MessageChannel output();
}
//Peek on the service
@StreamListener(target = myBindings.INPUT)
public void listenforMessage(@Payload String jsonAsString) throws IOException {
ObjectNode node = new ObjectMapper().readValue(jsonAsString, ObjectNode.class);
KeyValue kvPair = new KeyValue<>(node.get(Info.USER_ID_KEY), node.get(Info.APP_ID_KEY));
// Here comes the question...
}
检查(或打印)该对后,我们发现数据是正确的。那么问题就是:
如何从中构建 KTable,因为我没有使用 KStreams 并且我的输入对象不同?
我在网上找到的所有示例都使用 KStream/KTables 作为方法的入口点/参数,但是没有使用其他类型入口的示例。
Kafka 新手/Spring 云流。感谢您的帮助!
Kafka Streams 的模型(以及在 Spring Cloud Stream 活页夹中使用的模型)是您需要从 Kafka 主题接收记录并将其作为 KStream
使用, KTable
或 GlobalKTable
。目前,常规 Kafka 绑定器(使用 vanilla Kafka producer/consumer)和 Kafka Streams 绑定器之间没有互操作性,因为这两个绑定器的目标类型完全不同。您可以使用两种单独的 StreamListener
方法,其中一种是普通 Kafka 消费者(使用 Kafka 活页夹),另一种是基于 Kafka Streams 活页夹的消费者。至于您描述的具体用例,我认为您可以将其作为普通的 Kafka Streams 应用程序来处理。
// Peek on myBindings interface
public interface MyBindings {
@Input("input)
KStream<?,? input();
}
@StreamListener
public void listenforMessage(@Input("input") KStream<?, String> input) throws IOException {
input.... //operations on input KStream here.
}
您可以使用 KStream
上可用的函数操作将数据具体化为 KTable
。