Skip to content

代码讲解 - Embodied AI Exploration Lab1

本页详细讲解 Embodied-AI-Exploration-Lab1 仓库中的代码。

项目结构

Embodied-AI-Exploration-Lab1/
├── src/
│   ├── ros_course_examples/      # ROS 基础示例
│   │   ├── nodes/                # ROS 节点示例
│   │   └── simulation_demo/      # 仿真演示
│   ├── lab2_perception/          # Lab2: 感知模块
│   │   ├── scripts/              # 感知脚本
│   │   └── tools/                # 工具
│   ├── lab3_path_planning/       # Lab3: 路径规划
│   │   └── scripts/              # 路径规划脚本
│   ├── turtlebot3/               # Turtlebot3 仿真
│   └── turtlebot3_msgs/          # Turtlebot3 消息定义
├── linux_exp/                    # Linux 实验
├── doc/                          # 文档
├── setup_course.sh               # 课程配置脚本
└── README.md                     # 项目说明

1. ROS 基础示例

1.1 话题发布者 (topic_publisher.py)

#!/usr/bin/env python3
"""
话题发布者示例
演示如何使用 ROS 发布消息到话题
"""

import rospy
from std_msgs.msg import String

def talker():
    """发布者节点"""

    # 初始化节点
    rospy.init_node('talker', anonymous=True)

    # 创建发布者,发布到 'chatter' 话题
    pub = rospy.Publisher('chatter', String, queue_size=10)

    # 设置发布频率
    rate = rospy.Rate(10)  # 10Hz

    while not rospy.is_shutdown():
        # 创建消息
        hello_str = "hello world %s" % rospy.get_time()

        # 发布消息
        rospy.loginfo(hello_str)
        pub.publish(hello_str)

        # 按照频率休眠
        rate.sleep()

if __name__ == '__main__':
    try:
        talker()
    except rospy.ROSInterruptException:
        pass

代码解析:

  1. 导入模块
  2. rospy: ROS Python 客户端库
  3. std_msgs.msg.String: 标准消息类型

  4. 初始化节点

  5. rospy.init_node(): 初始化 ROS 节点
  6. anonymous=True: 自动添加随机后缀,避免节点名冲突

  7. 创建发布者

  8. rospy.Publisher(): 创建发布者对象
  9. 'chatter': 话题名称
  10. String: 消息类型
  11. queue_size=10: 消息队列大小

  12. 发布消息

  13. pub.publish(): 发布消息到话题
  14. rospy.loginfo(): 打印日志信息

1.2 话题订阅者 (topic_subscriber.py)

#!/usr/bin/env python3
"""
话题订阅者示例
演示如何订阅 ROS 话题并接收消息
"""

import rospy
from std_msgs.msg import String

def callback(data):
    """回调函数,当收到消息时被调用"""
    rospy.loginfo("I heard %s", data.data)

def listener():
    """订阅者节点"""

    # 初始化节点
    rospy.init_node('listener', anonymous=True)

    # 订阅 'chatter' 话题
    rospy.Subscriber('chatter', String, callback)

    # 保持节点运行
    rospy.spin()

if __name__ == '__main__':
    listener()

代码解析:

  1. 回调函数
  2. 当收到消息时自动调用
  3. data.data: 获取消息内容

  4. 创建订阅者

  5. rospy.Subscriber(): 创建订阅者对象
  6. 'chatter': 订阅的话题名称
  7. String: 消息类型
  8. callback: 回调函数

  9. 保持运行

  10. rospy.spin(): 保持节点运行,等待消息

1.3 服务示例 (add_two_ints_server.py)

#!/usr/bin/env python3
"""
服务端示例
演示如何创建 ROS 服务
"""

import rospy
from beginner_tutorials.srv import AddTwoInts, AddTwoIntsResponse

def handle_add_two_ints(req):
    """处理服务请求"""
    print("Returning [%s + %s = %s]" % (req.a, req.b, (req.a + req.b)))
    return AddTwoIntsResponse(req.a + req.b)

def add_two_ints_server():
    """服务端节点"""

    # 初始化节点
    rospy.init_node('add_two_ints_server')

    # 创建服务
    s = rospy.Service('add_two_ints', AddTwoInts, handle_add_two_ints)

    print("Ready to add two ints.")

    # 保持服务运行
    rospy.spin()

