跳转至

移动机器人避障 (Obstacle Avoidance for Mobile Robots)

项目类型: 导航 | 难度: ★★☆☆☆ 到 ★★★★★(取决于方法) | 预计时间: 2–4 个周末


1. 项目概述

**障碍物避障**是移动机器人在环境中安全导航而不与障碍物碰撞的能力。与假设完全地图知识的全局路径规划不同,避障对环境中**感知到的障碍物**进行实时反应,使其成为动态、部分已知或未知环境中的关键技术。

  ┌──────────────────────────────────────────────────────────┐
  │                    障碍物避障                              │
  │                                                           │
  │     目标                                                 │
  │       ★                                                  │
  │       │                                                  │
  │       │        ┌──────┐                                  │
  │       │        │ 障碍 │     机器人避障行为:                │
  │       │        └──────┘     1. 检测前方障碍物              │
  │       │             \                                    │
  │       │              ╲                                   │
  │       ▼               ◄────── 机器人绕行                  │
  │     起点                                                │
  └──────────────────────────────────────────────────────────┘

在本项目中,您将探索**三种逐步复杂的方法**:

层次 方法 技术 传感器
层次 1 Bug 算法和 VFH 几何方法,反应式 近距离传感器、LiDAR
层次 2 动态窗口法 (DWA) 速度空间优化 2D LiDAR
层次 3 深度强化学习 (PPO) 基于学习,端到端 模拟传感器阵列

2. 硬件与软件需求

硬件

组件 规格 备注
底盘 双轮差速驱动 任意移动机器人平台
电机 2× 带编码器的直流齿轮电机 用于里程计
LiDAR RPLIDAR A1/A2 或类似 360° 2D 激光扫描仪
(替代) 超声波/红外传感器阵列 用于层次 1 方法
微控制器 树莓派 4 / Jetson Nano 用于板载计算
电池 12V 锂聚合物电池或 USB-C 移动电源 为所有组件供电

软件

软件包 版本 用途
Python ≥ 3.8 核心语言
NumPy ≥ 1.20 数值计算
OpenCV ≥ 4.5 传感器处理
ROS 2 Humble / Iron 机器人中间件
Stable-Baselines3 ≥ 1.7 PPO 实现
Gymnasium ≥ 0.28 强化学习环境接口
PyTorch ≥ 1.13 神经网络训练
pip install numpy opencv-python stable-baselines3 gymnasium torch

3. 层次 1 — 传统方法:Bug 算法与向量场直方图 (VFH)

3.1 Bug 算法

**Bug 算法**是最简单的反应式障碍物避障方法。机器人仅使用本地感知并遵循两条简单规则:

算法规则

  1. 指向目标方向 (m 线):沿直线朝向目标移动,直到遇到障碍物。
  2. 离开圆 (Leave-Circle):离开障碍物时,沿最短路径朝目标方向前进。
  3. 跟随墙壁 (Wall-Following):当障碍物阻挡路径时,沿障碍物边界移动,直到可以重新沿 m 线移动。

数学公式

Bug 算法可形式化为:

\[ \mathbf{d}_{\text{robot \to goal}} = \begin{pmatrix} x_g - x_r \\ y_g - y_r \end{pmatrix}, \quad \phi = \arctan2(y_g - y_r, x_g - x_r) \]

其中 \((x_g, y_g)\) 是目标位置,\((x_r, y_r)\) 是机器人位置。

碰撞点检测:当到任意障碍物的距离 \(d < d_{\text{threshold}}\) 时,记录一个碰撞点。

离开点条件:当满足以下条件时,机器人离开障碍物:

\[ \|\mathbf{p}_{\text{current}} - \mathbf{p}_{\text{hit}}\| > \|\mathbf{p}_{\text{goal}} - \mathbf{p}_{\text{hit}}\| \]

3.2 Bug 算法 — 完整 Python 代码

"""
Bug Algorithm — Reactive Obstacle Avoidance
============================================
A simple geometry-based obstacle avoidance algorithm.
Requires: numpy, matplotlib (for visualization)
"""

import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Circle

# ─── Configuration ──────────────────────────────────────────────
GOAL = np.array([8.0, 8.0])          # Goal position (x, y)
START = np.array([0.5, 0.5])         # Start position
ROBOT_RADIUS = 0.2                    # Robot collision radius (m)
GOAL_THRESHOLD = 0.3                  # Distance to goal to consider reached
OBSTACLE_THRESHOLD = 0.5              # Distance to detect obstacle
WALL_FOLLOW_GAIN = 1.0               # Gain for wall following direction
ROBOT_SPEED = 0.1                     # Forward speed (m/s)

# ─── Environment Definition ─────────────────────────────────────
def create_environment():
    """
    Create a simple environment with rectangular obstacles.
    Returns a list of (cx, cy, width, height) tuples.
    """
    obstacles = [
        (3.0, 3.0, 1.5, 1.5),    # Center (3,3), size 1.5x1.5
        (5.5, 5.5, 1.0, 2.0),    # Center (5.5, 5.5), size 1.0x2.0
        (6.5, 2.5, 2.0, 1.0),    # Center (6.5, 2.5), size 2.0x1.0
    ]
    return obstacles


def is_collision(point: np.ndarray, obstacles: list) -> bool:
    """
    Check if a point collides with any obstacle.
    Uses expanded obstacles to account for robot radius.
    """
    for (cx, cy, w, h) in obstacles:
        # Expanded rectangle (add robot radius)
        hw, hh = w/2 + ROBOT_RADIUS, h/2 + ROBOT_RADIUS
        if (cx - hw <= point[0] <= cx + hw and
            cy - hh <= point[1] <= cy + hh):
            return True
    return False


def get_obstacle_distance(point: np.ndarray, obstacles: list) -> float:
    """
    Compute minimum distance to any obstacle.
    Returns infinity if no obstacles nearby.
    """
    min_dist = np.inf
    for (cx, cy, w, h) in obstacles:
        # Distance to rectangle edge
        hw, hh = w/2, h/2
        dx = max(abs(point[0] - cx) - hw, 0)
        dy = max(abs(point[1] - cy) - hh, 0)
        dist = np.sqrt(dx**2 + dy**2)
        min_dist = min(min_dist, dist)
    return min_dist


def get_gradient_toward_goal(point: np.ndarray) -> np.ndarray:
    """
    Compute unit gradient vector pointing toward the goal.
    """
    direction = GOAL - point
    distance = np.linalg.norm(direction)
    if distance < 1e-6:
        return np.array([0.0, 0.0])
    return direction / distance


def get_wall_follow_direction(point: np.ndarray,
                               obstacles: list,
                               prev_direction: np.ndarray) -> np.ndarray:
    """
    Compute wall-following direction using right-hand rule.
    Keeps the robot hugging the right side of obstacles.
    """
    # Sample points around the robot to find obstacle boundary
    num_samples = 16
    angles = np.linspace(0, 2*np.pi, num_samples, endpoint=False)
    distances = np.array([
        get_obstacle_distance(point + 0.3 * np.array([np.cos(a), np.sin(a)]), obstacles)
        for a in angles
    ])

    # Find the direction with maximum clearance (most open direction)
    # but prefer directions that maintain forward progress
    best_idx = np.argmax(distances)
    best_angle = angles[best_idx]
    best_dist = distances[best_idx]

    # Combine: prefer open space but also forward direction
    goal_dir = get_gradient_toward_goal(point)
    goal_angle = np.arctan2(goal_dir[1], goal_dir[0])

    # Weighted combination of open direction and goal direction
    open_dir = np.array([np.cos(best_angle), np.sin(best_angle)])
    combined = 0.6 * open_dir + 0.4 * goal_dir
    return combined / (np.linalg.norm(combined) + 1e-6)


def bug_algorithm_step(point: np.ndarray,
                        mode: str,
                        hit_point: np.ndarray,
                        prev_direction: np.ndarray,
                        obstacles: list) -> tuple:
    """
    Execute one step of the Bug algorithm.

    Returns: (new_point, new_mode, hit_point, trajectory)
    """
    # Check if goal reached
    if np.linalg.norm(point - GOAL) < GOAL_THRESHOLD:
        return point, "goal_reached", hit_point, point

    # Get distance to nearest obstacle
    dist_to_obstacle = get_obstacle_distance(point, obstacles)

    # State machine transitions
    if mode == "go_to_goal":
        if dist_to_obstacle < OBSTACLE_THRESHOLD:
            # Hit obstacle - switch to wall following
            mode = "wall_follow"
            hit_point = point.copy()
        else:
            # Move toward goal
            direction = get_gradient_toward_goal(point)
            new_point = point + ROBOT_SPEED * direction
            if is_collision(new_point, obstacles):
                new_point = point  # Stay in place if collision
            return new_point, mode, hit_point, new_point

    elif mode == "wall_follow":
        # Check leave condition
        dist_to_goal_from_hit = np.linalg.norm(GOAL - hit_point)
        dist_to_goal_from_current = np.linalg.norm(GOAL - point)
        dist_from_hit = np.linalg.norm(point - hit_point)

        # Leave if we're closer to goal than when we hit
        if (dist_from_hit > dist_to_goal_from_hit and
            dist_to_goal_from_current < dist_to_goal_from_hit + 0.5):
            # Can head back to m-line (direct path to goal)
            mode = "go_to_goal"
            direction = get_gradient_toward_goal(point)
        else:
            # Follow wall
            direction = get_wall_follow_direction(point, obstacles, prev_direction)

        new_point = point + ROBOT_SPEED * direction
        if is_collision(new_point, obstacles):
            new_point = point
        return new_point, mode, hit_point, new_point

    return point, mode, hit_point, point


def run_bug_simulation():
    """Run Bug algorithm simulation and visualize."""
    obstacles = create_environment()
    point = START.copy()
    mode = "go_to_goal"
    hit_point = None
    trajectory = [point.copy()]
    prev_direction = np.array([0.0, 1.0])

    fig, ax = plt.subplots(figsize=(10, 10))
    ax.set_xlim(-1, 10)
    ax.set_ylim(-1, 10)
    ax.set_aspect('equal')
    ax.set_xlabel('X (m)')
    ax.set_ylabel('Y (m)')
    ax.set_title('Bug Algorithm — Obstacle Avoidance')

    # Draw obstacles
    for (cx, cy, w, h) in obstacles:
        rect = plt.Rectangle((cx - w/2, cy - h/2), w, h,
                               color='gray', alpha=0.7)
        ax.add_patch(rect)

    # Draw goal
    ax.plot(GOAL[0], GOAL[1], 'g*', markersize=20, label='Goal')

    # Simulation loop
    for step in range(1000):
        point, mode, hit_point, new_pos = bug_algorithm_step(
            point, mode, hit_point, prev_direction, obstacles
        )
        trajectory.append(point.copy())

        if mode == "goal_reached":
            print(f"[INFO] Goal reached in {step} steps!")
            break

    # Plot trajectory
    trajectory = np.array(trajectory)
    ax.plot(trajectory[:, 0], trajectory[:, 1], 'b-', linewidth=2, label='Path')
    ax.plot(trajectory[0, 0], trajectory[0, 1], 'bo', markersize=10, label='Start')
    ax.legend()
    ax.grid(True, alpha=0.3)

    plt.savefig("bug_algorithm_result.png", dpi=150)
    plt.show()
    print(f"[INFO] Final position: {point}, Distance to goal: {np.linalg.norm(point - GOAL):.3f}")


if __name__ == "__main__":
    run_bug_simulation()

3.3 向量场直方图 (VFH)

VFH (Koren & Borenstein, 1991) 创建环境的**极坐标直方图**,并选择朝向目标的最平滑方向。

算法步骤

  1. 构建笛卡尔直方图网格:将机器人工作空间划分为带有占据值的单元格。
  2. 计算极坐标直方图:转换为以机器人为中心的极坐标:
\[ h_k = \sum_{i,j} c_{ij} \cdot e^{-\frac{(k - \beta_{ij})^2}{2\sigma^2}} \]

其中 \(c_{ij}\) 是单元格 \((i,j)\) 的占据值,\(\beta_{ij}\) 是到该单元格的角度,\(\sigma\) 控制平滑程度。

  1. 选择山谷:识别宽度足够机器人通过的"山谷"(\(k\) 值范围小)。
  2. 转向命令:选择最接近目标方向的山谷。

3.4 VFH — 完整 Python 代码

"""
Vector Field Histogram (VFH) — Obstacle Avoidance
===================================================
A histogram-based reactive navigation algorithm.
Requires: numpy, matplotlib
"""

import numpy as np
import matplotlib.pyplot as plt

# ─── Configuration ──────────────────────────────────────────────
GOAL = np.array([8.0, 8.0])          # Goal position
ROBOT_POS = np.array([0.5, 0.5])     # Current position
ROBOT_RADIUS = 0.2                    # Robot radius (m)
VFH_CELL_SIZE = 0.2                   # Grid cell size (m)
VFH_SENSOR_RANGE = 3.0               # Sensor range (m)
VFH_NUM_SECTORS = 360                 # Number of polar sectors
VFH_A_THRESHOLD = 0.35               # Obstacle threshold (0-1)
VFH_SECOND_A_THRESHOLD = 0.15        # Tight threshold for narrow passages
WALL_SMOOTHING = 0.35                # Smoothing parameter sigma
ROBOT_SPEED = 0.08                    # Forward speed
MAX_STEERING_RATE = 0.5               # Max steering change per step

# ─── Simulated Sensor Reading ─────────────────────────────────────
def simulate_lidar(position: np.ndarray, obstacles: list, num_readings: int = 360) -> np.ndarray:
    """
    Simulate LiDAR scan. Returns array of distances in each direction.
    """
    angles = np.linspace(0, 2*np.pi, num_readings, endpoint=False)
    distances = np.full(num_readings, VFH_SENSOR_RANGE)

    for i, angle in enumerate(angles):
        direction = np.array([np.cos(angle), np.sin(angle)])
        for dist in np.linspace(0.1, VFH_SENSOR_RANGE, 50):
            test_point = position + dist * direction
            for (cx, cy, w, h) in obstacles:
                hw, hh = w/2, h/2
                if (cx - hw <= test_point[0] <= cx + hw and
                    cy - hh <= test_point[1] <= cy + hh):
                    distances[i] = min(distances[i], dist)
                    break
    return distances


def compute_polar_histogram(lidar_distances: np.ndarray) -> np.ndarray:
    """
    Convert LiDAR distances to polar histogram.
    Higher values mean more obstacle presence.
    """
    n = len(lidar_distances)
    histogram = np.zeros(n)

    for k in range(n):
        # Each sector corresponds to one lidar reading
        d = lidar_distances[k]
        if d < VFH_SENSOR_RANGE:
            # Active cells closer to robot contribute more
            m_k = 1.0 - (d / VFH_SENSOR_RANGE)
            histogram[k] = m_k ** 2  # Square for sharper response
        else:
            histogram[k] = 0.0

    # Smooth the histogram
    smoothed = np.zeros_like(histogram)
    for k in range(n):
        for s in range(-8, 9):  # ±8 sectors smoothing window
            idx = (k + s) % n
            weight = np.exp(-(s * WALL_SMOOTHING) ** 2 / 2)
            smoothed[k] += histogram[idx] * weight
    return smoothed


def find_valleys(histogram: np.ndarray, threshold: float) -> list:
    """
    Find valleys (open passages) in the polar histogram.
    A valley is a consecutive sequence of sectors below threshold.
    """
    n = len(histogram)
    valleys = []
    in_valley = False
    start_idx = 0

    for k in range(n):
        if histogram[k] < threshold and not in_valley:
            in_valley = True
            start_idx = k
        elif histogram[k] >= threshold and in_valley:
            in_valley = False
            valleys.append((start_idx, k - 1))

    if in_valley:
        valleys.append((start_idx, n - 1))

    return valleys


def select_steering_direction(valleys: list, goal_angle: float, prev_direction: float) -> float:
    """
    Select the best steering direction from available valleys.
    Prefers directions close to the goal.
    """
    if not valleys:
        # No valley found - rotate in place
        return prev_direction + MAX_STEERING_RATE

    # Find valley closest to goal angle
    best_valley = None
    best_score = np.inf

    for (start, end) in valleys:
        valley_center = (start + end) / 2
        if start == 0 and end == len(valleys) > 0:
            # Handle wrap-around
            valley_center = ((end + start + len(valleys)) / 2) % 360

        # Angular difference to goal
        diff = abs(valley_center - goal_angle)
        diff = min(diff, 360 - diff)  # Handle wrap-around

        if diff < best_score:
            best_score = diff
            best_valley = valley_center

    return np.deg2rad(best_valley)


def vfh_step(position: np.ndarray,
             heading: float,
             obstacles: list) -> tuple:
    """
    Execute one VFH step.

    Returns: (new_position, new_heading)
    """
    # 1. Get LiDAR scan
    lidar_distances = simulate_lidar(position, obstacles)

    # 2. Compute polar histogram
    histogram = compute_polar_histogram(lidar_distances)

    # 3. Find valleys
    threshold = VFH_A_THRESHOLD
    valleys = find_valleys(histogram, threshold)

    # 4. Compute goal direction
    direction_to_goal = GOAL - position
    goal_angle = np.rad2deg(np.arctan2(direction_to_goal[1], direction_to_goal[0]))
    if goal_angle < 0:
        goal_angle += 360

    # 5. Select steering
    desired_heading = select_steering_direction(valleys, goal_angle, heading)

    # 6. Limit steering rate
    heading_diff = desired_heading - heading
    while heading_diff > np.pi:
        heading_diff -= 2*np.pi
    while heading_diff < -np.pi:
        heading_diff += 2*np.pi
    heading += np.clip(heading_diff, -MAX_STEERING_RATE, MAX_STEERING_RATE)

    # 7. Move forward
    new_position = position + ROBOT_SPEED * np.array([np.cos(heading), np.sin(heading)])

    return new_position, heading, histogram


def run_vfh_simulation():
    """Run VFH simulation and visualize."""
    obstacles = [
        (3.0, 3.0, 1.5, 1.5),
        (5.5, 5.5, 1.0, 2.0),
        (6.5, 2.5, 2.0, 1.0),
    ]

    position = ROBOT_POS.copy()
    heading = np.arctan2(GOAL[1] - position[1], GOAL[0] - position[0])
    trajectory = [position.copy()]
    all_histograms = []

    fig, axes = plt.subplots(1, 2, figsize=(14, 6))

    for step in range(1500):
        position, heading, histogram = vfh_step(position, heading, obstacles)
        trajectory.append(position.copy())
        all_histograms.append(histogram)

        if np.linalg.norm(position - GOAL) < 0.3:
            print(f"[INFO] VFH: Goal reached in {step} steps!")
            break

    trajectory = np.array(trajectory)

    # Plot trajectory
    ax1 = axes[0]
    ax1.set_xlim(-1, 10)
    ax1.set_ylim(-1, 10)
    ax1.set_aspect('equal')
    ax1.set_title('VFH — Trajectory')

    for (cx, cy, w, h) in obstacles:
        rect = plt.Rectangle((cx - w/2, cy - h/2), w, h,
                               color='gray', alpha=0.7)
        ax1.add_patch(rect)

    ax1.plot(GOAL[0], GOAL[1], 'g*', markersize=20)
    ax1.plot(trajectory[:, 0], trajectory[:, 1], 'b-', linewidth=1.5)
    ax1.plot(trajectory[0, 0], trajectory[0, 1], 'bo', markersize=10)
    ax1.grid(True, alpha=0.3)

    # Plot final histogram
    ax2 = axes[1]
    final_hist = all_histograms[-1]
    angles = np.linspace(0, 360, len(final_hist), endpoint=False)
    ax2.fill(np.deg2rad(angles), final_hist, alpha=0.7, color='blue')
    ax2.set_theta_zero_location('E')
    ax2.set_theta_direction(1)
    ax2.set_title('VFH — Final Polar Histogram')
    ax2.set_rlim(0, 1)

    plt.tight_layout()
    plt.savefig("vfh_result.png", dpi=150)
    plt.show()

    print(f"[INFO] Final distance to goal: {np.linalg.norm(position - GOAL):.3f}")


if __name__ == "__main__":
    run_vfh_simulation()

3.5 Bug 与 VFH 对比

特性 Bug 算法 VFH
复杂度 非常低 中等
平滑性 抖动(方向突变) 平滑转向
传感器需求 简单近距离传感器 360° LiDAR
局部最小值 可能陷入循环 更好地避免局部最小值
参数调整 参数少 多参数(阈值、平滑度)

4. 层次 2 — 中级:动态窗口法 (DWA)

4.1 概念

动态窗口法 (Fox et al., 1997) 将避障问题表述为**速度空间**中的优化问题。DWA 不直接计算转向方向,而是:

  1. 在动态可行范围内采样速度空间 \((v, \omega)\)
  2. 为每个速度样本模拟轨迹,时间跨度很短
  3. 选择使目标函数最大化的速度

4.2 数学公式

机器人动力学

\[ \dot{x} = v \cos(\theta), \quad \dot{y} = v \sin(\theta), \quad \dot{\theta} = \omega \]

动态窗口

动态窗口将速度限制为:

\[ V_d = \{ (v, \omega) \mid v \in [v_c - \dot{v}_a \cdot \Delta t, v_c + \dot{v}_a \cdot \Delta t] \cap [v_{\min}, v_{\max}] \} \]
\[ \omega \in [\omega_c - \dot{\omega}_a \cdot \Delta t, \omega_c + \dot{\omega}_a \cdot \Delta t] \cap [\omega_{\min}, \omega_{\max}] \]

目标函数

\[ G(v, \omega) = \alpha \cdot \text{heading}(v, \omega) + \beta \cdot \text{dist}(v, \omega) + \gamma \cdot \text{velocity}(v) \]
  • heading:朝向目标方向的进度(越高越好)
  • dist:与障碍物的距离(越高越好)
  • velocity:前进速度(越高越好)

4.3 DWA — 完整 Python 代码

"""
Dynamic Window Approach (DWA) — Local Planner
==============================================
Velocity-space optimization for obstacle avoidance.
Requires: numpy, matplotlib
"""

import numpy as np
import matplotlib.pyplot as plt

# ─── Robot Configuration ─────────────────────────────────────────
ROBOT_CONFIG = {
    'max_speed': 0.5,         # Maximum linear velocity (m/s)
    'min_speed': -0.2,        # Minimum reverse speed (m/s)
    'max_yaw_rate': 1.5,      # Maximum angular velocity (rad/s)
    'max_accel': 2.0,         # Maximum linear acceleration (m/s^2)
    'max_yaw_accel': 3.0,     # Maximum angular acceleration (rad/s^2)
    'robot_radius': 0.2,      # Collision radius (m)
    'predict_time': 1.5,      # Trajectory prediction horizon (s)
    'dt': 0.1,                # Time step (s)
}

# ─── DWA Weights ─────────────────────────────────────────────────
WEIGHTS = {
    'heading': 0.8,          # Weight for goal heading
    'velocity': 0.2,          # Weight for forward velocity
    'clearance': 0.1,         # Weight for obstacle clearance
}

# ─── Environment ─────────────────────────────────────────────────
GOAL = np.array([8.0, 8.0])
OBSTACLES = [
    (3.0, 3.0, 1.5, 1.5),
    (5.5, 5.5, 1.0, 2.0),
    (6.5, 2.5, 2.0, 1.0),
]


def is_collision(x: float, y: float, obstacles: list) -> bool:
    """Check if point (x, y) collides with any obstacle."""
    for (cx, cy, w, h) in obstacles:
        hw, hh = w/2 + ROBOT_CONFIG['robot_radius'], h/2 + ROBOT_CONFIG['robot_radius']
        if cx - hw <= x <= cx + hw and cy - hh <= y <= cy + hh:
            return True
    return False


def predict_trajectory(x: float, y: float, theta: float,
                       v: float, omega: float, dt: float) -> np.ndarray:
    """
    Predict trajectory given initial state and velocity.
    Returns array of (x, y, theta) over the prediction horizon.
    """
    trajectory = []
    time = 0.0
    px, py, ptheta = x, y, theta

    while time <= ROBOT_CONFIG['predict_time']:
        trajectory.append([px, py, ptheta])
        px += v * np.cos(ptheta) * dt
        py += v * np.sin(ptheta) * dt
        ptheta += omega * dt
        time += dt

    return np.array(trajectory)


def calc_clearance(x: float, y: float, obstacles: list) -> float:
    """
    Calculate minimum distance to any obstacle.
    """
    min_dist = np.inf
    for (cx, cy, w, h) in obstacles:
        hw, hh = w/2, h/2
        dx = max(abs(x - cx) - hw, 0)
        dy = max(abs(y - cy) - hh, 0)
        dist = np.sqrt(dx**2 + dy**2)
        min_dist = min(min_dist, dist)
    return min_dist


def calc_heading_score(x: float, y: float, theta: float, goal: np.ndarray) -> float:
    """
    Calculate heading score (progress toward goal).
    Returns value in [0, 1], where 1 is facing directly at goal.
    """
    dx = goal[0] - x
    dy = goal[1] - y
    goal_angle = np.arctan2(dy, dx)

    angle_diff = goal_angle - theta
    while angle_diff > np.pi:
        angle_diff -= 2*np.pi
    while angle_diff < -np.pi:
        angle_diff += 2*np.pi

    # Convert to score: 0 when perpendicular, 1 when aligned
    return 0.5 * (1.0 - abs(angle_diff) / np.pi)


def calc_velocity_score(v: float) -> float:
    """
    Calculate velocity score (normalized forward speed).
    """
    return (v - ROBOT_CONFIG['min_speed']) / (ROBOT_CONFIG['max_speed'] - ROBOT_CONFIG['min_speed'])


def sample_velocities(v_c: float, omega_c: float) -> list:
    """
    Sample velocities within the dynamic window.
    """
    v_samples, omega_samples = 5, 10
    velocities = []

    v_range = [
        max(ROBOT_CONFIG['min_speed'], v_c - ROBOT_CONFIG['max_accel'] * ROBOT_CONFIG['dt']),
        min(ROBOT_CONFIG['max_speed'], v_c + ROBOT_CONFIG['max_accel'] * ROBOT_CONFIG['dt'])
    ]
    omega_range = [
        max(-ROBOT_CONFIG['max_yaw_rate'], omega_c - ROBOT_CONFIG['max_yaw_accel'] * ROBOT_CONFIG['dt']),
        min(ROBOT_CONFIG['max_yaw_rate'], omega_c + ROBOT_CONFIG['max_yaw_accel'] * ROBOT_CONFIG['dt'])
    ]

    for v in np.linspace(v_range[0], v_range[1], v_samples):
        for omega in np.linspace(omega_range[0], omega_range[1], omega_samples):
            velocities.append((v, omega))

    return velocities


def dwa_control(x: float, y: float, theta: float,
                v_c: float, omega_c: float, goal: np.ndarray) -> tuple:
    """
    Dynamic Window Approach control law.

    Returns: (v, omega, best_trajectory)
    """
    velocities = sample_velocities(v_c, omega_c)

    best_score = -np.inf
    best_v, best_omega, best_trajectory = 0.0, 0.0, None

    for (v, omega) in velocities:
        # Predict trajectory
        trajectory = predict_trajectory(x, y, theta, v, omega, ROBOT_CONFIG['dt'])

        # Check for collision along trajectory
        collision = False
        min_clearance = np.inf
        for (px, py, _) in trajectory:
            if is_collision(px, py, OBSTACLES):
                collision = True
                break
            clearance = calc_clearance(px, py, OBSTACLES)
            min_clearance = min(min_clearance, clearance)

        if collision:
            continue

        # Score trajectory
        final_x, final_y, final_theta = trajectory[-1]
        heading = calc_heading_score(final_x, final_y, theta, goal)
        velocity = calc_velocity_score(v)
        clearance = min_clearance / 5.0  # Normalize (5m max clearance)

        score = (WEIGHTS['heading'] * heading +
                 WEIGHTS['velocity'] * velocity +
                 WEIGHTS['clearance'] * clearance)

        if score > best_score:
            best_score = score
            best_v = v
            best_omega = omega
            best_trajectory = trajectory

    return best_v, best_omega, best_trajectory


def run_dwa_simulation():
    """Run DWA simulation with visualization."""
    # Initial state
    x, y, theta = 0.5, 0.5, np.arctan2(GOAL[1] - 0.5, GOAL[0] - 0.5)
    v, omega = 0.0, 0.0

    trajectory = [[x, y, theta]]
    all_trajectories = []

    fig, ax = plt.subplots(figsize=(10, 10))

    for step in range(2000):
        # DWA control
        v, omega, trajectory_pred = dwa_control(x, y, theta, v, omega, GOAL)

        # Simulate one step
        x += v * np.cos(theta) * ROBOT_CONFIG['dt']
        y += v * np.sin(theta) * ROBOT_CONFIG['dt']
        theta += omega * ROBOT_CONFIG['dt']

        trajectory.append([x, y, theta])
        if trajectory_pred is not None:
            all_trajectories.append(trajectory_pred)

        # Check goal
        if np.linalg.norm([x - GOAL[0], y - GOAL[1]]) < 0.3:
            print(f"[INFO] DWA: Goal reached in {step} steps!")
            break

    trajectory = np.array(trajectory)

    # Plot
    ax.set_xlim(-1, 10)
    ax.set_ylim(-1, 10)
    ax.set_aspect('equal')
    ax.set_title('Dynamic Window Approach — Trajectory')

    # Draw obstacles
    for (cx, cy, w, h) in OBSTACLES:
        rect = plt.Rectangle((cx - w/2, cy - h/2), w, h,
                               color='gray', alpha=0.7)
        ax.add_patch(rect)

    # Draw goal
    ax.plot(GOAL[0], GOAL[1], 'g*', markersize=20, label='Goal')

    # Draw robot trajectory
    ax.plot(trajectory[:, 0], trajectory[:, 1], 'b-', linewidth=2, label='Robot path')
    ax.plot(trajectory[0, 0], trajectory[0, 1], 'bo', markersize=10, label='Start')

    # Draw last few predicted trajectories
    if all_trajectories:
        for traj in all_trajectories[-50::10]:  # Every 10th, last 50
            traj = np.array(traj)
            ax.plot(traj[:, 0], traj[:, 1], 'g-', alpha=0.1, linewidth=1)

    ax.legend()
    ax.grid(True, alpha=0.3)

    plt.savefig("dwa_result.png", dpi=150)
    plt.show()

    print(f"[INFO] Final position: ({x:.2f}, {y:.2f}), Distance: {np.linalg.norm([x-GOAL[0], y-GOAL[1]]):.3f}")


