Voice Interaction Robot (语音交互机器人)¶
Project Type: Human-Robot Interaction | Difficulty: ★★☆☆☆ to ★★★★☆ (approach-dependent) | Estimated Time: 1–3 weekends
1. Project Overview¶
A voice interaction robot listens to spoken commands, interprets user intent, and executes corresponding actions (move forward, stop, turn left, navigate to location, fetch an object, etc.). This project builds a complete voice pipeline from microphone input to robot actuation.
┌─────────────────────────────────────────────────────────────────┐
│ Voice Command Pipeline │
│ │
│ ┌──────────┐ ┌──────────┐ ┌───────────┐ ┌────────┐ │
│ │ Microphone│───▶│ ASR │───▶│ NLU / │───▶│ Robot │ │
│ │ │ │ (Vosk/ │ │ Dialogue │ │ Action │ │
│ │ │ │ WebRTC) │ │ Manager │ │ Server │ │
│ └──────────┘ └──────────┘ └───────────┘ └────────┘ │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ Raw audio Text transcript Intent + Slots cmd_vel / │
│ (16kHz PCM) "go forward go_forward { ActionGoal │
│ three meters" distance: 3} │
└─────────────────────────────────────────────────────────────────┘
In this project you will explore three progressively more sophisticated tiers:
| Tier | Approach | Key Technique | Complexity |
|---|---|---|---|
| Tier 1 — Traditional | Keyword Spotting | Threshold-based wake word + rule matching | Low |
| Tier 2 — Intermediate | Full NLU Pipeline | Intent classification + slot filling | Medium |
| Tier 3 — Modern | LLM Dialogue | GPT-4 / LLaMA + function calling | High |
2. Hardware & Software Requirements¶
Hardware¶
| Component | Specification | Notes |
|---|---|---|
| Microphone | USB microphone or I2S array mic (e.g., ReSpeaker 4-Mic) | Far-field mic recommended for robot use |
| Single-board computer | Raspberry Pi 4B (4GB+) or Jetson Nano | Pi 3B+ minimum for Tier 1 |
| Robot platform | Any mobile robot with ROS 2 support | TurtleBot4, custom differential drive, etc. |
| Speaker (optional) | USB speaker or amp + speaker | For robot speech feedback |
| Power supply | 5V 3A USB-C (Pi) + motor battery | Ensure stable power for audio |
Software¶
| Package | Version | Purpose |
|---|---|---|
| Python | ≥ 3.8 | Core language |
| vosk | ≥ 0.3.45 | Offline ASR engine |
| py-webrtcvad | ≥ 2.0.10 | Voice Activity Detection (VAD) |
| portaudio / pyaudio | — | Audio I/O |
| scikit-learn / sklearn-crfsuite | ≥ 1.0 / — | Intent classifier + CRF slot filler (Tier 2) |
| transformers | ≥ 4.30 | Hugging Face models (Tier 2/3) |
| openai | ≥ 1.0 | OpenAI API (Tier 3) |
| rclpy | ROS 2 | ROS 2 nodes, topics, and action clients |
| riva-cli / whispercpp | — | Alternative ASR backends |
3. Tier 1 — Traditional: Keyword Spotting with Vosk¶
3.1 Concept¶
Tier 1 uses offline ASR (Vosk) combined with keyword/pattern matching. The system continuously listens for a wake word ("Hey Robot", "OK Bot"). Once triggered, it captures a short phrase, runs ASR, and matches the transcript against a dictionary of known command patterns using regex or string similarity.
Continuous audio stream
│
▼
┌──────────────────┐
│ Voice Activity │ ← WebRTC VAD: is speech present?
│ Detection (VAD) │
└───────┬──────────┘
│ speech detected
▼
┌──────────────────┐
│ Wake Word Check │ ← Vosk partial result; keyword match?
└───────┬──────────┘
│ wake word found
▼
┌──────────────────┐
│ Command Capture │ ← Buffer ~3s of audio after wake word
│ + Vosk ASR │
└───────┬──────────┘
│ transcript
▼
┌──────────────────┐
│ Rule Matcher │ ← Regex / keyword dictionary
│ → Robot Action │
└──────────────────┘
Wake word detection gates on a smoothed frame-energy threshold (the VAD step) before Vosk is consulted:

\[
E_n = \alpha \sqrt{\frac{1}{N}\sum_{k=1}^{N} x_n[k]^2} \;+\; (1-\alpha)\, E_{n-1}
\]

where \(x_n[k]\) is the \(k\)-th PCM sample of frame \(n\), \(N\) is the frame length, \(\alpha\) is the smoothing factor, and \(E_n\) is the smoothed frame energy. Speech is declared active when \(E_n > \theta_{\text{energy}}\).
3.2 Complete Python Code¶
"""
Tier 1: Keyword Spotting Voice Robot
=====================================
Uses Vosk offline ASR + rule-based command matching.
No internet required — fully offline.
"""
import io
import os
import json
import queue
import re
import threading
import struct
import numpy as np
import pyaudio
from vosk import Model, KaldiRecognizer
# ─── Configuration ──────────────────────────────────────────────
VOSK_MODEL_PATH = os.path.expanduser("~/vosk-model-small-en-us-0.15")
SAMPLE_RATE = 16000
CHUNK_SIZE = 4096 # samples per audio chunk (256ms at 16kHz)
WAKE_WORDS = {"hey robot", "ok robot", "hey bot", "hello robot"}
# Minimum number of words in a transcript before command matching
# (keep at 1 so single-word commands like "stop" still trigger)
MIN_TRANSCRIPT_LEN = 1
class VoiceActivityDetector:
"""
Energy-based Voice Activity Detection (VAD).
Uses a rolling average of frame energies.
"""
def __init__(self, energy_threshold: float = 100.0, smoothing: float = 0.1):
self.energy_threshold = energy_threshold
self.smoothing = smoothing
self.smoothed_energy = 0.0
def is_speech(self, audio_chunk: bytes) -> bool:
# Convert to numpy array (16-bit PCM)
samples = np.frombuffer(audio_chunk, dtype=np.int16).astype(np.float32)
# Compute frame energy (RMS)
energy = np.sqrt(np.mean(samples ** 2)) + 1e-9
# Exponential moving average
self.smoothed_energy = (
self.smoothing * energy
+ (1 - self.smoothing) * self.smoothed_energy
)
return self.smoothed_energy > self.energy_threshold
class VoskAsr:
"""Wrapper around Vosk recognizer for streaming audio."""
def __init__(self, model_path: str):
if not os.path.exists(model_path):
raise FileNotFoundError(
f"Vosk model not found at {model_path}. "
"Download from: https://alphacephei.com/vosk/models"
)
model = Model(model_path)
self.rec = KaldiRecognizer(model, SAMPLE_RATE)
self.rec.SetWords(True) # Include word timestamps
def process_chunk(self, audio_chunk: bytes) -> dict:
"""
Feed one audio chunk to Vosk. Returns dict with:
- text: transcribed text (empty string if not final)
- partial: partial result string
- is_final: True if result is final
"""
result = {}
if self.rec.AcceptWaveform(audio_chunk):
result = json.loads(self.rec.Result())
result["is_final"] = True
else:
partial = json.loads(self.rec.PartialResult())
result = {"text": partial.get("partial", ""), "is_final": False}
return result
class CommandMatcher:
"""
Rule-based command matcher.
Maps transcribed text to robot actions using regex patterns.
"""
def __init__(self):
# Pattern → (action_name, param_extractor_fn)
self.rules = [
# Forward / backward
(r"go forward\s*(?:(\d+)\s*(?:meter|m|steps?)?)?",
("move_forward", lambda m: {"distance": int(m.group(1) or 1)})),
(r"move forward\s*(?:(\d+)\s*(?:meter|m|steps?)?)?",
("move_forward", lambda m: {"distance": int(m.group(1) or 1)})),
(r"go back(?:ward)?\s*(?:(\d+)\s*(?:meter|m|steps?)?)?",
("move_backward", lambda m: {"distance": int(m.group(1) or 1)})),
# Turn
(r"turn (left|right)\s*(?:(\d+)\s*degrees?)?",
("turn", lambda m: {"direction": m.group(1), "angle": int(m.group(2) or 90)})),
(r"rotate (left|right)\s*(?:(\d+)\s*degrees?)?",
("turn", lambda m: {"direction": m.group(1), "angle": int(m.group(2) or 90)})),
# Stop
(r"\bstop\b", ("stop", lambda m: {})),
(r"\bhalt\b", ("stop", lambda m: {})),
# Navigation
(r"go to (?:\w+\s*)+", ("navigate_to", self._extract_location)),
# Status
(r"what(?:'s| is) your status", ("status", lambda m: {})),
(r"how are you", ("status", lambda m: {})),
]
@staticmethod
def _extract_location(m: re.Match) -> dict:
"""Extract location name from command text."""
text = m.group(0).replace("go to ", "").strip()
return {"location": text}
def match(self, transcript: str) -> tuple | None:
"""
Match transcript against all rules.
Returns (action_name, params_dict) or None.
"""
transcript = transcript.lower().strip()
for pattern, (action, param_fn) in self.rules:
m = re.search(pattern, transcript)
if m:
return action, param_fn(m)
return None
class VoiceRobotTier1:
"""
Tier 1 Voice Robot: Vosk ASR + keyword spotting.
Runs ASR continuously and processes commands after wake word.
"""
def __init__(self, model_path: str):
self.asr = VoskAsr(model_path)
self.vad = VoiceActivityDetector(energy_threshold=100.0)
self.matcher = CommandMatcher()
self.audio_queue = queue.Queue()
self.running = False
        # State machine
        self.state = "idle"  # "idle" (waiting for wake word) → "listening" (capturing command)
self.partial_text = ""
self.speech_frames = 0 # consecutive frames with speech
self.SPEECH_FRAMES_THRESH = 3 # frames of speech before capture
self.silence_frames = 0
self.SILENCE_FRAMES_THRESH = 15 # silence frames before finalizing
def _audio_capture_thread(self, p: pyaudio.PyAudio):
"""Background thread: capture audio from microphone."""
stream = p.open(
format=pyaudio.paInt16,
channels=1,
rate=SAMPLE_RATE,
input=True,
frames_per_buffer=CHUNK_SIZE,
)
print("[INFO] Microphone active. Say wake word 'Hey Robot' to begin.")
while self.running:
chunk = stream.read(CHUNK_SIZE, exception_on_overflow=False)
self.audio_queue.put(chunk)
stream.stop_stream()
stream.close()
def _process_commands(self, transcript: str) -> None:
"""Process ASR transcript → command → action."""
if not transcript or len(transcript.split()) < MIN_TRANSCRIPT_LEN:
return
result = self.matcher.match(transcript)
if result:
action, params = result
print(f"[CMD] Action: {action} Params: {params}")
# Here: publish to ROS action server / robot controller
# Example: self.robot_action_server.send_goal(action, params)
else:
print(f"[WARN] No command matched: '{transcript}'")
def run(self):
"""Main loop: process audio chunks through VAD → ASR."""
p = pyaudio.PyAudio()
self.running = True
# Start capture thread
capture_thread = threading.Thread(
target=self._audio_capture_thread, args=(p,), daemon=True
)
capture_thread.start()
print("[INFO] Voice Robot Tier 1 running. Say 'Hey Robot' to activate.")
try:
while self.running:
chunk = self.audio_queue.get(timeout=1.0)
is_speech = self.vad.is_speech(chunk)
if is_speech:
self.speech_frames += 1
self.silence_frames = 0
else:
self.silence_frames += 1
self.speech_frames = 0
# ── State machine ──
if self.state == "idle":
# Feed audio to Vosk for partial result (wake word detection)
result = self.asr.process_chunk(chunk)
partial = result.get("text", "")
if partial:
combined = (self.partial_text + " " + partial).lower()
self.partial_text = combined
for ww in WAKE_WORDS:
if ww in combined:
print(f"[WAKE] Wake word '{ww}' detected!")
self.state = "listening"
self.partial_text = ""
# Reset recognizer for command capture
self.asr.rec.Reset()
break
elif self.state == "listening":
# Collect audio; finalize on silence
result = self.asr.process_chunk(chunk)
partial = result.get("text", "")
if partial:
self.partial_text += " " + partial
if self.silence_frames > self.SILENCE_FRAMES_THRESH:
print(f"[LISTEN] Finalizing: '{self.partial_text.strip()}'")
self._process_commands(self.partial_text.strip())
self.partial_text = ""
self.state = "idle"
self.asr.rec.Reset()
else:
# Feed audio even when idle (for continuous wake word check)
self.asr.process_chunk(chunk)
except KeyboardInterrupt:
print("\n[INFO] Shutting down...")
finally:
self.running = False
p.terminate()
if __name__ == "__main__":
robot = VoiceRobotTier1(model_path=VOSK_MODEL_PATH)
robot.run()
3.3 Command Coverage¶
| Command Pattern | Action | Parameters |
|---|---|---|
| "go forward [N]" | move_forward | {distance: N} |
| "go back [N]" | move_backward | {distance: N} |
| "turn left/right [N degrees]" | turn | {direction, angle} |
| "stop" / "halt" | stop | — |
| "go to kitchen" | navigate_to | {location: "kitchen"} |
| "what's your status" | status | — |
4. Tier 2 — Intermediate: Intent Classification + Slot Filling¶
4.1 Concept¶
Tier 2 replaces regex matching with a proper Natural Language Understanding (NLU) pipeline. The transcribed text goes through two stages:
- Intent Classification: Assign the utterance to one of \(K\) predefined intents using a trained classifier.
- Slot Filling: Extract structured entities (e.g., numbers, directions, locations) from the utterance.
Transcript: "go forward three meters"
│
▼
┌─────────────────────────────┐
│ Intent Classifier │ → move_forward (confidence: 0.94)
│ (sklearn / Transformer) │
└─────────────────────────────┘
│
▼
┌─────────────────────────────┐
│ Slot Filler │ → O O B-DISTANCE I-DISTANCE
│ (BIO tagging / CRF) │ go forward three meters
└─────────────────────────────┘
│
▼
{ intent: "go_forward", slots: { distance: 3, unit: "meter" } }
Intent Classification: Given an input sequence \(\mathbf{x} = (x_1, ..., x_n)\), the classifier predicts:

\[
\hat{c} = \arg\max_{c \in \mathcal{C}} P(c \mid \mathbf{x})
\]

where \(\mathcal{C}\) is the set of intents. We can use:
- TF-IDF + Logistic Regression (fast, good baseline)
- BERT / DistilBERT (higher accuracy, needs GPU)
Slot Filling: Treats the task as sequence labeling. Each input token \(x_i\) is labeled with a BIO tag:

\[
\hat{y}_i = \arg\max_{y \in \mathcal{Y}} P(y \mid x_i, \mathbf{x}), \qquad i = 1, \dots, n
\]

where \(\mathcal{Y} = \{ O, B\text{-}DISTANCE, I\text{-}DISTANCE, B\text{-}DIRECTION, ... \}\).
4.2 Complete Python Code¶
"""
Tier 2: Intent Classification + Slot Filling
=============================================
NLU pipeline for voice robot commands.
Uses TF-IDF + Logistic Regression for intent,
and BIO tagging with sklearn-crfsuite for slot filling.
"""
import json
import os
import pickle
import re
from typing import Optional
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
import sklearn_crfsuite
from sklearn_crfsuite import metrics as crf_metrics
# ─── Training Data ──────────────────────────────────────────────
INTENT_DATA = [
("go forward one meter", "move_forward"),
("go forward two meters", "move_forward"),
("move forward three steps", "move_forward"),
("move forward five meters", "move_forward"),
("advance forward", "move_forward"),
("go forward", "move_forward"),
("go back one meter", "move_backward"),
("move backward two steps", "move_backward"),
("go backward three meters", "move_backward"),
("reverse", "move_backward"),
("retreat", "move_backward"),
("turn left", "turn_left"),
("rotate left", "turn_left"),
("turn left ninety degrees", "turn_left"),
("turn right", "turn_right"),
("rotate right forty five degrees", "turn_right"),
("spin right", "turn_right"),
("stop", "stop"),
("halt", "stop"),
("emergency stop", "stop"),
("wait", "stop"),
("go to the kitchen", "navigate_to"),
("navigate to living room", "navigate_to"),
("go to charging station", "navigate_to"),
("go to home position", "navigate_to"),
("pick up the object", "pick_object"),
("grab the item", "pick_object"),
("put it down", "place_object"),
("drop the object", "place_object"),
("what is your status", "status"),
("how are you doing", "status"),
("report status", "status"),
]
# Slot BIO training data: list of (tokens, bio_tags)
SLOT_DATA = [
(["go", "forward", "three", "meters"], ["O", "O", "B-DISTANCE", "I-DISTANCE"]),
(["move", "forward", "five", "steps"], ["O", "O", "B-DISTANCE", "I-DISTANCE"]),
(["go", "forward"], ["O", "O"]),
(["go", "back", "two", "meters"], ["O", "O", "B-DISTANCE", "I-DISTANCE"]),
(["turn", "left", "ninety", "degrees"], ["O", "B-DIRECTION", "B-ANGLE", "I-ANGLE"]),
(["turn", "right"], ["O", "B-DIRECTION"]),
(["rotate", "right", "forty", "five", "degrees"], ["O", "B-DIRECTION", "B-ANGLE", "I-ANGLE", "I-ANGLE"]),
(["go", "to", "the", "kitchen"], ["O", "O", "O", "B-LOCATION"]),
(["navigate", "to", "living", "room"], ["O", "O", "O", "B-LOCATION"]),
(["go", "to", "home"], ["O", "O", "O"]),
(["stop"], ["O"]),
(["how", "are", "you"], ["O", "O", "O"]),
(["what", "is", "your", "status"], ["O", "O", "O", "O"]),
(["grab", "the", "red", "box"], ["O", "O", "B-OBJECT_COLOR", "B-OBJECT_NAME"]),
(["pick", "up", "the", "item"], ["O", "O", "O", "B-OBJECT_NAME"]),
]
INTENTS = sorted(list(set(label for _, label in INTENT_DATA)))
SLOT_LABELS = sorted(list(set(tag for _, tags in SLOT_DATA for tag in tags)))
# ─── Intent Classifier ─────────────────────────────────────────
class IntentClassifier:
"""TF-IDF + Logistic Regression intent classifier."""
def __init__(self):
self.pipeline = Pipeline([
("tfidf", TfidfVectorizer(analyzer="char_wb", ngram_range=(2, 4))),
("clf", LogisticRegression(max_iter=1000, C=10.0)),
])
self._trained = False
    def train(self, texts: list[str], labels: list[str]):
        # The demo dataset is tiny (9 intents, ~32 utterances), so a stratified
        # 20% split would raise an error: the test fold must be at least as large
        # as the number of classes. Hold out 30%, report accuracy, then refit on
        # the full dataset for deployment.
        X_train, X_test, y_train, y_test = train_test_split(
            texts, labels, test_size=0.3, random_state=42, stratify=labels
        )
        self.pipeline.fit(X_train, y_train)
        acc = self.pipeline.score(X_test, y_test)
        print(f"[NLU] Intent classifier held-out accuracy: {acc:.2%}")
        self.pipeline.fit(texts, labels)  # refit on all data
        self._trained = True
def predict(self, text: str) -> tuple[str, float]:
"""Return (intent, confidence_score)."""
if not self._trained:
raise RuntimeError("Classifier not trained. Call train() first.")
probs = self.pipeline.predict_proba([text])[0]
intent_idx = int(np.argmax(probs))
intent = self.pipeline.classes_[intent_idx]
confidence = float(probs[intent_idx])
return intent, confidence
# ─── Slot Filler ───────────────────────────────────────────────
class SlotFiller:
"""BIO tagging slot filler using CRF (Conditional Random Field)."""
def __init__(self):
self.model: Optional[sklearn_crfsuite.CRF] = None
def _extract_features(self, token: str, i: int, tokens: list[str]) -> dict:
"""Extract features for a single token in the sequence."""
word = token.lower()
features = {
"bias": 1.0,
"word.lower()": word,
"word[-3:]": word[-3:] if len(word) > 2 else word,
"word[-2:]": word[-2:] if len(word) > 1 else word,
"word.isupper()": word.isupper(),
"word.isdigit()": word.isdigit(),
"word.isalpha()": word.isalpha(),
# Context features
"word_prev": tokens[i - 1].lower() if i > 0 else "<BOS>",
"word_next": tokens[i + 1].lower() if i < len(tokens) - 1 else "<EOS>",
"word_prev2": tokens[i - 2].lower() if i > 1 else "<BOS>",
}
return features
def _token_features(self, tokens: list[str]) -> list[dict]:
return [self._extract_features(t, i, tokens) for i, t in enumerate(tokens)]
def train(self, data: list[tuple[list[str], list[str]]]):
X = [self._token_features(tokens) for tokens, _ in data]
y = [tags for _, tags in data]
self.model = sklearn_crfsuite.CRF(
algorithm="lbfgs", c1=0.1, c2=0.1, max_iterations=100
)
self.model.fit(X, y)
print("[NLU] Slot filler (CRF) trained.")
def predict(self, text: str) -> dict[str, str]:
"""Return slot dictionary from input text."""
if self.model is None:
raise RuntimeError("Slot filler not trained. Call train() first.")
tokens = re.findall(r"\b\w+\b", text.lower())
features = self._token_features(tokens)
pred_tags = self.model.predict([features])[0]
# Parse BIO tags into slot dictionary
slots = {}
current_slot = None
current_value = []
for token, tag in zip(tokens, pred_tags):
if tag.startswith("B-"):
if current_slot and current_value:
slots[current_slot] = " ".join(current_value)
current_slot = tag[2:]
current_value = [token]
elif tag.startswith("I-") and tag[2:] == current_slot:
current_value.append(token)
else:
if current_slot and current_value:
slots[current_slot] = " ".join(current_value)
current_slot = None
current_value = []
if current_slot and current_value:
slots[current_slot] = " ".join(current_value)
# Resolve numbers
for key in list(slots.keys()):
if slots[key].isdigit():
slots[key] = int(slots[key])
elif slots[key].replace(" ", "").isdigit():
slots[key] = int(slots[key].replace(" ", ""))
else:
# Map word numbers to digits
word_to_num = {
"one": 1, "two": 2, "three": 3, "four": 4,
"five": 5, "six": 6, "seven": 7, "eight": 8,
"nine": 9, "ten": 10,
}
val = word_to_num.get(slots[key].strip(), slots[key])
slots[key] = val
return slots
class NLUVoiceRobotTier2:
"""
Tier 2 Voice Robot: Intent Classification + Slot Filling.
More robust than regex — handles paraphrases and variations.
"""
def __init__(self):
self.intent_clf = IntentClassifier()
self.slot_filler = SlotFiller()
self._trained = False
def train(self):
"""Train both NLU components on labeled data."""
texts, labels = zip(*INTENT_DATA)
self.intent_clf.train(list(texts), list(labels))
self.slot_filler.train(SLOT_DATA)
self._trained = True
print("[NLU] Training complete.")
def understand(self, transcript: str) -> dict:
"""
Full NLU understanding pipeline.
Returns { intent, confidence, slots }.
"""
if not self._trained:
self.train()
intent, confidence = self.intent_clf.predict(transcript)
slots = self.slot_filler.predict(transcript)
return {
"transcript": transcript,
"intent": intent,
"confidence": confidence,
"slots": slots,
}
def process(self, transcript: str) -> None:
"""Understand and execute a voice command."""
result = self.understand(transcript)
print(f"[NLU] Intent: {result['intent']} "
f"(conf={result['confidence']:.2f}) "
f"Slots: {result['slots']}")
# Here: map intent → ROS action and send goal
# Example: self.action_client.send_goal(result['intent'], result['slots'])
# ─── Demo ──────────────────────────────────────────────────────
if __name__ == "__main__":
robot = NLUVoiceRobotTier2()
robot.train()
test_utterances = [
"go forward three meters",
"turn left ninety degrees",
"go to the kitchen",
"rotate right forty five degrees",
"what is your status",
"move forward five steps",
"grab the red box",
]
print("\n=== NLU Test Results ===")
for utt in test_utterances:
result = robot.understand(utt)
print(f" '{utt}'")
print(f" → intent={result['intent']} conf={result['confidence']:.2f} "
f"slots={result['slots']}")
5. Tier 3 — Modern: LLM-Powered Dialogue with Function Calling¶
5.1 Concept¶
Tier 3 replaces the fixed NLU pipeline with a Large Language Model (GPT-4 or local LLaMA) that handles intent understanding, slot extraction, and dialogue management jointly. The LLM is given a system prompt describing the robot's capabilities and a function calling schema that defines available robot actions.
User: "Can you go to the kitchen and pick up the red box?"
│
▼
┌─────────────────────────────────────────────────┐
│ System Prompt: │
│ "You are a robot voice assistant. You have │
│ access to these functions: navigate_to, │
│ pick_object, place_object, move_forward, │
│ turn, stop, status." │
│ │
│ Function Calling Schema: │
│ - navigate_to(location: string) │
│ - pick_object(color: string, name: string) │
│ - place_object() │
│ - move_forward(distance: float, unit: string) │
└─────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────┐
│ LLM Output (structured function call): │
│ { │
│ name: "navigate_to", │
│ arguments: { location: "kitchen" } │
│ } │
│ { │
│ name: "pick_object", │
│ arguments: { color: "red", name: "box" } │
│ } │
└──────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────┐
│ Action Executor: call functions sequentially, │
│ report results back to LLM for next step │
└──────────────────────────────────────────────────┘
Prompt Engineering is central to this tier. The system prompt defines:
- Role: "You are a robot voice assistant."
- Capabilities: List of available functions with descriptions.
- Constraints: "Only call functions from the provided list."
- Dialogue State: Conversation history for multi-step tasks.
5.2 Complete Python Code¶
"""
Tier 3: LLM-Powered Voice Robot with Function Calling
=====================================================
Uses GPT-4 or local LLaMA to understand commands,
extract structured parameters, and execute robot actions
via a function calling interface.
"""
import json
import os

import openai
from dataclasses import dataclass
from typing import Optional
# ─── Robot Action Schema ──────────────────────────────────────
# This defines what the robot can do. Passed to the LLM.
ROBOT_FUNCTIONS = [
{
"type": "function",
"function": {
"name": "move_forward",
"description": "Move the robot forward by a specified distance.",
"parameters": {
"type": "object",
"properties": {
"distance": {
"type": "number",
"description": "Distance to travel in meters."
},
"speed": {
"type": "number",
"description": "Speed in m/s. Default: 0.3.",
"default": 0.3,
},
},
"required": ["distance"],
},
},
},
{
"type": "function",
"function": {
"name": "turn",
"description": "Turn the robot in place by a specified angle.",
"parameters": {
"type": "object",
"properties": {
"angle": {
"type": "number",
"description": "Turn angle in degrees. Positive = left, negative = right."
},
"speed": {
"type": "number",
"description": "Angular speed in rad/s. Default: 0.5.",
"default": 0.5,
},
},
"required": ["angle"],
},
},
},
{
"type": "function",
"function": {
"name": "navigate_to",
"description": "Navigate autonomously to a named location on the map.",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "Name of the destination (e.g., 'kitchen', 'charging station')."
},
},
"required": ["location"],
},
},
},
{
"type": "function",
"function": {
"name": "pick_object",
"description": "Activate the robot's gripper to pick up an object.",
"parameters": {
"type": "object",
"properties": {
"object_id": {
"type": "string",
"description": "Identifier for the object to pick up (e.g., 'red_box', 'cup')."
},
},
"required": ["object_id"],
},
},
},
{
"type": "function",
"function": {
"name": "place_object",
"description": "Release the gripped object.",
"parameters": {"type": "object", "properties": {}},
},
},
{
"type": "function",
"function": {
"name": "stop",
"description": "Immediately stop all robot motion.",
"parameters": {"type": "object", "properties": {}},
},
},
{
"type": "function",
"function": {
"name": "report_status",
"description": "Report the robot's current status: battery, position, carried object.",
"parameters": {"type": "object", "properties": {}},
},
},
]
@dataclass
class DialogueState:
"""Maintains the dialogue context across multiple turns."""
conversation_history: list[dict] # [{role, content}]
current_goal: Optional[str] = None
sub_goals_completed: int = 0
total_sub_goals: int = 0
def add_user_message(self, text: str):
self.conversation_history.append({"role": "user", "content": text})
def add_assistant_message(self, text: str):
self.conversation_history.append({"role": "assistant", "content": text})
def add_function_result(self, name: str, result: str):
msg = (
f"Function '{name}' executed. Result: {result}. "
"Continue the dialogue or report completion."
)
self.conversation_history.append(
{"role": "user", "content": msg}
)
# ─── Robot Action Executor ────────────────────────────────────
class RobotActionExecutor:
"""
Executes robot actions.
In a real system, this publishes to ROS topics / action servers.
"""
def __init__(self):
self.last_status = {
"battery": 85,
"position": {"x": 0.0, "y": 0.0, "theta": 0.0},
"carrying": None,
}
def execute(self, function_name: str, arguments: dict) -> str:
"""Execute a function call and return a status string."""
handler = getattr(self, f"_do_{function_name}", None)
if handler is None:
return f"Unknown function: {function_name}"
try:
result = handler(arguments)
return result
except Exception as e:
return f"Error executing {function_name}: {str(e)}"
def _do_move_forward(self, args: dict) -> str:
d = args.get("distance", 1.0)
s = args.get("speed", 0.3)
# In real code: publish to ROS cmd_vel or call action server
print(f"[ROBOT] Moving forward {d}m at {s}m/s")
self.last_status["position"]["x"] += d
return f"Moved forward {d}m. New position: {self.last_status['position']}"
def _do_turn(self, args: dict) -> str:
angle = args.get("angle", 90)
speed = args.get("speed", 0.5)
print(f"[ROBOT] Turning {angle}° at {speed}rad/s")
self.last_status["position"]["theta"] += angle
return f"Turned {angle}°."
def _do_navigate_to(self, args: dict) -> str:
loc = args.get("location", "unknown")
print(f"[ROBOT] Navigating to '{loc}'")
# ROS Navigation: send goal to move_base
return f"Navigation to '{loc}' started. ETA: 30 seconds."
def _do_pick_object(self, args: dict) -> str:
obj = args.get("object_id", "unknown")
print(f"[ROBOT] Picking up '{obj}'")
self.last_status["carrying"] = obj
return f"Picked up '{obj}'."
def _do_place_object(self, args: dict) -> str:
obj = self.last_status.get("carrying", "nothing")
print(f"[ROBOT] Placing '{obj}'")
self.last_status["carrying"] = None
return f"Placed '{obj}'."
def _do_stop(self, args: dict) -> str:
print("[ROBOT] EMERGENCY STOP")
return "Robot stopped."
def _do_report_status(self, args: dict) -> str:
s = self.last_status
return (
f"Status: battery={s['battery']}%, "
f"position=({s['position']['x']:.1f}, {s['position']['y']:.1f}), "
f"carrying={s['carrying'] or 'nothing'}."
)
# ─── LLM Dialogue Manager ────────────────────────────────────
class LLMDialogueManager:
"""
Manages conversation with an LLM using function calling.
Supports multi-step task decomposition.
"""
SYSTEM_PROMPT = """You are a helpful voice-controlled robot assistant.
Your robot has the following capabilities:
- move_forward(distance, speed): Move forward by a distance in meters.
- turn(angle, speed): Turn in place. Positive angle=left, negative=right.
- navigate_to(location): Navigate autonomously to a named location on the map.
- pick_object(object_id): Pick up a specific object.
- place_object(): Place the currently held object.
- stop(): Immediately stop all robot motion.
- report_status(): Report battery, position, and carried object.
Guidelines:
1. Only call functions from the provided list.
2. Break complex commands into sequential steps.
3. After each function result, confirm completion to the user.
4. If the user asks something outside your capabilities, politely say so.
5. Be concise and natural in your responses.
"""
def __init__(self, api_key: Optional[str] = None, model: str = "gpt-4o"):
self.client = openai.OpenAI(api_key=api_key or os.environ.get("OPENAI_API_KEY"))
self.model = model
self.state = DialogueState(conversation_history=[])
self.executor = RobotActionExecutor()
    def _chat(self, messages: list[dict]):
        """Call the LLM with function definitions; return the response message object."""
response = self.client.chat.completions.create(
model=self.model,
messages=messages,
tools=ROBOT_FUNCTIONS,
tool_choice="auto",
temperature=0.0,
)
return response.choices[0].message
def _build_messages(self) -> list[dict]:
"""Build full message list with system prompt."""
messages = [{"role": "system", "content": self.SYSTEM_PROMPT}]
messages += self.state.conversation_history
return messages
    def _handle_function_call(self, msg) -> list[tuple[str, str]]:
        """Execute each tool call in the LLM response and record the results."""
        # The openai>=1.0 client returns a message object, not a dict: access
        # msg.tool_calls, call.id, call.function.name, call.function.arguments.
        tool_calls = msg.tool_calls or []
        # Record the assistant message together with its tool_calls so the
        # follow-up "tool" messages reference a matching tool_call_id.
        self.state.conversation_history.append({
            "role": "assistant",
            "content": msg.content or "",
            "tool_calls": [
                {
                    "id": call.id,
                    "type": "function",
                    "function": {
                        "name": call.function.name,
                        "arguments": call.function.arguments,
                    },
                }
                for call in tool_calls
            ],
        })
        results = []
        for call in tool_calls:
            name = call.function.name
            args = json.loads(call.function.arguments or "{}")
            print(f"[LLM] Calling function: {name}({args})")
            result = self.executor.execute(name, args)
            print(f"[LLM] Result: {result}")
            results.append((name, result))
            # Each tool result must cite the id of the call it answers.
            self.state.conversation_history.append({
                "role": "tool",
                "tool_call_id": call.id,
                "content": result,
            })
        return results
def process_utterance(self, transcript: str) -> str:
"""
Process a user utterance and execute robot actions.
Handles multi-step dialogues automatically.
"""
print(f"\n[USER] {transcript}")
self.state.add_user_message(transcript)
max_turns = 10 # Prevent infinite loops
for _ in range(max_turns):
messages = self._build_messages()
msg = self._chat(messages)
if msg.get("tool_calls"):
self._handle_function_call(asdict(msg))
else:
response = msg.get("content", "")
self.state.add_assistant_message(response)
print(f"[ROBOT] {response}")
return response
return "Maximum dialogue turns reached. Please try a simpler command."
# ─── ROS 2 Action Server Integration ──────────────────────────
def integrate_with_ros_action():
"""
Integration pattern for connecting LLM function calls to ROS 2 actions.
This shows the bridge between LLM decisions and robot execution.
"""
# In a real implementation:
#
    # import rclpy
    # from rclpy.node import Node
    # from rclpy.action import ActionClient
    # from my_robot_action_msgs.action import Navigate, Move
#
# class RobotActionClient(Node):
# def __init__(self):
# super().__init__('llm_action_client')
# self.move_client = ActionClient(self, Move, '/move_action')
# self.nav_client = ActionClient(self, Navigate, '/navigate_action')
#
# def send_goal(self, action_name: str, params: dict):
# if action_name == "move_forward":
# goal = Move.Goal()
# goal.distance = params.get("distance", 1.0)
# goal.speed = params.get("speed", 0.3)
# self.move_client.send_goal_async(goal)
# elif action_name == "navigate_to":
# goal = Navigate.Goal()
# goal.location = params.get("location", "")
# self.nav_client.send_goal_async(goal)
#
# print("ROS 2 action client pattern ready for integration.")
pass
# ─── Demo ─────────────────────────────────────────────────────
if __name__ == "__main__":
# Initialize LLM manager
manager = LLMDialogueManager(
api_key=os.environ.get("OPENAI_API_KEY"),
model="gpt-4o",
)
print("=== LLM Voice Robot (Tier 3) Demo ===")
print("Commands will be sent to GPT-4 with robot function schemas.\n")
commands = [
"Go forward two meters please.",
"Turn left 90 degrees.",
"Can you navigate to the kitchen?",
"Go to the kitchen, pick up the red box, and bring it back to me.",
]
for cmd in commands:
manager.process_utterance(cmd)
print()
6. Three-Tier Comparison¶
| Criteria | Tier 1 — Keyword Spotting | Tier 2 — Intent + Slot NLU | Tier 3 — LLM Function Calling |
|---|---|---|---|
| ASR | Vosk offline | Vosk offline | Vosk offline or Whisper API |
| Language model | Regex / dictionary | TF-IDF + LR / CRF | GPT-4 / LLaMA (≥7B) |
| Intent coverage | Fixed patterns | Trainable, handles paraphrases | Open-vocabulary |
| Slot extraction | Regex capture groups | BIO tagging | LLM JSON extraction |
| Multi-turn dialogue | ❌ No | ❌ No | ✅ Yes |
| Error recovery | Manual | Manual | LLM-handled |
| Paraphrase handling | ❌ Poor | ✅ Good | ✅ Excellent |
| Context memory | ❌ None | ❌ None | ✅ Conversation history |
| Latency (local) | < 100ms | 100–500ms | 500ms–3s (LLaMA) |
| Latency (API) | — | — | 1–3s (GPT-4) |
| Hardware requirement | Pi 3B+ | Pi 4B | Pi 4B + GPU (LLaMA) |
| Internet required | ❌ No | ❌ No | ✅ Yes (GPT-4 API) / ❌ No for local LLaMA after the model is downloaded |
| Setup complexity | ⭐ | ⭐⭐ | ⭐⭐⭐⭐ |
| Accuracy | ⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| Best for | Fixed commands | Custom robot domain | Natural, flexible interaction |
7. Step-by-Step Implementation Guide¶
Phase 1 — Audio Pipeline (All Tiers)¶
- Install the Vosk model: download vosk-model-small-en-us-0.15 from https://alphacephei.com/vosk/models and unzip it into your home directory so it matches VOSK_MODEL_PATH (a scripted download sketch follows this list).
- Test the microphone:
python3 -c "import pyaudio; p = pyaudio.PyAudio(); print(p.get_device_count())"
- Run the Tier 1 code end-to-end; verify wake word detection.
- Tune the VAD energy threshold: lower it if the robot is in a quiet environment, raise it if noisy.
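If you prefer to script step 1, here is a minimal sketch. It assumes the standard Vosk model URL pattern (see https://alphacephei.com/vosk/models) and the VOSK_MODEL_PATH default from Section 3.2; adjust the model name for other languages.
# Sketch: fetch and unpack the small English Vosk model into the home
# directory expected by VOSK_MODEL_PATH. Verify the URL against the Vosk
# model index before relying on it.
import os
import urllib.request
import zipfile

MODEL_NAME = "vosk-model-small-en-us-0.15"
MODEL_URL = f"https://alphacephei.com/vosk/models/{MODEL_NAME}.zip"
DEST_DIR = os.path.expanduser("~")

model_dir = os.path.join(DEST_DIR, MODEL_NAME)
if not os.path.isdir(model_dir):
    archive = os.path.join(DEST_DIR, f"{MODEL_NAME}.zip")
    print(f"Downloading {MODEL_URL} ...")
    urllib.request.urlretrieve(MODEL_URL, archive)
    with zipfile.ZipFile(archive) as zf:
        zf.extractall(DEST_DIR)  # creates ~/vosk-model-small-en-us-0.15/
    os.remove(archive)
print("Model ready at:", model_dir)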
Phase 2 — Tier 1 Deployment¶
- Connect the USB microphone to the robot's SBC.
- Run VoiceRobotTier1.run() on the robot.
- Map matched commands to ROS 2 action clients or topics (see Phase 5; a bridge sketch follows this list).
- Test in various acoustic environments; adjust SPEECH_FRAMES_THRESH.
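A minimal bridge sketch for step 3, assuming the VoiceRobotTier1 class from Section 3.2 and the VoiceRobotROSClient node from Phase 5 are importable in one package (the subclass name and layout are illustrative):
# Sketch: forward matched Tier 1 commands to the ROS 2 bridge instead of
# only printing them. Assumes VoiceRobotTier1 and VoiceRobotROSClient are
# defined or imported in this module.
import rclpy


class VoiceRobotTier1Ros(VoiceRobotTier1):
    """Tier 1 keyword spotting with a ROS 2 publisher backend."""

    def __init__(self, model_path: str, ros_client):
        super().__init__(model_path)
        self.ros_client = ros_client

    def _process_commands(self, transcript: str) -> None:
        result = self.matcher.match(transcript)
        if result:
            action, params = result
            print(f"[CMD] Action: {action} Params: {params}")
            self.ros_client.execute_action(action, params)  # publishes to /cmd_vel
        else:
            print(f"[WARN] No command matched: '{transcript}'")


if __name__ == "__main__":
    rclpy.init()
    robot = VoiceRobotTier1Ros(VOSK_MODEL_PATH, VoiceRobotROSClient())
    robot.run()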
Phase 3 — Tier 2 Training¶
- Expand INTENT_DATA and SLOT_DATA with your robot's specific command set.
- Run NLUVoiceRobotTier2.train() to evaluate accuracy.
- For better accuracy, use a DistilBERT-based model instead of TF-IDF (a zero-shot sketch follows this list).
- Deploy the trained model to the robot.
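One possible DistilBERT-based replacement for the TF-IDF classifier, sketched with Hugging Face's zero-shot pipeline. The checkpoint name is an assumption — any NLI model usable with the zero-shot-classification pipeline works, and a fine-tuned DistilBERT classifier on your own INTENT_DATA would be the higher-accuracy option.
# Sketch: DistilBERT-based zero-shot intent classification (no task-specific
# training). Human-readable candidate labels work better than raw intent ids,
# so map them back afterwards.
from transformers import pipeline

LABEL_TO_INTENT = {
    "move forward": "move_forward",
    "move backward": "move_backward",
    "turn left": "turn_left",
    "turn right": "turn_right",
    "stop moving": "stop",
    "navigate to a location": "navigate_to",
    "pick up an object": "pick_object",
    "put down an object": "place_object",
    "report status": "status",
}

classifier = pipeline(
    "zero-shot-classification",
    model="typeform/distilbert-base-uncased-mnli",  # example NLI checkpoint
)


def predict_intent(text: str) -> tuple[str, float]:
    """Return (intent, confidence) for a single utterance."""
    result = classifier(text, candidate_labels=list(LABEL_TO_INTENT))
    return LABEL_TO_INTENT[result["labels"][0]], float(result["scores"][0])


print(predict_intent("go forward three meters"))  # expect ("move_forward", ...)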
Phase 4 — Tier 3 Integration¶
- Obtain an OpenAI API key or deploy a local LLaMA server (Ollama, text-generation-webui).
- Configure LLMDialogueManager with your preferred model (a local-Ollama sketch follows this list).
- Define the ROBOT_FUNCTIONS schema for your specific robot's capabilities.
- Connect RobotActionExecutor.execute() to ROS 2 action clients.
- Test multi-step commands like "go to the kitchen and pick up the red box."
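A sketch of step 2 for a fully local setup. It assumes an Ollama server as in Section 8.3 and a pulled model whose Ollama build supports tool/function calling — verify this for your model and Ollama version before relying on it.
# Sketch: point Tier 3 at Ollama's OpenAI-compatible endpoint instead of the
# OpenAI API. The api_key value is a placeholder that Ollama ignores.
import openai

manager = LLMDialogueManager(api_key="ollama", model="llama3")
manager.client = openai.OpenAI(
    base_url="http://localhost:11434/v1",
    api_key="ollama",
)
manager.process_utterance("Go forward two meters please.")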
Phase 5 — ROS 2 Integration (All Tiers)¶
# Example: ROS 2 action client bridge
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
class VoiceRobotROSClient(Node):
def __init__(self):
super().__init__("voice_robot_client")
self.cmd_pub = self.create_publisher(Twist, "/cmd_vel", 10)
    def execute_action(self, action: str, params: dict):
        twist = Twist()
        if action == "move_forward":
            # /cmd_vel carries velocities, not distances: publish a forward
            # speed here and let a higher-level loop (odometry feedback or a
            # timer) decide when the requested distance has been covered.
            twist.linear.x = params.get("speed", 0.3)
        elif action == "stop":
            twist.linear.x = 0.0
            twist.angular.z = 0.0
        self.cmd_pub.publish(twist)
8. Extensions and Variations¶
8.1 WebRTC VAD vs. Energy-Based VAD¶
Tier 1 uses energy-based VAD for simplicity. For production systems, replace it with WebRTC VAD (py-webrtcvad), which uses a classifier built specifically for detecting speech rather than a raw energy threshold. Frames must be exactly 10, 20, or 30 ms of 16-bit mono PCM:
import webrtcvad

vad = webrtcvad.Vad(2)  # Aggressiveness 0 (most permissive) to 3 (most aggressive)
FRAME_MS = 30
FRAME_BYTES = 16000 * FRAME_MS // 1000 * 2  # 30 ms of 16-bit mono PCM at 16 kHz
# Slice a capture chunk (e.g. from the Tier 1 audio thread) into VAD frames
for i in range(0, len(chunk) - FRAME_BYTES + 1, FRAME_BYTES):
    is_speech = vad.is_speech(chunk[i:i + FRAME_BYTES], 16000)
8.2 Whisper API for Higher-Quality ASR¶
Replace Vosk with OpenAI's Whisper API for better transcription accuracy (shown with the openai ≥ 1.0 client):
from openai import OpenAI

client = OpenAI()
with open("recording.wav", "rb") as audio_file:
    transcript = client.audio.transcriptions.create(
        model="whisper-1", file=audio_file
    ).text
8.3 Local LLaMA with Ollama¶
For fully offline Tier 3, use Ollama:
# Terminal
ollama serve
ollama pull llama3
# Python
from openai import OpenAI
client = OpenAI(base_url="http://localhost:11434/v1", api_key="ollama")
response = client.chat.completions.create(model="llama3", messages=[...])
8.4 Wake Word Customization¶
Train a custom wake word detector using Picovoice Porcupine:
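A minimal sketch of the pvporcupine API, assuming a Picovoice AccessKey and a custom keyword file exported from the Picovoice Console (the .ppn file name below is a placeholder):
# Sketch: continuously read microphone frames and check them against a custom
# Porcupine wake word. frame_length and sample_rate come from the Porcupine
# handle itself.
import struct

import pvporcupine
import pyaudio

porcupine = pvporcupine.create(
    access_key="YOUR_PICOVOICE_ACCESS_KEY",
    keyword_paths=["hey_robot.ppn"],  # custom wake word trained in the console
)

pa = pyaudio.PyAudio()
stream = pa.open(
    format=pyaudio.paInt16,
    channels=1,
    rate=porcupine.sample_rate,
    input=True,
    frames_per_buffer=porcupine.frame_length,
)

while True:
    pcm = stream.read(porcupine.frame_length, exception_on_overflow=False)
    pcm = struct.unpack_from("h" * porcupine.frame_length, pcm)
    if porcupine.process(pcm) >= 0:  # returns keyword index, or -1 for no match
        print("[WAKE] Custom wake word detected!")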
8.5 Multi-Modal Commands¶
Combine voice with gesture or gaze:
User: *points to object* "Grab that."
Voice: "Grab that" → navigate_to(object_location_from_vision)
Vision: detected pointing gesture + object at location
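A toy fusion sketch for this idea, with a hypothetical vision-side interface (the pointed_object structure is an assumption; in a real system it would come from an object detector plus gesture recognition):
# Sketch: resolve deictic commands like "grab that" by combining the voice
# transcript with the most recent pointing target reported by vision. The
# action/params shape mirrors the Tier 3 function schema.
from typing import Optional


def resolve_deictic_command(transcript: str,
                            pointed_object: Optional[dict]) -> Optional[dict]:
    """Map 'grab that' plus a pointing target to a concrete robot action."""
    text = transcript.lower()
    if ("that" in text or "this" in text) and pointed_object:
        return {
            "action": "pick_object",
            "params": {"object_id": pointed_object["object_id"]},
        }
    return None


# Example: vision reports the user pointed at a red box
print(resolve_deictic_command("grab that", {"object_id": "red_box"}))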
9. References¶
- Vosk Downloads — Offline speech recognition models
- WebRTC VAD — Voice Activity Detection library
- OpenAI Function Calling — GPT-4 function calling docs
- sklearn-crfsuite — CRF for sequence labeling (slot filling)
- Hugging Face Transformers — Pretrained NLU models
- Ollama — Run LLaMA, Mistral, and other open models locally
- ROS 2 Actions — ROS 2 action interface
- Picovoice Porcupine — Custom wake word detection
- MiniGPT-4 — Vision-language model for robot perception
- SpeechBrain — Open-source speech toolkit
- RasPBX / Voxpopuli — Open-source voice datasets