Skip to content

Obstacle Avoidance for Mobile Robots (移动机器人避障)

Project Type: Navigation | Difficulty: ★★☆☆☆ to ★★★★★ (approach-dependent) | Estimated Time: 2–4 weekends


1. Project Overview

Obstacle avoidance is the capability of a mobile robot to navigate safely without colliding with obstacles in its environment. Unlike global path planning (which assumes full map knowledge), obstacle avoidance reacts to perceived obstacles in real-time, making it essential for dynamic, partially-known, or unknown environments.

  ┌──────────────────────────────────────────────────────────┐
  │                  OBSTACLE AVOIDANCE                       │
  │                                                           │
  │     Goal                                                 │
  │       ★                                                  │
  │       │                                                  │
  │       │        ┌──────┐                                  │
  │       │        │ OBS  │     Robot's avoidance behavior:   │
  │       │        └──────┘     1. Detect obstacle ahead      │
  │       │             \                                    │
  │       │              ╲                                   │
  │       ▼               ◄────── Robot steers around       │
  │     Start                                                │
  └──────────────────────────────────────────────────────────┘

In this project you will explore three progressively more sophisticated approaches:

Tier Approach Method Sensor
Tier 1 Bug Algorithm & VFH Geometry-based, reactive Proximity sensors, LiDAR
Tier 2 Dynamic Window Approach (DWA) Velocity-space optimization 2D LiDAR
Tier 3 Deep RL (PPO) Learning-based, end-to-end Simulated sensor array

2. Hardware & Software Requirements

Hardware

Component Specification Notes
Chassis 2-wheel differential drive Any mobile robot platform
Motors 2× DC gear motors with encoders Encoders for odometry
LiDAR RPLIDAR A1/A2 or similar 360° 2D laser scanner
(Alternative) Ultrasonic/IR sensor array For Tier 1 approaches
Microcontroller Raspberry Pi 4 / Jetson Nano For on-board computation
Battery 12V LiPo or USB-C power bank Power all components

Software

Package Version Purpose
Python ≥ 3.8 Core language
NumPy ≥ 1.20 Numerical computation
OpenCV ≥ 4.5 Sensor processing
ROS 2 Humble / Iron Robot middleware
Stable-Baselines3 ≥ 1.7 PPO implementation
Gymnasium ≥ 0.28 RL environment interface
PyTorch ≥ 1.13 Neural network training
pip install numpy opencv-python stable-baselines3 gymnasium torch

3. Tier 1 — Traditional: Bug Algorithm & Vector Field Histogram (VFH)

3.1 Bug Algorithm

The Bug algorithm is the simplest reactive obstacle avoidance method. The robot uses only local sensing and follows two simple rules:

Algorithm Rules

  1. Direction-to-Goal (m-line): Move directly toward the goal until an obstacle is encountered.
  2. Leave-Circle: When leaving an obstacle, head toward the goal along the shortest path.
  3. Wall-Following: When an obstacle blocks the path, follow the obstacle boundary until the m-line is reachable.

Mathematical Formulation

The Bug algorithm can be formalized as:

\[ \mathbf{d}_{\text{robot \to goal}} = \begin{pmatrix} x_g - x_r \\ y_g - y_r \end{pmatrix}, \quad \phi = \arctan2(y_g - y_r, x_g - x_r) \]

where \((x_g, y_g)\) is the goal position and \((x_r, y_r)\) is the robot position.

Hit point detection: When distance to any obstacle \(d < d_{\text{threshold}}\), a hit point is recorded.

Leave point condition: The robot leaves an obstacle when: $$ |\mathbf{p}{\text{current}} - \mathbf{p}}}| > |\mathbf{p{\text{goal}} - \mathbf{p}| $$}

3.2 Bug Algorithm — Complete Python Code

"""
Bug Algorithm — Reactive Obstacle Avoidance
============================================
A simple geometry-based obstacle avoidance algorithm.
Requires: numpy, matplotlib (for visualization)
"""

import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Circle

# ─── Configuration ──────────────────────────────────────────────
GOAL = np.array([8.0, 8.0])          # Goal position (x, y)
START = np.array([0.5, 0.5])         # Start position
ROBOT_RADIUS = 0.2                    # Robot collision radius (m)
GOAL_THRESHOLD = 0.3                  # Distance to goal to consider reached
OBSTACLE_THRESHOLD = 0.5              # Distance to detect obstacle
WALL_FOLLOW_GAIN = 1.0               # Gain for wall following direction
ROBOT_SPEED = 0.1                     # Forward speed (m/s)

# ─── Environment Definition ─────────────────────────────────────
def create_environment():
    """
    Create a simple environment with rectangular obstacles.
    Returns a list of (cx, cy, width, height) tuples.
    """
    obstacles = [
        (3.0, 3.0, 1.5, 1.5),    # Center (3,3), size 1.5x1.5
        (5.5, 5.5, 1.0, 2.0),    # Center (5.5, 5.5), size 1.0x2.0
        (6.5, 2.5, 2.0, 1.0),    # Center (6.5, 2.5), size 2.0x1.0
    ]
    return obstacles


def is_collision(point: np.ndarray, obstacles: list) -> bool:
    """
    Check if a point collides with any obstacle.
    Uses expanded obstacles to account for robot radius.
    """
    for (cx, cy, w, h) in obstacles:
        # Expanded rectangle (add robot radius)
        hw, hh = w/2 + ROBOT_RADIUS, h/2 + ROBOT_RADIUS
        if (cx - hw <= point[0] <= cx + hw and
            cy - hh <= point[1] <= cy + hh):
            return True
    return False


def get_obstacle_distance(point: np.ndarray, obstacles: list) -> float:
    """
    Compute minimum distance to any obstacle.
    Returns infinity if no obstacles nearby.
    """
    min_dist = np.inf
    for (cx, cy, w, h) in obstacles:
        # Distance to rectangle edge
        hw, hh = w/2, h/2
        dx = max(abs(point[0] - cx) - hw, 0)
        dy = max(abs(point[1] - cy) - hh, 0)
        dist = np.sqrt(dx**2 + dy**2)
        min_dist = min(min_dist, dist)
    return min_dist


def get_gradient_toward_goal(point: np.ndarray) -> np.ndarray:
    """
    Compute unit gradient vector pointing toward the goal.
    """
    direction = GOAL - point
    distance = np.linalg.norm(direction)
    if distance < 1e-6:
        return np.array([0.0, 0.0])
    return direction / distance


def get_wall_follow_direction(point: np.ndarray,
                               obstacles: list,
                               prev_direction: np.ndarray) -> np.ndarray:
    """
    Compute wall-following direction using right-hand rule.
    Keeps the robot hugging the right side of obstacles.
    """
    # Sample points around the robot to find obstacle boundary
    num_samples = 16
    angles = np.linspace(0, 2*np.pi, num_samples, endpoint=False)
    distances = np.array([
        get_obstacle_distance(point + 0.3 * np.array([np.cos(a), np.sin(a)]), obstacles)
        for a in angles
    ])

    # Find the direction with maximum clearance (most open direction)
    # but prefer directions that maintain forward progress
    best_idx = np.argmax(distances)
    best_angle = angles[best_idx]
    best_dist = distances[best_idx]

    # Combine: prefer open space but also forward direction
    goal_dir = get_gradient_toward_goal(point)
    goal_angle = np.arctan2(goal_dir[1], goal_dir[0])

    # Weighted combination of open direction and goal direction
    open_dir = np.array([np.cos(best_angle), np.sin(best_angle)])
    combined = 0.6 * open_dir + 0.4 * goal_dir
    return combined / (np.linalg.norm(combined) + 1e-6)


def bug_algorithm_step(point: np.ndarray,
                        mode: str,
                        hit_point: np.ndarray,
                        prev_direction: np.ndarray,
                        obstacles: list) -> tuple:
    """
    Execute one step of the Bug algorithm.

    Returns: (new_point, new_mode, hit_point, trajectory)
    """
    # Check if goal reached
    if np.linalg.norm(point - GOAL) < GOAL_THRESHOLD:
        return point, "goal_reached", hit_point, point

    # Get distance to nearest obstacle
    dist_to_obstacle = get_obstacle_distance(point, obstacles)

    # State machine transitions
    if mode == "go_to_goal":
        if dist_to_obstacle < OBSTACLE_THRESHOLD:
            # Hit obstacle - switch to wall following
            mode = "wall_follow"
            hit_point = point.copy()
        else:
            # Move toward goal
            direction = get_gradient_toward_goal(point)
            new_point = point + ROBOT_SPEED * direction
            if is_collision(new_point, obstacles):
                new_point = point  # Stay in place if collision
            return new_point, mode, hit_point, new_point

    elif mode == "wall_follow":
        # Check leave condition
        dist_to_goal_from_hit = np.linalg.norm(GOAL - hit_point)
        dist_to_goal_from_current = np.linalg.norm(GOAL - point)
        dist_from_hit = np.linalg.norm(point - hit_point)

        # Leave if we're closer to goal than when we hit
        if (dist_from_hit > dist_to_goal_from_hit and
            dist_to_goal_from_current < dist_to_goal_from_hit + 0.5):
            # Can head back to m-line (direct path to goal)
            mode = "go_to_goal"
            direction = get_gradient_toward_goal(point)
        else:
            # Follow wall
            direction = get_wall_follow_direction(point, obstacles, prev_direction)

        new_point = point + ROBOT_SPEED * direction
        if is_collision(new_point, obstacles):
            new_point = point
        return new_point, mode, hit_point, new_point

    return point, mode, hit_point, point


def run_bug_simulation():
    """Run Bug algorithm simulation and visualize."""
    obstacles = create_environment()
    point = START.copy()
    mode = "go_to_goal"
    hit_point = None
    trajectory = [point.copy()]
    prev_direction = np.array([0.0, 1.0])

    fig, ax = plt.subplots(figsize=(10, 10))
    ax.set_xlim(-1, 10)
    ax.set_ylim(-1, 10)
    ax.set_aspect('equal')
    ax.set_xlabel('X (m)')
    ax.set_ylabel('Y (m)')
    ax.set_title('Bug Algorithm — Obstacle Avoidance')

    # Draw obstacles
    for (cx, cy, w, h) in obstacles:
        rect = plt.Rectangle((cx - w/2, cy - h/2), w, h,
                               color='gray', alpha=0.7)
        ax.add_patch(rect)

    # Draw goal
    ax.plot(GOAL[0], GOAL[1], 'g*', markersize=20, label='Goal')

    # Simulation loop
    for step in range(1000):
        point, mode, hit_point, new_pos = bug_algorithm_step(
            point, mode, hit_point, prev_direction, obstacles
        )
        trajectory.append(point.copy())

        if mode == "goal_reached":
            print(f"[INFO] Goal reached in {step} steps!")
            break

    # Plot trajectory
    trajectory = np.array(trajectory)
    ax.plot(trajectory[:, 0], trajectory[:, 1], 'b-', linewidth=2, label='Path')
    ax.plot(trajectory[0, 0], trajectory[0, 1], 'bo', markersize=10, label='Start')
    ax.legend()
    ax.grid(True, alpha=0.3)

    plt.savefig("bug_algorithm_result.png", dpi=150)
    plt.show()
    print(f"[INFO] Final position: {point}, Distance to goal: {np.linalg.norm(point - GOAL):.3f}")


if __name__ == "__main__":
    run_bug_simulation()

3.3 Vector Field Histogram (VFH)

VFH (Koren & Borenstein, 1991) creates a polar histogram of the environment and selects the smoothest steering direction toward the goal.

Algorithm Steps

  1. Build Cartesian Histogram Grid: Divide the robot's workspace into cells with occupancy values.
  2. Compute Polar Histogram: Convert to polar coordinates centered on the robot:
\[ h_k = \sum_{i,j} c_{ij} \cdot e^{-\frac{(k - \beta_{ij})^2}{2\sigma^2}} \]

where \(c_{ij}\) is the occupancy of cell \((i,j)\), \(\beta_{ij}\) is the angle to that cell, and \(\sigma\) controls the smoothing.

  1. Valley Selection: Identify "valleys" (ranges of \(k\) with low \(h_k\)) that are wide enough for the robot.
  2. Steering Command: Select the valley closest to the goal direction.

3.4 VFH — Complete Python Code

"""
Vector Field Histogram (VFH) — Obstacle Avoidance
===================================================
A histogram-based reactive navigation algorithm.
Requires: numpy, matplotlib
"""

