可以与 Zookeeper 对话,但不能与消息代理对话

Can talk to Zookeeper but not to the message brokers

我正在使用 kafka-python 为 Kafka 2.2.1 集群(来自 AWS 的 MSK 服务的托管集群实例)生成消息。我能够检索 bootstrap 服务器并与它们建立网络连接,但没有消息通过。相反,在 A 类型的每条消息之后,我立即收到一个 B... 类型的消息,最终收到一个 C:

类型的消息
A [INFO]    2019-11-19T15:17:19.603Z    <BrokerConnection ... <connecting> [IPv4 ('10.0.128.56', 9094)]>: Connection complete.
B [ERROR]   2019-11-19T15:17:19.605Z    <BrokerConnection ... <connected> [IPv4 ('10.0.128.56', 9094)]>: socket disconnected
C [ERROR] KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.

是什么导致代理节点接受来自有希望的生产者的 TCP 连接,但随后立即再次关闭它?

编辑

编辑

按照建议,我用

调试了这个

./kafkacat -b $BROKERS -L -d broker

并得到:

7|1574772202.379|FEATURE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1574772202.379|STATE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1574772202.379|BROKERFAIL|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: failed: err: Local: Broker transport failure: (errno: Operation now in progress)
%7|1574772202.379|FEATURE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Updated enabled protocol features -ApiVersion to
%7|1574772202.380|STATE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Broker changed state APIVERSION_QUERY -> DOWN

那么,这是客户端和代理 API 版本之间的一种不匹配吗?我如何才能从中恢复,请记住我无法 控制 AWS 提供的 Kafka 集群的版本或配置?

由于它也不适用于非python 客户端,因此它不太可能是库中的错误。

这似乎是一个网络问题

有一个名为 advertised.listeners 的 kafka 代理设置,它指定客户端在第一次连接后将使用的地址。换句话说,这是客户端消费或生产时发生的情况:

  1. 使用bootstrap.servers,它建立第一个连接并要求使用真实地址。

  2. 经纪人用经纪人配置中advertised.listeners指定的地址回复。

  3. 客户端尝试使用该新地址进行消费或生产。

这是一项安全功能,可防止 public 可以访问的代理被不应具有访问权限的客户端 consumed/produced 访问。

如何诊断

运行以下命令:

$ kafkacat -b ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 -L

哪个returns

Metadata for all topics (from broker -1: ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092/bootstrap):
1 brokers:
  broker 0 at ip-172-31-18-160.us-west-2.compute.internal:9092

在这种情况下,ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 是客户端指定的地址,即使客户端可以访问该 address/port,ip-172-31-18-160.us-west-2.compute.internal:9092 也是将要使用的地址至 consume/produce.

现在,如果您在 AWS MSK 中 运行ning kafka,它可能会为您管理它。您必须确保可以访问该命令返回的地址。如果不这样做,您可能需要更改它或 运行 来自有权访问它的主机的命令。

另一种选择可能是使用可以在内部访问该地址的堡垒主机打开 ssh 隧道。

您可以在以下位置找到更多详细信息:https://rmoff.net/2018/08/02/kafka-listeners-explained

我认为这与 TLS 加密有关。默认情况下,MSK 会启动一个同时接受 PLAINTEXT 和 TLS 的集群,但如果您以编程方式从集群中获取 bootstrap 服务器,它只会为您提供 TLS 端口。如果您是这种情况,请尝试改用 PLAINTEXT 端口 9092。

要为 TLS 验证客户端,您需要生成一个证书:https://docs.aws.amazon.com/msk/latest/developerguide/msk-authentication.html 然后需要将此证书添加到您的 lambda 并在您的生产者配置中引用该证书。

如果您只能将 MSK 集群配置为 PLAINTEXT,那么当您从 AWS SDK 获取 bootstrap 服务器时,它会为您提供 PLAINTEXT 端口,您应该没问题。