if __name__ == '__main__':
    add_two_ints_server()

代码解析:

  1. 服务定义
  2. AddTwoInts: 服务类型(请求:两个整数,响应:一个整数)
  3. AddTwoIntsResponse: 响应类型

  4. 创建服务

  5. rospy.Service(): 创建服务对象
  6. 'add_two_ints': 服务名称
  7. AddTwoInts: 服务类型
  8. handle_add_two_ints: 处理函数

1.4 客户端示例 (add_two_ints_client.py)

#!/usr/bin/env python3
"""
客户端示例
演示如何调用 ROS 服务
"""

import rospy
from beginner_tutorials.srv import AddTwoInts
import sys

def add_two_ints_client(x, y):
    """调用服务"""

    # 等待服务可用
    rospy.wait_for_service('add_two_ints')

    try:
        # 创建服务代理
        add_two_ints = rospy.ServiceProxy('add_two_ints', AddTwoInts)

        # 调用服务
        resp = add_two_ints(x, y)

        return resp.sum
    except rospy.ServiceException as e:
        print("Service call failed: %s" % e)

if __name__ == "__main__":
    if len(sys.argv) == 3:
        x = int(sys.argv[1])
        y = int(sys.argv[2])
    else:
        print("%s [x y]" % sys.argv[0])
        sys.exit(1)

    print("Requesting %s + %s" % (x, y))
    print("%s + %s = %s" % (x, y, add_two_ints_client(x, y)))

代码解析:

  1. 等待服务
  2. rospy.wait_for_service(): 等待服务可用

  3. 创建服务代理

  4. rospy.ServiceProxy(): 创建服务代理对象

  5. 调用服务

  6. add_two_ints(x, y): 调用服务并获取响应

2. 感知模块

2.1 RGB 图像订阅者 (rgb_subscriber.py)

#!/usr/bin/env python3
"""
RGB 图像订阅者
订阅相机话题并显示图像
"""

import rospy
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2

class RGBSubscriber:
    def __init__(self):
        """初始化"""

        # 创建 CV Bridge
        self.bridge = CvBridge()

        # 订阅图像话题
        self.image_sub = rospy.Subscriber(
            '/camera/rgb/image_raw',  # 话题名称
            Image,                     # 消息类型
            self.image_callback        # 回调函数
        )

        # 创建窗口
        cv2.namedWindow('RGB Image', cv2.WINDOW_NORMAL)

    def image_callback(self, data):
        """图像回调函数"""

        try:
            # 将 ROS 图像消息转换为 OpenCV 格式
            cv_image = self.bridge.imgmsg_to_cv2(data, 'bgr8')

            # 显示图像
            cv2.imshow('RGB Image', cv_image)
            cv2.waitKey(1)

        except Exception as e:
            rospy.logerr(e)

def main():
    """主函数"""

    # 初始化节点
    rospy.init_node('rgb_subscriber', anonymous=True)

    # 创建订阅者
    RGBSubscriber()

    # 保持运行
    rospy.spin()

    # 关闭窗口
    cv2.destroyAllWindows()

if __name__ == '__main__':
    main()

代码解析:

  1. CV Bridge
  2. 用于 ROS 图像消息和 OpenCV 格式之间的转换
  3. imgmsg_to_cv2(): 将 ROS 图像转换为 OpenCV 格式

  4. 图像显示

  5. cv2.imshow(): 显示图像
  6. cv2.waitKey(1): 等待按键

2.2 YOLO 目标检测 (perception_yolo.py)

#!/usr/bin/env python3
"""
YOLO 目标检测节点
使用 YOLO 进行目标检测
"""

import rospy
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2
import numpy as np
from ultralytics import YOLO

