Kafka Connect is sending malformed JSON
I'm trying a proof of concept with kafka-connect and the RabbitMQ connector. Basically, I have two simple Spring Boot applications: a RabbitMQ producer and a Kafka consumer. The consumer can't handle the messages coming from the connector, because something is transforming my JSON message along the way: RabbitMQ sends {"transaction": "PAYMENT", "amount": "5.0"} but kafka-connect prints X{"transaction": "PAYMENT", "amount": "5.0"}. Note the X at the beginning. If I add a field, say "foo": "bar", that letter becomes a t or something else.
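For reference, the producer side boils down to something like this (a minimal sketch; the class name and wiring are assumptions on my part, only the payload matters):

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
public class PaymentPublisher {

    private final RabbitTemplate rabbitTemplate;

    public PaymentPublisher(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendPayment() {
        // Plain JSON string, no extra framing or encoding
        String payload = "{\"transaction\": \"PAYMENT\", \"amount\": \"5.0\"}";
        // Default exchange; routing key = the queue the connector reads from
        this.rabbitTemplate.convertAndSend("spectrum-queue", payload);
    }
}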
Dockerfile (connector):
FROM confluentinc/cp-kafka-connect-base:5.3.2
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-rabbitmq:latest
Build the image with docker build . -t rabbit-connector, so it can be referenced as rabbit-connector in the docker-compose file.
docker-compose.yml:
version: '2'

networks:
  kafka-connect-network:
    driver: bridge

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.3.2
    networks:
      - kafka-connect-network
    ports:
      - '31000:31000'
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      KAFKA_JMX_HOSTNAME: "localhost"
      KAFKA_JMX_PORT: 31000

  kafka:
    image: confluentinc/cp-enterprise-kafka:5.3.2
    networks:
      - kafka-connect-network
    ports:
      - '9092:9092'
      - '31001:31001'
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'false'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
      KAFKA_JMX_HOSTNAME: "localhost"
      KAFKA_JMX_PORT: 31001

  schema-registry:
    image: confluentinc/cp-schema-registry:5.3.2
    depends_on:
      - zookeeper
      - kafka
    networks:
      - kafka-connect-network
    ports:
      - '8081:8081'
      - '31002:31002'
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
      SCHEMA_REGISTRY_JMX_HOSTNAME: "localhost"
      SCHEMA_REGISTRY_JMX_PORT: 31002

  rabbitmq:
    image: rabbitmq
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest
      RABBITMQ_DEFAULT_VHOST: "/"
    networks:
      - kafka-connect-network
    ports:
      - '15672:15672'
      - '5672:5672'

  kafka-connect:
    image: rabbit-connector
    networks:
      - kafka-connect-network
    ports:
      - '8083:8083'
      - '31004:31004'
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:29092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "ERROR"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
      KAFKA_JMX_HOSTNAME: "localhost"
      KAFKA_JMX_PORT: 31004
    depends_on:
      - zookeeper
      - kafka
      - schema-registry
      - rabbitmq

  rest-proxy:
    image: confluentinc/cp-kafka-rest:5.3.2
    depends_on:
      - zookeeper
      - kafka
      - schema-registry
    networks:
      - kafka-connect-network
    ports:
      - '8082:8082'
      - '31005:31005'
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'kafka:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      KAFKAREST_JMX_HOSTNAME: "localhost"
      KAFKAREST_JMX_PORT: 31005
schema.avsc:
{
  "type": "record",
  "name": "CustomMessage",
  "namespace": "com.poc.model",
  "fields": [
    {
      "name": "transaction",
      "type": "string"
    },
    {
      "name": "amount",
      "type": "string"
    }
  ]
}
So here I'm using the StringConverter for my keys (which, honestly, I don't care about) and the AvroConverter for the values. Maybe I'm missing something, or maybe I've misconfigured my kafka-connect worker.
My connector config is (connector-config.json):
{
  "name": "rabbit_to_kafka_poc",
  "config": {
    "connector.class": "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "kafka.topic": "spectrum-message",
    "rabbitmq.queue": "spectrum-queue",
    "rabbitmq.username": "guest",
    "rabbitmq.password": "guest",
    "rabbitmq.host": "rabbitmq",
    "rabbitmq.port": "5672",
    "rabbitmq.virtual.host": "/"
  }
}
To register my connector I run curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @connector-config.json.
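To verify the registration took, the standard Connect REST API status endpoint can be queried (a quick check, not part of the original setup):

curl -s http://localhost:8083/connectors/rabbit_to_kafka_poc/status | jq '.'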
With everything configured, I print the messages with:
kafka-avro-console-consumer --bootstrap-server localhost:9092 \
--topic spectrum-message \
--from-beginning
The JSON starts with a letter, so my question is: why is this happening? I think something is encoding my messages, but my RabbitMQ producer sends a plain JSON message. I can confirm this by testing with a RabbitMQ consumer and by debugging my application up to the point where the message is sent.
You don't have JSON messages; per the use of the AvroConverter, you have Avro messages from Kafka.
The letter isn't actually a letter. It's the UTF-8 rendering your terminal gives to the first 5 bytes of binary data (the Confluent wire format prefixes each Avro message with a magic byte and a 4-byte schema ID). This typically happens when using the regular console consumer, not avro-console-consumer, which itself parses those bytes out of the topic correctly for Avro data.
If you want JSON throughout, use the JsonConverter instead.
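One way to see this for yourself (assuming xxd is available) is to hex-dump a raw record; with the AvroConverter, the value begins with the 0x00 magic byte followed by the 4-byte schema registry ID:

kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic spectrum-message \
  --from-beginning \
  --max-messages 1 | xxd | head -n 3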
You need to use the ByteArrayConverter. It's just bytes that the connector is pulling from RabbitMQ; it won't try to coerce the payload to a schema. Even serialising it to Avro, the schema is just a single bytes field:
$ curl -s -XGET localhost:8081/subjects/rabbit-test-avro-00-value/versions/1 | jq '.'
{
  "subject": "rabbit-test-avro-00-value",
  "version": 1,
  "id": 1,
  "schema": "\"bytes\""
}
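Concretely, that means swapping the value converter in the connector config; a sketch based on the config from the question, with everything else unchanged:

{
  "name": "rabbit_to_kafka_poc",
  "config": {
    "connector.class": "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "kafka.topic": "spectrum-message",
    "rabbitmq.queue": "spectrum-queue",
    "rabbitmq.username": "guest",
    "rabbitmq.password": "guest",
    "rabbitmq.host": "rabbitmq",
    "rabbitmq.port": "5672",
    "rabbitmq.virtual.host": "/"
  }
}

The topic then carries the raw JSON bytes exactly as the producer sent them, so a plain kafka-console-consumer will print them unmangled.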
If you want to write it to a topic in Avro (which is a good idea) with a schema, then do so with something like Kafka Streams or ksqlDB, applying a stream processor to the source topic that Kafka Connect writes to with the ByteArrayConverter.
For example, in ksqlDB you would do:
-- Inspect the topic - ksqlDB recognises the format as JSON
ksql> PRINT 'rabbit-test-00' FROM BEGINNING;
Format:JSON
{"ROWTIME":1578477403591,"ROWKEY":"null","transaction":"PAYMENT","amount":"5.0"}
{"ROWTIME":1578477598555,"ROWKEY":"null","transaction":"PAYMENT","amount":"5.0"}

-- Declare the schema
CREATE STREAM rabbit (transaction VARCHAR,
                      amount VARCHAR)
  WITH (KAFKA_TOPIC='rabbit-test-00',
        VALUE_FORMAT='JSON');

-- Reserialise to Avro
CREATE STREAM TRANSACTIONS WITH (VALUE_FORMAT='AVRO',
                                 KAFKA_TOPIC='reserialised_data') AS
  SELECT *
    FROM rabbit
    EMIT CHANGES;
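If you'd rather do the same in Kafka Streams, a sketch along these lines should work (the application id, addresses, topic names, and the Jackson-based parsing are assumptions; the Avro schema is the schema.avsc from the question):

import java.util.Map;
import java.util.Properties;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class ReserialiseToAvro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "rabbit-reserialiser");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // schema.avsc from the question, inlined for brevity
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"CustomMessage\",\"namespace\":\"com.poc.model\","
            + "\"fields\":[{\"name\":\"transaction\",\"type\":\"string\"},"
            + "{\"name\":\"amount\",\"type\":\"string\"}]}");

        // Value serde backed by Schema Registry
        GenericAvroSerde avroSerde = new GenericAvroSerde();
        avroSerde.configure(Map.of("schema.registry.url", "http://localhost:8081"), false);

        ObjectMapper mapper = new ObjectMapper();

        StreamsBuilder builder = new StreamsBuilder();
        builder
            // Raw JSON strings written by Connect with the ByteArrayConverter
            .stream("spectrum-message", Consumed.with(Serdes.String(), Serdes.String()))
            .mapValues(json -> {
                try {
                    JsonNode node = mapper.readTree(json);
                    GenericRecord record = new GenericData.Record(schema);
                    record.put("transaction", node.get("transaction").asText());
                    record.put("amount", node.get("amount").asText());
                    return record;
                } catch (Exception e) {
                    throw new RuntimeException("Bad payload: " + json, e);
                }
            })
            // Re-emit the same data, now Avro-serialised with a registered schema
            .to("reserialised_data", Produced.with(Serdes.String(), avroSerde));

        new KafkaStreams(builder.build(), props).start();
    }
}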
For more details, see this blog that I've written.