Elasticsearch sink connector throws 403 Forbidden exception when trying to create indices from topics
I am trying to create a sink connector to Elastic Cloud. This is the configuration of my Elasticsearch sink connector (created through ksqlDB):
create sink connector elastic_writer with (
'connector.class'='io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url'='********',
'connection.username'='********',
'connection.password'='********',
'type.name'='kafka-connect',
'topics.regex'='sqlserver\.dbo\.*',
'schema.ignore'='true');
When I create the sink connector, I first get this error:
[2020-11-02 08:56:37,480] INFO Index 'sqlserver.dbo.quotations' not found in local cache; checking for existence (io.confluent.connect.elasticsearch.jest.JestElasticsearchClient)
[2020-11-02 08:56:37,486] INFO Index 'sqlserver.dbo.quotations' not found in Elasticsearch. Error message: 403 Forbidden (io.confluent.connect.elasticsearch.jest.JestElasticsearchClient)
[2020-11-02 08:56:37,486] INFO Requesting Elasticsearch create index 'sqlserver.dbo.quotations' (io.confluent.connect.elasticsearch.jest.JestElasticsearchClient)
[2020-11-02 08:56:37,494] INFO Index 'sqlserver.dbo.quotations' not found in local cache; checking for existence (io.confluent.connect.elasticsearch.jest.JestElasticsearchClient)
[2020-11-02 08:56:37,503] INFO Index 'sqlserver.dbo.quotations' not found in Elasticsearch. Error message: 403 Forbidden (io.confluent.connect.elasticsearch.jest.JestElasticsearchClient)
[2020-11-02 08:56:37,504] WARN Failed to create index sqlserver.dbo.quotations with attempt 1/6, will attempt retry after 62 ms. Failure reason: Could not create index 'sqlserver.dbo.quotations': 403 Forbidden (io.confluent.connect.elasticsearch.jest.JestElasticsearchClient)
It then cycles through all the retries until I finally get the following error and the task is killed:
[2020-11-02 08:56:40,245] ERROR WorkerSinkTask{id=ELASTIC_WRITER-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Could not create index 'sqlserver.dbo.quotations': 403 Forbidden
at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.createIndex(JestElasticsearchClient.java:451)
at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.createIndices(JestElasticsearchClient.java:421)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.createIndicesForTopics(ElasticsearchWriter.java:374)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.open(ElasticsearchSinkTask.java:131)
at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:614)
at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:71)
at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:679)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:293)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:430)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:451)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:318)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
[2020-11-02 08:56:40,246] ERROR WorkerSinkTask{id=ELASTIC_WRITER-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
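Any ideas on how to solve this? I have already tried creating the index before creating the sink connector, but that did not fix the problem, and Kafka Connect threw exactly the same error.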
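This 403 exception is thrown when the Elasticsearch sink connector cannot connect to the Elastic service. Check your firewall settings and/or the traffic filters applied to your Elastic Cloud deployment.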
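One way to narrow this down outside Kafka Connect is to send the same kind of index request yourself, from the machine that runs the Connect worker, using the connector's credentials. Below is a minimal Python sketch of that check, not the connector's own logic. The function names (`build_index_check_request`, `explain_status`, `check_index`) are made up for illustration, the URL and credentials are placeholders you need to fill in, and the status explanations are rough guesses rather than an exhaustive diagnosis.

```python
import base64
import urllib.error
import urllib.request


def build_index_check_request(base_url, index, username, password):
    """Build a HEAD request for an index, authenticated with HTTP basic auth."""
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        f"{base_url.rstrip('/')}/{index}",
        method="HEAD",
        headers={"Authorization": f"Basic {token}"},
    )


def explain_status(status):
    """Map an HTTP status code to a rough, non-exhaustive hint."""
    if status == 200:
        return "index exists and the request was allowed"
    if status == 401:
        return "credentials were rejected"
    if status == 403:
        return ("request was refused: check traffic filters, firewall rules, "
                "and the privileges of the user")
    if status == 404:
        return "index does not exist, but the request was allowed"
    return f"unexpected status {status}"


def check_index(base_url, index, username, password, timeout=10):
    """Send the request and return the HTTP status code (performs network I/O)."""
    request = build_index_check_request(base_url, index, username, password)
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return response.status
    except urllib.error.HTTPError as err:
        # urllib raises for 4xx/5xx responses; the status code is what we want here.
        return err.code


# Example usage (placeholders, not real values):
# status = check_index("https://<your-deployment-url>", "sqlserver.dbo.quotations",
#                      "<username>", "<password>")
# print(status, explain_status(status))
```

If this also returns 403 from the Connect host but succeeds from another machine, that points toward network-level filtering rather than the connector configuration.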