class YOLODetector:
    def __init__(self):
        """初始化"""

        # 创建 CV Bridge
        self.bridge = CvBridge()

        # 加载 YOLO 模型
        self.model = YOLO('yolov8n.pt')  # 使用 YOLOv8 nano 模型

        # 订阅图像话题
        self.image_sub = rospy.Subscriber(
            '/camera/rgb/image_raw',
            Image,
            self.image_callback
        )

        # 创建窗口
        cv2.namedWindow('YOLO Detection', cv2.WINDOW_NORMAL)

    def image_callback(self, data):
        """图像回调函数"""

        try:
            # 将 ROS 图像转换为 OpenCV 格式
            cv_image = self.bridge.imgmsg_to_cv2(data, 'bgr8')

            # 进行目标检测
            results = self.model(cv_image)

            # 绘制检测结果
            annotated_frame = results[0].plot()

            # 显示图像
            cv2.imshow('YOLO Detection', annotated_frame)
            cv2.waitKey(1)

            # 打印检测结果
            for result in results:
                for box in result.boxes:
                    class_id = int(box.cls[0])
                    confidence = float(box.conf[0])
                    class_name = self.model.names[class_id]

                    rospy.loginfo(f"Detected: {class_name} ({confidence:.2f})")

        except Exception as e:
            rospy.logerr(e)

def main():
    """主函数"""

    # 初始化节点
    rospy.init_node('yolo_detector', anonymous=True)

    # 创建检测器
    YOLODetector()

    # 保持运行
    rospy.spin()

    # 关闭窗口
    cv2.destroyAllWindows()

if __name__ == '__main__':
    main()

代码解析:

  1. YOLO 模型
  2. 使用 ultralytics 库加载 YOLOv8 模型
  3. YOLO('yolov8n.pt'): 加载 nano 版本模型

  4. 目标检测

  5. self.model(cv_image): 进行目标检测
  6. results[0].plot(): 绘制检测结果

  7. 获取检测结果

  8. box.cls[0]: 类别 ID
  9. box.conf[0]: 置信度
  10. self.model.names[class_id]: 类别名称

2.3 深度图像订阅者 (depth_subscriber.py)

#!/usr/bin/env python3
"""
深度图像订阅者
订阅深度相机话题并显示深度图
"""

import rospy
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2
import numpy as np

class DepthSubscriber:
    def __init__(self):
        """初始化"""

        # 创建 CV Bridge
        self.bridge = CvBridge()

        # 订阅深度图像话题
        self.depth_sub = rospy.Subscriber(
            '/camera/depth/image_raw',
            Image,
            self.depth_callback
        )

        # 创建窗口
        cv2.namedWindow('Depth Image', cv2.WINDOW_NORMAL)

    def depth_callback(self, data):
        """深度图像回调函数"""

        try:
            # 将 ROS 图像转换为 OpenCV 格式
            # 深度图通常是 32 位浮点数
            depth_image = self.bridge.imgmsg_to_cv2(data, '32FC1')

            # 归一化到 0-255 范围
            depth_normalized = cv2.normalize(
                depth_image, 
                None, 
                0, 
                255, 
                cv2.NORM_MINMAX
            )

            # 转换为 8 位无符号整数
            depth_normalized = np.uint8(depth_normalized)

            # 应用颜色映射
            depth_colormap = cv2.applyColorMap(
                depth_normalized, 
                cv2.COLORMAP_JET
            )

            # 显示深度图
            cv2.imshow('Depth Image', depth_colormap)
            cv2.waitKey(1)

            # 打印深度信息
            center_x = depth_image.shape[1] // 2
            center_y = depth_image.shape[0] // 2
            center_depth = depth_image[center_y, center_x]

            rospy.loginfo(f"Center depth: {center_depth:.2f} meters")

        except Exception as e:
            rospy.logerr(e)

def main():
    """主函数"""

    # 初始化节点
    rospy.init_node('depth_subscriber', anonymous=True)

    # 创建订阅者
    DepthSubscriber()

    # 保持运行
    rospy.spin()

    # 关闭窗口
    cv2.destroyAllWindows()

if __name__ == '__main__':
    main()

代码解析:

  1. 深度图像处理
  2. 深度图通常是 32 位浮点数格式
  3. 32FC1: 单通道 32 位浮点数

  4. 归一化

  5. cv2.normalize(): 将深度值归一化到 0-255 范围
  6. 便于可视化

  7. 颜色映射

  8. cv2.applyColorMap(): 应用颜色映射
  9. cv2.COLORMAP_JET: JET 颜色映射

3. 路径规划

3.1 路径规划节点 (path_planning_node.py)

