跳转至

视觉-语言导航 (Vision-Language Navigation)

项目类型: 具身人工智能 (Embodied AI) | 难度: ★★★☆☆ 到 ★★★★★ | 预计时间: 2–4 周


1. 项目概述

视觉-语言导航(Vision-Language Navigation,简称 VLN) 是一项让机器人通过自然语言指令导航到目标位置的任务。例如,指令 "在蓝色椅子处左转,经过厨房后在餐桌旁停下"。机器人需要将语言引用与视觉观察对应起来,并据此规划导航路径。

┌─────────────────────────────────────────────────────────────────────┐
│                    视觉-语言导航流水线 (VLN Pipeline)                  │
│                                                                     │
│   ┌──────────┐    ┌──────────────────┐    ┌────────────────────┐    │
│   │ 自然语言 │───▶│ 指令解析器       │───▶│ 跨模态对齐          │    │
│   │ "经过红  │    │ (NLP)           │    │                     │    │
│   │ 色门..." │    │                 │    │ • 视觉-语言对齐      │    │
│   └──────────┘    │ • 实体提取       │    │ • 空间推理           │    │
│                   │ • 动作解析       │    └─────────┬──────────┘    │
│                   │ • 路径点序列     │              │               │
│                   └──────────────────┘              │               │
│                                                      │               │
│   ┌──────────┐    ┌──────────────────┐              │               │
│   │ RGB/深度 │───▶│ 视觉特征提取器    │───────────────┘               │
│   │ 相机     │    │ (ResNet/ViT/CLIP)│                                │
│   └──────────┘    └──────────────────┘                                │
│                              │                                        │
│                              ▼                                        │
│                   ┌──────────────────┐    ┌────────────────────┐    │
│                   │ 路径规划器         │───▶│ 动作指令            │    │
│                   │ (A* / 强化学习策略)│    │ (速度, 转向角度)   │    │
│                   └──────────────────┘    └────────────────────┘    │
└─────────────────────────────────────────────────────────────────────┘

本项目将实现**三种递进式的导航方法**:

层级 方法 核心技术 数据集
1 — 传统 模块化流水线 NLP 解析 + 地标检测 + A* 规划 自定义合成数据
2 — 中级 Seq2Seq + 注意力 跨模态注意力、Teacher Forcing R2R (Room-to-Room)
3 — 现代 基础模型 CLIP 视觉特征 + LLM 指令遵循 零样本 (Zero-shot)

2. 硬件与软件需求

硬件

组件 规格 备注
机器人平台 TurtleBot3 / 定制轮式机器人 差分驱动
RGB-D 相机 RealSense D435 / Azure Kinect 用于深度感知
机载 PC Jetson Nano / Raspberry Pi 4 / 笔记本电脑 用于实际部署
(可选)激光雷达 RPLIDAR A1 / L515 用于模块化流水线
仿真 PC 带独立 GPU 的台式机 用于 Habitat / AI2-THOR

软件

版本 用途
Python ≥ 3.8 核心语言
PyTorch ≥ 1.13 神经网络训练
Transformers ≥ 4.30 CLIP、LLaVA、GPT 模型
OpenCV ≥ 4.5 图像预处理
NumPy ≥ 1.20 数值计算
Habitat-Sim ≥ 0.2 3D 具身 AI 仿真器
AI2-THOR ≥ 4.0 家庭导航环境
spaCy / NLTK 最新版 NLP 指令解析
scikit-image ≥ 0.19 图像特征提取
matplotlib ≥ 3.5 可视化
pip install torch torchvision transformers opencv-python numpy
pip install habitat-sim ai2-thor  # 仿真后端
pip install spacy nltk scikit-image matplotlib
python -m spacy download en_core_web_sm

3. 第 1 层 — 传统:模块化流水线

3.1 概念

模块化流水线将 VLN 分解为三个独立阶段:(1)NLP 解析 从指令中提取实体和动作;(2)视觉地标检测 在图像中定位被引用的物体;(3)路径规划 导航至检测到的地标位置。

这种方法透明且可调试——每个阶段都是具有可解释输出的白盒。

3.2 核心组件

3.2.1 指令解析器 (NLP)

使用 spaCy 进行命名实体识别(NER)和依存句法分析,提取:

  • 地标(Landmarks):指令中引用的物体("蓝色椅子"、"厨房桌子")
  • 动作(Actions):导航动词("转"、"走"、"停"、"经过")
  • 方向(Directions):空间关系("左"、"右"、"直行"、"后面")
import spacy

nlp = spacy.load("en_core_web_sm")