import numpy as np
import matplotlib.pyplot as plt

# ─── Configuration ──────────────────────────────────────────────
GOAL = np.array([8.0, 8.0])          # Goal position
ROBOT_POS = np.array([0.5, 0.5])     # Current position
ROBOT_RADIUS = 0.2                    # Robot radius (m)
VFH_CELL_SIZE = 0.2                   # Grid cell size (m)
VFH_SENSOR_RANGE = 3.0               # Sensor range (m)
VFH_NUM_SECTORS = 360                 # Number of polar sectors
VFH_A_THRESHOLD = 0.35               # Obstacle threshold (0-1)
VFH_SECOND_A_THRESHOLD = 0.15        # Tight threshold for narrow passages
WALL_SMOOTHING = 0.35                # Smoothing parameter sigma
ROBOT_SPEED = 0.08                    # Forward speed
MAX_STEERING_RATE = 0.5               # Max steering change per step

# ─── Simulated Sensor Reading ─────────────────────────────────────
def simulate_lidar(position: np.ndarray, obstacles: list, num_readings: int = 360) -> np.ndarray:
    """
    Simulate LiDAR scan. Returns array of distances in each direction.
    """
    angles = np.linspace(0, 2*np.pi, num_readings, endpoint=False)
    distances = np.full(num_readings, VFH_SENSOR_RANGE)

    for i, angle in enumerate(angles):
        direction = np.array([np.cos(angle), np.sin(angle)])
        for dist in np.linspace(0.1, VFH_SENSOR_RANGE, 50):
            test_point = position + dist * direction
            for (cx, cy, w, h) in obstacles:
                hw, hh = w/2, h/2
                if (cx - hw <= test_point[0] <= cx + hw and
                    cy - hh <= test_point[1] <= cy + hh):
                    distances[i] = min(distances[i], dist)
                    break
    return distances


def compute_polar_histogram(lidar_distances: np.ndarray) -> np.ndarray:
    """
    Convert LiDAR distances to polar histogram.
    Higher values mean more obstacle presence.
    """
    n = len(lidar_distances)
    histogram = np.zeros(n)

    for k in range(n):
        # Each sector corresponds to one lidar reading
        d = lidar_distances[k]
        if d < VFH_SENSOR_RANGE:
            # Active cells closer to robot contribute more
            m_k = 1.0 - (d / VFH_SENSOR_RANGE)
            histogram[k] = m_k ** 2  # Square for sharper response
        else:
            histogram[k] = 0.0

    # Smooth the histogram
    smoothed = np.zeros_like(histogram)
    for k in range(n):
        for s in range(-8, 9):  # ±8 sectors smoothing window
            idx = (k + s) % n
            weight = np.exp(-(s * WALL_SMOOTHING) ** 2 / 2)
            smoothed[k] += histogram[idx] * weight
    return smoothed


def find_valleys(histogram: np.ndarray, threshold: float) -> list:
    """
    Find valleys (open passages) in the polar histogram.
    A valley is a consecutive sequence of sectors below threshold.
    """
    n = len(histogram)
    valleys = []
    in_valley = False
    start_idx = 0

    for k in range(n):
        if histogram[k] < threshold and not in_valley:
            in_valley = True
            start_idx = k
        elif histogram[k] >= threshold and in_valley:
            in_valley = False
            valleys.append((start_idx, k - 1))

    if in_valley:
        valleys.append((start_idx, n - 1))

    return valleys


def select_steering_direction(valleys: list, goal_angle: float, prev_direction: float) -> float:
    """
    Select the best steering direction from available valleys.
    Prefers directions close to the goal.
    """
    if not valleys:
        # No valley found - rotate in place
        return prev_direction + MAX_STEERING_RATE

    # Find valley closest to goal angle
    best_valley = None
    best_score = np.inf

    for (start, end) in valleys:
        valley_center = (start + end) / 2
        if start == 0 and end == len(valleys) > 0:
            # Handle wrap-around
            valley_center = ((end + start + len(valleys)) / 2) % 360

        # Angular difference to goal
        diff = abs(valley_center - goal_angle)
        diff = min(diff, 360 - diff)  # Handle wrap-around

        if diff < best_score:
            best_score = diff
            best_valley = valley_center

    return np.deg2rad(best_valley)


def vfh_step(position: np.ndarray,
             heading: float,
             obstacles: list) -> tuple:
    """
    Execute one VFH step.

    Returns: (new_position, new_heading)
    """
    # 1. Get LiDAR scan
    lidar_distances = simulate_lidar(position, obstacles)

    # 2. Compute polar histogram
    histogram = compute_polar_histogram(lidar_distances)

    # 3. Find valleys
    threshold = VFH_A_THRESHOLD
    valleys = find_valleys(histogram, threshold)

    # 4. Compute goal direction
    direction_to_goal = GOAL - position
    goal_angle = np.rad2deg(np.arctan2(direction_to_goal[1], direction_to_goal[0]))
    if goal_angle < 0:
        goal_angle += 360

    # 5. Select steering
    desired_heading = select_steering_direction(valleys, goal_angle, heading)

    # 6. Limit steering rate
    heading_diff = desired_heading - heading
    while heading_diff > np.pi:
        heading_diff -= 2*np.pi
    while heading_diff < -np.pi:
        heading_diff += 2*np.pi
    heading += np.clip(heading_diff, -MAX_STEERING_RATE, MAX_STEERING_RATE)

    # 7. Move forward
    new_position = position + ROBOT_SPEED * np.array([np.cos(heading), np.sin(heading)])

    return new_position, heading, histogram


def run_vfh_simulation():
    """Run VFH simulation and visualize."""
    obstacles = [
        (3.0, 3.0, 1.5, 1.5),
        (5.5, 5.5, 1.0, 2.0),
        (6.5, 2.5, 2.0, 1.0),
    ]

    position = ROBOT_POS.copy()
    heading = np.arctan2(GOAL[1] - position[1], GOAL[0] - position[0])
    trajectory = [position.copy()]
    all_histograms = []

    fig, axes = plt.subplots(1, 2, figsize=(14, 6))

    for step in range(1500):
        position, heading, histogram = vfh_step(position, heading, obstacles)
        trajectory.append(position.copy())
        all_histograms.append(histogram)

        if np.linalg.norm(position - GOAL) < 0.3:
            print(f"[INFO] VFH: Goal reached in {step} steps!")
            break

    trajectory = np.array(trajectory)

    # Plot trajectory
    ax1 = axes[0]
    ax1.set_xlim(-1, 10)
    ax1.set_ylim(-1, 10)
    ax1.set_aspect('equal')
    ax1.set_title('VFH — Trajectory')

    for (cx, cy, w, h) in obstacles:
        rect = plt.Rectangle((cx - w/2, cy - h/2), w, h,
                               color='gray', alpha=0.7)
        ax1.add_patch(rect)

    ax1.plot(GOAL[0], GOAL[1], 'g*', markersize=20)
    ax1.plot(trajectory[:, 0], trajectory[:, 1], 'b-', linewidth=1.5)
    ax1.plot(trajectory[0, 0], trajectory[0, 1], 'bo', markersize=10)
    ax1.grid(True, alpha=0.3)

    # Plot final histogram
    ax2 = axes[1]
    final_hist = all_histograms[-1]
    angles = np.linspace(0, 360, len(final_hist), endpoint=False)
    ax2.fill(np.deg2rad(angles), final_hist, alpha=0.7, color='blue')
    ax2.set_theta_zero_location('E')
    ax2.set_theta_direction(1)
    ax2.set_title('VFH — Final Polar Histogram')
    ax2.set_rlim(0, 1)

    plt.tight_layout()
    plt.savefig("vfh_result.png", dpi=150)
    plt.show()

    print(f"[INFO] Final distance to goal: {np.linalg.norm(position - GOAL):.3f}")


if __name__ == "__main__":
    run_vfh_simulation()

3.5 Bug vs VFH — Comparison

Feature Bug Algorithm VFH
Complexity Very low Medium
Smoothness Jerky (changes direction abruptly) Smooth steering
Sensor requirement Simple proximity sensor 360° LiDAR
Local minima Can get stuck in loops Better at avoiding local minima
Threshold tuning Few parameters Multiple (threshold, smoothing)

4. Tier 2 — Intermediate: Dynamic Window Approach (DWA)

4.1 Concept

The Dynamic Window Approach (Fox et al., 1997) formulates obstacle avoidance as an optimization problem in velocity space. Instead of computing steering direction directly, DWA:

  1. Samples velocity space \((v, \omega)\) within dynamically feasible ranges
  2. Simulates trajectories for each velocity sample over a short time horizon
  3. Selects the velocity that maximizes an objective function

4.2 Mathematical Formulation

Velocity Space

The robot's dynamics are:

\[ \dot{x} = v \cos(\theta), \quad \dot{y} = v \sin(\theta), \quad \dot{\theta} = \omega \]

Dynamic Window

The dynamic window restricts velocities to:

\[ V_d = \{ (v, \omega) \mid v \in [v_c - \dot{v}_a \cdot \Delta t, v_c + \dot{v}_a \cdot \Delta t] \cap [v_{\min}, v_{\max}] \} \]
\[ \omega \in [\omega_c - \dot{\omega}_a \cdot \Delta t, \omega_c + \dot{\omega}_a \cdot \Delta t] \cap [\omega_{\min}, \omega_{\max}] \]

Objective Function

\[ G(v, \omega) = \alpha \cdot \text{heading}(v, \omega) + \beta \cdot \text{dist}(v, \omega) + \gamma \cdot \text{velocity}(v) \]
  • heading: Progress toward goal direction (higher = better)
  • dist: Clearance from obstacles (higher = farther from obstacles)
  • velocity: Forward velocity (higher = faster)

4.3 DWA — Complete Python Code

"""
Dynamic Window Approach (DWA) — Local Planner
=============================================
Velocity-space optimization for obstacle avoidance.
Requires: numpy, matplotlib
"""

import numpy as np
import matplotlib.pyplot as plt

# ─── Robot Configuration ─────────────────────────────────────────
ROBOT_CONFIG = {
    'max_speed': 0.5,         # Maximum linear velocity (m/s)
    'min_speed': -0.2,        # Minimum reverse speed (m/s)
    'max_yaw_rate': 1.5,      # Maximum angular velocity (rad/s)
    'max_accel': 2.0,         # Maximum linear acceleration (m/s^2)
    'max_yaw_accel': 3.0,     # Maximum angular acceleration (rad/s^2)
    'robot_radius': 0.2,      # Collision radius (m)
    'predict_time': 1.5,      # Trajectory prediction horizon (s)
    'dt': 0.1,                # Time step (s)
}

# ─── DWA Weights ─────────────────────────────────────────────────
WEIGHTS = {
    'heading': 0.8,          # Weight for goal heading
    'velocity': 0.2,          # Weight for forward velocity
    'clearance': 0.1,         # Weight for obstacle clearance
}

# ─── Environment ─────────────────────────────────────────────────
GOAL = np.array([8.0, 8.0])
OBSTACLES = [
    (3.0, 3.0, 1.5, 1.5),
    (5.5, 5.5, 1.0, 2.0),
    (6.5, 2.5, 2.0, 1.0),
]


def is_collision(x: float, y: float, obstacles: list) -> bool:
    """Check if point (x, y) collides with any obstacle."""
    for (cx, cy, w, h) in obstacles:
        hw, hh = w/2 + ROBOT_CONFIG['robot_radius'], h/2 + ROBOT_CONFIG['robot_radius']
        if cx - hw <= x <= cx + hw and cy - hh <= y <= cy + hh:
            return True
    return False


def predict_trajectory(x: float, y: float, theta: float,
                       v: float, omega: float, dt: float) -> np.ndarray:
    """
    Predict trajectory given initial state and velocity.
    Returns array of (x, y, theta) over the prediction horizon.
    """
    trajectory = []
    time = 0.0
    px, py, ptheta = x, y, theta

    while time <= ROBOT_CONFIG['predict_time']:
        trajectory.append([px, py, ptheta])
        px += v * np.cos(ptheta) * dt
        py += v * np.sin(ptheta) * dt
        ptheta += omega * dt
        time += dt

    return np.array(trajectory)


def calc_clearance(x: float, y: float, obstacles: list) -> float:
    """
    Calculate minimum distance to any obstacle.
    """
    min_dist = np.inf
    for (cx, cy, w, h) in obstacles:
        hw, hh = w/2, h/2
        dx = max(abs(x - cx) - hw, 0)
        dy = max(abs(y - cy) - hh, 0)
        dist = np.sqrt(dx**2 + dy**2)
        min_dist = min(min_dist, dist)
    return min_dist


def calc_heading_score(x: float, y: float, theta: float, goal: np.ndarray) -> float:
    """
    Calculate heading score (progress toward goal).
    Returns value in [0, 1], where 1 is facing directly at goal.
    """
    dx = goal[0] - x
    dy = goal[1] - y
    goal_angle = np.arctan2(dy, dx)

    angle_diff = goal_angle - theta
    while angle_diff > np.pi:
        angle_diff -= 2*np.pi
    while angle_diff < -np.pi:
        angle_diff += 2*np.pi

    # Convert to score: 0 when perpendicular, 1 when aligned
    return 0.5 * (1.0 - abs(angle_diff) / np.pi)


def calc_velocity_score(v: float) -> float:
    """
    Calculate velocity score (normalized forward speed).
    """
    return (v - ROBOT_CONFIG['min_speed']) / (ROBOT_CONFIG['max_speed'] - ROBOT_CONFIG['min_speed'])


def sample_velocities(v_c: float, omega_c: float) -> list:
    """
    Sample velocities within the dynamic window.
    """
    v_samples, omega_samples = 5, 10
    velocities = []

    v_range = [
        max(ROBOT_CONFIG['min_speed'], v_c - ROBOT_CONFIG['max_accel'] * ROBOT_CONFIG['dt']),
        min(ROBOT_CONFIG['max_speed'], v_c + ROBOT_CONFIG['max_accel'] * ROBOT_CONFIG['dt'])
    ]
    omega_range = [
        max(-ROBOT_CONFIG['max_yaw_rate'], omega_c - ROBOT_CONFIG['max_yaw_accel'] * ROBOT_CONFIG['dt']),
        min(ROBOT_CONFIG['max_yaw_rate'], omega_c + ROBOT_CONFIG['max_yaw_accel'] * ROBOT_CONFIG['dt'])
    ]

    for v in np.linspace(v_range[0], v_range[1], v_samples):
        for omega in np.linspace(omega_range[0], omega_range[1], omega_samples):
            velocities.append((v, omega))

    return velocities


def dwa_control(x: float, y: float, theta: float,
                v_c: float, omega_c: float, goal: np.ndarray) -> tuple:
    """
    Dynamic Window Approach control law.

    Returns: (v, omega, best_trajectory)
    """
    velocities = sample_velocities(v_c, omega_c)

    best_score = -np.inf
    best_v, best_omega, best_trajectory = 0.0, 0.0, None

    for (v, omega) in velocities:
        # Predict trajectory
        trajectory = predict_trajectory(x, y, theta, v, omega, ROBOT_CONFIG['dt'])

        # Check for collision along trajectory
        collision = False
        min_clearance = np.inf
        for (px, py, _) in trajectory:
            if is_collision(px, py, OBSTACLES):
                collision = True
                break
            clearance = calc_clearance(px, py, OBSTACLES)
            min_clearance = min(min_clearance, clearance)

        if collision:
            continue

        # Score trajectory
        final_x, final_y, final_theta = trajectory[-1]
        heading = calc_heading_score(final_x, final_y, theta, goal)
        velocity = calc_velocity_score(v)
        clearance = min_clearance / 5.0  # Normalize (5m max clearance)

        score = (WEIGHTS['heading'] * heading +
                 WEIGHTS['velocity'] * velocity +
                 WEIGHTS['clearance'] * clearance)

        if score > best_score:
            best_score = score
            best_v = v
            best_omega = omega
            best_trajectory = trajectory

    return best_v, best_omega, best_trajectory


def run_dwa_simulation():
    """Run DWA simulation with visualization."""
    # Initial state
    x, y, theta = 0.5, 0.5, np.arctan2(GOAL[1] - 0.5, GOAL[0] - 0.5)
    v, omega = 0.0, 0.0

    trajectory = [[x, y, theta]]
    all_trajectories = []

    fig, ax = plt.subplots(figsize=(10, 10))

    for step in range(2000):
        # DWA control
        v, omega, trajectory_pred = dwa_control(x, y, theta, v, omega, GOAL)

        # Simulate one step
        x += v * np.cos(theta) * ROBOT_CONFIG['dt']
        y += v * np.sin(theta) * ROBOT_CONFIG['dt']
        theta += omega * ROBOT_CONFIG['dt']

        trajectory.append([x, y, theta])
        if trajectory_pred is not None:
            all_trajectories.append(trajectory_pred)

        # Check goal
        if np.linalg.norm([x - GOAL[0], y - GOAL[1]]) < 0.3:
            print(f"[INFO] DWA: Goal reached in {step} steps!")
            break

    trajectory = np.array(trajectory)

    # Plot
    ax.set_xlim(-1, 10)
    ax.set_ylim(-1, 10)
    ax.set_aspect('equal')
    ax.set_title('Dynamic Window Approach — Trajectory')

    # Draw obstacles
    for (cx, cy, w, h) in OBSTACLES:
        rect = plt.Rectangle((cx - w/2, cy - h/2), w, h,
                               color='gray', alpha=0.7)
        ax.add_patch(rect)

    # Draw goal
    ax.plot(GOAL[0], GOAL[1], 'g*', markersize=20, label='Goal')

    # Draw robot trajectory
    ax.plot(trajectory[:, 0], trajectory[:, 1], 'b-', linewidth=2, label='Robot path')
    ax.plot(trajectory[0, 0], trajectory[0, 1], 'bo', markersize=10, label='Start')

    # Draw last few predicted trajectories
    if all_trajectories:
        for traj in all_trajectories[-50::10]:  # Every 10th, last 50
            traj = np.array(traj)
            ax.plot(traj[:, 0], traj[:, 1], 'g-', alpha=0.1, linewidth=1)

    ax.legend()
    ax.grid(True, alpha=0.3)

    plt.savefig("dwa_result.png", dpi=150)
    plt.show()

    print(f"[INFO] Final position: ({x:.2f}, {y:.2f}), Distance: {np.linalg.norm([x-GOAL[0], y-GOAL[1]]):.3f}")


if __name__ == "__main__":
    run_dwa_simulation()

5. Tier 3 — Modern: Deep Reinforcement Learning (PPO)

5.1 Concept

Instead of hand-coding avoidance rules, we train a neural network policy using Proximal Policy Optimization (PPO). The agent learns from experience which actions to take given sensor observations.

5.2 Gymnasium Environment

"""
DRL Obstacle Avoidance — PPO Training
======================================
Custom Gymnasium environment for obstacle avoidance.
Train a PPO agent to navigate while avoiding obstacles.
"""

import numpy as np
import gymnasium as gym
from gymnasium import spaces
from stable_baselines3 import PPO
from stable_baselines3.common.env_checker import check_env
import matplotlib.pyplot as plt

# ─── Environment Configuration ─────────────────────────────────
ENV_CONFIG = {
    'map_size': 10.0,           # Environment size (meters)
    'robot_radius': 0.2,        # Robot collision radius
    'obstacle_count': 8,        # Number of random obstacles
    'obstacle_size_range': (0.3, 1.2),  # Min/max obstacle size
    'goal_radius': 0.5,         # Goal reached threshold
    'max_steps': 500,           # Maximum episode length
    'sensor_range': 3.0,        # LiDAR-like sensor range
    'sensor_count': 16,         # Number of sensor rays
}


class ObstacleAvoidanceEnv(gym.Env):
    """
    2D obstacle avoidance environment with LiDAR-like sensing.

    State: 18-dimensional vector
        - 16 LiDAR distance readings (normalized)
        - 1 goal bearing (angle to goal, normalized)
        - 1 goal distance (normalized)

    Action: 2-dimensional continuous
        - Linear velocity command (0 to 1)
        - Angular velocity command (-1 to 1)
    """

    metadata = {"render_modes": ["human"]}

    def __init__(self, render_mode=None):
        super().__init__()
        self.render_mode = render_mode

        # Observation: sensor readings + goal info
        self.observation_space = spaces.Box(
            low=0.0, high=1.0,
            shape=(18,), dtype=np.float32
        )

        # Action: [linear_vel, angular_vel]
        self.action_space = spaces.Box(
            low=np.array([-1.0, -1.0]),
            high=np.array([1.0, 1.0]),
            dtype=np.float32
        )

        self.config = ENV_CONFIG
        self.robot_pos = None
        self.robot_theta = None
        self.goal_pos = None
        self.obstacles = None
        self.step_count = None

    def _generate_obstacles(self) -> list:
        """Generate random rectangular obstacles."""
        obstacles = []
        for _ in range(self.config['obstacle_count']):
            min_size, max_size = self.config['obstacle_size_range']
            w = np.random.uniform(min_size, max_size)
            h = np.random.uniform(min_size, max_size)
            x = np.random.uniform(w/2 + 0.5, self.config['map_size'] - w/2 - 0.5)
            y = np.random.uniform(h/2 + 0.5, self.config['map_size'] - h/2 - 0.5)
            obstacles.append((x, y, w, h))
        return obstacles

    def _is_collision(self, pos: np.ndarray) -> bool:
        """Check if position collides with any obstacle."""
        r = self.config['robot_radius']
        for (cx, cy, w, h) in self.obstacles:
            hw, hh = w/2 + r, h/2 + r
            if cx - hw <= pos[0] <= cx + hw and cy - hh <= pos[1] <= cy + hh:
                return True
        return False

    def _get_sensor_readings(self) -> np.ndarray:
        """Simulate LiDAR-like sensor readings."""
        readings = np.zeros(self.config['sensor_count'], dtype=np.float32)
        angles = np.linspace(0, 2*np.pi, self.config['sensor_count'], endpoint=False)

        for i, angle in enumerate(angles):
            direction = np.array([np.cos(angle), np.sin(angle)])
            for dist in np.linspace(0.1, self.config['sensor_range'], 50):
                test_pos = self.robot_pos + dist * direction
                if test_pos[0] < 0 or test_pos[0] > self.config['map_size']:
                    break
                if test_pos[1] < 0 or test_pos[1] > self.config['map_size']:
                    break
                if self._is_collision(test_pos):
                    readings[i] = dist / self.config['sensor_range']
                    break
            else:
                readings[i] = 1.0  # No obstacle in range

        return readings

    def _get_observation(self) -> np.ndarray:
        """Get full observation vector."""
        sensors = self._get_sensor_readings()

        # Goal bearing (angle to goal, normalized to [-1, 1])
        direction_to_goal = self.goal_pos - self.robot_pos
        goal_angle = np.arctan2(direction_to_goal[1], direction_to_goal[0])
        angle_diff = goal_angle - self.robot_theta
        while angle_diff > np.pi:
            angle_diff -= 2*np.pi
        while angle_diff < -np.pi:
            angle_diff += 2*np.pi
        goal_bearing = angle_diff / np.pi  # Normalize to [-1, 1]

        # Normalized distance to goal
        goal_distance = np.linalg.norm(direction_to_goal) / (self.config['map_size'] * np.sqrt(2))

        return np.concatenate([sensors, [goal_bearing, goal_distance]]).astype(np.float32)

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)

        # Reset robot
        self.robot_pos = np.array([0.5, 0.5])
        self.robot_theta = 0.0

        # Place goal (far corner)
        self.goal_pos = np.array([
            self.config['map_size'] - 0.5,
            self.config['map_size'] - 0.5
        ])

        # Generate obstacles
        self.obstacles = self._generate_obstacles()

        # Ensure start and goal are clear
        while self._is_collision(self.robot_pos):
            self.obstacles = self._generate_obstacles()

        self.step_count = 0
        return self._get_observation(), {}

    def step(self, action):
        self.step_count += 1

        # Unpack action
        linear_vel = float(action[0])  # 0 to 1
        angular_vel = float(action[1])  # -1 to 1

        # Convert to actual velocities
        v = linear_vel * 0.5  # Max 0.5 m/s
        omega = angular_vel * 1.5  # Max 1.5 rad/s

        # Update pose
        self.robot_pos[0] += v * np.cos(self.robot_theta) * 0.1
        self.robot_pos[1] += v * np.sin(self.robot_theta) * 0.1
        self.robot_theta += omega * 0.1

        # Clamp to map bounds
        self.robot_pos = np.clip(self.robot_pos, 0.1, self.config['map_size'] - 0.1)

        # ── Compute Reward ──
        reward = 0.0
        done = False
        info = {}

        dist_to_goal = np.linalg.norm(self.robot_pos - self.goal_pos)

        # Goal reached
        if dist_to_goal < self.config['goal_radius']:
            reward = 100.0
            done = True
            info = {"success": True}
            return self._get_observation(), reward, done, False, info

        # Collision with obstacle
        if self._is_collision(self.robot_pos):
            reward = -50.0
            done = True
            info = {"collision": True}
            return self._get_observation(), reward, done, False, info

        # Out of bounds
        if (self.robot_pos[0] <= 0.1 or self.robot_pos[0] >= self.config['map_size'] - 0.1 or
            self.robot_pos[1] <= 0.1 or self.robot_pos[1] >= self.config['map_size'] - 0.1):
            reward = -20.0
            done = True
            info = {"out_of_bounds": True}
            return self._get_observation(), reward, done, False, info

        # Time limit
        if self.step_count >= self.config['max_steps']:
            done = True
            info = {"max_steps": True}
            return self._get_observation(), reward, done, True, info

        # Sparse reward: make progress toward goal
        reward = -0.01 * dist_to_goal  # Small negative per step
        reward += 0.5 * (1.0 - dist_to_goal / (self.config['map_size'] * np.sqrt(2)))

        return self._get_observation(), reward, done, False, info

    def render(self):
        """Render the environment (matplotlib)."""
        if not hasattr(self, '_render_fig'):
            plt.ion()
            self._render_fig, self._render_ax = plt.subplots(figsize=(8, 8))
            self._render_ax.set_xlim(0, self.config['map_size'])
            self._render_ax.set_ylim(0, self.config['map_size'])
            self._render_ax.set_aspect('equal')

        ax = self._render_ax
        ax.clear()
        ax.set_xlim(0, self.config['map_size'])
        ax.set_ylim(0, self.config['map_size'])

        # Draw obstacles
        for (cx, cy, w, h) in self.obstacles:
            rect = plt.Rectangle((cx - w/2, cy - h/2), w, h,
                                   color='gray', alpha=0.7)
            ax.add_patch(rect)

        # Draw goal
        ax.plot(self.goal_pos[0], self.goal_pos[1], 'g*', markersize=20)

        # Draw robot
        circle = plt.Circle(self.robot_pos, self.config['robot_radius'],
                             color='blue')
        ax.add_patch(circle)

        # Draw robot heading
        arrow_len = 0.3
        ax.arrow(self.robot_pos[0], self.robot_pos[1],
                 arrow_len * np.cos(self.robot_theta),
                 arrow_len * np.sin(self.robot_theta),
                 head_width=0.1, head_length=0.05, fc='blue', ec='blue')

        plt.pause(0.01)