#!/usr/bin/env python3
"""
路径规划节点
实现 A* 路径规划算法
"""

import rospy
from nav_msgs.msg import OccupancyGrid, Path
from geometry_msgs.msg import PoseStamped, Point
import numpy as np
import heapq

class PathPlanner:
    def __init__(self):
        """初始化"""

        # 订阅地图话题
        self.map_sub = rospy.Subscriber(
            '/map',
            OccupancyGrid,
            self.map_callback
        )

        # 发布路径话题
        self.path_pub = rospy.Publisher(
            '/planned_path',
            Path,
            queue_size=1
        )

        # 地图数据
        self.map_data = None
        self.map_resolution = None
        self.map_origin = None

    def map_callback(self, data):
        """地图回调函数"""

        # 保存地图数据
        self.map_data = np.array(data.data).reshape(
            (data.info.height, data.info.width)
        )
        self.map_resolution = data.info.resolution
        self.map_origin = data.info.origin

        rospy.loginfo(f"Map received: {data.info.width}x{data.info.height}")

    def world_to_grid(self, x, y):
        """世界坐标转栅格坐标"""

        grid_x = int((x - self.map_origin.position.x) / self.map_resolution)
        grid_y = int((y - self.map_origin.position.y) / self.map_resolution)

        return grid_x, grid_y

    def grid_to_world(self, grid_x, grid_y):
        """栅格坐标转世界坐标"""

        x = grid_x * self.map_resolution + self.map_origin.position.x
        y = grid_y * self.map_resolution + self.map_origin.position.y

        return x, y

    def heuristic(self, a, b):
        """启发式函数(曼哈顿距离)"""

        return abs(a[0] - b[0]) + abs(a[1] - b[1])

    def astar(self, start, goal):
        """A* 路径规划算法"""

        # 检查起点和终点是否有效
        if (self.map_data[start[1], start[0]] != 0 or 
            self.map_data[goal[1], goal[0]] != 0):
            rospy.logwarn("Start or goal is invalid!")
            return None

        # 初始化
        open_set = []
        heapq.heappush(open_set, (0, start))
        came_from = {}
        g_score = {start: 0}
        f_score = {start: self.heuristic(start, goal)}

        # 邻居方向(8方向)
        directions = [
            (0, 1), (0, -1), (1, 0), (-1, 0),
            (1, 1), (1, -1), (-1, 1), (-1, -1)
        ]

        while open_set:
            # 获取 f 值最小的节点
            current = heapq.heappop(open_set)[1]

            # 到达目标
            if current == goal:
                # 重建路径
                path = []
                while current in came_from:
                    path.append(current)
                    current = came_from[current]
                path.append(start)
                return path[::-1]

            # 探索邻居
            for dx, dy in directions:
                neighbor = (current[0] + dx, current[1] + dy)

                # 检查边界
                if (0 <= neighbor[0] < self.map_data.shape[1] and 
                    0 <= neighbor[1] < self.map_data.shape[0]):

                    # 检查是否是障碍物
                    if self.map_data[neighbor[1], neighbor[0]] == 0:

                        # 计算 g 值
                        tentative_g = g_score[current] + np.sqrt(dx**2 + dy**2)

                        # 更新路径
                        if neighbor not in g_score or tentative_g < g_score[neighbor]:
                            came_from[neighbor] = current
                            g_score[neighbor] = tentative_g
                            f_score[neighbor] = tentative_g + self.heuristic(neighbor, goal)
                            heapq.heappush(open_set, (f_score[neighbor], neighbor))

        return None

    def plan_path(self, start_x, start_y, goal_x, goal_y):
        """规划路径"""

        # 转换坐标
        start_grid = self.world_to_grid(start_x, start_y)
        goal_grid = self.world_to_grid(goal_x, goal_y)

        rospy.loginfo(f"Planning path from {start_grid} to {goal_grid}")

        # 执行 A* 算法
        path_grid = self.astar(start_grid, goal_grid)

        if path_grid is None:
            rospy.logwarn("No path found!")
            return None

        # 创建路径消息
        path_msg = Path()
        path_msg.header.stamp = rospy.Time.now()
        path_msg.header.frame_id = 'map'

        # 添加路径点
        for grid_x, grid_y in path_grid:
            pose = PoseStamped()
            pose.header.stamp = rospy.Time.now()
            pose.header.frame_id = 'map'

            # 转换坐标
            x, y = self.grid_to_world(grid_x, grid_y)
            pose.pose.position.x = x
            pose.pose.position.y = y
            pose.pose.position.z = 0
            pose.pose.orientation.w = 1.0

            path_msg.poses.append(pose)

        # 发布路径
        self.path_pub.publish(path_msg)

        rospy.loginfo(f"Path planned with {len(path_msg.poses)} points")

        return path_msg

