Skip to content

规划与学习

规划与学习的结合是现代人工智能的重要方向。本章将介绍如何将规划和学习结合起来,以提高规划效率和泛化能力。

学习目标

完成本章后,你将能够:

  • 理解规划与学习的关系
  • 掌握学习启发函数的方法
  • 了解端到端学习方法
  • 应用规划与学习解决实际问题

1. 规划与学习的关系

1.1 规划与学习的互补性

规划与学习的互补性:

规划的优势:
- 可以利用领域知识
- 可以保证最优性
- 可以处理新情况

学习的优势:
- 可以从经验中学习
- 可以泛化到新情况
- 可以减少计算量

结合方式:
- 学习辅助规划(Learning for Planning)
- 规划辅助学习(Planning for Learning)
- 端到端学习(End-to-End Learning)
class PlanningWithLearning:
    """规划与学习结合"""

    def __init__(self, planner, learner):
        """
        初始化

        参数:
        planner: 规划器
        learner: 学习器
        """
        self.planner = planner
        self.learner = learner

    def plan(self, problem):
        """使用学习辅助规划"""
        # 使用学习到的启发函数
        heuristic = self.learner.get_heuristic()

        # 使用启发函数进行规划
        plan = self.planner.plan(problem, heuristic)

        return plan

    def learn(self, experience):
        """从经验中学习"""
        # 提取特征
        features = self._extract_features(experience)

        # 更新学习器
        self.learner.update(features)

    def _extract_features(self, experience):
        """提取特征"""
        # 从经验中提取有用的特征
        features = {
            'state': experience['state'],
            'action': experience['action'],
            'reward': experience['reward'],
            'next_state': experience['next_state']
        }
        return features

1.2 学习规划策略

class LearnedPolicy:
    """学习到的策略"""

    def __init__(self, state_dim, action_dim, hidden_dim=64):
        """
        初始化

        参数:
        state_dim: 状态维度
        action_dim: 动作维度
        hidden_dim: 隐藏层维度
        """
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.hidden_dim = hidden_dim

        # 初始化网络参数
        self._init_network()

    def _init_network(self):
        """初始化网络"""
        # 简单的两层网络
        self.w1 = np.random.randn(self.state_dim, self.hidden_dim) * 0.01
        self.b1 = np.zeros(self.hidden_dim)
        self.w2 = np.random.randn(self.hidden_dim, self.action_dim) * 0.01
        self.b2 = np.zeros(self.action_dim)

    def forward(self, state):
        """前向传播"""
        h = np.maximum(0, state @ self.w1 + self.b1)  # ReLU
        logits = h @ self.w2 + self.b2
        probs = self._softmax(logits)
        return probs

    def _softmax(self, x):
        """Softmax 函数"""
        exp_x = np.exp(x - np.max(x))
        return exp_x / exp_x.sum()

    def choose_action(self, state):
        """选择动作"""
        probs = self.forward(state)
        action = np.random.choice(self.action_dim, p=probs)
        return action

    def update(self, states, actions, rewards, learning_rate=0.001):
        """更新策略"""
        # 计算折扣回报
        returns = []
        G = 0
        for r in reversed(rewards):
            G = r + 0.99 * G
            returns.insert(0, G)

        returns = np.array(returns)
        returns = (returns - returns.mean()) / (returns.std() + 1e-8)

        # 计算梯度(简化)
        for state, action, G in zip(states, actions, returns):
            probs = self.forward(state)

            # ∇log π(a|s, θ)
            grad_log_pi = np.outer(state, -probs)
            grad_log_pi[:, action] += state

            # 更新参数
            self.w2 += learning_rate * G * grad_log_pi

2. 学习启发函数

2.1 启发函数学习

