Skip to content

时间规划

时间规划是指在考虑时间约束的情况下进行规划。本章将介绍时间规划的核心概念和算法。

学习目标

完成本章后,你将能够:

  • 理解时间约束的表示方法
  • 掌握时间网络的概念
  • 了解时间规划算法
  • 应用时间规划解决实际问题

1. 时间表示

1.1 时间点和时间区间

时间表示:
- 时间点:t ∈ ℝ
- 时间区间:[start, end],其中 start ≤ end
- 持续时间:duration = end - start

时间约束:
- t₁ < t₂:时间点 t₁ 在 t₂ 之前
- t₁ ≤ t₂:时间点 t₁ 不晚于 t₂
- t₁ = t₂:时间点 t₁ 和 t₂ 同时
- duration = d:持续时间为 d
class TimeInterval:
    """时间区间"""

    def __init__(self, start, end):
        """
        初始化时间区间

        参数:
        start: 开始时间
        end: 结束时间
        """
        self.start = start
        self.end = end

        # 验证
        if start > end:
            raise ValueError("开始时间不能晚于结束时间")

    @property
    def duration(self):
        """持续时间"""
        return self.end - self.start

    def contains(self, time_point):
        """检查是否包含时间点"""
        return self.start <= time_point <= self.end

    def overlaps(self, other):
        """检查是否与另一个区间重叠"""
        return self.start < other.end and other.start < self.end

    def __repr__(self):
        return f"[{self.start}, {self.end}]"

# 示例
interval1 = TimeInterval(0, 10)
interval2 = TimeInterval(5, 15)

print(f"区间1: {interval1}")
print(f"区间2: {interval2}")
print(f"区间1持续时间: {interval1.duration}")
print(f"区间1和区间2重叠: {interval1.overlaps(interval2)}")

1.2 时间约束

class TemporalConstraint:
    """时间约束"""

    def __init__(self, type, time_points, value=None):
        """
        初始化时间约束

        参数:
        type: 约束类型 ('before', 'after', 'equal', 'duration')
        time_points: 时间点列表
        value: 约束值(对于 duration 类型)
        """
        self.type = type
        self.time_points = time_points
        self.value = value

    def is_satisfied(self, time_values):
        """检查约束是否满足"""
        if self.type == 'before':
            t1, t2 = self.time_points
            return time_values[t1] < time_values[t2]

        elif self.type == 'after':
            t1, t2 = self.time_points
            return time_values[t1] > time_values[t2]

        elif self.type == 'equal':
            t1, t2 = self.time_points
            return time_values[t1] == time_values[t2]

        elif self.type == 'duration':
            t1, t2 = self.time_points
            return time_values[t2] - time_values[t1] == self.value

        return False

    def __repr__(self):
        if self.type == 'before':
            return f"{self.time_points[0]} < {self.time_points[1]}"
        elif self.type == 'after':
            return f"{self.time_points[0]} > {self.time_points[1]}"
        elif self.type == 'equal':
            return f"{self.time_points[0]} = {self.time_points[1]}"
        elif self.type == 'duration':
            return f"{self.time_points[1]} - {self.time_points[0]} = {self.value}"
        return ""

1.3 时间网络

