Загрузка данных


#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import rospy
import sys
import select
import termios
import tty
import math
import subprocess
import os
from datetime import datetime

from geometry_msgs.msg import Twist, Pose2D
from nav_msgs.msg import Odometry
from sensor_msgs.msg import LaserScan, Imu, BatteryState, Image, CompressedImage, CameraInfo
from std_msgs.msg import Int16, Int32, Float32MultiArray
from diagnostic_msgs.msg import DiagnosticArray


CMD_VEL_TOPIC = "/cmd_vel"

LINEAR_SPEED = 0.15
ANGULAR_SPEED = 0.5
RATE_HZ = 10


class TurtleBroController:
    def __init__(self):
        rospy.init_node("turtlebro_full_controller", anonymous=True)

        self.pub = rospy.Publisher(CMD_VEL_TOPIC, Twist, queue_size=10)
        self.rate = rospy.Rate(RATE_HZ)

        rospy.sleep(1)

        print("TurtleBro controller запущен")
        print("Топик управления:", CMD_VEL_TOPIC)

    # ======================================================
    # БАЗОВОЕ УПРАВЛЕНИЕ РОБОТОМ
    # ======================================================

    def set_velocity(self, linear_x=0.0, angular_z=0.0):
        move_cmd = Twist()

        move_cmd.linear.x = linear_x
        move_cmd.linear.y = 0.0
        move_cmd.linear.z = 0.0

        move_cmd.angular.x = 0.0
        move_cmd.angular.y = 0.0
        move_cmd.angular.z = angular_z

        self.pub.publish(move_cmd)

    def stop_robot(self):
        self.set_velocity(0.0, 0.0)

    def move_forward(self, seconds):
        for _ in range(int(seconds * RATE_HZ)):
            if rospy.is_shutdown():
                break

            self.set_velocity(LINEAR_SPEED, 0.0)
            self.rate.sleep()

        self.stop_robot()

    def move_backward(self, seconds):
        for _ in range(int(seconds * RATE_HZ)):
            if rospy.is_shutdown():
                break

            self.set_velocity(-LINEAR_SPEED, 0.0)
            self.rate.sleep()

        self.stop_robot()

    def turn_left(self, seconds):
        for _ in range(int(seconds * RATE_HZ)):
            if rospy.is_shutdown():
                break

            self.set_velocity(0.0, ANGULAR_SPEED)
            self.rate.sleep()

        self.stop_robot()

    def turn_right(self, seconds):
        for _ in range(int(seconds * RATE_HZ)):
            if rospy.is_shutdown():
                break

            self.set_velocity(0.0, -ANGULAR_SPEED)
            self.rate.sleep()

        self.stop_robot()

    def test_response(self):
        print("\n=== Проверка отзывчивости TurtleBro ===")

        print("1. Короткое движение вперед")
        self.move_forward(0.5)
        rospy.sleep(0.3)

        print("2. Короткое движение назад")
        self.move_backward(0.5)
        rospy.sleep(0.3)

        print("3. Короткий поворот налево")
        self.turn_left(0.5)
        rospy.sleep(0.3)

        print("4. Короткий поворот направо")
        self.turn_right(0.5)
        rospy.sleep(0.3)

        self.stop_robot()

        print("Проверка завершена")

    # ======================================================
    # УПРАВЛЕНИЕ СТРЕЛКАМИ
    # ======================================================

    def get_key(self):
        settings = termios.tcgetattr(sys.stdin)

        try:
            tty.setraw(sys.stdin.fileno())
            select.select([sys.stdin], [], [], 0.1)
            key = sys.stdin.read(1)

            if key == "\x1b":
                key += sys.stdin.read(2)

        finally:
            termios.tcsetattr(sys.stdin, termios.TCSADRAIN, settings)

        return key

    def keyboard_control(self):
        print("\n=== Управление роботом стрелками ===")
        print("↑      - ехать вперед")
        print("↓      - ехать назад")
        print("←      - поворот налево")
        print("→      - поворот направо")
        print("Пробел - стоп")
        print("q      - выход в главное меню")
        print("")

        while not rospy.is_shutdown():
            key = self.get_key()

            if key == "\x1b[A":
                print("Вперед")
                self.set_velocity(LINEAR_SPEED, 0.0)

            elif key == "\x1b[B":
                print("Назад")
                self.set_velocity(-LINEAR_SPEED, 0.0)

            elif key == "\x1b[D":
                print("Налево")
                self.set_velocity(0.0, ANGULAR_SPEED)

            elif key == "\x1b[C":
                print("Направо")
                self.set_velocity(0.0, -ANGULAR_SPEED)

            elif key == " ":
                print("Стоп")
                self.stop_robot()

            elif key == "q":
                print("Выход из режима управления")
                self.stop_robot()
                break

            else:
                self.stop_robot()

    # ======================================================
    # ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ ДЛЯ ДАТЧИКОВ
    # ======================================================

    def wait_msg(self, topic, msg_type, timeout=3.0):
        try:
            return rospy.wait_for_message(topic, msg_type, timeout=timeout)
        except rospy.ROSException:
            print(f"Нет данных с топика {topic} за {timeout} сек.")
            return None
        except Exception as error:
            print(f"Ошибка чтения {topic}: {error}")
            return None

    def quaternion_to_yaw(self, q):
        siny_cosp = 2.0 * (q.w * q.z + q.x * q.y)
        cosy_cosp = 1.0 - 2.0 * (q.y * q.y + q.z * q.z)
        yaw = math.atan2(siny_cosp, cosy_cosp)
        return yaw

    # ======================================================
    # ИСПОЛЬЗОВАНИЕ ДАТЧИКОВ
    # ======================================================

    def use_lidar_scan(self):
        print("\n=== Лидар /scan ===")

        msg = self.wait_msg("/scan", LaserScan)

        if msg is None:
            return

        valid_ranges = []

        for distance in msg.ranges:
            if not math.isinf(distance) and not math.isnan(distance):
                valid_ranges.append(distance)

        if not valid_ranges:
            print("Лидар работает, но валидных расстояний нет")
            return

        min_distance = min(valid_ranges)
        max_distance = max(valid_ranges)
        avg_distance = sum(valid_ranges) / len(valid_ranges)

        print(f"Количество лучей: {len(msg.ranges)}")
        print(f"Минимальная дистанция: {min_distance:.3f} м")
        print(f"Максимальная дистанция: {max_distance:.3f} м")
        print(f"Средняя дистанция: {avg_distance:.3f} м")
        print(f"Угол начала: {msg.angle_min:.3f} рад")
        print(f"Угол конца: {msg.angle_max:.3f} рад")
        print(f"Шаг угла: {msg.angle_increment:.5f} рад")

        if min_distance < 0.25:
            print("Внимание: рядом препятствие")
        else:
            print("Перед роботом нет критически близкого препятствия")

    def use_odom(self):
        print("\n=== Одометрия /odom ===")

        msg = self.wait_msg("/odom", Odometry)

        if msg is None:
            return

        position = msg.pose.pose.position
        orientation = msg.pose.pose.orientation
        linear = msg.twist.twist.linear
        angular = msg.twist.twist.angular

        yaw = self.quaternion_to_yaw(orientation)

        print(f"Позиция X: {position.x:.3f}")
        print(f"Позиция Y: {position.y:.3f}")
        print(f"Позиция Z: {position.z:.3f}")
        print(f"Поворот yaw: {yaw:.3f} рад")
        print(f"Линейная скорость X: {linear.x:.3f} м/с")
        print(f"Угловая скорость Z: {angular.z:.3f} рад/с")

    def use_pose2d(self):
        print("\n=== Положение /odom_pose2d ===")

        msg = self.wait_msg("/odom_pose2d", Pose2D)

        if msg is None:
            return

        print(f"X: {msg.x:.3f}")
        print(f"Y: {msg.y:.3f}")
        print(f"Theta: {msg.theta:.3f} рад")

    def use_imu(self):
        print("\n=== IMU /imu ===")

        msg = self.wait_msg("/imu", Imu)

        if msg is None:
            return

        orientation = msg.orientation
        angular_velocity = msg.angular_velocity
        linear_acceleration = msg.linear_acceleration

        yaw = self.quaternion_to_yaw(orientation)

        print(f"Ориентация yaw: {yaw:.3f} рад")
        print("")
        print("Угловая скорость:")
        print(f"X: {angular_velocity.x:.3f}")
        print(f"Y: {angular_velocity.y:.3f}")
        print(f"Z: {angular_velocity.z:.3f}")
        print("")
        print("Линейное ускорение:")
        print(f"X: {linear_acceleration.x:.3f}")
        print(f"Y: {linear_acceleration.y:.3f}")
        print(f"Z: {linear_acceleration.z:.3f}")

    def use_battery(self):
        print("\n=== Батарея /bat ===")

        msg = self.wait_msg("/bat", BatteryState)

        if msg is None:
            return

        print(f"Напряжение: {msg.voltage:.2f} В")
        print(f"Ток: {msg.current:.2f} А")
        print(f"Заряд: {msg.charge:.2f}")
        print(f"Ёмкость: {msg.capacity:.2f}")
        print(f"Процент: {msg.percentage * 100:.1f}%")

        if msg.percentage > 0:
            if msg.percentage < 0.2:
                print("Внимание: батарея почти разряжена")
            elif msg.percentage < 0.5:
                print("Батарея ниже половины")
            else:
                print("Батарея в норме")
        else:
            print("Процент батареи может не передаваться датчиком")

    def use_camera_info(self):
        print("\n=== Информация о камере /front_camera/camera_info ===")

        msg = self.wait_msg("/front_camera/camera_info", CameraInfo)

        if msg is None:
            return

        print(f"Ширина: {msg.width}")
        print(f"Высота: {msg.height}")
        print(f"Модель искажений: {msg.distortion_model}")
        print(f"Матрица камеры K: {msg.K}")
        print(f"Коэффициенты искажений D: {msg.D}")

    def use_camera_raw_info(self):
        print("\n=== Кадр камеры /front_camera/image_raw ===")

        msg = self.wait_msg("/front_camera/image_raw", Image)

        if msg is None:
            return

        print(f"Ширина: {msg.width}")
        print(f"Высота: {msg.height}")
        print(f"Кодировка: {msg.encoding}")
        print(f"Размер данных: {len(msg.data)} байт")
        print("")
        print("Само изображение в терминал не выводится, потому что это огромный массив байтов.")
        print("Для просмотра камеры лучше использовать:")
        print("rqt_image_view /front_camera/image_raw")

    def save_compressed_camera_image(self):
        print("\n=== Сохранение кадра камеры /front_camera/image_raw/compressed ===")

        msg = self.wait_msg("/front_camera/image_raw/compressed", CompressedImage)

        if msg is None:
            return

        folder = os.path.expanduser("~/turtlebro_camera")
        os.makedirs(folder, exist_ok=True)

        filename = datetime.now().strftime("camera_%Y%m%d_%H%M%S.jpg")
        filepath = os.path.join(folder, filename)

        with open(filepath, "wb") as file:
            file.write(msg.data)

        print(f"Кадр сохранён: {filepath}")
        print(f"Формат: {msg.format}")
        print(f"Размер файла: {len(msg.data)} байт")

    def use_diagnostics(self):
        print("\n=== Диагностика /diagnostics ===")

        msg = self.wait_msg("/diagnostics", DiagnosticArray)

        if msg is None:
            return

        if not msg.status:
            print("Диагностические статусы пустые")
            return

        for status in msg.status:
            print("--------------------------------")
            print(f"Имя: {status.name}")
            print(f"Уровень: {status.level}")
            print(f"Сообщение: {status.message}")
            print(f"Аппаратный ID: {status.hardware_id}")

            if status.values:
                print("Параметры:")
                for value in status.values:
                    print(f"  {value.key}: {value.value}")

    def use_buttons(self):
        print("\n=== Кнопки /buttons ===")

        msg = self.wait_msg("/buttons", Int16)

        if msg is None:
            return

        print(f"Состояние кнопок: {msg.data}")

        if msg.data == 0:
            print("Кнопки не нажаты")
        else:
            print("Есть нажатие или активное состояние кнопки")

    def use_thermovisor(self):
        print("\n=== Термовизор /thermovisor ===")

        msg = self.wait_msg("/thermovisor", Float32MultiArray)

        if msg is None:
            return

        data = list(msg.data)

        if not data:
            print("Термовизор передал пустой массив")
            return

        min_temp = min(data)
        max_temp = max(data)
        avg_temp = sum(data) / len(data)

        print(f"Количество значений: {len(data)}")
        print(f"Минимум: {min_temp:.2f}")
        print(f"Максимум: {max_temp:.2f}")
        print(f"Среднее: {avg_temp:.2f}")

        print("")
        print("Первые 20 значений:")
        print(data[:20])

    def use_client_count(self):
        print("\n=== Количество клиентов /client_count ===")

        msg = self.wait_msg("/client_count", Int32)

        if msg is None:
            return

        print(f"Количество клиентов: {msg.data}")

    def echo_connected_clients(self):
        print("\n=== Подключенные клиенты /connected_clients ===")
        print("Читаю одно сообщение через rostopic echo")
        self.echo_topic_once("/connected_clients")

    def echo_web_telemetry(self):
        print("\n=== Web telemetry /web_tele ===")
        print("Это кастомный тип turtlebro_web/WebTelemetry, поэтому читаем через rostopic echo")
        self.echo_topic_once("/web_tele")

    def echo_raw_odom(self):
        print("\n=== Raw odometry /raw_odom ===")
        print("Это кастомный тип turtlebro/RawOdom, поэтому читаем через rostopic echo")
        self.echo_topic_once("/raw_odom")

    def echo_tf(self):
        print("\n=== TF /tf ===")
        self.echo_topic_once("/tf")

    def echo_tf_static(self):
        print("\n=== TF static /tf_static ===")
        self.echo_topic_once("/tf_static")

    # ======================================================
    # АВТОМАТИЧЕСКИЕ ТЕСТЫ С ДАТЧИКАМИ
    # ======================================================

    def obstacle_check(self):
        print("\n=== Проверка препятствия по лидару ===")

        msg = self.wait_msg("/scan", LaserScan)

        if msg is None:
            return

        valid_ranges = []

        for distance in msg.ranges:
            if not math.isinf(distance) and not math.isnan(distance):
                valid_ranges.append(distance)

        if not valid_ranges:
            print("Нет валидных расстояний")
            return

        min_distance = min(valid_ranges)

        print(f"Минимальное расстояние: {min_distance:.3f} м")

        if min_distance < 0.20:
            print("Очень близко препятствие. Движение опасно.")
        elif min_distance < 0.50:
            print("Препятствие рядом. Ехать надо аккуратно.")
        else:
            print("Свободно. Критичных препятствий рядом нет.")

    def safe_forward_by_lidar(self):
        print("\n=== Безопасное движение вперед по лидару ===")

        seconds = get_seconds()

        if seconds is None:
            return

        print("Робот будет ехать вперед, пока впереди нет близкого препятствия")

        for _ in range(int(seconds * RATE_HZ)):
            if rospy.is_shutdown():
                break

            msg = self.wait_msg("/scan", LaserScan, timeout=1.0)

            if msg is None:
                print("Лидар не отвечает. Останавливаю робота.")
                self.stop_robot()
                return

            valid_ranges = []

            for distance in msg.ranges:
                if not math.isinf(distance) and not math.isnan(distance):
                    valid_ranges.append(distance)

            if not valid_ranges:
                print("Нет данных лидара. Останавливаю робота.")
                self.stop_robot()
                return

            min_distance = min(valid_ranges)

            if min_distance < 0.35:
                print(f"Препятствие близко: {min_distance:.3f} м. Стоп.")
                self.stop_robot()
                return

            self.set_velocity(LINEAR_SPEED, 0.0)
            self.rate.sleep()

        self.stop_robot()
        print("Безопасное движение завершено")

    def battery_and_diagnostics_check(self):
        print("\n=== Быстрая проверка состояния робота ===")

        self.use_battery()
        print("")
        self.use_diagnostics()

    def full_sensor_check(self):
        print("\n=== Полная проверка основных датчиков ===")

        self.use_battery()
        print("")

        self.use_odom()
        print("")

        self.use_pose2d()
        print("")

        self.use_imu()
        print("")

        self.use_lidar_scan()
        print("")

        self.use_camera_info()
        print("")

        self.use_buttons()
        print("")

        self.use_thermovisor()
        print("")

        self.use_client_count()

        print("\nПолная проверка завершена")

    # ======================================================
    # ТОПИКИ, СЕРВИСЫ, ДИАГНОСТИКА ROS
    # ======================================================

    def echo_topic_once(self, topic_name):
        try:
            subprocess.call(["rostopic", "echo", "-n", "1", topic_name])
        except Exception as error:
            print("Ошибка чтения топика:", error)

    def show_all_topics(self):
        print("\n=== Все ROS-топики ===")

        topics = rospy.get_published_topics()

        if not topics:
            print("Топики не найдены")
            return

        for topic_name, topic_type in topics:
            print(f"{topic_name}    [{topic_type}]")

    def show_all_services(self):
        print("\n=== Все ROS-сервисы ===")

        try:
            subprocess.call(["rosservice", "list"])
        except Exception as error:
            print("Ошибка просмотра сервисов:", error)

    def show_ros_nodes(self):
        print("\n=== Активные ROS-узлы ===")

        try:
            subprocess.call(["rosnode", "list"])
        except Exception as error:
            print("Ошибка просмотра узлов:", error)

    def show_cmd_vel_info(self):
        print("\n=== Информация о /cmd_vel ===")

        try:
            subprocess.call(["rostopic", "info", CMD_VEL_TOPIC])
        except Exception as error:
            print("Ошибка просмотра /cmd_vel:", error)

    def read_any_topic_once(self):
        topic_name = input("Введи название топика: ").strip()

        if not topic_name:
            print("Название топика пустое")
            return

        self.echo_topic_once(topic_name)


