ROS Communication: Topics, Services, and Actions¶
Communication between nodes is the core of ROS. This tutorial provides a comprehensive guide to the three main communication mechanisms in ROS: Topics, Services, and Actions.
Communication Overview¶
┌─────────────────────────────────────────────────────────────┐
│ ROS Communication Mechanisms │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Topics │ │ Services │ │ Actions │ │
│ │ (异步) │ │ (同步) │ │ (异步+反馈)│ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 发布/订阅 │ │ 请求/响应 │ │ 目标/反馈 │ │
│ │ (Pub/Sub) │ │ (Req/Res) │ │ (Goal/Fb) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ 特点: │
│ - Topics: 持续数据流,适合传感器数据 │
│ - Services: 一次性请求/响应,适合查询操作 │
│ - Actions: 长时间任务,支持取消和反馈 │
│ │
└─────────────────────────────────────────────────────────────┘
1. Topics(话题)¶
Topics 是 ROS 中最常用的通信机制,用于**持续的数据流**。
特点¶
- 异步通信:发布者和订阅者不需要同时在线
- 一对多:一个发布者可以有多个订阅者
- 多对一:多个发布者可以发布到同一个话题
- 类型安全:每个话题有固定的消息类型
工作原理¶
┌─────────────┐ ┌─────────────┐
│ Publisher │ ──── Topic ────▶ │ Subscriber │
│ (发布者) │ /camera/image │ (订阅者) │
└─────────────┘ └─────────────┘
│ ▲
│ │
└──────────────────────────────────┘
消息通过 ROS Master 路由
示例代码¶
发布者 (talker.py)¶
#!/usr/bin/env python3
"""
ROS 话题发布者示例
演示如何发布消息到话题
"""
import rospy
from std_msgs.msg import String
def talker():
"""发布者节点"""
# 初始化节点
rospy.init_node('talker', anonymous=True)
# 创建发布者
# 参数:话题名称、消息类型、队列大小
pub = rospy.Publisher('chatter', String, queue_size=10)
# 设置发布频率
rate = rospy.Rate(10) # 10Hz
rospy.loginfo("Talker node started")
while not rospy.is_shutdown():
# 创建消息
hello_str = "Hello, ROS! %s" % rospy.get_time()
# 发布消息
rospy.loginfo("Publishing: %s", hello_str)
pub.publish(hello_str)
# 按照频率休眠
rate.sleep()
if __name__ == '__main__':
try:
talker()
except rospy.ROSInterruptException:
rospy.loginfo("Talker node terminated")
订阅者 (listener.py)¶
#!/usr/bin/env python3
"""
ROS 话题订阅者示例
演示如何订阅话题并接收消息
"""
import rospy
from std_msgs.msg import String
def callback(data):
"""回调函数,当收到消息时被调用"""
rospy.loginfo("Received: %s", data.data)
def listener():
"""订阅者节点"""
# 初始化节点
rospy.init_node('listener', anonymous=True)
# 订阅话题
# 参数:话题名称、消息类型、回调函数
rospy.Subscriber('chatter', String, callback)
rospy.loginfo("Listener node started, waiting for messages...")
# 保持节点运行
rospy.spin()
if __name__ == '__main__':
listener()
常用命令¶
# 查看所有话题
rostopic list
# 查看话题信息
rostopic info /chatter
# 查看话题消息类型
rostopic type /chatter
# 监听话题内容
rostopic echo /chatter
# 发布消息
rostopic pub /chatter std_msgs/String "data: 'Hello'"
# 查看发布频率
rostopic hz /chatter
# 查看带宽使用
rostopic bw /chatter
消息类型¶
ROS 提供了丰富的内置消息类型:
# 标准消息类型
std_msgs/String # 字符串
std_msgs/Int32 # 32位整数
std_msgs/Float64 # 64位浮点数
std_msgs/Bool # 布尔值
# 几何消息
geometry_msgs/Twist # 速度命令(线速度+角速度)
geometry_msgs/Pose # 位姿(位置+方向)
# 传感器消息
sensor_msgs/Image # 图像
sensor_msgs/LaserScan # 激光扫描
sensor_msgs/PointCloud2 # 点云
# 导航消息
nav_msgs/Odometry # 里程计
nav_msgs/Path # 路径
nav_msgs/OccupancyGrid # 占用栅格地图
自定义消息类型¶
创建自定义消息类型:
# 1. 创建消息目录
mkdir -p my_package/msg
# 2. 创建消息文件
cat > my_package/msg/Person.msg << EOF
string name
int32 age
float64 height
EOF
# 3. 更新 CMakeLists.txt
# add_message_files(FILES Person.msg)
# generate_messages(DEPENDENCIES std_msgs)
# 4. 更新 package.xml
# <build_depend>message_generation</build_depend>
# <exec_depend>message_runtime</exec_depend>
# 5. 编译
catkin_make
2. Services(服务)¶
Services 是 ROS 中的**同步通信机制**,用于**请求/响应**模式。
特点¶
- 同步通信:客户端发送请求后等待响应
- 一对一:一个客户端对应一个服务端
- 类型安全:服务有固定的请求和响应类型
- 阻塞调用:客户端会等待服务端处理完成
工作原理¶
┌─────────────┐ ┌─────────────┐
│ Client │ ──── Request ───▶ │ Server │
│ (客户端) │ │ (服务端) │
│ │ ◀─── Response ─── │ │
└─────────────┘ └─────────────┘
示例代码¶
服务定义 (AddTwoInts.srv)¶
服务端 (add_two_ints_server.py)¶
#!/usr/bin/env python3
"""
ROS 服务端示例
演示如何创建和提供服务
"""
import rospy
from beginner_tutorials.srv import AddTwoInts, AddTwoIntsResponse
def handle_add_two_ints(req):
"""处理服务请求"""
rospy.loginfo("Received request: %d + %d", req.a, req.b)
# 计算结果
result = req.a + req.b
rospy.loginfo("Returning result: %d", result)
# 返回响应
return AddTwoIntsResponse(result)
def add_two_ints_server():
"""服务端节点"""
# 初始化节点
rospy.init_node('add_two_ints_server')
# 创建服务
# 参数:服务名称、服务类型、处理函数
service = rospy.Service('add_two_ints', AddTwoInts, handle_add_two_ints)
rospy.loginfo("Add Two Ints service ready")
# 保持服务运行
rospy.spin()
if __name__ == '__main__':
add_two_ints_server()
客户端 (add_two_ints_client.py)¶
#!/usr/bin/env python3
"""
ROS 客户端示例
演示如何调用服务
"""
import rospy
from beginner_tutorials.srv import AddTwoInts
import sys
def add_two_ints_client(x, y):
"""调用服务"""
# 等待服务可用
rospy.loginfo("Waiting for service...")
rospy.wait_for_service('add_two_ints')
rospy.loginfo("Service available")
try:
# 创建服务代理
add_two_ints = rospy.ServiceProxy('add_two_ints', AddTwoInts)
# 调用服务
rospy.loginfo("Calling service with %d + %d", x, y)
resp = add_two_ints(x, y)
rospy.loginfo("Result: %d", resp.sum)
return resp.sum
except rospy.ServiceException as e:
rospy.logerr("Service call failed: %s", e)
return None
if __name__ == "__main__":
# 初始化节点
rospy.init_node('add_two_ints_client')
# 解析命令行参数
if len(sys.argv) == 3:
x = int(sys.argv[1])
y = int(sys.argv[2])
else:
x = 3
y = 4
rospy.loginfo("Using default values: %d + %d", x, y)
# 调用服务
result = add_two_ints_client(x, y)
if result is not None:
rospy.loginfo("Final result: %d", result)
常用命令¶
# 查看所有服务
rosservice list
# 查看服务类型
rosservice type /add_two_ints
# 调用服务
rosservice call /add_two_ints "a: 3 b: 4"
# 查看服务信息
rosservice info /add_two_ints
自定义服务类型¶
# 1. 创建服务目录
mkdir -p my_package/srv
# 2. 创建服务文件
cat > my_package/srv/AddTwoInts.srv << EOF
int64 a
int64 b
---
int64 sum
EOF
# 3. 更新 CMakeLists.txt
# add_service_files(FILES AddTwoInts.srv)
# generate_messages(DEPENDENCIES std_msgs)
# 4. 更新 package.xml
# <build_depend>message_generation</build_depend>
# <exec_depend>message_runtime</exec_depend>
# 5. 编译
catkin_make
3. Actions(动作)¶
Actions 是 ROS 中的**异步通信机制**,用于**长时间任务**,支持**取消和反馈**。
特点¶
- 异步通信:客户端发送目标后可以继续执行其他任务
- 支持取消:客户端可以取消正在执行的任务
- 支持反馈:服务端可以发送进度反馈
- 状态管理:任务有明确的状态(执行中、成功、失败等)
工作原理¶
┌─────────────┐ ┌─────────────┐
│ Client │ ──── Goal ──────▶ │ Server │
│ (客户端) │ │ (服务端) │
│ │ ◀─── Feedback ─── │ │
│ │ ◀─── Result ───── │ │
│ │ ──── Cancel ────▶ │ │
└─────────────┘ └─────────────┘
Action 类型¶
示例代码¶
Action 服务端¶
#!/usr/bin/env python3
"""
ROS Action 服务端示例
演示如何创建 Action 服务
"""
import rospy
import actionlib
from actionlib_tutorials.msg import FibonacciAction, FibonacciFeedback, FibonacciResult
class FibonacciServer:
def __init__(self):
"""初始化"""
# 创建 Action 服务
self.server = actionlib.SimpleActionServer(
'fibonacci', # Action 名称
FibonacciAction, # Action 类型
self.execute, # 执行函数
False # 是否自动启动
)
# 启动服务
self.server.start()
rospy.loginfo("Fibonacci Action Server started")
def execute(self, goal):
"""执行 Action"""
rospy.loginfo("Executing goal: order=%d", goal.order)
# 初始化 Fibonacci 序列
sequence = [0, 1]
# 生成 Fibonacci 序列
for i in range(1, goal.order):
# 检查是否被取消
if self.server.is_preempt_requested():
rospy.loginfo("Action preempted")
self.server.set_preempted()
return
# 计算下一个数
sequence.append(sequence[i] + sequence[i-1])
# 发送反馈
feedback = FibonacciFeedback()
feedback.sequence = sequence
self.server.publish_feedback(feedback)
rospy.loginfo("Feedback: %s", sequence)
# 模拟耗时操作
rospy.sleep(0.5)
# 设置成功结果
result = FibonacciResult()
result.sequence = sequence
self.server.set_succeeded(result)
rospy.loginfo("Action completed: %s", sequence)
def main():
"""主函数"""
# 初始化节点
rospy.init_node('fibonacci_server')
# 创建服务
server = FibonacciServer()
# 保持运行
rospy.spin()
if __name__ == '__main__':
main()
Action 客户端¶
#!/usr/bin/env python3
"""
ROS Action 客户端示例
演示如何调用 Action
"""
import rospy
import actionlib
from actionlib_tutorials.msg import FibonacciAction, FibonacciGoal
class FibonacciClient:
def __init__(self):
"""初始化"""
# 创建 Action 客户端
self.client = actionlib.SimpleActionClient(
'fibonacci', # Action 名称
FibonacciAction # Action 类型
)
rospy.loginfo("Waiting for action server...")
self.client.wait_for_server()
rospy.loginfo("Action server connected")
def send_goal(self, order):
"""发送目标"""
rospy.loginfo("Sending goal: order=%d", order)
# 创建目标
goal = FibonacciGoal()
goal.order = order
# 发送目标
self.client.send_goal(
goal,
feedback_cb=self.feedback_callback
)
# 等待结果
self.client.wait_for_result()
# 获取结果
result = self.client.get_result()
rospy.loginfo("Result: %s", result.sequence)
return result
def feedback_callback(self, feedback):
"""反馈回调函数"""
rospy.loginfo("Feedback: %s", feedback.sequence)
def cancel_goal(self):
"""取消目标"""
rospy.loginfo("Cancelling goal...")
self.client.cancel_goal()
def main():
"""主函数"""
# 初始化节点
rospy.init_node('fibonacci_client')
# 创建客户端
client = FibonacciClient()
# 发送目标
result = client.send_goal(10)
rospy.loginfo("Final result: %s", result.sequence)
if __name__ == '__main__':
main()
常用命令¶
# 查看所有 Action
rostopic list | grep action
# 查看 Action 状态
rostopic echo /fibonacci/status
# 查看 Action 反馈
rostopic echo /fibonacci/feedback
# 查看 Action 结果
rostopic echo /fibonacci/result
4. 通信机制对比¶
| 特性 | Topics | Services | Actions |
|---|---|---|---|
| 通信模式 | 异步 | 同步 | 异步 |
| 数据流 | 持续 | 一次性 | 持续+反馈 |
| 取消支持 | ❌ | ❌ | ✅ |
| 反馈支持 | ❌ | ❌ | ✅ |
| 适用场景 | 传感器数据 | 查询操作 | 长时间任务 |
| 示例 | 相机图像、激光数据 | 加法计算、路径规划 | 导航、抓取 |
5. 实践练习¶
练习 1:话题通信¶
# 1. 启动 ROS Master
roscore
# 2. 运行发布者
rosrun my_package talker.py
# 3. 运行订阅者
rosrun my_package listener.py
# 4. 查看话题
rostopic list
rostopic echo /chatter
练习 2:服务通信¶
# 1. 编译服务
cd ~/catkin_ws
catkin_make
source devel/setup.bash
# 2. 运行服务端
rosrun my_package add_two_ints_server.py
# 3. 调用服务
rosservice call /add_two_ints "a: 3 b: 4"
# 4. 运行客户端
rosrun my_package add_two_ints_client.py 5 6
练习 3:Action 通信¶
# 1. 安装 Action 库
sudo apt install ros-noetic-actionlib-tutorials
# 2. 运行 Action 服务端
rosrun actionlib_tutorials fibonacci_server
# 3. 运行 Action 客户端
rosrun actionlib_tutorials fibonacci_client
# 4. 监控 Action 状态
rostopic echo /fibonacci/status
rostopic echo /fibonacci/feedback
6. 调试工具¶
1. rqt_graph¶
可视化节点和话题的连接关系:
2. rqt_plot¶
绘制话题数据的时间序列:
3. rqt_console¶
查看 ROS 日志:
4. rostopic¶
命令行工具:
# 查看话题
rostopic list
# 监听话题
rostopic echo /topic_name
# 发布消息
rostopic pub /topic_name std_msgs/String "data: 'Hello'"
# 查看频率
rostopic hz /topic_name
7. 最佳实践¶
1. 话题命名规范¶
# 使用有意义的名称
/camera/image_raw # 相机原始图像
/laser/scan # 激光扫描数据
/robot/cmd_vel # 机器人速度命令
/map # 地图数据
# 避免使用
/test # 太泛泛
/data1 # 无意义
2. 消息类型选择¶
# 使用标准消息类型
from std_msgs.msg import String, Int32, Float64
from geometry_msgs.msg import Twist, Pose
from sensor_msgs.msg import Image, LaserScan
# 只在必要时创建自定义消息
3. 队列大小设置¶
4. 错误处理¶
# 使用 try-except 处理异常
try:
rospy.wait_for_service('/service', timeout=5.0)
except rospy.ROSException:
rospy.logerr("Service not available")
8. 常见问题¶
1. 话题没有数据¶
原因:发布者未启动或话题名称不匹配
解决方案:
2. 服务调用失败¶
原因:服务未启动或参数错误
解决方案:
# 检查服务列表
rosservice list
# 检查服务类型
rosservice type /service_name
# 测试服务调用
rosservice call /service_name "param: value"
3. Action 执行超时¶
原因:服务端处理时间过长或网络延迟
解决方案: