roscpp 底层通讯协议更改

        ROS为机器人开发者们提供了不同语言的编程接口,其中C++接口叫做roscpp,用来创建topic、service、param,实现ROS的通信功能。

roscpp is a C++ implementation of ROS. It provides a client library that enables C++ programmers to quickly interface with ROS TopicsServices, and Parameters. roscpp is the most widely used ROS client library and is designed to be the high-performance library for ROS.

        roscpp 底层采用的通讯协议为TCP/UDP,即无论使用ros的topic还是service,都是通过tcp或udp进行传输。

ros-melodic topic 传输机制

        ros topic 的pub sub 过程如下图,talker通过rpc向ros master 注册pub topic,listener通过rpc向ros master 注册sub topic。当ros master 发现listenser注册的sub topic 有人pub时,会把pub此topic的talker信息发送给listener,然后listener和talker通过rpc建立 tcp/udp连接。后续二者通过tcp/udp连接进行通讯,ros master不再参与。

        ros 底层通过 transport 实现talker和listener的通信。transport成对建立,为点对点通讯形式。当talker发布的一个topic被多个listener订阅时,ros会为每个listener都建立一个transport用于与其通信,当talker publish一个message时,ros通过遍历所有transport为每个listener发送message。

 roscpp_jbus 通信原理

        roscpp 是点对点通讯形式,talker 会为每一个listener创建一个transport。而jbus是点对面通讯,所以我们要改变pub的行为,即无论有几个listener,talker只创建一个transport,由jbus通过tipc广播给每一个listener。

 

        Class Transport 是ros抽象底层通信行为的基类。我们通过继承它来实现对roscpp底层通讯协议的更改,通过仿照Class TransportUDP编写Class TransportJBUS。

#ifndef ROSCPP_TRANSPORT_JBUS_H
#define ROSCPP_TRANSPORT_JBUS_H

#include <ros/types.h>
#include <ros/transport/transport.h>

#include <boost/thread/mutex.hpp>
#include "ros/io.h"
#include <ros/common.h>
#include <queue.h>
#include <jbus.h>
#include <vector>
namespace ros {
#define ROSJBUS_ID_FMT_     "ROSJBUS_%d"
#define MAX_RECV_QUEUE_     32

class TransportJBUS;
typedef boost::shared_ptr<TransportJBUS> TransportJBUSPtr;

class PollSet;

class ROSCPP_DECL TransportJBUS : public Transport {
  public:
	TransportJBUS(PollSet* poll_set, int max_datagram_size = 0);
	~TransportJBUS();

	/**
	 * @brief 用于链接jbus,只在 ConnectionManager::start 调用一次
	 *
	 * @return true
	 * @return false
	 */
	bool connect_jbus();

	virtual void close();

	/**
	 * @brief Create a Incoming object
	 *
	 * @param topic_name
	 * @param user 实质为jbus_handle_t *
	 * @return true
	 * @return false
	 */
	bool createIncoming(const char *topic_name, void *user);

	/**
	 * @brief Create a Outgoing object
	 *
	 * @param max_datagram_size 取决于发送端的设置
	 * @param topic_name
	 * @return TransportJBUSPtr
	 */
	TransportJBUSPtr createOutgoing(int max_datagram_size, const char *topic_name);
	// overrides from Transport
	virtual int32_t read(uint8_t* buffer, uint32_t size);
	virtual int32_t write(uint8_t* buffer, uint32_t size);

	virtual void enableWrite();
	virtual void disableWrite();
	virtual void enableRead();
	virtual void disableRead();

	virtual std::string getTransportInfo();

	virtual bool requiresHeader() {
		return false;
	}

	virtual const char* getType() {
		return "JBUSROS";
	}

	int getMaxDatagramSize() const {
		return max_datagram_size_;
	}

	void setJbusHandleIndex(int index) {
		jbus_handle_index_ = index;
	}

	int getJbusHandleIndex() const {
		return jbus_handle_index_;
	}