# ======================================================
# МЕНЮ
# ======================================================

def print_main_menu():
    print("\n==============================")
    print("      TURTLEBRO CONTROL")
    print("==============================")
    print("1  - Управление роботом стрелками")
    print("2  - Ехать вперед по времени")
    print("3  - Ехать назад по времени")
    print("4  - Повернуть налево по времени")
    print("5  - Повернуть направо по времени")
    print("6  - Стоп")
    print("7  - Тест отзывчивости")
    print("")
    print("8  - Все ROS-топики")
    print("9  - Все ROS-сервисы")
    print("10 - Активные ROS-узлы")
    print("11 - Информация о /cmd_vel")
    print("12 - Прочитать любой топик один раз")
    print("")
    print("13 - Датчики и топики TurtleBro")
    print("14 - Безопасное движение вперед по лидару")
    print("15 - Проверка препятствий по лидару")
    print("16 - Быстрая проверка батареи и диагностики")
    print("17 - Полная проверка основных датчиков")
    print("")
    print("0  - Выход")
    print("==============================")


def print_sensor_menu():
    print("\n======================================")
    print("        ДАТЧИКИ И ТОПИКИ TURTLEBRO")
    print("======================================")
    print("1  - Лидар /scan")
    print("2  - Одометрия /odom")
    print("3  - Положение 2D /odom_pose2d")
    print("4  - IMU /imu")
    print("5  - Батарея /bat")
    print("6  - Информация о камере /front_camera/camera_info")
    print("7  - Информация о raw-кадре /front_camera/image_raw")
    print("8  - Сохранить кадр с камеры /front_camera/image_raw/compressed")
    print("9  - Диагностика /diagnostics")
    print("10 - Кнопки /buttons")
    print("11 - Термовизор /thermovisor")
    print("12 - Количество клиентов /client_count")
    print("13 - Подключенные клиенты /connected_clients")
    print("14 - Web telemetry /web_tele")
    print("15 - Raw odometry /raw_odom")
    print("16 - TF /tf")
    print("17 - TF static /tf_static")
    print("")
    print("0  - Назад")
    print("======================================")