if __name__ == "__main__":
    run_dwa_simulation()

5. 层次 3 — 现代:深度强化学习 (PPO)

5.1 概念

我们使用近端策略优化 (PPO) 训练**神经网络策略**来代替手工编码避障规则。智能体从经验中学习给定传感器观测应该采取什么动作。

5.2 Gymnasium 环境

"""
DRL Obstacle Avoidance — PPO Training
======================================
Custom Gymnasium environment for obstacle avoidance.
Train a PPO agent to navigate while avoiding obstacles.
"""

import numpy as np
import gymnasium as gym
from gymnasium import spaces
from stable_baselines3 import PPO
from stable_baselines3.common.env_checker import check_env
import matplotlib.pyplot as plt

# ─── Environment Configuration ─────────────────────────────────
ENV_CONFIG = {
    'map_size': 10.0,           # Environment size (meters)
    'robot_radius': 0.2,        # Robot collision radius
    'obstacle_count': 8,        # Number of random obstacles
    'obstacle_size_range': (0.3, 1.2),  # Min/max obstacle size
    'goal_radius': 0.5,         # Goal reached threshold
    'max_steps': 500,           # Maximum episode length
    'sensor_range': 3.0,        # LiDAR-like sensor range
    'sensor_count': 16,         # Number of sensor rays
}


class ObstacleAvoidanceEnv(gym.Env):
    """
    2D obstacle avoidance environment with LiDAR-like sensing.

    State: 18-dimensional vector
        - 16 LiDAR distance readings (normalized)
        - 1 goal bearing (angle to goal, normalized)
        - 1 goal distance (normalized)

    Action: 2-dimensional continuous
        - Linear velocity command (0 to 1)
        - Angular velocity command (-1 to 1)
    """

    metadata = {"render_modes": ["human"]}

    def __init__(self, render_mode=None):
        super().__init__()
        self.render_mode = render_mode

        # Observation: sensor readings + goal info
        self.observation_space = spaces.Box(
            low=0.0, high=1.0,
            shape=(18,), dtype=np.float32
        )

        # Action: [linear_vel, angular_vel]
        self.action_space = spaces.Box(
            low=np.array([-1.0, -1.0]),
            high=np.array([1.0, 1.0]),
            dtype=np.float32
        )

        self.config = ENV_CONFIG
        self.robot_pos = None
        self.robot_theta = None
        self.goal_pos = None
        self.obstacles = None
        self.step_count = None

    def _generate_obstacles(self) -> list:
        """Generate random rectangular obstacles."""
        obstacles = []
        for _ in range(self.config['obstacle_count']):
            min_size, max_size = self.config['obstacle_size_range']
            w = np.random.uniform(min_size, max_size)
            h = np.random.uniform(min_size, max_size)
            x = np.random.uniform(w/2 + 0.5, self.config['map_size'] - w/2 - 0.5)
            y = np.random.uniform(h/2 + 0.5, self.config['map_size'] - h/2 - 0.5)
            obstacles.append((x, y, w, h))
        return obstacles

    def _is_collision(self, pos: np.ndarray) -> bool:
        """Check if position collides with any obstacle."""
        r = self.config['robot_radius']
        for (cx, cy, w, h) in self.obstacles:
            hw, hh = w/2 + r, h/2 + r
            if cx - hw <= pos[0] <= cx + hw and cy - hh <= pos[1] <= cy + hh:
                return True
        return False

    def _get_sensor_readings(self) -> np.ndarray:
        """Simulate LiDAR-like sensor readings."""
        readings = np.zeros(self.config['sensor_count'], dtype=np.float32)
        angles = np.linspace(0, 2*np.pi, self.config['sensor_count'], endpoint=False)

        for i, angle in enumerate(angles):
            direction = np.array([np.cos(angle), np.sin(angle)])
            for dist in np.linspace(0.1, self.config['sensor_range'], 50):
                test_pos = self.robot_pos + dist * direction
                if test_pos[0] < 0 or test_pos[0] > self.config['map_size']:
                    break
                if test_pos[1] < 0 or test_pos[1] > self.config['map_size']:
                    break
                if self._is_collision(test_pos):
                    readings[i] = dist / self.config['sensor_range']
                    break
            else:
                readings[i] = 1.0  # No obstacle in range

        return readings

    def _get_observation(self) -> np.ndarray:
        """Get full observation vector."""
        sensors = self._get_sensor_readings()

        # Goal bearing (angle to goal, normalized to [-1, 1])
        direction_to_goal = self.goal_pos - self.robot_pos
        goal_angle = np.arctan2(direction_to_goal[1], direction_to_goal[0])
        angle_diff = goal_angle - self.robot_theta
        while angle_diff > np.pi:
            angle_diff -= 2*np.pi
        while angle_diff < -np.pi:
            angle_diff += 2*np.pi
        goal_bearing = angle_diff / np.pi  # Normalize to [-1, 1]

        # Normalized distance to goal
        goal_distance = np.linalg.norm(direction_to_goal) / (self.config['map_size'] * np.sqrt(2))

        return np.concatenate([sensors, [goal_bearing, goal_distance]]).astype(np.float32)

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)

        # Reset robot
        self.robot_pos = np.array([0.5, 0.5])
        self.robot_theta = 0.0

        # Place goal (far corner)
        self.goal_pos = np.array([
            self.config['map_size'] - 0.5,
            self.config['map_size'] - 0.5
        ])

        # Generate obstacles
        self.obstacles = self._generate_obstacles()

        # Ensure start and goal are clear
        while self._is_collision(self.robot_pos):
            self.obstacles = self._generate_obstacles()

        self.step_count = 0
        return self._get_observation(), {}

    def step(self, action):
        self.step_count += 1

        # Unpack action
        linear_vel = float(action[0])  # 0 to 1
        angular_vel = float(action[1])  # -1 to 1

        # Convert to actual velocities
        v = linear_vel * 0.5  # Max 0.5 m/s
        omega = angular_vel * 1.5  # Max 1.5 rad/s

        # Update pose
        self.robot_pos[0] += v * np.cos(self.robot_theta) * 0.1
        self.robot_pos[1] += v * np.sin(self.robot_theta) * 0.1
        self.robot_theta += omega * 0.1

        # Clamp to map bounds
        self.robot_pos = np.clip(self.robot_pos, 0.1, self.config['map_size'] - 0.1)

        # ── Compute Reward ──
        reward = 0.0
        done = False
        info = {}

        dist_to_goal = np.linalg.norm(self.robot_pos - self.goal_pos)

        # Goal reached
        if dist_to_goal < self.config['goal_radius']:
            reward = 100.0
            done = True
            info = {"success": True}
            return self._get_observation(), reward, done, False, info

        # Collision with obstacle
        if self._is_collision(self.robot_pos):
            reward = -50.0
            done = True
            info = {"collision": True}
            return self._get_observation(), reward, done, False, info

        # Out of bounds
        if (self.robot_pos[0] <= 0.1 or self.robot_pos[0] >= self.config['map_size'] - 0.1 or
            self.robot_pos[1] <= 0.1 or self.robot_pos[1] >= self.config['map_size'] - 0.1):
            reward = -20.0
            done = True
            info = {"out_of_bounds": True}
            return self._get_observation(), reward, done, False, info

        # Time limit
        if self.step_count >= self.config['max_steps']:
            done = True
            info = {"max_steps": True}
            return self._get_observation(), reward, done, True, info

        # Sparse reward: make progress toward goal
        reward = -0.01 * dist_to_goal  # Small negative per step
        reward += 0.5 * (1.0 - dist_to_goal / (self.config['map_size'] * np.sqrt(2)))

        return self._get_observation(), reward, done, False, info

    def render(self):
        """Render the environment (matplotlib)."""
        if not hasattr(self, '_render_fig'):
            plt.ion()
            self._render_fig, self._render_ax = plt.subplots(figsize=(8, 8))
            self._render_ax.set_xlim(0, self.config['map_size'])
            self._render_ax.set_ylim(0, self.config['map_size'])
            self._render_ax.set_aspect('equal')

        ax = self._render_ax
        ax.clear()
        ax.set_xlim(0, self.config['map_size'])
        ax.set_ylim(0, self.config['map_size'])

        # Draw obstacles
        for (cx, cy, w, h) in self.obstacles:
            rect = plt.Rectangle((cx - w/2, cy - h/2), w, h,
                                   color='gray', alpha=0.7)
            ax.add_patch(rect)

        # Draw goal
        ax.plot(self.goal_pos[0], self.goal_pos[1], 'g*', markersize=20)

        # Draw robot
        circle = plt.Circle(self.robot_pos, self.config['robot_radius'],
                             color='blue')
        ax.add_patch(circle)

        # Draw robot heading
        arrow_len = 0.3
        ax.arrow(self.robot_pos[0], self.robot_pos[1],
                 arrow_len * np.cos(self.robot_theta),
                 arrow_len * np.sin(self.robot_theta),
                 head_width=0.1, head_length=0.05, fc='blue', ec='blue')

        plt.pause(0.01)


def train_ppo(total_timesteps: int = 100_000, save_path: str = "ppo_obstacle_avoider"):
    """Train a PPO agent on obstacle avoidance."""
    env = ObstacleAvoidanceEnv()
    check_env(env)  # Validate environment

    model = PPO(
        policy="MlpPolicy",
        env=env,
        learning_rate=3e-4,
        n_steps=2048,
        batch_size=64,
        n_epochs=10,
        gamma=0.99,
        gae_lambda=0.95,
        clip_range=0.2,
        ent_coef=0.01,  # Small entropy bonus for exploration
        verbose=1,
        tensorboard_log="./tb_logs/",
    )

    print(f"[INFO] Training PPO for {total_timesteps} timesteps...")
    model.learn(total_timesteps=total_timesteps, progress_bar=True)
    model.save(save_path)
    print(f"[INFO] Model saved to {save_path}")
    return model


def evaluate(model_path: str = "ppo_obstacle_avoider", episodes: int = 10):
    """Evaluate a trained model."""
    env = ObstacleAvoidanceEnv(render_mode="human")

    try:
        model = PPO.load(model_path)
    except:
        print(f"[ERROR] Model not found at {model_path}. Train first with: python obstacle_avoidance_rl.py train")
        return

    successes = 0
    collisions = 0

    for ep in range(episodes):
        obs, _ = env.reset()
        done = False
        steps = 0

        while not done and steps < 500:
            action, _ = model.predict(obs, deterministic=True)
            obs, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
            steps += 1
            env.render()

        if info.get("success", False):
            successes += 1
            print(f"  Episode {ep+1}: SUCCESS ({steps} steps)")
        elif info.get("collision", False):
            collisions += 1
            print(f"  Episode {ep+1}: COLLISION")
        else:
            print(f"  Episode {ep+1}: TIMEOUT")

    print(f"\n[RESULTS] Success: {successes}/{episodes}, Collisions: {collisions}/{episodes}")


if __name__ == "__main__":
    import sys
    if len(sys.argv) > 1:
        if sys.argv[1] == "eval":
            evaluate()
        elif sys.argv[1] == "train":
            train_ppo()
    else:
        print("Usage: python obstacle_avoidance_rl.py [train|eval]")
        print("  train: Train PPO agent (default 100k steps)")
        print("  eval:  Evaluate trained model")

5.3 ROS 2 集成

"""
DRL Obstacle Avoidance — ROS 2 Deployment
==========================================
Bridge trained PPO model with ROS 2 navigation.
"""

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import LaserScan
from geometry_msgs.msg import Twist
import numpy as np
from stable_baselines3 import PPO

