过滤 Kafka 流
Filter Kafka Streams
我一直在检查 Kafka 流。我一直在测试 Kafka 流的以下代码
生产者主题:(这是第一个生产者主题 - 发送以下 json 数据)
KafkaProducer<String, String> producer = new KafkaProducer<>(
properties);
producer.send(new ProducerRecord<String,String>(topic, jsonobject.toString()));
producer.close();
JSON - 主题制作人:
{"UserID":"1","Address”:”XXX”,”AccountNo":"234234","MemberName”:”Stella”,”AccountType":"Savings"}
Stream主题代码:(这是第二个Streaming代码和主题)
builder.<String,String>stream(topic)
.filter(new Predicate <String, String>() {
@Override
public boolean test(String key, String value) {
// put you processor logic here
System.out.println("value : " + value);
return value.substring(0).equals(“1”);
}
})
.to(streamouttopic);
final KafkaStreams streams = new KafkaStreams(builder, props);
final CountDownLatch latch = new CountDownLatch(1);
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
如果 UserID 值为“1”,我想归档,然后将该数据发送到目标流主题。
当我使用“.filter”并打印System.out.println("value : " + value);时,在执行时会抛出以下错误。
Exception in thread "SampleStreamProducer-a6bb543e-bb92-48d0-8d9f-225046722d81-StreamThread-1" java.lang.ClassCastException: [B cannot be cast to java.lang.String
如果我不使用“.filter”并使用像这样的简单代码,builder.stream(topic).to(streamouttopic);
,它工作正常,但没有过滤。但是,我需要使用那个过滤器。
有人可以指导我修复它吗?
默认情况下,Kafka Streams 假定数据类型为 <byte[],byte[]>
,并且 byte[]
无法转换为 String
。
阅读题目时需要指定正确的Serdes
为KStream
:
builder.<String,String>stream(topic, Consumed.with(Serdes.String(), Serdes.String())
.filter(...)
请查看示例并阅读文档:
我一直在检查 Kafka 流。我一直在测试 Kafka 流的以下代码
生产者主题:(这是第一个生产者主题 - 发送以下 json 数据)
KafkaProducer<String, String> producer = new KafkaProducer<>(
properties);
producer.send(new ProducerRecord<String,String>(topic, jsonobject.toString()));
producer.close();
JSON - 主题制作人:
{"UserID":"1","Address”:”XXX”,”AccountNo":"234234","MemberName”:”Stella”,”AccountType":"Savings"}
Stream主题代码:(这是第二个Streaming代码和主题)
builder.<String,String>stream(topic)
.filter(new Predicate <String, String>() {
@Override
public boolean test(String key, String value) {
// put you processor logic here
System.out.println("value : " + value);
return value.substring(0).equals(“1”);
}
})
.to(streamouttopic);
final KafkaStreams streams = new KafkaStreams(builder, props);
final CountDownLatch latch = new CountDownLatch(1);
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
如果 UserID 值为“1”,我想归档,然后将该数据发送到目标流主题。
当我使用“.filter”并打印System.out.println("value : " + value);时,在执行时会抛出以下错误。
Exception in thread "SampleStreamProducer-a6bb543e-bb92-48d0-8d9f-225046722d81-StreamThread-1" java.lang.ClassCastException: [B cannot be cast to java.lang.String
如果我不使用“.filter”并使用像这样的简单代码,builder.stream(topic).to(streamouttopic);
,它工作正常,但没有过滤。但是,我需要使用那个过滤器。
有人可以指导我修复它吗?
默认情况下,Kafka Streams 假定数据类型为 <byte[],byte[]>
,并且 byte[]
无法转换为 String
。
阅读题目时需要指定正确的Serdes
为KStream
:
builder.<String,String>stream(topic, Consumed.with(Serdes.String(), Serdes.String())
.filter(...)
请查看示例并阅读文档: