使用 Rust actix-web 的流式处理结果,生命周期问题

Streaming results using Rust actix-web, lifetime issue

我在使用以下代码将 Stream 与 actix-web 一起使用时遇到问题:

fn format_csv_row(row: tiberius::Row) -> Result<web::Bytes, ServerError> { ... }

#[get("/stream/")]
async fn get_stream(
    db_pool: web::Data<bb8::Pool<TiberiusConnectionManager>>,
) -> Result<HttpResponse, ServerError> {
    // Get connection
    let mut conn = db_pool
        .get()
        .await
        .map_err(|e| ServerError::PoolUnavailable)?;

    // Execute query
    let stream = conn
        .query("SELECT * FROM table", &[])
        .await
        .map_err(|e| ServerError::QueryFail)?;

    // Build a stream from SQL results
    let stream = stream.map(|x| format_csv_row(x.unwrap()));
    Ok(HttpResponse::Ok().streaming(stream))
}

以下是我在此函数中调用的方法:

编译器报错如下:

error[E0597]: `db_pool` does not live long enough
   --> src/routes/trades_stream.rs:88:20
    |
88  |       let mut conn = db_pool
    |                      -^^^^^^
    |                      |
    |  ____________________borrowed value does not live long enough
    | |
89  | |         .get()
    | |______________- argument requires that `db_pool` is borrowed for `'static`
...
102 |   }
    |   - `db_pool` dropped here while still borrowed

error[E0597]: `conn` does not live long enough
   --> src/routes/trades_stream.rs:94:18
    |
94  |       let stream = conn
    |                    -^^^
    |                    |
    |  __________________borrowed value does not live long enough
    | |
95  | |         .query("SELECT * FROM table", &[])
    | |__________________________________________- argument requires that `conn` is borrowed for `'static`
...
102 |   }
    |   - `conn` dropped here while still borrowed

error: aborting due to 2 previous errors

我了解在流式传输仍在进行时连接和池已断开。

我如何修改我的代码才能使其正常工作? 是否可以为 db_poolconn 添加显式生命周期以使它们匹配 stream

为了让它工作,我们需要更改代码,使流拥有它正在读取的连接的所有权,并且由于 bb8 的编写方式,您还需要池句柄的所有权。最好的方法是使用 async-stream 箱子。

我认为应该这样做:

use async_stream::try_stream;

fn format_csv_row(row: tiberius::Row) -> Result<web::Bytes, ServerError> { ... }

#[get("/stream/")]
async fn get_stream(
    db_pool: web::Data<bb8::Pool<TiberiusConnectionManager>>,
) -> Result<HttpResponse, ServerError> {
    // Cloning a bb8 pool gives a new handle to the same pool.
    let db_pool = db_pool.clone();

    let stream = try_stream! {
        // Get connection
        let mut conn = db_pool
            .get()
            .await
            .map_err(|e| ServerError::PoolUnavailable)?;

        // Execute query
        let stream = conn
            .query("SELECT * FROM table", &[])
            .await
            .map_err(|e| ServerError::QueryFail)?;

        while let Some(row) = stream.next().await {
            yield format_csv_row(row?)?;
        }
    };

    Ok(HttpResponse::Ok().streaming(Box::pin(stream)))
}

row? 部分可能需要另一个 map_err

Is it possible to add explicit lifetime for db_pool and conn to make them match stream ?

不,您不会通过设置生命周期来改变事物的寿命。相反,您可以通过改变代码的结构来改变生命周期,使其寿命足够长。