ROS发布与订阅处理顺序以及多线程使用

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

ROS发布与订阅处理顺序以及多线程使用

一、前言

记录ROS发布与订阅处理顺序以及多线程使用,由于情况很多,所以不一一细讲,有需要的可以自己尝试。这里简单讲解两个话题以及两个订阅者的情况。

二、发布者

发布方发布两个话题:

ROS发布与订阅处理顺序以及多线程使用
talker发布两个话题,然后每个话题都各自开辟自己的队列 (有一些人说之开辟一个队列,个人觉得不严谨,只是因为话题处理是单线程,所以和简化成一个队列类似。这里可以去了解以下话题通讯协议)。如何队列被占满,就会丢弃旧的信息。
可以修改参数:
1、话题1的队列长度
2、话题2的队列长度
3、话题1的发布频率
4、话题2的发布频率
#include "ros/ros.h"
#include "std_msgs/String.h"
#include

int main(int argc, char **argv) {
  ros::init(argc, argv, "talker");
  ros::NodeHandle n;

  ros::Publisher message_a_pub = n.advertise<std_msgs::String>("MessageA", 1000);
  ros::Publisher message_b_pub = n.advertise<std_msgs::String>("MessageB", 1000);

  int message_a_count = 0;
  int message_b_count = 0;

  while (0 == message_a_pub.getNumSubscribers() || 0 == message_b_pub.getNumSubscribers()) {
    ROS_INFO("Waiting for subscribers to connect");
    ros::Duration(0.1).sleep();
  }

  ros::Timer timer_a = n.createTimer(ros::Duration(0.1), [&](const ros::TimerEvent&) {
    std_msgs::String msg;
    std::stringstream ss;
    ss << "Messages A: " << message_a_count;
    msg.data = ss.str();
    ROS_INFO("%s", msg.data.c_str());
    message_a_pub.publish(msg);
    message_a_count++;
  });

  ros::Timer timer_b = n.createTimer(ros::Duration(0.1), [&](const ros::TimerEvent&) {
    std_msgs::String msg;
    std::stringstream ss;
    ss << "Messages B: " << message_b_count;
    msg.data = ss.str();
    ROS_INFO("%s", msg.data.c_str());
    message_b_pub.publish(msg);
    message_b_count++;
  });

  ros::spin();

  return 0;
}

三、订阅者

订阅者订阅两个信息,并在回调函数处理。
由于订阅者是没有所谓的订阅频率的(和程序的处理速度有关),这里使用简单的休眠来模拟”订阅频率”。 &#x7B80;&#x5355;&#x7684;&#x8BDD;&#x9898;&#x8BA2;&#x9605;&#x9891;&#x7387;&#x9ED8;&#x8BA4;&#x548C;&#x53D1;&#x5E03;&#x9891;&#x7387;&#x4E00;&#x6837;&#x3002;
可以修改参数:
1、订阅话题1的队列长度
2、订阅话题2的队列长度
3、订阅话题1的发布”频率”(在回调函数设置休眠来模拟)
4、订阅话题2的发布”频率”(在回调函数设置休眠来模拟)

#include
#include
#include
#include "ros/ros.h"
#include "std_msgs/String.h"
using namespace std::chrono_literals;

void CallbackA(const std_msgs::String::ConstPtr& msg) {
  ROS_INFO(" I heard: [%s]", msg->data.c_str());

}

void CallbackB(const std_msgs::String::ConstPtr& msg) {
  ROS_INFO(" I heard: [%s]", msg->data.c_str());
  ros::Rate rate(1);
  rate.sleep();
}

int main(int argc, char **argv) {
  ros::init(argc, argv, "listener");
  ros::NodeHandle n;

  ros::NodeHandle n_a;

  ros::Subscriber sub_a = n_a.subscribe("MessageA", 100, CallbackA);
  ros::Subscriber sub_b = n.subscribe("MessageB", 10, CallbackB);

  ros::MultiThreadedSpinner spinner(2);
  spinner.spin();

  return 0;
}

四、测试结果

1、单线程

  1. 测试1
