Spring 卡夫卡云流
Spring cloud stream with kafka
在将 kafka 与 spring 云流集成方面需要一些帮助。该应用程序非常简单,有 2 个部分(运行 作为单独的 java 个进程)
- 消费者 - 将请求放入 RequestTopic 并从 ResponseTopic 获得响应
- 生产者 - 从 RequestTopic 获取请求并将响应放回 ResponseTopic。
我已经为消费者和 RequestReceiverChannel 和 ResponseSenderChannel 创建了 RequestSenderChannel 和 ResponseReceiverChannel 接口
对于生产者应用程序。他们都共享相同的 yaml 文件。
根据文档 spring.cloud.stream.bindings..destination 应指定消息发送或接收的主题。
但是当我 运行 应用程序时,应用程序在 kafka
中创建主题为 'RequestSender'、'RequestReceiver'、'ResponseSender' 和 'ResponseReceiver'
我的假设是:由于 YAML 文件中的目标仅指定了两个主题 'RequestTopic' 和 'ResponseTopic',它应该已经创建了这些主题。
但它会为 YAML 文件中 'spring.cloud.stream.bindings' 处指定的属性创建 Kafka 主题。
有人可以在 configruation/code 中指出问题吗?
public interface RequestReceiverChannel
{
String requestReceiver ="RequestReceiver";
@Input(requestReceiver)
SubscribableChannel pathQueryRequest();
}
public interface RequestSenderChannel
{
String RequestSender ="RequestSender";
@Output(RequestSender)
MessageChannel pathQueryRequestSender();
}
public interface ResponseReceiverChannel
{
String ResponseReceiver = "ResponseReceiver";
@Input(ResponseReceiver)
SubscribableChannel pceResponseServiceReceiver();
}
public interface ResponseSenderChannel
{
String ResponseSender = "ResponseSender";
@Output(ResponseSender)
MessageChannel pceResponseService();
}
'''
YAML 配置文件
spring:
cloud:
stream:
defaultBinder: kafka
bindings:
RequestSender:
binder: kafka
destination: RequestTopic
content-type: application/protobuf
group: consumergroup
ResponseSender:
binder: kafka
destination: ResponseTopic
content-type: application/protobuf
group: consumergroup
RequestReceiver:
binder: kafka
destination: RequestTopic
content-type: application/protobuf
group: consumergroup
ResponseReceiver:
binder: kafka
destination: ResponseTopic
content-type: application/protobuf
group: consumergroup
kafka:
bindings:
RequestTopic:
consumer:
autoCommitOffset: false
ResponseTopic:
consumer:
autoCommitOffset: false
binder:
brokers: ${SERVICE_KAFKA_HOST:localhost}
zkNodes: ${SERVICE_ZOOKEEPER_HOST:127.0.0.1}
defaultZkPort: ${SERVICE_ZOOKEEPER_PORT:2181}
defaultBrokerPort: ${SERVICE_KAFKA_PORT:9092}
通过执行 spring.cloud.stream.bindings.<binding-name>.destination=foo
,您表示希望将 <binding-name>
(例如,RequestSender
)指定的绑定映射到名为 foo
的代理目的地。如果这样的目的地不存在,它将被自动提供。
所以没有问题。
就是说,我们刚刚发布了 Horsham.RELEASE(云 Hoxton.RELEASE 的一部分),我们正在放弃您当前使用的基于注释的模型,转而采用更简单的功能模型.您可以在我们的 release blog 中阅读更多相关信息,其中还提供了指向 4 篇文章的链接,我们在这些文章中阐述并提供了更多关于函数式编程范例的示例。
在将 kafka 与 spring 云流集成方面需要一些帮助。该应用程序非常简单,有 2 个部分(运行 作为单独的 java 个进程)
- 消费者 - 将请求放入 RequestTopic 并从 ResponseTopic 获得响应
- 生产者 - 从 RequestTopic 获取请求并将响应放回 ResponseTopic。
我已经为消费者和 RequestReceiverChannel 和 ResponseSenderChannel 创建了 RequestSenderChannel 和 ResponseReceiverChannel 接口 对于生产者应用程序。他们都共享相同的 yaml 文件。 根据文档 spring.cloud.stream.bindings..destination 应指定消息发送或接收的主题。 但是当我 运行 应用程序时,应用程序在 kafka
中创建主题为 'RequestSender'、'RequestReceiver'、'ResponseSender' 和 'ResponseReceiver'我的假设是:由于 YAML 文件中的目标仅指定了两个主题 'RequestTopic' 和 'ResponseTopic',它应该已经创建了这些主题。 但它会为 YAML 文件中 'spring.cloud.stream.bindings' 处指定的属性创建 Kafka 主题。 有人可以在 configruation/code 中指出问题吗?
public interface RequestReceiverChannel
{
String requestReceiver ="RequestReceiver";
@Input(requestReceiver)
SubscribableChannel pathQueryRequest();
}
public interface RequestSenderChannel
{
String RequestSender ="RequestSender";
@Output(RequestSender)
MessageChannel pathQueryRequestSender();
}
public interface ResponseReceiverChannel
{
String ResponseReceiver = "ResponseReceiver";
@Input(ResponseReceiver)
SubscribableChannel pceResponseServiceReceiver();
}
public interface ResponseSenderChannel
{
String ResponseSender = "ResponseSender";
@Output(ResponseSender)
MessageChannel pceResponseService();
}
'''
YAML 配置文件
spring:
cloud:
stream:
defaultBinder: kafka
bindings:
RequestSender:
binder: kafka
destination: RequestTopic
content-type: application/protobuf
group: consumergroup
ResponseSender:
binder: kafka
destination: ResponseTopic
content-type: application/protobuf
group: consumergroup
RequestReceiver:
binder: kafka
destination: RequestTopic
content-type: application/protobuf
group: consumergroup
ResponseReceiver:
binder: kafka
destination: ResponseTopic
content-type: application/protobuf
group: consumergroup
kafka:
bindings:
RequestTopic:
consumer:
autoCommitOffset: false
ResponseTopic:
consumer:
autoCommitOffset: false
binder:
brokers: ${SERVICE_KAFKA_HOST:localhost}
zkNodes: ${SERVICE_ZOOKEEPER_HOST:127.0.0.1}
defaultZkPort: ${SERVICE_ZOOKEEPER_PORT:2181}
defaultBrokerPort: ${SERVICE_KAFKA_PORT:9092}
通过执行 spring.cloud.stream.bindings.<binding-name>.destination=foo
,您表示希望将 <binding-name>
(例如,RequestSender
)指定的绑定映射到名为 foo
的代理目的地。如果这样的目的地不存在,它将被自动提供。
所以没有问题。
就是说,我们刚刚发布了 Horsham.RELEASE(云 Hoxton.RELEASE 的一部分),我们正在放弃您当前使用的基于注释的模型,转而采用更简单的功能模型.您可以在我们的 release blog 中阅读更多相关信息,其中还提供了指向 4 篇文章的链接,我们在这些文章中阐述并提供了更多关于函数式编程范例的示例。