Probabilistic Planning

Probabilistic planning deals with planning in environments where state transitions are uncertain. This chapter introduces Markov decision processes (MDPs) and the planning algorithms built on them.

Learning Objectives

After completing this chapter, you will be able to:

  • Understand the basic concepts of MDPs
  • Master the value iteration and policy iteration algorithms
  • Understand partially observable MDPs (POMDPs)
  • Apply probabilistic planning to practical problems

1. Markov Decision Processes (MDPs)

1.1 MDP Definition

An MDP is a five-tuple (S, A, T, R, γ):
- S: set of states
- A: set of actions
- T: transition function T(s, a, s') = P(s' | s, a)
- R: reward function R(s, a, s')
- γ: discount factor (0 ≤ γ ≤ 1)

The Markov property:
P(s_{t+1} | s_t, a_t, s_{t-1}, a_{t-1}, ...) = P(s_{t+1} | s_t, a_t)

That is, the next state depends only on the current state and action, not on the earlier history.

import numpy as np

class MDP:
    """Markov decision process."""

    def __init__(self, states, actions, transition_model, reward_function, discount_factor=0.9):
        """
        Initialize the MDP.

        Parameters:
        states: list of states
        actions: list of actions
        transition_model: transition model P(s'|s,a)
        reward_function: reward function R(s,a,s')
        discount_factor: discount factor
        """
        self.states = states
        self.actions = actions
        self.transition_model = transition_model
        self.reward_function = reward_function
        self.discount_factor = discount_factor

    def get_transition_probability(self, state, action, next_state):
        """Return the transition probability P(next_state | state, action)."""
        return self.transition_model(state, action, next_state)

    def get_reward(self, state, action, next_state):
        """Return the reward R(state, action, next_state)."""
        return self.reward_function(state, action, next_state)

    def get_expected_reward(self, state, action):
        """Return the expected immediate reward of taking `action` in `state`."""
        expected_reward = 0
        for next_state in self.states:
            prob = self.get_transition_probability(state, action, next_state)
            reward = self.get_reward(state, action, next_state)
            expected_reward += prob * reward
        return expected_reward
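
As a quick sanity check, the generic MDP class can be driven by simple hand-written functions. The two-state chain below is a hypothetical example (the states 'A'/'B' and actions 'stay'/'move' are illustrative only, not part of the grid world defined next):

def toy_transition(s, a, s2):
    # 'stay' keeps the current state with probability 0.9; 'move' flips it with probability 0.8
    if a == 'stay':
        return 0.9 if s2 == s else 0.1
    return 0.8 if s2 != s else 0.2

def toy_reward(s, a, s2):
    return 10 if s2 == 'B' else 0

toy = MDP(['A', 'B'], ['stay', 'move'], toy_transition, toy_reward, discount_factor=0.9)
print(toy.get_expected_reward('A', 'move'))  # 0.8 * 10 + 0.2 * 0 = 8.0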

# Example: a grid-world MDP
class GridWorldMDP(MDP):
    """Grid-world MDP with slippery moves."""

    def __init__(self, width, height, goal, obstacles, slip_prob=0.1, discount_factor=0.9):
        self.width = width
        self.height = height
        self.goal = goal
        self.obstacles = obstacles
        self.slip_prob = slip_prob

        # States and actions
        states = [(x, y) for x in range(width) for y in range(height)]
        actions = ['up', 'down', 'left', 'right']
        super().__init__(states, actions, self.transition_model, self.reward_function,
                         discount_factor)

    def transition_model(self, state, action, next_state):
        """Transition model: the intended move succeeds with probability 1 - slip_prob."""
        if state == self.goal:
            return 1.0 if next_state == state else 0.0

        # Intended next state
        expected_next = self._get_expected_next_state(state, action)

        # Slip probability
        if next_state == expected_next:
            return 1 - self.slip_prob
        elif self._is_slippery_move(state, action, next_state):
            return self.slip_prob / 3  # slip into one of the other three directions
        else:
            return 0.0

    def reward_function(self, state, action, next_state):
        """Reward function."""
        if next_state == self.goal:
            return 100  # reached the goal
        elif next_state in self.obstacles:
            return -100  # hit an obstacle
        else:
            return -1  # per-step cost

    def _get_expected_next_state(self, state, action):
        """Return the intended next state (clamped to the grid)."""
        x, y = state

        if action == 'up':
            return (x, min(y + 1, self.height - 1))
        elif action == 'down':
            return (x, max(y - 1, 0))
        elif action == 'left':
            return (max(x - 1, 0), y)
        elif action == 'right':
            return (min(x + 1, self.width - 1), y)

    def _is_slippery_move(self, state, action, next_state):
        """Check whether next_state is reachable by slipping into one of the other three directions."""
        other_actions = [a for a in self.actions if a != action]
        return any(self._get_expected_next_state(state, a) == next_state
                   for a in other_actions)
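
As a quick sanity check on the grid-world model above, the transition probabilities out of an interior state should sum to 1 (near walls the clamped slip directions can coincide, so with this simplified model the mass there may fall slightly below 1):

world = GridWorldMDP(5, 5, goal=(4, 4), obstacles=[(1, 1)], slip_prob=0.1)
total = sum(world.get_transition_probability((2, 2), 'up', s2) for s2 in world.states)
print(f"Outgoing probability mass from ((2, 2), 'up'): {total:.2f}")  # expect 1.00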

1.2 MDP Policies

class Policy:
    """A deterministic policy mapping states to actions."""

    def __init__(self, mdp):
        self.mdp = mdp
        self.policy = {}  # state -> action

    def get_action(self, state):
        """Return the action for a state."""
        return self.policy.get(state, None)

    def set_action(self, state, action):
        """Set the action for a state."""
        self.policy[state] = action

    def evaluate(self, num_iterations=100):
        """Evaluate the policy by iterative policy evaluation."""
        # Initialize the value function
        V = {s: 0 for s in self.mdp.states}

        for _ in range(num_iterations):
            V_new = V.copy()

            for state in self.mdp.states:
                action = self.get_action(state)

                if action is None:
                    continue

                # Compute the value of this state under the policy
                value = 0
                for next_state in self.mdp.states:
                    prob = self.mdp.get_transition_probability(state, action, next_state)
                    reward = self.mdp.get_reward(state, action, next_state)
                    value += prob * (reward + self.mdp.discount_factor * V[next_state])

                V_new[state] = value

            V = V_new

        return V
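
A minimal sketch of how the Policy class might be used (the all-'up' policy and the 3×3 world here are illustrative only):

gw = GridWorldMDP(3, 3, goal=(2, 2), obstacles=[])
pi = Policy(gw)
for s in gw.states:
    pi.set_action(s, 'up')          # a deliberately naive fixed policy
values = pi.evaluate(num_iterations=50)
print(values[(2, 0)])               # value of always moving up from the bottom-right corner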

2. Dynamic Programming Algorithms

2.1 Value Iteration

Value iteration finds the optimal policy by repeatedly updating the value function.

Value iteration algorithm:
1. Initialize V(s) = 0 for all s
2. Repeat until convergence:
   For each state s:
     V(s) ← max_a Σ_{s'} T(s,a,s') [R(s,a,s') + γ V(s')]
3. Extract the policy:
   π(s) ← argmax_a Σ_{s'} T(s,a,s') [R(s,a,s') + γ V(s')]

def value_iteration(mdp, epsilon=0.01, max_iterations=1000):
    """
    Value iteration algorithm.

    Parameters:
    mdp: the MDP to solve
    epsilon: convergence threshold
    max_iterations: maximum number of iterations

    Returns:
    V: optimal value function
    policy: optimal policy
    """
    # Initialize the value function
    V = {s: 0 for s in mdp.states}

    for iteration in range(max_iterations):
        delta = 0
        V_new = V.copy()

        for state in mdp.states:
            # Compute the Q-value of each action
            q_values = []
            for action in mdp.actions:
                q = sum(
                    mdp.get_transition_probability(state, action, next_state) *
                    (mdp.get_reward(state, action, next_state) +
                     mdp.discount_factor * V[next_state])
                    for next_state in mdp.states
                )
                q_values.append(q)

            # Update the value function
            V_new[state] = max(q_values)
            delta = max(delta, abs(V_new[state] - V[state]))

        V = V_new

        # Check for convergence
        if delta < epsilon:
            print(f"Value iteration converged after {iteration + 1} iterations")
            break

    # Extract the policy
    policy = {}
    for state in mdp.states:
        q_values = []
        for action in mdp.actions:
            q = sum(
                mdp.get_transition_probability(state, action, next_state) *
                (mdp.get_reward(state, action, next_state) +
                 mdp.discount_factor * V[next_state])
                for next_state in mdp.states
            )
            q_values.append((q, action))
        policy[state] = max(q_values)[1]

    return V, policy

# Example
mdp = GridWorldMDP(5, 5, (4, 4), [(1, 1), (2, 2)])
V, policy = value_iteration(mdp)

print("Optimal value function:")
for y in range(4, -1, -1):
    for x in range(5):
        print(f"{V[(x, y)]:8.2f}", end=" ")
    print()

2.2 Policy Iteration

Policy iteration finds the optimal policy by alternating policy evaluation and policy improvement.

Policy iteration algorithm:
1. Initialize a random policy π
2. Repeat until the policy stops changing:
   a. Policy evaluation: compute V^π
   b. Policy improvement: π(s) ← argmax_a Σ_{s'} T(s,a,s') [R(s,a,s') + γ V^π(s')]

def policy_iteration(mdp, max_iterations=100):
    """
    Policy iteration algorithm.

    Parameters:
    mdp: the MDP to solve
    max_iterations: maximum number of iterations

    Returns:
    V: optimal value function
    policy: optimal policy
    """
    # Initialize a random policy
    policy = {s: np.random.choice(mdp.actions) for s in mdp.states}

    for iteration in range(max_iterations):
        # Policy evaluation
        V = evaluate_policy(mdp, policy)

        # Policy improvement
        policy_stable = True
        new_policy = {}

        for state in mdp.states:
            # Action chosen by the current policy
            old_action = policy[state]

            # Compute the Q-value of each action
            q_values = []
            for action in mdp.actions:
                q = sum(
                    mdp.get_transition_probability(state, action, next_state) *
                    (mdp.get_reward(state, action, next_state) +
                     mdp.discount_factor * V[next_state])
                    for next_state in mdp.states
                )
                q_values.append((q, action))

            # Pick the greedy action
            new_action = max(q_values)[1]
            new_policy[state] = new_action

            # Check whether the policy changed
            if new_action != old_action:
                policy_stable = False

        policy = new_policy

        # Check for convergence
        if policy_stable:
            print(f"Policy iteration converged after {iteration + 1} iterations")
            break

    return V, policy

def evaluate_policy(mdp, policy, num_iterations=100):
    """Iteratively evaluate a policy."""
    V = {s: 0 for s in mdp.states}

    for _ in range(num_iterations):
        V_new = V.copy()

        for state in mdp.states:
            action = policy[state]

            value = sum(
                mdp.get_transition_probability(state, action, next_state) *
                (mdp.get_reward(state, action, next_state) +
                 mdp.discount_factor * V[next_state])
                for next_state in mdp.states
            )

            V_new[state] = value

        V = V_new

    return V

# Example
V, policy = policy_iteration(mdp)

print("\nOptimal policy:")
for y in range(4, -1, -1):
    for x in range(5):
        action = policy.get((x, y), ' ')
        print(f"{action:>6}", end=" ")
    print()

2.3 Generalized Policy Iteration

def generalized_policy_iteration(mdp, eval_iterations=10, max_iterations=100):
    """
    Generalized policy iteration.

    Parameters:
    mdp: the MDP to solve
    eval_iterations: number of evaluation sweeps per round
    max_iterations: maximum number of rounds

    Returns:
    V: value function
    policy: policy
    """
    # Initialization
    V = {s: 0 for s in mdp.states}
    policy = {s: np.random.choice(mdp.actions) for s in mdp.states}

    for iteration in range(max_iterations):
        # Partial policy evaluation
        for _ in range(eval_iterations):
            V_new = V.copy()

            for state in mdp.states:
                action = policy[state]

                value = sum(
                    mdp.get_transition_probability(state, action, next_state) *
                    (mdp.get_reward(state, action, next_state) +
                     mdp.discount_factor * V[next_state])
                    for next_state in mdp.states
                )

                V_new[state] = value

            V = V_new

        # Policy improvement
        policy_stable = True
        new_policy = {}

        for state in mdp.states:
            old_action = policy[state]

            q_values = []
            for action in mdp.actions:
                q = sum(
                    mdp.get_transition_probability(state, action, next_state) *
                    (mdp.get_reward(state, action, next_state) +
                     mdp.discount_factor * V[next_state])
                    for next_state in mdp.states
                )
                q_values.append((q, action))

            new_action = max(q_values)[1]
            new_policy[state] = new_action

            if new_action != old_action:
                policy_stable = False

        policy = new_policy

        if policy_stable:
            print(f"Generalized policy iteration converged after {iteration + 1} rounds")
            break

    return V, policy
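
A brief usage sketch (illustrative only): running generalized policy iteration on the same grid world used above, with a few evaluation sweeps per round, and inspecting the action it picks next to the goal.

V_gpi, policy_gpi = generalized_policy_iteration(mdp, eval_iterations=5)
print("GPI action next to the goal:", policy_gpi[(3, 4)])  # expected: 'right', matching policy iteration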

3. Partially Observable MDPs (POMDPs)

3.1 POMDP Definition

A POMDP is a seven-tuple (S, A, T, R, γ, Ω, O):
- S: set of states
- A: set of actions
- T: transition function T(s, a, s') = P(s' | s, a)
- R: reward function R(s, a, s')
- γ: discount factor
- Ω: set of observations
- O: observation function O(a, s', o) = P(o | a, s')

Differences from an MDP:
- The agent cannot observe the state directly
- The state must be inferred from observations
- A belief state is maintained in place of a known state

class POMDP:
    """Partially observable MDP."""

    def __init__(self, states, actions, observations, transition_model,
                 reward_function, observation_model, discount_factor=0.9):
        """
        Initialize the POMDP.

        Parameters:
        states: list of states
        actions: list of actions
        observations: list of observations
        transition_model: transition model
        reward_function: reward function
        observation_model: observation model
        discount_factor: discount factor
        """
        self.states = states
        self.actions = actions
        self.observations = observations
        self.transition_model = transition_model
        self.reward_function = reward_function
        self.observation_model = observation_model
        self.discount_factor = discount_factor

    def get_observation_probability(self, action, next_state, observation):
        """Return the observation probability P(observation | action, next_state)."""
        return self.observation_model(action, next_state, observation)
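
The function below implements the standard Bayes-filter update, written in the same notation as the definitions above:

b'(s') ∝ O(a, s', o) Σ_s T(s, a, s') b(s)

followed by normalization over s' so that the new belief sums to 1.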

# Belief state update
def update_belief(pomdp, belief, action, observation):
    """
    Update the belief state.

    Parameters:
    pomdp: the POMDP
    belief: current belief state
    action: action taken
    observation: observation received

    Returns:
    new_belief: updated belief state
    """
    new_belief = {}

    for next_state in pomdp.states:
        # Compute P(s' | b, a, o) up to normalization
        prob = 0

        for state in pomdp.states:
            # P(s' | s, a)
            trans_prob = pomdp.transition_model(state, action, next_state)

            # P(o | a, s')
            obs_prob = pomdp.observation_model(action, next_state, observation)

            # b(s)
            belief_prob = belief.get(state, 0)

            prob += trans_prob * obs_prob * belief_prob

        new_belief[next_state] = prob

    # Normalize
    total = sum(new_belief.values())
    if total > 0:
        for state in pomdp.states:
            new_belief[state] /= total

    return new_belief

3.2 POMDP Solution Methods

def solve_pomdp_value_iteration(pomdp, num_iterations=100):
    """
    Value iteration over belief states (simplified).

    Parameters:
    pomdp: the POMDP
    num_iterations: number of iterations

    Returns:
    value_function: value function over belief states
    """
    # Initialize the value function (a mapping from belief states to values)
    value_function = {}

    for iteration in range(num_iterations):
        new_value_function = {}

        # For each belief state
        for belief in generate_belief_states(pomdp):
            # Compute the Q-value of each action
            q_values = []

            for action in pomdp.actions:
                q = 0

                # For each possible observation
                for observation in pomdp.observations:
                    # Probability of receiving this observation
                    obs_prob = sum(
                        belief.get(state, 0) *
                        pomdp.observation_model(action, state, observation)
                        for state in pomdp.states
                    )

                    if obs_prob > 0:
                        # Update the belief state
                        new_belief = update_belief(pomdp, belief, action, observation)

                        # Value of the new belief state
                        new_value = get_value(value_function, new_belief)

                        q += obs_prob * new_value

                # Add the immediate reward (simplified: the state is assumed unchanged)
                immediate_reward = sum(
                    belief.get(state, 0) *
                    pomdp.reward_function(state, action, state)
                    for state in pomdp.states
                )

                q = immediate_reward + pomdp.discount_factor * q
                q_values.append(q)

            # Update the value function
            new_value_function[belief_to_key(belief)] = max(q_values)

        value_function = new_value_function

    return value_function

def generate_belief_states(pomdp):
    """Generate a set of belief states."""
    # Simplification: use only the deterministic (corner) beliefs
    beliefs = []

    for state in pomdp.states:
        belief = {s: 0 for s in pomdp.states}
        belief[state] = 1.0
        beliefs.append(belief)

    return beliefs

def belief_to_key(belief):
    """Convert a belief state into a hashable key."""
    return tuple(sorted(belief.items()))

def get_value(value_function, belief):
    """Look up the value of a belief state."""
    key = belief_to_key(belief)
    return value_function.get(key, 0)

4. Online Planning Methods

4.1 Monte Carlo Tree Search (MCTS)

import random
import math

class MCTSNode:
    """MCTS node."""

    def __init__(self, state, parent=None, action=None):
        self.state = state
        self.parent = parent
        self.action = action
        self.children = []
        self.visits = 0
        self.value = 0

    def is_fully_expanded(self, num_actions):
        """A node is fully expanded once every applicable action has a child."""
        return len(self.children) >= num_actions

    def best_child(self, exploration_weight=1.0):
        """Select the best child according to UCB1."""
        choices = []
        for child in self.children:
            if child.visits == 0:
                return child

            # UCB1 formula
            exploitation = child.value / child.visits
            exploration = exploration_weight * math.sqrt(2 * math.log(self.visits) / child.visits)
            choices.append((exploitation + exploration, child))

        return max(choices, key=lambda c: c[0])[1]

def mcts_search(initial_state, problem, num_iterations=1000):
    """
    Monte Carlo tree search.

    Parameters:
    initial_state: initial state
    problem: the planning problem
    num_iterations: number of iterations

    Returns:
    best_action: best action at the root
    """
    root = MCTSNode(initial_state)

    for _ in range(num_iterations):
        node = root

        # Selection: descend while the node already has a child for every applicable action
        while node.children and node.is_fully_expanded(len(problem.get_applicable_actions(node.state))):
            node = node.best_child()

        # Expansion: add a child for one untried action
        actions = problem.get_applicable_actions(node.state)
        if not node.is_fully_expanded(len(actions)):
            for action in actions:
                if not any(child.action == action for child in node.children):
                    new_state = action.apply(node.state)
                    child = MCTSNode(new_state, parent=node, action=action)
                    node.children.append(child)
                    node = child
                    break

        # Simulation
        reward = simulate(node.state, problem)

        # Backpropagation
        while node is not None:
            node.visits += 1
            node.value += reward
            node = node.parent

    # Return the action of the most-visited child
    best_child = max(root.children, key=lambda c: c.visits)
    return best_child.action

def simulate(state, problem, max_depth=100):
    """Random-rollout simulation."""
    current_state = state
    total_reward = 0

    for _ in range(max_depth):
        if problem.is_goal(current_state):
            total_reward += 100
            break

        actions = problem.get_applicable_actions(current_state)
        if not actions:
            break

        action = random.choice(actions)
        current_state = action.apply(current_state)
        total_reward -= 1  # per-step cost

    return total_reward

4.2 An Online Planning Loop

def online_planning(problem, num_iterations=1000):
    """
    Online planning loop.

    Parameters:
    problem: the planning problem
    num_iterations: MCTS iterations per planning step

    Returns:
    actions: sequence of executed action names
    """
    current_state = problem.initial_state
    actions = []

    while not problem.is_goal(current_state):
        # Plan with MCTS from the current state
        action = mcts_search(current_state, problem, num_iterations)

        # Execute the action
        current_state = action.apply(current_state)
        actions.append(action.name)

    return actions
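
The MCTS and online-planning code above assumes a `problem` object exposing `initial_state`, `is_goal(state)`, and `get_applicable_actions(state)`, and action objects with `apply(state)` and `name`. The toy line world below is a hypothetical sketch of that interface (MoveAction and LineWorldProblem are not defined elsewhere in this chapter), just to make the loop runnable:

class MoveAction:
    """A toy action that shifts a 1-D position by a fixed step."""

    def __init__(self, name, step):
        self.name = name
        self.step = step

    def apply(self, state):
        return state + self.step

class LineWorldProblem:
    """Reach position `goal` on a line, starting from 0."""

    def __init__(self, goal=3):
        self.initial_state = 0
        self.goal = goal
        self._actions = [MoveAction('forward', 1), MoveAction('back', -1)]

    def is_goal(self, state):
        return state == self.goal

    def get_applicable_actions(self, state):
        return list(self._actions)

problem = LineWorldProblem(goal=3)  # small goal so random rollouts find it quickly
print("Plan:", online_planning(problem, num_iterations=200))  # e.g. ['forward', 'forward', 'forward']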

5. Practice Exercises

Exercise 1: Implement value iteration

def value_iteration_exercise():
    # Create the MDP
    mdp = GridWorldMDP(4, 4, (3, 3), [(1, 1)])

    # Run value iteration
    V, policy = value_iteration(mdp)

    print("Optimal value function:")
    for y in range(3, -1, -1):
        for x in range(4):
            print(f"{V[(x, y)]:8.2f}", end=" ")
        print()

    print("\nOptimal policy:")
    for y in range(3, -1, -1):
        for x in range(4):
            action = policy.get((x, y), ' ')
            print(f"{action:>6}", end=" ")
        print()

Exercise 2: Implement policy iteration

def policy_iteration_exercise():
    # Create the MDP
    mdp = GridWorldMDP(4, 4, (3, 3), [(1, 1)])

    # Run policy iteration
    V, policy = policy_iteration(mdp)

    print("Optimal value function:")
    for y in range(3, -1, -1):
        for x in range(4):
            print(f"{V[(x, y)]:8.2f}", end=" ")
        print()

Exercise 3: Implement a POMDP

def pomdp_exercise():
    # Create the POMDP
    states = [(0, 0), (0, 1), (1, 0), (1, 1)]
    actions = ['up', 'down', 'left', 'right']
    observations = ['north', 'south', 'east', 'west', 'wall']

    # Simplified transition and observation models
    def transition_model(state, action, next_state):
        # Simplification: deterministic transitions
        expected_next = {
            'up': (state[0], min(state[1] + 1, 1)),
            'down': (state[0], max(state[1] - 1, 0)),
            'left': (max(state[0] - 1, 0), state[1]),
            'right': (min(state[0] + 1, 1), state[1])
        }
        return 1.0 if next_state == expected_next.get(action, state) else 0.0

    def observation_model(action, state, observation):
        # Simplification: observations depend only on which side of the grid we are on
        if observation == 'north' and state[1] == 1:
            return 0.8
        elif observation == 'south' and state[1] == 0:
            return 0.8
        elif observation == 'east' and state[0] == 1:
            return 0.8
        elif observation == 'west' and state[0] == 0:
            return 0.8
        elif observation == 'wall':
            return 0.2
        return 0.1

    pomdp = POMDP(states, actions, observations, transition_model,
                  lambda s, a, s2: -1, observation_model)

    # Initial belief state
    belief = {s: 0.25 for s in states}

    # Update the belief state
    action = 'up'
    observation = 'north'
    new_belief = update_belief(pomdp, belief, action, observation)

    print("Initial belief state:", belief)
    print(f"Took action {action}, observed {observation}")
    print("Updated belief state:", new_belief)

6. Common Issues

1. The MDP model is unknown

Problem: the transition probabilities and reward function are not known.

Solutions:
- Use reinforcement learning
- Learn a model from experience
- Use online planning

2. The state space is too large

Problem: the state space is too large to enumerate.

Solutions:
- Use function approximation
- Use state abstraction
- Use sampling-based methods

3. High computational cost

Problem: value iteration and policy iteration are computationally expensive.

Solutions (see the sketch after this list for the asynchronous-update idea):
- Use asynchronous (in-place) updates
- Use approximate methods
- Use parallel computation
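
As a small illustration of the asynchronous-update idea above, the following sketch updates V in place (Gauss-Seidel style) during each sweep instead of building a full copy; it reuses the MDP interface defined earlier in this chapter:

def value_iteration_inplace(mdp, epsilon=0.01, max_iterations=1000):
    """Value iteration with in-place (asynchronous) updates."""
    V = {s: 0 for s in mdp.states}
    for _ in range(max_iterations):
        delta = 0
        for state in mdp.states:
            # Newly updated values are used immediately within the same sweep
            best_q = max(
                sum(mdp.get_transition_probability(state, a, s2) *
                    (mdp.get_reward(state, a, s2) + mdp.discount_factor * V[s2])
                    for s2 in mdp.states)
                for a in mdp.actions
            )
            delta = max(delta, abs(best_q - V[state]))
            V[state] = best_q
        if delta < epsilon:
            break
    return V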
