规划与学习¶
规划与学习的结合是现代人工智能的重要方向。本章将介绍如何将规划和学习结合起来,以提高规划效率和泛化能力。
学习目标¶
完成本章后,你将能够:
- 理解规划与学习的关系
- 掌握学习启发函数的方法
- 了解端到端学习方法
- 应用规划与学习解决实际问题
1. 规划与学习的关系¶
1.1 规划与学习的互补性¶
规划与学习的互补性:
规划的优势:
- 可以利用领域知识
- 可以保证最优性
- 可以处理新情况
学习的优势:
- 可以从经验中学习
- 可以泛化到新情况
- 可以减少计算量
结合方式:
- 学习辅助规划(Learning for Planning)
- 规划辅助学习(Planning for Learning)
- 端到端学习(End-to-End Learning)
class PlanningWithLearning:
    """Glue object that lets a learner supply the heuristic used by a planner.

    The only calls made on the collaborators are ``planner.plan(problem,
    heuristic)``, ``learner.get_heuristic()`` and ``learner.update(features)``.
    """

    def __init__(self, planner, learner):
        """Remember the collaborating planner and learner.

        Args:
            planner: Object exposing ``plan(problem, heuristic)``.
            learner: Object exposing ``get_heuristic()`` and ``update(features)``.
        """
        self.planner = planner
        self.learner = learner

    def plan(self, problem):
        """Solve ``problem`` with the planner, guided by the learned heuristic.

        Returns:
            Whatever ``planner.plan`` returns.
        """
        learned_heuristic = self.learner.get_heuristic()
        return self.planner.plan(problem, learned_heuristic)

    def learn(self, experience):
        """Feed one experience record to the learner.

        Args:
            experience: Mapping with at least the keys ``'state'``,
                ``'action'``, ``'reward'`` and ``'next_state'``.
        """
        extracted = self._extract_features(experience)
        self.learner.update(extracted)

    def _extract_features(self, experience):
        """Project ``experience`` onto the four transition fields.

        Any additional keys in ``experience`` are dropped.
        """
        wanted = ('state', 'action', 'reward', 'next_state')
        return {key: experience[key] for key in wanted}
1.2 学习规划策略¶
class LearnedPolicy:
    """Stochastic softmax policy backed by a two-layer ReLU network.

    The network maps a state vector to a probability distribution over
    ``action_dim`` discrete actions and is trained on whole episodes with a
    REINFORCE-style policy-gradient step.
    """

    def __init__(self, state_dim, action_dim, hidden_dim=64):
        """Create the policy and randomly initialise its weights.

        Args:
            state_dim: Length of the state vector.
            action_dim: Number of discrete actions.
            hidden_dim: Width of the hidden layer.
        """
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.hidden_dim = hidden_dim
        self._init_network()

    def _init_network(self):
        """Initialise weights with small Gaussian noise and biases with zeros."""
        self.w1 = np.random.randn(self.state_dim, self.hidden_dim) * 0.01
        self.b1 = np.zeros(self.hidden_dim)
        self.w2 = np.random.randn(self.hidden_dim, self.action_dim) * 0.01
        self.b2 = np.zeros(self.action_dim)

    def _hidden(self, state):
        """Return the ReLU hidden activations for ``state``."""
        return np.maximum(0, state @ self.w1 + self.b1)

    def forward(self, state):
        """Return the action probabilities for ``state``."""
        logits = self._hidden(state) @ self.w2 + self.b2
        return self._softmax(logits)

    def _softmax(self, x):
        """Numerically stable softmax over a 1-D vector of logits."""
        # Subtracting the max keeps exp() from overflowing.
        exp_x = np.exp(x - np.max(x))
        return exp_x / exp_x.sum()

    def choose_action(self, state):
        """Sample an action index from the policy's distribution at ``state``."""
        probs = self.forward(state)
        return np.random.choice(self.action_dim, p=probs)

    def update(self, states, actions, rewards, learning_rate=0.001, gamma=0.99):
        """Apply one REINFORCE update from a complete episode.

        Args:
            states: Sequence of state vectors visited in the episode.
            actions: Sequence of action indices taken, aligned with ``states``.
            rewards: Sequence of rewards received, aligned with ``states``.
            learning_rate: Step size of the gradient ascent.
            gamma: Discount factor used to compute the returns.
        """
        horizon = len(rewards)
        if horizon == 0:
            # Nothing to learn from; also avoids mean()/std() of an empty array.
            return

        # Discounted returns, filled back-to-front in O(n).
        returns = np.empty(horizon, dtype=float)
        running = 0.0
        for t in range(horizon - 1, -1, -1):
            running = rewards[t] + gamma * running
            returns[t] = running
        # Standardise the returns to reduce the variance of the update.
        returns = (returns - returns.mean()) / (returns.std() + 1e-8)

        for state, action, G in zip(states, actions, returns):
            state = np.asarray(state, dtype=float)
            h = self._hidden(state)
            probs = self._softmax(h @ self.w2 + self.b2)

            # d log pi(a|s) / d logits = onehot(a) - probs
            grad_logits = -probs
            grad_logits[action] += 1.0

            # Back-propagate through the output layer using the weights
            # *before* this step is applied, then gate by the ReLU.
            grad_h = self.w2 @ grad_logits
            grad_h[h <= 0] = 0.0

            # Gradient ascent on G * log pi(a|s). The output-layer gradient is
            # built from the hidden activations, so its shape matches w2.
            step = learning_rate * G
            self.w2 += step * np.outer(h, grad_logits)
            self.b2 += step * grad_logits
            self.w1 += step * np.outer(state, grad_h)
            self.b1 += step * grad_h
