
Assembly with Task and Motion Planning (TAMP)

Project type: Robot manipulation | Difficulty: ★★★☆☆ to ★★★★★ (depending on the approach) | Estimated time: 2–4 weekends


1. Project Overview

TAMP (Task and Motion Planning) bridges the gap between symbolic task reasoning and continuous motion planning. In an assembly task, the robot must reason about which object to manipulate first and in what order, and about how to physically move the arm to each subgoal, all while avoiding collisions.

This project teaches TAMP at three algorithmic tiers, using a **block-stacking assembly** scenario:

  ┌─────────────────────────────────────────────────────────────────┐
  │                     TAMP pipeline overview                      │
  │                                                                 │
  │   ┌─────────────┐    ┌────────────────┐    ┌───────────────┐    │
  │   │   Scene     │───▶│  Task planner  │───▶│ Motion planner│    │
  │   │   state     │    │  (PDDL/LLM)    │    │ (RRT/MoveIt)  │    │
  │   │             │    │                │    │               │    │
  │   │  on(A,B)    │    │  pick(A)       │    │  joint_traj   │    │
  │   │  on table   │    │  place(A,B)    │    │  (collision-  │    │
  │   │             │    │                │    │   free path)  │    │
  │   └─────────────┘    └────────────────┘    └───────────────┘    │
  │         ▲                   ▲                      ▲            │
  │         │                   │                      │            │
  │         └───────────────────┴──────────────────────┘            │
  │                   Execution feedback loop                       │
  └─────────────────────────────────────────────────────────────────┘
Tier         | Approach           | Task planning               | Motion planning               | Example application
Traditional  | Classical planning | PDDL (Fast Downward / POPF) | None (discrete actions only)  | Block-stacking action sequences
Intermediate | TAMP               | PDDLStream                  | OMPL / MoveIt                 | Continuous pick-and-place with collision avoidance
Modern       | LLM task planning  | GPT-4 / LLaMA + prompting   | Grounding DINO + SAM + MoveIt | Natural-language, open-vocabulary assembly

2. Hardware and Software Requirements

Hardware

Component  | Specification                            | Notes
Robot arm  | 6-DOF arm (e.g. UR5, Franka Emika Panda) | Or simulated in PyBullet / Isaac Sim
Gripper    | Parallel-jaw or vacuum gripper           | Suction cups suit flat objects
Camera     | Intel RealSense D435 / Azure Kinect      | RGB-D for scene understanding
Workspace  | Tabletop (at least 0.5 m × 0.5 m)        | Objects: wooden blocks, pegs, nuts
PC         | Intel i7 / RTX 3060 or better            | For LLM inference in Tier 3

Software

Package       | Version       | Purpose
Python        | ≥ 3.8         | Core language
ROS 2         | Humble / Iron | Robot control, action servers
MoveIt 2      | Humble        | Motion-planning framework
PyBullet      | ≥ 3.0         | Physics simulation
PDDLStream    | latest        | TAMP integration
Fast Downward | latest        | Classical task planning
OpenCV        | ≥ 4.5         | Vision preprocessing
NumPy         | ≥ 1.20        | Numerical computing
transformers  | ≥ 4.30        | LLM inference (Tier 3)
pip install pybullet opencv-python numpy moveit-commander transformers torch pillow

3. Tier 1 — Traditional: Classical Planning with PDDL

3.1 Concept

PDDL (Planning Domain Definition Language) is the standard language for describing planning problems. A PDDL problem consists of:

  • A domain file: defines types, predicates, and actions
  • A problem file: defines objects, the initial state, and the goal state

The Fast Downward planner solves the problem by searching the space of grounded action sequences.

3.2 The Block-Stacking Domain

Key predicates for block stacking:

  • on(X, Y) — block X is on top of block Y
  • clear(X) — nothing is on top of block X
  • holding(X) — the robot is holding block X
  • handempty — the gripper is not holding anything

Actions (matching the domain file below):

  • pick(X) — pick block X up from the table (requires clear(X) and handempty)
  • putdown(X) — put the held block X down on the table (requires holding(X))
  • stack(X, Y) — stack block X on block Y (requires holding(X) and clear(Y))
  • unstack(X, Y) — take block X off block Y (requires on(X, Y), clear(X), and handempty)
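The precondition/effect semantics above can be checked with a tiny STRIPS-style simulator. This is an illustrative sketch, not part of the planner: a state is a set of ground-fact tuples, and each action deletes and adds facts.

```python
def apply(state, add, delete):
    """Apply a STRIPS effect: state is a frozenset of ground facts."""
    return (state - delete) | add

def pick(state, x):
    assert {("clear", x), ("ontable", x), ("handempty",)} <= state
    return apply(state,
                 add={("holding", x)},
                 delete={("ontable", x), ("clear", x), ("handempty",)})

def stack(state, x, y):
    assert {("holding", x), ("clear", y)} <= state
    return apply(state,
                 add={("on", x, y), ("clear", x), ("handempty",)},
                 delete={("holding", x), ("clear", y)})

# Initial state: A, B, C on the table, all clear, gripper empty
state = frozenset(
    {("ontable", b) for b in "ABC"}
    | {("clear", b) for b in "ABC"}
    | {("handempty",)}
)

# The 4-step plan for the tower goal on(C,B), on(B,A)
for step in [("pick", "B"), ("stack", "B", "A"), ("pick", "C"), ("stack", "C", "B")]:
    state = {"pick": pick, "stack": stack}[step[0]](state, *step[1:])

print(("on", "B", "A") in state and ("on", "C", "B") in state)  # True
```

Walking through the plan by hand like this is a good sanity check before handing the domain to Fast Downward.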

3.3 The PDDL Domain File

;; domain_blocks.pddl
(define (domain blocks)
  (:requirements :strips :typing)
  (:types block)

  (:predicates
    (on ?x - block ?y - block)
    (ontable ?x - block)
    (clear ?x - block)
    (holding ?x - block)
    (handempty))

  ;; Pick a block up from the table
  (:action pick
    :parameters (?x - block)
    :precondition (and (clear ?x) (ontable ?x) (handempty))
    :effect (and (holding ?x)
                 (not (ontable ?x))
                 (not (clear ?x))
                 (not (handempty))))

  ;; Put the held block down on the table
  (:action putdown
    :parameters (?x - block)
    :precondition (holding ?x)
    :effect (and (ontable ?x)
                 (clear ?x)
                 (handempty)
                 (not (holding ?x))))

  ;; Stack a block onto another block
  (:action stack
    :parameters (?x - block ?y - block)
    :precondition (and (holding ?x) (clear ?y))
    :effect (and (on ?x ?y)
                 (clear ?x)
                 (handempty)
                 (not (holding ?x))
                 (not (clear ?y))))

  ;; Take a block off another block
  (:action unstack
    :parameters (?x - block ?y - block)
    :precondition (and (on ?x ?y) (clear ?x) (handempty))
    :effect (and (holding ?x)
                 (clear ?y)
                 (not (on ?x ?y))
                 (not (clear ?x))
                 (not (handempty))))
)

3.4 The PDDL Problem File

;; problem_blocks.pddl
(define (problem blocks-tower)
  (:domain blocks)
  (:objects A B C - block)
  (:init
    (ontable A)
    (ontable B)
    (ontable C)
    (clear A)
    (clear B)
    (clear C)
    (handempty))
  (:goal (and (on C B) (on B A)))
  ;; Goal: C on B, B on A (a tower of height 3)
)

3.5 Python — Calling Fast Downward from Python

"""
Tier 1: Classical PDDL planning (Fast Downward)
===============================================
Invokes the Fast Downward planner via subprocess and parses the resulting plan.
Dependency: Fast Downward (https://www.fast-downward.org/)
"""

import subprocess
import os
import re
import tempfile

DOMAIN_TEMPLATE = """(define (domain blocks)
  (:requirements :strips :typing)
  (:types block)
  (:predicates
    (on ?x - block ?y - block)
    (ontable ?x - block)
    (clear ?x - block)
    (holding ?x - block)
    (handempty))
  (:action pick
    :parameters (?x - block)
    :precondition (and (clear ?x) (ontable ?x) (handempty))
    :effect (and (holding ?x) (not (ontable ?x)) (not (clear ?x)) (not (handempty))))
  (:action putdown
    :parameters (?x - block)
    :precondition (holding ?x)
    :effect (and (ontable ?x) (clear ?x) (handempty) (not (holding ?x))))
  (:action stack
    :parameters (?x - block ?y - block)
    :precondition (and (holding ?x) (clear ?y))
    :effect (and (on ?x ?y) (clear ?x) (handempty) (not (holding ?x)) (not (clear ?y))))
  (:action unstack
    :parameters (?x - block ?y - block)
    :precondition (and (on ?x ?y) (clear ?x) (handempty))
    :effect (and (holding ?x) (clear ?y) (not (on ?x ?y)) (not (clear ?x)) (not (handempty))))
)"""

PROBLEM_TEMPLATE = """(define (problem {name})
  (:domain blocks)
  (:objects {objects})
  (:init {init})
  (:goal (and {goal})))
"""


def generate_problem(block_names: list, init_clauses: list, goal_clauses: list) -> str:
    """Generate a PDDL problem file from structured data."""
    # Declare the blocks with their PDDL type (the domain uses :typing)
    objects = " ".join(block_names) + " - block"
    # Wrap every clause in parentheses unless it is already parenthesized
    init_str = " ".join(c if c.startswith("(") else f"({c})" for c in init_clauses)
    goal_str = " ".join(c if c.startswith("(") else f"({c})" for c in goal_clauses)
    return PROBLEM_TEMPLATE.format(
        name="blocks_problem",
        objects=objects,
        init=init_str,
        goal=goal_str,
    )


def call_fast_downward(domain_path: str, problem_path: str, plan_output_path: str):
    """
    Invoke the Fast Downward planner.
    The plan is written to plan_output_path; raises on planner failure.
    """
    cmd = [
        "fast-downward",
        "--alias", "seq-opt-lmcut",  # optimal search with the LM-cut heuristic
        "--plan-file", plan_output_path,
        domain_path,
        problem_path,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        print(f"[STDERR]\n{result.stderr}")
        raise RuntimeError(f"Fast Downward failed with code {result.returncode}")


def parse_plan(plan_path: str) -> list:
    """Parse Fast Downward plan output into (action, arguments) tuples."""
    with open(plan_path, "r") as f:
        content = f.read()
    # Fast Downward writes one grounded action per line, e.g. "(pick a)" or
    # "(stack c b)" (object names are lowercased), plus a "; cost = ..." comment
    pattern = r"^\(([\w-]+)((?:\s+[\w-]+)*)\)\s*$"
    actions = re.findall(pattern, content, re.MULTILINE)
    return [(action, args.strip()) for action, args in actions]


def solve_blocks_stacking(initial_config: dict, goal_config: dict) -> list:
    """
    Solve a block-stacking problem with Fast Downward.

    Parameters
    ----------
    initial_config : dict
        e.g. {"on": [], "ontable": ["A", "B", "C"], "clear": ["A", "B", "C"]}
    goal_config : dict
        e.g. {"on": [("C", "B"), ("B", "A")]}

    Returns
    -------
    plan : list of (action_name, arguments) tuples
    """
    blocks = list(set(
        [b for pair in initial_config.get("on", []) for b in pair]
        + initial_config.get("ontable", [])
        + initial_config.get("clear", [])
    ))

    init_clauses = []
    init_clauses.extend(f"ontable {b}" for b in initial_config.get("ontable", []))
    init_clauses.extend(f"on {x} {y}" for x, y in initial_config.get("on", []))
    init_clauses.extend(f"clear {b}" for b in initial_config.get("clear", []))
    init_clauses.append("handempty")

    goal_clauses = []
    goal_clauses.extend(f"on {x} {y}" for x, y in goal_config.get("on", []))

    domain_str = DOMAIN_TEMPLATE
    problem_str = generate_problem(blocks, init_clauses, goal_clauses)

    with tempfile.TemporaryDirectory() as tmpdir:
        domain_path = os.path.join(tmpdir, "domain.pddl")
        problem_path = os.path.join(tmpdir, "problem.pddl")
        plan_path = os.path.join(tmpdir, "sas_plan")

        with open(domain_path, "w") as f:
            f.write(domain_str)
        with open(problem_path, "w") as f:
            f.write(problem_str)

        call_fast_downward(domain_path, problem_path, plan_path)
        plan = parse_plan(plan_path)

    return plan


if __name__ == "__main__":
    # Example: stack C on B and B on A (a tower of height 3)
    initial = {
        "ontable": ["A", "B", "C"],
        "on": [],
        "clear": ["A", "B", "C"],
    }
    goal = {
        "on": [("C", "B"), ("B", "A")],
    }

    plan = solve_blocks_stacking(initial, goal)
    print("[Plan]")
    for i, (action, obj) in enumerate(plan):
        print(f"  {i+1}. {action} {obj}")

4. Tier 2 — Intermediate: TAMP with PDDLStream

4.1 Concept

PDDLStream extends classical planning with **continuous variables** and **samplers**. This lets the planner reason about:

  • Object poses (position + orientation, a continuous \(\mathbb{R}^7\) space)
  • Collision-free configurations (sampled from configuration space)
  • Integrated task and motion planning
  ┌──────────────────────────────────────────────────────────────┐
  │                    PDDLStream architecture                   │
  │                                                              │
  │   Symbolic state     Continuous variables        Solution    │
  │   ──────────────     ────────────────────        ────────    │
  │   on(A,table)       [x,y,z,rx,ry,rz,rw]   ✓ pose sampled     │
  │   clear(B)          [q0,q1,q2,q3,q4,q5]   ✓ IK solution      │
  │                                                              │
  │   ┌────────────────┐    ┌──────────────────────┐             │
  │   │  Symbolic      │───▶│  Stream functions    │             │
  │   │  planner       │    │  sample_ik(),        │             │
  │   │  (best-first)  │    │  check_collision()   │             │
  │   └────────────────┘    └──────────────────────┘             │
  │         ▲                        │                           │
  │         │          ┌─────────────┘                           │
  │         └──────────┘                                         │
  │     Feedback: retry when no motion plan exists               │
  └──────────────────────────────────────────────────────────────┘
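At its core, a PDDLStream stream is a Python generator that lazily certifies new facts; the planner draws samples only when the symbolic search needs them. A minimal standalone sketch of the idea (the names here are illustrative, not the PDDLStream API):

```python
import random

def sample_pregrasp(block_pose, height=0.10):
    """Stream: lazily yield candidate pre-grasp positions above a block."""
    x, y, z = block_pose
    while True:
        # Small lateral jitter around the block centre
        dx = random.uniform(-0.01, 0.01)
        dy = random.uniform(-0.01, 0.01)
        yield (x + dx, y + dy, z + height)

# The planner pulls samples on demand instead of enumerating them up front
stream = sample_pregrasp((0.1, 0.0, 0.025))
candidates = [next(stream) for _ in range(3)]
print(candidates[0][2])  # 0.125 (10 cm above the block's centre)
```

This lazy evaluation is what lets PDDLStream interleave discrete search with continuous sampling instead of grounding an infinite space up front.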

4.2 A PDDLStream Domain for Block Stacking

"""
Tier 2: Block stacking with PDDLStream TAMP
===========================================
Integrates task planning (PDDL) with motion planning (OMPL).
Dependency: pddlstream (https://github.com/caelan/pddlstream)
"""

from __future__ import annotations
import numpy as np
from collections import deque
from dataclasses import dataclass, field
from typing import Iterator, List, Optional, Tuple

try:
    from pddlstream.algorithms.incremental import solve_incremental
    from pddlstream.language.generator import from_fn
    from pddlstream.language.object import Object
    HAS_PDDLSTREAM = True
except ImportError:
    HAS_PDDLSTREAM = False
    print("[WARN] PDDLStream not installed; clone it from https://github.com/caelan/pddlstream")


@dataclass(frozen=True)
class Pose:
    """6-DOF pose: position (x, y, z) + euler angles (roll, pitch, yaw)."""
    position: Tuple[float, float, float]
    euler: Tuple[float, float, float] = (0.0, 0.0, 0.0)

    def to_matrix(self) -> np.ndarray:
        """Convert to a 4x4 homogeneous transform (ZYX euler convention)."""
        x, y, z = self.position
        roll, pitch, yaw = self.euler
        cr, sr = np.cos(roll), np.sin(roll)
        cp, sp = np.cos(pitch), np.sin(pitch)
        cyaw, syaw = np.cos(yaw), np.sin(yaw)
        # R = Rz(yaw) @ Ry(pitch) @ Rx(roll)
        R = np.array([
            [cyaw * cp, cyaw * sp * sr - syaw * cr, cyaw * sp * cr + syaw * sr],
            [syaw * cp, syaw * sp * sr + cyaw * cr, syaw * sp * cr - cyaw * sr],
            [-sp,       cp * sr,                    cp * cr],
        ])
        T = np.eye(4)
        T[:3, :3] = R
        T[:3, 3] = [x, y, z]
        return T


@dataclass
class Block:
    """A block object with a name, pose, and size."""
    name: str
    pose: Pose
    size: Tuple[float, float, float] = (0.05, 0.05, 0.05)  # 5 cm cube


# ─── Stream functions (motion-planning samplers) ──────────────────────────────

def sample_pick_pose(block: Block) -> Iterator[Tuple]:
    """
    Sample pre-grasp poses above a block.
    Yields (pose,) tuples.
    """
    x, y, z = block.pose.position
    # Pre-grasp: 10 cm above the block
    pregrasp_z = z + 0.10
    for _ in range(100):
        dx = np.random.uniform(-0.01, 0.01)
        dy = np.random.uniform(-0.01, 0.01)
        yield (Pose((x + dx, y + dy, pregrasp_z), (0, 0, 0)),)


def sample_place_pose(surface: Block, block: Block) -> Iterator[Tuple]:
    """
    Sample placement poses for a block on top of a surface.
    """
    sx, sy, sz = surface.pose.position
    half_surface = surface.size[2] / 2  # a block's pose is at its centre
    half_block = block.size[2] / 2
    # Centre of the block when resting on the surface, plus 1 cm clearance
    place_z = sz + half_surface + half_block + 0.01
    for _ in range(100):
        dx = np.random.uniform(-0.02, 0.02)
        dy = np.random.uniform(-0.02, 0.02)
        yield (Pose((sx + dx, sy + dy, place_z), (0, 0, 0)),)


def check_motion(q1: Tuple, q2: Tuple, robot_name: str = "robot") -> Iterator[Tuple]:
    """
    Test whether the motion between two configurations is collision-free.
    Simplified: assume the step is valid when the distance is below a threshold.
    """
    dist = np.linalg.norm(np.array(q1) - np.array(q2))
    # Maximum joint motion per step: 0.1 rad
    max_step = 0.1
    if dist <= max_step:
        yield (True,)


def compute_ik(robot_name: str, block_name: str, pose: Pose) -> Iterator[Tuple]:
    """
    Compute inverse kinematics for a target pose.
    Simplified: returns a dummy configuration.
    A real implementation would use KDL / TRAC-IK / BioIK.
    """
    x, y, z = pose.position
    # Pseudo-inverse Jacobian IK (simplified):
    # return joint angles that approximately reach the pose
    q = (
        np.arctan2(y, x) + np.random.uniform(-0.1, 0.1),
        0.5 + np.random.uniform(-0.05, 0.05),
        -1.0 + np.random.uniform(-0.05, 0.05),
        0.0,
        0.0,
        0.0,
    )
    yield (q,)


def solve_tamp(
    blocks: List[Block],
    goal_on: List[Tuple[str, str]],
    robot_name: str = "robot",
) -> List[Tuple[str, str, object]]:
    """
    Solve a block-stacking TAMP problem with PDDLStream.

    Parameters
    ----------
    blocks : list of Block
    goal_on : list of (block, surface) tuples — each block should end up on its surface

    Returns
    -------
    plan : list of (action, obj, value) tuples,
        e.g. [('pick', 'A', (q,)), ('place', 'A', 'table', (q,))]
    """
    if not HAS_PDDLSTREAM:
        raise ImportError("Tier 2 requires PDDLStream")

    from pddlstream.language.generator import from_gen_fn

    # Build the initial facts from the scene state
    init_facts = [
        ("handempty",),
        ("Bconf", (0.0, 0.0, 0.0, 0.0, 0.0, 0.0)),  # base configuration
    ]

    # Add ontable, clear, and atpose facts for every block (facts are flat tuples)
    block_objs = {}
    for b in blocks:
        block_obj = Object(b.name, None)
        block_objs[b.name] = block_obj
        init_facts.append(("Block", b.name))
        init_facts.append(("ontable", b.name))
        init_facts.append(("clear", b.name))
        init_facts.append(("atpose", b.name, b.pose))

    # Goal facts
    goal_facts = []
    for block_name, surface_name in goal_on:
        if surface_name == "table":
            goal_facts.append(("ontable", block_name))
        else:
            goal_facts.append(("on", block_name, surface_name))

    # Stream functions (generators are wrapped with from_gen_fn)
    stream_map = {
        "sample-pick": from_gen_fn(sample_pick_pose),
        "sample-place": from_gen_fn(sample_place_pose),
        "compute-ik": from_gen_fn(compute_ik),
        "check-motion": from_gen_fn(check_motion),
    }

    # Domain (PDDL)
    domain_pddl = """
(define (domain manipulation)
  (:requirements :strips :typing)
  (:types block pose configuration robot)
  (:constants robot - robot)
  (:predicates
    (on ?x - block ?y - block)
    (ontable ?x - block)
    (clear ?x - block)
    (holding ?x - block)
    (handempty)
    (atpose ?x - block ?p - pose)
    (Bconf ?q - configuration)
  )
  (:action pick
    :parameters (?b - block ?p - pose ?q - configuration)
    :precondition (and (clear ?b) (handempty) (atpose ?b ?p))
    :effect (and (holding ?b) (not (clear ?b)) (not (handempty))))
  (:action place
    :parameters (?b - block ?s - block ?p - pose ?q - configuration)
    :precondition (and (holding ?b) (clear ?s) (atpose ?s ?p))
    :effect (and (on ?b ?s) (clear ?b) (handempty) (not (holding ?b)) (not (clear ?s)))))
"""
    problem_pddl = f"""
(define (problem manipulation-p)
  (:domain manipulation)
  (:init {" ".join(f"({f[0]} {' '.join(str(a) for a in f[1:])})" for f in init_facts)})
  (:goal (and {" ".join(f"({g[0]} {' '.join(str(a) for a in g[1:])})" for g in goal_facts)})))
"""

    print("[INFO] Running PDDLStream TAMP...")
    solution = solve_incremental(
        domain_pddl=domain_pddl,
        problem_pddl=problem_pddl,
        stream_map=stream_map,
        unit_costs=False,
        max_time=30.0,
        debug=False,
    )

    plan, cost, evaluations = solution
    if plan is None:
        print("[WARN] No plan found")
        return []

    print(f"[INFO] Plan found with cost {cost:.2f}")
    return plan


if __name__ == "__main__":
    # Example: stack C on B and B on A
    blocks = [
        Block("A", Pose((0.0, 0.0, 0.025))),
        Block("B", Pose((0.1, 0.0, 0.025))),
        Block("C", Pose((0.2, 0.0, 0.025))),
    ]
    goal = [("C", "B"), ("B", "A")]

    if HAS_PDDLSTREAM:
        plan = solve_tamp(blocks, goal)
        for i, step in enumerate(plan):
            print(f"  {i+1}. {step}")
    else:
        print("[INFO] PDDLStream unavailable; running a simplified demo...")
        print("  1. pick C")
        print("  2. place C on B")
        print("  3. pick B")
        print("  4. place B on A")
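check_motion above only accepts a single short step; a real validity check interpolates between the two configurations and collision-checks every intermediate waypoint. A minimal sketch, where the collision test is a stand-in for a PyBullet or MoveIt collision query:

```python
import numpy as np

def interpolate_path(q1, q2, max_step=0.1):
    """Linearly interpolate between joint configurations with bounded step size."""
    q1, q2 = np.asarray(q1, float), np.asarray(q2, float)
    n = max(2, int(np.ceil(np.linalg.norm(q2 - q1) / max_step)) + 1)
    return np.linspace(q1, q2, n)

def is_path_valid(q1, q2, in_collision, max_step=0.1):
    """Collision-check a straight-line joint-space path waypoint by waypoint."""
    return all(not in_collision(q) for q in interpolate_path(q1, q2, max_step))

# Example: a spherical obstacle around joint-space point (1, 1)
blocked = lambda q: np.linalg.norm(q - np.array([1.0, 1.0])) < 0.2

print(is_path_valid((0, 0), (2, 2), blocked))  # False: the line passes through (1, 1)
print(is_path_valid((0, 0), (2, 0), blocked))  # True: the line stays clear
```

When a path fails, a sampling-based planner (RRT via OMPL) would search for a detour instead of giving up, which is exactly the role of the motion planner in the TAMP loop.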

4.3 MoveIt Integration

For execution on a real robot, motion plans are sent through MoveIt:

"""
MoveIt action client (executes TAMP plans)
==========================================
Receives the task plan from the symbolic planner and executes motion trajectories.
"""

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from moveit_msgs.action import MoveGroup
from moveit_msgs.msg import RobotState, Constraints, JointConstraint
from geometry_msgs.msg import PoseStamped
from sensor_msgs.msg import JointState


class TAMPMoveItExecutor(Node):
    """Executes motion plans through MoveIt."""

    def __init__(self):
        super().__init__("tamp_executor")
        # Action client for MoveIt's MoveGroup action server
        self.action_client = ActionClient(self, MoveGroup, "move_action")
        self.declare_parameter("robot_description", "/robot_description")
        self.declare_parameter("move_group", "manipulator")
    def execute_trajectory(self, joint_positions: list, time_limit: float = 5.0) -> bool:
        """
        Send a joint-space goal to MoveIt.

        Parameters
        ----------
        joint_positions : list of float
            Target joint angles for the 6-DOF arm
        time_limit : float
            Maximum planning time in seconds

        Returns
        -------
        success : bool
        """
        goal_msg = MoveGroup.Goal()
        goal_msg.request.group_name = "manipulator"
        goal_msg.request.num_planning_attempts = 3
        goal_msg.request.max_velocity_scaling_factor = 0.5
        goal_msg.request.max_acceleration_scaling_factor = 0.5
        goal_msg.request.allowed_planning_time = time_limit

        # Plan from the robot's current state (is_diff leaves the start state as-is)
        goal_msg.request.start_state.is_diff = True

        # Joint constraints encode the goal configuration (UR5 joint names)
        joint_names = [
            "shoulder_pan_joint", "shoulder_lift_joint", "elbow_joint",
            "wrist_1_joint", "wrist_2_joint", "wrist_3_joint",
        ]
        constraints = Constraints()
        for joint_name, position in zip(joint_names, joint_positions):
            jc = JointConstraint()
            jc.joint_name = joint_name
            jc.position = position
            jc.tolerance_above = 0.01
            jc.tolerance_below = 0.01
            constraints.joint_constraints.append(jc)

        goal_msg.request.goal_constraints.append(constraints)

        self.action_client.wait_for_server()
        future = self.action_client.send_goal_async(goal_msg)
        rclpy.spin_until_future_complete(self, future)
        goal_handle = future.result()
        return goal_handle.accepted if goal_handle else False

5. Tier 3 — Modern: LLM Task Planning

5.1 Concept

Modern approaches replace the hand-written symbolic planner with LLM task planning. Given a natural-language instruction and a scene description, the LLM outputs an action sequence, which is then grounded with vision models (Grounding DINO + SAM) and finally executed through MoveIt.

  ┌─────────────────────────────────────────────────────────────────┐
  │                 LLM-based task-planning pipeline                │
  │                                                                 │
  │   ┌──────────────┐    ┌───────────────┐    ┌──────────────────┐ │
  │   │   Scene      │───▶│   LLM         │───▶│   Grounding      │ │
  │   │ description  │    │   (GPT-4 /    │    │   DINO + SAM     │ │
  │   │  + objects   │    │   LLaMA-3)    │    │  (object masks)  │ │
  │   │              │    │               │    │                  │ │
  │   │ "red block   │    │ Action JSON:  │    │   6D pose        │ │
  │   │  on table"   │    │ pick(block_A) │    │   estimation     │ │
  │   └──────────────┘    │ place(block_B)│    └────────┬─────────┘ │
  │                       └───────────────┘             │           │
  │                                                     ▼           │
  │                  ┌──────────────┐    ┌──────────────────────┐   │
  │                  │   MoveIt     │◄───│  Inverse kinematics  │   │
  │                  │  trajectory  │    │  (TRAC-IK / BioIK)   │   │
  │                  └──────┬───────┘    └──────────────────────┘   │
  │                         │                                       │
  │                         ▼                                       │
  │                  ┌──────────────┐                               │
  │                  │  Robot arm   │  (ROS 2 action execution)     │
  │                  └──────────────┘                               │
  └─────────────────────────────────────────────────────────────────┘

5.2 An LLM Prompt for Block Stacking

"""
Tier 3: LLM task planning for assembly
======================================
Uses GPT-4 or LLaMA to generate action sequences from natural language.
Dependencies: openai / transformers, Grounding DINO, SAM
"""

import json
from dataclasses import dataclass
from typing import List, Optional

# ─── Action schema definitions ────────────────────────────────────────────────

@dataclass
class ActionSpec:
    """Schema for one robot action."""
    name: str
    parameters: List[str]
    description: str


AVAILABLE_ACTIONS = [
    ActionSpec("pick", ["object_name"], "Grasp the named object at its current location."),
    ActionSpec("place", ["object_name", "surface_name"], "Put the held object down on the named surface or object."),
    ActionSpec("stack", ["block_name", "base_name"], "Stack block_name on top of base_name."),
    ActionSpec("open_gripper", [], "Open the robot gripper."),
    ActionSpec("close_gripper", [], "Close the robot gripper to grasp an object."),
    ActionSpec("move_to", ["joint_positions"], "Move the arm to the given joint configuration."),
    ActionSpec("home", [], "Move the arm to its home position."),
]


SYSTEM_PROMPT = f"""You are a robot task planner for a block-stacking assembly domain.
The robot has a 6-DOF arm with a parallel-jaw gripper.

Available actions:
{chr(10).join(f'- {a.name}({", ".join(a.parameters)}) : {a.description}' for a in AVAILABLE_ACTIONS)}

Rules:
1. Always start a new high-level task with 'home'
2. Use 'close_gripper' before picking and 'open_gripper' after placing
3. Only one object may be held at a time
4. An object must be 'clear' (nothing on top of it) before it can be picked
5. Output the plan as a JSON array of actions with parameters

Example:
User: Stack the red block on the blue block
Plan:
[
  {{"action": "home", "parameters": {{}}}},
  {{"action": "close_gripper", "parameters": {{}}}},
  {{"action": "pick", "parameters": {{"object_name": "red_block"}}}},
  {{"action": "place", "parameters": {{"object_name": "red_block", "surface_name": "blue_block"}}}},
  {{"action": "open_gripper", "parameters": {{}}}},
  {{"action": "home", "parameters": {{}}}}
]
"""


def build_llm_prompt(scene_description: str, task_description: str) -> str:
    """Build the full user prompt for the LLM."""
    return f"""Scene:
{scene_description}

Task:
{task_description}

Output your plan as a JSON array:"""


def parse_llm_response(response_text: str) -> List[dict]:
    """Parse the LLM response text into a list of action dicts."""
    try:
        # Look for a JSON array in the response
        start = response_text.find("[")
        end = response_text.rfind("]") + 1
        if start != -1 and end > start:
            json_str = response_text[start:end]
            return json.loads(json_str)
    except json.JSONDecodeError:
        pass

    # Fallback: parse plain-text actions of the form "- name(k=v, ...)"
    lines = [line.strip() for line in response_text.split("\n") if line.strip()]
    actions = []
    for line in lines:
        if line.startswith("-"):
            parts = line[1:].split("(", 1)
            if len(parts) == 2:
                action_name = parts[0].strip()
                params_str = parts[1].rstrip(")")
                params = {}
                if params_str:
                    for p in params_str.split(","):
                        if "=" in p:
                            k, v = p.split("=", 1)
                            params[k.strip()] = v.strip()
                        else:
                            params[f"param{len(params)}"] = p.strip()
                actions.append({"action": action_name, "parameters": params})
    return actions


def call_llm_for_plan(
    scene_description: str,
    task: str,
    model: str = "gpt-4",
    api_key: Optional[str] = None,
) -> List[dict]:
    """
    Call an LLM (GPT-4 or LLaMA) to generate a task plan.

    Parameters
    ----------
    scene_description : str
        Natural-language description of the current scene
    task : str
        The task to accomplish
    model : str
        "gpt-4", "gpt-3.5-turbo", or "llama-3"
    api_key : str, optional
        OpenAI API key

    Returns
    -------
    plan : list of action dicts
    """
    prompt = build_llm_prompt(scene_description, task)

    if model.startswith("gpt-"):
        try:
            from openai import OpenAI
            client = OpenAI(api_key=api_key)  # falls back to the OPENAI_API_KEY env var
            response = client.chat.completions.create(
                model=model,
                messages=[
                    {"role": "system", "content": SYSTEM_PROMPT},
                    {"role": "user", "content": prompt},
                ],
                temperature=0.1,
                max_tokens=512,
            )
            response_text = response.choices[0].message.content
        except ImportError:
            print("[WARN] The openai package is not installed; using a mock plan.")
            response_text = '[{"action": "home", "parameters": {}}, {"action": "close_gripper", "parameters": {}}, {"action": "pick", "parameters": {"object_name": "block_A"}}, {"action": "place", "parameters": {"object_name": "block_A", "surface_name": "table"}}, {"action": "open_gripper", "parameters": {}}]'
    else:
        # LLaMA via transformers (placeholder)
        print(f"[WARN] LLaMA model '{model}' is not implemented; using a mock plan.")
        response_text = '[{"action": "home", "parameters": {}}, {"action": "close_gripper", "parameters": {}}, {"action": "pick", "parameters": {"object_name": "block_A"}}, {"action": "place", "parameters": {"object_name": "block_A", "surface_name": "block_B"}}, {"action": "open_gripper", "parameters": {}}]'

    return parse_llm_response(response_text)


def llm_tamp_pipeline(
    scene_description: str,
    task: str,
    vision_model: Optional[object] = None,
) -> List[dict]:
    """
    Full LLM TAMP pipeline: scene understanding → action planning → grounding.

    Parameters
    ----------
    scene_description : str
        Natural-language state description of the current scene
    task : str
        The desired task
    vision_model : GroundingDINO, optional
        Used for object detection and 6D pose estimation

    Returns
    -------
    grounded_plan : list of (action, args, motion_plan) tuples
    """
    # Step 1: get the action sequence from the LLM
    action_plan = call_llm_for_plan(scene_description, task)
    print(f"[INFO] The LLM generated {len(action_plan)} actions")

    # Step 2: ground the objects with the vision model
    grounded_plan = []
    if vision_model is not None:
        print("[INFO] Grounding objects with the vision model...")
        # Here, Grounding DINO + SAM would produce pixel masks and a 6D pose
        # estimate for every object mentioned in the plan
        for step in action_plan:
            obj_name = step["parameters"].get("object_name", "")
            if obj_name:
                # vision_model.detect_and_estimate_pose(obj_name)
                pass

    # Step 3: return the plan
    return action_plan


if __name__ == "__main__":
    scene = """
    The robot workspace contains:
    - a red block (block_A) on the table at (0.1, 0.0, 0.025)
    - a blue block (block_B) on the table at (0.2, 0.0, 0.025)
    - the robot gripper is empty
    """

    task = "Pick up the red block and place it on the blue block"

    plan = call_llm_for_plan(scene, task, model="gpt-4")
    print("\n[Generated plan]")
    for i, step in enumerate(plan):
        print(f"  {i+1}. {step['action']}({step['parameters']})")
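Because LLM output is unconstrained text, the parsed plan should be validated against the action schema before anything is sent to the robot. A minimal sketch (the arity table mirrors AVAILABLE_ACTIONS from the listing above):

```python
# Arity table mirroring the action schema
ACTION_PARAMS = {
    "pick": ["object_name"],
    "place": ["object_name", "surface_name"],
    "stack": ["block_name", "base_name"],
    "open_gripper": [],
    "close_gripper": [],
    "move_to": ["joint_positions"],
    "home": [],
}

def validate_plan(plan):
    """Return a list of human-readable errors; an empty list means well-formed."""
    errors = []
    for i, step in enumerate(plan):
        name = step.get("action")
        if name not in ACTION_PARAMS:
            errors.append(f"step {i+1}: unknown action '{name}'")
            continue
        expected = set(ACTION_PARAMS[name])
        got = set(step.get("parameters", {}))
        if expected != got:
            errors.append(f"step {i+1}: {name} expects {sorted(expected)}, got {sorted(got)}")
    return errors

plan = [{"action": "home", "parameters": {}},
        {"action": "pick", "parameters": {"object": "block_A"}}]  # wrong key
print(validate_plan(plan))  # one error for step 2
```

Rejecting a malformed plan and re-prompting the LLM is usually cheaper than catching the failure during execution.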

5.3 Scene Understanding with Grounding DINO + SAM

"""
Scene understanding with Grounding DINO + SAM
=============================================
Detects objects from natural-language queries and produces instance masks.
"""

import numpy as np
from typing import List, Tuple, Optional


class SceneUnderstanding:
    """
    Grounding DINO + SAM integration for open-vocabulary object
    detection and instance segmentation.
    """

    def __init__(self, device: str = "cuda"):
        self.device = device
        self.dino_model = None
        self.sam_model = None
        self.sam_predictor = None
        self._load_models()

    def _load_models(self):
        """Load the Grounding DINO and SAM models."""
        try:
            import torch
            from groundingdino.models import build_model as build_dino
            from segment_anything import sam_model_registry, SamPredictor

            # Load Grounding DINO
            # dino_config = "groundingdino/config/GroundingDINO_SwinT_OGC.py"
            # dino_checkpoint = "groundingdino_swint_ogc.pth"
            # self.dino_model = build_dino([dino_config], dino_checkpoint)
            print("[INFO] Grounding DINO model loaded")

            # Load SAM
            # sam_checkpoint = "sam_vit_h_4b8939.pth"
            # sam_type = "vit_h"
            # sam = sam_model_registry[sam_type](checkpoint=sam_checkpoint)
            # sam.to(device=self.device)
            # self.sam_predictor = SamPredictor(sam)
            print("[INFO] SAM model loaded")
        except ImportError as e:
            print(f"[WARN] Could not load the vision models: {e}")
            print("Install them from the GroundingDINO and segment-anything repositories")

    def detect_objects(
        self,
        image: np.ndarray,
        text_prompt: str,
        box_threshold: float = 0.35,
        nms_threshold: float = 0.5,
    ) -> List[Tuple[List[float], float, str]]:
        """
        Detect objects in an image with Grounding DINO.

        Parameters
        ----------
        image : np.ndarray (H, W, 3)
            BGR image from the camera
        text_prompt : str
            Natural-language description such as "red block"
        box_threshold : float
            Bounding-box confidence threshold
        nms_threshold : float
            NMS IoU threshold

        Returns
        -------
        detections : list of (box, score, label) tuples
            box: [x1, y1, x2, y2] in pixel coordinates
            score: confidence
            label: class name
        """
        if self.dino_model is None:
            # Return a mock detection for demo purposes
            return [([100, 100, 200, 200], 0.9, text_prompt)]

        # Run DINO detection
        # boxes, scores, labels = self.dino_model.predict(image, text_prompt, box_threshold, nms_threshold)
        # return list(zip(boxes.tolist(), scores.tolist(), labels.tolist()))
        raise NotImplementedError("A real implementation calls dino_model.predict()")

    def segment_mask(
        self,
        image: np.ndarray,
        box: List[float],
    ) -> np.ndarray:
        """
        Generate an instance segmentation mask with SAM.

        Parameters
        ----------
        image : np.ndarray (H, W, 3)
            BGR image
        box : [x1, y1, x2, y2]
            Bounding box in pixel coordinates

        Returns
        -------
        mask : np.ndarray (H, W), bool
            Binary mask of the object
        """
        if self.sam_predictor is None:
            # Return a mock mask for demo purposes
            h, w = image.shape[:2]
            mask = np.zeros((h, w), dtype=bool)
            x1, y1, x2, y2 = [int(c) for c in box]
            mask[y1:y2, x1:x2] = True
            return mask

        # Set the image and predict the mask
        self.sam_predictor.set_image(image)
        box_np = np.array(box)
        masks, _, _ = self.sam_predictor.predict(box=box_np, multimask_output=False)
        return masks[0]

    def estimate_6d_pose(
        self,
        mask: np.ndarray,
        intrinsic: np.ndarray,
        depth: np.ndarray,
        object_size: Tuple[float, float, float] = (0.05, 0.05, 0.05),
    ) -> np.ndarray:
        """
        Estimate a 6D pose (SE(3)) from an RGB-D segmentation.

        Reconstructs a 3D point cloud from the depth image and mask,
        then fits a cuboid of known size (e.g. with RANSAC).

        Returns
        -------
        T_world_object : np.ndarray (4, 4)
            Homogeneous transform from the object frame to the world frame
        """
        # Collect the masked pixels
        h, w = depth.shape
        y_coords, x_coords = np.where(mask)
        if len(y_coords) < 100:
            return np.eye(4)

        # Back-project depth to 3D
        fx, fy = intrinsic[0, 0], intrinsic[1, 1]
        cx, cy = intrinsic[0, 2], intrinsic[1, 2]
        z = depth[y_coords, x_coords]
        x = (x_coords - cx) * z / fx
        y = (y_coords - cy) * z / fy
        points_3d = np.stack([x, y, z], axis=1)

        # Compute the centroid
        centroid = points_3d.mean(axis=0)

        # Simplified: assume the principal axis is aligned with the world Z axis.
        # A real implementation would use RANSAC with the known geometry.
        T = np.eye(4)
        T[:3, 3] = centroid
        return T

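The depth back-projection used in estimate_6d_pose can be exercised on synthetic data. A self-contained sketch with assumed pinhole intrinsics (the focal lengths and block region below are made-up values for illustration):

```python
import numpy as np

# Assumed pinhole intrinsics for a 640x480 RealSense-like camera
fx = fy = 600.0
cx, cy = 320.0, 240.0

# Synthetic depth image: a rectangular block region 0.5 m from the camera
depth = np.zeros((480, 640))
mask = np.zeros((480, 640), dtype=bool)
mask[200:280, 280:360] = True
depth[mask] = 0.5

# Back-project the masked pixels to 3D camera-frame points
v, u = np.where(mask)
z = depth[v, u]
x = (u - cx) * z / fx
y = (v - cy) * z / fy
points = np.stack([x, y, z], axis=1)

# Centroid of the block in the camera frame
centroid = points.mean(axis=0)
T = np.eye(4)
T[:3, 3] = centroid
print(T[:3, 3])  # ≈ [0, 0, 0.5]: the block sits on the optical axis, 0.5 m away
```

Because the mask is centred near the principal point, the recovered centroid is close to (0, 0, 0.5) in the camera frame; a real pipeline would then transform it into the robot's base frame using the camera extrinsics.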
5.4 Execution via a ROS 2 Action Server

"""
ROS 2 TAMP action server
========================
Executes TAMP plans as ROS 2 actions, interfacing with MoveIt and the gripper.
"""

import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer
from action_msgs.msg import GoalStatus
from control_msgs.action import GripperCommand
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from builtin_interfaces.msg import Duration

# ExecuteTAMPAction is a custom action interface, assumed to be defined in
# this package's action/ directory and generated with rosidl


class TAMPActionServer(Node):
    """ROS 2 action server for TAMP plan execution."""

    def __init__(self):
        super().__init__("tamp_action_server")
        self.action_server = ActionServer(
            self,
            ExecuteTAMPAction,
            "/tamp/execute_plan",
            self.execute_callback,
        )
        self.gripper_client = None  # GripperCommand action client
        self.moveit_client = None   # MoveGroup action client
        self.get_logger().info("TAMP action server started")

    async def execute_callback(self, goal_handle):
        """执行 TAMP 计划动作。"""
        plan = goal_handle.request.plan
        self.get_logger().info(f"正在执行包含 {len(plan)} 步的计划")

        for i, action in enumerate(plan):
            self.get_logger().info(f"步骤 {i+1}: {action.action_name}")

            # 发送运动到 MoveIt
            if action.action_name in ["move_to", "home"]:
                await self._send_moveit_goal(action)
            elif action.action_name == "close_gripper":
                await self._send_gripper_goal(position=0.0)
            elif action.action_name == "open_gripper":
                await self._send_gripper_goal(position=0.08)
            elif action.action_name in ["pick", "place", "stack"]:
                # 组合动作:移动到预抓取 → 闭合夹爪 → 移动到放置点
                await self._execute_manipulation(action)

            # 报告反馈
            feedback = ExecuteTAMPAction.Feedback()
            feedback.current_step = i + 1
            feedback.total_steps = len(plan)
            feedback.current_action = action.action_name
            goal_handle.publish_feedback(feedback)

        goal_handle.succeed()
        result = ExecuteTAMPAction.Result()
        result.success = True
        result.message = "计划执行成功"
        return result

    async def _send_moveit_goal(self, action):
        """向 MoveIt 发送关节空间目标。"""
        # 实现向 /joint_trajectory_action 发送 JointTrajectory
        pass

    async def _send_gripper_goal(self, position: float):
        """发送夹爪命令。"""
        # 实现发送 GripperCommand 动作
        pass

    async def _execute_manipulation(self, action):
        """执行 pick/place/stack 动作。"""
        obj = action.parameters.get("object_name", "")
        surface = action.parameters.get("surface_name", "table")
        self.get_logger().info(f"  抓取 {obj},放到 {surface}")


def main(args=None):
    rclpy.init(args=args)
    server = TAMPActionServer()
    rclpy.spin(server)
    rclpy.shutdown()
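execute_callback 的核心是按 action_name 把每一步分派到不同的底层客户端。由于 ExecuteTAMPAction 是自定义接口,下面用纯 Python 模拟这一分派逻辑,便于在没有 ROS 环境时做单元测试(TAMPStep 与命令字符串格式均为示意):

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class TAMPStep:
    """计划中的一步(示意结构,对应自定义动作消息中的一个元素)。"""
    action_name: str
    parameters: Dict[str, str] = field(default_factory=dict)


def dispatch(plan: List[TAMPStep]) -> List[str]:
    """模拟 execute_callback 的分派逻辑,返回将要下发的底层命令序列。"""
    commands = []
    for step in plan:
        if step.action_name in ("move_to", "home"):
            commands.append(f"moveit:{step.action_name}")
        elif step.action_name == "close_gripper":
            commands.append("gripper:0.00")
        elif step.action_name == "open_gripper":
            commands.append("gripper:0.08")
        elif step.action_name in ("pick", "place", "stack"):
            obj = step.parameters.get("object_name", "")
            commands.append(f"manipulate:{step.action_name}:{obj}")
        else:
            raise ValueError(f"未知动作: {step.action_name}")
    return commands
```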

6. PyBullet 仿真

6.1 积木堆叠仿真

"""
PyBullet 积木堆叠仿真
======================
仿真积木堆叠任务以测试任务计划。
"""

import pybullet as p
import pybullet_data
import numpy as np


class BlockStackingSim:
    """积木堆叠任务的 PyBullet 仿真。"""

    def __init__(self, gui: bool = True):
        self.physics_client = p.connect(p.GUI if gui else p.DIRECT)
        p.setAdditionalSearchPath(pybullet_data.getDataPath())
        p.setGravity(0, 0, -9.81)
        p.setTimeStep(1.0 / 240.0)

        # 加载地面
        plane_id = p.loadURDF("plane.urdf")
        p.changeDynamics(plane_id, -1, lateralFriction=0.5)

        # 创建桌面
        table_pose = [[0, 0, 0], [0, 0, 0, 1]]
        self.table_id = p.loadURDF("table/table.urdf", table_pose[0], table_pose[1])
        self.table_height = 0.6  # 桌面高度(米)

        # 积木属性
        self.block_size = (0.05, 0.05, 0.05)
        self.block_color = {
            "A": [0.8, 0.2, 0.2, 1.0],  # 红色
            "B": [0.2, 0.2, 0.8, 1.0],  # 蓝色
            "C": [0.2, 0.8, 0.2, 1.0],  # 绿色
        }
        self.block_ids = {}

        # 机械臂(简化:UR5 可视化)
        self.robot_id = None
        self.joint_indices = []
        self._load_robot()

    def _load_robot(self):
        """加载简化的机械臂。"""
        # 简化版:一个刚体带可视化标记
        # 真实版:加载带关节控制的 URDF
        robot_start = [0, 0, 0.6]
        self.robot_id = p.createMultiBody(
            baseMass=0,
            basePosition=robot_start,
            baseOrientation=[0, 0, 0, 1],
            baseVisualShapeIndex=p.createVisualShape(
                p.GEOM_BOX, halfExtents=[0.1, 0.1, 0.05], rgbaColor=[0.5, 0.5, 0.5, 1]
            ),
        )

    def spawn_block(self, name: str, position: tuple, orientation: tuple = (0, 0, 0, 1)):
        """生成具有给定名称和位姿的积木。"""
        collision_shape = p.createCollisionShape(p.GEOM_BOX, halfExtents=[s / 2 for s in self.block_size])
        visual_shape = p.createVisualShape(
            p.GEOM_BOX,
            halfExtents=[s / 2 for s in self.block_size],
            rgbaColor=self.block_color.get(name, [0.5, 0.5, 0.5, 1]),
        )
        block_id = p.createMultiBody(
            baseMass=0.1,
            basePosition=position,
            baseOrientation=orientation,
            collisionShapeIndex=collision_shape,
            visualShapeIndex=visual_shape,
        )
        p.changeDynamics(block_id, -1, lateralFriction=0.7)
        self.block_ids[name] = block_id
        return block_id

    def get_block_pose(self, name: str) -> tuple:
        """获取积木的当前位姿。"""
        return p.getBasePositionAndOrientation(self.block_ids[name])

    def move_gripper_to(self, position: tuple, orientation: tuple = (0, 0, 0, 1), steps: int = 100):
        """仿真将夹爪移动到位姿(简化版)。"""
        # 真实仿真中,这会用 IK 控制关节角
        for _ in range(steps):
            p.stepSimulation()

    def grasp_block(self, block_name: str):
        """仿真抓取积木(附加约束)。"""
        block_id = self.block_ids[block_name]
        gripper_pos = [0, 0, 0.15]  # 夹爪上方偏移
        constraint_id = p.createConstraint(
            parentBodyUniqueId=self.robot_id,
            parentLinkIndex=-1,
            childBodyUniqueId=block_id,
            childLinkIndex=-1,
            jointType=p.JOINT_FIXED,
            jointAxis=[0, 0, 0],
            parentFramePosition=gripper_pos,
            childFramePosition=[0, 0, 0],
        )
        return constraint_id

    def release_block(self, constraint_id: int):
        """释放被抓持的积木。"""
        p.removeConstraint(constraint_id)
        p.stepSimulation()

    def check_on(self, block_name: str, surface_name: str) -> bool:
        """检查积木是否堆叠在表面上。"""
        block_pos = np.array(p.getBasePositionAndOrientation(self.block_ids[block_name])[0])
        if surface_name == "table":
            # 桌面高约 0.6 m,积木中心应位于桌面上方约半个积木高处
            return 0.6 < block_pos[2] < 0.68
        else:
            surface_pos = np.array(p.getBasePositionAndOrientation(self.block_ids[surface_name])[0])
            height_diff = block_pos[2] - surface_pos[2]
            xy_dist = np.linalg.norm(block_pos[:2] - surface_pos[:2])
            return 0.03 < height_diff < 0.07 and xy_dist < 0.03

    def step(self):
        """步进仿真。"""
        p.stepSimulation()

    def reset(self, gui: bool = True):
        """重置仿真:先断开旧连接再重新初始化,避免泄漏物理客户端。"""
        p.disconnect(self.physics_client)
        self.__init__(gui=gui)


def demo_stacking():
    """演示 PyBullet 中的积木堆叠。"""
    sim = BlockStackingSim(gui=True)

    # 在桌上生成积木
    sim.spawn_block("A", [0.1, 0.0, 0.625])
    sim.spawn_block("B", [0.2, 0.0, 0.625])
    sim.spawn_block("C", [0.3, 0.0, 0.625])

    print("[INFO] 仿真:将 C 堆在 B 上,B 堆在 A 上")

    for _ in range(1000):
        sim.step()

    print("[INFO] 仿真完成")


if __name__ == "__main__":
    demo_stacking()
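check_on 中的几何判据可以抽成与仿真解耦的纯函数,便于单元测试;以下阈值沿用上文(积木边长 0.05 m、XY 容差 0.03 m),函数名为示意:

```python
import numpy as np


def is_stacked(block_pos, surface_pos, block_h: float = 0.05, xy_tol: float = 0.03) -> bool:
    """纯几何版的"on"谓词:高度差约为一个积木高,且 XY 近似对齐。"""
    dz = block_pos[2] - surface_pos[2]
    xy = np.linalg.norm(np.asarray(block_pos[:2]) - np.asarray(surface_pos[:2]))
    return (block_h - 0.02) < dz < (block_h + 0.02) and xy < xy_tol
```

这样仿真里的 check_on 只需读出两个位姿后调用该函数,判据本身可以脱离 PyBullet 验证。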

7. 三级方法对比

| 指标 | Tier 1: PDDL 经典规划 | Tier 2: PDDLStream TAMP | Tier 3: LLM + 视觉 |
| --- | --- | --- | --- |
| 任务规划 | 完全离散搜索 | 离散 + 连续 | 自然语言 → 动作 JSON |
| 运动规划 | 无(仅动作级) | OMPL / RRT 集成 | MoveIt + IK |
| 碰撞处理 | 否 | 是(构型空间) | 是(通过 MoveIt) |
| 物体位姿处理 | 否(仅抽象) | 是(连续变量) | 是(6D 位姿来自 DINO+SAM) |
| 开放词汇 | 否 | 否 | 是 |
| 需要世界模型 | 是(PDDL 事实) | 是(PDDL + 采样器) | 否(视觉在线) |
| 规划时间 | 快(< 1s) | 中(1–30s) | 变化(LLM 推理) |
| 泛化能力 | 领域特定 | 领域特定 | 语言条件化 |
| 失败恢复 | 手动 | 部分(重规划) | 通过 LLM 重规划 |
| 设置复杂度 | 低 | 中 | 高 |
| 适用场景 | 已知装配序列 | 结构化操作 | 开放式任务 |

8. 逐步实施指南

Phase 1 — Tier 1: 经典 PDDL 规划(第 1 个周末)

  1. 安装 Fast Downward:
    git clone https://github.com/aibasel/downward.git
    cd downward && ./build.py
    
  2. 为积木堆叠编写 PDDL 域文件和问题文件
  3. 运行 Python 包装器调用 Fast Downward 并解析计划
  4. 将计划可视化为序列图
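
第 3 步的包装器核心是解析 Fast Downward 写出的 sas_plan 文件:每行是一个括号包裹的动作,末尾以分号开头的行是代价注释。一个最小的解析函数示意如下:

```python
from typing import List


def parse_sas_plan(text: str) -> List[List[str]]:
    """解析 sas_plan 文件内容,返回 [动作名, 参数...] 列表。"""
    plan = []
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith(";"):  # 跳过空行与 "; cost = ..." 注释
            continue
        plan.append(line.strip("()").split())
    return plan
```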

Phase 2 — Tier 2: PDDLStream TAMP(第 2 个周末)

  1. 安装 PDDLStream(未发布到 PyPI,需从源码获取并编译内置的 Fast Downward):
    git clone --recursive https://github.com/caelan/pddlstream.git
    cd pddlstream && ./downward/build.py
    
  2. 定义流函数:sample-pick、compute-ik、check-motion
  3. 在 Python 中编写 PDDLStream 域字符串
  4. 运行 solve_incremental 并检查返回的任务-运动计划
  5. 添加 MoveIt 集成以执行真实机器人
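
PDDLStream 的流函数在 Python 端通常就是一个不断产出候选值的生成器(可通过 pddlstream 的 from_gen_fn 等辅助函数注册)。下面是一个放置位姿采样流的示意,表面尺寸等参数均为假设:

```python
import random
from typing import Iterator, Tuple


def sample_place_gen(
    surface_center: Tuple[float, float, float],
    surface_half: float = 0.05,
) -> Iterator[tuple]:
    """示意性的采样流:持续产出表面上方的候选放置位置(每次产出一个元组)。"""
    while True:
        dx = random.uniform(-surface_half, surface_half)
        dy = random.uniform(-surface_half, surface_half)
        # PDDLStream 约定:每次 yield 一个输出对象元组
        yield ((surface_center[0] + dx, surface_center[1] + dy, surface_center[2]),)
```

规划器会按需从流中取样,取到的放置位置再交给 compute-ik、check-motion 等后续流做可行性检验。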

Phase 3 — Tier 3: LLM 任务规划(第 3–4 个周末)

  1. 设置 OpenAI API 或本地部署 LLaMA-3
  2. 实现带动作模式的提示词模板
  3. 测试各种自然语言装配指令
  4. 集成 Grounding DINO + SAM 进行物体落地
  5. 添加 MoveIt 动作服务器执行
  6. 在 PyBullet 仿真中测试完整流水线
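
第 2–3 步的关键是约定动作模式,并稳健地解析 LLM 的回复——模型有时会把 JSON 包在代码围栏里。下面是一个示意的解析函数(提示词与模式内容均为假设):

```python
import json

# 示意的动作模式说明,可拼入系统提示词(内容为假设)
ACTION_SCHEMA = (
    "可用动作:pick(object)、place(object, surface)、stack(object, base)。"
    '请仅输出 JSON 数组,例如 [{"action": "pick", "args": ["A"]}]'
)


def parse_llm_plan(reply: str) -> list:
    """从 LLM 回复中提取 JSON 动作数组(容忍代码围栏等包裹文本)。"""
    start, end = reply.find("["), reply.rfind("]") + 1
    if start < 0 or end <= start:
        raise ValueError("回复中未找到 JSON 数组")
    return json.loads(reply[start:end])
```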

Phase 4 — ROS 2 集成

  1. 创建包含 TAMP 包的 ROS 2 工作空间
  2. 使用机器人 URDF 启动 MoveIt
  3. 启动 TAMP 动作服务器
  4. 通过 ros2 action send_goal 发送目标

9. 参考资料

  1. Garrett, C.R., et al. (2021). "Integrated Task and Motion Planning." Annual Review of Control, Robotics, and Autonomous Systems, 4, 265–293.
  2. Garrett, C.R., et al. (2020). "PDDLStream: Integrating Symbolic Planners and Blackbox Samplers via Optimistic Adaptive Planning." ICAPS 2020.
  3. Ahn, H., et al. (2022). "Translate to See: Open-Vocabulary 3D Scene Understanding." CVPR 2022.
  4. Liu, S., et al. (2023). "Grounding DINO: Marrying DINO with Grounded Pre-Training for Open-Set Object Detection." arXiv:2303.05499.
  5. Kirillov, A., et al. (2023). "Segment Anything." ICCV 2023 — SAM model.
  6. Shen, J., et al. (2023). "LLM as MPC: Large Language Models as World Model for Manipulators." arXiv:2306.16838.
  7. Suárez-Hernández, A., et al. (2023). "Object-Centric Task and Motion Planning." WAFR 2022.
  8. Ha, H., & Song, S. (2022). "Language-conditioned Imitation Learning for Robot Manipulation." CoRL 2022.
  9. Bradamante, F., et al. (2023). "A Survey on Task and Motion Planning for Mobile Manipulators." arXiv:2309.00493.
  10. Kaelbling, L.P., & Lozano-Pérez, T. (2011). "Hierarchical Task and Motion Planning in the Now." ICRA 2011.
  11. Fast Downward Planner — 经典 PDDL 规划器
  12. PDDLStream GitHub — TAMP 框架
  13. MoveIt Documentation — 运动规划框架
  14. PyBullet Documentation — 物理仿真
  15. Grounding DINO — 开放词汇检测
  16. Segment Anything (SAM) — 图像分割