def main():
    """主函数"""

    # 初始化节点
    rospy.init_node('path_planner', anonymous=True)

    # 创建路径规划器
    planner = PathPlanner()

    # 等待地图
    rospy.loginfo("Waiting for map...")
    rospy.wait_for_message('/map', OccupancyGrid)
    rospy.loginfo("Map received!")

    # 规划路径
    planner.plan_path(0.0, 0.0, 5.0, 5.0)

    # 保持运行
    rospy.spin()

if __name__ == '__main__':
    main()

代码解析:

  1. 地图处理
  2. OccupancyGrid: 占用栅格地图
  3. 地图数据是一维数组,需要转换为二维

  4. 坐标转换

  5. world_to_grid(): 世界坐标转栅格坐标
  6. grid_to_world(): 栅格坐标转世界坐标

  7. A* 算法

  8. 使用优先队列(heapq)实现
  9. 启发式函数:曼哈顿距离
  10. 8 方向邻居探索

  11. 路径发布

  12. Path: 路径消息类型
  13. 包含多个 PoseStamped 位姿点

4. 仿真演示

4.1 电机控制 (motor.py)

#!/usr/bin/env python3
"""
电机控制类
模拟电机控制
"""

class Motor:
    def __init__(self, motor_id):
        """初始化"""

        self.motor_id = motor_id
        self.speed = 0
        self.position = 0

    def set_speed(self, speed):
        """设置速度"""

        self.speed = speed
        print(f"Motor {self.motor_id}: Speed set to {speed}")

    def get_speed(self):
        """获取速度"""

        return self.speed

    def update(self, dt):
        """更新位置"""

        self.position += self.speed * dt

    def get_position(self):
        """获取位置"""

        return self.position

4.2 控制器 (controller.py)

#!/usr/bin/env python3
"""
控制器类
实现差速驱动控制
"""

from motor import Motor
import math

class Controller:
    def __init__(self, wheel_separation):
        """初始化"""

        self.wheel_separation = wheel_separation

        # 创建电机
        self.left_motor = Motor('left')
        self.right_motor = Motor('right')

        # 机器人位置
        self.x = 0.0
        self.y = 0.0
        self.theta = 0.0

    def set_velocity(self, linear, angular):
        """设置速度"""

        # 差速驱动运动学
        left_speed = linear - (angular * self.wheel_separation / 2)
        right_speed = linear + (angular * self.wheel_separation / 2)

        # 设置电机速度
        self.left_motor.set_speed(left_speed)
        self.right_motor.set_speed(right_speed)

    def update(self, dt):
        """更新位置"""

        # 获取电机速度
        left_speed = self.left_motor.get_speed()
        right_speed = self.right_motor.get_speed()

        # 计算机器人速度
        linear = (left_speed + right_speed) / 2
        angular = (right_speed - left_speed) / self.wheel_separation

        # 更新位置
        self.x += linear * math.cos(self.theta) * dt
        self.y += linear * math.sin(self.theta) * dt
        self.theta += angular * dt

    def get_position(self):
        """获取位置"""

        return self.x, self.y, self.theta

代码解析:

  1. 差速驱动运动学
  2. 左右轮速度差产生转向
  3. linear = (left + right) / 2
  4. angular = (right - left) / wheel_separation

  5. 位置更新

  6. 使用欧拉积分更新位置
  7. x += linear * cos(theta) * dt
  8. y += linear * sin(theta) * dt
  9. theta += angular * dt

5. Linux 实验

5.1 简单控制器 (controller_simple.py)

#!/usr/bin/env python3
"""
简单控制器
用于 Linux 实验
"""

import time

