5.5.1 使用“zero-copy”
功能背景
通信是机器人开发引擎的基础功能,原生ROS2 Foxy进行大数据量通信时存在时延较大、系统负载较高等问题。TogetheROS.Bot Foxy基于RDK系统软件库hbmem实现了“zero-copy”功能,数据跨进程传输零拷贝,可大大减少大块数据传输延时和系统资源占用。本节介绍如何使用tros.b Foxy和Humble创建publisher和subscriber node进行大块数据传输,并计算传输延时。
前置条件
已按照安装成功安装tros.b,并已掌握ROS2 node,topic,qos等基础知识,以及如何创建package和使用自定义消息,具体教程可见ROS2官方文档。
ROS2软件包构建、编译等工具。安装命令:sudo apt install ros-dev-tools
任务内容
1. 创建package
打开一个新的终端,source tros.b setup脚本,确保ros2
命令可以运行。
- Foxy
- Humble
source /opt/tros/setup.bash
source /opt/tros/humble/setup.bash
使用以下命令创建一个workspace,详细介绍可见ROS2 官方教程Creating a workspace。
mkdir -p ~/dev_ws/src
cd ~/dev_ws/src
运行以下命令创建一个package
ros2 pkg create --build-type ament_cmake hbmem_pubsub
2. 创建自定义消息
2.1 新建消息文件
运行以下命令,创建msg
目录用来存放自定义消息文件
cd ~/dev_ws/src/hbmem_pubsub
mkdir msg
在msg
目录下新建SampleMessage.msg
文件,具体内容如下:
int32 index
uint64 time_stamp
uint8[4194304] data
uint32 MAX_SIZE=4194304
2.2 编译依赖
返回到~/dev_ws/src/hbmem_pubsub
目录,修改package.xml
,在<buildtool_depend>ament_cmake</buildtool_depend>
下面添加以下内容:
<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
2.3 编译脚本
修改CMakeLists.txt
,在# find_package(<dependency> REQUIRED)
下面添加以下内容,进行msg编译:
find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/SampleMessage.msg"
)
3. 创建消息发布节点
3.1 新建消息发布节点文件
在~/dev_ws/src/hbmem_pubsub/src
目录下新建 publisher_hbmem.cpp
文件,用来创建publisher node,具体代码和解释如下:
- Foxy
- Humble
#include <chrono>
#include <functional>
#include <memory>
#include <string>
#include "rclcpp/rclcpp.hpp"
#include "hbmem_pubsub/msg/sample_message.hpp"
using namespace std::chrono_literals;
class MinimalHbmemPublisher : public rclcpp::Node {
public:
MinimalHbmemPublisher () : Node("minimal_hbmem_publisher"), count_(0) {
// 创建publisher_hbmem,topic为"topic"
publisher_ = this->create_publisher_hbmem<hbmem_pubsub::msg::SampleMessage>(
"topic", rclcpp::SensorDataQoS());
// 定时器,每隔40毫秒调用一次timer_callback进行消息发送
timer_ = this->create_wall_timer(
40ms, std::bind(&MinimalHbmemPublisher ::timer_callback, this));
}
private:
// 定时器回调函数
void timer_callback() {
// 获取要发送的消息
auto loanedMsg = publisher_->borrow_loaned_message();
// 判断消息是否可用,可能出现获取消息失败导致消息不可用的情况
if (loanedMsg.is_valid()) {
// 引用方式获取实际的消息
auto& msg = loanedMsg.get();
// 获取当前时间,单位为us
auto time_now =
std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch()).count();
// 对消息的index和time_stamp进行赋值
msg.index = count_;
msg.time_stamp = time_now;
// 打印发送消息
RCLCPP_INFO(this->get_logger(), "message: %d", msg.index);
publisher_->publish(std::move(loanedMsg));
// 注意,发送后,loanedMsg已不可用
// 计数器加一
count_++;
} else {
// 获取消息失败,丢弃该消息
RCLCPP_INFO(this->get_logger(), "Failed to get LoanMessage!");
}
}
// 定时器
rclcpp::TimerBase::SharedPtr timer_;
// hbmem publisher
rclcpp::PublisherHbmem<hbmem_pubsub::msg::SampleMessage>::SharedPtr publisher_;
// 计数器
size_t count_;
};
int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<MinimalHbmemPublisher>());
rclcpp::shutdown();
return 0;
}
#include <chrono>
#include <functional>
#include <memory>
#include <string>
#include "rclcpp/rclcpp.hpp"
#include "hbmem_pubsub/msg/sample_message.hpp"
using namespace std::chrono_literals;
class MinimalHbmemPublisher : public rclcpp::Node {
public:
MinimalHbmemPublisher () : Node("minimal_hbmem_publisher"), count_(0) {
// 创建publisher_hbmem,topic为"topic"
publisher_ = this->create_publisher<hbmem_pubsub::msg::SampleMessage>(
"topic", rclcpp::SensorDataQoS());
// 定时器,每隔40毫秒调用一次timer_callback进行消息发送
timer_ = this->create_wall_timer(
40ms, std::bind(&MinimalHbmemPublisher ::timer_callback, this));
}
private:
// 定时器回调函数
void timer_callback() {
// 获取要发送的消息
auto loanedMsg = publisher_->borrow_loaned_message();
// 判断消息是否可用,可能出现获取消息失败导致消息不可用的情况
if (loanedMsg.is_valid()) {
// 引用方式获取实际的消息
auto& msg = loanedMsg.get();
// 获取当前时间,单位为us
auto time_now =
std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch()).count();
// 对消息的index和time_stamp进行赋值
msg.index = count_;
msg.time_stamp = time_now;
// 打印发送消息
RCLCPP_INFO(this->get_logger(), "message: %d", msg.index);
publisher_->publish(std::move(loanedMsg));
// 注意,发送后,loanedMsg已不可用
// 计数器加一
count_++;
} else {
// 获取消息失败,丢弃该消息
RCLCPP_INFO(this->get_logger(), "Failed to get LoanMessage!");
}
}
// 定时器
rclcpp::TimerBase::SharedPtr timer_;
// hbmem publisher
rclcpp::Publisher<hbmem_pubsub::msg::SampleMessage>::SharedPtr publisher_;
// 计数器
size_t count_;
};
int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<MinimalHbmemPublisher>());
rclcpp::shutdown();
return 0;
}
3.2 编译依赖
返回到~/dev_ws/src/hbmem_pubsub
目录,修改package.xml
,在<member_of_group>rosidl_interface_packages</member_of_group>
下面增加rclcpp
依赖:
<depend>rclcpp</depend>
3.3 编译脚本
修改CMakeLists.txt
,在rosidl_generate_interfaces
语句下面添加以下内容,完成publisher编译:
find_package(rclcpp REQUIRED)
add_executable(talker src/publisher_hbmem.cpp)
ament_target_dependencies(talker rclcpp)
rosidl_target_interfaces(talker
${PROJECT_NAME} "rosidl_typesupport_cpp")
install(TARGETS
talker
DESTINATION lib/${PROJECT_NAME})
4. 创建消息接收节点
4.1 新建消息接收节点文件
在~/dev_ws/src/hbmem_pubsub/src
目录下新建 subscriber_hbmem.cpp
文件,用来创建subscriber node,具体代码和解释如下:
- Foxy
- Humble
#include <memory>
#include "rclcpp/rclcpp.hpp"
#include "hbmem_pubsub/msg/sample_message.hpp"
class MinimalHbmemSubscriber : public rclcpp::Node {
public:
MinimalHbmemSubscriber () : Node("minimal_hbmem_subscriber") {
// 创建subscription_hbmem,topic为"sample"
// 消息回调函数为topic_callback
subscription_ =
this->create_subscription_hbmem<hbmem_pubsub::msg::SampleMessage>(
"topic", rclcpp::SensorDataQoS(),
std::bind(&MinimalHbmemSubscriber ::topic_callback, this,
std::placeholders::_1));
}
private:
// 消息回调函数
void topic_callback(
const hbmem_pubsub::msg::SampleMessage::SharedPtr msg) const {
// 注意,msg只能在回调函数中使用,回调函数返回后,该消息就会被释放
// 获取当前时间
auto time_now =
std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
// 计算延时并打印出来
RCLCPP_INFO(this->get_logger(), "msg %d, time cost %dus", msg->index,
time_now - msg->time_stamp);
}
// hbmem subscription
rclcpp::SubscriptionHbmem<hbmem_pubsub::msg::SampleMessage>::SharedPtr
subscription_;
};
int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<MinimalHbmemSubscriber>());
rclcpp::shutdown();
return 0;
}
#include <memory>
#include "rclcpp/rclcpp.hpp"
#include "hbmem_pubsub/msg/sample_message.hpp"
class MinimalHbmemSubscriber : public rclcpp::Node {
public:
MinimalHbmemSubscriber () : Node("minimal_hbmem_subscriber") {
// 创建subscription_hbmem,topic为"sample"
// 消息回调函数为topic_callback
subscription_ =
this->create_subscription<hbmem_pubsub::msg::SampleMessage>(
"topic", rclcpp::SensorDataQoS(),
std::bind(&MinimalHbmemSubscriber ::topic_callback, this,
std::placeholders::_1));
}
private:
// 消息回调函数
void topic_callback(
const hbmem_pubsub::msg::SampleMessage::SharedPtr msg) const {
// 注意,msg只能在回调函数中使用,回调 函数返回后,该消息就会被释放
// 获取当前时间
auto time_now =
std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
// 计算延时并打印出来
RCLCPP_INFO(this->get_logger(), "msg %d, time cost %dus", msg->index,
time_now - msg->time_stamp);
}
// hbmem subscription
rclcpp::Subscription<hbmem_pubsub::msg::SampleMessage>::SharedPtr
subscription_;
};
int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<MinimalHbmemSubscriber>());
rclcpp::shutdown();
return 0;
}
4.2 编译脚本
返回到~/dev_ws/src/hbmem_pubsub
目录,之前已经在package.xml
中增加rclcpp
依赖,故不需要需改package.xml
。
修改CMakeLists.txt
,在install
语句下面添加以下内容,完成subscriber编译:
add_executable(listener src/subscriber_hbmem.cpp)
ament_target_dependencies(listener rclcpp)
rosidl_target_interfaces(listener
${PROJECT_NAME} "rosidl_typesupport_cpp")
install(TARGETS
listener
DESTINATION lib/${PROJECT_NAME})