MQTT Kafka Source 连接器:有趣的字节字符
MQTT Kafka Source connector : funny byte characters
我正在关注 https://github.com/kaiwaehner/kafka-connect-iot-mqtt-connector-example 使用 MQTT 源连接器连接 Mosquitto 和 Kafka。我正在获取 Mosquitto 发布者发送到 Mosquitto 订阅者和 Kafka 消费者的数据。但是我的 kafka-consumer ConsumerRecord 对象中的键和值字段有一些前置字节字符。
下面是代码片段和我得到的输出。
mqttPublisher.py
while v3 < 3:
data3 = {
"time": str(datetime.datetime.now().time()),
"val": v3
}
client.publish("sensor/dist", json.dumps(data3), qos=2)
v3 += 1
time.sleep(2)
mqttSubscriber.py
def on_message_print(client, userdata, message):
print(message.topic,message.payload)
subscribe.callback(on_message_print, "sensor/#", hostname="localhost")
kafkaConsumer.py
consumer = KafkaConsumer('mqtt.',
bootstrap_servers=['localhost:9092'])
for message in consumer:
print(message)
输出:mqttSubscriber.py
sensor/dist b'{"time": "12:44:30.817462", "val": 0}'
sensor/dist b'{"time": "12:44:32.820040", "val": 1}'
sensor/dist b'{"time": "12:44:34.822657", "val": 2}'
输出:kafkaConsumer.py
ConsumerRecord(topic='mqtt.', partition=0, offset=225, timestamp=1545117270870, timestamp_type=0, key=b'\x00\x00\x00\x00\x01\x16sensor/dist', value=b'\x00\x00\x00\x00\x02J{"time": "12:44:30.817462", "val": 0}', headers=[('mqtt.message.id', b'0'), ('mqtt.qos', b'0'), ('mqtt.retained', b'false'), ('mqtt.duplicate', b'false')], 校验和=None, serialized_key_size=17, serialized_value_size=43, serialized_header_size=62)
ConsumerRecord(topic='mqtt.', partition=0, offset=226, timestamp=1545117272821, timestamp_type=0, key=b'\x00\x00\x00\x00\x01\x16sensor/dist', value=b'\x00\x00\x00\x00\x02J{"time": "12:44:32.820040", "val": 1}', headers=[('mqtt.message.id', b'0'), ('mqtt.qos', b'0'), ('mqtt.retained', b'false'), ('mqtt.duplicate', b'false')], 校验和=None, serialized_key_size=17, serialized_value_size=43, serialized_header_size=62)
ConsumerRecord(topic='mqtt.', partition=0, offset=227, timestamp=1545117274824, timestamp_type=0, key=b'\x00\x00\x00\x00\x01\x16sensor/dist', value=b'\x00\x00\x00\x00\x02J{"time": "12:44:34.822657", "val": 2}', headers=[('mqtt.message.id', b'0'), ('mqtt.qos', b'0'), ('mqtt.retained', b'false'), ('mqtt.duplicate', b'false')], 校验和=None, serialized_key_size=17, serialized_value_size=43, serialized_header_size=62)
是什么导致了 Kafka Consumer 中的上述额外字节?
提前致谢。
作为演示的一部分,您将启动架构注册表
Start Kafka Connect and dependencies (Kafka, Zookeeper, Schema Registry):
confluent start connect
如果您查看前 5 个字节,您会发现它们以 0 开头,然后是代表整数的另外四个字节。
查看 Schema Registry Wire Format 并尝试执行 curl localhost:8081/subjects
以查看它是否列出了 mqtt-key
和 mqtt-value
的主题名称。
如果您不需要 Avro,则需要配置和编辑您的 Kafka Connect 属性 文件以使用不同的转换器,而不是使用 confluent start
除了获取 Kafka 和 Zookeeper 运行
或者如果你想Python反序列化Avro,你可以参考Github
上的confluent-kafka-python repo
我正在关注 https://github.com/kaiwaehner/kafka-connect-iot-mqtt-connector-example 使用 MQTT 源连接器连接 Mosquitto 和 Kafka。我正在获取 Mosquitto 发布者发送到 Mosquitto 订阅者和 Kafka 消费者的数据。但是我的 kafka-consumer ConsumerRecord 对象中的键和值字段有一些前置字节字符。 下面是代码片段和我得到的输出。
mqttPublisher.py
while v3 < 3:
data3 = {
"time": str(datetime.datetime.now().time()),
"val": v3
}
client.publish("sensor/dist", json.dumps(data3), qos=2)
v3 += 1
time.sleep(2)
mqttSubscriber.py
def on_message_print(client, userdata, message):
print(message.topic,message.payload)
subscribe.callback(on_message_print, "sensor/#", hostname="localhost")
kafkaConsumer.py
consumer = KafkaConsumer('mqtt.',
bootstrap_servers=['localhost:9092'])
for message in consumer:
print(message)
输出:mqttSubscriber.py
sensor/dist b'{"time": "12:44:30.817462", "val": 0}'
sensor/dist b'{"time": "12:44:32.820040", "val": 1}'
sensor/dist b'{"time": "12:44:34.822657", "val": 2}'
输出:kafkaConsumer.py
ConsumerRecord(topic='mqtt.', partition=0, offset=225, timestamp=1545117270870, timestamp_type=0, key=b'\x00\x00\x00\x00\x01\x16sensor/dist', value=b'\x00\x00\x00\x00\x02J{"time": "12:44:30.817462", "val": 0}', headers=[('mqtt.message.id', b'0'), ('mqtt.qos', b'0'), ('mqtt.retained', b'false'), ('mqtt.duplicate', b'false')], 校验和=None, serialized_key_size=17, serialized_value_size=43, serialized_header_size=62)
ConsumerRecord(topic='mqtt.', partition=0, offset=226, timestamp=1545117272821, timestamp_type=0, key=b'\x00\x00\x00\x00\x01\x16sensor/dist', value=b'\x00\x00\x00\x00\x02J{"time": "12:44:32.820040", "val": 1}', headers=[('mqtt.message.id', b'0'), ('mqtt.qos', b'0'), ('mqtt.retained', b'false'), ('mqtt.duplicate', b'false')], 校验和=None, serialized_key_size=17, serialized_value_size=43, serialized_header_size=62)
ConsumerRecord(topic='mqtt.', partition=0, offset=227, timestamp=1545117274824, timestamp_type=0, key=b'\x00\x00\x00\x00\x01\x16sensor/dist', value=b'\x00\x00\x00\x00\x02J{"time": "12:44:34.822657", "val": 2}', headers=[('mqtt.message.id', b'0'), ('mqtt.qos', b'0'), ('mqtt.retained', b'false'), ('mqtt.duplicate', b'false')], 校验和=None, serialized_key_size=17, serialized_value_size=43, serialized_header_size=62)
是什么导致了 Kafka Consumer 中的上述额外字节? 提前致谢。
作为演示的一部分,您将启动架构注册表
Start Kafka Connect and dependencies (Kafka, Zookeeper, Schema Registry):
confluent start connect
如果您查看前 5 个字节,您会发现它们以 0 开头,然后是代表整数的另外四个字节。
查看 Schema Registry Wire Format 并尝试执行 curl localhost:8081/subjects
以查看它是否列出了 mqtt-key
和 mqtt-value
的主题名称。
如果您不需要 Avro,则需要配置和编辑您的 Kafka Connect 属性 文件以使用不同的转换器,而不是使用 confluent start
除了获取 Kafka 和 Zookeeper 运行
或者如果你想Python反序列化Avro,你可以参考Github
上的confluent-kafka-python repo