Загрузка данных
#!/usr/bin/env python3
import rospy
from geometry_msgs.msg import Twist
from std_srvs.srv import Trigger, TriggerResponse
import sys
import select
import termios
import tty
import subprocess
CMD_VEL_TOPIC = "/cmd_vel"
LINEAR_SPEED = 0.15
ANGULAR_SPEED = 0.5
RATE_HZ = 10
class TurtleBroController:
def __init__(self):
rospy.init_node("turtlebro_full_controller", anonymous=True)
self.pub = rospy.Publisher(CMD_VEL_TOPIC, Twist, queue_size=10)
self.rate = rospy.Rate(RATE_HZ)
rospy.sleep(1)
print("TurtleBro controller запущен")
print("Топик управления:", CMD_VEL_TOPIC)
# =========================
# БАЗОВОЕ УПРАВЛЕНИЕ
# =========================
def set_velocity(self, linear_x=0.0, angular_z=0.0):
move_cmd = Twist()
move_cmd.linear.x = linear_x
move_cmd.linear.y = 0.0
move_cmd.linear.z = 0.0
move_cmd.angular.x = 0.0
move_cmd.angular.y = 0.0
move_cmd.angular.z = angular_z
self.pub.publish(move_cmd)
def stop_robot(self):
self.set_velocity(0.0, 0.0)
def move_forward(self, seconds):
move_cmd = Twist()
move_cmd.linear.x = LINEAR_SPEED
move_cmd.angular.z = 0.0
for _ in range(int(seconds * RATE_HZ)):
if rospy.is_shutdown():
break
self.pub.publish(move_cmd)
self.rate.sleep()
self.stop_robot()
def move_backward(self, seconds):
move_cmd = Twist()
move_cmd.linear.x = -LINEAR_SPEED
move_cmd.angular.z = 0.0
for _ in range(int(seconds * RATE_HZ)):
if rospy.is_shutdown():
break
self.pub.publish(move_cmd)
self.rate.sleep()
self.stop_robot()
def turn_left(self, seconds):
move_cmd = Twist()
move_cmd.linear.x = 0.0
move_cmd.angular.z = ANGULAR_SPEED
for _ in range(int(seconds * RATE_HZ)):
if rospy.is_shutdown():
break
self.pub.publish(move_cmd)
self.rate.sleep()
self.stop_robot()
def turn_right(self, seconds):
move_cmd = Twist()
move_cmd.linear.x = 0.0
move_cmd.angular.z = -ANGULAR_SPEED
for _ in range(int(seconds * RATE_HZ)):
if rospy.is_shutdown():
break
self.pub.publish(move_cmd)
self.rate.sleep()
self.stop_robot()
def test_response(self):
print("\nПроверка отзывчивости робота")
print("Робот сделает короткие движения: вперед, назад, влево, вправо")
self.move_forward(0.5)
rospy.sleep(0.3)
self.move_backward(0.5)
rospy.sleep(0.3)
self.turn_left(0.5)
rospy.sleep(0.3)
self.turn_right(0.5)
rospy.sleep(0.3)
self.stop_robot()
print("Проверка завершена")
# =========================
# УПРАВЛЕНИЕ СТРЕЛКАМИ
# =========================
def get_key(self):
settings = termios.tcgetattr(sys.stdin)
try:
tty.setraw(sys.stdin.fileno())
select.select([sys.stdin], [], [], 0.1)
key = sys.stdin.read(1)
if key == "\x1b":
key += sys.stdin.read(2)
finally:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, settings)
return key
def keyboard_control(self):
print("\n=== Управление роботом стрелками ===")
print("↑ - ехать вперед")
print("↓ - ехать назад")
print("← - поворот налево")
print("→ - поворот направо")
print("Пробел - стоп")
print("q - выход в главное меню")
print("")
while not rospy.is_shutdown():
key = self.get_key()
if key == "\x1b[A":
print("Вперед")
self.set_velocity(LINEAR_SPEED, 0.0)
elif key == "\x1b[B":
print("Назад")
self.set_velocity(-LINEAR_SPEED, 0.0)
elif key == "\x1b[D":
print("Налево")
self.set_velocity(0.0, ANGULAR_SPEED)
elif key == "\x1b[C":
print("Направо")
self.set_velocity(0.0, -ANGULAR_SPEED)
elif key == " ":
print("Стоп")
self.stop_robot()
elif key == "q":
print("Выход из режима управления")
self.stop_robot()
break
else:
self.stop_robot()
# =========================
# ПРОСМОТР ТОПИКОВ И ДАТЧИКОВ
# =========================
def show_all_topics(self):
print("\n=== Все ROS-топики ===")
topics = rospy.get_published_topics()
if not topics:
print("Топики не найдены")
return
for topic_name, topic_type in topics:
print(f"{topic_name} [{topic_type}]")
def show_sensor_topics(self):
print("\n=== Возможные датчики робота ===")
topics = rospy.get_published_topics()
sensor_keywords = [
"scan",
"laser",
"camera",
"image",
"imu",
"odom",
"joint",
"battery",
"ultrasound",
"sonar",
"depth",
"pointcloud",
"point_cloud",
"tf"
]
found = False
for topic_name, topic_type in topics:
lower_name = topic_name.lower()
lower_type = topic_type.lower()
for keyword in sensor_keywords:
if keyword in lower_name or keyword in lower_type:
print(f"{topic_name} [{topic_type}]")
found = True
break
if not found:
print("Датчики не найдены автоматически")
print("Посмотри все топики через пункт меню 'Показать все ROS-топики'")
def read_topic_once(self):
topic_name = input("Введи название топика, например /odom или /scan: ").strip()
if not topic_name:
print("Название топика пустое")
return
print(f"\nЧитаю одно сообщение из {topic_name}")
print("Если программа зависла, значит в этот топик сейчас ничего не приходит")
print("")
try:
subprocess.call(["rostopic", "echo", "-n", "1", topic_name])
except Exception as error:
print("Ошибка чтения топика:", error)
# =========================
# СЕРВИСЫ И ДОПОЛНЕНИЯ
# =========================
def show_all_services(self):
print("\n=== Все ROS-сервисы ===")
try:
services = rospy.get_param_names()
except Exception:
services = []
try:
result = subprocess.check_output(["rosservice", "list"])
result = result.decode("utf-8")
print(result)
except Exception as error:
print("Не удалось получить список сервисов:", error)
def show_service_info(self):
service_name = input("Введи название сервиса, например /reset или /stop_robot: ").strip()
if not service_name:
print("Название сервиса пустое")
return
try:
subprocess.call(["rosservice", "info", service_name])
except Exception as error:
print("Ошибка просмотра сервиса:", error)
def call_service(self):
service_name = input("Введи название сервиса для вызова: ").strip()
if not service_name:
print("Название сервиса пустое")
return
print(f"\nВызываю сервис {service_name}")
try:
subprocess.call(["rosservice", "call", service_name])
except Exception as error:
print("Ошибка вызова сервиса:", error)
# =========================
# СИСТЕМНАЯ ДИАГНОСТИКА
# =========================
def show_ros_nodes(self):
print("\n=== Активные ROS-узлы ===")
try:
subprocess.call(["rosnode", "list"])
except Exception as error:
print("Ошибка просмотра узлов:", error)
def show_cmd_vel_info(self):
print("\n=== Информация о топике управления ===")
try:
subprocess.call(["rostopic", "info", CMD_VEL_TOPIC])
except Exception as error:
print("Ошибка просмотра /cmd_vel:", error)
def show_cmd_vel_hz(self):
print("\n=== Частота публикации в /cmd_vel ===")
print("Нажми Ctrl+C, чтобы остановить проверку частоты")
try:
subprocess.call(["rostopic", "hz", CMD_VEL_TOPIC])
except Exception as error:
print("Ошибка проверки частоты:", error)
def print_menu():
print("\n==============================")
print(" TURTLEBRO CONTROL")
print("==============================")
print("1 - Управление роботом стрелками")
print("2 - Ехать вперед по времени")
print("3 - Ехать назад по времени")
print("4 - Повернуть налево по времени")
print("5 - Повернуть направо по времени")
print("6 - Стоп")
print("7 - Тест отзывчивости")
print("")
print("8 - Показать все ROS-топики")
print("9 - Показать возможные датчики")
print("10 - Прочитать данные датчика / топика")
print("")
print("11 - Показать все ROS-сервисы / дополнения")
print("12 - Показать информацию о сервисе")
print("13 - Вызвать сервис")
print("")
print("14 - Показать активные ROS-узлы")
print("15 - Информация о /cmd_vel")
print("16 - Проверить частоту /cmd_vel")
print("")
print("0 - Выход")
print("==============================")
def get_seconds():
try:
seconds = float(input("Введите время в секундах: "))
if seconds <= 0:
print("Время должно быть больше 0")
return None
return seconds
except ValueError:
print("Нужно ввести число")
return None
def main():
robot = TurtleBroController()
try:
while not rospy.is_shutdown():
print_menu()
choice = input("Выбери действие: ").strip()
if choice == "1":
robot.keyboard_control()
elif choice == "2":
seconds = get_seconds()
if seconds is not None:
robot.move_forward(seconds)
elif choice == "3":
seconds = get_seconds()
if seconds is not None:
robot.move_backward(seconds)
elif choice == "4":
seconds = get_seconds()
if seconds is not None:
robot.turn_left(seconds)
elif choice == "5":
seconds = get_seconds()
if seconds is not None:
robot.turn_right(seconds)
elif choice == "6":
robot.stop_robot()
print("Робот остановлен")
elif choice == "7":
robot.test_response()
elif choice == "8":
robot.show_all_topics()
elif choice == "9":
robot.show_sensor_topics()
elif choice == "10":
robot.read_topic_once()
elif choice == "11":
robot.show_all_services()
elif choice == "12":
robot.show_service_info()
elif choice == "13":
robot.call_service()
elif choice == "14":
robot.show_ros_nodes()
elif choice == "15":
robot.show_cmd_vel_info()
elif choice == "16":
robot.show_cmd_vel_hz()
elif choice == "0":
robot.stop_robot()
print("Выход из программы")
break
else:
print("Нет такого пункта меню")
except KeyboardInterrupt:
print("\nПрограмма остановлена через Ctrl+C")
robot.stop_robot()
finally:
robot.stop_robot()
if __name__ == "__main__":
main()