2. 学习启发函数¶
2.1 启发函数学习¶
class LearnedHeuristic:
    """Heuristic function approximated by a two-layer ReLU regression network.

    Instances are callable: ``heuristic(state)`` or ``heuristic(state, goal)``
    returns the predicted scalar value.
    """

    def __init__(self, state_dim, hidden_dim=64):
        """Create the network with small random weights and zero biases.

        Args:
            state_dim: Length of the input vector.
            hidden_dim: Width of the hidden layer.
        """
        self.state_dim = state_dim
        self.hidden_dim = hidden_dim
        self.w1 = np.random.randn(state_dim, hidden_dim) * 0.01
        self.b1 = np.zeros(hidden_dim)
        self.w2 = np.random.randn(hidden_dim, 1) * 0.01
        self.b2 = np.zeros(1)

    def forward(self, state):
        """Return the scalar network output for the input vector ``state``."""
        h = np.maximum(0, state @ self.w1 + self.b1)
        value = h @ self.w2 + self.b2
        return value[0]

    def __call__(self, state, goal=None):
        """Evaluate the heuristic.

        Args:
            state: State vector (any array-like).
            goal: Optional goal vector. When given, the network is fed
                ``state - goal`` instead of ``state``.

        Returns:
            The predicted heuristic value as a scalar.
        """
        if goal is not None:
            input_state = np.array(state) - np.array(goal)
        else:
            input_state = np.array(state)
        return self.forward(input_state)

    def update(self, states, true_values, learning_rate=0.001):
        """Run one pass of per-sample gradient descent on the squared error.

        Args:
            states: Iterable of input vectors.
            true_values: Iterable of target scalars, aligned with ``states``.
            learning_rate: Step size of the descent.

        Returns:
            Mean squared error over the samples, each measured just before its
            own update; ``0.0`` when no samples were given.
        """
        total_loss = 0.0
        count = 0
        for state, true_value in zip(states, true_values):
            state = np.asarray(state, dtype=float)

            # Forward pass.
            h = np.maximum(0, state @ self.w1 + self.b1)
            error = float((h @ self.w2 + self.b2)[0]) - true_value
            total_loss += float(error ** 2)
            count += 1

            # Backward pass. Every gradient is computed before any parameter
            # changes, so the hidden-layer gradient sees the same w2 that
            # produced the prediction.
            d_output = 2.0 * error
            d_h = d_output * self.w2[:, 0]
            d_h[h <= 0] = 0.0  # ReLU passes no gradient where it was inactive

            # Descent: step against the gradient to reduce the loss.
            self.w2 -= learning_rate * d_output * h[:, None]
            self.b2 -= learning_rate * d_output
            self.w1 -= learning_rate * np.outer(state, d_h)
            self.b1 -= learning_rate * d_h

        return total_loss / count if count else 0.0
# Example: fit the heuristic on three labelled states, then query a new one.
heuristic = LearnedHeuristic(state_dim=2)

# Training data: three points on the diagonal, labelled with the distance to
# the goal.
states = [np.array([k, k]) for k in range(3)]
true_values = [10, 5, 0]

# Train: 100 passes over the training set.
for epoch in range(100):
    heuristic.update(states, true_values, learning_rate=0.01)

# Test on a state that is not in the training set.
test_state = np.array([1.5, 1.5])
print(f"启发值: {heuristic(test_state):.2f}")
2.2 值函数学习¶
class ValueFunctionLearner:
    """State-value function approximated by a two-layer ReLU network."""

    def __init__(self, state_dim, hidden_dim=64):
        """Create the network with small random weights and zero biases.

        Args:
            state_dim: Length of the state vector.
            hidden_dim: Width of the hidden layer.
        """
        self.state_dim = state_dim
        self.hidden_dim = hidden_dim
        self.w1 = np.random.randn(state_dim, hidden_dim) * 0.01
        self.b1 = np.zeros(hidden_dim)
        self.w2 = np.random.randn(hidden_dim, 1) * 0.01
        self.b2 = np.zeros(1)

    def forward(self, state):
        """Return the predicted scalar value of ``state``."""
        h = np.maximum(0, state @ self.w1 + self.b1)
        value = h @ self.w2 + self.b2
        return value[0]

    def update(self, state, target_value, learning_rate=0.001):
        """Take one gradient-descent step towards ``target_value``.

        Args:
            state: State vector.
            target_value: Scalar regression target for ``state``.
            learning_rate: Step size of the descent.

        Returns:
            The squared error measured before the step, as a plain ``float``
            so that it can be used with format specs such as ``:.4f``.
        """
        state = np.asarray(state, dtype=float)

        # Forward pass.
        h = np.maximum(0, state @ self.w1 + self.b1)
        error = float((h @ self.w2 + self.b2)[0]) - target_value
        loss = float(error ** 2)

        # Backward pass, computed entirely from the pre-update parameters.
        d_output = 2.0 * error
        d_h = d_output * self.w2[:, 0]
        d_h[h <= 0] = 0.0  # ReLU passes no gradient where it was inactive

        # Descent: step against the gradient to reduce the loss.
        self.w2 -= learning_rate * d_output * h[:, None]
        self.b2 -= learning_rate * d_output
        self.w1 -= learning_rate * np.outer(state, d_h)
        self.b1 -= learning_rate * d_h

        return loss
# Example: regress the value of random states towards their negative distance
# from the origin (the goal is assumed to sit at the origin).
value_learner = ValueFunctionLearner(state_dim=2)

# Train on 1000 randomly drawn states, one update per state.
for episode in range(1000):
    state = np.random.randn(2)
    target_value = -np.linalg.norm(state)
    loss = value_learner.update(state, target_value, learning_rate=0.01)
    if (episode + 1) % 100 == 0:
        # update() may hand back a one-element array, which rejects the
        # ':.4f' format spec; reduce it to a scalar first. This also works
        # when the loss is already a plain number.
        print(f"Episode {episode + 1}, Loss: {np.ravel(loss)[0]:.4f}")
