发送到多个 Kafka 主题时的 Logstash 管道问题
Logstash pipeline issues when sending to multiple Kafka topics
我正在使用 Logstash 从 SQL 服务器数据库中提取更改数据并将其发送到不同的 Kafka 主题。
一些 Logstash 配置文件发送到 Ticket 主题,其他的发送到 Availability 主题
如果我 运行 只是使用管道自行发送到 Ticket 主题的配置,它就可以正常工作。如果我 运行 可用性主题的配置在他们自己的管道中发送数据正常。
但是,当我包含要同时发送到两个主题的配置时,我收到了错误消息。请参阅日志中的摘录。这次可用性主题失败,其他时候票主题失败。
[2021-03-22T07:30:00,172][WARN ][org.apache.kafka.clients.NetworkClient][AvaililityDOWN] [Producer clientId=Avail_down1] Error while fetching metadata with correlation id 467 : {dcsvisionavailability=TOPIC_AUTHORIZATION_FAILED}
[2021-03-22T07:30:00,172][ERROR][org.apache.kafka.clients.Metadata][AvaililityDOWN] [Producer clientId=Avail_down1] Topic authorization failed for topics [dcsvisionavailability]
[2021-03-22T07:30:00,203][INFO ][logstash.inputs.jdbc ][Ticket1][a296a0df2f603fe98d8c108e860be4d7a17f840f9215bb90c5254647bb9c37cd] (0.004255s) SELECT sys.fn_cdc_map_lsn_to_time(__$start_lsn) transaction_date, abs(convert(bigint, __$seqval)) seqval, * FROM cdc.dbo_TICKET_CT where ( __$operation = 2 or __$operation = 4) and modified_date > '2021-03-22T07:27:00.169' order by modified_date ASC
[2021-03-22T07:30:00,203][INFO ][logstash.inputs.jdbc ][AvailabilityMAXUP][7805e7bd44f20b373e99845b687dc15d7c2a3de084fb4424dd492be93b39b64a] (0.004711s) With Logstash as(
SELECT sys.fn_cdc_map_lsn_to_time(__$start_lsn) transaction_date, abs(convert(bigint, __$seqval)) seqval, *
FROM cdc.dbo_A_TERM_MAX_UPTIME_DAY_CT
)
select * from Logstash
where ( __$operation = 2 or __$operation = 4 or __$operation = 1 ) and TMZONE = 'Etc/UTC' and transaction_date > '2021-03-22T07:15:00.157' order by seqval ASC
[2021-03-22T07:30:00,281][WARN ][org.apache.kafka.clients.NetworkClient][AvailabilityMAXUP] [Producer clientId=Avail_MaxUp1] Error while fetching metadata with correlation id 633 : {dcsvisionavailability=TOPIC_AUTHORIZATION_FAILED}
[2021-03-22T07:30:00,281][ERROR][org.apache.kafka.clients.Metadata][AvailabilityMAXUP] [Producer clientId=Avail_MaxUp1] Topic authorization failed for topics [dcsvisionavailability]
[2021-03-22T07:30:00,297][WARN ][org.apache.kafka.clients.NetworkClient][AvaililityDOWN] [Producer clientId=Avail_down1] Error while fetching metadata with correlation id 468 : {dcsvisionavailability=TOPIC_AUTHORIZATION_FAILED}
[2021-03-22T07:30:00,297][ERROR][org.apache.kafka.clients.Metadata][AvaililityDOWN] [Producer clientId=Avail_down1] Topic authorization failed for topics [dcsvisionavailability]
[2021-03-22T07:30:00,406][WARN ][org.apache.kafka.clients.NetworkClient][AvailabilityMAXUP] [Producer clientId=Avail_MaxUp1] Error while fetching metadata with correlation id 634 : {dcsvisionavailability=TOPIC_AUTHORIZATION_FAILED}
[2021-03-22T07:30:00,406][ERROR][org.apache.kafka.clients.Metadata][AvailabilityMAXUP] [Producer clientId=Avail_MaxUp1] Topic authorization failed for topics [dcsvisionavailability]
[2021-03-22T07:30:00,406][WARN ][logstash.outputs.kafka ][AvailabilityMAXUP][3685b3e90091e526485060db8df552a756f11f0f7fd344a5051e08b484a8ff8a] producer send failed, dropping record {:exception=>Java::OrgApacheKafkaCommonErrors::TopicAuthorizationException, :message=>"Not authorized to access topics: [dcsvisionavailability]", :record_value=>"<A_TERM_MAX_UPTIME_DAY>\
这是可用性配置的输出部分
output {
kafka {
bootstrap_servers => "namespaceurl.windows.net:9093"
topic_id => "dcsvisionavailability"
security_protocol => "SASL_SSL"
sasl_mechanism => "PLAIN"
jaas_path => "C:\Logstash\keys\kafka_sasl_jaasAVAILABILITY.java"
client_id => "Avail_MaxUp1"
codec => line {
format => "<A_TERM_MAX_UPTIME_DAY>
<stuff deleted>"
}
}
}
pipeline.yml 文件中有这个
## Ticket Topic
- pipeline.id: Ticket1
path.config: "TicketCT2KafkaEH8.conf"
queue.type: persisted
- pipeline.id: PublicComments1
path.config: "Public_DiaryCT2KafkaEH1.conf"
queue.type: persisted
## - Availability topic
- pipeline.id: AvailabilityDOWN
path.config: "Availability_Down_TimeCT2KafkaEH3.conf"
queue.type: persisted
- pipeline.id: AvailabilityMAXUP
path.config: "Availability_Max_UptimeCT2KafkaEH2.conf"
queue.type: persisted
我在不同的实例中尝试了 运行ning 是的,在我有管道 运行ning 并打开另一个命令 window 和 运行 另一个配置的地方工作发送到不同的主题(为此我指定了不同的 --path.data )
然而,由于 40 个配置涉及 4 个不同的主题,我真的不想 运行 并行处理这么多实例。欢迎任何建议
我已经解决了这个问题。这与 jaas_path 文件有关。
我为每个主题创建了一个不同的 jaas_path 文件,该文件在 EntityPath 中指定字符串中的主题,如下所示。
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="$ConnectionString"
password="Endpoint=sb://<stuff redacted>.windows.net/;SharedAccessKeyName=keyname;SharedAccessKey=<key redacted>;EntityPath=dcsvisionavailability";
};
当我从 Event Hub 提供公共密钥以用于最后没有 ;EntityPath=topicname 的所有主题时,它起作用了。
这是有道理的,因为我已经在行中指定了主题
topic_id => logstash conf 文件中的“dcsvisionavailability”。
我很高兴沃尔特成功了。
对于可能正在寻找此问题解决方案的其他人,这里的问题是相同的 shareAccessKeyName 与不同的令牌一起用于不同的 Kafka 主题以在 EventHub 进行授权。
这就是为什么当只针对一个主题的请求进入授权时一切正常的原因。
当针对不同主题的请求同时以相同的 shareAccessKeyName 但使用不同的 token 到达 EventHub 时,只有一个会通过,而另一个会因 token 冲突而异常退出。
解决这个问题的不同选择是
- 对所有请求使用相同的令牌(对于所有主题)
- 对每个主题使用不同的 shareAccessKeyName 和不同的令牌。
Walter 选择#1 并需要在密码中删除主题名称(因为 EventHub 可能具有主题明智的授权)。
对于主题明智的授权,解决方案#2 会更好。
我正在使用 Logstash 从 SQL 服务器数据库中提取更改数据并将其发送到不同的 Kafka 主题。 一些 Logstash 配置文件发送到 Ticket 主题,其他的发送到 Availability 主题 如果我 运行 只是使用管道自行发送到 Ticket 主题的配置,它就可以正常工作。如果我 运行 可用性主题的配置在他们自己的管道中发送数据正常。
但是,当我包含要同时发送到两个主题的配置时,我收到了错误消息。请参阅日志中的摘录。这次可用性主题失败,其他时候票主题失败。
[2021-03-22T07:30:00,172][WARN ][org.apache.kafka.clients.NetworkClient][AvaililityDOWN] [Producer clientId=Avail_down1] Error while fetching metadata with correlation id 467 : {dcsvisionavailability=TOPIC_AUTHORIZATION_FAILED}
[2021-03-22T07:30:00,172][ERROR][org.apache.kafka.clients.Metadata][AvaililityDOWN] [Producer clientId=Avail_down1] Topic authorization failed for topics [dcsvisionavailability]
[2021-03-22T07:30:00,203][INFO ][logstash.inputs.jdbc ][Ticket1][a296a0df2f603fe98d8c108e860be4d7a17f840f9215bb90c5254647bb9c37cd] (0.004255s) SELECT sys.fn_cdc_map_lsn_to_time(__$start_lsn) transaction_date, abs(convert(bigint, __$seqval)) seqval, * FROM cdc.dbo_TICKET_CT where ( __$operation = 2 or __$operation = 4) and modified_date > '2021-03-22T07:27:00.169' order by modified_date ASC
[2021-03-22T07:30:00,203][INFO ][logstash.inputs.jdbc ][AvailabilityMAXUP][7805e7bd44f20b373e99845b687dc15d7c2a3de084fb4424dd492be93b39b64a] (0.004711s) With Logstash as(
SELECT sys.fn_cdc_map_lsn_to_time(__$start_lsn) transaction_date, abs(convert(bigint, __$seqval)) seqval, *
FROM cdc.dbo_A_TERM_MAX_UPTIME_DAY_CT
)
select * from Logstash
where ( __$operation = 2 or __$operation = 4 or __$operation = 1 ) and TMZONE = 'Etc/UTC' and transaction_date > '2021-03-22T07:15:00.157' order by seqval ASC
[2021-03-22T07:30:00,281][WARN ][org.apache.kafka.clients.NetworkClient][AvailabilityMAXUP] [Producer clientId=Avail_MaxUp1] Error while fetching metadata with correlation id 633 : {dcsvisionavailability=TOPIC_AUTHORIZATION_FAILED}
[2021-03-22T07:30:00,281][ERROR][org.apache.kafka.clients.Metadata][AvailabilityMAXUP] [Producer clientId=Avail_MaxUp1] Topic authorization failed for topics [dcsvisionavailability]
[2021-03-22T07:30:00,297][WARN ][org.apache.kafka.clients.NetworkClient][AvaililityDOWN] [Producer clientId=Avail_down1] Error while fetching metadata with correlation id 468 : {dcsvisionavailability=TOPIC_AUTHORIZATION_FAILED}
[2021-03-22T07:30:00,297][ERROR][org.apache.kafka.clients.Metadata][AvaililityDOWN] [Producer clientId=Avail_down1] Topic authorization failed for topics [dcsvisionavailability]
[2021-03-22T07:30:00,406][WARN ][org.apache.kafka.clients.NetworkClient][AvailabilityMAXUP] [Producer clientId=Avail_MaxUp1] Error while fetching metadata with correlation id 634 : {dcsvisionavailability=TOPIC_AUTHORIZATION_FAILED}
[2021-03-22T07:30:00,406][ERROR][org.apache.kafka.clients.Metadata][AvailabilityMAXUP] [Producer clientId=Avail_MaxUp1] Topic authorization failed for topics [dcsvisionavailability]
[2021-03-22T07:30:00,406][WARN ][logstash.outputs.kafka ][AvailabilityMAXUP][3685b3e90091e526485060db8df552a756f11f0f7fd344a5051e08b484a8ff8a] producer send failed, dropping record {:exception=>Java::OrgApacheKafkaCommonErrors::TopicAuthorizationException, :message=>"Not authorized to access topics: [dcsvisionavailability]", :record_value=>"<A_TERM_MAX_UPTIME_DAY>\
这是可用性配置的输出部分
output {
kafka {
bootstrap_servers => "namespaceurl.windows.net:9093"
topic_id => "dcsvisionavailability"
security_protocol => "SASL_SSL"
sasl_mechanism => "PLAIN"
jaas_path => "C:\Logstash\keys\kafka_sasl_jaasAVAILABILITY.java"
client_id => "Avail_MaxUp1"
codec => line {
format => "<A_TERM_MAX_UPTIME_DAY>
<stuff deleted>"
}
}
}
pipeline.yml 文件中有这个
## Ticket Topic
- pipeline.id: Ticket1
path.config: "TicketCT2KafkaEH8.conf"
queue.type: persisted
- pipeline.id: PublicComments1
path.config: "Public_DiaryCT2KafkaEH1.conf"
queue.type: persisted
## - Availability topic
- pipeline.id: AvailabilityDOWN
path.config: "Availability_Down_TimeCT2KafkaEH3.conf"
queue.type: persisted
- pipeline.id: AvailabilityMAXUP
path.config: "Availability_Max_UptimeCT2KafkaEH2.conf"
queue.type: persisted
我在不同的实例中尝试了 运行ning 是的,在我有管道 运行ning 并打开另一个命令 window 和 运行 另一个配置的地方工作发送到不同的主题(为此我指定了不同的 --path.data )
然而,由于 40 个配置涉及 4 个不同的主题,我真的不想 运行 并行处理这么多实例。欢迎任何建议
我已经解决了这个问题。这与 jaas_path 文件有关。
我为每个主题创建了一个不同的 jaas_path 文件,该文件在 EntityPath 中指定字符串中的主题,如下所示。
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="$ConnectionString"
password="Endpoint=sb://<stuff redacted>.windows.net/;SharedAccessKeyName=keyname;SharedAccessKey=<key redacted>;EntityPath=dcsvisionavailability";
};
当我从 Event Hub 提供公共密钥以用于最后没有 ;EntityPath=topicname 的所有主题时,它起作用了。
这是有道理的,因为我已经在行中指定了主题 topic_id => logstash conf 文件中的“dcsvisionavailability”。
我很高兴沃尔特成功了。
对于可能正在寻找此问题解决方案的其他人,这里的问题是相同的 shareAccessKeyName 与不同的令牌一起用于不同的 Kafka 主题以在 EventHub 进行授权。
这就是为什么当只针对一个主题的请求进入授权时一切正常的原因。
当针对不同主题的请求同时以相同的 shareAccessKeyName 但使用不同的 token 到达 EventHub 时,只有一个会通过,而另一个会因 token 冲突而异常退出。
解决这个问题的不同选择是
- 对所有请求使用相同的令牌(对于所有主题)
- 对每个主题使用不同的 shareAccessKeyName 和不同的令牌。
Walter 选择#1 并需要在密码中删除主题名称(因为 EventHub 可能具有主题明智的授权)。
对于主题明智的授权,解决方案#2 会更好。