Kafka MQTT Sink Connector fails with HashMap is not a supported type
This is my Kafka-MQTT sink connector configuration:
name=anonymous
confluent.topic.bootstrap.servers= localhost:9092
connector.class=io.confluent.connect.mqtt.MqttSinkConnector
confluent.topic.replication.factor=1
tasks.max=1
mqtt.server.uri=tcp://127.0.0.1:1883
topics=mqtt
When I run Kafka Connect, the task fails with this message:
[2021-07-22 17:35:37,565] ERROR WorkerSinkTask{id=anonymous-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: java.util.HashMap is not a supported type. (org.apache.kafka.connect.runtime.WorkerSinkTask:612)
java.lang.UnsupportedOperationException: java.util.HashMap is not a supported type.
at io.confluent.connect.mqtt.SinkConverter.convert(SinkConverter.java:39)
at io.confluent.connect.mqtt.MqttSinkTask.put(MqttSinkTask.java:95)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
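The Kafka-MQTT connector version is 1.4.1. What is going wrong?
Check your default key/value converters. How do you start Kafka Connect? Please share your Connect worker configuration file: what values does it set for the key and value converters?
Try adding these settings to the connector configuration: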
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
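If the connector is started from a properties file, as in the question, the same two settings would be written in properties syntax rather than JSON. The following is a minimal sketch under that assumption: it is the asker's configuration with only the two converter lines added. The likely cause (an assumption, since the worker configuration was never shared) is that the worker's default value converter is JsonConverter with schemas disabled, which hands the sink task a java.util.HashMap for each JSON object. StringConverter passes the record through as a plain string instead.

```properties
name=anonymous
connector.class=io.confluent.connect.mqtt.MqttSinkConnector
tasks.max=1
topics=mqtt
mqtt.server.uri=tcp://127.0.0.1:1883
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1

# Added lines: override the worker's default converters for this connector only,
# so record keys and values reach the sink task as strings rather than maps.
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
```

Setting the converters on the connector rather than in the worker file keeps the change local, so other connectors on the same worker keep their existing converters. The task has to be restarted after the change, since the log says it will not recover on its own.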