MQTT Kafka Source 连接器:有趣的字节字符

MQTT Kafka Source connector : funny byte characters

我正在关注 https://github.com/kaiwaehner/kafka-connect-iot-mqtt-connector-example 使用 MQTT 源连接器连接 Mosquitto 和 Kafka。我正在获取 Mosquitto 发布者发送到 Mosquitto 订阅者和 Kafka 消费者的数据。但是我的 kafka-consumer ConsumerRecord 对象中的键和值字段有一些前置字节字符。 下面是代码片段和我得到的输出。

mqttPublisher.py

while v3 < 3:
             data3 = {
                      "time": str(datetime.datetime.now().time()),
                       "val": v3
                      }
             client.publish("sensor/dist", json.dumps(data3), qos=2)

             v3 += 1
             time.sleep(2)

mqttSubscriber.py

def on_message_print(client, userdata, message):
            print(message.topic,message.payload)

subscribe.callback(on_message_print, "sensor/#", hostname="localhost")

kafkaConsumer.py

consumer = KafkaConsumer('mqtt.',
                     bootstrap_servers=['localhost:9092'])

for message in consumer:
   print(message)

输出:mqttSubscriber.py

sensor/dist b'{"time": "12:44:30.817462", "val": 0}'

sensor/dist b'{"time": "12:44:32.820040", "val": 1}'

sensor/dist b'{"time": "12:44:34.822657", "val": 2}'

输出:kafkaConsumer.py

ConsumerRecord(topic='mqtt.', partition=0, offset=225, timestamp=1545117270870, timestamp_type=0, key=b'\x00\x00\x00\x00\x01\x16sensor/dist', value=b'\x00\x00\x00\x00\x02J{"time": "12:44:30.817462", "val": 0}', headers=[('mqtt.message.id', b'0'), ('mqtt.qos', b'0'), ('mqtt.retained', b'false'), ('mqtt.duplicate', b'false')], 校验和=None, serialized_key_size=17, serialized_value_size=43, serialized_header_size=62)

ConsumerRecord(topic='mqtt.', partition=0, offset=226, timestamp=1545117272821, timestamp_type=0, key=b'\x00\x00\x00\x00\x01\x16sensor/dist', value=b'\x00\x00\x00\x00\x02J{"time": "12:44:32.820040", "val": 1}', headers=[('mqtt.message.id', b'0'), ('mqtt.qos', b'0'), ('mqtt.retained', b'false'), ('mqtt.duplicate', b'false')], 校验和=None, serialized_key_size=17, serialized_value_size=43, serialized_header_size=62)

ConsumerRecord(topic='mqtt.', partition=0, offset=227, timestamp=1545117274824, timestamp_type=0, key=b'\x00\x00\x00\x00\x01\x16sensor/dist', value=b'\x00\x00\x00\x00\x02J{"time": "12:44:34.822657", "val": 2}', headers=[('mqtt.message.id', b'0'), ('mqtt.qos', b'0'), ('mqtt.retained', b'false'), ('mqtt.duplicate', b'false')], 校验和=None, serialized_key_size=17, serialized_value_size=43, serialized_header_size=62)

是什么导致了 Kafka Consumer 中的上述额外字节? 提前致谢。

作为演示的一部分,您将启动架构注册表

Start Kafka Connect and dependencies (Kafka, Zookeeper, Schema Registry):

confluent start connect

如果您查看前 5 个字节,您会发现它们以 0 开头,然后是代表整数的另外四个字节。

查看 Schema Registry Wire Format 并尝试执行 curl localhost:8081/subjects 以查看它是否列出了 mqtt-keymqtt-value 的主题名称。

如果您不需要 Avro,则需要配置和编辑您的 Kafka Connect 属性 文件以使用不同的转换器,而不是使用 confluent start 除了获取 Kafka 和 Zookeeper 运行

或者如果你想Python反序列化Avro,你可以参考Github

上的confluent-kafka-python repo