Загрузка данных


import logging
import os
from datetime import datetime, timedelta
from pathlib import PurePosixPath
from typing import List

import allure

from constants.architecture_constants import ClickhouseConstants as CH_const
from constants.architecture_constants import EnvKeyConstants
from constants.architecture_constants import ImitatorConstants as Im_const
from infra.path_generator import ImitatorDataPathGenerator

logger = logging.getLogger(__name__)


class BaseCmdGenerator:
    def __init__(self, username: str, host: str) -> None:
        self._username = username
        self._host = host
        self._ssh_key_name: str = os.environ.get(EnvKeyConstants.SSH_KEY_NAME)
        self._scp_cmd: str = ""
        self._choose_scp_cmd_by_os()

    def _choose_scp_cmd_by_os(self):
        if os.name == Im_const.OS_NAME_WIN:
            scp_cmd = f"scp -i {self._ssh_key_name}"
        else:
            scp_cmd = "scp"
        self._scp_cmd = scp_cmd


class TimeProcessor:
    """
    Класс для получения времени запуска и остановки имитатора
    Для получения времени:
    from utils.flag_generator import TimeProcessor
    time_processor = TimeProcessor(test_duration_m)
    start_time = time_processor.formatted_start_time
    stop_time = time_processor.formatted_stop_time
    start_time_dt = time_processor.start_time  # datetime объект для расчётов
    """

    def __init__(self, duration_m: float) -> None:
        self._duration_m = duration_m
        self._current_time: datetime = datetime.now()
        self._start_time: datetime = self._add_time_delta(seconds=Im_const.IMITATOR_START_DELAY_S)
        self._formatted_start_time: str = self._get_formatted_start_time()
        self._formatted_stop_time: str = self._get_formatted_stop_time()

    @property
    def start_time(self) -> datetime:
        """Возвращает время старта имитатора как datetime объект для расчётов интервалов"""
        return self._start_time

    @property
    def formatted_start_time(self) -> str:
        return self._formatted_start_time

    @property
    def formatted_stop_time(self) -> str:
        return self._formatted_stop_time

    def _add_time_delta(self, minutes: float = 0, seconds: int = 0) -> datetime:
        """
        :param minutes: время в минутах
        :param seconds: время в секундах
        :return: текущее время + добавленное время
        """
        try:
            delta = timedelta(minutes=minutes, seconds=seconds)
            current_time_with_delta = self._current_time + delta
            return current_time_with_delta

        except (ValueError, TypeError):
            logger.exception("[ERROR] Ошибка при добавлении времени")
            raise

    @staticmethod
    def get_formatted_time(time) -> str:
        """
        :param time: время в формате datetime
        :return: время в формате строки, которую принимает имитатор
        """
        try:
            formatted_time = time.strftime(Im_const.IMITATOR_TIME_FORMAT)
            return formatted_time

        except (ValueError, TypeError):
            logger.exception("[ERROR] Ошибка при форматировании времени")
            raise

    def _get_formatted_start_time(self) -> str:
        """
        :return: время запуска имитатора строкой
        """
        formatted_start_time = self.get_formatted_time(self._start_time)
        return formatted_start_time

    def _get_formatted_stop_time(self) -> str:
        """
        Добавляет время прогона ко времени отсрочки пуска имитатора
        :return: время остановки имитатора строкой
        """

        stop_time = self._add_time_delta(minutes=self._duration_m, seconds=Im_const.IMITATOR_START_DELAY_S)
        formatted_stop_time = self.get_formatted_time(stop_time)
        return formatted_stop_time


