Unexpected end of file while reading mpstat output with BOOST ASIO async_read_until
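I am trying to read the output of the mpstat command (collecting CPU information every second, i.e. "mpstat -P ALL 1") in order to get usage information for the CPU and its cores. On a multi-core CPU, I get an unexpected "end of file" status right after reading the first measurement.
mpstat seems to format its output so that the measurements for all cores are separated by an empty line.
I used async_read_until with the delimiter set to '\n'.
Please find a small reproducer below. With this reproducer, I get the following: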
Enter handle_read
handle_read got data: --Linux 4.13.0-46-generic (pierre) 26/08/2018 _x86_64_ (4 CPU)
--Enter handle_read
handle_read got data: --
11:39:11 CPU %usr %nice %sys %iowait %irq %soft %steal %guest %gnice %idle
11:39:11 all--Enter handle_read
handle_read got data: -- 5,69 0,06 2,90 0,20 0,00 0,02 0,00 0,00 0,00 91,13
11:39:11 0 5,95 0,05--Enter handle_read
handle_read got data: -- 2,80 0,13 0,00 0,01 0,00 0,00 0,00 91,06
11:39:11 1 5,24 0,01 2,50 0,14 0,00 0,02 0,00 0,00 0,00 92,09
11:39:11 2 6,30 0,17--Enter handle_read
handle_read got data: -- 2,29 0,36 0,00 0,04 0,00 0,00 0,00 90,85
11:39:11 3 5,28 0,01 4,01 0,17 0,00 0,00 0,00 0,00 0,00 90,52
--Enter handle_read
Problem while trying to read mpstat data: End of file
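Basically, I am able to receive the first measurement, but immediately afterwards I get "end of file". It looks like the empty line emitted by mpstat is what triggers the "end of file" indication, but I don't understand why.
Can someone provide some help? Thanks a lot.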
#include <boost/asio.hpp>
#include <unistd.h>
#include <iostream>

#define PIPE_READ 0
#define PIPE_WRITE 1
#define ENDOFLINE "\n"

static boost::asio::streambuf data;
static std::shared_ptr<boost::asio::posix::stream_descriptor> cpuLoadDataStream;

static void handle_read(const boost::system::error_code& error, std::size_t bytes_transferred) {
    printf("Enter handle_read\n");
    if (error == boost::system::errc::success) {
        if (data.size() > 0) {
            std::string dataReceived((std::istreambuf_iterator<char>(&data)), std::istreambuf_iterator<char>());
            std::cout << "handle_read got data: " << "--" << dataReceived << "--";
        }
        boost::asio::async_read_until(*cpuLoadDataStream, data, ENDOFLINE, handle_read);
    } else {
        std::cout << "Problem while trying to read mpstat data: " << error.message() << std::endl;
    }
}

int main() {
    int pipeFd[2];
    boost::asio::io_service ioService;
    if (pipe(pipeFd) < 0) {
        std::cout << "pipe error" << std::endl;
        exit(EXIT_FAILURE);
    }
    int pid;
    if ((pid = fork()) == 0) {
        // son
        close(pipeFd[PIPE_READ]);
        if (dup2(pipeFd[PIPE_WRITE], STDOUT_FILENO) == -1) {
            std::cout << "dup2 error" << std::endl;
            exit(EXIT_FAILURE);
        }
        close(pipeFd[PIPE_WRITE]);
        if (execlp("mpstat", "1", "-P", "ALL", 0) == -1) {
            std::cout << "execlp error" << std::endl;
            exit(EXIT_FAILURE);
        }
    } else {
        // parent
        if (pid == -1) {
            std::cout << "fork error" << std::endl;
            exit(EXIT_FAILURE);
        } else {
            close(pipeFd[PIPE_WRITE]);
            cpuLoadDataStream = std::make_shared<boost::asio::posix::stream_descriptor>(ioService, ::dup(pipeFd[PIPE_READ]));
            boost::asio::async_read_until(*cpuLoadDataStream, data, ENDOFLINE, handle_read);
        }
    }
    ioService.run();
    return 1;
}
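First off: if you want "1" to mean that mpstat runs repeatedly at 1-second intervals, you must pass it as the first argument, not as the process name. Otherwise "1" ends up as argv[0], so mpstat sees no interval, prints a single report and exits, and the closed pipe is reported as end of file. The call should be: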
if (execlp("mpstat", "mpstat", "1", "-P", "ALL", 0) == -1) {
The first argument, by convention, should point to the filename associated with the file being executed.
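To make the effect of the misplaced "1" visible, here is a small sketch that uses echo as a stand-in for mpstat (an assumption made only so that it runs without sysstat installed). The helper names capture, with_name and without_name are hypothetical. The first element of the argument vector is taken as the program name, so what the program treats as its arguments starts at the second element.

```cpp
#include <string>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

// Hypothetical helper: runs `file` with the given argv and returns its stdout.
std::string capture(const char* file, const char* const argv[]) {
    int fds[2];
    if (pipe(fds) == -1) return {};
    pid_t pid = fork();
    if (pid == -1) {
        close(fds[0]);
        close(fds[1]);
        return {};
    }
    if (pid == 0) {                        // child: stdout -> pipe, then exec
        close(fds[0]);
        dup2(fds[1], STDOUT_FILENO);
        close(fds[1]);
        execvp(file, const_cast<char* const*>(argv));
        _exit(127);                        // only reached if exec failed
    }
    close(fds[1]);                         // parent: read until end of file
    std::string out;
    char buf[256];
    ssize_t n;
    while ((n = read(fds[0], buf, sizeof buf)) > 0)
        out.append(buf, static_cast<std::size_t>(n));
    close(fds[0]);
    waitpid(pid, nullptr, 0);
    return out;
}

// argv[0] is the program name; "1", "2" and "3" are the real arguments.
std::string with_name() {
    const char* argv[] = {"echo", "1", "2", "3", nullptr};
    return capture("echo", argv);
}

// Here "1" lands in argv[0], so it is never seen as an argument.
std::string without_name() {
    const char* argv[] = {"1", "2", "3", nullptr};
    return capture("echo", argv);
}
```

with_name() returns "1 2 3" followed by a newline. In without_name() the "1" is swallowed as the program name, just as the original execlp call left mpstat without its interval; a multi-call binary that dispatches on argv[0] may even refuse to run.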
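async_read_until reads until the input buffer contains the delimiter at least once, so the buffer may well hold more data than the first line. When you read from the buffer you therefore have to take bytes_transferred into account: it counts the bytes up to and including the delimiter, not everything that was buffered.
Make sure to consume the delimiter as well, to avoid getting stuck in an infinite loop (the stop condition would already be satisfied on the next call):
void handle_read(const boost::system::error_code &error, std::size_t bytes_transferred) {
    std::cout << "Enter handle_read (" << error.message() << ")\n";
    if (!error) {
        if (bytes_transferred > 0) {
            // bytes_transferred counts the line plus its trailing delimiter
            auto first = boost::asio::buffers_begin(data.data());
            std::string line(first, first + static_cast<std::ptrdiff_t>(bytes_transferred - delimiter.size()));
            std::cout << "handle_read got data: '" << line << "' --\n";
            data.consume(bytes_transferred); // drop the line and its delimiter
        }
        do_read_loop();
    }
}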
Or simpler:
void handle_read(const boost::system::error_code &error, std::size_t bytes_transferred) {
    std::cout << "Enter handle_read (" << error.message() << ")\n";
    if (!error) {
        std::string line;
        if (getline(std::istream(&data), line))
            std::cout << "handle_read got data: '" << line << "'\n";
        do_read_loop();
    }
}
I prefer the first, because it is more explicit and more generally applicable.
Side notes:
There was a problem with the way you used global data: it would only get destroyed after the corresponding io_service had already gone out of scope. That is UB; building with asan/ubsan helps you spot such bugs.
Using io_service across forks is not supported without cooperation from the service implementations, see https://www.boost.org/doc/libs/1_68_0/doc/html/boost_asio/reference/io_context/notify_fork.html
Fixed/Simplified Asio
Here it is somewhat simplified, with the globals removed:
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <iostream>
#include <memory>
#include <string>
#include <unistd.h>

namespace ba = boost::asio;

struct Reader : std::enable_shared_from_this<Reader> {
    ba::streambuf data;
    ba::posix::stream_descriptor cpuLoadDataStream;
    std::string const delimiter = "\n";

    Reader(ba::io_service& svc, int fd)
        : cpuLoadDataStream(svc, fd) {}

    void do_read_loop() {
        async_read_until(cpuLoadDataStream, data, delimiter, boost::bind(&Reader::handle_read, shared_from_this(), _1, _2));
    }

    void handle_read(const boost::system::error_code &error, std::size_t bytes_transferred) {
        std::cout << "Enter handle_read (" << error.message() << ")\n";
        if (!error) {
            if (bytes_transferred > 0) {
                // bytes_transferred counts the line plus its trailing delimiter
                auto first = ba::buffers_begin(data.data());
                std::string line(first, first + static_cast<std::ptrdiff_t>(bytes_transferred - delimiter.size()));
                std::cout << "handle_read got data: '" << line << "' --\n";
                data.consume(bytes_transferred); // drop the line and its delimiter
            }
            do_read_loop();
        }
    }
};

int main() {
    int fds[2];
    int& readFd = fds[0];
    int& writeFd = fds[1];
    if (pipe(fds) == -1) {
        std::cout << "pipe error" << std::endl;
        return 1;
    }
    if (int pid = fork()) {
        ba::io_service ioService;
        // parent
        if (pid == -1) {
            std::cerr << "fork error" << std::endl;
            return 2;
        } else {
            close(writeFd);
            std::make_shared<Reader>(ioService, ::dup(readFd))->do_read_loop();
        }
        ioService.run();
    } else {
        ba::io_service ioService;
        // child
        if (dup2(writeFd, STDOUT_FILENO) == -1) {
            std::cerr << "dup2 error" << std::endl;
            return 3;
        }
        close(readFd);
        close(writeFd);
        if (execlp("mpstat", "mpstat", "-P", "ALL", 0) == -1) {
            std::cerr << "execlp error" << std::endl;
            return 4;
        }
        ioService.run();
    }
}
Note: for obvious reasons this does not run continuously on Coliru, which is why the interval argument "1" is left out of the execlp call above.
Simpler: Boost Process
Why not use Boost Process instead?
#include <boost/process.hpp>
#include <iostream>
#include <iomanip>

namespace bp = boost::process;

int main() {
    bp::pstream output;
    bp::system("mpstat -P ALL", bp::std_out > output);
    for (std::string line; std::getline(output, line);) {
        std::cout << "Got: " << std::quoted(line) << "\n";
    }
}
Or, to make it asynchronous again:
#include <boost/asio.hpp>
#include <boost/process.hpp>
#include <functional>
#include <iomanip>
#include <iostream>
#include <string>
#include <vector>

namespace bp = boost::process;
using Args = std::vector<std::string>;

int main() {
    boost::asio::io_service io;
    bp::async_pipe output(io);
    bp::child mpstat(bp::search_path("mpstat"),
                     //Args { "1", "-P", "ALL" },
                     Args { "1", "3" }, // limited to 3 iterations on Coliru
                     bp::std_out > output, io);

    boost::asio::streambuf data;
    std::function<void(boost::system::error_code, size_t)> handle = [&](boost::system::error_code ec, size_t /*bytes_transferred*/) {
        if (ec) {
            std::cout << "Good bye (" << ec.message() << ")\n";
        } else {
            std::string line;
            std::getline(std::istream(&data), line);
            std::cout << "Got: " << std::quoted(line) << "\n";
            async_read_until(output, data, "\n", handle);
        }
    };
    async_read_until(output, data, "\n", handle);
    io.run();
}
Prints:
Got: "Linux 4.4.0-57-generic (stacked-crooked) 08/26/18 _x86_64_ (4 CPU)"
Got: ""
Got: "13:23:20 CPU %usr %nice %sys %iowait %irq %soft %steal %guest %gnice %idle"
Got: "13:23:21 all 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 100.00"
Got: "13:23:22 all 0.00 0.00 0.00 0.25 0.00 0.00 0.00 0.00 0.00 99.75"
Got: "13:23:23 all 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 100.00"
Got: "Average: all 0.00 0.00 0.00 0.08 0.00 0.00 0.00 0.00 0.00 99.92"
Good bye (End of file)
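Once each line arrives intact, picking values out of it is straightforward. A rough sketch, assuming the column layout shown in the outputs above (CPU label in the second column, %idle in the last); CpuSample and parse_sample are hypothetical names, and other sysstat versions or locales (for instance a 12-hour clock with an AM/PM column) would need adjusting.

```cpp
#include <algorithm>
#include <cstddef>
#include <exception>
#include <iterator>
#include <sstream>
#include <string>
#include <vector>

struct CpuSample {
    std::string cpu;  // "all", "0", "1", ...
    double idle;      // value of the %idle column
};

// Hypothetical helper: fills `out` and returns true for data rows; returns
// false for the banner, blank lines and the column-header row.
// Note that the "Average:" row is accepted as a data row as well.
bool parse_sample(const std::string& line, CpuSample& out) {
    std::istringstream in(line);
    std::istream_iterator<std::string> first(in), last;
    std::vector<std::string> fields(first, last);  // split on whitespace
    if (fields.size() < 3 || fields[1] == "CPU") return false;

    std::string idle = fields.back();
    // Some locales print a decimal comma ("91,06"); normalise it for stod.
    std::replace(idle.begin(), idle.end(), ',', '.');
    try {
        std::size_t used = 0;
        double value = std::stod(idle, &used);
        if (used != idle.size()) return false;  // trailing junk, not a number
        out.cpu = fields[1];
        out.idle = value;
        return true;
    } catch (const std::exception&) {           // last field is not numeric
        return false;
    }
}
```

The helper could be called from the read handler with each received line, skipping lines for which it returns false.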