def train_ppo(total_timesteps: int = 100_000, save_path: str = "ppo_obstacle_avoider"):
    """Train a PPO agent on obstacle avoidance."""
    env = ObstacleAvoidanceEnv()
    check_env(env)  # Validate environment

    model = PPO(
        policy="MlpPolicy",
        env=env,
        learning_rate=3e-4,
        n_steps=2048,
        batch_size=64,
        n_epochs=10,
        gamma=0.99,
        gae_lambda=0.95,
        clip_range=0.2,
        ent_coef=0.01,  # Small entropy bonus for exploration
        verbose=1,
        tensorboard_log="./tb_logs/",
    )

    print(f"[INFO] Training PPO for {total_timesteps} timesteps...")
    model.learn(total_timesteps=total_timesteps, progress_bar=True)
    model.save(save_path)
    print(f"[INFO] Model saved to {save_path}")
    return model


def evaluate(model_path: str = "ppo_obstacle_avoider", episodes: int = 10):
    """Evaluate a trained model."""
    env = ObstacleAvoidanceEnv(render_mode="human")

    try:
        model = PPO.load(model_path)
    except:
        print(f"[ERROR] Model not found at {model_path}. Train first with: python obstacle_avoidance_rl.py train")
        return

    successes = 0
    collisions = 0

    for ep in range(episodes):
        obs, _ = env.reset()
        done = False
        steps = 0

        while not done and steps < 500:
            action, _ = model.predict(obs, deterministic=True)
            obs, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
            steps += 1
            env.render()

        if info.get("success", False):
            successes += 1
            print(f"  Episode {ep+1}: SUCCESS ({steps} steps)")
        elif info.get("collision", False):
            collisions += 1
            print(f"  Episode {ep+1}: COLLISION")
        else:
            print(f"  Episode {ep+1}: TIMEOUT")

    print(f"\n[RESULTS] Success: {successes}/{episodes}, Collisions: {collisions}/{episodes}")


if __name__ == "__main__":
    import sys
    if len(sys.argv) > 1:
        if sys.argv[1] == "eval":
            evaluate()
        elif sys.argv[1] == "train":
            train_ppo()
    else:
        print("Usage: python obstacle_avoidance_rl.py [train|eval]")
        print("  train: Train PPO agent (default 100k steps)")
        print("  eval:  Evaluate trained model")

5.3 ROS 2 Integration

"""
DRL Obstacle Avoidance — ROS 2 Deployment
==========================================
Bridge trained PPO model with ROS 2 navigation.
"""

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import LaserScan
from geometry_msgs.msg import Twist
import numpy as np
from stable_baselines3 import PPO

