概率规划¶
概率规划是指在状态转移具有不确定性的环境中进行规划。本章将介绍马尔可夫决策过程(MDP)和相关的规划算法。
学习目标¶
完成本章后,你将能够:
- 理解 MDP 的基本概念
- 掌握值迭代和策略迭代算法
- 理解部分可观察 MDP(POMDP)
- 应用概率规划解决实际问题
1. 马尔可夫决策过程(MDP)¶
1.1 MDP 定义¶
MDP 是一个五元组 (S, A, T, R, γ):
- S: 状态集合
- A: 动作集合
- T: 状态转移函数 T(s, a, s') = P(s' | s, a)
- R: 奖励函数 R(s, a, s')
- γ: 折扣因子 (0 ≤ γ ≤ 1;无限时域问题通常取 γ < 1 以保证值函数收敛)
马尔可夫性质:
P(s_{t+1} | s_t, a_t, s_{t-1}, a_{t-1}, ...) = P(s_{t+1} | s_t, a_t)
即下一个状态只依赖于当前状态和动作,与历史无关
import numpy as np
class MDP:
"""马尔可夫决策过程"""
def __init__(self, states, actions, transition_model, reward_function, discount_factor=0.9):
"""
初始化 MDP
参数:
states: 状态列表
actions: 动作列表
transition_model: 转移模型 P(s'|s,a)
reward_function: 奖励函数 R(s,a,s')
discount_factor: 折扣因子
"""
self.states = states
self.actions = actions
self.transition_model = transition_model
self.reward_function = reward_function
self.discount_factor = discount_factor
def get_transition_probability(self, state, action, next_state):
"""获取转移概率"""
return self.transition_model(state, action, next_state)
def get_reward(self, state, action, next_state):
"""获取奖励"""
return self.reward_function(state, action, next_state)
def get_expected_reward(self, state, action):
"""获取期望奖励"""
expected_reward = 0
for next_state in self.states:
prob = self.get_transition_probability(state, action, next_state)
reward = self.get_reward(state, action, next_state)
expected_reward += prob * reward
return expected_reward
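下面是使用上述 MDP 类的一个最小示例(两状态、两动作的示意模型,状态名与概率均为本文假设的演示数据):

```python
# 最小示例:两状态 MDP('good' / 'bad' 为假设的演示状态)
demo_states = ['good', 'bad']
demo_actions = ['stay', 'move']

def demo_transition(s, a, s2):
    """演示用转移模型:'move' 有 0.8 的概率切换状态"""
    if a == 'stay':
        return 1.0 if s2 == s else 0.0
    return 0.8 if s2 != s else 0.2

def demo_reward(s, a, s2):
    """演示用奖励:到达 'good' 得 +1,否则 -1"""
    return 1 if s2 == 'good' else -1

demo_mdp = MDP(demo_states, demo_actions, demo_transition, demo_reward, discount_factor=0.9)
print(demo_mdp.get_expected_reward('bad', 'move'))  # 0.8*1 + 0.2*(-1) = 0.6
```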
# 示例:网格世界 MDP
class GridWorldMDP(MDP):
    """网格世界 MDP(继承 MDP,以便直接复用 get_transition_probability 等接口)"""
    def __init__(self, width, height, goal, obstacles, slip_prob=0.1):
        self.width = width
        self.height = height
        self.goal = goal
        self.obstacles = obstacles
        self.slip_prob = slip_prob
        # 状态和动作
        states = [(x, y) for x in range(width) for y in range(height)]
        actions = ['up', 'down', 'left', 'right']
        super().__init__(states, actions, self.transition_model,
                         self.reward_function, discount_factor=0.9)
def transition_model(self, state, action, next_state):
"""转移模型"""
if state == self.goal:
return 1.0 if next_state == state else 0.0
# 计算期望的下一个状态
expected_next = self._get_expected_next_state(state, action)
# 滑动概率
if next_state == expected_next:
return 1 - self.slip_prob
elif self._is_slippery_move(state, action, next_state):
return self.slip_prob / 3 # 滑动到其他三个方向
else:
return 0.0
def reward_function(self, state, action, next_state):
"""奖励函数"""
if next_state == self.goal:
return 100 # 到达目标
elif next_state in self.obstacles:
return -100 # 碰到障碍物
else:
return -1 # 每步代价
def _get_expected_next_state(self, state, action):
"""获取期望的下一个状态"""
x, y = state
if action == 'up':
return (x, min(y + 1, self.height - 1))
elif action == 'down':
return (x, max(y - 1, 0))
elif action == 'left':
return (max(x - 1, 0), y)
elif action == 'right':
return (min(x + 1, self.width - 1), y)
    def _is_slippery_move(self, state, action, next_state):
        """检查 next_state 是否是滑向其他三个方向之一的结果"""
        # 简化处理:在边界处多个方向可能滑到同一格,此时概率之和会略小于 1
        other_actions = [a for a in self.actions if a != action]
        return any(self._get_expected_next_state(state, other) == next_state
                   for other in other_actions)
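可以用下面的小片段检查网格世界的转移模型是否合理(假设 GridWorldMDP 已如上继承自 MDP,参数取值仅作演示):

```python
# 检查某个状态-动作对的转移概率(对远离边界的状态,概率之和应接近 1)
gw = GridWorldMDP(5, 5, goal=(4, 4), obstacles=[(1, 1)], slip_prob=0.1)
state, action = (2, 2), 'up'
probs = {s2: gw.get_transition_probability(state, action, s2) for s2 in gw.states}
print(sum(probs.values()))                             # 约为 1.0
print({s2: p for s2, p in probs.items() if p > 0})     # 非零转移目标
```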
1.2 MDP 策略¶
class Policy:
"""策略"""
def __init__(self, mdp):
self.mdp = mdp
self.policy = {} # 状态 -> 动作
def get_action(self, state):
"""获取动作"""
return self.policy.get(state, None)
def set_action(self, state, action):
"""设置动作"""
self.policy[state] = action
def evaluate(self, num_iterations=100):
"""评估策略"""
# 初始化值函数
V = {s: 0 for s in self.mdp.states}
for _ in range(num_iterations):
V_new = V.copy()
for state in self.mdp.states:
action = self.get_action(state)
if action is None:
continue
# 计算值函数
value = 0
for next_state in self.mdp.states:
prob = self.mdp.get_transition_probability(state, action, next_state)
reward = self.mdp.get_reward(state, action, next_state)
value += prob * (reward + self.mdp.discount_factor * V[next_state])
V_new[state] = value
V = V_new
return V
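下面是 Policy 类的一个简单用法:固定一个“总是向右”的策略并评估它的值函数(网格尺寸等参数仅作演示假设):

```python
# 在网格世界上评估一个固定策略
gw = GridWorldMDP(3, 3, goal=(2, 2), obstacles=[])
always_right = Policy(gw)
for s in gw.states:
    always_right.set_action(s, 'right')
V_right = always_right.evaluate(num_iterations=50)
print(V_right[(0, 0)], V_right[(2, 2)])
```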
2. 动态规划算法¶
2.1 值迭代¶
值迭代算法通过迭代更新值函数来找到最优策略。
值迭代算法:
1. 初始化 V(s) = 0 对所有 s
2. 重复直到收敛:
对每个状态 s:
V(s) ← max_a Σ_{s'} T(s,a,s') [R(s,a,s') + γ V(s')]
3. 提取策略:
π(s) ← argmax_a Σ_{s'} T(s,a,s') [R(s,a,s') + γ V(s')]
def value_iteration(mdp, epsilon=0.01, max_iterations=1000):
"""
值迭代算法
参数:
mdp: MDP 问题
epsilon: 收敛阈值
max_iterations: 最大迭代次数
返回:
V: 最优值函数
policy: 最优策略
"""
# 初始化值函数
V = {s: 0 for s in mdp.states}
for iteration in range(max_iterations):
delta = 0
V_new = V.copy()
for state in mdp.states:
# 计算每个动作的 Q 值
q_values = []
for action in mdp.actions:
q = sum(
mdp.get_transition_probability(state, action, next_state) *
(mdp.get_reward(state, action, next_state) +
mdp.discount_factor * V[next_state])
for next_state in mdp.states
)
q_values.append(q)
# 更新值函数
V_new[state] = max(q_values)
delta = max(delta, abs(V_new[state] - V[state]))
V = V_new
# 检查收敛
if delta < epsilon:
print(f"值迭代收敛于第 {iteration + 1} 次迭代")
break
# 提取策略
policy = {}
for state in mdp.states:
q_values = []
for action in mdp.actions:
q = sum(
mdp.get_transition_probability(state, action, next_state) *
(mdp.get_reward(state, action, next_state) +
mdp.discount_factor * V[next_state])
for next_state in mdp.states
)
q_values.append((q, action))
policy[state] = max(q_values)[1]
return V, policy
# 示例
mdp = GridWorldMDP(5, 5, (4, 4), [(1, 1), (2, 2)])
V, policy = value_iteration(mdp)
print("最优值函数:")
for y in range(4, -1, -1):
for x in range(5):
print(f"{V[(x, y)]:8.2f}", end=" ")
print()
2.2 策略迭代¶
策略迭代算法通过交替进行策略评估和策略改进来找到最优策略。
策略迭代算法:
1. 初始化随机策略 π
2. 重复直到策略不变:
a. 策略评估:计算 V^π
b. 策略改进:π(s) ← argmax_a Σ_{s'} T(s,a,s') [R(s,a,s') + γ V^π(s')]
def policy_iteration(mdp, max_iterations=100):
"""
策略迭代算法
参数:
mdp: MDP 问题
max_iterations: 最大迭代次数
返回:
V: 最优值函数
policy: 最优策略
"""
# 初始化随机策略
policy = {s: np.random.choice(mdp.actions) for s in mdp.states}
for iteration in range(max_iterations):
# 策略评估
V = evaluate_policy(mdp, policy)
# 策略改进
policy_stable = True
new_policy = {}
for state in mdp.states:
# 计算当前策略的动作
old_action = policy[state]
# 计算每个动作的 Q 值
q_values = []
for action in mdp.actions:
q = sum(
mdp.get_transition_probability(state, action, next_state) *
(mdp.get_reward(state, action, next_state) +
mdp.discount_factor * V[next_state])
for next_state in mdp.states
)
q_values.append((q, action))
# 选择最优动作
new_action = max(q_values)[1]
new_policy[state] = new_action
# 检查策略是否改变
if new_action != old_action:
policy_stable = False
policy = new_policy
# 检查收敛
if policy_stable:
print(f"策略迭代收敛于第 {iteration + 1} 次迭代")
break
return V, policy
def evaluate_policy(mdp, policy, num_iterations=100):
"""评估策略"""
V = {s: 0 for s in mdp.states}
for _ in range(num_iterations):
V_new = V.copy()
for state in mdp.states:
action = policy[state]
value = sum(
mdp.get_transition_probability(state, action, next_state) *
(mdp.get_reward(state, action, next_state) +
mdp.discount_factor * V[next_state])
for next_state in mdp.states
)
V_new[state] = value
V = V_new
return V
# 示例
V, policy = policy_iteration(mdp)
print("\n最优策略:")
for y in range(4, -1, -1):
for x in range(5):
action = policy.get((x, y), ' ')
print(f"{action:>6}", end=" ")
print()
2.3 广义策略迭代¶
def generalized_policy_iteration(mdp, eval_iterations=10, max_iterations=100):
"""
广义策略迭代
参数:
mdp: MDP 问题
eval_iterations: 每次策略评估的迭代次数
max_iterations: 最大迭代次数
返回:
V: 值函数
policy: 策略
"""
# 初始化
V = {s: 0 for s in mdp.states}
policy = {s: np.random.choice(mdp.actions) for s in mdp.states}
for iteration in range(max_iterations):
# 策略评估(部分)
for _ in range(eval_iterations):
V_new = V.copy()
for state in mdp.states:
action = policy[state]
value = sum(
mdp.get_transition_probability(state, action, next_state) *
(mdp.get_reward(state, action, next_state) +
mdp.discount_factor * V[next_state])
for next_state in mdp.states
)
V_new[state] = value
V = V_new
# 策略改进
policy_stable = True
new_policy = {}
for state in mdp.states:
old_action = policy[state]
q_values = []
for action in mdp.actions:
q = sum(
mdp.get_transition_probability(state, action, next_state) *
(mdp.get_reward(state, action, next_state) +
mdp.discount_factor * V[next_state])
for next_state in mdp.states
)
q_values.append((q, action))
new_action = max(q_values)[1]
new_policy[state] = new_action
if new_action != old_action:
policy_stable = False
policy = new_policy
if policy_stable:
print(f"广义策略迭代收敛于第 {iteration + 1} 次迭代")
break
return V, policy
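广义策略迭代可以像前面的值迭代、策略迭代一样直接在网格世界上运行,例如(参数仅作演示):

```python
# 在 5x5 网格世界上运行广义策略迭代
mdp_gpi = GridWorldMDP(5, 5, (4, 4), [(1, 1), (2, 2)])
V_gpi, policy_gpi = generalized_policy_iteration(mdp_gpi, eval_iterations=5)
print(policy_gpi[(0, 0)], V_gpi[(0, 0)])
```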
3. 部分可观察 MDP(POMDP)¶
3.1 POMDP 定义¶
POMDP 是一个七元组 (S, A, T, R, γ, Ω, O):
- S: 状态集合
- A: 动作集合
- T: 状态转移函数 T(s, a, s') = P(s' | s, a)
- R: 奖励函数 R(s, a, s')
- γ: 折扣因子
- Ω: 观察集合
- O: 观察函数 O(a, s', o) = P(o | a, s')
与 MDP 的区别:
- 智能体不能直接观察状态
- 需要通过观察来推断状态
- 使用信念状态(belief state)代替确定状态
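在执行动作 a 并得到观察 o 之后,信念状态按贝叶斯规则更新(后文的 update_belief 函数实现的正是这一公式):

b'(s') = η · O(a, s', o) · Σ_s T(s, a, s') · b(s)

其中 η = 1 / P(o | b, a) 是归一化常数。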
class POMDP:
"""部分可观察 MDP"""
def __init__(self, states, actions, observations, transition_model,
reward_function, observation_model, discount_factor=0.9):
"""
初始化 POMDP
参数:
states: 状态列表
actions: 动作列表
observations: 观察列表
transition_model: 转移模型
reward_function: 奖励函数
observation_model: 观察模型
discount_factor: 折扣因子
"""
self.states = states
self.actions = actions
self.observations = observations
self.transition_model = transition_model
self.reward_function = reward_function
self.observation_model = observation_model
self.discount_factor = discount_factor
def get_observation_probability(self, action, next_state, observation):
"""获取观察概率"""
return self.observation_model(action, next_state, observation)
# 信念状态更新
def update_belief(pomdp, belief, action, observation):
"""
更新信念状态
参数:
pomdp: POMDP 问题
belief: 当前信念状态
action: 执行的动作
observation: 观察结果
返回:
new_belief: 更新后的信念状态
"""
new_belief = {}
for next_state in pomdp.states:
# 计算 P(s' | b, a, o)
prob = 0
for state in pomdp.states:
# P(s' | s, a)
trans_prob = pomdp.transition_model(state, action, next_state)
# P(o | a, s')
obs_prob = pomdp.observation_model(action, next_state, observation)
# b(s)
belief_prob = belief.get(state, 0)
prob += trans_prob * obs_prob * belief_prob
new_belief[next_state] = prob
# 归一化
total = sum(new_belief.values())
if total > 0:
for state in pomdp.states:
new_belief[state] /= total
return new_belief
3.2 POMDP 求解方法¶
def solve_pomdp_value_iteration(pomdp, num_iterations=100):
"""
POMDP 值迭代
参数:
pomdp: POMDP 问题
num_iterations: 迭代次数
返回:
value_function: 值函数
"""
# 初始化值函数(信念状态到值的映射)
value_function = {}
for iteration in range(num_iterations):
new_value_function = {}
# 对每个信念状态
for belief in generate_belief_states(pomdp):
# 计算每个动作的 Q 值
q_values = []
for action in pomdp.actions:
                q = 0
                # 对每个可能的观察,累加 P(o | b, a) * V(b')
                for observation in pomdp.observations:
                    # P(o | b, a) = Σ_{s'} O(a, s', o) Σ_s T(s, a, s') b(s)
                    obs_prob = sum(
                        pomdp.observation_model(action, next_state, observation) *
                        sum(
                            pomdp.transition_model(state, action, next_state) *
                            belief.get(state, 0)
                            for state in pomdp.states
                        )
                        for next_state in pomdp.states
                    )
                    if obs_prob > 0:
                        # 更新信念状态
                        new_belief = update_belief(pomdp, belief, action, observation)
                        # 获取新信念状态的值
                        new_value = get_value(value_function, new_belief)
                        q += obs_prob * new_value
                # 添加即时期望奖励 E[R | b, a] = Σ_s b(s) Σ_{s'} T(s, a, s') R(s, a, s')
                immediate_reward = sum(
                    belief.get(state, 0) *
                    sum(
                        pomdp.transition_model(state, action, next_state) *
                        pomdp.reward_function(state, action, next_state)
                        for next_state in pomdp.states
                    )
                    for state in pomdp.states
                )
                q = immediate_reward + pomdp.discount_factor * q
q_values.append(q)
# 更新值函数
new_value_function[belief_to_key(belief)] = max(q_values)
value_function = new_value_function
return value_function
def generate_belief_states(pomdp):
"""生成信念状态"""
    # 简化:只为每个状态生成一个确定性(角点)信念;真实的信念空间是连续单纯形
beliefs = []
for state in pomdp.states:
belief = {s: 0 for s in pomdp.states}
belief[state] = 1.0
beliefs.append(belief)
return beliefs
def belief_to_key(belief):
"""将信念状态转换为键"""
return tuple(sorted(belief.items()))
def get_value(value_function, belief):
"""获取信念状态的值"""
key = belief_to_key(belief)
return value_function.get(key, 0)
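下面是一个调用上述简化 POMDP 值迭代的最小示例(两状态、单动作、双观察的示意模型,所有概率取值均为本文假设的演示数据):

```python
# 最小两状态 POMDP:状态 'left'/'right',动作 'go',观察 'L'/'R'
tiny_states = ['left', 'right']
tiny_actions = ['go']
tiny_obs = ['L', 'R']

def tiny_T(s, a, s2):
    # 'go' 以 0.9 的概率切换状态
    return 0.9 if s2 != s else 0.1

def tiny_R(s, a, s2):
    return 10 if s2 == 'right' else 0

def tiny_O(a, s2, o):
    # 观察以 0.8 的概率正确反映当前所处状态
    correct = (s2 == 'left' and o == 'L') or (s2 == 'right' and o == 'R')
    return 0.8 if correct else 0.2

tiny_pomdp = POMDP(tiny_states, tiny_actions, tiny_obs, tiny_T, tiny_R, tiny_O)
vf = solve_pomdp_value_iteration(tiny_pomdp, num_iterations=20)
print(vf)
```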
4. 在线规划方法¶
4.1 蒙特卡洛树搜索(MCTS)¶
import random
import math
class MCTSNode:
"""MCTS 节点"""
def __init__(self, state, parent=None, action=None):
self.state = state
self.parent = parent
self.action = action
self.children = []
self.visits = 0
self.value = 0
    def is_fully_expanded(self, problem):
        """是否完全扩展(所有可行动作都已有对应的子节点)"""
        return len(self.children) >= len(problem.get_applicable_actions(self.state))
def best_child(self, exploration_weight=1.0):
"""选择最佳子节点"""
choices = []
for child in self.children:
if child.visits == 0:
return child
# UCB1 公式
exploitation = child.value / child.visits
exploration = exploration_weight * math.sqrt(2 * math.log(self.visits) / child.visits)
choices.append((exploitation + exploration, child))
return max(choices)[1]
def mcts_search(initial_state, problem, num_iterations=1000):
"""
蒙特卡洛树搜索
参数:
initial_state: 初始状态
problem: 规划问题
num_iterations: 迭代次数
返回:
best_action: 最佳动作
"""
root = MCTSNode(initial_state)
for _ in range(num_iterations):
node = root
# 选择
        while node.children and node.is_fully_expanded(problem):
            node = node.best_child()
        # 扩展
        if not node.is_fully_expanded(problem):
actions = problem.get_applicable_actions(node.state)
for action in actions:
if not any(child.action == action for child in node.children):
new_state = action.apply(node.state)
child = MCTSNode(new_state, parent=node, action=action)
node.children.append(child)
node = child
break
# 模拟
reward = simulate(node.state, problem)
# 回溯
while node is not None:
node.visits += 1
node.value += reward
node = node.parent
# 返回访问次数最多的动作
best_child = max(root.children, key=lambda c: c.visits)
return best_child.action
def simulate(state, problem, max_depth=100):
"""模拟"""
current_state = state
total_reward = 0
for _ in range(max_depth):
if problem.is_goal(current_state):
total_reward += 100
break
actions = problem.get_applicable_actions(current_state)
if not actions:
break
action = random.choice(actions)
current_state = action.apply(current_state)
total_reward -= 1 # 每步代价
return total_reward
4.2 在线规划算法¶
def online_planning(problem, num_iterations=1000):
"""
在线规划算法
参数:
problem: 规划问题
num_iterations: 每次规划的迭代次数
返回:
actions: 动作序列
"""
current_state = problem.initial_state
actions = []
while not problem.is_goal(current_state):
# 使用 MCTS 进行规划
action = mcts_search(current_state, problem, num_iterations)
# 执行动作
current_state = action.apply(current_state)
actions.append(action.name)
return actions
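mcts_search 和 online_planning 假设 problem 对象提供 initial_state 属性以及 is_goal、get_applicable_actions 方法,动作对象提供 apply 方法和 name 属性。下面是一个满足这一接口的最小示意问题(一维走廊,类名与参数均为本文为演示而假设的):

```python
# 一维走廊问题:从位置 0 走到位置 length
class MoveAction:
    def __init__(self, name, delta):
        self.name = name
        self.delta = delta
    def apply(self, state):
        return state + self.delta

class CorridorProblem:
    def __init__(self, length=3):
        self.length = length
        self.initial_state = 0
        # 复用同一批动作对象,便于 MCTS 扩展时的去重比较
        self._right = MoveAction('right', +1)
        self._left = MoveAction('left', -1)
    def is_goal(self, state):
        return state == self.length
    def get_applicable_actions(self, state):
        actions = []
        if state < self.length:
            actions.append(self._right)
        if state > 0:
            actions.append(self._left)
        return actions

problem = CorridorProblem(length=3)
print(online_planning(problem, num_iterations=200))  # 例如 ['right', 'right', 'right']
```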
5. 实践练习¶
练习 1:实现值迭代¶
def value_iteration_exercise():
# 创建 MDP
mdp = GridWorldMDP(4, 4, (3, 3), [(1, 1)])
# 运行值迭代
V, policy = value_iteration(mdp)
print("最优值函数:")
for y in range(3, -1, -1):
for x in range(4):
print(f"{V[(x, y)]:8.2f}", end=" ")
print()
print("\n最优策略:")
for y in range(3, -1, -1):
for x in range(4):
action = policy.get((x, y), ' ')
print(f"{action:>6}", end=" ")
print()
练习 2:实现策略迭代¶
def policy_iteration_exercise():
# 创建 MDP
mdp = GridWorldMDP(4, 4, (3, 3), [(1, 1)])
# 运行策略迭代
V, policy = policy_iteration(mdp)
print("最优值函数:")
for y in range(3, -1, -1):
for x in range(4):
print(f"{V[(x, y)]:8.2f}", end=" ")
print()
练习 3:实现 POMDP¶
def pomdp_exercise():
# 创建 POMDP
states = [(0, 0), (0, 1), (1, 0), (1, 1)]
actions = ['up', 'down', 'left', 'right']
observations = ['north', 'south', 'east', 'west', 'wall']
# 简化的转移和观察模型
def transition_model(state, action, next_state):
# 简化:确定性转移
expected_next = {
'up': (state[0], min(state[1] + 1, 1)),
'down': (state[0], max(state[1] - 1, 0)),
'left': (max(state[0] - 1, 0), state[1]),
'right': (min(state[0] + 1, 1), state[1])
}
return 1.0 if next_state == expected_next.get(action, state) else 0.0
def observation_model(action, state, observation):
# 简化:根据方向观察
if observation == 'north' and state[1] == 1:
return 0.8
elif observation == 'south' and state[1] == 0:
return 0.8
elif observation == 'east' and state[0] == 1:
return 0.8
elif observation == 'west' and state[0] == 0:
return 0.8
elif observation == 'wall':
return 0.2
return 0.1
pomdp = POMDP(states, actions, observations, transition_model,
lambda s, a, s2: -1, observation_model)
# 初始信念状态
belief = {s: 0.25 for s in states}
# 更新信念状态
action = 'up'
observation = 'north'
new_belief = update_belief(pomdp, belief, action, observation)
print("初始信念状态:", belief)
print(f"执行动作 {action},观察 {observation}")
print("更新后信念状态:", new_belief)
6. 常见问题¶
1. MDP 模型未知¶
问题:转移概率和奖励函数未知
解决方案:
- 使用强化学习(直接从与环境的交互中学习,如下面的 Q-learning 草图)
- 使用模型学习(先从数据中估计转移概率和奖励函数)
- 使用在线规划
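当模型未知时,一种常见做法是用无模型的强化学习直接从交互中学习动作值。下面给出一个表格型 Q-learning 的草图(其中 env 的接口——reset() 返回初始状态、step(state, action) 返回 (next_state, reward, done)——是本文为演示而假设的,并非某个具体库的 API):

```python
import random
from collections import defaultdict

def q_learning(env, actions, episodes=500, alpha=0.1, gamma=0.9, epsilon=0.1):
    """表格型 Q-learning:不需要已知 T 和 R,只依赖与环境的交互采样"""
    Q = defaultdict(float)  # (state, action) -> Q 值
    for _ in range(episodes):
        state = env.reset()
        done = False
        while not done:
            # epsilon-贪心选择动作
            if random.random() < epsilon:
                action = random.choice(actions)
            else:
                action = max(actions, key=lambda a: Q[(state, a)])
            next_state, reward, done = env.step(state, action)
            # Q-learning 更新:朝 r + γ max_a' Q(s', a') 移动
            best_next = max(Q[(next_state, a)] for a in actions)
            Q[(state, action)] += alpha * (reward + gamma * best_next - Q[(state, action)])
            state = next_state
    return Q
```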
2. 状态空间太大¶
问题:状态空间太大,无法枚举
解决方案:
- 使用函数近似
- 使用状态抽象
- 使用采样方法
3. 计算复杂度高¶
问题:值迭代和策略迭代计算量大
解决方案:
- 使用异步更新
- 使用近似方法
- 使用并行计算