时间规划¶
时间规划是指在考虑时间约束的情况下进行规划。本章将介绍时间规划的核心概念和算法。
学习目标¶
完成本章后,你将能够:
- 理解时间约束的表示方法
- 掌握时间网络的概念
- 了解时间规划算法
- 应用时间规划解决实际问题
1. 时间表示¶
1.1 时间点和时间区间¶
时间表示:
- 时间点:t ∈ ℝ
- 时间区间:[start, end],其中 start ≤ end
- 持续时间:duration = end - start
时间约束:
- t₁ < t₂:时间点 t₁ 在 t₂ 之前
- t₁ ≤ t₂:时间点 t₁ 不晚于 t₂
- t₁ = t₂:时间点 t₁ 和 t₂ 同时
- duration = d:持续时间为 d
class TimeInterval:
"""时间区间"""
def __init__(self, start, end):
"""
初始化时间区间
参数:
start: 开始时间
end: 结束时间
"""
self.start = start
self.end = end
# 验证
if start > end:
raise ValueError("开始时间不能晚于结束时间")
@property
def duration(self):
"""持续时间"""
return self.end - self.start
def contains(self, time_point):
"""检查是否包含时间点"""
return self.start <= time_point <= self.end
def overlaps(self, other):
"""检查是否与另一个区间重叠"""
return self.start < other.end and other.start < self.end
def __repr__(self):
return f"[{self.start}, {self.end}]"
# 示例
interval1 = TimeInterval(0, 10)
interval2 = TimeInterval(5, 15)
print(f"区间1: {interval1}")
print(f"区间2: {interval2}")
print(f"区间1持续时间: {interval1.duration}")
print(f"区间1和区间2重叠: {interval1.overlaps(interval2)}")
1.2 时间约束¶
class TemporalConstraint:
"""时间约束"""
def __init__(self, type, time_points, value=None):
"""
初始化时间约束
参数:
type: 约束类型 ('before', 'after', 'equal', 'duration')
time_points: 时间点列表
value: 约束值(对于 duration 类型)
"""
self.type = type
self.time_points = time_points
self.value = value
def is_satisfied(self, time_values):
"""检查约束是否满足"""
if self.type == 'before':
t1, t2 = self.time_points
return time_values[t1] < time_values[t2]
elif self.type == 'after':
t1, t2 = self.time_points
return time_values[t1] > time_values[t2]
elif self.type == 'equal':
t1, t2 = self.time_points
return time_values[t1] == time_values[t2]
elif self.type == 'duration':
t1, t2 = self.time_points
return time_values[t2] - time_values[t1] == self.value
return False
def __repr__(self):
if self.type == 'before':
return f"{self.time_points[0]} < {self.time_points[1]}"
elif self.type == 'after':
return f"{self.time_points[0]} > {self.time_points[1]}"
elif self.type == 'equal':
return f"{self.time_points[0]} = {self.time_points[1]}"
elif self.type == 'duration':
return f"{self.time_points[1]} - {self.time_points[0]} = {self.value}"
return ""
1.3 时间网络¶
class TemporalNetwork:
"""时间网络"""
def __init__(self):
self.time_points = {} # 名称 -> 时间点
self.constraints = [] # 约束列表
def add_time_point(self, name):
"""添加时间点"""
self.time_points[name] = len(self.time_points)
def add_constraint(self, constraint):
"""添加约束"""
self.constraints.append(constraint)
def is_consistent(self):
"""检查网络是否一致"""
# 使用 Floyd-Warshall 算法检查一致性
n = len(self.time_points)
# 初始化距离矩阵
dist = [[float('inf')] * n for _ in range(n)]
for i in range(n):
dist[i][i] = 0
# 添加约束
for constraint in self.constraints:
if constraint.type == 'before':
t1 = self.time_points[constraint.time_points[0]]
t2 = self.time_points[constraint.time_points[1]]
dist[t1][t2] = -1 # t1 < t2 意味着 t2 - t1 >= 1
elif constraint.type == 'after':
t1 = self.time_points[constraint.time_points[0]]
t2 = self.time_points[constraint.time_points[1]]
dist[t2][t1] = -1 # t1 > t2 意味着 t1 - t2 >= 1
elif constraint.type == 'duration':
t1 = self.time_points[constraint.time_points[0]]
t2 = self.time_points[constraint.time_points[1]]
d = constraint.value
dist[t1][t2] = d
dist[t2][t1] = -d
# Floyd-Warshall
for k in range(n):
for i in range(n):
for j in range(n):
if dist[i][k] + dist[k][j] < dist[i][j]:
dist[i][j] = dist[i][k] + dist[k][j]
# 检查负环
for i in range(n):
if dist[i][i] < 0:
return False
return True
def get_time_bounds(self):
"""获取时间边界"""
n = len(self.time_points)
# 初始化距离矩阵
dist = [[float('inf')] * n for _ in range(n)]
for i in range(n):
dist[i][i] = 0
# 添加约束
for constraint in self.constraints:
if constraint.type == 'before':
t1 = self.time_points[constraint.time_points[0]]
t2 = self.time_points[constraint.time_points[1]]
dist[t1][t2] = -1
elif constraint.type == 'after':
t1 = self.time_points[constraint.time_points[0]]
t2 = self.time_points[constraint.time_points[1]]
dist[t2][t1] = -1
elif constraint.type == 'duration':
t1 = self.time_points[constraint.time_points[0]]
t2 = self.time_points[constraint.time_points[1]]
d = constraint.value
dist[t1][t2] = d
dist[t2][t1] = -d
# Floyd-Warshall
for k in range(n):
for i in range(n):
for j in range(n):
if dist[i][k] + dist[k][j] < dist[i][j]:
dist[i][j] = dist[i][k] + dist[k][j]
# 提取边界
bounds = {}
for name, idx in self.time_points.items():
lower = -dist[idx][0] # 相对于第一个时间点
upper = dist[0][idx]
bounds[name] = (lower, upper)
return bounds
# 示例
network = TemporalNetwork()
network.add_time_point('start')
network.add_time_point('action1_start')
network.add_time_point('action1_end')
network.add_time_point('action2_start')
network.add_time_point('action2_end')
network.add_time_point('goal')
# 添加约束
network.add_constraint(TemporalConstraint('before', ['start', 'action1_start']))
network.add_constraint(TemporalConstraint('duration', ['action1_start', 'action1_end'], 5))
network.add_constraint(TemporalConstraint('before', ['action1_end', 'action2_start']))
network.add_constraint(TemporalConstraint('duration', ['action2_start', 'action2_end'], 3))
network.add_constraint(TemporalConstraint('before', ['action2_end', 'goal']))
print("网络一致性:", network.is_consistent())
print("时间边界:", network.get_time_bounds())
2. 时间规划问题¶
2.1 时间动作表示¶
class TemporalAction:
"""时间动作"""
def __init__(self, name, duration, conditions, effects):
"""
初始化时间动作
参数:
name: 动作名称
duration: 持续时间
conditions: 条件(时间点 -> 条件)
effects: 效果(时间点 -> 效果)
"""
self.name = name
self.duration = duration
self.conditions = conditions # {time_point: condition}
self.effects = effects # {time_point: effect}
def is_applicable(self, state, time_point):
"""检查动作在时间点是否可应用"""
if time_point not in self.conditions:
return True
condition = self.conditions[time_point]
return condition(state)
def apply(self, state, time_point):
"""在时间点应用效果"""
if time_point not in self.effects:
return state
effect = self.effects[time_point]
return effect(state)
# 示例:移动动作
def move_action(duration=5):
"""创建移动动作"""
def precondition(state):
# 检查是否可以移动
return True
def effect(state):
# 更新位置
return state
return TemporalAction(
name='move',
duration=duration,
conditions={'start': precondition},
effects={'end': effect}
)
2.2 时间规划问题定义¶
class TemporalPlanningProblem:
"""时间规划问题"""
def __init__(self, initial_state, goal_state, actions, temporal_constraints):
"""
初始化时间规划问题
参数:
initial_state: 初始状态
goal_state: 目标状态
actions: 时间动作列表
temporal_constraints: 时间约束列表
"""
self.initial_state = initial_state
self.goal_state = goal_state
self.actions = actions
self.temporal_constraints = temporal_constraints
def is_goal(self, state):
"""检查是否达到目标"""
return state == self.goal_state
def get_applicable_actions(self, state, time_point):
"""获取在时间点可应用的动作"""
applicable = []
for action in self.actions:
if action.is_applicable(state, time_point):
applicable.append(action)
return applicable
3. 时间规划算法¶
3.1 时间 A* 搜索¶
def temporal_a_star(problem, heuristic=None):
"""
时间 A* 搜索
参数:
problem: 时间规划问题
heuristic: 启发函数
返回:
plan: 时间规划
"""
import heapq
# 初始化
start_state = problem.initial_state
start_time = 0
# 优先队列:(f值, 状态, 时间, 规划)
if heuristic:
h = heuristic(start_state, problem.goal_state)
else:
h = 0
frontier = [(h, start_state, start_time, [])]
explored = set()
while frontier:
f, state, time, plan = heapq.heappop(frontier)
# 检查是否达到目标
if problem.is_goal(state):
return plan
# 检查是否已探索
state_key = (state, time)
if state_key in explored:
continue
explored.add(state_key)
# 获取可应用的动作
actions = problem.get_applicable_actions(state, time)
for action in actions:
# 计算新时间
new_time = time + action.duration
# 应用效果
new_state = action.apply(state, 'end')
# 计算新 f 值
if heuristic:
h = heuristic(new_state, problem.goal_state)
else:
h = 0
new_f = new_time + h
# 添加到队列
new_plan = plan + [(action.name, time, new_time)]
heapq.heappush(frontier, (new_f, new_state, new_time, new_plan))
return None # 无解
3.2 时间规划器¶
class TemporalPlanner:
"""时间规划器"""
def __init__(self, problem):
self.problem = problem
self.plan = None
self.schedule = None
def plan(self):
"""生成时间规划"""
# 使用时间 A* 搜索
self.plan = temporal_a_star(self.problem)
if self.plan:
# 生成调度
self.schedule = self._generate_schedule()
return self.plan
def _generate_schedule(self):
"""生成调度"""
if not self.plan:
return None
schedule = {}
for action_name, start_time, end_time in self.plan:
schedule[action_name] = {
'start': start_time,
'end': end_time,
'duration': end_time - start_time
}
return schedule
def validate(self):
"""验证规划"""
if not self.plan:
return False
# 检查时间约束
for constraint in self.problem.temporal_constraints:
if not constraint.is_satisfied(self.schedule):
return False
return True
def visualize(self):
"""可视化规划"""
if not self.schedule:
print("无规划可显示")
return
print("时间规划:")
print("-" * 50)
for action_name, times in self.schedule.items():
start = times['start']
end = times['end']
duration = times['duration']
# 生成时间条
bar_length = 30
start_pos = int(start / 20 * bar_length)
end_pos = int(end / 20 * bar_length)
bar = [' '] * bar_length
for i in range(start_pos, min(end_pos, bar_length)):
bar[i] = '█'
print(f"{action_name:>15}: |{''.join(bar)}| [{start:.1f}, {end:.1f}]")
print("-" * 50)
4. 时间约束管理¶
4.1 约束传播¶
def constraint_propagation(network):
"""
约束传播
参数:
network: 时间网络
返回:
propagated_network: 传播后的网络
"""
# 使用 Floyd-Warshall 算法进行约束传播
n = len(network.time_points)
# 初始化距离矩阵
dist = [[float('inf')] * n for _ in range(n)]
for i in range(n):
dist[i][i] = 0
# 添加约束
for constraint in network.constraints:
if constraint.type == 'before':
t1 = network.time_points[constraint.time_points[0]]
t2 = network.time_points[constraint.time_points[1]]
dist[t1][t2] = -1
elif constraint.type == 'after':
t1 = network.time_points[constraint.time_points[0]]
t2 = network.time_points[constraint.time_points[1]]
dist[t2][t1] = -1
elif constraint.type == 'duration':
t1 = network.time_points[constraint.time_points[0]]
t2 = network.time_points[constraint.time_points[1]]
d = constraint.value
dist[t1][t2] = d
dist[t2][t1] = -d
# Floyd-Warshall
for k in range(n):
for i in range(n):
for j in range(n):
if dist[i][k] + dist[k][j] < dist[i][j]:
dist[i][j] = dist[i][k] + dist[k][j]
# 创建传播后的网络
propagated_network = TemporalNetwork()
# 添加时间点
for name in network.time_points:
propagated_network.add_time_point(name)
# 添加传播后的约束
for i, name1 in enumerate(network.time_points):
for j, name2 in enumerate(network.time_points):
if i != j and dist[i][j] != float('inf'):
if dist[i][j] < 0:
# name1 < name2
propagated_network.add_constraint(
TemporalConstraint('before', [name1, name2])
)
elif dist[i][j] > 0:
# name1 > name2
propagated_network.add_constraint(
TemporalConstraint('after', [name1, name2])
)
return propagated_network
4.2 时间窗口管理¶
class TemporalWindow:
"""时间窗口"""
def __init__(self, start, end):
"""
初始化时间窗口
参数:
start: 窗口开始时间
end: 窗口结束时间
"""
self.start = start
self.end = end
@property
def duration(self):
"""窗口持续时间"""
return self.end - self.start
def contains(self, time_point):
"""检查是否包含时间点"""
return self.start <= time_point <= self.end
def intersect(self, other):
"""计算交集"""
new_start = max(self.start, other.start)
new_end = min(self.end, other.end)
if new_start <= new_end:
return TemporalWindow(new_start, new_end)
return None
def union(self, other):
"""计算并集"""
new_start = min(self.start, other.start)
new_end = max(self.end, other.end)
return TemporalWindow(new_start, new_end)
def __repr__(self):
return f"[{self.start}, {self.end}]"
# 示例
window1 = TemporalWindow(0, 10)
window2 = TemporalWindow(5, 15)
print(f"窗口1: {window1}")
print(f"窗口2: {window2}")
print(f"交集: {window1.intersect(window2)}")
print(f"并集: {window1.union(window2)}")
5. 实践练习¶
练习 1:实现时间网络¶
def temporal_network_exercise():
# 创建时间网络
network = TemporalNetwork()
# 添加时间点
network.add_time_point('start')
network.add_time_point('action1')
network.add_time_point('action2')
network.add_time_point('goal')
# 添加约束
network.add_constraint(TemporalConstraint('before', ['start', 'action1']))
network.add_constraint(TemporalConstraint('before', ['action1', 'action2']))
network.add_constraint(TemporalConstraint('before', ['action2', 'goal']))
network.add_constraint(TemporalConstraint('duration', ['start', 'goal'], 20))
# 检查一致性
print("网络一致性:", network.is_consistent())
# 获取时间边界
bounds = network.get_time_bounds()
print("时间边界:", bounds)
练习 2:实现时间规划器¶
def temporal_planner_exercise():
# 创建时间动作
action1 = move_action(duration=5)
action2 = move_action(duration=3)
# 创建规划问题
problem = TemporalPlanningProblem(
initial_state='start',
goal_state='goal',
actions=[action1, action2],
temporal_constraints=[]
)
# 创建规划器
planner = TemporalPlanner(problem)
# 生成规划
plan = planner.plan()
if plan:
print("时间规划:", plan)
planner.visualize()
else:
print("无解")
练习 3:实现约束传播¶
def constraint_propagation_exercise():
# 创建时间网络
network = TemporalNetwork()
network.add_time_point('A')
network.add_time_point('B')
network.add_time_point('C')
network.add_constraint(TemporalConstraint('before', ['A', 'B']))
network.add_constraint(TemporalConstraint('before', ['B', 'C']))
# 约束传播
propagated = constraint_propagation(network)
print("传播后的约束:")
for constraint in propagated.constraints:
print(f" {constraint}")
6. 常见问题¶
1. 时间约束冲突¶
问题:时间约束相互冲突
解决方案: - 使用约束传播检测冲突 - 放松某些约束 - 使用软约束
2. 计算复杂度高¶
问题:时间规划计算量大
解决方案: - 使用启发式搜索 - 使用分层规划 - 使用近似方法
3. 实时性要求¶
问题:需要在规定时间内完成规划
解决方案: - 使用在线规划 - 使用 anytime 算法 - 使用预计算