Remotely accessing Kafka running inside Kubernetes
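I have a single-node Kafka broker running inside a pod in a single-node Kubernetes environment. I am using this image for Kafka: https://hub.docker.com/r/wurstmeister/kafka
Kafka version = 1.1.0
The Kubernetes cluster is running inside a VM on a server. The VM has the following IP on the active interface ens32: 192.168.3.102
Kafka.yaml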
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  namespace: casb-deployment
  name: kafkaservice
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: kafkaservice
    spec:
      hostname: kafkaservice
      containers:
      - name: kafkaservice
        imagePullPolicy: IfNotPresent
        image: wurstmeister/kafka:1.1.0
        env:
        - name: KAFKA_BROKER_ID
          value: "1"
        # - name: KAFKA_ADVERTISED_HOST_NAME
        #   value: "kafkaservice"
        - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
          value: "INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT"
        - name: KAFKA_LISTENERS
          value: "INTERNAL_PLAINTEXT://0.0.0.0:9092,EXTERNAL_PLAINTEXT://0.0.0.0:9093"
        - name: KAFKA_ADVERTISED_LISTENERS
          value: "INTERNAL_PLAINTEXT://kafkaservice:9092,EXTERNAL_PLAINTEXT://192.168.3.102:9093"
        - name: KAFKA_INTER_BROKER_LISTENER_NAME
          value: "INTERNAL_PLAINTEXT"
        - name: KAFKA_CREATE_TOPICS
          value: "topic-1:100:1,topic-2:1:1"
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: "zookeeper:2181"
        ports:
        - name: port9092
          containerPort: 9092
        - name: port9093
          containerPort: 9093
        volumeMounts:
        - mountPath: /kafka/kafka-logs-kafkaservice
          name: kafka-volume
      volumes:
      - name: kafka-volume
        hostPath:
          path: /home/volume/kafka-logs
---
apiVersion: v1
kind: Service
metadata:
  namespace: casb-deployment
  name: kafkaservice
  labels:
    app: kafkaservice
spec:
  selector:
    app: kafkaservice
  ports:
  - name: port9092
    port: 9092
    targetPort: 9092
    protocol: TCP
---
apiVersion: v1
kind: Service
metadata:
  namespace: casb-deployment
  name: kafkaservice-external
  labels:
    app: kafkaservice-external
spec:
  selector:
    app: kafkaservice
  ports:
  - name: port9093
    port: 9093
    protocol: TCP
    nodePort: 30035
  type: NodePort
I am able to ping the VM, i.e. the Kubernetes node, from my local machine:
ping 192.168.3.102
I am using a NodePort to expose the service. I am also able to telnet to it:
telnet 192.168.3.102 30035
and it gives:
Trying 192.168.3.102...
Connected to 192.168.3.102.
Escape character is '^]'.
I tried running the Kafka console consumer and producer from my local machine:
Consumer:
bin/kafka-console-consumer.sh --bootstrap-server 192.168.3.102:30035 --topic topic-1 --from-beginning
Output:
[2019-09-25 12:30:40,716] WARN [Consumer clientId=consumer-1, groupId=console-consumer-20551] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Producer:
bin/kafka-console-producer.sh --broker-list 192.168.3.102:30035 --topic topic-1
Output:
[2019-09-25 12:32:07,958] WARN [Producer clientId=console-producer] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Netstat:
netstat -tulpn | grep 30035
tcp6 0 0 :::30035 :::* LISTEN 113545/kube-proxy
I also tried running a Python-based consumer, i.e. kafka-python==1.4.2, and it gave me the following logs:
[2019-09-25T12:15:39+0500] INFO kafka.client Bootstrapping cluster metadata from [('192.168.3.102', 30035, <AddressFamily.AF_INET: 2>)]
[2019-09-25T12:15:39+0500] INFO kafka.conn <BrokerConnection node_id=bootstrap host=192.168.3.102:30035 <connecting> [IPv4 ('192.168.3.102', 30035)]>: connecting to 192.168.3.102:30035 [('192.168.3.102', 30035) IPv4]
[2019-09-25T12:15:39+0500] INFO kafka.conn <BrokerConnection node_id=bootstrap host=192.168.3.102:30035 <connecting> [IPv4 ('192.168.3.102', 30035)]>: Connection complete.
[2019-09-25T12:15:39+0500] INFO kafka.client Bootstrap succeeded: found 1 brokers and 26 topics.
[2019-09-25T12:15:39+0500] INFO kafka.conn <BrokerConnection node_id=bootstrap host=192.168.3.102:30035 <connected> [IPv4 ('192.168.3.102', 30035)]>: Closing connection.
[2019-09-25T12:15:39+0500] INFO kafka.conn <BrokerConnection node_id=1 host=192.168.3.102:9093 <connecting> [IPv4 ('192.168.3.102', 9093)]>: connecting to 192.168.3.102:9093 [('192.168.3.102', 9093) IPv4]
[2019-09-25T12:15:39+0500] INFO kafka.conn Probing node 1 broker version
[2019-09-25T12:15:39+0500] ERROR kafka.conn Connect attempt to <BrokerConnection node_id=1 host=192.168.3.102:9093 <connecting> [IPv4 ('192.168.3.102', 9093)]> returned error 111. Disconnecting.
[2019-09-25T12:15:39+0500] INFO kafka.conn <BrokerConnection node_id=1 host=192.168.3.102:9093 <connecting> [IPv4 ('192.168.3.102', 9093)]>: Closing connection. ConnectionError: 111 ECONNREFUSED
[2019-09-25T12:15:40+0500] INFO kafka.conn <BrokerConnection node_id=1 host=192.168.3.102:9093 <connecting> [IPv4 ('192.168.3.102', 9093)]>: connecting to 192.168.3.102:9093 [('192.168.3.102', 9093) IPv4]
[2019-09-25T12:15:40+0500] ERROR kafka.conn Connect attempt to <BrokerConnection node_id=1 host=192.168.3.102:9093 <connecting> [IPv4 ('192.168.3.102', 9093)]> returned error 111. Disconnecting.
[2019-09-25T12:15:40+0500] INFO kafka.conn <BrokerConnection node_id=1 host=192.168.3.102:9093 <connecting> [IPv4 ('192.168.3.102', 9093)]>: Closing connection. ConnectionError: 111 ECONNREFUSED
[2019-09-25T12:15:40+0500] INFO Activity URL collector Exception in activity url collector: NoBrokersAvailable
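From the logs, it looks like the connection was established, i.e.
<connecting> [IPv4 ('192.168.3.102', 30035)]>: Connection complete.
Bootstrap succeeded: found 1 brokers and 26 topics.
but then it got disconnected.
Please help me figure out what I am missing and how I can resolve this. Thanks.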
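You are bootstrapping to port 30035 and getting the initial connection, but the broker then returns the advertised port 9093 for subsequent connections, not 30035.
You need the NodePort and the advertised port to be the same, or at least both externally routable. In addition, if your code is running on your host machine, you will need port forwarding on your VM.
Note: Confluent and Strimzi Helm charts exist for setting up Kafka in Kubernetes.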
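The mismatch this answer describes can be checked mechanically before deploying. The following is a minimal sketch using only the Python standard library, not part of the original answer. The helper names parse_listeners and externally_reachable are hypothetical, and the "fixed" value is one assumed correction (advertising the NodePort 30035 instead of the container port 9093), which only works if the node IP is reachable from the client.

```python
def parse_listeners(value):
    """Parse 'NAME://host:port,NAME2://host:port' into {name: (host, port)}."""
    listeners = {}
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue
        name, _, address = entry.partition("://")
        host, _, port = address.rpartition(":")
        listeners[name] = (host, int(port))
    return listeners


def externally_reachable(advertised, listener_name, node_ip, node_port):
    """Return True if the named listener advertises exactly node_ip:node_port."""
    host, port = parse_listeners(advertised)[listener_name]
    return (host, port) == (node_ip, node_port)


# KAFKA_ADVERTISED_LISTENERS as given in the question's Deployment.
current = "INTERNAL_PLAINTEXT://kafkaservice:9092,EXTERNAL_PLAINTEXT://192.168.3.102:9093"
# Assumed correction: advertise the NodePort that clients actually connect to.
fixed = "INTERNAL_PLAINTEXT://kafkaservice:9092,EXTERNAL_PLAINTEXT://192.168.3.102:30035"

print(externally_reachable(current, "EXTERNAL_PLAINTEXT", "192.168.3.102", 30035))  # False
print(externally_reachable(fixed, "EXTERNAL_PLAINTEXT", "192.168.3.102", 30035))    # True
```

With the question's value the check fails, which matches the logs: the client bootstraps on 30035, is told to reconnect to 192.168.3.102:9093, and gets ECONNREFUSED because nothing on the node listens on 9093.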
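A simple Kafka/ZooKeeper configuration for Kubernetes on AWS/DigitalOcean/GCE/Azure, with external access:
https://github.com/StanislavKo/k8s_digitalocean_kafka
You can connect to Kafka from outside AWS/DO/GCE using the regular binary protocol. The connection is PLAINTEXT or SASL_PLAINTEXT (username/password).
The Kafka cluster is a StatefulSet, so you can scale it easily.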
I would also recommend Strimzi for Kafka on Kubernetes. For external access, this article saved me: https://developers.redhat.com/blog/2019/06/11/accessing-apache-kafka-in-strimzi-part-4-load-balancers/
My configuration is as follows:
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 2.4.0
    replicas: 1
    listeners:
      plain: {}
      tls: {}
      external:
        type: loadbalancer
        tls: false
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      log.message.format.version: "2.4"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 1
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}
This retrieves the load balancer IP:
kubectl get service my-cluster-kafka-external-bootstrap -o=jsonpath='{.status.loadBalancer.ingress[0].ip}{"\n"}'