class TemporalNetwork:
    """时间网络"""

    def __init__(self):
        self.time_points = {}  # 名称 -> 时间点
        self.constraints = []  # 约束列表

    def add_time_point(self, name):
        """添加时间点"""
        self.time_points[name] = len(self.time_points)

    def add_constraint(self, constraint):
        """添加约束"""
        self.constraints.append(constraint)

    def is_consistent(self):
        """检查网络是否一致"""
        # 使用 Floyd-Warshall 算法检查一致性
        n = len(self.time_points)

        # 初始化距离矩阵
        dist = [[float('inf')] * n for _ in range(n)]
        for i in range(n):
            dist[i][i] = 0

        # 添加约束
        for constraint in self.constraints:
            if constraint.type == 'before':
                t1 = self.time_points[constraint.time_points[0]]
                t2 = self.time_points[constraint.time_points[1]]
                dist[t1][t2] = -1  # t1 < t2 意味着 t2 - t1 >= 1

            elif constraint.type == 'after':
                t1 = self.time_points[constraint.time_points[0]]
                t2 = self.time_points[constraint.time_points[1]]
                dist[t2][t1] = -1  # t1 > t2 意味着 t1 - t2 >= 1

            elif constraint.type == 'duration':
                t1 = self.time_points[constraint.time_points[0]]
                t2 = self.time_points[constraint.time_points[1]]
                d = constraint.value
                dist[t1][t2] = d
                dist[t2][t1] = -d

        # Floyd-Warshall
        for k in range(n):
            for i in range(n):
                for j in range(n):
                    if dist[i][k] + dist[k][j] < dist[i][j]:
                        dist[i][j] = dist[i][k] + dist[k][j]

        # 检查负环
        for i in range(n):
            if dist[i][i] < 0:
                return False

        return True

    def get_time_bounds(self):
        """获取时间边界"""
        n = len(self.time_points)

        # 初始化距离矩阵
        dist = [[float('inf')] * n for _ in range(n)]
        for i in range(n):
            dist[i][i] = 0

        # 添加约束
        for constraint in self.constraints:
            if constraint.type == 'before':
                t1 = self.time_points[constraint.time_points[0]]
                t2 = self.time_points[constraint.time_points[1]]
                dist[t1][t2] = -1

            elif constraint.type == 'after':
                t1 = self.time_points[constraint.time_points[0]]
                t2 = self.time_points[constraint.time_points[1]]
                dist[t2][t1] = -1

            elif constraint.type == 'duration':
                t1 = self.time_points[constraint.time_points[0]]
                t2 = self.time_points[constraint.time_points[1]]
                d = constraint.value
                dist[t1][t2] = d
                dist[t2][t1] = -d

        # Floyd-Warshall
        for k in range(n):
            for i in range(n):
                for j in range(n):
                    if dist[i][k] + dist[k][j] < dist[i][j]:
                        dist[i][j] = dist[i][k] + dist[k][j]

        # 提取边界
        bounds = {}
        for name, idx in self.time_points.items():
            lower = -dist[idx][0]  # 相对于第一个时间点
            upper = dist[0][idx]
            bounds[name] = (lower, upper)

        return bounds

# 示例
network = TemporalNetwork()
network.add_time_point('start')
network.add_time_point('action1_start')
network.add_time_point('action1_end')
network.add_time_point('action2_start')
network.add_time_point('action2_end')
network.add_time_point('goal')

# 添加约束
network.add_constraint(TemporalConstraint('before', ['start', 'action1_start']))
network.add_constraint(TemporalConstraint('duration', ['action1_start', 'action1_end'], 5))
network.add_constraint(TemporalConstraint('before', ['action1_end', 'action2_start']))
network.add_constraint(TemporalConstraint('duration', ['action2_start', 'action2_end'], 3))
network.add_constraint(TemporalConstraint('before', ['action2_end', 'goal']))

print("网络一致性:", network.is_consistent())
print("时间边界:", network.get_time_bounds())

2. 时间规划问题

2.1 时间动作表示

class TemporalAction:
    """时间动作"""

    def __init__(self, name, duration, conditions, effects):
        """
        初始化时间动作

        参数:
        name: 动作名称
        duration: 持续时间
        conditions: 条件(时间点 -> 条件)
        effects: 效果(时间点 -> 效果)
        """
        self.name = name
        self.duration = duration
        self.conditions = conditions  # {time_point: condition}
        self.effects = effects        # {time_point: effect}

    def is_applicable(self, state, time_point):
        """检查动作在时间点是否可应用"""
        if time_point not in self.conditions:
            return True

        condition = self.conditions[time_point]
        return condition(state)

    def apply(self, state, time_point):
        """在时间点应用效果"""
        if time_point not in self.effects:
            return state

        effect = self.effects[time_point]
        return effect(state)

# 示例:移动动作
def move_action(duration=5):
    """创建移动动作"""

    def precondition(state):
        # 检查是否可以移动
        return True

    def effect(state):
        # 更新位置
        return state

    return TemporalAction(
        name='move',
        duration=duration,
        conditions={'start': precondition},
        effects={'end': effect}
    )

2.2 时间规划问题定义

class TemporalPlanningProblem:
    """时间规划问题"""

    def __init__(self, initial_state, goal_state, actions, temporal_constraints):
        """
        初始化时间规划问题

        参数:
        initial_state: 初始状态
        goal_state: 目标状态
        actions: 时间动作列表
        temporal_constraints: 时间约束列表
        """
        self.initial_state = initial_state
        self.goal_state = goal_state
        self.actions = actions
        self.temporal_constraints = temporal_constraints

    def is_goal(self, state):
        """检查是否达到目标"""
        return state == self.goal_state

    def get_applicable_actions(self, state, time_point):
        """获取在时间点可应用的动作"""
        applicable = []
        for action in self.actions:
            if action.is_applicable(state, time_point):
                applicable.append(action)
        return applicable

