本文目录
FastDDS 原理
原文:https://fast-dds.docs.eprosima.com/en/latest/fastdds/getting_started/definitions.html
本文仅翻译原文
什么是DDS?
数据分发服务 (DDS)是一种以数据为中心的通信协议,用于分布式软件应用程序通信。它描述了支持数据提供者和数据消费者之间通信的通信应用程序编程接口(API)和通信语义。
由于它是一个以数据为中心的发布订阅(DCPS)模型,因此在其实现中定义了三个关键的应用程序实体:发布实体,定义信息生成对象及其属性;订阅实体,定义信息消费对象及其属性;和配置实体,定义作为主题传输的信息类型,并使用其服务质量(QoS)属性创建发布者和订阅者,确保上述实体的正确性能。
DDS 使用 QoS 来定义 DDS 实体的行为特征。 QoS 由单独的 QoS 策略(从 QoSPolicy 派生的类型对象)组成。这些在政策中进行了描述。
DCPS 概念模型
在 DCPS 模型中,为通信应用系统的开发定义了四个基本元素。
Publisher. 它是 DCPS 实体,负责其实现的DataWriter的创建和配置。 DataWriter是负责实际发布消息的实体。每个消息都会有一个分配的主题,消息将在该主题下发布。
Subscriber. 它是 DCPS 实体,负责接收在其订阅的主题下发布的数据。它为一个或多个DataReader对象提供服务,这些对象负责将新数据的可用性传达给应用程序。
Topic.它是绑定发布和订阅的实体。它在 DDS 域中是唯一的。通过TopicDescription ,它允许发布和订阅的数据类型的统一。
Domain. 这一概念用于链接属于一个或多个应用程序的所有发布者和订阅者,这些发布者和订阅者在不同主题下交换数据。这些参与域的单独应用程序称为DomainParticipant 。 DDS 域由域 ID 来标识。 DomainParticipant定义域ID来指定其所属的DDS域。具有不同 ID 的两个 DomainParticipant 不知道彼此在网络中的存在。因此,可以创建多个通信渠道。这适用于涉及多个 DDS 应用程序的场景,它们各自的 DomainParticipants 相互通信,但这些应用程序不得相互干扰。 DomainParticipant充当其他 DCPS 实体的容器,充当发布者、订阅者和主题实体的工厂,并在域中提供管理服务。
这些元素如下图所示
什么是RTPS?
实时发布订阅 (RTPS)协议是为支持 DDS 应用程序而开发的,是一种基于尽力而为传输(例如 UDP/IP)的发布-订阅通信中间件。此外,Fast DDS 还提供对 TCP 和共享内存 (SHM) 传输的支持。
它旨在支持单播和多播通信。
在继承自 DDS 的 RTPS 的顶部,可以找到域,它定义了一个单独的通信平面。多个域可以同时独立共存。域包含任意数量的RTPSParticipants ,即能够发送和接收数据的元素。为此,RTPSParticipants 使用他们的Endpoints :
RTPSWriter :能够发送数据的端点。
RTPSReader :能够接收数据的端点。
RTPSParticipant 可以有任意数量的写入器和读取器端点。
通信围绕主题进行,主题定义和标记正在交换的数据。这些主题不属于特定参与者。参与者通过 RTPSWriters 对主题下发布的数据进行更改,并通过 RTPSReaders 接收与其订阅的主题关联的数据。通信单元称为Change ,它表示在 Topic 下写入的数据的更新。 RTPSReaders/RTPSWriters在其History上注册这些更改,History 是一种数据结构,用作最近更改的缓存。
在eProsima Fast DDS的默认配置中,当您通过 RTPSWriter 端点发布更改时,会在幕后执行以下步骤:
1.更改将添加到 RTPSWriter 的历史缓存中。
2.RTPSWriter 将更改发送到它知道的任何 RTPSReaders。
3.接收数据后,RTPSReaders 使用新的更改更新其历史缓存。
但是,Fast DDS 支持多种配置,允许您更改 RTPSWriters/RTPSReaders 的行为。 RTPS 实体默认配置的修改意味着 RTPSWriter 和 RTPSReaders 之间数据交换流的更改。此外,通过选择服务质量 (QoS) 策略,您可以通过多种方式影响这些历史缓存的管理方式,但通信循环保持不变。您可以继续阅读RTPS层部分来了解更多关于Fast DDS中RTPS协议的实现。
FastDDS 安装及使用
FastDDS也是目前比较好的一个框架,FastDDS是由位于西班牙马德里的eProsima公司推出的免费开源DDS中间件解决方案,并提供支付技术支持服务。它的源码基于C++,规范基于OMG DDS 1.4 and the OMG RTPS 2.2。
这里先介绍一下依照官方如何安装FastDDS到系统,并进行简单的测试。
官方文档安装链接:https://fast-dds.docs.eprosima.com/en/latest/installation/binaries/binaries_linux.html ,这里出了安装步骤,还有很多原理和例程介绍。
这里是使用的操作系统是ubuntu 20.04进行安装。
FastDDS的安装内容主要有三部分:
FastDDS library
- foonathan_memory_vendor: C++内存分配库
- fastdcdr: CDR序列化(serialization)机制
- fastrtps: FastDDS核心库
FastDDS Python bindings(可选)
FastDDS Gen
- 从IDL文件生成源代码工具,基于Java程序。由IDL文件中定义的data type生成代码的脚本工具。
1. 安装FastDDS
下述的两个安装方法都需要在虚拟机配置VPN,具体如何配置这里不做教学
安装方法一: Linux installation from binaries
教程:https://fast-dds.docs.eprosima.com/en/latest/installation/binaries/binaries_linux.html
本人使用的是V2.6版本,在ubuntu20.04上进行测试,最新的3.0版本由于python3-xmlschema无法安装所以放弃使用了
下载链接:https://www.eprosima.com/component/ars/items/eprosima-fast-dds-2-6?category_id=7&Itemid=102
下载好 eProsima_Fast-DDS-v2.6.8-Linux.tgz
后解压,在文件目录下运行install.sh
cd <extraction_directory>
sudo ./install.sh
安装后src文件夹包含以下包:
foonathan_memory_vendor
,一个 STL 兼容的 C++ 内存分配器库。
fastcdr
,一个根据CDR 标准进行数据序列化的 C++ 库(Section 10.2.1.2 OMG CDR)。
fastdds
, eProsima Fast DDS库的核心库。
fastddsgen
,一个 Java 应用程序,它使用 IDL 文件中定义的数据类型生成源代码。
安装方法二:Linux installation from sources
教程:https://fast-dds.docs.eprosima.com/en/latest/installation/sources/sources_linux.html
(本人使用该方法)
安装依赖:
sudo apt install cmake g++ python3-pip wget git
sudo apt install libasio-dev libtinyxml2-dev
sudo apt install libssl-dev
使用colcon安装FastDDS库
1.安装依赖:
pip3 install -U colcon-common-extensions vcstool
2.创建Fast-DDS目录并下载将用于安装eProsima Fast DDS及其依赖项的存储库文件:
mkdir ~/Fast-DDS
cd ~/Fast-DDS
wget https://raw.githubusercontent.com/eProsima/Fast-DDS/master/fastdds.repos
mkdir src
vcs import src < fastdds.repos
3.构建包
colcon build --packages-up-to fastdds
2.Fast DDS-Gen 安装
sudo apt install openjdk-11-jdk
如果在安装 Colcon 后已经安装了 Fast DDS,请跳过克隆Fast DDS-Gen的存储库,因为它已经可以在 colcon 工作区的src目录下找到
mkdir -p ~/Fast-DDS/src
cd ~/Fast-DDS/src
git clone --recursive https://github.com/eProsima/Fast-DDS-Gen.git fastddsgen
cd fastddsgen
./gradlew assemble
Fast-DDS-Gen文件夹包含以下软件包:
share/fastddsgen
,生成的 Java 应用程序所在的位置。
scripts
,包含一些用户友好的脚本。
注意:要使这些脚本可以从任何 shell 会话和目录访问,请将scripts夹路径添加到PATH环境变量。
3.(测试)编写一个简单的 C++ 发布者和订阅者应用程序
原文:https://fast-dds.docs.eprosima.com/en/latest/fastdds/getting_started/simple_app/simple_app.html
3.1.创建目录树
mkdir workspace_DDSHelloWorld && cd workspace_DDSHelloWorld
mkdir src build
3.2.导入链接库和其他依赖项
source <path/to/Fast-DDS/workspace>/install/setup.bash
通过将 Fast DDS 安装目录添加到运行以下命令的当前用户的 shell 配置文件中的$PATH变量,可以从任何会话访问它们。
echo 'source <path/to/Fast-DDS/workspace>/install/setup.bash' >> ~/.bashrc
3.3.配置 CMake 项目
我们将使用 CMake 工具来管理项目的构建。使用您喜欢的文本编辑器创建一个名为 CMakeLists.txt 的新文件,然后复制并粘贴以下代码片段。将此文件保存在工作区的根目录中。如果您已执行这些步骤,则它应该是workspace_DDSHelloWorld 。
cmake_minimum_required(VERSION 3.20)
project(DDSHelloWorld)
# Find requirements
if(NOT fastcdr_FOUND)
find_package(fastcdr 2 REQUIRED)
endif()
if(NOT fastdds_FOUND)
find_package(fastdds 3 REQUIRED)
endif()
# Set C++11
include(CheckCXXCompilerFlag)
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_COMPILER_IS_CLANG OR
CMAKE_CXX_COMPILER_ID MATCHES "Clang")
check_cxx_compiler_flag(-std=c++11 SUPPORTS_CXX11)
if(SUPPORTS_CXX11)
add_compile_options(-std=c++11)
else()
message(FATAL_ERROR "Compiler doesn't support C++11")
endif()
endif()
message(STATUS "Configuring HelloWorld publisher/subscriber example...")
file(GLOB DDS_HELLOWORLD_SOURCES_CXX "src/*.cxx")
3.4.Build the topic data type
在工作空间目录下,执行以下命令:
cd src && touch HelloWorld.idl
这将在src目录中创建 HelloWorld.idl 文件。在文本编辑器中打开该文件,然后复制并粘贴以下代码片段。
struct HelloWorld
{
unsigned long index;
string message;
};
通过这样做,我们定义了HelloWorld数据类型,它有两个元素: uint32_t类型的索引和std::string类型的消息。剩下的就是生成在 C++11 中实现此数据类型的源代码。为此,请从src目录运行以下命令。
<path/to/Fast DDS-Gen>/scripts/fastddsgen HelloWorld.idl
在该命令结束后,生成如下的文件:
HelloWorld.hpp
:HelloWorld 类型定义。
HelloWorldPubSubTypes.cxx
:Fast DDS 用于支持 HelloWorld 类型的接口。
HelloWorldPubSubTypes.h
:HelloWorldPubSubTypes.cxx 的头文件。
HelloWorldCdrAux.ipp
:HelloWorld 类型的序列化和反序列化代码。
HelloWorldCdrAux.hpp
:HelloWorldCdrAux.ipp 的头文件。
HelloWorldTypeObjectSupport.cxx
: TypeObject表示注册代码。
HelloWorldTypeObjectSupport.hpp
:HelloWorldTypeObjectSupport.cxx 的头文件。
3.5 编写FastDDS发布者
从工作区的src目录中,运行以下命令下载 HelloWorldPublisher.cpp 文件。
wget -O HelloWorldPublisher.cpp \
https://raw.githubusercontent.com/eProsima/Fast-RTPS-docs/master/code/Examples/C++/DDSHelloWorld/src/HelloWorldPublisher.cpp
这是发布者应用程序的 C++ 源代码。它将发送 10 个主题为HelloWorldTopic的publications。
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/**
* @file HelloWorldPublisher.cpp
*
*/
#include "HelloWorldPubSubTypes.hpp"
#include <chrono>
#include <thread>
#include <fastdds/dds/domain/DomainParticipant.hpp>
#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastdds/dds/publisher/DataWriter.hpp>
#include <fastdds/dds/publisher/DataWriterListener.hpp>
#include <fastdds/dds/publisher/Publisher.hpp>
#include <fastdds/dds/topic/TypeSupport.hpp>
using namespace eprosima::fastdds::dds;
class HelloWorldPublisher
{
private:
HelloWorld hello_;
DomainParticipant* participant_;
Publisher* publisher_;
Topic* topic_;
DataWriter* writer_;
TypeSupport type_;
class PubListener : public DataWriterListener
{
public:
PubListener()
: matched_(0)
{
}
~PubListener() override
{
}
void on_publication_matched(
DataWriter*,
const PublicationMatchedStatus& info) override
{
if (info.current_count_change == 1)
{
matched_ = info.total_count;
std::cout << "Publisher matched." << std::endl;
}
else if (info.current_count_change == -1)
{
matched_ = info.total_count;
std::cout << "Publisher unmatched." << std::endl;
}
else
{
std::cout << info.current_count_change
<< " is not a valid value for PublicationMatchedStatus current count change." << std::endl;
}
}
std::atomic_int matched_;
} listener_;
public:
HelloWorldPublisher()
: participant_(nullptr)
, publisher_(nullptr)
, topic_(nullptr)
, writer_(nullptr)
, type_(new HelloWorldPubSubType())
{
}
virtual ~HelloWorldPublisher()
{
if (writer_ != nullptr)
{
publisher_->delete_datawriter(writer_);
}
if (publisher_ != nullptr)
{
participant_->delete_publisher(publisher_);
}
if (topic_ != nullptr)
{
participant_->delete_topic(topic_);
}
DomainParticipantFactory::get_instance()->delete_participant(participant_);
}
//!Initialize the publisher
bool init()
{
hello_.index(0);
hello_.message("HelloWorld");
DomainParticipantQos participantQos;
participantQos.name("Participant_publisher");
participant_ = DomainParticipantFactory::get_instance()->create_participant(0, participantQos);
if (participant_ == nullptr)
{
return false;
}
// Register the Type
type_.register_type(participant_);
// Create the publications Topic
topic_ = participant_->create_topic("HelloWorldTopic", "HelloWorld", TOPIC_QOS_DEFAULT);
if (topic_ == nullptr)
{
return false;
}
// Create the Publisher
publisher_ = participant_->create_publisher(PUBLISHER_QOS_DEFAULT, nullptr);
if (publisher_ == nullptr)
{
return false;
}
// Create the DataWriter
writer_ = publisher_->create_datawriter(topic_, DATAWRITER_QOS_DEFAULT, &listener_);
if (writer_ == nullptr)
{
return false;
}
return true;
}
//!Send a publication
bool publish()
{
if (listener_.matched_ > 0)
{
hello_.index(hello_.index() + 1);
writer_->write(&hello_);
return true;
}
return false;
}
//!Run the Publisher
void run(
uint32_t samples)
{
uint32_t samples_sent = 0;
while (samples_sent < samples)
{
if (publish())
{
samples_sent++;
std::cout << "Message: " << hello_.message() << " with index: " << hello_.index()
<< " SENT" << std::endl;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
}
};
int main(
int argc,
char** argv)
{
std::cout << "Starting publisher." << std::endl;
uint32_t samples = 10;
HelloWorldPublisher* mypub = new HelloWorldPublisher();
if(mypub->init())
{
mypub->run(samples);
}
delete mypub;
return 0;
}
3.6 CMakeLists.txt
在您之前创建的 CMakeList.txt 文件的末尾包含以下代码片段。这将添加构建可执行文件所需的所有源文件,并将可执行文件和库链接在一起。
add_executable(DDSHelloWorldPublisher src/HelloWorldPublisher.cpp ${DDS_HELLOWORLD_SOURCES_CXX})
target_link_libraries(DDSHelloWorldPublisher fastdds fastcdr)
此时,项目已准备好构建、编译和运行发布者应用程序。从工作区的构建目录中,运行以下命令。
cmake ..
cmake --build .
./DDSHelloWorldPublisher
3.7 编写 FastDDS 订阅者
从工作区的src目录中,执行以下命令下载 HelloWorldSubscriber.cpp 文件。
wget -O HelloWorldSubscriber.cpp \
https://raw.githubusercontent.com/eProsima/Fast-RTPS-docs/master/code/Examples/C++/DDSHelloWorld/src/HelloWorldSubscriber.cpp
这是订户应用程序的 C++ 源代码。该应用程序运行订阅者,直到收到主题HelloWorldTopic下的 10 个样本。此时订阅者停止了。
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/**
* @file HelloWorldSubscriber.cpp
*
*/
#include "HelloWorldPubSubTypes.hpp"
#include <chrono>
#include <thread>
#include <fastdds/dds/domain/DomainParticipant.hpp>
#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastdds/dds/subscriber/DataReader.hpp>
#include <fastdds/dds/subscriber/DataReaderListener.hpp>
#include <fastdds/dds/subscriber/qos/DataReaderQos.hpp>
#include <fastdds/dds/subscriber/SampleInfo.hpp>
#include <fastdds/dds/subscriber/Subscriber.hpp>
#include <fastdds/dds/topic/TypeSupport.hpp>
using namespace eprosima::fastdds::dds;
class HelloWorldSubscriber
{
private:
DomainParticipant* participant_;
Subscriber* subscriber_;
DataReader* reader_;
Topic* topic_;
TypeSupport type_;
class SubListener : public DataReaderListener
{
public:
SubListener()
: samples_(0)
{
}
~SubListener() override
{
}
void on_subscription_matched(
DataReader*,
const SubscriptionMatchedStatus& info) override
{
if (info.current_count_change == 1)
{
std::cout << "Subscriber matched." << std::endl;
}
else if (info.current_count_change == -1)
{
std::cout << "Subscriber unmatched." << std::endl;
}
else
{
std::cout << info.current_count_change
<< " is not a valid value for SubscriptionMatchedStatus current count change" << std::endl;
}
}
void on_data_available(
DataReader* reader) override
{
SampleInfo info;
if (reader->take_next_sample(&hello_, &info) == eprosima::fastdds::dds::RETCODE_OK)
{
if (info.valid_data)
{
samples_++;
std::cout << "Message: " << hello_.message() << " with index: " << hello_.index()
<< " RECEIVED." << std::endl;
}
}
}
HelloWorld hello_;
std::atomic_int samples_;
}
listener_;
public:
HelloWorldSubscriber()
: participant_(nullptr)
, subscriber_(nullptr)
, topic_(nullptr)
, reader_(nullptr)
, type_(new HelloWorldPubSubType())
{
}
virtual ~HelloWorldSubscriber()
{
if (reader_ != nullptr)
{
subscriber_->delete_datareader(reader_);
}
if (topic_ != nullptr)
{
participant_->delete_topic(topic_);
}
if (subscriber_ != nullptr)
{
participant_->delete_subscriber(subscriber_);
}
DomainParticipantFactory::get_instance()->delete_participant(participant_);
}
//!Initialize the subscriber
bool init()
{
DomainParticipantQos participantQos;
participantQos.name("Participant_subscriber");
participant_ = DomainParticipantFactory::get_instance()->create_participant(0, participantQos);
if (participant_ == nullptr)
{
return false;
}
// Register the Type
type_.register_type(participant_);
// Create the subscriptions Topic
topic_ = participant_->create_topic("HelloWorldTopic", "HelloWorld", TOPIC_QOS_DEFAULT);
if (topic_ == nullptr)
{
return false;
}
// Create the Subscriber
subscriber_ = participant_->create_subscriber(SUBSCRIBER_QOS_DEFAULT, nullptr);
if (subscriber_ == nullptr)
{
return false;
}
// Create the DataReader
reader_ = subscriber_->create_datareader(topic_, DATAREADER_QOS_DEFAULT, &listener_);
if (reader_ == nullptr)
{
return false;
}
return true;
}
//!Run the Subscriber
void run(
uint32_t samples)
{
while (listener_.samples_ < samples)
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
};
int main(
int argc,
char** argv)
{
std::cout << "Starting subscriber." << std::endl;
uint32_t samples = 10;
HelloWorldSubscriber* mysub = new HelloWorldSubscriber();
if (mysub->init())
{
mysub->run(samples);
}
delete mysub;
return 0;
}
3.8CmakeLists.txt
在您之前创建的 CMakeList.txt 文件的末尾包含以下代码片段。这将添加构建可执行文件所需的所有源文件,并将可执行文件和库链接在一起。
add_executable(DDSHelloWorldSubscriber src/HelloWorldSubscriber.cpp ${DDS_HELLOWORLD_SOURCES_CXX})
target_link_libraries(DDSHelloWorldSubscriber fastdds fastcdr)
此时,项目已准备好构建、编译和运行订阅者应用程序。从工作区的构建目录中,运行以下命令。
cmake ..
cmake --build .
./DDSHelloWorldSubscriber
3.9 将所有内容放在一起
最后,在构建目录中,从两个终端运行发布者和订阅者应用程序。
至此 完成了消息的发布和接收。