def get_seconds():
    try:
        seconds = float(input("Введите время в секундах: "))

        if seconds <= 0:
            print("Время должно быть больше 0")
            return None

        return seconds

    except ValueError:
        print("Нужно ввести число")
        return None


def sensor_menu(robot):
    while not rospy.is_shutdown():
        print_sensor_menu()
        choice = input("Выбери датчик/топик: ").strip()

        if choice == "1":
            robot.use_lidar_scan()

        elif choice == "2":
            robot.use_odom()

        elif choice == "3":
            robot.use_pose2d()

        elif choice == "4":
            robot.use_imu()

        elif choice == "5":
            robot.use_battery()

        elif choice == "6":
            robot.use_camera_info()

        elif choice == "7":
            robot.use_camera_raw_info()

        elif choice == "8":
            robot.save_compressed_camera_image()

        elif choice == "9":
            robot.use_diagnostics()

        elif choice == "10":
            robot.use_buttons()

        elif choice == "11":
            robot.use_thermovisor()

        elif choice == "12":
            robot.use_client_count()

        elif choice == "13":
            robot.echo_connected_clients()

        elif choice == "14":
            robot.echo_web_telemetry()

        elif choice == "15":
            robot.echo_raw_odom()

        elif choice == "16":
            robot.echo_tf()

        elif choice == "17":
            robot.echo_tf_static()

        elif choice == "0":
            break

        else:
            print("Нет такого пункта")