&#x53D1;&#x5E03;&#x8005;&#x53C2;&#x6570;&#x8BBE;&#x7F6E;&#xFF08;1000&#xFF0C;1000&#xFF0C;0.1&#xFF0C;0.1&#xFF09; //&#x5206;&#x522B;&#x8868;&#x793A;&#x8BDD;&#x9898;1&#x961F;&#x5217;&#x957F;&#x5EA6;&#xFF0C;&#x8BDD;&#x9898;2&#x961F;&#x5217;&#x957F;&#x5EA6;&#xFF0C;&#x53D1;&#x5E03;&#x9891;&#x7387;&#xFF0C;&#x53D1;&#x5E03;&#x9891;&#x7387;
&#x8BA2;&#x9605;&#x8005;&#x53C2;&#x6570;&#x8BBE;&#x7F6E;&#xFF08;1&#xFF0C;1&#xFF0C;0&#xFF0C;0&#xFF09;  //&#x5206;&#x522B;&#x8868;&#x793A;&#x8BDD;&#x9898;1&#x961F;&#x5217;&#x957F;&#x5EA6;&#xFF0C;&#x8BDD;&#x9898;2&#x961F;&#x5217;&#x957F;&#x5EA6;&#xFF0C;&#x8BA2;&#x9605;&#x9891;&#x7387;&#xFF08;0&#x8868;&#x793A;&#x4E0D;&#x8BBE;&#x7F6E;&#x4F11;&#x7720;&#xFF09;&#xFF0C;&#x8BA2;&#x9605;&#x9891;&#x7387;&#xFF08;&#x8BBE;&#x7F6E;&#x4F11;&#x7720;0s&#xFF09;

ROS发布与订阅处理顺序以及多线程使用
话题交替出现。 &#x5982;&#x679C;&#x56DE;&#x8C03;&#x51FD;&#x6570;&#x662F;&#x7B80;&#x5355;&#x6570;&#x636E;&#x5904;&#x7406;&#xFF0C;&#x57FA;&#x672C;&#x90FD;&#x662F;&#x6309;&#x8FD9;&#x79CD;&#x7406;&#x60F3;&#x65B9;&#x5F0F;&#x51FA;&#x73B0;&#xFF0C;&#x4F46;&#x662F;&#x5982;&#x679C;&#x56DE;&#x8C03;&#x51FD;&#x6570;&#x5904;&#x7406;&#x901F;&#x5EA6;&#x6BD4;&#x53D1;&#x5E03;&#x9891;&#x7387;&#x6162;&#xFF0C;&#x90A3;&#x4E48;&#x5C31;&#x5BF9;&#x51FA;&#x73B0;&#x5404;&#x79CD;&#x60C5;&#x51B5;。在这时候,话题的队列长度就可以体现作用。(如果想了解的,可以修改休眠时间和队列长度,去看看实际效果)
2. 测试2
参数:
&#x53D1;&#x5E03;&#x8005;&#x53C2;&#x6570;&#x8BBE;&#x7F6E;&#xFF08;1000&#xFF0C;1000&#xFF0C;0.1&#xFF0C;0.1&#xFF09; //&#x5206;&#x522B;&#x8868;&#x793A;&#x8BDD;&#x9898;1&#x961F;&#x5217;&#x957F;&#x5EA6;&#xFF0C;&#x8BDD;&#x9898;2&#x961F;&#x5217;&#x957F;&#x5EA6;&#xFF0C;&#x53D1;&#x5E03;&#x9891;&#x7387;&#xFF0C;&#x53D1;&#x5E03;&#x9891;&#x7387;
&#x8BA2;&#x9605;&#x8005;&#x53C2;&#x6570;&#x8BBE;&#x7F6E;&#xFF08;1&#xFF0C;1&#xFF0C;0&#xFF0C;1&#xFF09;  //&#x5206;&#x522B;&#x8868;&#x793A;&#x8BDD;&#x9898;1&#x961F;&#x5217;&#x957F;&#x5EA6;&#xFF0C;&#x8BDD;&#x9898;2&#x961F;&#x5217;&#x957F;&#x5EA6;&#xFF0C;&#x8BA2;&#x9605;&#x9891;&#x7387;&#xFF08;0&#x8868;&#x793A;&#x4E0D;&#x8BBE;&#x7F6E;&#x4F11;&#x7720;&#xFF09;&#xFF0C;&#x8BA2;&#x9605;&#x9891;&#x7387;&#xFF08;&#x8BBE;&#x7F6E;&#x4F11;&#x7720;1s&#xFF09;

