目标装配与任务运动规划(TAMP)¶
项目类型: 机器人操作 | 难度: ★★★☆☆ 到 ★★★★★(取决于方法) | 预计时间: 2–4 个周末
1. 项目概述¶
TAMP(Task and Motion Planning,任务与运动规划)桥接了符号级任务推理与连续运动规划之间的鸿沟。在装配任务中,机器人需要推理:先操作哪个物体、按什么顺序、又如何物理移动机械臂到达每个子目标——同时避免碰撞。
本项目通过**积木堆叠装配**场景,教授三级算法的 TAMP:
┌─────────────────────────────────────────────────────────────────┐
│ TAMP 流水线概览 │
│ │
│ ┌─────────────┐ ┌────────────────┐ ┌──────────────┐ │
│ │ 场景 │───▶│ 任务规划器 │───▶│ 运动规划器 │ │
│ │ 状态 │ │ (PDDL/LLM) │ │ (RRT/MoveIt)│ │
│ │ │ │ │ │ │ │
│ │ on(A,B) │ │ pick(A) │ │ joint_traj │ │
│ │ on table │ │ place(A,B) │ │ (无碰撞路径) │ │
│ └─────────────┘ └────────────────┘ └──────────────┘ │
│ ▲ ▲ ▲ │
│ │ │ │ │
│ └───────────────────┴──────────────────────┘ │
│ 执行反馈循环 │
└─────────────────────────────────────────────────────────────────┘
| 层级 | 方法 | 任务规划 | 运动规划 | 示例应用 |
|---|---|---|---|---|
| 传统 | 经典规划 | PDDL(Fast Downward / POPF) | 无(仅离散动作) | 积木堆叠任务序列 |
| 中级 | TAMP | PDDLStream | OMPL / MoveIt | 含碰撞规避的连续抓放 |
| 现代 | LLM 任务规划 | GPT-4 / LLaMA + 提示词 | Grounding DINO + SAM + MoveIt | 自然语言开放词汇装配 |
2. 硬件与软件需求¶
硬件¶
| 组件 | 规格 | 说明 |
|---|---|---|
| 机械臂 | 6-DOF 机械臂(如 UR5、Franka Emika Panda) | 或在 PyBullet / Isaac Sim 中仿真 |
| 夹爪 | 平行程式或真空夹爪 | 真空吸盘适合平整物体 |
| 相机 | Intel RealSense D435 / Azure Kinect | RGB-D 用于场景理解 |
| 工作空间 | 桌面(最小 0.5m × 0.5m) | 物体:木块、销钉、螺母 |
| PC | Intel i7 / RTX 3060 或更高 | 用于 Tier 3 的 LLM 推理 |
软件¶
| 软件包 | 版本 | 用途 |
|---|---|---|
| Python | ≥ 3.8 | 核心语言 |
| ROS 2 | Humble / Iron | 机器人控制、动作服务器 |
| MoveIt 2 | Humble | 运动规划框架 |
| PyBullet | ≥ 3.0 | 物理仿真 |
| PDDLStream | 最新版 | TAMP 集成 |
| Fast Downward | 最新版 | 经典任务规划 |
| OpenCV | ≥ 4.5 | 视觉预处理 |
| NumPy | ≥ 1.20 | 数值计算 |
| transformers | ≥ 4.30 | LLM 推理(Tier 3) |
3. Tier 1 — 传统:基于 PDDL 的经典规划¶
3.1 概念¶
PDDL(Planning Domain Definition Language,规划域定义语言)是描述规划问题的标准语言。PDDL 问题由以下部分组成:
- 域文件:定义类型、谓词和动作
- 问题文件:定义对象、初始状态和目标状态
Fast Downward 规划器通过在基化动作序列空间中搜索来求解问题。
3.2 积木堆叠域¶
积木堆叠的关键谓词:
on(X, Y)— 积木 X 在积木 Y 之上(或桌上)clear(X)— 积木 X 上方无物holding(X)— 机器人正抓着积木 Xhandempty— 机器人夹爪未抓取任何物体
动作:
- pick(X) — 从桌面抓起积木 X(需 clear(X) 和 handempty)
- place(X, Y) — 将积木 X 放到积木 Y 或桌上(需 holding(X) 和 clear(Y))
- stack(X, Y) — 将积木 X 堆叠到积木 Y 上(与 place 相同,但 Y 是积木)
3.3 PDDL 域文件¶
;; domain_blocks.pddl
(define (domain blocks)
(:requirements :strips :typing)
(:types block)
(:predicates
(on ?x - block ?y - block)
(ontable ?x - block)
(clear ?x - block)
(holding ?x - block)
(handempty))
;; 从桌面捡起积木
(:action pick
:parameters (?x - block)
:precondition (and (clear ?x) (ontable ?x) (handempty))
:effect (and (holding ?x)
(not (ontable ?x))
(not (clear ?x))
(not (handempty))))
;; 将积木放到桌上
(:action putdown
:parameters (?x - block)
:precondition (holding ?x)
:effect (and (ontable ?x)
(clear ?x)
(handempty)
(not (holding ?x))))
;; 将积木堆叠到另一个积木上
(:action stack
:parameters (?x - block ?y - block)
:precondition (and (holding ?x) (clear ?y))
:effect (and (on ?x ?y)
(clear ?x)
(handempty)
(not (holding ?x))
(not (clear ?y))))
;; 从另一个积木上取下积木
(:action unstack
:parameters (?x - block ?y - block)
:precondition (and (on ?x ?y) (clear ?x) (handempty))
:effect (and (holding ?x)
(clear ?y)
(not (on ?x ?y))
(not (clear ?x))
(not (handempty))))
)
3.4 PDDL 问题文件¶
;; problem_blocks.pddl
(define (problem blocks-tower)
(:domain blocks)
(:objects A B C - block)
(:init
(ontable A)
(ontable B)
(ontable C)
(clear A)
(clear B)
(clear C)
(handempty))
(:goal (and (on C B) (on B A))))
;; 目标:将 C 堆在 B 上,B 堆在 A 上(高度为 3 的塔)
)
3.5 Python — 从 Python 调用 Fast Downward¶
"""
Tier 1: 经典 PDDL 规划(Fast Downward)
========================================
通过子进程调用 Fast Downward 规划器并解析计划。
依赖:fast-downward (https://www.fast-downward.org/)
"""
import subprocess
import os
import re
import tempfile
DOMAIN_TEMPLATE = """(define (domain blocks)
(:requirements :strips :typing)
(:types block)
(:predicates
(on ?x - block ?y - block)
(ontable ?x - block)
(clear ?x - block)
(holding ?x - block)
(handempty))
(:action pick
:parameters (?x - block)
:precondition (and (clear ?x) (ontable ?x) (handempty))
:effect (and (holding ?x) (not (ontable ?x)) (not (clear ?x)) (not (handempty))))
(:action putdown
:parameters (?x - block)
:precondition (holding ?x)
:effect (and (ontable ?x) (clear ?x) (handempty) (not (holding ?x))))
(:action stack
:parameters (?x - block ?y - block)
:precondition (and (holding ?x) (clear ?y))
:effect (and (on ?x ?y) (clear ?x) (handempty) (not (holding ?x)) (not (clear ?y))))
(:action unstack
:parameters (?x - block ?y - block)
:precondition (and (on ?x ?y) (clear ?x) (handempty))
:effect (and (holding ?x) (clear ?y) (not (on ?x ?y)) (not (clear ?x)) (not (handempty))))
)"""
PROBLEM_TEMPLATE = """(define (problem {name})
(:domain blocks)
(:objects {objects})
(:init {init})
(:goal (and {goal})))
"""
def generate_problem(block_names: list, init_clauses: list, goal_clauses: list) -> str:
"""从结构化数据生成 PDDL 问题文件。"""
objects = " ".join(block_names)
init_str = " ".join(f"({c})" if " " not in c else c for c in init_clauses)
goal_str = " ".join(f"({c})" if " " not in c else c for c in goal_clauses)
return PROBLEM_TEMPLATE.format(
name="blocks_problem",
objects=objects,
init=init_str,
goal=goal_str,
)
def call_fast_downward(domain_path: str, problem_path: str, plan_output_path: str):
"""
调用 Fast Downward 规划器。
返回计划作为动作字符串列表。
"""
cmd = [
"fast-downward",
"--alias", "seq-opt-lmcut", # 最优求解器 with landmarks
"--plan-file", plan_output_path,
problem_path,
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
print(f"[STDERR]\n{result.stderr}")
raise RuntimeError(f"Fast Downward failed with code {result.returncode}")
def parse_plan(plan_path: str) -> list:
"""解析 Fast Downward 计划输出为动作列表。"""
with open(plan_path, "r") as f:
content = f.read()
# 解析形如:0: (pick A) [1] 的行
pattern = r"\d+:\s*\(([a-z_]+)\s+([A-Z])\)\s*\[(\d+)\]"
actions = re.findall(pattern, content)
return [(action, obj) for action, obj, cost in actions]
def solve_blocks_stacking(initial_config: dict, goal_config: dict) -> list:
"""
使用 Fast Downward 求解积木堆叠问题。
Parameters
----------
initial_config : dict
例如 {"on": [], "ontable": ["A", "B", "C"], "clear": ["A", "B", "C"]}
goal_config : dict
例如 {"on": [("C", "B"), ("B", "A")]}
Returns
-------
plan : (action_name, object) 元组列表
"""
blocks = list(set(
[b for pair in initial_config.get("on", []) for b in pair]
+ initial_config.get("ontable", [])
))
init_clauses = []
init_clauses.extend(f"ontable {b}" for b in initial_config.get("ontable", []))
init_clauses.extend(f"on {x} {y}" for x, y in initial_config.get("on", []))
init_clauses.extend(f"clear {b}" for b in initial_config.get("clear", []))
init_clauses.append("handempty")
goal_clauses = []
goal_clauses.extend(f"on {x} {y}" for x, y in goal_config.get("on", []))
domain_str = DOMAIN_TEMPLATE
problem_str = generate_problem(blocks, init_clauses, goal_clauses)
with tempfile.TemporaryDirectory() as tmpdir:
domain_path = os.path.join(tmpdir, "domain.pddl")
problem_path = os.path.join(tmpdir, "problem.pddl")
plan_path = os.path.join(tmpdir, "sas_plan")
with open(domain_path, "w") as f:
f.write(domain_str)
with open(problem_path, "w") as f:
f.write(problem_str)
call_fast_downward(domain_path, problem_path, plan_path)
plan = parse_plan(plan_path)
return plan
if __name__ == "__main__":
# 示例:将 C 堆在 B 上,B 堆在 A 上(高度为 3 的塔)
initial = {
"ontable": ["A", "B", "C"],
"on": [],
"clear": ["A", "B", "C"],
}
goal = {
"on": [("C", "B"), ("B", "A")],
}
plan = solve_blocks_stacking(initial, goal)
print("[Plan]")
for i, (action, obj) in enumerate(plan):
print(f" {i+1}. {action} {obj}")
4. Tier 2 — 中级:PDDLStream TAMP¶
4.1 概念¶
PDDLStream 将经典规划扩展为支持**连续变量**和**采样器**。这使规划器能够推理:
- 物体位姿(位置 + 方向,\(\mathbb{R}^7\) 连续空间)
- 无碰撞构型(从构型空间中采样)
- 集成的任务-运动规划
┌──────────────────────────────────────────────────────────────┐
│ PDDLStream 架构 │
│ │
│ 符号状态 连续变量 解 │
│ ──────── ─────────── ────────── │
│ on(A,table) [x,y,z,rx,ry,rz,rw] ✓ 位姿已采样 │
│ clear(B) [q0,q1,q2,q3,q4,q5] ✓ IK 解 │
│ │
│ ┌────────────────┐ ┌──────────────────────┐ │
│ │ 符号 │───▶│ 流函数 │ │
│ │ 规划器 │ │ sample_ik(), │ │
│ │ (BestFirst) │ │ check_collision() │ │
│ └────────────────┘ └──────────────────────┘ │
│ ▲ │ │
│ │ ┌─────────────┘ │
│ └──────────┘ │
│ 反馈:无运动规划时重试 │
└──────────────────────────────────────────────────────────────┘
4.2 积木堆叠的 PDDLStream 域¶
"""
Tier 2: PDDLStream TAMP 积木堆叠
================================
集成任务规划(PDDL)与运动规划(OMPL)。
依赖:pddlstream (https://github.com/caelan/pddlstream)
"""
from __future__ import annotations
import numpy as np
from collections import deque
from dataclasses import dataclass, field
from typing import List, Optional, Tuple
try:
from pddlstream.algorithms.incremental import solve_incremental
from pddlstream.language.generator import Bounded, from_gen, from_fn
from pddlstream.language.constants import Equal, And, Or, Not, Imply
from pddlstream.language.object import Object
HAS_PDDLSTREAM = True
except ImportError:
HAS_PDDLSTREAM = False
print("[WARN] PDDLStream 未安装。使用 pip install pddlstream 安装")
@dataclass(frozen=True)
class Pose:
"""6-DOF 位姿:位置 (x, y, z) + 欧拉角 (roll, pitch, yaw)。"""
position: Tuple[float, float, float]
euler: Tuple[float, float, float] = (0.0, 0.0, 0.0)
def to_matrix(self) -> np.ndarray:
"""转换为 4x4 齐次变换矩阵。"""
x, y, z = self.position
roll, pitch, yaw = self.euler
# 简化版:无旋转
T = np.eye(4)
T[:3, 3] = [x, y, z]
return T
@dataclass
class Block:
"""积木物体,含名称、位姿和尺寸。"""
name: str
pose: Pose
size: Tuple[float, float, float] = (0.05, 0.05, 0.05) # 5cm 立方体
# ─── 流函数(运动规划采样器)────────────────────────────────────────────
def sample_pick_pose(block: Block) -> Bounded:
"""
采样积木上方的预抓取位姿。
生成 (pose,) 元组。
"""
x, y, z = block.pose.position
# 预抓取:积木上方 10cm
pregrasp_z = z + 0.10
for _ in range(100):
dx = np.random.uniform(-0.01, 0.01)
dy = np.random.uniform(-0.01, 0.01)
yield (Pose((x + dx, y + dy, pregrasp_z), (0, 0, 0)),)
def sample_place_pose(surface: Block, block: Block) -> Bounded:
"""
为积木在表面上采样放置位姿。
"""
sx, sy, sz = surface.pose.position
surface_size = surface.size[2] # 高度
bz = block.size[2]
# 表面顶部
place_z = sz + surface_size + bz / 2 + 0.01
for _ in range(100):
dx = np.random.uniform(-0.02, 0.02)
dy = np.random.uniform(-0.02, 0.02)
yield (Pose((sx + dx, sy + dy, place_z), (0, 0, 0)),)
def check_motion(q1: Tuple, q2: Tuple, robot_name: str = "robot") -> Bounded:
"""
检查两个构型之间的运动是否无碰撞。
简化版:若距离 < 阈值则假设有效。
"""
dist = np.linalg.norm(np.array(q1) - np.array(q2))
# 每步最大关节移动:0.1 rad
max_step = 0.1
if dist <= max_step:
yield (True,)
def compute_ik(robot_name: str, block_name: str, pose: Pose) -> Bounded:
"""
计算目标位姿的逆运动学。
简化版:返回虚拟构型。
真实实现使用 KDL / TRAC-IK / BioIK。
"""
x, y, z = pose.position
# 伪逆雅可比 IK(简化版)
# 返回近似到达位姿的关节角
q = (
np.arctan2(y, x) + np.random.uniform(-0.1, 0.1),
0.5 + np.random.uniform(-0.05, 0.05),
-1.0 + np.random.uniform(-0.05, 0.05),
0.0,
0.0,
0.0,
)
yield (q,)
def solve_tamp(
blocks: List[Block],
goal_on: List[Tuple[str, str]],
robot_name: str = "robot",
) -> List[Tuple[str, str, object]]:
"""
使用 PDDLStream 求解积木堆叠 TAMP 问题。
Parameters
----------
blocks : Block 列表
goal_on : (积木, 表面) 元组列表 — 积木应放在表面上
Returns
-------
plan : (action, obj, value) 元组列表
例如 [('pick', 'A', (q,)), ('place', 'A', 'table', (q,))]
"""
if not HAS_PDDLSTREAM:
raise ImportError("Tier 2 需要 PDDLStream")
from pddlstream.language.generator import from_list
# 从初始状态构建谓词语义
init_facts = [
("handempty",),
("Bconf", (0.0, 0.0, 0.0, 0.0, 0.0, 0.0)), # 基础构型
]
# 为每个积木添加 ontable、clear、atpose 事实
block_objs = {}
for b in blocks:
block_obj = Object(b.name, None)
block_objs[b.name] = block_obj
init_facts.append(("Block", (b.name,)))
init_facts.append(("ontable", (b.name,)))
init_facts.append(("clear", (b.name,)))
init_facts.append(("atpose", (b.name, b.pose)))
# 目标
goal_facts = []
for block_name, surface_name in goal_on:
if surface_name == "table":
goal_facts.append(("ontable", (block_name,)))
else:
goal_facts.append(("on", (block_name, surface_name)))
# 流函数
stream_map = {
"sample-pick": from_fn(lambda b: sample_pick_pose(b)),
"sample-place": from_fn(lambda s, b: sample_place_pose(s, b)),
"compute-ik": from_fn(lambda r, b, p: compute_ik(r, b, p)),
"check-motion": from_fn(lambda q1, q2: check_motion(q1, q2)),
}
# 域
domain_pddl = """
(define (domain manipulation)
(:requirements :strips :typing)
(:types block robot)
(:constants robot - robot)
(:predicates
(on ?x - block ?y - block)
(ontable ?x - block)
(clear ?x - block)
(handempty)
(atpose ?x - block ?p - pose)
(Bconf ?q - configuration)
)
(:action pick
:parameters (?b - block ?p - pose ?q - configuration)
:precondition (and (clear ?b) (handempty) (atpose ?b ?p))
:effect (and (holding ?b) (not (clear ?b)) (not (handempty))))
(:action place
:parameters (?b - block ?s - block ?p - pose ?q - configuration)
:precondition (and (holding ?b) (clear ?s) (atpose ?s ?p))
:effect (and (on ?b ?s) (clear ?b) (handempty) (not (holding ?b)) (not (clear ?s))))
"""
problem_pddl = f"""
(define (problem manipulation-p)
(:domain manipulation)
(:init {" ".join(f"({f[0]} {' '.join(str(a) for a in f[1:])})" for f in init_facts)})
(:goal (and {" ".join(f"({g[0]} {' '.join(str(a) for a in g[1:])})" for g in goal_facts)})))
"""
print("[INFO] 运行 PDDLStream TAMP...")
solution = solve_incremental(
domain_pddl=domain_pddl,
problem_pddl=problem_pddl,
stream_map=stream_map,
unit_costs=False,
max_time=30.0,
debug=False,
)
plan, cost, evaluations = solution
if plan is None:
print("[WARN] 未找到计划")
return []
print(f"[INFO] 找到计划,成本 {cost:.2f}")
return plan
if __name__ == "__main__":
# 示例:将 C 堆在 B 上,B 堆在 A 上
blocks = [
Block("A", Pose((0.0, 0.0, 0.025))),
Block("B", Pose((0.1, 0.0, 0.025))),
Block("C", Pose((0.2, 0.0, 0.025))),
]
goal = [("C", "B"), ("B", "A")]
if HAS_PDDLSTREAM:
plan = solve_tamp(blocks, goal)
for i, step in enumerate(plan):
print(f" {i+1}. {step}")
else:
print("[INFO] 无 PDDLStream,运行简化演示...")
print(" 1. pick C")
print(" 2. place C on B")
print(" 3. pick B")
print(" 4. place B on A")
4.3 MoveIt 集成¶
实际机器人执行时,运动规划通过 MoveIt 发送:
"""
MoveIt 动作服务器(执行 TAMP 计划)
====================================
接收符号规划器的任务计划并执行运动轨迹。
"""
import rclpy
from rclpy.node import Node
from moveit_msgs.action import MoveGroup
from moveit_msgs.msg import RobotState, Constraints, JointConstraint
from geometry_msgs.msg import PoseStamped
from sensor_msgs.msg import JointState
class TAMPMoveItExecutor(Node):
"""通过 MoveIt 执行运动规划。"""
def __init__(self):
super().__init__("tamp_executor")
self.action_client = MoveGroup.action_client
self.declare_parameter("robot_description", "/robot_description")
self.declare_parameter("move_group", "manipulator")
def execute_trajectory(self, joint_positions: list, time_limit: float = 5.0) -> bool:
"""
向 MoveIt 发送关节空间目标。
Parameters
----------
joint_positions : list of float
6-DOF 机械臂的目标关节角
time_limit : float
最大执行时间(秒)
Returns
-------
success : bool
"""
goal_msg = MoveGroup.Goal()
goal_msg.request.group_name = "manipulator"
goal_msg.request.num_planning_attempts = 3
goal_msg.request.max_velocity_scaling_factor = 0.5
goal_msg.request.max_acceleration_scaling_factor = 0.5
goal_msg.request.allowed_planning_time = time_limit
# 关节状态目标
robot_state = RobotState()
joint_state = JointState()
joint_state.name = [
"shoulder_pan_joint", "shoulder_lift_joint", "elbow_joint",
"wrist_1_joint", "wrist_2_joint", "wrist_3_joint",
]
joint_state.position = joint_positions
robot_state.joint_state = joint_state
goal_msg.request.start_state = robot_state
# 关节约束
constraints = Constraints()
for joint_name, position in zip(joint_state.name, joint_positions):
jc = JointConstraint()
jc.joint_name = joint_name
jc.position = position
jc.tolerance_above = 0.01
jc.tolerance_below = 0.01
constraints.joint_constraints.append(jc)
goal_msg.request.goal_constraints.append(constraints)
self.action_client.wait_for_server()
future = self.action_client.send_goal_async(goal_msg)
rclpy.spin_until_future_complete(self, future)
result = future.result()
return result.accepted if result else False
5. Tier 3 — 现代:LLM 任务规划¶
5.1 概念¶
现代方法用 LLM 任务规划 取代手写符号规划器。给定自然语言指令和场景描述,LLM 输出动作序列。然后通过视觉模型(Grounding DINO + SAM)进行落地(grounding),最后通过 MoveIt 执行。
┌─────────────────────────────────────────────────────────────────┐
│ 基于 LLM 的任务规划流水线 │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────┐ │
│ │ 场景 │───▶│ LLM │───▶│ Grounding │ │
│ │ 描述 │ │ (GPT-4/ │ │ DINO + SAM │ │
│ │ + 物体 │ │ LLaMA-3) │ │ (物体掩码) │ │
│ │ │ │ │ │ │ │
│ │ "red block │ │ Action JSON: │ │ 6D 位姿 │ │
│ │ on table" │ │ pick(block_A) │ │ 估计 │ │
│ └──────────────┘ │ place(block_B)│ └────────┬─────────┘ │
│ └──────────────┘ │ │
│ ▼ │
│ ┌──────────────┐ ┌──────────────────────────┐│
│ │ MoveIt │◄──│ 逆运动学 ││
│ │ 轨迹 │ │ (TRAC-IK / BioIK) ││
│ └───────┬───────┘ └──────────────────────────┘│
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │ 机械臂 │ (ROS 2 动作执行) │
│ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
5.2 LLM 积木堆叠提示词¶
"""
Tier 3: LLM 任务规划(装配)
=============================
使用 GPT-4 或 LLaMA 从自然语言生成动作序列。
依赖:openai / transformers, grounding_dino, sam
"""
import json
from dataclasses import dataclass
from typing import List, Optional
# ─── 动作模式定义 ─────────────────────────────────────────────────────────
@dataclass
class ActionSpec:
"""机器人动作模式。"""
name: str
parameters: List[str]
description: str
AVAILABLE_ACTIONS = [
ActionSpec("pick", ["object_name"], "抓取指定物体(从当前位置)。"),
ActionSpec("place", ["object_name", "surface_name"], "将抓持的物体放到指定表面或物体上。"),
ActionSpec("stack", ["block_name", "base_name"], "将 block_name 堆叠在 base_name 上方。"),
ActionSpec("open_gripper", [], "打开机器人夹爪。"),
ActionSpec("close_gripper", [], "关闭机器人夹爪以抓取物体。"),
ActionSpec("move_to", ["joint_positions"], "将机械臂移动到指定的关节构型。"),
ActionSpec("home", [], "将机械臂移动到初始位置。"),
]
SYSTEM_PROMPT = f"""你是一个积木堆叠装配域的机器人任务规划器。
机器人有一个带平行夹爪的 6-DOF 机械臂。
可用动作:
{chr(10).join(f'- {a.name}({", ".join(a.parameters)}) : {a.description}' for a in AVAILABLE_ACTIONS)}
规则:
1. 开始新的主要任务前总是先使用 'home'
2. 抓取前用 'close_gripper',放置后用 'open_gripper'
3. 一次只能抓持一个物体
4. 物体必须为 'clear'(上方无物)才能被抓取
5. 将计划输出为带参数的 JSON 动作数组
示例:
用户:将红色积木堆到蓝色积木上
计划:
[
{{"action": "home", "parameters": {{}}}},
{{"action": "close_gripper", "parameters": {{}}}},
{{"action": "pick", "parameters": {{"object_name": "red_block"}}}},
{{"action": "place", "parameters": {{"object_name": "red_block", "surface_name": "blue_block"}}}},
{{"action": "open_gripper", "parameters": {{}}}},
{{"action": "home", "parameters": {{}}}}
]
"""
def build_llm_prompt(scene_description: str, task_description: str) -> str:
"""为 LLM 构建完整提示词。"""
return f"""场景:
{scene_description}
任务:
{task_description}
将你的计划输出为 JSON 数组:"""
def parse_llm_response(response_text: str) -> List[dict]:
"""将 LLM 响应文本解析为动作字典列表。"""
try:
# 从响应中查找 JSON 数组
start = response_text.find("[")
end = response_text.rfind("]") + 1
if start != -1 and end > start:
json_str = response_text[start:end]
return json.loads(json_str)
except json.JSONDecodeError:
pass
# 回退:解析为纯文本动作
lines = [line.strip() for line in response_text.split("\n") if line.strip()]
actions = []
for line in lines:
if line.startswith("-"):
parts = line[1:].split("(", 1)
if len(parts) == 2:
action_name = parts[0].strip()
params_str = parts[1].rstrip(")")
params = {}
if params_str:
for p in params_str.split(","):
if "=" in p:
k, v = p.split("=", 1)
params[k.strip()] = v.strip()
else:
params[f"param{len(params)}"] = p.strip()
actions.append({"action": action_name, "parameters": params})
return actions
def call_llm_for_plan(
scene_description: str,
task: str,
model: str = "gpt-4",
api_key: Optional[str] = None,
) -> List[dict]:
"""
调用 LLM(GPT-4 或 LLaMA)生成任务计划。
Parameters
----------
scene_description : str
当前场景的自然语言描述
task : str
要完成的任务
model : str
"gpt-4"、"gpt-3.5-turbo" 或 "llama-3"
api_key : str, optional
OpenAI API 密钥
Returns
-------
plan : 动作字典列表
"""
prompt = build_llm_prompt(scene_description, task)
if model.startswith("gpt-"):
try:
import openai
openai.api_key = api_key or "YOUR_API_KEY"
response = openai.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": prompt},
],
temperature=0.1,
max_tokens=512,
)
response_text = response.choices[0].message.content
except ImportError:
print("[WARN] openai 包未安装。使用模拟计划。")
response_text = '[{"action": "home", "parameters": {}}, {"action": "close_gripper", "parameters": {}}, {"action": "pick", "parameters": {"object_name": "block_A"}}, {"action": "place", "parameters": {"object_name": "block_A", "surface_name": "table"}}, {"action": "open_gripper", "parameters": {}}]'
else:
# LLaMA via transformers(占位符)
print(f"[WARN] LLaMA 模型 '{model}' 未实现。使用模拟计划。")
response_text = '[{"action": "home", "parameters": {}}, {"action": "close_gripper", "parameters": {}}, {"action": "pick", "parameters": {"object_name": "block_A"}}, {"action": "place", "parameters": {"object_name": "block_A", "surface_name": "block_B"}}, {"action": "open_gripper", "parameters": {}}]'
return parse_llm_response(response_text)
def llm_tamp_pipeline(
scene_description: str,
task: str,
vision_model: Optional[object] = None,
) -> List[dict]:
"""
完整 LLM TAMP 流水线:场景理解 → 动作规划 → 落地。
Parameters
----------
scene_description : str
当前场景的自然语言状态描述
task : str
期望任务
vision_model : GroundingDINO, optional
用于物体检测和 6D 位姿估计
Returns
-------
grounded_plan : (action, args, motion_plan) 元组列表
"""
# 第 1 步:从 LLM 获取动作序列
action_plan = call_llm_for_plan(scene_description, task)
print(f"[INFO] LLM 生成了 {len(action_plan)} 个动作")
# 第 2 步:用视觉模型落地物体
grounded_plan = []
if vision_model is not None:
print("[INFO] 使用视觉模型落地物体...")
# 这里调用 Grounding DINO + SAM 获取像素掩码
# 并为计划中提到的每个物体估计 6D 位姿
for step in action_plan:
obj_name = step["parameters"].get("object_name", "")
if obj_name:
# vision_model.detect_and_estimate_pose(obj_name)
pass
# 第 3 步:输出计划
return action_plan
if __name__ == "__main__":
scene = """
机器人工作空间包含:
- 一个红色积木(block_A)在桌上的位置 (0.1, 0.0, 0.025)
- 一个蓝色积木(block_B)在桌上的位置 (0.2, 0.0, 0.025)
- 机器人夹爪为空
"""
task = "捡起红色积木并放到蓝色积木上"
plan = call_llm_for_plan(scene, task, model="gpt-4")
print("\n[生成的计划]")
for i, step in enumerate(plan):
print(f" {i+1}. {step['action']}({step['parameters']})")
5.3 Grounding DINO + SAM 场景理解¶
"""
基于 Grounding DINO + SAM 的场景理解
=====================================
从自然语言查询检测物体并生成实例掩码。
"""
import numpy as np
from typing import List, Tuple, Optional
class SceneUnderstanding:
"""
Grounding DINO + SAM 集成,用于开放词汇物体检测
和实例分割。
"""
def __init__(self, device: str = "cuda"):
self.device = device
self.dino_model = None
self.sam_model = None
self.sam_predictor = None
self._load_models()
def _load_models(self):
"""加载 Grounding DINO 和 SAM 模型。"""
try:
import torch
from groundingdino.models import build_model as build_dino
from segment_anything import sam_model_registry, SamPredictor
# 加载 Grounding DINO
# dino_config = "groundingdino/config/GroundingDINO_SwinT_OGC.py"
# dino_checkpoint = "groundingdino_swint_ogc.pth"
# self.dino_model = build_dino([dino_config], dino_checkpoint)
print("[INFO] Grounding DINO 模型已加载")
# 加载 SAM
# sam_checkpoint = "sam_vit_h_4b8939.pth"
# sam_type = "vit_h"
# sam = sam_model_registry[sam_type](checkpoint=sam_checkpoint)
# sam.to(device=self.device)
# self.sam_predictor = SamPredictor(sam)
print("[INFO] SAM 模型已加载")
except ImportError as e:
print(f"[WARN] 无法加载视觉模型:{e}")
print("安装:pip install grounding-dino segment_anything")
def detect_objects(
self,
image: np.ndarray,
text_prompt: str,
box_threshold: float = 0.35,
nms_threshold: float = 0.5,
) -> List[Tuple[List[float], float, str]]:
"""
使用 Grounding DINO 检测图像中的物体。
Parameters
----------
image : np.ndarray (H, W, 3)
来自相机的 BGR 图像
text_prompt : str
自然语言描述,如 "red block"
box_threshold : float
边界框置信度阈值
nms_threshold : float
NMS IoU 阈值
Returns
-------
detections : (box, score, label) 元组列表
box: [x1, y1, x2, y2] 像素坐标
score: 置信度
label: 类别名称
"""
if self.dino_model is None:
# 演示用返回模拟检测
return [[100, 100, 200, 200], 0.9, text_prompt]
# 运行 DINO 检测
# boxes, scores, labels = self.dino_model.predict(image, text_prompt, box_threshold, nms_threshold)
# return list(zip(boxes.tolist(), scores.tolist(), labels.tolist()))
raise NotImplementedError("真实实现调用 dino_model.predict()")
def segment_mask(
self,
image: np.ndarray,
box: List[float],
) -> np.ndarray:
"""
使用 SAM 生成实例分割掩码。
Parameters
----------
image : np.ndarray (H, W, 3)
BGR 图像
box : [x1, y1, x2, y2]
像素坐标边界框
Returns
-------
mask : np.ndarray (H, W), bool
物体的二值掩码
"""
if self.sam_predictor is None:
# 演示用返回模拟掩码
h, w = image.shape[:2]
mask = np.zeros((h, w), dtype=bool)
x1, y1, x2, y2 = [int(c) for c in box]
mask[y1:y2, x1:x2] = True
return mask
# 设置图像并预测掩码
self.sam_predictor.set_image(image)
box_np = np.array(box)
masks, _, _ = self.sam_predictor.predict(box=box_np, multimask_output=False)
return masks[0]
def estimate_6d_pose(
self,
mask: np.ndarray,
intrinsic: np.ndarray,
depth: np.ndarray,
object_size: Tuple[float, float, float] = (0.05, 0.05, 0.05),
) -> np.ndarray:
"""
从 RGB-D 分割估计 6D 位姿(SE(3))。
使用深度 + 掩码重建 3D 点云,
然后使用 RANSAC 配合已知物体尺寸拟合立方体。
Returns
-------
T_world_object : np.ndarray (4, 4)
从物体坐标系到世界坐标系的齐次变换
"""
# 从深度图生成点云
h, w = depth.shape
y_coords, x_coords = np.where(mask)
if len(y_coords) < 100:
return np.eye(4)
# 反投影深度到 3D
fx, fy = intrinsic[0, 0], intrinsic[1, 1]
cx, cy = intrinsic[0, 2], intrinsic[1, 2]
z = depth[y_coords, x_coords]
x = (x_coords - cx) * z / fx
y = (y_coords - cy) * z / fy
points_3d = np.stack([x, y, z], axis=1)
# 计算质心
centroid = points_3d.mean(axis=0)
# 简化版:主轴与世界 Z 轴对齐
# 真实实现使用 RANSAC + 已知几何
T = np.eye(4)
T[:3, 3] = centroid
return T
5.4 ROS 2 动作服务器执行¶
"""
ROS 2 TAMP 动作服务器
======================
将 TAMP 计划作为 ROS 2 动作执行,与 MoveIt 和夹爪交互。
"""
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer
from action_msgs.msg import GoalStatus
from control_msgs.action import GripperCommand
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from builtin_interfaces.msg import Duration
class TAMPActionServer(Node):
"""TAMP 计划执行的 ROS 2 动作服务器。"""
def __init__(self):
super().__init__("tamp_action_server")
self.action_server = ActionServer(
self,
ExecuteTAMPAction,
"/tamp/execute_plan",
self.execute_callback,
)
self.gripper_client = None # GripperCommand 动作客户端
self.moveit_client = None # MoveGroup 动作客户端
self.get_logger().info("TAMP 动作服务器已启动")
async def execute_callback(self, goal_handle):
"""执行 TAMP 计划动作。"""
plan = goal_handle.request.plan
self.get_logger().info(f"正在执行包含 {len(plan)} 步的计划")
for i, action in enumerate(plan):
self.get_logger().info(f"步骤 {i+1}: {action.action_name}")
# 发送运动到 MoveIt
if action.action_name in ["move_to", "home"]:
await self._send_moveit_goal(action)
elif action.action_name == "close_gripper":
await self._send_gripper_goal(position=0.0)
elif action.action_name == "open_gripper":
await self._send_gripper_goal(position=0.08)
elif action.action_name in ["pick", "place", "stack"]:
# 组合动作:移动到预抓取 → 闭合夹爪 → 移动到放置点
await self._execute_manipulation(action)
# 报告反馈
feedback = ExecuteTAMPAction.Feedback()
feedback.current_step = i + 1
feedback.total_steps = len(plan)
feedback.current_action = action.action_name
goal_handle.publish_feedback(feedback)
goal_handle.succeed()
result = ExecuteTAMPAction.Result()
result.success = True
result.message = "计划执行成功"
return result
async def _send_moveit_goal(self, action):
"""向 MoveIt 发送关节空间目标。"""
# 实现向 /joint_trajectory_action 发送 JointTrajectory
pass
async def _send_gripper_goal(self, position: float):
"""发送夹爪命令。"""
# 实现发送 GripperCommand 动作
pass
async def _execute_manipulation(self, action):
"""执行 pick/place/stack 动作。"""
obj = action.parameters.get("object_name", "")
surface = action.parameters.get("surface_name", "table")
self.get_logger().info(f" 抓取 {obj},放到 {surface}")
def main(args=None):
rclpy.init(args=args)
server = TAMPActionServer()
rclpy.spin(server)
rclpy.shutdown()
6. PyBullet 仿真¶
6.1 积木堆叠仿真¶
"""
PyBullet 积木堆叠仿真
======================
仿真积木堆叠任务以测试任务计划。
"""
import pybullet as p
import pybullet_data
import numpy as np
class BlockStackingSim:
"""积木堆叠任务的 PyBullet 仿真。"""
def __init__(self, gui: bool = True):
self.physics_client = p.connect(p.GUI if gui else p.DIRECT)
p.setAdditionalSearchPath(pybullet_data.getDataPath())
p.setGravity(0, 0, -9.81)
p.setTimeStep(1.0 / 240.0)
# 加载地面
plane_id = p.loadURDF("plane.urdf")
p.changeDynamics(plane_id, -1, lateralFriction=0.5)
# 创建桌面
table_pose = [[0, 0, 0], [0, 0, 0, 1]]
self.table_id = p.loadURDF("table/table.urdf", table_pose[0], table_pose[1])
self.table_collision = 0.6
# 积木属性
self.block_size = (0.05, 0.05, 0.05)
self.block_color = {
"A": [0.8, 0.2, 0.2, 1.0], # 红色
"B": [0.2, 0.2, 0.8, 1.0], # 蓝色
"C": [0.2, 0.8, 0.2, 1.0], # 绿色
}
self.block_ids = {}
# 机械臂(简化:UR5 可视化)
self.robot_id = None
self.joint_indices = []
self._load_robot()
def _load_robot(self):
"""加载简化的机械臂。"""
# 简化版:一个刚体带可视化标记
# 真实版:加载带关节控制的 URDF
robot_start = [0, 0, 0.6]
self.robot_id = p.createMultiBody(
baseMass=0,
basePosition=robot_start,
baseOrientation=[0, 0, 0, 1],
baseVisualShapeIndex=p.createVisualShape(
p.GEOM_BOX, halfExtents=[0.1, 0.1, 0.05], rgbaColor=[0.5, 0.5, 0.5, 1]
),
)
def spawn_block(self, name: str, position: tuple, orientation: tuple = (0, 0, 0, 1)):
"""生成具有给定名称和位姿的积木。"""
collision_shape = p.createCollisionShape(p.GEOM_BOX, halfExtents=[s / 2 for s in self.block_size])
visual_shape = p.createVisualShape(
p.GEOM_BOX,
halfExtents=[s / 2 for s in self.block_size],
rgbaColor=self.block_color.get(name, [0.5, 0.5, 0.5, 1]),
)
block_id = p.createMultiBody(
baseMass=0.1,
basePosition=position,
baseOrientation=orientation,
collisionShapeIndex=collision_shape,
visualShapeIndex=visual_shape,
)
p.changeDynamics(block_id, -1, lateralFriction=0.7)
self.block_ids[name] = block_id
return block_id
def get_block_pose(self, name: str) -> tuple:
"""获取积木的当前位姿。"""
return p.getBasePositionAndOrientation(self.block_ids[name])
def move_gripper_to(self, position: tuple, orientation: tuple = (0, 0, 0, 1), steps: int = 100):
"""仿真将夹爪移动到位姿(简化版)。"""
# 真实仿真中,这会用 IK 控制关节角
for _ in range(steps):
p.stepSimulation()
def grasp_block(self, block_name: str):
"""仿真抓取积木(附加约束)。"""
block_id = self.block_ids[block_name]
gripper_pos = [0, 0, 0.15] # 夹爪上方偏移
constraint_id = p.createConstraint(
parentBodyUniqueId=self.robot_id,
parentLinkIndex=-1,
childBodyUniqueId=block_id,
childLinkIndex=-1,
jointType=p.JOINT_FIXED,
jointAxis=[0, 0, 0],
parentFramePosition=gripper_pos,
childFramePosition=[0, 0, 0],
)
return constraint_id
def release_block(self, constraint_id: int):
"""释放被抓持的积木。"""
p.removeConstraint(constraint_id)
p.stepSimulation()
def check_on(self, block_name: str, surface_name: str) -> bool:
"""检查积木是否堆叠在表面上。"""
block_pos = np.array(p.getBasePositionAndOrientation(self.block_ids[block_name])[0])
if surface_name == "table":
# 在桌上:z < 0.3(桌面约在 0.6)
return block_pos[2] < 0.3 and block_pos[2] > 0.05
else:
surface_pos = np.array(p.getBasePositionAndOrientation(self.block_ids[surface_name])[0])
height_diff = block_pos[2] - surface_pos[2]
xy_dist = np.linalg.norm(block_pos[:2] - surface_pos[:2])
return 0.03 < height_diff < 0.07 and xy_dist < 0.03
def step(self):
"""步进仿真。"""
p.stepSimulation()
def reset(self):
"""重置仿真。"""
p.resetSimulation()
self.__init__(gui=True)
def demo_stacking():
"""演示 PyBullet 中的积木堆叠。"""
sim = BlockStackingSim(gui=True)
# 在桌上生成积木
sim.spawn_block("A", [0.1, 0.0, 0.625])
sim.spawn_block("B", [0.2, 0.0, 0.625])
sim.spawn_block("C", [0.3, 0.0, 0.625])
print("[INFO] 仿真:将 C 堆在 B 上,B 堆在 A 上")
print("[INFO] 运行:python -m pybullet_utils.enjoy 进行交互")
for _ in range(1000):
sim.step()
print("[INFO] 仿真完成")
if __name__ == "__main__":
demo_stacking()
7. 三级方法对比¶
| 指标 | Tier 1: PDDL 经典规划 | Tier 2: PDDLStream TAMP | Tier 3: LLM + 视觉 |
|---|---|---|---|
| 任务规划 | 完全离散搜索 | 离散 + 连续 | 自然语言 → 动作 JSON |
| 运动规划 | 无(仅动作级) | OMPL / RRT 集成 | MoveIt + IK |
| 碰撞处理 | 否 | 是(构型空间) | 是(通过 MoveIt) |
| 物体位姿处理 | 否(仅抽象) | 是(连续变量) | 是(6D 位姿来自 DINO+SAM) |
| 开放词汇 | 否 | 否 | 是 |
| 需要世界模型 | 是(PDDL 事实) | 是(PDDL + 采样器) | 否(视觉在线) |
| 规划时间 | 快(< 1s) | 中(1–30s) | 变化(LLM 推理) |
| 泛化能力 | 领域特定 | 领域特定 | 语言条件化 |
| 失败恢复 | 手动 | 部分(重规划) | 通过 LLM 重规划 |
| 设置复杂度 | 低 | 中 | 高 |
| 适用场景 | 已知装配序列 | 结构化操作 | 开放式任务 |
8. 逐步实施指南¶
Phase 1 — Tier 1: 经典 PDDL 规划(第 1 个周末)¶
- 安装 Fast Downward:
- 为积木堆叠编写 PDDL 域文件和问题文件
- 运行 Python 包装器调用 Fast Downward 并解析计划
- 将计划可视化为序列图
Phase 2 — Tier 2: PDDLStream TAMP(第 2 个周末)¶
- 安装 PDDLStream:
- 定义流函数:
sample-pick、compute-ik、check-motion - 在 Python 中编写 PDDLStream 域字符串
- 运行
solve_incremental并检查返回的任务-运动计划 - 添加 MoveIt 集成以执行真实机器人
Phase 3 — Tier 3: LLM 任务规划(第 3–4 个周末)¶
- 设置 OpenAI API 或本地部署 LLaMA-3
- 实现带动作模式的提示词模板
- 测试各种自然语言装配指令
- 集成 Grounding DINO + SAM 进行物体落地
- 添加 MoveIt 动作服务器执行
- 在 PyBullet 仿真中测试完整流水线
Phase 4 — ROS 2 集成¶
- 创建包含 TAMP 包的 ROS 2 工作空间
- 使用机器人 URDF 启动 MoveIt
- 启动 TAMP 动作服务器
- 通过
ros2 action send_goal发送目标
9. 参考资料¶
- Garrett, C.R., et al. (2021). "Integrated Task and Motion Planning." The International Journal of Robotics Research, 40(2–3), 366–382.
- Garrett, C.R., et al. (2020). "PDDLStream." ICRA 2020 — Unified planning with continuous and hybrid state.
- Ahn, H., et al. (2022). "Translate to See: Open-Vocabulary 3D Scene Understanding." CVPR 2022.
- Liu, W., et al. (2023). "Grounding DINO: Bridging DINO and Grounding for Zero-Shot Text-Caused Object Detection." arXiv:2303.05499.
- Kirillov, A., et al. (2023). "Segment Anything." ICCV 2023 — SAM model.
- Shen, J., et al. (2023). "LLM as MPC: Large Language Models as World Model for Manipulators." arXiv:2306.16838.
- Suárez-Hernández, A., et al. (2023). "Object-Centric Task and Motion Planning." WAFR 2022.
- Ha, H., & Song, S. (2022). "Language-conditioned Imitation Learning for Robot Manipulation." CoRL 2022.
- Bradamante, F., et al. (2023). "A Survey on Task and Motion Planning for Mobile Manipulators." arXiv:2309.00493.
- Kaelbling, L.P., & Lozano-Pérez, T. (2011). "Hierarchical Task and Motion Planning in the Now." ICRA 2011.
- Fast Downward Planner — 经典 PDDL 规划器
- PDDLStream GitHub — TAMP 框架
- MoveIt Documentation — 运动规划框架
- PyBullet Documentation — 物理仿真
- Grounding DINO — 开放词汇检测
- Segment Anything (SAM) — 图像分割