Kafka-MongoDB sync connector failing: ERROR WorkerSinkTask{id=mongo-sink-0} Task is being killed and will not recover until manually restarted
Starting the sync job:
bash-4.4# nohup ./bin/connect-standalone.sh ./config/connect-standalone-mongo.properties ./config/connect-mongo-sink.properties
I get the following error:
ERROR WorkerSinkTask{id=mongo-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:488)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:465)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Unknown schema type: null
at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:753)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:374)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord(WorkerSinkTask.java:488)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
INFO StandaloneConfig values:
access.control.allow.methods =
access.control.allow.origin =
admin.listeners = null
bootstrap.servers = [172.18.0.7:9092]
client.dns.lookup = default
config.providers = []
connector.client.config.override.policy = None
header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter
internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
key.converter = class org.apache.kafka.connect.json.JsonConverter
Sample configuration file loaded...
INFO AdminClientConfig values:
bootstrap.servers = [172.18.0.7:9092]
client.dns.lookup = default
client.id =
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5
[2020-04-11 11:51:49,331] INFO MongoSinkTopicConfig values:
change.data.capture.handler =
collection = changedata
database = z**
delete.on.null.values = false
document.id.strategy = com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy
field.renamer.mapping = []
field.renamer.regexp = []
key.projection.list =
key.projection.type = none
max.batch.size = 0
max.num.retries = 3
post.processor.chain = [com.mongodb.kafka.connect.sink.processor.DocumentIdAdder]
rate.limiting.every.n = 0
rate.limiting.timeout = 0
retries.defer.timeout = 5000
topic = changedata
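What do I need to change, or how can I test the Kafka MongoDB sink?
I am running this inside a container and trying to talk to an external MongoDB.
Kafka and ZooKeeper run inside containers, and I have the necessary plugins and config files in the right places.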
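One observation on the stack trace above: the exception is raised in `JsonConverter.toConnectData`, which runs while Connect deserializes each record, before the sink task writes anything to MongoDB. So the converter settings in the worker file are worth a look. A minimal sketch for listing them; the helper name `show_converter_settings` is made up for illustration and is not part of Kafka:

```shell
#!/usr/bin/env bash
# Print the key/value converter settings (including schemas.enable) from a
# Connect worker properties file, skipping commented-out lines.
# show_converter_settings is a hypothetical helper name.
show_converter_settings() {
  local props_file="$1"
  grep -E '^[[:space:]]*(key|value)\.converter(\.schemas\.enable)?[[:space:]]*=' "$props_file"
}
```

With the file from the command line in the question this would be called as `show_converter_settings ./config/connect-standalone-mongo.properties`. If the topic holds plain JSON without the `schema`/`payload` envelope, `JsonConverter` is normally run with `key.converter.schemas.enable=false` and `value.converter.schemas.enable=false`; whether that applies here depends on how the records in `changedata` were produced.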
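Make sure the MongoDB IP address is correct and the port is open, check firewalls and other security settings, and also make sure MongoDB is listening on 0.0.0.0 rather than 127.0.0.1.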
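The reachability part of that advice can be checked from inside the Connect container with a short script. This is only a sketch: `check_tcp` is a made-up helper name, it assumes a `timeout` command is available in the container, and it relies on bash's `/dev/tcp` redirection rather than any MongoDB client.

```shell
#!/usr/bin/env bash
# Report whether a TCP connection to host:port can be opened from this shell.
# Uses bash's built-in /dev/tcp redirection, so no extra client tools are needed.
# check_tcp is a hypothetical helper name.
check_tcp() {
  local host="$1" port="$2"
  # timeout bounds the wait when a firewall silently drops packets.
  if timeout 3 bash -c "exec 3<>/dev/tcp/${host}/${port}" 2>/dev/null; then
    echo "reachable"
  else
    echo "unreachable"
    return 1
  fi
}
```

For example, `check_tcp <mongodb-host> 27017`, where 27017 is MongoDB's default port and is an assumption about this setup. An "unreachable" result points at the address, port, or firewall; on the MongoDB side the listening address is set by `net.bindIp` in `mongod.conf` (or the `--bind_ip` option).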