Obstacle Avoidance for Mobile Robots¶
Project type: Navigation | Difficulty: ★★☆☆☆ to ★★★★★ (depending on the method) | Estimated time: 2–4 weekends
1. Project Overview¶
**Obstacle avoidance** is a mobile robot's ability to navigate an environment safely without colliding with obstacles. Unlike global path planning, which assumes complete knowledge of the map, obstacle avoidance reacts in real time to **perceived obstacles**, making it a key capability in dynamic, partially known, or unknown environments.
┌──────────────────────────────────────────────────────────┐
│                   Obstacle Avoidance                      │
│                                                           │
│   Goal                                                    │
│    ★                                                      │
│    │                                                      │
│    │   ┌──────────┐                                       │
│    │   │ Obstacle │    Robot avoidance behavior:          │
│    │   └──────────┘    1. Detect the obstacle ahead       │
│    │      \            2. Steer around its boundary       │
│    │       ╲           3. Resume heading toward the goal  │
│    ▼        ◄────── robot detours around the obstacle     │
│  Start                                                    │
└──────────────────────────────────────────────────────────┘
In this project you will explore **three approaches of increasing complexity**:
| Tier | Method | Technique | Sensors |
|---|---|---|---|
| Tier 1 | Bug algorithms and VFH | Geometric, reactive | Proximity sensors, LiDAR |
| Tier 2 | Dynamic Window Approach (DWA) | Velocity-space optimization | 2D LiDAR |
| Tier 3 | Deep reinforcement learning (PPO) | Learning-based, end-to-end | Simulated sensor array |
2. Hardware and Software Requirements¶
Hardware¶
| Component | Specification | Notes |
|---|---|---|
| Chassis | Two-wheel differential drive | Any mobile robot platform |
| Motors | 2× DC gear motors with encoders | For odometry |
| LiDAR | RPLIDAR A1/A2 or similar | 360° 2D laser scanner |
| (Alternative) | Ultrasonic / infrared sensor array | For the Tier 1 methods |
| Single-board computer | Raspberry Pi 4 / Jetson Nano | On-board computation |
| Battery | 12 V LiPo or USB-C power bank | Powers all components |
Software¶
| Package | Version | Purpose |
|---|---|---|
| Python | ≥ 3.8 | Core language |
| NumPy | ≥ 1.20 | Numerical computation |
| OpenCV | ≥ 4.5 | Sensor processing |
| ROS 2 | Humble / Iron | Robot middleware |
| Stable-Baselines3 | ≥ 1.7 | PPO implementation |
| Gymnasium | ≥ 0.28 | RL environment interface |
| PyTorch | ≥ 1.13 | Neural network training |
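Before starting, it can save time to confirm that the Python-side dependencies are importable and meet the minimum versions in the table. A minimal check sketch (ROS 2 and hardware drivers are installed separately and are not covered here; package names follow the usual PyPI distributions):

```python
# check_deps.py — verify core Python dependencies for this project (sketch).
# ROS 2 (Humble/Iron) is installed via apt/colcon and is not checked here.
from importlib.metadata import version, PackageNotFoundError

REQUIRED = {
    "numpy": "1.20",
    "opencv-python": "4.5",
    "stable-baselines3": "1.7",
    "gymnasium": "0.28",
    "torch": "1.13",
}

def as_tuple(v: str):
    """Turn '1.26.4' into (1, 26, 4) for a simple lexicographic compare."""
    return tuple(int(p) for p in v.split(".")[:3] if p.isdigit())

for pkg, minimum in REQUIRED.items():
    try:
        installed = version(pkg)
        ok = as_tuple(installed) >= as_tuple(minimum)
        print(f"{pkg:20s} {installed:12s} {'OK' if ok else f'needs >= {minimum}'}")
    except PackageNotFoundError:
        print(f"{pkg:20s} MISSING (pip install {pkg})")
```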
3. Tier 1 — Classical Methods: Bug Algorithms and the Vector Field Histogram (VFH)¶
3.1 Bug Algorithms¶
**Bug algorithms** are the simplest reactive obstacle avoidance methods. The robot uses only local sensing and alternates between two simple behaviors, with a leave condition deciding when to switch back:
Algorithm Rules¶
- Motion-to-goal (m-line): move in a straight line toward the goal until an obstacle is encountered.
- Wall-following: while an obstacle blocks the path, follow its boundary.
- Leave condition: leave the boundary and resume motion toward the goal once the robot can rejoin the m-line at a point closer to the goal than where it first hit the obstacle.
Mathematical Formulation¶
The goal-seeking behavior steers along the heading
\[ \theta_{\text{goal}} = \operatorname{atan2}(y_g - y_r,\; x_g - x_r), \]
where \((x_g, y_g)\) is the goal position and \((x_r, y_r)\) is the robot position.
Hit-point detection: a hit point \(q_H\) is recorded when the distance to the nearest obstacle satisfies \(d < d_{\text{threshold}}\).
Leave condition: the robot leaves the obstacle boundary when
\[ \lVert p_r - p_g \rVert < \lVert q_H - p_g \rVert, \]
i.e. when it is closer to the goal than it was at the hit point (the code below uses a slightly relaxed version of this test).
3.2 Bug Algorithm — Complete Python Code¶
"""
Bug Algorithm — Reactive Obstacle Avoidance
============================================
A simple geometry-based obstacle avoidance algorithm.
Requires: numpy, matplotlib (for visualization)
"""
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Circle
# ─── Configuration ──────────────────────────────────────────────
GOAL = np.array([8.0, 8.0]) # Goal position (x, y)
START = np.array([0.5, 0.5]) # Start position
ROBOT_RADIUS = 0.2 # Robot collision radius (m)
GOAL_THRESHOLD = 0.3 # Distance to goal to consider reached
OBSTACLE_THRESHOLD = 0.5 # Distance to detect obstacle
WALL_FOLLOW_GAIN = 1.0 # Gain for wall following direction
ROBOT_SPEED = 0.1 # Forward speed (m/s)
# ─── Environment Definition ─────────────────────────────────────
def create_environment():
"""
Create a simple environment with rectangular obstacles.
Returns a list of (cx, cy, width, height) tuples.
"""
obstacles = [
(3.0, 3.0, 1.5, 1.5), # Center (3,3), size 1.5x1.5
(5.5, 5.5, 1.0, 2.0), # Center (5.5, 5.5), size 1.0x2.0
(6.5, 2.5, 2.0, 1.0), # Center (6.5, 2.5), size 2.0x1.0
]
return obstacles
def is_collision(point: np.ndarray, obstacles: list) -> bool:
"""
Check if a point collides with any obstacle.
Uses expanded obstacles to account for robot radius.
"""
for (cx, cy, w, h) in obstacles:
# Expanded rectangle (add robot radius)
hw, hh = w/2 + ROBOT_RADIUS, h/2 + ROBOT_RADIUS
if (cx - hw <= point[0] <= cx + hw and
cy - hh <= point[1] <= cy + hh):
return True
return False
def get_obstacle_distance(point: np.ndarray, obstacles: list) -> float:
"""
Compute minimum distance to any obstacle.
Returns infinity if no obstacles nearby.
"""
min_dist = np.inf
for (cx, cy, w, h) in obstacles:
# Distance to rectangle edge
hw, hh = w/2, h/2
dx = max(abs(point[0] - cx) - hw, 0)
dy = max(abs(point[1] - cy) - hh, 0)
dist = np.sqrt(dx**2 + dy**2)
min_dist = min(min_dist, dist)
return min_dist
def get_gradient_toward_goal(point: np.ndarray) -> np.ndarray:
"""
Compute unit gradient vector pointing toward the goal.
"""
direction = GOAL - point
distance = np.linalg.norm(direction)
if distance < 1e-6:
return np.array([0.0, 0.0])
return direction / distance
def get_wall_follow_direction(point: np.ndarray,
obstacles: list,
prev_direction: np.ndarray) -> np.ndarray:
"""
Compute wall-following direction using right-hand rule.
Keeps the robot hugging the right side of obstacles.
"""
# Sample points around the robot to find obstacle boundary
num_samples = 16
angles = np.linspace(0, 2*np.pi, num_samples, endpoint=False)
distances = np.array([
get_obstacle_distance(point + 0.3 * np.array([np.cos(a), np.sin(a)]), obstacles)
for a in angles
])
# Find the direction with maximum clearance (most open direction)
# but prefer directions that maintain forward progress
best_idx = np.argmax(distances)
best_angle = angles[best_idx]
best_dist = distances[best_idx]
# Combine: prefer open space but also forward direction
goal_dir = get_gradient_toward_goal(point)
goal_angle = np.arctan2(goal_dir[1], goal_dir[0])
# Weighted combination of open direction and goal direction
open_dir = np.array([np.cos(best_angle), np.sin(best_angle)])
combined = 0.6 * open_dir + 0.4 * goal_dir
return combined / (np.linalg.norm(combined) + 1e-6)
def bug_algorithm_step(point: np.ndarray,
mode: str,
hit_point: np.ndarray,
prev_direction: np.ndarray,
obstacles: list) -> tuple:
"""
Execute one step of the Bug algorithm.
Returns: (new_point, new_mode, hit_point, trajectory)
"""
# Check if goal reached
if np.linalg.norm(point - GOAL) < GOAL_THRESHOLD:
return point, "goal_reached", hit_point, point
# Get distance to nearest obstacle
dist_to_obstacle = get_obstacle_distance(point, obstacles)
# State machine transitions
if mode == "go_to_goal":
if dist_to_obstacle < OBSTACLE_THRESHOLD:
# Hit obstacle - switch to wall following
mode = "wall_follow"
hit_point = point.copy()
else:
# Move toward goal
direction = get_gradient_toward_goal(point)
new_point = point + ROBOT_SPEED * direction
if is_collision(new_point, obstacles):
new_point = point # Stay in place if collision
return new_point, mode, hit_point, new_point
elif mode == "wall_follow":
# Check leave condition
dist_to_goal_from_hit = np.linalg.norm(GOAL - hit_point)
dist_to_goal_from_current = np.linalg.norm(GOAL - point)
dist_from_hit = np.linalg.norm(point - hit_point)
# Leave if we're closer to goal than when we hit
if (dist_from_hit > dist_to_goal_from_hit and
dist_to_goal_from_current < dist_to_goal_from_hit + 0.5):
# Can head back to m-line (direct path to goal)
mode = "go_to_goal"
direction = get_gradient_toward_goal(point)
else:
# Follow wall
direction = get_wall_follow_direction(point, obstacles, prev_direction)
new_point = point + ROBOT_SPEED * direction
if is_collision(new_point, obstacles):
new_point = point
return new_point, mode, hit_point, new_point
return point, mode, hit_point, point
def run_bug_simulation():
"""Run Bug algorithm simulation and visualize."""
obstacles = create_environment()
point = START.copy()
mode = "go_to_goal"
hit_point = None
trajectory = [point.copy()]
prev_direction = np.array([0.0, 1.0])
fig, ax = plt.subplots(figsize=(10, 10))
ax.set_xlim(-1, 10)
ax.set_ylim(-1, 10)
ax.set_aspect('equal')
ax.set_xlabel('X (m)')
ax.set_ylabel('Y (m)')
ax.set_title('Bug Algorithm — Obstacle Avoidance')
# Draw obstacles
for (cx, cy, w, h) in obstacles:
rect = plt.Rectangle((cx - w/2, cy - h/2), w, h,
color='gray', alpha=0.7)
ax.add_patch(rect)
# Draw goal
ax.plot(GOAL[0], GOAL[1], 'g*', markersize=20, label='Goal')
# Simulation loop
for step in range(1000):
point, mode, hit_point, new_pos = bug_algorithm_step(
point, mode, hit_point, prev_direction, obstacles
)
trajectory.append(point.copy())
if mode == "goal_reached":
print(f"[INFO] Goal reached in {step} steps!")
break
# Plot trajectory
trajectory = np.array(trajectory)
ax.plot(trajectory[:, 0], trajectory[:, 1], 'b-', linewidth=2, label='Path')
ax.plot(trajectory[0, 0], trajectory[0, 1], 'bo', markersize=10, label='Start')
ax.legend()
ax.grid(True, alpha=0.3)
plt.savefig("bug_algorithm_result.png", dpi=150)
plt.show()
print(f"[INFO] Final position: {point}, Distance to goal: {np.linalg.norm(point - GOAL):.3f}")
if __name__ == "__main__":
run_bug_simulation()
3.3 Vector Field Histogram (VFH)¶
VFH (Borenstein & Koren, 1991) builds a **polar histogram** of the surroundings and steers toward the open sector that best lines up with the goal.
Algorithm Steps¶
- Build a Cartesian histogram grid: divide the robot's workspace into cells carrying occupancy values.
- Compute the polar histogram: convert the grid into robot-centered polar sectors,
\[ h_k = \sum_{(i,j)} c_{ij}\, \exp\!\left(-\frac{(\beta_{ij} - \theta_k)^2}{2\sigma^2}\right), \]
where \(c_{ij}\) is the occupancy value of cell \((i,j)\), \(\beta_{ij}\) is the bearing of that cell, \(\theta_k\) is the center angle of sector \(k\), and \(\sigma\) controls the smoothing.
- Select a valley: identify "valleys" (runs of sectors with low \(h_k\)) wide enough for the robot to pass through.
- Steering command: choose the valley whose center is closest to the goal direction.
3.4 VFH — Complete Python Code¶
"""
Vector Field Histogram (VFH) — Obstacle Avoidance
===================================================
A histogram-based reactive navigation algorithm.
Requires: numpy, matplotlib
"""
import numpy as np
import matplotlib.pyplot as plt
# ─── Configuration ──────────────────────────────────────────────
GOAL = np.array([8.0, 8.0]) # Goal position
ROBOT_POS = np.array([0.5, 0.5]) # Current position
ROBOT_RADIUS = 0.2 # Robot radius (m)
VFH_CELL_SIZE = 0.2 # Grid cell size (m)
VFH_SENSOR_RANGE = 3.0 # Sensor range (m)
VFH_NUM_SECTORS = 360 # Number of polar sectors
VFH_A_THRESHOLD = 0.35 # Obstacle threshold (0-1)
VFH_SECOND_A_THRESHOLD = 0.15 # Tight threshold for narrow passages
WALL_SMOOTHING = 0.35 # Smoothing parameter sigma
ROBOT_SPEED = 0.08 # Forward speed
MAX_STEERING_RATE = 0.5 # Max steering change per step
# ─── Simulated Sensor Reading ─────────────────────────────────────
def simulate_lidar(position: np.ndarray, obstacles: list, num_readings: int = 360) -> np.ndarray:
"""
Simulate LiDAR scan. Returns array of distances in each direction.
"""
angles = np.linspace(0, 2*np.pi, num_readings, endpoint=False)
distances = np.full(num_readings, VFH_SENSOR_RANGE)
for i, angle in enumerate(angles):
direction = np.array([np.cos(angle), np.sin(angle)])
for dist in np.linspace(0.1, VFH_SENSOR_RANGE, 50):
test_point = position + dist * direction
for (cx, cy, w, h) in obstacles:
hw, hh = w/2, h/2
if (cx - hw <= test_point[0] <= cx + hw and
cy - hh <= test_point[1] <= cy + hh):
distances[i] = min(distances[i], dist)
break
return distances
def compute_polar_histogram(lidar_distances: np.ndarray) -> np.ndarray:
"""
Convert LiDAR distances to polar histogram.
Higher values mean more obstacle presence.
"""
n = len(lidar_distances)
histogram = np.zeros(n)
for k in range(n):
# Each sector corresponds to one lidar reading
d = lidar_distances[k]
if d < VFH_SENSOR_RANGE:
# Active cells closer to robot contribute more
m_k = 1.0 - (d / VFH_SENSOR_RANGE)
histogram[k] = m_k ** 2 # Square for sharper response
else:
histogram[k] = 0.0
# Smooth the histogram
smoothed = np.zeros_like(histogram)
for k in range(n):
for s in range(-8, 9): # ±8 sectors smoothing window
idx = (k + s) % n
weight = np.exp(-(s * WALL_SMOOTHING) ** 2 / 2)
smoothed[k] += histogram[idx] * weight
return smoothed
def find_valleys(histogram: np.ndarray, threshold: float) -> list:
"""
Find valleys (open passages) in the polar histogram.
A valley is a consecutive sequence of sectors below threshold.
"""
n = len(histogram)
valleys = []
in_valley = False
start_idx = 0
for k in range(n):
if histogram[k] < threshold and not in_valley:
in_valley = True
start_idx = k
elif histogram[k] >= threshold and in_valley:
in_valley = False
valleys.append((start_idx, k - 1))
if in_valley:
valleys.append((start_idx, n - 1))
return valleys
def select_steering_direction(valleys: list, goal_angle: float, prev_direction: float) -> float:
    """
    Select the best steering direction from available valleys.
    Prefers valleys whose center is closest to the goal direction.
    """
    if not valleys:
        # No valley found - rotate in place
        return prev_direction + MAX_STEERING_RATE
    # If everything is open, head straight for the goal
    if len(valleys) == 1 and valleys[0] == (0, VFH_NUM_SECTORS - 1):
        return np.deg2rad(goal_angle)
    # Merge a valley that wraps around the 0/360 degree boundary into one valley
    if (len(valleys) > 1 and valleys[0][0] == 0
            and valleys[-1][1] == VFH_NUM_SECTORS - 1):
        wrap_start, _ = valleys[-1]
        _, wrap_end = valleys[0]
        valleys = valleys[1:-1] + [(wrap_start, wrap_end + VFH_NUM_SECTORS)]
    # Find the valley whose center is closest to the goal angle
    best_valley = None
    best_score = np.inf
    for (start, end) in valleys:
        valley_center = ((start + end) / 2) % VFH_NUM_SECTORS  # one sector == one degree here
        # Angular difference to goal (handle wrap-around)
        diff = abs(valley_center - goal_angle)
        diff = min(diff, 360 - diff)
        if diff < best_score:
            best_score = diff
            best_valley = valley_center
    return np.deg2rad(best_valley)
def vfh_step(position: np.ndarray,
heading: float,
obstacles: list) -> tuple:
"""
Execute one VFH step.
    Returns: (new_position, new_heading, histogram)
"""
# 1. Get LiDAR scan
lidar_distances = simulate_lidar(position, obstacles)
# 2. Compute polar histogram
histogram = compute_polar_histogram(lidar_distances)
# 3. Find valleys
threshold = VFH_A_THRESHOLD
valleys = find_valleys(histogram, threshold)
# 4. Compute goal direction
direction_to_goal = GOAL - position
goal_angle = np.rad2deg(np.arctan2(direction_to_goal[1], direction_to_goal[0]))
if goal_angle < 0:
goal_angle += 360
# 5. Select steering
desired_heading = select_steering_direction(valleys, goal_angle, heading)
# 6. Limit steering rate
heading_diff = desired_heading - heading
while heading_diff > np.pi:
heading_diff -= 2*np.pi
while heading_diff < -np.pi:
heading_diff += 2*np.pi
heading += np.clip(heading_diff, -MAX_STEERING_RATE, MAX_STEERING_RATE)
# 7. Move forward
new_position = position + ROBOT_SPEED * np.array([np.cos(heading), np.sin(heading)])
return new_position, heading, histogram
def run_vfh_simulation():
"""Run VFH simulation and visualize."""
obstacles = [
(3.0, 3.0, 1.5, 1.5),
(5.5, 5.5, 1.0, 2.0),
(6.5, 2.5, 2.0, 1.0),
]
position = ROBOT_POS.copy()
heading = np.arctan2(GOAL[1] - position[1], GOAL[0] - position[0])
trajectory = [position.copy()]
all_histograms = []
    # Left: Cartesian axes for the trajectory; right: polar axes for the histogram
    fig = plt.figure(figsize=(14, 6))
    ax_traj = fig.add_subplot(1, 2, 1)
    ax_polar = fig.add_subplot(1, 2, 2, projection='polar')
for step in range(1500):
position, heading, histogram = vfh_step(position, heading, obstacles)
trajectory.append(position.copy())
all_histograms.append(histogram)
if np.linalg.norm(position - GOAL) < 0.3:
print(f"[INFO] VFH: Goal reached in {step} steps!")
break
trajectory = np.array(trajectory)
# Plot trajectory
    ax1 = ax_traj
ax1.set_xlim(-1, 10)
ax1.set_ylim(-1, 10)
ax1.set_aspect('equal')
ax1.set_title('VFH — Trajectory')
for (cx, cy, w, h) in obstacles:
rect = plt.Rectangle((cx - w/2, cy - h/2), w, h,
color='gray', alpha=0.7)
ax1.add_patch(rect)
ax1.plot(GOAL[0], GOAL[1], 'g*', markersize=20)
ax1.plot(trajectory[:, 0], trajectory[:, 1], 'b-', linewidth=1.5)
ax1.plot(trajectory[0, 0], trajectory[0, 1], 'bo', markersize=10)
ax1.grid(True, alpha=0.3)
# Plot final histogram
    ax2 = ax_polar
final_hist = all_histograms[-1]
angles = np.linspace(0, 360, len(final_hist), endpoint=False)
ax2.fill(np.deg2rad(angles), final_hist, alpha=0.7, color='blue')
ax2.set_theta_zero_location('E')
ax2.set_theta_direction(1)
ax2.set_title('VFH — Final Polar Histogram')
    ax2.set_rlim(0, max(1.0, float(final_hist.max())))  # smoothed values can exceed 1
plt.tight_layout()
plt.savefig("vfh_result.png", dpi=150)
plt.show()
print(f"[INFO] Final distance to goal: {np.linalg.norm(position - GOAL):.3f}")
if __name__ == "__main__":
run_vfh_simulation()
3.5 Bug vs. VFH¶
| Property | Bug algorithm | VFH |
|---|---|---|
| Complexity | Very low | Moderate |
| Smoothness | Jerky (abrupt direction changes) | Smooth steering |
| Sensor requirements | Simple proximity sensors | 360° LiDAR |
| Local minima | Can get stuck in loops | Better at escaping local minima |
| Parameter tuning | Few parameters | Several parameters (thresholds, smoothing) |
4. Tier 2 — Intermediate: the Dynamic Window Approach (DWA)¶
4.1 Concept¶
The Dynamic Window Approach (Fox et al., 1997) formulates obstacle avoidance as an optimization problem in **velocity space**. Rather than computing a steering direction directly, DWA:
- samples the velocity space \((v, \omega)\) within the dynamically feasible window,
- forward-simulates a short trajectory for each velocity sample, and
- selects the velocity that maximizes an objective function.
4.2 Mathematical Formulation¶
Robot dynamics¶
For a differential-drive robot with forward velocity \(v\) and yaw rate \(\omega\), the pose propagates over one time step \(\Delta t\) as
\[ x_{t+1} = x_t + v\cos\theta_t\,\Delta t, \qquad y_{t+1} = y_t + v\sin\theta_t\,\Delta t, \qquad \theta_{t+1} = \theta_t + \omega\,\Delta t. \]
Dynamic window¶
The dynamic window restricts candidate velocities to those reachable within one control cycle under the acceleration limits,
\[ V_d = \left\{ (v, \omega) \;\middle|\; v \in [v_c - \dot v_{\max}\Delta t,\; v_c + \dot v_{\max}\Delta t],\ \omega \in [\omega_c - \dot\omega_{\max}\Delta t,\; \omega_c + \dot\omega_{\max}\Delta t] \right\}, \]
intersected with the robot's absolute velocity limits, where \((v_c, \omega_c)\) is the current velocity.
Objective function¶
Each admissible pair is scored with
\[ G(v, \omega) = \alpha\,\mathrm{heading}(v, \omega) + \beta\,\mathrm{dist}(v, \omega) + \gamma\,\mathrm{velocity}(v, \omega), \]
where
- heading: progress toward the goal direction (higher is better),
- dist: clearance from obstacles (higher is better),
- velocity: forward speed (higher is better).
4.3 DWA — Complete Python Code¶
"""
Dynamic Window Approach (DWA) — Local Planner
==============================================
Velocity-space optimization for obstacle avoidance.
Requires: numpy, matplotlib
"""
import numpy as np
import matplotlib.pyplot as plt
# ─── Robot Configuration ─────────────────────────────────────────
ROBOT_CONFIG = {
'max_speed': 0.5, # Maximum linear velocity (m/s)
'min_speed': -0.2, # Minimum reverse speed (m/s)
'max_yaw_rate': 1.5, # Maximum angular velocity (rad/s)
'max_accel': 2.0, # Maximum linear acceleration (m/s^2)
'max_yaw_accel': 3.0, # Maximum angular acceleration (rad/s^2)
'robot_radius': 0.2, # Collision radius (m)
'predict_time': 1.5, # Trajectory prediction horizon (s)
'dt': 0.1, # Time step (s)
}
# ─── DWA Weights ─────────────────────────────────────────────────
WEIGHTS = {
'heading': 0.8, # Weight for goal heading
'velocity': 0.2, # Weight for forward velocity
'clearance': 0.1, # Weight for obstacle clearance
}
# ─── Environment ─────────────────────────────────────────────────
GOAL = np.array([8.0, 8.0])
OBSTACLES = [
(3.0, 3.0, 1.5, 1.5),
(5.5, 5.5, 1.0, 2.0),
(6.5, 2.5, 2.0, 1.0),
]
def is_collision(x: float, y: float, obstacles: list) -> bool:
"""Check if point (x, y) collides with any obstacle."""
for (cx, cy, w, h) in obstacles:
hw, hh = w/2 + ROBOT_CONFIG['robot_radius'], h/2 + ROBOT_CONFIG['robot_radius']
if cx - hw <= x <= cx + hw and cy - hh <= y <= cy + hh:
return True
return False
def predict_trajectory(x: float, y: float, theta: float,
v: float, omega: float, dt: float) -> np.ndarray:
"""
Predict trajectory given initial state and velocity.
Returns array of (x, y, theta) over the prediction horizon.
"""
trajectory = []
time = 0.0
px, py, ptheta = x, y, theta
while time <= ROBOT_CONFIG['predict_time']:
trajectory.append([px, py, ptheta])
px += v * np.cos(ptheta) * dt
py += v * np.sin(ptheta) * dt
ptheta += omega * dt
time += dt
return np.array(trajectory)
def calc_clearance(x: float, y: float, obstacles: list) -> float:
"""
Calculate minimum distance to any obstacle.
"""
min_dist = np.inf
for (cx, cy, w, h) in obstacles:
hw, hh = w/2, h/2
dx = max(abs(x - cx) - hw, 0)
dy = max(abs(y - cy) - hh, 0)
dist = np.sqrt(dx**2 + dy**2)
min_dist = min(min_dist, dist)
return min_dist
def calc_heading_score(x: float, y: float, theta: float, goal: np.ndarray) -> float:
"""
Calculate heading score (progress toward goal).
Returns value in [0, 1], where 1 is facing directly at goal.
"""
dx = goal[0] - x
dy = goal[1] - y
goal_angle = np.arctan2(dy, dx)
angle_diff = goal_angle - theta
while angle_diff > np.pi:
angle_diff -= 2*np.pi
while angle_diff < -np.pi:
angle_diff += 2*np.pi
    # Convert to score: 1 when aligned with the goal, 0 when facing directly away
    return 1.0 - abs(angle_diff) / np.pi
def calc_velocity_score(v: float) -> float:
"""
Calculate velocity score (normalized forward speed).
"""
return (v - ROBOT_CONFIG['min_speed']) / (ROBOT_CONFIG['max_speed'] - ROBOT_CONFIG['min_speed'])
def sample_velocities(v_c: float, omega_c: float) -> list:
"""
Sample velocities within the dynamic window.
"""
v_samples, omega_samples = 5, 10
velocities = []
v_range = [
max(ROBOT_CONFIG['min_speed'], v_c - ROBOT_CONFIG['max_accel'] * ROBOT_CONFIG['dt']),
min(ROBOT_CONFIG['max_speed'], v_c + ROBOT_CONFIG['max_accel'] * ROBOT_CONFIG['dt'])
]
omega_range = [
max(-ROBOT_CONFIG['max_yaw_rate'], omega_c - ROBOT_CONFIG['max_yaw_accel'] * ROBOT_CONFIG['dt']),
min(ROBOT_CONFIG['max_yaw_rate'], omega_c + ROBOT_CONFIG['max_yaw_accel'] * ROBOT_CONFIG['dt'])
]
for v in np.linspace(v_range[0], v_range[1], v_samples):
for omega in np.linspace(omega_range[0], omega_range[1], omega_samples):
velocities.append((v, omega))
return velocities
def dwa_control(x: float, y: float, theta: float,
v_c: float, omega_c: float, goal: np.ndarray) -> tuple:
"""
Dynamic Window Approach control law.
Returns: (v, omega, best_trajectory)
"""
velocities = sample_velocities(v_c, omega_c)
best_score = -np.inf
best_v, best_omega, best_trajectory = 0.0, 0.0, None
for (v, omega) in velocities:
# Predict trajectory
trajectory = predict_trajectory(x, y, theta, v, omega, ROBOT_CONFIG['dt'])
# Check for collision along trajectory
collision = False
min_clearance = np.inf
for (px, py, _) in trajectory:
if is_collision(px, py, OBSTACLES):
collision = True
break
clearance = calc_clearance(px, py, OBSTACLES)
min_clearance = min(min_clearance, clearance)
if collision:
continue
# Score trajectory
final_x, final_y, final_theta = trajectory[-1]
        heading = calc_heading_score(final_x, final_y, final_theta, goal)
velocity = calc_velocity_score(v)
clearance = min_clearance / 5.0 # Normalize (5m max clearance)
score = (WEIGHTS['heading'] * heading +
WEIGHTS['velocity'] * velocity +
WEIGHTS['clearance'] * clearance)
if score > best_score:
best_score = score
best_v = v
best_omega = omega
best_trajectory = trajectory
return best_v, best_omega, best_trajectory
def run_dwa_simulation():
"""Run DWA simulation with visualization."""
# Initial state
x, y, theta = 0.5, 0.5, np.arctan2(GOAL[1] - 0.5, GOAL[0] - 0.5)
v, omega = 0.0, 0.0
trajectory = [[x, y, theta]]
all_trajectories = []
fig, ax = plt.subplots(figsize=(10, 10))
for step in range(2000):
# DWA control
v, omega, trajectory_pred = dwa_control(x, y, theta, v, omega, GOAL)
# Simulate one step
x += v * np.cos(theta) * ROBOT_CONFIG['dt']
y += v * np.sin(theta) * ROBOT_CONFIG['dt']
theta += omega * ROBOT_CONFIG['dt']
trajectory.append([x, y, theta])
if trajectory_pred is not None:
all_trajectories.append(trajectory_pred)
# Check goal
if np.linalg.norm([x - GOAL[0], y - GOAL[1]]) < 0.3:
print(f"[INFO] DWA: Goal reached in {step} steps!")
break
trajectory = np.array(trajectory)
# Plot
ax.set_xlim(-1, 10)
ax.set_ylim(-1, 10)
ax.set_aspect('equal')
ax.set_title('Dynamic Window Approach — Trajectory')
# Draw obstacles
for (cx, cy, w, h) in OBSTACLES:
rect = plt.Rectangle((cx - w/2, cy - h/2), w, h,
color='gray', alpha=0.7)
ax.add_patch(rect)
# Draw goal
ax.plot(GOAL[0], GOAL[1], 'g*', markersize=20, label='Goal')
# Draw robot trajectory
ax.plot(trajectory[:, 0], trajectory[:, 1], 'b-', linewidth=2, label='Robot path')
ax.plot(trajectory[0, 0], trajectory[0, 1], 'bo', markersize=10, label='Start')
# Draw last few predicted trajectories
if all_trajectories:
for traj in all_trajectories[-50::10]: # Every 10th, last 50
traj = np.array(traj)
ax.plot(traj[:, 0], traj[:, 1], 'g-', alpha=0.1, linewidth=1)
ax.legend()
ax.grid(True, alpha=0.3)
plt.savefig("dwa_result.png", dpi=150)
plt.show()
print(f"[INFO] Final position: ({x:.2f}, {y:.2f}), Distance: {np.linalg.norm([x-GOAL[0], y-GOAL[1]]):.3f}")
if __name__ == "__main__":
run_dwa_simulation()
5. Tier 3 — Modern: Deep Reinforcement Learning (PPO)¶
5.1 Concept¶
Instead of hand-coding avoidance rules, we train a **neural network policy** with Proximal Policy Optimization (PPO). The agent learns from experience which action to take for a given sensor observation.
5.2 Gymnasium Environment¶
"""
DRL Obstacle Avoidance — PPO Training
======================================
Custom Gymnasium environment for obstacle avoidance.
Train a PPO agent to navigate while avoiding obstacles.
"""
import numpy as np
import gymnasium as gym
from gymnasium import spaces
from stable_baselines3 import PPO
from stable_baselines3.common.env_checker import check_env
import matplotlib.pyplot as plt
# ─── Environment Configuration ─────────────────────────────────
ENV_CONFIG = {
'map_size': 10.0, # Environment size (meters)
'robot_radius': 0.2, # Robot collision radius
'obstacle_count': 8, # Number of random obstacles
'obstacle_size_range': (0.3, 1.2), # Min/max obstacle size
'goal_radius': 0.5, # Goal reached threshold
'max_steps': 500, # Maximum episode length
'sensor_range': 3.0, # LiDAR-like sensor range
'sensor_count': 16, # Number of sensor rays
}
class ObstacleAvoidanceEnv(gym.Env):
"""
2D obstacle avoidance environment with LiDAR-like sensing.
State: 18-dimensional vector
- 16 LiDAR distance readings (normalized)
- 1 goal bearing (angle to goal, normalized)
- 1 goal distance (normalized)
    Action: 2-dimensional continuous
    - Linear velocity command (-1 to 1, scaled to +/-0.5 m/s)
    - Angular velocity command (-1 to 1, scaled to +/-1.5 rad/s)
"""
metadata = {"render_modes": ["human"]}
def __init__(self, render_mode=None):
super().__init__()
self.render_mode = render_mode
        # Observation: sensor readings + goal info (goal bearing lies in [-1, 1])
        self.observation_space = spaces.Box(
            low=-1.0, high=1.0,
            shape=(18,), dtype=np.float32
        )
# Action: [linear_vel, angular_vel]
self.action_space = spaces.Box(
low=np.array([-1.0, -1.0]),
high=np.array([1.0, 1.0]),
dtype=np.float32
)
self.config = ENV_CONFIG
self.robot_pos = None
self.robot_theta = None
self.goal_pos = None
self.obstacles = None
self.step_count = None
def _generate_obstacles(self) -> list:
"""Generate random rectangular obstacles."""
obstacles = []
for _ in range(self.config['obstacle_count']):
min_size, max_size = self.config['obstacle_size_range']
w = np.random.uniform(min_size, max_size)
h = np.random.uniform(min_size, max_size)
x = np.random.uniform(w/2 + 0.5, self.config['map_size'] - w/2 - 0.5)
y = np.random.uniform(h/2 + 0.5, self.config['map_size'] - h/2 - 0.5)
obstacles.append((x, y, w, h))
return obstacles
def _is_collision(self, pos: np.ndarray) -> bool:
"""Check if position collides with any obstacle."""
r = self.config['robot_radius']
for (cx, cy, w, h) in self.obstacles:
hw, hh = w/2 + r, h/2 + r
if cx - hw <= pos[0] <= cx + hw and cy - hh <= pos[1] <= cy + hh:
return True
return False
def _get_sensor_readings(self) -> np.ndarray:
"""Simulate LiDAR-like sensor readings."""
readings = np.zeros(self.config['sensor_count'], dtype=np.float32)
angles = np.linspace(0, 2*np.pi, self.config['sensor_count'], endpoint=False)
        for i, angle in enumerate(angles):
            # Cast rays in the robot frame so the observation rotates with the robot
            world_angle = self.robot_theta + angle
            direction = np.array([np.cos(world_angle), np.sin(world_angle)])
for dist in np.linspace(0.1, self.config['sensor_range'], 50):
test_pos = self.robot_pos + dist * direction
if test_pos[0] < 0 or test_pos[0] > self.config['map_size']:
break
if test_pos[1] < 0 or test_pos[1] > self.config['map_size']:
break
if self._is_collision(test_pos):
readings[i] = dist / self.config['sensor_range']
break
else:
readings[i] = 1.0 # No obstacle in range
return readings
def _get_observation(self) -> np.ndarray:
"""Get full observation vector."""
sensors = self._get_sensor_readings()
# Goal bearing (angle to goal, normalized to [-1, 1])
direction_to_goal = self.goal_pos - self.robot_pos
goal_angle = np.arctan2(direction_to_goal[1], direction_to_goal[0])
angle_diff = goal_angle - self.robot_theta
while angle_diff > np.pi:
angle_diff -= 2*np.pi
while angle_diff < -np.pi:
angle_diff += 2*np.pi
goal_bearing = angle_diff / np.pi # Normalize to [-1, 1]
# Normalized distance to goal
goal_distance = np.linalg.norm(direction_to_goal) / (self.config['map_size'] * np.sqrt(2))
return np.concatenate([sensors, [goal_bearing, goal_distance]]).astype(np.float32)
def reset(self, seed=None, options=None):
super().reset(seed=seed)
# Reset robot
self.robot_pos = np.array([0.5, 0.5])
self.robot_theta = 0.0
# Place goal (far corner)
self.goal_pos = np.array([
self.config['map_size'] - 0.5,
self.config['map_size'] - 0.5
])
# Generate obstacles
self.obstacles = self._generate_obstacles()
        # Ensure the start and goal positions are collision-free
        while self._is_collision(self.robot_pos) or self._is_collision(self.goal_pos):
            self.obstacles = self._generate_obstacles()
self.step_count = 0
return self._get_observation(), {}
def step(self, action):
self.step_count += 1
        # Unpack action (both components are commanded in [-1, 1])
        linear_vel = float(action[0])
        angular_vel = float(action[1])
# Convert to actual velocities
v = linear_vel * 0.5 # Max 0.5 m/s
omega = angular_vel * 1.5 # Max 1.5 rad/s
# Update pose
self.robot_pos[0] += v * np.cos(self.robot_theta) * 0.1
self.robot_pos[1] += v * np.sin(self.robot_theta) * 0.1
self.robot_theta += omega * 0.1
# Clamp to map bounds
self.robot_pos = np.clip(self.robot_pos, 0.1, self.config['map_size'] - 0.1)
# ── Compute Reward ──
reward = 0.0
done = False
info = {}
dist_to_goal = np.linalg.norm(self.robot_pos - self.goal_pos)
# Goal reached
if dist_to_goal < self.config['goal_radius']:
reward = 100.0
done = True
info = {"success": True}
return self._get_observation(), reward, done, False, info
# Collision with obstacle
if self._is_collision(self.robot_pos):
reward = -50.0
done = True
info = {"collision": True}
return self._get_observation(), reward, done, False, info
# Out of bounds
if (self.robot_pos[0] <= 0.1 or self.robot_pos[0] >= self.config['map_size'] - 0.1 or
self.robot_pos[1] <= 0.1 or self.robot_pos[1] >= self.config['map_size'] - 0.1):
reward = -20.0
done = True
info = {"out_of_bounds": True}
return self._get_observation(), reward, done, False, info
        # Time limit: truncated, not terminated
        if self.step_count >= self.config['max_steps']:
            info = {"max_steps": True}
            return self._get_observation(), reward, False, True, info
        # Dense shaping reward: penalize remaining distance, reward progress toward the goal
reward = -0.01 * dist_to_goal # Small negative per step
reward += 0.5 * (1.0 - dist_to_goal / (self.config['map_size'] * np.sqrt(2)))
return self._get_observation(), reward, done, False, info
def render(self):
"""Render the environment (matplotlib)."""
if not hasattr(self, '_render_fig'):
plt.ion()
self._render_fig, self._render_ax = plt.subplots(figsize=(8, 8))
self._render_ax.set_xlim(0, self.config['map_size'])
self._render_ax.set_ylim(0, self.config['map_size'])
self._render_ax.set_aspect('equal')
ax = self._render_ax
ax.clear()
ax.set_xlim(0, self.config['map_size'])
ax.set_ylim(0, self.config['map_size'])
# Draw obstacles
for (cx, cy, w, h) in self.obstacles:
rect = plt.Rectangle((cx - w/2, cy - h/2), w, h,
color='gray', alpha=0.7)
ax.add_patch(rect)
# Draw goal
ax.plot(self.goal_pos[0], self.goal_pos[1], 'g*', markersize=20)
# Draw robot
circle = plt.Circle(self.robot_pos, self.config['robot_radius'],
color='blue')
ax.add_patch(circle)
# Draw robot heading
arrow_len = 0.3
ax.arrow(self.robot_pos[0], self.robot_pos[1],
arrow_len * np.cos(self.robot_theta),
arrow_len * np.sin(self.robot_theta),
head_width=0.1, head_length=0.05, fc='blue', ec='blue')
plt.pause(0.01)
def train_ppo(total_timesteps: int = 100_000, save_path: str = "ppo_obstacle_avoider"):
"""Train a PPO agent on obstacle avoidance."""
env = ObstacleAvoidanceEnv()
check_env(env) # Validate environment
model = PPO(
policy="MlpPolicy",
env=env,
learning_rate=3e-4,
n_steps=2048,
batch_size=64,
n_epochs=10,
gamma=0.99,
gae_lambda=0.95,
clip_range=0.2,
ent_coef=0.01, # Small entropy bonus for exploration
verbose=1,
tensorboard_log="./tb_logs/",
)
print(f"[INFO] Training PPO for {total_timesteps} timesteps...")
model.learn(total_timesteps=total_timesteps, progress_bar=True)
model.save(save_path)
print(f"[INFO] Model saved to {save_path}")
return model
def evaluate(model_path: str = "ppo_obstacle_avoider", episodes: int = 10):
"""Evaluate a trained model."""
env = ObstacleAvoidanceEnv(render_mode="human")
try:
model = PPO.load(model_path)
    except Exception:
print(f"[ERROR] Model not found at {model_path}. Train first with: python obstacle_avoidance_rl.py train")
return
successes = 0
collisions = 0
for ep in range(episodes):
obs, _ = env.reset()
done = False
steps = 0
while not done and steps < 500:
action, _ = model.predict(obs, deterministic=True)
obs, reward, terminated, truncated, info = env.step(action)
done = terminated or truncated
steps += 1
env.render()
if info.get("success", False):
successes += 1
print(f" Episode {ep+1}: SUCCESS ({steps} steps)")
elif info.get("collision", False):
collisions += 1
print(f" Episode {ep+1}: COLLISION")
else:
print(f" Episode {ep+1}: TIMEOUT")
print(f"\n[RESULTS] Success: {successes}/{episodes}, Collisions: {collisions}/{episodes}")
if __name__ == "__main__":
import sys
if len(sys.argv) > 1:
if sys.argv[1] == "eval":
evaluate()
elif sys.argv[1] == "train":
train_ppo()
else:
print("Usage: python obstacle_avoidance_rl.py [train|eval]")
print(" train: Train PPO agent (default 100k steps)")
print(" eval: Evaluate trained model")
5.3 ROS 2 Integration¶
"""
DRL Obstacle Avoidance — ROS 2 Deployment
==========================================
Bridge trained PPO model with ROS 2 navigation.
"""
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import LaserScan
from geometry_msgs.msg import Twist
import numpy as np
from stable_baselines3 import PPO
class DRLNavigator(Node):
def __init__(self):
super().__init__('drl_navigator')
self.declare_parameter('model_path', 'ppo_obstacle_avoider')
# Load trained model
model_path = self.get_parameter('model_path').value
self.model = PPO.load(model_path)
# Publishers and subscribers
self.scan_sub = self.create_subscription(
LaserScan, '/scan', self.scan_callback, 10
)
self.cmd_pub = self.create_publisher(Twist, '/cmd_vel', 10)
# State
self.last_scan = None
self.scan_received = False
self.get_logger().info('DRL Navigator ready')
def scan_callback(self, msg: LaserScan):
"""Process LiDAR scan and convert to observation."""
# Handle angle_min/angle_max
num_readings = len(msg.ranges)
angles = np.linspace(
msg.angle_min, msg.angle_max, num_readings
)
# Extract relevant sector (front 180 degrees)
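        # NOTE: the training environment above spreads 16 rays over 360 degrees;
        # adjust this field of view / downsampling to match your training setup.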
front_mask = (angles >= -np.pi/2) & (angles <= np.pi/2)
front_angles = angles[front_mask]
front_ranges = np.array(msg.ranges)[front_mask]
# Downsample to 16 readings
indices = np.linspace(0, len(front_ranges)-1, 16).astype(int)
sensor_readings = front_ranges[indices]
# Normalize (0 = close obstacle, 1 = no obstacle)
sensor_readings = np.clip(sensor_readings / msg.range_max, 0, 1)
# Add goal info (assume goal is roughly straight ahead)
# In real system, use navigation goal from move_base
goal_bearing = 0.0
goal_distance = 0.5
self.last_scan = np.concatenate([
sensor_readings,
[goal_bearing, goal_distance]
])
self.scan_received = True
def publish_cmd(self):
if not self.scan_received or self.last_scan is None:
return
action, _ = self.model.predict(self.last_scan, deterministic=True)
linear_vel = float(action[0]) * 0.5 # Scale to actual velocity
angular_vel = float(action[1]) * 1.5
cmd = Twist()
cmd.linear.x = max(0.0, linear_vel) # Only forward
cmd.angular.z = angular_vel
self.cmd_pub.publish(cmd)
def main(args=None):
rclpy.init(args=args)
navigator = DRLNavigator()
    # Control loop at 10 Hz
    navigator.create_timer(0.1, navigator.publish_cmd)
try:
rclpy.spin(navigator)
except KeyboardInterrupt:
pass
finally:
navigator.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()
6. Step-by-Step Implementation Guide¶
Phase 1 — Bug Algorithm (Tier 1)¶
- Define the environment (obstacle positions)
- Implement the proximity-sensor simulation
- Implement the state machine: go_to_goal → wall_follow → go_to_goal
- Test with simple rectangular obstacles
- Add visualization with matplotlib
Phase 2 — VFH (Tier 1)¶
- Add a 360° LiDAR sensor simulation
- Compute the polar histogram from the scan data
- Implement valley detection with a threshold
- Select the steering direction toward the goal
- Tune the smoothing parameter \(\sigma\) and the threshold \(a_{th}\) (see the sketch after this list)
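Tuning \(\sigma\) is easier once you can see its effect in isolation. The following self-contained sketch smooths a synthetic single-obstacle histogram with several \(\sigma\) values, using the same Gaussian window as compute_polar_histogram above; the obstacle span and the plotted threshold are illustrative assumptions:

```python
# sigma_sweep.py — visualize how the smoothing parameter sigma reshapes VFH valleys (sketch).
import numpy as np
import matplotlib.pyplot as plt

N = 360                                   # one sector per degree, as in the VFH script
raw = np.zeros(N)
raw[80:110] = 1.0                         # synthetic obstacle blocking sectors 80-110

def smooth(hist: np.ndarray, sigma: float, window: int = 8) -> np.ndarray:
    """Circular Gaussian smoothing, same form as compute_polar_histogram()."""
    out = np.zeros_like(hist)
    for k in range(len(hist)):
        for s in range(-window, window + 1):
            out[k] += hist[(k + s) % len(hist)] * np.exp(-(s * sigma) ** 2 / 2)
    return out

for sigma in (0.2, 0.35, 0.8):
    plt.plot(smooth(raw, sigma), label=f"sigma = {sigma}")
plt.axhline(0.35, color='k', linestyle='--', label="threshold")
plt.xlabel("sector (deg)"); plt.ylabel("smoothed occupancy"); plt.legend()
plt.title("Effect of sigma on valley width")
plt.show()
```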
Phase 3 — DWA (Tier 2)¶
- Define the robot dynamics and acceleration limits
- Sample the velocity space \((v, \omega)\) within the dynamic window
- Predict a trajectory for each velocity sample
- Evaluate the objective function: heading + clearance + velocity
- Integrate with ROS 2 alongside the nav2 local planner (a minimal standalone node is sketched after this list)
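As a stepping stone before a full nav2 plugin, the dwa_control() function from Section 4.3 can be wrapped in a small rclpy node that publishes /cmd_vel directly. A hedged sketch follows; it assumes the Section 4.3 script is importable under the hypothetical module name dwa_planner, and that obstacles are still the hard-coded OBSTACLES list inside that script rather than a live costmap:

```python
# dwa_node.py — minimal ROS 2 wrapper around dwa_control() (sketch, not the nav2 plugin API).
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
from nav_msgs.msg import Odometry
import numpy as np
from dwa_planner import dwa_control, GOAL   # hypothetical module name for the Section 4.3 script

class DWANode(Node):
    def __init__(self):
        super().__init__('dwa_node')
        self.cmd_pub = self.create_publisher(Twist, '/cmd_vel', 10)
        self.odom_sub = self.create_subscription(Odometry, '/odom', self.odom_cb, 10)
        self.pose = None                         # (x, y, theta) from odometry
        self.v, self.omega = 0.0, 0.0
        self.create_timer(0.1, self.control_step)  # 10 Hz control loop

    def odom_cb(self, msg: Odometry):
        q = msg.pose.pose.orientation
        yaw = np.arctan2(2.0 * (q.w * q.z + q.x * q.y),
                         1.0 - 2.0 * (q.y * q.y + q.z * q.z))
        self.pose = (msg.pose.pose.position.x, msg.pose.pose.position.y, yaw)

    def control_step(self):
        if self.pose is None:
            return
        x, y, theta = self.pose
        self.v, self.omega, _ = dwa_control(x, y, theta, self.v, self.omega, GOAL)
        cmd = Twist()
        cmd.linear.x = float(self.v)
        cmd.angular.z = float(self.omega)
        self.cmd_pub.publish(cmd)

def main():
    rclpy.init()
    node = DWANode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()
```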
Phase 4 — DRL (Tier 3)¶
- Define the Gymnasium environment: __init__, reset, step, render
- Implement LiDAR-like sensor observations
- Train PPO with stable_baselines3 (100k+ steps)
- Evaluate success and collision rates
- Deploy to the real robot via the ROS 2 bridge
7. Method Comparison¶
| Metric | Bug / VFH | DWA | Deep RL (PPO) |
|---|---|---|---|
| Accuracy | ★★★☆☆ | ★★★★☆ | ★★★★☆ |
| Smoothness | ★★☆☆☆ (Bug) / ★★★☆☆ (VFH) | ★★★★☆ | ★★★★★ |
| Robustness | ★★★☆☆ | ★★★★☆ | ★★★★★ |
| Compute cost | Very low | Low | High (inference) |
| Requires training | No | No | Yes (in simulation) |
| Parameter tuning | Low | Medium | High (hyperparameters) |
| Sim-to-real gap | N/A | N/A | Challenging but feasible |
| Dynamic obstacles | Fair | Good | Excellent |
| Best suited for | Simple environments | Structured spaces | Complex / changing environments |
| Complexity | ⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ |
8. Extensions and Variations¶
8.1 Sensor Improvements¶
- **Add IMU data** to improve motion prediction (a complementary-filter sketch follows this list)
- Fuse camera + LiDAR for a richer environment representation
- Event cameras for high-speed obstacle avoidance
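For the IMU idea above, a first useful step is a complementary filter that blends the gyro yaw rate (responsive but drifting) with the wheel-odometry heading (noisy but bias-free on average). A minimal sketch; the blend factor alpha and the noise levels in the usage example are illustrative assumptions:

```python
# heading_filter.py — complementary filter for yaw: gyro rate + odometry heading (sketch).
import numpy as np

class HeadingFilter:
    def __init__(self, alpha: float = 0.98):
        self.alpha = alpha        # trust in the integrated gyro; (1 - alpha) goes to odometry
        self.theta = 0.0          # fused heading estimate (rad)

    def update(self, gyro_z: float, odom_theta: float, dt: float) -> float:
        # Integrate the gyro, then softly pull the estimate toward the odometry heading.
        predicted = self.theta + gyro_z * dt
        # Wrap the correction to (-pi, pi] so the blend behaves across the +/-pi boundary.
        err = np.arctan2(np.sin(odom_theta - predicted), np.cos(odom_theta - predicted))
        self.theta = predicted + (1.0 - self.alpha) * err
        return self.theta

# Usage with fake data: constant 0.1 rad/s turn, noisy gyro and odometry readings.
f = HeadingFilter()
theta_true, est = 0.0, 0.0
for _ in range(100):
    theta_true += 0.1 * 0.05
    gyro = 0.1 + np.random.normal(0, 0.01)
    odom = theta_true + np.random.normal(0, 0.05)
    est = f.update(gyro, odom, dt=0.05)
print(f"true heading: {theta_true:.3f} rad, fused estimate: {est:.3f} rad")
```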
8.2 Algorithm Combinations¶
- Global planner + VFH: use A* for the global path and VFH for local avoidance (see the sketch after this list)
- DWA + MPC: replace DWA's trajectory scoring with model predictive control
- RL + classical methods: train an RL agent to tune the parameters of a classical planner
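For the global-planner + VFH combination, the glue code is small: the global path only needs to be exposed as a sequence of intermediate goals, and the vfh_step() loop from Section 3.4 chases one waypoint at a time. A sketch under two assumptions: the Section 3.4 script is saved as vfh.py (a hypothetical file name), and the hard-coded waypoint list stands in for real A* output:

```python
# global_plus_vfh.py — chase A* waypoints with the VFH local planner (sketch).
# Assumes the Section 3.4 script is saved as vfh.py; the waypoint list below is a
# placeholder for the output of an A* global planner.
import numpy as np
import vfh                                    # hypothetical module name for the VFH script

waypoints = [np.array([2.0, 1.0]),            # coarse corner points of a global path
             np.array([4.5, 4.0]),
             np.array([8.0, 8.0])]

obstacles = [(3.0, 3.0, 1.5, 1.5), (5.5, 5.5, 1.0, 2.0), (6.5, 2.5, 2.0, 1.0)]
position = np.array([0.5, 0.5])
heading = 0.0

for wp in waypoints:
    vfh.GOAL = wp                             # vfh_step() steers toward the module-level GOAL
    for _ in range(600):
        position, heading, _ = vfh.vfh_step(position, heading, obstacles)
        if np.linalg.norm(position - wp) < 0.4:   # waypoint reached, move on to the next one
            break

print(f"final position: {position}, distance to last waypoint: "
      f"{np.linalg.norm(position - waypoints[-1]):.2f} m")
```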
8.3 Sim-to-Real Transfer¶
- Train in a physics simulator (Gazebo, PyBullet, Isaac Sim)
- Add domain randomization (lighting, friction, sensor noise); a wrapper sketch follows this list
- Fine-tune on the real robot with imitation learning
- Perform system identification to calibrate the model
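Domain randomization can be bolted onto the training setup without touching ObstacleAvoidanceEnv itself by using a Gymnasium wrapper that resamples sensor noise and an actuator scale error each episode. A minimal sketch; the noise magnitudes are assumptions to tune:

```python
# randomize.py — simple domain-randomization wrapper around ObstacleAvoidanceEnv (sketch).
import numpy as np
import gymnasium as gym

class DomainRandomizationWrapper(gym.Wrapper):
    """Per-episode sensor noise and actuator scale error, to harden the learned policy."""
    def __init__(self, env, max_sensor_noise=0.05, max_action_scale_error=0.15):
        super().__init__(env)
        self.max_sensor_noise = max_sensor_noise
        self.max_action_scale_error = max_action_scale_error

    def reset(self, **kwargs):
        # Resample this episode's "physics".
        self.sensor_noise = np.random.uniform(0.0, self.max_sensor_noise)
        self.action_scale = 1.0 + np.random.uniform(-self.max_action_scale_error,
                                                    self.max_action_scale_error)
        obs, info = self.env.reset(**kwargs)
        return self._noisy(obs), info

    def step(self, action):
        obs, reward, terminated, truncated, info = self.env.step(action * self.action_scale)
        return self._noisy(obs), reward, terminated, truncated, info

    def _noisy(self, obs):
        noisy = obs.copy()
        noisy[:16] += np.random.normal(0.0, self.sensor_noise, size=16)  # LiDAR channels only
        return np.clip(noisy, -1.0, 1.0).astype(np.float32)

# Usage: env = DomainRandomizationWrapper(ObstacleAvoidanceEnv()) before passing it to PPO(...).
```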
8.4 Multi-Robot Extensions¶
- Centralized: a single controller coordinates all robots
- Decentralized: each robot runs obstacle avoidance independently
- Communication: robots share positions for cooperative avoidance
9. References¶
- Borenstein, J., & Koren, Y. (1991). "The Vector Field Histogram — Fast Obstacle Avoidance for Mobile Robots." IEEE Transactions on Robotics and Automation, 7(3), 278–288.
- Fox, D., Burgard, W., & Thrun, S. (1997). "The Dynamic Window Approach to Collision Avoidance." IEEE Robotics & Automation Magazine, 4(1), 23–33.
- Koren, Y., & Borenstein, J. (1991). "Potential Field Methods and Their Inherent Limitations for Mobile Robot Navigation." IEEE ICRA, 1398–1404.
- Marder-Eppstein, E., et al. (2010). "The Dynamic Window Approach in ROS." ROS Wiki
- Schulman, J., Wolski, F., et al. (2017). "Proximal Policy Optimization Algorithms." arXiv:1707.06347.
- Raffin, A., et al. (2021). "Stable-Baselines3: Reliable Reinforcement Learning Implementations." JMLR, 22(268), 1–8.
- Kuderer, M., et al. (2012). "Feature-Based Prediction of Trajectories for Socially Compliant Navigation." RSS.
- Chen, Y. F., et al. (2017). "Decentralized Non-communicating Multiagent Collision Avoidance with Deep Reinforcement Learning." IEEE ICRA.
- Zhou, M., et al. (2020). "Collision Avoidance in Dynamic Environments: An RL-based Approach." arXiv:2003.02918.
- Gymnasium Documentation — gymnasium.farama.org
- Stable-Baselines3 Documentation — stable-baselines3.readthedocs.io