Загрузка данных


#!/usr/bin/env python3

import rospy
from geometry_msgs.msg import Twist
from std_srvs.srv import Trigger, TriggerResponse

import sys
import select
import termios
import tty
import subprocess


CMD_VEL_TOPIC = "/cmd_vel"

LINEAR_SPEED = 0.15
ANGULAR_SPEED = 0.5
RATE_HZ = 10


class TurtleBroController:
    def __init__(self):
        rospy.init_node("turtlebro_full_controller", anonymous=True)

        self.pub = rospy.Publisher(CMD_VEL_TOPIC, Twist, queue_size=10)
        self.rate = rospy.Rate(RATE_HZ)

        rospy.sleep(1)

        print("TurtleBro controller запущен")
        print("Топик управления:", CMD_VEL_TOPIC)

    # =========================
    # БАЗОВОЕ УПРАВЛЕНИЕ
    # =========================

    def set_velocity(self, linear_x=0.0, angular_z=0.0):
        move_cmd = Twist()

        move_cmd.linear.x = linear_x
        move_cmd.linear.y = 0.0
        move_cmd.linear.z = 0.0

        move_cmd.angular.x = 0.0
        move_cmd.angular.y = 0.0
        move_cmd.angular.z = angular_z

        self.pub.publish(move_cmd)

    def stop_robot(self):
        self.set_velocity(0.0, 0.0)

    def move_forward(self, seconds):
        move_cmd = Twist()
        move_cmd.linear.x = LINEAR_SPEED
        move_cmd.angular.z = 0.0

        for _ in range(int(seconds * RATE_HZ)):
            if rospy.is_shutdown():
                break

            self.pub.publish(move_cmd)
            self.rate.sleep()

        self.stop_robot()

    def move_backward(self, seconds):
        move_cmd = Twist()
        move_cmd.linear.x = -LINEAR_SPEED
        move_cmd.angular.z = 0.0

        for _ in range(int(seconds * RATE_HZ)):
            if rospy.is_shutdown():
                break

            self.pub.publish(move_cmd)
            self.rate.sleep()

        self.stop_robot()

    def turn_left(self, seconds):
        move_cmd = Twist()
        move_cmd.linear.x = 0.0
        move_cmd.angular.z = ANGULAR_SPEED

        for _ in range(int(seconds * RATE_HZ)):
            if rospy.is_shutdown():
                break

            self.pub.publish(move_cmd)
            self.rate.sleep()

        self.stop_robot()

    def turn_right(self, seconds):
        move_cmd = Twist()
        move_cmd.linear.x = 0.0
        move_cmd.angular.z = -ANGULAR_SPEED

        for _ in range(int(seconds * RATE_HZ)):
            if rospy.is_shutdown():
                break

            self.pub.publish(move_cmd)
            self.rate.sleep()

        self.stop_robot()

    def test_response(self):
        print("\nПроверка отзывчивости робота")
        print("Робот сделает короткие движения: вперед, назад, влево, вправо")

        self.move_forward(0.5)
        rospy.sleep(0.3)

        self.move_backward(0.5)
        rospy.sleep(0.3)

        self.turn_left(0.5)
        rospy.sleep(0.3)

        self.turn_right(0.5)
        rospy.sleep(0.3)

        self.stop_robot()

        print("Проверка завершена")

    # =========================
    # УПРАВЛЕНИЕ СТРЕЛКАМИ
    # =========================

    def get_key(self):
        settings = termios.tcgetattr(sys.stdin)

        try:
            tty.setraw(sys.stdin.fileno())
            select.select([sys.stdin], [], [], 0.1)
            key = sys.stdin.read(1)

            if key == "\x1b":
                key += sys.stdin.read(2)

        finally:
            termios.tcsetattr(sys.stdin, termios.TCSADRAIN, settings)

        return key

    def keyboard_control(self):
        print("\n=== Управление роботом стрелками ===")
        print("↑  - ехать вперед")
        print("↓  - ехать назад")
        print("←  - поворот налево")
        print("→  - поворот направо")
        print("Пробел - стоп")
        print("q - выход в главное меню")
        print("")

        while not rospy.is_shutdown():
            key = self.get_key()

            if key == "\x1b[A":
                print("Вперед")
                self.set_velocity(LINEAR_SPEED, 0.0)

            elif key == "\x1b[B":
                print("Назад")
                self.set_velocity(-LINEAR_SPEED, 0.0)

            elif key == "\x1b[D":
                print("Налево")
                self.set_velocity(0.0, ANGULAR_SPEED)

            elif key == "\x1b[C":
                print("Направо")
                self.set_velocity(0.0, -ANGULAR_SPEED)

            elif key == " ":
                print("Стоп")
                self.stop_robot()

            elif key == "q":
                print("Выход из режима управления")
                self.stop_robot()
                break

            else:
                self.stop_robot()

    # =========================
    # ПРОСМОТР ТОПИКОВ И ДАТЧИКОВ
    # =========================

    def show_all_topics(self):
        print("\n=== Все ROS-топики ===")

        topics = rospy.get_published_topics()

        if not topics:
            print("Топики не найдены")
            return

        for topic_name, topic_type in topics:
            print(f"{topic_name}    [{topic_type}]")

    def show_sensor_topics(self):
        print("\n=== Возможные датчики робота ===")

        topics = rospy.get_published_topics()

        sensor_keywords = [
            "scan",
            "laser",
            "camera",
            "image",
            "imu",
            "odom",
            "joint",
            "battery",
            "ultrasound",
            "sonar",
            "depth",
            "pointcloud",
            "point_cloud",
            "tf"
        ]

        found = False

        for topic_name, topic_type in topics:
            lower_name = topic_name.lower()
            lower_type = topic_type.lower()

            for keyword in sensor_keywords:
                if keyword in lower_name or keyword in lower_type:
                    print(f"{topic_name}    [{topic_type}]")
                    found = True
                    break

        if not found:
            print("Датчики не найдены автоматически")
            print("Посмотри все топики через пункт меню 'Показать все ROS-топики'")

    def read_topic_once(self):
        topic_name = input("Введи название топика, например /odom или /scan: ").strip()

        if not topic_name:
            print("Название топика пустое")
            return

        print(f"\nЧитаю одно сообщение из {topic_name}")
        print("Если программа зависла, значит в этот топик сейчас ничего не приходит")
        print("")

        try:
            subprocess.call(["rostopic", "echo", "-n", "1", topic_name])
        except Exception as error:
            print("Ошибка чтения топика:", error)

    # =========================
    # СЕРВИСЫ И ДОПОЛНЕНИЯ
    # =========================

    def show_all_services(self):
        print("\n=== Все ROS-сервисы ===")

        try:
            services = rospy.get_param_names()
        except Exception:
            services = []

        try:
            result = subprocess.check_output(["rosservice", "list"])
            result = result.decode("utf-8")
            print(result)
        except Exception as error:
            print("Не удалось получить список сервисов:", error)

    def show_service_info(self):
        service_name = input("Введи название сервиса, например /reset или /stop_robot: ").strip()

        if not service_name:
            print("Название сервиса пустое")
            return

        try:
            subprocess.call(["rosservice", "info", service_name])
        except Exception as error:
            print("Ошибка просмотра сервиса:", error)

    def call_service(self):
        service_name = input("Введи название сервиса для вызова: ").strip()

        if not service_name:
            print("Название сервиса пустое")
            return

        print(f"\nВызываю сервис {service_name}")

        try:
            subprocess.call(["rosservice", "call", service_name])
        except Exception as error:
            print("Ошибка вызова сервиса:", error)

    # =========================
    # СИСТЕМНАЯ ДИАГНОСТИКА
    # =========================

    def show_ros_nodes(self):
        print("\n=== Активные ROS-узлы ===")

        try:
            subprocess.call(["rosnode", "list"])
        except Exception as error:
            print("Ошибка просмотра узлов:", error)

    def show_cmd_vel_info(self):
        print("\n=== Информация о топике управления ===")

        try:
            subprocess.call(["rostopic", "info", CMD_VEL_TOPIC])
        except Exception as error:
            print("Ошибка просмотра /cmd_vel:", error)

    def show_cmd_vel_hz(self):
        print("\n=== Частота публикации в /cmd_vel ===")
        print("Нажми Ctrl+C, чтобы остановить проверку частоты")

        try:
            subprocess.call(["rostopic", "hz", CMD_VEL_TOPIC])
        except Exception as error:
            print("Ошибка проверки частоты:", error)


