Object Assembly with TAMP (Task and Motion Planning)

Project Type: Manipulation | Difficulty: ★★★☆☆ to ★★★★★ (approach-dependent) | Estimated Time: 2–4 weekends


1. Project Overview

TAMP (Task and Motion Planning) bridges the gap between symbolic task-level reasoning and continuous motion planning. In assembly tasks, a robot must reason about which objects to manipulate, in what order, and how to physically move its arm to achieve each sub-goal — all while avoiding collisions.

This project teaches TAMP for block stacking assembly across three algorithm tiers:

  ┌─────────────────────────────────────────────────────────────────┐
  │                    TAMP Pipeline Overview                        │
  │                                                                  │
  │   ┌─────────────┐    ┌────────────────┐    ┌──────────────┐    │
  │   │   Scene      │───▶│  Task Planner  │───▶│  Motion      │    │
  │   │   State      │    │  (PDDL/LLM)    │    │  Planner      │    │
  │   │              │    │                │    │  (RRT/MoveIt) │    │
  │   │  on(A,B)     │    │  pick(A)       │    │  joint_traj   │    │
  │   │  on table    │    │  place(A,B)   │    │  (collision-  │    │
  │   │              │    │  stack(A,B)   │    │   free)        │    │
  │   └─────────────┘    └────────────────┘    └──────────────┘    │
  │         ▲                   ▲                      ▲             │
  │         │                   │                      │             │
  │         └───────────────────┴──────────────────────┘             │
  │                    Execution Feedback Loop                       │
  └─────────────────────────────────────────────────────────────────┘
| Tier | Approach | Task Planning | Motion Planning | Sample Application |
|---|---|---|---|---|
| Traditional | Classical planning | PDDL (Fast Downward / POPF) | None (discrete actions only) | Block stacking task sequences |
| Intermediate | TAMP | PDDLStream | OMPL / MoveIt | Continuous pick-place with collision-free paths |
| Modern | LLM task planning | GPT-4 / LLaMA + prompts | Grounding DINO + SAM + MoveIt | Open-vocabulary assembly from natural language |

2. Hardware & Software Requirements

Hardware

| Component | Specification | Notes |
|---|---|---|
| Robot arm | 6-DOF manipulator (e.g., UR5, Franka Emika Panda) | Or simulated in PyBullet / Isaac Sim |
| Gripper | Parallel-jaw or vacuum gripper | Suction cup for flat objects |
| Camera | Intel RealSense D435 / Azure Kinect | RGB-D for scene understanding |
| Workspace | Tabletop (0.5 m × 0.5 m minimum) | Objects: wooden blocks, pegs, nuts |
| PC | Intel i7 / RTX 3060 or better | For LLM inference in Tier 3 |

Software

| Package | Version | Purpose |
|---|---|---|
| Python | ≥ 3.8 | Core language |
| ROS 2 | Humble / Iron | Robot control, action servers |
| MoveIt 2 | Humble | Motion planning framework |
| PyBullet | ≥ 3.0 | Physics simulation |
| PDDLStream | latest | TAMP integration |
| Fast Downward | latest | Classical task planning |
| OpenCV | ≥ 4.5 | Vision preprocessing |
| NumPy | ≥ 1.20 | Numerical computation |
| transformers | ≥ 4.30 | LLM inference (Tier 3) |

pip install pybullet opencv-python numpy transformers torch pillow

(MoveIt 2 is installed as ROS packages, e.g. sudo apt install ros-humble-moveit; PDDLStream and Fast Downward are built from source, see Section 8.)

3. Tier 1 — Traditional: PDDL-Based Classical Planning

3.1 Concept

PDDL (Planning Domain Definition Language) is the standard language for describing planning problems. A PDDL problem consists of:

  • Domain file: defines types, predicates, and actions
  • Problem file: defines objects, initial state, and goal state

The Fast Downward planner solves the problem by searching in the space of grounded action sequences.

3.2 Block Stacking Domain

For block stacking, the key predicates are:

  • on(X, Y) — block X is directly on block Y
  • ontable(X) — block X rests on the table
  • clear(X) — nothing is on top of block X
  • holding(X) — robot is currently holding block X
  • handempty — robot gripper is not holding anything

Actions:

  • pick(X) — grasp block X from the table (requires clear(X), ontable(X), and handempty)
  • putdown(X) — place the held block X on the table (requires holding(X))
  • stack(X, Y) — stack the held block X onto block Y (requires holding(X) and clear(Y))
  • unstack(X, Y) — lift block X off block Y (requires on(X, Y), clear(X), and handempty)

3.3 PDDL Domain File

;; domain_blocks.pddl
(define (domain blocks)
  (:requirements :strips :typing)
  (:types block)

  (:predicates
    (on ?x - block ?y - block)
    (ontable ?x - block)
    (clear ?x - block)
    (holding ?x - block)
    (handempty))

  ;; Pick up a block from the table
  (:action pick
    :parameters (?x - block)
    :precondition (and (clear ?x) (ontable ?x) (handempty))
    :effect (and (holding ?x)
                 (not (ontable ?x))
                 (not (clear ?x))
                 (not (handempty))))

  ;; Put a block down on the table
  (:action putdown
    :parameters (?x - block)
    :precondition (holding ?x)
    :effect (and (ontable ?x)
                 (clear ?x)
                 (handempty)
                 (not (holding ?x))))

  ;; Stack a block onto another block
  (:action stack
    :parameters (?x - block ?y - block)
    :precondition (and (holding ?x) (clear ?y))
    :effect (and (on ?x ?y)
                 (clear ?x)
                 (handempty)
                 (not (holding ?x))
                 (not (clear ?y))))

  ;; Unstack a block from on top of another block
  (:action unstack
    :parameters (?x - block ?y - block)
    :precondition (and (on ?x ?y) (clear ?x) (handempty))
    :effect (and (holding ?x)
                 (clear ?y)
                 (not (on ?x ?y))
                 (not (clear ?x))
                 (not (handempty))))
)

3.4 PDDL Problem File

;; problem_blocks.pddl
(define (problem blocks-tower)
  (:domain blocks)
  (:objects A B C - block)
  (:init
    (ontable A)
    (ontable B)
    (ontable C)
    (clear A)
    (clear B)
    (clear C)
    (handempty))
  ;; Goal: stack C on B, and B on A (tower of height 3)
  (:goal (and (on C B) (on B A)))
)

3.5 Python — Call Fast Downward from Python

"""
Tier 1: Classical PDDL Planning with Fast Downward
==================================================
Calls the Fast Downward planner via subprocess and parses the plan.
Requirements: fast-downward (https://www.fast-downward.org/)
"""

import subprocess
import os
import re
import tempfile

DOMAIN_TEMPLATE = """(define (domain blocks)
  (:requirements :strips :typing)
  (:types block)
  (:predicates
    (on ?x - block ?y - block)
    (ontable ?x - block)
    (clear ?x - block)
    (holding ?x - block)
    (handempty))
  (:action pick
    :parameters (?x - block)
    :precondition (and (clear ?x) (ontable ?x) (handempty))
    :effect (and (holding ?x) (not (ontable ?x)) (not (clear ?x)) (not (handempty))))
  (:action putdown
    :parameters (?x - block)
    :precondition (holding ?x)
    :effect (and (ontable ?x) (clear ?x) (handempty) (not (holding ?x))))
  (:action stack
    :parameters (?x - block ?y - block)
    :precondition (and (holding ?x) (clear ?y))
    :effect (and (on ?x ?y) (clear ?x) (handempty) (not (holding ?x)) (not (clear ?y))))
  (:action unstack
    :parameters (?x - block ?y - block)
    :precondition (and (on ?x ?y) (clear ?x) (handempty))
    :effect (and (holding ?x) (clear ?y) (not (on ?x ?y)) (not (clear ?x)) (not (handempty))))
)"""

PROBLEM_TEMPLATE = """(define (problem {name})
  (:domain blocks)
  (:objects {objects} - block)
  (:init {init})
  (:goal (and {goal})))
"""


def generate_problem(block_names: list, init_clauses: list, goal_clauses: list) -> str:
    """Generate a PDDL problem file from structured data."""
    objects = " ".join(block_names)
    init_str = " ".join(f"({c})" for c in init_clauses)
    goal_str = " ".join(f"({c})" for c in goal_clauses)
    return PROBLEM_TEMPLATE.format(
        name="blocks_problem",
        objects=objects,
        init=init_str,
        goal=goal_str,
    )


def call_fast_downward(domain_path: str, problem_path: str, plan_output_path: str):
    """
    Run the Fast Downward planner on a domain/problem pair.
    The plan is written to plan_output_path; parse it with parse_plan().
    """
    cmd = [
        "fast-downward.py",
        "--alias", "seq-opt-lmcut",  # Optimal A* search with the LM-cut heuristic
        "--plan-file", plan_output_path,
        domain_path,
        problem_path,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        print(f"[STDERR]\n{result.stderr}")
        raise RuntimeError(f"Fast Downward failed with code {result.returncode}")


def parse_plan(plan_path: str) -> list:
    """Parse a Fast Downward plan file into (action, args) tuples."""
    actions = []
    with open(plan_path, "r") as f:
        for line in f:
            line = line.strip()
            # Plan lines look like "(stack c b)"; the trailing "; cost = ..." comment is skipped
            match = re.match(r"\((\S+)((?:\s+\S+)*)\)", line)
            if match:
                actions.append((match.group(1), tuple(match.group(2).split())))
    return actions


def solve_blocks_stacking(initial_config: dict, goal_config: dict) -> list:
    """
    Solve a block stacking problem using Fast Downward.

    Parameters
    ----------
    initial_config : dict
        e.g., {"on": [], "ontable": ["A", "B", "C"], "clear": ["A", "B", "C"]}
    goal_config : dict
        e.g., {"on": [("C", "B"), ("B", "A")]}

    Returns
    -------
    plan : list of (action_name, args) tuples
    """
    blocks = sorted(set(
        [b for pair in initial_config.get("on", []) for b in pair]
        + initial_config.get("ontable", [])
        + initial_config.get("clear", [])
    ))

    init_clauses = []
    init_clauses.extend(f"ontable {b}" for b in initial_config.get("ontable", []))
    init_clauses.extend(f"on {x} {y}" for x, y in initial_config.get("on", []))
    init_clauses.extend(f"clear {b}" for b in initial_config.get("clear", []))
    init_clauses.append("handempty")

    goal_clauses = []
    goal_clauses.extend(f"on {x} {y}" for x, y in goal_config.get("on", []))

    domain_str = DOMAIN_TEMPLATE
    problem_str = generate_problem(blocks, init_clauses, goal_clauses)

    with tempfile.TemporaryDirectory() as tmpdir:
        domain_path = os.path.join(tmpdir, "domain.pddl")
        problem_path = os.path.join(tmpdir, "problem.pddl")
        plan_path = os.path.join(tmpdir, "sas_plan")

        with open(domain_path, "w") as f:
            f.write(domain_str)
        with open(problem_path, "w") as f:
            f.write(problem_str)

        call_fast_downward(domain_path, problem_path, plan_path)
        plan = parse_plan(plan_path)

    return plan


if __name__ == "__main__":
    # Example: Stack C on B, B on A (tower of 3)
    initial = {
        "ontable": ["A", "B", "C"],
        "on": [],
        "clear": ["A", "B", "C"],
    }
    goal = {
        "on": [("C", "B"), ("B", "A")],
    }

    plan = solve_blocks_stacking(initial, goal)
    print("[Plan]")
    for i, (action, args) in enumerate(plan):
        print(f"  {i+1}. {action} {' '.join(args)}")

4. Tier 2 — Intermediate: PDDLStream TAMP

4.1 Concept

PDDLStream extends classical planning with continuous variables and samplers. This allows the planner to reason about:

  • Object poses (continuous values in ℝ⁷: 3-D position plus quaternion orientation)
  • Collision-free configurations (sampled from configuration space)
  • Integrated task + motion plans
  ┌──────────────────────────────────────────────────────────────┐
  │                   PDDLStream Architecture                    │
  │                                                               │
  │   Symbolic      Continuous          Solutions                 │
  │   State         Variables                                    │
  │   ────────      ───────────         ──────────                │
  │   on(A,table)   [x,y,z,rx,ry,rz,rw]  ✓ poses sampled         │
  │   clear(B)      [q0,q1,q2,q3,q4,q5]  ✓ IK solutions           │
  │                                                               │
  │   ┌────────────────┐    ┌──────────────────────┐              │
  │   │  Symbolic      │───▶│  Stream Functions    │              │
  │   │  Planner        │    │  sample_ik(),        │              │
  │   │  (BestFirst)    │    │  check_collision()   │              │
  │   └────────────────┘    └──────────────────────┘              │
  │         ▲                        │                             │
  │         │          ┌─────────────┘                             │
  │         └──────────┘                                            │
  │         Feedback: retry if no motion plan found                │
  └──────────────────────────────────────────────────────────────┘
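
Concretely, each stream pairs a small declaration (inputs, outputs, and the fact its outputs certify) with a Python generator. A minimal sketch, assuming PDDLStream's from_gen_fn wrapper (the pose/grasp predicates and the sampler are illustrative):

import random

from pddlstream.language.generator import from_gen_fn


def sample_grasp(p):
    """Given a block pose p = (x, y, z), yield grasp poses 10 cm above it."""
    x, y, z = p
    while True:
        # Each yield is one tuple of output values for the planner to try
        yield ((x + random.uniform(-0.01, 0.01),
                y + random.uniform(-0.01, 0.01),
                z + 0.10),)


STREAM_PDDL = """
(define (stream manipulation)
  (:stream sample-grasp
    :inputs (?p) :domain (pose ?p)
    :outputs (?g) :certified (grasp ?p ?g)))
"""

# The planner looks up each :stream by name and draws samples on demand
stream_map = {"sample-grasp": from_gen_fn(sample_grasp)}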

4.2 PDDLStream Domain for Block Stacking

"""
Tier 2: PDDLStream TAMP for Block Stacking
==========================================
Integrates task planning (PDDL) with motion planning (OMPL).
Requires: pddlstream (https://github.com/caelan/pddlstream)
"""

from __future__ import annotations
import numpy as np
from dataclasses import dataclass
from typing import Iterator, List, Tuple

try:
    from pddlstream.algorithms.incremental import solve_incremental
    from pddlstream.language.constants import PDDLProblem, And
    from pddlstream.language.generator import from_gen_fn
    HAS_PDDLSTREAM = True
except ImportError:
    HAS_PDDLSTREAM = False
    print("[WARN] PDDLStream not installed. Clone it from https://github.com/caelan/pddlstream")


@dataclass(frozen=True)
class Pose:
    """6-DOF pose: position (x, y, z) + Euler angles (roll, pitch, yaw)."""
    position: Tuple[float, float, float]
    euler: Tuple[float, float, float] = (0.0, 0.0, 0.0)

    def to_matrix(self) -> np.ndarray:
        """Convert to a 4x4 homogeneous transformation matrix (R = Rz @ Ry @ Rx)."""
        roll, pitch, yaw = self.euler
        cr, sr = np.cos(roll), np.sin(roll)
        cp, sp = np.cos(pitch), np.sin(pitch)
        cy, sy = np.cos(yaw), np.sin(yaw)
        T = np.eye(4)
        T[:3, :3] = np.array([
            [cy * cp, cy * sp * sr - sy * cr, cy * sp * cr + sy * sr],
            [sy * cp, sy * sp * sr + cy * cr, sy * sp * cr - cy * sr],
            [-sp, cp * sr, cp * cr],
        ])
        T[:3, 3] = self.position
        return T


@dataclass
class Block:
    """A block object with name, pose, and dimensions."""
    name: str
    pose: Pose
    size: Tuple[float, float, float] = (0.05, 0.05, 0.05)  # 5cm cube


# ─── Stream Functions (Motion Planning Samplers) ──────────────────────────

def sample_pick_pose(block: Block) -> Iterator:
    """
    Sample a pre-grasp pose above the block.
    Yields (pose,) tuples.
    """
    x, y, z = block.pose.position
    # Pre-grasp: 10cm above the block
    pregrasp_z = z + 0.10
    for _ in range(100):
        dx = np.random.uniform(-0.01, 0.01)
        dy = np.random.uniform(-0.01, 0.01)
        yield (Pose((x + dx, y + dy, pregrasp_z), (0, 0, 0)),)


def sample_place_pose(surface: Block, block: Block) -> Iterator:
    """
    Sample a placement pose for block on surface.
    """
    sx, sy, sz = surface.pose.position  # pose is the block's center
    bx, by, bz = block.size
    # Top of the surface = its center z + half its height; add half the block height
    place_z = sz + surface.size[2] / 2 + bz / 2 + 0.01
    for _ in range(100):
        dx = np.random.uniform(-0.02, 0.02)
        dy = np.random.uniform(-0.02, 0.02)
        yield (Pose((sx + dx, sy + dy, place_z), (0, 0, 0)),)


def check_motion(q1: Tuple, q2: Tuple, robot_name: str = "robot") -> Iterator:
    """
    Check if motion between two configurations is collision-free.
    Simplified: assume straight-line interpolation is valid if distance < threshold.
    """
    dist = np.linalg.norm(np.array(q1) - np.array(q2))
    # Max joint movement per step: 0.1 rad
    max_step = 0.1
    if dist <= max_step:
        yield (True,)


def compute_ik(robot_name: str, block_name: str, pose: Pose) -> Iterator:
    """
    Compute inverse kinematics for a target pose.
    Simplified: yields a dummy configuration.
    A real implementation would use KDL / TRAC-IK / BioIK.
    """
    x, y, z = pose.position
    # Dummy joint angles that roughly point the arm base toward (x, y)
    q = (
        np.arctan2(y, x) + np.random.uniform(-0.1, 0.1),
        0.5 + np.random.uniform(-0.05, 0.05),
        -1.0 + np.random.uniform(-0.05, 0.05),
        0.0,
        0.0,
        0.0,
    )
    yield (q,)


def solve_tamp(
    blocks: List[Block],
    goal_on: List[Tuple[str, str]],
    robot_name: str = "robot",
) -> List:
    """
    Solve a TAMP problem for block stacking using PDDLStream.

    NOTE: a sketch in the style of the pddlstream examples; the domain and
    stream encodings below are illustrative, not a drop-in solution.

    Parameters
    ----------
    blocks : list of Block
    goal_on : list of (block, surface) tuples: block should end up on surface

    Returns
    -------
    plan : list of pddlstream action instances, or [] if no plan was found
    """
    if not HAS_PDDLSTREAM:
        raise ImportError("PDDLStream is required for Tier 2")

    # Initial facts as flat tuples: ("predicate", arg1, arg2, ...)
    init = [("handempty",)]
    for b in blocks:
        init.extend([
            ("block", b.name),
            ("ontable", b.name),
            ("clear", b.name),
            ("atpose", b.name, b.pose),
        ])

    # Goal conjunction
    goal_facts = []
    for block_name, surface_name in goal_on:
        if surface_name == "table":
            goal_facts.append(("ontable", block_name))
        else:
            goal_facts.append(("on", block_name, surface_name))
    goal = And(*goal_facts)

    # Samplers, wrapped so the planner can draw values on demand
    by_name = {b.name: b for b in blocks}
    stream_map = {
        "sample-pick": from_gen_fn(lambda b: sample_pick_pose(by_name[b])),
        "sample-place": from_gen_fn(lambda s, b: sample_place_pose(by_name[s], by_name[b])),
        "compute-ik": from_gen_fn(lambda b, p: compute_ik(robot_name, b, p)),
    }

    # Untyped STRIPS domain; continuous poses/configurations enter as stream outputs
    domain_pddl = """
(define (domain manipulation)
  (:requirements :strips)
  (:predicates
    (block ?x) (on ?x ?y) (ontable ?x) (clear ?x)
    (holding ?x) (handempty) (atpose ?x ?p)
    (pickpose ?x ?p) (placepose ?s ?b ?p) (kin ?x ?p ?q))
  (:action pick
    :parameters (?b ?p ?q)
    :precondition (and (clear ?b) (handempty) (pickpose ?b ?p) (kin ?b ?p ?q))
    :effect (and (holding ?b) (not (clear ?b)) (not (handempty))))
  (:action place
    :parameters (?b ?s ?p)
    :precondition (and (holding ?b) (clear ?s) (placepose ?s ?b ?p))
    :effect (and (on ?b ?s) (clear ?b) (handempty) (atpose ?b ?p)
                 (not (holding ?b)) (not (clear ?s)))))
"""

    # Stream declarations: each :stream names an entry in stream_map and
    # certifies the facts its outputs satisfy
    stream_pddl = """
(define (stream manipulation)
  (:stream sample-pick
    :inputs (?b) :domain (block ?b)
    :outputs (?p) :certified (pickpose ?b ?p))
  (:stream sample-place
    :inputs (?s ?b) :domain (and (block ?s) (block ?b))
    :outputs (?p) :certified (placepose ?s ?b ?p))
  (:stream compute-ik
    :inputs (?b ?p) :domain (pickpose ?b ?p)
    :outputs (?q) :certified (kin ?b ?p ?q)))
"""

    print("[INFO] Running PDDLStream TAMP...")
    problem = PDDLProblem(domain_pddl, {}, stream_pddl, stream_map, init, goal)
    plan, cost, evaluations = solve_incremental(problem, max_time=30.0)

    if plan is None:
        print("[WARN] No plan found")
        return []
    print(f"[INFO] Plan found with cost {cost:.2f}")
    return plan


if __name__ == "__main__":
    # Example: build the tower A < B < C (stack B on A first, then C on B)
    blocks = [
        Block("A", Pose((0.0, 0.0, 0.025))),
        Block("B", Pose((0.1, 0.0, 0.025))),
        Block("C", Pose((0.2, 0.0, 0.025))),
    ]
    goal = [("B", "A"), ("C", "B")]

    if HAS_PDDLSTREAM:
        plan = solve_tamp(blocks, goal)
        for i, step in enumerate(plan):
            print(f"  {i+1}. {step}")
    else:
        print("[INFO] PDDLStream missing; showing the expected plan skeleton:")
        print("  1. pick B")
        print("  2. place B on A")
        print("  3. pick C")
        print("  4. place C on B")

4.3 MoveIt Integration

For actual robot execution, the motion plan is sent to MoveIt:

"""
MoveIt Action Server for Executing TAMP Plans
==============================================
Receives task plans from the symbolic planner and executes motion trajectories.
"""

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from moveit_msgs.action import MoveGroup
from moveit_msgs.msg import Constraints, JointConstraint


class TAMPMoveItExecutor(Node):
    """Execute motion plans via MoveIt."""

    def __init__(self):
        super().__init__("tamp_executor")
        # MoveIt 2 exposes a MoveGroup action server, typically on /move_group
        self.action_client = ActionClient(self, MoveGroup, "/move_group")
        self.declare_parameter("move_group", "manipulator")

    def execute_trajectory(self, joint_positions: list, time_limit: float = 5.0) -> bool:
        """
        Send a joint-space goal to MoveIt.

        Parameters
        ----------
        joint_positions : list of float
            Target joint angles for 6-DOF arm
        time_limit : float
            Maximum execution time in seconds

        Returns
        -------
        success : bool
        """
        goal_msg = MoveGroup.Goal()
        goal_msg.request.group_name = "manipulator"
        goal_msg.request.num_planning_attempts = 3
        goal_msg.request.max_velocity_scaling_factor = 0.5
        goal_msg.request.max_acceleration_scaling_factor = 0.5
        goal_msg.request.allowed_planning_time = time_limit

        # Let MoveIt plan from the robot's current state
        goal_msg.request.start_state.is_diff = True

        # Goal joint constraints (UR-style joint names; adapt to your robot)
        joint_names = [
            "shoulder_pan_joint", "shoulder_lift_joint", "elbow_joint",
            "wrist_1_joint", "wrist_2_joint", "wrist_3_joint",
        ]
        constraints = Constraints()
        for joint_name, position in zip(joint_names, joint_positions):
            jc = JointConstraint()
            jc.joint_name = joint_name
            jc.position = position
            jc.tolerance_above = 0.01
            jc.tolerance_below = 0.01
            jc.weight = 1.0
            constraints.joint_constraints.append(jc)

        goal_msg.request.goal_constraints.append(constraints)

        self.action_client.wait_for_server()
        future = self.action_client.send_goal_async(goal_msg)
        rclpy.spin_until_future_complete(self, future)
        goal_handle = future.result()
        # NOTE: .accepted only confirms MoveIt accepted the goal; await
        # get_result_async() to confirm the motion actually completed
        return goal_handle.accepted if goal_handle else False
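
A minimal usage sketch (the target joint configuration is illustrative; in the full pipeline it comes from the TAMP plan):

def main(args=None):
    rclpy.init(args=args)
    node = TAMPMoveItExecutor()
    ok = node.execute_trajectory([0.0, -1.2, 1.6, -0.4, 1.57, 0.0])
    node.get_logger().info(f"Motion goal {'accepted' if ok else 'rejected'}")
    node.destroy_node()
    rclpy.shutdown()


if __name__ == "__main__":
    main()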

5. Tier 3 — Modern: LLM Task Planning

5.1 Concept

Modern approaches replace hand-coded symbolic planners with LLM-based task planning. Given a natural language instruction and a scene description, the LLM outputs an action sequence. This is then grounded with vision models (Grounding DINO + SAM) and executed via MoveIt.

  ┌─────────────────────────────────────────────────────────────────┐
  │            LLM-Based Task Planning Pipeline                      │
  │                                                                  │
  │   ┌──────────────┐    ┌──────────────┐    ┌──────────────────┐  │
  │   │   Scene       │───▶│   LLM         │───▶│   Grounding      │  │
  │   │   Caption     │    │   (GPT-4/     │    │   DINO + SAM      │  │
  │   │   + Objects   │    │    LLaMA-3)   │    │   (object mask)   │  │
  │   │               │    │               │    │                  │  │
  │   │ "red block    │    │ Action JSON:  │    │   6D pose         │  │
  │   │  on table"    │    │ pick(block_A) │    │   estimation      │  │
  │   └──────────────┘    │ place(block_B)│    └────────┬─────────┘  │
  │                       └──────────────┘              │            │
  │                                                      ▼            │
  │                  ┌──────────────┐    ┌──────────────────────────┐│
  │                  │   MoveIt      │◄──│   Inverse Kinematics     ││
  │                  │   Trajectory  │    │   (TRAC-IK / BioIK)      ││
  │                  └───────┬───────┘    └──────────────────────────┘│
  │                          │                                       │
  │                          ▼                                       │
  │                  ┌──────────────┐                                 │
  │                  │   Robot Arm  │ (ROS 2 action execution)      │
  │                  └──────────────┘                                 │
  └─────────────────────────────────────────────────────────────────┘

5.2 LLM Prompt for Block Stacking

"""
Tier 3: LLM Task Planning for Assembly
======================================
Uses GPT-4 or LLaMA to generate action sequences from natural language.
Requires: openai / transformers, grounding_dino, sam
"""

import json
from dataclasses import dataclass
from typing import List, Optional

# ─── Action Schema ─────────────────────────────────────────────────────────

@dataclass
class ActionSpec:
    """Schema for robot actions."""
    name: str
    parameters: List[str]
    description: str


AVAILABLE_ACTIONS = [
    ActionSpec("pick", ["object_name"], "Grasp the specified object from its current location."),
    ActionSpec("place", ["object_name", "surface_name"], "Place the held object onto the specified surface or object."),
    ActionSpec("stack", ["block_name", "base_name"], "Stack block_name on top of base_name."),
    ActionSpec("open_gripper", [], "Open the robot gripper."),
    ActionSpec("close_gripper", [], "Close the robot gripper to grasp an object."),
    ActionSpec("move_to", ["joint_positions"], "Move robot arm to specified joint configuration."),
    ActionSpec("home", [], "Move robot arm to home position."),
]


SYSTEM_PROMPT = f"""You are a robot task planner for a block stacking assembly domain.
The robot has a 6-DOF arm with a parallel gripper.

Available actions:
{chr(10).join(f'- {a.name}({", ".join(a.parameters)}) : {a.description}' for a in AVAILABLE_ACTIONS)}

Rules:
1. Always start with 'home' before a new task
2. Use 'open_gripper' before 'pick', 'close_gripper' to grasp, and 'open_gripper' to release after placing
3. Only one object can be held at a time
4. An object must be 'clear' (nothing on top) before it can be picked
5. Output your plan as a JSON array of actions with parameters

Example:
User: Stack the red block on the blue block
Plan:
[
  {{"action": "home", "parameters": {{}}}},
  {{"action": "open_gripper", "parameters": {{}}}},
  {{"action": "pick", "parameters": {{"object_name": "red_block"}}}},
  {{"action": "close_gripper", "parameters": {{}}}},
  {{"action": "place", "parameters": {{"object_name": "red_block", "surface_name": "blue_block"}}}},
  {{"action": "open_gripper", "parameters": {{}}}},
  {{"action": "home", "parameters": {{}}}}
]
"""


def build_llm_prompt(scene_description: str, task_description: str) -> str:
    """Build the full prompt for the LLM."""
    return f"""Scene:
{scene_description}

Task:
{task_description}

Output your plan as a JSON array:"""


def parse_llm_response(response_text: str) -> List[dict]:
    """Parse LLM response text into a list of action dictionaries."""
    # Try to extract JSON from the response
    try:
        # Find JSON array in response
        start = response_text.find("[")
        end = response_text.rfind("]") + 1
        if start != -1 and end > start:
            json_str = response_text[start:end]
            return json.loads(json_str)
    except json.JSONDecodeError:
        pass

    # Fallback: parse as plain text actions
    lines = [line.strip() for line in response_text.split("\n") if line.strip()]
    actions = []
    for line in lines:
        if line.startswith("-"):
            parts = line[1:].split("(", 1)
            if len(parts) == 2:
                action_name = parts[0].strip()
                params_str = parts[1].rstrip(")")
                params = {}
                if params_str:
                    for p in params_str.split(","):
                        if "=" in p:
                            k, v = p.split("=", 1)
                            params[k.strip()] = v.strip()
                        else:
                            params[f"param{len(params)}"] = p.strip()
                actions.append({"action": action_name, "parameters": params})
    return actions


def call_llm_for_plan(
    scene_description: str,
    task: str,
    model: str = "gpt-4",
    api_key: Optional[str] = None,
) -> List[dict]:
    """
    Call LLM (GPT-4 or LLaMA) to generate a task plan.

    Parameters
    ----------
    scene_description : str
        Natural language description of the current scene
    task : str
        The task to accomplish
    model : str
        "gpt-4", "gpt-3.5-turbo", or "llama-3"
    api_key : str, optional
        OpenAI API key

    Returns
    -------
    plan : list of action dictionaries
    """
    prompt = build_llm_prompt(scene_description, task)

    if model.startswith("gpt-"):
        try:
            from openai import OpenAI
            client = OpenAI(api_key=api_key)  # falls back to the OPENAI_API_KEY env var
            response = client.chat.completions.create(
                model=model,
                messages=[
                    {"role": "system", "content": SYSTEM_PROMPT},
                    {"role": "user", "content": prompt},
                ],
                temperature=0.1,
                max_tokens=512,
            )
            response_text = response.choices[0].message.content
        except ImportError:
            print("[WARN] openai package not installed. Using mock plan.")
            response_text = '[{"action": "home", "parameters": {}}, {"action": "open_gripper", "parameters": {}}, {"action": "pick", "parameters": {"object_name": "block_A"}}, {"action": "close_gripper", "parameters": {}}, {"action": "place", "parameters": {"object_name": "block_A", "surface_name": "table"}}, {"action": "open_gripper", "parameters": {}}]'
    else:
        # LLaMA via transformers (placeholder)
        print(f"[WARN] LLaMA model '{model}' not implemented. Using mock plan.")
        response_text = '[{"action": "home", "parameters": {}}, {"action": "close_gripper", "parameters": {}}, {"action": "pick", "parameters": {"object_name": "block_A"}}, {"action": "place", "parameters": {"object_name": "block_A", "surface_name": "block_B"}}, {"action": "open_gripper", "parameters": {}}]'

    return parse_llm_response(response_text)


def llm_tamp_pipeline(
    scene_description: str,
    task: str,
    vision_model: Optional[object] = None,
) -> List[dict]:
    """
    Full LLM TAMP pipeline: scene understanding → action planning → grounding.

    Parameters
    ----------
    scene_description : str
        Current scene state in natural language
    task : str
        Desired task
    vision_model : GroundingDINO, optional
        For object detection and 6D pose estimation

    Returns
    -------
    grounded_plan : list of (action, args, motion_plan) tuples
    """
    # Step 1: Get action sequence from LLM
    action_plan = call_llm_for_plan(scene_description, task)
    print(f"[INFO] LLM generated {len(action_plan)} actions")

    # Step 2: Ground objects with vision model
    grounded_plan = []
    if vision_model is not None:
        print("[INFO] Grounding objects with vision model...")
        # This would call Grounding DINO + SAM to get pixel masks
        # and estimate 6D poses for each object mentioned in the plan
        for step in action_plan:
            obj_name = step["parameters"].get("object_name", "")
            if obj_name:
                # vision_model.detect_and_estimate_pose(obj_name)
                pass

    # Step 3: Output the plan
    return action_plan


if __name__ == "__main__":
    scene = """
    A robot workspace contains:
    - A red block (block_A) on the table at position (0.1, 0.0, 0.025)
    - A blue block (block_B) on the table at position (0.2, 0.0, 0.025)
    - The robot gripper is empty
    """

    task = "Pick up the red block and place it on the blue block"

    plan = call_llm_for_plan(scene, task, model="gpt-4")
    print("\n[Generated Plan]")
    for i, step in enumerate(plan):
        print(f"  {i+1}. {step['action']}({step['parameters']})")

5.3 Grounding DINO + SAM for Scene Understanding

"""
Scene Understanding with Grounding DINO + SAM
=============================================
Detects objects and generates masks from natural language queries.
"""

import numpy as np
from typing import List, Tuple, Optional


class SceneUnderstanding:
    """
    Grounding DINO + SAM integration for open-vocabulary object detection
    and instance segmentation.
    """

    def __init__(self, device: str = "cuda"):
        self.device = device
        self.dino_model = None
        self.sam_model = None
        self.sam_predictor = None
        self._load_models()

    def _load_models(self):
        """Load Grounding DINO and SAM models."""
        try:
            import torch
            from groundingdino.models import build_model as build_dino
            from segment_anything import sam_model_registry, SamPredictor

            # Load Grounding DINO
            # dino_config = "groundingdino/config/GroundingDINO_SwinT_OGC.py"
            # dino_checkpoint = "groundingdino_swint_ogc.pth"
            # self.dino_model = build_dino([dino_config], dino_checkpoint)
            print("[INFO] Grounding DINO model loaded")

            # Load SAM
            # sam_checkpoint = "sam_vit_h_4b8939.pth"
            # sam_type = "vit_h"
            # sam = sam_model_registry[sam_type](checkpoint=sam_checkpoint)
            # sam.to(device=self.device)
            # self.sam_predictor = SamPredictor(sam)
            print("[INFO] SAM model loaded")
        except ImportError as e:
            print(f"[WARN] Could not load vision models: {e}")
            print("Install Grounding DINO and Segment Anything from their GitHub repos")

    def detect_objects(
        self,
        image: np.ndarray,
        text_prompt: str,
        box_threshold: float = 0.35,
        nms_threshold: float = 0.5,
    ) -> List[Tuple[List[float], float, str]]:
        """
        Detect objects in image using Grounding DINO.

        Parameters
        ----------
        image : np.ndarray (H, W, 3)
            BGR image from camera
        text_prompt : str
            Natural language description, e.g., "red block"
        box_threshold : float
            Confidence threshold for boxes
        nms_threshold : float
            NMS IoU threshold

        Returns
        -------
        detections : list of (box, score, label) tuples
            box: [x1, y1, x2, y2] in pixels
            score: confidence score
            label: class name
        """
        if self.dino_model is None:
            # Return one mock detection for demo purposes
            return [([100.0, 100.0, 200.0, 200.0], 0.9, text_prompt)]

        # Run DINO detection
        # boxes, scores, labels = self.dino_model.predict(image, text_prompt, box_threshold, nms_threshold)
        # return list(zip(boxes.tolist(), scores.tolist(), labels.tolist()))
        raise NotImplementedError("Real implementation calls dino_model.predict()")

    def segment_mask(
        self,
        image: np.ndarray,
        box: List[float],
    ) -> np.ndarray:
        """
        Generate instance segmentation mask using SAM.

        Parameters
        ----------
        image : np.ndarray (H, W, 3)
            BGR image
        box : list of [x1, y1, x2, y2]
            Bounding box in pixels

        Returns
        -------
        mask : np.ndarray (H, W), bool
            Binary mask for the object
        """
        if self.sam_predictor is None:
            # Return mock mask for demo
            h, w = image.shape[:2]
            mask = np.zeros((h, w), dtype=bool)
            x1, y1, x2, y2 = [int(c) for c in box]
            mask[y1:y2, x1:x2] = True
            return mask

        # Set image and predict mask
        self.sam_predictor.set_image(image)
        box_np = np.array(box)
        masks, _, _ = self.sam_predictor.predict(box=box_np, multimask_output=False)
        return masks[0]

    def estimate_6d_pose(
        self,
        mask: np.ndarray,
        intrinsic: np.ndarray,
        depth: np.ndarray,
        object_size: Tuple[float, float, float] = (0.05, 0.05, 0.05),
    ) -> np.ndarray:
        """
        Estimate 6D pose (SE(3)) from RGB-D segmentation.

        Uses depth + mask to reconstruct 3D point cloud, then
        fits a box using RANSAC with known object dimensions.

        Returns
        -------
        T_world_object : np.ndarray (4, 4)
            Homogeneous transformation from object frame to world frame
        """
        # Point cloud from depth
        h, w = depth.shape
        y_coords, x_coords = np.where(mask)
        if len(y_coords) < 100:
            return np.eye(4)

        # Back-project depth to 3D
        fx, fy = intrinsic[0, 0], intrinsic[1, 1]
        cx, cy = intrinsic[0, 2], intrinsic[1, 2]
        z = depth[y_coords, x_coords]
        x = (x_coords - cx) * z / fx
        y = (y_coords - cy) * z / fy
        points_3d = np.stack([x, y, z], axis=1)

        # Compute centroid
        centroid = points_3d.mean(axis=0)

        # Simplified: align principal axis with world Z
        # Real implementation uses RANSAC + known geometry
        T = np.eye(4)
        T[:3, 3] = centroid
        return T
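
A usage sketch tying the three steps together (the intrinsics and RGB-D frames below are placeholders; with no models loaded, the class falls back to its mock outputs):

if __name__ == "__main__":
    scene = SceneUnderstanding(device="cpu")

    # Placeholder 640x480 RGB-D frame and pinhole intrinsics
    rgb = np.zeros((480, 640, 3), dtype=np.uint8)
    depth = np.full((480, 640), 0.8, dtype=np.float32)  # 0.8 m everywhere
    K = np.array([[600.0, 0.0, 320.0],
                  [0.0, 600.0, 240.0],
                  [0.0, 0.0, 1.0]])

    box, score, label = scene.detect_objects(rgb, "red block")[0]
    mask = scene.segment_mask(rgb, box)
    T_world_object = scene.estimate_6d_pose(mask, K, depth)
    print(f"{label} ({score:.2f}) centroid: {T_world_object[:3, 3]}")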

5.4 ROS 2 Action Server for Execution

"""
ROS 2 TAMP Action Server
========================
Executes TAMP plans as ROS 2 actions, interfacing with MoveIt and the gripper.
"""

import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node
from control_msgs.action import GripperCommand  # used by the gripper client


class TAMPActionServer(Node):
    """ROS 2 action server for TAMP plan execution."""

    def __init__(self):
        super().__init__("tamp_action_server")
        # ExecuteTAMPAction is a custom action interface (e.g., from your own
        # tamp_msgs package) carrying the plan, per-step feedback, and a result
        self.action_server = ActionServer(
            self,
            ExecuteTAMPAction,
            "/tamp/execute_plan",
            self.execute_callback,
        )
        self.gripper_client = None  # GripperCommand action client
        self.moveit_client = None   # MoveGroup action client
        self.get_logger().info("TAMP Action Server started")

    async def execute_callback(self, goal_handle):
        """Execute a TAMP plan action by action."""
        plan = goal_handle.request.plan
        self.get_logger().info(f"Executing plan with {len(plan)} steps")

        for i, action in enumerate(plan):
            self.get_logger().info(f"Step {i+1}: {action.action_name}")

            # Send motion to MoveIt
            if action.action_name in ["move_to", "home"]:
                await self._send_moveit_goal(action)
            elif action.action_name == "close_gripper":
                await self._send_gripper_goal(position=0.0)
            elif action.action_name == "open_gripper":
                await self._send_gripper_goal(position=0.08)
            elif action.action_name in ["pick", "place", "stack"]:
                # Combined: move to pre-grasp → close gripper → move to place
                await self._execute_manipulation(action)

            # Report feedback
            feedback = ExecuteTAMPAction.Feedback()
            feedback.current_step = i + 1
            feedback.total_steps = len(plan)
            feedback.current_action = action.action_name
            goal_handle.publish_feedback(feedback)

        goal_handle.succeed()
        result = ExecuteTAMPAction.Result()
        result.success = True
        result.message = "Plan executed successfully"
        return result

    async def _send_moveit_goal(self, action):
        """Send a joint-space goal to MoveIt."""
        # Implementation sends JointTrajectory to /joint_trajectory_action
        pass

    async def _send_gripper_goal(self, position: float):
        """Send gripper command."""
        # Implementation sends GripperCommand action
        pass

    async def _execute_manipulation(self, action):
        """Execute a pick/place/stack action."""
        obj = action.parameters.get("object_name", "")
        surface = action.parameters.get("surface_name", "table")
        self.get_logger().info(f"  Grasping {obj}, placing on {surface}")


def main(args=None):
    rclpy.init(args=args)
    server = TAMPActionServer()
    rclpy.spin(server)
    rclpy.shutdown()

6. PyBullet Simulation

6.1 Block Stacking Simulation

"""
PyBullet Block Stacking Simulation
===================================
Simulates the block stacking task for testing task plans.
"""

import pybullet as p
import pybullet_data
import numpy as np


class BlockStackingSim:
    """PyBullet simulation of a block stacking task."""

    def __init__(self, gui: bool = True):
        self.physics_client = p.connect(p.GUI if gui else p.DIRECT)
        p.setAdditionalSearchPath(pybullet_data.getDataPath())
        p.setGravity(0, 0, -9.81)
        p.setTimeStep(1.0 / 240.0)

        # Load plane
        plane_id = p.loadURDF("plane.urdf")
        p.changeDynamics(plane_id, -1, lateralFriction=0.5)

        # Create table
        # Create table (the tabletop of table.urdf sits at roughly 0.625 m)
        self.table_id = p.loadURDF("table/table.urdf", [0, 0, 0], [0, 0, 0, 1])
        self.table_height = 0.625

        # Block properties
        self.block_size = (0.05, 0.05, 0.05)
        self.block_color = {
            "A": [0.8, 0.2, 0.2, 1.0],  # Red
            "B": [0.2, 0.2, 0.8, 1.0],  # Blue
            "C": [0.2, 0.8, 0.2, 1.0],  # Green
        }
        self.block_ids = {}

        # Robot arm (simplified: UR5 visual)
        self.robot_id = None
        self.joint_indices = []
        self._load_robot()

    def _load_robot(self):
        """Load a simplified robot arm."""
        # Simplified: a rigid body with visual markers
        # Real: load URDF with joint control
        robot_start = [0, 0, 0.6]
        self.robot_id = p.createMultiBody(
            baseMass=0,
            basePosition=robot_start,
            baseOrientation=[0, 0, 0, 1],
            baseVisualShapeIndex=p.createVisualShape(
                p.GEOM_BOX, halfExtents=[0.1, 0.1, 0.05], rgbaColor=[0.5, 0.5, 0.5, 1]
            ),
        )

    def spawn_block(self, name: str, position: tuple, orientation: tuple = (0, 0, 0, 1)):
        """Spawn a block with given name and pose."""
        collision_shape = p.createCollisionShape(p.GEOM_BOX, halfExtents=[s / 2 for s in self.block_size])
        visual_shape = p.createVisualShape(
            p.GEOM_BOX,
            halfExtents=[s / 2 for s in self.block_size],
            rgbaColor=self.block_color.get(name, [0.5, 0.5, 0.5, 1]),
        )
        block_id = p.createMultiBody(
            baseMass=0.1,
            basePosition=position,
            baseOrientation=orientation,
            collisionShapeIndex=collision_shape,
            visualShapeIndex=visual_shape,
        )
        p.changeDynamics(block_id, -1, lateralFriction=0.7)
        self.block_ids[name] = block_id
        return block_id

    def get_block_pose(self, name: str) -> tuple:
        """Get the current pose of a block."""
        return p.getBasePositionAndOrientation(self.block_ids[name])

    def move_gripper_to(self, position: tuple, orientation: tuple = (0, 0, 0, 1), steps: int = 100):
        """Simulate moving the gripper to a pose (simplified)."""
        # In a real simulation, this would use IK to control joint angles
        for _ in range(steps):
            p.stepSimulation()

    def grasp_block(self, block_name: str):
        """Simulate grasping a block (attach constraint)."""
        block_id = self.block_ids[block_name]
        gripper_pos = [0, 0, 0.15]  # Offset above gripper
        constraint_id = p.createConstraint(
            parentBodyUniqueId=self.robot_id,
            parentLinkIndex=-1,
            childBodyUniqueId=block_id,
            childLinkIndex=-1,
            jointType=p.JOINT_FIXED,
            jointAxis=[0, 0, 0],
            parentFramePosition=gripper_pos,
            childFramePosition=[0, 0, 0],
        )
        return constraint_id

    def release_block(self, constraint_id: int):
        """Release the grasped block."""
        p.removeConstraint(constraint_id)
        p.stepSimulation()

    def check_on(self, block_name: str, surface_name: str) -> bool:
        """Check if block_name rests on surface_name."""
        block_pos = np.array(p.getBasePositionAndOrientation(self.block_ids[block_name])[0])
        if surface_name == "table":
            # Resting on the tabletop: center z ≈ table height + half block height
            expected_z = self.table_height + self.block_size[2] / 2
            return abs(block_pos[2] - expected_z) < 0.02
        surface_pos = np.array(p.getBasePositionAndOrientation(self.block_ids[surface_name])[0])
        height_diff = block_pos[2] - surface_pos[2]
        xy_dist = np.linalg.norm(block_pos[:2] - surface_pos[:2])
        return 0.03 < height_diff < 0.07 and xy_dist < 0.03

    def step(self):
        """Step the simulation."""
        p.stepSimulation()

    def reset(self, gui: bool = True):
        """Reset the simulation by disconnecting and rebuilding the world."""
        p.disconnect(self.physics_client)
        self.__init__(gui=gui)


def demo_stacking():
    """Demonstrate block stacking in PyBullet."""
    sim = BlockStackingSim(gui=True)

    # Spawn blocks resting on the tabletop (center = table height + half block)
    sim.spawn_block("A", [0.1, 0.0, 0.65])
    sim.spawn_block("B", [0.2, 0.0, 0.65])
    sim.spawn_block("C", [0.3, 0.0, 0.65])

    print("[INFO] Simulating: Stack C on B, B on A")
    print("[INFO] Run: python -m pybullet_utils.enjoy to interact")

    for _ in range(1000):
        sim.step()

    print("[INFO] Simulation complete")


if __name__ == "__main__":
    demo_stacking()
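
To close the loop with Tier 1, the symbolic plan can drive the simulator. A rough, teleport-style sketch (execute_symbolic_plan is a helper written for this guide; a real setup would move the gripper via IK and use grasp_block / release_block):

def execute_symbolic_plan(sim: BlockStackingSim, plan: list):
    """Execute (action, args) tuples from the Tier 1 planner by teleporting blocks."""
    held = None
    for action, args in plan:
        if action in ("pick", "unstack"):
            held = args[0].upper()  # Fast Downward lowercases object names
        elif action in ("putdown", "stack"):
            if action == "stack":
                base_pos, _ = sim.get_block_pose(args[1].upper())
                target = [base_pos[0], base_pos[1], base_pos[2] + 0.05]
            else:
                target = [0.4, 0.0, 0.65]  # a free spot on the table (illustrative)
            p.resetBasePositionAndOrientation(sim.block_ids[held], target, [0, 0, 0, 1])
            held = None
        for _ in range(120):  # let physics settle
            sim.step()


# e.g. execute_symbolic_plan(sim, [("pick", ("b",)), ("stack", ("b", "a")),
#                                  ("pick", ("c",)), ("stack", ("c", "b"))])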

7. Comparison of Tiers

| Criterion | Tier 1: PDDL Classical | Tier 2: PDDLStream TAMP | Tier 3: LLM + Vision |
|---|---|---|---|
| Task planning | Full discrete search | Discrete + continuous | Natural language → action JSON |
| Motion planning | None (action-level only) | OMPL / RRT integration | MoveIt + IK |
| Handles collisions | No | Yes (configuration space) | Yes (via MoveIt) |
| Handles object poses | No (abstract only) | Yes (continuous variables) | Yes (6D pose from DINO+SAM) |
| Open vocabulary | No | No | Yes |
| Needs world model | Yes (PDDL facts) | Yes (PDDL + samplers) | No (vision is online) |
| Planning time | Fast (< 1 s) | Medium (1–30 s) | Variable (LLM inference) |
| Generalization | Domain-specific | Domain-specific | Language-conditioned |
| Failure recovery | Manual | Partial (replan) | Re-plan via LLM |
| Setup complexity | Low | Medium | High |
| Best for | Known assembly sequences | Structured manipulation | Open-ended tasks |

8. Step-by-Step Implementation Guide

Phase 1 — Tier 1: Classical PDDL Planning (Weekend 1)

  1. Install Fast Downward:
    git clone https://github.com/aibasel/downward.git
    cd downward && ./build.py
    
  2. Write PDDL domain and problem files for block stacking
  3. Run the Python wrapper to call Fast Downward and parse plans
  4. Visualize the plan as a sequence diagram

Phase 2 — Tier 2: PDDLStream TAMP (Weekend 2)

  1. Install PDDLStream (not on PyPI; clone it with its Fast Downward submodule):
    git clone --recursive https://github.com/caelan/pddlstream.git
    cd pddlstream  # then build the bundled planner per the repo README
  2. Define stream functions for sample-pick, compute-ik, check-motion
  3. Write the PDDLStream domain string in Python
  4. Run solve_incremental and inspect the returned task-motion plan
  5. Add MoveIt integration for real robot execution

Phase 3 — Tier 3: LLM Task Planning (Weekend 3–4)

  1. Set up OpenAI API or host LLaMA-3 locally
  2. Implement the prompt template with action schema
  3. Test with various natural language assembly instructions
  4. Integrate Grounding DINO + SAM for object grounding
  5. Add MoveIt action server for execution
  6. Test the full pipeline in PyBullet simulation

Phase 4 — ROS 2 Integration

  1. Create a ROS 2 workspace with the TAMP packages
  2. Launch MoveIt with the robot URDF
  3. Start the TAMP action server
  4. Send goals from the command line with ros2 action send_goal, for example:
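    ros2 action send_goal /tamp/execute_plan tamp_msgs/action/ExecutePlan "{plan: [...]}"
    (tamp_msgs/action/ExecutePlan stands in for whatever interface you defined for ExecuteTAMPAction)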

9. References

  1. Garrett, C.R., et al. (2021). "Integrated Task and Motion Planning." Annual Review of Control, Robotics, and Autonomous Systems, 4, 265–293.
  2. Garrett, C.R., Lozano-Pérez, T., & Kaelbling, L.P. (2020). "PDDLStream: Integrating Symbolic Planners and Blackbox Samplers via Optimistic Adaptive Planning." ICAPS 2020.
  3. Ahn, H., et al. (2022). "Translate to See: Open-Vocabulary 3D Scene Understanding." CVPR 2022.
  4. Liu, S., et al. (2023). "Grounding DINO: Marrying DINO with Grounded Pre-training for Open-Set Object Detection." arXiv:2303.05499.
  5. Kirillov, A., et al. (2023). "Segment Anything." ICCV 2023 — SAM model.
  6. Shen, J., et al. (2023). "LLM as MPC: Large Language Models as World Model for Manipulators." arXiv:2306.16838.
  7. Suárez-Hernández, A., et al. (2023). "Object-Centric Task and Motion Planning." WAFR 2022.
  8. Ha, H., & Song, S. (2022). "Language-conditioned Imitation Learning for Robot Manipulation." CoRL 2022.
  9. Bradamante, F., et al. (2023). "A Survey on Task and Motion Planning for Mobile Manipulators." arXiv:2309.00493.
  10. Kaelbling, L.P., & Lozano-Pérez, T. (2011). "Hierarchical Task and Motion Planning in the Now." ICRA 2011.
  11. Fast Downward Planner — Classical PDDL planner
  12. PDDLStream GitHub — TAMP framework
  13. MoveIt Documentation — Motion planning framework
  14. PyBullet Documentation — Physics simulation
  15. Grounding DINO — Open-vocabulary detection
  16. Segment Anything (SAM) — Image segmentation