发送到多个 Kafka 主题时的 Logstash 管道问题

Logstash pipeline issues when sending to multiple Kafka topics

我正在使用 Logstash 从 SQL 服务器数据库中提取更改数据并将其发送到不同的 Kafka 主题。 一些 Logstash 配置文件发送到 Ticket 主题,其他的发送到 Availability 主题 如果我 运行 只是使用管道自行发送到 Ticket 主题的配置,它就可以正常工作。如果我 运行 可用性主题的配置在他们自己的管道中发送数据正常。

但是,当我包含要同时发送到两个主题的配置时,我收到了错误消息。请参阅日志中的摘录。这次可用性主题失败,其他时候票主题失败。

[2021-03-22T07:30:00,172][WARN ][org.apache.kafka.clients.NetworkClient][AvaililityDOWN] [Producer clientId=Avail_down1] Error while fetching metadata with correlation id 467 : {dcsvisionavailability=TOPIC_AUTHORIZATION_FAILED}
[2021-03-22T07:30:00,172][ERROR][org.apache.kafka.clients.Metadata][AvaililityDOWN] [Producer clientId=Avail_down1] Topic authorization failed for topics [dcsvisionavailability]
[2021-03-22T07:30:00,203][INFO ][logstash.inputs.jdbc     ][Ticket1][a296a0df2f603fe98d8c108e860be4d7a17f840f9215bb90c5254647bb9c37cd] (0.004255s) SELECT sys.fn_cdc_map_lsn_to_time(__$start_lsn) transaction_date, abs(convert(bigint, __$seqval)) seqval, * FROM cdc.dbo_TICKET_CT where ( __$operation = 2 or __$operation = 4) and modified_date > '2021-03-22T07:27:00.169' order by modified_date ASC
[2021-03-22T07:30:00,203][INFO ][logstash.inputs.jdbc     ][AvailabilityMAXUP][7805e7bd44f20b373e99845b687dc15d7c2a3de084fb4424dd492be93b39b64a] (0.004711s) With Logstash as(
SELECT sys.fn_cdc_map_lsn_to_time(__$start_lsn) transaction_date, abs(convert(bigint, __$seqval)) seqval, *
FROM cdc.dbo_A_TERM_MAX_UPTIME_DAY_CT
)
select * from Logstash
where ( __$operation = 2 or __$operation = 4 or __$operation = 1 ) and TMZONE = 'Etc/UTC' and transaction_date > '2021-03-22T07:15:00.157' order by seqval ASC
[2021-03-22T07:30:00,281][WARN ][org.apache.kafka.clients.NetworkClient][AvailabilityMAXUP] [Producer clientId=Avail_MaxUp1] Error while fetching metadata with correlation id 633 : {dcsvisionavailability=TOPIC_AUTHORIZATION_FAILED}
[2021-03-22T07:30:00,281][ERROR][org.apache.kafka.clients.Metadata][AvailabilityMAXUP] [Producer clientId=Avail_MaxUp1] Topic authorization failed for topics [dcsvisionavailability]
[2021-03-22T07:30:00,297][WARN ][org.apache.kafka.clients.NetworkClient][AvaililityDOWN] [Producer clientId=Avail_down1] Error while fetching metadata with correlation id 468 : {dcsvisionavailability=TOPIC_AUTHORIZATION_FAILED}
[2021-03-22T07:30:00,297][ERROR][org.apache.kafka.clients.Metadata][AvaililityDOWN] [Producer clientId=Avail_down1] Topic authorization failed for topics [dcsvisionavailability]
[2021-03-22T07:30:00,406][WARN ][org.apache.kafka.clients.NetworkClient][AvailabilityMAXUP] [Producer clientId=Avail_MaxUp1] Error while fetching metadata with correlation id 634 : {dcsvisionavailability=TOPIC_AUTHORIZATION_FAILED}
[2021-03-22T07:30:00,406][ERROR][org.apache.kafka.clients.Metadata][AvailabilityMAXUP] [Producer clientId=Avail_MaxUp1] Topic authorization failed for topics [dcsvisionavailability]
[2021-03-22T07:30:00,406][WARN ][logstash.outputs.kafka   ][AvailabilityMAXUP][3685b3e90091e526485060db8df552a756f11f0f7fd344a5051e08b484a8ff8a] producer send failed, dropping record {:exception=>Java::OrgApacheKafkaCommonErrors::TopicAuthorizationException, :message=>"Not authorized to access topics: [dcsvisionavailability]", :record_value=>"<A_TERM_MAX_UPTIME_DAY>\

这是可用性配置的输出部分

output {
    kafka {
      bootstrap_servers =>  "namespaceurl.windows.net:9093"
      topic_id => "dcsvisionavailability"
      security_protocol => "SASL_SSL"
      sasl_mechanism => "PLAIN"
      jaas_path => "C:\Logstash\keys\kafka_sasl_jaasAVAILABILITY.java"
      client_id => "Avail_MaxUp1"
      codec  => line {
      format => "<A_TERM_MAX_UPTIME_DAY>
    <stuff deleted>"
      }
}

}

pipeline.yml 文件中有这个

## Ticket Topic
- pipeline.id: Ticket1
  path.config: "TicketCT2KafkaEH8.conf"
  queue.type: persisted
  
- pipeline.id: PublicComments1
  path.config: "Public_DiaryCT2KafkaEH1.conf"
  queue.type: persisted

##  - Availability topic 
- pipeline.id: AvailabilityDOWN
  path.config: "Availability_Down_TimeCT2KafkaEH3.conf"
  queue.type: persisted
  
- pipeline.id: AvailabilityMAXUP
  path.config: "Availability_Max_UptimeCT2KafkaEH2.conf"
  queue.type: persisted   

我在不同的实例中尝试了 运行ning 是的,在我有管道 运行ning 并打开另一个命令 window 和 运行 另一个配置的地方工作发送到不同的主题(为此我指定了不同的 --path.data )

然而,由于 40 个配置涉及 4 个不同的主题,我真的不想 运行 并行处理这么多实例。欢迎任何建议

我已经解决了这个问题。这与 jaas_path 文件有关。

我为每个主题创建了一个不同的 jaas_path 文件,该文件在 EntityPath 中指定字符串中的主题,如下所示。

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="$ConnectionString"
password="Endpoint=sb://<stuff redacted>.windows.net/;SharedAccessKeyName=keyname;SharedAccessKey=<key redacted>;EntityPath=dcsvisionavailability";
};

当我从 Event Hub 提供公共密钥以用于最后没有 ;EntityPath=topicname 的所有主题时,它起作用了。

这是有道理的,因为我已经在行中指定了主题 topic_id => logstash conf 文件中的“dcsvisionavailability”。

我很高兴沃尔特成功了。

对于可能正在寻找此问题解决方案的其他人,这里的问题是相同的 shareAccessKeyName 与不同的令牌一起用于不同的 Kafka 主题以在 EventHub 进行授权。

这就是为什么当只针对一个主题的请求进入授权时一切正常的原因。

当针对不同主题的请求同时以相同的 shareAccessKeyName 但使用不同的 token 到达 EventHub 时,只有一个会通过,而另一个会因 token 冲突而异常退出。

解决这个问题的不同选择是

  1. 对所有请求使用相同的令牌(对于所有主题)
  2. 对每个主题使用不同的 shareAccessKeyName 和不同的令牌。

Walter 选择#1 并需要在密码中删除主题名称(因为 EventHub 可能具有主题明智的授权)。

对于主题明智的授权,解决方案#2 会更好。