class LearnedHeuristic:
    """学习到的启发函数"""

    def __init__(self, state_dim, hidden_dim=64):
        """
        初始化

        参数:
        state_dim: 状态维度
        hidden_dim: 隐藏层维度
        """
        self.state_dim = state_dim
        self.hidden_dim = hidden_dim

        # 初始化网络参数
        self.w1 = np.random.randn(state_dim, hidden_dim) * 0.01
        self.b1 = np.zeros(hidden_dim)
        self.w2 = np.random.randn(hidden_dim, 1) * 0.01
        self.b2 = np.zeros(1)

    def forward(self, state):
        """前向传播"""
        h = np.maximum(0, state @ self.w1 + self.b1)
        value = h @ self.w2 + self.b2
        return value[0]

    def __call__(self, state, goal=None):
        """计算启发值"""
        if goal is not None:
            # 使用状态和目标的差异
            input_state = np.array(state) - np.array(goal)
        else:
            input_state = np.array(state)

        return self.forward(input_state)

    def update(self, states, true_values, learning_rate=0.001):
        """更新启发函数"""
        for state, true_value in zip(states, true_values):
            # 前向传播
            h = np.maximum(0, state @ self.w1 + self.b1)
            predicted_value = h @ self.w2 + self.b2

            # 计算损失
            loss = (predicted_value - true_value) ** 2

            # 计算梯度(简化)
            d_output = 2 * (predicted_value - true_value)

            # 更新参数
            self.w2 += learning_rate * d_output * h.reshape(-1, 1)
            self.b2 += learning_rate * d_output

            # 反向传播到隐藏层
            d_h = d_output * self.w2.flatten()
            d_h[h <= 0] = 0  # ReLU 梯度

            self.w1 += learning_rate * np.outer(state, d_h)
            self.b1 += learning_rate * d_h

# 示例
heuristic = LearnedHeuristic(state_dim=2)

# 训练数据
states = [np.array([0, 0]), np.array([1, 1]), np.array([2, 2])]
true_values = [10, 5, 0]  # 到目标的距离

# 训练
for epoch in range(100):
    heuristic.update(states, true_values, learning_rate=0.01)

# 测试
test_state = np.array([1.5, 1.5])
print(f"启发值: {heuristic(test_state):.2f}")

2.2 值函数学习

class ValueFunctionLearner:
    """值函数学习器"""

    def __init__(self, state_dim, hidden_dim=64):
        """
        初始化

        参数:
        state_dim: 状态维度
        hidden_dim: 隐藏层维度
        """
        self.state_dim = state_dim
        self.hidden_dim = hidden_dim

        # 初始化网络参数
        self.w1 = np.random.randn(state_dim, hidden_dim) * 0.01
        self.b1 = np.zeros(hidden_dim)
        self.w2 = np.random.randn(hidden_dim, 1) * 0.01
        self.b2 = np.zeros(1)

    def forward(self, state):
        """前向传播"""
        h = np.maximum(0, state @ self.w1 + self.b1)
        value = h @ self.w2 + self.b2
        return value[0]

    def update(self, state, target_value, learning_rate=0.001):
        """更新值函数"""
        # 前向传播
        h = np.maximum(0, state @ self.w1 + self.b1)
        predicted_value = h @ self.w2 + self.b2

        # 计算损失
        loss = (predicted_value - target_value) ** 2

        # 计算梯度
        d_output = 2 * (predicted_value - target_value)

        # 更新参数
        self.w2 += learning_rate * d_output * h.reshape(-1, 1)
        self.b2 += learning_rate * d_output

        # 反向传播到隐藏层
        d_h = d_output * self.w2.flatten()
        d_h[h <= 0] = 0

        self.w1 += learning_rate * np.outer(state, d_h)
        self.b1 += learning_rate * d_h

        return loss

# 示例
value_learner = ValueFunctionLearner(state_dim=2)

# 训练
for episode in range(1000):
    state = np.random.randn(2)
    target_value = -np.linalg.norm(state)  # 假设目标在原点

    loss = value_learner.update(state, target_value, learning_rate=0.01)

    if (episode + 1) % 100 == 0:
        print(f"Episode {episode + 1}, Loss: {loss:.4f}")

3. 端到端学习

3.1 端到端规划

