async_send 数据未发送
async_send data not sent
[免责声明] 我是 boost 新手。
查看 boost::asio 并尝试创建具有以下功能的简单异步 TCP 服务器:
- 侦听端口 13 上的连接
- 连接后,接收数据
- 如果收到的数据 == 时间,则 return 当前日期时间,否则 return 预定义字符串 ("Something else was requested")
问题:
虽然,我接受连接并接收数据,但在使用 async_send 传输数据时,虽然我没有收到任何错误并且 bytes_transferred 的值是正确的,但我在客户端收到空数据。
如果我尝试从 handle_accept(而不是 handle_read)内传输数据,这工作正常。
实施:
我研究了发现 here 的 boost asio 教程:
实例化一个 tcp_server 对象,它基本上启动接受器并开始监听。如下图:
int main()
{
try
{
boost::asio::io_service io_service;
tcp_server server(io_service);
io_service.run();
}
catch (std::exception& e)
{
std::cerr << e.what() << std::endl;
}
return 0;
}
并在 tcp_server 中:
class tcp_server
{
public:
tcp_server(boost::asio::io_service& io_service)
: acceptor_(io_service, tcp::endpoint(tcp::v4(), 13))
{
start_accept();
}
private:
void start_accept()
{
using std::cout;
tcp_connection::pointer new_connection =
tcp_connection::create(acceptor_.get_io_service());
acceptor_.async_accept(new_connection->socket(),
boost::bind(&tcp_server::handle_accept, this, new_connection,
boost::asio::placeholders::error));
cout << "Done";
}
...
}
接受连接后,我将按如下所示进行处理:
void handle_accept(tcp_connection::pointer new_connection,
const boost::system::error_code& error)
{
if (!error)
{
new_connection->start();
}
start_accept();
}
下面是tcp_connection::start()
方法:
void start()
{
boost::asio::async_read(socket_, boost::asio::buffer(inputBuffer_),
boost::bind(&tcp_connection::handle_read, shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
/* the snippet below works here - but not in handle_read
outputBuffer_ = make_daytime_string();
boost::asio::async_write(socket_, boost::asio::buffer(outputBuffer_),
boost::bind(&tcp_connection::handle_write, shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));*/
}
并在 handle_read
中:
void handle_read(const boost::system::error_code& error, size_t bytes_transferred)
{
outputBuffer_ = make_daytime_string();
if (strcmp(inputBuffer_, "time"))
{
/*this does not work - correct bytes_transferred but nothing shown on receiving end */
boost::asio::async_write(socket_, boost::asio::buffer(outputBuffer_),
boost::bind(&tcp_connection::handle_write, shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
else
{
outputBuffer_ = "Something else was requested";//, 128);
boost::asio::async_write(socket_, boost::asio::buffer(outputBuffer_),
boost::bind(&tcp_connection::handle_write, shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
}
handle_write
如下图:
void handle_write(const boost::system::error_code& error,
size_t bytes_transferred)
{
if (!error)
{
std::cout << "Bytes transferred: " << bytes_transferred;
std::cout << "Message sent: " << outputBuffer_;
}
else
{
std::cout << "Error in writing: " << error.message();
}
}
注意以下关于 handle_write
的内容(这才是真正奇怪的事情):
- 没有错误
- bytes_transferred 变量具有正确的值
- outputBuffer_ 具有正确的值(在 handle_read 中设置)
然而,在客户端(Packet Sender)收到的包是空的(就数据而言)。
完整代码分享here.
完整的测试程序(c++14)。注意响应接收时异步缓冲的处理 - 可能已经在进行发送。
#include <boost/asio.hpp>
#include <thread>
#include <future>
#include <vector>
#include <array>
#include <memory>
#include <mutex>
#include <condition_variable>
#include <iterator>
#include <iostream>
namespace asio = boost::asio;
asio::io_service server_service;
asio::io_service::work server_work{server_service};
bool listening = false;
std::condition_variable cv_listening;
std::mutex management_mutex;
auto const shared_query = asio::ip::tcp::resolver::query(asio::ip::tcp::v4(), "localhost", "8082");
void client()
try
{
asio::io_service client_service;
asio::ip::tcp::socket socket(client_service);
auto lock = std::unique_lock<std::mutex>(management_mutex);
cv_listening.wait(lock, [] { return listening; });
lock.unlock();
asio::ip::tcp::resolver resolver(client_service);
asio::connect(socket, resolver.resolve(shared_query));
auto s = std::string("time\ntime\ntime\n");
asio::write(socket, asio::buffer(s));
socket.shutdown(asio::ip::tcp::socket::shutdown_send);
asio::streambuf sb;
boost::system::error_code sink;
asio::read(socket, sb, sink);
std::cout << std::addressof(sb);
socket.close();
server_service.stop();
}
catch(const boost::system::system_error& se)
{
std::cerr << "client: " << se.code().message() << std::endl;
}
struct connection
: std::enable_shared_from_this<connection>
{
connection(asio::io_service& ios)
: strand_(ios)
{
}
void run()
{
asio::async_read_until(socket_, buffer_, "\n",
strand_.wrap([self = shared_from_this()](auto const&ec, auto size)
{
if (size == 0 )
{
// error condition
boost::system::error_code sink;
self->socket_.shutdown(asio::ip::tcp::socket::shutdown_receive, sink);
}
else {
self->buffer_.commit(size);
std::istream is(std::addressof(self->buffer_));
std::string str;
while (std::getline(is, str))
{
if (str == "time") {
self->queue_send("eight o clock");
}
}
self->run();
}
}));
}
void queue_send(std::string s)
{
assert(strand_.running_in_this_thread());
s += '\n';
send_buffers_pending_.push_back(std::move(s));
nudge_send();
}
void nudge_send()
{
assert(strand_.running_in_this_thread());
if (send_buffers_sending_.empty() and not send_buffers_pending_.empty())
{
std::swap(send_buffers_pending_, send_buffers_sending_);
std::vector<asio::const_buffers_1> send_buffers;
send_buffers.reserve(send_buffers_sending_.size());
std::transform(send_buffers_sending_.begin(), send_buffers_sending_.end(),
std::back_inserter(send_buffers),
[](auto&& str) {
return asio::buffer(str);
});
asio::async_write(socket_, send_buffers,
strand_.wrap([self = shared_from_this()](auto const& ec, auto size)
{
// should check for errors here...
self->send_buffers_sending_.clear();
self->nudge_send();
}));
}
}
asio::io_service::strand strand_;
asio::ip::tcp::socket socket_{strand_.get_io_service()};
asio::streambuf buffer_;
std::vector<std::string> send_buffers_pending_;
std::vector<std::string> send_buffers_sending_;
};
void begin_accepting(asio::ip::tcp::acceptor& acceptor)
{
auto candidate = std::make_shared<connection>(acceptor.get_io_service());
acceptor.async_accept(candidate->socket_, [candidate, &acceptor](auto const& ec)
{
if (not ec) {
candidate->run();
begin_accepting(acceptor);
}
});
}
void server()
try
{
asio::ip::tcp::acceptor acceptor(server_service);
asio::ip::tcp::resolver resolver(server_service);
auto first = resolver.resolve(shared_query);
acceptor.open(first->endpoint().protocol());
acceptor.bind(first->endpoint());
acceptor.listen();
begin_accepting(acceptor);
auto lock = std::unique_lock<std::mutex>(management_mutex);
listening = true;
lock.unlock();
cv_listening.notify_all();
server_service.run();
}
catch(const boost::system::system_error& se)
{
std::cerr << "server: " << se.code().message() << std::endl;
}
int main()
{
using future_type = std::future<void>;
auto stuff = std::array<future_type, 2> {{std::async(std::launch::async, client),
std::async(std::launch::async, server)}};
for (auto& f : stuff) f.wait();
}
此代码中存在多个问题。其中一些可能与您的问题有关:
- TCP 没有数据包的定义,因此不能保证您会在 handle_read 中立即收到
time
。为此,您需要一个状态机并尊重 bytes_transferred 信息。如果您只收到消息的一部分,则需要在正确的偏移量处继续。或者您可以使用 asio 实用函数,例如精确读取字节长度或读取一行。
- 除了最后一点,你不应该真正将接收到的数据与
strcmp
进行比较。这只有在远程也通过连接发送空终止符时才有效 - 是吗?
- 您不检查是否发生了错误,尽管这可能会在其他错误中表现出来。
- 如果您在 shart 时间跨度内收到多个数据片段,则可能会发出多个并发异步写入。这在 asio 中无效。
- 更重要的是,您在发送过程中改变了发送缓冲区 (
outputBuffer_
)。这几乎会导致未定义的行为。 asio 可能会尝试写入一块不再有效的内存。
在问题中提供的评论的集体帮助下,我已经解决了这个问题。我遇到的行为是因为 async_read
的功能。更具体地说,在 boost asio documentation 中显示:
This function is used to asynchronously read a certain number of bytes
of data from a stream. The function call always returns immediately.
The asynchronous operation will continue until one of the following
conditions is true:
- The supplied buffers are full. That is, the bytes transferred is equal to the sum of the buffer sizes.
- An error occurred.
我用来读取输入的 inputBuffer_ 是一个 128 字符的数组。我使用的客户端只会传输真实数据(没有填充),因此 async_read 不会 return 直到连接被客户端关闭(或传输了 128 字节的数据)。当客户端关闭连接时,无法发回请求的数据。这也是它与@Arunmu 的简单 python tcp 客户端一起工作的原因(因为他总是发送 128 字节的数据)。
为了解决这些问题,我做了以下更改(提供了完整的工作代码 here 以供参考):
- In
tcp_connection::start
:我现在使用async_read_until
读取传入数据(并使用\n
作为分隔符)。输入存储在 boost::asio::streambuf
中。一旦找到定界符或发生错误,async_read
保证为 return。所以没有机会同时发出多个async_write
。
- 在
handle_read
中:我加入了错误检查,这使得调试变得更加简单。
[免责声明] 我是 boost 新手。
查看 boost::asio 并尝试创建具有以下功能的简单异步 TCP 服务器:
- 侦听端口 13 上的连接
- 连接后,接收数据
- 如果收到的数据 == 时间,则 return 当前日期时间,否则 return 预定义字符串 ("Something else was requested")
问题: 虽然,我接受连接并接收数据,但在使用 async_send 传输数据时,虽然我没有收到任何错误并且 bytes_transferred 的值是正确的,但我在客户端收到空数据。
如果我尝试从 handle_accept(而不是 handle_read)内传输数据,这工作正常。
实施: 我研究了发现 here 的 boost asio 教程: 实例化一个 tcp_server 对象,它基本上启动接受器并开始监听。如下图:
int main()
{
try
{
boost::asio::io_service io_service;
tcp_server server(io_service);
io_service.run();
}
catch (std::exception& e)
{
std::cerr << e.what() << std::endl;
}
return 0;
}
并在 tcp_server 中:
class tcp_server
{
public:
tcp_server(boost::asio::io_service& io_service)
: acceptor_(io_service, tcp::endpoint(tcp::v4(), 13))
{
start_accept();
}
private:
void start_accept()
{
using std::cout;
tcp_connection::pointer new_connection =
tcp_connection::create(acceptor_.get_io_service());
acceptor_.async_accept(new_connection->socket(),
boost::bind(&tcp_server::handle_accept, this, new_connection,
boost::asio::placeholders::error));
cout << "Done";
}
...
}
接受连接后,我将按如下所示进行处理:
void handle_accept(tcp_connection::pointer new_connection,
const boost::system::error_code& error)
{
if (!error)
{
new_connection->start();
}
start_accept();
}
下面是tcp_connection::start()
方法:
void start()
{
boost::asio::async_read(socket_, boost::asio::buffer(inputBuffer_),
boost::bind(&tcp_connection::handle_read, shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
/* the snippet below works here - but not in handle_read
outputBuffer_ = make_daytime_string();
boost::asio::async_write(socket_, boost::asio::buffer(outputBuffer_),
boost::bind(&tcp_connection::handle_write, shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));*/
}
并在 handle_read
中:
void handle_read(const boost::system::error_code& error, size_t bytes_transferred)
{
outputBuffer_ = make_daytime_string();
if (strcmp(inputBuffer_, "time"))
{
/*this does not work - correct bytes_transferred but nothing shown on receiving end */
boost::asio::async_write(socket_, boost::asio::buffer(outputBuffer_),
boost::bind(&tcp_connection::handle_write, shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
else
{
outputBuffer_ = "Something else was requested";//, 128);
boost::asio::async_write(socket_, boost::asio::buffer(outputBuffer_),
boost::bind(&tcp_connection::handle_write, shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
}
handle_write
如下图:
void handle_write(const boost::system::error_code& error,
size_t bytes_transferred)
{
if (!error)
{
std::cout << "Bytes transferred: " << bytes_transferred;
std::cout << "Message sent: " << outputBuffer_;
}
else
{
std::cout << "Error in writing: " << error.message();
}
}
注意以下关于 handle_write
的内容(这才是真正奇怪的事情):
- 没有错误
- bytes_transferred 变量具有正确的值
- outputBuffer_ 具有正确的值(在 handle_read 中设置)
然而,在客户端(Packet Sender)收到的包是空的(就数据而言)。
完整代码分享here.
完整的测试程序(c++14)。注意响应接收时异步缓冲的处理 - 可能已经在进行发送。
#include <boost/asio.hpp>
#include <thread>
#include <future>
#include <vector>
#include <array>
#include <memory>
#include <mutex>
#include <condition_variable>
#include <iterator>
#include <iostream>
namespace asio = boost::asio;
asio::io_service server_service;
asio::io_service::work server_work{server_service};
bool listening = false;
std::condition_variable cv_listening;
std::mutex management_mutex;
auto const shared_query = asio::ip::tcp::resolver::query(asio::ip::tcp::v4(), "localhost", "8082");
void client()
try
{
asio::io_service client_service;
asio::ip::tcp::socket socket(client_service);
auto lock = std::unique_lock<std::mutex>(management_mutex);
cv_listening.wait(lock, [] { return listening; });
lock.unlock();
asio::ip::tcp::resolver resolver(client_service);
asio::connect(socket, resolver.resolve(shared_query));
auto s = std::string("time\ntime\ntime\n");
asio::write(socket, asio::buffer(s));
socket.shutdown(asio::ip::tcp::socket::shutdown_send);
asio::streambuf sb;
boost::system::error_code sink;
asio::read(socket, sb, sink);
std::cout << std::addressof(sb);
socket.close();
server_service.stop();
}
catch(const boost::system::system_error& se)
{
std::cerr << "client: " << se.code().message() << std::endl;
}
struct connection
: std::enable_shared_from_this<connection>
{
connection(asio::io_service& ios)
: strand_(ios)
{
}
void run()
{
asio::async_read_until(socket_, buffer_, "\n",
strand_.wrap([self = shared_from_this()](auto const&ec, auto size)
{
if (size == 0 )
{
// error condition
boost::system::error_code sink;
self->socket_.shutdown(asio::ip::tcp::socket::shutdown_receive, sink);
}
else {
self->buffer_.commit(size);
std::istream is(std::addressof(self->buffer_));
std::string str;
while (std::getline(is, str))
{
if (str == "time") {
self->queue_send("eight o clock");
}
}
self->run();
}
}));
}
void queue_send(std::string s)
{
assert(strand_.running_in_this_thread());
s += '\n';
send_buffers_pending_.push_back(std::move(s));
nudge_send();
}
void nudge_send()
{
assert(strand_.running_in_this_thread());
if (send_buffers_sending_.empty() and not send_buffers_pending_.empty())
{
std::swap(send_buffers_pending_, send_buffers_sending_);
std::vector<asio::const_buffers_1> send_buffers;
send_buffers.reserve(send_buffers_sending_.size());
std::transform(send_buffers_sending_.begin(), send_buffers_sending_.end(),
std::back_inserter(send_buffers),
[](auto&& str) {
return asio::buffer(str);
});
asio::async_write(socket_, send_buffers,
strand_.wrap([self = shared_from_this()](auto const& ec, auto size)
{
// should check for errors here...
self->send_buffers_sending_.clear();
self->nudge_send();
}));
}
}
asio::io_service::strand strand_;
asio::ip::tcp::socket socket_{strand_.get_io_service()};
asio::streambuf buffer_;
std::vector<std::string> send_buffers_pending_;
std::vector<std::string> send_buffers_sending_;
};
void begin_accepting(asio::ip::tcp::acceptor& acceptor)
{
auto candidate = std::make_shared<connection>(acceptor.get_io_service());
acceptor.async_accept(candidate->socket_, [candidate, &acceptor](auto const& ec)
{
if (not ec) {
candidate->run();
begin_accepting(acceptor);
}
});
}
void server()
try
{
asio::ip::tcp::acceptor acceptor(server_service);
asio::ip::tcp::resolver resolver(server_service);
auto first = resolver.resolve(shared_query);
acceptor.open(first->endpoint().protocol());
acceptor.bind(first->endpoint());
acceptor.listen();
begin_accepting(acceptor);
auto lock = std::unique_lock<std::mutex>(management_mutex);
listening = true;
lock.unlock();
cv_listening.notify_all();
server_service.run();
}
catch(const boost::system::system_error& se)
{
std::cerr << "server: " << se.code().message() << std::endl;
}
int main()
{
using future_type = std::future<void>;
auto stuff = std::array<future_type, 2> {{std::async(std::launch::async, client),
std::async(std::launch::async, server)}};
for (auto& f : stuff) f.wait();
}
此代码中存在多个问题。其中一些可能与您的问题有关:
- TCP 没有数据包的定义,因此不能保证您会在 handle_read 中立即收到
time
。为此,您需要一个状态机并尊重 bytes_transferred 信息。如果您只收到消息的一部分,则需要在正确的偏移量处继续。或者您可以使用 asio 实用函数,例如精确读取字节长度或读取一行。 - 除了最后一点,你不应该真正将接收到的数据与
strcmp
进行比较。这只有在远程也通过连接发送空终止符时才有效 - 是吗? - 您不检查是否发生了错误,尽管这可能会在其他错误中表现出来。
- 如果您在 shart 时间跨度内收到多个数据片段,则可能会发出多个并发异步写入。这在 asio 中无效。
- 更重要的是,您在发送过程中改变了发送缓冲区 (
outputBuffer_
)。这几乎会导致未定义的行为。 asio 可能会尝试写入一块不再有效的内存。
在问题中提供的评论的集体帮助下,我已经解决了这个问题。我遇到的行为是因为 async_read
的功能。更具体地说,在 boost asio documentation 中显示:
This function is used to asynchronously read a certain number of bytes of data from a stream. The function call always returns immediately. The asynchronous operation will continue until one of the following conditions is true:
- The supplied buffers are full. That is, the bytes transferred is equal to the sum of the buffer sizes.
- An error occurred.
我用来读取输入的 inputBuffer_ 是一个 128 字符的数组。我使用的客户端只会传输真实数据(没有填充),因此 async_read 不会 return 直到连接被客户端关闭(或传输了 128 字节的数据)。当客户端关闭连接时,无法发回请求的数据。这也是它与@Arunmu 的简单 python tcp 客户端一起工作的原因(因为他总是发送 128 字节的数据)。
为了解决这些问题,我做了以下更改(提供了完整的工作代码 here 以供参考):
- In
tcp_connection::start
:我现在使用async_read_until
读取传入数据(并使用\n
作为分隔符)。输入存储在boost::asio::streambuf
中。一旦找到定界符或发生错误,async_read
保证为 return。所以没有机会同时发出多个async_write
。 - 在
handle_read
中:我加入了错误检查,这使得调试变得更加简单。