Why is my Future implementation blocked after it is polled once and returns NotReady?

I implemented a future and made a request to it, but it blocks my curl, and the log shows that poll was only called once.

Did I implement anything wrong?

use failure::{format_err, Error};
use futures::{future, Async};
use hyper::rt::Future;
use hyper::service::{service_fn, service_fn_ok};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use log::{debug, error, info};
use std::{
    sync::{Arc, Mutex},
    task::Waker,
    thread,
};

pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

struct SharedState {
    completed: bool,
    resp: String,
}

impl Future for TimerFuture {
    type Item = Response<Body>;
    type Error = hyper::Error;
    fn poll(&mut self) -> futures::Poll<Response<Body>, hyper::Error> {
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            return Ok(Async::Ready(Response::new(Body::from(
                shared_state.resp.clone(),
            ))));
        } else {
            return Ok(Async::NotReady);
        }
    }
}

impl TimerFuture {
    pub fn new(instance: String) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            resp: String::new(),
        }));
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            let res = match request_health(instance) {
                Ok(status) => status.clone(),
                Err(err) => {
                    error!("{:?}", err);
                    format!("{}", err)
                }
            };
            let mut shared_state = thread_shared_state.lock().unwrap();
            shared_state.completed = true;
            shared_state.resp = res;
        });

        TimerFuture { shared_state }
    }
}

fn request_health(instance_name: String) -> Result<String, Error> {
    std::thread::sleep(std::time::Duration::from_secs(1));
    Ok("health".to_string())
}

type BoxFut = Box<dyn Future<Item = Response<Body>, Error = hyper::Error> + Send>;
fn serve_health(req: Request<Body>) -> BoxFut {
    let mut response = Response::new(Body::empty());
    let path = req.uri().path().to_owned();
    match (req.method(), path) {
        (&Method::GET, path) => {
            return Box::new(TimerFuture::new(path.clone()));
        }
        _ => *response.status_mut() = StatusCode::NOT_FOUND,
    }
    Box::new(future::ok(response))
}

fn main() {
    let endpoint_addr = "0.0.0.0:8080";
    match std::thread::spawn(move || {
        let addr = endpoint_addr.parse().unwrap();
        info!("Server is running on {}", addr);
        hyper::rt::run(
            Server::bind(&addr)
                .serve(move || service_fn(serve_health))
                .map_err(|e| eprintln!("server error: {}", e)),
        );
    })
    .join()
    {
        Ok(e) => e,
        Err(e) => println!("{:?}", e),
    }
}

After compiling and running this code, a server is running on port 8080. Calling the server with curl blocks:

curl 127.0.0.1:8080/my-health-scope

Did I implement anything wrong?

Yes, you did not read and follow the documentation for the method you are implementing (emphasis mine):

When a future is not ready yet, the Async::NotReady value will be returned. In this situation the future will also register interest of the current task in the value being produced. This is done by calling task::park to retrieve a handle to the current Task. When the future is then ready to make progress (e.g. it should be polled again) the unpark method is called on the Task.

As a minimal, reproducible example, let's use this:

use futures::{future::Future, Async};
use std::{
    mem,
    sync::{Arc, Mutex},
    thread,
    time::Duration,
};

pub struct Timer {
    data: Arc<Mutex<String>>,
}

impl Timer {
    pub fn new(instance: String) -> Self {
        let data = Arc::new(Mutex::new(String::new()));

        thread::spawn({
            let data = data.clone();
            move || {
                thread::sleep(Duration::from_secs(1));
                *data.lock().unwrap() = instance;
            }
        });

        Timer { data }
    }
}

impl Future for Timer {
    type Item = String;
    type Error = ();

    fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
        let mut data = self.data.lock().unwrap();

        eprintln!("poll was called");

        if data.is_empty() {
            Ok(Async::NotReady)
        } else {
            let data = mem::replace(&mut *data, String::new());
            Ok(Async::Ready(data))
        }
    }
}

fn main() {
    let v = Timer::new("Some text".into()).wait();
    println!("{:?}", v);
}

"poll was called" is printed only once.
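To see why a single poll is all you get, it can help to look at the executor's side of the contract. The following is only a sketch, and it uses the standard library's std::future API as a stand-in for futures 0.1 (there, Waker plays the role that Task plays here). The names block_on, ThreadWaker and YieldOnce are hypothetical and exist only for this illustration. After a "not ready" result the executor goes to sleep and polls again only once it has been woken:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical waker: wakes the executor by unparking its thread.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Hypothetical single-future executor: poll, then sleep until woken.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            // Nothing happens until someone calls `wake`. A future that
            // returned `Pending` without arranging a wake-up leaves this
            // thread parked indefinitely.
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical future that is pending on the first poll and asks to be
/// polled again right away. Its output is the number of polls it saw.
struct YieldOnce {
    polls: u32,
}

impl Future for YieldOnce {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        self.polls += 1;
        if self.polls == 1 {
            // Honor the contract: request another poll before returning `Pending`.
            cx.waker().wake_by_ref();
            Poll::Pending
        } else {
            Poll::Ready(self.polls)
        }
    }
}

fn main() {
    let polls = block_on(YieldOnce { polls: 0 });
    println!("polled {} times", polls);
}
```

If the wake_by_ref call is removed, block_on stays in thread::park after the first poll, which is the same symptom as the hanging curl request.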

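You can call task::current (previously task::park) in the implementation of Future::poll, save the resulting value, and then call Task::notify (previously Task::unpark) on it whenever the future may be polled again: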

use futures::{
    future::Future,
    task::{self, Task},
    Async,
};
use std::{
    mem,
    sync::{Arc, Mutex},
    thread,
    time::Duration,
};

pub struct Timer {
    data: Arc<Mutex<(String, Option<Task>)>>,
}

impl Timer {
    pub fn new(instance: String) -> Self {
        let data = Arc::new(Mutex::new((String::new(), None)));
        let me = Timer { data };

        thread::spawn({
            let data = me.data.clone();
            move || {
                thread::sleep(Duration::from_secs(1));
                let mut data = data.lock().unwrap();

                data.0 = instance;
                if let Some(task) = data.1.take() {
                    task.notify();
                }
            }
        });

        me
    }
}

impl Future for Timer {
    type Item = String;
    type Error = ();

    fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
        let mut data = self.data.lock().unwrap();

        eprintln!("poll was called");

        if data.0.is_empty() {
            let v = task::current();
            data.1 = Some(v);
            Ok(Async::NotReady)
        } else {
            let data = mem::replace(&mut data.0, String::new());
            Ok(Async::Ready(data))
        }
    }
}

fn main() {
    let v = Timer::new("Some text".into()).wait();
    println!("{:?}", v);
}
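The code above targets futures 0.1. For comparison, here is a rough sketch of the same handshake written against the standard library's std::future API, where the stored handle is a Waker and the notification is Waker::wake. This is an assumption-laden port, not part of the original answer: the State struct, the ChannelWaker type, the drive helper and the 100 ms delay are all hypothetical choices made to keep the example self-contained.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::{sync_channel, SyncSender};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::Duration;

/// Shared between the future and the worker thread (hypothetical layout).
#[derive(Default)]
struct State {
    value: Option<String>,
    waker: Option<Waker>,
}

pub struct Timer {
    state: Arc<Mutex<State>>,
}

impl Timer {
    pub fn new(instance: String) -> Self {
        let state = Arc::new(Mutex::new(State::default()));
        thread::spawn({
            let state = Arc::clone(&state);
            move || {
                thread::sleep(Duration::from_millis(100));
                let mut state = state.lock().unwrap();
                state.value = Some(instance);
                // Tell whoever polled last that progress can be made.
                if let Some(waker) = state.waker.take() {
                    waker.wake();
                }
            }
        });
        Timer { state }
    }
}

impl Future for Timer {
    type Output = String;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<String> {
        let mut state = self.state.lock().unwrap();
        match state.value.take() {
            Some(value) => Poll::Ready(value),
            None => {
                // Register interest before reporting "not ready".
                state.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Hypothetical waker that reports each wake-up over a channel.
struct ChannelWaker(SyncSender<()>);

impl Wake for ChannelWaker {
    fn wake(self: Arc<Self>) {
        // A full buffer means a wake-up is already queued, so ignore the error.
        let _ = self.0.try_send(());
    }
}

/// Hypothetical driver: polls, then blocks until `wake` has been called.
/// Returns the output together with the number of polls performed.
fn drive(timer: Timer) -> (String, u32) {
    let (tx, rx) = sync_channel(1);
    let waker = Waker::from(Arc::new(ChannelWaker(tx)));
    let mut cx = Context::from_waker(&waker);
    let mut timer = Box::pin(timer);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(value) = timer.as_mut().poll(&mut cx) {
            return (value, polls);
        }
        rx.recv().expect("the waker holds the sender, so it is still alive");
    }
}

fn main() {
    let (value, polls) = drive(Timer::new("Some text".into()));
    println!("{:?} after {} poll(s)", value, polls);
}
```

The worker thread takes the lock before setting the value and waking, and poll stores the waker under the same lock, so the wake-up cannot slip in between the emptiness check and the registration.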

See also: