集群的一个 Kafka 代理宕机时出现 ConnectException
ConnectException when one Kafka broker of cluster is down
我有两个 Kafka 经纪人:server1:9092 和 server2:9092
我正在使用 Java 客户端向这个集群发送消息,这是代码:
@Test
public void sendRecordToTopic() throws InterruptedException, ExecutionException {
//See at http://kafka.apache.org/documentation.html#newproducerconfigs
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"server1:9092,server2:9092");
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
ProducerRecord<String, String> myRecord =
new ProducerRecord<String, String>("my-replicated-topic", "test", "someValue");
boolean syncSend = true;
if (syncSend) {
//Synchronously send
producer.send(myRecord).get();
} else {
//Asynchronously send
producer.send(myRecord);
}
producer.close();
}
当其中一个代理关闭时,测试在某些情况下会抛出此异常(在此异常示例中 'server1' 已关闭):
2015-11-02 17:59:29,138 WARN
[org.apache.kafka.common.network.Selector] Error in I/O with
server1/40.35.250.227 java.net.ConnectException: Connection refused:
no further information at
sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.kafka.common.network.Selector.poll(Selector.java:238)
at
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
at java.lang.Thread.run(Thread.java:745)
这是我解决问题的方法:
至少需要3个ZooKeeper节点,我还要多配置一个。这是因为ZK确定leader的方式,需要超过50%的节点up,运行.
将此参数添加到 ZooKeeper 属性文件:
tickTime=200
此参数是使用其他参数所必需的:
initLimit=5
syncLimit=2
在制作者中添加这个属性:
props.setProperty(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "10000");
使用 "RECONNECT_BACKOFF_MS_CONFIG"
属性 WARN 仅抛出一次(不是无限循环)然后发送消息
我确实遇到了这个问题,事实证明原因是对其中一个新配置属性的误解。
在从以前的生产者 API 迁移时,我寻找了与 "topic.metadata.refresh.interval.ms" 等效的产品并选择了 ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG。然而,这是在尝试访问元数据被视为失败之前的超时,并且由于我将其设置为几分钟,它阻止了故障转移的发生。
将此设置为较低的值(我选择 500 毫秒)似乎解决了我的问题。
我认为我最初寻找的值是ProducerConfig.METADATA_MAX_AGE_CONFIG,因为无论是否发生故障,元数据刷新前的超时
我有两个 Kafka 经纪人:server1:9092 和 server2:9092 我正在使用 Java 客户端向这个集群发送消息,这是代码:
@Test
public void sendRecordToTopic() throws InterruptedException, ExecutionException {
//See at http://kafka.apache.org/documentation.html#newproducerconfigs
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"server1:9092,server2:9092");
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
ProducerRecord<String, String> myRecord =
new ProducerRecord<String, String>("my-replicated-topic", "test", "someValue");
boolean syncSend = true;
if (syncSend) {
//Synchronously send
producer.send(myRecord).get();
} else {
//Asynchronously send
producer.send(myRecord);
}
producer.close();
}
当其中一个代理关闭时,测试在某些情况下会抛出此异常(在此异常示例中 'server1' 已关闭):
2015-11-02 17:59:29,138 WARN [org.apache.kafka.common.network.Selector] Error in I/O with server1/40.35.250.227 java.net.ConnectException: Connection refused: no further information at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) at org.apache.kafka.common.network.Selector.poll(Selector.java:238) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) at java.lang.Thread.run(Thread.java:745)
这是我解决问题的方法:
至少需要3个ZooKeeper节点,我还要多配置一个。这是因为ZK确定leader的方式,需要超过50%的节点up,运行.
将此参数添加到 ZooKeeper 属性文件:
tickTime=200
此参数是使用其他参数所必需的:
initLimit=5
syncLimit=2
在制作者中添加这个属性:
props.setProperty(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "10000");
使用 "RECONNECT_BACKOFF_MS_CONFIG"
属性 WARN 仅抛出一次(不是无限循环)然后发送消息
我确实遇到了这个问题,事实证明原因是对其中一个新配置属性的误解。
在从以前的生产者 API 迁移时,我寻找了与 "topic.metadata.refresh.interval.ms" 等效的产品并选择了 ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG。然而,这是在尝试访问元数据被视为失败之前的超时,并且由于我将其设置为几分钟,它阻止了故障转移的发生。
将此设置为较低的值(我选择 500 毫秒)似乎解决了我的问题。
我认为我最初寻找的值是ProducerConfig.METADATA_MAX_AGE_CONFIG,因为无论是否发生故障,元数据刷新前的超时