3. 时间规划算法

3.1 时间 A* 搜索

def temporal_a_star(problem, heuristic=None):
    """
    时间 A* 搜索

    参数:
    problem: 时间规划问题
    heuristic: 启发函数

    返回:
    plan: 时间规划
    """
    import heapq

    # 初始化
    start_state = problem.initial_state
    start_time = 0

    # 优先队列:(f值, 状态, 时间, 规划)
    if heuristic:
        h = heuristic(start_state, problem.goal_state)
    else:
        h = 0

    frontier = [(h, start_state, start_time, [])]
    explored = set()

    while frontier:
        f, state, time, plan = heapq.heappop(frontier)

        # 检查是否达到目标
        if problem.is_goal(state):
            return plan

        # 检查是否已探索
        state_key = (state, time)
        if state_key in explored:
            continue

        explored.add(state_key)

        # 获取可应用的动作
        actions = problem.get_applicable_actions(state, time)

        for action in actions:
            # 计算新时间
            new_time = time + action.duration

            # 应用效果
            new_state = action.apply(state, 'end')

            # 计算新 f 值
            if heuristic:
                h = heuristic(new_state, problem.goal_state)
            else:
                h = 0

            new_f = new_time + h

            # 添加到队列
            new_plan = plan + [(action.name, time, new_time)]
            heapq.heappush(frontier, (new_f, new_state, new_time, new_plan))

    return None  # 无解

3.2 时间规划器

class TemporalPlanner:
    """时间规划器"""

    def __init__(self, problem):
        self.problem = problem
        self.plan = None
        self.schedule = None

    def plan(self):
        """生成时间规划"""
        # 使用时间 A* 搜索
        self.plan = temporal_a_star(self.problem)

        if self.plan:
            # 生成调度
            self.schedule = self._generate_schedule()

        return self.plan

    def _generate_schedule(self):
        """生成调度"""
        if not self.plan:
            return None

        schedule = {}
        for action_name, start_time, end_time in self.plan:
            schedule[action_name] = {
                'start': start_time,
                'end': end_time,
                'duration': end_time - start_time
            }

        return schedule

    def validate(self):
        """验证规划"""
        if not self.plan:
            return False

        # 检查时间约束
        for constraint in self.problem.temporal_constraints:
            if not constraint.is_satisfied(self.schedule):
                return False

        return True

    def visualize(self):
        """可视化规划"""
        if not self.schedule:
            print("无规划可显示")
            return

        print("时间规划:")
        print("-" * 50)

        for action_name, times in self.schedule.items():
            start = times['start']
            end = times['end']
            duration = times['duration']

            # 生成时间条
            bar_length = 30
            start_pos = int(start / 20 * bar_length)
            end_pos = int(end / 20 * bar_length)

            bar = [' '] * bar_length
            for i in range(start_pos, min(end_pos, bar_length)):
                bar[i] = '█'

            print(f"{action_name:>15}: |{''.join(bar)}| [{start:.1f}, {end:.1f}]")

        print("-" * 50)

4. 时间约束管理

4.1 约束传播

def constraint_propagation(network):
    """
    约束传播

    参数:
    network: 时间网络

    返回:
    propagated_network: 传播后的网络
    """
    # 使用 Floyd-Warshall 算法进行约束传播
    n = len(network.time_points)

    # 初始化距离矩阵
    dist = [[float('inf')] * n for _ in range(n)]
    for i in range(n):
        dist[i][i] = 0

    # 添加约束
    for constraint in network.constraints:
        if constraint.type == 'before':
            t1 = network.time_points[constraint.time_points[0]]
            t2 = network.time_points[constraint.time_points[1]]
            dist[t1][t2] = -1

        elif constraint.type == 'after':
            t1 = network.time_points[constraint.time_points[0]]
            t2 = network.time_points[constraint.time_points[1]]
            dist[t2][t1] = -1

        elif constraint.type == 'duration':
            t1 = network.time_points[constraint.time_points[0]]
            t2 = network.time_points[constraint.time_points[1]]
            d = constraint.value
            dist[t1][t2] = d
            dist[t2][t1] = -d

    # Floyd-Warshall
    for k in range(n):
        for i in range(n):
            for j in range(n):
                if dist[i][k] + dist[k][j] < dist[i][j]:
                    dist[i][j] = dist[i][k] + dist[k][j]

    # 创建传播后的网络
    propagated_network = TemporalNetwork()

    # 添加时间点
    for name in network.time_points:
        propagated_network.add_time_point(name)

    # 添加传播后的约束
    for i, name1 in enumerate(network.time_points):
        for j, name2 in enumerate(network.time_points):
            if i != j and dist[i][j] != float('inf'):
                if dist[i][j] < 0:
                    # name1 < name2
                    propagated_network.add_constraint(
                        TemporalConstraint('before', [name1, name2])
                    )
                elif dist[i][j] > 0:
                    # name1 > name2
                    propagated_network.add_constraint(
                        TemporalConstraint('after', [name1, name2])
                    )

    return propagated_network

