kafka connect 转换base64decode一个字段
kafka connect Transform base64decode a field
我正在使用 kafka RMQ 源连接器从 RMQ 队列中获取数据。其中一个字段是 base64 编码的,它具有 json 结构。我正在使用 Extract 转换来提取此字段,但我不确定如何解码此字段,我尝试编写自己的 smt 来解码此字段,但在解码此字段时出现错误 java.lang.ClassCastException: [B cannot be cast to java.lang.String
并且尝试将其放入记录的更新值中,因为消息是 Json 结构。非常感谢您的帮助。
下面是我的连接器配置。
{
"name" : "RabbitMQ_Source",
"connector.class" : "com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector",
"tasks.max" : "1",
"kafka.topic" : "RMQ_Topic",
"rabbitmq.queue" : "rmqqueue",
"rabbitmq.username":"username",
"rabbitmq.virtual.host":"dummy",
"rabbitmq.password":"password",
"rabbitmq.host":"x.x.x.x",
"rabbitmq.port":"5674",
"transforms": "ExtractField",
"transforms.ExtractField.type":"org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.ExtractField.field":"body"
}
下面是我要消费的队列中的消息,Body是解码后需要发送到Kafka主题的主要字段。如果我只是使用 extract transform 那么它工作正常但我只能在 kafka 主题中看到编码消息。
{
"consumerTag": "abcd",
"envelope": {
"deliveryTag": 1,
"isRedeliver": false,
"exchange": "rmqqueue",
"routingKey": "rmqqueue"
},
"basicProperties": {
"contentType": "text/plain",
"contentEncoding": null,
"headers": {},
"deliveryMode": 2,
"priority": 0,
"correlationId": null,
"replyTo": null,
"expiration": null,
"messageId": null,
"timestamp": null,
"type": null,
"userId": null,
"appId": null
},
"body": "eyJXSFMiOlt7IkNoYXJhY3RlciBTZXQiOiJVVEYtOCIsImFjdGlvbiI6InJld3JpdGUiLCJVcGRhdGUtRGF0ZS1UaW1lIjoiMjAyMC0wMy0yNSAwOTowMDowMjoxOSJ9XX0="
}
您需要使用 ByteArrayConverter
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
有一个这样的例子here。
我正在使用 kafka RMQ 源连接器从 RMQ 队列中获取数据。其中一个字段是 base64 编码的,它具有 json 结构。我正在使用 Extract 转换来提取此字段,但我不确定如何解码此字段,我尝试编写自己的 smt 来解码此字段,但在解码此字段时出现错误 java.lang.ClassCastException: [B cannot be cast to java.lang.String
并且尝试将其放入记录的更新值中,因为消息是 Json 结构。非常感谢您的帮助。
下面是我的连接器配置。
{
"name" : "RabbitMQ_Source",
"connector.class" : "com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector",
"tasks.max" : "1",
"kafka.topic" : "RMQ_Topic",
"rabbitmq.queue" : "rmqqueue",
"rabbitmq.username":"username",
"rabbitmq.virtual.host":"dummy",
"rabbitmq.password":"password",
"rabbitmq.host":"x.x.x.x",
"rabbitmq.port":"5674",
"transforms": "ExtractField",
"transforms.ExtractField.type":"org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.ExtractField.field":"body"
}
下面是我要消费的队列中的消息,Body是解码后需要发送到Kafka主题的主要字段。如果我只是使用 extract transform 那么它工作正常但我只能在 kafka 主题中看到编码消息。
{
"consumerTag": "abcd",
"envelope": {
"deliveryTag": 1,
"isRedeliver": false,
"exchange": "rmqqueue",
"routingKey": "rmqqueue"
},
"basicProperties": {
"contentType": "text/plain",
"contentEncoding": null,
"headers": {},
"deliveryMode": 2,
"priority": 0,
"correlationId": null,
"replyTo": null,
"expiration": null,
"messageId": null,
"timestamp": null,
"type": null,
"userId": null,
"appId": null
},
"body": "eyJXSFMiOlt7IkNoYXJhY3RlciBTZXQiOiJVVEYtOCIsImFjdGlvbiI6InJld3JpdGUiLCJVcGRhdGUtRGF0ZS1UaW1lIjoiMjAyMC0wMy0yNSAwOTowMDowMjoxOSJ9XX0="
}
您需要使用 ByteArrayConverter
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
有一个这样的例子here。