class ImitatorCmdGenerator:
    """
    Класс для получения команды запуска имитатора
    Большая часть флагов формируется из дефолтных значений.
    Для получения флагов:
    from utils.imitator_cmd_generator import ImitatorCmdGenerator
    cmd_generator = ImitatorCmdGenerator(path_to_test_data, host, test_duration_m)
    final_cmd = cmd_generator.generate_final_imitator_cmd()
    start_time_dt = cmd_generator.start_time  # datetime объект для расчётов интервалов утечек
    """

    def __init__(self, sandbox_path: str, stand_name: str, duration_m: float) -> None:
        self._sandbox_path = sandbox_path
        self._stand_name = stand_name
        self._duration_m = duration_m
        self._time_processor = TimeProcessor(self._duration_m)
        self._source_type: str = Im_const.SOURCE_TYPE_DEF_VALUE
        self._speed: int = Im_const.SPEED_DEF_VALUE
        self._opcua: str = os.environ.get(EnvKeyConstants.OPC_URL)
        self._ns: int = Im_const.NS_DEF_VALUE
        self.final_cmd: str = ""
        self._get_other_flags_values()
        self._generate_flags()

    @property
    def start_time(self) -> datetime:
        """Возвращает время старта имитатора как datetime объект для расчётов интервалов утечек"""
        return self._time_processor.start_time

    def _generate_inner_test_data_path(self, sub_path: str) -> str:
        """
        Добавляет название файла / директорию к пути хранения тестовых данных
        :param sub_path: название файла / директорию
        :return: полный путь к файлу / директории
        """
        try:
            result_path = PurePosixPath(self._sandbox_path) / sub_path
            return str(result_path)

        except (ValueError, TypeError, OSError):
            logger.exception(f"[ERROR] Ошибка при создании пути к данным прогона. Данные: {sub_path}")
            raise

    def _generate_sandbox_paths(self) -> tuple[str, str, str]:
        """
        :return: кортеж: пути к директории data и к файлам внутри директории с данными для запуска имитатора
        """
        # Формирует пути к папке с логами датчиков и файлам
        path_to_data = self._generate_inner_test_data_path(Im_const.SANDBOX_DATA)
        path_to_rules = self._generate_inner_test_data_path(Im_const.SANDBOX_RULES)
        path_to_tags = self._generate_inner_test_data_path(Im_const.SANDBOX_TAGS)

        return path_to_data, path_to_rules, path_to_tags

    def _get_target_host(self) -> str:
        """
        Получает target для флага из списка стендов
        :return: target для флага
        """
        try:
            return Im_const.HOST_MAP.get(self._stand_name, {}).get(Im_const.IMITATOR_KEY_NAME)

        except KeyError:
            logger.exception(f"[ERROR] Не удалось получить target для стенда: {self._stand_name}")
            raise

    def _get_other_flags_values(self):
        """
        Метод получения значений флагов
        """
        try:
            self._path_to_data, self._path_to_rules, self._path_to_tags = self._generate_sandbox_paths()
            self._start_time = self._time_processor.formatted_start_time
            self._stop_time = self._time_processor.formatted_stop_time
            self._target = self._get_target_host()

        except (AttributeError, ValueError):
            logger.exception("[ERROR] Ошибка при получении флагов")
            raise

    def _generate_flags(self) -> None:
        """
        Метод получения флагов
        :return: строку с флагами для запуска имитатора
        """
        try:
            # Формирует флаги
            command_parts = [
                f'  --rules="{self._path_to_rules}"',
                f'--source="{self._path_to_data}"',
                f'--sourceType="{self._source_type}"',
                f'--sourceTagTypes="{self._path_to_tags}"',
                f'--startTime="{self._start_time}"',
                f'--stopTime="{self._stop_time}"',
                f'--speed={self._speed}',
                f'--opcua="{self._opcua}"',
                f'--ns={self._ns}',
            ]

            if self._target:
                command_parts.append(f'--target="{self._target}"')

            self._flags = " ".join(command_parts)

        except (ValueError, TypeError):
            logger.exception("[ERROR] Ошибка при создании итоговой версии флагов")
            raise

    def generate_final_imitator_cmd(self) -> str:
        """
        Собирает команду для запуска имитатора
        :return: финальная команда запуска имитатора
        """
        try:
            with allure.step(f"Запуск имитатора данных с флагами {self._flags}"):
                self.final_cmd = Im_const.IMITATOR_RUN_CMD + self._flags
                return self.final_cmd

        except (ValueError, TypeError):
            logger.exception("[ERROR] Ошибка при создании итоговой команды для запуска")
            raise