class DRLNavigator(Node):
    def __init__(self):
        super().__init__('drl_navigator')
        self.declare_parameter('model_path', 'ppo_obstacle_avoider')

        # Load trained model
        model_path = self.get_parameter('model_path').value
        self.model = PPO.load(model_path)

        # Publishers and subscribers
        self.scan_sub = self.create_subscription(
            LaserScan, '/scan', self.scan_callback, 10
        )
        self.cmd_pub = self.create_publisher(Twist, '/cmd_vel', 10)

        # State
        self.last_scan = None
        self.scan_received = False

        self.get_logger().info('DRL Navigator ready')

    def scan_callback(self, msg: LaserScan):
        """Process LiDAR scan and convert to observation."""
        # Handle angle_min/angle_max
        num_readings = len(msg.ranges)
        angles = np.linspace(
            msg.angle_min, msg.angle_max, num_readings
        )

        # Extract relevant sector (front 180 degrees)
        front_mask = (angles >= -np.pi/2) & (angles <= np.pi/2)
        front_angles = angles[front_mask]
        front_ranges = np.array(msg.ranges)[front_mask]

        # Downsample to 16 readings
        indices = np.linspace(0, len(front_ranges)-1, 16).astype(int)
        sensor_readings = front_ranges[indices]

        # Normalize (0 = close obstacle, 1 = no obstacle)
        sensor_readings = np.clip(sensor_readings / msg.range_max, 0, 1)

        # Add goal info (assume goal is roughly straight ahead)
        # In real system, use navigation goal from move_base
        goal_bearing = 0.0
        goal_distance = 0.5

        self.last_scan = np.concatenate([
            sensor_readings,
            [goal_bearing, goal_distance]
        ])
        self.scan_received = True

    def publish_cmd(self):
        if not self.scan_received or self.last_scan is None:
            return

        action, _ = self.model.predict(self.last_scan, deterministic=True)

        linear_vel = float(action[0]) * 0.5  # Scale to actual velocity
        angular_vel = float(action[1]) * 1.5

        cmd = Twist()
        cmd.linear.x = max(0.0, linear_vel)  # Only forward
        cmd.angular.z = angular_vel
        self.cmd_pub.publish(cmd)


def main(args=None):
    rclpy.init(args=args)
    navigator = DRLNavigator()

    # Control loop at 10 Hz
    from rclpy.timer import Timer
    timer = navigator.create_timer(0.1, navigator.publish_cmd)

    try:
        rclpy.spin(navigator)
    except KeyboardInterrupt:
        pass
    finally:
        navigator.destroy_node()
        rclpy.shutdown()


if __name__ == "__main__":
    main()

6. 分步实施指南

阶段 1 — Bug 算法(层次 1)

  1. 定义环境(障碍物位置)
  2. 实现近距离传感器仿真
  3. 实现状态机:go_to_goalwall_followgo_to_goal
  4. 用简单的矩形障碍物测试
  5. 用 matplotlib 添加可视化

阶段 2 — VFH(层次 1)

  1. 添加 360° LiDAR 传感器仿真
  2. 从扫描数据计算极坐标直方图
  3. 用阈值实现山谷检测
  4. 选择朝向目标的方向
  5. 调整平滑参数 \(\sigma\) 和阈值 \(a_{th}\)

阶段 3 — DWA(层次 2)

  1. 定义机器人动力学和加速度限制
  2. 在动态窗口内采样速度空间 \((v, \omega)\)
  3. 为每个速度样本预测轨迹
  4. 评估目标函数:heading + clearance + velocity
  5. nav2 局部规划器进行 ROS 2 集成

阶段 4 — DRL(层次 3)

  1. 定义 Gymnasium 环境:__init__resetsteprender
  2. 实现类 LiDAR 传感器观测
  3. stable_baselines3 训练 PPO(100k+ 步)
  4. 评估成功率和碰撞率
  5. 用 ROS 2 桥接部署到真实机器人

7. 方法对比

指标 Bug / VFH DWA 深度强化学习 (PPO)
精度 ★★★☆☆ ★★★★☆ ★★★★☆
平滑性 ★★☆☆☆ (Bug) / ★★★☆☆ (VFH) ★★★★☆ ★★★★★
鲁棒性 ★★★☆☆ ★★★★☆ ★★★★★
计算量 非常低 高(推理)
需要训练 是(仿真)
参数调整 高(超参数)
仿真到实际差距 不适用 不适用 困难但可行
动态障碍物 中等 优秀
适用场景 简单环境 结构化空间 复杂/多变环境
复杂度 ⭐⭐⭐ ⭐⭐⭐⭐

8. 扩展与变体

8.1 传感器改进

  • **添加 IMU 数据**以改善运动预测
  • 融合相机 + LiDAR 以获得更丰富的环境表示
  • 事件相机 用于高速障碍物避障

8.2 算法组合

  • 全局规划器 + VFH:用 A* 做全局路径,VFH 做局部避障
  • DWA + MPC:用模型预测控制替代 DWA 的轨迹评估
  • 强化学习 + 经典方法:训练强化学习来调节经典规划器参数

8.3 仿真到实际迁移

  1. 在物理仿真器(Gazebo、PyBullet、Isaac Sim)中训练
  2. 添加领域随机化(光照、摩擦、传感器噪声)
  3. 用模仿学习在真实机器人上微调
  4. 进行系统辨识以校准模型

8.4 多机器人扩展

  • 集中式:单一控制器协调所有机器人
  • 分散式:每个机器人独立运行避障
  • 通信:共享位置以实现协作避障

9. 参考资料

  • Borenstein, J., & Koren, Y. (1991). "The Vector Field Histogram — Fast Obstacle Avoidance for Mobile Robots." IEEE Transactions on Robotics and Automation, 7(3), 278–288.
  • Fox, D., Burgard, W., & Thrun, S. (1997). "The Dynamic Window Approach to Collision Avoidance." IEEE Robotics & Automation Magazine, 4(1), 23–33.
  • Koren, Y., & Borenstein, J. (1991). "Potential Field Methods and Their Inherent Limitations for Mobile Robot Navigation." IEEE ICRA, 1398–1404.
  • Marder-Eppstein, E., et al. (2010). "The Dynamic Window Approach in ROS." ROS Wiki
  • Schulman, J., Wolski, F., et al. (2017). "Proximal Policy Optimization Algorithms." arXiv:1707.06347.
  • Raffin, A., et al. (2021). "Stable-Baselines3: Reliable Reinforcement Learning Implementations." JMLR, 22(268), 1–8.
  • Kuderer, M., et al. (2012). "Feature-Based Prediction of Trajectories for Socially Compliant Navigation." RSS.
  • Chen, Y. F., et al. (2017). " Decentralized Non-communicating Multiagent Collision Avoidance with Deep Reinforcement Learning." IEEE ICRA.
  • Zhou, M., et al. (2020). "Collision Avoidance in Dynamic Environments: An RL-based Approach." arXiv:2003.02918.
  • Gymnasium Documentation — gymnasium.farama.org
  • Stable-Baselines3 Documentation — stable-baselines3.readthedocs.io