Skip to content

ROS Communication: Topics, Services, and Actions

Communication between nodes is the core of ROS. This tutorial provides a comprehensive guide to the three main communication mechanisms in ROS: Topics, Services, and Actions.

Communication Overview

┌─────────────────────────────────────────────────────────────┐
│                ROS Communication Mechanisms                  │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐     │
│  │   Topics    │    │  Services   │    │   Actions   │     │
│  │  (异步)     │    │  (同步)     │    │  (异步+反馈)│     │
│  └──────┬──────┘    └──────┬──────┘    └──────┬──────┘     │
│         │                  │                  │             │
│         ▼                  ▼                  ▼             │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐     │
│  │  发布/订阅   │    │  请求/响应   │    │  目标/反馈   │     │
│  │  (Pub/Sub)  │    │  (Req/Res)  │    │  (Goal/Fb)  │     │
│  └─────────────┘    └─────────────┘    └─────────────┘     │
│                                                             │
│  特点:                                                     │
│  - Topics: 持续数据流,适合传感器数据                        │
│  - Services: 一次性请求/响应,适合查询操作                   │
│  - Actions: 长时间任务,支持取消和反馈                       │
│                                                             │
└─────────────────────────────────────────────────────────────┘

1. Topics(话题)

Topics 是 ROS 中最常用的通信机制,用于**持续的数据流**。

特点

  • 异步通信:发布者和订阅者不需要同时在线
  • 一对多:一个发布者可以有多个订阅者
  • 多对一:多个发布者可以发布到同一个话题
  • 类型安全:每个话题有固定的消息类型

工作原理

┌─────────────┐                    ┌─────────────┐
│  Publisher  │ ──── Topic ────▶  │  Subscriber │
│  (发布者)   │    /camera/image   │  (订阅者)   │
└─────────────┘                    └─────────────┘
      │                                  ▲
      │                                  │
      └──────────────────────────────────┘
              消息通过 ROS Master 路由

示例代码

发布者 (talker.py)

#!/usr/bin/env python3
"""
ROS 话题发布者示例
演示如何发布消息到话题
"""

import rospy
from std_msgs.msg import String

def talker():
    """发布者节点"""

    # 初始化节点
    rospy.init_node('talker', anonymous=True)

    # 创建发布者
    # 参数:话题名称、消息类型、队列大小
    pub = rospy.Publisher('chatter', String, queue_size=10)

    # 设置发布频率
    rate = rospy.Rate(10)  # 10Hz

    rospy.loginfo("Talker node started")

    while not rospy.is_shutdown():
        # 创建消息
        hello_str = "Hello, ROS! %s" % rospy.get_time()

        # 发布消息
        rospy.loginfo("Publishing: %s", hello_str)
        pub.publish(hello_str)

        # 按照频率休眠
        rate.sleep()

if __name__ == '__main__':
    try:
        talker()
    except rospy.ROSInterruptException:
        rospy.loginfo("Talker node terminated")

订阅者 (listener.py)

#!/usr/bin/env python3
"""
ROS 话题订阅者示例
演示如何订阅话题并接收消息
"""

import rospy
from std_msgs.msg import String

def callback(data):
    """回调函数,当收到消息时被调用"""
    rospy.loginfo("Received: %s", data.data)

def listener():
    """订阅者节点"""

    # 初始化节点
    rospy.init_node('listener', anonymous=True)

    # 订阅话题
    # 参数:话题名称、消息类型、回调函数
    rospy.Subscriber('chatter', String, callback)

    rospy.loginfo("Listener node started, waiting for messages...")

    # 保持节点运行
    rospy.spin()

if __name__ == '__main__':
    listener()

常用命令

# 查看所有话题
rostopic list

# 查看话题信息
rostopic info /chatter

# 查看话题消息类型
rostopic type /chatter

# 监听话题内容
rostopic echo /chatter

# 发布消息
rostopic pub /chatter std_msgs/String "data: 'Hello'"

# 查看发布频率
rostopic hz /chatter

# 查看带宽使用
rostopic bw /chatter

消息类型

ROS 提供了丰富的内置消息类型:

# 标准消息类型
std_msgs/String      # 字符串
std_msgs/Int32       # 32位整数
std_msgs/Float64     # 64位浮点数
std_msgs/Bool        # 布尔值

