在队列中发送数据并将数据存储在另一个队列中的多线程客户端,同时不会在 Rust Tokio 中阻塞
Multithreaded Client that sends data in a queue and stores data in another, while not blocking in Rust Tokio
我在制作一个 Tokio 客户端时遇到困难,该客户端从服务器接收数据包并将它们存储在队列中供主线程处理,同时能够同时从另一个队列向服务器发送数据包.
我正在尝试制作一个非常简单的在线游戏演示,让游戏客户端发送数据(它自己修改的状态,比如玩家移动)并接收数据(由其他玩家和服务器修改的游戏状态,比如NPC/other 名玩家也移动了)。
这个想法是让一个网络线程访问两个 Arcs
持有 Mutexes
到 Vec<bytes::Bytes>
存储序列化数据。一个 Arc
用于 IncomingPackets
,另一个用于 OutgoingPackets
。 IncomingPackets
将由从服务器发送到客户端的数据包填充,稍后将由主线程读取,而 OutgoingPackets
将由主线程填充应发送到服务器的数据包。
我似乎无法在另一个线程中接收或发送数据包。
客户端将只连接到服务器,而服务器将允许多个客户端(将单独服务)。
关于流的用法和实现的解释对新手来说并不友好,但我认为我应该以某种方式使用它们。
我写了一些代码,但它不起作用,可能是错误的。
(我的原始代码无法编译,所以将其视为伪代码,抱歉)
extern crate byteorder; // 1.3.4
extern crate futures; // 0.3.5
extern crate tokio; // 0.2.21
use bytes::Bytes;
use futures::future;
use std::error::Error;
use std::sync::{Arc, Mutex};
use tokio::net::TcpStream;
use byteorder::{BigEndian, WriteBytesExt};
use std::io;
use std::time::Duration;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::net::tcp::{ReadHalf, WriteHalf};
//This is the SharedPackets struct that is located in the crate structures
struct SharedPackets {
data: Mutex<Vec<bytes::Bytes>>,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
let (mut r, mut w) = stream.split();
let mut inc: Vec<bytes::Bytes> = Vec::new();
inc.push(Bytes::from("Wow"));
let mut incoming_packets = Arc::new(SharedPackets {
data: Mutex::new(inc),
});
let mut outg: Vec<bytes::Bytes> = Vec::new();
outg.push(Bytes::from("Wow"));
let mut outgoint_packets = Arc::new(SharedPackets {
data: Mutex::new(outg),
});
let mut local_incoming_packets = Arc::clone(&incoming_packets);
let mut local_outgoint_packets = Arc::clone(&outgoint_packets);
let mut rarc = Arc::new(Mutex::new(r));
let mut warc = Arc::new(Mutex::new(w));
tokio::spawn(async move {
//send and receive are both async functions that contain an infinite loop
//they basically use AsyncWriteExt and AsyncReadExt to manipulate both halves of the stream
//send reads the queue and write this data on the socket
//recv reads the socket and write this data on the queue
//both "queues" are manipulated by the main thread
let mut read = &*rarc.lock().unwrap();
let mut write = &*warc.lock().unwrap();
future::try_join(
send(&mut write, &mut local_outgoint_packets),
recv(&mut read, &mut local_incoming_packets),
)
.await;
});
loop {
//read & write other stuff on both incoming_packets & outgoint_packets
//until the end of the program
}
}
async fn recv(reader: &mut ReadHalf<'_>, queue: &mut Arc<SharedPackets>) -> Result<(), io::Error> {
loop {
let mut buf: Vec<u8> = vec![0; 4096];
let n = match reader.read(&mut buf).await {
Ok(n) if n == 0 => return Ok(()),
Ok(n) => n,
Err(e) => {
eprintln!("failed to read from socket; err = {:?}", e);
return Err(e);
}
};
}
}
async fn send(writer: &mut WriteHalf<'_>, queue: &mut Arc<SharedPackets>) -> Result<(), io::Error> {
loop {
//task::sleep(Duration::from_millis(300)).await;
{
let a = vec!["AAAA"];
for i in a.iter() {
let mut byte_array = vec![];
let str_bytes = i.as_bytes();
WriteBytesExt::write_u32::<BigEndian>(&mut byte_array, str_bytes.len() as u32)
.unwrap();
byte_array.extend(str_bytes);
writer.write(&byte_array).await?;
}
}
}
}
这不编译:
error: future cannot be sent between threads safely
--> src/main.rs:46:5
|
46 | tokio::spawn(async move {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ---- required by this bound in `tokio::spawn`
|
= help: within `impl futures::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, tokio::net::tcp::ReadHalf<'_>>`
note: future is not `Send` as this value is used across an await
--> src/main.rs:55:9
|
52 | let mut read = &*rarc.lock().unwrap();
| -------------------- has type `std::sync::MutexGuard<'_, tokio::net::tcp::ReadHalf<'_>>` which is not `Send`
...
55 | / future::try_join(
56 | | send(&mut write, &mut local_outgoint_packets),
57 | | recv(&mut read, &mut local_incoming_packets),
58 | | )
59 | | .await;
| |______________^ await occurs here, with `rarc.lock().unwrap()` maybe used later
60 | });
| - `rarc.lock().unwrap()` is later dropped here
help: consider moving this into a `let` binding to create a shorter lived borrow
--> src/main.rs:52:25
|
52 | let mut read = &*rarc.lock().unwrap();
| ^^^^^^^^^^^^^^^^^^^^^
error: future cannot be sent between threads safely
--> src/main.rs:46:5
|
46 | tokio::spawn(async move {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ---- required by this bound in `tokio::spawn`
|
= help: within `impl futures::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, tokio::net::tcp::WriteHalf<'_>>`
note: future is not `Send` as this value is used across an await
--> src/main.rs:55:9
|
53 | let mut write = &*warc.lock().unwrap();
| -------------------- has type `std::sync::MutexGuard<'_, tokio::net::tcp::WriteHalf<'_>>` which is not `Send`
54 |
55 | / future::try_join(
56 | | send(&mut write, &mut local_outgoint_packets),
57 | | recv(&mut read, &mut local_incoming_packets),
58 | | )
59 | | .await;
| |______________^ await occurs here, with `warc.lock().unwrap()` maybe used later
60 | });
| - `warc.lock().unwrap()` is later dropped here
help: consider moving this into a `let` binding to create a shorter lived borrow
--> src/main.rs:53:26
|
53 | let mut write = &*warc.lock().unwrap();
| ^^^^^^^^^^^^^^^^^^^^^
我认为这是最少的问题,因为我是 tokio 的新手。
我找不到这方面的例子,你知道解决这个问题的有效方法吗?
这是一个有点做作的例子,但它应该有所帮助:
use std::{sync::Arc, time::Duration};
use tokio::{self, net::TcpStream, sync::Mutex};
#[tokio::main]
async fn main() {
let mut incoming_packets = Arc::new(Mutex::new(vec![b"Wow".to_vec()]));
let mut local_incoming_packets = incoming_packets.clone();
tokio::spawn(async move {
for i in 0usize..10 {
tokio::time::delay_for(Duration::from_millis(200)).await;
let mut packets = local_incoming_packets.lock().await;
packets.push(i.to_ne_bytes().to_vec());
}
});
loop {
tokio::time::delay_for(Duration::from_millis(200)).await;
let packets = incoming_packets.lock().await;
dbg!(packets);
}
}
您可以看到我必须在 async move
块之外进行克隆,因为该块拥有其中所有内容的所有权。我不确定 r
和 w
但您可能还需要将它们移动到块内,然后才能将可变引用传递给它们。如果您提供包含所有正确 use
语句的代码,我可以更新我的答案。
您需要记住的一件事是 main()
技术上可以在 spawn
ed 代码之前退出。
此外,请注意,我使用了 tokio::sync::Mutex
以便您可以在等待获取锁时让步。
为什么不使用通道来完成 sending/receiveing 数据 from/to 其他任务?
这里有很多有用的例子 how to share data between tasks
编辑:我查看了您的代码,发现您使用了错误的互斥量。在处理异步代码时,您应该使用 tokio::sync::Mutex。其次,arc 中的引用存在问题。我已将创建弧线移动到衍生任务并将克隆添加到 send/reacv 函数。
extern crate futures; // 0.3.5; // 0.1.36std;
extern crate tokio; // 0.2.21;
extern crate byteorder; // 1.3.4;
use std::{error::Error};
use std::sync::{Arc};
use tokio::sync::Mutex;
use tokio::net::TcpStream;
use futures::{future};
use bytes::Bytes;
use std::io;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio::io::AsyncReadExt;
use tokio::net::tcp::{ReadHalf, WriteHalf};
use byteorder::{BigEndian, WriteBytesExt};
//This is the SharedPackets struct that is located in the crate structures
struct SharedPackets {
data: Mutex<Vec<bytes::Bytes>>
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let mut inc : Vec<bytes::Bytes> = Vec::new();
inc.push(Bytes::from("Wow"));
let mut incoming_packets = Arc::new(SharedPackets {
data: Mutex::new(inc)
});
let mut outg : Vec<bytes::Bytes> = Vec::new();
outg.push(Bytes::from("Wow"));
let mut outgoint_packets = Arc::new(SharedPackets {
data: Mutex::new(outg)
});
let mut local_incoming_packets = Arc::clone(&incoming_packets);
let mut local_outgoint_packets = Arc::clone(&outgoint_packets);
tokio::spawn(async move {
let mut stream = TcpStream::connect("127.0.0.1:8080").await.unwrap();
let (mut r, mut w) = stream.split();
let mut rarc = Arc::new(Mutex::new(& mut r));
let mut warc = Arc::new(Mutex::new(& mut w));
//send and receive are both async functions that contain an infinite loop
//they basically use AsyncWriteExt and AsyncReadExt to manipulate both halves of the stream
//send reads the queue and write this data on the socket
//recv reads the socket and write this data on the queue
//both "queues" are manipulated by the main thread
//let mut read = &*rarc.lock().await;
//let mut write = &*warc.lock().await;
future::try_join(send(warc.clone(), &mut local_outgoint_packets), recv(rarc.clone(), &mut local_incoming_packets)).await;
});
loop {
//read & write other stuff on both incoming_packets & outgoint_packets
//until the end of the program
}
}
async fn recv(readerw: Arc<Mutex<&mut ReadHalf<'_>>>, queue: &mut Arc<SharedPackets>) -> Result<(), io::Error> {
let mut reader = readerw.lock().await;
loop {
let mut buf : Vec<u8> = vec![0; 4096];
let n = match reader.read(&mut buf).await {
Ok(n) if n == 0 => return Ok(()),
Ok(n) => n,
Err(e) => {
eprintln!("failed to read from socket; err = {:?}", e);
return Err(e);
}
};
}
}
async fn send(writerw: Arc<Mutex<&mut WriteHalf<'_>>>, queue: &mut Arc<SharedPackets>) -> Result<(), io::Error> {
let mut writer = writerw.lock().await;
loop{
//task::sleep(Duration::from_millis(300)).await;
{
let a = vec!["AAAA"];
for i in a.iter() {
let mut byte_array = vec![];
let str_bytes = i.as_bytes();
WriteBytesExt::write_u32::<BigEndian>(&mut byte_array, str_bytes.len() as u32).unwrap();
byte_array.extend(str_bytes);
writer.write(&byte_array).await?;
}
}
}
}
这是没有错误的完整代码,虽然没有测试它:Playground link
我在制作一个 Tokio 客户端时遇到困难,该客户端从服务器接收数据包并将它们存储在队列中供主线程处理,同时能够同时从另一个队列向服务器发送数据包.
我正在尝试制作一个非常简单的在线游戏演示,让游戏客户端发送数据(它自己修改的状态,比如玩家移动)并接收数据(由其他玩家和服务器修改的游戏状态,比如NPC/other 名玩家也移动了)。
这个想法是让一个网络线程访问两个 Arcs
持有 Mutexes
到 Vec<bytes::Bytes>
存储序列化数据。一个 Arc
用于 IncomingPackets
,另一个用于 OutgoingPackets
。 IncomingPackets
将由从服务器发送到客户端的数据包填充,稍后将由主线程读取,而 OutgoingPackets
将由主线程填充应发送到服务器的数据包。
我似乎无法在另一个线程中接收或发送数据包。
客户端将只连接到服务器,而服务器将允许多个客户端(将单独服务)。
关于流的用法和实现的解释对新手来说并不友好,但我认为我应该以某种方式使用它们。
我写了一些代码,但它不起作用,可能是错误的。
(我的原始代码无法编译,所以将其视为伪代码,抱歉)
extern crate byteorder; // 1.3.4
extern crate futures; // 0.3.5
extern crate tokio; // 0.2.21
use bytes::Bytes;
use futures::future;
use std::error::Error;
use std::sync::{Arc, Mutex};
use tokio::net::TcpStream;
use byteorder::{BigEndian, WriteBytesExt};
use std::io;
use std::time::Duration;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::net::tcp::{ReadHalf, WriteHalf};
//This is the SharedPackets struct that is located in the crate structures
struct SharedPackets {
data: Mutex<Vec<bytes::Bytes>>,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
let (mut r, mut w) = stream.split();
let mut inc: Vec<bytes::Bytes> = Vec::new();
inc.push(Bytes::from("Wow"));
let mut incoming_packets = Arc::new(SharedPackets {
data: Mutex::new(inc),
});
let mut outg: Vec<bytes::Bytes> = Vec::new();
outg.push(Bytes::from("Wow"));
let mut outgoint_packets = Arc::new(SharedPackets {
data: Mutex::new(outg),
});
let mut local_incoming_packets = Arc::clone(&incoming_packets);
let mut local_outgoint_packets = Arc::clone(&outgoint_packets);
let mut rarc = Arc::new(Mutex::new(r));
let mut warc = Arc::new(Mutex::new(w));
tokio::spawn(async move {
//send and receive are both async functions that contain an infinite loop
//they basically use AsyncWriteExt and AsyncReadExt to manipulate both halves of the stream
//send reads the queue and write this data on the socket
//recv reads the socket and write this data on the queue
//both "queues" are manipulated by the main thread
let mut read = &*rarc.lock().unwrap();
let mut write = &*warc.lock().unwrap();
future::try_join(
send(&mut write, &mut local_outgoint_packets),
recv(&mut read, &mut local_incoming_packets),
)
.await;
});
loop {
//read & write other stuff on both incoming_packets & outgoint_packets
//until the end of the program
}
}
async fn recv(reader: &mut ReadHalf<'_>, queue: &mut Arc<SharedPackets>) -> Result<(), io::Error> {
loop {
let mut buf: Vec<u8> = vec![0; 4096];
let n = match reader.read(&mut buf).await {
Ok(n) if n == 0 => return Ok(()),
Ok(n) => n,
Err(e) => {
eprintln!("failed to read from socket; err = {:?}", e);
return Err(e);
}
};
}
}
async fn send(writer: &mut WriteHalf<'_>, queue: &mut Arc<SharedPackets>) -> Result<(), io::Error> {
loop {
//task::sleep(Duration::from_millis(300)).await;
{
let a = vec!["AAAA"];
for i in a.iter() {
let mut byte_array = vec![];
let str_bytes = i.as_bytes();
WriteBytesExt::write_u32::<BigEndian>(&mut byte_array, str_bytes.len() as u32)
.unwrap();
byte_array.extend(str_bytes);
writer.write(&byte_array).await?;
}
}
}
}
这不编译:
error: future cannot be sent between threads safely
--> src/main.rs:46:5
|
46 | tokio::spawn(async move {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ---- required by this bound in `tokio::spawn`
|
= help: within `impl futures::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, tokio::net::tcp::ReadHalf<'_>>`
note: future is not `Send` as this value is used across an await
--> src/main.rs:55:9
|
52 | let mut read = &*rarc.lock().unwrap();
| -------------------- has type `std::sync::MutexGuard<'_, tokio::net::tcp::ReadHalf<'_>>` which is not `Send`
...
55 | / future::try_join(
56 | | send(&mut write, &mut local_outgoint_packets),
57 | | recv(&mut read, &mut local_incoming_packets),
58 | | )
59 | | .await;
| |______________^ await occurs here, with `rarc.lock().unwrap()` maybe used later
60 | });
| - `rarc.lock().unwrap()` is later dropped here
help: consider moving this into a `let` binding to create a shorter lived borrow
--> src/main.rs:52:25
|
52 | let mut read = &*rarc.lock().unwrap();
| ^^^^^^^^^^^^^^^^^^^^^
error: future cannot be sent between threads safely
--> src/main.rs:46:5
|
46 | tokio::spawn(async move {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ---- required by this bound in `tokio::spawn`
|
= help: within `impl futures::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, tokio::net::tcp::WriteHalf<'_>>`
note: future is not `Send` as this value is used across an await
--> src/main.rs:55:9
|
53 | let mut write = &*warc.lock().unwrap();
| -------------------- has type `std::sync::MutexGuard<'_, tokio::net::tcp::WriteHalf<'_>>` which is not `Send`
54 |
55 | / future::try_join(
56 | | send(&mut write, &mut local_outgoint_packets),
57 | | recv(&mut read, &mut local_incoming_packets),
58 | | )
59 | | .await;
| |______________^ await occurs here, with `warc.lock().unwrap()` maybe used later
60 | });
| - `warc.lock().unwrap()` is later dropped here
help: consider moving this into a `let` binding to create a shorter lived borrow
--> src/main.rs:53:26
|
53 | let mut write = &*warc.lock().unwrap();
| ^^^^^^^^^^^^^^^^^^^^^
我认为这是最少的问题,因为我是 tokio 的新手。
我找不到这方面的例子,你知道解决这个问题的有效方法吗?
这是一个有点做作的例子,但它应该有所帮助:
use std::{sync::Arc, time::Duration};
use tokio::{self, net::TcpStream, sync::Mutex};
#[tokio::main]
async fn main() {
let mut incoming_packets = Arc::new(Mutex::new(vec![b"Wow".to_vec()]));
let mut local_incoming_packets = incoming_packets.clone();
tokio::spawn(async move {
for i in 0usize..10 {
tokio::time::delay_for(Duration::from_millis(200)).await;
let mut packets = local_incoming_packets.lock().await;
packets.push(i.to_ne_bytes().to_vec());
}
});
loop {
tokio::time::delay_for(Duration::from_millis(200)).await;
let packets = incoming_packets.lock().await;
dbg!(packets);
}
}
您可以看到我必须在 async move
块之外进行克隆,因为该块拥有其中所有内容的所有权。我不确定 r
和 w
但您可能还需要将它们移动到块内,然后才能将可变引用传递给它们。如果您提供包含所有正确 use
语句的代码,我可以更新我的答案。
您需要记住的一件事是 main()
技术上可以在 spawn
ed 代码之前退出。
此外,请注意,我使用了 tokio::sync::Mutex
以便您可以在等待获取锁时让步。
为什么不使用通道来完成 sending/receiveing 数据 from/to 其他任务? 这里有很多有用的例子 how to share data between tasks
编辑:我查看了您的代码,发现您使用了错误的互斥量。在处理异步代码时,您应该使用 tokio::sync::Mutex。其次,arc 中的引用存在问题。我已将创建弧线移动到衍生任务并将克隆添加到 send/reacv 函数。
extern crate futures; // 0.3.5; // 0.1.36std;
extern crate tokio; // 0.2.21;
extern crate byteorder; // 1.3.4;
use std::{error::Error};
use std::sync::{Arc};
use tokio::sync::Mutex;
use tokio::net::TcpStream;
use futures::{future};
use bytes::Bytes;
use std::io;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio::io::AsyncReadExt;
use tokio::net::tcp::{ReadHalf, WriteHalf};
use byteorder::{BigEndian, WriteBytesExt};
//This is the SharedPackets struct that is located in the crate structures
struct SharedPackets {
data: Mutex<Vec<bytes::Bytes>>
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let mut inc : Vec<bytes::Bytes> = Vec::new();
inc.push(Bytes::from("Wow"));
let mut incoming_packets = Arc::new(SharedPackets {
data: Mutex::new(inc)
});
let mut outg : Vec<bytes::Bytes> = Vec::new();
outg.push(Bytes::from("Wow"));
let mut outgoint_packets = Arc::new(SharedPackets {
data: Mutex::new(outg)
});
let mut local_incoming_packets = Arc::clone(&incoming_packets);
let mut local_outgoint_packets = Arc::clone(&outgoint_packets);
tokio::spawn(async move {
let mut stream = TcpStream::connect("127.0.0.1:8080").await.unwrap();
let (mut r, mut w) = stream.split();
let mut rarc = Arc::new(Mutex::new(& mut r));
let mut warc = Arc::new(Mutex::new(& mut w));
//send and receive are both async functions that contain an infinite loop
//they basically use AsyncWriteExt and AsyncReadExt to manipulate both halves of the stream
//send reads the queue and write this data on the socket
//recv reads the socket and write this data on the queue
//both "queues" are manipulated by the main thread
//let mut read = &*rarc.lock().await;
//let mut write = &*warc.lock().await;
future::try_join(send(warc.clone(), &mut local_outgoint_packets), recv(rarc.clone(), &mut local_incoming_packets)).await;
});
loop {
//read & write other stuff on both incoming_packets & outgoint_packets
//until the end of the program
}
}
async fn recv(readerw: Arc<Mutex<&mut ReadHalf<'_>>>, queue: &mut Arc<SharedPackets>) -> Result<(), io::Error> {
let mut reader = readerw.lock().await;
loop {
let mut buf : Vec<u8> = vec![0; 4096];
let n = match reader.read(&mut buf).await {
Ok(n) if n == 0 => return Ok(()),
Ok(n) => n,
Err(e) => {
eprintln!("failed to read from socket; err = {:?}", e);
return Err(e);
}
};
}
}
async fn send(writerw: Arc<Mutex<&mut WriteHalf<'_>>>, queue: &mut Arc<SharedPackets>) -> Result<(), io::Error> {
let mut writer = writerw.lock().await;
loop{
//task::sleep(Duration::from_millis(300)).await;
{
let a = vec!["AAAA"];
for i in a.iter() {
let mut byte_array = vec![];
let str_bytes = i.as_bytes();
WriteBytesExt::write_u32::<BigEndian>(&mut byte_array, str_bytes.len() as u32).unwrap();
byte_array.extend(str_bytes);
writer.write(&byte_array).await?;
}
}
}
}
这是没有错误的完整代码,虽然没有测试它:Playground link