Confluent HttpSinkConnector is giving a "Replication factor: 3" error even though the replication factor is set to 1 everywhere

I am using the Confluent HttpSinkConnector, following the steps described on this page: https://docs.confluent.io/current/connect/kafka-connect-http/index.html

After checking all the property details shown on the Control Center page, I updated my connector's sink properties file. My final properties file is as follows:

name = HttpSink
connector.class = io.confluent.connect.http.HttpSinkConnector
tasks.max = 1
value.converter = org.apache.kafka.connect.storage.StringConverter
topics = http-messages
http.api.url = http://localhost:8080/api/messages
request.method = post
auth.type = none
reporter.result.topic.replication.factor = 1
reporter.result.topic.partitions = 1
reporter.bootstrap.servers = localhost:9092
confluent.topic.bootstrap.servers = localhost:9092
confluent.topic = http-messages
confluent.topic.replication.factor = 1
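
For reference, the configuration the worker actually loaded can be confirmed through the Connect REST API (assuming the default worker port 8083 used elsewhere in this question):

# Fetch the configuration the Connect worker is using for this connector
curl -X GET localhost:8083/connectors/HttpSink/config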

I also checked that the "http-messages" topic has been created; its configuration is shown on Control Center as follows:

name                                      http-messages
partitions                                1
compression.type                          producer
leader.replication.throttled.replicas
message.downconversion.enable             true
min.insync.replicas                       1
segment.jitter.ms                         0
cleanup.policy                            delete
flush.ms                                  9223372036854775807
follower.replication.throttled.replicas
segment.bytes                             1073741824
retention.ms                              604800000
flush.messages                            9223372036854775807
message.format.version                    2.5-IV0
file.delete.delay.ms                      60000
max.compaction.lag.ms                     9223372036854775807
max.message.bytes                         1048588
min.compaction.lag.ms                     0
message.timestamp.type                    CreateTime
preallocate                               false
min.cleanable.dirty.ratio                 0.5
index.interval.bytes                      4096
unclean.leader.election.enable            false
retention.bytes                           -1
delete.retention.ms                       86400000
segment.ms                                604800000
message.timestamp.difference.max.ms       9223372036854775807
segment.index.bytes                       10485760
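
The same details can be confirmed from the command line (assuming the Confluent CLI tools are on the PATH; in a plain Apache Kafka distribution the script is kafka-topics.sh):

# Describe the topic; the output includes PartitionCount and ReplicationFactor
kafka-topics --bootstrap-server localhost:9092 --describe --topic http-messages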

However, when I try to run the HttpSink connector task, it fails with the following error message, found in the status returned by the REST request curl -X GET localhost:8083/connectors/HttpSink/tasks/0/status:

{"id":0,"state":"FAILED","worker_id":"127.0.0.1:8083","trace":"org.apache.kafka.connect.errors.ConnectException: Unable to manage topics:\n\tat io.confluent.connect.reporter.ReporterAdminClient.handleExecutionException(ReporterAdminClient.java:109)\n\tat io.confluent.connect.reporter.ReporterAdminClient.createTopic(ReporterAdminClient.java:57)\n\tat io.confluent.connect.reporter.Reporter.createDestinationTopicsIfNeeded(Reporter.java:433)\n\tat io.confluent.connect.reporter.Reporter.configure(Reporter.java:80)\n\tat io.confluent.connect.http.HttpSinkTask.start(HttpSinkTask.java:49)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:305)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.\n\tat org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)\n\tat org.apache.kafka.common.internals.KafkaFutureImpl.access[=35= ]0(KafkaFutureImpl.java:32)\n\tat org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)\n\tat org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)\n\tat io.confluent.connect.reporter.ReporterAdminClient.createTopic(ReporterAdminClient.java:53)\n\t... 12 more\nCaused by: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.\n"}

In addition, I receive the following response to the REST request curl -X GET localhost:8083/connectors/HttpSink/topics:

{"HttpSink":{"topics":[]}}

Please help me resolve this issue.

To overcome this exception, in addition to reporter.result.topic.replication.factor and confluent.topic.replication.factor, you also need to set reporter.error.topic.replication.factor to 1. The reporter creates an error topic as well as a result topic, and since your properties file never overrides the error topic's replication factor, it falls back to the default of 3. The full set of kafka-connect-http configuration properties is listed here: https://docs.confluent.io/current/connect/kafka-connect-http/connector_config.html

"confluent.topic.replication.factor": 1,
"reporter.result.topic.replication.factor": 1,
"reporter.error.topic.replication.factor": 1