class DRLNavigator(Node):
    def __init__(self):
        super().__init__('drl_navigator')
        self.declare_parameter('model_path', 'ppo_obstacle_avoider')

        # Load trained model
        model_path = self.get_parameter('model_path').value
        self.model = PPO.load(model_path)

        # Publishers and subscribers
        self.scan_sub = self.create_subscription(
            LaserScan, '/scan', self.scan_callback, 10
        )
        self.cmd_pub = self.create_publisher(Twist, '/cmd_vel', 10)

        # State
        self.last_scan = None
        self.scan_received = False

        self.get_logger().info('DRL Navigator ready')

    def scan_callback(self, msg: LaserScan):
        """Process LiDAR scan and convert to observation."""
        # Handle angle_min/angle_max
        num_readings = len(msg.ranges)
        angles = np.linspace(
            msg.angle_min, msg.angle_max, num_readings
        )

        # Extract relevant sector (front 180 degrees)
        front_mask = (angles >= -np.pi/2) & (angles <= np.pi/2)
        front_angles = angles[front_mask]
        front_ranges = np.array(msg.ranges)[front_mask]

        # Downsample to 16 readings
        indices = np.linspace(0, len(front_ranges)-1, 16).astype(int)
        sensor_readings = front_ranges[indices]

        # Normalize (0 = close obstacle, 1 = no obstacle)
        sensor_readings = np.clip(sensor_readings / msg.range_max, 0, 1)

        # Add goal info (assume goal is roughly straight ahead)
        # In real system, use navigation goal from move_base
        goal_bearing = 0.0
        goal_distance = 0.5

        self.last_scan = np.concatenate([
            sensor_readings,
            [goal_bearing, goal_distance]
        ])
        self.scan_received = True

    def publish_cmd(self):
        if not self.scan_received or self.last_scan is None:
            return

        action, _ = self.model.predict(self.last_scan, deterministic=True)

        linear_vel = float(action[0]) * 0.5  # Scale to actual velocity
        angular_vel = float(action[1]) * 1.5

        cmd = Twist()
        cmd.linear.x = max(0.0, linear_vel)  # Only forward
        cmd.angular.z = angular_vel
        self.cmd_pub.publish(cmd)


def main(args=None):
    rclpy.init(args=args)
    navigator = DRLNavigator()

    # Control loop at 10 Hz
    from rclpy.timer import Timer
    timer = navigator.create_timer(0.1, navigator.publish_cmd)

    try:
        rclpy.spin(navigator)
    except KeyboardInterrupt:
        pass
    finally:
        navigator.destroy_node()
        rclpy.shutdown()


if __name__ == "__main__":
    main()

6. Step-by-Step Implementation Guide

Phase 1 — Bug Algorithm (Tier 1)

  1. Define your environment (obstacle positions)
  2. Implement proximity sensor simulation
  3. Implement state machine: go_to_goalwall_followgo_to_goal
  4. Test with simple rectangular obstacles
  5. Add visualization with matplotlib

Phase 2 — VFH (Tier 1)

  1. Add 360° LiDAR sensor simulation
  2. Compute polar histogram from scan data
  3. Implement valley detection with threshold
  4. Select steering direction toward goal
  5. Tune smoothing parameter \(\sigma\) and threshold \(a_{th}\)

Phase 3 — DWA (Tier 2)

  1. Define robot dynamics and acceleration limits
  2. Sample velocity space \((v, \omega)\) within dynamic window
  3. Predict trajectories for each velocity sample
  4. Evaluate objective function: heading + clearance + velocity
  5. ROS 2 integration with nav2 local planner

Phase 4 — DRL (Tier 3)

  1. Define Gymnasium environment with __init__, reset, step, render
  2. Implement LiDAR-like sensor observations
  3. Train PPO with stable_baselines3 (100k+ timesteps)
  4. Evaluate success rate and collision rate
  5. Deploy on real robot with ROS 2 bridge

7. Comparison of Approaches

Criteria Bug / VFH DWA Deep RL (PPO)
Accuracy ★★★☆☆ ★★★★☆ ★★★★☆
Smoothness ★★☆☆☆ (Bug) / ★★★☆☆ (VFH) ★★★★☆ ★★★★★
Robustness ★★★☆☆ ★★★★☆ ★★★★★
Computation Very low Low High (inference)
Training needed No No Yes (simulation)
Parameter tuning Low Medium High (hyperparameters)
Sim-to-real gap N/A N/A Challenging but doable
Dynamic obstacles Moderate Good Excellent
Best for Simple environments Structured spaces Complex/variable environments
Complexity ⭐⭐⭐ ⭐⭐⭐⭐

8. Extensions and Variations

8.1 Sensor Improvements

  • Add IMU data to DWA for better motion prediction
  • Fuse camera + LiDAR for richer environment representation
  • Event cameras for high-speed obstacle avoidance

8.2 Algorithm Combinations

  • Global planner + VFH: Use A* for global path, VFH for local avoidance
  • DWA + MPC: Replace DWA's trajectory evaluation with Model Predictive Control
  • RL + classical: Train RL to modulate classical planner parameters

8.3 Sim-to-Real Transfer

  1. Train in physics simulator (Gazebo, PyBullet, Isaac Sim)
  2. Add domain randomization (lighting, friction, sensor noise)
  3. Fine-tune on real robot with imitation learning
  4. Apply system identification for model calibration

8.4 Multi-Robot Extensions

  • Centralized: Single controller coordinates all robots
  • Decentralized: Each robot runs independent obstacle avoidance
  • Communication: Share positions for cooperative avoidance

9. References

  • Borenstein, J., & Koren, Y. (1991). "The Vector Field Histogram — Fast Obstacle Avoidance for Mobile Robots." IEEE Transactions on Robotics and Automation, 7(3), 278–288.
  • Fox, D., Burgard, W., & Thrun, S. (1997). "The Dynamic Window Approach to Collision Avoidance." IEEE Robotics & Automation Magazine, 4(1), 23–33.
  • Koren, Y., & Borenstein, J. (1991). "Potential Field Methods and Their Inherent Limitations for Mobile Robot Navigation." IEEE ICRA, 1398–1404.
  • Marder-Eppstein, E., et al. (2010). "The Dynamic Window Approach in ROS." ROS Wiki
  • Schulman, J., Wolski, F., et al. (2017). "Proximal Policy Optimization Algorithms." arXiv:1707.06347.
  • Raffin, A., et al. (2021). "Stable-Baselines3: Reliable Reinforcement Learning Implementations." JMLR, 22(268), 1–8.
  • Kuderer, M., et al. (2012). "Feature-Based Prediction of Trajectories for Socially Compliant Navigation." RSS.
  • Chen, Y. F., et al. (2017). " Decentralized Non-communicating Multiagent Collision Avoidance with Deep Reinforcement Learning." IEEE ICRA.
  • Zhou, M., et al. (2020). "Collision Avoidance in Dynamic Environments: An RL-based Approach." arXiv:2003.02918.
  • Gymnasium Documentation — gymnasium.farama.org
  • Stable-Baselines3 Documentation — stable-baselines3.readthedocs.io