	const char *getTopicName() {
		return (const char *)topic_name_;
	}
  private:
	/**
	 * @brief 初始化epoll,设置topic name
	 *
	 * @param topic_name
	 * @return true
	 * @return false
	 */
	bool initializeSocket(const char *topic_name);

	/**
	 * @brief epoll 处理函数
	 *
	 * @param events
	 */
	void socketUpdate(int events);

	/**
	 * @brief jbus 回调函数,我们通过他接收数据然后触发POLLIN
	 *
	 * @param arg jbus收到的数据
	 * @param user jbus_handle_t *,传他是因为若传this指针,当对象析构后,无法通过this指针判断
	 * @return enum jbus_cb_status
	 */
	static enum jbus_cb_status callback_raw(nmpk_msg_t * arg, void *user);

	/** data **/
	PollSet* poll_set_;
	uint32_t max_datagram_size_;		// 因为jbus层有切片,所以我们默认其为MSGSIZE_NMPK_MAX (64*1024*1024)
	bool closed_;
	boost::mutex close_mutex_;

	bool expecting_read_;				//
	bool expecting_write_;				//
	int notify_pipe[2];					// 实际用于epoll的句柄
	char *topic_name_;					// 我们用topic name来操作jbus的收发
	queue_t *recv_queue_;				// 这个队列用于暂存接收数据
	bytes_t *recv_buffer_;				// 数据读取,第一次先读头,第二次才真正的读数据,所以需要一个缓存
	int reorder_bytes_;					// recv_buffer_已读数据长度
	int jbus_handle_index_;				// 当用于sub时,记住其in_transport_vec_的索引号
};

/**
 * @brief in 类型的对象管理结构体
 *
 */
struct jbus_handle_t {
	jbus_handle_t():handle_(nullptr), is_alive_(false) {};
	TransportJBUS *handle_;			// 对象的this指针
	boost::mutex handle_mutex_;
	bool is_alive_;					// 记录对象是否存活,构造:is_alive_ = true 析构:is_alive_ = false
};

}
#endif

具体实现细节及使用方式见README

# ROSCPP with libjbus

## 依赖

* libcjson

* support_libs

* libjbus

* ros [and it`s dependencies]

## 编译安装

* mkdir build

* cd build

* cmake ..

* make

* sudo cp devel/lib/libroscpp.so /opt/ros/melodic/lib/libroscpp.so

*or catkin_make ??*

## 说明

在 topic 中强制使用 jbus 代替 ros 原本使用的 TCP/UDP 底层通讯协议,使用时只需将编译出来的 libroscpp.so 代替 ros 中原本的libroscpp.so 即可,无需更改任何代码。

### transport_jbus

class TransportJBUS 由基类 Transport 派生,仿照 TransportUDP 编写,用于实现 topic 通讯。

由于原本的ros通讯机制为点对点,而jbus则为点对面,故我们在 pub topic 时,不再为每个 sub 链接都创建一个 transport,而是只创建一个 pub 用的 TransportJBUS 对象,并由 JbusTransportManager 中的 out_transport_map_ 进行管理。

在 sub topic 时,会创建一个 TransportJBUS,并创建一个 jbus_handle_t 对象对其进行管理。因为在 jbus 接收数据时,是通过void *指针将对象传入 jbus 的回调函数中,若我们直接将 TransportJBUS 对象的 this 指针传入,会导致对象析构后无法通过 this 指针判断,从而发生不可预料的问题。而 jbus_handle_t 通过标记 TransportJBUS 对象的状态,避免了这类问题。jbus_handle_t 由JbusTransportManager 中的 in_transport_vec_ 进行管理,并在程序退出时统一释放。

### jbus_transport_manager

class JbusTransportManager 用于管理 TransportUDP 对象。JbusTransportManager 是一个单例对象,在程序启动时会打印JBUSROS: start!

Hello world

表示现在使用的是 roscpp_jbus

### 其他修改

在其他源文件中的修改都由注释

```

/*** jbusros ***/

xxxxx;

/***************/

```

进行包裹,可通过搜索此注释快速定位各处修改


版权声明:本文为qq_43350875原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。