Kafka Docker and port forwarding from 9092 to 9093
I have configured Kafka + ZooKeeper via the Maven Docker plugin (https://dmp.fabric8.io/):
<image>
  <name>wurstmeister/zookeeper:latest</name>
  <alias>zookeeper</alias>
  <run>
    <ports>
      <port>2181:2181</port>
    </ports>
  </run>
</image>
<image>
  <name>wurstmeister/kafka:1.0.0</name>
  <alias>kafka</alias>
  <run>
    <ports>
      <port>9092:9092</port>
    </ports>
    <links>
      <link>zookeeper:zookeeper</link>
    </links>
    <env>
      <KAFKA_ADVERTISED_HOST_NAME>127.0.0.1</KAFKA_ADVERTISED_HOST_NAME>
      <KAFKA_ZOOKEEPER_CONNECT>zookeeper:2181</KAFKA_ZOOKEEPER_CONNECT>
    </env>
  </run>
</image>
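With this configuration everything works fine.
Now I need to change the Kafka port (the port forwarded on the host system) from 9092 to 9093.
In the application.properties of my Spring Boot application, I added the following line: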
spring.kafka.bootstrap-servers=127.0.0.1:9093
I also changed the Kafka image configuration to:
<image>
  <name>wurstmeister/kafka:1.0.0</name>
  <alias>kafka</alias>
  <run>
    <ports>
      <port>9093:9092</port>
    </ports>
    <links>
      <link>zookeeper:zookeeper</link>
    </links>
    <env>
      <KAFKA_ADVERTISED_HOST_NAME>127.0.0.1</KAFKA_ADVERTISED_HOST_NAME>
      <KAFKA_ZOOKEEPER_CONNECT>zookeeper:2181</KAFKA_ZOOKEEPER_CONNECT>
    </env>
  </run>
</image>
Now I get the following error:
2017-11-22 18:18:16.453 DEBUG 17532 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender : Starting Kafka producer I/O thread.
2017-11-22 18:18:16.453 DEBUG 17532 --- [ main] o.a.k.clients.producer.KafkaProducer : Kafka producer started
2017-11-22 18:18:16.456 DEBUG 17532 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : Initialize connection to node -1 for sending metadata request
2017-11-22 18:18:16.456 DEBUG 17532 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : Initiating connection to node -1 at 127.0.0.1:9093.
2017-11-22 18:18:16.457 DEBUG 17532 --- [ad | producer-1] org.apache.kafka.common.metrics.Metrics : Added sensor with name node--1.bytes-sent
2017-11-22 18:18:16.457 DEBUG 17532 --- [ad | producer-1] org.apache.kafka.common.metrics.Metrics : Added sensor with name node--1.bytes-received
2017-11-22 18:18:16.458 DEBUG 17532 --- [ad | producer-1] org.apache.kafka.common.metrics.Metrics : Added sensor with name node--1.latency
2017-11-22 18:18:16.458 DEBUG 17532 --- [ad | producer-1] o.apache.kafka.common.network.Selector : Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
2017-11-22 18:18:16.458 DEBUG 17532 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : Completed connection to node -1. Fetching API versions.
2017-11-22 18:18:16.458 DEBUG 17532 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : Initiating API versions fetch from node -1.
2017-11-22 18:18:16.470 DEBUG 17532 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : Recorded API versions for node -1: (Produce(0): 0 to 5 [usable: 3], Fetch(1): 0 to 6 [usable: 5], Offsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 5 [usable: 4], LeaderAndIsr(4): 0 to 1 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 4 [usable: 3], ControlledShutdown(7): 0 to 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 to 1 [usable: 0], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0], UNKNOWN(34): 0, UNKNOWN(35): 0, UNKNOWN(36): 0, UNKNOWN(37): 0)
2017-11-22 18:18:16.471 DEBUG 17532 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : Sending metadata request (type=MetadataRequest, topics=boot.test) to node -1
2017-11-22 18:18:16.477 DEBUG 17532 --- [ad | producer-1] org.apache.kafka.clients.Metadata : Updated cluster metadata version 2 to Cluster(id = I2vTDIF_RYacc2xxU2iAyQ, nodes = [127.0.0.1:9092 (id: 1001 rack: null)], partitions = [Partition(topic = boot.test, partition = 0, leader = 1001, replicas = [1001], isr = [1001])])
2017-11-22 18:18:16.492 DEBUG 17532 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : Initiating connection to node 1001 at 127.0.0.1:9092.
2017-11-22 18:18:17.115 DEBUG 17532 --- [ntainer#0-0-C-1] org.apache.kafka.common.metrics.Metrics : Added sensor with name node-2147482646.bytes-sent
2017-11-22 18:18:17.115 DEBUG 17532 --- [ntainer#0-0-C-1] org.apache.kafka.common.metrics.Metrics : Added sensor with name node-2147482646.bytes-received
2017-11-22 18:18:17.115 DEBUG 17532 --- [ntainer#0-0-C-1] org.apache.kafka.common.metrics.Metrics : Added sensor with name node-2147482646.latency
2017-11-22 18:18:17.119 DEBUG 17532 --- [ntainer#0-0-C-1] o.apache.kafka.common.network.Selector : Connection with /127.0.0.1 disconnected
java.net.ConnectException: Connection refused: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_152]
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[na:1.8.0_152]
at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50) ~[kafka-clients-0.11.0.0.jar:na]
at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:95) ~[kafka-clients-0.11.0.0.jar:na]
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:359) [kafka-clients-0.11.0.0.jar:na]
at org.apache.kafka.common.network.Selector.poll(Selector.java:326) [kafka-clients-0.11.0.0.jar:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:432) [kafka-clients-0.11.0.0.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232) [kafka-clients-0.11.0.0.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208) [kafka-clients-0.11.0.0.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:168) [kafka-clients-0.11.0.0.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:358) [kafka-clients-0.11.0.0.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310) [kafka-clients-0.11.0.0.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297) [kafka-clients-0.11.0.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078) [kafka-clients-0.11.0.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) [kafka-clients-0.11.0.0.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:571) [spring-kafka-2.0.0.RELEASE.jar:2.0.0.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_152]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_152]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_152]
2017-11-22 18:18:17.119 DEBUG 17532 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : Node 2147482646 disconnected.
2017-11-22 18:18:17.120 WARN 17532 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : Connection to node 2147482646 could not be established. Broker may not be available.
2017-11-22 18:18:17.120 DEBUG 17532 --- [ntainer#0-0-C-1] o.a.k.c.c.i.ConsumerNetworkClient : Cancelled JOIN_GROUP request {api_key=11,api_version=2,correlation_id=3,client_id=consumer-1} with correlation id 3 due to node 2147482646 being disconnected
2017-11-22 18:18:17.120 INFO 17532 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : Marking the coordinator 127.0.0.1:9092 (id: 2147482646 rack: null) dead for group boot
2017-11-22 18:18:17.221 DEBUG 17532 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : Sending GroupCoordinator request for group boot to broker 127.0.0.1:9092 (id: 1001 rack: null)
2017-11-22 18:18:17.221 DEBUG 17532 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : Initiating connection to node 1001 at 127.0.0.1:9092.
2017-11-22 18:18:17.494 DEBUG 17532 --- [ad | producer-1] org.apache.kafka.common.metrics.Metrics : Added sensor with name node-1001.bytes-sent
2017-11-22 18:18:17.494 DEBUG 17532 --- [ad | producer-1] org.apache.kafka.common.metrics.Metrics : Added sensor with name node-1001.bytes-received
2017-11-22 18:18:17.494 DEBUG 17532 --- [ad | producer-1] org.apache.kafka.common.metrics.Metrics : Added sensor with name node-1001.latency
2017-11-22 18:18:17.495 DEBUG 17532 --- [ad | producer-1] o.apache.kafka.common.network.Selector : Connection with /127.0.0.1 disconnected
What am I doing wrong, and how can I fix it?
Why is NetworkClient still trying to connect to 9092?
2017-11-22 18:18:16.492 DEBUG 17532 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : Initiating connection to node 1001 at 127.0.0.1:9092.
UPDATED
I also tried the following env properties:
<env>
  <KAFKA_ADVERTISED_HOST_NAME>127.0.0.1</KAFKA_ADVERTISED_HOST_NAME>
  <KAFKA_ADVERTISED_PORT>9093</KAFKA_ADVERTISED_PORT>
  <KAFKA_ZOOKEEPER_CONNECT>zookeeper:2181</KAFKA_ZOOKEEPER_CONNECT>
</env>
With this I get a different error, LEADER_NOT_AVAILABLE:
2017-11-22 22:11:54.482 DEBUG 8988 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender : Starting Kafka producer I/O thread.
2017-11-22 22:11:54.482 INFO 8988 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : cb8625948210849f
2017-11-22 22:11:54.482 DEBUG 8988 --- [ main] o.a.k.clients.producer.KafkaProducer : Kafka producer started
2017-11-22 22:11:54.484 DEBUG 8988 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : Initialize connection to node -1 for sending metadata request
2017-11-22 22:11:54.484 DEBUG 8988 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : Initiating connection to node -1 at 127.0.0.1:9093.
2017-11-22 22:11:54.485 DEBUG 8988 --- [ad | producer-1] org.apache.kafka.common.metrics.Metrics : Added sensor with name node--1.bytes-sent
2017-11-22 22:11:54.485 DEBUG 8988 --- [ad | producer-1] org.apache.kafka.common.metrics.Metrics : Added sensor with name node--1.bytes-received
2017-11-22 22:11:54.485 DEBUG 8988 --- [ad | producer-1] org.apache.kafka.common.metrics.Metrics : Added sensor with name node--1.latency
2017-11-22 22:11:54.485 DEBUG 8988 --- [ad | producer-1] o.apache.kafka.common.network.Selector : Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
2017-11-22 22:11:54.486 DEBUG 8988 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : Completed connection to node -1. Fetching API versions.
2017-11-22 22:11:54.486 DEBUG 8988 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : Initiating API versions fetch from node -1.
2017-11-22 22:11:54.488 DEBUG 8988 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : Recorded API versions for node -1: (Produce(0): 0 to 5 [usable: 3], Fetch(1): 0 to 6 [usable: 5], Offsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 5 [usable: 4], LeaderAndIsr(4): 0 to 1 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 4 [usable: 3], ControlledShutdown(7): 0 to 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 to 1 [usable: 0], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0], UNKNOWN(34): 0, UNKNOWN(35): 0, UNKNOWN(36): 0, UNKNOWN(37): 0)
2017-11-22 22:11:54.489 DEBUG 8988 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : Sending metadata request (type=MetadataRequest, topics=boot.test) to node -1
2017-11-22 22:11:54.493 WARN 8988 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 1 : {boot.test=LEADER_NOT_AVAILABLE}
2017-11-22 22:11:54.523 DEBUG 8988 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : Sending metadata request (type=MetadataRequest, topics=boot.test) to node -1
2017-11-22 22:11:54.528 WARN 8988 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 5 : {boot.test=LEADER_NOT_AVAILABLE}
2017-11-22 22:11:54.593 DEBUG 8988 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : Sending metadata request (type=MetadataRequest, topics=boot.test) to node -1
2017-11-22 22:11:54.600 WARN 8988 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 2 : {boot.test=LEADER_NOT_AVAILABLE}
2017-11-22 22:11:54.628 DEBUG 8988 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : Sending metadata request (type=MetadataRequest, topics=boot.test) to node -1
2017-11-22 22:11:54.633 WARN 8988 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 6 : {boot.test=LEADER_NOT_AVAILABLE}
2017-11-22 22:11:54.700 DEBUG 8988 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : Sending metadata request (type=MetadataRequest, topics=boot.test) to node -1
2017-11-22 22:11:54.705 WARN 8988 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 3 : {boot.test=LEADER_NOT_AVAILABLE}
Here is the Kafka container log:
waiting for kafka to be ready
[2017-11-22 20:32:16,203] INFO KafkaConfig values:
advertised.host.name = null
advertised.listeners = PLAINTEXT://172.17.0.6:9093
advertised.port = null
alter.config.policy.class.name = null
authorizer.class.name =
auto.create.topics.enable = true
auto.leader.rebalance.enable = true
background.threads = 10
broker.id = -1
broker.id.generation.enable = true
broker.rack = null
compression.type = producer
connections.max.idle.ms = 600000
controlled.shutdown.enable = true
controlled.shutdown.max.retries = 3
controlled.shutdown.retry.backoff.ms = 5000
controller.socket.timeout.ms = 30000
create.topic.policy.class.name = null
default.replication.factor = 1
delete.records.purgatory.purge.interval.requests = 1
delete.topic.enable = true
fetch.purgatory.purge.interval.requests = 1000
group.initial.rebalance.delay.ms = 0
group.max.session.timeout.ms = 300000
group.min.session.timeout.ms = 6000
host.name =
inter.broker.listener.name = null
inter.broker.protocol.version = 1.0-IV0
leader.imbalance.check.interval.seconds = 300
leader.imbalance.per.broker.percentage = 10
listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
listeners = PLAINTEXT://:9092
log.cleaner.backoff.ms = 15000
log.cleaner.dedupe.buffer.size = 134217728
log.cleaner.delete.retention.ms = 86400000
log.cleaner.enable = true
log.cleaner.io.buffer.load.factor = 0.9
log.cleaner.io.buffer.size = 524288
log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
log.cleaner.min.cleanable.ratio = 0.5
log.cleaner.min.compaction.lag.ms = 0
log.cleaner.threads = 1
log.cleanup.policy = [delete]
log.dir = /tmp/kafka-logs
log.dirs = /kafka/kafka-logs-b0311aaa4e60
log.flush.interval.messages = 9223372036854775807
log.flush.interval.ms = null
log.flush.offset.checkpoint.interval.ms = 60000
log.flush.scheduler.interval.ms = 9223372036854775807
log.flush.start.offset.checkpoint.interval.ms = 60000
log.index.interval.bytes = 4096
log.index.size.max.bytes = 10485760
log.message.format.version = 1.0-IV0
log.message.timestamp.difference.max.ms = 9223372036854775807
log.message.timestamp.type = CreateTime
log.preallocate = false
log.retention.bytes = -1
log.retention.check.interval.ms = 300000
log.retention.hours = 168
log.retention.minutes = null
log.retention.ms = null
log.roll.hours = 168
log.roll.jitter.hours = 0
log.roll.jitter.ms = null
log.roll.ms = null
log.segment.bytes = 1073741824
log.segment.delete.delay.ms = 60000
max.connections.per.ip = 2147483647
max.connections.per.ip.overrides =
message.max.bytes = 1000012
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
min.insync.replicas = 1
num.io.threads = 8
num.network.threads = 3
num.partitions = 1
num.recovery.threads.per.data.dir = 1
num.replica.fetchers = 1
offset.metadata.max.bytes = 4096
offsets.commit.required.acks = -1
offsets.commit.timeout.ms = 5000
offsets.load.buffer.size = 5242880
offsets.retention.check.interval.ms = 600000
offsets.retention.minutes = 1440
offsets.topic.compression.codec = 0
offsets.topic.num.partitions = 50
offsets.topic.replication.factor = 1
offsets.topic.segment.bytes = 104857600
port = 9092
principal.builder.class = null
producer.purgatory.purge.interval.requests = 1000
queued.max.request.bytes = -1
queued.max.requests = 500
quota.consumer.default = 9223372036854775807
quota.producer.default = 9223372036854775807
quota.window.num = 11
quota.window.size.seconds = 1
replica.fetch.backoff.ms = 1000
replica.fetch.max.bytes = 1048576
replica.fetch.min.bytes = 1
replica.fetch.response.max.bytes = 10485760
replica.fetch.wait.max.ms = 500
replica.high.watermark.checkpoint.interval.ms = 5000
replica.lag.time.max.ms = 10000
replica.socket.receive.buffer.bytes = 65536
replica.socket.timeout.ms = 30000
replication.quota.window.num = 11
replication.quota.window.size.seconds = 1
request.timeout.ms = 30000
reserved.broker.max.id = 1000
sasl.enabled.mechanisms = [GSSAPI]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.principal.to.local.rules = [DEFAULT]
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism.inter.broker.protocol = GSSAPI
security.inter.broker.protocol = PLAINTEXT
socket.receive.buffer.bytes = 102400
socket.request.max.bytes = 104857600
socket.send.buffer.bytes = 102400
ssl.cipher.suites = null
ssl.client.auth = none
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.abort.timed.out.transaction.cleanup.interval.ms = 60000
transaction.max.timeout.ms = 900000
transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
transaction.state.log.load.buffer.size = 5242880
transaction.state.log.min.isr = 1
transaction.state.log.num.partitions = 50
transaction.state.log.replication.factor = 1
transaction.state.log.segment.bytes = 104857600
transactional.id.expiration.ms = 604800000
unclean.leader.election.enable = false
zookeeper.connect = zookeeper:2181
zookeeper.connection.timeout.ms = 6000
zookeeper.session.timeout.ms = 6000
zookeeper.set.acl = false
zookeeper.sync.time.ms = 2000
(kafka.server.KafkaConfig)
[2017-11-22 20:32:16,278] INFO starting (kafka.server.KafkaServer)
[2017-11-22 20:32:16,279] INFO Connecting to zookeeper on zookeeper:2181 (kafka.server.KafkaServer)
[2017-11-22 20:32:16,291] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2017-11-22 20:32:16,296] INFO Client environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT (org.apache.zookeeper.ZooKeeper)
[2017-11-22 20:32:16,297] INFO Client environment:host.name=b0311aaa4e60 (org.apache.zookeeper.ZooKeeper)
[2017-11-22 20:32:16,297] INFO Client environment:java.version=1.8.0_144 (org.apache.zookeeper.ZooKeeper)
[2017-11-22 20:32:16,297] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2017-11-22 20:32:16,297] INFO Client environment:java.home=/opt/jdk1.8.0_144/jre (org.apache.zookeeper.ZooKeeper)
[2017-11-22 20:32:16,297] INFO Client environment:java.class.path=:/opt/kafka/bin/../libs/aopalliance-repackaged-2.5.0-b32.jar:/opt/kafka/bin/..(org.apache.zookeeper.ZooKeeper)
[2017-11-22 20:32:16,297] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[2017-11-22 20:32:16,297] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2017-11-22 20:32:16,297] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2017-11-22 20:32:16,297] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2017-11-22 20:32:16,297] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2017-11-22 20:32:16,297] INFO Client environment:os.version=4.9.49-moby (org.apache.zookeeper.ZooKeeper)
[2017-11-22 20:32:16,297] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
[2017-11-22 20:32:16,297] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
[2017-11-22 20:32:16,297] INFO Client environment:user.dir=/ (org.apache.zookeeper.ZooKeeper)
[2017-11-22 20:32:16,298] INFO Initiating client connection, connectString=zookeeper:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@1534f01b (org.apache.zookeeper.ZooKeeper)
[2017-11-22 20:32:16,309] INFO Opening socket connection to server zookeeper/172.17.0.4:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2017-11-22 20:32:16,309] INFO Waiting for keeper state SyncConnected (org.I0Itec.zkclient.ZkClient)
[2017-11-22 20:32:16,313] INFO Socket connection established to zookeeper/172.17.0.4:2181, initiating session (org.apache.zookeeper.ClientCnxn)
...
[2017-11-22 20:32:16,952] INFO [ExpirationReaper-1001-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-11-22 20:32:16,956] INFO [ExpirationReaper-1001-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-11-22 20:32:16,957] INFO [ExpirationReaper-1001-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-11-22 20:32:16,960] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2017-11-22 20:32:16,972] INFO [GroupCoordinator 1001]: Starting up. (kafka.coordinator.group.GroupCoordinator)
[2017-11-22 20:32:16,973] INFO [GroupCoordinator 1001]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
[2017-11-22 20:32:16,978] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 5 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2017-11-22 20:32:16,988] INFO [ProducerId Manager 1001]: Acquired new producerId block (brokerId:1001,blockStartProducerId:0,blockEndProducerId:999) by writing to Zk with path version 1 (kafka.coordinator.transaction.ProducerIdManager)
[2017-11-22 20:32:17,004] INFO [TransactionCoordinator id=1001] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2017-11-22 20:32:17,005] INFO [Transaction Marker Channel Manager 1001]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2017-11-22 20:32:17,005] INFO [TransactionCoordinator id=1001] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2017-11-22 20:32:17,039] INFO Creating /brokers/ids/1001 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2017-11-22 20:32:17,043] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2017-11-22 20:32:17,044] INFO Registered broker 1001 at path /brokers/ids/1001 with addresses: EndPoint(172.17.0.6,9093,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.utils.ZkUtils)
[2017-11-22 20:32:17,046] WARN No meta.properties file under dir /kafka/kafka-logs-b0311aaa4e60/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2017-11-22 20:32:17,064] INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2017-11-22 20:32:17,064] INFO Kafka commitId : aaa7af6d4a11b29d (org.apache.kafka.common.utils.AppInfoParser)
[2017-11-22 20:32:17,065] INFO [KafkaServer id=1001] started (kafka.server.KafkaServer)
[2017-11-22 20:32:17,100] WARN [Controller id=1001, targetBrokerId=1001] Connection to node 1001 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-11-22 20:32:17,208] WARN [Controller id=1001, targetBrokerId=1001] Connection to node 1001 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-11-22 20:32:17,308] WARN [Controller id=1001, targetBrokerId=1001] Connection to node 1001 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-11-22 20:32:17,410] WARN [Controller id=1001, targetBrokerId=1001] Connection to node 1001 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
What is going wrong?
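Try replacing localhost with the actual IP address of your host. Localhost means something different inside the container. LEADER_NOT_AVAILABLE means that the broker cannot be reached for some reason.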
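A client only uses the bootstrap address for its first metadata request; after that it connects to whatever address the broker advertises, which is why the first log shows a connection to 127.0.0.1:9092 even though the bootstrap server is 127.0.0.1:9093. One way to keep things consistent is to have the broker listen on, advertise, and publish the same port. The following is an untested sketch, assuming the wurstmeister/kafka image maps KAFKA_LISTENERS and KAFKA_ADVERTISED_LISTENERS onto the broker's listeners and advertised.listeners properties, as its README describes for KAFKA_-prefixed variables. Replace 127.0.0.1 with the host's actual IP address if clients run on another machine.

```xml
<image>
  <name>wurstmeister/kafka:1.0.0</name>
  <alias>kafka</alias>
  <run>
    <ports>
      <!-- Same port on the host and in the container -->
      <port>9093:9093</port>
    </ports>
    <links>
      <link>zookeeper:zookeeper</link>
    </links>
    <env>
      <!-- Assumption: becomes listeners=PLAINTEXT://:9093 in the broker config -->
      <KAFKA_LISTENERS>PLAINTEXT://:9093</KAFKA_LISTENERS>
      <!-- Assumption: becomes advertised.listeners, the address returned to clients -->
      <KAFKA_ADVERTISED_LISTENERS>PLAINTEXT://127.0.0.1:9093</KAFKA_ADVERTISED_LISTENERS>
      <KAFKA_ZOOKEEPER_CONNECT>zookeeper:2181</KAFKA_ZOOKEEPER_CONNECT>
    </env>
  </run>
</image>
```

With this, spring.kafka.bootstrap-servers=127.0.0.1:9093 can stay as it is in the question. The container log in the question suggests why setting only KAFKA_ADVERTISED_PORT was not enough: the broker advertised port 9093 (advertised.listeners = PLAINTEXT://172.17.0.6:9093) while still listening on 9092 (listeners = PLAINTEXT://:9092), so the controller could not connect to its own broker.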
2017-11-22 18:18:17.221 DEBUG 17532 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : Sending GroupCoordinator request for group boot to broker 127.0.0.1:9092 (id: 1001 rack: null)
2017-11-22 18:18:17.221 DEBUG 17532 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : Initiating connection to node 1001 at 127.0.0.1:9092.
2017-11-22 18:18:17.494 DEBUG 17532 --- [ad | producer-1] org.apache.kafka.common.metrics.Metrics : Added sensor with name node-1001.bytes-sent
2017-11-22 18:18:17.494 DEBUG 17532 --- [ad | producer-1] org.apache.kafka.common.metrics.Metrics : Added sensor with name node-1001.bytes-received
2017-11-22 18:18:17.494 DEBUG 17532 --- [ad | producer-1] org.apache.kafka.common.metrics.Metrics : Added sensor with name node-1001.latency
2017-11-22 18:18:17.495 DEBUG 17532 --- [ad | producer-1] o.apache.kafka.common.network.Selector : Connection with /127.0.0.1 disconnected
What am I doing wrong, and how can I fix it?
Why is NetworkClient still trying to connect to port 9092?
2017-11-22 18:18:16.492 DEBUG 17532 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : Initiating connection to node 1001 at 127.0.0.1:9092.
UPDATED
I also tried the following env properties:
<env>
<KAFKA_ADVERTISED_HOST_NAME>127.0.0.1</KAFKA_ADVERTISED_HOST_NAME>
<KAFKA_ADVERTISED_PORT>9093</KAFKA_ADVERTISED_PORT>
<KAFKA_ZOOKEEPER_CONNECT>zookeeper:2181</KAFKA_ZOOKEEPER_CONNECT>
</env>
With this configuration I get a different error, LEADER_NOT_AVAILABLE:
2017-11-22 22:11:54.482 DEBUG 8988 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender : Starting Kafka producer I/O thread.
2017-11-22 22:11:54.482 INFO 8988 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : cb8625948210849f
2017-11-22 22:11:54.482 DEBUG 8988 --- [ main] o.a.k.clients.producer.KafkaProducer : Kafka producer started
2017-11-22 22:11:54.484 DEBUG 8988 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : Initialize connection to node -1 for sending metadata request
2017-11-22 22:11:54.484 DEBUG 8988 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : Initiating connection to node -1 at 127.0.0.1:9093.
2017-11-22 22:11:54.485 DEBUG 8988 --- [ad | producer-1] org.apache.kafka.common.metrics.Metrics : Added sensor with name node--1.bytes-sent
2017-11-22 22:11:54.485 DEBUG 8988 --- [ad | producer-1] org.apache.kafka.common.metrics.Metrics : Added sensor with name node--1.bytes-received
2017-11-22 22:11:54.485 DEBUG 8988 --- [ad | producer-1] org.apache.kafka.common.metrics.Metrics : Added sensor with name node--1.latency
2017-11-22 22:11:54.485 DEBUG 8988 --- [ad | producer-1] o.apache.kafka.common.network.Selector : Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
2017-11-22 22:11:54.486 DEBUG 8988 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : Completed connection to node -1. Fetching API versions.
2017-11-22 22:11:54.486 DEBUG 8988 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : Initiating API versions fetch from node -1.
2017-11-22 22:11:54.488 DEBUG 8988 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : Recorded API versions for node -1: (Produce(0): 0 to 5 [usable: 3], Fetch(1): 0 to 6 [usable: 5], Offsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 5 [usable: 4], LeaderAndIsr(4): 0 to 1 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 4 [usable: 3], ControlledShutdown(7): 0 to 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 to 1 [usable: 0], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0], UNKNOWN(34): 0, UNKNOWN(35): 0, UNKNOWN(36): 0, UNKNOWN(37): 0)
2017-11-22 22:11:54.489 DEBUG 8988 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : Sending metadata request (type=MetadataRequest, topics=boot.test) to node -1
2017-11-22 22:11:54.493 WARN 8988 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 1 : {boot.test=LEADER_NOT_AVAILABLE}
2017-11-22 22:11:54.523 DEBUG 8988 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : Sending metadata request (type=MetadataRequest, topics=boot.test) to node -1
2017-11-22 22:11:54.528 WARN 8988 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 5 : {boot.test=LEADER_NOT_AVAILABLE}
2017-11-22 22:11:54.593 DEBUG 8988 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : Sending metadata request (type=MetadataRequest, topics=boot.test) to node -1
2017-11-22 22:11:54.600 WARN 8988 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 2 : {boot.test=LEADER_NOT_AVAILABLE}
2017-11-22 22:11:54.628 DEBUG 8988 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : Sending metadata request (type=MetadataRequest, topics=boot.test) to node -1
2017-11-22 22:11:54.633 WARN 8988 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 6 : {boot.test=LEADER_NOT_AVAILABLE}
2017-11-22 22:11:54.700 DEBUG 8988 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : Sending metadata request (type=MetadataRequest, topics=boot.test) to node -1
2017-11-22 22:11:54.705 WARN 8988 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 3 : {boot.test=LEADER_NOT_AVAILABLE}
Here is the Kafka container log:
waiting for kafka to be ready
[2017-11-22 20:32:16,203] INFO KafkaConfig values:
advertised.host.name = null
advertised.listeners = PLAINTEXT://172.17.0.6:9093
advertised.port = null
alter.config.policy.class.name = null
authorizer.class.name =
auto.create.topics.enable = true
auto.leader.rebalance.enable = true
background.threads = 10
broker.id = -1
broker.id.generation.enable = true
broker.rack = null
compression.type = producer
connections.max.idle.ms = 600000
controlled.shutdown.enable = true
controlled.shutdown.max.retries = 3
controlled.shutdown.retry.backoff.ms = 5000
controller.socket.timeout.ms = 30000
create.topic.policy.class.name = null
default.replication.factor = 1
delete.records.purgatory.purge.interval.requests = 1
delete.topic.enable = true
fetch.purgatory.purge.interval.requests = 1000
group.initial.rebalance.delay.ms = 0
group.max.session.timeout.ms = 300000
group.min.session.timeout.ms = 6000
host.name =
inter.broker.listener.name = null
inter.broker.protocol.version = 1.0-IV0
leader.imbalance.check.interval.seconds = 300
leader.imbalance.per.broker.percentage = 10
listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
listeners = PLAINTEXT://:9092
log.cleaner.backoff.ms = 15000
log.cleaner.dedupe.buffer.size = 134217728
log.cleaner.delete.retention.ms = 86400000
log.cleaner.enable = true
log.cleaner.io.buffer.load.factor = 0.9
log.cleaner.io.buffer.size = 524288
log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
log.cleaner.min.cleanable.ratio = 0.5
log.cleaner.min.compaction.lag.ms = 0
log.cleaner.threads = 1
log.cleanup.policy = [delete]
log.dir = /tmp/kafka-logs
log.dirs = /kafka/kafka-logs-b0311aaa4e60
log.flush.interval.messages = 9223372036854775807
log.flush.interval.ms = null
log.flush.offset.checkpoint.interval.ms = 60000
log.flush.scheduler.interval.ms = 9223372036854775807
log.flush.start.offset.checkpoint.interval.ms = 60000
log.index.interval.bytes = 4096
log.index.size.max.bytes = 10485760
log.message.format.version = 1.0-IV0
log.message.timestamp.difference.max.ms = 9223372036854775807
log.message.timestamp.type = CreateTime
log.preallocate = false
log.retention.bytes = -1
log.retention.check.interval.ms = 300000
log.retention.hours = 168
log.retention.minutes = null
log.retention.ms = null
log.roll.hours = 168
log.roll.jitter.hours = 0
log.roll.jitter.ms = null
log.roll.ms = null
log.segment.bytes = 1073741824
log.segment.delete.delay.ms = 60000
max.connections.per.ip = 2147483647
max.connections.per.ip.overrides =
message.max.bytes = 1000012
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
min.insync.replicas = 1
num.io.threads = 8
num.network.threads = 3
num.partitions = 1
num.recovery.threads.per.data.dir = 1
num.replica.fetchers = 1
offset.metadata.max.bytes = 4096
offsets.commit.required.acks = -1
offsets.commit.timeout.ms = 5000
offsets.load.buffer.size = 5242880
offsets.retention.check.interval.ms = 600000
offsets.retention.minutes = 1440
offsets.topic.compression.codec = 0
offsets.topic.num.partitions = 50
offsets.topic.replication.factor = 1
offsets.topic.segment.bytes = 104857600
port = 9092
principal.builder.class = null
producer.purgatory.purge.interval.requests = 1000
queued.max.request.bytes = -1
queued.max.requests = 500
quota.consumer.default = 9223372036854775807
quota.producer.default = 9223372036854775807
quota.window.num = 11
quota.window.size.seconds = 1
replica.fetch.backoff.ms = 1000
replica.fetch.max.bytes = 1048576
replica.fetch.min.bytes = 1
replica.fetch.response.max.bytes = 10485760
replica.fetch.wait.max.ms = 500
replica.high.watermark.checkpoint.interval.ms = 5000
replica.lag.time.max.ms = 10000
replica.socket.receive.buffer.bytes = 65536
replica.socket.timeout.ms = 30000
replication.quota.window.num = 11
replication.quota.window.size.seconds = 1
request.timeout.ms = 30000
reserved.broker.max.id = 1000
sasl.enabled.mechanisms = [GSSAPI]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.principal.to.local.rules = [DEFAULT]
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism.inter.broker.protocol = GSSAPI
security.inter.broker.protocol = PLAINTEXT
socket.receive.buffer.bytes = 102400
socket.request.max.bytes = 104857600
socket.send.buffer.bytes = 102400
ssl.cipher.suites = null
ssl.client.auth = none
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.abort.timed.out.transaction.cleanup.interval.ms = 60000
transaction.max.timeout.ms = 900000
transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
transaction.state.log.load.buffer.size = 5242880
transaction.state.log.min.isr = 1
transaction.state.log.num.partitions = 50
transaction.state.log.replication.factor = 1
transaction.state.log.segment.bytes = 104857600
transactional.id.expiration.ms = 604800000
unclean.leader.election.enable = false
zookeeper.connect = zookeeper:2181
zookeeper.connection.timeout.ms = 6000
zookeeper.session.timeout.ms = 6000
zookeeper.set.acl = false
zookeeper.sync.time.ms = 2000
(kafka.server.KafkaConfig)
[2017-11-22 20:32:16,278] INFO starting (kafka.server.KafkaServer)
[2017-11-22 20:32:16,279] INFO Connecting to zookeeper on zookeeper:2181 (kafka.server.KafkaServer)
[2017-11-22 20:32:16,291] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2017-11-22 20:32:16,296] INFO Client environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT (org.apache.zookeeper.ZooKeeper)
[2017-11-22 20:32:16,297] INFO Client environment:host.name=b0311aaa4e60 (org.apache.zookeeper.ZooKeeper)
[2017-11-22 20:32:16,297] INFO Client environment:java.version=1.8.0_144 (org.apache.zookeeper.ZooKeeper)
[2017-11-22 20:32:16,297] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2017-11-22 20:32:16,297] INFO Client environment:java.home=/opt/jdk1.8.0_144/jre (org.apache.zookeeper.ZooKeeper)
[2017-11-22 20:32:16,297] INFO Client environment:java.class.path=:/opt/kafka/bin/../libs/aopalliance-repackaged-2.5.0-b32.jar:/opt/kafka/bin/..(org.apache.zookeeper.ZooKeeper)
[2017-11-22 20:32:16,297] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[2017-11-22 20:32:16,297] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2017-11-22 20:32:16,297] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2017-11-22 20:32:16,297] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2017-11-22 20:32:16,297] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2017-11-22 20:32:16,297] INFO Client environment:os.version=4.9.49-moby (org.apache.zookeeper.ZooKeeper)
[2017-11-22 20:32:16,297] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
[2017-11-22 20:32:16,297] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
[2017-11-22 20:32:16,297] INFO Client environment:user.dir=/ (org.apache.zookeeper.ZooKeeper)
[2017-11-22 20:32:16,298] INFO Initiating client connection, connectString=zookeeper:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@1534f01b (org.apache.zookeeper.ZooKeeper)
[2017-11-22 20:32:16,309] INFO Opening socket connection to server zookeeper/172.17.0.4:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2017-11-22 20:32:16,309] INFO Waiting for keeper state SyncConnected (org.I0Itec.zkclient.ZkClient)
[2017-11-22 20:32:16,313] INFO Socket connection established to zookeeper/172.17.0.4:2181, initiating session (org.apache.zookeeper.ClientCnxn)
...
[2017-11-22 20:32:16,952] INFO [ExpirationReaper-1001-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-11-22 20:32:16,956] INFO [ExpirationReaper-1001-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-11-22 20:32:16,957] INFO [ExpirationReaper-1001-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2017-11-22 20:32:16,960] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2017-11-22 20:32:16,972] INFO [GroupCoordinator 1001]: Starting up. (kafka.coordinator.group.GroupCoordinator)
[2017-11-22 20:32:16,973] INFO [GroupCoordinator 1001]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
[2017-11-22 20:32:16,978] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 5 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2017-11-22 20:32:16,988] INFO [ProducerId Manager 1001]: Acquired new producerId block (brokerId:1001,blockStartProducerId:0,blockEndProducerId:999) by writing to Zk with path version 1 (kafka.coordinator.transaction.ProducerIdManager)
[2017-11-22 20:32:17,004] INFO [TransactionCoordinator id=1001] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2017-11-22 20:32:17,005] INFO [Transaction Marker Channel Manager 1001]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2017-11-22 20:32:17,005] INFO [TransactionCoordinator id=1001] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2017-11-22 20:32:17,039] INFO Creating /brokers/ids/1001 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2017-11-22 20:32:17,043] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2017-11-22 20:32:17,044] INFO Registered broker 1001 at path /brokers/ids/1001 with addresses: EndPoint(172.17.0.6,9093,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.utils.ZkUtils)
[2017-11-22 20:32:17,046] WARN No meta.properties file under dir /kafka/kafka-logs-b0311aaa4e60/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2017-11-22 20:32:17,064] INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2017-11-22 20:32:17,064] INFO Kafka commitId : aaa7af6d4a11b29d (org.apache.kafka.common.utils.AppInfoParser)
[2017-11-22 20:32:17,065] INFO [KafkaServer id=1001] started (kafka.server.KafkaServer)
[2017-11-22 20:32:17,100] WARN [Controller id=1001, targetBrokerId=1001] Connection to node 1001 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-11-22 20:32:17,208] WARN [Controller id=1001, targetBrokerId=1001] Connection to node 1001 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-11-22 20:32:17,308] WARN [Controller id=1001, targetBrokerId=1001] Connection to node 1001 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-11-22 20:32:17,410] WARN [Controller id=1001, targetBrokerId=1001] Connection to node 1001 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
What is going wrong?
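Try replacing localhost (127.0.0.1) with your host's actual IP address in the advertised host name. Inside the container, localhost refers to the container itself, not to your host machine. LEADER_NOT_AVAILABLE means that the broker could not be reached for some reason.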
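As a possible starting point, here is a minimal sketch of the Kafka image configuration. It rests on two observations from the logs above. First, the client uses the bootstrap address (127.0.0.1:9093) only for its initial connection and then switches to whatever address the broker advertises in its metadata, which is why the first log shows `nodes = [127.0.0.1:9092]`. Second, in the container log the broker advertises port 9093 while `listeners = PLAINTEXT://:9092`, so nothing appears to be listening on 9093 inside the container, which may explain the Controller warnings. The sketch assumes that the wurstmeister/kafka image passes `KAFKA_LISTENERS` and `KAFKA_ADVERTISED_LISTENERS` through to the broker's `listeners` and `advertised.listeners` settings, and `192.168.1.10` is a hypothetical placeholder for your host's real IP address. It has not been tested against this exact setup.

```xml
<image>
<name>wurstmeister/kafka:1.0.0</name>
<alias>kafka</alias>
<run>
<ports>
<!-- Same port on both sides, so the advertised port is reachable from the host and inside the container -->
<port>9093:9093</port>
</ports>
<links>
<link>zookeeper:zookeeper</link>
</links>
<env>
<!-- Assumption: the image maps KAFKA_LISTENERS to the broker's "listeners" setting -->
<KAFKA_LISTENERS>PLAINTEXT://0.0.0.0:9093</KAFKA_LISTENERS>
<!-- Assumption: 192.168.1.10 is a placeholder; use the actual IP address of your host -->
<KAFKA_ADVERTISED_LISTENERS>PLAINTEXT://192.168.1.10:9093</KAFKA_ADVERTISED_LISTENERS>
<KAFKA_ZOOKEEPER_CONNECT>zookeeper:2181</KAFKA_ZOOKEEPER_CONNECT>
</env>
</run>
</image>
```

With a configuration along these lines, spring.kafka.bootstrap-servers in application.properties would point at the same address and port that the broker advertises (here 192.168.1.10:9093), so the bootstrap connection and the later connections to node 1001 target the same endpoint.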