Spring Cloud Stream Confluent KStream Avro Consume
I am trying to consume Confluent Avro messages from a Kafka topic as a KStream with Spring Boot 2.0.
I am able to consume the messages as a MessageChannel, but not as a KStream.
@Input(ORGANIZATION)
KStream<String, Organization> organizationMessageChannel();

@StreamListener
public void processOrganization(@Input(KstreamBinding.ORGANIZATION) KStream<String, Organization> organization) {
    log.info("Organization Received:" + organization);
}
Exception:
Exception in thread "pcs-7bb7b444-044d-41bb-945d-450c902337ff-StreamThread-3" org.apache.kafka.streams.errors.StreamsException: stream-thread [pcs-7bb7b444-044d-41bb-945d-450c902337ff-StreamThread-3] Failed to rebalance.
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:860)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: org.apache.kafka.streams.errors.StreamsException: Failed to configure value serde class io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:859)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init>(AbstractProcessorContext.java:59)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init>(ProcessorContextImpl.java:42)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:134)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:404)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:365)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:350)
    at org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks(TaskManager.java:137)
    at org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:88)
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:259)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:264)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851)
    ... 3 more
Caused by: io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
    at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:243)
    at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:61)
    at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:32)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:48)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.configure(SpecificAvroSerializer.java:58)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde.configure(SpecificAvroSerde.java:107)
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:855)
    ... 19 more
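Based on the error, I think I am missing the schema.registry.url configuration that Confluent needs.
I had a quick look at the sample here, but I am a bit lost on how to do the same with Spring Cloud Stream using a StreamListener.
Does this need to be configured separately? Or is there a way to configure the schema.registry.url that Confluent is looking for in application.yml itself?
Here is the code repo: https://github.com/naveenpop/springboot-kstream-confluent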
Organization.avsc
{
  "namespace": "com.test.demo.avro",
  "type": "record",
  "name": "Organization",
  "fields": [
    {
      "name": "orgId",
      "type": "string",
      "default": "null"
    },
    {
      "name": "orgName",
      "type": "string",
      "default": "null"
    },
    {
      "name": "orgType",
      "type": "string",
      "default": "null"
    },
    {
      "name": "parentOrgId",
      "type": "string",
      "default": "null"
    }
  ]
}
DemokstreamApplication.java
@SpringBootApplication
@EnableSchemaRegistryClient
@Slf4j
public class DemokstreamApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemokstreamApplication.class, args);
    }

    @Component
    public static class organizationProducer implements ApplicationRunner {

        @Autowired
        private KafkaProducer kafkaProducer;

        @Override
        public void run(ApplicationArguments args) throws Exception {
            log.info("Starting: Run method");
            List<String> names = Arrays.asList("blue", "red", "green", "black", "white");
            List<String> pages = Arrays.asList("whiskey", "wine", "rum", "jin", "beer");
            // Produce one random Organization message per second.
            Runnable runnable = () -> {
                String rPage = pages.get(new Random().nextInt(pages.size()));
                String rName = names.get(new Random().nextInt(names.size()));
                try {
                    this.kafkaProducer.produceOrganization(rPage, rName, "PARENT", "111");
                } catch (Exception e) {
                    log.info("Exception :" + e);
                }
            };
            Executors.newScheduledThreadPool(1).scheduleAtFixedRate(runnable, 1, 1, TimeUnit.SECONDS);
        }
    }
}
KafkaConfig.java
@Configuration
public class KafkaConfig {

    @Value("${spring.cloud.stream.schemaRegistryClient.endpoint}")
    private String endpoint;

    @Bean
    public SchemaRegistryClient confluentSchemaRegistryClient() {
        ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
        client.setEndpoint(endpoint);
        return client;
    }
}
KafkaConsumer.java
@Slf4j
@EnableBinding(KstreamBinding.class)
public class KafkaConsumer {

    @StreamListener
    public void processOrganization(@Input(KstreamBinding.ORGANIZATION_INPUT) KStream<String, Organization> organization) {
        organization.foreach((s, organization1) -> log.info("KStream Organization Received:" + organization1));
    }
}
KafkaProducer.java
@Slf4j // added: the class uses log but declared no logger
@EnableBinding(KstreamBinding.class)
public class KafkaProducer {

    @Autowired
    private KstreamBinding kstreamBinding;

    public void produceOrganization(String orgId, String orgName, String orgType, String parentOrgId) {
        try {
            Organization organization = Organization.newBuilder()
                    .setOrgId(orgId)
                    .setOrgName(orgName)
                    .setOrgType(orgType)
                    .setParentOrgId(parentOrgId)
                    .build();
            // The organization name is used as the Kafka message key.
            kstreamBinding.organizationOutputMessageChannel()
                    .send(MessageBuilder.withPayload(organization)
                            .setHeader(KafkaHeaders.MESSAGE_KEY, orgName)
                            .build());
        } catch (Exception e) {
            log.error("Failed to produce Organization Message:" + e);
        }
    }
}
KstreamBinding.java
public interface KstreamBinding {

    String ORGANIZATION_INPUT = "organizationInput";
    String ORGANIZATION_OUTPUT = "organizationOutput";

    @Input(ORGANIZATION_INPUT)
    KStream<String, Organization> organizationInputMessageChannel();

    @Output(ORGANIZATION_OUTPUT)
    MessageChannel organizationOutputMessageChannel();
}
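Update 1:
I applied the suggestion from dturanski and the error went away. However, I am still not able to consume the messages as KStream<String, Organization>, and there are no errors in the console.
Update 2:
I applied the suggestion from sobychacko, and the message is now consumable, but with empty values in the object.
I have made a commit to the GitHub sample to produce the messages from Spring Boot itself, and I am still getting empty values.
Thanks for your time on this issue.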
Try spring.cloud.stream.kafka.streams.binder.configuration.schema.registry.url: ...
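The reason this property helps is that entries under the binder's configuration map are handed to Kafka Streams, which in turn passes them to the default serde's configure call (the same call that fails in the stack trace). The following is only an illustrative sketch of that pass-through idea in plain Java; `BinderConfigSketch` and `passThrough` are hypothetical names and this is not the binder's actual implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BinderConfigSketch {
    // Prefix under which the Kafka Streams binder accepts pass-through properties.
    static final String PREFIX = "spring.cloud.stream.kafka.streams.binder.configuration.";

    // Hypothetical helper: keep only entries under PREFIX and strip the prefix,
    // leaving the keys a Kafka Streams serde would look up.
    static Map<String, String> passThrough(Map<String, String> appProps) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : appProps.entrySet()) {
            if (e.getKey().startsWith(PREFIX)) {
                out.put(e.getKey().substring(PREFIX.length()), e.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> appProps = new LinkedHashMap<>();
        appProps.put("spring.application.name", "kstream");
        appProps.put(PREFIX + "schema.registry.url", "http://localhost:8081");

        // prints {schema.registry.url=http://localhost:8081}
        System.out.println(passThrough(appProps));
    }
}
```

With the property set this way, the serde finds schema.registry.url in the map it is configured with, which is what the ConfigException complained about.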
The following implementation will not do what you intend:
@StreamListener
public void processOrganization(@Input(KstreamBinding.ORGANIZATION) KStream<String, Organization> organization) {
    log.info("Organization Received:" + organization);
}
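That log statement is invoked only once, during the bootstrap phase. For this to work, you need to invoke some operation on the received KStream and provide the logic there. For example, the following works, where I provide a lambda expression in the foreach method call.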
@StreamListener
public void processOrganization(@Input(KstreamBinding.ORGANIZATION) KStream<String, Organization> organization) {
    organization.foreach((s, organization1) -> log.info("Organization Received:" + organization1));
}
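To make the "once at bootstrap versus once per record" distinction concrete, here is a small plain-Java analogy. It does not use the Kafka Streams API; `MiniStream`, `deliver`, and the counters are hypothetical stand-ins that only mimic how a listener method wires up a pipeline once while the registered action runs for each record.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class DeferredStreamSketch {
    // Hypothetical stand-in for a stream: it only stores per-record actions.
    static class MiniStream<V> {
        private final List<Consumer<V>> actions = new ArrayList<>();

        // Registers an action; nothing is processed at this point.
        void foreach(Consumer<V> action) {
            actions.add(action);
        }

        // Called later by the "runtime", once per incoming record.
        void deliver(V record) {
            for (Consumer<V> action : actions) {
                action.accept(record);
            }
        }
    }

    static int setupCalls = 0;
    static final List<String> received = new ArrayList<>();

    // Plays the role of the @StreamListener method: it runs once, at startup.
    static void processOrganization(MiniStream<String> organization) {
        setupCalls++;                         // like a log statement in the method body
        organization.foreach(received::add);  // like logic inside foreach: runs per record
    }

    public static void main(String[] args) {
        MiniStream<String> stream = new MiniStream<>();
        processOrganization(stream);
        stream.deliver("org-1");
        stream.deliver("org-2");
        // prints: setup calls: 1, records seen: [org-1, org-2]
        System.out.println("setup calls: " + setupCalls + ", records seen: " + received);
    }
}
```

The method body executes a single time, while the action passed to foreach is what sees every record.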
You also have an issue in the configuration: you are wrongly assigning the Avro Serde to the keys, which are actually Strings. Change it like this:
default:
  key:
    serde: org.apache.kafka.common.serialization.Serdes$StringSerde
  value:
    serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
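With these changes, I get the logging statement each time I send something to the topic. However, there is a problem in your sending groovy script: I am not getting any actual data from your Organization domain object. I will let you figure that out.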
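Update on the issue with the empty Organization domain object:
This happens because you are using a mixed mode of serialization strategies. You are using Spring Cloud Stream's Avro message converters on the producer side, but Confluent's Avro Serdes in the Kafka Streams processor. I just tried using Confluent's serializers all the way from the producer to the processor, and I was able to see the Organization domain object on the outbound. Here is the modified configuration that makes the serialization consistent.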
spring:
  application:
    name: kstream
  cloud:
    stream:
      schemaRegistryClient:
        endpoint: http://localhost:8081
      schema:
        avro:
          schema-locations: classpath:avro/Organization.avsc
      bindings:
        organizationInput:
          destination: organization-updates
          group: demokstream.org
          consumer:
            useNativeDecoding: true
        organizationOutput:
          destination: organization-updates
          producer:
            useNativeEncoding: true
      kafka:
        bindings:
          organizationOutput:
            producer:
              configuration:
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                schema.registry.url: http://localhost:8081
        streams:
          binder:
            brokers: localhost
            configuration:
              schema.registry.url: http://localhost:8081
              commit:
                interval:
                  ms: 1000
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value:
                  serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
You can also remove the KafkaConfig class, as well as the EnableSchemaRegistryClient annotation from the main application class.
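If you want to check which serializer actually wrote a record, one option is to look at the raw value bytes. Confluent's Avro serializer prefixes each payload with a magic byte of 0 followed by a 4-byte big-endian schema ID, whereas a payload written without that framing starts directly with the Avro data. The sketch below is a plain-Java illustration under that assumption; `ConfluentFramingSketch` and `schemaIdOrMinusOne` are hypothetical names, and the sample bytes are made up for the demonstration.

```java
import java.nio.ByteBuffer;

public class ConfluentFramingSketch {
    // First byte of a payload written by Confluent's Avro serializer.
    static final byte MAGIC_BYTE = 0x0;

    // Returns the schema ID if the payload starts with Confluent framing
    // (magic byte + 4-byte schema ID), or -1 if it does not.
    static int schemaIdOrMinusOne(byte[] payload) {
        if (payload == null || payload.length < 5 || payload[0] != MAGIC_BYTE) {
            return -1;
        }
        return ByteBuffer.wrap(payload, 1, 4).getInt(); // ByteBuffer is big-endian by default
    }

    public static void main(String[] args) {
        // Made-up framed payload: magic byte, schema ID 42, one byte of Avro data.
        byte[] framed = ByteBuffer.allocate(6).put(MAGIC_BYTE).putInt(42).put((byte) 0x0E).array();
        // Made-up unframed payload: Avro string "whiskey" (length prefix, then UTF-8 bytes).
        byte[] unframed = {0x0E, 'w', 'h', 'i', 's', 'k', 'e', 'y'};

        System.out.println(schemaIdOrMinusOne(framed));   // prints 42
        System.out.println(schemaIdOrMinusOne(unframed)); // prints -1
    }
}
```

A check like this only tells you whether the framing is present; it does not validate the Avro body or look the schema up in the registry.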