3. 端到端学习¶
3.1 端到端规划¶
class EndToEndPlanner:
    """Actor-critic style planner with a policy network and a value network.

    Both networks are two-layer ReLU networks. Transitions are kept in a
    bounded replay buffer, and ``update`` trains both networks on a random
    mini-batch drawn from it.
    """

    def __init__(self, state_dim, action_dim, hidden_dim=64):
        """Create both networks and an empty replay buffer.

        Args:
            state_dim: Length of the state vector.
            action_dim: Number of discrete actions.
            hidden_dim: Width of each hidden layer.
        """
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.hidden_dim = hidden_dim
        self._init_network()
        # Experience replay: a plain list so callers can index or inspect it.
        self.replay_buffer = []
        self.buffer_size = 10000
        self.batch_size = 32

    def _init_network(self):
        """Initialise weights with small Gaussian noise and biases with zeros."""
        # Policy network.
        self.policy_w1 = np.random.randn(self.state_dim, self.hidden_dim) * 0.01
        self.policy_b1 = np.zeros(self.hidden_dim)
        self.policy_w2 = np.random.randn(self.hidden_dim, self.action_dim) * 0.01
        self.policy_b2 = np.zeros(self.action_dim)
        # Value network.
        self.value_w1 = np.random.randn(self.state_dim, self.hidden_dim) * 0.01
        self.value_b1 = np.zeros(self.hidden_dim)
        self.value_w2 = np.random.randn(self.hidden_dim, 1) * 0.01
        self.value_b2 = np.zeros(1)

    def policy_forward(self, state):
        """Return the action probabilities for ``state``."""
        h = np.maximum(0, state @ self.policy_w1 + self.policy_b1)
        logits = h @ self.policy_w2 + self.policy_b2
        return self._softmax(logits)

    def value_forward(self, state):
        """Return the predicted scalar value of ``state``."""
        h = np.maximum(0, state @ self.value_w1 + self.value_b1)
        value = h @ self.value_w2 + self.value_b2
        return value[0]

    def _softmax(self, x):
        """Numerically stable softmax over a 1-D vector of logits."""
        exp_x = np.exp(x - np.max(x))
        return exp_x / exp_x.sum()

    def choose_action(self, state):
        """Sample an action index from the policy's distribution at ``state``."""
        probs = self.policy_forward(state)
        return np.random.choice(self.action_dim, p=probs)

    def store_experience(self, state, action, reward, next_state, done):
        """Append one transition, discarding the oldest ones beyond ``buffer_size``."""
        self.replay_buffer.append((state, action, reward, next_state, done))
        excess = len(self.replay_buffer) - self.buffer_size
        if excess > 0:
            del self.replay_buffer[:excess]

    def update(self, learning_rate=0.001, gamma=0.99):
        """Train both networks on one mini-batch sampled from the buffer.

        Does nothing until the buffer holds at least ``batch_size`` transitions.

        Args:
            learning_rate: Step size for both networks.
            gamma: Discount factor for the one-step bootstrap target.
        """
        if len(self.replay_buffer) < self.batch_size:
            return

        # Sample a mini-batch without replacement.
        indices = np.random.choice(len(self.replay_buffer), self.batch_size, replace=False)
        batch = [self.replay_buffer[i] for i in indices]
        states = np.array([t[0] for t in batch], dtype=float)
        actions = [t[1] for t in batch]
        rewards = np.array([t[2] for t in batch], dtype=float)

        # One-step TD targets; terminal transitions do not bootstrap.
        bootstrap = np.array([
            0.0 if t[4] else self.value_forward(np.asarray(t[3], dtype=float))
            for t in batch
        ])
        target_values = rewards + gamma * bootstrap

        # Advantage = TD error, taken before the critic moves so the policy
        # step is not biased by this batch's value update.
        advantages = target_values - np.array([self.value_forward(s) for s in states])

        # Critic update.
        for state, target_value in zip(states, target_values):
            self._update_value_network(state, target_value, learning_rate)
        # Actor update.
        for state, action, advantage in zip(states, actions, advantages):
            self._update_policy_network(state, action, learning_rate, advantage)

    def _update_value_network(self, state, target_value, learning_rate):
        """Take one gradient-descent step of the value net towards ``target_value``."""
        state = np.asarray(state, dtype=float)
        h = np.maximum(0, state @ self.value_w1 + self.value_b1)
        error = float((h @ self.value_w2 + self.value_b2)[0]) - target_value

        # Gradients are computed from the pre-update parameters.
        d_output = 2.0 * error
        d_h = d_output * self.value_w2[:, 0]
        d_h[h <= 0] = 0.0  # ReLU passes no gradient where it was inactive

        # Descent: step against the gradient of the squared error.
        self.value_w2 -= learning_rate * d_output * h[:, None]
        self.value_b2 -= learning_rate * d_output
        self.value_w1 -= learning_rate * np.outer(state, d_h)
        self.value_b1 -= learning_rate * d_h

    def _update_policy_network(self, state, action, learning_rate, advantage=None):
        """Take one policy-gradient ascent step for ``(state, action)``.

        Args:
            state: State vector.
            action: Index of the action that was taken.
            learning_rate: Step size.
            advantage: Weight applied to the log-probability gradient. When
                omitted, the current value estimate of ``state`` is used, so
                three-argument calls keep working.
        """
        state = np.asarray(state, dtype=float)
        if advantage is None:
            advantage = self.value_forward(state)

        h = np.maximum(0, state @ self.policy_w1 + self.policy_b1)
        probs = self._softmax(h @ self.policy_w2 + self.policy_b2)

        # d log pi(a|s) / d logits = onehot(a) - probs
        grad_logits = -probs
        grad_logits[action] += 1.0
        grad_h = self.policy_w2 @ grad_logits
        grad_h[h <= 0] = 0.0

        # The output-layer gradient is built from the hidden activations, so
        # its shape matches policy_w2.
        step = learning_rate * advantage
        self.policy_w2 += step * np.outer(h, grad_logits)
        self.policy_b2 += step * grad_logits
        self.policy_w1 += step * np.outer(state, grad_h)
        self.policy_b1 += step * grad_h
