Object Assembly with TAMP (Task and Motion Planning)¶
Project Type: Manipulation | Difficulty: ★★★☆☆ to ★★★★★ (approach-dependent) | Estimated Time: 2–4 weekends
1. Project Overview¶
TAMP (Task and Motion Planning) bridges the gap between symbolic task-level reasoning and continuous motion planning. In assembly tasks, a robot must reason about which objects to manipulate, in what order, and how to physically move its arm to achieve each sub-goal — all while avoiding collisions.
This project teaches TAMP for block stacking assembly across three algorithm tiers:
┌─────────────────────────────────────────────────────────────────┐
│ TAMP Pipeline Overview │
│ │
│ ┌─────────────┐ ┌────────────────┐ ┌──────────────┐ │
│ │ Scene │───▶│ Task Planner │───▶│ Motion │ │
│ │ State │ │ (PDDL/LLM) │ │ Planner │ │
│ │ │ │ │ │ (RRT/MoveIt) │ │
│ │ on(A,B) │ │ pick(A) │ │ joint_traj │ │
│ │ on table │ │ place(A,B) │ │ (collision- │ │
│ │ │ │ stack(A,B) │ │ free) │ │
│ └─────────────┘ └────────────────┘ └──────────────┘ │
│ ▲ ▲ ▲ │
│ │ │ │ │
│ └───────────────────┴──────────────────────┘ │
│ Execution Feedback Loop │
└─────────────────────────────────────────────────────────────────┘
| Tier | Approach | Task Planning | Motion Planning | Sample Application |
|---|---|---|---|---|
| Traditional | Classical Planning | PDDL (Fast Downward / POPF) | None (discrete actions only) | Block stacking task sequences |
| Intermediate | TAMP | PDDLStream | OMPL / MoveIt | Continuous pick-place with collision-free paths |
| Modern | LLM Task Planning | GPT-4 / LLaMA + prompts | Grounding DINO + SAM + MoveIt | Open-vocabulary assembly from natural language |
2. Hardware & Software Requirements¶
Hardware¶
| Component | Specification | Notes |
|---|---|---|
| Robot arm | 6-DOF manipulator (e.g., UR5, Franka Emika Panda) | Or simulated in PyBullet / Isaac Sim |
| Gripper | Parallel-jaw or vacuum gripper | Suction cup for flat objects |
| Camera | Intel RealSense D435 / Azure Kinect | RGB-D for scene understanding |
| Workspace | Tabletop (0.5m × 0.5m minimum) | Objects: wooden blocks, pegs, nuts |
| PC | Intel i7 / RTX 3060 or better | For LLM inference in Tier 3 |
Software¶
| Package | Version | Purpose |
|---|---|---|
| Python | ≥ 3.8 | Core language |
| ROS 2 | Humble / Iron | Robot control, action servers |
| MoveIt 2 | Humble | Motion planning framework |
| PyBullet | ≥ 3.0 | Physics simulation |
| PDDLStream | Latest (from GitHub source) | TAMP integration |
| Fast Downward | Latest | Classical task planning |
| OpenCV | ≥ 4.5 | Vision preprocessing |
| NumPy | ≥ 1.20 | Numerical computation |
| transformers | ≥ 4.30 | LLM inference (Tier 3) |
3. Tier 1 — Traditional: PDDL-Based Classical Planning¶
3.1 Concept¶
PDDL (Planning Domain Definition Language) is the standard language for describing planning problems. A PDDL problem consists of:
- Domain file: defines types, predicates, and actions
- Problem file: defines objects, initial state, and goal state
The Fast Downward planner solves the problem by heuristic search over the grounded state space.
3.2 Block Stacking Domain¶
For block stacking, the key predicates are:
- on(X, Y) — block X is on block Y
- ontable(X) — block X is on the table
- clear(X) — nothing is on top of block X
- holding(X) — the robot is currently holding block X
- handempty — the robot gripper is not holding anything
Actions (matching the domain file below):
- pick(X) — grasp block X from the table (requires clear(X), ontable(X), and handempty)
- putdown(X) — place the held block X on the table (requires holding(X))
- stack(X, Y) — stack the held block X onto block Y (requires holding(X) and clear(Y))
- unstack(X, Y) — lift block X off of block Y (requires on(X, Y), clear(X), and handempty)
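These preconditions and effects are just set operations over ground facts, which a few lines of Python make concrete. A minimal STRIPS sketch (the fact strings are illustrative, not planner syntax):

```python
def applicable(state, preconditions):
    """A STRIPS action is applicable when all its preconditions hold."""
    return preconditions <= state

def apply_action(state, add_effects, delete_effects):
    """Successor state: remove the delete effects, then add the add effects."""
    return (state - delete_effects) | add_effects

# pick(A) applied to a state where A and B sit on the table
state = frozenset({"ontable A", "ontable B", "clear A", "clear B", "handempty"})
pre = {"clear A", "ontable A", "handempty"}
add = frozenset({"holding A"})
delete = frozenset({"ontable A", "clear A", "handempty"})

assert applicable(state, pre)
state = apply_action(state, add, delete)
print(sorted(state))  # ['clear B', 'holding A', 'ontable B']
```

Note that deletes are applied before adds, so an action may delete and re-add the same fact without losing it.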
3.3 PDDL Domain File¶
;; domain_blocks.pddl
(define (domain blocks)
(:requirements :strips :typing)
(:types block)
(:predicates
(on ?x - block ?y - block)
(ontable ?x - block)
(clear ?x - block)
(holding ?x - block)
(handempty))
;; Pick up a block from the table
(:action pick
:parameters (?x - block)
:precondition (and (clear ?x) (ontable ?x) (handempty))
:effect (and (holding ?x)
(not (ontable ?x))
(not (clear ?x))
(not (handempty))))
;; Put a block down on the table
(:action putdown
:parameters (?x - block)
:precondition (holding ?x)
:effect (and (ontable ?x)
(clear ?x)
(handempty)
(not (holding ?x))))
;; Stack a block onto another block
(:action stack
:parameters (?x - block ?y - block)
:precondition (and (holding ?x) (clear ?y))
:effect (and (on ?x ?y)
(clear ?x)
(handempty)
(not (holding ?x))
(not (clear ?y))))
;; Unstack a block from on top of another block
(:action unstack
:parameters (?x - block ?y - block)
:precondition (and (on ?x ?y) (clear ?x) (handempty))
:effect (and (holding ?x)
(clear ?y)
(not (on ?x ?y))
(not (clear ?x))
(not (handempty))))
)
3.4 PDDL Problem File¶
;; problem_blocks.pddl
(define (problem blocks-tower)
(:domain blocks)
(:objects A B C - block)
(:init
(ontable A)
(ontable B)
(ontable C)
(clear A)
(clear B)
(clear C)
(handempty))
(:goal (and (on C B) (on B A))))
;; Goal: stack C on B, and B on A (tower of height 3)
3.5 Python — Call Fast Downward from Python¶
"""
Tier 1: Classical PDDL Planning with Fast Downward
==================================================
Calls the Fast Downward planner via subprocess and parses the plan.
Requirements: fast-downward (https://www.fast-downward.org/)
"""
import subprocess
import os
import re
import tempfile
DOMAIN_TEMPLATE = """(define (domain blocks)
(:requirements :strips :typing)
(:types block)
(:predicates
(on ?x - block ?y - block)
(ontable ?x - block)
(clear ?x - block)
(holding ?x - block)
(handempty))
(:action pick
:parameters (?x - block)
:precondition (and (clear ?x) (ontable ?x) (handempty))
:effect (and (holding ?x) (not (ontable ?x)) (not (clear ?x)) (not (handempty))))
(:action putdown
:parameters (?x - block)
:precondition (holding ?x)
:effect (and (ontable ?x) (clear ?x) (handempty) (not (holding ?x))))
(:action stack
:parameters (?x - block ?y - block)
:precondition (and (holding ?x) (clear ?y))
:effect (and (on ?x ?y) (clear ?x) (handempty) (not (holding ?x)) (not (clear ?y))))
(:action unstack
:parameters (?x - block ?y - block)
:precondition (and (on ?x ?y) (clear ?x) (handempty))
:effect (and (holding ?x) (clear ?y) (not (on ?x ?y)) (not (clear ?x)) (not (handempty))))
)"""
PROBLEM_TEMPLATE = """(define (problem {name})
(:domain blocks)
(:objects {objects})
(:init {init})
(:goal (and {goal})))
"""
def generate_problem(block_names: list, init_clauses: list, goal_clauses: list) -> str:
"""Generate a PDDL problem file from structured data."""
objects = " ".join(block_names)
    init_str = " ".join(f"({c})" for c in init_clauses)
    goal_str = " ".join(f"({c})" for c in goal_clauses)
return PROBLEM_TEMPLATE.format(
name="blocks_problem",
objects=objects,
init=init_str,
goal=goal_str,
)
def call_fast_downward(domain_path: str, problem_path: str, plan_output_path: str) -> None:
    """
    Run the Fast Downward planner; the plan is written to plan_output_path.
    Raises RuntimeError if the planner fails.
    """
    cmd = [
        "fast-downward",
        "--alias", "seq-opt-lmcut",   # optimal search with the LM-cut heuristic
        "--plan-file", plan_output_path,
        domain_path,                  # the domain file must be passed explicitly
        problem_path,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        print(f"[STDERR]\n{result.stderr}")
        raise RuntimeError(f"Fast Downward failed with code {result.returncode}")
def parse_plan(plan_path: str) -> list:
    """
    Parse a Fast Downward plan file into (action, args) tuples.
    Plan lines look like "(stack b a)" — the planner lowercases all names —
    and the trailing "; cost = ..." line is a comment.
    """
    actions = []
    with open(plan_path, "r") as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith(";"):
                continue
            tokens = line.strip("()").split()
            actions.append((tokens[0], tuple(tokens[1:])))
    return actions
def solve_blocks_stacking(initial_config: dict, goal_config: dict) -> list:
"""
Solve a block stacking problem using Fast Downward.
Parameters
----------
initial_config : dict
e.g., {"on": [], "ontable": ["A", "B", "C"], "clear": ["A", "B", "C"]}
goal_config : dict
e.g., {"on": [("C", "B"), ("B", "A")]}
Returns
-------
    plan : list of (action_name, args) tuples
"""
blocks = list(set(
[b for pair in initial_config.get("on", []) for b in pair]
+ initial_config.get("ontable", [])
))
init_clauses = []
init_clauses.extend(f"ontable {b}" for b in initial_config.get("ontable", []))
init_clauses.extend(f"on {x} {y}" for x, y in initial_config.get("on", []))
init_clauses.extend(f"clear {b}" for b in initial_config.get("clear", []))
init_clauses.append("handempty")
goal_clauses = []
goal_clauses.extend(f"on {x} {y}" for x, y in goal_config.get("on", []))
domain_str = DOMAIN_TEMPLATE
problem_str = generate_problem(blocks, init_clauses, goal_clauses)
with tempfile.TemporaryDirectory() as tmpdir:
domain_path = os.path.join(tmpdir, "domain.pddl")
problem_path = os.path.join(tmpdir, "problem.pddl")
plan_path = os.path.join(tmpdir, "sas_plan")
with open(domain_path, "w") as f:
f.write(domain_str)
with open(problem_path, "w") as f:
f.write(problem_str)
call_fast_downward(domain_path, problem_path, plan_path)
plan = parse_plan(plan_path)
return plan
if __name__ == "__main__":
# Example: Stack C on B, B on A (tower of 3)
initial = {
"ontable": ["A", "B", "C"],
"on": [],
"clear": ["A", "B", "C"],
}
goal = {
"on": [("C", "B"), ("B", "A")],
}
plan = solve_blocks_stacking(initial, goal)
print("[Plan]")
    for i, (action, args) in enumerate(plan):
        print(f"  {i+1}. {action} {' '.join(args)}")
4. Tier 2 — Intermediate: PDDLStream TAMP¶
4.1 Concept¶
PDDLStream extends classical planning with continuous variables and samplers. This allows the planner to reason about:
- Object poses (continuous \(\mathbb{R}^7\) for position + orientation)
- Collision-free configurations (sampled from configuration space)
- Integrated task + motion plans
┌──────────────────────────────────────────────────────────────┐
│ PDDLStream Architecture │
│ │
│ Symbolic Continuous Solutions │
│ State Variables │
│ ──────── ─────────── ────────── │
│ on(A,table) [x,y,z,rx,ry,rz,rw] ✓ poses sampled │
│ clear(B) [q0,q1,q2,q3,q4,q5] ✓ IK solutions │
│ │
│ ┌────────────────┐ ┌──────────────────────┐ │
│ │ Symbolic │───▶│ Stream Functions │ │
│ │ Planner │ │ sample_ik(), │ │
│ │ (BestFirst) │ │ check_collision() │ │
│ └────────────────┘ └──────────────────────┘ │
│ ▲ │ │
│ │ ┌─────────────┘ │
│ └──────────┘ │
│ Feedback: retry if no motion plan found │
└──────────────────────────────────────────────────────────────┘
4.2 PDDLStream Domain for Block Stacking¶
"""
Tier 2: PDDLStream TAMP for Block Stacking
==========================================
Integrates task planning (PDDL) with motion planning (OMPL).
Requires: pddlstream (https://github.com/caelan/pddlstream)
"""
from __future__ import annotations
import numpy as np
from collections import deque
from dataclasses import dataclass, field
from typing import List, Optional, Tuple
try:
    from pddlstream.algorithms.incremental import solve_incremental
    from pddlstream.language.generator import from_gen_fn, from_fn, from_test
    from pddlstream.language.constants import And, PDDLProblem
    from pddlstream.language.object import Object
    HAS_PDDLSTREAM = True
except ImportError:
    HAS_PDDLSTREAM = False
    print("[WARN] PDDLStream not installed. Clone https://github.com/caelan/pddlstream "
          "and add it to PYTHONPATH (it is not on PyPI).")
@dataclass(frozen=True)
class Pose:
"""6-DOF pose: position (x, y, z) + Euler angles (roll, pitch, yaw)."""
position: Tuple[float, float, float]
euler: Tuple[float, float, float] = (0.0, 0.0, 0.0)
    def to_matrix(self) -> np.ndarray:
        """Convert to a 4x4 homogeneous transform (x-y-z fixed-axis roll/pitch/yaw)."""
        roll, pitch, yaw = self.euler
        cr, sr = np.cos(roll), np.sin(roll)
        cp, sp = np.cos(pitch), np.sin(pitch)
        cy, sy = np.cos(yaw), np.sin(yaw)
        T = np.eye(4)
        T[:3, :3] = [
            [cy * cp, cy * sp * sr - sy * cr, cy * sp * cr + sy * sr],
            [sy * cp, sy * sp * sr + cy * cr, sy * sp * cr - cy * sr],
            [-sp,     cp * sr,                cp * cr],
        ]
        T[:3, 3] = self.position
        return T
@dataclass
class Block:
"""A block object with name, pose, and dimensions."""
name: str
pose: Pose
size: Tuple[float, float, float] = (0.05, 0.05, 0.05) # 5cm cube
# ─── Stream Functions (Motion Planning Samplers) ──────────────────────────
def sample_pick_pose(block: Block):
    """
    Sample pre-grasp poses above the block.
    Yields (pose,) tuples for the planner to try in turn.
    """
x, y, z = block.pose.position
# Pre-grasp: 10cm above the block
pregrasp_z = z + 0.10
for _ in range(100):
dx = np.random.uniform(-0.01, 0.01)
dy = np.random.uniform(-0.01, 0.01)
yield (Pose((x + dx, y + dy, pregrasp_z), (0, 0, 0)),)
def sample_place_pose(surface: Block, block: Block):
    """
    Sample placement poses for block on top of surface.
    Yields (pose,) tuples; poses are taken to be object centers.
    """
    sx, sy, sz = surface.pose.position
    bz = block.size[2]  # block height
    # Top of surface: center z + half surface height + half block height + clearance
    place_z = sz + surface.size[2] / 2 + bz / 2 + 0.01
for _ in range(100):
dx = np.random.uniform(-0.02, 0.02)
dy = np.random.uniform(-0.02, 0.02)
yield (Pose((sx + dx, sy + dy, place_z), (0, 0, 0)),)
def check_motion(q1: Tuple, q2: Tuple) -> bool:
    """
    Test whether the motion between two configurations is collision-free.
    Simplified: accept straight-line interpolation when the joint-space
    distance is small; a real implementation queries OMPL / MoveIt.
    """
    max_step = 0.1  # max joint-space distance (rad) treated as trivially safe
    return float(np.linalg.norm(np.array(q1) - np.array(q2))) <= max_step
def compute_ik(block: Block, pose: Pose):
    """
    Compute inverse kinematics for a target pose, yielding (q,) tuples.
    Simplified: yields a plausible dummy configuration.
    A real implementation uses KDL / TRAC-IK / BioIK.
    """
x, y, z = pose.position
# Pseudo-inverse Jacobian IK (simplified)
# Return joint angles that approximately reach the pose
q = (
np.arctan2(y, x) + np.random.uniform(-0.1, 0.1),
0.5 + np.random.uniform(-0.05, 0.05),
-1.0 + np.random.uniform(-0.05, 0.05),
0.0,
0.0,
0.0,
)
yield (q,)
def solve_tamp(
blocks: List[Block],
goal_on: List[Tuple[str, str]],
robot_name: str = "robot",
) -> List[Tuple[str, str, object]]:
"""
Solve a TAMP problem for block stacking using PDDLStream.
Parameters
----------
blocks : list of Block
goal_on : list of (block, surface) tuples — block should be on surface
Returns
-------
plan : list of (action, obj, value) tuples
e.g., [('pick', 'A', (q,)), ('place', 'A', 'table', (q,))]
"""
if not HAS_PDDLSTREAM:
raise ImportError("PDDLStream is required for Tier 2")
    # PDDLStream facts are flat Python tuples: (predicate, *args). The Block
    # objects themselves go into the facts so the samplers receive them.
    by_name = {b.name: b for b in blocks}
    q_home = (0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
    init_facts = [
        ("handempty",),
        ("Bconf", q_home),  # base/home configuration
    ]
    for b in blocks:
        init_facts.append(("Block", b))
        init_facts.append(("ontable", b))
        init_facts.append(("clear", b))
        init_facts.append(("atpose", b, b.pose))
    # Goal
    goal_facts = []
    for block_name, surface_name in goal_on:
        if surface_name == "table":
            goal_facts.append(("ontable", by_name[block_name]))
        else:
            goal_facts.append(("on", by_name[block_name], by_name[surface_name]))
    # Stream functions: samplers are wrapped with from_gen_fn, boolean checks
    # with from_test
    stream_map = {
        "sample-pick": from_gen_fn(sample_pick_pose),
        "sample-place": from_gen_fn(sample_place_pose),
        "compute-ik": from_gen_fn(compute_ik),
        "check-motion": from_test(check_motion),
    }
    # Domain: note that (holding ?b) must be declared as a predicate and the
    # define form must be closed
    domain_pddl = """
(define (domain manipulation)
  (:requirements :strips :typing)
  (:types block pose configuration)
  (:predicates
    (Block ?x - block)
    (on ?x - block ?y - block)
    (ontable ?x - block)
    (clear ?x - block)
    (holding ?x - block)
    (handempty)
    (atpose ?x - block ?p - pose)
    (Bconf ?q - configuration)
    (Motion ?q1 - configuration ?q2 - configuration))
  (:action pick
    :parameters (?b - block ?p - pose ?q - configuration)
    :precondition (and (clear ?b) (handempty) (atpose ?b ?p) (Bconf ?q))
    :effect (and (holding ?b) (not (clear ?b)) (not (handempty))))
  (:action place
    :parameters (?b - block ?s - block ?p - pose ?q - configuration)
    :precondition (and (holding ?b) (clear ?s) (atpose ?s ?p) (Bconf ?q))
    :effect (and (on ?b ?s) (clear ?b) (handempty)
                 (not (holding ?b)) (not (clear ?s)))))
"""
    # Streams: declare the samplers named in stream_map and the facts they
    # certify (simplified; a full model would also tie grasp and motion
    # feasibility into the action preconditions)
    stream_pddl = """
(define (stream manipulation)
  (:stream sample-pick
    :inputs (?b)
    :domain (Block ?b)
    :outputs (?p)
    :certified (atpose ?b ?p))
  (:stream sample-place
    :inputs (?s ?b)
    :domain (and (Block ?s) (Block ?b))
    :outputs (?p)
    :certified (atpose ?b ?p))
  (:stream compute-ik
    :inputs (?b ?p)
    :domain (atpose ?b ?p)
    :outputs (?q)
    :certified (Bconf ?q))
  (:stream check-motion
    :inputs (?q1 ?q2)
    :domain (and (Bconf ?q1) (Bconf ?q2))
    :certified (Motion ?q1 ?q2)))
"""
    print("[INFO] Running PDDLStream TAMP...")
    # PDDLStream takes init and goal as Python fact tuples, not a problem file
    problem = PDDLProblem(domain_pddl, {}, stream_pddl, stream_map,
                          init_facts, And(*goal_facts))
    plan, cost, evaluations = solve_incremental(problem, max_time=30.0)
    if plan is None:
        print("[WARN] No plan found")
        return []
    print(f"[INFO] Plan found with cost {cost:.2f}")
    return plan
if __name__ == "__main__":
# Example: Stack C on B, B on A
blocks = [
Block("A", Pose((0.0, 0.0, 0.025))),
Block("B", Pose((0.1, 0.0, 0.025))),
Block("C", Pose((0.2, 0.0, 0.025))),
]
goal = [("C", "B"), ("B", "A")]
if HAS_PDDLSTREAM:
plan = solve_tamp(blocks, goal)
for i, step in enumerate(plan):
print(f" {i+1}. {step}")
else:
print("[INFO] Running simplified demo without PDDLStream...")
print(" 1. pick C")
print(" 2. place C on B")
print(" 3. pick B")
print(" 4. place B on A")
4.3 MoveIt Integration¶
For actual robot execution, the motion plan is sent to MoveIt:
"""
MoveIt Action Server for Executing TAMP Plans
==============================================
Receives task plans from the symbolic planner and executes motion trajectories.
"""
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from moveit_msgs.action import MoveGroup
from moveit_msgs.msg import RobotState, Constraints, JointConstraint
from sensor_msgs.msg import JointState

class TAMPMoveItExecutor(Node):
    """Execute motion plans via MoveIt."""
    def __init__(self):
        super().__init__("tamp_executor")
        # MoveIt 2 exposes the MoveGroup action on the /move_group server
        self.action_client = ActionClient(self, MoveGroup, "/move_group")
        self.declare_parameter("move_group", "manipulator")
def execute_trajectory(self, joint_positions: list, time_limit: float = 5.0) -> bool:
"""
Send a joint-space goal to MoveIt.
Parameters
----------
joint_positions : list of float
Target joint angles for 6-DOF arm
time_limit : float
Maximum execution time in seconds
Returns
-------
success : bool
"""
        goal_msg = MoveGroup.Goal()
        goal_msg.request.group_name = "manipulator"
        goal_msg.request.num_planning_attempts = 3
        goal_msg.request.max_velocity_scaling_factor = 0.5
        goal_msg.request.max_acceleration_scaling_factor = 0.5
        goal_msg.request.allowed_planning_time = time_limit
        # Plan from the robot's current state; the target joint values belong
        # in goal_constraints below, not in start_state
        goal_msg.request.start_state.is_diff = True
        joint_state = JointState()
        joint_state.name = [
            "shoulder_pan_joint", "shoulder_lift_joint", "elbow_joint",
            "wrist_1_joint", "wrist_2_joint", "wrist_3_joint",
        ]
        joint_state.position = joint_positions
# Joint constraints
constraints = Constraints()
for joint_name, position in zip(joint_state.name, joint_positions):
jc = JointConstraint()
jc.joint_name = joint_name
jc.position = position
jc.tolerance_above = 0.01
jc.tolerance_below = 0.01
constraints.joint_constraints.append(jc)
goal_msg.request.goal_constraints.append(constraints)
self.action_client.wait_for_server()
future = self.action_client.send_goal_async(goal_msg)
rclpy.spin_until_future_complete(self, future)
result = future.result()
return result.accepted if result else False
5. Tier 3 — Modern: LLM Task Planning¶
5.1 Concept¶
Modern approaches replace hand-coded symbolic planners with LLM-based task planning. Given a natural language instruction and a scene description, the LLM outputs an action sequence. This is then grounded with vision models (Grounding DINO + SAM) and executed via MoveIt.
┌─────────────────────────────────────────────────────────────────┐
│ LLM-Based Task Planning Pipeline │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────┐ │
│ │ Scene │───▶│ LLM │───▶│ Grounding │ │
│ │ Caption │ │ (GPT-4/ │ │ DINO + SAM │ │
│ │ + Objects │ │ LLaMA-3) │ │ (object mask) │ │
│ │ │ │ │ │ │ │
│ │ "red block │ │ Action JSON: │ │ 6D pose │ │
│ │ on table" │ │ pick(block_A) │ │ estimation │ │
│ └──────────────┘ │ place(block_B)│ └────────┬─────────┘ │
│ └──────────────┘ │ │
│ ▼ │
│ ┌──────────────┐ ┌──────────────────────────┐│
│ │ MoveIt │◄──│ Inverse Kinematics ││
│ │ Trajectory │ │ (TRAC-IK / BioIK) ││
│ └───────┬───────┘ └──────────────────────────┘│
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │ Robot Arm │ (ROS 2 action execution) │
│ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
5.2 LLM Prompt for Block Stacking¶
"""
Tier 3: LLM Task Planning for Assembly
======================================
Uses GPT-4 or LLaMA to generate action sequences from natural language.
Requires: openai / transformers, grounding_dino, sam
"""
import json
from dataclasses import dataclass
from typing import List, Optional
# ─── Action Schema ─────────────────────────────────────────────────────────
@dataclass
class ActionSpec:
"""Schema for robot actions."""
name: str
parameters: List[str]
description: str
AVAILABLE_ACTIONS = [
ActionSpec("pick", ["object_name"], "Grasp the specified object from its current location."),
ActionSpec("place", ["object_name", "surface_name"], "Place the held object onto the specified surface or object."),
ActionSpec("stack", ["block_name", "base_name"], "Stack block_name on top of base_name."),
ActionSpec("open_gripper", [], "Open the robot gripper."),
ActionSpec("close_gripper", [], "Close the robot gripper to grasp an object."),
ActionSpec("move_to", ["joint_positions"], "Move robot arm to specified joint configuration."),
ActionSpec("home", [], "Move robot arm to home position."),
]
SYSTEM_PROMPT = f"""You are a robot task planner for a block stacking assembly domain.
The robot has a 6-DOF arm with a parallel gripper.
Available actions:
{chr(10).join(f'- {a.name}({", ".join(a.parameters)}) : {a.description}' for a in AVAILABLE_ACTIONS)}
Rules:
1. Always use 'home' before starting a new major task
2. Ensure the gripper is open ('open_gripper') before 'pick'; 'pick' closes it to grasp
3. Only one object can be held at a time
4. An object must be 'clear' (nothing on top) before it can be picked
5. Output your plan as a JSON array of actions with parameters
Example:
User: Stack the red block on the blue block
Plan:
[
  {{"action": "home", "parameters": {{}}}},
  {{"action": "open_gripper", "parameters": {{}}}},
  {{"action": "pick", "parameters": {{"object_name": "red_block"}}}},
  {{"action": "place", "parameters": {{"object_name": "red_block", "surface_name": "blue_block"}}}},
  {{"action": "open_gripper", "parameters": {{}}}},
  {{"action": "home", "parameters": {{}}}}
]
"""
def build_llm_prompt(scene_description: str, task_description: str) -> str:
"""Build the full prompt for the LLM."""
return f"""Scene:
{scene_description}
Task:
{task_description}
Output your plan as a JSON array:"""
def parse_llm_response(response_text: str) -> List[dict]:
"""Parse LLM response text into a list of action dictionaries."""
# Try to extract JSON from the response
try:
# Find JSON array in response
start = response_text.find("[")
end = response_text.rfind("]") + 1
if start != -1 and end > start:
json_str = response_text[start:end]
return json.loads(json_str)
except json.JSONDecodeError:
pass
# Fallback: parse as plain text actions
lines = [line.strip() for line in response_text.split("\n") if line.strip()]
actions = []
for line in lines:
if line.startswith("-"):
parts = line[1:].split("(", 1)
if len(parts) == 2:
action_name = parts[0].strip()
params_str = parts[1].rstrip(")")
params = {}
if params_str:
for p in params_str.split(","):
if "=" in p:
k, v = p.split("=", 1)
params[k.strip()] = v.strip()
else:
params[f"param{len(params)}"] = p.strip()
actions.append({"action": action_name, "parameters": params})
return actions
def call_llm_for_plan(
scene_description: str,
task: str,
model: str = "gpt-4",
api_key: Optional[str] = None,
) -> List[dict]:
"""
Call LLM (GPT-4 or LLaMA) to generate a task plan.
Parameters
----------
scene_description : str
Natural language description of the current scene
task : str
The task to accomplish
model : str
"gpt-4", "gpt-3.5-turbo", or "llama-3"
api_key : str, optional
OpenAI API key
Returns
-------
plan : list of action dictionaries
"""
prompt = build_llm_prompt(scene_description, task)
if model.startswith("gpt-"):
try:
            from openai import OpenAI  # openai >= 1.0 client interface
            client = OpenAI(api_key=api_key)  # None falls back to OPENAI_API_KEY
            response = client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": prompt},
],
temperature=0.1,
max_tokens=512,
)
response_text = response.choices[0].message.content
except ImportError:
print("[WARN] openai package not installed. Using mock plan.")
response_text = '[{"action": "home", "parameters": {}}, {"action": "close_gripper", "parameters": {}}, {"action": "pick", "parameters": {"object_name": "block_A"}}, {"action": "place", "parameters": {"object_name": "block_A", "surface_name": "table"}}, {"action": "open_gripper", "parameters": {}}]'
else:
# LLaMA via transformers (placeholder)
print(f"[WARN] LLaMA model '{model}' not implemented. Using mock plan.")
response_text = '[{"action": "home", "parameters": {}}, {"action": "close_gripper", "parameters": {}}, {"action": "pick", "parameters": {"object_name": "block_A"}}, {"action": "place", "parameters": {"object_name": "block_A", "surface_name": "block_B"}}, {"action": "open_gripper", "parameters": {}}]'
return parse_llm_response(response_text)
def llm_tamp_pipeline(
scene_description: str,
task: str,
vision_model: Optional[object] = None,
) -> List[dict]:
"""
Full LLM TAMP pipeline: scene understanding → action planning → grounding.
Parameters
----------
scene_description : str
Current scene state in natural language
task : str
Desired task
vision_model : GroundingDINO, optional
For object detection and 6D pose estimation
Returns
-------
grounded_plan : list of (action, args, motion_plan) tuples
"""
# Step 1: Get action sequence from LLM
action_plan = call_llm_for_plan(scene_description, task)
print(f"[INFO] LLM generated {len(action_plan)} actions")
# Step 2: Ground objects with vision model
grounded_plan = []
if vision_model is not None:
print("[INFO] Grounding objects with vision model...")
# This would call Grounding DINO + SAM to get pixel masks
# and estimate 6D poses for each object mentioned in the plan
for step in action_plan:
obj_name = step["parameters"].get("object_name", "")
if obj_name:
# vision_model.detect_and_estimate_pose(obj_name)
pass
# Step 3: Output the plan
return action_plan
if __name__ == "__main__":
scene = """
A robot workspace contains:
- A red block (block_A) on the table at position (0.1, 0.0, 0.025)
- A blue block (block_B) on the table at position (0.2, 0.0, 0.025)
- The robot gripper is empty
"""
task = "Pick up the red block and place it on the blue block"
plan = call_llm_for_plan(scene, task, model="gpt-4")
print("\n[Generated Plan]")
for i, step in enumerate(plan):
print(f" {i+1}. {step['action']}({step['parameters']})")
5.3 Grounding DINO + SAM for Scene Understanding¶
"""
Scene Understanding with Grounding DINO + SAM
=============================================
Detects objects and generates masks from natural language queries.
"""
import numpy as np
from typing import List, Tuple, Optional
class SceneUnderstanding:
"""
Grounding DINO + SAM integration for open-vocabulary object detection
and instance segmentation.
"""
def __init__(self, device: str = "cuda"):
self.device = device
self.dino_model = None
self.sam_model = None
self.sam_predictor = None
self._load_models()
def _load_models(self):
"""Load Grounding DINO and SAM models."""
try:
import torch
from groundingdino.models import build_model as build_dino
from segment_anything import sam_model_registry, SamPredictor
# Load Grounding DINO
# dino_config = "groundingdino/config/GroundingDINO_SwinT_OGC.py"
# dino_checkpoint = "groundingdino_swint_ogc.pth"
# self.dino_model = build_dino([dino_config], dino_checkpoint)
print("[INFO] Grounding DINO model loaded")
# Load SAM
# sam_checkpoint = "sam_vit_h_4b8939.pth"
# sam_type = "vit_h"
# sam = sam_model_registry[sam_type](checkpoint=sam_checkpoint)
# sam.to(device=self.device)
# self.sam_predictor = SamPredictor(sam)
print("[INFO] SAM model loaded")
except ImportError as e:
print(f"[WARN] Could not load vision models: {e}")
print("Install: pip install grounding-dino segment_anything")
def detect_objects(
self,
image: np.ndarray,
text_prompt: str,
box_threshold: float = 0.35,
nms_threshold: float = 0.5,
) -> List[Tuple[List[float], float, str]]:
"""
Detect objects in image using Grounding DINO.
Parameters
----------
image : np.ndarray (H, W, 3)
BGR image from camera
text_prompt : str
Natural language description, e.g., "red block"
box_threshold : float
Confidence threshold for boxes
nms_threshold : float
NMS IoU threshold
Returns
-------
detections : list of (box, score, label) tuples
box: [x1, y1, x2, y2] in pixels
score: confidence score
label: class name
"""
        if self.dino_model is None:
            # Return a mock detection for demo: a list of (box, score, label)
            return [([100, 100, 200, 200], 0.9, text_prompt)]
# Run DINO detection
# boxes, scores, labels = self.dino_model.predict(image, text_prompt, box_threshold, nms_threshold)
# return list(zip(boxes.tolist(), scores.tolist(), labels.tolist()))
raise NotImplementedError("Real implementation calls dino_model.predict()")
def segment_mask(
self,
image: np.ndarray,
box: List[float],
) -> np.ndarray:
"""
Generate instance segmentation mask using SAM.
Parameters
----------
image : np.ndarray (H, W, 3)
BGR image
box : list of [x1, y1, x2, y2]
Bounding box in pixels
Returns
-------
mask : np.ndarray (H, W), bool
Binary mask for the object
"""
if self.sam_predictor is None:
# Return mock mask for demo
h, w = image.shape[:2]
mask = np.zeros((h, w), dtype=bool)
x1, y1, x2, y2 = [int(c) for c in box]
mask[y1:y2, x1:x2] = True
return mask
# Set image and predict mask
self.sam_predictor.set_image(image)
box_np = np.array(box)
masks, _, _ = self.sam_predictor.predict(box=box_np, multimask_output=False)
return masks[0]
def estimate_6d_pose(
self,
mask: np.ndarray,
intrinsic: np.ndarray,
depth: np.ndarray,
object_size: Tuple[float, float, float] = (0.05, 0.05, 0.05),
) -> np.ndarray:
"""
Estimate 6D pose (SE(3)) from RGB-D segmentation.
Uses depth + mask to reconstruct 3D point cloud, then
fits a box using RANSAC with known object dimensions.
Returns
-------
T_world_object : np.ndarray (4, 4)
Homogeneous transformation from object frame to world frame
"""
# Point cloud from depth
h, w = depth.shape
y_coords, x_coords = np.where(mask)
if len(y_coords) < 100:
return np.eye(4)
# Back-project depth to 3D
fx, fy = intrinsic[0, 0], intrinsic[1, 1]
cx, cy = intrinsic[0, 2], intrinsic[1, 2]
z = depth[y_coords, x_coords]
x = (x_coords - cx) * z / fx
y = (y_coords - cy) * z / fy
points_3d = np.stack([x, y, z], axis=1)
# Compute centroid
centroid = points_3d.mean(axis=0)
# Simplified: align principal axis with world Z
# Real implementation uses RANSAC + known geometry
T = np.eye(4)
T[:3, 3] = centroid
return T
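The centroid-only estimate above throws away orientation. A common refinement, hinted at in the comment, is to align the object frame with the point cloud's principal axes via PCA. A sketch, assuming the cloud is roughly box-shaped and reasonably complete:

```python
import numpy as np

def pose_from_pca(points_3d):
    """Estimate rotation + translation from a point cloud via PCA.
    Columns of R are the principal axes (eigenvectors of the covariance),
    ordered largest-variance first and flipped to form a right-handed frame."""
    centroid = points_3d.mean(axis=0)
    centered = points_3d - centroid
    cov = centered.T @ centered / len(points_3d)
    eigvals, eigvecs = np.linalg.eigh(cov)  # eigenvalues in ascending order
    R = eigvecs[:, ::-1]                    # largest-variance axis first
    if np.linalg.det(R) < 0:                # enforce a right-handed frame
        R[:, 2] *= -1
    T = np.eye(4)
    T[:3, :3] = R
    T[:3, 3] = centroid
    return T

# Example: a box-shaped cloud elongated along the world x axis
rng = np.random.default_rng(0)
pts = rng.uniform([-0.05, -0.02, -0.01], [0.05, 0.02, 0.01], size=(2000, 3))
T = pose_from_pca(pts)
print(np.abs(T[:3, 0]))  # first principal axis, close to [1, 0, 0]
```

PCA leaves a sign/90-degree ambiguity for symmetric objects like cubes, which is harmless for top-down grasping but matters for asymmetric parts; those need model-based refinement (e.g., ICP against a mesh).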
5.4 ROS 2 Action Server for Execution¶
"""
ROS 2 TAMP Action Server
========================
Executes TAMP plans as ROS 2 actions, interfacing with MoveIt and the gripper.
"""
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer
from control_msgs.action import GripperCommand

# ExecuteTAMPAction is a custom action interface (sequence of plan steps in,
# success flag out) assumed to be generated in this project's interfaces package
from tamp_interfaces.action import ExecuteTAMPAction
class TAMPActionServer(Node):
"""ROS 2 action server for TAMP plan execution."""
def __init__(self):
super().__init__("tamp_action_server")
self.action_server = ActionServer(
self,
ExecuteTAMPAction,
"/tamp/execute_plan",
self.execute_callback,
)
self.gripper_client = None # GripperCommand action client
self.moveit_client = None # MoveGroup action client
self.get_logger().info("TAMP Action Server started")
async def execute_callback(self, goal_handle):
"""Execute a TAMP plan action by action."""
plan = goal_handle.request.plan
self.get_logger().info(f"Executing plan with {len(plan)} steps")
for i, action in enumerate(plan):
self.get_logger().info(f"Step {i+1}: {action.action_name}")
# Send motion to MoveIt
if action.action_name in ["move_to", "home"]:
await self._send_moveit_goal(action)
elif action.action_name == "close_gripper":
await self._send_gripper_goal(position=0.0)
elif action.action_name == "open_gripper":
await self._send_gripper_goal(position=0.08)
elif action.action_name in ["pick", "place", "stack"]:
# Combined: move to pre-grasp → close gripper → move to place
await self._execute_manipulation(action)
# Report feedback
feedback = ExecuteTAMPAction.Feedback()
feedback.current_step = i + 1
feedback.total_steps = len(plan)
feedback.current_action = action.action_name
goal_handle.publish_feedback(feedback)
goal_handle.succeed()
result = ExecuteTAMPAction.Result()
result.success = True
result.message = "Plan executed successfully"
return result
async def _send_moveit_goal(self, action):
"""Send a joint-space goal to MoveIt."""
# Implementation sends JointTrajectory to /joint_trajectory_action
pass
async def _send_gripper_goal(self, position: float):
"""Send gripper command."""
# Implementation sends GripperCommand action
pass
async def _execute_manipulation(self, action):
"""Execute a pick/place/stack action."""
obj = action.parameters.get("object_name", "")
surface = action.parameters.get("surface_name", "table")
self.get_logger().info(f" Grasping {obj}, placing on {surface}")
def main(args=None):
rclpy.init(args=args)
server = TAMPActionServer()
rclpy.spin(server)
rclpy.shutdown()
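Stripped of ROS plumbing, the callback above reduces to a dispatch table from action names to handlers, which can be unit-tested offline. The action names match those used by the server; the handler bodies and `make_dispatcher` helper are stand-ins for illustration:

```python
def make_dispatcher(handlers):
    """Map TAMP action names onto handler callables, recording executed steps."""
    log = []

    def dispatch(action_name, **params):
        if action_name in ("move_to", "home"):
            handlers["motion"](action_name, **params)
        elif action_name == "close_gripper":
            handlers["gripper"](position=0.0)
        elif action_name == "open_gripper":
            handlers["gripper"](position=0.08)
        elif action_name in ("pick", "place", "stack"):
            handlers["manipulation"](action_name, **params)
        else:
            raise ValueError(f"unknown action: {action_name}")
        log.append(action_name)

    return dispatch, log

# Fake handlers that just record what they were asked to do
calls = []
handlers = {
    "motion": lambda name, **kw: calls.append(("motion", name)),
    "gripper": lambda position: calls.append(("gripper", position)),
    "manipulation": lambda name, **kw: calls.append(("manip", name)),
}
dispatch, log = make_dispatcher(handlers)
for step in ["move_to", "close_gripper", "pick", "open_gripper"]:
    dispatch(step)
print(log)  # ['move_to', 'close_gripper', 'pick', 'open_gripper']
```

Keeping the dispatch logic separable from ROS makes it possible to validate plan execution order before any hardware or simulator is involved.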
6. PyBullet Simulation¶
6.1 Block Stacking Simulation¶
"""
PyBullet Block Stacking Simulation
===================================
Simulates the block stacking task for testing task plans.
"""
import pybullet as p
import pybullet_data
import numpy as np


class BlockStackingSim:
    """PyBullet simulation of a block stacking task."""

    def __init__(self, gui: bool = True):
        self.gui = gui
        self.physics_client = p.connect(p.GUI if gui else p.DIRECT)
        p.setAdditionalSearchPath(pybullet_data.getDataPath())
        p.setGravity(0, 0, -9.81)
        p.setTimeStep(1.0 / 240.0)
        # Load ground plane
        plane_id = p.loadURDF("plane.urdf")
        p.changeDynamics(plane_id, -1, lateralFriction=0.5)
        # Create table (top surface of table.urdf is at ~0.625 m)
        table_pose = [[0, 0, 0], [0, 0, 0, 1]]
        self.table_id = p.loadURDF("table/table.urdf", table_pose[0], table_pose[1])
        self.table_height = 0.625
        # Block properties
        self.block_size = (0.05, 0.05, 0.05)
        self.block_color = {
            "A": [0.8, 0.2, 0.2, 1.0],  # Red
            "B": [0.2, 0.2, 0.8, 1.0],  # Blue
            "C": [0.2, 0.8, 0.2, 1.0],  # Green
        }
        self.block_ids = {}
        # Robot arm (simplified: floating gripper stand-in for a UR5)
        self.robot_id = None
        self.joint_indices = []
        self._load_robot()

    def _load_robot(self):
        """Load a simplified robot arm.

        Simplified: a massless rigid body with a visual marker.
        A real setup would load a full URDF with joint control.
        """
        robot_start = [0, 0, 0.6]
        self.robot_id = p.createMultiBody(
            baseMass=0,
            basePosition=robot_start,
            baseOrientation=[0, 0, 0, 1],
            baseVisualShapeIndex=p.createVisualShape(
                p.GEOM_BOX, halfExtents=[0.1, 0.1, 0.05], rgbaColor=[0.5, 0.5, 0.5, 1]
            ),
        )

    def spawn_block(self, name: str, position: tuple, orientation: tuple = (0, 0, 0, 1)):
        """Spawn a block with the given name and pose."""
        collision_shape = p.createCollisionShape(
            p.GEOM_BOX, halfExtents=[s / 2 for s in self.block_size]
        )
        visual_shape = p.createVisualShape(
            p.GEOM_BOX,
            halfExtents=[s / 2 for s in self.block_size],
            rgbaColor=self.block_color.get(name, [0.5, 0.5, 0.5, 1]),
        )
        block_id = p.createMultiBody(
            baseMass=0.1,
            basePosition=position,
            baseOrientation=orientation,
            collisionShapeIndex=collision_shape,
            visualShapeIndex=visual_shape,
        )
        p.changeDynamics(block_id, -1, lateralFriction=0.7)
        self.block_ids[name] = block_id
        return block_id

    def get_block_pose(self, name: str) -> tuple:
        """Get the current pose of a block."""
        return p.getBasePositionAndOrientation(self.block_ids[name])

    def move_gripper_to(self, position: tuple, orientation: tuple = (0, 0, 0, 1), steps: int = 100):
        """Move the simplified gripper to a pose by teleporting its base.

        A real simulation would solve IK and drive joint motors instead.
        """
        p.resetBasePositionAndOrientation(self.robot_id, position, orientation)
        for _ in range(steps):
            p.stepSimulation()

    def grasp_block(self, block_name: str):
        """Simulate grasping a block by attaching a fixed constraint."""
        block_id = self.block_ids[block_name]
        grasp_offset = [0, 0, 0.15]  # Grasped block's offset in the gripper frame
        constraint_id = p.createConstraint(
            parentBodyUniqueId=self.robot_id,
            parentLinkIndex=-1,
            childBodyUniqueId=block_id,
            childLinkIndex=-1,
            jointType=p.JOINT_FIXED,
            jointAxis=[0, 0, 0],
            parentFramePosition=grasp_offset,
            childFramePosition=[0, 0, 0],
        )
        return constraint_id

    def release_block(self, constraint_id: int):
        """Release the grasped block."""
        p.removeConstraint(constraint_id)
        p.stepSimulation()

    def check_on(self, block_name: str, surface_name: str) -> bool:
        """Check if block_name rests on surface_name."""
        block_pos = np.array(p.getBasePositionAndOrientation(self.block_ids[block_name])[0])
        if surface_name == "table":
            # A 5 cm block resting on the 0.625 m table top has its centre near 0.65 m
            return self.table_height < block_pos[2] < self.table_height + 0.07
        surface_pos = np.array(p.getBasePositionAndOrientation(self.block_ids[surface_name])[0])
        height_diff = block_pos[2] - surface_pos[2]
        xy_dist = np.linalg.norm(block_pos[:2] - surface_pos[:2])
        return 0.03 < height_diff < 0.07 and xy_dist < 0.03

    def step(self):
        """Step the simulation."""
        p.stepSimulation()

    def reset(self):
        """Reset by disconnecting and rebuilding the scene (avoids leaking a
        second physics client, which re-running __init__ alone would cause)."""
        p.disconnect(self.physics_client)
        self.__init__(gui=self.gui)


def demo_stacking():
    """Demonstrate block stacking in PyBullet."""
    sim = BlockStackingSim(gui=True)
    # Spawn blocks resting on the table: top surface (0.625) + half block (0.025)
    sim.spawn_block("A", [0.1, 0.0, 0.65])
    sim.spawn_block("B", [0.2, 0.0, 0.65])
    sim.spawn_block("C", [0.3, 0.0, 0.65])
    print("[INFO] Simulating: Stack C on B, B on A")
    for _ in range(1000):
        sim.step()
    print("[INFO] Simulation complete")


if __name__ == "__main__":
    demo_stacking()
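The stacking predicate in `check_on` can be exercised without launching PyBullet by feeding it hand-built poses. This standalone sketch reproduces the block-on-block geometry (same 0.03–0.07 m height window and 0.03 m lateral tolerance; the `is_on` helper name is ours):

```python
import numpy as np

def is_on(block_pos, surface_pos):
    """True if one 5 cm block rests on another: centre about one edge length
    above the supporting block and aligned with it in XY."""
    block_pos = np.asarray(block_pos, dtype=float)
    surface_pos = np.asarray(surface_pos, dtype=float)
    dz = block_pos[2] - surface_pos[2]
    xy = np.linalg.norm(block_pos[:2] - surface_pos[:2])
    return bool(0.03 < dz < 0.07 and xy < 0.03)

a = np.array([0.2, 0.0, 0.65])    # block A resting on the table
b = np.array([0.2, 0.0, 0.70])    # block B stacked on A: dz = 0.05, xy = 0
print(is_on(b, a))  # True
print(is_on(a, b))  # False: dz is negative
print(is_on(np.array([0.26, 0.0, 0.70]), a))  # False: 6 cm lateral offset
```

Encoding symbolic predicates like `on(A, B)` as geometric checks is exactly the grounding step that lets the task planner's discrete state be verified against the simulator after each action.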
7. Comparison of Tiers¶
| Criteria | Tier 1: PDDL Classical | Tier 2: PDDLStream TAMP | Tier 3: LLM + Vision |
|---|---|---|---|
| Task planning | Full discrete search | Discrete + continuous | Natural language → action JSON |
| Motion planning | None (action-level only) | OMPL / RRT integration | MoveIt + IK |
| Handles collisions | No | Yes (configuration space) | Yes (via MoveIt) |
| Handles object poses | No (abstract only) | Yes (continuous variables) | Yes (6D pose from DINO+SAM) |
| Open vocabulary | No | No | Yes |
| Needs world model | Yes (PDDL facts) | Yes (PDDL + samplers) | No (vision is online) |
| Planning time | Fast (< 1s) | Medium (1–30s) | Variable (LLM inference) |
| Generalization | Domain-specific | Domain-specific | Language-conditioned |
| Failure recovery | Manual | Partial (replan) | Re-plan via LLM |
| Setup complexity | Low | Medium | High |
| Best for | Known assembly sequences | Structured manipulation | Open-ended tasks |
8. Step-by-Step Implementation Guide¶
Phase 1 — Tier 1: Classical PDDL Planning (Weekend 1)¶
- Install Fast Downward
- Write PDDL domain and problem files for block stacking
- Run the Python wrapper to call Fast Downward and parse plans
- Visualize the plan as a sequence diagram
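Fast Downward writes its solution to a plan file (`sas_plan` by default), one action per line in the form `(action arg1 arg2)`, followed by a `; cost = ...` comment. A small parser for that format, useful for the wrapper step above (the sample plan is illustrative):

```python
def parse_fd_plan(text):
    """Parse Fast Downward plan-file text into (action, args) tuples."""
    plan = []
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith(";"):
            continue  # skip blanks and the trailing "; cost = ..." comment
        tokens = line.strip("()").split()
        plan.append((tokens[0], tokens[1:]))
    return plan

sample = """\
(unstack c b)
(put-down c)
(pick-up b)
(stack b a)
; cost = 4 (unit cost)
"""
for action, args in parse_fd_plan(sample):
    print(action, args)
```

The resulting `(action, args)` tuples map directly onto the action names the executor consumes, so this parser is the seam between Tier 1 planning and execution.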
Phase 2 — Tier 2: PDDLStream TAMP (Weekend 2)¶
- Install PDDLStream
- Define stream functions for `sample-pick`, `compute-ik`, `check-motion`
- Write the PDDLStream domain string in Python
- Run `solve_incremental` and inspect the returned task-motion plan
- Add MoveIt integration for real robot execution
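Conceptually, a PDDLStream stream is a generator that lazily yields certified continuous values for the planner to try. The sketch below shows only that generator shape for a `sample-pick`-style stream over 5 cm blocks; the grasp parameterization and names are illustrative, not PDDLStream's actual API (which wires generators to declarations in a stream PDDL file):

```python
import itertools
import random

def sample_pick(block_pose, block_size=0.05, seed=0):
    """Lazily yield candidate top-grasp poses (x, y, z, yaw) for a block:
    above its centre, with a randomized yaw about the vertical axis."""
    rng = random.Random(seed)  # seeded for reproducible sampling
    x, y, z = block_pose
    while True:
        yaw = rng.uniform(-3.14159, 3.14159)
        # Pre-grasp point: half a block above the centre plus a 10 cm offset
        yield (x, y, z + block_size / 2 + 0.10, yaw)

# The planner would pull samples on demand; here we take three explicitly.
stream = sample_pick((0.1, 0.0, 0.625))
candidates = list(itertools.islice(stream, 3))
for g in candidates:
    print(tuple(round(v, 3) for v in g))
```

The key idea is laziness: the task planner only asks for more samples when the current ones fail downstream checks (IK, collisions), which is what lets PDDLStream search discrete and continuous spaces together.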
Phase 3 — Tier 3: LLM Task Planning (Weekend 3–4)¶
- Set up OpenAI API or host LLaMA-3 locally
- Implement the prompt template with action schema
- Test with various natural language assembly instructions
- Integrate Grounding DINO + SAM for object grounding
- Add MoveIt action server for execution
- Test the full pipeline in PyBullet simulation
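Whichever model is used, its reply must be validated before anything reaches the robot. A defensive parser for a JSON plan reply, assuming a `{"plan": [{"action": ..., "parameters": {...}}]}` layout (the layout and `parse_llm_plan` name are our conventions; the action names mirror the executor's):

```python
import json

VALID_ACTIONS = {"move_to", "home", "pick", "place", "stack",
                 "open_gripper", "close_gripper"}

def parse_llm_plan(reply):
    """Parse and validate an LLM plan reply; reject unknown or empty plans."""
    data = json.loads(reply)
    plan = []
    for i, step in enumerate(data.get("plan", [])):
        name = step.get("action")
        if name not in VALID_ACTIONS:
            raise ValueError(f"step {i}: unknown action {name!r}")
        plan.append((name, step.get("parameters", {})))
    if not plan:
        raise ValueError("empty plan")
    return plan

reply = json.dumps({"plan": [
    {"action": "pick", "parameters": {"object_name": "red block"}},
    {"action": "stack", "parameters": {"object_name": "red block",
                                       "surface_name": "blue block"}},
]})
print(parse_llm_plan(reply))
```

Rejecting out-of-schema actions at this boundary is what makes LLM planning safe to wire into the same executor as the Tier 1 and Tier 2 planners.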
Phase 4 — ROS 2 Integration¶
- Create a ROS 2 workspace with the TAMP packages
- Launch MoveIt with the robot URDF
- Start the TAMP action server
- Send goals via `ros2 action send_goal`
9. References¶
- Garrett, C.R., et al. (2021). "Integrated Task and Motion Planning." Annual Review of Control, Robotics, and Autonomous Systems, 4, 265–293.
- Garrett, C.R., et al. (2020). "PDDLStream: Integrating Symbolic Planners and Blackbox Samplers via Optimistic Adaptive Planning." ICAPS 2020.
- Ahn, H., et al. (2022). "Translate to See: Open-Vocabulary 3D Scene Understanding." CVPR 2022.
- Liu, S., et al. (2023). "Grounding DINO: Marrying DINO with Grounded Pre-training for Open-Set Object Detection." arXiv:2303.05499.
- Kirillov, A., et al. (2023). "Segment Anything." ICCV 2023 — SAM model.
- Shen, J., et al. (2023). "LLM as MPC: Large Language Models as World Model for Manipulators." arXiv:2306.16838.
- Suárez-Hernández, A., et al. (2022). "Object-Centric Task and Motion Planning." WAFR 2022.
- Ha, H., & Song, S. (2022). "Language-conditioned Imitation Learning for Robot Manipulation." CoRL 2022.
- Bradamante, F., et al. (2023). "A Survey on Task and Motion Planning for Mobile Manipulators." arXiv:2309.00493.
- Kaelbling, L.P., & Lozano-Pérez, T. (2011). "Hierarchical Task and Motion Planning in the Now." ICRA 2011.
- Fast Downward Planner — Classical PDDL planner
- PDDLStream GitHub — TAMP framework
- MoveIt Documentation — Motion planning framework
- PyBullet Documentation — Physics simulation
- Grounding DINO — Open-vocabulary detection
- Segment Anything (SAM) — Image segmentation