class EndToEndPlanner:
    """端到端规划器"""

    def __init__(self, state_dim, action_dim, hidden_dim=64):
        """
        初始化

        参数:
        state_dim: 状态维度
        action_dim: 动作维度
        hidden_dim: 隐藏层维度
        """
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.hidden_dim = hidden_dim

        # 初始化网络参数
        self._init_network()

        # 经验回放缓冲区
        self.replay_buffer = []
        self.buffer_size = 10000
        self.batch_size = 32

    def _init_network(self):
        """初始化网络"""
        # 策略网络
        self.policy_w1 = np.random.randn(self.state_dim, self.hidden_dim) * 0.01
        self.policy_b1 = np.zeros(self.hidden_dim)
        self.policy_w2 = np.random.randn(self.hidden_dim, self.action_dim) * 0.01
        self.policy_b2 = np.zeros(self.action_dim)

        # 值网络
        self.value_w1 = np.random.randn(self.state_dim, self.hidden_dim) * 0.01
        self.value_b1 = np.zeros(self.hidden_dim)
        self.value_w2 = np.random.randn(self.hidden_dim, 1) * 0.01
        self.value_b2 = np.zeros(1)

    def policy_forward(self, state):
        """策略前向传播"""
        h = np.maximum(0, state @ self.policy_w1 + self.policy_b1)
        logits = h @ self.policy_w2 + self.policy_b2
        probs = self._softmax(logits)
        return probs

    def value_forward(self, state):
        """值前向传播"""
        h = np.maximum(0, state @ self.value_w1 + self.value_b1)
        value = h @ self.value_w2 + self.value_b2
        return value[0]

    def _softmax(self, x):
        """Softmax 函数"""
        exp_x = np.exp(x - np.max(x))
        return exp_x / exp_x.sum()

    def choose_action(self, state):
        """选择动作"""
        probs = self.policy_forward(state)
        action = np.random.choice(self.action_dim, p=probs)
        return action

    def store_experience(self, state, action, reward, next_state, done):
        """存储经验"""
        self.replay_buffer.append((state, action, reward, next_state, done))

        if len(self.replay_buffer) > self.buffer_size:
            self.replay_buffer.pop(0)

    def update(self, learning_rate=0.001):
        """更新网络"""
        if len(self.replay_buffer) < self.batch_size:
            return

        # 采样经验
        indices = np.random.choice(len(self.replay_buffer), self.batch_size, replace=False)
        batch = [self.replay_buffer[i] for i in indices]

        states = np.array([t[0] for t in batch])
        actions = np.array([t[1] for t in batch])
        rewards = np.array([t[2] for t in batch])
        next_states = np.array([t[3] for t in batch])
        dones = np.array([t[4] for t in batch])

        # 计算目标值
        target_values = rewards + 0.99 * np.array([
            self.value_forward(s) if not d else 0
            for s, d in zip(next_states, dones)
        ])

        # 更新值网络
        for state, target_value in zip(states, target_values):
            self._update_value_network(state, target_value, learning_rate)

        # 更新策略网络
        for state, action in zip(states, actions):
            self._update_policy_network(state, action, learning_rate)

    def _update_value_network(self, state, target_value, learning_rate):
        """更新值网络"""
        # 前向传播
        h = np.maximum(0, state @ self.value_w1 + self.value_b1)
        predicted_value = h @ self.value_w2 + self.value_b2

        # 计算梯度
        d_output = 2 * (predicted_value - target_value)

        # 更新参数
        self.value_w2 += learning_rate * d_output * h.reshape(-1, 1)
        self.value_b2 += learning_rate * d_output

        # 反向传播到隐藏层
        d_h = d_output * self.value_w2.flatten()
        d_h[h <= 0] = 0

        self.value_w1 += learning_rate * np.outer(state, d_h)
        self.value_b1 += learning_rate * d_h

    def _update_policy_network(self, state, action, learning_rate):
        """更新策略网络"""
        # 前向传播
        probs = self.policy_forward(state)

        # 计算梯度(策略梯度)
        advantage = self.value_forward(state)  # 简化

        grad_log_pi = np.outer(state, -probs)
        grad_log_pi[:, action] += state

        # 更新参数
        self.policy_w2 += learning_rate * advantage * grad_log_pi

3.2 模仿学习