def main():
    robot = TurtleBroController()

    try:
        while not rospy.is_shutdown():
            print_main_menu()
            choice = input("Выбери действие: ").strip()

            if choice == "1":
                robot.keyboard_control()

            elif choice == "2":
                seconds = get_seconds()
                if seconds is not None:
                    robot.move_forward(seconds)

            elif choice == "3":
                seconds = get_seconds()
                if seconds is not None:
                    robot.move_backward(seconds)

            elif choice == "4":
                seconds = get_seconds()
                if seconds is not None:
                    robot.turn_left(seconds)

            elif choice == "5":
                seconds = get_seconds()
                if seconds is not None:
                    robot.turn_right(seconds)

            elif choice == "6":
                robot.stop_robot()
                print("Робот остановлен")

            elif choice == "7":
                robot.test_response()

            elif choice == "8":
                robot.show_all_topics()

            elif choice == "9":
                robot.show_all_services()

            elif choice == "10":
                robot.show_ros_nodes()

            elif choice == "11":
                robot.show_cmd_vel_info()

            elif choice == "12":
                robot.read_any_topic_once()

            elif choice == "13":
                sensor_menu(robot)

            elif choice == "14":
                robot.safe_forward_by_lidar()

            elif choice == "15":
                robot.obstacle_check()

            elif choice == "16":
                robot.battery_and_diagnostics_check()

            elif choice == "17":
                robot.full_sensor_check()

            elif choice == "0":
                robot.stop_robot()
                print("Выход из программы")
                break

            else:
                print("Нет такого пункта меню")

    except KeyboardInterrupt:
        print("\nПрограмма остановлена через Ctrl+C")
        robot.stop_robot()

    finally:
        robot.stop_robot()


if __name__ == "__main__":
    main()