Confluent Connect 5.5.1 throwing Exception: java.lang.OutOfMemoryError from the UncaughtExceptionHandler in thread "kafka-coordinator-heartbeat-thread | connect-XYZ"

I have a large Confluent Kafka cluster composed of multiple sub-clusters: one for Zookeeper, another for the Kafka brokers together with Schema Registry and KSQL streams, and a separate cluster for Connect.

My Connect cluster is having problems, even though, following the article here -

I have configured rest.advertised.host.name on all my worker instances to the FQDN.

Below are the errors I keep seeing in the Connect distributed log file on all nodes -

connectDistributed.out

Error 1 -

[2021-08-12 14:07:48,932] INFO [Consumer clientId=connector-consumer-XYZ-0, groupId=connect-XYZ] Attempt to heartbeat failed since group is rebalancing (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:1054)

Error 2 -

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-coordinator-heartbeat-thread | connect-XYZ" 
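This error means the worker JVM has exhausted its heap. Before the process dies, the heap pressure can be confirmed with jstat (a sketch, assuming the JDK tools are on the PATH and <PID> is the Connect worker's process id):

# prints GC utilisation every 5000 ms; an old gen (O) pinned near 100% with a climbing FGC count points to heap exhaustion
jstat -gcutil <PID> 5000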

Following are the connect worker properties -

bootstrap.servers=production-kafka-elb.int.supportabc.platform.co.uk:9092
group.id=connect-cluster-cc
connect.protocol=compatible
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets-cc
offset.storage.replication.factor=5
config.storage.topic=connect-configs-cc
config.storage.replication.factor=5
status.storage.topic=connect-status-cc
status.storage.replication.factor=5
offset.flush.interval.ms=10000
rest.port=8085
rest.advertised.host.name=bblpkaa011.int.supportabc.platform.co.uk
rest.advertised.port=8085
plugin.path=/usr/share/java,/apps/confluent-5.5.1/share/java/
key.converter.schema.registry.url=abc-production-kafka-elb.int.supportabc.platform.co.uk:8081
value.converter.schema.registry.url=abc-production-kafka-elb.int.supportabc.platform.co.uk:8081

I have made sure every worker is allocated 6GB -

See the process trace -

java -Xmx6G -Xms6G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:MetaspaceSize=96m -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/apps/confluent-5.5.1/bin/../logs -Dlog4j.configuration=file:/apps/confluent-5.5.1/bin/../etc/kafka/connect-log4j.properties -cp /apps/confluent-5.5.1/share/java/confluent-security/connect/*:/apps/confluent-5.5.1/share/java/kafka/*:/apps/confluent-5.5.1/share/java/confluent-common/*:/apps/confluent-5.5.1/share/java/kafka-serde-tools/*:/apps/confluent-5.5.1/share/java/monitoring-interceptors/*:/apps/confluent-5.5.1/bin/../ce-broker-plugins/build/libs/*:/apps/confluent-5.5.1/bin/../ce-broker-plugins/build/dependant-libs/*:/apps/confluent-5.5.1/bin/../ce-auth-providers/build/libs/*:/apps/confluent-5.5.1/bin/../ce-auth-providers/build/dependant-libs/*:/apps/confluent-5.5.1/bin/../ce-rest-server/build/libs/*:/apps/confluent-5.5.1/bin/../ce-rest-server/build/dependant-libs/*:/apps/confluent-5.5.1/bin/../ce-audit/build/libs/*:/apps/confluent-5.5.1/bin/../ce-audit/build/dependant-libs/*:/apps/confluent-5.5.1/bin/../share/java/kafka/*:/apps/confluent-5.5.1/bin/../share/java/confluent-metadata-service/*:/apps/confluent-5.5.1/bin/../share/java/rest-utils/*:/apps/confluent-5.5.1/bin/../share/java/confluent-common/*:/apps/confluent-5.5.1/bin/../share/java/confluent-security/schema-validator/*:/apps/confluent-5.5.1/bin/../support-metrics-client/build/dependant-libs-2.12.10/*:/apps/confluent-5.5.1/bin/../support-metrics-client/build/libs/*:/usr/share/java/support-metrics-client/*:/apps/confluent-5.5.1/bin/../support-metrics-fullcollector/build/dependant-libs-2.12.10/*:/apps/confluent-5.5.1/bin/../support-metrics-fullcollector/build/libs/*:/usr/share/java/support-metrics-fullcollector/* -javaagent:/apps/ad/java-agent-20.9.0.30985-latest/javaagent.jar org.apache.kafka.connect.cli.ConnectDistributed /apps/confluent-5.5.1/etc/kafka/connect-distributed-worker-cc.properties

How can I resolve this issue?

What actually happened!

This happens on a Connect cluster when all the nodes on the cluster enter what Confluent calls a 'STOP-THE-WORLD' rebalance event.

This essentially means that no matter how many connector workers/tasks were running on the cluster beforehand, they all stop processing whatever they were working on and jump into rebalance mode to compete for the leader.

Why it happened!

One of your Connect worker properties files is set to this -> connect.protocol=compatible

That, combined with some other significant change in the Connect worker properties, or a worker being restarted without first pausing its running tasks, is what triggers the stop-the-world rebalance.
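For context, the Connect runtime in Confluent 5.5.1 (Apache Kafka 2.5) supports three values for this property; a sketch of the options as documented upstream:

# eager      - classic stop-the-world rebalancing
# compatible - cooperative when every worker supports it, otherwise falls back to eager
# sessioned  - incremental cooperative rebalancing with session keys (the 2.5 default)
connect.protocol=sessioned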

Solution

rest.advertised.host.name=<FULLY QUALIFIED HOST NAME> OR <IP.ADDRESS>
rest.advertised.port=8083
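With those two properties in place, each worker's advertised REST endpoint can be spot-checked (a sketch; substitute your own host name and port):

# root endpoint returns the worker version, commit, and Kafka cluster id
curl -s http://<FQDN>:8083/
# lists the connectors known to the cluster
curl -s http://<FQDN>:8083/connectors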

I was able to fix this by performing the following steps in the order mentioned below -

  1. Stopped the Connect worker that was running with connect.protocol=compatible

  2. Stopped the other Connect workers

  3. Added the two properties to all the worker properties files - rest.advertised.host.name= and rest.advertised.port=

  4. Restarted the Connect workers one by one and noticed the properties below being picked up

[kafka@abchostnamekk01 logs]$ grep -ri 'info advertised' connectDistributed.out
    [2021-08-12 14:06:50,809] INFO Advertised URI: http://abchostnamekk01.domain.com:8083
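To confirm the cluster has settled after the rolling restart, the same log file can be checked for the rebalance noise dying down and the tasks starting (a sketch; the second message is logged by the DistributedHerder once a rebalance completes):

# this count should stop growing once the group is stable
grep -c 'Attempt to heartbeat failed since group is rebalancing' connectDistributed.out
# logged once the worker has started its assigned connectors and tasks
grep -i 'Finished starting connectors and tasks' connectDistributed.out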

The correct fix for the OutOfMemoryError itself is to increase the Xms and Xmx memory allocation variables in the process configuration and then restart it gracefully.

Check if the existing process has a variable like the one shown below

./connect-distributed:  export KAFKA_HEAP_OPTS="-Xms6G -Xmx6G"

Check the output of free -m or top on the target server

KiB Mem : 32304516 total,   288648 free, 17298612 used, 

Change the memory allocation limits as per the available memory in the system

./connect-distributed:export KAFKA_HEAP_OPTS="-Xmx28G -Xms24G"

Gracefully stop the process on the console using the flag below. If -SIGTERM doesn't work, use -SIGKILL

kill -SIGTERM <PID> 
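The <PID> can be taken from the process list; the distributed worker runs the ConnectDistributed main class shown in the process trace earlier (a sketch):

ps -ef | grep ConnectDistributed | grep -v grep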

Restart the service using the usual restart command

/apps/confluent-5.5.1/bin/connect-distributed -daemon /apps/confluent-5.5.1/etc/kafka/connect-distributed-worker1.properties
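After the restart, it is worth verifying that the new heap flags were actually picked up by the JVM (a sketch):

# should print the new -Xms/-Xmx values from the running command line
ps -ef | grep ConnectDistributed | grep -o '\-Xm[sx][0-9]*G'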

After this restart, everything should stabilize.