# 几何消息
geometry_msgs/Twist  # 速度命令(线速度+角速度)
geometry_msgs/Pose   # 位姿(位置+方向)

# 传感器消息
sensor_msgs/Image    # 图像
sensor_msgs/LaserScan # 激光扫描
sensor_msgs/PointCloud2 # 点云

# 导航消息
nav_msgs/Odometry    # 里程计
nav_msgs/Path        # 路径
nav_msgs/OccupancyGrid # 占用栅格地图

自定义消息类型

创建自定义消息类型:

# 1. 创建消息目录
mkdir -p my_package/msg

# 2. 创建消息文件
cat > my_package/msg/Person.msg << EOF
string name
int32 age
float64 height
EOF

# 3. 更新 CMakeLists.txt
# add_message_files(FILES Person.msg)
# generate_messages(DEPENDENCIES std_msgs)

# 4. 更新 package.xml
# <build_depend>message_generation</build_depend>
# <exec_depend>message_runtime</exec_depend>

# 5. 编译
catkin_make

2. Services(服务)

Services 是 ROS 中的**同步通信机制**,用于**请求/响应**模式。

特点

  • 同步通信:客户端发送请求后等待响应
  • 一对一:一个客户端对应一个服务端
  • 类型安全:服务有固定的请求和响应类型
  • 阻塞调用:客户端会等待服务端处理完成

工作原理

┌─────────────┐                    ┌─────────────┐
│   Client    │ ──── Request ───▶ │   Server    │
│  (客户端)   │                    │  (服务端)   │
│             │ ◀─── Response ─── │             │
└─────────────┘                    └─────────────┘

示例代码

服务定义 (AddTwoInts.srv)

# 请求部分
int64 a
int64 b
---
# 响应部分
int64 sum

服务端 (add_two_ints_server.py)

#!/usr/bin/env python3
"""
ROS 服务端示例
演示如何创建和提供服务
"""

import rospy
from beginner_tutorials.srv import AddTwoInts, AddTwoIntsResponse

def handle_add_two_ints(req):
    """处理服务请求"""
    rospy.loginfo("Received request: %d + %d", req.a, req.b)

    # 计算结果
    result = req.a + req.b

    rospy.loginfo("Returning result: %d", result)

    # 返回响应
    return AddTwoIntsResponse(result)

def add_two_ints_server():
    """服务端节点"""

    # 初始化节点
    rospy.init_node('add_two_ints_server')

    # 创建服务
    # 参数:服务名称、服务类型、处理函数
    service = rospy.Service('add_two_ints', AddTwoInts, handle_add_two_ints)

    rospy.loginfo("Add Two Ints service ready")

    # 保持服务运行
    rospy.spin()

if __name__ == '__main__':
    add_two_ints_server()

客户端 (add_two_ints_client.py)

#!/usr/bin/env python3
"""
ROS 客户端示例
演示如何调用服务
"""

import rospy
from beginner_tutorials.srv import AddTwoInts
import sys

def add_two_ints_client(x, y):
    """调用服务"""

    # 等待服务可用
    rospy.loginfo("Waiting for service...")
    rospy.wait_for_service('add_two_ints')
    rospy.loginfo("Service available")

    try:
        # 创建服务代理
        add_two_ints = rospy.ServiceProxy('add_two_ints', AddTwoInts)

        # 调用服务
        rospy.loginfo("Calling service with %d + %d", x, y)
        resp = add_two_ints(x, y)

        rospy.loginfo("Result: %d", resp.sum)
        return resp.sum

    except rospy.ServiceException as e:
        rospy.logerr("Service call failed: %s", e)
        return None

if __name__ == "__main__":
    # 初始化节点
    rospy.init_node('add_two_ints_client')

    # 解析命令行参数
    if len(sys.argv) == 3:
        x = int(sys.argv[1])
        y = int(sys.argv[2])
    else:
        x = 3
        y = 4
        rospy.loginfo("Using default values: %d + %d", x, y)

    # 调用服务
    result = add_two_ints_client(x, y)

    if result is not None:
        rospy.loginfo("Final result: %d", result)

常用命令

# 查看所有服务
rosservice list

# 查看服务类型
rosservice type /add_two_ints

# 调用服务
rosservice call /add_two_ints "a: 3 b: 4"

# 查看服务信息
rosservice info /add_two_ints

自定义服务类型

# 1. 创建服务目录
mkdir -p my_package/srv

