Voice Interaction Robot

Project Type: Human-Robot Interaction | Difficulty: ★★☆☆☆ to ★★★★☆ (approach-dependent) | Estimated Time: 1–3 weekends


1. Project Overview

A voice interaction robot listens to spoken commands, interprets user intent, and executes corresponding actions (move forward, stop, turn left, navigate to location, fetch an object, etc.). This project builds a complete voice pipeline from microphone input to robot actuation.

  ┌─────────────────────────────────────────────────────────────────┐
  │                    Voice Command Pipeline                         │
  │                                                                  │
  │   ┌──────────┐    ┌──────────┐    ┌───────────┐    ┌────────┐  │
  │   │ Microphone│───▶│    ASR    │───▶│   NLU /   │───▶│ Robot  │  │
  │   │          │    │  (Vosk/  │    │  Dialogue │    │ Action │  │
  │   │          │    │  WebRTC)  │    │  Manager  │    │ Server │  │
  │   └──────────┘    └──────────┘    └───────────┘    └────────┘  │
  │       │               │                  │             │        │
  │       ▼               ▼                  ▼             ▼        │
  │   Raw audio      Text transcript    Intent + Slots   cmd_vel /  │
  │   (16kHz PCM)    "go forward        go_forward {     ActionGoal │
  │                   three meters"       distance: 3}               │
  └─────────────────────────────────────────────────────────────────┘
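The last stage of the pipeline turns an intent such as `go_forward {distance: 3}` into a motion command. As a rough illustration, the sketch below converts a distance into a constant forward velocity and a hold time, which is the simplest open-loop way to drive a `cmd_vel`-style interface. The function name, the 0.3 m/s default, and the dictionary layout are assumptions made for this example; a real robot would normally close the loop with odometry or use a navigation action instead.

```python
def forward_command(distance_m: float, speed_mps: float = 0.3) -> dict:
    """Map a forward distance to a velocity command and how long to hold it."""
    if distance_m <= 0 or speed_mps <= 0:
        raise ValueError("distance and speed must be positive")
    return {
        "linear_x": speed_mps,                 # forward velocity in m/s
        "angular_z": 0.0,                      # no rotation
        "duration_s": distance_m / speed_mps,  # time to hold the command
    }


cmd = forward_command(3.0)
print(cmd)
```

At 0.3 m/s, a 3 m move means holding the command for about 10 seconds.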

In this project you will explore three progressively more sophisticated tiers:

| Tier | Approach | Key Technique | Complexity |
|---|---|---|---|
| Tier 1 (Traditional) | Keyword Spotting | Threshold-based wake word + rule matching | Low |
| Tier 2 (Intermediate) | Full NLU Pipeline | Intent classification + slot filling | Medium |
| Tier 3 (Modern) | LLM Dialogue | GPT-4 / LLaMA + function calling | High |

2. Hardware & Software Requirements

Hardware

| Component | Specification | Notes |
|---|---|---|
| Microphone | USB microphone or I2S array mic (e.g., ReSpeaker 4-Mic) | Far-field mic recommended for robot use |
| Single-board computer | Raspberry Pi 4B (4GB+) or Jetson Nano | Pi 3B+ minimum for Tier 1 |
| Robot platform | Any mobile robot with ROS 2 support | TurtleBot4, custom differential drive, etc. |
| Speaker (optional) | USB speaker or amp + speaker | For robot speech feedback |
| Power supply | 5V 3A USB-C (Pi) + motor battery | Ensure stable power for audio |

Software

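| Package | Version | Purpose |
|---|---|---|
| Python | ≥ 3.10 | Core language (the listings use PEP 604 union type hints) |
| vosk | ≥ 0.3.45 | Offline ASR engine |
| webrtcvad (py-webrtcvad) | ≥ 2.0.10 | Voice Activity Detection (VAD) |
| portaudio / pyaudio | | Audio I/O |
| scikit-learn (imported as sklearn) | ≥ 1.0 | Intent classifier (Tier 2) |
| sklearn-crfsuite | | CRF slot filler (Tier 2) |
| transformers | ≥ 4.30 | Hugging Face models (Tier 2/3) |
| openai | ≥ 1.0 | OpenAI API (Tier 3) |
| rclpy | bundled with ROS 2 | ROS 2 action client/server |
| riva-cli / whispercpp | | Alternative ASR backends |

rclpy is installed with your ROS 2 distribution rather than from PyPI. The remaining Python packages can be installed with pip:

pip install numpy vosk webrtcvad pyaudio scikit-learn sklearn-crfsuite transformers openai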

3. Tier 1 — Traditional: Keyword Spotting with Vosk

3.1 Concept

Tier 1 uses offline ASR (Vosk) combined with keyword/pattern matching. The system continuously listens for a wake word (for example "Hey Robot" or "OK Robot"). Once triggered, it captures a short phrase, runs ASR, and matches the transcript against a dictionary of known command patterns using regex or string similarity.

  Continuous audio stream
  ┌──────────────────┐
  │ Voice Activity   │ ← WebRTC VAD: is speech present?
  │ Detection (VAD)  │
  └───────┬──────────┘
          │ speech detected
  ┌──────────────────┐
  │ Wake Word Check  │ ← Vosk partial result; keyword match?
  └───────┬──────────┘
          │ wake word found
  ┌──────────────────┐
  │ Command Capture  │ ← Buffer ~3s of audio after wake word
  │ + Vosk ASR       │
  └───────┬──────────┘
          │ transcript
  ┌──────────────────┐
  │ Rule Matcher     │ ← Regex / keyword dictionary
  │ → Robot Action   │
  └──────────────────┘
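Section 3.2 implements the rule matcher with regular expressions. The string-similarity alternative mentioned above can be sketched with the standard library's `difflib`. The `KNOWN_COMMANDS` table and the 0.75 cutoff are illustrative assumptions, not values taken from this project.

```python
import difflib

# Hypothetical command table: canonical phrase -> action name.
KNOWN_COMMANDS = {
    "go forward": "move_forward",
    "go back": "move_backward",
    "turn left": "turn_left",
    "turn right": "turn_right",
    "stop": "stop",
}


def fuzzy_match(transcript: str, cutoff: float = 0.75):
    """Return the action for the closest known phrase, or None."""
    candidates = difflib.get_close_matches(
        transcript.lower().strip(), list(KNOWN_COMMANDS), n=1, cutoff=cutoff
    )
    return KNOWN_COMMANDS[candidates[0]] if candidates else None


print(fuzzy_match("go forwards"))  # small ASR variation of a known phrase
print(fuzzy_match("sing a song"))  # unrelated phrase
```

Fuzzy matching tolerates small transcription errors but cannot extract parameters such as distances, so it is best suited to fixed phrases.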

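Voice activity detection uses a threshold on a smoothed frame energy:

\[ E_n = \alpha \cdot E_{n-1} + (1-\alpha) \cdot e_n, \qquad e_n = \sqrt{\frac{1}{N} \sum_{k=1}^{N} x_k^2[n]} \]

where \(x_k[n]\) is the \(k\)-th sample of frame \(n\), \(e_n\) is the RMS energy of that frame, and \(E_n\) is the smoothed frame energy. Speech is declared active when \(E_n > \theta_{\text{energy}}\). In the `VoiceActivityDetector` class of Section 3.2, the `smoothing` argument corresponds to \(1-\alpha\). The wake word itself is detected afterwards by searching Vosk's transcripts for one of the configured phrases.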
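The recursion can be checked by hand on a few synthetic frames. The sketch below uses only the standard library and takes the RMS of the samples as the per-frame energy (one common choice, assumed here); `alpha` weights the previous smoothed value, and the frame values and threshold are made up for illustration.

```python
import math


def smoothed_energies(frames, alpha=0.9):
    """Yield E_n = alpha * E_{n-1} + (1 - alpha) * e_n for each frame."""
    smoothed = 0.0
    for frame in frames:
        # Per-frame energy e_n: RMS of the samples (assumption of this sketch).
        rms = math.sqrt(sum(s * s for s in frame) / len(frame))
        smoothed = alpha * smoothed + (1 - alpha) * rms
        yield smoothed


quiet = [10, -10, 10, -10]
loud = [2000, -2000, 2000, -2000]
# Two quiet frames, three loud frames, then one quiet frame.
frames = [quiet, quiet, loud, loud, loud, quiet]
THRESHOLD = 100.0  # illustrative threshold for 16-bit PCM amplitudes

flags = [energy > THRESHOLD for energy in smoothed_energies(frames)]
print(flags)
```

The final quiet frame is still flagged as speech because the smoothed value decays gradually. This hangover keeps short pauses inside a command from ending the capture early.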

3.2 Complete Python Code

"""
Tier 1: Keyword Spotting Voice Robot
=====================================
Uses Vosk offline ASR + rule-based command matching.
No internet required — fully offline.
"""

import os
import json
import queue
import re
import threading

import numpy as np
import pyaudio
from vosk import Model, KaldiRecognizer

# ─── Configuration ──────────────────────────────────────────────
VOSK_MODEL_PATH = os.path.expanduser("~/vosk-model-small-en-us-0.15")
SAMPLE_RATE = 16000
CHUNK_SIZE = 4096          # samples per audio chunk (256ms at 16kHz)
WAKE_WORDS = {"hey robot", "ok robot", "hey bot", "hello robot"}
# Minimum transcript length to trigger command processing
MIN_TRANSCRIPT_LEN = 3


class VoiceActivityDetector:
    """
    Energy-based Voice Activity Detection (VAD).
    Uses a rolling average of frame energies.
    """

    def __init__(self, energy_threshold: float = 100.0, smoothing: float = 0.1):
        self.energy_threshold = energy_threshold
        self.smoothing = smoothing
        self.smoothed_energy = 0.0

    def is_speech(self, audio_chunk: bytes) -> bool:
        # Convert to numpy array (16-bit PCM)
        samples = np.frombuffer(audio_chunk, dtype=np.int16).astype(np.float32)
        # Compute frame energy (RMS)
        energy = np.sqrt(np.mean(samples ** 2)) + 1e-9
        # Exponential moving average
        self.smoothed_energy = (
            self.smoothing * energy
            + (1 - self.smoothing) * self.smoothed_energy
        )
        return self.smoothed_energy > self.energy_threshold


class VoskAsr:
    """Wrapper around Vosk recognizer for streaming audio."""

    def __init__(self, model_path: str):
        if not os.path.exists(model_path):
            raise FileNotFoundError(
                f"Vosk model not found at {model_path}. "
                "Download from: https://alphacephei.com/vosk/models"
            )
        model = Model(model_path)
        self.rec = KaldiRecognizer(model, SAMPLE_RATE)
        self.rec.SetWords(True)  # Include word timestamps

    def process_chunk(self, audio_chunk: bytes) -> dict:
        """
        Feed one audio chunk to Vosk. Returns dict with:
          - text: final transcript if is_final, otherwise the current
                  partial hypothesis (cumulative for the utterance so far)
          - is_final: True if Vosk closed the utterance on this chunk
        """
        result = {}
        if self.rec.AcceptWaveform(audio_chunk):
            result = json.loads(self.rec.Result())
            result["is_final"] = True
        else:
            partial = json.loads(self.rec.PartialResult())
            result = {"text": partial.get("partial", ""), "is_final": False}
        return result


class CommandMatcher:
    """
    Rule-based command matcher.
    Maps transcribed text to robot actions using regex patterns.
    """

    # Vosk transcribes numbers as words ("three"), so rules accept both
    # digits and the spoken forms listed here.
    NUMBER_WORDS = {
        "one": 1, "two": 2, "three": 3, "four": 4, "five": 5,
        "six": 6, "seven": 7, "eight": 8, "nine": 9, "ten": 10,
        "thirty": 30, "forty five": 45, "sixty": 60, "ninety": 90,
    }

    @classmethod
    def _to_int(cls, token: str | None, default: int) -> int:
        """Convert a captured digit string or number word to an int."""
        if token is None:
            return default
        return int(token) if token.isdigit() else cls.NUMBER_WORDS[token]

    def __init__(self):
        # Longest alternatives first so "ninety" is not matched as "nine".
        words = sorted(self.NUMBER_WORDS, key=len, reverse=True)
        num = r"(\d+|(?:" + "|".join(words) + r")\b)"
        dist = rf"\s*(?:{num}\s*(?:meters?|m|steps?)?)?"
        angle = rf"\s*(?:{num}\s*degrees?)?"
        distance_of = lambda m: {"distance": self._to_int(m.group(1), 1)}
        turn_of = lambda m: {"direction": m.group(1),
                             "angle": self._to_int(m.group(2), 90)}

        # Pattern → (action_name, param_extractor_fn)
        self.rules = [
            # Forward / backward
            (rf"(?:go|move) forward{dist}", ("move_forward", distance_of)),
            (rf"go back(?:ward)?{dist}", ("move_backward", distance_of)),
            # Turn
            (rf"(?:turn|rotate) (left|right){angle}", ("turn", turn_of)),
            # Stop
            (r"\b(?:stop|halt)\b", ("stop", lambda m: {})),
            # Navigation
            (r"go to (?:\w+\s*)+", ("navigate_to", self._extract_location)),
            # Status
            (r"what(?:'s| is) your status", ("status", lambda m: {})),
            (r"how are you", ("status", lambda m: {})),
        ]

    @staticmethod
    def _extract_location(m: re.Match) -> dict:
        """Extract location name from command text."""
        text = m.group(0).replace("go to ", "").strip()
        return {"location": text}

    def match(self, transcript: str) -> tuple | None:
        """
        Match transcript against all rules.
        Returns (action_name, params_dict) or None.
        """
        transcript = transcript.lower().strip()
        for pattern, (action, param_fn) in self.rules:
            m = re.search(pattern, transcript)
            if m:
                return action, param_fn(m)
        return None


class VoiceRobotTier1:
    """
    Tier 1 Voice Robot: Vosk ASR + keyword spotting.
    Runs ASR continuously and processes commands after wake word.
    """

    def __init__(self, model_path: str):
        self.asr = VoskAsr(model_path)
        self.vad = VoiceActivityDetector(energy_threshold=100.0)
        self.matcher = CommandMatcher()
        self.audio_queue = queue.Queue()
        self.running = False

        # State machine
        self.state = "idle"       # "idle" (awaiting wake word) or "listening" (capturing a command)
        self.partial_text = ""
        self.speech_frames = 0    # consecutive frames with speech
        self.SPEECH_FRAMES_THRESH = 3  # frames of speech before capture
        self.silence_frames = 0
        self.SILENCE_FRAMES_THRESH = 15  # silence frames before finalizing

    def _audio_capture_thread(self, p: pyaudio.PyAudio):
        """Background thread: capture audio from microphone."""
        stream = p.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=SAMPLE_RATE,
            input=True,
            frames_per_buffer=CHUNK_SIZE,
        )
        print("[INFO] Microphone active. Say wake word 'Hey Robot' to begin.")
        while self.running:
            chunk = stream.read(CHUNK_SIZE, exception_on_overflow=False)
            self.audio_queue.put(chunk)
        stream.stop_stream()
        stream.close()

    def _process_commands(self, transcript: str) -> None:
        """Process ASR transcript → command → action."""
        # MIN_TRANSCRIPT_LEN counts characters, so one-word commands like "stop" pass.
        if not transcript or len(transcript) < MIN_TRANSCRIPT_LEN:
            return

        result = self.matcher.match(transcript)
        if result:
            action, params = result
            print(f"[CMD] Action: {action}  Params: {params}")
            # Here: publish to ROS action server / robot controller
            # Example: self.robot_action_server.send_goal(action, params)
        else:
            print(f"[WARN] No command matched: '{transcript}'")

    def run(self):
        """Main loop: process audio chunks through VAD → ASR."""
        p = pyaudio.PyAudio()
        self.running = True

        # Start capture thread
        capture_thread = threading.Thread(
            target=self._audio_capture_thread, args=(p,), daemon=True
        )
        capture_thread.start()

        print("[INFO] Voice Robot Tier 1 running. Say 'Hey Robot' to activate.")
        try:
            while self.running:
                try:
                    chunk = self.audio_queue.get(timeout=1.0)
                except queue.Empty:
                    continue  # no audio yet, e.g. the stream is still opening
                is_speech = self.vad.is_speech(chunk)

                if is_speech:
                    self.speech_frames += 1
                    self.silence_frames = 0
                else:
                    self.silence_frames += 1
                    self.speech_frames = 0

                # ── State machine ──
                if self.state == "idle":
                    # Check the latest hypothesis for a wake word. Vosk partials
                    # are cumulative, so they are inspected, not concatenated.
                    result = self.asr.process_chunk(chunk)
                    heard = result.get("text", "").lower()
                    for ww in WAKE_WORDS:
                        if ww in heard:
                            print(f"[WAKE] Wake word '{ww}' detected!")
                            self.state = "listening"
                            self.partial_text = ""
                            self.silence_frames = 0
                            # Reset recognizer for command capture
                            self.asr.rec.Reset()
                            break

                elif self.state == "listening":
                    # Keep only finalized segments; partials would repeat words.
                    result = self.asr.process_chunk(chunk)
                    if result["is_final"] and result.get("text"):
                        self.partial_text += " " + result["text"]

                    if self.silence_frames > self.SILENCE_FRAMES_THRESH:
                        # Flush whatever Vosk has not finalized yet.
                        tail = json.loads(self.asr.rec.FinalResult()).get("text", "")
                        command = (self.partial_text + " " + tail).strip()
                        print(f"[LISTEN] Finalizing: '{command}'")
                        self._process_commands(command)
                        self.partial_text = ""
                        self.state = "idle"
                        self.asr.rec.Reset()

        except KeyboardInterrupt:
            print("\n[INFO] Shutting down...")
        finally:
            self.running = False
            capture_thread.join(timeout=2.0)  # let the capture thread close its stream
            p.terminate()


if __name__ == "__main__":
    robot = VoiceRobotTier1(model_path=VOSK_MODEL_PATH)
    robot.run()

3.3 Command Coverage

| Command Pattern | Action | Parameter |
|---|---|---|
| "go forward [N]" | move_forward | {distance: N} |
| "go back [N]" | move_backward | {distance: N} |
| "turn left/right [N deg]" | turn | {direction, angle} |
| "stop" / "halt" | stop | (none) |
| "go to kitchen" | navigate_to | {location: "kitchen"} |
| "what's your status" | status | (none) |

4. Tier 2 — Intermediate: Intent Classification + Slot Filling

4.1 Concept

Tier 2 replaces regex matching with a proper Natural Language Understanding (NLU) pipeline. The transcribed text goes through two stages:

  1. Intent Classification: Assign the utterance to one of \(K\) predefined intents using a trained classifier.
  2. Slot Filling: Extract structured entities (e.g., numbers, directions, locations) from the utterance.

  Transcript: "go forward three meters"
  ┌─────────────────────────────┐
  │  Intent Classifier           │ → move_forward  (confidence: 0.94)
  │  (sklearn / Transformer)     │
  └─────────────────────────────┘
  ┌─────────────────────────────┐
  │  Slot Filler                │ → O   O        B-DISTANCE  I-DISTANCE
  │  (BIO tagging / CRF)        │   go  forward  three       meters
  └─────────────────────────────┘
  { intent: "move_forward", slots: { distance: 3, unit: "meter" } }

Intent Classification: Given an input sequence \(\mathbf{x} = (x_1, ..., x_n)\), the classifier predicts:

\[ \hat{c} = \arg\max_{c \in \mathcal{C}} \; P_\theta(c \mid \mathbf{x}) \]

where \(\mathcal{C}\) is the set of intents. We can use:

  • TF-IDF + Logistic Regression (fast, good baseline)
  • BERT / DistilBERT (higher accuracy; a GPU is recommended, especially for fine-tuning)
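To make the arg max rule concrete, the toy model below scores each intent with hand-picked keyword weights and normalizes the scores with a softmax. The weights and the three-intent set are invented for illustration; the Tier 2 classifier in Section 4.2 learns its parameters from labeled data instead.

```python
import math

# Hypothetical per-intent keyword weights (a stand-in for learned parameters).
WEIGHTS = {
    "move_forward": {"forward": 2.0, "go": 0.5, "move": 0.5},
    "turn_left": {"left": 2.0, "turn": 1.0, "rotate": 1.0},
    "stop": {"stop": 3.0, "halt": 3.0},
}


def intent_posterior(text: str) -> dict:
    """Return P(c | x) for every intent c using a softmax over scores."""
    tokens = text.lower().split()
    scores = {
        intent: sum(weights.get(tok, 0.0) for tok in tokens)
        for intent, weights in WEIGHTS.items()
    }
    z = sum(math.exp(s) for s in scores.values())
    return {intent: math.exp(s) / z for intent, s in scores.items()}


probs = intent_posterior("please turn left")
best = max(probs, key=probs.get)  # the arg max over intents
print(best, round(probs[best], 2))
```

The probability of the winning intent doubles as a confidence score, which a robot can use to reject low-confidence commands rather than act on them.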

Slot Filling: Treats the task as sequence labeling. Each input token \(x_i\) is labeled with a BIO tag:

\[ \hat{y}_i = \arg\max_{y \in \mathcal{Y}} \; P_\theta(y_i = y \mid \mathbf{x}) \]

where \(\mathcal{Y} = \{ O, B\text{-}DISTANCE, I\text{-}DISTANCE, B\text{-}DIRECTION, ... \}\).
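Training data for a BIO tagger is usually written as labeled spans and then expanded to one tag per token. The helper below is a minimal sketch of that expansion; the function name and the `(start, end, slot_name)` span format are assumptions for this example.

```python
def bio_encode(tokens, spans):
    """Convert labeled token spans into one BIO tag per token.

    spans: list of (start, end_exclusive, slot_name) over token indices.
    """
    tags = ["O"] * len(tokens)
    for start, end, name in spans:
        tags[start] = f"B-{name}"      # first token of the slot
        for i in range(start + 1, end):
            tags[i] = f"I-{name}"      # continuation tokens
    return tags


tokens = ["rotate", "right", "forty", "five", "degrees"]
spans = [(1, 2, "DIRECTION"), (2, 5, "ANGLE")]
tags = bio_encode(tokens, spans)
print(list(zip(tokens, tags)))
```

Tokens outside every span keep the `O` tag, and a multi-token value such as "forty five degrees" becomes one `B-` tag followed by `I-` tags.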

4.2 Complete Python Code

"""
Tier 2: Intent Classification + Slot Filling
=============================================
NLU pipeline for voice robot commands.
Uses TF-IDF + Logistic Regression for intent,
and BIO tagging with sklearn-crfsuite for slot filling.
"""

import json
import os
import pickle
import re
from typing import Optional

import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
import sklearn_crfsuite
from sklearn_crfsuite import metrics as crf_metrics

# ─── Training Data ──────────────────────────────────────────────
INTENT_DATA = [
    ("go forward one meter", "move_forward"),
    ("go forward two meters", "move_forward"),
    ("move forward three steps", "move_forward"),
    ("move forward five meters", "move_forward"),
    ("advance forward", "move_forward"),
    ("go forward", "move_forward"),
    ("go back one meter", "move_backward"),
    ("move backward two steps", "move_backward"),
    ("go backward three meters", "move_backward"),
    ("reverse", "move_backward"),
    ("retreat", "move_backward"),
    ("turn left", "turn_left"),
    ("rotate left", "turn_left"),
    ("turn left ninety degrees", "turn_left"),
    ("turn right", "turn_right"),
    ("rotate right forty five degrees", "turn_right"),
    ("spin right", "turn_right"),
    ("stop", "stop"),
    ("halt", "stop"),
    ("emergency stop", "stop"),
    ("wait", "stop"),
    ("go to the kitchen", "navigate_to"),
    ("navigate to living room", "navigate_to"),
    ("go to charging station", "navigate_to"),
    ("go to home position", "navigate_to"),
    ("pick up the object", "pick_object"),
    ("grab the item", "pick_object"),
    ("put it down", "place_object"),
    ("drop the object", "place_object"),
    ("what is your status", "status"),
    ("how are you doing", "status"),
    ("report status", "status"),
]

# Slot BIO training data: list of (tokens, bio_tags)
SLOT_DATA = [
    (["go", "forward", "three", "meters"], ["O", "O", "B-DISTANCE", "I-DISTANCE"]),
    (["move", "forward", "five", "steps"], ["O", "O", "B-DISTANCE", "I-DISTANCE"]),
    (["go", "forward"], ["O", "O"]),
    (["go", "back", "two", "meters"], ["O", "O", "B-DISTANCE", "I-DISTANCE"]),
    (["turn", "left", "ninety", "degrees"], ["O", "B-DIRECTION", "B-ANGLE", "I-ANGLE"]),
    (["turn", "right"], ["O", "B-DIRECTION"]),
    (["rotate", "right", "forty", "five", "degrees"], ["O", "B-DIRECTION", "B-ANGLE", "I-ANGLE", "I-ANGLE"]),
    (["go", "to", "the", "kitchen"], ["O", "O", "O", "B-LOCATION"]),
    (["navigate", "to", "living", "room"], ["O", "O", "B-LOCATION", "I-LOCATION"]),
    (["go", "to", "home"], ["O", "O", "B-LOCATION"]),
    (["stop"], ["O"]),
    (["how", "are", "you"], ["O", "O", "O"]),
    (["what", "is", "your", "status"], ["O", "O", "O", "O"]),
    (["grab", "the", "red", "box"], ["O", "O", "B-OBJECT_COLOR", "B-OBJECT_NAME"]),
    (["pick", "up", "the", "item"], ["O", "O", "O", "B-OBJECT_NAME"]),
]

INTENTS = sorted(list(set(label for _, label in INTENT_DATA)))
SLOT_LABELS = sorted(list(set(tag for _, tags in SLOT_DATA for tag in tags)))


# ─── Intent Classifier ─────────────────────────────────────────
class IntentClassifier:
    """TF-IDF + Logistic Regression intent classifier."""

    def __init__(self):
        self.pipeline = Pipeline([
            ("tfidf", TfidfVectorizer(analyzer="char_wb", ngram_range=(2, 4))),
            ("clf", LogisticRegression(max_iter=1000, C=10.0)),
        ])
        self._trained = False

    def train(self, texts: list[str], labels: list[str]):
        # The demo set is too small for a stratified 80/20 split (the 7 test
        # samples are fewer than the 9 intents, which raises ValueError), so
        # fit on all data and report training accuracy as a sanity check only.
        self.pipeline.fit(texts, labels)
        acc = self.pipeline.score(texts, labels)
        print(f"[NLU] Intent classifier training accuracy: {acc:.2%}")
        self._trained = True

    def predict(self, text: str) -> tuple[str, float]:
        """Return (intent, confidence_score)."""
        if not self._trained:
            raise RuntimeError("Classifier not trained. Call train() first.")
        probs = self.pipeline.predict_proba([text])[0]
        intent_idx = int(np.argmax(probs))
        intent = self.pipeline.classes_[intent_idx]
        confidence = float(probs[intent_idx])
        return intent, confidence


# ─── Slot Filler ───────────────────────────────────────────────
class SlotFiller:
    """BIO tagging slot filler using CRF (Conditional Random Field)."""

    def __init__(self):
        self.model: Optional[sklearn_crfsuite.CRF] = None

    def _extract_features(self, token: str, i: int, tokens: list[str]) -> dict:
        """Extract features for a single token in the sequence."""
        word = token.lower()
        features = {
            "bias": 1.0,
            "word.lower()": word,
            "word[-3:]": word[-3:] if len(word) > 2 else word,
            "word[-2:]": word[-2:] if len(word) > 1 else word,
            "word.isupper()": word.isupper(),
            "word.isdigit()": word.isdigit(),
            "word.isalpha()": word.isalpha(),
            # Context features
            "word_prev": tokens[i - 1].lower() if i > 0 else "<BOS>",
            "word_next": tokens[i + 1].lower() if i < len(tokens) - 1 else "<EOS>",
            "word_prev2": tokens[i - 2].lower() if i > 1 else "<BOS>",
        }
        return features

    def _token_features(self, tokens: list[str]) -> list[dict]:
        return [self._extract_features(t, i, tokens) for i, t in enumerate(tokens)]

    def train(self, data: list[tuple[list[str], list[str]]]):
        X = [self._token_features(tokens) for tokens, _ in data]
        y = [tags for _, tags in data]
        self.model = sklearn_crfsuite.CRF(
            algorithm="lbfgs", c1=0.1, c2=0.1, max_iterations=100
        )
        self.model.fit(X, y)
        print("[NLU] Slot filler (CRF) trained.")

    def predict(self, text: str) -> dict[str, str]:
        """Return slot dictionary from input text."""
        if self.model is None:
            raise RuntimeError("Slot filler not trained. Call train() first.")
        tokens = re.findall(r"\b\w+\b", text.lower())
        features = self._token_features(tokens)
        pred_tags = self.model.predict([features])[0]

        # Parse BIO tags into slot dictionary
        slots = {}
        current_slot = None
        current_value = []

        for token, tag in zip(tokens, pred_tags):
            if tag.startswith("B-"):
                if current_slot and current_value:
                    slots[current_slot] = " ".join(current_value)
                current_slot = tag[2:]
                current_value = [token]
            elif tag.startswith("I-") and tag[2:] == current_slot:
                current_value.append(token)
            else:
                if current_slot and current_value:
                    slots[current_slot] = " ".join(current_value)
                current_slot = None
                current_value = []

        if current_slot and current_value:
            slots[current_slot] = " ".join(current_value)

        # Resolve numbers. Numeric slots keep their unit ("three meters",
        # "forty five degrees"), so convert only the number tokens.
        word_to_num = {
            "one": 1, "two": 2, "three": 3, "four": 4, "five": 5,
            "six": 6, "seven": 7, "eight": 8, "nine": 9, "ten": 10,
            "twenty": 20, "thirty": 30, "forty": 40, "fifty": 50,
            "sixty": 60, "seventy": 70, "eighty": 80, "ninety": 90,
        }
        for key in ("DISTANCE", "ANGLE"):
            parts = str(slots.get(key, "")).split()
            digits = [int(p) for p in parts if p.isdigit()]
            words = [word_to_num[p] for p in parts if p in word_to_num]
            if digits:
                slots[key] = digits[0]
            elif words:
                slots[key] = sum(words)  # "forty five" -> 45

        return slots


class NLUVoiceRobotTier2:
    """
    Tier 2 Voice Robot: Intent Classification + Slot Filling.
    More robust than regex — handles paraphrases and variations.
    """

    def __init__(self):
        self.intent_clf = IntentClassifier()
        self.slot_filler = SlotFiller()
        self._trained = False

    def train(self):
        """Train both NLU components on labeled data."""
        texts, labels = zip(*INTENT_DATA)
        self.intent_clf.train(list(texts), list(labels))
        self.slot_filler.train(SLOT_DATA)
        self._trained = True
        print("[NLU] Training complete.")

    def understand(self, transcript: str) -> dict:
        """
        Full NLU understanding pipeline.
        Returns { intent, confidence, slots }.
        """
        if not self._trained:
            self.train()

        intent, confidence = self.intent_clf.predict(transcript)
        slots = self.slot_filler.predict(transcript)
        return {
            "transcript": transcript,
            "intent": intent,
            "confidence": confidence,
            "slots": slots,
        }

    def process(self, transcript: str) -> None:
        """Understand and execute a voice command."""
        result = self.understand(transcript)
        print(f"[NLU] Intent: {result['intent']} "
              f"(conf={result['confidence']:.2f})  "
              f"Slots: {result['slots']}")
        # Here: map intent → ROS action and send goal
        # Example: self.action_client.send_goal(result['intent'], result['slots'])


# ─── Demo ──────────────────────────────────────────────────────
if __name__ == "__main__":
    robot = NLUVoiceRobotTier2()
    robot.train()

    test_utterances = [
        "go forward three meters",
        "turn left ninety degrees",
        "go to the kitchen",
        "rotate right forty five degrees",
        "what is your status",
        "move forward five steps",
        "grab the red box",
    ]

    print("\n=== NLU Test Results ===")
    for utt in test_utterances:
        result = robot.understand(utt)
        print(f"  '{utt}'")
        print(f"    → intent={result['intent']}  conf={result['confidence']:.2f}  "
              f"slots={result['slots']}")

5. Tier 3 — Modern: LLM-Powered Dialogue with Function Calling

5.1 Concept

Tier 3 replaces the fixed NLU pipeline with a Large Language Model (GPT-4 or local LLaMA) that handles intent understanding, slot extraction, and dialogue management jointly. The LLM is given a system prompt describing the robot's capabilities and a function calling schema that defines available robot actions.

  User: "Can you go to the kitchen and pick up the red box?"
  ┌─────────────────────────────────────────────────┐
  │  System Prompt:                                 │
  │  "You are a robot voice assistant. You have     │
  │   access to these functions: navigate_to,       │
  │   pick_object, place_object, move_forward,      │
  │   turn, stop, report_status."                   │
  │                                                 │
  │  Function Calling Schema:                       │
  │  - navigate_to(location: string)                │
  │  - pick_object(object_id: string)               │
  │  - place_object()                               │
  │  - move_forward(distance: float, speed: float)  │
  └─────────────────────────────────────────────────┘
  ┌──────────────────────────────────────────────────┐
  │  LLM Output (structured function call):          │
  │  {                                               │
  │    name: "navigate_to",                          │
  │    arguments: { location: "kitchen" }            │
  │  }                                               │
  │  {                                               │
  │    name: "pick_object",                          │
  │    arguments: { object_id: "red_box" }           │
  │  }                                               │
  └──────────────────────────────────────────────────┘
  ┌──────────────────────────────────────────────────┐
  │  Action Executor: call functions sequentially,   │
  │  report results back to LLM for next step        │
  └──────────────────────────────────────────────────┘
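The executor stage at the bottom of the diagram can be sketched without any LLM: given a list of structured calls, run each one in order and collect the result strings that would be reported back. The stub handlers and the hard-coded `calls` list are assumptions standing in for real robot actions and real model output; argument names follow the schema in Section 5.2.

```python
import json


# Stub handlers standing in for real robot actions (hypothetical).
def navigate_to(location: str) -> str:
    return f"arrived at {location}"


def pick_object(object_id: str) -> str:
    return f"picked up {object_id}"


HANDLERS = {"navigate_to": navigate_to, "pick_object": pick_object}


def run_calls(calls):
    """Execute calls sequentially; stop at the first unknown function."""
    results = []
    for call in calls:
        handler = HANDLERS.get(call["name"])
        if handler is None:
            results.append(f"unknown function: {call['name']}")
            break
        # Arguments are assumed to arrive as a JSON string and are decoded here.
        args = json.loads(call["arguments"])
        results.append(handler(**args))
    return results


calls = [
    {"name": "navigate_to", "arguments": '{"location": "kitchen"}'},
    {"name": "pick_object", "arguments": '{"object_id": "red_box"}'},
]
print(run_calls(calls))
```

Stopping at the first failure matters for multi-step commands: picking up an object makes no sense if navigation to it did not succeed.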

Prompt Engineering is central to this tier. The system prompt defines:

  1. Role: "You are a robot voice assistant."
  2. Capabilities: List of available functions with descriptions.
  3. Constraints: "Only call functions from the provided list."
  4. Dialogue State: Conversation history for multi-step tasks.
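The four elements above map directly onto the message list sent to a chat model. The sketch below assembles that list with plain dictionaries and makes no API call; the helper names and the example history are illustrative assumptions.

```python
def build_system_prompt(function_names):
    """Assemble role, capabilities, and constraints into one prompt string."""
    return (
        "You are a robot voice assistant. "
        f"You can call these functions: {', '.join(function_names)}. "
        "Only call functions from the provided list."
    )


def build_messages(system_prompt, history, user_text):
    """Return the message list for one turn: system, prior turns, new input."""
    return (
        [{"role": "system", "content": system_prompt}]
        + list(history)
        + [{"role": "user", "content": user_text}]
    )


history = [
    {"role": "user", "content": "go to the kitchen"},
    {"role": "assistant", "content": "Navigating to the kitchen."},
]
prompt = build_system_prompt(["navigate_to", "pick_object", "stop"])
messages = build_messages(prompt, history, "now pick up the red box")
for m in messages:
    print(m["role"], "->", m["content"])
```

Keeping the history in the list is what lets the model resolve a follow-up such as "now pick up the red box" against the earlier navigation step.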

5.2 Complete Python Code

"""
Tier 3: LLM-Powered Voice Robot with Function Calling
=====================================================
Uses GPT-4 or local LLaMA to understand commands,
extract structured parameters, and execute robot actions
via a function calling interface.
"""

import json
import re
import openai
from dataclasses import dataclass, asdict
from typing import Optional

# ─── Robot Action Schema ──────────────────────────────────────
# This defines what the robot can do. Passed to the LLM.
ROBOT_FUNCTIONS = [
    {
        "type": "function",
        "function": {
            "name": "move_forward",
            "description": "Move the robot forward by a specified distance.",
            "parameters": {
                "type": "object",
                "properties": {
                    "distance": {
                        "type": "number",
                        "description": "Distance to travel in meters."
                    },
                    "speed": {
                        "type": "number",
                        "description": "Speed in m/s. Default: 0.3.",
                        "default": 0.3,
                    },
                },
                "required": ["distance"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "turn",
            "description": "Turn the robot in place by a specified angle.",
            "parameters": {
                "type": "object",
                "properties": {
                    "angle": {
                        "type": "number",
                        "description": "Turn angle in degrees. Positive = left, negative = right."
                    },
                    "speed": {
                        "type": "number",
                        "description": "Angular speed in rad/s. Default: 0.5.",
                        "default": 0.5,
                    },
                },
                "required": ["angle"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "navigate_to",
            "description": "Navigate autonomously to a named location on the map.",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {
                        "type": "string",
                        "description": "Name of the destination (e.g., 'kitchen', 'charging station')."
                    },
                },
                "required": ["location"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "pick_object",
            "description": "Activate the robot's gripper to pick up an object.",
            "parameters": {
                "type": "object",
                "properties": {
                    "object_id": {
                        "type": "string",
                        "description": "Identifier for the object to pick up (e.g., 'red_box', 'cup')."
                    },
                },
                "required": ["object_id"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "place_object",
            "description": "Release the gripped object.",
            "parameters": {"type": "object", "properties": {}},
        },
    },
    {
        "type": "function",
        "function": {
            "name": "stop",
            "description": "Immediately stop all robot motion.",
            "parameters": {"type": "object", "properties": {}},
        },
    },
    {
        "type": "function",
        "function": {
            "name": "report_status",
            "description": "Report the robot's current status: battery, position, carried object.",
            "parameters": {"type": "object", "properties": {}},
        },
    },
]


@dataclass
class DialogueState:
    """Maintains the dialogue context across multiple turns."""
    conversation_history: list[dict]  # [{role, content}]
    current_goal: Optional[str] = None
    sub_goals_completed: int = 0
    total_sub_goals: int = 0

    def add_user_message(self, text: str):
        self.conversation_history.append({"role": "user", "content": text})

    def add_assistant_message(self, text: str):
        self.conversation_history.append({"role": "assistant", "content": text})

    def add_function_result(self, name: str, result: str):
        msg = (
            f"Function '{name}' executed. Result: {result}. "
            "Continue the dialogue or report completion."
        )
        self.conversation_history.append(
            {"role": "user", "content": msg}
        )


# ─── Robot Action Executor ────────────────────────────────────
class RobotActionExecutor:
    """
    Executes robot actions.
    In a real system, this publishes to ROS topics / action servers.
    """

    def __init__(self):
        self.last_status = {
            "battery": 85,
            "position": {"x": 0.0, "y": 0.0, "theta": 0.0},
            "carrying": None,
        }

    def execute(self, function_name: str, arguments: dict) -> str:
        """Execute a function call and return a status string."""
        handler = getattr(self, f"_do_{function_name}", None)
        if handler is None:
            return f"Unknown function: {function_name}"

        try:
            result = handler(arguments)
            return result
        except Exception as e:
            return f"Error executing {function_name}: {str(e)}"

    def _do_move_forward(self, args: dict) -> str:
        d = args.get("distance", 1.0)
        s = args.get("speed", 0.3)
        # In real code: publish to ROS cmd_vel or call action server
        print(f"[ROBOT] Moving forward {d}m at {s}m/s")
        self.last_status["position"]["x"] += d
        return f"Moved forward {d}m. New position: {self.last_status['position']}"

    def _do_turn(self, args: dict) -> str:
        angle = args.get("angle", 90)
        speed = args.get("speed", 0.5)
        print(f"[ROBOT] Turning {angle}° at {speed}rad/s")
        self.last_status["position"]["theta"] += angle
        return f"Turned {angle}°."

    def _do_navigate_to(self, args: dict) -> str:
        loc = args.get("location", "unknown")
        print(f"[ROBOT] Navigating to '{loc}'")
        # ROS Navigation: send goal to move_base
        return f"Navigation to '{loc}' started. ETA: 30 seconds."

    def _do_pick_object(self, args: dict) -> str:
        obj = args.get("object_id", "unknown")
        print(f"[ROBOT] Picking up '{obj}'")
        self.last_status["carrying"] = obj
        return f"Picked up '{obj}'."

    def _do_place_object(self, args: dict) -> str:
        # "carrying" is always present (possibly None), so fall back with `or`
        obj = self.last_status.get("carrying") or "nothing"
        print(f"[ROBOT] Placing '{obj}'")
        self.last_status["carrying"] = None
        return f"Placed '{obj}'."

    def _do_stop(self, args: dict) -> str:
        print("[ROBOT] EMERGENCY STOP")
        return "Robot stopped."

    def _do_report_status(self, args: dict) -> str:
        s = self.last_status
        return (
            f"Status: battery={s['battery']}%, "
            f"position=({s['position']['x']:.1f}, {s['position']['y']:.1f}), "
            f"carrying={s['carrying'] or 'nothing'}."
        )


# ─── LLM Dialogue Manager ────────────────────────────────────
class LLMDialogueManager:
    """
    Manages conversation with an LLM using function calling.
    Supports multi-step task decomposition.
    """

    SYSTEM_PROMPT = """You are a helpful voice-controlled robot assistant.

Your robot has the following capabilities:
- move_forward(distance, speed): Move forward by a distance in meters.
- turn(angle, speed): Turn in place. Positive angle=left, negative=right.
- navigate_to(location): Navigate autonomously to a named location on the map.
- pick_object(object_id): Pick up a specific object.
- place_object(): Place the currently held object.
- stop(): Immediately stop all robot motion.
- report_status(): Report battery, position, and carried object.

Guidelines:
1. Only call functions from the provided list.
2. Break complex commands into sequential steps.
3. After each function result, confirm completion to the user.
4. If the user asks something outside your capabilities, politely say so.
5. Be concise and natural in your responses.
"""

    def __init__(self, api_key: Optional[str] = None, model: str = "gpt-4o"):
        self.client = openai.OpenAI(api_key=api_key or os.environ.get("OPENAI_API_KEY"))
        self.model = model
        self.state = DialogueState(conversation_history=[])
        self.executor = RobotActionExecutor()

    def _chat(self, messages: list[dict]) -> dict:
        """Call the LLM with function definitions and return its reply as a plain dict."""
        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            tools=ROBOT_FUNCTIONS,
            tool_choice="auto",
            temperature=0.0,
        )
        # The SDK returns a pydantic object; convert it so callers can use dict access.
        return response.choices[0].message.model_dump()

    def _build_messages(self) -> list[dict]:
        """Build full message list with system prompt."""
        messages = [{"role": "system", "content": self.SYSTEM_PROMPT}]
        messages += self.state.conversation_history
        return messages

    def _handle_function_call(self, msg: dict) -> list[tuple[str, str]]:
        """Execute every tool call in the LLM response and record the results."""
        tool_calls = msg.get("tool_calls") or []

        # Record the assistant turn, including its tool calls, before any results:
        # each "tool" message must follow the assistant message it answers.
        assistant_msg = {"role": "assistant", "content": msg.get("content") or ""}
        if tool_calls:
            assistant_msg["tool_calls"] = [
                {"id": call["id"], "type": "function", "function": call["function"]}
                for call in tool_calls
            ]
        self.state.conversation_history.append(assistant_msg)

        results = []
        for call in tool_calls:
            fn = call["function"]
            name = fn["name"]
            args = json.loads(fn["arguments"]) if isinstance(fn["arguments"], str) else fn["arguments"]
            print(f"[LLM] Calling function: {name}({args})")
            result = self.executor.execute(name, args)
            print(f"[LLM] Result: {result}")
            results.append((name, result))
            # Pair each result with the id of the call that produced it
            self.state.conversation_history.append({
                "role": "tool",
                "tool_call_id": call["id"],
                "content": result,
            })

        return results

    def process_utterance(self, transcript: str) -> str:
        """
        Process a user utterance and execute robot actions.
        Handles multi-step dialogues automatically.
        """
        print(f"\n[USER] {transcript}")
        self.state.add_user_message(transcript)

        max_turns = 10  # Prevent infinite loops
        for _ in range(max_turns):
            messages = self._build_messages()
            msg = self._chat(messages)

            if msg.get("tool_calls"):
                # Run the requested functions, then loop so the LLM sees the results
                self._handle_function_call(msg)
            else:
                response = msg.get("content") or ""
                self.state.add_assistant_message(response)
                print(f"[ROBOT] {response}")
                return response

        return "Maximum dialogue turns reached. Please try a simpler command."


# ─── ROS 2 Action Server Integration ──────────────────────────
def integrate_with_ros_action():
    """
    Integration pattern for connecting LLM function calls to ROS 2 actions.
    This shows the bridge between LLM decisions and robot execution.
    """
    # In a real implementation:
    #
    # import rclpy
    # from rclpy.action import ActionClient
    # from rclpy.node import Node
    # from my_robot_action_msgs.action import Navigate, Move
    #
    # class RobotActionClient(Node):
    #     def __init__(self):
    #         super().__init__('llm_action_client')
    #         self.move_client = ActionClient(self, Move, '/move_action')
    #         self.nav_client = ActionClient(self, Navigate, '/navigate_action')
    #
    #     def send_goal(self, action_name: str, params: dict):
    #         if action_name == "move_forward":
    #             goal = Move.Goal()
    #             goal.distance = params.get("distance", 1.0)
    #             goal.speed = params.get("speed", 0.3)
    #             self.move_client.send_goal_async(goal)
    #         elif action_name == "navigate_to":
    #             goal = Navigate.Goal()
    #             goal.location = params.get("location", "")
    #             self.nav_client.send_goal_async(goal)
    #
    # print("ROS 2 action client pattern ready for integration.")
    pass


# ─── Demo ─────────────────────────────────────────────────────
if __name__ == "__main__":
    import os

    # Initialize LLM manager
    manager = LLMDialogueManager(
        api_key=os.environ.get("OPENAI_API_KEY"),
        model="gpt-4o",
    )

    print("=== LLM Voice Robot (Tier 3) Demo ===")
    print(f"Commands will be sent to {manager.model} with robot function schemas.\n")

    commands = [
        "Go forward two meters please.",
        "Turn left 90 degrees.",
        "Can you navigate to the kitchen?",
        "Go to the kitchen, pick up the red box, and bring it back to me.",
    ]

    for cmd in commands:
        manager.process_utterance(cmd)
        print()

6. Three-Tier Comparison

| Criteria | Tier 1 — Keyword Spotting | Tier 2 — Intent + Slot NLU | Tier 3 — LLM Function Calling |
|---|---|---|---|
| ASR | Vosk offline | Vosk offline | Vosk offline or Whisper API |
| Language model | Regex / dictionary | TF-IDF + LR / CRF | GPT-4 / LLaMA (≥7B) |
| Intent coverage | Fixed patterns | Trainable, handles paraphrases | Open-vocabulary |
| Slot extraction | Regex capture groups | BIO tagging | LLM JSON extraction |
| Multi-turn dialogue | ❌ No | ❌ No | ✅ Yes |
| Error recovery | Manual | Manual | LLM-handled |
| Paraphrase handling | ❌ Poor | ✅ Good | ✅ Excellent |
| Context memory | ❌ None | ❌ None | ✅ Conversation history |
| Latency (local) | < 100ms | 100–500ms | 500ms–3s (LLaMA) |
| Latency (API) | | | 1–3s (GPT-4) |
| Hardware requirement | Pi 3B+ | Pi 4B | Pi 4B + GPU (LLaMA) |
| Internet required | ❌ No | ❌ No | ⚠️ Yes (GPT-4) / ⚠️ Partial (LLaMA) |
| Setup complexity | ⭐⭐ | | ⭐⭐⭐⭐ |
| Accuracy | ⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| Best for | Fixed commands | Custom robot domain | Natural, flexible interaction |

7. Step-by-Step Implementation Guide

Phase 1 — Audio Pipeline (All Tiers)

  1. Install Vosk model:
    mkdir -p ~/vosk-models
    cd ~/vosk-models
    wget https://alphacephei.com/vosk/models/vosk-model-small-en-us-0.15.zip
    unzip vosk-model-small-en-us-0.15.zip
    
  2. Test microphone: python3 -c "import pyaudio; p = pyaudio.PyAudio(); print(p.get_device_count())"
  3. Run Tier 1 code end-to-end; verify wake word detection.
  4. Tune VAD energy threshold: lower if robot is in a quiet environment, raise if noisy.
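
One way to pick a starting value for the energy threshold in step 4 is to record a second or two of ambient noise and set the threshold a few times above its average level. The sketch below assumes 16-bit little-endian mono PCM frames. The names `rms_energy` and `calibrate_threshold`, the `margin` of 3.0, and the `floor` of 100 are illustrative assumptions, not part of the Tier 1 code.

```python
import math
import struct


def rms_energy(frame: bytes) -> float:
    """Root-mean-square amplitude of a 16-bit little-endian mono PCM frame."""
    n = len(frame) // 2
    if n == 0:
        return 0.0
    samples = struct.unpack(f"<{n}h", frame[: n * 2])
    return math.sqrt(sum(s * s for s in samples) / n)


def calibrate_threshold(noise_frames: list[bytes],
                        margin: float = 3.0,
                        floor: float = 100.0) -> float:
    """Return `margin` times the mean ambient RMS, but never less than `floor`."""
    if not noise_frames:
        return floor
    ambient = sum(rms_energy(f) for f in noise_frames) / len(noise_frames)
    return max(floor, margin * ambient)


# Synthetic example: ten frames of constant amplitude 50 stand in for room noise.
quiet_room = [struct.pack("<160h", *([50] * 160))] * 10
threshold = calibrate_threshold(quiet_room)
print(f"Suggested energy threshold: {threshold:.0f}")
```

A louder room yields a higher ambient RMS and therefore a higher threshold, which matches the tuning advice in step 4. Treat the result as a starting point and adjust it by listening tests.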

Phase 2 — Tier 1 Deployment

  1. Connect USB microphone to robot SBC.
  2. Run VoiceRobotTier1.run() on the robot.
  3. Map matched commands to ROS 2 action clients (see Phase 5).
  4. Test in various acoustic environments; adjust SPEECH_FRAMES_THRESH.

Phase 3 — Tier 2 Training

  1. Expand INTENT_DATA and SLOT_DATA with your robot's specific command set.
  2. Run NLUVoiceRobotTier2.train() to evaluate accuracy.
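  3. For better accuracy, replace TF-IDF with a transformer model. A zero-shot classifier (BART-MNLI here) needs no fine-tuning; a fine-tuned DistilBERT is a smaller alternative for on-robot inference: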
    from transformers import pipeline
    intent_pipe = pipeline("zero-shot-classification", model="facebook/bart-large-mnli")
    # Zero-shot: no fine-tuning needed for new intents
    
  4. Deploy the trained model to the robot.

Phase 4 — Tier 3 Integration

  1. Obtain OpenAI API key or deploy a local LLaMA server (Ollama, text-generation-webui).
  2. Configure LLMDialogueManager with your preferred model.
  3. Define ROBOT_FUNCTIONS schema for your specific robot's capabilities.
  4. Connect RobotActionExecutor.execute() to ROS 2 action clients.
  5. Test multi-step commands like "go to the kitchen and pick up the red box."
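
Before connecting `RobotActionExecutor.execute()` to real actuators (step 4), it may be worth checking the arguments the LLM produces, since a model can emit out-of-range or mistyped values. The following is a minimal sketch, not part of the code above: `ARG_LIMITS` and `sanitize_arguments` are hypothetical names, and the numeric ranges are placeholders for your robot's real safety envelope.

```python
from typing import Any

# Hypothetical per-function limits: {function: {argument: (min, max)}}.
ARG_LIMITS: dict[str, dict[str, tuple[float, float]]] = {
    "move_forward": {"distance": (0.0, 5.0), "speed": (0.05, 0.5)},
    "turn": {"angle": (-180.0, 180.0), "speed": (0.1, 1.0)},
}


def sanitize_arguments(function_name: str, arguments: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of `arguments` with numeric values clamped to ARG_LIMITS.

    Raises ValueError if a limited argument is not a number.
    Arguments without a configured limit pass through unchanged.
    """
    limits = ARG_LIMITS.get(function_name, {})
    clean = dict(arguments)
    for key, (low, high) in limits.items():
        if key not in clean:
            continue
        value = clean[key]
        # bool is a subclass of int, so reject it explicitly
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            raise ValueError(f"{function_name}.{key} must be a number, got {value!r}")
        clean[key] = min(max(float(value), low), high)
    return clean


print(sanitize_arguments("move_forward", {"distance": 50, "speed": 0.3}))
```

In `_handle_function_call`, such a helper could run just before `self.executor.execute(name, args)`. Clamping silently is one design choice; returning an error string to the LLM so it can ask the user for clarification is another.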

Phase 5 — ROS 2 Integration (All Tiers)

# Example: ROS 2 bridge that maps voice actions to velocity commands
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist

class VoiceRobotROSClient(Node):
    def __init__(self):
        super().__init__("voice_robot_client")
        self.cmd_pub = self.create_publisher(Twist, "/cmd_vel", 10)

    def execute_action(self, action: str, params: dict):
        twist = Twist()  # All fields default to 0.0, which is a stop command
        if action == "move_forward":
            # cmd_vel carries velocity (m/s), not distance. Covering a given
            # distance requires timing the command or using an action server.
            twist.linear.x = float(params.get("speed", 0.3))
        elif action == "stop":
            twist.linear.x = 0.0
            twist.angular.z = 0.0
        self.cmd_pub.publish(twist)
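
A `/cmd_vel` message carries a velocity, so a command such as "go forward three meters" has to be converted into a velocity held for a duration. The sketch below shows that arithmetic for a simple open-loop drive. The function name `plan_timed_drive` and the 10 Hz publish rate are assumptions for illustration; they are not taken from the code above.

```python
import math


def plan_timed_drive(distance_m: float,
                     speed_mps: float = 0.3,
                     rate_hz: float = 10.0) -> tuple[float, int]:
    """Return (signed linear velocity, number of publish ticks) for an open-loop drive.

    duration = |distance| / speed, and ticks = duration * rate, rounded up.
    """
    if speed_mps <= 0 or rate_hz <= 0:
        raise ValueError("speed_mps and rate_hz must be positive")
    duration_s = abs(distance_m) / speed_mps
    # Round before ceil so floating-point noise does not add a spurious tick
    ticks = math.ceil(round(duration_s * rate_hz, 6))
    velocity = math.copysign(speed_mps, distance_m) if distance_m else 0.0
    return velocity, ticks


velocity, ticks = plan_timed_drive(2.0, speed_mps=0.5)
print(f"Publish linear.x={velocity} for {ticks} ticks, then publish a zero Twist")
```

In a node, a timer callback would publish a `Twist` with `linear.x = velocity` for `ticks` cycles and then publish a zero `Twist`. Open-loop timing ignores wheel slip and acceleration, so odometry feedback or a navigation action server is likely to be more accurate on a real robot.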

8. Extensions and Variations

8.1 WebRTC VAD vs. Energy-Based VAD

Tier 1 uses energy-based VAD for simplicity. For production systems, replace it with WebRTC VAD (py-webrtcvad), which is designed specifically to detect speech:

import webrtcvad

vad = webrtcvad.Vad(2)  # Aggressiveness 0-3
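# Frames must be 10, 20, or 30 ms of 16-bit mono PCM.
# At 16 kHz, a 10 ms frame is 160 samples = 320 bytes.
audio_10ms_frame = b"\x00" * 320  # Placeholder (silence); use real microphone audio
is_speech = vad.is_speech(audio_10ms_frame, sample_rate=16000)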
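
Because the VAD only accepts frames of exactly 10, 20, or 30 ms, a recorded buffer has to be cut into fixed-size chunks first. The helper below is a sketch of that step using only the standard library; `split_frames` is an illustrative name, and the input is assumed to be 16-bit mono PCM.

```python
def split_frames(pcm: bytes, sample_rate: int = 16000, frame_ms: int = 30) -> list[bytes]:
    """Split 16-bit mono PCM into whole frames of `frame_ms` milliseconds.

    A trailing partial frame is dropped, since the VAD rejects odd sizes.
    """
    if frame_ms not in (10, 20, 30):
        raise ValueError("frame_ms must be 10, 20, or 30")
    frame_bytes = sample_rate * frame_ms // 1000 * 2  # 2 bytes per sample
    last_start = len(pcm) - frame_bytes
    return [pcm[i:i + frame_bytes] for i in range(0, last_start + 1, frame_bytes)]


one_second = b"\x00" * 32000  # 1 s of silence at 16 kHz, 16-bit mono
frames = split_frames(one_second, frame_ms=30)
print(len(frames), len(frames[0]))
```

Each element of `frames` can then be passed to `vad.is_speech(frame, sample_rate=16000)`.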

8.2 Whisper API for Higher-Quality ASR

Replace Vosk with OpenAI Whisper for better transcription accuracy:

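from openai import OpenAI
client = OpenAI()  # Reads OPENAI_API_KEY from the environment
with open("recording.wav", "rb") as audio_file:
    result = client.audio.transcriptions.create(model="whisper-1", file=audio_file)
transcript = result.text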

8.3 Local LLaMA with Ollama

For fully offline Tier 3, use Ollama:

# Terminal
ollama serve
ollama pull llama3
# Python
from openai import OpenAI
client = OpenAI(base_url="http://localhost:11434/v1", api_key="ollama")
response = client.chat.completions.create(model="llama3", messages=[...])

8.4 Wake Word Customization

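Picovoice Porcupine supports custom wake words, which are trained in the Picovoice Console and loaded as keyword files. Install the Python package and check that it imports:

pip install pvporcupine
python3 -c "import pvporcupine; print(pvporcupine.MODEL_PATH)"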

8.5 Multi-Modal Commands

Combine voice with gesture or gaze:

User: *points to object* "Grab that."
  Voice: "Grab that" → navigate_to(object_location_from_vision)
  Vision: detected pointing gesture + object at location
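
One simple way to fuse the two channels is to match a deictic word in the transcript ("that", "this") with the pointing event closest in time. The sketch below illustrates the idea under stated assumptions: `PointingEvent`, `DEICTIC_WORDS`, `resolve_deictic`, and the 2-second window are all hypothetical, and the vision system is assumed to report timestamped object IDs.

```python
import re
from dataclasses import dataclass
from typing import Optional


@dataclass
class PointingEvent:
    """Hypothetical vision output: when the user pointed, and at which object."""
    timestamp: float  # seconds
    object_id: str


DEICTIC_WORDS = {"that", "this", "it", "there"}


def resolve_deictic(transcript: str,
                    speech_time: float,
                    events: list[PointingEvent],
                    max_gap_s: float = 2.0) -> Optional[str]:
    """Return the object pointed at nearest in time to a deictic utterance.

    Returns None if the transcript has no deictic word or no pointing
    event falls within `max_gap_s` seconds of the utterance.
    """
    words = set(re.findall(r"[a-z]+", transcript.lower()))
    if not words & DEICTIC_WORDS:
        return None
    candidates = [e for e in events if abs(e.timestamp - speech_time) <= max_gap_s]
    if not candidates:
        return None
    return min(candidates, key=lambda e: abs(e.timestamp - speech_time)).object_id


events = [PointingEvent(10.0, "cup"), PointingEvent(14.5, "red_box")]
print(resolve_deictic("Grab that.", speech_time=14.0, events=events))
```

The resolved ID could then be passed to `pick_object(object_id)`; when the function returns `None`, asking the user to clarify is a reasonable fallback.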

9. References