3.2 模仿学习¶
class ImitationLearning:
    """Behaviour cloning with a two-layer ReLU softmax classifier."""

    def __init__(self, state_dim, action_dim, hidden_dim=64):
        """Create the network with small random weights and zero biases.

        Args:
            state_dim: Length of the state vector.
            action_dim: Number of discrete actions.
            hidden_dim: Width of the hidden layer.
        """
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.hidden_dim = hidden_dim
        self.w1 = np.random.randn(state_dim, hidden_dim) * 0.01
        self.b1 = np.zeros(hidden_dim)
        self.w2 = np.random.randn(hidden_dim, action_dim) * 0.01
        self.b2 = np.zeros(action_dim)

    def forward(self, state):
        """Return the action probabilities for ``state``."""
        h = np.maximum(0, state @ self.w1 + self.b1)
        logits = h @ self.w2 + self.b2
        return self._softmax(logits)

    def _softmax(self, x):
        """Numerically stable softmax over a 1-D vector of logits."""
        exp_x = np.exp(x - np.max(x))
        return exp_x / exp_x.sum()

    def choose_action(self, state):
        """Sample an action index from the predicted distribution at ``state``."""
        probs = self.forward(state)
        return np.random.choice(self.action_dim, p=probs)

    def imitate(self, expert_states, expert_actions, learning_rate=0.001, epochs=100):
        """Fit the network to expert demonstrations by minimising cross-entropy.

        Progress is printed every 10 epochs.

        Args:
            expert_states: Sequence of state vectors visited by the expert.
            expert_actions: Sequence of expert action indices, aligned with
                ``expert_states``.
            learning_rate: Step size of the per-sample gradient descent.
            epochs: Number of passes over the demonstrations.

        Returns:
            List with the mean cross-entropy loss of each epoch (empty when no
            demonstrations were given).
        """
        history = []
        sample_count = len(expert_states)
        if sample_count == 0:
            # Nothing to fit; also avoids dividing by zero when averaging.
            return history

        for epoch in range(epochs):
            total_loss = 0.0
            for state, expert_action in zip(expert_states, expert_actions):
                state = np.asarray(state, dtype=float)

                # Forward pass.
                h = np.maximum(0, state @ self.w1 + self.b1)
                probs = self._softmax(h @ self.w2 + self.b2)

                # Cross-entropy loss; the epsilon guards against log(0).
                total_loss += -np.log(probs[expert_action] + 1e-8)

                # dLoss/dlogits = probs - onehot(expert_action)
                grad = probs.copy()
                grad[expert_action] -= 1

                # Hidden-layer gradient, taken before w2 is modified.
                d_h = self.w2 @ grad
                d_h[h <= 0] = 0

                # Descent: step against the loss gradient.
                self.w2 -= learning_rate * np.outer(h, grad)
                self.b2 -= learning_rate * grad
                self.w1 -= learning_rate * np.outer(state, d_h)
                self.b1 -= learning_rate * d_h

            mean_loss = total_loss / sample_count
            history.append(float(mean_loss))
            if (epoch + 1) % 10 == 0:
                print(f"Epoch {epoch + 1}, Loss: {mean_loss:.4f}")

        return history
# Example: clone a three-step expert demonstration.
imitation = ImitationLearning(state_dim=2, action_dim=4)

# Expert data: diagonal states paired with the action the expert chose.
expert_states = [np.array([k, k]) for k in range(3)]
expert_actions = list(range(3))

