Reinforcement Learning
Reinforcement learning (RL) is a branch of machine learning in which an agent learns an optimal policy by interacting with an environment. This chapter introduces the core concepts and algorithms of reinforcement learning.
Learning Objectives
After completing this chapter, you will be able to:
- Explain the basic concepts of reinforcement learning
- Apply value-function methods (Q-Learning, SARSA)
- Explain policy-gradient methods
- Describe the main ideas of deep reinforcement learning
1. Reinforcement Learning Basics
1.1 The Reinforcement Learning Framework
The reinforcement learning loop:

Environment
      │
      │  state s_t, reward r_t
      ▼
┌─────────────┐
│    Agent    │
└──────┬──────┘
       │
       │  action a_t
       ▼
Environment
Core elements:
- Agent: the entity that learns and makes decisions
- Environment: the world the agent interacts with
- State: a description of the environment
- Action: what the agent can do
- Reward: feedback from the environment
- Policy: a mapping from states to actions
- Value function: an estimate of long-term reward
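These pieces fit together in one simple loop. A minimal sketch of that loop, assuming a hypothetical `env` with a `reset()`/`step()` interface and an `agent` with `choose_action()`/`update()` (the interfaces used throughout this chapter):

```python
# Hypothetical agent-environment interaction loop; `env` and `agent`
# follow the reset/step and choose_action/update interfaces assumed
# throughout this chapter.
state = env.reset()                                # initial state s_0
done = False
while not done:
    action = agent.choose_action(state)            # a_t ~ pi(.|s_t)
    next_state, reward, done = env.step(action)    # environment responds
    agent.update(state, action, reward, next_state, done)
    state = next_state
```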
1.2 Reinforcement Learning and MDPs
Reinforcement learning problems are usually formalized as a Markov decision process (MDP): a tuple (S, A, P, R, γ) of states, actions, transition probabilities, a reward function, and a discount factor. The tabular agent below maintains a Q-value for every (state, action) pair of such an MDP.
import numpy as np

class RLAgent:
    """A tabular reinforcement learning agent."""

    def __init__(self, states, actions, learning_rate=0.1, discount_factor=0.9,
                 exploration_rate=0.1):
        """
        Initialize the agent.

        Parameters:
            states: list of states
            actions: list of actions
            learning_rate: learning rate α
            discount_factor: discount factor γ
            exploration_rate: exploration rate ε
        """
        self.states = states
        self.actions = actions
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.exploration_rate = exploration_rate
        # Initialize the Q-table
        self.q_table = {(s, a): 0.0 for s in states for a in actions}

    def choose_action(self, state):
        """Choose an action (ε-greedy policy)."""
        if np.random.random() < self.exploration_rate:
            # Explore: pick a random action
            return np.random.choice(self.actions)
        else:
            # Exploit: pick the action with the highest Q-value
            q_values = [self.q_table[(state, a)] for a in self.actions]
            return self.actions[np.argmax(q_values)]

    def update(self, state, action, reward, next_state, done):
        """Update the Q-value."""
        # Current Q-value
        current_q = self.q_table[(state, action)]
        # Target Q-value: no bootstrapping past the end of the episode
        if done:
            target_q = reward
        else:
            max_q_next = max(self.q_table[(next_state, a)] for a in self.actions)
            target_q = reward + self.discount_factor * max_q_next
        # Move the Q-value toward the target
        self.q_table[(state, action)] = current_q + self.learning_rate * (
            target_q - current_q
        )

    def get_policy(self):
        """Extract the greedy policy from the Q-table."""
        policy = {}
        for state in self.states:
            q_values = [self.q_table[(state, a)] for a in self.actions]
            policy[state] = self.actions[np.argmax(q_values)]
        return policy
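A minimal usage sketch on a hypothetical two-state, two-action problem (the states, actions, and the single transition here are made up purely for illustration):

```python
# Hypothetical toy problem: two states, two actions.
agent = RLAgent(states=['s0', 's1'], actions=['stay', 'go'])

# One hand-crafted transition: in 's0', taking 'go' reaches 's1'
# with reward 1 and ends the episode.
agent.update(state='s0', action='go', reward=1.0, next_state='s1', done=True)

print(agent.q_table[('s0', 'go')])   # 0.1 = learning_rate * reward
print(agent.get_policy()['s0'])      # 'go'
```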
2. Value-Function Methods
2.1 Q-Learning
Q-Learning is an off-policy temporal-difference (TD) learning algorithm: it follows an ε-greedy behavior policy but bootstraps toward the greedy target max_{a'} Q(s', a').
The Q-Learning algorithm:
1. Initialize Q(s, a) = 0 for all s, a
2. For each episode:
   a. Initialize state s
   b. Repeat until the episode ends:
      - Choose action a (ε-greedy)
      - Take the action; observe r, s'
      - Update Q(s, a) ← Q(s, a) + α [r + γ max_{a'} Q(s', a') − Q(s, a)]
      - s ← s'
class QLearning:
    """The Q-Learning algorithm."""

    def __init__(self, states, actions, learning_rate=0.1, discount_factor=0.9,
                 exploration_rate=0.1, exploration_decay=0.995):
        """
        Initialize Q-Learning.

        Parameters:
            states: list of states
            actions: list of actions
            learning_rate: learning rate
            discount_factor: discount factor
            exploration_rate: initial exploration rate
            exploration_decay: per-episode decay of the exploration rate
        """
        self.states = states
        self.actions = actions
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.exploration_rate = exploration_rate
        self.exploration_decay = exploration_decay
        # Initialize the Q-table
        self.q_table = {(s, a): 0.0 for s in states for a in actions}
        # Training history
        self.training_history = []

    def choose_action(self, state):
        """Choose an action (ε-greedy)."""
        if np.random.random() < self.exploration_rate:
            return np.random.choice(self.actions)
        else:
            q_values = [self.q_table[(state, a)] for a in self.actions]
            return self.actions[np.argmax(q_values)]

    def update(self, state, action, reward, next_state, done=False):
        """Apply the Q-Learning update rule."""
        # Bootstrap from the greedy next action, unless the episode ended
        if done:
            target = reward
        else:
            max_q_next = max(self.q_table[(next_state, a)] for a in self.actions)
            target = reward + self.discount_factor * max_q_next
        current_q = self.q_table[(state, action)]
        self.q_table[(state, action)] = current_q + self.learning_rate * (
            target - current_q
        )

    def decay_exploration(self):
        """Decay the exploration rate."""
        self.exploration_rate *= self.exploration_decay

    def get_policy(self):
        """Extract the greedy policy from the Q-table."""
        return {
            s: self.actions[np.argmax([self.q_table[(s, a)] for a in self.actions])]
            for s in self.states
        }

    def train(self, env, num_episodes=1000, max_steps=100):
        """
        Train the agent.

        Parameters:
            env: the environment
            num_episodes: number of training episodes
            max_steps: maximum steps per episode

        Returns:
            training_history: list of per-episode statistics
        """
        for episode in range(num_episodes):
            state = env.reset()
            total_reward = 0
            for step in range(max_steps):
                # Choose an action
                action = self.choose_action(state)
                # Take the action
                next_state, reward, done = env.step(action)
                # Update the Q-value
                self.update(state, action, reward, next_state, done)
                # Move to the next state
                state = next_state
                total_reward += reward
                if done:
                    break
            # Decay the exploration rate
            self.decay_exploration()
            # Record training history
            self.training_history.append({
                'episode': episode,
                'total_reward': total_reward,
                'steps': step + 1,
                'exploration_rate': self.exploration_rate
            })
            # Print progress
            if (episode + 1) % 100 == 0:
                avg_reward = np.mean([h['total_reward'] for h in self.training_history[-100:]])
                print(f"Episode {episode + 1}, average reward: {avg_reward:.2f}, "
                      f"exploration rate: {self.exploration_rate:.3f}")
        return self.training_history
# Example: a grid-world environment
class GridWorldEnv:
    """A grid-world environment."""

    def __init__(self, width, height, goal, obstacles):
        self.width = width
        self.height = height
        self.goal = goal
        self.obstacles = obstacles
        self.state = (0, 0)
        self.states = [(x, y) for x in range(width) for y in range(height)]
        self.actions = ['up', 'down', 'left', 'right']

    def reset(self):
        """Reset the environment."""
        self.state = (0, 0)
        return self.state

    def step(self, action):
        """Take an action; return (next_state, reward, done)."""
        x, y = self.state
        # Compute the tentative new state, clipped to the grid
        if action == 'up':
            new_state = (x, min(y + 1, self.height - 1))
        elif action == 'down':
            new_state = (x, max(y - 1, 0))
        elif action == 'left':
            new_state = (max(x - 1, 0), y)
        else:  # 'right'
            new_state = (min(x + 1, self.width - 1), y)
        # Hitting an obstacle: stay in place and pay a penalty
        if new_state in self.obstacles:
            new_state = self.state
            reward = -10
            done = False
        # Reaching the goal ends the episode
        elif new_state == self.goal:
            reward = 100
            done = True
        # Every other step costs -1
        else:
            reward = -1
            done = False
        self.state = new_state
        return new_state, reward, done
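A quick sanity check of the dynamics before training on them (this layout matches the exercise at the end of the chapter):

```python
env = GridWorldEnv(width=5, height=5, goal=(4, 4), obstacles=[(1, 1), (2, 2)])
state = env.reset()           # (0, 0)
print(env.step('right'))      # ((1, 0), -1, False): an ordinary step
print(env.step('up'))         # ((1, 0), -10, False): (1, 1) is an obstacle
```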
2.2 SARSA
SARSA is an on-policy temporal-difference learning algorithm: it bootstraps from the action a' actually chosen by the ε-greedy behavior policy, which tends to make it more conservative than Q-Learning near high-penalty states (the classic cliff-walking example).
The SARSA algorithm:
1. Initialize Q(s, a) = 0 for all s, a
2. For each episode:
   a. Initialize state s
   b. Choose action a (ε-greedy)
   c. Repeat until the episode ends:
      - Take the action; observe r, s'
      - Choose action a' (ε-greedy)
      - Update Q(s, a) ← Q(s, a) + α [r + γ Q(s', a') − Q(s, a)]
      - s ← s', a ← a'
class SARSA:
    """The SARSA algorithm."""

    def __init__(self, states, actions, learning_rate=0.1, discount_factor=0.9,
                 exploration_rate=0.1, exploration_decay=0.995):
        self.states = states
        self.actions = actions
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.exploration_rate = exploration_rate
        self.exploration_decay = exploration_decay
        # Initialize the Q-table
        self.q_table = {(s, a): 0.0 for s in states for a in actions}
        # Training history
        self.training_history = []

    def choose_action(self, state):
        """Choose an action (ε-greedy)."""
        if np.random.random() < self.exploration_rate:
            return np.random.choice(self.actions)
        else:
            q_values = [self.q_table[(state, a)] for a in self.actions]
            return self.actions[np.argmax(q_values)]

    def update(self, state, action, reward, next_state, next_action, done=False):
        """Apply the SARSA update rule."""
        # Bootstrap from the action actually taken next (on-policy),
        # unless the episode ended
        next_q = 0.0 if done else self.q_table[(next_state, next_action)]
        current_q = self.q_table[(state, action)]
        self.q_table[(state, action)] = current_q + self.learning_rate * (
            reward + self.discount_factor * next_q - current_q
        )

    def train(self, env, num_episodes=1000, max_steps=100):
        """Train the agent."""
        for episode in range(num_episodes):
            state = env.reset()
            action = self.choose_action(state)
            total_reward = 0
            for step in range(max_steps):
                # Take the action
                next_state, reward, done = env.step(action)
                # Choose the next action
                next_action = self.choose_action(next_state)
                # Update the Q-value
                self.update(state, action, reward, next_state, next_action, done)
                # Move to the next state-action pair
                state = next_state
                action = next_action
                total_reward += reward
                if done:
                    break
            # Decay the exploration rate
            self.exploration_rate *= self.exploration_decay
            # Record training history
            self.training_history.append({
                'episode': episode,
                'total_reward': total_reward,
                'steps': step + 1
            })
            if (episode + 1) % 100 == 0:
                avg_reward = np.mean([h['total_reward'] for h in self.training_history[-100:]])
                print(f"Episode {episode + 1}, average reward: {avg_reward:.2f}")
        return self.training_history
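Since both learners expose the same `train` interface, they can be compared head to head on the grid world defined above (the episode counts here are arbitrary):

```python
env = GridWorldEnv(width=5, height=5, goal=(4, 4), obstacles=[(1, 1), (2, 2)])

q_agent = QLearning(env.states, env.actions)
q_history = q_agent.train(env, num_episodes=500)

sarsa_agent = SARSA(env.states, env.actions)
sarsa_history = sarsa_agent.train(env, num_episodes=500)

# Compare average reward over the last 100 episodes
print(np.mean([h['total_reward'] for h in q_history[-100:]]))
print(np.mean([h['total_reward'] for h in sarsa_history[-100:]]))
```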
2.3 Expected SARSA
Expected SARSA replaces the sampled next-action value in the SARSA target with its expectation under the current ε-greedy policy:
Q(s, a) ← Q(s, a) + α [r + γ Σ_{a'} π(a'|s') Q(s', a') − Q(s, a)]
Taking the expectation removes the sampling noise from a', which lowers the variance of the update.
class ExpectedSARSA:
    """The Expected SARSA algorithm."""

    def __init__(self, states, actions, learning_rate=0.1, discount_factor=0.9,
                 exploration_rate=0.1):
        self.states = states
        self.actions = actions
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.exploration_rate = exploration_rate
        # Initialize the Q-table
        self.q_table = {(s, a): 0.0 for s in states for a in actions}

    def choose_action(self, state):
        """Choose an action (ε-greedy)."""
        if np.random.random() < self.exploration_rate:
            return np.random.choice(self.actions)
        else:
            q_values = [self.q_table[(state, a)] for a in self.actions]
            return self.actions[np.argmax(q_values)]

    def update(self, state, action, reward, next_state):
        """Apply the Expected SARSA update rule."""
        # Expected Q-value under the ε-greedy policy: the greedy action
        # has probability 1 - ε + ε/|A|, every other action ε/|A|
        greedy = self._greedy_action(next_state)
        expected_q = 0.0
        for a in self.actions:
            if a == greedy:
                prob = 1 - self.exploration_rate + self.exploration_rate / len(self.actions)
            else:
                prob = self.exploration_rate / len(self.actions)
            expected_q += prob * self.q_table[(next_state, a)]
        # Move the Q-value toward the expected target
        current_q = self.q_table[(state, action)]
        self.q_table[(state, action)] = current_q + self.learning_rate * (
            reward + self.discount_factor * expected_q - current_q
        )

    def _greedy_action(self, state):
        """The greedy action in a state."""
        q_values = [self.q_table[(state, a)] for a in self.actions]
        return self.actions[np.argmax(q_values)]
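To make the contrast between the three TD targets concrete, the snippet below computes each one from the same hand-filled Q-values for a single next state (all numbers are arbitrary):

```python
# Hand-filled Q-values for one next state s' with actions a0, a1
q_next = {'a0': 1.0, 'a1': 3.0}
reward, gamma, eps = 0.0, 0.9, 0.1
n = len(q_next)

# Q-Learning: bootstrap from the max
target_q_learning = reward + gamma * max(q_next.values())    # 2.7

# SARSA: bootstrap from the sampled next action, e.g. a' = 'a0'
target_sarsa = reward + gamma * q_next['a0']                 # 0.9

# Expected SARSA: expectation under the eps-greedy policy
probs = {a: eps / n for a in q_next}
probs['a1'] += 1 - eps                                       # 'a1' is greedy
expected = sum(probs[a] * q_next[a] for a in q_next)
target_expected = reward + gamma * expected                  # 2.61
```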
3. Policy-Gradient Methods
3.1 The Policy Gradient Theorem
The policy gradient theorem:
∇_θ J(θ) = E_π[∇_θ log π(a|s, θ) · Q^π(s, a)]
where:
- J(θ): the expected return of the policy
- π(a|s, θ): the parameterized policy
- Q^π(s, a): the action-value function
- θ: the policy parameters
import numpy as np

class PolicyGradient:
    """A policy-gradient learner with a linear softmax policy."""

    def __init__(self, state_dim, action_dim, learning_rate=0.01):
        """
        Initialize the policy-gradient learner.

        Parameters:
            state_dim: state dimensionality
            action_dim: number of actions
            learning_rate: learning rate
        """
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.learning_rate = learning_rate
        # Initialize the policy parameters
        self.theta = np.random.randn(state_dim, action_dim) * 0.01

    def softmax(self, x):
        """Numerically stable softmax."""
        exp_x = np.exp(x - np.max(x))
        return exp_x / exp_x.sum()

    def get_action_prob(self, state):
        """Action probabilities under the current policy."""
        # Score each action
        scores = state @ self.theta
        # Normalize with softmax
        return self.softmax(scores)

    def choose_action(self, state):
        """Sample an action from the policy."""
        probs = self.get_action_prob(state)
        return np.random.choice(self.action_dim, p=probs)

    def update(self, states, actions, rewards):
        """
        Update the policy parameters from one episode.

        Parameters:
            states: sequence of states
            actions: sequence of actions
            rewards: sequence of rewards
        """
        # Discounted returns
        returns = self._compute_returns(rewards)
        # Accumulate the policy gradient
        gradients = np.zeros_like(self.theta)
        for state, action, G in zip(states, actions, returns):
            probs = self.get_action_prob(state)
            # ∇_θ log π(a|s, θ) = outer(s, onehot(a) - π(·|s))
            grad_log_pi = np.outer(state, -probs)
            grad_log_pi[:, action] += state
            gradients += grad_log_pi * G
        # Gradient ascent step
        self.theta += self.learning_rate * gradients / len(states)

    def _compute_returns(self, rewards, gamma=0.99):
        """Discounted returns, normalized to zero mean and unit variance."""
        returns = []
        G = 0
        for r in reversed(rewards):
            G = r + gamma * G
            returns.insert(0, G)
        # Normalizing reduces the variance of the gradient estimate
        returns = np.array(returns)
        returns = (returns - returns.mean()) / (returns.std() + 1e-8)
        return returns
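A quick way to validate the analytic ∇_θ log π used in `update` is a finite-difference check at a single parameter (a verification sketch, not part of the algorithm; the state and indices are arbitrary):

```python
agent = PolicyGradient(state_dim=3, action_dim=2)
state, action = np.random.randn(3), 0

# Analytic gradient of log pi(action|state)
probs = agent.get_action_prob(state)
analytic = np.outer(state, -probs)
analytic[:, action] += state

# Finite-difference gradient for one parameter theta[i, j]
eps, i, j = 1e-6, 1, 0
agent.theta[i, j] += eps
log_p_plus = np.log(agent.get_action_prob(state)[action])
agent.theta[i, j] -= 2 * eps
log_p_minus = np.log(agent.get_action_prob(state)[action])
agent.theta[i, j] += eps                     # restore
numeric = (log_p_plus - log_p_minus) / (2 * eps)

print(analytic[i, j], numeric)               # should agree closely
```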
3.2 The REINFORCE Algorithm
REINFORCE is the Monte-Carlo instantiation of the policy gradient theorem: it runs a complete episode and substitutes the observed discounted return for Q^π(s, a).
class REINFORCE:
    """The REINFORCE algorithm."""

    def __init__(self, state_dim, action_dim, learning_rate=0.01, gamma=0.99):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.learning_rate = learning_rate
        self.gamma = gamma
        # Initialize the policy parameters
        self.theta = np.random.randn(state_dim, action_dim) * 0.01
        # Trajectory buffers
        self.states = []
        self.actions = []
        self.rewards = []

    def softmax(self, x):
        """Numerically stable softmax."""
        exp_x = np.exp(x - np.max(x))
        return exp_x / exp_x.sum()

    def choose_action(self, state):
        """Sample an action and record the step."""
        scores = state @ self.theta
        probs = self.softmax(scores)
        action = np.random.choice(self.action_dim, p=probs)
        # Record the step
        self.states.append(state)
        self.actions.append(action)
        return action

    def store_reward(self, reward):
        """Store a reward."""
        self.rewards.append(reward)

    def update(self):
        """Update the policy from the stored episode."""
        # Discounted, normalized returns
        returns = []
        G = 0
        for r in reversed(self.rewards):
            G = r + self.gamma * G
            returns.insert(0, G)
        returns = np.array(returns)
        returns = (returns - returns.mean()) / (returns.std() + 1e-8)
        # Accumulate the policy gradient
        gradients = np.zeros_like(self.theta)
        for state, action, G in zip(self.states, self.actions, returns):
            probs = self.softmax(state @ self.theta)
            # ∇_θ log π(a|s, θ)
            grad_log_pi = np.outer(state, -probs)
            grad_log_pi[:, action] += state
            gradients += grad_log_pi * G
        # Gradient ascent step
        self.theta += self.learning_rate * gradients / len(self.states)
        # Clear the trajectory
        self.states = []
        self.actions = []
        self.rewards = []
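Because `choose_action` and `store_reward` fill the trajectory buffers, the training loop stays short. A sketch against a hypothetical `env` exposing the reset/step interface used earlier in this chapter (the dimensions are placeholders):

```python
# Hypothetical environment with vector states; dims are placeholders.
agent = REINFORCE(state_dim=4, action_dim=2)

for episode in range(1000):
    state = env.reset()
    done = False
    while not done:
        action = agent.choose_action(state)      # also records (s, a)
        state, reward, done = env.step(action)
        agent.store_reward(reward)
    agent.update()                               # one update per episode
```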
3.3 Actor-Critic Methods
Actor-Critic methods combine the two families: a critic learns a state-value function, and its TD error stands in for the Monte-Carlo return as a lower-variance weight on the actor's policy gradient.
class ActorCritic:
    """An Actor-Critic method with a linear actor and critic."""

    def __init__(self, state_dim, action_dim, actor_lr=0.001, critic_lr=0.01, gamma=0.99):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.actor_lr = actor_lr
        self.critic_lr = critic_lr
        self.gamma = gamma
        # Actor parameters (softmax policy)
        self.actor_theta = np.random.randn(state_dim, action_dim) * 0.01
        # Critic parameters (linear state-value function)
        self.critic_theta = np.random.randn(state_dim) * 0.01

    def softmax(self, x):
        """Numerically stable softmax."""
        exp_x = np.exp(x - np.max(x))
        return exp_x / exp_x.sum()

    def choose_action(self, state):
        """Sample an action from the actor."""
        scores = state @ self.actor_theta
        probs = self.softmax(scores)
        return np.random.choice(self.action_dim, p=probs)

    def get_value(self, state):
        """State value estimated by the critic."""
        return state @ self.critic_theta

    def update(self, state, action, reward, next_state, done):
        """
        Update the actor and the critic from one transition.

        Parameters:
            state: current state
            action: action taken
            reward: reward received
            next_state: next state
            done: whether the episode ended
        """
        # TD error: how much better the transition was than expected
        if done:
            td_target = reward
        else:
            td_target = reward + self.gamma * self.get_value(next_state)
        td_error = td_target - self.get_value(state)
        # Critic: move V(s) toward the TD target
        self.critic_theta += self.critic_lr * td_error * state
        # Actor: policy gradient weighted by the TD error
        probs = self.softmax(state @ self.actor_theta)
        grad_log_pi = np.outer(state, -probs)
        grad_log_pi[:, action] += state
        self.actor_theta += self.actor_lr * td_error * grad_log_pi
4. Deep Reinforcement Learning
4.1 Deep Q-Networks (DQN)
DQN approximates Q(s, a) with a neural network and stabilizes training with two ingredients: an experience replay buffer, which breaks correlations between consecutive samples, and a periodically synced target network, which keeps the bootstrap target stable.
import numpy as np

class DQN:
    """A deep Q-network (two-layer MLP, NumPy only)."""

    def __init__(self, state_dim, action_dim, hidden_dim=64, learning_rate=0.001,
                 gamma=0.99, epsilon=0.1, epsilon_decay=0.995, target_update=10):
        """
        Initialize the DQN.

        Parameters:
            state_dim: state dimensionality
            action_dim: number of actions
            hidden_dim: hidden-layer width
            learning_rate: learning rate
            gamma: discount factor
            epsilon: exploration rate
            epsilon_decay: decay of the exploration rate
            target_update: how often (in updates) to sync the target network
        """
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.hidden_dim = hidden_dim
        self.learning_rate = learning_rate
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.target_update = target_update
        # Initialize the network parameters
        self._init_network()
        # Experience replay buffer
        self.replay_buffer = []
        self.buffer_size = 10000
        self.batch_size = 32
        # Update counter
        self.train_step = 0

    def _init_network(self):
        """Initialize the networks."""
        # Online network
        self.w1 = np.random.randn(self.state_dim, self.hidden_dim) * 0.01
        self.b1 = np.zeros(self.hidden_dim)
        self.w2 = np.random.randn(self.hidden_dim, self.action_dim) * 0.01
        self.b2 = np.zeros(self.action_dim)
        # Target network (a frozen copy of the online network)
        self.target_w1 = self.w1.copy()
        self.target_b1 = self.b1.copy()
        self.target_w2 = self.w2.copy()
        self.target_b2 = self.b2.copy()

    def forward(self, state, use_target=False):
        """Forward pass (ReLU hidden layer, linear output)."""
        if use_target:
            h = np.maximum(0, state @ self.target_w1 + self.target_b1)
            q_values = h @ self.target_w2 + self.target_b2
        else:
            h = np.maximum(0, state @ self.w1 + self.b1)
            q_values = h @ self.w2 + self.b2
        return q_values

    def choose_action(self, state):
        """Choose an action (ε-greedy)."""
        if np.random.random() < self.epsilon:
            return np.random.randint(self.action_dim)
        else:
            q_values = self.forward(state)
            return np.argmax(q_values)

    def store_transition(self, state, action, reward, next_state, done):
        """Store a transition in the replay buffer."""
        self.replay_buffer.append((state, action, reward, next_state, done))
        if len(self.replay_buffer) > self.buffer_size:
            self.replay_buffer.pop(0)

    def update(self):
        """Update the online network from a replay minibatch."""
        if len(self.replay_buffer) < self.batch_size:
            return
        # Sample a minibatch of transitions
        indices = np.random.choice(len(self.replay_buffer), self.batch_size, replace=False)
        batch = [self.replay_buffer[i] for i in indices]
        states = np.array([t[0] for t in batch])
        actions = np.array([t[1] for t in batch])
        rewards = np.array([t[2] for t in batch])
        next_states = np.array([t[3] for t in batch])
        dones = np.array([t[4] for t in batch])
        # Target Q-values from the target network
        next_q_values = self.forward(next_states, use_target=True)
        max_next_q = np.max(next_q_values, axis=1)
        targets = rewards + self.gamma * max_next_q * (1 - dones)
        # Current Q-values for the actions taken (keep the hidden
        # activations around for backpropagation)
        h = np.maximum(0, states @ self.w1 + self.b1)
        current_q_values = h @ self.w2 + self.b2
        current_q = current_q_values[np.arange(self.batch_size), actions]
        # MSE loss and its gradient, backpropagated by hand
        loss = np.mean((current_q - targets) ** 2)
        grad_q = np.zeros_like(current_q_values)
        grad_q[np.arange(self.batch_size), actions] = 2 * (current_q - targets) / self.batch_size
        grad_w2 = h.T @ grad_q
        grad_b2 = grad_q.sum(axis=0)
        grad_h = grad_q @ self.w2.T
        grad_h[h <= 0] = 0                      # ReLU gate
        grad_w1 = states.T @ grad_h
        grad_b1 = grad_h.sum(axis=0)
        # Gradient descent step
        self.w1 -= self.learning_rate * grad_w1
        self.b1 -= self.learning_rate * grad_b1
        self.w2 -= self.learning_rate * grad_w2
        self.b2 -= self.learning_rate * grad_b2
        # Periodically sync the target network
        self.train_step += 1
        if self.train_step % self.target_update == 0:
            self.target_w1 = self.w1.copy()
            self.target_b1 = self.b1.copy()
            self.target_w2 = self.w2.copy()
            self.target_b2 = self.b2.copy()
        # Decay the exploration rate
        self.epsilon *= self.epsilon_decay
        return loss
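Putting the pieces together, a sketch of the standard DQN training loop against a hypothetical environment (`env` is assumed to expose the reset/step interface used earlier, with NumPy array states; dimensions are placeholders):

```python
# Hypothetical environment with vector states, e.g. 4-dim, 2 actions.
dqn = DQN(state_dim=4, action_dim=2)

for episode in range(500):
    state = env.reset()
    done = False
    while not done:
        action = dqn.choose_action(state)
        next_state, reward, done = env.step(action)
        dqn.store_transition(state, action, reward, next_state, done)
        dqn.update()                    # one gradient step per env step
        state = next_state
```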
4.2 Policy-Gradient Methods (Deep Version)
The same idea as REINFORCE, with the linear policy replaced by a small neural network.
class DeepPolicyGradient:
    """A deep policy-gradient learner (two-layer MLP, NumPy only)."""

    def __init__(self, state_dim, action_dim, hidden_dim=64, learning_rate=0.001, gamma=0.99):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.hidden_dim = hidden_dim
        self.learning_rate = learning_rate
        self.gamma = gamma
        # Initialize the network parameters
        self._init_network()
        # Trajectory buffers
        self.states = []
        self.actions = []
        self.rewards = []

    def _init_network(self):
        """Initialize the policy network."""
        self.w1 = np.random.randn(self.state_dim, self.hidden_dim) * 0.01
        self.b1 = np.zeros(self.hidden_dim)
        self.w2 = np.random.randn(self.hidden_dim, self.action_dim) * 0.01
        self.b2 = np.zeros(self.action_dim)

    def forward(self, state):
        """Forward pass: state -> action probabilities."""
        h = np.maximum(0, state @ self.w1 + self.b1)
        logits = h @ self.w2 + self.b2
        return self._softmax(logits)

    def _softmax(self, x):
        """Numerically stable softmax."""
        exp_x = np.exp(x - np.max(x))
        return exp_x / exp_x.sum()

    def choose_action(self, state):
        """Sample an action and record the step."""
        probs = self.forward(state)
        action = np.random.choice(self.action_dim, p=probs)
        # Record the step
        self.states.append(state)
        self.actions.append(action)
        return action

    def store_reward(self, reward):
        """Store a reward."""
        self.rewards.append(reward)

    def update(self):
        """Update the policy network from the stored episode."""
        # Discounted, normalized returns
        returns = []
        G = 0
        for r in reversed(self.rewards):
            G = r + self.gamma * G
            returns.insert(0, G)
        returns = np.array(returns)
        returns = (returns - returns.mean()) / (returns.std() + 1e-8)
        # Accumulate gradients of loss = -sum_t G_t * log pi(a_t|s_t),
        # backpropagated by hand through the two-layer network
        grad_w1 = np.zeros_like(self.w1)
        grad_b1 = np.zeros_like(self.b1)
        grad_w2 = np.zeros_like(self.w2)
        grad_b2 = np.zeros_like(self.b2)
        for state, action, G in zip(self.states, self.actions, returns):
            h = np.maximum(0, state @ self.w1 + self.b1)
            probs = self._softmax(h @ self.w2 + self.b2)
            # d(-G * log pi(a)) / dlogits = G * (pi - onehot(a))
            grad_logits = probs.copy()
            grad_logits[action] -= 1
            grad_logits *= G
            grad_w2 += np.outer(h, grad_logits)
            grad_b2 += grad_logits
            grad_h = self.w2 @ grad_logits
            grad_h[h <= 0] = 0                  # ReLU gate
            grad_w1 += np.outer(state, grad_h)
            grad_b1 += grad_h
        # Descent on the loss = ascent on the expected return
        n = len(self.states)
        self.w1 -= self.learning_rate * grad_w1 / n
        self.b1 -= self.learning_rate * grad_b1 / n
        self.w2 -= self.learning_rate * grad_w2 / n
        self.b2 -= self.learning_rate * grad_b2 / n
        # Clear the trajectory
        self.states = []
        self.actions = []
        self.rewards = []
5. Hands-On Exercises
Exercise 1: Implement Q-Learning
def q_learning_exercise():
    # Create the environment
    env = GridWorldEnv(5, 5, (4, 4), [(1, 1), (2, 2)])
    # Create the agent
    agent = QLearning(
        states=env.states,
        actions=env.actions,
        learning_rate=0.1,
        discount_factor=0.9,
        exploration_rate=0.1
    )
    # Train
    history = agent.train(env, num_episodes=1000)
    # Extract the greedy policy
    policy = agent.get_policy()
    print("Learned policy:")
    for y in range(4, -1, -1):
        for x in range(5):
            action = policy.get((x, y), ' ')
            print(f"{action:>6}", end=" ")
        print()
Exercise 2: Implement the Policy Gradient
def policy_gradient_exercise():
    # Create the agent
    agent = PolicyGradient(state_dim=2, action_dim=4)
    # Simulated training: random-walk dynamics, reward for approaching (4, 4)
    for episode in range(100):
        state = np.array([0.0, 0.0])
        states, actions, rewards = [], [], []
        total_reward = 0
        for step in range(50):
            action = agent.choose_action(state)
            next_state = state + np.random.randn(2) * 0.1
            reward = -np.linalg.norm(next_state - np.array([4, 4]))
            # Collect the trajectory
            states.append(state)
            actions.append(action)
            rewards.append(reward)
            state = next_state
            total_reward += reward
        # One policy update per episode
        agent.update(states, actions, rewards)
        if (episode + 1) % 10 == 0:
            print(f"Episode {episode + 1}, total reward: {total_reward:.2f}")
Exercise 3: Implement Actor-Critic
def actor_critic_exercise():
    # Create the agent
    agent = ActorCritic(state_dim=2, action_dim=4)
    # Simulated training with per-step (online) updates
    for episode in range(100):
        state = np.array([0.0, 0.0])
        total_reward = 0
        for step in range(50):
            action = agent.choose_action(state)
            next_state = state + np.random.randn(2) * 0.1
            reward = -np.linalg.norm(next_state - np.array([4, 4]))
            done = (step == 49)
            agent.update(state, action, reward, next_state, done)
            state = next_state
            total_reward += reward
        if (episode + 1) % 10 == 0:
            print(f"Episode {episode + 1}, total reward: {total_reward:.2f}")
6. Common Questions
1. Balancing Exploration and Exploitation
Problem: how should the agent trade off exploring unknown actions against exploiting known good ones?
Approaches:
- ε-greedy policies
- UCB algorithms (see the sketch below)
- Thompson sampling
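Instead of exploring uniformly at random like ε-greedy, UCB adds an optimism bonus that shrinks as an action is tried more often. A minimal UCB1 sketch for a multi-armed bandit, assuming hypothetical Bernoulli arms chosen purely for illustration:

```python
import numpy as np

def ucb1(arm_means, num_steps=1000, c=2.0):
    """UCB1: pick the arm maximizing mean + sqrt(c * ln(t) / pulls)."""
    n_arms = len(arm_means)
    counts = np.zeros(n_arms)        # pulls per arm
    values = np.zeros(n_arms)        # running mean reward per arm
    total = 0.0
    for t in range(1, num_steps + 1):
        if t <= n_arms:
            arm = t - 1              # pull each arm once first
        else:
            bonus = np.sqrt(c * np.log(t) / counts)
            arm = np.argmax(values + bonus)
        reward = float(np.random.random() < arm_means[arm])   # Bernoulli draw
        counts[arm] += 1
        values[arm] += (reward - values[arm]) / counts[arm]   # incremental mean
        total += reward
    return total, counts

# Hypothetical arms: the third is best, and UCB should pull it most.
total, counts = ucb1(arm_means=[0.2, 0.5, 0.8])
print(counts)
```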
2. Sparse Rewards
Problem: the reward signal is sparse, which makes learning slow and difficult.
Approaches:
- Reward shaping (a potential-based sketch follows this list)
- Curriculum learning
- Hierarchical reinforcement learning
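Potential-based shaping adds γΦ(s') − Φ(s) to each reward without changing the optimal policy. A sketch for the grid world above, assuming the negative Manhattan distance to the goal as a hand-picked potential:

```python
def shaped_reward(reward, state, next_state, goal=(4, 4), gamma=0.9):
    """Potential-based reward shaping: r' = r + gamma * phi(s') - phi(s)."""
    def phi(s):
        # Potential: negative Manhattan distance to the goal (a hand-picked choice)
        return -(abs(s[0] - goal[0]) + abs(s[1] - goal[1]))
    return reward + gamma * phi(next_state) - phi(state)

# A step toward the goal earns a bonus on top of the raw -1 step cost:
print(shaped_reward(-1, state=(0, 0), next_state=(1, 0)))   # -1 + 0.9*(-7) - (-8) = 0.7
```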
3. Low Sample Efficiency
Problem: learning requires a very large number of environment interactions.
Approaches:
- Experience replay (used by the DQN above)
- Prioritized experience replay
- Model-based methods
Further Reading
- Acting, Planning, and Learning, Chapter 10
- Reinforcement Learning: An Introduction (Sutton & Barto)
- Spinning Up in Deep RL (OpenAI)