Spring cloud stream Confluent KStream Avro Consume

I am trying to consume Confluent Avro messages from a Kafka topic as a KStream with Spring Boot 2.0.

I was able to consume the messages as a MessageChannel, but not as a KStream.

@Input(ORGANIZATION)
KStream<String, Organization> organizationMessageChannel();


@StreamListener
public void processOrganization(@Input(KstreamBinding.ORGANIZATION) KStream<String, Organization> organization) {
    log.info("Organization Received:" + organization);
}

Exception:

Exception in thread "pcs-7bb7b444-044d-41bb-945d-450c902337ff-StreamThread-3" org.apache.kafka.streams.errors.StreamsException: stream-thread [pcs-7bb7b444-044d-41bb-945d-450c902337ff-StreamThread-3] Failed to rebalance.
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:860)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: org.apache.kafka.streams.errors.StreamsException: Failed to configure value serde class io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:859)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init>(AbstractProcessorContext.java:59)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init>(ProcessorContextImpl.java:42)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:134)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:404)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:365)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:350)
    at org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks(TaskManager.java:137)
    at org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:88)
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:259)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:264)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851)
    ... 3 more
Caused by: io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
    at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:243)
    at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:61)
    at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:32)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:48)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.configure(SpecificAvroSerializer.java:58)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde.configure(SpecificAvroSerde.java:107)
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:855)
    ... 19 more

Based on the error, I think I am missing the schema.registry.url configuration for Confluent. I had a quick look at the example here on how to do that with a StreamListener,

but am somewhat lost on how to do the same with Spring Cloud Stream.

Does this need separate configuration? Or is there a way to configure the schema.registry.url that Confluent is looking for in application.yml itself?

Here is the code repo: https://github.com/naveenpop/springboot-kstream-confluent

Organization.avsc

{
   "namespace":"com.test.demo.avro",
   "type":"record",
   "name":"Organization",
   "fields":[
      {
         "name":"orgId",
         "type":"string",
         "default":"null"
      },
      {
         "name":"orgName",
         "type":"string",
         "default":"null"
      },
      {
         "name":"orgType",
         "type":"string",
         "default":"null"
      },
      {
         "name":"parentOrgId",
         "type":"string",
         "default":"null"
      }
   ]
}

DemokstreamApplication.java

@SpringBootApplication
@EnableSchemaRegistryClient
@Slf4j
public class DemokstreamApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemokstreamApplication.class, args);
    }

    @Component
    public static class organizationProducer implements ApplicationRunner {

        @Autowired
        private KafkaProducer kafkaProducer;

        @Override
        public void run(ApplicationArguments args) throws Exception {
            log.info("Starting: Run method");
            List<String> names = Arrays.asList("blue", "red", "green", "black", "white");
            List<String> pages = Arrays.asList("whiskey", "wine", "rum", "jin", "beer");
            Runnable runnable = () -> {
                String rPage = pages.get(new Random().nextInt(pages.size()));
                String rName = names.get(new Random().nextInt(names.size()));

                try {
                    this.kafkaProducer.produceOrganization(rPage, rName, "PARENT", "111");
                } catch (Exception e) {
                    log.info("Exception :" +e);
                }
            };
            Executors.newScheduledThreadPool(1).scheduleAtFixedRate(runnable ,1 ,1, TimeUnit.SECONDS);
        }
    }
}

KafkaConfig.java

@Configuration
public class KafkaConfig {

    @Value("${spring.cloud.stream.schemaRegistryClient.endpoint}")
    private String endpoint;

    @Bean
    public SchemaRegistryClient confluentSchemaRegistryClient() {
        ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
        client.setEndpoint(endpoint);
        return client;
    }

}

KafkaConsumer.java

@Slf4j
@EnableBinding(KstreamBinding.class)
public class KafkaConsumer {

   @StreamListener
    public void processOrganization(@Input(KstreamBinding.ORGANIZATION_INPUT) KStream<String, Organization> organization) {
        organization.foreach((s, organization1) -> log.info("KStream Organization Received:" + organization1));
    }
}

KafkaProducer.java

@EnableBinding(KstreamBinding.class)
public class KafkaProducer {

    @Autowired
    private KstreamBinding kstreamBinding;

    public void produceOrganization(String orgId, String orgName, String orgType, String parentOrgId) {

        try {
            Organization organization = Organization.newBuilder()
                    .setOrgId(orgId)
                    .setOrgName(orgName)
                    .setOrgType(orgType)
                    .setParentOrgId(parentOrgId)
                    .build();

            kstreamBinding.organizationOutputMessageChannel()
                            .send(MessageBuilder.withPayload(organization)
                            .setHeader(KafkaHeaders.MESSAGE_KEY, orgName)
                            .build());

        } catch (Exception e){
            log.error("Failed to produce Organization Message:" +e);
        }
    }
}

KstreamBinding.java

public interface KstreamBinding {

    String ORGANIZATION_INPUT= "organizationInput";
    String ORGANIZATION_OUTPUT= "organizationOutput";

    @Input(ORGANIZATION_INPUT)
    KStream<String, Organization> organizationInputMessageChannel();

    @Output(ORGANIZATION_OUTPUT)
    MessageChannel organizationOutputMessageChannel();
}

Update 1:

I applied dturanski's suggestion and the error is gone, but I am still unable to consume the messages as a KStream<String, Organization>; there are no errors in the console.

Update 2:

Applied sobychacko's suggestion, and the message is now consumable, but with null values in the object.

I've made a commit to the GitHub sample to produce the message from Spring Boot itself, and it still arrives as null values.

Thanks for your time on this issue.

Try spring.cloud.stream.kafka.streams.binder.configuration.schema.registry.url: ...
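For context, a sketch of where that property would sit in application.yml — the registry endpoint below is a placeholder, assuming a local Schema Registry on port 8081 as used elsewhere in this thread:

```yaml
spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            # passed through to the Kafka Streams default Serdes,
            # which is where the Confluent SpecificAvroSerde reads it
            configuration:
              schema.registry.url: http://localhost:8081
```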

The following implementation will not do what you want:

@StreamListener
public void processOrganization(@Input(KstreamBinding.ORGANIZATION) KStream<String, Organization> organization) {
    log.info("Organization Received:" + organization);
}

That log statement is only invoked once, at bootstrap. For this to work, you need to invoke an operation on the received KStream and provide the logic there. For example, the following works, where I provide a lambda expression in the foreach method call:

@StreamListener
public void processOrganization(@Input(KstreamBinding.ORGANIZATION) KStream<String, Organization> organization) {
    organization.foreach((s, organization1) -> log.info("Organization Received:" + organization1));
}

You also have an issue in your configuration where you wrongly assign an Avro Serde for the keys, which are actually String. Change it like this:

default:
  key:
    serde: org.apache.kafka.common.serialization.Serdes$StringSerde
  value:
    serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

With those changes, I get the logging statement each time I send something to the topic. However, there is a problem in your sending groovy script: I am not getting any actual data from your Organization domain, but I will let you figure that out.

Update on the issue with the null Organization domain object

This happens because you are using a mixed mode of serialization strategies. You are using Spring Cloud Stream's Avro message converters on the producer side, but Confluent's Avro Serdes on the Kafka Streams processor. I just tried with the Confluent serializers all the way, from producers to processor, and was able to see the Organization domain on the outbound. Here is the modified configuration to make the serialization consistent:

spring:
  application:
    name: kstream
  cloud:
    stream:
      schemaRegistryClient:
        endpoint: http://localhost:8081
      schema:
        avro:
          schema-locations: classpath:avro/Organization.avsc
      bindings:
        organizationInput:
          destination: organization-updates
          group: demokstream.org
          consumer:
            useNativeDecoding: true
        organizationOutput:
          destination: organization-updates
          producer:
            useNativeEncoding: true
      kafka:
        bindings:
          organizationOutput:
            producer:
              configuration:
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                schema.registry.url: http://localhost:8081
        streams:
          binder:
            brokers: localhost
            configuration:
              schema.registry.url: http://localhost:8081
              commit:
                interval:
                  ms: 1000
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value:
                  serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

You can also remove the KafkaConfig class as well as the EnableSchemaRegistryClient annotation from the main application class.