class ImitationLearning:
    """模仿学习"""

    def __init__(self, state_dim, action_dim, hidden_dim=64):
        """
        初始化

        参数:
        state_dim: 状态维度
        action_dim: 动作维度
        hidden_dim: 隐藏层维度
        """
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.hidden_dim = hidden_dim

        # 初始化网络参数
        self.w1 = np.random.randn(state_dim, hidden_dim) * 0.01
        self.b1 = np.zeros(hidden_dim)
        self.w2 = np.random.randn(hidden_dim, action_dim) * 0.01
        self.b2 = np.zeros(action_dim)

    def forward(self, state):
        """前向传播"""
        h = np.maximum(0, state @ self.w1 + self.b1)
        logits = h @ self.w2 + self.b2
        probs = self._softmax(logits)
        return probs

    def _softmax(self, x):
        """Softmax 函数"""
        exp_x = np.exp(x - np.max(x))
        return exp_x / exp_x.sum()

    def choose_action(self, state):
        """选择动作"""
        probs = self.forward(state)
        action = np.random.choice(self.action_dim, p=probs)
        return action

    def imitate(self, expert_states, expert_actions, learning_rate=0.001, epochs=100):
        """
        模仿专家

        参数:
        expert_states: 专家状态
        expert_actions: 专家动作
        learning_rate: 学习率
        epochs: 训练轮数
        """
        for epoch in range(epochs):
            total_loss = 0

            for state, expert_action in zip(expert_states, expert_actions):
                # 前向传播
                probs = self.forward(state)

                # 计算交叉熵损失
                loss = -np.log(probs[expert_action] + 1e-8)
                total_loss += loss

                # 计算梯度
                grad = probs.copy()
                grad[expert_action] -= 1

                # 反向传播
                h = np.maximum(0, state @ self.w1 + self.b1)

                # 更新参数
                self.w2 += learning_rate * np.outer(h, grad)
                self.b2 += learning_rate * grad

                # 反向传播到隐藏层
                d_h = grad @ self.w2.T
                d_h[h <= 0] = 0

                self.w1 += learning_rate * np.outer(state, d_h)
                self.b1 += learning_rate * d_h

            if (epoch + 1) % 10 == 0:
                print(f"Epoch {epoch + 1}, Loss: {total_loss / len(expert_states):.4f}")

# 示例
imitation = ImitationLearning(state_dim=2, action_dim=4)

# 专家数据
expert_states = [np.array([0, 0]), np.array([1, 1]), np.array([2, 2])]
expert_actions = [0, 1, 2]  # 专家选择的动作

# 训练
imitation.imitate(expert_states, expert_actions, learning_rate=0.01, epochs=50)

4. 层次化规划与学习

4.1 层次化任务网络学习

class HTNLearner:
    """HTN 学习器"""

    def __init__(self, state_dim, action_dim):
        """
        初始化

        参数:
        state_dim: 状态维度
        action_dim: 动作维度
        """
        self.state_dim = state_dim
        self.action_dim = action_dim

        # 方法库
        self.methods = {}

        # 学习到的方法
        self.learned_methods = {}

    def add_method(self, name, method):
        """添加方法"""
        self.methods[name] = method

    def learn_method(self, examples):
        """
        从示例中学习方法

        参数:
        examples: 示例列表 [(state, action_sequence), ...]
        """
        # 分析示例
        for state, action_sequence in examples:
            # 提取特征
            features = self._extract_features(state)

            # 学习方法
            method_name = f"learned_method_{len(self.learned_methods)}"
            method = self._create_method(features, action_sequence)

            self.learned_methods[method_name] = method

    def _extract_features(self, state):
        """提取特征"""
        return state

    def _create_method(self, features, action_sequence):
        """创建方法"""
        def method(state):
            # 检查是否匹配
            if np.allclose(state, features):
                return action_sequence
            return None
        return method

    def get_method(self, state):
        """获取方法"""
        # 首先检查学习到的方法
        for name, method in self.learned_methods.items():
            result = method(state)
            if result is not None:
                return result

        # 然后检查预定义方法
        for name, method in self.methods.items():
            result = method(state)
            if result is not None:
                return result

        return None

# 示例
learner = HTNLearner(state_dim=2, action_dim=4)

# 添加预定义方法
def move_to_goal(state):
    if np.linalg.norm(state - np.array([4, 4])) < 1:
        return ['move_right', 'move_up']
    return None

learner.add_method('move_to_goal', move_to_goal)

# 从示例学习
examples = [
    (np.array([0, 0]), ['move_right', 'move_right', 'move_up', 'move_up']),
    (np.array([1, 1]), ['move_right', 'move_up']),
]

learner.learn_method(examples)

# 测试
state = np.array([0, 0])
method = learner.get_method(state)
print(f"状态 {state} 的方法: {method}")

4.2 选项框架

class Option:
    """选项"""

    def __init__(name, initiation, policy, termination):
        """
        初始化选项

        参数:
        name: 选项名称
        initiation: 启动条件
        policy: 策略
        termination: 终止条件
        """
        self.name = name
        self.initiation = initiation
        self.policy = policy
        self.termination = termination

    def is_applicable(self, state):
        """检查是否可应用"""
        return self.initiation(state)

    def get_action(self, state):
        """获取动作"""
        return self.policy(state)

    def is_terminated(self, state):
        """检查是否终止"""
        return self.termination(state)