# 2. 创建服务文件
cat > my_package/srv/AddTwoInts.srv << EOF
int64 a
int64 b
---
int64 sum
EOF

# 3. 更新 CMakeLists.txt
# add_service_files(FILES AddTwoInts.srv)
# generate_messages(DEPENDENCIES std_msgs)

# 4. 更新 package.xml
# <build_depend>message_generation</build_depend>
# <exec_depend>message_runtime</exec_depend>

# 5. 编译
catkin_make

3. Actions(动作)

Actions 是 ROS 中的**异步通信机制**,用于**长时间任务**,支持**取消和反馈**。

特点

  • 异步通信:客户端发送目标后可以继续执行其他任务
  • 支持取消:客户端可以取消正在执行的任务
  • 支持反馈:服务端可以发送进度反馈
  • 状态管理:任务有明确的状态(执行中、成功、失败等)

工作原理

┌─────────────┐                    ┌─────────────┐
│   Client    │ ──── Goal ──────▶ │   Server    │
│  (客户端)   │                    │  (服务端)   │
│             │ ◀─── Feedback ─── │             │
│             │ ◀─── Result ───── │             │
│             │ ──── Cancel ────▶ │             │
└─────────────┘                    └─────────────┘

Action 类型

# Fibonacci.action
# 目标
int32 order
---
# 结果
int32[] sequence
---
# 反馈
int32[] sequence

示例代码

Action 服务端

#!/usr/bin/env python3
"""
ROS Action 服务端示例
演示如何创建 Action 服务
"""

import rospy
import actionlib
from actionlib_tutorials.msg import FibonacciAction, FibonacciFeedback, FibonacciResult

class FibonacciServer:
    def __init__(self):
        """初始化"""

        # 创建 Action 服务
        self.server = actionlib.SimpleActionServer(
            'fibonacci',                    # Action 名称
            FibonacciAction,                # Action 类型
            self.execute,                   # 执行函数
            False                           # 是否自动启动
        )

        # 启动服务
        self.server.start()

        rospy.loginfo("Fibonacci Action Server started")

    def execute(self, goal):
        """执行 Action"""

        rospy.loginfo("Executing goal: order=%d", goal.order)

        # 初始化 Fibonacci 序列
        sequence = [0, 1]

        # 生成 Fibonacci 序列
        for i in range(1, goal.order):
            # 检查是否被取消
            if self.server.is_preempt_requested():
                rospy.loginfo("Action preempted")
                self.server.set_preempted()
                return

            # 计算下一个数
            sequence.append(sequence[i] + sequence[i-1])

            # 发送反馈
            feedback = FibonacciFeedback()
            feedback.sequence = sequence
            self.server.publish_feedback(feedback)

            rospy.loginfo("Feedback: %s", sequence)

            # 模拟耗时操作
            rospy.sleep(0.5)

        # 设置成功结果
        result = FibonacciResult()
        result.sequence = sequence
        self.server.set_succeeded(result)

        rospy.loginfo("Action completed: %s", sequence)

def main():
    """主函数"""

    # 初始化节点
    rospy.init_node('fibonacci_server')

    # 创建服务
    server = FibonacciServer()

    # 保持运行
    rospy.spin()

if __name__ == '__main__':
    main()

Action 客户端

#!/usr/bin/env python3
"""
ROS Action 客户端示例
演示如何调用 Action
"""

import rospy
import actionlib
from actionlib_tutorials.msg import FibonacciAction, FibonacciGoal

class FibonacciClient:
    def __init__(self):
        """初始化"""

        # 创建 Action 客户端
        self.client = actionlib.SimpleActionClient(
            'fibonacci',                    # Action 名称
            FibonacciAction                 # Action 类型
        )

        rospy.loginfo("Waiting for action server...")
        self.client.wait_for_server()
        rospy.loginfo("Action server connected")

    def send_goal(self, order):
        """发送目标"""

        rospy.loginfo("Sending goal: order=%d", order)

        # 创建目标
        goal = FibonacciGoal()
        goal.order = order

        # 发送目标
        self.client.send_goal(
            goal,
            feedback_cb=self.feedback_callback
        )

        # 等待结果
        self.client.wait_for_result()

        # 获取结果
        result = self.client.get_result()
        rospy.loginfo("Result: %s", result.sequence)

        return result

    def feedback_callback(self, feedback):
        """反馈回调函数"""
        rospy.loginfo("Feedback: %s", feedback.sequence)

    def cancel_goal(self):
        """取消目标"""
        rospy.loginfo("Cancelling goal...")
        self.client.cancel_goal()

