Загрузка данных
import logging
import os
from urllib.parse import urlparse
from clients.subprocess_client import SubprocessClient
from constants.architecture_constants import EnvKeyConstants
from constants.architecture_constants import ImitatorConstants as Im_const
from constants.enums import TU
from infra.clickhouse_manager import ClickHouseManager
from infra.cmd_generator import ImitatorCmdGenerator
from infra.docker_manager import DockerContainerManager
from infra.imitator_data_uploader import ImitatorDataUploader
from infra.imitator_manager import ImitatorManager
from infra.redis_manager import RedisCleaner
logger = logging.getLogger(__name__)
class StandSetupManager:
"""
Подготовка стенда к запуску автотестов
Для запуска имитатора:
setup_manager = StandSetupManager(test_duration(minutes), test_data_id, test_data_name, tu_id)
setup_manager.setup_stand_for_imitator_run()
imitator_thread = threading.Thread(target=stand_manager.start_imitator, daemon=True)
core_thread = threading.Thread(target=stand_manager.start_core)
imitator_thread.start()
time.sleep(20)
core_thread.start()
Доступ к времени старта имитатора для расчёта интервалов утечек:
start_time = setup_manager.start_time # datetime объект
"""
def __init__(
self,
duration_m: float, # Максимальное время работы имитатора в минутах
test_data_id: int, # id тест кейса из которого будут загружены данные
test_data_name: str, # Название архива данных имитатора для загрузки из TestOps
tu_id: int,
username: str = os.environ.get(EnvKeyConstants.SSH_USER_DEV),
stand_name: str = os.environ.get(EnvKeyConstants.STAND_NAME),
) -> None:
self._duration_m = duration_m
self._test_data_id = test_data_id
self._test_data_name = test_data_name
self._tu_id = tu_id
self._username = username
self._stand_name = stand_name
self._configuration_file_name = self._get_configuration_file_name()
self._server_ip = self._get_server_ip() # Получает ip сервера из словаря
self._init_clients()
self._cmd_generator = self._choose_cmd_generator()
self._final_cmd = self._cmd_generator.generate_final_imitator_cmd()
# Экземпляр имитатор менеджера нужно создавать после генерации команды, отдельно от других клиентов
self._imitator_manager = ImitatorManager(self._stand_client, self._final_cmd)
@property
def start_time(self):
"""
Возвращает время старта имитатора как datetime объект.
Используется для расчёта интервалов утечек в тестах:
- leak_start_time = start_time + LEAK_START_INTERVAL
- leak_end_time = start_time + LEAK_START_INTERVAL + ALLOWED_TIME_DIFF_SECONDS
"""
return self._cmd_generator.start_time
def setup_stand_for_imitator_run(self) -> None:
"""
Обертка, в которой проходит полная подготовка стенда
"""
try:
if not os.environ.get("RUN_WITHOUT_TESTOPS", "False").lower() == "true":
# При запуске с TestOps загружает данные для прогона
self._uploader.upload_with_confirm()
self.stop_all_containers()
self.clean_redis_and_clickhouse()
self.start_containers_without_core()
except Exception as error:
error_msg = "[SETUP] [ERROR] Ошибка подготовки стенда к запуску имитатора"
logger.exception(error_msg)
raise RuntimeError(error_msg) from error
def clean_redis_and_clickhouse(self):
"""
Чистит БД: Clickhouse и Redis
"""
# Копирование файла конфигурации на runner
self._clickhouse_manager.copy_configuration_file_from_stand()
# Чистка ключей Redis
self._redis_cleaner.delete_keys_with_check()
# Чистка ключей ClickHouse
self._clickhouse_manager.delete_clickhouse_keys_with_check()
def stop_all_containers(self):
"""
Останавливает все контейнеры и чистит БД: Clickhouse и Redis
"""
self._docker_manager.stop_all_lds_containers()
def start_containers_without_core(self):
"""
Запускает все контейнеры кроме core
"""
# Запуск lds-layer-builder
self._docker_manager.start_lds_layer_builder_containers()
# Запуск lds-journals
self._docker_manager.start_lds_journals_containers()
# Запуск lds-web-app
self._docker_manager.start_lds_web_app_containers()
# Запуск lds-api-gw
self._docker_manager.start_lds_api_gw_containers()
# Запуск lds-reports
self._docker_manager.start_lds_reports_containers()
def start_imitator(self) -> None:
"""
Запускает имитатор и собирает отдельно лог имитатора в файл
"""
try:
self._imitator_manager.run_imitator()
self._imitator_manager.log_imitator_stdout()
except Exception as error:
error_msg = "[SETUP] [ERROR] Ошибка запуска имитатора"
logger.exception(error_msg)
raise RuntimeError(error_msg) from error
def start_core(self) -> None:
"""
Запускает core контейнеры
"""
try:
# Запуск CORE
self._docker_manager.start_lds_core_containers()
except Exception as error:
error_msg = "[SETUP] [ERROR] Ошибка запуска CORE"
logger.exception(error_msg)
raise RuntimeError(error_msg) from error
def stop_imitator_wrapper(self) -> None:
"""
В teardown может вызываться даже если имитатор не запущен
"""
try:
if not self._imitator_manager.imitator_process:
logger.info("[TEARDOWN] [SKIP] Имитатор не был запущен")
return
self._imitator_manager.wait_and_stop_imitator()
logger.info("[TEARDOWN] [OK] Имитатор остановлен")
except Exception as error:
error_msg = "[TEARDOWN] [ERROR] Не удалось остановить имитатор"
logger.exception(error_msg)
raise RuntimeError(error_msg) from error
def server_test_data_remover(self):
"""
Может вызваться в teardown если загрузка не была выполнена
"""
uploader = getattr(self, "_uploader", None)
if uploader is None:
logger.info("[TEARDOWN] [SKIP] uploader не инициализирован, удаление данных со стенда пропущено")
return
try:
uploader.delete_with_confirm()
except Exception as error:
error_msg = "[TEARDOWN] [ERROR] Не удалось удалить данные с сервера"
logger.exception(error_msg)
raise RuntimeError(error_msg) from error
@staticmethod
def _parse_opc_target() -> tuple[str, int]:
"""
Извлекает хост и порт OPC из переменной окружения OPC_URL.
"""
opc_url = os.environ.get(EnvKeyConstants.OPC_URL)
if not opc_url:
raise RuntimeError(f"[SETUP] [ERROR] Переменная окружения {EnvKeyConstants.OPC_URL} не задана")
parsed = urlparse(opc_url)
if not parsed.hostname or not parsed.port:
raise RuntimeError(
f"[SETUP] [ERROR] Некорректное значение OPC_URL: '{opc_url}'. Ожидается формат вида opc.tcp://host:port"
)
return parsed.hostname, parsed.port
def check_opc_server_status(self, timeout_s: int = 5) -> None:
"""
Проверяет доступность OPC сервера с сервера стенда через /dev/tcp.
"""
host, port = self._parse_opc_target()
check_cmd = (
f"if timeout {timeout_s} bash -lc 'cat < /dev/null > /dev/tcp/{host}/{port}'; "
f"then echo {Im_const.CMD_STATUS_OK}; else echo {Im_const.CMD_STATUS_FAIL}; fi"
)
result = self._stand_client.run_cmd(check_cmd, need_output=True)
if result != Im_const.CMD_STATUS_OK:
raise RuntimeError(f"[SETUP] [ERROR] OPC сервер {host}:{port} недоступен с сервера стенда")
logger.info(f"[SETUP] [OK] OPC сервер {host}:{port} доступен")
def _get_server_ip(self) -> str:
"""
Получает server ip из списка стендов
:return: server ip
"""
try:
return Im_const.HOST_MAP.get(self._stand_name, {}).get(Im_const.SERVER_IP_KEY_NAME)
except Exception as error:
error_msg = f"[SETUP] [ERROR] Не удалось получить server ip для стенда: {self._stand_name}"
logger.exception(error_msg)
raise ValueError(error_msg) from error
def _get_configuration_file_name(self) -> str:
"""
Получает имя файла конфигурации
"""
return TU.get_file_name_by_id(self._tu_id)
def _choose_cmd_generator(self) -> ImitatorCmdGenerator:
"""
Выбирает вариант генерации команды запуска имитатора, в зависимости от типа запуска
"""
try:
if os.environ.get("RUN_WITHOUT_TESTOPS", "False").lower() == "true":
# Запуск без TestOps
return ImitatorCmdGenerator(self._test_data_name, self._stand_name, self._duration_m)
else:
self._uploader = ImitatorDataUploader(
self._stand_client, self._test_data_id, self._test_data_name, self._tu_id
)
self._data_path = self._uploader.remote_temp_dir_path
return ImitatorCmdGenerator(self._data_path, self._stand_name, self._duration_m)
except Exception as error:
error_msg = "[SETUP] [ERROR] Ошибка при выборе варианта генерации команды запуска имитатора"
logger.exception(error_msg)
raise RuntimeError(error_msg) from error
def _init_clients(self) -> None:
"""
Создает экземпляры необходимых для запуска клиентов
"""
try:
self._stand_client = SubprocessClient(self._username, self._server_ip)
self._infra_client = SubprocessClient(self._username, Im_const.REDIS_STAND_ADDRESS)
self._clickhouse_manager = ClickHouseManager(
self._stand_client, self._infra_client, self._configuration_file_name
)
self._docker_manager = DockerContainerManager(self._stand_client)
self._redis_cleaner = RedisCleaner(self._infra_client, self._stand_name)
except Exception as error:
error_msg = "[SETUP] [ERROR] Ошибка инициализации клиентов"
logger.exception(error_msg)
raise RuntimeError(error_msg) from error