def parse_instruction(instruction: str) -> dict:
    """
    Parse a navigation instruction into structured components.

    Returns: {
        'entities': [{'text': 'blue chair', 'label': 'LANDMARK'}, ...],
        'actions':  [{'verb': 'turn', 'direction': 'left'}, ...],
        'route':    ['turn left', 'go straight', 'stop']
    }
    """
    doc = nlp(instruction)
    entities = []
    actions = []

    # Named entity recognition
    for ent in doc.ents:
        entities.append({'text': ent.text, 'label': 'OBJECT'})

    # Verb + direction extraction via dependency parsing
    for token in doc:
        if token.pos_ == "VERB":
            direction = None
            for child in token.children:
                if child.dep_ in ("prep", "advcl"):
                    direction = child.text
            actions.append({'verb': token.lemma_, 'direction': direction})

    # Build route sequence
    route = []
    for token in doc:
        if token.dep_ == "ROOT" and token.pos_ == "VERB":
            route.append(token.lemma_)
        if token.dep_ in ("prep", "pcomp"):
            route.append(token.head.text + " " + token.text)

    return {'entities': entities, 'actions': actions, 'route': route}

3.2.2 视觉地标检测器

使用预训练的 ResNet-50 提取特征图,然后使用余弦相似度将检测到的物体与指令中的地标进行比较。

import torch
import torchvision.models as models
import torchvision.transforms as T
import cv2
import numpy as np

# Load pretrained ResNet-50 as feature extractor
resnet = models.resnet50(pretrained=True)
resnet = torch.nn.Sequential(*list(resnet.children())[:-2])  # Remove FC, keep feature maps
resnet.eval()