def main():
    """主函数"""

    # 初始化节点
    rospy.init_node('fibonacci_client')

    # 创建客户端
    client = FibonacciClient()

    # 发送目标
    result = client.send_goal(10)

    rospy.loginfo("Final result: %s", result.sequence)

if __name__ == '__main__':
    main()

常用命令

# 查看所有 Action
rostopic list | grep action

# 查看 Action 状态
rostopic echo /fibonacci/status

# 查看 Action 反馈
rostopic echo /fibonacci/feedback

# 查看 Action 结果
rostopic echo /fibonacci/result

4. 通信机制对比

特性 Topics Services Actions
通信模式 异步 同步 异步
数据流 持续 一次性 持续+反馈
取消支持
反馈支持
适用场景 传感器数据 查询操作 长时间任务
示例 相机图像、激光数据 加法计算、路径规划 导航、抓取

5. 实践练习

练习 1:话题通信

# 1. 启动 ROS Master
roscore

# 2. 运行发布者
rosrun my_package talker.py

# 3. 运行订阅者
rosrun my_package listener.py

# 4. 查看话题
rostopic list
rostopic echo /chatter

练习 2:服务通信

# 1. 编译服务
cd ~/catkin_ws
catkin_make
source devel/setup.bash

# 2. 运行服务端
rosrun my_package add_two_ints_server.py

# 3. 调用服务
rosservice call /add_two_ints "a: 3 b: 4"

# 4. 运行客户端
rosrun my_package add_two_ints_client.py 5 6

练习 3:Action 通信

# 1. 安装 Action 库
sudo apt install ros-noetic-actionlib-tutorials

# 2. 运行 Action 服务端
rosrun actionlib_tutorials fibonacci_server

# 3. 运行 Action 客户端
rosrun actionlib_tutorials fibonacci_client

# 4. 监控 Action 状态
rostopic echo /fibonacci/status
rostopic echo /fibonacci/feedback

6. 调试工具

1. rqt_graph

可视化节点和话题的连接关系:

rosrun rqt_graph rqt_graph

2. rqt_plot

绘制话题数据的时间序列:

rosrun rqt_plot rqt_plot /chatter

3. rqt_console

查看 ROS 日志:

rosrun rqt_console rqt_console

4. rostopic

命令行工具:

# 查看话题
rostopic list

# 监听话题
rostopic echo /topic_name

# 发布消息
rostopic pub /topic_name std_msgs/String "data: 'Hello'"

# 查看频率
rostopic hz /topic_name

7. 最佳实践

1. 话题命名规范

# 使用有意义的名称
/camera/image_raw          # 相机原始图像
/laser/scan                # 激光扫描数据
/robot/cmd_vel             # 机器人速度命令
/map                       # 地图数据

# 避免使用
/test                      # 太泛泛
/data1                     # 无意义

2. 消息类型选择

# 使用标准消息类型
from std_msgs.msg import String, Int32, Float64
from geometry_msgs.msg import Twist, Pose
from sensor_msgs.msg import Image, LaserScan

# 只在必要时创建自定义消息

3. 队列大小设置

# 根据数据频率设置队列大小
pub = rospy.Publisher('/topic', String, queue_size=10)

# 高频率数据:较小队列
# 低频率数据:较大队列

4. 错误处理

# 使用 try-except 处理异常
try:
    rospy.wait_for_service('/service', timeout=5.0)
except rospy.ROSException:
    rospy.logerr("Service not available")

8. 常见问题

1. 话题没有数据

原因:发布者未启动或话题名称不匹配

解决方案

# 检查话题列表
rostopic list

# 检查发布者
rostopic info /topic_name

# 检查消息类型
rostopic type /topic_name

2. 服务调用失败

原因:服务未启动或参数错误

解决方案

# 检查服务列表
rosservice list

# 检查服务类型
rosservice type /service_name

# 测试服务调用
rosservice call /service_name "param: value"

3. Action 执行超时

原因:服务端处理时间过长或网络延迟

解决方案

# 设置超时时间
client.wait_for_result(timeout=rospy.Duration(30))

下一步

参考资源