class OptionFramework:
    """选项框架"""

    def __init__(self):
        self.options = []

    def add_option(self, option):
        """添加选项"""
        self.options.append(option)

    def get_applicable_options(self, state):
        """获取可应用的选项"""
        applicable = []
        for option in self.options:
            if option.is_applicable(state):
                applicable.append(option)
        return applicable

    def execute_option(self, option, state, env, max_steps=100):
        """执行选项"""
        current_state = state
        total_reward = 0

        for step in range(max_steps):
            # 检查是否终止
            if option.is_terminated(current_state):
                break

            # 获取动作
            action = option.get_action(current_state)

            # 执行动作
            next_state, reward, done = env.step(action)

            total_reward += reward
            current_state = next_state

            if done:
                break

        return current_state, total_reward

# 示例:导航选项
def navigate_to_target(target):
    """创建导航到目标的选项"""

    def initiation(state):
        # 只要不在目标位置就可以启动
        return np.linalg.norm(state - target) > 0.5

    def policy(state):
        # 简单的朝向目标移动
        direction = target - state
        if abs(direction[0]) > abs(direction[1]):
            return 'right' if direction[0] > 0 else 'left'
        else:
            return 'up' if direction[1] > 0 else 'down'

    def termination(state):
        # 到达目标附近终止
        return np.linalg.norm(state - target) < 0.5

    return Option('navigate_to_target', initiation, policy, termination)

# 创建选项框架
framework = OptionFramework()

# 添加导航选项
target1 = np.array([4, 4])
target2 = np.array([0, 0])

framework.add_option(navigate_to_target(target1))
framework.add_option(navigate_to_target(target2))

# 测试
state = np.array([0, 0])
applicable = framework.get_applicable_options(state)
print(f"状态 {state} 的可应用选项: {[o.name for o in applicable]}")

5. 实践练习

练习 1:实现学习启发函数

def learned_heuristic_exercise():
    # 创建学习器
    heuristic = LearnedHeuristic(state_dim=2)

    # 训练数据
    states = [np.array([x, y]) for x in range(5) for y in range(5)]
    true_values = [np.linalg.norm(s - np.array([4, 4])) for s in states]

    # 训练
    for epoch in range(100):
        heuristic.update(states, true_values, learning_rate=0.01)

    # 测试
    test_state = np.array([2, 3])
    print(f"启发值: {heuristic(test_state):.2f}")
    print(f"真实值: {np.linalg.norm(test_state - np.array([4, 4])):.2f}")

练习 2:实现模仿学习

def imitation_learning_exercise():
    # 创建学习器
    imitation = ImitationLearning(state_dim=2, action_dim=4)

    # 专家数据
    expert_states = [np.array([0, 0]), np.array([1, 1]), np.array([2, 2])]
    expert_actions = [0, 1, 2]

    # 训练
    imitation.imitate(expert_states, expert_actions, learning_rate=0.01, epochs=50)

    # 测试
    test_state = np.array([1.5, 1.5])
    action = imitation.choose_action(test_state)
    print(f"状态 {test_state} 的动作: {action}")

练习 3:实现选项框架

def option_framework_exercise():
    # 创建选项框架
    framework = OptionFramework()

    # 添加选项
    framework.add_option(navigate_to_target(np.array([4, 4])))
    framework.add_option(navigate_to_target(np.array([0, 0])))

    # 测试
    state = np.array([2, 2])
    applicable = framework.get_applicable_options(state)

    print(f"状态 {state} 的可应用选项:")
    for option in applicable:
        print(f"  - {option.name}")

6. 常见问题

1. 样本效率低

问题:需要大量样本才能学习

解决方案: - 使用经验回放 - 使用优先经验回放 - 使用模型-based 方法

2. 泛化能力差

问题:学习到的策略泛化能力差

解决方案: - 使用更好的特征表示 - 使用数据增强 - 使用正则化

3. 学习不稳定

问题:学习过程不稳定

解决方案: - 使用目标网络 - 使用梯度裁剪 - 使用更小的学习率

下一步

  • SLAM — 应用规划与学习进行同时定位与建图
  • 强化学习 — 深入学习强化学习
  • 路径规划 — ROS 中的路径规划

参考资源


← 返回索引 | 返回首页 →