Multi-Robot Formation Control
Project type: Multi-robot | Difficulty: ★★☆☆☆ to ★★★★☆ (depending on the approach) | Estimated time: 2–4 weekends
1. Project Overview
Multi-robot formation control is the problem of coordinating a group of autonomous mobile robots so that they move as a unit while maintaining a specified geometric shape (line, triangle, diamond, etc.). This project covers three progressively more advanced control paradigms:
┌──────────────────────────────────────────────────────────────┐
│                 Formation control (top view)                 │
│                                                              │
│   ◆  Leader robot                                            │
│   │                                                          │
│   ├─ ◆  Follower 1 (offset: -Δx, +Δy)                        │
│   │                                                          │
│   └─ ◆  Follower 2 (offset: -2Δx, 0)                         │
│                                                              │
│   Formation shapes: wedge / line / triangle / diamond        │
│                                                              │
│   Every robot tracks a desired position relative to the      │
│   leader                                                     │
└──────────────────────────────────────────────────────────────┘
| Tier | Approach | Core idea | Communication |
|---|---|---|---|
| Classical | Leader-Follower | The leader's trajectory defines the formation path; followers hold relative poses | One-way (leader → followers) |
| Intermediate | Consensus / virtual structure | Graph-Laplacian dynamics drive distributed agreement on the formation geometry | Bidirectional |
| Modern | MARL (MAPPO / QMIX) | Neural policies learn cooperative formation behavior via CTDE | Partial / none |
2. Hardware and Software Requirements
Hardware
| Component | Specification | Notes |
|---|---|---|
| Robots | 3–5 mobile robots (differential drive) | Any platform (TurtleBot, custom, etc.) |
| Compute | Onboard computer on each robot (Raspberry Pi 4 / Jetson Nano) | Or run purely in simulation |
| Communication | WiFi router / Ethernet switch | Required for multi-robot networking |
| LiDAR | 2D LiDAR on each robot (optional) | RPLIDAR A1 or similar |
| Camera | Forward-facing camera (optional) | For vision-based relative pose estimation |
Software
| Package | Version | Purpose |
|---|---|---|
| Python | ≥ 3.8 | Core language |
| NumPy / SciPy | ≥ 1.20 / ≥ 1.7 | Matrix operations, graph theory |
| ROS 2 (Humble / Foxy) | ≥ Foxy | Multi-robot framework with namespace support |
| NetworkX | ≥ 2.6 | Graph representation and computation |
| PyTorch | ≥ 1.10 | Neural networks (MARL) |
| Ray [rllib] | ≥ 2.0 | Multi-agent RL training |
| Gymnasium | ≥ 0.26 | RL environment interface |
| matplotlib | ≥ 3.5 | Visualization |
pip install numpy scipy networkx torch gymnasium matplotlib
# Multi-agent RL:
pip install "ray[rllib]" pettingzoo
3. Formation Geometry Fundamentals
3.1 Rigid Formations
A formation of \(n\) robots at positions \(p_1, \dots, p_n \in \mathbb{R}^2\) is called a **rigid formation** if the inter-robot distances determine the shape uniquely (up to rotation and translation). In \(\mathbb{R}^2\), the minimum number of distance constraints required is

\[|E_{\min}| = 2n - 3\]

where \(|E_{\min}|\) is the number of edges of a **minimally rigid graph** (Laman graph). For example, a diamond of \(n = 4\) robots needs \(2 \cdot 4 - 3 = 5\) distance constraints, matching the table below.
3.2 Common Formation Shapes
| Shape | Robots | Edges (Laman) | Description |
|---|---|---|---|
| Line | n | 2n − 3 | Equal spacing, common heading |
| Triangle | 3 | 3 | Equilateral, leader at the apex |
| Diamond | 4 | 5 | Leader + 3 followers at the vertices |
4. Approach A — Classical: Leader-Follower Formation Control
4.1 Basic Idea
The Leader-Follower approach designates one robot as the leader and defines, for every follower, a desired position relative to the leader. The leader moves along a predefined or teleoperated trajectory; each follower tracks its target position relative to the leader.
4.2 Relative Pose Control Law
Let the leader's pose be \((x_L, y_L, \theta_L)\) and follower \(i\)'s pose be \((x_i, y_i, \theta_i)\). Follower \(i\)'s desired position is

\[\begin{bmatrix} x_i^d \\ y_i^d \end{bmatrix} = \begin{bmatrix} x_L \\ y_L \end{bmatrix} + R(\theta_L) \begin{bmatrix} \Delta x_i \\ \Delta y_i \end{bmatrix}\]

where \(R(\theta_L)\) is the 2D rotation matrix and \((\Delta x_i, \Delta y_i)\) is the follower's fixed offset in the leader's body frame.
Each follower drives toward its target \((x_i^d, y_i^d)\) with a **proportional controller**:

\[v_i = k_p \sqrt{(x_i^d - x_i)^2 + (y_i^d - y_i)^2}, \qquad \omega_i = k_\theta \big( \operatorname{atan2}(y_i^d - y_i,\, x_i^d - x_i) - \theta_i \big)\]

with the heading error wrapped to \((-\pi, \pi]\), matching `follower_control` in the code below.
4.3 Complete Python Code
"""
Approach A: Leader-Follower Formation Control
==============================================
Simulates N robots in formation following a leader's trajectory.
Uses proportional position control for each follower.
"""
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import FancyArrowPatch
import matplotlib.animation as animation
# ─── Parameters ────────────────────────────────────────────────────
DT = 0.05 # Time step (s)
T = 30.0 # Total simulation time (s)
K_POS = 1.2 # Position gain (proportional controller)
K_HEAD = 2.0 # Heading gain
ROBOT_RADIUS = 0.2 # For visualization
# ─── Formation Definitions ─────────────────────────────────────────
# Each entry: (Δx_rel, Δy_rel) in leader's body frame
FORMATION_LINE = [
(-1.0, 0.0),
(-2.0, 0.0),
(-3.0, 0.0),
]
FORMATION_TRIANGLE = [
(-1.0, -1.0),
(-1.0, 1.0),
]
# Choose formation here:
DESIRED_OFFSETS = FORMATION_LINE
def rot2(theta: float) -> np.ndarray:
"""2D rotation matrix."""
c, s = np.cos(theta), np.sin(theta)
return np.array([[c, -s], [s, c]])
def normalize_angle(a: float) -> float:
return (a + np.pi) % (2 * np.pi) - np.pi
def leader_trajectory(t: float) -> tuple:
"""
Define the leader's path as a function of time.
Here: a circular path of radius 3m at speed 0.5 rad/s.
"""
omega = 0.5 # rad/s
x = 3.0 + 3.0 * np.cos(omega * t)
y = 3.0 + 3.0 * np.sin(omega * t)
theta = omega * t + np.pi / 2 # tangent direction
return x, y, theta
def desired_position(leader_pose: tuple, offset: tuple) -> np.ndarray:
"""Compute desired world-frame position for a follower."""
xL, yL, thetaL = leader_pose
dx, dy = offset
delta = rot2(thetaL) @ np.array([dx, dy])
return np.array([xL + delta[0], yL + delta[1]])
def follower_control(
current: np.ndarray,
desired: np.ndarray,
current_heading: float,
k_pos: float = K_POS,
k_head: float = K_HEAD,
) -> tuple:
"""
Proportional controller for differential-drive follower.
Returns: (v, omega) velocity commands
"""
ex = desired[0] - current[0]
ey = desired[1] - current[1]
dist = np.sqrt(ex**2 + ey**2)
# Desired heading toward target
theta_d = np.arctan2(ey, ex)
theta_e = normalize_angle(theta_d - current_heading)
# Linear and angular velocity commands
v = k_pos * dist
omega = k_head * theta_e
# Clamp velocities
v = max(0.0, min(2.0, v))
omega = max(-3.0, min(3.0, omega))
return v, omega
def simulate(n_steps: int, formation_offsets: list) -> dict:
"""Run the formation simulation."""
n_agents = 1 + len(formation_offsets) # 1 leader + followers
n_followers = len(formation_offsets)
# State: [x, y, theta] per agent
state = np.zeros((n_agents, 3))
    state[0] = [6.0, 3.0, np.pi / 2]  # leader initial = leader_trajectory(0)
trajectory = [state.copy()]
for step in range(n_steps):
t = step * DT
# ── Leader moves along trajectory ──
xL, yL, thetaL = leader_trajectory(t)
state[0, :2] = [xL, yL]
state[0, 2] = thetaL
leader_pose = (xL, yL, thetaL)
# ── Each follower tracks its desired position ──
for i, offset in enumerate(formation_offsets):
agent_idx = i + 1
desired_pos = desired_position(leader_pose, offset)
v, omega = follower_control(
current=state[agent_idx, :2],
desired=desired_pos,
current_heading=state[agent_idx, 2],
)
# Differential drive update
state[agent_idx, 0] += v * np.cos(state[agent_idx, 2]) * DT
state[agent_idx, 1] += v * np.sin(state[agent_idx, 2]) * DT
state[agent_idx, 2] += omega * DT
state[agent_idx, 2] = normalize_angle(state[agent_idx, 2])
trajectory.append(state.copy())
return {"trajectory": trajectory, "offsets": formation_offsets}
def plot_formation(trajectory: np.ndarray, offsets: list, save_path: str = None):
"""Plot the formation at the final time step."""
n_agents = trajectory.shape[1]
n_followers = len(offsets)
fig, ax = plt.subplots(figsize=(8, 8))
# Plot leader path
lx = trajectory[:, 0, 0]
ly = trajectory[:, 0, 1]
ax.plot(lx, ly, 'b--', alpha=0.4, label="Leader path")
# Plot agents at final time
final = trajectory[-1]
# Leader
ax.plot(final[0, 0], final[0, 1], 'b^', markersize=15, label="Leader")
ax.annotate("Leader",
(final[0, 0], final[0, 1]),
textcoords="offset points", xytext=(8, 8), fontsize=9)
# Followers
colors = ['red', 'green', 'orange', 'purple']
for i in range(n_followers):
idx = i + 1
ax.plot(final[idx, 0], final[idx, 1], 'o',
color=colors[i % len(colors)], markersize=12)
ax.annotate(f"F{i+1}",
(final[idx, 0], final[idx, 1]),
textcoords="offset points", xytext=(6, 6), fontsize=9)
# Draw formation line to leader
ax.plot([final[0, 0], final[idx, 0]],
[final[0, 1], final[idx, 1]],
'--', color='gray', alpha=0.5)
ax.set_xlabel("X (m)")
ax.set_ylabel("Y (m)")
ax.set_title(f"Leader-Follower Formation — {len(offsets)}+1 robots")
ax.legend()
ax.set_aspect('equal')
ax.grid(True, alpha=0.3)
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=150)
plt.show()
def main():
n_steps = int(T / DT)
result = simulate(n_steps, DESIRED_OFFSETS)
trajectory = np.array(result["trajectory"])
print(f"Simulation complete: {len(trajectory)} steps, "
f"{trajectory.shape[1]} agents")
print(f"Final positions:")
for i, s in enumerate(trajectory[-1]):
label = "Leader" if i == 0 else f"Follower {i}"
print(f" {label}: x={s[0]:.2f}, y={s[1]:.2f}, θ={np.degrees(s[2]):.1f}°")
plot_formation(trajectory, DESIRED_OFFSETS)
if __name__ == "__main__":
main()
4.4 ROS 2 Multi-Robot Setup
<!-- launch/multi_robot_formation.launch.xml (ROS 2 XML launch format) -->
<launch>
  <!-- Launch N robot instances with namespaced topics -->
  <arg name="num_robots" default="4"/>
  <!-- Robot 1 (Leader) -->
  <group>
    <push-ros-namespace namespace="robot1"/>
    <include file="$(find-pkg-share my_robot)/launch/robot.launch.xml"/>
    <!-- Leader-specific: publishes the formation leader pose -->
    <node pkg="formation_control" exec="leader_node" name="leader"/>
  </group>
  <!-- Robots 2..N (Followers) -->
  <include file="$(find-pkg-share my_robot)/launch/follower.launch.xml">
    <arg name="robot_name" value="robot2"/>
    <arg name="offset_x" value="-1.0"/>
    <arg name="offset_y" value="0.0"/>
  </include>
  <include file="$(find-pkg-share my_robot)/launch/follower.launch.xml">
    <arg name="robot_name" value="robot3"/>
    <arg name="offset_x" value="-2.0"/>
    <arg name="offset_y" value="0.0"/>
  </include>
</launch>
Topic isolation via remapping:
| Original topic | Remapped topic | Notes |
|---|---|---|
| `/cmd_vel` | `/robot1/cmd_vel` | Each robot runs in its own namespace |
| `/odom` | `/robot1/odom` | |
| `/scan` | `/robot1/scan` | |
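The same isolation can be applied to a single node straight from the command line, using standard ROS 2 remapping arguments; for example, pushing `teleop_twist_keyboard` into the `robot1` namespace:

ros2 run teleop_twist_keyboard teleop_twist_keyboard --ros-args -r __ns:=/robot1
# Its cmd_vel output then appears as /robot1/cmd_vel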
5. Approach B — Intermediate: Consensus-Based Formation Control
5.1 Basic Idea
**Consensus formation control** uses graph theory to distribute the formation geometry across all robots. There is no single leader: each robot exchanges state with its neighbors, and the team converges on the formation shape through purely local interactions.
Communication graph              Consensus dynamics
      ◆ ─ ◆                      each robot i:
      │ ╲ │                        u_i = Σ_j a_ij (x_j − x_i)
      ◆ ─ ◆                      convergence rate governed by λ₂
                                 (algebraic connectivity)
5.2 The Graph Laplacian
Represent the communication graph by its **adjacency matrix** \(A = [a_{ij}]\) and **degree matrix** \(D = \operatorname{diag}(d_i)\), where \(d_i = \sum_j a_{ij}\). The **graph Laplacian** is

\[L = D - A\]

For a formation with desired offsets \(\delta_i\), the consensus protocol is

\[u_i = -k \sum_j a_{ij} \big[ (p_i - \delta_i) - (p_j - \delta_j) \big]\]

or, stacking positions \(p\) and offsets \(\delta\), in matrix form:

\[\dot{p} = -k L (p - \delta)\]

The formation converges whenever the graph is **connected**, i.e. the algebraic connectivity \(\lambda_2(L) > 0\).
5.3 The Virtual Structure Method
Treat the entire formation as a single **rigid virtual structure**. As the structure moves, every robot tracks its assigned point on it:

\[p_i^d(t) = p_{\text{center}}(t) + R(t)\, \delta_i\]

where \(p_{\text{center}}(t)\) and \(R(t)\) are the structure's center position and rotation matrix, respectively.
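The virtual-structure target computation fits in a few lines; a minimal sketch reusing `rot2()` from Section 4.3, with an illustrative circular center trajectory (the consensus code in Section 5.4 below is independent of this):

import numpy as np

def rot2(theta: float) -> np.ndarray:
    """2D rotation matrix (same helper as in Section 4.3)."""
    c, s = np.cos(theta), np.sin(theta)
    return np.array([[c, -s], [s, c]])

def virtual_structure_targets(t: float, offsets: np.ndarray) -> np.ndarray:
    """p_i^d(t) = p_center(t) + R(t) @ delta_i, for all robots at once."""
    center = np.array([3.0 * np.cos(0.5 * t), 3.0 * np.sin(0.5 * t)])  # p_center(t)
    theta = 0.5 * t + np.pi / 2                                        # structure heading
    return center + offsets @ rot2(theta).T  # rotate each offset, then translate

# Each robot i then tracks virtual_structure_targets(t, OFFSETS)[i]
# with any point controller, e.g. follower_control() from Section 4.3.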
5.4 Complete Python Code — Consensus Formation
"""
Approach B: Consensus-Based Formation Control
==============================================
Distributed formation control using graph Laplacian and consensus protocol.
No leader — all robots exchange information with neighbors.
"""
import numpy as np
import matplotlib.pyplot as plt
import networkx as nx
# ─── Parameters ────────────────────────────────────────────────────
DT = 0.02 # Time step (s)
T = 20.0 # Total time (s)
N_ROBOTS = 5 # Number of robots
K_CONSENSUS = 1.5 # Consensus gain
# ─── Formation Offsets (in world frame, static) ────────────────────
# Define desired relative positions (relative to formation center)
OFFSETS = np.array([
[0.0, 0.0], # Robot 0 (formation center reference)
[1.0, 0.0], # Robot 1
[-1.0, 0.0], # Robot 2
[0.0, 1.0], # Robot 3
[0.0, -1.0], # Robot 4
], dtype=float)
# ─── Communication Graph ───────────────────────────────────────────
# Define who talks to whom
ADJACENCY = np.array([
[0, 1, 1, 0, 0],
[1, 0, 1, 1, 0],
[1, 1, 0, 0, 1],
[0, 1, 0, 0, 1],
[0, 0, 1, 1, 0],
], dtype=float)
# Build Laplacian: L = D - A
DEGREE = np.diag(ADJACENCY.sum(axis=1))
LAPLACIAN = DEGREE - ADJACENCY
print("Graph Laplacian:")
print(LAPLACIAN)
# Algebraic connectivity (smallest non-zero eigenvalue)
G_nx = nx.from_numpy_array(ADJACENCY)
eigenvalues = sorted(nx.laplacian_spectrum(G_nx))
algebraic_conn = eigenvalues[1] # First non-zero eigenvalue
print(f"Algebraic connectivity λ₂ = {algebraic_conn:.3f}")
def normalize_angle(a: float) -> float:
return (a + np.pi) % (2 * np.pi) - np.pi
def consensus_control(positions: np.ndarray, offsets: np.ndarray,
                      laplacian: np.ndarray) -> np.ndarray:
    """
    Consensus protocol: each robot moves toward agreement on the formation.
    u_i = -k * Σ_j a_ij * [(x_i - δ_i) - (x_j - δ_j)]   (row i of -k L (x - δ))
    """
    # Distributed disagreement: L @ (pos - offsets). The minus sign below is
    # essential: without it the disagreement grows instead of decaying.
    disagreement = laplacian @ (positions - offsets)
    return -K_CONSENSUS * disagreement
def moving_target(t: float, omega: float = 0.5) -> np.ndarray:
    """
    Optional extension hook (unused in the baseline loop below): desired
    world-frame positions when the whole formation orbits a moving center.
    """
    center = np.array([3.0 * np.cos(omega * t), 3.0 * np.sin(omega * t)])
    return OFFSETS + center  # Desired positions in world frame
def simulate_consensus(n_steps: int, initial_pos: np.ndarray) -> dict:
"""Simulate consensus-based formation."""
positions = initial_pos.copy()
positions_history = [positions.copy()]
    for step in range(n_steps):
        # Consensus input u = -k L (p - δ): drives the robots to the offset
        # shape, up to a common translation fixed by the initial conditions
        u = consensus_control(positions, OFFSETS, LAPLACIAN)
        positions += u * DT
        positions_history.append(positions.copy())
return {"positions": np.array(positions_history), "offsets": OFFSETS}
def plot_consensus(positions_history: np.ndarray, offsets: np.ndarray,
save_path: str = None):
"""Visualize the consensus formation over time."""
fig, ax = plt.subplots(figsize=(8, 8))
n_robots = positions_history.shape[1]
colors = plt.cm.tab10(np.linspace(0, 1, n_robots))
# Plot trajectories
for i in range(n_robots):
traj = positions_history[:, i, :]
ax.plot(traj[:, 0], traj[:, 1], '-', color=colors[i], alpha=0.5, linewidth=1)
ax.plot(traj[-1, 0], traj[-1, 1], 'o', color=colors[i],
markersize=12, label=f"Robot {i}")
ax.annotate(f"R{i}", (traj[-1, 0], traj[-1, 1]),
textcoords="offset points", xytext=(5, 5), fontsize=9)
    # Plot the desired formation (anchored at robot 0's final position)
    anchor = positions_history[-1, 0] - offsets[0]
    for offset in offsets:
        ax.plot(anchor[0] + offset[0], anchor[1] + offset[1],
                's', color='gray', markersize=6, alpha=0.4)
ax.set_xlabel("X (m)")
ax.set_ylabel("Y (m)")
ax.set_title(f"Consensus Formation — {n_robots} robots, λ₂={algebraic_conn:.2f}")
ax.legend()
ax.set_aspect('equal')
ax.grid(True, alpha=0.3)
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=150)
plt.show()
def main():
n_steps = int(T / DT)
# Random initial positions
np.random.seed(42)
initial_pos = np.random.uniform(0, 6, size=(N_ROBOTS, 2)).astype(float)
result = simulate_consensus(n_steps, initial_pos)
positions_history = result["positions"]
print(f"Consensus simulation: {positions_history.shape[0]} steps, "
f"{positions_history.shape[1]} robots")
print(f"Algebraic connectivity λ₂ = {algebraic_conn:.3f} (graph is connected: {algebraic_conn > 0})")
# Final formation errors
final_pos = positions_history[-1]
center = final_pos[0]
for i in range(N_ROBOTS):
error = np.linalg.norm(final_pos[i] - (center + OFFSETS[i]))
print(f" Robot {i}: pos={final_pos[i]}, formation error={error:.3f}m")
plot_consensus(positions_history, OFFSETS)
if __name__ == "__main__":
main()
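Consensus theory predicts that the disagreement decays roughly like \(e^{-k \lambda_2 t}\), so the error half-life should be about \(\ln 2 / (k \lambda_2)\). A quick sanity check that could be appended at the end of `main()` above, reusing the script's globals (a sketch; the estimate is only tight once the slowest graph mode dominates):

def disagreement(positions: np.ndarray) -> float:
    """Norm of the shape error after removing the common translation."""
    rel = positions - OFFSETS
    return float(np.linalg.norm(rel - rel.mean(axis=0)))

t_half = np.log(2) / (K_CONSENSUS * algebraic_conn)  # predicted half-life (s)
k_half = int(t_half / DT)
print(f"Predicted error half-life: {t_half:.2f}s")
print(f"Error at t=0: {disagreement(positions_history[0]):.3f}, "
      f"at t={t_half:.2f}s: {disagreement(positions_history[k_half]):.3f}")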
6. Approach C — Modern: Multi-Agent Reinforcement Learning (MARL)
6.1 Basic Idea
Applied to formation control, MARL trains policies so that multiple robots learn to cooperate with little or no explicit communication. We use the **Centralized Training with Decentralized Execution (CTDE)** framework:
        Centralized training (global information)
┌─────────────────────────────────────┐
│  Observations from all robots;      │
│  the critic sees the full state     │
└────────────────┬────────────────────┘
                 │ training
┌────────────────▼────────────────────┐
│  Decentralized execution (deployed) │
│  each actor sees only its local obs │
│  no communication between robots    │
└─────────────────────────────────────┘
6.2 QMIX — Value Decomposition
QMIX factorizes the **joint action-value function** \(Q_{\text{tot}}\) into per-agent utilities \(Q_a\) via a **mixing network**, while enforcing the **Individual-Global-Max (IGM)** condition:

\[\arg\max_{\mathbf{u}} Q_{\text{tot}}(\boldsymbol{\tau}, \mathbf{u}) = \big( \arg\max_{u_1} Q_1(\tau_1, u_1),\; \dots,\; \arg\max_{u_n} Q_n(\tau_n, u_n) \big)\]

The mixing network is parameterized by **hypernetworks** under a monotonicity constraint:

\[\frac{\partial Q_{\text{tot}}}{\partial Q_a} \ge 0 \quad \text{for all } a\]

This guarantees that maximizing \(Q_{\text{tot}}\) is equivalent to each agent independently maximizing its own \(Q_a\).
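A minimal PyTorch sketch of a QMIX-style mixing network, to make the monotonicity constraint concrete (layer sizes and names here are illustrative): hypernetworks produce state-conditioned mixing weights, and `torch.abs()` keeps them non-negative so that \(\partial Q_{\text{tot}} / \partial Q_a \ge 0\).

import torch
import torch.nn as nn

class QMixer(nn.Module):
    """Monotonic mixer: Q_tot = f(Q_1..Q_n; state), with non-negative weights."""
    def __init__(self, n_agents: int, state_dim: int, embed_dim: int = 32):
        super().__init__()
        self.n_agents, self.embed_dim = n_agents, embed_dim
        self.hyper_w1 = nn.Linear(state_dim, n_agents * embed_dim)  # layer-1 weights
        self.hyper_b1 = nn.Linear(state_dim, embed_dim)             # layer-1 bias
        self.hyper_w2 = nn.Linear(state_dim, embed_dim)             # layer-2 weights
        self.hyper_b2 = nn.Sequential(nn.Linear(state_dim, embed_dim),
                                      nn.ReLU(), nn.Linear(embed_dim, 1))

    def forward(self, agent_qs: torch.Tensor, state: torch.Tensor) -> torch.Tensor:
        # agent_qs: (batch, n_agents), state: (batch, state_dim)
        b = agent_qs.size(0)
        w1 = torch.abs(self.hyper_w1(state)).view(b, self.n_agents, self.embed_dim)
        b1 = self.hyper_b1(state).view(b, 1, self.embed_dim)
        hidden = torch.relu(agent_qs.view(b, 1, self.n_agents) @ w1 + b1)
        w2 = torch.abs(self.hyper_w2(state)).view(b, self.embed_dim, 1)
        b2 = self.hyper_b2(state).view(b, 1, 1)
        return (hidden @ w2 + b2).view(b, 1)  # Q_tot: (batch, 1)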
6.3 MAPPO — Multi-Agent PPO
MAPPO applies PPO within the CTDE framework. Each agent \(i\) has a policy \(\pi_{\theta_i}\) trained to maximize the clipped surrogate objective

\[\mathcal{L}(\theta_i) = \mathbb{E}_t\!\left[ \min\!\big( r_t \hat{A}_t,\; \operatorname{clip}(r_t,\, 1-\epsilon,\, 1+\epsilon)\, \hat{A}_t \big) \right], \qquad r_t = \frac{\pi_{\theta_i}(a_t^i \mid o_t^i)}{\pi_{\theta_i}^{\text{old}}(a_t^i \mid o_t^i)}\]

A centralized critic \(V_\phi(s)\) observes the full state \(s\); the decentralized actors \(\pi_{\theta_i}(o_i)\) receive only their local observations \(o_i\).
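The clipped objective translates directly into a few lines of PyTorch; a sketch (tensor shapes and the `eps` default are illustrative):

import torch

def ppo_clip_loss(logp_new: torch.Tensor, logp_old: torch.Tensor,
                  advantages: torch.Tensor, eps: float = 0.2) -> torch.Tensor:
    """Negative clipped surrogate (to be minimized). All inputs: shape (batch,)."""
    ratio = torch.exp(logp_new - logp_old)               # r_t(theta)
    clipped = torch.clamp(ratio, 1.0 - eps, 1.0 + eps)   # clip(r_t, 1-eps, 1+eps)
    return -torch.min(ratio * advantages, clipped * advantages).mean()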
6.4 RLlib Code — MARL Formation
"""
Approach C: Multi-Agent RL Formation Control
=============================================
Uses rllib (Ray) with a custom multi-agent environment for formation control.
Trains 5 robots to maintain relative distances using QMIX-style value decomposition.
"""
import numpy as np
import gymnasium as gym
from gymnasium import spaces
from ray.rllib.algorithms.maddpg import MADDPGAgent
from ray.tune import register_env
from ray import tune
class FormationEnv(MultiAgentEnv):
    """
    Multi-agent formation environment (RLlib MultiAgentEnv API).
    Each agent observes: [self_pos, rel_pos_to_others, target_offset]
    Actions: [v_x_cmd, v_y_cmd] (velocity commands)
    Reward: negative formation error + small movement penalty
    """
    metadata = {"render_modes": ["human"]}
    def __init__(self, config=None):
        super().__init__()
        config = config or {}
        self.n_agents = config.get("n_agents", 5)
        self._agent_ids = {f"agent_{i}" for i in range(self.n_agents)}
        self.dt = 0.1
        self.max_steps = 500
        # Observation: self pos (2) + relative to others (2*(n-1)) + target offset (2).
        # All agents share these spaces (recent RLlib versions may prefer
        # per-agent Dict spaces; adjust if your Ray version complains).
        obs_dim = 2 + 2 * (self.n_agents - 1) + 2
        self.observation_space = spaces.Box(
            low=-10.0, high=10.0, shape=(obs_dim,), dtype=np.float32
        )
        # Action: 2D velocity command
        self.action_space = spaces.Box(
            low=-1.0, high=1.0, shape=(2,), dtype=np.float32
        )
        self.positions = None
        self.target_offsets = None
        self.step_count = 0
def _compute_offsets(self) -> np.ndarray:
"""Desired formation offsets (line formation)."""
offsets = np.zeros((self.n_agents, 2))
for i in range(1, self.n_agents):
offsets[i] = [-1.0 * i, 0.0]
return offsets
    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.step_count = 0
        # Randomize starting positions
        rng = np.random.default_rng(seed)
        self.positions = rng.uniform(0, 5, size=(self.n_agents, 2)).astype(np.float32)
        # Formation offsets (leader slot at the front, others behind)
        self.target_offsets = self._compute_offsets()
        # Multi-agent API: return (observation dict, info dict)
        return self._get_obs_as_dict(), {}
def _get_obs(self) -> np.ndarray:
"""Return flat observation array (used by MADDPG)."""
obs_list = []
for i in range(self.n_agents):
obs_i = [self.positions[i, 0], self.positions[i, 1]]
# Relative positions to other agents
for j in range(self.n_agents):
if j != i:
obs_i.extend([
self.positions[j, 0] - self.positions[i, 0],
self.positions[j, 1] - self.positions[i, 1],
])
# Target offset from formation center
center = np.mean(self.positions, axis=0)
obs_i.extend([
center[0] + self.target_offsets[i, 0] - self.positions[i, 0],
center[1] + self.target_offsets[i, 1] - self.positions[i, 1],
])
obs_list.append(np.array(obs_i, dtype=np.float32))
return np.array(obs_list)
def _get_obs_as_dict(self) -> dict:
"""Return dict observation (for rllib multi-agent API)."""
obs = self._get_obs()
return {f"agent_{i}": obs[i] for i in range(self.n_agents)}
def step(self, actions):
self.step_count += 1
# Convert dict or array actions
if isinstance(actions, dict):
actions_list = [actions[f"agent_{i}"] for i in range(self.n_agents)]
else:
actions_list = actions
actions_np = np.array(actions_list, dtype=np.float32)
# Apply velocity commands
for i in range(self.n_agents):
self.positions[i] += actions_np[i] * 0.5 * self.dt
# Clamp positions
self.positions[i] = np.clip(self.positions[i], -10, 10)
# ── Reward: formation maintenance ──
center = np.mean(self.positions, axis=0)
rewards = []
for i in range(self.n_agents):
desired = center + self.target_offsets[i]
error = np.linalg.norm(self.positions[i] - desired)
# Reward: high when error is low, slight penalty for large movements
r = -error + 0.1 * (1.0 - np.linalg.norm(actions_np[i]))
rewards.append(float(r))
rewards_dict = {f"agent_{i}": rewards[i] for i in range(self.n_agents)}
# Termination
terminated = {f"agent_{i}": False for i in range(self.n_agents)}
terminated["__all__"] = self.step_count >= self.max_steps
truncated = {f"agent_{i}": False for i in range(self.n_agents)}
truncated["__all__"] = False
obs = self._get_obs_as_dict()
info = {}
return obs, rewards_dict, terminated, truncated, info
# ─── Registration & Training ────────────────────────────────────────
register_env("formation_env", lambda cfg: FormationEnv(cfg))
def train_formation(n_timesteps: int = 200_000):
    """Train a shared PPO policy (MAPPO-style CTDE) on the formation task."""
    config = (
        PPOConfig()
        .environment("formation_env", env_config={"n_agents": 5})
        .framework("torch")
        .rollouts(num_rollout_workers=2)
        .training(
            gamma=0.99,
            lr=3e-4,
            train_batch_size=1024,
            model={"fcnet_hiddens": [128, 128]},
        )
        .multi_agent(
            policies={"shared"},
            policy_mapping_fn=lambda agent_id, *args, **kwargs: "shared",
        )
    )
    print(f"[INFO] Training PPO for {n_timesteps} timesteps...")
    analysis = tune.run(
        "PPO",
        config=config.to_dict(),
        stop={"timesteps_total": n_timesteps},
        checkpoint_at_end=True,
        metric="episode_reward_mean",
        mode="max",
    )
    print(f"[INFO] Best checkpoint: {analysis.best_checkpoint}")
    return analysis
def evaluate_agent(checkpoint_path: str, n_episodes: int = 5):
    """Evaluate a trained formation policy from a checkpoint."""
    from ray.rllib.algorithms.algorithm import Algorithm
    env = FormationEnv({"n_agents": 5})
    algo = Algorithm.from_checkpoint(checkpoint_path)
    for ep in range(n_episodes):
        obs, _ = env.reset()
        done = False
        ep_reward = 0.0
        while not done:
            actions = {
                agent_id: algo.compute_single_action(agent_obs, policy_id="shared")
                for agent_id, agent_obs in obs.items()
            }
            obs, rewards, terminated, truncated, info = env.step(actions)
            ep_reward += sum(rewards.values())
            done = terminated["__all__"] or truncated["__all__"]
        print(f"  Episode {ep+1}: reward={ep_reward:.1f}")
if __name__ == "__main__":
    import sys
    if len(sys.argv) > 2 and sys.argv[1] == "eval":
        evaluate_agent(sys.argv[2])
    else:
        train_formation()
7. Comparison of Formation Approaches
| Criterion | Leader-Follower | Consensus / Virtual Structure | MARL (MAPPO / QMIX) |
|---|---|---|---|
| Scalability | ★★★☆☆ (single point of failure) | ★★★★☆ | ★★★★★ |
| Fault tolerance | ★★☆☆☆ | ★★★★☆ | ★★★★☆ |
| Formation rigidity | ✅ Explicit (fixed offsets) | ✅ Graph-theoretic rigidity | ✅ Emerges from the reward |
| Communication | One-way (leader → followers) | Bidirectional (neighbor-to-neighbor) | None / partial (CTDE) |
| Convergence speed | Fast (leader-driven) | Moderate (distributed) | Slow (requires training) |
| Obstacle avoidance | ❌ Requires replanning | ❌ Requires a collision-avoidance layer | ✅ Can learn reactive behavior |
| Setup complexity | Low | Medium | High |
| Computational cost | Very low | Low | High (training + inference) |
| Adaptability | Fixed formations only | Reconfigurable (change the topology) | Can learn new formations |
| Best suited for | Simple, small teams | Structured environments | Complex, large-scale formations |
8. Step-by-Step Implementation Guide
Phase 1 — Single-Robot Teleoperation (all approaches)
- Configure each robot in ROS 2 and verify that its `cmd_vel` / `odom` topics behave correctly.
- Teleoperate each robot individually with `teleop_twist_keyboard`.
- Set up the WiFi network and verify that the robots can reach one another.
Phase 2 — ROS 2 Namespace Configuration
# Launch robots with separate namespaces
ros2 launch multi_robot_bringup robot_multi.launch.py num_robots:=4
# Verify topics are namespaced:
ros2 topic list | grep robot
# Expected: /robot1/cmd_vel, /robot2/cmd_vel, ...
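The `robot_multi.launch.py` file used above is not listed in this guide; a minimal sketch of what it could contain, creating one namespaced group per robot (`my_robot` and `robot_driver` are placeholder names):

from launch import LaunchDescription
from launch.actions import GroupAction
from launch_ros.actions import Node, PushRosNamespace

def generate_launch_description():
    groups = []
    for i in range(1, 5):  # robot1..robot4; expose num_robots as a launch argument in practice
        groups.append(GroupAction([
            PushRosNamespace(f"robot{i}"),
            Node(package="my_robot", executable="robot_driver", name="driver"),
        ]))
    return LaunchDescription(groups)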
Phase 3 — Leader-Follower (Approach A)
- Implement the leader trajectory publisher node.
- Add a follower controller node that subscribes to `/robot1/pose`.
- Configure each robot's offset in a parameter file.
- Test in simulation first, then deploy to the real robots.
Phase 4 — Consensus Formation (Approach B)
- Define the communication topology (mesh, chain, star).
- Implement the consensus protocol with the `networkx` Laplacian.
- Exchange state between robots over ROS 2 topics (`/robot{i}/state`); a minimal node sketch follows this list.
- Test a static formation first, then add a moving target.
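A minimal rclpy sketch of one consensus agent, as referenced in the list above. It assumes each robot publishes its planar position on `/robot{i}/state` as a `geometry_msgs/msg/Point` and accepts holonomic velocity commands; the topic conventions, gain, and rate are illustrative choices, not fixed by this guide.

import numpy as np
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Point, Twist

class ConsensusAgent(Node):
    """One robot's side of the distributed consensus protocol."""
    def __init__(self, robot_id: int, neighbors: list, offsets: np.ndarray, k: float = 1.5):
        super().__init__(f"consensus_agent_{robot_id}")
        self.robot_id, self.neighbors, self.offsets, self.k = robot_id, neighbors, offsets, k
        self.pos = np.zeros(2)       # own position (feed from odometry/localization)
        self.neighbor_pos = {}       # latest position received from each neighbor
        self.cmd_pub = self.create_publisher(Twist, "cmd_vel", 10)
        self.create_subscription(
            Point, f"/robot{robot_id}/state",
            lambda msg: setattr(self, "pos", np.array([msg.x, msg.y])), 10)
        for j in neighbors:
            self.create_subscription(
                Point, f"/robot{j}/state",
                lambda msg, j=j: self.neighbor_pos.__setitem__(j, np.array([msg.x, msg.y])), 10)
        self.create_timer(0.05, self.control_step)  # 20 Hz control loop

    def control_step(self):
        # u_i = -k * Σ_j [(p_i - δ_i) - (p_j - δ_j)] over neighbors heard so far
        u = np.zeros(2)
        for j, pj in self.neighbor_pos.items():
            u -= self.k * ((self.pos - self.offsets[self.robot_id]) - (pj - self.offsets[j]))
        cmd = Twist()
        cmd.linear.x, cmd.linear.y = float(u[0]), float(u[1])  # holonomic base assumed
        self.cmd_pub.publish(cmd)

def main():
    rclpy.init()
    # offsets: one row per robot, taken from the formation definition (Section 5.4)
    rclpy.spin(ConsensusAgent(robot_id=1, neighbors=[0, 2], offsets=np.zeros((5, 2))))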
Phase 5 — MARL (Approach C)
- Install Ray: `pip install "ray[rllib]"`
- Run the formation environment in simulation: `python formation_rl.py`
- Train for roughly 200k steps (or use a pretrained checkpoint).
- Deploy the trained policy to the real robots via ROS 2.
9. Extensions and Variants
9.1 Dynamic Formation Shape Switching
Switch the formation shape at runtime by swapping the offset table (a smooth-transition sketch follows the snippet):
SHAPES = {
    "line":     [(-i, 0) for i in range(n)],   # n = number of robots in the formation
    "triangle": [(0, 0), (-1, -1), (-1, 1)],
    "diamond":  [(0, 0), (0, -1), (1, 0), (0, 1)],
}
# Trigger a shape switch via a ROS parameter or service call
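To avoid step changes in the commanded positions when switching shapes, the offsets can be interpolated over a short window; a sketch (`blend_offsets` is a hypothetical helper, and both shapes must contain the same number of robots):

import numpy as np

def blend_offsets(old: list, new: list, alpha: float) -> list:
    """alpha in [0, 1]: 0 = old shape, 1 = new shape."""
    return [tuple((1 - alpha) * np.array(o) + alpha * np.array(n))
            for o, n in zip(old, new)]

# Example (n = 3, so both shapes have three entries): morph over 2 s at 20 Hz
# for k in range(40):
#     offsets = blend_offsets(SHAPES["line"], SHAPES["triangle"], k / 39.0)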
9.2 Obstacle Avoidance Within the Formation
Add a **potential-field layer** on top of the consensus protocol. One standard choice is a repulsive term that activates whenever two robots come closer than a safety distance \(d_0\):

\[u_i = u_i^{\text{consensus}} + u_i^{\text{rep}}, \qquad u_i^{\text{rep}} = k_{\text{rep}} \sum_{j:\, \lVert p_i - p_j \rVert < d_0} \left( \frac{1}{\lVert p_i - p_j \rVert} - \frac{1}{d_0} \right) \frac{p_i - p_j}{\lVert p_i - p_j \rVert^2}\]
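A sketch of that repulsive term as a drop-in addition to `consensus_control()` from Section 5.4 (`d0` and `k_rep` are tuning parameters introduced here):

import numpy as np

def repulsion(positions: np.ndarray, d0: float = 0.8, k_rep: float = 0.5) -> np.ndarray:
    """Pairwise repulsive velocities for robots closer than d0."""
    n = positions.shape[0]
    u = np.zeros_like(positions)
    for i in range(n):
        for j in range(n):
            if i == j:
                continue
            diff = positions[i] - positions[j]
            d = float(np.linalg.norm(diff))
            if 1e-6 < d < d0:
                u[i] += k_rep * (1.0 / d - 1.0 / d0) * diff / d**2
    return u

# Total input: u = consensus_control(positions, OFFSETS, LAPLACIAN) + repulsion(positions)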
9.3 Variable Formation Size
Use **graph rigidity** to check whether a formation of \(n\) robots with \(m\) distance constraints is minimally rigid. Laman's conditions require

\[m = 2n - 3\]

and that every induced subgraph on \(n' \ge 2\) vertices contains at most \(2n' - 3\) edges.
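A brute-force check of Laman's conditions with `networkx` (a sketch; subgraph enumeration is exponential, so this is only practical for small formations):

import itertools
import networkx as nx

def is_minimally_rigid(G: nx.Graph) -> bool:
    """Laman's conditions: m = 2n - 3, and no over-braced subgraph."""
    n, m = G.number_of_nodes(), G.number_of_edges()
    if m != 2 * n - 3:
        return False
    for size in range(2, n):
        for nodes in itertools.combinations(G.nodes, size):
            if G.subgraph(nodes).number_of_edges() > 2 * size - 3:
                return False
    return True

print(is_minimally_rigid(nx.complete_graph(3)))  # triangle: True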
10. References
- Ren, W., & Beard, R.W. (2005). Consensus Seeking in Multiagent Systems Under Dynamically Changing Interaction Topologies. IEEE Transactions on Automatic Control, 50(5), 655–661.
- Jadbabaie, A., Lin, J., & Morse, A.S. (2003). Coordination of Groups of Mobile Autonomous Agents Using Nearest Neighbor Rules. IEEE Transactions on Automatic Control, 48(6), 988–1001.
- Olfati-Saber, R., & Murray, R.M. (2004). Consensus Problems in Networks of Agents With Switching Topology and Time-Delays. IEEE Transactions on Automatic Control, 49(9), 1520–1533.
- Desai, J.P., Ostrowski, J.P., & Kumar, V. (2001). Modeling and Control of Formations of Nonholonomic Mobile Robots. IEEE Transactions on Robotics and Automation, 17(6), 905–908.
- Low, C.B. (2019). A Flexible Hybrid Formation Control for Mobile Robots Using Reinforcement Learning. IEEE Robotics and Automation Letters.
- Lowe, R., Wu, Y., et al. (2017). Multi-Agent Actor-Critic for Mixed Cooperative-Competitive Environments. NeurIPS.
- Rashid, T., Samvelyan, M., et al. (2018). QMIX: Monotonic Value Function Factorisation for Deep Multi-Agent Reinforcement Learning. ICML.
- Yu, C., et al. (2022). The Surprising Effectiveness of PPO in Cooperative Multi-Agent Games. NeurIPS.
- Peterson, F., et al. (2023). Networked Multi-Robot Systems. ACM Computing Surveys.
- Ray/RLlib Documentation — multi-agent RL framework
- NetworkX Laplacian Documentation — graph-theory tooling
- ROS 2 multi-robot documentation — namespaces and remapping