transform = T.Compose([
    T.ToTensor(),
    T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# CLIP-style landmark vocabulary (simplified)
LANDMARK_VOCAB = [
    "chair", "table", "door", "window", "bed", "sofa",
    "kitchen", "bathroom", "hallway", "stairs",
    "blue", "red", "green", "white", "black"
]

def detect_landmarks(frame: np.ndarray, target_landmarks: list) -> list:
    """
    Detect landmark locations in the image frame.

    Parameters
    ----------
    frame : BGR image (HxWx3)
    target_landmarks : list of landmark names from instruction parser

    Returns
    -------
    detections : list of (label, bbox, confidence) tuples
    """
    # Resize and preprocess
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    input_tensor = transform(rgb).unsqueeze(0)

    # Extract feature map (1x2048x7x7 for ResNet-50)
    with torch.no_grad():
        features = resnet(input_tensor)  # [1, 2048, H', W']

    B, C, H, W = features.shape
    feature_map = features.squeeze(0).reshape(C, H * W).T  # (H*W) x 2048

    # Simple color-based landmark detection (complement to deep features)
    detections = []
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

    color_map = {
        'blue': ([100, 150, 0], [140, 255, 255]),
        'red':  ([0, 100, 100], [10, 255, 255]),
        'green':([40, 50, 50],  [80, 255, 255]),
    }

    for landmark in target_landmarks:
        color_key = landmark.lower()
        if color_key in color_map:
            lower, upper = color_map[color_key]
            mask = cv2.inRange(hsv, np.array(lower), np.array(upper))
            # Find largest contour
            contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
            if contours:
                largest = max(contours, key=cv2.contourArea)
                x, y, w, h = cv2.boundingRect(largest)
                detections.append({
                    'label': landmark,
                    'bbox': (x, y, x+w, y+h),
                    'confidence': float(cv2.contourArea(largest) / (frame.shape[0] * frame.shape[1]))
                })

    return detections

3.2.3 A* 路径规划器

给定自上而下地图(由深度传感器或 SLAM 构建),使用 A* 算法规划通过检测到的地标位置的路径。

import heapq

def astar_path(grid: np.ndarray, start: tuple, goal: tuple,
               landmarks: list = None) -> list:
    """
    A* path planning on a 2D occupancy grid.

    Parameters
    ----------
    grid : 2D numpy array (0 = free, 1 = obstacle)
    start : (row, col) starting position
    goal : (row, col) goal position
    landmarks : list of (row, col) waypoints to visit in order

    Returns
    -------
    path : list of (row, col) positions from start to goal
    """
    if landmarks is None:
        landmarks = []

    def heuristic(a, b):
        # Manhattan distance (can use Euclidean for 8-connected)
        return abs(a[0] - b[0]) + abs(a[1] - b[1])

    def neighbors(pos):
        for dr, dc in [(-1,0),(1,0),(0,-1),(0,1),(-1,-1),(-1,1),(1,-1),(1,1)]:
            nr, nc = pos[0] + dr, pos[1] + dc
            if 0 <= nr < grid.shape[0] and 0 <= nc < grid.shape[1]:
                if grid[nr, nc] == 0:  # free cell
                    yield (nr, nc)

    # Insert intermediate goals for landmarks
    waypoints = [start] + landmarks + [goal]

    full_path = []
    for i in range(len(waypoints) - 1):
        sub_path = _astar_single(grid, waypoints[i], waypoints[i+1], heuristic, neighbors)
        if sub_path is None:
            return None  # No path found
        full_path.extend(sub_path[:-1])

    full_path.append(goal)
    return full_path


def _astar_single(grid, start, goal, heuristic, neighbors):
    """Single-shot A* between two waypoints."""
    open_set = [(heuristic(start, goal), start)]
    came_from = {}
    g_score = {start: 0}

    while open_set:
        _, current = heapq.heappop(open_set)

        if current == goal:
            # Reconstruct path
            path = []
            while current in came_from:
                path.append(current)
                current = came_from[current]
            path.append(start)
            return path[::-1]

        for neighbor in neighbors(current):
            tentative_g = g_score[current] + 1
            if neighbor not in g_score or tentative_g < g_score[neighbor]:
                came_from[neighbor] = current
                g_score[neighbor] = tentative_g
                f_score = tentative_g + heuristic(neighbor, goal)
                heapq.heappush(open_set, (f_score, neighbor))

    return None  # No path found

3.3 完整流水线整合

def modular_vln_loop(instruction: str, rgb_frame: np.ndarray,
                    depth_frame: np.ndarray, grid_map: np.ndarray,
                    robot_pos: tuple) -> tuple:
    """
    Complete modular VLN pipeline.

    Returns: (next_action, detected_landmarks, planned_path)
    """
    # Step 1: Parse instruction
    parsed = parse_instruction(instruction)
    target_landmarks = [e['text'] for e in parsed['entities']]

    # Step 2: Detect landmarks in current view
    detections = detect_landmarks(rgb_frame, target_landmarks)

    # Step 3: Project detections to world coordinates (simplified)
    # In practice: use depth + camera intrinsics + robot pose
    landmark_waypoints = [d['bbox'][:2] for d in detections]  # placeholder

    # Step 4: A* path planning
    # Goal is the last detected landmark or end of instruction
    goal = landmark_waypoints[-1] if landmark_waypoints else (15, 15)
    path = astar_path(grid_map, robot_pos, goal, landmark_waypoints[:-1])

    # Step 5: Compute next action from path
    if path and len(path) > 1:
        next_pos = path[1]
        direction = (next_pos[0] - robot_pos[0], next_pos[1] - robot_pos[1])
        action = {'type': 'move_to', 'target': next_pos}
    else:
        action = {'type': 'stop'}

    return action, detections, path

3.4 局限性

方面 问题
NLP 难以处理复杂指代表达("左侧第二个门")
视觉 基于颜色的检测脆弱;需要鲁棒的地标识别
规划 A* 在 2D 网格上运行,忽略 3D 几何和门
泛化 每个组件必须独立重新训练才能适应新环境

4. 第 2 层 — 中级:Seq2Seq + 注意力

4.1 概念

Seq2Seq VLN 使用**编码器-解码器架构**:

  • 编码器:处理指令(作为词嵌入序列)和视觉观察(作为空间特征序列)。
  • 解码器:生成导航动作序列,在每一步关注指令和视觉特征的相关部分。

核心创新是**跨模态注意力机制**,允许模型将语言标记与视觉区域对齐。

4.2 模型架构

Speaker-Follower 模型(Fried et al., 2018)和 CMU How-to-nav 模型(Wang et al., 2018)都使用注意力机制的 Seq2Seq:

┌─────────────────────────────────────────────────────────────────────┐
│                 Seq2Seq VLN 模型 (Speaker-Follower)                   │
│                                                                     │
│  指令: "Turn left at the blue chair"                                 │
│                                                                     │
│  ┌─────────┐                                                        │
│  │ 编码器  │                                                        │
│  │         │    ┌──────────────────────────────────────────────┐   │
│  │ ┌─────┐ │    │                                              │   │
│  │ │ w₁  │─┼───▶│                                              │   │
│  │ ├─────┤ │    │            跨模态注意力                        │   │
│  │ │ w₂  │─┼───▶│                                              │   │
│  │ ├─────┤ │    │  α_i = softmax(vᵀ·tanh(W₁h_i + W₂v_j))      │   │
│  │ │ w₃  │─┼───▶│                                              │   │
│  │ └─────┘ │    └──────────────────────────────────────────────┘   │
│  └─────────┘                         │                             │
│       │                             │                             │
│       │ h_i (语言隐藏状态)           │ c_t (上下文向量)              │
│       │                             ▼                             │
│       │                      ┌──────────────┐                     │
│  ┌────▼────┐                 │   解码器     │                     │
│  │  LSTM   │◀────────────────│              │                     │
│  │         │                 │  a_t = argmax│                     │
│  │ h_t     │────────────────▶│  P(a|context)│                     │
│  └─────────┘                 └──────────────┘                     │
│                                                                     │
│  动作空间: {Forward, Left, Right, <Stop>}                           │
└─────────────────────────────────────────────────────────────────────┘

4.3 跨模态注意力

注意力机制根据当前解码器状态计算视觉特征的加权和:

\[ c_t = \sum_{j=1}^{N} \alpha_{tj} \, v_j \]

其中注意力权重为:

\[ \alpha_{tj} = \frac{\exp\big(e_{tj}\big)}{\sum_{k=1}^{N} \exp\big(e_{tk}\big)}, \quad e_{tj} = v^\top \tanh\big(W_1 h_{t-1} + W_2 v_j\big) \]
  • \(h_{t-1}\) — 解码器前一隐藏状态
  • \(v_j\) — 区域 \(j\) 的视觉特征
  • \(W_1, W_2, v\) — 可学习的注意力参数

4.4 完整 PyTorch 实现

"""
Tier 2: Seq2Seq VLN with Cross-Modal Attention
==============================================
Implements a simplified Speaker-Follower style model.
Trainable on R2R (Room-to-Room) dataset.
"""

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

# ─── Model ──────────────────────────────────────────────────────────────────

class Seq2SeqVLN(nn.Module):
    """
    Seq2Seq VLN with cross-modal attention.

    Args:
        vocab_size: size of instruction vocabulary
        embed_dim: word embedding dimension
        hidden_dim: LSTM hidden dimension
        visual_dim: dimension of visual features (e.g., ResNet-2048)
        num_actions: number of navigation actions
        dropout: dropout probability
    """

    def __init__(self, vocab_size: int, embed_dim: int = 256,
                 hidden_dim: int = 512, visual_dim: int = 2048,
                 num_actions: int = 4, dropout: float = 0.3):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.num_actions = num_actions

        # Language embedding
        self.word_embed = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        self.lang_lstm = nn.LSTM(embed_dim, hidden_dim, batch_first=True, bidirectional=True)
        self.lang_proj = nn.Linear(hidden_dim * 2, hidden_dim)

        # Visual projection
        self.visual_proj = nn.Linear(visual_dim, hidden_dim)

        # Cross-modal attention
        self.attn_W1 = nn.Linear(hidden_dim, hidden_dim)
        self.attn_W2 = nn.Linear(hidden_dim, hidden_dim)
        self.attn_v = nn.Linear(hidden_dim, 1)

        # Decoder LSTM
        self.decoder_lstm = nn.LSTM(hidden_dim * 3, hidden_dim, batch_first=True)
        self.action_head = nn.Linear(hidden_dim, num_actions)

        self.dropout = nn.Dropout(dropout)

    def forward(self, instruction_tokens: torch.Tensor,
                visual_features: torch.Tensor,
                action_history: torch.Tensor) -> torch.Tensor:
        """
        Forward pass.

        Parameters
        ----------
        instruction_tokens : (B, L) — token IDs
        visual_features : (B, N, D_v) — N spatial regions, D_v dims
        action_history : (B, T) — previous actions (for teacher forcing)

        Returns
        -------
        logits : (B, num_actions) — action probability logits
        """
        B = instruction_tokens.size(0)

        # ── Language encoder ──────────────────────────────────────────────
        lang_embed = self.word_embed(instruction_tokens)  # (B, L, D_e)
        lang_h, (h_lang, _) = self.lang_lstm(lang_embed)  # (B, L, 2D), (2, B, D)
        # Combine bidirectional states
        h_lang = torch.cat([h_lang[0], h_lang[1]], dim=-1)  # (B, 2D)
        h_lang = self.dropout(torch.tanh(self.lang_proj(h_lang)))  # (B, D)

        # ── Visual projection ──────────────────────────────────────────────
        V = self.visual_proj(visual_features)  # (B, N, D)

        # ── Decoder LSTM ──────────────────────────────────────────────────
        # Initialize LSTM with language context
        decoder_state = (h_lang.unsqueeze(0), torch.zeros_like(h_lang.unsqueeze(0)))

        # Concatenate context + previous action embedding
        if action_history.size(1) > 0:
            prev_action = self.word_embed(action_history[:, -1])  # (B, D)
        else:
            prev_action = torch.zeros(B, self.hidden_dim, device=instruction_tokens.device)

        # Context from attention
        context = self._cross_attention(h_lang, V)  # (B, D)

        lstm_input = torch.cat([context, prev_action], dim=-1)  # (B, 2D)
        lstm_out, decoder_state = self.decoder_lstm(lstm_input.unsqueeze(1), decoder_state)
        lstm_out = lstm_out.squeeze(1)  # (B, D)

        # ── Action prediction ──────────────────────────────────────────────
        logits = self.action_head(self.dropout(lstm_out))  # (B, num_actions)
        return logits

    def _cross_attention(self, query: torch.Tensor, visual: torch.Tensor) -> torch.Tensor:
        """
        Cross-modal attention: attend to visual features given a language query.

        Parameters
        ----------
        query : (B, D) — language hidden state
        visual : (B, N, D) — visual feature grid

        Returns
        -------
        context : (B, D) — attended visual features
        """
        # Expand query for batch processing
        q = query.unsqueeze(1).expand_as(visual)  # (B, N, D)
        # Attention energy
        energy = torch.tanh(self.attn_W1(q) + self.attn_W2(visual))  # (B, N, D)
        energy = self.attn_v(energy).squeeze(-1)  # (B, N)
        alpha = F.softmax(energy, dim=-1)  # (B, N), normalized over regions
        context = torch.bmm(alpha.unsqueeze(1), visual).squeeze(1)  # (B, D)
        return context


def train_seq2seq_vln(model: Seq2SeqVLN, train_loader, optimizer, num_epochs: int = 20):
    """Train the Seq2Seq VLN model."""
    criterion = nn.CrossEntropyLoss(ignore_index=-1)

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0.0

        for batch in train_loader:
            instruction, visual, actions_gt, lengths = batch

            optimizer.zero_grad()

            # Forward pass
            logits = model(instruction, visual, actions_gt[:, :-1])

            # Teacher forcing: predict next action
            loss = criterion(logits, actions_gt[:, -1])

            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=5.0)
            optimizer.step()

            total_loss += loss.item()

        avg_loss = total_loss / len(train_loader)
        print(f"[Epoch {epoch+1}/{num_epochs}] Loss: {avg_loss:.4f}")


# ─── R2R Dataset (simplified) ────────────────────────────────────────────────

class R2RDataset(torch.utils.data.Dataset):
    """
    Simplified Room-to-Room dataset.

    Each sample contains:
    - instruction: natural language description
    - path: sequence of (x, y, heading) viewpoints
    - action_seq: corresponding action sequence
    """

    def __init__(self, split='train'):
        self.split = split
        # In practice, load from Matterport3D R2R dataset
        # Here we show synthetic data for illustration
        self.data = [
            {
                'instruction': "Turn left and go past the chair",
                'path': [(0, 0, 0), (1, 0, 0), (2, 0, 0), (2, 1, np.pi/2)],
                'actions': [0, 0, 1, 2],  # Forward, Forward, Left, Stop
            },
        ]

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        sample = self.data[idx]
        return {
            'instruction': sample['instruction'],
            'path': np.array(sample['path']),
            'actions': np.array(sample['actions'], dtype=np.int64),
        }


# ─── Evaluation Metrics ───────────────────────────────────────────────────────

def evaluate_vln(model, dataset, device='cuda'):
    """
    Evaluate VLN model using standard metrics.

    Metrics:
    - Success Rate (SR): fraction of episodes where agent stops within 3m of goal
    - SPL: Success weighted by Path Length = SR * (optimal_length / agent_length)
    - nDTW: normalized Dynamic Time Warping (sequence similarity)
    """
    model.eval()
    sr_total, spl_total, n_samples = 0, 0, 0

    with torch.no_grad():
        for sample in dataset:
            instruction = sample['instruction']
            gt_path = sample['path']
            gt_actions = sample['actions']

            # Simulate inference (would use Habitat-sim in practice)
            # Here: predicted actions
            pred_actions = torch.argmax(model(
                sample['inst_encoded'].unsqueeze(0).to(device),
                sample['visual'].unsqueeze(0).to(device),
                torch.zeros(1, 1, dtype=torch.long).to(device)
            ), dim=-1)

            # Compute SR (simplified)
            sr = float(torch.argmax(pred_actions) == gt_actions[-1])
            sr_total += sr
            n_samples += 1

    return {
        'Success Rate': sr_total / n_samples,
        'SPL': spl_total / n_samples if n_samples > 0 else 0,
    }

4.5 R2R 数据集与评估

Room-to-Room (R2R) 数据集是 VLN 的标准基准:

指标 定义 理想值
成功率 (SR) 代理在目标 3m 内停止的回合比例 1.0
SPL 按路径长度加权成功率 = SR × (最优长度 / 代理路径长度) 1.0
nDTW 归一化动态时间扭曲(序列相似度) 1.0
CLS 覆盖率加长度分数 1.0

4.6 对比——模块化 vs Seq2Seq

方面 模块化流水线 Seq2Seq + 注意力
可解释性 高(白盒阶段) 低(神经黑盒)
训练方式 每个组件独立监督学习 端到端可微
泛化能力 差(基于规则) 更好(习得表征)
数据效率 高(无需训练) 低(需要大型 VLN 数据集)
组合性 有限(手工规则) 强(学习的泛化)
训练复杂度 ⭐⭐⭐

5. 第 3 层 — 现代:基础模型用于 VLN

5.1 概念

基础模型(大型预训练视觉-语言模型)支持**零样本 VLN**——无需任务特定训练即可在新环境中导航。CLIP 提供对齐的视觉-文本特征;LLaVA / GPT-4V 支持指令遵循。

核心优势:无需 VLN 特定训练数据。

5.2 CLIP 视觉特征用于导航

CLIP 将图像和文本编码到共享的嵌入空间。我们使用 CLIP 的视觉编码器表示每个候选导航视点,然后根据指令相似度对它们进行评分。

"""
Tier 3: Zero-Shot VLN using CLIP
================================
CLIP-based viewpoint scoring for instruction-guided navigation.
No VLN-specific training required.
"""

import torch
import clip
from PIL import Image
import numpy as np
import cv2

# Load CLIP model (ViT-B/32)
device = "cuda" if torch.cuda.is_available() else "cpu"
model, preprocess = clip.load("ViT-B/32", device=device)


def encode_text_instruction(instruction: str) -> torch.Tensor:
    """Encode a natural language instruction into CLIP text features."""
    text = clip.tokenize([instruction]).to(device)
    with torch.no_grad():
        text_features = model.encode_text(text)
        text_features /= text_features.norm(dim=-1, keepdim=True)
    return text_features


def encode_viewpoint(frame: np.ndarray) -> torch.Tensor:
    """Encode an RGB frame into CLIP visual features."""
    image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    image_input = preprocess(image).unsqueeze(0).to(device)
    with torch.no_grad():
        image_features = model.encode_image(image_input)
        image_features /= image_features.norm(dim=-1, keepdim=True)
    return image_features


def score_viewpoints(current_frame: np.ndarray,
                     candidate_frames: list,
                     instruction: str) -> list:
    """
    Score candidate navigation viewpoints based on instruction relevance.

    Parameters
    ----------
    current_frame : current RGB observation
    candidate_frames : list of RGB frames from candidate viewpoints
    instruction : natural language navigation instruction

    Returns
    -------
    scores : list of (viewpoint_id, similarity_score) sorted descending
    """
    # Encode instruction once
    text_features = encode_text_instruction(instruction)

    results = []
    for idx, frame in enumerate(candidate_frames):
        vis_features = encode_viewpoint(frame)
        # Cosine similarity in CLIP embedding space
        similarity = (vis_features @ text_features.T).item()
        results.append((idx, similarity))

    # Sort by descending similarity
    results.sort(key=lambda x: x[1], reverse=True)
    return results


def zero_shot_vln_step(instruction: str, current_frame: np.ndarray,
                       candidate_poses: list, robot_state: dict) -> dict:
    """
    Single step of zero-shot VLN using CLIP.

    Parameters
    ----------
    instruction : full navigation instruction
    current_frame : current RGB observation
    candidate_poses : list of candidate robot poses to evaluate
    robot_state : {'x', 'y', 'heading', 'map'}

    Returns
    -------
    next_action : {'type': 'move', 'target_pose': pose}
    """
    # In practice: render candidate viewpoints from Habitat-sim
    # Here: simulate with current frame (would be different viewpoints)
    candidate_frames = [current_frame] * len(candidate_poses)

    # Score each candidate
    scores = score_viewpoints(current_frame, candidate_frames, instruction)

    # Pick highest-scoring viewpoint
    best_idx, best_score = scores[0]

    if best_score > 0.25:  # Threshold for accepting a viewpoint
        action = {
            'type': 'move',
            'target_pose': candidate_poses[best_idx],
            'confidence': best_score
        }
    else:
        # Fallback: random exploration or stop
        action = {'type': 'explore', 'confidence': best_score}

    return action

5.3 LLaVA 用于指令遵循

LLaVA(大型语言和视觉助手)可以推理导航指令和视觉上下文:

"""
LLaVA-based navigation instruction generation.
Given a panoramic view, LLaVA generates a sub-instruction for the next step.
"""

from transformers import AutoProcessor, LlavaForConditionalGeneration
import torch
from PIL import Image

# Load LLaVA-1.5-7B
model_name = "llava-hf/llava-1.5-7b-hf"
processor = AutoProcessor.from_pretrained(model_name)
model = LlavaForConditionalGeneration.from_pretrained(
    model_name, torch_dtype=torch.float16, device_map="auto"
)


def generate_sub_instruction(panorama_image: Image.Image,
                              full_instruction: str,
                              history: list) -> str:
    """
    Use LLaVA to generate the next sub-instruction.

    Parameters
    ----------
    panorama_image : 360-degree panoramic RGB image
    full_instruction : original navigation instruction
    history : list of already-executed sub-instructions

    Returns
    -------
    next_step : natural language sub-instruction
    """
    # Remaining instruction
    remaining = full_instruction
    for past in history:
        remaining = remaining.replace(past, "").strip()

    prompt = (
        f"[INST] <<SYS>>\n"
        f"You are a navigation assistant. Given a panoramic view of an environment "
        f"and a full navigation instruction, identify the most important landmark "
        f"or direction for the NEXT step. Output ONLY a short instruction (max 10 words).\n"
        f"<</SYS>>\n\n"
        f"Full instruction: '{full_instruction}'\n"
        f"Previous steps completed: {', '.join(history) if history else 'None'}\n"
        f"What is the next step? [/INST]"
    )

    inputs = processor(text=prompt, images=panorama_image, return_tensors="pt").to(device)

    with torch.no_grad():
        output_ids = model.generate(
            **inputs,
            max_new_tokens=20,
            do_sample=False,
            temperature=0.0,  # deterministic
        )

    next_step = processor.decode(output_ids[0], skip_special_tokens=True)
    return next_step.strip()


# ─── Habitat-Sim Integration ──────────────────────────────────────────────────

def habitat_vln_episode(instruction: str, start_pose: dict,
                         goal_id: str, num_steps: int = 100):
    """
    Run a VLN episode in Habitat-Sim.

    Parameters
    ----------
    instruction : navigation instruction
    start_pose : {'x', 'y', 'z', 'yaw'}
    goal_id : target object instance ID
    num_steps : max steps before timeout

    Returns
    -------
    result : {'success': bool, 'steps': int, 'path_length': float}
    """
    import habitat_sim

    # Initialize simulator
    sim = habitat_sim.Simulator(habitat_sim.Configuration(
        habitat_sim.specifications.DEFAULT_SCENE,
        {
            'width': 640,
            'height': 480,
            'sensor_height': 1.5,
            'hfov': 90,
        }
    ))

    # Set agent state
    agent_state = habitat_sim.AgentState()
    agent_state.position = [start_pose['x'], start_pose['y'], start_pose['z']]
    agent_state.rotation = habitat_sim.utils.quat_from_angle_axis(
        start_pose['yaw'], habitat_sim.geo.GRAVITY
    )
    sim.agents[0].set_state(agent_state)

    done = False
    steps = 0

    for step in range(num_steps):
        # Get RGB observation
        obs = sim.render()
        rgb = obs['rgba_camera']

        # CLIP-based viewpoint scoring
        action = zero_shot_vln_step(instruction, rgb, candidate_poses=[...], robot_state={})

        # Execute action
        if action['type'] == 'move':
            # Move to target pose
            move_and_look(sim, action['target_pose'])
        else:
            # Random exploration
            sim.agents[0].move_forward(0.25)
            sim.agents[0].rotate(15, habitat_sim.geo.GRAVITY)

        # Check goal
        if check_goal_reached(sim, goal_id):
            done = True
            break

    sim.close()
    return {'success': done, 'steps': steps, 'path_length': steps * 0.25}

5.4 三层方法对比

方面 第 1 层:模块化 第 2 层:Seq2Seq 第 3 层:基础模型
需要训练 否(基于规则) 是(大型 VLN 数据集) 否(零样本)
泛化能力 中等
数据依赖 R2R / RxR(100k 样本) CLIP / LLaVA 预训练
计算成本 中等 高(大型模型)
可解释性 中等
成功率 (R2R) ~20–30% ~50–70% ~40–60%(零样本)
设置复杂度 ⭐⭐⭐ ⭐⭐
最佳场景 调试、简单场景 研究、基准 SOTA 新环境、快速部署

6. 分步实施指南

第 1 阶段 — 第 1 层:模块化流水线(第 1 周)

  1. 设置 NLP 解析
    pip install spacy
    python -m spacy download en_core_web_sm
    
  2. 使用 spaCy NER 和依存句法分析实现 parse_instruction()
  3. 在示例指令上测试,如 "Turn left at the blue chair"

  4. 实现地标检测

  5. 使用预训练 ResNet-50 特征
  6. 为常见地标添加基于颜色的检测
  7. 在相机帧上可视化检测到的边界框

  8. 实现 A* 路径规划器

  9. 从深度传感器或预建地图创建网格地图
  10. 将地标集成为路径点
  11. 在简单环境中测试( Gazebo 或 Habitat)

  12. 整合和测试

  13. 在仿真中端到端运行
  14. 在已知环境中测量成功率

第 2 阶段 — 第 2 层:Seq2Seq(第 2 周)

  1. 下载 R2R 数据集
    wget https://github.com/peteanderson80/Matterport3DSimulator
    # Follow instructions to download R2R dataset
    
  2. 预处理指令和动作序列
  3. 为每个视点预提取 ResNet 视觉特征

  4. 实现模型

  5. 构建带跨模态注意力的 Seq2SeqVLN
  6. 在 R2R 训练集上训练
  7. 监控损失和验证 SR

  8. 在 R2R 上评估

  9. 在 val_seen 和 val_unseen 分割上测试
  10. 将 SR 和 SPL 与已发布基准进行比较

第 3 阶段 — 第 3 层:基础模型(第 3 周)

  1. 设置 CLIP
    pip install git+https://github.com/openai/CLIP.git
    
  2. 实现视点评分
  3. 与 Habitat-Sim 集成以渲染候选视点

  4. 设置 LLaVA(可选)

  5. 需要 ≥16GB VRAM 的 GPU
  6. 微调或使用零样本提示

  7. 评估零样本性能

  8. 与第 2 层训练模型进行比较
  9. 测量对新环境的泛化能力

7. 扩展与变体

7.1 AuxRN — 辅助推理

在训练期间添加辅助推理任务: - CVAE(对比 VLN): 正负指令-路径对的对比学习 - 感知损失: 视觉特征的重构损失 - 自我监控: 跟踪代理是否正在朝目标前进

7.2 视觉-语言预训练 (VLN-BERT)

在大型图像-描述数据集上预训练联合视觉-语言模型,然后再在 VLN 上微调:

预训练:MSCOCO 描述、Visual Genome → 学习对齐的视觉-语言表征
微调:R2R 数据集 → 学习导航特定的对应关系

7.3 多模态融合

使用早期融合而非晚期融合(CLIP 风格): - 在每一层连接视觉和语言特征 - 使用跨注意力层(如 Flamingo、GPT-4V)进行更深入的对应

7.4 主动学习

  • 收集失败回合的人类反馈
  • 在纠正的演示上重新训练(DAgger 风格)

8. 参考资料

  1. Anderson et al., 2018 — Vision-and-Language Navigation: Interpreting Grounded Language Instructions in Photo-Realistic Environments — R2R 数据集与 VLN 任务定义
  2. Fried et al., 2018 — Speaker-Follower Models for Language-Based Image Exploration — 带注意力的 Seq2Seq VLN
  3. Wang et al., 2018 — Look Before You Leap: Bridging Model-Free and Model-Based RL for Vision-Based Navigation — CMU How-to-nav
  4. Huang et al., 2019 — Transferable Representation Learning in Vision-and-Language Navigation — AuxRN 辅助任务
  5. Li et al., 2022 — VLN-BERT: A Pretrained Language Model for Vision-and-Language Navigation — VLN 预训练
  6. Radford et al., 2021 — Learning Transferable Visual Models From Natural Language Supervision (CLIP) — CLIP 基础模型
  7. Liu et al., 2024 — LLaVA: Large Language and Vision Assistant — LLaVA 架构
  8. Habitat-Sim GitHub — 具身 AI 仿真器
  9. AI2-THOR GitHub — 交互式家居环境
  10. R2R Dataset — Room-to-Room 导航基准
  11. Ma et al., 2019 — Rethinking the Performance of Navigation with Language — VLN 基准分析