Consuming messages from RabbitMQ using lapin and accessing the AMQP message payload

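I am looking for example code that shows how to consume messages from RabbitMQ using lapin, including how to access the AMQP message payload.

I am trying to run the "consumer.rs" example from "https://raw.githubusercontent.com/CleverCloud/lapin/master/examples/consumer.rs", but I get the following error: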

.ack(BasicAckOptions::default())
 ^^^ method not found in `(lapin::Channel, Delivery)`

Here is the code, taken from "https://github.com/CleverCloud/lapin/blob/master/examples/consumer.rs":

use futures_lite::StreamExt;
use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties};
use tracing::info;

fn main() {
    if std::env::var("RUST_LOG").is_err() {
        std::env::set_var("RUST_LOG", "info");
    }

    tracing_subscriber::fmt::init();

    let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());

    async_global_executor::block_on(async {
        let conn = Connection::connect(&addr, ConnectionProperties::default())
            .await
            .expect("connection error");

        info!("CONNECTED");

        //receive channel
        let channel = conn.create_channel().await.expect("create_channel");
        info!(state=?conn.status().state());

        let queue = channel
            .queue_declare(
                "hello",
                QueueDeclareOptions::default(),
                FieldTable::default(),
            )
            .await
            .expect("queue_declare");
        info!(state=?conn.status().state());
        info!(?queue, "Declared queue");

        info!("will consume");
        let mut consumer = channel
            .basic_consume(
                "hello",
                "my_consumer",
                BasicConsumeOptions::default(),
                FieldTable::default(),
            )
            .await
            .expect("basic_consume");
        info!(state=?conn.status().state());

        while let Some(delivery) = consumer.next().await {
            info!(message=?delivery, "received message");
            if let Ok(delivery) = delivery {
                delivery
                    .ack(BasicAckOptions::default())
                    .await
                    .expect("basic_ack");
            }
        }
    })
}

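Your problem is that you are probably depending on the latest stable release of lapin on crates.io, which is currently 1.8, while the example you are running comes from the master branch. The developers appear to be preparing a new major version, 2.0, so the library's API differs on master. As the error message shows, with 1.8 each item yielded by the consumer stream is a `(lapin::Channel, Delivery)` tuple, and a tuple has no `ack` method.

If you use the same example file from the 1.8 branch, the code compiles.

Alternatively, use the example from master together with the crate from master, rather than the latest stable release.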
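The two options above come down to which lapin version Cargo resolves. A minimal sketch of the relevant `Cargo.toml` section follows, assuming the repository location implied by the example URL in the question; the branch name and version numbers are those current at the time of the answer and may have changed since.

```toml
[dependencies]
# Option 1: stay on the stable release and use the example from the 1.8 branch.
lapin = "1.8"

# Option 2 (assumption: use instead of the line above, not in addition to it):
# track the master branch so the crate matches the master example.
# lapin = { git = "https://github.com/CleverCloud/lapin", branch = "master" }
```

Only one of the two `lapin` entries can be active at a time. A git dependency follows an unreleased API that may change between builds.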

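Below is my test code, without the main() function. There is a lot of extra code, including the logging configuration and so on. I did clean the code up, and I hope it is useful to others.

There was one more problem: all messages in the queue seemed to be assigned to a single consumer, and they all moved from "Ready" to "Unacked" almost immediately, even though only one message is processed every 10 seconds (because of a deliberate 10-second sleep).

I will post a new question about this issue.

The problem was solved by looking at QoS properly. I have edited this function to include a call to basic_qos.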
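Before the full listing, here is a toy model of why the prefetch limit matters. It is not lapin code and does not talk to a broker; `ToyQueue` and its methods are hypothetical names used only to illustrate the behaviour described above, on the assumption that a prefetch count of 0 means "no limit", as in AMQP's basic.qos.

```rust
use std::collections::VecDeque;

/// Toy stand-in for a broker queue with a single consumer (not lapin code).
struct ToyQueue {
    ready: VecDeque<u64>, // messages still waiting in the queue
    unacked: Vec<u64>,    // delivered to the consumer, not yet acknowledged
    prefetch: usize,      // 0 means "no limit"
}

impl ToyQueue {
    fn new(messages: u64, prefetch: usize) -> Self {
        let mut q = ToyQueue {
            ready: (1..=messages).collect(),
            unacked: Vec::new(),
            prefetch,
        };
        q.dispatch();
        q
    }

    /// Push messages to the consumer until the prefetch window is full.
    fn dispatch(&mut self) {
        while self.prefetch == 0 || self.unacked.len() < self.prefetch {
            match self.ready.pop_front() {
                Some(tag) => self.unacked.push(tag),
                None => break,
            }
        }
    }

    /// Acknowledge one delivery, which frees a slot in the window.
    fn ack(&mut self, tag: u64) {
        self.unacked.retain(|&t| t != tag);
        self.dispatch();
    }
}

fn main() {
    // Without a limit, every message is pushed to the consumer at once.
    let unlimited = ToyQueue::new(5, 0);
    assert_eq!((unlimited.ready.len(), unlimited.unacked.len()), (0, 5));

    // With a prefetch of 1, the next message is delivered only after an ack.
    let mut limited = ToyQueue::new(5, 1);
    assert_eq!((limited.ready.len(), limited.unacked.len()), (4, 1));
    limited.ack(1);
    assert_eq!(limited.unacked, vec![2]);
    assert_eq!(limited.ready.len(), 3);
    println!("ok");
}
```

With a limit of 1, the remaining messages stay "Ready" and can go to other consumers while one consumer is sleeping.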

use lapin::{options::*, Connection, ConnectionProperties, Result};
use futures_util::stream::StreamExt;
//use std::future::Future;
use tracing::info;
use slog::Drain;

pub fn lapin_test_consumer()->std::result::Result<i64, Box<std::io::Error>> {
    //env_logger::init();
    let log_file_name:&str="/tmp/lapin_test_consumer.log";
    let log_file_path=std::path::Path::new(&log_file_name);
    let dir_file_path=log_file_path.parent().unwrap();
    std::fs::create_dir_all(dir_file_path).unwrap();
    
    let log_file_handler_option = std::fs::OpenOptions::new()
        .create(true)
        .write(true)
        .truncate(true)
        .open(log_file_name)
        //.unwrap()
    ;
    let log_file_handler=match log_file_handler_option
    {
        Ok(f)=>f
        ,Err(err)=>{
            println!("{:?}", err);
            panic!("Unable to open the log file '{}', '{:?}'",log_file_name,err);
        }
    };
    
    let my_log_drain = slog_async::Async::new(
        slog::Duplicate::new(
            slog::Filter::new(
                slog_term::FullFormat::new(
                    slog_term::PlainSyncDecorator::new(log_file_handler,)
                )
                .use_file_location()
                .build()
                ,
                |record: &slog::Record|
                {
                    record.level().is_at_least(slog::Level::Debug)
                }    
            )
            //,slog_term::FullFormat::new(slog_term::PlainSyncDecorator::new(std::io::stdout())).build()
            ,slog::Duplicate::new(
                slog::Filter::new(
                    slog_term::FullFormat::new(
                        slog_term::PlainSyncDecorator::new(std::io::stderr(),)
                    )
                    .use_file_location()
                    .build()
                    ,
                    //|record: &slog::Record| record.level().is_at_least(slog::Level::Warning)
                    |record: &slog::Record|
                    {
                        record.level().is_at_least(slog::Level::Debug)
                    }    
                    
                )
                //,slog_term::FullFormat::new(slog_term::PlainSyncDecorator::new(std::io::stdout())).build()
                ,slog_term::FullFormat::new(slog_term::TermDecorator::new().build()).use_file_location().build()
            )
        ).fuse()
    )
    .build()
    .fuse()
    ;
    let my_slog_logger=slog::Logger::root(my_log_drain, slog::o!("n" => env!("CARGO_PKG_NAME"),"v" => env!("CARGO_PKG_VERSION")));
    
    if std::env::var("RUST_LOG").is_err() {
        std::env::set_var("RUST_LOG", "info");
    }


    let addr:String = std::env::var("AMQP_ADDR").unwrap_or_else(
        |_|{
            format!("amqp://{}:{}@{}:{}/{}?heartbeat=0"
                ,"abcd"//aMQPJobUser
                ,"abcd"//aMQPJobPasswd
                ,"somewhere.com"//aMQPJobHost
                ,5672//aMQPJobPort
                ,"lapin_test.test"//aMQPJobVirtualHost
            ).into()
        }
    );
    let amqp_conn_url:&str=&addr.as_str();
    

    //see "https://docs.rs/lapin/1.8.0/lapin/struct.Consumer.html"
    let res: std::result::Result<i64, Box<std::io::Error>> = async_global_executor::block_on(async {
            let sleep_duration_ms:u64=10000u64;
            let conn_result:std::result::Result<lapin::Connection, lapin::Error> = Connection::connect(
                &amqp_conn_url,
                ConnectionProperties::default().with_default_executor(2),//set the number of threads
                //ConnectionProperties::default().with_default_executor(8),
            )
            .await;
            let conn:lapin::Connection=match conn_result{
                Err(err)=>{
                    let bt=backtrace::Backtrace::new();
                    let log_message=format!(">>>>>At lapin_test_consumer(), pos 1b, some error has been encountered while trying to establish AMQP connection '{:?}', error is:'{:?}', backtrace is '{:?}'",&amqp_conn_url,&err,&bt);
                    slog::error!(my_slog_logger,"{}",log_message);
                    let custom_error=std::io::Error::new(std::io::ErrorKind::Other, &log_message.to_string()[..]);
                    return std::result::Result::Err(Box::new(custom_error));
                }
                Ok(conn2)=>{info!("CONNECTED");conn2}
            };
            


            let mut message_cnt:i64=0i64;let _some_i64:i64=message_cnt;
            let channel_a_result:Result<lapin::Channel>=conn.create_channel().await;
            let channel_a:lapin::Channel=match channel_a_result{
                Err(err)=>{
                    let bt=backtrace::Backtrace::new();
                    let log_message=format!(">>>>>At lapin_test_consumer(), pos 1b, some error has been encountered while trying to obtain a channel from AMQP connection '{:?}', error is:'{:?}', backtrace is '{:?}'",&amqp_conn_url,&err,&bt);
                    slog::error!(my_slog_logger,"{}",log_message);
                    let custom_error=std::io::Error::new(std::io::ErrorKind::Other, &log_message.to_string()[..]);
                    return std::result::Result::Err(Box::new(custom_error));
                    //return Err(err);
                }
                Ok(channel)=>{channel}
            };

            //set basic_qos once the channel exists, so that at most one unacknowledged message is outstanding at a time
            //the call must be awaited, otherwise the request is never sent to the broker
            channel_a.basic_qos(
                1
                ,BasicQosOptions{global:true}
            )
            .await
            .expect("basic_qos");
            

            channel_a
                .exchange_declare(
                    "my_direct_exchange"
                    ,lapin::ExchangeKind::Direct
                    ,lapin::options::ExchangeDeclareOptions{
                        passive:false
                        ,durable:true
                        ,auto_delete:false
                        ,internal:false
                        ,nowait:false
                    }
                    ,lapin::types::FieldTable::default()//see "https://docs.rs/amq-protocol-types/6.1.0/amq_protocol_types/struct.FieldTable.html"
                )
                .await//without the await the exchange is never declared
                .expect("exchange_declare")
            ;
            
            let queue = channel_a
                .queue_declare(
                    "hello.persistent"//:&str queue name
                    ,lapin::options::QueueDeclareOptions{
                        passive:false,
                        durable:true,
                        exclusive:false,
                        auto_delete:false,
                        nowait:false,
                    }
                    ,lapin::types::FieldTable::default()//see "https://docs.rs/amq-protocol-types/6.1.0/amq_protocol_types/struct.FieldTable.html"
                )
                .await
                .expect("queue_declare")
            ;
            channel_a
                .queue_bind(
                    "hello.persistent"
                    ,"my_direct_exchange"
                    ,"hello.persistent"
                    , lapin::options::QueueBindOptions{
                        nowait:false
                    }
                    ,lapin::types::FieldTable::default()//see "https://docs.rs/amq-protocol-types/6.1.0/amq_protocol_types/struct.FieldTable.html"
                )
                .await//without the await the queue is never bound to the exchange
                .expect("queue_bind")
            ;
            
            let consumer_a_result:Result<lapin::Consumer>=channel_a
                .basic_consume(
                    "hello.persistent",
                    "my_consumer",
                    lapin::options::BasicConsumeOptions{
                        no_local: true,//see "https://www.rabbitmq.com/amqp-0-9-1-reference.html#domain.no-local"
                        no_ack: false,//see "https://www.rabbitmq.com/amqp-0-9-1-reference.html#domain.no-ack" "If this field is set the server does not expect acknowledgements for messages. That is, when a message is delivered to the client the server assumes the delivery will succeed and immediately dequeues it. This functionality may increase performance but at the cost of reliability. Messages can get lost if a client dies before they are delivered to the application."
                        exclusive: false,
                        nowait: false,//see "https://www.rabbitmq.com/amqp-0-9-1-reference.html#domain.no-wait" "If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception."
                    },
                    lapin::types::FieldTable::default(),
                )
                .await;
            let mut consumer_a:lapin::Consumer=match consumer_a_result{
                Err(err)=>{
                    let bt=backtrace::Backtrace::new();
                    let log_message=format!(">>>>>At lapin_test_consumer(), pos 1b, some error has been encountered while trying to obtain a consumer from AMQP connection '{:?}', error is:'{:?}', backtrace is '{:?}'",&amqp_conn_url,&err,&bt);
                    slog::error!(my_slog_logger,"{}",log_message);
                    let custom_error=std::io::Error::new(std::io::ErrorKind::Other, &log_message.to_string()[..]);
                    return std::result::Result::Err(Box::new(custom_error));
                    //return Err(err);
                }
                Ok(consumer)=>{consumer}
            };
        
            while let Some(delivery) = consumer_a.next().await {
                let (channel2, delivery2) = delivery.expect("error in consumer");
                message_cnt+=1;
                slog::info!(my_slog_logger,"------------------------------------------------------------------, message_cnt is:{}",&message_cnt);
                let s:String = match String::from_utf8(delivery2.data.to_owned()) {//delivery.data is of type Vec<u8>
                    Ok(v) => v,
                    Err(e) => panic!("Invalid UTF-8 sequence: {}", e),
                };          
                let log_message:String=format!("message_cnt is:{}, delivery_tag is:{}, exchange is:{}, routing_key is:{}, redelivered is:{}, properties is:'{:?}', received data is:'{:?}'"
                    ,&message_cnt
                    ,&delivery2.delivery_tag
                    ,&delivery2.exchange
                    ,&delivery2.routing_key
                    ,&delivery2.redelivered
                    ,&delivery2.properties
                    ,&s
                );
                slog::info!(my_slog_logger,"{}",log_message);
                std::thread::sleep(std::time::Duration::from_millis(sleep_duration_ms));
                slog::info!(my_slog_logger,"After {}ms sleep.",sleep_duration_ms);
                channel2
                    .basic_ack(delivery2.delivery_tag, BasicAckOptions::default())
                    .await
                    .expect("ack")
                ;
            }
            Ok(message_cnt)
        }
    );
    res
}
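The listing above reads the payload from `delivery2.data` (a `Vec<u8>`) and panics if it is not valid UTF-8. As a sketch of decoding the bytes without panicking, the standard-library-only helpers below could be used instead; `payload_as_str` and `payload_lossy` are hypothetical names, not part of lapin.

```rust
use std::borrow::Cow;
use std::str::Utf8Error;

/// Strict decoding: borrows the bytes and returns an error instead of panicking.
fn payload_as_str(data: &[u8]) -> Result<&str, Utf8Error> {
    std::str::from_utf8(data)
}

/// Lossy decoding: invalid sequences are replaced with U+FFFD.
fn payload_lossy(data: &[u8]) -> Cow<'_, str> {
    String::from_utf8_lossy(data)
}

fn main() {
    // Stand-ins for `delivery.data`, which is a Vec<u8> in the listing above.
    let good: Vec<u8> = b"hello".to_vec();
    let bad: Vec<u8> = vec![0x68, 0x69, 0xff];

    assert_eq!(payload_as_str(&good), Ok("hello"));
    assert!(payload_as_str(&bad).is_err());
    assert_eq!(payload_lossy(&bad), "hi\u{FFFD}");

    match payload_as_str(&bad) {
        Ok(text) => println!("payload: {}", text),
        Err(e) => println!("payload is not UTF-8 ({}), raw bytes: {:?}", e, bad),
    }
}
```

Inside the consumer loop this would be called as `payload_as_str(&delivery2.data)`. That also avoids the copy made by `to_owned()`. A consumer could then reject or log a malformed message rather than aborting.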