# Train.
imitation.imitate(
    expert_states,
    expert_actions,
    learning_rate=0.01,
    epochs=50,
)
4. 层次化规划与学习¶
4.1 层次化任务网络学习¶
class HTNLearner:
    """Library of HTN methods, both hand-written and learned from examples.

    A method is a callable taking a state and returning either an action
    sequence or ``None`` when it does not apply.
    """

    def __init__(self, state_dim, action_dim):
        """Create an empty method library.

        Args:
            state_dim: State dimensionality (stored; not otherwise used here).
            action_dim: Action dimensionality (stored; not otherwise used here).
        """
        self.state_dim = state_dim
        self.action_dim = action_dim
        # Predefined methods, keyed by name.
        self.methods = {}
        # Methods generated by learn_method(), keyed by generated name.
        self.learned_methods = {}

    def add_method(self, name, method):
        """Register a predefined method under ``name``."""
        self.methods[name] = method

    def learn_method(self, examples):
        """Create one learned method per example.

        Args:
            examples: Iterable of ``(state, action_sequence)`` pairs.
        """
        for example_state, plan in examples:
            key = f"learned_method_{len(self.learned_methods)}"
            signature = self._extract_features(example_state)
            self.learned_methods[key] = self._create_method(signature, plan)

    def _extract_features(self, state):
        """Return the feature vector for ``state`` (currently the state itself)."""
        return state

    def _create_method(self, features, action_sequence):
        """Build a method that fires only for states numerically close to ``features``."""
        def method(state):
            return action_sequence if np.allclose(state, features) else None
        return method

    def get_method(self, state):
        """Return the first applicable action sequence for ``state``.

        Learned methods are consulted before predefined ones.

        Returns:
            The action sequence of the first method that applies, or ``None``
            if no method does.
        """
        for library in (self.learned_methods, self.methods):
            for candidate in library.values():
                outcome = candidate(state)
                if outcome is not None:
                    return outcome
        return None
# Example: combine one predefined method with two learned ones.
def move_to_goal(state):
    """Return a two-step plan when ``state`` is within distance 1 of (4, 4)."""
    near_goal = np.linalg.norm(state - np.array([4, 4])) < 1
    return ['move_right', 'move_up'] if near_goal else None


learner = HTNLearner(state_dim=2, action_dim=4)

# Register the predefined method.
learner.add_method('move_to_goal', move_to_goal)

# Learn from examples.
examples = [
    (np.array([0, 0]), ['move_right'] * 2 + ['move_up'] * 2),
    (np.array([1, 1]), ['move_right', 'move_up']),
]
learner.learn_method(examples)

# Test: look up the method for the first example state.
state = np.array([0, 0])
method = learner.get_method(state)
print(f"状态 {state} 的方法: {method}")
4.2 选项框架¶
class Option:
    """A temporally extended action: initiation set, policy and termination.

    The three behaviours are supplied as callables that each take a state.
    """

    def __init__(self, name, initiation, policy, termination):
        """Build an option from its three components.

        Args:
            name: Human-readable option name.
            initiation: Callable ``state -> bool``; whether the option may start.
            policy: Callable ``state -> action``; the action to take while running.
            termination: Callable ``state -> bool``; whether the option stops.
        """
        # ``self`` must be the first parameter; without it every construction
        # with four arguments raises TypeError.
        self.name = name
        self.initiation = initiation
        self.policy = policy
        self.termination = termination

    def is_applicable(self, state):
        """Return whether the option can be started in ``state``."""
        return self.initiation(state)

    def get_action(self, state):
        """Return the action the option's policy selects in ``state``."""
        return self.policy(state)

    def is_terminated(self, state):
        """Return whether the option terminates in ``state``."""
        return self.termination(state)
class OptionFramework:
    """Container that holds options and runs them against an environment."""

    def __init__(self):
        """Start with no registered options."""
        self.options = []

    def add_option(self, option):
        """Register ``option``."""
        self.options.append(option)

    def get_applicable_options(self, state):
        """Return the registered options that can start in ``state``, in order."""
        return [candidate for candidate in self.options if candidate.is_applicable(state)]

    def execute_option(self, option, state, env, max_steps=100):
        """Run ``option`` from ``state`` until it terminates or the budget ends.

        Args:
            option: The option to execute.
            state: State in which execution starts.
            env: Environment whose ``step(action)`` returns
                ``(next_state, reward, done)``.
            max_steps: Maximum number of environment steps.

        Returns:
            Tuple ``(final_state, total_reward)``.
        """
        position = state
        collected = 0
        for _ in range(max_steps):
            # Termination is checked before acting, so an option that is
            # already finished takes no step at all.
            if option.is_terminated(position):
                return position, collected
            chosen = option.get_action(position)
            position, reward, done = env.step(chosen)
            collected += reward
            if done:
                return position, collected
        return position, collected
