Загрузка данных
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import rospy
import sys
import select
import termios
import tty
import math
import subprocess
import os
from datetime import datetime
from geometry_msgs.msg import Twist, Pose2D
from nav_msgs.msg import Odometry
from sensor_msgs.msg import LaserScan, Imu, BatteryState, Image, CompressedImage, CameraInfo
from std_msgs.msg import Int16, Int32, Float32MultiArray
from diagnostic_msgs.msg import DiagnosticArray
CMD_VEL_TOPIC = "/cmd_vel"
LINEAR_SPEED = 0.15
ANGULAR_SPEED = 0.5
RATE_HZ = 10
class TurtleBroController:
def __init__(self):
rospy.init_node("turtlebro_full_controller", anonymous=True)
self.pub = rospy.Publisher(CMD_VEL_TOPIC, Twist, queue_size=10)
self.rate = rospy.Rate(RATE_HZ)
rospy.sleep(1)
print("TurtleBro controller запущен")
print("Топик управления:", CMD_VEL_TOPIC)
# ======================================================
# БАЗОВОЕ УПРАВЛЕНИЕ РОБОТОМ
# ======================================================
def set_velocity(self, linear_x=0.0, angular_z=0.0):
move_cmd = Twist()
move_cmd.linear.x = linear_x
move_cmd.linear.y = 0.0
move_cmd.linear.z = 0.0
move_cmd.angular.x = 0.0
move_cmd.angular.y = 0.0
move_cmd.angular.z = angular_z
self.pub.publish(move_cmd)
def stop_robot(self):
self.set_velocity(0.0, 0.0)
def move_forward(self, seconds):
for _ in range(int(seconds * RATE_HZ)):
if rospy.is_shutdown():
break
self.set_velocity(LINEAR_SPEED, 0.0)
self.rate.sleep()
self.stop_robot()
def move_backward(self, seconds):
for _ in range(int(seconds * RATE_HZ)):
if rospy.is_shutdown():
break
self.set_velocity(-LINEAR_SPEED, 0.0)
self.rate.sleep()
self.stop_robot()
def turn_left(self, seconds):
for _ in range(int(seconds * RATE_HZ)):
if rospy.is_shutdown():
break
self.set_velocity(0.0, ANGULAR_SPEED)
self.rate.sleep()
self.stop_robot()
def turn_right(self, seconds):
for _ in range(int(seconds * RATE_HZ)):
if rospy.is_shutdown():
break
self.set_velocity(0.0, -ANGULAR_SPEED)
self.rate.sleep()
self.stop_robot()
def test_response(self):
print("\n=== Проверка отзывчивости TurtleBro ===")
print("1. Короткое движение вперед")
self.move_forward(0.5)
rospy.sleep(0.3)
print("2. Короткое движение назад")
self.move_backward(0.5)
rospy.sleep(0.3)
print("3. Короткий поворот налево")
self.turn_left(0.5)
rospy.sleep(0.3)
print("4. Короткий поворот направо")
self.turn_right(0.5)
rospy.sleep(0.3)
self.stop_robot()
print("Проверка завершена")
# ======================================================
# УПРАВЛЕНИЕ СТРЕЛКАМИ
# ======================================================
def get_key(self):
settings = termios.tcgetattr(sys.stdin)
try:
tty.setraw(sys.stdin.fileno())
select.select([sys.stdin], [], [], 0.1)
key = sys.stdin.read(1)
if key == "\x1b":
key += sys.stdin.read(2)
finally:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, settings)
return key
def keyboard_control(self):
print("\n=== Управление роботом стрелками ===")
print("↑ - ехать вперед")
print("↓ - ехать назад")
print("← - поворот налево")
print("→ - поворот направо")
print("Пробел - стоп")
print("q - выход в главное меню")
print("")
while not rospy.is_shutdown():
key = self.get_key()
if key == "\x1b[A":
print("Вперед")
self.set_velocity(LINEAR_SPEED, 0.0)
elif key == "\x1b[B":
print("Назад")
self.set_velocity(-LINEAR_SPEED, 0.0)
elif key == "\x1b[D":
print("Налево")
self.set_velocity(0.0, ANGULAR_SPEED)
elif key == "\x1b[C":
print("Направо")
self.set_velocity(0.0, -ANGULAR_SPEED)
elif key == " ":
print("Стоп")
self.stop_robot()
elif key == "q":
print("Выход из режима управления")
self.stop_robot()
break
else:
self.stop_robot()
# ======================================================
# ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ ДЛЯ ДАТЧИКОВ
# ======================================================
def wait_msg(self, topic, msg_type, timeout=3.0):
try:
return rospy.wait_for_message(topic, msg_type, timeout=timeout)
except rospy.ROSException:
print(f"Нет данных с топика {topic} за {timeout} сек.")
return None
except Exception as error:
print(f"Ошибка чтения {topic}: {error}")
return None
def quaternion_to_yaw(self, q):
siny_cosp = 2.0 * (q.w * q.z + q.x * q.y)
cosy_cosp = 1.0 - 2.0 * (q.y * q.y + q.z * q.z)
yaw = math.atan2(siny_cosp, cosy_cosp)
return yaw
# ======================================================
# ИСПОЛЬЗОВАНИЕ ДАТЧИКОВ
# ======================================================
def use_lidar_scan(self):
print("\n=== Лидар /scan ===")
msg = self.wait_msg("/scan", LaserScan)
if msg is None:
return
valid_ranges = []
for distance in msg.ranges:
if not math.isinf(distance) and not math.isnan(distance):
valid_ranges.append(distance)
if not valid_ranges:
print("Лидар работает, но валидных расстояний нет")
return
min_distance = min(valid_ranges)
max_distance = max(valid_ranges)
avg_distance = sum(valid_ranges) / len(valid_ranges)
print(f"Количество лучей: {len(msg.ranges)}")
print(f"Минимальная дистанция: {min_distance:.3f} м")
print(f"Максимальная дистанция: {max_distance:.3f} м")
print(f"Средняя дистанция: {avg_distance:.3f} м")
print(f"Угол начала: {msg.angle_min:.3f} рад")
print(f"Угол конца: {msg.angle_max:.3f} рад")
print(f"Шаг угла: {msg.angle_increment:.5f} рад")
if min_distance < 0.25:
print("Внимание: рядом препятствие")
else:
print("Перед роботом нет критически близкого препятствия")
def use_odom(self):
print("\n=== Одометрия /odom ===")
msg = self.wait_msg("/odom", Odometry)
if msg is None:
return
position = msg.pose.pose.position
orientation = msg.pose.pose.orientation
linear = msg.twist.twist.linear
angular = msg.twist.twist.angular
yaw = self.quaternion_to_yaw(orientation)
print(f"Позиция X: {position.x:.3f}")
print(f"Позиция Y: {position.y:.3f}")
print(f"Позиция Z: {position.z:.3f}")
print(f"Поворот yaw: {yaw:.3f} рад")
print(f"Линейная скорость X: {linear.x:.3f} м/с")
print(f"Угловая скорость Z: {angular.z:.3f} рад/с")
def use_pose2d(self):
print("\n=== Положение /odom_pose2d ===")
msg = self.wait_msg("/odom_pose2d", Pose2D)
if msg is None:
return
print(f"X: {msg.x:.3f}")
print(f"Y: {msg.y:.3f}")
print(f"Theta: {msg.theta:.3f} рад")
def use_imu(self):
print("\n=== IMU /imu ===")
msg = self.wait_msg("/imu", Imu)
if msg is None:
return
orientation = msg.orientation
angular_velocity = msg.angular_velocity
linear_acceleration = msg.linear_acceleration
yaw = self.quaternion_to_yaw(orientation)
print(f"Ориентация yaw: {yaw:.3f} рад")
print("")
print("Угловая скорость:")
print(f"X: {angular_velocity.x:.3f}")
print(f"Y: {angular_velocity.y:.3f}")
print(f"Z: {angular_velocity.z:.3f}")
print("")
print("Линейное ускорение:")
print(f"X: {linear_acceleration.x:.3f}")
print(f"Y: {linear_acceleration.y:.3f}")
print(f"Z: {linear_acceleration.z:.3f}")
def use_battery(self):
print("\n=== Батарея /bat ===")
msg = self.wait_msg("/bat", BatteryState)
if msg is None:
return
print(f"Напряжение: {msg.voltage:.2f} В")
print(f"Ток: {msg.current:.2f} А")
print(f"Заряд: {msg.charge:.2f}")
print(f"Ёмкость: {msg.capacity:.2f}")
print(f"Процент: {msg.percentage * 100:.1f}%")
if msg.percentage > 0:
if msg.percentage < 0.2:
print("Внимание: батарея почти разряжена")
elif msg.percentage < 0.5:
print("Батарея ниже половины")
else:
print("Батарея в норме")
else:
print("Процент батареи может не передаваться датчиком")
def use_camera_info(self):
print("\n=== Информация о камере /front_camera/camera_info ===")
msg = self.wait_msg("/front_camera/camera_info", CameraInfo)
if msg is None:
return
print(f"Ширина: {msg.width}")
print(f"Высота: {msg.height}")
print(f"Модель искажений: {msg.distortion_model}")
print(f"Матрица камеры K: {msg.K}")
print(f"Коэффициенты искажений D: {msg.D}")
def use_camera_raw_info(self):
print("\n=== Кадр камеры /front_camera/image_raw ===")
msg = self.wait_msg("/front_camera/image_raw", Image)
if msg is None:
return
print(f"Ширина: {msg.width}")
print(f"Высота: {msg.height}")
print(f"Кодировка: {msg.encoding}")
print(f"Размер данных: {len(msg.data)} байт")
print("")
print("Само изображение в терминал не выводится, потому что это огромный массив байтов.")
print("Для просмотра камеры лучше использовать:")
print("rqt_image_view /front_camera/image_raw")
def save_compressed_camera_image(self):
print("\n=== Сохранение кадра камеры /front_camera/image_raw/compressed ===")
msg = self.wait_msg("/front_camera/image_raw/compressed", CompressedImage)
if msg is None:
return
folder = os.path.expanduser("~/turtlebro_camera")
os.makedirs(folder, exist_ok=True)
filename = datetime.now().strftime("camera_%Y%m%d_%H%M%S.jpg")
filepath = os.path.join(folder, filename)
with open(filepath, "wb") as file:
file.write(msg.data)
print(f"Кадр сохранён: {filepath}")
print(f"Формат: {msg.format}")
print(f"Размер файла: {len(msg.data)} байт")
def use_diagnostics(self):
print("\n=== Диагностика /diagnostics ===")
msg = self.wait_msg("/diagnostics", DiagnosticArray)
if msg is None:
return
if not msg.status:
print("Диагностические статусы пустые")
return
for status in msg.status:
print("--------------------------------")
print(f"Имя: {status.name}")
print(f"Уровень: {status.level}")
print(f"Сообщение: {status.message}")
print(f"Аппаратный ID: {status.hardware_id}")
if status.values:
print("Параметры:")
for value in status.values:
print(f" {value.key}: {value.value}")
def use_buttons(self):
print("\n=== Кнопки /buttons ===")
msg = self.wait_msg("/buttons", Int16)
if msg is None:
return
print(f"Состояние кнопок: {msg.data}")
if msg.data == 0:
print("Кнопки не нажаты")
else:
print("Есть нажатие или активное состояние кнопки")
def use_thermovisor(self):
print("\n=== Термовизор /thermovisor ===")
msg = self.wait_msg("/thermovisor", Float32MultiArray)
if msg is None:
return
data = list(msg.data)
if not data:
print("Термовизор передал пустой массив")
return
min_temp = min(data)
max_temp = max(data)
avg_temp = sum(data) / len(data)
print(f"Количество значений: {len(data)}")
print(f"Минимум: {min_temp:.2f}")
print(f"Максимум: {max_temp:.2f}")
print(f"Среднее: {avg_temp:.2f}")
print("")
print("Первые 20 значений:")
print(data[:20])
def use_client_count(self):
print("\n=== Количество клиентов /client_count ===")
msg = self.wait_msg("/client_count", Int32)
if msg is None:
return
print(f"Количество клиентов: {msg.data}")
def echo_connected_clients(self):
print("\n=== Подключенные клиенты /connected_clients ===")
print("Читаю одно сообщение через rostopic echo")
self.echo_topic_once("/connected_clients")
def echo_web_telemetry(self):
print("\n=== Web telemetry /web_tele ===")
print("Это кастомный тип turtlebro_web/WebTelemetry, поэтому читаем через rostopic echo")
self.echo_topic_once("/web_tele")
def echo_raw_odom(self):
print("\n=== Raw odometry /raw_odom ===")
print("Это кастомный тип turtlebro/RawOdom, поэтому читаем через rostopic echo")
self.echo_topic_once("/raw_odom")
def echo_tf(self):
print("\n=== TF /tf ===")
self.echo_topic_once("/tf")
def echo_tf_static(self):
print("\n=== TF static /tf_static ===")
self.echo_topic_once("/tf_static")
# ======================================================
# АВТОМАТИЧЕСКИЕ ТЕСТЫ С ДАТЧИКАМИ
# ======================================================
def obstacle_check(self):
print("\n=== Проверка препятствия по лидару ===")
msg = self.wait_msg("/scan", LaserScan)
if msg is None:
return
valid_ranges = []
for distance in msg.ranges:
if not math.isinf(distance) and not math.isnan(distance):
valid_ranges.append(distance)
if not valid_ranges:
print("Нет валидных расстояний")
return
min_distance = min(valid_ranges)
print(f"Минимальное расстояние: {min_distance:.3f} м")
if min_distance < 0.20:
print("Очень близко препятствие. Движение опасно.")
elif min_distance < 0.50:
print("Препятствие рядом. Ехать надо аккуратно.")
else:
print("Свободно. Критичных препятствий рядом нет.")
def safe_forward_by_lidar(self):
print("\n=== Безопасное движение вперед по лидару ===")
seconds = get_seconds()
if seconds is None:
return
print("Робот будет ехать вперед, пока впереди нет близкого препятствия")
for _ in range(int(seconds * RATE_HZ)):
if rospy.is_shutdown():
break
msg = self.wait_msg("/scan", LaserScan, timeout=1.0)
if msg is None:
print("Лидар не отвечает. Останавливаю робота.")
self.stop_robot()
return
valid_ranges = []
for distance in msg.ranges:
if not math.isinf(distance) and not math.isnan(distance):
valid_ranges.append(distance)
if not valid_ranges:
print("Нет данных лидара. Останавливаю робота.")
self.stop_robot()
return
min_distance = min(valid_ranges)
if min_distance < 0.35:
print(f"Препятствие близко: {min_distance:.3f} м. Стоп.")
self.stop_robot()
return
self.set_velocity(LINEAR_SPEED, 0.0)
self.rate.sleep()
self.stop_robot()
print("Безопасное движение завершено")
def battery_and_diagnostics_check(self):
print("\n=== Быстрая проверка состояния робота ===")
self.use_battery()
print("")
self.use_diagnostics()
def full_sensor_check(self):
print("\n=== Полная проверка основных датчиков ===")
self.use_battery()
print("")
self.use_odom()
print("")
self.use_pose2d()
print("")
self.use_imu()
print("")
self.use_lidar_scan()
print("")
self.use_camera_info()
print("")
self.use_buttons()
print("")
self.use_thermovisor()
print("")
self.use_client_count()
print("\nПолная проверка завершена")
# ======================================================
# ТОПИКИ, СЕРВИСЫ, ДИАГНОСТИКА ROS
# ======================================================
def echo_topic_once(self, topic_name):
try:
subprocess.call(["rostopic", "echo", "-n", "1", topic_name])
except Exception as error:
print("Ошибка чтения топика:", error)
def show_all_topics(self):
print("\n=== Все ROS-топики ===")
topics = rospy.get_published_topics()
if not topics:
print("Топики не найдены")
return
for topic_name, topic_type in topics:
print(f"{topic_name} [{topic_type}]")
def show_all_services(self):
print("\n=== Все ROS-сервисы ===")
try:
subprocess.call(["rosservice", "list"])
except Exception as error:
print("Ошибка просмотра сервисов:", error)
def show_ros_nodes(self):
print("\n=== Активные ROS-узлы ===")
try:
subprocess.call(["rosnode", "list"])
except Exception as error:
print("Ошибка просмотра узлов:", error)
def show_cmd_vel_info(self):
print("\n=== Информация о /cmd_vel ===")
try:
subprocess.call(["rostopic", "info", CMD_VEL_TOPIC])
except Exception as error:
print("Ошибка просмотра /cmd_vel:", error)
def read_any_topic_once(self):
topic_name = input("Введи название топика: ").strip()
if not topic_name:
print("Название топика пустое")
return
self.echo_topic_once(topic_name)
# ======================================================
# МЕНЮ
# ======================================================
def print_main_menu():
print("\n==============================")
print(" TURTLEBRO CONTROL")
print("==============================")
print("1 - Управление роботом стрелками")
print("2 - Ехать вперед по времени")
print("3 - Ехать назад по времени")
print("4 - Повернуть налево по времени")
print("5 - Повернуть направо по времени")
print("6 - Стоп")
print("7 - Тест отзывчивости")
print("")
print("8 - Все ROS-топики")
print("9 - Все ROS-сервисы")
print("10 - Активные ROS-узлы")
print("11 - Информация о /cmd_vel")
print("12 - Прочитать любой топик один раз")
print("")
print("13 - Датчики и топики TurtleBro")
print("14 - Безопасное движение вперед по лидару")
print("15 - Проверка препятствий по лидару")
print("16 - Быстрая проверка батареи и диагностики")
print("17 - Полная проверка основных датчиков")
print("")
print("0 - Выход")
print("==============================")
def print_sensor_menu():
print("\n======================================")
print(" ДАТЧИКИ И ТОПИКИ TURTLEBRO")
print("======================================")
print("1 - Лидар /scan")
print("2 - Одометрия /odom")
print("3 - Положение 2D /odom_pose2d")
print("4 - IMU /imu")
print("5 - Батарея /bat")
print("6 - Информация о камере /front_camera/camera_info")
print("7 - Информация о raw-кадре /front_camera/image_raw")
print("8 - Сохранить кадр с камеры /front_camera/image_raw/compressed")
print("9 - Диагностика /diagnostics")
print("10 - Кнопки /buttons")
print("11 - Термовизор /thermovisor")
print("12 - Количество клиентов /client_count")
print("13 - Подключенные клиенты /connected_clients")
print("14 - Web telemetry /web_tele")
print("15 - Raw odometry /raw_odom")
print("16 - TF /tf")
print("17 - TF static /tf_static")
print("")
print("0 - Назад")
print("======================================")
def get_seconds():
try:
seconds = float(input("Введите время в секундах: "))
if seconds <= 0:
print("Время должно быть больше 0")
return None
return seconds
except ValueError:
print("Нужно ввести число")
return None
def sensor_menu(robot):
while not rospy.is_shutdown():
print_sensor_menu()
choice = input("Выбери датчик/топик: ").strip()
if choice == "1":
robot.use_lidar_scan()
elif choice == "2":
robot.use_odom()
elif choice == "3":
robot.use_pose2d()
elif choice == "4":
robot.use_imu()
elif choice == "5":
robot.use_battery()
elif choice == "6":
robot.use_camera_info()
elif choice == "7":
robot.use_camera_raw_info()
elif choice == "8":
robot.save_compressed_camera_image()
elif choice == "9":
robot.use_diagnostics()
elif choice == "10":
robot.use_buttons()
elif choice == "11":
robot.use_thermovisor()
elif choice == "12":
robot.use_client_count()
elif choice == "13":
robot.echo_connected_clients()
elif choice == "14":
robot.echo_web_telemetry()
elif choice == "15":
robot.echo_raw_odom()
elif choice == "16":
robot.echo_tf()
elif choice == "17":
robot.echo_tf_static()
elif choice == "0":
break
else:
print("Нет такого пункта")
def main():
robot = TurtleBroController()
try:
while not rospy.is_shutdown():
print_main_menu()
choice = input("Выбери действие: ").strip()
if choice == "1":
robot.keyboard_control()
elif choice == "2":
seconds = get_seconds()
if seconds is not None:
robot.move_forward(seconds)
elif choice == "3":
seconds = get_seconds()
if seconds is not None:
robot.move_backward(seconds)
elif choice == "4":
seconds = get_seconds()
if seconds is not None:
robot.turn_left(seconds)
elif choice == "5":
seconds = get_seconds()
if seconds is not None:
robot.turn_right(seconds)
elif choice == "6":
robot.stop_robot()
print("Робот остановлен")
elif choice == "7":
robot.test_response()
elif choice == "8":
robot.show_all_topics()
elif choice == "9":
robot.show_all_services()
elif choice == "10":
robot.show_ros_nodes()
elif choice == "11":
robot.show_cmd_vel_info()
elif choice == "12":
robot.read_any_topic_once()
elif choice == "13":
sensor_menu(robot)
elif choice == "14":
robot.safe_forward_by_lidar()
elif choice == "15":
robot.obstacle_check()
elif choice == "16":
robot.battery_and_diagnostics_check()
elif choice == "17":
robot.full_sensor_check()
elif choice == "0":
robot.stop_robot()
print("Выход из программы")
break
else:
print("Нет такого пункта меню")
except KeyboardInterrupt:
print("\nПрограмма остановлена через Ctrl+C")
robot.stop_robot()
finally:
robot.stop_robot()
if __name__ == "__main__":
main()