Voice Interaction Robot¶
Project type: Human-robot interaction | Difficulty: ★★☆☆☆ to ★★★★☆ (depends on the chosen tier) | Estimated time: 1–3 weekends
1. Project Overview¶
A voice interaction robot continuously listens for spoken commands, parses the user's intent, and executes the corresponding action (drive forward, stop, turn left, navigate to a location, pick up an object, and so on). This project builds the complete speech-processing pipeline from microphone input to the robot's actuators.
┌─────────────────────────────────────────────────────────────────┐
│                Voice Command Processing Pipeline                │
│                                                                 │
│  ┌──────────┐    ┌──────────┐    ┌───────────┐    ┌──────────┐  │
│  │  Micro-  │───▶│   ASR    │───▶│   NLU /   │───▶│  Robot   │  │
│  │  phone   │    │  (Vosk/  │    │ Dialogue  │    │  Action  │  │
│  │          │    │  WebRTC) │    │  Manager  │    │ Executor │  │
│  └──────────┘    └──────────┘    └───────────┘    └──────────┘  │
│        │               │               │               │        │
│        ▼               ▼               ▼               ▼        │
│   Raw audio       Transcript     Intent + slots    cmd_vel /    │
│  (16kHz PCM)  "go forward 3 m"   go_forward {      ActionGoal   │
│                                   distance: 3 }                 │
└─────────────────────────────────────────────────────────────────┘
This project explores three progressively more capable algorithmic tiers:
| Tier | Approach | Core techniques | Complexity |
|---|---|---|---|
| Tier 1 — Classic | Keyword spotting | Threshold wake word + rule matching | Low |
| Tier 2 — Intermediate | Full NLU pipeline | Intent classification + slot filling | Medium |
| Tier 3 — Modern | LLM dialogue | GPT-4 / LLaMA + function calling | High |
2. Hardware and Software Requirements¶
Hardware¶
| Component | Spec | Notes |
|---|---|---|
| Microphone | USB mic or I2S array mic (e.g. ReSpeaker 4-Mic) | A far-field mic is recommended on a robot |
| Single-board computer | Raspberry Pi 4B (4GB+) or Jetson Nano | A Pi 3B+ is the minimum for Tier 1 |
| Robot platform | Any ROS 2-capable mobile robot | TurtleBot4, custom differential drive, etc. |
| Speaker (optional) | USB speaker or amp + driver | For spoken feedback from the robot |
| Power | 5V 3A USB-C (Pi) + motor battery | Keep the audio power rail stable |
Software¶
| Package | Version | Purpose |
|---|---|---|
| Python | ≥ 3.10 | Core language (the listings use PEP 604 type hints) |
| vosk | ≥ 0.3.45 | Offline ASR engine |
| py-webrtcvad | ≥ 2.0.10 | Voice activity detection (VAD) |
| portaudio / pyaudio | — | Audio I/O |
| scikit-learn | ≥ 1.0 | Intent classifier (Tier 2) |
| transformers | ≥ 4.30 | Hugging Face models (Tiers 2–3) |
| openai | ≥ 1.0 | OpenAI API (Tier 3) |
| rclpy | ROS 2 | ROS 2 action client/server integration |
3. Tier 1 — Classic: Keyword Spotting with Vosk¶
3.1 Principle¶
Tier 1 combines offline ASR (Vosk) with keyword/pattern matching. The system listens continuously for a wake word ("Hey Robot", "OK Bot"). Once triggered, it captures a short utterance, runs ASR, and matches the transcript against a dictionary of known command patterns via regular expressions or string similarity.
Continuous audio stream
        │
        ▼
┌──────────────────┐
│ Voice activity   │ ← WebRTC VAD: is speech present?
│ detection (VAD)  │
└───────┬──────────┘
        │ speech detected
        ▼
┌──────────────────┐
│ Wake word check  │ ← Vosk partial results; keyword match?
└───────┬──────────┘
        │ wake word found
        ▼
┌──────────────────┐
│ Command capture  │ ← buffer ~3 s of audio after the wake word
│ + Vosk ASR       │
└───────┬──────────┘
        │ transcript
        ▼
┌──────────────────┐
│ Rule matcher     │ ← regex / keyword dictionary
│ → robot action   │
└──────────────────┘
Wake-word gating uses a rolling energy threshold on the audio stream:

\[
E_n = \alpha \sqrt{\frac{1}{N} \sum_{k=1}^{N} x_k[n]^2} \;+\; (1 - \alpha)\, E_{n-1}
\]

where \(x_k[n]\) is the \(k\)-th sample of frame \(n\), \(N\) is the frame length, \(\alpha\) is the smoothing factor, and \(E_n\) is the smoothed frame energy. Speech activity is declared whenever \(E_n > \theta_{\text{energy}}\).
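For intuition, with the code defaults \(\alpha = 0.1\) and \(\theta_{\text{energy}} = 100\), a frame with RMS 500 arriving while \(E_{n-1} = 80\) gives

\[
E_n = 0.1 \times 500 + 0.9 \times 80 = 122 > 100,
\]

so the frame is flagged as speech. The VoiceActivityDetector class below implements exactly this update.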
3.2 Complete Python Code¶
"""
Tier 1: Keyword-Spotting Voice Robot
=====================================
Uses Vosk offline ASR + rule-based command matching.
No internet required — runs fully offline.
"""
import os
import json
import queue
import re
import threading
import numpy as np
import pyaudio
from vosk import Model, KaldiRecognizer
# ─── Configuration ──────────────────────────────────────────────
VOSK_MODEL_PATH = os.path.expanduser("~/vosk-model-small-en-us-0.15")
SAMPLE_RATE = 16000
CHUNK_SIZE = 4096 # samples per audio chunk (256ms at 16kHz)
WAKE_WORDS = {"hey robot", "ok robot", "hey bot", "hello robot"}
# Minimum transcript length (in characters) to trigger command processing
MIN_TRANSCRIPT_LEN = 3
class VoiceActivityDetector:
"""
Energy-based Voice Activity Detection (VAD).
Uses a rolling average of frame energies.
"""
def __init__(self, energy_threshold: float = 100.0, smoothing: float = 0.1):
self.energy_threshold = energy_threshold
self.smoothing = smoothing
self.smoothed_energy = 0.0
def is_speech(self, audio_chunk: bytes) -> bool:
# Convert to numpy array (16-bit PCM)
samples = np.frombuffer(audio_chunk, dtype=np.int16).astype(np.float32)
# Compute frame energy (RMS)
energy = np.sqrt(np.mean(samples ** 2)) + 1e-9
# Exponential moving average
self.smoothed_energy = (
self.smoothing * energy
+ (1 - self.smoothing) * self.smoothed_energy
)
return self.smoothed_energy > self.energy_threshold
class VoskAsr:
"""Wrapper around Vosk recognizer for streaming audio."""
def __init__(self, model_path: str):
if not os.path.exists(model_path):
raise FileNotFoundError(
f"Vosk model not found at {model_path}. "
"Download from: https://alphacephei.com/vosk/models"
)
model = Model(model_path)
self.rec = KaldiRecognizer(model, SAMPLE_RATE)
self.rec.SetWords(True) # Include word timestamps
def process_chunk(self, audio_chunk: bytes) -> dict:
"""
Feed one audio chunk to Vosk. Returns dict with:
- text: transcribed text (empty string if not final)
- partial: partial result string
- is_final: True if result is final
"""
result = {}
if self.rec.AcceptWaveform(audio_chunk):
result = json.loads(self.rec.Result())
result["is_final"] = True
else:
partial = json.loads(self.rec.PartialResult())
result = {"text": partial.get("partial", ""), "is_final": False}
return result
class CommandMatcher:
"""
Rule-based command matcher.
Maps transcribed text to robot actions using regex patterns.
"""
def __init__(self):
# Pattern → (action_name, param_extractor_fn)
self.rules = [
# Forward / backward
(r"go forward\s*(?:(\d+)\s*(?:meter|m|steps?)?)?",
("move_forward", lambda m: {"distance": int(m.group(1) or 1)})),
(r"move forward\s*(?:(\d+)\s*(?:meter|m|steps?)?)?",
("move_forward", lambda m: {"distance": int(m.group(1) or 1)})),
(r"go back(?:ward)?\s*(?:(\d+)\s*(?:meter|m|steps?)?)?",
("move_backward", lambda m: {"distance": int(m.group(1) or 1)})),
# Turn
(r"turn (left|right)\s*(?:(\d+)\s*degrees?)?",
("turn", lambda m: {"direction": m.group(1), "angle": int(m.group(2) or 90)})),
(r"rotate (left|right)\s*(?:(\d+)\s*degrees?)?",
("turn", lambda m: {"direction": m.group(1), "angle": int(m.group(2) or 90)})),
# Stop
(r"\bstop\b", ("stop", lambda m: {})),
(r"\bhalt\b", ("stop", lambda m: {})),
# Navigation
(r"go to (?:\w+\s*)+", ("navigate_to", self._extract_location)),
# Status
(r"what(?:'s| is) your status", ("status", lambda m: {})),
(r"how are you", ("status", lambda m: {})),
]
@staticmethod
def _extract_location(m: re.Match) -> dict:
"""Extract location name from command text."""
text = m.group(0).replace("go to ", "").strip()
return {"location": text}
def match(self, transcript: str) -> tuple | None:
"""
Match transcript against all rules.
Returns (action_name, params_dict) or None.
"""
transcript = transcript.lower().strip()
for pattern, (action, param_fn) in self.rules:
m = re.search(pattern, transcript)
if m:
return action, param_fn(m)
return None
class VoiceRobotTier1:
"""
Tier 1 Voice Robot: Vosk ASR + keyword spotting.
Runs ASR continuously and processes commands after wake word.
"""
def __init__(self, model_path: str):
self.asr = VoskAsr(model_path)
self.vad = VoiceActivityDetector(energy_threshold=100.0)
self.matcher = CommandMatcher()
self.audio_queue = queue.Queue()
self.running = False
# State machine
self.state = "idle" # idle → wake_word → listening → processing
self.partial_text = ""
self.speech_frames = 0 # consecutive frames with speech
self.SPEECH_FRAMES_THRESH = 3 # frames of speech before capture
self.silence_frames = 0
self.SILENCE_FRAMES_THRESH = 15 # silence frames before finalizing
def _audio_capture_thread(self, p: pyaudio.PyAudio):
"""Background thread: capture audio from microphone."""
stream = p.open(
format=pyaudio.paInt16,
channels=1,
rate=SAMPLE_RATE,
input=True,
frames_per_buffer=CHUNK_SIZE,
)
print("[INFO] Microphone active. Say wake word 'Hey Robot' to begin.")
while self.running:
chunk = stream.read(CHUNK_SIZE, exception_on_overflow=False)
self.audio_queue.put(chunk)
stream.stop_stream()
stream.close()
def _process_commands(self, transcript: str) -> None:
"""Process ASR transcript → command → action."""
        if not transcript or len(transcript) < MIN_TRANSCRIPT_LEN:  # chars, so "stop" still passes
return
result = self.matcher.match(transcript)
if result:
action, params = result
print(f"[CMD] Action: {action} Params: {params}")
# Here: publish to ROS action server / robot controller
# Example: self.robot_action_server.send_goal(action, params)
else:
print(f"[WARN] No command matched: '{transcript}'")
def run(self):
"""Main loop: process audio chunks through VAD → ASR."""
p = pyaudio.PyAudio()
self.running = True
# Start capture thread
capture_thread = threading.Thread(
target=self._audio_capture_thread, args=(p,), daemon=True
)
capture_thread.start()
print("[INFO] Voice Robot Tier 1 running. Say 'Hey Robot' to activate.")
try:
while self.running:
                try:
                    chunk = self.audio_queue.get(timeout=1.0)
                except queue.Empty:
                    continue  # mic thread produced no audio within 1 s
is_speech = self.vad.is_speech(chunk)
if is_speech:
self.speech_frames += 1
self.silence_frames = 0
else:
self.silence_frames += 1
self.speech_frames = 0
# ── State machine ──
if self.state == "idle":
# Feed audio to Vosk for partial result (wake word detection)
result = self.asr.process_chunk(chunk)
partial = result.get("text", "")
                    if partial:
                        # Vosk partials are cumulative, so the newest partial
                        # already contains the whole phrase heard so far
                        combined = partial.lower()
                        for ww in WAKE_WORDS:
                            if ww in combined:
                                print(f"[WAKE] Wake word '{ww}' detected!")
                                self.state = "listening"
                                self.partial_text = ""
                                # Reset recognizer for command capture
                                self.asr.rec.Reset()
                                break
elif self.state == "listening":
# Collect audio; finalize on silence
result = self.asr.process_chunk(chunk)
                    partial = result.get("text", "")
                    if partial:
                        # Vosk partials are cumulative; keep only the newest
                        self.partial_text = partial
if self.silence_frames > self.SILENCE_FRAMES_THRESH:
print(f"[LISTEN] Finalizing: '{self.partial_text.strip()}'")
self._process_commands(self.partial_text.strip())
self.partial_text = ""
self.state = "idle"
self.asr.rec.Reset()
except KeyboardInterrupt:
print("\n[INFO] Shutting down...")
finally:
self.running = False
p.terminate()
if __name__ == "__main__":
robot = VoiceRobotTier1(model_path=VOSK_MODEL_PATH)
robot.run()
3.3 Command Coverage¶
| Command pattern | Action | Parameters |
|---|---|---|
| "go forward [N]" | move_forward | {distance: N} |
| "go back [N]" | move_backward | {distance: N} |
| "turn left/right [N deg]" | turn | {direction, angle} |
| "stop" / "halt" | stop | — |
| "go to kitchen" | navigate_to | {location: "kitchen"} |
| "what's your status" | status | — |
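Before wiring in audio, the rule matcher can be sanity-checked on its own; a quick sketch using the CommandMatcher class above (expected results shown in comments):

# Offline check of the Tier 1 rule matcher — no microphone required
matcher = CommandMatcher()
print(matcher.match("go forward 3 meters"))    # ('move_forward', {'distance': 3})
print(matcher.match("turn right 45 degrees"))  # ('turn', {'direction': 'right', 'angle': 45})
print(matcher.match("please stop now"))        # ('stop', {})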
4. Tier 2 — Intermediate: Intent Classification + Slot Filling¶
4.1 Principle¶
Tier 2 replaces regex matching with a full natural language understanding (NLU) pipeline. The transcript passes through two stages:
- Intent classification: assign the utterance to one of \(K\) predefined intents using a trained classifier.
- Slot filling: extract structured entities (numbers, directions, locations) from the utterance.
Transcript: "go forward three meters"
        │
        ▼
┌─────────────────────────────┐
│ Intent classifier           │ → move_forward (confidence: 0.94)
│ (sklearn / Transformer)     │
└─────────────────────────────┘
        │
        ▼
┌─────────────────────────────┐
│ Slot filler                 │ →   O      O    B-DISTANCE  I-DISTANCE
│ (BIO tagging / CRF)         │     go  forward   three       meters
└─────────────────────────────┘
        │
        ▼
{ intent: "move_forward", slots: { distance: 3, unit: "meter" } }
Intent classification: given an input sequence \(\mathbf{x} = (x_1, ..., x_n)\), the classifier predicts

\[
\hat{c} = \operatorname*{arg\,max}_{c \in \mathcal{C}} P(c \mid \mathbf{x})
\]

where \(\mathcal{C}\) is the set of intents. Candidate models:
- TF-IDF + logistic regression (fast, a solid baseline)
- BERT / DistilBERT (higher accuracy, GPU recommended)

Slot filling is modeled as sequence labeling: each input token \(x_i\) receives a BIO tag, and the tagger predicts

\[
\hat{\mathbf{y}} = \operatorname*{arg\,max}_{\mathbf{y} \in \mathcal{Y}^n} P(\mathbf{y} \mid \mathbf{x})
\]

where \(\mathcal{Y} = \{ O, B\text{-}DISTANCE, I\text{-}DISTANCE, B\text{-}DIRECTION, ... \}\).
4.2 Complete Python Code¶
"""
Tier 2: Intent Classification + Slot Filling
=============================================
NLU pipeline for the voice robot.
TF-IDF + logistic regression for intent classification;
BIO tagging with a sklearn-crfsuite CRF for slot filling.
"""
import re
from typing import Optional
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
import sklearn_crfsuite
# ─── Training Data ──────────────────────────────────────────────
INTENT_DATA = [
("go forward one meter", "move_forward"),
("go forward two meters", "move_forward"),
("move forward three steps", "move_forward"),
("move forward five meters", "move_forward"),
("advance forward", "move_forward"),
("go forward", "move_forward"),
("go back one meter", "move_backward"),
("move backward two steps", "move_backward"),
("go backward three meters", "move_backward"),
("reverse", "move_backward"),
("retreat", "move_backward"),
("turn left", "turn_left"),
("rotate left", "turn_left"),
("turn left ninety degrees", "turn_left"),
("turn right", "turn_right"),
("rotate right forty five degrees", "turn_right"),
("spin right", "turn_right"),
("stop", "stop"),
("halt", "stop"),
("emergency stop", "stop"),
("wait", "stop"),
("go to the kitchen", "navigate_to"),
("navigate to living room", "navigate_to"),
("go to charging station", "navigate_to"),
("go to home position", "navigate_to"),
("pick up the object", "pick_object"),
("grab the item", "pick_object"),
("put it down", "place_object"),
("drop the object", "place_object"),
("what is your status", "status"),
("how are you doing", "status"),
("report status", "status"),
]
# Slot BIO training data: list of (tokens, bio_tags)
SLOT_DATA = [
(["go", "forward", "three", "meters"], ["O", "O", "B-DISTANCE", "I-DISTANCE"]),
(["move", "forward", "five", "steps"], ["O", "O", "B-DISTANCE", "I-DISTANCE"]),
(["go", "forward"], ["O", "O"]),
(["go", "back", "two", "meters"], ["O", "O", "B-DISTANCE", "I-DISTANCE"]),
(["turn", "left", "ninety", "degrees"], ["O", "B-DIRECTION", "B-ANGLE", "I-ANGLE"]),
(["turn", "right"], ["O", "B-DIRECTION"]),
(["rotate", "right", "forty", "five", "degrees"], ["O", "B-DIRECTION", "B-ANGLE", "I-ANGLE", "I-ANGLE"]),
(["go", "to", "the", "kitchen"], ["O", "O", "O", "B-LOCATION"]),
(["navigate", "to", "living", "room"], ["O", "O", "O", "B-LOCATION"]),
(["go", "to", "home"], ["O", "O", "O"]),
(["stop"], ["O"]),
(["how", "are", "you"], ["O", "O", "O"]),
(["what", "is", "your", "status"], ["O", "O", "O", "O"]),
(["grab", "the", "red", "box"], ["O", "O", "B-OBJECT_COLOR", "B-OBJECT_NAME"]),
(["pick", "up", "the", "item"], ["O", "O", "O", "B-OBJECT_NAME"]),
]
INTENTS = sorted(list(set(label for _, label in INTENT_DATA)))
SLOT_LABELS = sorted(list(set(tag for _, tags in SLOT_DATA for tag in tags)))
# ─── Intent Classifier ─────────────────────────────────────────
class IntentClassifier:
"""TF-IDF + Logistic Regression intent classifier."""
def __init__(self):
self.pipeline = Pipeline([
("tfidf", TfidfVectorizer(analyzer="char_wb", ngram_range=(2, 4))),
("clf", LogisticRegression(max_iter=1000, C=10.0)),
])
self._trained = False
def train(self, texts: list[str], labels: list[str]):
        # NOTE: a stratified split needs a test set at least as large as the
        # number of intent classes; with this small demo dataset use >= 0.3
        X_train, X_test, y_train, y_test = train_test_split(
            texts, labels, test_size=0.3, random_state=42, stratify=labels
        )
self.pipeline.fit(X_train, y_train)
acc = self.pipeline.score(X_test, y_test)
print(f"[NLU] Intent classifier accuracy: {acc:.2%}")
self._trained = True
def predict(self, text: str) -> tuple[str, float]:
"""Return (intent, confidence_score)."""
if not self._trained:
raise RuntimeError("Classifier not trained. Call train() first.")
probs = self.pipeline.predict_proba([text])[0]
intent_idx = int(np.argmax(probs))
intent = self.pipeline.classes_[intent_idx]
confidence = float(probs[intent_idx])
return intent, confidence
# ─── Slot Filler ───────────────────────────────────────────────
class SlotFiller:
"""BIO tagging slot filler using CRF (Conditional Random Field)."""
def __init__(self):
self.model: Optional[sklearn_crfsuite.CRF] = None
def _extract_features(self, token: str, i: int, tokens: list[str]) -> dict:
"""Extract features for a single token in the sequence."""
word = token.lower()
features = {
"bias": 1.0,
"word.lower()": word,
"word[-3:]": word[-3:] if len(word) > 2 else word,
"word[-2:]": word[-2:] if len(word) > 1 else word,
"word.isupper()": word.isupper(),
"word.isdigit()": word.isdigit(),
"word.isalpha()": word.isalpha(),
# Context features
"word_prev": tokens[i - 1].lower() if i > 0 else "<BOS>",
"word_next": tokens[i + 1].lower() if i < len(tokens) - 1 else "<EOS>",
"word_prev2": tokens[i - 2].lower() if i > 1 else "<BOS>",
}
return features
def _token_features(self, tokens: list[str]) -> list[dict]:
return [self._extract_features(t, i, tokens) for i, t in enumerate(tokens)]
def train(self, data: list[tuple[list[str], list[str]]]):
X = [self._token_features(tokens) for tokens, _ in data]
y = [tags for _, tags in data]
self.model = sklearn_crfsuite.CRF(
algorithm="lbfgs", c1=0.1, c2=0.1, max_iterations=100
)
self.model.fit(X, y)
print("[NLU] Slot filler (CRF) trained.")
def predict(self, text: str) -> dict[str, str]:
"""Return slot dictionary from input text."""
if self.model is None:
raise RuntimeError("Slot filler not trained. Call train() first.")
tokens = re.findall(r"\b\w+\b", text.lower())
features = self._token_features(tokens)
pred_tags = self.model.predict([features])[0]
# Parse BIO tags into slot dictionary
slots = {}
current_slot = None
current_value = []
for token, tag in zip(tokens, pred_tags):
if tag.startswith("B-"):
if current_slot and current_value:
slots[current_slot] = " ".join(current_value)
current_slot = tag[2:]
current_value = [token]
elif tag.startswith("I-") and tag[2:] == current_slot:
current_value.append(token)
else:
if current_slot and current_value:
slots[current_slot] = " ".join(current_value)
current_slot = None
current_value = []
if current_slot and current_value:
slots[current_slot] = " ".join(current_value)
# Resolve numbers
for key in list(slots.keys()):
if slots[key].isdigit():
slots[key] = int(slots[key])
elif slots[key].replace(" ", "").isdigit():
slots[key] = int(slots[key].replace(" ", ""))
            else:
                # Map number words (incl. tens and simple compounds) to digits
                word_to_num = {
                    "one": 1, "two": 2, "three": 3, "four": 4,
                    "five": 5, "six": 6, "seven": 7, "eight": 8,
                    "nine": 9, "ten": 10, "twenty": 20, "thirty": 30,
                    "forty": 40, "fifty": 50, "sixty": 60, "seventy": 70,
                    "eighty": 80, "ninety": 90,
                }
                words = slots[key].strip().split()
                if words and all(w in word_to_num for w in words):
                    # e.g. "ninety" → 90, "forty five" → 45
                    slots[key] = sum(word_to_num[w] for w in words)
                # otherwise keep the raw string (e.g. location names)
return slots
class NLUVoiceRobotTier2:
"""
Tier 2 Voice Robot: Intent Classification + Slot Filling.
More robust than regex — handles paraphrases and variations.
"""
def __init__(self):
self.intent_clf = IntentClassifier()
self.slot_filler = SlotFiller()
self._trained = False
def train(self):
"""Train both NLU components on labeled data."""
texts, labels = zip(*INTENT_DATA)
self.intent_clf.train(list(texts), list(labels))
self.slot_filler.train(SLOT_DATA)
self._trained = True
print("[NLU] Training complete.")
def understand(self, transcript: str) -> dict:
"""
Full NLU understanding pipeline.
Returns { intent, confidence, slots }.
"""
if not self._trained:
self.train()
intent, confidence = self.intent_clf.predict(transcript)
slots = self.slot_filler.predict(transcript)
return {
"transcript": transcript,
"intent": intent,
"confidence": confidence,
"slots": slots,
}
def process(self, transcript: str) -> None:
"""Understand and execute a voice command."""
result = self.understand(transcript)
print(f"[NLU] Intent: {result['intent']} "
f"(conf={result['confidence']:.2f}) "
f"Slots: {result['slots']}")
# Here: map intent → ROS action and send goal
# Example: self.action_client.send_goal(result['intent'], result['slots'])
# ─── Demo ──────────────────────────────────────────────────────
if __name__ == "__main__":
robot = NLUVoiceRobotTier2()
robot.train()
test_utterances = [
"go forward three meters",
"turn left ninety degrees",
"go to the kitchen",
"rotate right forty five degrees",
"what is your status",
"move forward five steps",
"grab the red box",
]
print("\n=== NLU Test Results ===")
for utt in test_utterances:
result = robot.understand(utt)
print(f" '{utt}'")
print(f" → intent={result['intent']} conf={result['confidence']:.2f} "
f"slots={result['slots']}")
5. Tier 3 — Modern: LLM Dialogue with Function Calling¶
5.1 Principle¶
Tier 3 replaces the fixed NLU pipeline with a large language model (GPT-4 or a local LLaMA) that jointly handles intent understanding, slot extraction, and dialogue management. The LLM learns the robot's capabilities from a system prompt and executes actions through a function calling interface.
User: "Can you go to the kitchen and grab the red box for me?"
        │
        ▼
┌─────────────────────────────────────────────────┐
│ System prompt:                                  │
│ "You are a robot voice assistant. You can call  │
│  these functions: navigate_to, pick_object,     │
│  place_object, move_forward, turn, stop,        │
│  status."                                       │
│                                                 │
│ Function calling schema:                        │
│ - navigate_to(location: string)                 │
│ - pick_object(color: string, name: string)      │
│ - move_forward(distance: float)                 │
└─────────────────────────────────────────────────┘
        │
        ▼
┌──────────────────────────────────────────────────┐
│ LLM output (structured function calls):          │
│ {                                                │
│   name: "navigate_to",                           │
│   arguments: { location: "kitchen" }             │
│ }                                                │
│ {                                                │
│   name: "pick_object",                           │
│   arguments: { color: "red", name: "box" }       │
│ }                                                │
└──────────────────────────────────────────────────┘
        │
        ▼
┌──────────────────────────────────────────────────┐
│ Action executor: runs each call in order and     │
│ reports the results back to the LLM              │
└──────────────────────────────────────────────────┘
Prompt engineering is the core of this tier. The system prompt defines:
- Role: "You are a robot voice assistant."
- Capabilities: the list of available functions and their descriptions
- Constraints: "Only call functions from the provided list."
- Dialogue state: the conversation history required for multi-turn tasks
5.2 Complete Python Code¶
"""
Tier 3: LLM Function-Calling Voice Robot
=====================================================
Uses GPT-4 or a local LLaMA to understand commands,
extract structured arguments, and execute robot actions
through a function calling interface.
"""
import json
import os
from dataclasses import dataclass
from typing import Optional
import openai
# ─── Robot Action Schema ──────────────────────────────────────
# Actions the robot can perform, passed to the LLM as tool schemas
ROBOT_FUNCTIONS = [
{
"type": "function",
"function": {
"name": "move_forward",
"description": "Move the robot forward by a specified distance.",
"parameters": {
"type": "object",
"properties": {
"distance": {
"type": "number",
"description": "Distance to travel in meters."
},
"speed": {
"type": "number",
"description": "Speed in m/s. Default: 0.3.",
"default": 0.3,
},
},
"required": ["distance"],
},
},
},
{
"type": "function",
"function": {
"name": "turn",
"description": "Turn the robot in place by a specified angle.",
"parameters": {
"type": "object",
"properties": {
"angle": {
"type": "number",
"description": "Turn angle in degrees. Positive = left, negative = right."
},
"speed": {
"type": "number",
"description": "Angular speed in rad/s. Default: 0.5.",
"default": 0.5,
},
},
"required": ["angle"],
},
},
},
{
"type": "function",
"function": {
"name": "navigate_to",
"description": "Navigate autonomously to a named location on the map.",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "Name of the destination (e.g., 'kitchen', 'charging station')."
},
},
"required": ["location"],
},
},
},
{
"type": "function",
"function": {
"name": "pick_object",
"description": "Activate the robot's gripper to pick up an object.",
"parameters": {
"type": "object",
"properties": {
"object_id": {
"type": "string",
"description": "Identifier for the object to pick up (e.g., 'red_box', 'cup')."
},
},
"required": ["object_id"],
},
},
},
{
"type": "function",
"function": {
"name": "place_object",
"description": "Release the gripped object.",
"parameters": {"type": "object", "properties": {}},
},
},
{
"type": "function",
"function": {
"name": "stop",
"description": "Immediately stop all robot motion.",
"parameters": {"type": "object", "properties": {}},
},
},
{
"type": "function",
"function": {
"name": "report_status",
"description": "Report the robot's current status: battery, position, carried object.",
"parameters": {"type": "object", "properties": {}},
},
},
]
@dataclass
class DialogueState:
"""Maintains the dialogue context across multiple turns."""
conversation_history: list[dict] # [{role, content}]
current_goal: Optional[str] = None
sub_goals_completed: int = 0
total_sub_goals: int = 0
def add_user_message(self, text: str):
self.conversation_history.append({"role": "user", "content": text})
def add_assistant_message(self, text: str):
self.conversation_history.append({"role": "assistant", "content": text})
def add_function_result(self, name: str, result: str):
msg = (
f"Function '{name}' executed. Result: {result}. "
"Continue the dialogue or report completion."
)
self.conversation_history.append(
{"role": "user", "content": msg}
)
# ─── Robot Action Executor ────────────────────────────────────
class RobotActionExecutor:
"""
Executes robot actions.
In a real system, this publishes to ROS topics / action servers.
"""
def __init__(self):
self.last_status = {
"battery": 85,
"position": {"x": 0.0, "y": 0.0, "theta": 0.0},
"carrying": None,
}
def execute(self, function_name: str, arguments: dict) -> str:
"""Execute a function call and return a status string."""
handler = getattr(self, f"_do_{function_name}", None)
if handler is None:
return f"Unknown function: {function_name}"
try:
result = handler(arguments)
return result
except Exception as e:
return f"Error executing {function_name}: {str(e)}"
def _do_move_forward(self, args: dict) -> str:
d = args.get("distance", 1.0)
s = args.get("speed", 0.3)
# In real code: publish to ROS cmd_vel or call action server
print(f"[ROBOT] Moving forward {d}m at {s}m/s")
self.last_status["position"]["x"] += d
return f"Moved forward {d}m. New position: {self.last_status['position']}"
def _do_turn(self, args: dict) -> str:
angle = args.get("angle", 90)
speed = args.get("speed", 0.5)
print(f"[ROBOT] Turning {angle}° at {speed}rad/s")
self.last_status["position"]["theta"] += angle
return f"Turned {angle}°."
def _do_navigate_to(self, args: dict) -> str:
loc = args.get("location", "unknown")
print(f"[ROBOT] Navigating to '{loc}'")
# ROS Navigation: send goal to move_base
return f"Navigation to '{loc}' started. ETA: 30 seconds."
def _do_pick_object(self, args: dict) -> str:
obj = args.get("object_id", "unknown")
print(f"[ROBOT] Picking up '{obj}'")
self.last_status["carrying"] = obj
return f"Picked up '{obj}'."
def _do_place_object(self, args: dict) -> str:
        obj = self.last_status.get("carrying") or "nothing"
print(f"[ROBOT] Placing '{obj}'")
self.last_status["carrying"] = None
return f"Placed '{obj}'."
def _do_stop(self, args: dict) -> str:
print("[ROBOT] EMERGENCY STOP")
return "Robot stopped."
def _do_report_status(self, args: dict) -> str:
s = self.last_status
return (
f"Status: battery={s['battery']}%, "
f"position=({s['position']['x']:.1f}, {s['position']['y']:.1f}), "
f"carrying={s['carrying'] or 'nothing'}."
)
# ─── LLM Dialogue Manager ────────────────────────────────────
class LLMDialogueManager:
"""
Manages conversation with an LLM using function calling.
Supports multi-step task decomposition.
"""
SYSTEM_PROMPT = """You are a helpful voice-controlled robot assistant.
Your robot has the following capabilities:
- move_forward(distance, speed): Move forward by a distance in meters.
- turn(angle, speed): Turn in place. Positive angle=left, negative=right.
- navigate_to(location): Navigate autonomously to a named location on the map.
- pick_object(object_id): Pick up a specific object.
- place_object(): Place the currently held object.
- stop(): Immediately stop all robot motion.
- report_status(): Report battery, position, and carried object.
Guidelines:
1. Only call functions from the provided list.
2. Break complex commands into sequential steps.
3. After each function result, confirm completion to the user.
4. If the user asks something outside your capabilities, politely say so.
5. Be concise and natural in your responses.
"""
def __init__(self, api_key: Optional[str] = None, model: str = "gpt-4o"):
self.client = openai.OpenAI(api_key=api_key or os.environ.get("OPENAI_API_KEY"))
self.model = model
self.state = DialogueState(conversation_history=[])
self.executor = RobotActionExecutor()
def _chat(self, messages: list[dict]) -> dict:
"""Call the LLM with function definitions."""
response = self.client.chat.completions.create(
model=self.model,
messages=messages,
tools=ROBOT_FUNCTIONS,
tool_choice="auto",
temperature=0.0,
)
return response.choices[0].message
def _build_messages(self) -> list[dict]:
"""Build full message list with system prompt."""
messages = [{"role": "system", "content": self.SYSTEM_PROMPT}]
messages += self.state.conversation_history
return messages
    def _handle_function_call(self, msg) -> list:
        """Execute each tool call in the LLM response message."""
        # Record the assistant turn first: the API requires that "tool"
        # result messages follow an assistant message carrying the
        # matching tool_calls entries.
        self.state.conversation_history.append({
            "role": "assistant",
            "content": msg.content or "",
            "tool_calls": [
                {
                    "id": call.id,
                    "type": "function",
                    "function": {
                        "name": call.function.name,
                        "arguments": call.function.arguments,
                    },
                }
                for call in msg.tool_calls
            ],
        })
        results = []
        for call in msg.tool_calls:
            name = call.function.name
            args = json.loads(call.function.arguments or "{}")
            print(f"[LLM] Calling function: {name}({args})")
            result = self.executor.execute(name, args)
            print(f"[LLM] Result: {result}")
            results.append((name, result))
            # Each tool result must reference the id of its own call
            self.state.conversation_history.append({
                "role": "tool",
                "tool_call_id": call.id,
                "content": result,
            })
        return results
    def process_utterance(self, transcript: str) -> str:
        """
        Process a user utterance and execute robot actions.
        Handles multi-step dialogues automatically.
        """
        print(f"\n[USER] {transcript}")
        self.state.add_user_message(transcript)
        max_turns = 10  # Prevent infinite loops
        for _ in range(max_turns):
            messages = self._build_messages()
            msg = self._chat(messages)
            if msg.tool_calls:
                self._handle_function_call(msg)
            else:
                response = msg.content or ""
                self.state.add_assistant_message(response)
                print(f"[ROBOT] {response}")
                return response
        return "Maximum dialogue turns reached. Please try a simpler command."
# ─── ROS 2 Action Server Integration ──────────────────────────
def integrate_with_ros_action():
"""
Integration pattern for connecting LLM function calls to ROS 2 actions.
This shows the bridge between LLM decisions and robot execution.
"""
# In a real implementation:
#
# import rclpy
    # from rclpy.node import Node
    # from rclpy.action import ActionClient
    # from my_robot_action_msgs.action import Navigate, Move
#
# class RobotActionClient(Node):
# def __init__(self):
# super().__init__('llm_action_client')
# self.move_client = ActionClient(self, Move, '/move_action')
# self.nav_client = ActionClient(self, Navigate, '/navigate_action')
#
# def send_goal(self, action_name: str, params: dict):
# if action_name == "move_forward":
# goal = Move.Goal()
# goal.distance = params.get("distance", 1.0)
# goal.speed = params.get("speed", 0.3)
# self.move_client.send_goal_async(goal)
# elif action_name == "navigate_to":
# goal = Navigate.Goal()
# goal.location = params.get("location", "")
# self.nav_client.send_goal_async(goal)
#
# print("ROS 2 action client pattern ready for integration.")
pass
# ─── Demo ─────────────────────────────────────────────────────
if __name__ == "__main__":
    # Initialize LLM manager
manager = LLMDialogueManager(
api_key=os.environ.get("OPENAI_API_KEY"),
model="gpt-4o",
)
print("=== LLM Voice Robot (Tier 3) Demo ===")
print("Commands will be sent to GPT-4 with robot function schemas.\n")
commands = [
"Go forward two meters please.",
"Turn left 90 degrees.",
"Can you navigate to the kitchen?",
"Go to the kitchen, pick up the red box, and bring it back to me.",
]
for cmd in commands:
manager.process_utterance(cmd)
print()
6. Comparing the Three Tiers¶
| Metric | Tier 1 — Keyword spotting | Tier 2 — Intent + slot NLU | Tier 3 — LLM function calling |
|---|---|---|---|
| ASR | Vosk, offline | Vosk, offline | Vosk offline or Whisper API |
| Language model | Regex / dictionary | TF-IDF + LR / CRF | GPT-4 / LLaMA (≥ 7B) |
| Intent coverage | Fixed patterns | Trainable; handles paraphrases | Open vocabulary |
| Slot extraction | Regex capture groups | BIO tagging | LLM JSON extraction |
| Multi-turn dialogue | ❌ No | ❌ No | ✅ Yes |
| Error recovery | Manual | Manual | Handled by the LLM |
| Paraphrase handling | ❌ Poor | ✅ Good | ✅ Excellent |
| Context memory | ❌ None | ❌ None | ✅ Conversation history |
| Latency (local) | < 100 ms | 100–500 ms | 500 ms–3 s (LLaMA) |
| Latency (API) | — | — | 1–3 s (GPT-4) |
| Hardware | Pi 3B+ | Pi 4B | Pi 4B + GPU (local LLaMA) |
| Internet required | ❌ No | ❌ No | ⚠️ Yes for GPT-4; no for local LLaMA |
| Setup complexity | ⭐ | ⭐⭐ | ⭐⭐⭐⭐ |
| Accuracy | ⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| Best for | Fixed command sets | Custom robot domains | Natural, flexible interaction |
7. Step-by-Step Implementation Guide¶
Stage 1 — Audio Pipeline (All Tiers)¶
- Install a Vosk model (see the download sketch after this list).
- Test the microphone:
  python3 -c "import pyaudio; p = pyaudio.PyAudio(); print(p.get_device_count())"
- Run Tier 1 end to end; verify wake word detection.
- Tune the VAD energy threshold: lower it in quiet rooms, raise it in noisy ones.
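A minimal download sketch in Python, assuming the small English model; the URL follows the naming on the Vosk models page (https://alphacephei.com/vosk/models), so verify it there before relying on it:

# Fetch and unpack vosk-model-small-en-us-0.15 into the home directory
import os
import urllib.request
import zipfile

url = "https://alphacephei.com/vosk/models/vosk-model-small-en-us-0.15.zip"
dest = os.path.expanduser("~/vosk-model-small-en-us-0.15.zip")
urllib.request.urlretrieve(url, dest)
with zipfile.ZipFile(dest) as zf:
    zf.extractall(os.path.expanduser("~"))  # creates ~/vosk-model-small-en-us-0.15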
Stage 2 — Tier 1 Deployment¶
- Connect a USB microphone to the robot's SBC.
- Run VoiceRobotTier1.run() on the robot.
- Map matched commands to a ROS 2 action client (see Stage 5 below).
- Test in a range of acoustic environments; tune SPEECH_FRAMES_THRESH.
Stage 3 — Tier 2 Training¶
- Extend INTENT_DATA and SLOT_DATA with your robot-specific command set.
- Run NLUVoiceRobotTier2.train() and check the reported accuracy.
- For higher accuracy, replace TF-IDF with DistilBERT (see the sketch after this list).
- Deploy the trained model to the robot.
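One quick way to try a Transformer here without labeling more data is zero-shot classification; this is a stand-in sketch using the Hugging Face pipeline API (fine-tuning DistilBERT on INTENT_DATA remains the higher-accuracy path):

# Zero-shot intent classification as a drop-in for the TF-IDF classifier
from transformers import pipeline

clf = pipeline("zero-shot-classification", model="facebook/bart-large-mnli")
result = clf("go forward three meters", candidate_labels=INTENTS)
print(result["labels"][0], round(result["scores"][0], 2))  # top intent + score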
Stage 4 — Tier 3 Integration¶
- Get an OpenAI API key, or stand up a local LLaMA server (Ollama, text-generation-webui).
- Configure LLMDialogueManager with your preferred model.
- Define the ROBOT_FUNCTIONS schema for your robot's specific capabilities.
- Wire RobotActionExecutor.execute() to a ROS 2 action client.
- Test multi-step commands such as "go to the kitchen and fetch the red box".
Stage 5 — ROS 2 Integration (All Tiers)¶
# Example: ROS 2 action client bridge
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist

class VoiceRobotROSClient(Node):
    def __init__(self):
        super().__init__("voice_robot_client")
        self.cmd_pub = self.create_publisher(Twist, "/cmd_vel", 10)

    def execute_action(self, action: str, params: dict):
        twist = Twist()
        if action == "move_forward":
            # /cmd_vel carries velocity, not distance: publish the speed and
            # leave distance-based stopping to a timer or action server
            twist.linear.x = params.get("speed", 0.3)
        elif action == "stop":
            twist.linear.x = 0.0
            twist.angular.z = 0.0
        self.cmd_pub.publish(twist)
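Driven from any of the three tiers, the bridge is used the same way; a minimal usage sketch (one-shot publishes, no spinning required):

# Spin up the bridge node and forward one matched command (illustrative)
rclpy.init()
node = VoiceRobotROSClient()
node.execute_action("move_forward", {"speed": 0.3})
node.execute_action("stop", {})
node.destroy_node()
rclpy.shutdown()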
8. Extensions and Variants¶
8.1 WebRTC VAD vs. Energy VAD¶
Tier 1 uses an energy VAD to keep the implementation simple. For production, replace it with WebRTC VAD (py-webrtcvad), which is trained specifically on speech:
import webrtcvad
vad = webrtcvad.Vad(2) # Aggressiveness 0-3
# Frame must be 10, 20, or 30 ms
is_speech = vad.is_speech(audio_10ms_frame, sample_rate=16000)
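Note the framing constraint: unlike the energy VAD above, which accepts arbitrary chunk sizes, WebRTC VAD only takes 10/20/30 ms frames. A chunking sketch, assuming raw_audio is a bytes buffer of 16-bit mono PCM at 16 kHz:

# Split a PCM buffer into 30 ms frames (480 samples = 960 bytes each)
FRAME_MS = 30
frame_bytes = int(16000 * FRAME_MS / 1000) * 2  # 2 bytes per 16-bit sample
speech_flags = [
    vad.is_speech(raw_audio[i:i + frame_bytes], 16000)
    for i in range(0, len(raw_audio) - frame_bytes + 1, frame_bytes)
]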
8.2 Better ASR with the Whisper API¶
Use OpenAI Whisper instead of Vosk for higher transcription accuracy (shown with the openai ≥ 1.0 client, matching the version pinned above):
from openai import OpenAI

client = OpenAI()
with open("recording.wav", "rb") as audio_file:
    transcript = client.audio.transcriptions.create(
        model="whisper-1", file=audio_file
    ).text
8.3 Running LLaMA Locally with Ollama¶
For a fully offline Tier 3:
# Terminal
ollama serve
ollama pull llama3
# Python
from openai import OpenAI
client = OpenAI(base_url="http://localhost:11434/v1", api_key="ollama")
response = client.chat.completions.create(model="llama3", messages=[...])
8.4 Custom Wake Words¶
Train a custom wake word with Picovoice Porcupine; a sketch follows.
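A minimal sketch, assuming the pvporcupine package; the access key comes from the Picovoice Console, and hey-robot_en.ppn is a hypothetical keyword file you would train there:

import pvporcupine

# Each process() call consumes porcupine.frame_length 16-bit samples at 16 kHz
porcupine = pvporcupine.create(
    access_key="YOUR_PICOVOICE_ACCESS_KEY",  # placeholder
    keyword_paths=["hey-robot_en.ppn"],      # hypothetical custom model
)
# keyword_index = porcupine.process(pcm_frame)  # >= 0 when the wake word fires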
8.5 Multimodal Commands¶
Combine speech with gestures or gaze, e.g. by grounding commands like "pick that up" against a vision-language model (see MiniGPT-4 in the references).
9. References¶
- Vosk model downloads — offline speech recognition models
- WebRTC VAD — voice activity detection library
- OpenAI Function Calling — GPT-4 function calling documentation
- sklearn-crfsuite — CRFs for sequence labeling (slot filling)
- Hugging Face Transformers — pretrained NLU models
- Ollama — run LLaMA, Mistral, and other open models locally
- ROS 2 Actions — the ROS 2 action interface
- Picovoice Porcupine — custom wake word detection
- MiniGPT-4 — vision-language model for robot perception
- SpeechBrain — open-source speech toolkit
- VoxPopuli — open speech corpus