结果:

ROS发布与订阅处理顺序以及多线程使用
订阅话题是按照1Hz频率来订阅,虽然订阅话题1没有设置休眠,但是还是和话题2一样,主要原因是,订阅话题是单线程。就像将订阅话题放进一个队列来处理一样,处理完一个,才能处理下一个。所以导致两个话题发布频率一样。解决方法办法是使用多线程(少量数据的时候都ok,但是长时间处理没去试)。

2、多线程

订阅者代码:

#include
#include
#include
#include "ros/ros.h"
#include "std_msgs/String.h"
using namespace std::chrono_literals;

void CallbackA(const std_msgs::String::ConstPtr& msg) {
  ROS_INFO(" I heard: [%s]", msg->data.c_str());

}

void CallbackB(const std_msgs::String::ConstPtr& msg) {
  ROS_INFO(" I heard: [%s]", msg->data.c_str());
  ros::Rate rate(1);
  rate.sleep();
}

int main(int argc, char **argv) {
  ros::init(argc, argv, "listener");
  ros::NodeHandle n;

  ros::NodeHandle n_a;

  ros::Subscriber sub_a = n_a.subscribe("MessageA", 1, CallbackA);
  ros::Subscriber sub_b = n.subscribe("MessageB", 1, CallbackB);

  ros::MultiThreadedSpinner spinner(2);
  spinner.spin();

  return 0;
}

结果:

ROS发布与订阅处理顺序以及多线程使用
话题A的频率变成10Hz,而话题B的频率是1Hz。

3、单独为某个话题开辟线程


#include
#include
#include
#include "ros/ros.h"
#include "std_msgs/String.h"
using namespace std::chrono_literals;

void CallbackA(const std_msgs::String::ConstPtr& msg) {
  ROS_INFO(" I heard: [%s]", msg->data.c_str());

}

void CallbackB(const std_msgs::String::ConstPtr& msg) {
  ROS_INFO(" I heard: [%s]", msg->data.c_str());
  ros::Rate rate(1);
  rate.sleep();
}

int main(int argc, char **argv) {
  ros::init(argc, argv, "listener");
  ros::NodeHandle n;

  ros::NodeHandle n_a;
  ros::CallbackQueue callback_queue_a;
  n_a.setCallbackQueue(&callback_queue_a);
  ros::Subscriber sub_a = n_a.subscribe("MessageA", 1, CallbackA);
  ros::Subscriber sub_b = n.subscribe("MessageB", 1, CallbackB);

  std::thread spinner_thread_a([&callback_queue_a]() {
    ros::SingleThreadedSpinner spinner_a;
    spinner_a.spin(&callback_queue_a);
  });

  ros::spin();
  spinner_thread_a.join();

  return 0;
}

结果:

ROS发布与订阅处理顺序以及多线程使用
和多线程一样。

五、总结

  1. 话题发布基本流程:每一个话题都各自开辟自己队列,然后把数据发布到队列中,等待订阅者读取,如果订阅者来不及处理,那么当发布者队列被沾满后,就会将旧的数据覆盖。
  2. 订阅者的订阅频率取决于回调函数的处理速度,单线程时,回调函数的频率是一样的,等于所有回调函数处理时间之和的倒数。
  3. 如果多线程,可以避免部分数据丢失。

注:实际代码可能会出现更复杂情况,而不是简单的休眠来替代回调函数处理时间。

Original: https://blog.csdn.net/qq_16539009/article/details/123526165
Author: 柯藤
Title: ROS发布与订阅处理顺序以及多线程使用

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/559248/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球