从 RabbitMQ 队列读取时的 Kafka Connect 问题
Kafka Connect issue when reading from a RabbitMQ queue
我正在尝试使用具有以下配置的 Kafka 连接器从 RabbitMQ 队列将数据读入我的主题:
{
"name" : "RabbitMQSourceConnector1",
"config" : {
"connector.class" : "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
"tasks.max" : "1",
"kafka.topic" : "rabbitmqtest3",
"rabbitmq.queue" : "taskqueue",
"rabbitmq.host" : "localhost",
"rabbitmq.username" : "guest",
"rabbitmq.password" : "guest",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true"
}
}
但是我在将源流转换为 JSON 格式时遇到了麻烦,因为我丢失了原始消息
原文:
{'id': 0, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
收到:
{"schema":{"type":"bytes","optional":false},"payload":"eyJpZCI6IDEsICJib2R5IjogIjAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMCJ9"}
有人知道为什么会这样吗?
编辑:我尝试使用 "value.converter" 将消息转换为字符串:"org.apache.kafka.connect.storage.StringConverter",但结果相同:
11/27/19 4:07:37 PM CET , 0 , [B@1583a488
编辑 2:
我现在收到 JSON 文件,但内容仍以 BASE64 编码
知道如何将其直接转换回 UTF8 吗?
{
"name": "adls-gen2-sink",
"config": {
"connector.class":"io.confluent.connect.azure.datalake.gen2.AzureDataLakeGen2SinkConnector",
"tasks.max":"1",
"topics":"rabbitmqtest3",
"flush.size":"3",
"format.class":"io.confluent.connect.azure.storage.format.json.JsonFormat",
"value.converter":"org.apache.kafka.connect.converters.ByteArrayConverter",
"internal.value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"topics.dir":"sw66jsoningest",
"confluent.topic.bootstrap.servers":"localhost:9092",
"confluent.topic.replication.factor":"1",
"partitioner.class" : "io.confluent.connect.storage.partitioner.DefaultPartitioner"
}
}
更新:
考虑到这个流程,我得到了解决方案:
消息 (JSON) --> RabbitMq (ByteArray) --> Kafka (ByteArray) -->ADLS (JSON)
我在 RabbitMQ 到 Kafka 连接器上使用这个转换器将消息从 Base64 解码为 UTF8。
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
之后我将消息视为字符串并将其保存为 JSON。
"value.converter":"org.apache.kafka.connect.storage.StringConverter",
"format.class":"io.confluent.connect.azure.storage.format.json.JsonFormat",
非常感谢!
如果设置 schemas.enable": "false"
,则不应获取架构和负载字段
如果您根本不想进行任何转换,请使用 ByteArrayConverter
如果您的数据只是一个普通字符串(包括 JSON),请使用 StringConverter
不清楚您是如何打印结果消息的,但看起来您正在打印字节数组而不是将其解码为字符串
我正在尝试使用具有以下配置的 Kafka 连接器从 RabbitMQ 队列将数据读入我的主题:
{
"name" : "RabbitMQSourceConnector1",
"config" : {
"connector.class" : "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
"tasks.max" : "1",
"kafka.topic" : "rabbitmqtest3",
"rabbitmq.queue" : "taskqueue",
"rabbitmq.host" : "localhost",
"rabbitmq.username" : "guest",
"rabbitmq.password" : "guest",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true"
}
}
但是我在将源流转换为 JSON 格式时遇到了麻烦,因为我丢失了原始消息
原文:
{'id': 0, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
收到:
{"schema":{"type":"bytes","optional":false},"payload":"eyJpZCI6IDEsICJib2R5IjogIjAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMCJ9"}
有人知道为什么会这样吗?
编辑:我尝试使用 "value.converter" 将消息转换为字符串:"org.apache.kafka.connect.storage.StringConverter",但结果相同:
11/27/19 4:07:37 PM CET , 0 , [B@1583a488
编辑 2:
我现在收到 JSON 文件,但内容仍以 BASE64 编码
知道如何将其直接转换回 UTF8 吗?
{
"name": "adls-gen2-sink",
"config": {
"connector.class":"io.confluent.connect.azure.datalake.gen2.AzureDataLakeGen2SinkConnector",
"tasks.max":"1",
"topics":"rabbitmqtest3",
"flush.size":"3",
"format.class":"io.confluent.connect.azure.storage.format.json.JsonFormat",
"value.converter":"org.apache.kafka.connect.converters.ByteArrayConverter",
"internal.value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"topics.dir":"sw66jsoningest",
"confluent.topic.bootstrap.servers":"localhost:9092",
"confluent.topic.replication.factor":"1",
"partitioner.class" : "io.confluent.connect.storage.partitioner.DefaultPartitioner"
}
}
更新:
考虑到这个流程,我得到了解决方案:
消息 (JSON) --> RabbitMq (ByteArray) --> Kafka (ByteArray) -->ADLS (JSON)
我在 RabbitMQ 到 Kafka 连接器上使用这个转换器将消息从 Base64 解码为 UTF8。
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
之后我将消息视为字符串并将其保存为 JSON。
"value.converter":"org.apache.kafka.connect.storage.StringConverter",
"format.class":"io.confluent.connect.azure.storage.format.json.JsonFormat",
非常感谢!
如果设置 schemas.enable": "false"
,则不应获取架构和负载字段
如果您根本不想进行任何转换,请使用 ByteArrayConverter
如果您的数据只是一个普通字符串(包括 JSON),请使用 StringConverter
不清楚您是如何打印结果消息的,但看起来您正在打印字节数组而不是将其解码为字符串