def print_menu():
    print("\n==============================")
    print("      TURTLEBRO CONTROL")
    print("==============================")
    print("1  - Управление роботом стрелками")
    print("2  - Ехать вперед по времени")
    print("3  - Ехать назад по времени")
    print("4  - Повернуть налево по времени")
    print("5  - Повернуть направо по времени")
    print("6  - Стоп")
    print("7  - Тест отзывчивости")
    print("")
    print("8  - Показать все ROS-топики")
    print("9  - Показать возможные датчики")
    print("10 - Прочитать данные датчика / топика")
    print("")
    print("11 - Показать все ROS-сервисы / дополнения")
    print("12 - Показать информацию о сервисе")
    print("13 - Вызвать сервис")
    print("")
    print("14 - Показать активные ROS-узлы")
    print("15 - Информация о /cmd_vel")
    print("16 - Проверить частоту /cmd_vel")
    print("")
    print("0  - Выход")
    print("==============================")


def get_seconds():
    try:
        seconds = float(input("Введите время в секундах: "))
        if seconds <= 0:
            print("Время должно быть больше 0")
            return None

        return seconds

    except ValueError:
        print("Нужно ввести число")
        return None


def main():
    robot = TurtleBroController()

    try:
        while not rospy.is_shutdown():
            print_menu()
            choice = input("Выбери действие: ").strip()

            if choice == "1":
                robot.keyboard_control()

            elif choice == "2":
                seconds = get_seconds()
                if seconds is not None:
                    robot.move_forward(seconds)

            elif choice == "3":
                seconds = get_seconds()
                if seconds is not None:
                    robot.move_backward(seconds)

            elif choice == "4":
                seconds = get_seconds()
                if seconds is not None:
                    robot.turn_left(seconds)

            elif choice == "5":
                seconds = get_seconds()
                if seconds is not None:
                    robot.turn_right(seconds)

            elif choice == "6":
                robot.stop_robot()
                print("Робот остановлен")

            elif choice == "7":
                robot.test_response()

            elif choice == "8":
                robot.show_all_topics()

            elif choice == "9":
                robot.show_sensor_topics()

            elif choice == "10":
                robot.read_topic_once()

            elif choice == "11":
                robot.show_all_services()

            elif choice == "12":
                robot.show_service_info()

            elif choice == "13":
                robot.call_service()

            elif choice == "14":
                robot.show_ros_nodes()

            elif choice == "15":
                robot.show_cmd_vel_info()

            elif choice == "16":
                robot.show_cmd_vel_hz()

            elif choice == "0":
                robot.stop_robot()
                print("Выход из программы")
                break

            else:
                print("Нет такого пункта меню")

    except KeyboardInterrupt:
        print("\nПрограмма остановлена через Ctrl+C")
        robot.stop_robot()

    finally:
        robot.stop_robot()


if __name__ == "__main__":
    main()