在 mqtt 源连接器上设置 Kafka 主题

Set Kafka Topics on the mqtt souce connector

我使用的是 confluent platform 5.3.1,我在分布式模式下定义了两个不同的 mqtt 源连接器,使用:

curl -s -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d '{
"name" : "mqtt-source1",
"config" : {
"connector.class" : "io.confluent.connect.mqtt.MqttSourceConnector",
"tasks.max" : "1",
"mqtt.server.uri" : "<IP-ADDRESS 1>",
"mqtt.topics" : "<TOPIC MQTT 1>",
"kafka.topics" : "mqtt1",
"value.converter":"org.apache.kafka.connect.converters.ByteArrayConverter",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable" : "false",
"value.converter.schemas.enable" : "false",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1",
"confluent.license":""
}
}'

curl -s -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d '{
"name" : "mqtt-source2",
"config" : {
"connector.class" : "io.confluent.connect.mqtt.MqttSourceConnector",
"tasks.max" : "1",
"mqtt.server.uri" : "<IP-ADDRESS 2>",
"mqtt.topics" : "<TOPIC MQTT 2>",
"kafka.topics" : "mqtt2",
"value.converter":"org.apache.kafka.connect.converters.ByteArrayConverter",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable" : "false",
"value.converter.schemas.enable" : "false",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1",
"confluent.license":""
}
}'

我有一些问题:

1) 查看连接器的状态,我得到两个连接器的相同结果(以下是 post 响应示例):

{
"name": "mqtt-source1",
"connector": {
  "state": "RUNNING",
  "worker_id": "127.0.0.1:8083"
},
"tasks": [
  {
   "id": 0,
   "state": "RUNNING",
   "worker_id": "127.0.0.1:8083"
 }
],
  "type": "source"
}

2) 当我创建第一个连接器时,主题 "mqtt" 会自动在 kafka 上创建。 就个人而言,我将在两个连接器中设置关于 kafka 的两个不同主题(即 "mqtt1" 和 "mqtt2"),但我无法从我创建的主题中读取任何数据。为什么? IP 和 mqtt 主题在两个连接器中不同。

提前致谢。

配置 属性 是 kafka.topic 而不是 kafka.topics

由于您指定了 kafka.topics 连接器采用默认值,即 mqtt

参考:Configuration Properties