def main():
    """主函数"""

    # 电机状态
    left_speed = 0
    right_speed = 0

    print("Simple Controller")
    print("Commands: w (forward), s (backward), a (left), d (right), x (stop)")

    while True:
        # 获取用户输入
        cmd = input("Enter command: ")

        if cmd == 'w':
            # 前进
            left_speed = 100
            right_speed = 100
            print("Moving forward")

        elif cmd == 's':
            # 后退
            left_speed = -100
            right_speed = -100
            print("Moving backward")

        elif cmd == 'a':
            # 左转
            left_speed = -100
            right_speed = 100
            print("Turning left")

        elif cmd == 'd':
            # 右转
            left_speed = 100
            right_speed = -100
            print("Turning right")

        elif cmd == 'x':
            # 停止
            left_speed = 0
            right_speed = 0
            print("Stopping")

        elif cmd == 'q':
            # 退出
            print("Exiting")
            break

        else:
            print("Unknown command")

        # 模拟执行
        time.sleep(0.1)

        # 打印状态
        print(f"Left motor: {left_speed}, Right motor: {right_speed}")

if __name__ == '__main__':
    main()

6. 工具脚本

6.1 HSV 调节器 (hsv_tuner.py)

#!/usr/bin/env python3
"""
HSV 颜色调节器
用于调节颜色检测参数
"""

import cv2
import numpy as np

def nothing(x):
    """空回调函数"""
    pass

def main():
    """主函数"""

    # 创建窗口
    cv2.namedWindow('HSV Tuner')

    # 创建滑块
    cv2.createTrackbar('H Min', 'HSV Tuner', 0, 179, nothing)
    cv2.createTrackbar('H Max', 'HSV Tuner', 179, 179, nothing)
    cv2.createTrackbar('S Min', 'HSV Tuner', 0, 255, nothing)
    cv2.createTrackbar('S Max', 'HSV Tuner', 255, 255, nothing)
    cv2.createTrackbar('V Min', 'HSV Tuner', 0, 255, nothing)
    cv2.createTrackbar('V Max', 'HSV Tuner', 255, 255, nothing)

    # 打开摄像头
    cap = cv2.VideoCapture(0)

    while True:
        # 读取图像
        ret, frame = cap.read()

        if not ret:
            break

        # 转换为 HSV
        hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

        # 获取滑块值
        h_min = cv2.getTrackbarPos('H Min', 'HSV Tuner')
        h_max = cv2.getTrackbarPos('H Max', 'HSV Tuner')
        s_min = cv2.getTrackbarPos('S Min', 'HSV Tuner')
        s_max = cv2.getTrackbarPos('S Max', 'HSV Tuner')
        v_min = cv2.getTrackbarPos('V Min', 'HSV Tuner')
        v_max = cv2.getTrackbarPos('V Max', 'HSV Tuner')

        # 创建掩码
        lower = np.array([h_min, s_min, v_min])
        upper = np.array([h_max, s_max, v_max])
        mask = cv2.inRange(hsv, lower, upper)

        # 应用掩码
        result = cv2.bitwise_and(frame, frame, mask=mask)

        # 显示图像
        cv2.imshow('Original', frame)
        cv2.imshow('Mask', mask)
        cv2.imshow('Result', result)

        # 按 'q' 退出
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    # 释放资源
    cap.release()
    cv2.destroyAllWindows()

if __name__ == '__main__':
    main()

代码解析:

  1. HSV 颜色空间
  2. H: 色相 (0-179)
  3. S: 饱和度 (0-255)
  4. V: 明度 (0-255)

  5. 颜色掩码

  6. cv2.inRange(): 创建二值掩码
  7. 只保留指定颜色范围内的像素

总结

本仓库包含了完整的具身智能课程实验代码,涵盖:

  1. ROS 基础:话题、服务、节点
  2. 感知模块:RGB 图像、深度图像、YOLO 目标检测
  3. 路径规划:A* 算法
  4. 仿真演示:差速驱动控制
  5. Linux 实验:基本控制命令
  6. 工具脚本:HSV 颜色调节

通过学习这些代码,你将掌握:

  • ROS 节点的创建和通信
  • 图像处理和目标检测
  • 路径规划算法实现
  • 机器人控制基础

下一步