在 BusReader 和 warp WebSocket 接收器之间转发消息会留下未刷新的缓冲区吗?
Forwarding messages between BusReader and warp WebSocket sink leaves unflushed buffer?
我正在尝试创建一个 websocket 服务器(和 HTTP,因此使用 warp)通过 websockets 将消息从一个源(MQTT 订阅)转发到多个客户端。除了在广播第二条消息之前客户端没有收到第一条 websocket 消息外,这似乎大部分工作正常;然后总是留下一条消息,直到最后再也没有收到最后一条消息。对我来说,问题似乎是 ws_connected
函数中从未完全刷新的发送缓冲区。
我用futures::stream::iter to turn the BusReader into a stream, then map the messages into the required Ok(Message)
type that the WebSocket Sink requires. The official warp websocket chat example uses a similar construct for forwarding between streams: https://github.com/seanmonstar/warp/blob/42fd14fdab8145d27ae770fe4b5c843a99bc2a44/examples/websockets_chat.rs#L62.
在这个简化示例中,服务器通过总线广播值 0-9。 websocat 客户端(和 Firefox 中的 JS websocket 客户端)收到消息 0-8——尽管总是在广播和服务器的标准输出之后——但 9 永远不会到达。然而,async_bus_print
函数按时接收所有值,这证明消息至少通过总线没有问题。
这是服务器进程的输出:
async bus_print started
0
async bus: "0"
1
async bus: "1"
2
async bus: "2"
3
async bus: "3"
4
async bus: "4"
5
async bus: "5"
6
async bus: "6"
7
async bus: "7"
8
async bus: "8"
9
async bus: "9"
有问题的代码:
use std::{sync::{Arc, RwLock}, thread};
use bus::{Bus, BusReader};
use futures::StreamExt;
use warp::ws::{Message, WebSocket};
use warp::Filter;
async fn ws_connected(ws: WebSocket, rx: BusReader<String>) {
let (ws_tx, _ws_rx) = ws.split();
thread::spawn(|| {
futures::executor::block_on(async move {
if let Err(e) = futures::stream::iter(rx.into_iter())
.map(|ws_msg| Ok(Message::text(ws_msg)))
.forward(ws_tx)
.await
{
eprintln!("Goodbye, websocket user: {}", e);
}
});
});
}
async fn async_bus_print(mut rx: BusReader<String>) {
println!("async bus_print started");
thread::spawn(||
futures::executor::block_on(async move {
while let Some(msg) = futures::stream::iter(rx.iter()).next().await {
println!("async bus: {:#?}", msg);
}
})
);
}
async fn bus_tx(tx: Arc<RwLock<Bus<String>>>) {
for i in 0..10u8 {
tx.write().unwrap().broadcast(format!("{}", i));
println!("{}", i);
tokio::time::delay_for(std::time::Duration::from_secs(1)).await;
}
}
#[tokio::main]
async fn main() {
let bus = Arc::new(RwLock::new(Bus::new(20)));
let bus2 = Arc::clone(&bus);
let rx = warp::any().map(move || bus2.write().unwrap().add_rx());
let rx2 = bus.write().unwrap().add_rx();
let ws = warp::path("ws")
.and(warp::ws())
.and(rx)
.map(|ws: warp::ws::Ws, rx| ws.on_upgrade(move |socket| ws_connected(socket, rx)));
futures::join!(
async_bus_print(rx2),
bus_tx(bus),
warp::serve(ws).run(([127, 0, 0, 1], 3030)),
);
}
我怎样才能找到并消除这个“缓冲”问题?
希望我已经解释得足够清楚了。如果我可以提供更多信息,请告诉我。感谢您的帮助。
虽然我仍然无法找出未刷新数据的根本原因,但感谢 reddit 上的一些乐于助人的人,我有一些更好的替代解决方案。
如果你一开始不拆分 WebSocket,它似乎工作正常:
async fn ws_connected(ws: WebSocket, rx: BusReader<String>) {
thread::spawn(|| {
futures::executor::block_on(async move {
if let Err(e) = futures::stream::iter(rx.into_iter())
.map(|ws_msg| Ok(Message::text(ws_msg)))
.forward(ws)
.await
{
eprintln!("Goodbye, websocket user: {}", e);
}
});
});
}
如果您在代码中包含 futures::SinkExt
,仍然可以将流和 send
数据(而不是 forward
)拆分到 WebSocket 的 tx 端:
async fn ws_connected(ws: WebSocket, mut rx: BusReader<String>) {
use futures::SinkExt;
let (mut ws_tx, ws_rx) = ws.split();
while let Ok(msg) = rx.recv() {
ws_tx.send(Message::text(msg)).await.unwrap();
// `send` automatically flushes the sink
}
}
最终,我认为 tokio 的 broadcast
频道比 bus
:
更适合我的异步多生产者、多消费者频道通信需求
use futures::StreamExt;
use tokio::sync::broadcast;
use warp::ws::{Message, WebSocket};
use warp::Filter;
async fn ws_connected(ws: WebSocket, recv: broadcast::Receiver<String>) {
let (ws_tx, _ws_rx) = ws.split();
recv.map(|s| Ok(Message::text(s.unwrap())))
.forward(ws_tx)
.await
.unwrap();
}
async fn async_bus_print(mut recv: broadcast::Receiver<String>) {
println!("async bus_print started");
while let Some(msg) = recv.next().await {
println!("async bus: {:#?}", msg.unwrap());
}
}
async fn bus_tx(tx: broadcast::Sender<String>) {
for i in 0..10u8 {
tx.send(format!("{}", i)).unwrap();
println!("{}", i);
tokio::time::delay_for(std::time::Duration::from_secs(1)).await;
}
}
#[tokio::main]
async fn main() {
let (send, recv) = broadcast::channel::<String>(32);
let send2 = send.clone();
let rx = warp::any().map(move || send2.subscribe());
let ws = warp::path("ws")
.and(warp::ws())
.and(rx)
.map(|ws: warp::ws::Ws, recv| ws.on_upgrade(move |socket| ws_connected(socket, recv)));
let (abp, tx, warp) = futures::join!(
tokio::spawn(async_bus_print(recv)),
tokio::spawn(bus_tx(send)),
tokio::spawn(warp::serve(ws).run(([127, 0, 0, 1], 3030))),
);
abp.unwrap();
tx.unwrap();
warp.unwrap();
}
我正在尝试创建一个 websocket 服务器(和 HTTP,因此使用 warp)通过 websockets 将消息从一个源(MQTT 订阅)转发到多个客户端。除了在广播第二条消息之前客户端没有收到第一条 websocket 消息外,这似乎大部分工作正常;然后总是留下一条消息,直到最后再也没有收到最后一条消息。对我来说,问题似乎是 ws_connected
函数中从未完全刷新的发送缓冲区。
我用futures::stream::iter to turn the BusReader into a stream, then map the messages into the required Ok(Message)
type that the WebSocket Sink requires. The official warp websocket chat example uses a similar construct for forwarding between streams: https://github.com/seanmonstar/warp/blob/42fd14fdab8145d27ae770fe4b5c843a99bc2a44/examples/websockets_chat.rs#L62.
在这个简化示例中,服务器通过总线广播值 0-9。 websocat 客户端(和 Firefox 中的 JS websocket 客户端)收到消息 0-8——尽管总是在广播和服务器的标准输出之后——但 9 永远不会到达。然而,async_bus_print
函数按时接收所有值,这证明消息至少通过总线没有问题。
这是服务器进程的输出:
async bus_print started
0
async bus: "0"
1
async bus: "1"
2
async bus: "2"
3
async bus: "3"
4
async bus: "4"
5
async bus: "5"
6
async bus: "6"
7
async bus: "7"
8
async bus: "8"
9
async bus: "9"
有问题的代码:
use std::{sync::{Arc, RwLock}, thread};
use bus::{Bus, BusReader};
use futures::StreamExt;
use warp::ws::{Message, WebSocket};
use warp::Filter;
async fn ws_connected(ws: WebSocket, rx: BusReader<String>) {
let (ws_tx, _ws_rx) = ws.split();
thread::spawn(|| {
futures::executor::block_on(async move {
if let Err(e) = futures::stream::iter(rx.into_iter())
.map(|ws_msg| Ok(Message::text(ws_msg)))
.forward(ws_tx)
.await
{
eprintln!("Goodbye, websocket user: {}", e);
}
});
});
}
async fn async_bus_print(mut rx: BusReader<String>) {
println!("async bus_print started");
thread::spawn(||
futures::executor::block_on(async move {
while let Some(msg) = futures::stream::iter(rx.iter()).next().await {
println!("async bus: {:#?}", msg);
}
})
);
}
async fn bus_tx(tx: Arc<RwLock<Bus<String>>>) {
for i in 0..10u8 {
tx.write().unwrap().broadcast(format!("{}", i));
println!("{}", i);
tokio::time::delay_for(std::time::Duration::from_secs(1)).await;
}
}
#[tokio::main]
async fn main() {
let bus = Arc::new(RwLock::new(Bus::new(20)));
let bus2 = Arc::clone(&bus);
let rx = warp::any().map(move || bus2.write().unwrap().add_rx());
let rx2 = bus.write().unwrap().add_rx();
let ws = warp::path("ws")
.and(warp::ws())
.and(rx)
.map(|ws: warp::ws::Ws, rx| ws.on_upgrade(move |socket| ws_connected(socket, rx)));
futures::join!(
async_bus_print(rx2),
bus_tx(bus),
warp::serve(ws).run(([127, 0, 0, 1], 3030)),
);
}
我怎样才能找到并消除这个“缓冲”问题?
希望我已经解释得足够清楚了。如果我可以提供更多信息,请告诉我。感谢您的帮助。
虽然我仍然无法找出未刷新数据的根本原因,但感谢 reddit 上的一些乐于助人的人,我有一些更好的替代解决方案。
如果你一开始不拆分 WebSocket,它似乎工作正常:
async fn ws_connected(ws: WebSocket, rx: BusReader<String>) {
thread::spawn(|| {
futures::executor::block_on(async move {
if let Err(e) = futures::stream::iter(rx.into_iter())
.map(|ws_msg| Ok(Message::text(ws_msg)))
.forward(ws)
.await
{
eprintln!("Goodbye, websocket user: {}", e);
}
});
});
}
如果您在代码中包含 futures::SinkExt
,仍然可以将流和 send
数据(而不是 forward
)拆分到 WebSocket 的 tx 端:
async fn ws_connected(ws: WebSocket, mut rx: BusReader<String>) {
use futures::SinkExt;
let (mut ws_tx, ws_rx) = ws.split();
while let Ok(msg) = rx.recv() {
ws_tx.send(Message::text(msg)).await.unwrap();
// `send` automatically flushes the sink
}
}
最终,我认为 tokio 的 broadcast
频道比 bus
:
use futures::StreamExt;
use tokio::sync::broadcast;
use warp::ws::{Message, WebSocket};
use warp::Filter;
async fn ws_connected(ws: WebSocket, recv: broadcast::Receiver<String>) {
let (ws_tx, _ws_rx) = ws.split();
recv.map(|s| Ok(Message::text(s.unwrap())))
.forward(ws_tx)
.await
.unwrap();
}
async fn async_bus_print(mut recv: broadcast::Receiver<String>) {
println!("async bus_print started");
while let Some(msg) = recv.next().await {
println!("async bus: {:#?}", msg.unwrap());
}
}
async fn bus_tx(tx: broadcast::Sender<String>) {
for i in 0..10u8 {
tx.send(format!("{}", i)).unwrap();
println!("{}", i);
tokio::time::delay_for(std::time::Duration::from_secs(1)).await;
}
}
#[tokio::main]
async fn main() {
let (send, recv) = broadcast::channel::<String>(32);
let send2 = send.clone();
let rx = warp::any().map(move || send2.subscribe());
let ws = warp::path("ws")
.and(warp::ws())
.and(rx)
.map(|ws: warp::ws::Ws, recv| ws.on_upgrade(move |socket| ws_connected(socket, recv)));
let (abp, tx, warp) = futures::join!(
tokio::spawn(async_bus_print(recv)),
tokio::spawn(bus_tx(send)),
tokio::spawn(warp::serve(ws).run(([127, 0, 0, 1], 3030))),
);
abp.unwrap();
tx.unwrap();
warp.unwrap();
}