使用 Kafka 协议连接 Camel 和 EventHubs
Connect Camel with EventHubs using Kafka protocol
我正在尝试将消息从 Camel 路由到 Azure EventHubs。
EventHubs 命名空间是使用 Kafka 启用标志创建的。
String eventHubsPassword = "org.apache.kafka.common.security.plain.PlainLoginModule " +
"required username=\"$ConnectionString\" " +
"password=\"<Connection String>\";";
String eventHubsConfig =
"&requestTimeoutMs=30000" +
"&securityProtocol=SASL_SSL" +
"&saslMechanism=PLAIN" +
"&saslJaasConfig=" + eventHubsPassword;
from(component + ":queue:" + queue )
.to("kafka:mock-topic?brokers=" + eventHubsKafkaBrokers + eventHubsConfig)
其中 mock-topic
是事件中心的名称,eventHubsKafkaBrokers
类似于 mynamespace.servicebus.windows.net:9093
,<connection string>
是事件中心命名空间的连接字符串。
所以我得到了这个日志
2019-07-03 23:35:23 INFO AbstractLogin:53 - Successfully logged in.
2019-07-03 23:35:23 INFO AppInfoParser:109 - Kafka version : 1.0.0
发送消息时我收到
2019-07-03 23:37:51 WARN NetworkClient:241 - [Producer clientId=producer-2] Connection to node -1 could not be established. Broker may not be available.
有什么问题吗?骆驼版是2.21.1
。 camel 不支持 SASL_SSL
安全协议吗?
如果对任何人有帮助,我最终使用了 EventHubs 提供的 amqp 支持。然后将路由的目的地放到定义的 amqp 组件中。
AMQPComponent authorizedAmqp = AMQPComponent.amqpComponent(
eventHubsNamespace,
eventHubsUsername, //Shared access policy name
eventHubsPassword); //Secret code for this shared access policy
main.bind("amqps", authorizedAmqp);
...
from('jms://source")
.to("amqps" + ":queue:" + eventhubInstance);
我正在尝试将消息从 Camel 路由到 Azure EventHubs。 EventHubs 命名空间是使用 Kafka 启用标志创建的。
String eventHubsPassword = "org.apache.kafka.common.security.plain.PlainLoginModule " +
"required username=\"$ConnectionString\" " +
"password=\"<Connection String>\";";
String eventHubsConfig =
"&requestTimeoutMs=30000" +
"&securityProtocol=SASL_SSL" +
"&saslMechanism=PLAIN" +
"&saslJaasConfig=" + eventHubsPassword;
from(component + ":queue:" + queue )
.to("kafka:mock-topic?brokers=" + eventHubsKafkaBrokers + eventHubsConfig)
其中 mock-topic
是事件中心的名称,eventHubsKafkaBrokers
类似于 mynamespace.servicebus.windows.net:9093
,<connection string>
是事件中心命名空间的连接字符串。
所以我得到了这个日志
2019-07-03 23:35:23 INFO AbstractLogin:53 - Successfully logged in.
2019-07-03 23:35:23 INFO AppInfoParser:109 - Kafka version : 1.0.0
发送消息时我收到
2019-07-03 23:37:51 WARN NetworkClient:241 - [Producer clientId=producer-2] Connection to node -1 could not be established. Broker may not be available.
有什么问题吗?骆驼版是2.21.1
。 camel 不支持 SASL_SSL
安全协议吗?
如果对任何人有帮助,我最终使用了 EventHubs 提供的 amqp 支持。然后将路由的目的地放到定义的 amqp 组件中。
AMQPComponent authorizedAmqp = AMQPComponent.amqpComponent(
eventHubsNamespace,
eventHubsUsername, //Shared access policy name
eventHubsPassword); //Secret code for this shared access policy
main.bind("amqps", authorizedAmqp);
...
from('jms://source")
.to("amqps" + ":queue:" + eventhubInstance);