4.2 时间窗口管理

class TemporalWindow:
    """时间窗口"""

    def __init__(self, start, end):
        """
        初始化时间窗口

        参数:
        start: 窗口开始时间
        end: 窗口结束时间
        """
        self.start = start
        self.end = end

    @property
    def duration(self):
        """窗口持续时间"""
        return self.end - self.start

    def contains(self, time_point):
        """检查是否包含时间点"""
        return self.start <= time_point <= self.end

    def intersect(self, other):
        """计算交集"""
        new_start = max(self.start, other.start)
        new_end = min(self.end, other.end)

        if new_start <= new_end:
            return TemporalWindow(new_start, new_end)
        return None

    def union(self, other):
        """计算并集"""
        new_start = min(self.start, other.start)
        new_end = max(self.end, other.end)
        return TemporalWindow(new_start, new_end)

    def __repr__(self):
        return f"[{self.start}, {self.end}]"

# 示例
window1 = TemporalWindow(0, 10)
window2 = TemporalWindow(5, 15)

print(f"窗口1: {window1}")
print(f"窗口2: {window2}")
print(f"交集: {window1.intersect(window2)}")
print(f"并集: {window1.union(window2)}")

5. 实践练习

练习 1:实现时间网络

def temporal_network_exercise():
    # 创建时间网络
    network = TemporalNetwork()

    # 添加时间点
    network.add_time_point('start')
    network.add_time_point('action1')
    network.add_time_point('action2')
    network.add_time_point('goal')

    # 添加约束
    network.add_constraint(TemporalConstraint('before', ['start', 'action1']))
    network.add_constraint(TemporalConstraint('before', ['action1', 'action2']))
    network.add_constraint(TemporalConstraint('before', ['action2', 'goal']))
    network.add_constraint(TemporalConstraint('duration', ['start', 'goal'], 20))

    # 检查一致性
    print("网络一致性:", network.is_consistent())

    # 获取时间边界
    bounds = network.get_time_bounds()
    print("时间边界:", bounds)

练习 2:实现时间规划器

def temporal_planner_exercise():
    # 创建时间动作
    action1 = move_action(duration=5)
    action2 = move_action(duration=3)

    # 创建规划问题
    problem = TemporalPlanningProblem(
        initial_state='start',
        goal_state='goal',
        actions=[action1, action2],
        temporal_constraints=[]
    )

    # 创建规划器
    planner = TemporalPlanner(problem)

    # 生成规划
    plan = planner.plan()

    if plan:
        print("时间规划:", plan)
        planner.visualize()
    else:
        print("无解")

练习 3:实现约束传播

def constraint_propagation_exercise():
    # 创建时间网络
    network = TemporalNetwork()
    network.add_time_point('A')
    network.add_time_point('B')
    network.add_time_point('C')

    network.add_constraint(TemporalConstraint('before', ['A', 'B']))
    network.add_constraint(TemporalConstraint('before', ['B', 'C']))

    # 约束传播
    propagated = constraint_propagation(network)

    print("传播后的约束:")
    for constraint in propagated.constraints:
        print(f"  {constraint}")

6. 常见问题

1. 时间约束冲突

问题:时间约束相互冲突

解决方案: - 使用约束传播检测冲突 - 放松某些约束 - 使用软约束

2. 计算复杂度高

问题:时间规划计算量大

解决方案: - 使用启发式搜索 - 使用分层规划 - 使用近似方法

3. 实时性要求

问题:需要在规定时间内完成规划

解决方案: - 使用在线规划 - 使用 anytime 算法 - 使用预计算

下一步

参考资源


← 返回索引 | 下一页:运动规划 →