Why does the Camel Kafka-RabbitMQ connector convert my messages to an unreadable format?

My goal is to set up a connector between a RabbitMQ exchange/queue and a Kafka topic.

I am following this guide to set up the connector: https://camel.apache.org/camel-kafka-connector/latest/try-it-out-locally.html. I downloaded the connector source from https://github.com/apache/camel-kafka-connector, built it, and unpacked the archive for camel-rabbitmq-kafka-connector. I also pointed plugin.path in connect-standalone.properties to the folder where I unpacked the camel-rabbitmq-kafka-connector jars.

The properties I use for the CamelRabbitmqSourceConnector are as follows:

name=CamelRabbitmqSourceConnector
connector.class=org.apache.camel.kafkaconnector.rabbitmq.CamelRabbitmqSourceConnector
tasks.max=1

# use the kafka converters that better suit your needs, these are just defaults:
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# comma separated topics to send messages into
topics=mytopic

# mandatory properties (for a complete properties list see the connector documentation):

# The exchange name determines the exchange to which the produced messages will be sent to. In the case of consumers, the exchange name determines the exchange the queue will be bound to.
camel.source.path.exchangeName=myexchange
camel.source.endpoint.hostname=myhostname
camel.source.endpoint.addresses=localhost:5672
camel.source.endpoint.queue=myqueue
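
To make these properties easier to reason about, here is a rough sketch of how they seem to combine into a single Camel endpoint URI. As far as I understand it, camel.source.path.* values become the URI path and camel.source.endpoint.* values become query options. The helper build_endpoint_uri is hypothetical and only approximates what the connector does internally.

```python
from urllib.parse import urlencode


def build_endpoint_uri(props, scheme="rabbitmq", role="source"):
    """Approximate the Camel endpoint URI implied by connector properties.

    Hypothetical helper for illustration only; the connector's real
    URI-building logic may differ in ordering and escaping.
    """
    path_prefix = f"camel.{role}.path."
    option_prefix = f"camel.{role}.endpoint."

    # Path properties (here: exchangeName) form the part after "rabbitmq:".
    path_parts = [v for k, v in props.items() if k.startswith(path_prefix)]
    # Endpoint properties become query options with the prefix stripped.
    options = {
        k[len(option_prefix):]: v
        for k, v in props.items()
        if k.startswith(option_prefix)
    }

    uri = f"{scheme}:" + ":".join(path_parts)
    if options:
        uri += "?" + urlencode(options, safe=":")
    return uri


props = {
    "camel.source.path.exchangeName": "myexchange",
    "camel.source.endpoint.hostname": "myhostname",
    "camel.source.endpoint.addresses": "localhost:5672",
    "camel.source.endpoint.queue": "myqueue",
}
print(build_endpoint_uri(props))
# prints: rabbitmq:myexchange?hostname=myhostname&addresses=localhost:5672&queue=myqueue
```

Read this way, the source connector consumes from myqueue, and exchangeName names the exchange that queue is bound to.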

My docker run command for RabbitMQ looks like this: docker run --rm -it --hostname myhostname -p 15672:15672 -p 5672:5672 --name rabbitmq rabbitmq:3-management. For Kafka, I followed the standard "getting started" guide.

I send messages using the Python Pika library:

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='myqueue',durable=True,auto_delete=True)
channel.basic_publish(exchange='', routing_key='myqueue', body='some body...')

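As you can see, I do not specify the exchange argument in channel.basic_publish when sending the message. If I set it to the value of camel.source.path.exchangeName, my messages get lost somewhere in between, so I am probably missing something here.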
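
One possible explanation for the lost messages: RabbitMQ silently drops a message published to a named exchange when no queue binding matches its routing key. The sketch below assumes the connector has already declared the exchange and the queue, so it only checks that they exist (passive=True), adds a binding, and publishes with the same routing key. The function name publish_via_exchange and the choice of routing key are assumptions for illustration, not something taken from the connector documentation.

```python
def publish_via_exchange(channel, exchange, queue, body, routing_key=""):
    """Publish through a named exchange after making sure a binding exists.

    `channel` is expected to be a pika BlockingChannel (or anything with
    the same methods). Hypothetical helper, not part of pika or Camel.
    """
    # passive=True only checks existence, so the exchange and queue settings
    # chosen by whoever declared them first are left untouched.
    channel.exchange_declare(exchange=exchange, passive=True)
    channel.queue_declare(queue=queue, passive=True)

    # Binding is idempotent; without it a direct exchange has nowhere
    # to route the message and it is discarded.
    channel.queue_bind(queue=queue, exchange=exchange, routing_key=routing_key)

    # The routing key must equal the binding key for a direct exchange.
    channel.basic_publish(exchange=exchange, routing_key=routing_key, body=body)


# Usage against a local broker (requires `pip install pika`):
#   import pika
#   connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
#   publish_via_exchange(connection.channel(), "myexchange", "myqueue",
#                        "some body...", routing_key="myqueue")
#   connection.close()
```

If the passive checks fail, the exchange or queue does not exist yet under that name, which would also explain messages disappearing.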

I solved the problem by switching the client from Python to Java, following https://www.rabbitmq.com/tutorials/tutorial-one-java.html.

I was able to get it working with the following properties:

name=CamelRabbitmqSourceConnector
connector.class=org.apache.camel.kafkaconnector.rabbitmq.CamelRabbitmqSourceConnector
tasks.max=1

# use the kafka converters that better suit your needs, these are just defaults:
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# comma separated topics to send messages into
topics=mytopic

# mandatory properties (for a complete properties list see the connector documentation):

# The exchange name determines the exchange to which the produced messages will be sent to. In the case of consumers, the exchange name determines the exchange the queue will be bound to.
camel.source.endpoint.hostname=myhostname
camel.source.endpoint.addresses=localhost:5672
camel.source.endpoint.queue=myqueue
camel.source.endpoint.autoDelete=false
camel.source.endpoint.skipExchangeDeclare=true
camel.source.endpoint.skipQueueBind=true