Spring 卡夫卡云流

Spring cloud stream with kafka

在将 kafka 与 spring 云流集成方面需要一些帮助。该应用程序非常简单,有 2 个部分(运行 作为单独的 java 个进程)

  1. 消费者 - 将请求放入 RequestTopic 并从 ResponseTopic 获得响应
  2. 生产者 - 从 RequestTopic 获取请求并将响应放回 ResponseTopic。

我已经为消费者和 RequestReceiverChannel 和 ResponseSenderChannel 创建了 RequestSenderChannel 和 ResponseReceiverChannel 接口 对于生产者应用程序。他们都共享相同的 yaml 文件。 根据文档 spring.cloud.stream.bindings..destination 应指定消息发送或接收的主题。 但是当我 运行 应用程序时,应用程序在 kafka

中创建主题为 'RequestSender'、'RequestReceiver'、'ResponseSender' 和 'ResponseReceiver'

我的假设是:由于 YAML 文件中的目标仅指定了两个主题 'RequestTopic' 和 'ResponseTopic',它应该已经创建了这些主题。 但它会为 YAML 文件中 'spring.cloud.stream.bindings' 处指定的属性创建 Kafka 主题。 有人可以在 configruation/code 中指出问题吗?

public interface RequestReceiverChannel
{
    String requestReceiver ="RequestReceiver";
    @Input(requestReceiver)
    SubscribableChannel pathQueryRequest();
}

public interface RequestSenderChannel
{
    String RequestSender ="RequestSender";
    @Output(RequestSender)
    MessageChannel pathQueryRequestSender();
}

public interface ResponseReceiverChannel
{
    String ResponseReceiver = "ResponseReceiver";
    @Input(ResponseReceiver)
    SubscribableChannel pceResponseServiceReceiver();
}
public interface ResponseSenderChannel
{
    String ResponseSender = "ResponseSender";
    @Output(ResponseSender)
    MessageChannel pceResponseService();
}
'''

YAML 配置文件

spring:
  cloud:
    stream:
      defaultBinder: kafka
      bindings:
        RequestSender:
          binder: kafka
          destination: RequestTopic
          content-type: application/protobuf
          group: consumergroup
        ResponseSender:
          binder: kafka
          destination: ResponseTopic
          content-type: application/protobuf
          group: consumergroup
        RequestReceiver:
          binder: kafka
          destination: RequestTopic
          content-type: application/protobuf
          group: consumergroup
        ResponseReceiver:
          binder: kafka
          destination: ResponseTopic
          content-type: application/protobuf
          group: consumergroup
      kafka:
        bindings:
          RequestTopic:
            consumer:
              autoCommitOffset: false
          ResponseTopic:
            consumer:
              autoCommitOffset: false
        binder:
          brokers: ${SERVICE_KAFKA_HOST:localhost}
          zkNodes: ${SERVICE_ZOOKEEPER_HOST:127.0.0.1}
          defaultZkPort: ${SERVICE_ZOOKEEPER_PORT:2181}
          defaultBrokerPort: ${SERVICE_KAFKA_PORT:9092}

通过执行 spring.cloud.stream.bindings.<binding-name>.destination=foo,您表示希望将 <binding-name>(例如,RequestSender)指定的绑定映射到名为 foo 的代理目的地。如果这样的目的地不存在,它将被自动提供。 所以没有问题。

就是说,我们刚刚发布了 Horsham.RELEASE(云 Hoxton.RELEASE 的一部分),我们正在放弃您当前使用的基于注释的模型,转而采用更简单的功能模型.您可以在我们的 release blog 中阅读更多相关信息,其中还提供了指向 4 篇文章的链接,我们在这些文章中阐述并提供了更多关于函数式编程范例的示例。