可以与 Zookeeper 对话,但不能与消息代理对话
Can talk to Zookeeper but not to the message brokers
我正在使用 kafka-python 为 Kafka 2.2.1 集群(来自 AWS 的 MSK 服务的托管集群实例)生成消息。我能够检索 bootstrap 服务器并与它们建立网络连接,但没有消息通过。相反,在 A
类型的每条消息之后,我立即收到一个 B
... 类型的消息,最终收到一个 C
:
类型的消息
A [INFO] 2019-11-19T15:17:19.603Z <BrokerConnection ... <connecting> [IPv4 ('10.0.128.56', 9094)]>: Connection complete.
B [ERROR] 2019-11-19T15:17:19.605Z <BrokerConnection ... <connected> [IPv4 ('10.0.128.56', 9094)]>: socket disconnected
C [ERROR] KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.
是什么导致代理节点接受来自有希望的生产者的 TCP 连接,但随后立即再次关闭它?
编辑
主题已存在,kafka-topics.sh --list
显示。
我用过的所有客户端都有同样的问题:Kafka 的kafka-console-producer.sh
、kafka-python, confluent-kafka, and kafkacat
Kafka 集群与我所有其他机器位于同一 VPC 中,其安全组允许该 VPC 内的任何传入和传出流量。
但是,它由 Amazon 的 Managed Streaming for Kafka (MSK) 服务管理,这意味着我无法精细控制服务器安装设置(甚至不知道它们是什么)。 MSK 仅发布供客户使用的 zookeeper 和消息代理 URL。
生产者 运行s 作为 AWS Lambda 函数,但是当我 运行 它在普通 EC2 实例上时问题仍然存在。
权限不是问题。我已经为 lambda 角色分配了它需要的所有 AWS 权限(AWS 总是非常明确地说明哪个操作需要哪个缺少权限)。
连接不是问题。我可以使用标准 telnet 访问动物园管理员和消息代理的 URL。但是,向动物园管理员发出命令是可行的,而向消息代理发出命令最终总是失败。自Kafka uses a binary protocol over TCP以来,我不知道如何进一步调试问题。
编辑
按照建议,我用
调试了这个
./kafkacat -b $BROKERS -L -d broker
并得到:
7|1574772202.379|FEATURE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1574772202.379|STATE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1574772202.379|BROKERFAIL|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: failed: err: Local: Broker transport failure: (errno: Operation now in progress)
%7|1574772202.379|FEATURE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Updated enabled protocol features -ApiVersion to
%7|1574772202.380|STATE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Broker changed state APIVERSION_QUERY -> DOWN
那么,这是客户端和代理 API 版本之间的一种不匹配吗?我如何才能从中恢复,请记住我无法 控制 AWS 提供的 Kafka 集群的版本或配置?
由于它也不适用于非python 客户端,因此它不太可能是库中的错误。
这似乎是一个网络问题。
有一个名为 advertised.listeners
的 kafka 代理设置,它指定客户端在第一次连接后将使用的地址。换句话说,这是客户端消费或生产时发生的情况:
使用bootstrap.servers
,它建立第一个连接并要求使用真实地址。
经纪人用经纪人配置中advertised.listeners
指定的地址回复。
客户端尝试使用该新地址进行消费或生产。
这是一项安全功能,可防止 public 可以访问的代理被不应具有访问权限的客户端 consumed/produced 访问。
如何诊断
运行以下命令:
$ kafkacat -b ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 -L
哪个returns
Metadata for all topics (from broker -1: ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092/bootstrap):
1 brokers:
broker 0 at ip-172-31-18-160.us-west-2.compute.internal:9092
在这种情况下,ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092
是客户端指定的地址,即使客户端可以访问该 address/port,ip-172-31-18-160.us-west-2.compute.internal:9092
也是将要使用的地址至 consume/produce.
现在,如果您在 AWS MSK 中 运行ning kafka,它可能会为您管理它。您必须确保可以访问该命令返回的地址。如果不这样做,您可能需要更改它或 运行 来自有权访问它的主机的命令。
另一种选择可能是使用可以在内部访问该地址的堡垒主机打开 ssh 隧道。
您可以在以下位置找到更多详细信息:https://rmoff.net/2018/08/02/kafka-listeners-explained
我认为这与 TLS 加密有关。默认情况下,MSK 会启动一个同时接受 PLAINTEXT 和 TLS 的集群,但如果您以编程方式从集群中获取 bootstrap 服务器,它只会为您提供 TLS 端口。如果您是这种情况,请尝试改用 PLAINTEXT 端口 9092。
要为 TLS 验证客户端,您需要生成一个证书:https://docs.aws.amazon.com/msk/latest/developerguide/msk-authentication.html 然后需要将此证书添加到您的 lambda 并在您的生产者配置中引用该证书。
如果您只能将 MSK 集群配置为 PLAINTEXT,那么当您从 AWS SDK 获取 bootstrap 服务器时,它会为您提供 PLAINTEXT 端口,您应该没问题。
我正在使用 kafka-python 为 Kafka 2.2.1 集群(来自 AWS 的 MSK 服务的托管集群实例)生成消息。我能够检索 bootstrap 服务器并与它们建立网络连接,但没有消息通过。相反,在 A
类型的每条消息之后,我立即收到一个 B
... 类型的消息,最终收到一个 C
:
A [INFO] 2019-11-19T15:17:19.603Z <BrokerConnection ... <connecting> [IPv4 ('10.0.128.56', 9094)]>: Connection complete.
B [ERROR] 2019-11-19T15:17:19.605Z <BrokerConnection ... <connected> [IPv4 ('10.0.128.56', 9094)]>: socket disconnected
C [ERROR] KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.
是什么导致代理节点接受来自有希望的生产者的 TCP 连接,但随后立即再次关闭它?
编辑
主题已存在,
kafka-topics.sh --list
显示。我用过的所有客户端都有同样的问题:Kafka 的
kafka-console-producer.sh
、kafka-python, confluent-kafka, and kafkacatKafka 集群与我所有其他机器位于同一 VPC 中,其安全组允许该 VPC 内的任何传入和传出流量。
但是,它由 Amazon 的 Managed Streaming for Kafka (MSK) 服务管理,这意味着我无法精细控制服务器安装设置(甚至不知道它们是什么)。 MSK 仅发布供客户使用的 zookeeper 和消息代理 URL。
生产者 运行s 作为 AWS Lambda 函数,但是当我 运行 它在普通 EC2 实例上时问题仍然存在。
权限不是问题。我已经为 lambda 角色分配了它需要的所有 AWS 权限(AWS 总是非常明确地说明哪个操作需要哪个缺少权限)。
连接不是问题。我可以使用标准 telnet 访问动物园管理员和消息代理的 URL。但是,向动物园管理员发出命令是可行的,而向消息代理发出命令最终总是失败。自Kafka uses a binary protocol over TCP以来,我不知道如何进一步调试问题。
编辑
按照建议,我用
调试了这个./kafkacat -b $BROKERS -L -d broker
并得到:
7|1574772202.379|FEATURE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1574772202.379|STATE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1574772202.379|BROKERFAIL|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: failed: err: Local: Broker transport failure: (errno: Operation now in progress)
%7|1574772202.379|FEATURE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Updated enabled protocol features -ApiVersion to
%7|1574772202.380|STATE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Broker changed state APIVERSION_QUERY -> DOWN
那么,这是客户端和代理 API 版本之间的一种不匹配吗?我如何才能从中恢复,请记住我无法 控制 AWS 提供的 Kafka 集群的版本或配置?
由于它也不适用于非python 客户端,因此它不太可能是库中的错误。
这似乎是一个网络问题。
有一个名为 advertised.listeners
的 kafka 代理设置,它指定客户端在第一次连接后将使用的地址。换句话说,这是客户端消费或生产时发生的情况:
使用
bootstrap.servers
,它建立第一个连接并要求使用真实地址。经纪人用经纪人配置中
advertised.listeners
指定的地址回复。客户端尝试使用该新地址进行消费或生产。
这是一项安全功能,可防止 public 可以访问的代理被不应具有访问权限的客户端 consumed/produced 访问。
如何诊断
运行以下命令:
$ kafkacat -b ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 -L
哪个returns
Metadata for all topics (from broker -1: ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092/bootstrap):
1 brokers:
broker 0 at ip-172-31-18-160.us-west-2.compute.internal:9092
在这种情况下,ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092
是客户端指定的地址,即使客户端可以访问该 address/port,ip-172-31-18-160.us-west-2.compute.internal:9092
也是将要使用的地址至 consume/produce.
现在,如果您在 AWS MSK 中 运行ning kafka,它可能会为您管理它。您必须确保可以访问该命令返回的地址。如果不这样做,您可能需要更改它或 运行 来自有权访问它的主机的命令。
另一种选择可能是使用可以在内部访问该地址的堡垒主机打开 ssh 隧道。
您可以在以下位置找到更多详细信息:https://rmoff.net/2018/08/02/kafka-listeners-explained
我认为这与 TLS 加密有关。默认情况下,MSK 会启动一个同时接受 PLAINTEXT 和 TLS 的集群,但如果您以编程方式从集群中获取 bootstrap 服务器,它只会为您提供 TLS 端口。如果您是这种情况,请尝试改用 PLAINTEXT 端口 9092。
要为 TLS 验证客户端,您需要生成一个证书:https://docs.aws.amazon.com/msk/latest/developerguide/msk-authentication.html 然后需要将此证书添加到您的 lambda 并在您的生产者配置中引用该证书。
如果您只能将 MSK 集群配置为 PLAINTEXT,那么当您从 AWS SDK 获取 bootstrap 服务器时,它会为您提供 PLAINTEXT 端口,您应该没问题。