ROS 订阅者回调丢失消息
ROS subscriber callback missing messages
总结:我有一个节点以 ~300hz 的频率发布消息,但订阅另一个节点中的主题的回调仅在 ~25hz 时被调用。订阅者节点中的 spinOnce 在 ~700hz 被调用,所以我不知道为什么它丢失消息。
发布者节点:
#include <ros/ros.h>
#include <ros/console.h>
#include <nav_msgs/Odometry.h>
...
int main(int argc, char** argv)
{
ros::init(argc, argv, "sim_node");
ros::NodeHandle nh;
...
// Publishers
tf::TransformBroadcaster tfbr;
ros::Publisher odomPub = nh.advertise<nav_msgs::Odometry>("pose",10);
...
ros::Rate r(300); // loop rate
while(ros::ok())
{
...
// Publish pose and velocity
...
odomPub.publish(msg);
ros::spinOnce();
r.sleep();
}
ros::waitForShutdown();
return 0;
}
订阅者节点:
#include <ros/ros.h>
#include <ros/console.h>
#include <nav_msgs/Odometry.h>
...
std::mutex mtx1, mtx2;
class DataHandler
{
private:
ros::NodeHandle nh;
ros::Publisher odomPub;
double lastTime;
int lastSeq;
public:
Eigen::Vector3d x, xDot, w;
Eigen::Vector3d xDes, xDesDot, xDesDotDot, b1Des, b1DesDot;
Eigen::Matrix3d R;
DataHandler()
{
// Initialize data
xDes = Eigen::Vector3d(1,0,1);
xDesDot = Eigen::Vector3d::Zero();
xDesDotDot = Eigen::Vector3d::Zero();
b1Des = Eigen::Vector3d(1,0,0);
b1DesDot = Eigen::Vector3d::Zero();
x = Eigen::Vector3d::Zero();
xDot = Eigen::Vector3d::Zero();
R = Eigen::Matrix3d::Identity();
odomPub = nh.advertise<nav_msgs::Odometry>("controller_pose",10);
trajPub = nh.advertise<asap_control::DesiredTrajectory>("controller_desTraj",10);
lastTime = ros::Time::now().toSec();
lastSeq = 0;
}
// Get current pose and velocity
void odomCB(const nav_msgs::OdometryConstPtr& odomMsg)
{
mtx1.lock();
// Get data
double time1 = ros::Time::now().toSec();
x << odomMsg->pose.pose.position.x, odomMsg->pose.pose.position.y, ...;
xDot << odomMsg->twist.twist.linear.x, odomMsg->twist.twist.linear.y, ...;
R = Eigen::Quaterniond(odomMsg->pose.pose.orientation.w, odomMsg->pose.pose.orientation.x,...;
w << odomMsg->twist.twist.angular.x, odomMsg->twist.twist.angular.y, ...;
double time2 = ros::Time::now().toSec();
// Time to extract data, < 1ms
double delTproc = time2 - time1;
std::cout << "\n\n";
std::cout << "proc elapsed time: " << delTproc << "\n";
std::cout << "proc frequency: " << 1.0/delTproc << "\n";
odomPub.publish(odomMsg); // rostopic hz says this is publishing at ~25Hz
// Time between callback calls, ~25Hz
double timeNow = ros::Time::now().toSec();
double delT = timeNow - lastTime;
lastTime = timeNow;
std::cout << "elapsed time: " << delT << "\n";
std::cout << "frequency: " << 1.0/delT << "\n";
// Message sequence IDs, shows 12 msgs skipped every call
int seqNow = odomMsg->header.seq;
int delSeq = seqNow - lastSeq;
lastSeq = seqNow;
std::cout << "delta seq: " << delSeq << "\n";
mtx1.unlock();
}
};
...
int main(int argc, char** argv)
{
ros::init(argc, argv, "asap_control");
ros::NodeHandle nh;
...
// Publishers
ros::Publisher outputPub = nh.advertise<geometry_msgs::WrenchStamped>("wrench_command",10);
ros::Publisher debugPub = nh.advertise<control::ControllerSignals>("controller_debug",10);
tf::TransformBroadcaster tfbr;
// Subscribers
DataHandler callbacks;
ros::Subscriber poseSub = nh.subscribe("pose",10,&DataHandler::odomCB,&callbacks);
// Asynchronous threads for callback handling
//ros::AsyncSpinner spinner(2);
//spinner.start();
double lastTime = ros::Time::now().toSec();
// Main loop
ros::Rate r(700); // loop rate
while(ros::ok())
{
// Data (extracted for cleanliness further down, and thread safety)
mtx1.lock();
Eigen::Vector3d x = callbacks.x;
Eigen::Vector3d xDot = callbacks.xDot;
Eigen::Matrix3d R = callbacks.R;
Eigen::Vector3d w = callbacks.w;
mtx1.unlock();
...
// Publish
...
outputPub.publish(msg);
// Publish debug signals
control::ControllerSignals debugMsg;
debugMsg.x[0] = x(0);
...
debugPub.publish(debugMsg);
//double timeNow = ros::Time::now().toSec();
//double delT = timeNow - lastTime;
//lastTime = timeNow;
//std::cout << "\n\n";
//std::cout << "elapsed time: " << delT << "\n";
//std::cout << "frequency: " << 1.0/delT << "\n";
ros::spinOnce();
r.sleep();
}
ros::waitForShutdown();
return 0;
}
附加信息:
- 发布者以 ~300Hz 的频率发布(由“姿势”主题的
rostopic hz
确认)
- 订阅者节点中的主循环是 运行 ~700Hz(由循环中发布到“wrench_command”主题的
rostopic hz
确认,以及循环通过 ros::Time::now()
计时),因此,spinOnce
以相同的速率被调用。
- 姿势主题的回调以约 25 Hz 的频率调用(已在回调中发布到“controller_pose”主题的
rostopic hz
确认,以及通过 ros::Time::now()
)
- 即使我使用
AsyncSpinner
而不是 spinOnce
,我也会得到相同的行为,尽管只能使用 rostopic hz
确认。正如预期的那样,时序产生不稳定的输出
- 将订阅者 queue_length 增加到例如 10 会将回调率增加到 ~250Hz,但是,我想保持 queue_length 为 1 以仅获取最新数据。
- Ubuntu 中的系统监视器显示利用率不到 50% cpu,因此我认为这不是 cpu 瓶颈问题。
我前段时间解决了传输里程计数据的类似问题,其中里程计数据以 100Hz 的频率传输,但仅以 25Hz 的频率接收。事实证明,底层 TCP 套接字正在缓冲数据,将 4 条消息集中到一个 TCP 数据包中以节省传输成本。我相信您需要在 TransportHints():
中将 tcpNoDelay 设置为 true
node.subscribe(...,ros::TransportHints().tcpNoDelay(true));
请注意,这发生在 订阅 端。
http://wiki.ros.org/roscpp/Overview/Publishers%20and%20Subscribers#Transport_Hints
总结:我有一个节点以 ~300hz 的频率发布消息,但订阅另一个节点中的主题的回调仅在 ~25hz 时被调用。订阅者节点中的 spinOnce 在 ~700hz 被调用,所以我不知道为什么它丢失消息。
发布者节点:
#include <ros/ros.h>
#include <ros/console.h>
#include <nav_msgs/Odometry.h>
...
int main(int argc, char** argv)
{
ros::init(argc, argv, "sim_node");
ros::NodeHandle nh;
...
// Publishers
tf::TransformBroadcaster tfbr;
ros::Publisher odomPub = nh.advertise<nav_msgs::Odometry>("pose",10);
...
ros::Rate r(300); // loop rate
while(ros::ok())
{
...
// Publish pose and velocity
...
odomPub.publish(msg);
ros::spinOnce();
r.sleep();
}
ros::waitForShutdown();
return 0;
}
订阅者节点:
#include <ros/ros.h>
#include <ros/console.h>
#include <nav_msgs/Odometry.h>
...
std::mutex mtx1, mtx2;
class DataHandler
{
private:
ros::NodeHandle nh;
ros::Publisher odomPub;
double lastTime;
int lastSeq;
public:
Eigen::Vector3d x, xDot, w;
Eigen::Vector3d xDes, xDesDot, xDesDotDot, b1Des, b1DesDot;
Eigen::Matrix3d R;
DataHandler()
{
// Initialize data
xDes = Eigen::Vector3d(1,0,1);
xDesDot = Eigen::Vector3d::Zero();
xDesDotDot = Eigen::Vector3d::Zero();
b1Des = Eigen::Vector3d(1,0,0);
b1DesDot = Eigen::Vector3d::Zero();
x = Eigen::Vector3d::Zero();
xDot = Eigen::Vector3d::Zero();
R = Eigen::Matrix3d::Identity();
odomPub = nh.advertise<nav_msgs::Odometry>("controller_pose",10);
trajPub = nh.advertise<asap_control::DesiredTrajectory>("controller_desTraj",10);
lastTime = ros::Time::now().toSec();
lastSeq = 0;
}
// Get current pose and velocity
void odomCB(const nav_msgs::OdometryConstPtr& odomMsg)
{
mtx1.lock();
// Get data
double time1 = ros::Time::now().toSec();
x << odomMsg->pose.pose.position.x, odomMsg->pose.pose.position.y, ...;
xDot << odomMsg->twist.twist.linear.x, odomMsg->twist.twist.linear.y, ...;
R = Eigen::Quaterniond(odomMsg->pose.pose.orientation.w, odomMsg->pose.pose.orientation.x,...;
w << odomMsg->twist.twist.angular.x, odomMsg->twist.twist.angular.y, ...;
double time2 = ros::Time::now().toSec();
// Time to extract data, < 1ms
double delTproc = time2 - time1;
std::cout << "\n\n";
std::cout << "proc elapsed time: " << delTproc << "\n";
std::cout << "proc frequency: " << 1.0/delTproc << "\n";
odomPub.publish(odomMsg); // rostopic hz says this is publishing at ~25Hz
// Time between callback calls, ~25Hz
double timeNow = ros::Time::now().toSec();
double delT = timeNow - lastTime;
lastTime = timeNow;
std::cout << "elapsed time: " << delT << "\n";
std::cout << "frequency: " << 1.0/delT << "\n";
// Message sequence IDs, shows 12 msgs skipped every call
int seqNow = odomMsg->header.seq;
int delSeq = seqNow - lastSeq;
lastSeq = seqNow;
std::cout << "delta seq: " << delSeq << "\n";
mtx1.unlock();
}
};
...
int main(int argc, char** argv)
{
ros::init(argc, argv, "asap_control");
ros::NodeHandle nh;
...
// Publishers
ros::Publisher outputPub = nh.advertise<geometry_msgs::WrenchStamped>("wrench_command",10);
ros::Publisher debugPub = nh.advertise<control::ControllerSignals>("controller_debug",10);
tf::TransformBroadcaster tfbr;
// Subscribers
DataHandler callbacks;
ros::Subscriber poseSub = nh.subscribe("pose",10,&DataHandler::odomCB,&callbacks);
// Asynchronous threads for callback handling
//ros::AsyncSpinner spinner(2);
//spinner.start();
double lastTime = ros::Time::now().toSec();
// Main loop
ros::Rate r(700); // loop rate
while(ros::ok())
{
// Data (extracted for cleanliness further down, and thread safety)
mtx1.lock();
Eigen::Vector3d x = callbacks.x;
Eigen::Vector3d xDot = callbacks.xDot;
Eigen::Matrix3d R = callbacks.R;
Eigen::Vector3d w = callbacks.w;
mtx1.unlock();
...
// Publish
...
outputPub.publish(msg);
// Publish debug signals
control::ControllerSignals debugMsg;
debugMsg.x[0] = x(0);
...
debugPub.publish(debugMsg);
//double timeNow = ros::Time::now().toSec();
//double delT = timeNow - lastTime;
//lastTime = timeNow;
//std::cout << "\n\n";
//std::cout << "elapsed time: " << delT << "\n";
//std::cout << "frequency: " << 1.0/delT << "\n";
ros::spinOnce();
r.sleep();
}
ros::waitForShutdown();
return 0;
}
附加信息:
- 发布者以 ~300Hz 的频率发布(由“姿势”主题的
rostopic hz
确认) - 订阅者节点中的主循环是 运行 ~700Hz(由循环中发布到“wrench_command”主题的
rostopic hz
确认,以及循环通过ros::Time::now()
计时),因此,spinOnce
以相同的速率被调用。 - 姿势主题的回调以约 25 Hz 的频率调用(已在回调中发布到“controller_pose”主题的
rostopic hz
确认,以及通过ros::Time::now()
) - 即使我使用
AsyncSpinner
而不是spinOnce
,我也会得到相同的行为,尽管只能使用rostopic hz
确认。正如预期的那样,时序产生不稳定的输出 - 将订阅者 queue_length 增加到例如 10 会将回调率增加到 ~250Hz,但是,我想保持 queue_length 为 1 以仅获取最新数据。
- Ubuntu 中的系统监视器显示利用率不到 50% cpu,因此我认为这不是 cpu 瓶颈问题。
我前段时间解决了传输里程计数据的类似问题,其中里程计数据以 100Hz 的频率传输,但仅以 25Hz 的频率接收。事实证明,底层 TCP 套接字正在缓冲数据,将 4 条消息集中到一个 TCP 数据包中以节省传输成本。我相信您需要在 TransportHints():
中将 tcpNoDelay 设置为 truenode.subscribe(...,ros::TransportHints().tcpNoDelay(true));
请注意,这发生在 订阅 端。
http://wiki.ros.org/roscpp/Overview/Publishers%20and%20Subscribers#Transport_Hints