多机器人通信¶
多机器人系统是现代机器人技术的重要方向,它允许多个机器人协同工作完成复杂任务。
什么是多机器人通信?¶
┌─────────────────────────────────────────────────────────────┐
│ 多机器人系统 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Robot 1 │ │ Robot 2 │ │ Robot 3 │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ └──────────────┼──────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────┐ │
│ │ ROS Master │ │
│ │ (主节点) │ │
│ └───────────────┘ │
│ │
│ 优势: │
│ - 任务分配和并行执行 │
│ - 提高系统鲁棒性 │
│ - 扩展系统能力 │
│ │
│ 挑战: │
│ - 通信延迟和带宽 │
│ - 任务协调和同步 │
│ - 资源分配和冲突解决 │
│ │
└─────────────────────────────────────────────────────────────┘
多机器人通信架构¶
1. 集中式架构¶
┌─────────────────────────────────────────────────────────────┐
│ 集中式架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌───────────────┐ │
│ │ 中央控制器 │ │
│ │ (Master) │ │
│ └───────┬───────┘ │
│ │ │
│ ┌──────────────┼──────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Robot 1 │ │ Robot 2 │ │ Robot 3 │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │
│ 优点: │
│ - 控制简单,易于管理 │
│ - 全局信息可见 │
│ │
│ 缺点: │
│ - 单点故障 │
│ - 通信瓶颈 │
│ - 扩展性差 │
│ │
└─────────────────────────────────────────────────────────────┘
2. 分布式架构¶
┌─────────────────────────────────────────────────────────────┐
│ 分布式架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Robot 1 │◄──►│ Robot 2 │◄──►│ Robot 3 │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ └──────────────┼──────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────┐ │
│ │ 共享信息 │ │
│ │ (Blackboard) │ │
│ └───────────────┘ │
│ │
│ 优点: │
│ - 无单点故障 │
│ - 扩展性好 │
│ - 通信灵活 │
│ │
│ 缺点: │
│ - 协调复杂 │
│ - 信息不一致 │
│ - 冲突解决困难 │
│ │
└─────────────────────────────────────────────────────────────┘
ROS 中的多机器人通信¶
1. 使用 ROS Namespaces¶
每个机器人使用独立的命名空间。
<!-- Robot 1 -->
<launch>
<group ns="robot1">
<node name="camera" pkg="usb_cam" type="usb_cam_node" />
<node name="detector" pkg="yolo" type="detector_node" />
</group>
</launch>
<!-- Robot 2 -->
<launch>
<group ns="robot2">
<node name="camera" pkg="usb_cam" type="usb_cam_node" />
<node name="detector" pkg="yolo" type="detector_node" />
</group>
</launch>
2. 使用多 Master¶
每个机器人运行独立的 ROS Master。
# Robot 1
export ROS_MASTER_URI=http://robot1:11311
roslaunch robot1 bringup.launch
# Robot 2
export ROS_MASTER_URI=http://robot2:11311
roslaunch robot2 bringup.launch
3. 使用 rosbridge¶
使用 rosbridge 进行跨主机通信。
# 启动 rosbridge
roslaunch rosbridge_server rosbridge_websocket.launch
# 连接到其他 Master
rosrun rosbridge_server rosbridge_tcp.py
多机器人通信实现¶
1. 使用 rospy 进行跨主机通信¶
#!/usr/bin/env python3
import rospy
from std_msgs.msg import String
def multi_robot_communication():
"""多机器人通信示例"""
# 初始化节点
rospy.init_node('multi_robot_comm', anonymous=True)
# 设置 ROS Master URI
# Robot 1: ROS_MASTER_URI=http://robot1:11311
# Robot 2: ROS_MASTER_URI=http://robot2:11311
# 发布者
pub = rospy.Publisher('/robot_status', String, queue_size=10)
# 订阅者
rospy.Subscriber('/robot_status', String, callback)
# 发送状态
rate = rospy.Rate(10)
while not rospy.is_shutdown():
status = "Robot 1: Position (1.0, 2.0)"
pub.publish(status)
rate.sleep()
def callback(data):
"""接收其他机器人的状态"""
rospy.loginfo(f"Received: {data.data}")
2. 使用 MultiMasterFKI¶
MultiMasterFKI 是一个用于多 Master 通信的 ROS 包。
# 安装
sudo apt install ros-noetic-multimaster-fki
# 配置
roslaunch multimaster_fki launch_multimaster.launch
3. 使用 zeroconf¶
zeroconf 是一个用于服务发现的协议。
多机器人任务分配¶
1. 集中式任务分配¶
#!/usr/bin/env python3
import rospy
from geometry_msgs.msg import Pose
class TaskAllocator:
def __init__(self):
self.robots = {}
self.tasks = []
def allocate_tasks(self):
"""分配任务给机器人"""
for task in self.tasks:
# 找到最近的可用机器人
best_robot = self.find_best_robot(task)
if best_robot:
# 分配任务
self.assign_task(best_robot, task)
def find_best_robot(self, task):
"""找到最适合的机器人"""
best_robot = None
min_distance = float('inf')
for robot_id, robot_info in self.robots.items():
if robot_info['status'] == 'idle':
distance = self.calculate_distance(
robot_info['position'],
task['position']
)
if distance < min_distance:
min_distance = distance
best_robot = robot_id
return best_robot
def assign_task(self, robot_id, task):
"""分配任务给机器人"""
self.robots[robot_id]['status'] = 'busy'
self.robots[robot_id]['current_task'] = task
2. 分布式任务分配¶
#!/usr/bin/env python3
import rospy
from std_msgs.msg import String
class DistributedAllocator:
def __init__(self):
self.robot_id = rospy.get_param('~robot_id')
self.neighbors = []
def negotiate_task(self, task):
"""与邻居机器人协商任务"""
# 发送任务提议
proposal = {
'task': task,
'robot_id': self.robot_id,
'bid': self.calculate_bid(task)
}
# 发布提议
self.proposal_pub.publish(String(str(proposal)))
# 等待响应
rospy.sleep(1.0)
# 选择最佳提议
return self.select_best_proposal()
多机器人编队控制¶
1. 领航-跟随者模式¶
#!/usr/bin/env python3
import rospy
from geometry_msgs.msg import Twist, Pose
class LeaderFollower:
def __init__(self, robot_id, leader_id=None):
self.robot_id = robot_id
self.leader_id = leader_id
self.is_leader = leader_id is None
def control(self):
"""编队控制"""
if self.is_leader:
# 领航者:执行任务
self.leader_control()
else:
# 跟随者:跟随领航者
self.follower_control()
def leader_control(self):
"""领航者控制"""
# 发布位置
self.publish_pose()
def follower_control(self):
"""跟随者控制"""
# 获取领航者位置
leader_pose = self.get_leader_pose()
# 计算跟随位置
target_pose = self.calculate_target_pose(leader_pose)
# 控制移动
self.move_to_target(target_pose)
2. 虚拟结构法¶
#!/usr/bin/env python3
import rospy
from geometry_msgs.msg import Pose
class VirtualStructure:
def __init__(self):
self.robots = []
self.virtual_structure = self.create_virtual_structure()
def create_virtual_structure(self):
"""创建虚拟结构"""
# 定义编队形状(例如:三角形)
structure = [
{'offset': (0, 0)}, # 领航者
{'offset': (-1, -1)}, # 左翼
{'offset': (1, -1)} # 右翼
]
return structure
def control(self):
"""控制编队"""
# 获取编队中心
center = self.get_formation_center()
# 计算每个机器人的目标位置
for i, robot in enumerate(self.robots):
target = self.calculate_target(
center,
self.virtual_structure[i]['offset']
)
# 控制机器人移动到目标位置
self.move_robot(robot, target)
多机器人 SLAM¶
1. 协作建图¶
#!/usr/bin/env python3
import rospy
from nav_msgs.msg import OccupancyGrid
class CollaborativeSLAM:
def __init__(self):
self.local_map = None
self.global_map = None
self.other_maps = {}
def merge_maps(self):
"""合并多个机器人的地图"""
# 获取所有机器人的地图
maps = [self.local_map]
maps.extend(self.other_maps.values())
# 合并地图
merged_map = self.fuse_maps(maps)
# 更新全局地图
self.global_map = merged_map
def fuse_maps(self, maps):
"""融合多个地图"""
# 使用贝叶斯融合
fused_map = maps[0].copy()
for map_msg in maps[1:]:
for i in range(len(map_msg.data)):
if map_msg.data[i] >= 0:
# 融合占用概率
fused_map.data[i] = self.bayesian_fusion(
fused_map.data[i],
map_msg.data[i]
)
return fused_map
2. 分布式 SLAM¶
# 每个机器人运行独立的 SLAM
# Robot 1
roslaunch robot1 slam.launch
# Robot 2
roslaunch robot2 slam.launch
# 地图融合节点
roslaunch multi_robot map_merge.launch
多机器人调试¶
1. 查看所有机器人状态¶
2. 可视化¶
在 RViz 中添加: - 所有机器人的模型 - 所有机器人的传感器数据 - 编队信息 - 任务分配信息
3. 日志分析¶
常见问题¶
1. 通信延迟¶
原因:网络带宽不足或通信距离过远
解决方案: - 使用更快的网络(WiFi 6、5G) - 减少数据传输量 - 使用压缩算法
2. 信息不一致¶
原因:分布式系统中的时钟不同步
解决方案: - 使用 NTP 同步时钟 - 使用时间戳标记数据 - 使用一致性算法
3. 任务冲突¶
原因:多个机器人同时执行同一任务
解决方案: - 使用任务锁定机制 - 使用分布式锁 - 使用协商算法
实验¶
Lab: 多机器人通信¶
- 启动多机器人系统
- 配置跨主机通信
- 测试消息传递
- 实现简单的任务分配