"tokio::mpsc::channel" 上的接收器仅在缓冲区已满时接收消息
Receiver on "tokio::mpsc::channel" only receives messages when buffer is full
在我的代码片段中,tokio (v0.3) mpsc:channel
接收器仅在缓冲区已满时接收消息。缓冲区大小无关紧要。
use std::io;
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::Arc;
use std::time::Duration;
use tokio::net::UdpSocket;
use tokio::sync::mpsc;
use tokio::time::sleep;
const MESSAGE_LENGTH: usize = 1024;
pub struct Peer {
socket: Arc<UdpSocket>,
}
impl Peer {
pub fn new<S: ToSocketAddrs>(addr: S) -> Peer {
let socket = std::net::UdpSocket::bind(addr).expect("could not create socket");
let peer = Peer {
socket: Arc::new(UdpSocket::from_std(socket).unwrap()),
};
peer.start_inbound_message_handler();
peer
}
pub fn local_addr(&self) -> SocketAddr {
self.socket.local_addr().unwrap()
}
fn start_inbound_message_handler(&self) {
let socket = self.socket.clone();
let (tx, rx) = mpsc::channel(1);
self.start_request_handler(rx);
tokio::spawn(async move {
let mut buf = [0u8; MESSAGE_LENGTH];
loop {
if let Ok((len, addr)) = socket.recv_from(&mut buf).await {
println!("received {} bytes from {}", len, addr);
if let Err(_) = tx.send(true).await {
println!("error sending msg to request handler");
}
}
}
});
}
fn start_request_handler(&self, mut receiver: mpsc::Receiver<bool>) {
tokio::spawn(async move {
while let Some(msg) = receiver.recv().await {
println!("got ping request: {:?}", msg);
}
});
}
pub async fn send_ping(&self, dest: String) -> Result<(), io::Error> {
let buf = [255u8; MESSAGE_LENGTH];
self.socket.send_to(&buf[..], &dest).await?;
Ok(())
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let peer1 = Peer::new("0.0.0.0:0");
println!("peer1 started on: {}", peer1.local_addr().to_string());
let peer2 = Peer::new("0.0.0.0:0");
println!("peer2 started on: {}", peer2.local_addr().to_string());
peer2.send_ping(peer1.local_addr().to_string()).await?;
peer2.send_ping(peer1.local_addr().to_string()).await?;
sleep(Duration::from_secs(100)).await;
Ok(())
}
Link 到 Playground
在 start_inbound_message_handler
函数中,我开始从套接字读取,如果收到消息,则 mpsc::channel
上的消息将发送到 start_request_handler
进行处理的地方,在这种情况下,如果接收方收到任何内容,将写入一个简单的日志输出。
在 main
函数中,我正在创建两个对等点,peer1 和 peer2,在创建两个对等点后,我开始向第一个对等点发出 ping 请求。在 start_inbound_message_handler
中,我将从 udp 套接字接收数据并通过 mpsc::channel
发送消息,发送 returns 没有错误。问题如前所述,接收方只有在缓冲区已满时才会收到消息。在这种情况下,缓冲区是 1
。因此,如果我发送第二个 ping,则会收到第一个 ping。我不知道为什么会这样。
预期的行为是,如果我通过通道发送消息,接收方会立即开始接收消息,不会等到缓冲区已满。
根据 from_std()
的 Tokio 文档:
Creates new UdpSocket
from a previously bound std::net::UdpSocket
.
This function is intended to be used to wrap a UDP socket from the
standard library in the Tokio equivalent. The conversion assumes nothing
about the underlying socket; it is left up to the user to set it in
non-blocking mode.
This can be used in conjunction with socket2's Socket
interface to
configure a socket before it's handed off, such as setting options like
reuse_address
or binding to multiple addresses.
不处于 non-blocking 模式的套接字将阻止 Tokio 正常工作。
就用tokio函数bind()
,简单多了。
在我的代码片段中,tokio (v0.3) mpsc:channel
接收器仅在缓冲区已满时接收消息。缓冲区大小无关紧要。
use std::io;
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::Arc;
use std::time::Duration;
use tokio::net::UdpSocket;
use tokio::sync::mpsc;
use tokio::time::sleep;
const MESSAGE_LENGTH: usize = 1024;
pub struct Peer {
socket: Arc<UdpSocket>,
}
impl Peer {
pub fn new<S: ToSocketAddrs>(addr: S) -> Peer {
let socket = std::net::UdpSocket::bind(addr).expect("could not create socket");
let peer = Peer {
socket: Arc::new(UdpSocket::from_std(socket).unwrap()),
};
peer.start_inbound_message_handler();
peer
}
pub fn local_addr(&self) -> SocketAddr {
self.socket.local_addr().unwrap()
}
fn start_inbound_message_handler(&self) {
let socket = self.socket.clone();
let (tx, rx) = mpsc::channel(1);
self.start_request_handler(rx);
tokio::spawn(async move {
let mut buf = [0u8; MESSAGE_LENGTH];
loop {
if let Ok((len, addr)) = socket.recv_from(&mut buf).await {
println!("received {} bytes from {}", len, addr);
if let Err(_) = tx.send(true).await {
println!("error sending msg to request handler");
}
}
}
});
}
fn start_request_handler(&self, mut receiver: mpsc::Receiver<bool>) {
tokio::spawn(async move {
while let Some(msg) = receiver.recv().await {
println!("got ping request: {:?}", msg);
}
});
}
pub async fn send_ping(&self, dest: String) -> Result<(), io::Error> {
let buf = [255u8; MESSAGE_LENGTH];
self.socket.send_to(&buf[..], &dest).await?;
Ok(())
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let peer1 = Peer::new("0.0.0.0:0");
println!("peer1 started on: {}", peer1.local_addr().to_string());
let peer2 = Peer::new("0.0.0.0:0");
println!("peer2 started on: {}", peer2.local_addr().to_string());
peer2.send_ping(peer1.local_addr().to_string()).await?;
peer2.send_ping(peer1.local_addr().to_string()).await?;
sleep(Duration::from_secs(100)).await;
Ok(())
}
Link 到 Playground
在 start_inbound_message_handler
函数中,我开始从套接字读取,如果收到消息,则 mpsc::channel
上的消息将发送到 start_request_handler
进行处理的地方,在这种情况下,如果接收方收到任何内容,将写入一个简单的日志输出。
在 main
函数中,我正在创建两个对等点,peer1 和 peer2,在创建两个对等点后,我开始向第一个对等点发出 ping 请求。在 start_inbound_message_handler
中,我将从 udp 套接字接收数据并通过 mpsc::channel
发送消息,发送 returns 没有错误。问题如前所述,接收方只有在缓冲区已满时才会收到消息。在这种情况下,缓冲区是 1
。因此,如果我发送第二个 ping,则会收到第一个 ping。我不知道为什么会这样。
预期的行为是,如果我通过通道发送消息,接收方会立即开始接收消息,不会等到缓冲区已满。
根据 from_std()
的 Tokio 文档:
Creates new
UdpSocket
from a previously boundstd::net::UdpSocket
.This function is intended to be used to wrap a UDP socket from the standard library in the Tokio equivalent. The conversion assumes nothing about the underlying socket; it is left up to the user to set it in non-blocking mode.
This can be used in conjunction with socket2's
Socket
interface to configure a socket before it's handed off, such as setting options likereuse_address
or binding to multiple addresses.
不处于 non-blocking 模式的套接字将阻止 Tokio 正常工作。
就用tokio函数bind()
,简单多了。