# Example: navigation options
def navigate_to_target(target):
    """Build an option that moves towards ``target``.

    The option can start when the state is more than 0.5 away from the
    target and terminates once it is less than 0.5 away. Its policy returns
    one of ``'right'``, ``'left'``, ``'up'`` or ``'down'``, moving along
    whichever of the first two coordinates has the larger absolute offset.
    """
    def _gap(state):
        # Euclidean distance between the state and the target.
        return np.linalg.norm(state - target)

    def initiation(state):
        return _gap(state) > 0.5

    def termination(state):
        return _gap(state) < 0.5

    def policy(state):
        offset = target - state
        horizontal, vertical = offset[0], offset[1]
        if abs(horizontal) > abs(vertical):
            return 'right' if horizontal > 0 else 'left'
        # Ties and vertical-dominant offsets move along the second axis.
        return 'up' if vertical > 0 else 'down'

    return Option('navigate_to_target', initiation, policy, termination)
# Build the option framework.
framework = OptionFramework()

# Register one navigation option per target.
target1, target2 = np.array([4, 4]), np.array([0, 0])
framework.add_option(navigate_to_target(target1))
framework.add_option(navigate_to_target(target2))

# Test: list the options that can start at the origin.
state = np.array([0, 0])
applicable = framework.get_applicable_options(state)
print(f"状态 {state} 的可应用选项: {[o.name for o in applicable]}")
5. 实践练习¶
练习 1:实现学习启发函数¶
def learned_heuristic_exercise():
    """Exercise 1: fit a heuristic to Euclidean distance on a 5x5 grid.

    Trains on every integer cell of the grid, using the distance to (4, 4) as
    the label, then prints the prediction and the true distance for (2, 3).
    """
    goal = np.array([4, 4])

    # Create the learner.
    heuristic = LearnedHeuristic(state_dim=2)

    # Training data: all 25 grid cells in row-major order, with their labels.
    states = [np.array(divmod(cell, 5)) for cell in range(25)]
    true_values = [np.linalg.norm(cell_state - goal) for cell_state in states]

    # Train.
    for _ in range(100):
        heuristic.update(states, true_values, learning_rate=0.01)

    # Test.
    test_state = np.array([2, 3])
    print(f"启发值: {heuristic(test_state):.2f}")
    actual = np.linalg.norm(test_state - goal)
    print(f"真实值: {actual:.2f}")
练习 2:实现模仿学习¶
def imitation_learning_exercise():
    """Exercise 2: clone a three-sample expert and sample an action.

    Trains on three diagonal states with expert actions 0, 1 and 2, then
    prints the action sampled for the unseen state (1.5, 1.5).
    """
    # Expert data.
    expert_states = [np.array([k, k]) for k in range(3)]
    expert_actions = list(range(3))

    # Create and train the learner.
    imitation = ImitationLearning(state_dim=2, action_dim=4)
    imitation.imitate(expert_states, expert_actions, learning_rate=0.01, epochs=50)

    # Test.
    test_state = np.array([1.5, 1.5])
    action = imitation.choose_action(test_state)
    print(f"状态 {test_state} 的动作: {action}")
练习 3:实现选项框架¶
def option_framework_exercise():
    """Exercise 3: list the navigation options applicable at (2, 2).

    Registers two navigation options, targeting (4, 4) and (0, 0), and prints
    the name of each one that can start from the test state.
    """
    # Build the framework and register one option per target.
    framework = OptionFramework()
    for corner in ([4, 4], [0, 0]):
        framework.add_option(navigate_to_target(np.array(corner)))

    # Test.
    state = np.array([2, 2])
    applicable = framework.get_applicable_options(state)
    print(f"状态 {state} 的可应用选项:")
    for option in applicable:
        print(f" - {option.name}")
6. 常见问题¶
1. 样本效率低¶
问题:需要大量样本才能学习
解决方案:
- 使用经验回放
- 使用优先经验回放
- 使用基于模型(model-based)的方法
2. 泛化能力差¶
问题:学习到的策略泛化能力差
解决方案:
- 使用更好的特征表示
- 使用数据增强
- 使用正则化
3. 学习不稳定¶
问题:学习过程不稳定
解决方案:
- 使用目标网络
- 使用梯度裁剪
- 使用更小的学习率
下一步¶
参考资源¶
- Acting, Planning, and Learning - Chapter 4, 7, 10
- Reinforcement Learning: An Introduction
- Imitation Learning