使用 Apache Nifi 读取 AMQP 路由密钥

Reading AMQP routing key with Apache Nifi

我正在尝试使用 apache nifi 读取 RabbitMQ 代理。重要的是我可以检索与消息关联的路由密钥并将有效负载和路由密钥写入文件。

我已经使用 python 读取路由密钥,所以我确定它存在。

我正在使用链接到 PutFile 处理器的 ConsumeAMQP 处理器。所有被写入的都是有效负载,而不是路由密钥。

ConsumeAMQP 处理器解析传入消息并将其形成 Apache NiFi flowfile。流文件结构包括一个名为 attributes 的 key/value 对列表和 content 的任意字节。根据 ConsumeAMQP 文档的 "Additional Details" 部分:

This processor does two things. It constructs FlowFile by extracting information from the consumed AMQP message (both body and attributes). Once message is consumed a FlowFile is constructed. The message body is written to a FlowFile and its com.rabbitmq.client.AMQP.BasicProperties are transfered into the FlowFile as attributes. AMQP attribute names are prefixed with amqp$ prefix.

AMQP Properties The following is the list of available standard AMQP properties which may come with the message: ("amqp$contentType", "amqp$contentEncoding", "amqp$headers", "amqp$deliveryMode", "amqp$priority", "amqp$correlationId", "amqp$replyTo", "amqp$expiration", "amqp$messageId", "amqp$timestamp", "amqp$type", "amqp$userId", "amqp$appId", "amqp$clusterId")

如果您要查找的属性不包含在这里(除非命名约定不常见,否则它似乎不包含),您应该检查它是否包含在 com.rabbitmq.client.AMQP.BasicProperties (it does not appear to be). I am not RabbitMQ expert, but from this link it looks like the routing key is some attribute on a message that an exchange registers in order to route the incoming messages. See also: RabbitMQ "AMQP 0-9-1 Model Explained"

我会检查传入消息的 amqp$headers 属性(您可以暂停消耗 PutFile 处理器以在连接中排队这些流文件并检查它们实时)以查看是否可以使用 NiFi 表达式语言提取 路由密钥 。具有动态 属性 routingKey 和类似(未测试)${amqp$headers.routing_key} 的表达式的 UpdateAttribute 处理器将产生一个名为 routingKey 的新流文件属性,其值为您正在寻找。如果它不在那里,它将在 com.rabbitmq.client.Envelope object, accessible via Envelope.getRoutingKey(), but I do not believe NiFi exposes this object to the processor at this time. A change would need to be made in ConsumeAMQP.java @ L101. You can file a feature request via Jira 上。

另一件要记住的事情是,即使你将 路由键 提取到一个属性,PutFile 非常清楚地记录它只打印流文件 content 到文件,而不是 attributes。如果您需要修改流文件的内容以包含属性,请使用 ReplaceText 处理器将属性插入映射结构或其他所需格式。