class UploadImitatorDataCmdGenerator(BaseCmdGenerator):
    def __init__(self, username: str, host: str, path_generator: ImitatorDataPathGenerator) -> None:
        super().__init__(username=username, host=host)
        # Список ожидаемых файлов в архиве
        self.expected_files: list = [Im_const.SANDBOX_RULES]
        self._ssh_key_name: str = os.environ.get(EnvKeyConstants.SSH_KEY_NAME)
        self._os_is_windows: bool = os.name == Im_const.OS_NAME_WIN
        self._path_generator = path_generator
        self._remote_temp_dir_path = self._path_generator.remote_temp_dir_path
        self._tar_package_name = self._path_generator.tar_package_name
        # Путь к архиву во временной директории на удаленном сервере
        self._full_remote_tar_path = self._path_generator.generate_full_remote_tar_path()

    def generate_check_remote_data_cmd(self) -> str:
        """
        Создает команду проверки существования директории с данными и сопутствующих файлов
        :return: команда для выполнения в консоли
        """
        expected_dir = Im_const.SANDBOX_DATA

        # Файлы для проверки после распаковки и копирования tags.txt
        files_to_check = [Im_const.SANDBOX_RULES, Im_const.SANDBOX_TAGS]

        check_dir_part = f"[ -d '{self._remote_temp_dir_path}/{expected_dir}' ]"
        check_parts = [check_dir_part]
        for file in files_to_check:
            check_parts.append(f"[ -f '{self._remote_temp_dir_path}/{file}' ]")

        condition = " && ".join(check_parts)
        check_cmd = f"if {condition}; then echo {Im_const.CMD_STATUS_OK}; else echo {Im_const.CMD_STATUS_FAIL}; fi"
        return check_cmd

    def generate_create_dir_cmd(self) -> str:
        """
        Генерирует команду создания временной директории
        """
        return f"mkdir -p {self._remote_temp_dir_path}"

    def generate_delete_dir_cmd(self) -> str:
        """
        Генерирует команду удаления временной директории
        """
        return f"rm -rf {self._remote_temp_dir_path}"

    def generate_copy_tar_to_remote_cmd(self) -> str:
        """
        Генерирует команду копирования данных на удаленный сервер
        """

        return f"{self._scp_cmd} {self._tar_package_name} {self._username}@{self._host}:{self._remote_temp_dir_path}/"

    def generate_unpack_tar_cmd(self) -> str:
        """
        Генерирует команду распаковки архива на удаленном сервере
        """
        return f"tar -xvzf {self._full_remote_tar_path} -C {self._remote_temp_dir_path}"

    def generate_check_tar_cmd(self) -> str:
        """
        Генерирует команду проверки архива на удаленном сервере
        """
        return f"tar -tzf {self._full_remote_tar_path}"

    def generate_copy_tags_cmd(self, tu_id: int) -> str:
        """
        Генерирует команду для копирования tags.txt с сервера во временную директорию текущего набора данных
        Файл tn{tu_id}_tags.txt копируется как tags.txt
        """
        source_path = f"{Im_const.CONFIG_PATH}/tn{tu_id}_tags.txt"
        target_path = f"{self._remote_temp_dir_path}/{Im_const.SANDBOX_TAGS}"
        return f"cp {source_path} {target_path}"


class ClickHouseCmdGenerator(BaseCmdGenerator):
    def __init__(self, username: str, host: str, configuration_file_name: str) -> None:
        super().__init__(username=username, host=host)
        self._configuration_file_name = configuration_file_name

    def generate_scp_config_file_cmd(self) -> str:
        """
        Генерирует команду копирования файла конфигурации со стенда
        """
        path_to_remote_configuration = self._generate_path_to_remote_configuration()
        return f"{self._scp_cmd} {self._username}@{self._host}:{path_to_remote_configuration} ."

    def generate_check_sensor_data_click_cmd(self, evo_id_pairs: List[tuple], table_name: str) -> str:
        """
        Генерирует команду проверки данных в ClickHouse по паре значений objectId и parameterId
        """
        sql_evo_id_pairs = self._generate_sql_evo_id_pairs(evo_id_pairs)
        return (
            f"echo \'SELECT COUNT(*) FROM {table_name} WHERE "
            f"({CH_const.OBJECT_ID_KEY_NAME}, {CH_const.PARAMETER_ID_KEY_NAME}) IN ({sql_evo_id_pairs})\' "
            f"| docker exec -i {CH_const.NAME_CONTAINER} clickhouse-client"
        )

    def generate_delete_clickhouse_keys_cmd(self, evo_id_pairs: List[tuple], table_name: str) -> str:
        """
        Генерирует команду удаления данных в ClickHouse по парам значений objectId и parameterId
        """
        sql_evo_id_pairs = self._generate_sql_evo_id_pairs(evo_id_pairs)
        return (
            f"echo \'DELETE FROM {table_name} WHERE "
            f"({CH_const.OBJECT_ID_KEY_NAME}, {CH_const.PARAMETER_ID_KEY_NAME}) IN ({sql_evo_id_pairs})\' "
            f"| docker exec -i {CH_const.NAME_CONTAINER} clickhouse-client"
        )

    @staticmethod
    def _generate_sql_evo_id_pairs(evo_id_pairs: List[tuple]) -> str:
        """
        Создает строку из списка пар значений evoObjectId и evoParameterId
        """
        return ", ".join("({}, {})".format(object_id, param_id) for object_id, param_id in evo_id_pairs)

    def _generate_path_to_remote_configuration(self) -> PurePosixPath:
        """
        Создает путь к файлу конфигурации конкретного стенда
        """
        return PurePosixPath(CH_const.CONFIG_PATH) / self._configuration_file_name