使用 Kafka 协议连接 Camel 和 EventHubs

Connect Camel with EventHubs using Kafka protocol

我正在尝试将消息从 Camel 路由到 Azure EventHubs。 EventHubs 命名空间是使用 Kafka 启用标志创建的。

         String eventHubsPassword = "org.apache.kafka.common.security.plain.PlainLoginModule " +
        "required username=\"$ConnectionString\" " + 
        "password=\"<Connection String>\";";

        String eventHubsConfig = 
        "&requestTimeoutMs=30000" +
        "&securityProtocol=SASL_SSL" + 
        "&saslMechanism=PLAIN" +
        "&saslJaasConfig=" + eventHubsPassword;

        from(component + ":queue:" + queue )
        .to("kafka:mock-topic?brokers=" + eventHubsKafkaBrokers + eventHubsConfig)

其中 mock-topic 是事件中心的名称,eventHubsKafkaBrokers 类似于 mynamespace.servicebus.windows.net:9093<connection string> 是事件中心命名空间的连接字符串。

所以我得到了这个日志

2019-07-03 23:35:23 INFO  AbstractLogin:53 - Successfully logged in.
2019-07-03 23:35:23 INFO  AppInfoParser:109 - Kafka version : 1.0.0

发送消息时我收到

2019-07-03 23:37:51 WARN  NetworkClient:241 - [Producer clientId=producer-2] Connection to node -1 could not be established. Broker may not be available.

有什么问题吗?骆驼版是2.21.1。 camel 不支持 SASL_SSL 安全协议吗?

如果对任何人有帮助,我最终使用了 EventHubs 提供的 amqp 支持。然后将路由的目的地放到定义的 amqp 组件中。

AMQPComponent authorizedAmqp = AMQPComponent.amqpComponent( eventHubsNamespace, eventHubsUsername, //Shared access policy name eventHubsPassword); //Secret code for this shared access policy main.bind("amqps", authorizedAmqp); ... from('jms://source") .to("amqps" + ":queue:" + eventhubInstance);