从 RabbitMQ 队列读取时的 Kafka Connect 问题

Kafka Connect issue when reading from a RabbitMQ queue

我正在尝试使用具有以下配置的 Kafka 连接器从 RabbitMQ 队列将数据读入我的主题:

{
  "name" : "RabbitMQSourceConnector1",
  "config" : {
   "connector.class" : "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
   "tasks.max" : "1",
   "kafka.topic" : "rabbitmqtest3",
   "rabbitmq.queue" : "taskqueue",
    "rabbitmq.host" : "localhost",
    "rabbitmq.username" : "guest",
    "rabbitmq.password" : "guest",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true"

  }
}

但是我在将源流转换为 JSON 格式时遇到了麻烦,因为我丢失了原始消息

原文:

{'id': 0, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}

收到:

{"schema":{"type":"bytes","optional":false},"payload":"eyJpZCI6IDEsICJib2R5IjogIjAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMCJ9"}

有人知道为什么会这样吗?

编辑:我尝试使用 "value.converter" 将消息转换为字符串:"org.apache.kafka.connect.storage.StringConverter",但结果相同:

11/27/19 4:07:37 PM CET , 0 , [B@1583a488

编辑 2:

我现在收到 JSON 文件,但内容仍以 BASE64 编码

知道如何将其直接转换回 UTF8 吗?

{
 "name": "adls-gen2-sink",
 "config": {
   "connector.class":"io.confluent.connect.azure.datalake.gen2.AzureDataLakeGen2SinkConnector",
   "tasks.max":"1",
   "topics":"rabbitmqtest3",
   "flush.size":"3",
   "format.class":"io.confluent.connect.azure.storage.format.json.JsonFormat",
   "value.converter":"org.apache.kafka.connect.converters.ByteArrayConverter",
   "internal.value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
   "topics.dir":"sw66jsoningest",
   "confluent.topic.bootstrap.servers":"localhost:9092",
   "confluent.topic.replication.factor":"1",
   "partitioner.class" : "io.confluent.connect.storage.partitioner.DefaultPartitioner"

          }
}

更新:

考虑到这个流程,我得到了解决方案:

消息 (JSON) --> RabbitMq (ByteArray) --> Kafka (ByteArray) -->ADLS (JSON)

我在 RabbitMQ 到 Kafka 连接器上使用这个转换器将消息从 Base64 解码为 UTF8。

"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"

之后我将消息视为字符串并将其保存为 JSON。

"value.converter":"org.apache.kafka.connect.storage.StringConverter",
"format.class":"io.confluent.connect.azure.storage.format.json.JsonFormat",

非常感谢!

如果设置 schemas.enable": "false",则不应获取架构和负载字段

如果您根本不想进行任何转换,请使用 ByteArrayConverter

如果您的数据只是一个普通字符串(包括 JSON),请使用 StringConverter

不清楚您是如何打印结果消息的,但看起来您正在打印字节数组而不是将其解码为字符串