Загрузка данных


fh[

import os


class ImitatorConstants:
    TEST_SETTINGS_KEY_NAME: str = "test_settings"
    IMITATOR_FLAGS_KEY_NAME: str = "imitator_flags"
    IMITATOR_TIME_FORMAT: str = "%Y%m%dT%H%M%S"
    IMITATOR_START_DELAY_S: int = 100
    IMITATOR_FINISH_DELAY_MINUTE: float = 2.0
    IMITATOR_CHECK_CMD: str = "pgrep -f Playground"
    IMITATOR_KILL_CMD: str = "pkill -f Playground"
    IMITATOR_PATH = "/data/imitator/lds-flow-playground-csv-latest"
    IMITATOR_RUN_CMD: str = f"dotnet {IMITATOR_PATH}/TN.LDS.Flow.Playground.Application.dll"
    IMITATOR_LOG_FILE_NAME: str = "imitator.log"
    IMITATOR_KEY_NAME: str = "imitator_key"
    SERVER_IP_KEY_NAME: str = "server_ip"
    SANDBOX_PATH: str = "Sandbox_path"  # Удалить при переработке json_config_model.py
    SANDBOX_DATA: str = "data"
    SANDBOX_RULES: str = "rules.txt"
    SANDBOX_TAGS: str = "tags.txt"
    STAND_ENV_NAMING: str = os.environ.get("STAND_NAME")[:-1]
    CONFIG_PATH: str = f"/data/{STAND_ENV_NAMING}/configs"
    SIGNAL_UNIT_CONVERSION_RULES_FILE_NAME: str = "signal_unit_conversion_rules.json"
    SIGNAL_UNIT_CONVERSION_RULES_BACKUP_DIR: str = "original_conversion_rules"
    SOURCE_TYPE_DEF_VALUE: str = "inflow"
    SPEED_DEF_VALUE: int = 1
    NS_DEF_VALUE: int = 2
    KAFKA_OFFSET_EARLIEST: str = "earliest"
    KAFKA_POLL_TIMEOUT_S: float = 1.0
    KAFKA_SESSION_TIMEOUT_MS: int = 10000
    TEST_ID_KEY: str = "test_id"
    AUTOTEST_DATA_PATH: str = "/data/imitator/autotest_data"
    POPEN_WAIT_TIMOUT_S: int = 5
    LONG_PROCESS_TIMEOUT_S: int = 20
    CMD_STATUS_OK: str = "OK"
    CMD_STATUS_FAIL: str = "FAIL"
    REDIS_STAND_ADDRESS: str = "10.7.49.210"
    CORE_START_DELAY_S: int = 5
    ENCODING_UTF_8: str = "utf-8"
    ENCODING_UTF_8_SIG: str = "utf-8-sig"
    ENCODING_LATIN_1: str = "latin-1"
    WIN_ENCODING_CP866: str = "cp866"  # Нужна только для запуска под WIN
    WIN_ENCODING_CP1251: str = "cp1251"  # Нужна только для запуска под WIN
    OS_NAME_WIN: str = 'nt'
    DEFAULT_ENCODINGS = [ENCODING_UTF_8_SIG, ENCODING_UTF_8, WIN_ENCODING_CP866, WIN_ENCODING_CP1251, ENCODING_LATIN_1]

    HOST_MAP = {
        "dev1": {IMITATOR_KEY_NAME: "DEV1_", SERVER_IP_KEY_NAME: "10.7.49.37"},
        "dev2": {IMITATOR_KEY_NAME: "DEV2_", SERVER_IP_KEY_NAME: "10.7.49.38"},
        "dev3": {IMITATOR_KEY_NAME: "DEV3_", SERVER_IP_KEY_NAME: "10.7.49.205"},
        "test1": {IMITATOR_KEY_NAME: "TEST1_", SERVER_IP_KEY_NAME: "10.7.49.206"},
        "test2": {IMITATOR_KEY_NAME: "TEST2_", SERVER_IP_KEY_NAME: "10.7.49.207"},
        "test3": {IMITATOR_KEY_NAME: "TEST3_", SERVER_IP_KEY_NAME: "10.7.49.208"},
        "test4": {IMITATOR_KEY_NAME: "TEST4_", SERVER_IP_KEY_NAME: "10.7.49.209"},
    }


class ClickhouseConstants(ImitatorConstants):
    CH_TABLE_NAMES: list = ["lds.records", "lds.records_lastvalue"]
    EVO_OBJECT_ID_KEY_NAME: str = "evoObjectId"
    EVO_PARAMETER_ID_KEY_NAME: str = "evoParameterId"
    OBJECT_ID_KEY_NAME: str = "objectId"
    PARAMETER_ID_KEY_NAME: str = "parameterId"
    EVO_ID_PAIRS_CHUNK_SIZE: int = 450
    NAME_CONTAINER: str = "clickhouse-2"


class DockerConstants:
    HOSTNAME_CMD: str = "hostname"
    STOP_CMD: str = "docker stop"
    START_CMD: str = "docker start"
    CHECK_STATUS_CMD: str = "docker inspect -f '{{.State.Status}}'"
    RUNNING_STATUS: str = "running"
    EXITED_STATUS: str = "exited"
    CORE_CONTAINERS_GROUP: list = ["lds-core-node1", "lds-core-node2", "lds-core-node3"]
    LB_CONTAINERS_GROUP: list = ["lds-layer-builder-node1", "lds-layer-builder-node2", "lds-layer-builder-node3"]
    JOURNAL_CONTAINERS_GROUP: list = ["lds-journals-node1", "lds-journals-node2", "lds-journals-node3"]
    WEB_APP_CONTAINERS_GROUP: list = ["lds-web-app-node1", "lds-web-app-node2", "lds-web-app-node3"]
    API_GW_CONTAINERS_GROUP: list = ["lds-api-gw-node1", "lds-api-gw-node2", "lds-api-gw-node3"]
    REPORTS_CONTAINERS_GROUP: list = ["lds-reports-node1", "lds-reports-node2", "lds-reports-node3"]


class RedisConstants:
    LB_REDIS_KEY: str = "lds-layer-builder"
    CORE_REDIS_KEY: str = "lds-core"
    REDIS_KEY_FIND_CMD: str = "docker exec -i redis-redis-01-1-1 redis-cli KEYS"
    REDIS_KEY_DEL_CMD: str = "| xargs -r docker exec -i redis-redis-01-1-1 redis-cli DEL"


class KeycloakClientConstants:
    TOKEN_LEEWAY: int = 30
    GRANT_TYPE: str = "password"
    KEYCLOAK_HEADERS: dict = {"Content-Type": "application/x-www-form-urlencoded"}
    TOKEN_KEY: str = "access_token"
    ISSUED_AT_KEY: str = "issued_at"
    EXPIRES_IN_KEY: str = "expires_in"


class TestOpsConstants:
    TESTOPS_UPLOAD_ENDPOINT: str = "/upload"
    TESTOPS_UPLOAD_ERROR_MSG: str = "Ошибка при загрузке файлов allure отчета"
    TESTOPS_UPLOAD_RESPONSE_MSG_KEY: str = "message"
    TESTOPS_UPLOAD_FILES_KEY: str = "files"
    POST_METHOD: str = "post"
    ALLURE_RESULTS_DIR_NAME: str = "allure-results"
    GZIP_FILE_SIGNATURE: bytes = b'\x1f\x8b'


class HTTPClientConstants:
    GET_METHOD: str = "get"
    POST_METHOD: str = "post"
    TESTOPS_UPLOAD_ENDPOINT: str = "/upload"
    TESTOPS_ATTACHMENTS_LIST_ENDPOINT: str = "/test_cases/{test_case_id}/attachments"
    TESTOPS_LOAD_ATTACHMENT_ENDPOINT: str = "/test_cases/{test_case_id}/attachments/{attachment_id}?download=1"
    TESTOPS_ATTACHMENTS_KEY: str = "items"
    TESTOPS_ATTACHMENT_FILENAME_KEY: str = "original_filename"
    TESTOPS_ATTACHMENT_ID_KEY: str = "id"
    TEST_ID_KEY: str = "test_id"
    IMITATOR_RUN_DATA_FILENAME: str = "imitator_run_data.tar.gz"  # Название архива данных для прогона


class WebSocketClientConstants:
    RS: bytes = b'\x1E'  # ASCII Record Separator
    HANDSHAKE_WAITING: float | int = 5.0
    HANDSHAKE_MESSAGE: str = "{\"protocol\":\"messagepack\",\"version\":1}"
    WS_HUBS: str = "/hubs/ldsClientHub"
    START_INVOCATION_ID: str = 1
    DEFAULT_RECONNECT_INTERVAL: float | int = 5.0
    PING_INTERVAL: int = 3
    PING_TIMEOUT: int = 5
    CLOSE_TIMEOUT: int = 30
    DEFAULT_SIGNALR_MESSAGE_TYPE: int = 1  # invocation
    STREAM_INVOCATION_MESSAGE_TYPE: int = 4  # StreamInvocation
    STREAM_ITEM_MESSAGE_TYPE: int = 2  # StreamItem
    COMPLETION_MESSAGE_TYPE: int = 3  # Completion
    # Текст ошибки Completion при неуспешном streaming (SignalR CompletionWithDetail)
    COMPLETION_ERROR_MESSAGE_INDEX: int = 4
    DEFAULT_SIGNALR_MAP_HEADERS: dict = {}
    EVENT_TYPE_INDEX = 3
    INVOCATION_ID_INDEX = 2
    SERVICE_NAME = "web-app"
    COMPONENT = "lds"
    ROOT_DOMAIN = "tn.tngrp.ru"
    FILTERING_TIMEOUT: int | float = 10.0
    ZONE_INFO: str = 'Europe/Moscow'


class MockConstants:
    MOCK_DURATION: int = 60
    MOCK_TEST_DATA_ID: int = 1
    MOCK_TEST_DATA_NAME: str = "mock.tar.gz"


class EnvKeyConstants:
    CONNECTION_HOST: str = "CONNECTION_HOST"
    KEYCLOAK_URL: str = "KEYCLOAK_URL"
    KEYCLOAK_CLIENT_ID: str = "KEYCLOAK_CLIENT_ID"
    KEYCLOAK_CLIENT_SECRET: str = "KEYCLOAK_CLIENT_SECRET"
    KEYCLOAK_USERNAME: str = "KEYCLOAK_USERNAME"
    KEYCLOAK_PASSWORD: str = "KEYCLOAK_PASSWORD"
    TESTOPS_BASE_URL: str = "TESTOPS_BASE_URL"
    SSH_KEY_NAME: str = "SSH_KEY_NAME"
    SSH_USER_DEV: str = "SSH_USER_DEV"
    STAND_NAME: str = "STAND_NAME"
    DATA_PATH: str = "DATA_PATH"
    OPC_URL: str = "OPC_URL"
    TU_ID: str = "TU_ID"




















енам
from enum import Enum, IntEnum, IntFlag
from typing import Mapping


class BaseStrEnum(Enum):
    def __str__(self) -> str:
        return f"{self.name} ({self.value})"


class BaseStrIntFlag(IntFlag):
    def __str__(self) -> str:
        raw_value = int(self)
        if raw_value == 0:
            return "0"

        active_flags = [f"{flag.name} ({flag.value})" for flag in type(self) if flag.value and flag & self == flag]
        if active_flags:
            return f"{', '.join(active_flags)}"

        return str(raw_value)


class BaseReasonEnum(IntFlag):
    """
    .report_text - название на русском языке.
    Вывод в формате report_text(value)
    """

    def __new__(cls, value: int, report_text: str):
        member = int.__new__(cls, value)
        member._value_ = value
        member.report_text = report_text
        return member

    def __str__(self) -> str:
        raw_value = int(self)
        if raw_value == 0:
            return "0"

        active_flags = [
            f"{flag.report_text} ({flag.value})" for flag in type(self) if flag.value and flag & self == flag
        ]
        if active_flags:
            return f"{', '.join(active_flags)}"

        return str(raw_value)

    @classmethod
    def report_text_by_value(cls, status_value: int) -> str | None:
        """Текст по числовому значению статуса"""
        try:
            return cls(status_value).report_text
        except ValueError:
            return None


class TU(Enum):
    YAROSLAVL_MOSCOW = (1, "Ярославль - Москва", "volga.json")
    TIKHORETSK_NOVOROSSIYSK_2 = (2, "Тихорецк-Новороссийск-2", "tn2.json")
    TIKHORETSK_NOVOROSSIYSK_3 = (3, "Тихорецк-Новороссийск-3", "tn3.json")
    RODIONOVSKAYA_TIKHORETSKAYA = (4, "Родионовская–Тихорецкая", "lt3_rt.json")
    TIKHORETSKAYA_GRUSHEVAYA = (5, "Тихорецкая-6-Грушовая", "tn4_t6g.json")

    def __init__(self, tu_id: int, description: str, file_name: str) -> None:
        self.id = tu_id
        self.description = description
        self.file_name = file_name

    def __str__(self):
        return f"{self.id} - {self.description}"

    @classmethod
    def get_file_name_by_id(cls, target_id: int) -> str:
        for item in cls:
            if item.id == target_id:
                return item.file_name
        raise ValueError(f"ТУ с id = {target_id} не найден")


class ReplyStatus(Enum):
    OK = 200
    BAD_REQUEST = 400
    UNAUTHORIZED = 401
    FORBIDDEN = 403
    NOT_FOUND = 404
    REQUEST_TIMEOUT = 408
    CONFLICT = 409
    PRECONDITION_FAILED = 412
    RANGE_NOT_SATISFIABLE = 416
    TOO_MANY_REQUESTS = 429
    INTERNAL_SERVER_ERROR = 500
    NOT_IMPLEMENTED = 501
    SERVICE_UNAVAILABLE = 503
    GATEWAY_TIMEOUT = 504
    UNKNOWN_ERROR = 520


class ExportedDataType(IntEnum):
    """
    Тип экспортируемых данных
    """

    STATIONARY_STATUS_REPORT = 6
    LDS_STATUS_REPORT = 5
    LEAKS_REPORT = 4
    REJECTED_REPORT = 7

    def to_download_name(self) -> str:
        """Строковый тип для DownloadExportedDataRequest.exportedDataType"""
        return _EXPORTED_DATA_TYPE_DOWNLOAD_NAMES[self]


_EXPORTED_DATA_TYPE_DOWNLOAD_NAMES = {
    ExportedDataType.STATIONARY_STATUS_REPORT: "MnStateReport",
    ExportedDataType.LDS_STATUS_REPORT: "LdsStateReport",
    ExportedDataType.LEAKS_REPORT: "LeaksReport",
    ExportedDataType.REJECTED_REPORT: "RejectedSignalsReport",
}


class ExportStatus(IntEnum):
    """Статус формирования отчёта в ReportDataExportedNotification.replyContent.exportStatus."""

    NOT_READY = 0
    DONE = 1


class StationaryStatus(BaseStrEnum):
    UNSTATIONARY = (1, 'Нестационарный режим работы МТ')  # Нестационарный режим
    STATIONARY = (2, 'Стационарный режим работы МТ')  # Стационарный режим
    STOPPED = (3, 'МТ в режиме остановленной перекачки')  # Режим остановкленной перекачки

    def __new__(cls, value: int, report_text: str) -> "StationaryStatus":
        member = object.__new__(cls)
        member._value_ = value
        member.report_text = report_text
        return member

    @classmethod
    def report_text_by_value(cls, status_value: int) -> str | None:
        """Текст режима СОУ для отчёта по числовому значению статуса"""
        try:
            return cls(status_value).report_text
        except ValueError:
            return None


class LeakStatus(BaseStrEnum):
    CONFIRMED = 2
    WAITING = 1
    POSSIBLE = 3


class LeakLocationStatus(BaseStrEnum):
    NODATA = 1  # нет данных
    LEFT_FROM_PUMP_STATION = 2  # Слева от МНС
    RIGHT_FROM_PUMP_STATION = 3  # Справа от МНС
    INSIDE_PUMP_STATION = 4  # Внутри МНС
    INSIDE_NPS = 5  # Внутри НПС при неработающей/отсутствующей МНС


class FieldName(Enum):
    SECTION_TYPE = "sectionType"
    SIGNAL_TYPE = "signalType"


class FilterCriteriaType(Enum):
    ONE_OF = "oneOf"
    ALL_OF = "allOf"


class FilterCriteriaValue(Enum):
    MASK = "mask"
    MASK_REASON = "maskReason"
    LEAK = "leak"
    LEAK_COORDINATE = "leakCoordinate"
    PUMPING_STATUS = "pumpingStatus"
    FREE_FLOW = "freeFlow"
    ACKNOWLEDGE = "acknowledge"
    LEAK_TIME = "leakTime"
    LEAK_VOLUME = "leakVolume"
    LDS_STATUS = "ldsStatus"
    CONTROLLED_SITES = "controlledSites"
    LINEAR_PARTS = "linearParts"
    SERVER_DOWN = "serverDown"
    TIME_SYNCHRONIZATION_DISABLE = "timeSynchronizationDisable"
    FREE_FLOW_START_COORDINATE = "freeFlowStartCoordinate"


class SortingParam(Enum):
    OBJECT_NAME = "objectName"
    ADDRESS = "address"


class SortingType(Enum):
    ASCENDING = "ascending"
    DESCENDING = "descending"


class Direction(Enum):
    """Направление прокрутки"""

    PREV = 1
    NEXT = 2
    FIRST = 3
    LAST = 4


class LdsStatus(BaseStrEnum):
    """
    Режим работы СОУ.
    report_text - значение колонки 'Режим работы СОУ' в xlsx-отчёте об утечках.
    """

    FAULTY = (1, "СОУ неисправна")
    INITIALIZATION = (2, "СОУ в инициализации")
    DEGRADATION = (3, "СОУ в ухудшенных характеристиках")
    SERVICEABLE = (4, "СОУ исправна")

    def __new__(cls, value: int, report_text: str) -> "LdsStatus":
        member = object.__new__(cls)
        member._value_ = value
        member.report_text = report_text
        return member

    @classmethod
    def report_text_by_value(cls, status_value: int) -> str | None:
        """Текст режима СОУ для отчёта по числовому значению статуса"""
        try:
            return cls(status_value).report_text
        except ValueError:
            return None


class ConfirmationStatus(BaseStrEnum):
    FAULTY = 0  # Неисправность
    AWAITING = 1  # Предварительная
    NOT_CONFIRMED = 2  # Не подтверждена
    CONFIRMED = 3  # Подтверждена
    CONFIRMED_AND_LEAK_CLOSED = 4  # Завершена


class ReservedType(BaseStrEnum):
    """Алгоритмы СОУ"""

    FAULTY = 0  # Неисправность
    STOP = 1  # Дифференциальный
    STATIONARY_FLOW = 2  # Стационарный
    UNSTATIONARY_FLOW = 3  # Модельный
    BALANCE_IN_NPS = 4  # Баланс внутри НПС
    CHANGED_IN_DECISION_MAKING = 5  # Стационарный + Изменено в АПР
    CREATED_IN_DECISION_MAKING = 6  # Создано в АПР


class MessageType(BaseStrIntFlag):
    AUTHENTICATION = 1  # Вход в систему
    REJECTION = 1 << 2  # Отбраковка сигналов
    LDS_STATUS = 1 << 3  # Режим работы СОУ
    INPUT_SIGNALS = 1 << 6  # Входные сигналы
    PUMPING_STATUS = 1 << 7  # Режим работы МТ
    MASKING_LDS = 1 << 8  # Маскирование СОУ
    FREE_FLOWS = 1 << 9  # Самотечное течение
    LEAKS = 1 << 10  # Утечка


class MessagePriority(BaseStrIntFlag):
    LOW = 1  # Прочее
    COMMON = 1 << 1  # Информационное
    MEDIUM = 1 << 2  # Значительное
    HIGH = 1 << 3  # Важное
    VERY_HIGH = 1 << 4  # Особой важности


class LdsStatusDegradation(BaseReasonEnum):
    """
    Причины режима работы СОУ: Ухудшение характеристик
    """

    LEAK_ON_ADJACENT_DIAGNOSTIC_AREAS = (1 << 0, 'Утечка на соседнем диагностическом участке')
    ADDITIVE_INJECTORS_OPERATION = (1 << 1, 'Наличие ПТП')
    PIG_SENSOR_PASSAGE = (1 << 2, 'Наличие СОД')
    TRIGGERING_EMERGENCY_RESET = (1 << 3, 'Срабатывание аварийного сброса или предохранительных клапанов')
    STARTING_PUMPING_OUT_PUMPS = (1 << 4, 'Работа насосов откачки')
    EXCEEDING_DISTANCE_BETWEEN_SERVICEABLE_PRESSURE_SENSORS = (
        1 << 5,
        'Расстояние между ближайшими исправными СИ давления на пути перекачки более 50 км',
    )
    FAULTY_PRESSURE_SENSORS_AT_PUMP_STATION_NODES = (1 << 6, 'Отказ СИ давления на входе/выходе НПС')
    REJECTION_TEMPERATURE_SENSOR = (1 << 7, 'Отказ СИ температуры')
    REJECTION_VISCOSITY_SENSOR = (1 << 8, 'Отказ СИ вязкости')
    REJECTION_DENSITY_SENSOR = (1 << 9, 'Отказ СИ плотности')
    GRAVITY_SECTION_IN_PUMPING_MODE = (1 << 10, 'Наличие самотечного участка/участка с неполным сечением')
    ABSENCE_MIN_PRESSURE_SENSORS_REQUIRED_NUMBER = (1 << 11, 'Менее 4 исправных СИ давления на разных КП ЛЧ и НПС')
    EXCEEDING_DISTANCE_BETWEEN_FLOW_METERS = (
        1 << 12,
        'Расстояние между ближайшими исправными СИ расхода на пути перекачки более 200 км',
    )
    GRAVITY_SECTION_IN_STOPPED_PUMPING_MODE = (
        1 << 13,
        'Наличие самотечного участка/участка с неполным сечением в режиме остановленной перекачки',
    )


class LdsStatusFaulty(BaseReasonEnum):
    """
    Причины режима работы СОУ: Неисправность
    """

    NO_DATA_SOURCE_CONNECTION = (1 << 0, 'Потеря связи СОУ с СДКУ')
    ABSENCE_MIN_PRESSURE_SENSORS_REQUIRED_NUMBER = (1 << 1, 'Менее 4 КП с достоверными СИ давления')
    ABSENCE_MIN_FLOW_METERS_REQUIRED_NUMBER = (1 << 2, 'Недостоверность граничного СИ расхода')


class LdsStatusInitialization(BaseReasonEnum):
    """
    Причины режима работы СОУ: Инициализация
    """

    ACCUMULATION_DATA = (1 << 0, 'Накопление данных')
    EXITING_FAULTY_MODE = (1 << 1, 'Выход СОУ из режима «Неисправна»')
    COLD_START_OF_SERVERS = (1 << 2, 'Одновременный «холодный» запуск нескольких серверов СОУ')
    SWITCHING_SHUT_OFF_IN_STOPPED_PUMPING_MODE = (
        1 << 3,
        'Переключение запорной арматуры в режиме остановленной перекачки',
    )
    USER_ACTION = (1 << 4, 'По команде пользователя')


class StationaryReason(BaseStrEnum):
    """
    Причины режима работы МТ: Стационар для ЭФ Журнал
    """

    # Отклонения давления и расхода не превышают допустимых отклонений
    PRESSURE_AND_FLOW_MOVING_AVERAGES_MEET_CRITERIA = "Отклонения давления и расхода не превышают допустимых отклонений"


class UnStationaryReason(BaseStrIntFlag):
    """
    Причины режима работы МТ: Нестационар
    """

    # Пуск/остановка трубопровода; включение/отключение магистрального насоса; включение/отключение НПС
    CHANGING_EQUIPMENT_STATUS = 1 << 0
    # Начало/окончание работы насосов откачки емкостей на НПС и ЛЧ технологического участка
    CHANGING_WORKING_OF_PUMPING_OUT_PUMPS = 1 << 1
    # Изменение частоты вращения в ручном режиме и/или изменение уставки регулирования в автоматическом режиме
    # работы МНА с ЧРП
    CHANGING_MAIN_PUMPS_ROTATION_SPEED = 1 << 2
    CHANGING_BLOCK_VALVES_STATUS = 1 << 3  # Полное или частичное открытие/закрытие задвижки
    SWITCHING_TANKS = 1 << 4  # Переключение резервуаров
    CHANGING_ACCEPTANCE_OR_DELIVERY_STATE = 1 << 5  # Начало или прекращение приема/сдачи нефти/нефтепродуктов
    TRIGGERING_EMERGENCY_RESET_OR_PWSS_OPERATION = 1 << 6  # Задействование аварийного сброса
    SAFETY_VALVES_ACTUATION = 1 << 7  # Срабатывание предохранительных клапанов
    # Изменение уставки регулирования по давлению узлов регулирования давления, работающих в автоматическом режиме
    # управления
    CHANGING_PRESSURE_SETTING = 1 << 8
    # Изменение процента открытия/закрытия заслонки узлов регулирования давления, работающих в ручном режиме управления
    CHANGING_OPENING_PERCENTAGE_VALVE = 1 << 9
    CHANGING_ADDITIVE_INJECTOR_STATUS_OR_FLOW = 1 << 10  # Начало/окончание ввода ПТП или изменение расхода вводимой ПТП
    LEAK_END = 1 << 11  # Окончание утечки
    # Наличие сигнала статуса «Открывается»/»Закрывается» запорной арматуры (не в режиме имитации),
    # расположенной в точке, гидравлически связанной с рассматриваемым ДУ
    TO_OPEN_OR_TO_CLOSE_STATUS = 1 << 12
    # Нестационарный режим работы/отсутствие сигнала о режиме работы смежного ТУ, работающего
    # в единой гидравлической системе с защищаемым ТУ
    ADJACENT_TU = 1 << 13
    COLD_START = 1 << 14  # Одновременный «холодный» запуск нескольких серверов СОУ


class StoppedPumpingReason(BaseStrIntFlag):
    """
    Причины режима работы МТ: Остановленный
    """

    # На ДУ отсутствуют работающие НА, при этом показания СИ расхода не превышают 1 % от максимального значения
    # диапазона измерений всех СИ расхода на технологическом участке
    STOPPING_PUMPS = 1 << 0
    CUTOFF_AREA = 1 << 1  # Участок отсечен запорной арматурой от подкачек/откачек


class RejectionCriteria(IntFlag):
    """Критерии отбраковки сигналов criteriaNames"""

    QUALITY = 1 << 0  # qualityRejection
    RANGE = 1 << 1  # rangeRejection
    EMPTY = 1 << 2  # emptyRejection
    TIME = 1 << 3  # timeRejection
    CONSTANT_SIGNAL = 1 << 4  # constantSignalRejection
    DISCHARGE = 1 << 5  # dischargeRejection
    VTOR = 1 << 6  # VTORRejection
    NEARBY = 1 << 7  # nearbyRejection
    DIAGNOSTIC_INFO = 1 << 8  # diagnInfoRejection

    @property
    def backend_name(self) -> str:
        names = {
            "QUALITY": "qualityRejection",
            "RANGE": "rangeRejection",
            "EMPTY": "emptyRejection",
            "TIME": "timeRejection",
            "CONSTANT_SIGNAL": "constantSignalRejection",
            "DISCHARGE": "dischargeRejection",
            "SIGMA3": "sigma3Rejection",
            "VTOR": "VTORRejection",
            "NEARBY": "nearbyRejection",
            "DIAGNOSTIC_INFO": "diagnInfoRejection",
        }
        return names.get(self.name or "", str(int(self)))

    def __str__(self) -> str:
        raw_value = int(self)
        if raw_value == 0:
            return "0"

        active_flags = [flag.backend_name for flag in type(self) if flag.value and flag & self == flag]
        if active_flags:
            return f"{'|'.join(active_flags)} ({raw_value})"

        return str(raw_value)


class RejectionSensorTag(Enum):
    """
    Теги датчиков для тестов отбраковки (id, description=tag)
    Айди тегов подставляется на ходу из текущей версии конфы с сервера
    """

    KP_8_Pin = (0, "AK.CHTN.LU_TIHVEL.KP_8.SW_8-3.Pin")  # nearby_pressure_pin range_upper_pressure range_lower_pres
    NPS_TIH_5_Vmom = (0, "AK.CHTN.NPS_TIH_5.UZR_1.Vmom")  # diagnostic_info_flowrange_upper_flow range_lower_flow
    KP_8_Pout = (0, "AK.CHTN.LU_TIHVEL.KP_8.SW_8-3.Pout")  # nearby_pressure_pout
    KP_209_1_Pin = (0, "AK.CHTN.LU_VELKRIM.KP_209-1.SW_215-3-1.Pin")  # empty_pressure
    KP_7_Pin = (0, "AK.CHTN.LU_TIHVEL.KP_7.SW_6-3.Pin")  # vtor_pressure
    NPS_KRIM_P_Vmom = (0, "AK.CHTN.NPS_KRIM_P.UZR_1.Vmom")  # empty_flow quality_flow

    def __init__(self, sensor_id: int, description: str) -> None:
        self.id = sensor_id
        self.description = description

    @classmethod
    def update_ids_from_config(cls, sensor_ids_by_address: Mapping[str, int]) -> None:
        """
        Обновляет sensor_id по tag из конфигурации стенда.
        """
        missing_tags = []
        for sensor in cls:
            sensor_id = sensor_ids_by_address.get(sensor.description)
            if sensor_id is None:
                missing_tags.append(sensor.description)
                continue
            sensor.id = sensor_id

        if missing_tags:
            raise ValueError(f"Не найдены sensor_id для tags: {', '.join(missing_tags)}")

    def __str__(self):
        return f"{self.id} - {self.description}"


class UserActions(IntFlag):
    USER_LOGIN = 1  # Вход пользователя
    USER_EXIT = 1 << 1  # Выход пользователя
    FAILED_USER_LOGIN = 1 << 2  # Неуспешная попытка входа пользователя
    ALGORITHMS_REINITIALIZATION = 1 << 3  # Переинициализация алгоритмов
    SIGNAL_MASK_SIM = 1 << 4  # Маскирование и имитация входных сигналов
    LDS_MASKING = 1 << 5  # Маскирование СОУ
    EXPORT = 1 << 6  # Экспорт и выгрузки
    SETTINGS_CHANGE = 1 << 7  # Изменение настроек
    LEAK_ACK = 1 << 8  # Квитирование сообщения об утечке
    LEAK_REMOVE = 1 << 9  # Исключение неактивных утечек
    LDS_ADMIN = 1 << 10  # Администрирование СОУ
    PIG_CONTROL = 1 << 11  # Управление СОД


class SiteKpKp(Enum):
    """controlledSiteId, segmentId"""

    TIXORECZKAYA_NOVOVELICHKOVSKAYA = {'controlledSiteId': 6012, 'segmentId': 6013}
    NOVOVELICHKOVSKAYA_KRYMSKAYA = {'controlledSiteId': 6074, 'segmentId': 6075}
    KRYMSKAYA_GRUSHOVAYA = {'controlledSiteId': 6220, 'segmentId': 6221}
    BACKUP_ROUTE_BEJSUG = {'controlledSiteId': 6242, 'segmentId': 6243}
    BACKUP_ROUTE_PONURA = {'controlledSiteId': 6076, 'segmentId': 6077}
    BACKUP_ROUTE_KUBAN = {'controlledSiteId': 6088, 'segmentId': 6089}
    NPZ_AFIPSKIJ = {'controlledSiteId': 6244, 'segmentId': 6245}
    NPZ_ILINSKIJ = {'controlledSiteId': 6120, 'segmentId': 6121}


class SignalType(IntFlag):
    """Типы сигналов: режим МТ - 8, режим СОУ - 256, самотеки -16"""

    REGLU = 1 << 3
    REGSOU = 1 << 8
    GRAVITYPIPE = 1 << 4

    @property
    def backend_name(self) -> str:
        names = {
            "REGLU": "PumpingStatus",
            "REGSOU": "LdsStatus",
            "GRAVITYPIPE": "FreeFlow",
        }
        return names.get(self.name or "", str(int(self)))

    def __str__(self) -> str:
        raw_value = int(self)
        if raw_value == 0:
            return "0"

        active_flags = [flag.backend_name for flag in type(self) if flag.value and flag & self == flag]
        if active_flags:
            return f"{'|'.join(active_flags)} ({raw_value})"

        return str(raw_value)


class GravityPipe(Enum):
    expected_lds_status_gravity_true = (1, "Наличие самотека")
    expected_lds_status_gravity_false = (0, "Отсутствие самотека")

    def __init__(self, status_id: int, description: str) -> None:
        self.id = status_id
        self.description = description

    def __str__(self):
        return f"{self.id} - {self.description}"


class MeasureConversionRule(Enum):
    MPA_MEASURE = "MPA_MEASURE"
    KG_CM_MEASURE = "KG_CM_MEASURE"


















тест конст
"""
Общие константы для тестов.
"""

from constants.enums import StationaryStatus


class BaseTN3Constants:
    # ===== Константы для запросов журнала =====
    COLUMN_SELECTION_DEF = [
        'Time',
        'User',
        'MainPipeline',
        'TechnologicalSection',
        'TechnologicalObject',
        'ControlPoint',
        'Object',
        'SignalName',
        'Event',
        'Value',
        'MessageType',
        'Tag',
        'Status',
    ]

    # ===== Типы сигналов и объектов =====
    PRESSURE_SENSOR_OBJECT_TYPE = 2
    FLOWMETER_OBJECT_TYPE = 3
    PRESSURE_SIGNAL_TYPE = 1
    FLOW_SIGNAL_TYPE = 4

    # ===== Суффиксы адресов выходных сигналов =====
    ADDRESS_SUFFIX_ACK_LEAK = "AckLeak"
    ADDRESS_SUFFIX_LEAK = "Leak"
    ADDRESS_SUFFIX_MASK = "Mask"
    ADDRESS_SUFFIX_POINT_LEAK = "PointLeak"
    ADDRESS_SUFFIX_Q_LEAK = "QLeak"
    ADDRESS_SUFFIX_TIME_LEAK = "TimeLeak"
    ADDRESS_SUFFIX_PUMPING_STATUS = "RegLU"
    ADDRESS_SUFFIX_LDS_STATUS = "RegSOU"

    # ===== Ключи поиска =====
    LEAK_LINEAR_PART_ID_KEY = "id"
    CONTROLLED_SITE_ID_AND_SEGMENT_ID = "controlledSiteId"

    # ===== Общее количество участков КП-КП =====
    LIMIT_CONTROLLED_SITES = 500
    COUNT_CONTROLLED_SITES = 114

    # ===== Ожидаемые значения выходных сигналов =====
    OUTPUT_IS_ACK_LEAK = "1"
    OUTPUT_IS_LEAK = "1"
    OUTPUT_IS_NOT_LEAK = "0"
    OUTPUT_IS_NOT_MASK = "0"
    OUTPUT_IS_MASK = "1"

    MASS_KG = 3600  # Коэффициент массы, нужно умножить, чтобы получить объем в м3/час
    KGS_SM2 = 98066  # Коэффициент давления, нужно умножить, чтобы получить объем в кгс/см2
    ALLOWED_VOLUME_DIFF = 0.3  # Относительная погрешность по объему
    ALLOWED_DISTANCE_DIFF_METERS = 5000  # Погрешность координаты в метрах
    KM_TO_METERS = 1000  # Перевод в метры
    LEAK_START_INTERVAL = 2100  # Интервал от старта имитатора до первого обнаружения утечки - 35 минут по умолчанию
    LEAK_LOCATION_STATUS = 1

    # ===== Параметры выходных сигналов =====
    OUTPUT_TEST_DELAY = 120  # Задержка для теста выходных сигналов в секундах
    OUTPUT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"  # Формат времени для парсинга выходных сигналов

    # ===== Параметры маскирования =====
    IS_MASKED_TRUE = True
    IS_MASKED_FALSE = False

    # ===== Параметры имитации =====
    PRESSURE_IMITATION_RANGE = (1, 40)
    VOLUME_IMITATION_RANGE = (100, 2400)
    GOOD_QUALITY_VAL = 1

    # ===== Константы журнала =====
    JOURNAL_EVENT_MASK = "Установка признака маскирования"
    JOURNAL_EVENT_UNMASK = "Снятие признака маскирования"
    JOURNAL_SIGNAL_PRESSURE = "Значение давления"
    JOURNAL_SIGNAL_FLOW = "Расход"
    JOURNAL_MESSAGE_TYPE_USER_ACTIONS = "Действия пользователя"
    JOURNAL_STATUS_SUCCESS = "Успешно"
    JOURNAL_EXPECTED_MSG_COUNT_PER_SIGNAL = 2
    JOURNAL_MASK_PAGINATION_LIMIT = 10
    JOURNAL_EVENT_POSSIBLE_LEAK = "Возможна утечка"
    JOURNAL_EVENT_DETECTED_LEAK = "Утечка."
    JOURNAL_MESSAGE_TYPE_LEAKS = "Утечки"
    JOURNAL_EVENT_COMPLETED_LEAKS = "Утечка завершена"
    JOURNAL_EXPECTED_MASK_MSG_TOTAL = 4
    JOURNAL_MASK_EXPECTED_EVENTS = {"Установка признака маскирования", "Снятие признака маскирования"}
    JOURNAL_MASK_EXPECTED_SIGNALS = {"Значение давления", "Расход"}
    JOURNAL_PAGINATION_LIMIT = 10
    JOURNAL_PAGINATION_REJECT_LIMIT = 20
    JOURNAL_PAGINATION_STATUS_LIMIT = 120
    JOURNAL_STATUS_TOTAL_WAIT = 300  # Время установки режима в данных, в секундах
    JOURNAL_EVENT_LEAK_ACKNOWLEDGED = "Сообщение об утечке квитировано"
    JOURNAL_EVENT_LDS_INIT_ACCUM_DATA = "СОУ в инициализации (Накопление данных)"
    JOURNAL_EVENT_LDS_INIT_COLD_START = "СОУ в инициализации (Одновременный «холодный» запуск нескольких серверов СОУ)"
    JOURNAL_MESSAGE_TYPE_LDS_STATUS = "Режим работы СОУ"
    JOURNAL_MESSAGE_TYPE_REJECTION = "Отбраковка"
    JOURNAL_MESSAGE_EVENT_STATIONARY = (
        "Стационарный режим работы МТ (Отклонения давления и расхода не превышают допустимых отклонений)"
    )
    JOURNAL_MESSAGE_EVENT_NOT_STATIONARY = (
        "Нестационарный режим работы МТ (Одновременный «холодный» запуск нескольких серверов СОУ)"
    )
    JOURNAL_MESSAGE_EVENT_STOP = (
        "МТ в режиме остановленной перекачки (На ДУ отсутствуют работающие НА, "
        "при этом показания СИ расхода не превышают 1 % отмаксимального значения "
        "диапазона измерений всех СИ расхода на технологическом участке)"
    )
    SEC_PER_MIN = 60

    # ===== Параметры подтверждения =====
    IS_ACKNOWLEDGED_FALSE = False

    # ===== Параметры BalanceAlgorithmResults =====
    BALANCE_ALGORITHM_POLL_INTERVAL = 15  # Интервал опроса подписки в секундах
    BALANCE_ALGORITHM_TOTAL_WAIT = 300  # Общее время опроса в секундах
    DEBALANCE_TOLERANCE = 0.25  # Допустимое отклонение дебаланса от порога 30%

    # ===== Теги датчиков для маскирования и имитации =====
    PRESSURE_SENSOR_ADDRESS = "AK.CHTN.LU_TIHVEL.KP_8.SW_8-3.Pout"
    FLOWMETER_ADDRESS = "AK.CHTN.NPS_TIH_5.UZR_1.Vmom"
    SENSOR_IDS_BY_ADDRESS = {}

    # ===== Прочие константы =====
    BASIC_MESSAGE_TIMEOUT = 10.0  # Таймаут ожидания сообщений в секундах
    MASK_MESSAGE_TIMEOUT = 180.0  # Таймаут ожидания сообщений в секундах
    PRECISION = 3  # Точность округления для координат
    DIGITS_WITH_DOT_PATTERN = r'\d+(?:\.\d+)?'  # Регулярное выражение для поиска чисел с точкой
    DIAGNOSTIC_AREA_BASE_IDS = {
        "Т-Н-3.НПС-5 «Тихорецкая».УЗР СИКН ТН-3 - Т-Н-3.НПС-5 «Тихорецкая».УЗР вых": (9992054907, (9992054908,)),
        "Т-Н-3.НПС-5 «Тихорецкая».УЗР вых - Т-Н-3.УЗР НПС-3 «Нововеличковская».": (
            9992054908,
            (9992054907, 9992054909),
        ),
        "Т-Н-3.УЗР НПС-3 «Нововеличковская». - Т-Н-3.НПС-2 «Крымская».УЗР СИКН Т-К": (
            9992054909,
            (9992054908, 9992054910, 9992054911),
        ),
        "Т-Н-3.НПС-2 «Крымская».УЗР СИКН Т-К - Т-Н-3.НПС-2 «Крымская».УЗР вых": (9992054910, (9992054909, 9992054912)),
        "Н-К.КП-0.УЗР 0км - Н-К.КП-30.УЗР 30км": (9992054911, (9992054909, 9992054913, 9992054914)),
        "Т-Н-3.НПС-2 «Крымская».УЗР вых - Т-Н-3.НПС«Крымская».УЗР вых Камеры пуска": (
            9992054912,
            (9992054910, 9992054915),
        ),
        "Н-К.КП-30.УЗР 30км - ПСП Афипский СИКН 1015 УЗР": (9992054913, (9992054911,)),
        "Н-К.УП ИНПЗ.УЗР 4,3км - Н-К.ИНПЗ.УЗР СИКН 1019": (9992054914, (9992054911,)),
        "Т-Н-3.НПС«Крымская».УЗР вых Камеры пуска - Т-Н-3.«Грушовая».УЗР-700": (9992054915, (9992054912,)),
    }
    REPRESENTATIVE_DIAGNOSTIC_AREA_IDS = [2, 3]  # Список показательных ДУ для определения режима СОУ
    ZONE_INFO: str = "Europe/Moscow"
    SECONDS_PER_HOUR: int = 3600
    CRITERIA_NAMES_FIELD: str = 'criteriaNames'


class ExportReportConstants:
    """Константы для теста формирования отчёта об утечках"""

    # Максимальное ожидание уведомления о готовности отчёта
    NOTIFICATION_TIMEOUT_SECONDS: float = 60.0
    # Максимальное время ожидания появления отчёта в списке после уведомления
    LIST_POLL_TOTAL_WAIT_SECONDS: float = 10.0
    # Интервал между запросами getExportedFilesListRequest
    LIST_POLL_INTERVAL_SECONDS: float = 10.0
    # Таймаут получения ответа на скачивание
    DOWNLOAD_TIMEOUT_SECONDS: float = 60.0

    # ===== Имя файла отчёта =====
    LEAKS_REPORT_NAME_PART: str = "Отчет об утечках"  # подстрока в имени файла/отчёта
    XLSX_EXTENSION: str = ".xlsx"
    # Сигнатура zip-архива, используется для проверки формата файла по содержимому
    ZIP_SIGNATURE: bytes = b'PK\x03\x04'

    # ===== Формат даты/времени в отчёте =====
    REPORT_DATETIME_FORMAT: str = "%d.%m.%Y %H:%M:%S"
    # Регулярное выражение для извлечения двух дат из заголовка
    REPORT_HEADER_PERIOD_PATTERN: str = (
        r'Отчет об утечках с (?P<period_start>\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})'
        r' по (?P<period_end>\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})'
    )
    # Регулярное выражение для извлечения двух дат из названия файла
    REPORT_FILE_NAME_PERIOD_PATTERN: str = (
        r'^Отчет об утечках (?P<tu>.+?) '
        r'(?P<period_start>\d{2}\.\d{2}\.\d{4} \d{2}_\d{2}_\d{2})'
        r' - '
        r'(?P<period_end>\d{2}\.\d{2}\.\d{4} \d{2}_\d{2}_\d{2})'
        r'\.xlsx$'
    )

    # Двойная шапка: первая строка - название отчёта с периодом, вторая - названия колонок
    REPORT_TITLE_ROW: int = 1
    REPORT_COLUMN_HEADERS_ROW: int = 2
    REPORT_DATA_FIRST_ROW: int = 3

    # ===== Названия колонок =====
    COL_DATETIME: str = "Дата и время"
    COL_OBJECT: str = "Объект"
    COL_LDS_STATUS: str = "Режим работы СОУ"
    COL_MASK_INFO: str = "Информация о маскировании"
    COL_COORDINATE: str = "Координата"
    COL_LEAK_VOLUME: str = "Объемный расход утечки"
    COL_MT_MODE: str = "Режим работы МТ"

    EXPECTED_COLUMN_HEADERS: list = [
        COL_DATETIME,
        COL_OBJECT,
        COL_LDS_STATUS,
        COL_MASK_INFO,
        COL_COORDINATE,
        COL_LEAK_VOLUME,
        COL_MT_MODE,
    ]

    MASKING_NOT_MASKED_TEXT: str = "СОУ не замаскирована"

    # ===== Маппинг StationaryStatus <-> текст в колонке "Режим работы МТ" =====
    STATIONARY_STATUS_TO_REPORT_TEXT: dict = {
        StationaryStatus.UNSTATIONARY.value: "Нестационарный режим работы МТ",
        StationaryStatus.STATIONARY.value: "Стационарный режим работы МТ",
        StationaryStatus.STOPPED.value: "МТ в режиме остановленной перекачки",
    }

    # ===== Прочее =====
    DEFAULT_SHEET_INDEX: int = 0

    SUBSCRIBE_REPORTS_DATA_EXPORTED_REQUEST: str = "SubscribeReportsDataExportedRequest"
    EXPORT_REPORTS_COMMAND_REQUEST: str = "ExportReportsCommandRequest"
    REPORT_DATA_EXPORTED_NOTIFICATION: str = "ReportDataExportedNotification"
    GET_EXPORTED_DATA_LIST_REQUEST: str = "GetExportedDataListRequest"
    EXPORTED_DATA_LIST_LIMIT: int = 10
    DOWNLOAD_EXPORTED_DATA_REQUEST: str = "DownloadExportedDataRequest"

    # Допустимая погрешность при сравнении границ периода отчёта
    REPORT_PERIOD_TOLERANCE_MINUTES: int = 1
    # Формат даты/времени в имени скачиваемого xlsx-файла
    REPORT_FILE_NAME_DATETIME_FORMAT: str = "%d.%m.%Y %H_%M_%S"


class ExportLdsStatusReportConstants:
    """Константы для теста формирования xlsx-отчёта о режиме работы СОУ"""

    LDS_STATUS_REPORT_NAME_PART: str = "Отчет о режиме работы СОУ"
    SECTION_NAMES: list[str] = [
        "НПС-5 Тихорецкая - НПС-3 Нововеличковская",
        "НПС-3 Нововеличковская - НПС-2 Крымская",
        "НПС-2 Крымская - НПС Грушовая",
    ]
    TOTAL_WORK_DURATION_LABEL: str = "Суммарное время работы:"
    ZERO_DURATION_TEXT: str = "0:00:00"
    TOTAL_DURATION_TOLERANCE_SECONDS: int = 5
    # Число частей времени при split(':') - часы:минуты:секунды (1:02:51) и минуты:секунды (02:51)
    DURATION_PARTS_COUNT_H_MM_SS: int = 3
    DURATION_PARTS_COUNT_MM_SS: int = 2

    REPORT_TITLE_ROW: int = 1
    REPORT_COLUMN_HEADERS_ROW: int = 2
    REPORT_DATA_FIRST_ROW: int = 3

    COL_SECTION: str = "Наименование участка"
    COL_FAULTY: str = "Неисправность"
    COL_DEGRADATION: str = "В ухудшенных характеристиках"
    COL_INITIALIZATION: str = "Инициализация"
    COL_SERVICEABLE: str = "Исправность"

    MODE_DURATION_COLUMNS: list = [
        COL_FAULTY,
        COL_DEGRADATION,
        COL_INITIALIZATION,
        COL_SERVICEABLE,
    ]

    EXPECTED_COLUMN_HEADERS: list = [COL_SECTION, *MODE_DURATION_COLUMNS]

    REPORT_HEADER_PERIOD_PATTERN: str = (
        r'Отчет о режиме работы СОУ с (?P<period_start>\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})'
        r' по (?P<period_end>\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})'
    )
    REPORT_FILE_NAME_PERIOD_PATTERN: str = (
        r'^Отчет о режиме работы СОУ\. (?P<tu>.+?) '
        r'(?P<period_start>\d{2}\.\d{2}\.\d{4} \d{2}_\d{2}_\d{2})'
        r' - '
        r'(?P<period_end>\d{2}\.\d{2}\.\d{4} \d{2}_\d{2}_\d{2})'
        r'\.xlsx$'
    )


class ExportRejectedReportConstants:
    """Константы для теста формирования xlsx-отчёта об отбракованных входных данных"""

    REJECTED_REPORT_NAME_PART: str = "Отчет об отбракованных входных данных"
    REJECTED_REPORT_NAME_PART_ALT: str = "Отчёт об отбракованных входных данных"

    REPORT_TITLE_ROW: int = 1
    REPORT_COLUMN_HEADERS_ROW: int = 2
    REPORT_DATA_FIRST_ROW: int = 3

    COL_DATETIME: str = "Дата и время"
    COL_OBJECT: str = "Объект"
    COL_EVENT: str = "Событие"
    COL_VALUE: str = "Значение"
    COL_DURATION: str = "Продолжительность отбраковки"
    COL_TAG: str = "Тег сигнала"

    EXPECTED_COLUMN_HEADERS: list = [
        COL_DATETIME,
        COL_OBJECT,
        COL_EVENT,
        COL_VALUE,
        COL_DURATION,
        COL_TAG,
    ]

    REPORT_HEADER_PERIOD_PATTERN: str = (
        r'[Оо]тч[её]т об отбракованных входных данных с '
        r'(?P<period_start>\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})'
        r' по (?P<period_end>\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})'
    )
    REPORT_FILE_NAME_PERIOD_PATTERN: str = (
        r'^[Оо]тч[её]т об отбракованных входных данных (?P<tu>.+?) '
        r'(?P<period_start>\d{2}\.\d{2}\.\d{4} \d{2}_\d{2}_\d{2})'
        r' - '
        r'(?P<period_end>\d{2}\.\d{2}\.\d{4} \d{2}_\d{2}_\d{2})'
        r'\.xlsx$'
    )

    TIME_FILTER_TOLERANCE_SECONDS: int = 60

    # Суффикс сигнала в колонке "Объект" отчёта (после последней точки в строке)
    REPORT_SIGNAL_FLOW: str = "Расход"
    REPORT_SIGNAL_PRESSURE: str = "Давление"
    REPORT_SIGNAL_SUFFIX_BY_EXPECTED_NAME: dict = {
        BaseTN3Constants.JOURNAL_SIGNAL_FLOW: REPORT_SIGNAL_FLOW,
        BaseTN3Constants.JOURNAL_SIGNAL_PRESSURE: REPORT_SIGNAL_PRESSURE,
    }

    # Разбор колонки "Объект": участок трубопровода и суффикс сигнала разделяются последней точкой
    OBJECT_SIGNAL_SEPARATOR: str = "."
    OBJECT_SIGNAL_RSPLIT_MAXSPLIT: int = 1

    REJECTED_REPORT_HEADER_TITLE_PART: str = "отчет об отбракованных входных данных с"
    REJECTED_REPORT_HEADER_TITLE_PART_ALT: str = "отчёт об отбракованных входных данных с"


class MeasureUnitConstants:
    MPA_MEASURE: str = "MPa"
    KG_CM_MEASURE: str = "kgf/cm^2"


class ExportMtModeReportConstants:
    """Константы для теста формирования xlsx-отчёта о режиме работы МТ"""

    MT_MODE_REPORT_NAME_PART: str = "Отчет о режиме работы МТ"
    SECTION_NAMES: list[str] = [
        "НПС-5 Тихорецкая - НПС-3 Нововеличковская",
        "НПС-3 Нововеличковская - НПС-2 Крымская",
        "НПС-2 Крымская - НПС Грушовая",
    ]
    TOTAL_WORK_DURATION_LABEL: str = "Суммарное время работы:"
    ZERO_DURATION_TEXT: str = "0:00:00"
    TOTAL_DURATION_TOLERANCE_SECONDS: int = 5
    DURATION_PARTS_COUNT_H_MM_SS: int = 3
    DURATION_PARTS_COUNT_MM_SS: int = 2

    REPORT_TITLE_ROW: int = 1
    REPORT_COLUMN_HEADERS_ROW: int = 2
    REPORT_DATA_FIRST_ROW: int = 3

    COL_SECTION: str = "Наименование участка"
    COL_STOPPED: str = "Остановленный"
    COL_UNSTATIONARY: str = "Нестационарный"
    COL_STATIONARY: str = "Стационарный"

    MODE_DURATION_COLUMNS: list = [
        COL_STOPPED,
        COL_UNSTATIONARY,
        COL_STATIONARY,
    ]

    EXPECTED_COLUMN_HEADERS: list = [COL_SECTION, *MODE_DURATION_COLUMNS]

    STATIONARY_STATUS_TO_COLUMN: dict = {
        StationaryStatus.STOPPED.value: COL_STOPPED,
        StationaryStatus.UNSTATIONARY.value: COL_UNSTATIONARY,
        StationaryStatus.STATIONARY.value: COL_STATIONARY,
    }

    REPORT_HEADER_PERIOD_PATTERN: str = (
        r'Отчет о режиме работы МТ с (?P<period_start>\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})'
        r' по (?P<period_end>\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})'
    )
    REPORT_FILE_NAME_PERIOD_PATTERN: str = (
        r'^Отчет о режиме работы МТ\. (?P<tu>.+?) '
        r'(?P<period_start>\d{2}\.\d{2}\.\d{4} \d{2}_\d{2}_\d{2})'
        r' - '
        r'(?P<period_end>\d{2}\.\d{2}\.\d{4} \d{2}_\d{2}_\d{2})'
        r'\.xlsx$'
    )


















кх мен
import json
import logging
from pathlib import Path
from typing import Any, List, Optional

from clients.subprocess_client import SubprocessClient
from constants.architecture_constants import ClickhouseConstants as CH_const
from infra.cmd_generator import ClickHouseCmdGenerator

logger = logging.getLogger(__name__)


class ClickHouseManager:
    """
    Класс работы с clickhouse
    Пример использования:
    click_manager = ClickHouseManager(stand_client, infra_client)
    click_manager.copy_configuration_file_from_stand() - для загрузки конфигурации со стенда
    click_manager.delete_clickhouse_keys_with_check() - для удаления данных по определенным ключам
    """

    def __init__(
        self,
        stand_client: SubprocessClient,
        infra_client: SubprocessClient,
        configuration_file_name: str,
    ) -> None:
        self._stand_client = stand_client
        self._infra_client = infra_client
        self._configuration_file_name = configuration_file_name
        self._username = stand_client.username
        self._stand_host = stand_client.host
        self._infra_host = infra_client.host
        self._evo_id_pairs: List[tuple] = []
        self._cmd_generator = ClickHouseCmdGenerator(self._username, self._stand_host, configuration_file_name)

    def copy_configuration_file_from_stand(self) -> None:
        """
        Копирует файл конфигурации со стенда в корень проекта
        """
        copy_cmd = self._cmd_generator.generate_scp_config_file_cmd()
        configuration_file_path = Path(self._configuration_file_name)
        if not configuration_file_path.exists():
            try:
                self._stand_client.run_cmd(copy_cmd, timeout=CH_const.LONG_PROCESS_TIMEOUT_S, use_ssh=False)
                logger.info(f"[CLICKHOUSE] [OK] файл: {self._configuration_file_name} успешно сохранен на runner")

            except Exception as error:
                error_msg = f"[CLICKHOUSE] [ERROR] При сохранении файла: {self._configuration_file_name} на runner"
                logger.exception(error_msg)
                raise RuntimeError(error_msg) from error

    def delete_clickhouse_keys_with_check(self) -> None:
        """
        Метод удаления данных по ключам с проверкой из ClickHouse командами
        """

        evo_id_pairs_chunks = self._split_pairs_list()
        for table_name in CH_const.CH_TABLE_NAMES:
            for chunk in evo_id_pairs_chunks:
                self._delete_clickhouse_keys(chunk, table_name)
                self._check_clickhouse_keys(chunk, table_name)
            logger.info(f"[CLICKHOUSE] [OK] Успех! Данные всех датчиков в таблице {table_name} удалены")

    def _delete_clickhouse_keys(self, evo_id_pairs: List[tuple], table_name) -> None:
        """
        Метод удаления данных по ключам из ClickHouse командой, используя clickhouse-client
        """
        delete_cmd = self._cmd_generator.generate_delete_clickhouse_keys_cmd(evo_id_pairs, table_name)
        try:
            self._infra_client.run_cmd(delete_cmd)
        except Exception as error:
            error_msg = f"[CLICKHOUSE] [ERROR] При удалении данных в таблице командой: {delete_cmd}."
            logger.exception(error_msg)
            raise RuntimeError(error_msg) from error

    def _check_clickhouse_keys(self, evo_id_pairs: List[tuple], table_name: str) -> None:
        """
        Метод проверки удаления данных по ключам из ClickHouse командой, используя clickhouse-client
        """

        check_cmd = self._cmd_generator.generate_check_sensor_data_click_cmd(evo_id_pairs, table_name)
        try:
            result = self._infra_client.run_cmd(check_cmd, need_output=True)
            try:
                result_int = int(result)
            except (TypeError, ValueError) as error:
                error_msg = (
                    f"[CLICKHOUSE] [ERROR] Результат: {result} проверки количества записей после удаления,"
                    " не является числом или равен None"
                )
                logger.exception(error_msg)
                raise TypeError(error_msg) from error
            if result_int != 0:
                error_msg = f"[CLICKHOUSE] [ERROR] Осталось: {result} записей после удаления"
                logger.error(error_msg)
                raise RuntimeError(error_msg)
        except Exception as error:
            error_msg = f"[CLICKHOUSE] [ERROR] При проверке данных в таблице командой: {check_cmd}."
            logger.exception(error_msg)
            raise RuntimeError(error_msg) from error

    def _extract_evo_id_pairs_from_configuration(self) -> None:
        """
        Получение списка пар значений evoObjectId и evoParameterId из файла конфигурации
        """
        configuration_json = self._read_configuration_file()
        self._evo_id_pairs = self._extract_evo_id_pairs(configuration_json)

    def _extract_evo_id_pairs(self, configuration_json: Any) -> List[tuple]:
        """
        Получение списка уникальных пар значений evoObjectId и evoParameterId
        """
        uniq_pairs = set()
        evo_id_pairs = []
        stack = [configuration_json]
        try:
            while stack:
                current_element = stack.pop()  # Забирает последний элемент списка
                if isinstance(current_element, dict):
                    evo_id_pair = self._extract_evo_id_pair(current_element)
                    if evo_id_pair is not None and evo_id_pair not in uniq_pairs:
                        uniq_pairs.add(evo_id_pair)
                        evo_id_pairs.append(evo_id_pair)
                    # Добавляет все значения текущего элемента в список в обратном порядке
                    stack.extend(reversed(current_element.values()))
                elif isinstance(current_element, list):
                    stack.extend(reversed(current_element))
        except Exception as error:
            error_msg = (
                f"[CLICKHOUSE] [ERROR] При получении списка пар {CH_const.EVO_OBJECT_ID_KEY_NAME} и "
                f"{CH_const.EVO_PARAMETER_ID_KEY_NAME} из конфигурации "
            )
            logger.exception(error_msg)
            raise RuntimeError(error_msg) from error
        if not evo_id_pairs:
            error_msg = (
                f"[CLICKHOUSE] [ERROR] Пустой список при получении списка пар {CH_const.EVO_OBJECT_ID_KEY_NAME} и "
                f"{CH_const.EVO_PARAMETER_ID_KEY_NAME} из конфигурации "
            )
            logger.error(error_msg)
            raise ValueError(error_msg)

        return evo_id_pairs

    @staticmethod
    def _extract_evo_id_pair(element: dict) -> Optional[tuple]:
        """
        Ищет пару значений evoObjectId и evoParameterId в словаре
        """
        try:
            evo_id = element[CH_const.EVO_OBJECT_ID_KEY_NAME]
            param_id = element[CH_const.EVO_PARAMETER_ID_KEY_NAME]
            if (isinstance(evo_id, int) and evo_id != 0) and (isinstance(param_id, int) and param_id != 0):
                return evo_id, param_id
        except (AttributeError, KeyError, TypeError):
            pass

    def _read_configuration_file(self) -> Any:
        """
        Чтение файла конфигурации, использует список кодировок
        """
        error_msg = (
            f"[CLICKHOUSE] [ERROR] Не удалось декодировать файл {self._configuration_file_name} "
            f"в кодировках {CH_const.DEFAULT_ENCODINGS}"
        )
        for encoding in CH_const.DEFAULT_ENCODINGS:
            try:
                with open(self._configuration_file_name, "r", encoding=encoding, errors="strict") as conf_file:
                    data = json.load(conf_file)
                    if not data:
                        error_msg = f"Пустой json (кодировка:{encoding}"
                        logger.error(error_msg)
                        raise ValueError(error_msg)
                    return data
            except UnicodeDecodeError:
                # следующая кодировка
                continue
            except Exception as error:
                logger.exception(error_msg)
                raise OSError(error_msg) from error
        logger.exception(error_msg)
        raise OSError(error_msg)

    def _split_pairs_list(self) -> List[list]:
        """
        Делит список пар на части
        """
        if not self._evo_id_pairs:
            self._extract_evo_id_pairs_from_configuration()
        return [
            # fmt: off
            self._evo_id_pairs[i:i + CH_const.EVO_ID_PAIRS_CHUNK_SIZE]
            # fmt: on
            for i in range(0, len(self._evo_id_pairs), CH_const.EVO_ID_PAIRS_CHUNK_SIZE)
        ]

























смд ген


import logging
import os
from datetime import datetime, timedelta
from pathlib import PurePosixPath
from typing import List

import allure

from constants.architecture_constants import ClickhouseConstants as CH_const
from constants.architecture_constants import EnvKeyConstants
from constants.architecture_constants import ImitatorConstants as Im_const
from infra.path_generator import ImitatorDataPathGenerator

logger = logging.getLogger(__name__)


class BaseCmdGenerator:
    def __init__(self, username: str, host: str) -> None:
        self._username = username
        self._host = host
        self._ssh_key_name: str = os.environ.get(EnvKeyConstants.SSH_KEY_NAME)
        self._scp_cmd: str = ""
        self._choose_scp_cmd_by_os()

    def _choose_scp_cmd_by_os(self):
        if os.name == Im_const.OS_NAME_WIN:
            scp_cmd = f"scp -i {self._ssh_key_name}"
        else:
            scp_cmd = "scp"
        self._scp_cmd = scp_cmd


class TimeProcessor:
    """
    Класс для получения времени запуска и остановки имитатора
    Для получения времени:
    from utils.flag_generator import TimeProcessor
    time_processor = TimeProcessor(test_duration_m)
    start_time = time_processor.formatted_start_time
    stop_time = time_processor.formatted_stop_time
    start_time_dt = time_processor.start_time  # datetime объект для расчётов
    """

    def __init__(self, duration_m: float) -> None:
        self._duration_m = duration_m
        self._current_time: datetime = datetime.now()
        self._start_time: datetime = self._add_time_delta(seconds=Im_const.IMITATOR_START_DELAY_S)
        self._formatted_start_time: str = self._get_formatted_start_time()
        self._formatted_stop_time: str = self._get_formatted_stop_time()

    @property
    def start_time(self) -> datetime:
        """Возвращает время старта имитатора как datetime объект для расчётов интервалов"""
        return self._start_time

    @property
    def formatted_start_time(self) -> str:
        return self._formatted_start_time

    @property
    def formatted_stop_time(self) -> str:
        return self._formatted_stop_time

    def _add_time_delta(self, minutes: float = 0, seconds: int = 0) -> datetime:
        """
        :param minutes: время в минутах
        :param seconds: время в секундах
        :return: текущее время + добавленное время
        """
        try:
            delta = timedelta(minutes=minutes, seconds=seconds)
            current_time_with_delta = self._current_time + delta
            return current_time_with_delta

        except (ValueError, TypeError):
            logger.exception("[ERROR] Ошибка при добавлении времени")
            raise

    @staticmethod
    def get_formatted_time(time) -> str:
        """
        :param time: время в формате datetime
        :return: время в формате строки, которую принимает имитатор
        """
        try:
            formatted_time = time.strftime(Im_const.IMITATOR_TIME_FORMAT)
            return formatted_time

        except (ValueError, TypeError):
            logger.exception("[ERROR] Ошибка при форматировании времени")
            raise

    def _get_formatted_start_time(self) -> str:
        """
        :return: время запуска имитатора строкой
        """
        formatted_start_time = self.get_formatted_time(self._start_time)
        return formatted_start_time

    def _get_formatted_stop_time(self) -> str:
        """
        Добавляет время прогона ко времени отсрочки пуска имитатора
        :return: время остановки имитатора строкой
        """

        stop_time = self._add_time_delta(minutes=self._duration_m, seconds=Im_const.IMITATOR_START_DELAY_S)
        formatted_stop_time = self.get_formatted_time(stop_time)
        return formatted_stop_time


class ImitatorCmdGenerator:
    """
    Класс для получения команды запуска имитатора
    Большая часть флагов формируется из дефолтных значений.
    Для получения флагов:
    from utils.imitator_cmd_generator import ImitatorCmdGenerator
    cmd_generator = ImitatorCmdGenerator(path_to_test_data, host, test_duration_m)
    final_cmd = cmd_generator.generate_final_imitator_cmd()
    start_time_dt = cmd_generator.start_time  # datetime объект для расчётов интервалов утечек
    """

    def __init__(self, sandbox_path: str, stand_name: str, duration_m: float) -> None:
        self._sandbox_path = sandbox_path
        self._stand_name = stand_name
        self._duration_m = duration_m
        self._time_processor = TimeProcessor(self._duration_m)
        self._source_type: str = Im_const.SOURCE_TYPE_DEF_VALUE
        self._speed: int = Im_const.SPEED_DEF_VALUE
        self._opcua: str = os.environ.get(EnvKeyConstants.OPC_URL)
        self._ns: int = Im_const.NS_DEF_VALUE
        self.final_cmd: str = ""
        self._get_other_flags_values()
        self._generate_flags()

    @property
    def start_time(self) -> datetime:
        """Возвращает время старта имитатора как datetime объект для расчётов интервалов утечек"""
        return self._time_processor.start_time

    def _generate_inner_test_data_path(self, sub_path: str) -> str:
        """
        Добавляет название файла / директорию к пути хранения тестовых данных
        :param sub_path: название файла / директорию
        :return: полный путь к файлу / директории
        """
        try:
            result_path = PurePosixPath(self._sandbox_path) / sub_path
            return str(result_path)

        except (ValueError, TypeError, OSError):
            logger.exception(f"[ERROR] Ошибка при создании пути к данным прогона. Данные: {sub_path}")
            raise

    def _generate_sandbox_paths(self) -> tuple[str, str, str]:
        """
        :return: кортеж: пути к директории data и к файлам внутри директории с данными для запуска имитатора
        """
        # Формирует пути к папке с логами датчиков и файлам
        path_to_data = self._generate_inner_test_data_path(Im_const.SANDBOX_DATA)
        path_to_rules = self._generate_inner_test_data_path(Im_const.SANDBOX_RULES)
        path_to_tags = self._generate_inner_test_data_path(Im_const.SANDBOX_TAGS)

        return path_to_data, path_to_rules, path_to_tags

    def _get_target_host(self) -> str:
        """
        Получает target для флага из списка стендов
        :return: target для флага
        """
        try:
            return Im_const.HOST_MAP.get(self._stand_name, {}).get(Im_const.IMITATOR_KEY_NAME)

        except KeyError:
            logger.exception(f"[ERROR] Не удалось получить target для стенда: {self._stand_name}")
            raise

    def _get_other_flags_values(self):
        """
        Метод получения значений флагов
        """
        try:
            self._path_to_data, self._path_to_rules, self._path_to_tags = self._generate_sandbox_paths()
            self._start_time = self._time_processor.formatted_start_time
            self._stop_time = self._time_processor.formatted_stop_time
            self._target = self._get_target_host()

        except (AttributeError, ValueError):
            logger.exception("[ERROR] Ошибка при получении флагов")
            raise

    def _generate_flags(self) -> None:
        """
        Метод получения флагов
        :return: строку с флагами для запуска имитатора
        """
        try:
            # Формирует флаги
            command_parts = [
                f'  --rules="{self._path_to_rules}"',
                f'--source="{self._path_to_data}"',
                f'--sourceType="{self._source_type}"',
                f'--sourceTagTypes="{self._path_to_tags}"',
                f'--startTime="{self._start_time}"',
                f'--stopTime="{self._stop_time}"',
                f'--speed={self._speed}',
                f'--opcua="{self._opcua}"',
                f'--ns={self._ns}',
            ]

            if self._target:
                command_parts.append(f'--target="{self._target}"')

            self._flags = " ".join(command_parts)

        except (ValueError, TypeError):
            logger.exception("[ERROR] Ошибка при создании итоговой версии флагов")
            raise

    def generate_final_imitator_cmd(self) -> str:
        """
        Собирает команду для запуска имитатора
        :return: финальная команда запуска имитатора
        """
        try:
            with allure.step(f"Запуск имитатора данных с флагами {self._flags}"):
                self.final_cmd = Im_const.IMITATOR_RUN_CMD + self._flags
                return self.final_cmd

        except (ValueError, TypeError):
            logger.exception("[ERROR] Ошибка при создании итоговой команды для запуска")
            raise


class UploadImitatorDataCmdGenerator(BaseCmdGenerator):
    def __init__(self, username: str, host: str, path_generator: ImitatorDataPathGenerator) -> None:
        super().__init__(username=username, host=host)
        # Список ожидаемых файлов в архиве
        self.expected_files: list = [Im_const.SANDBOX_RULES]
        self._ssh_key_name: str = os.environ.get(EnvKeyConstants.SSH_KEY_NAME)
        self._os_is_windows: bool = os.name == Im_const.OS_NAME_WIN
        self._path_generator = path_generator
        self._remote_temp_dir_path = self._path_generator.remote_temp_dir_path
        self._tar_package_name = self._path_generator.tar_package_name
        # Путь к архиву во временной директории на удаленном сервере
        self._full_remote_tar_path = self._path_generator.generate_full_remote_tar_path()

    def generate_check_remote_data_cmd(self) -> str:
        """
        Создает команду проверки существования директории с данными и сопутствующих файлов
        :return: команда для выполнения в консоли
        """
        expected_dir = Im_const.SANDBOX_DATA

        # Файлы для проверки после распаковки и копирования tags.txt
        files_to_check = [Im_const.SANDBOX_RULES, Im_const.SANDBOX_TAGS]

        check_dir_part = f"[ -d '{self._remote_temp_dir_path}/{expected_dir}' ]"
        check_parts = [check_dir_part]
        for file in files_to_check:
            check_parts.append(f"[ -f '{self._remote_temp_dir_path}/{file}' ]")

        condition = " && ".join(check_parts)
        check_cmd = f"if {condition}; then echo {Im_const.CMD_STATUS_OK}; else echo {Im_const.CMD_STATUS_FAIL}; fi"
        return check_cmd

    def generate_create_dir_cmd(self) -> str:
        """
        Генерирует команду создания временной директории
        """
        return f"mkdir -p {self._remote_temp_dir_path}"

    def generate_delete_dir_cmd(self) -> str:
        """
        Генерирует команду удаления временной директории
        """
        return f"rm -rf {self._remote_temp_dir_path}"

    def generate_copy_tar_to_remote_cmd(self) -> str:
        """
        Генерирует команду копирования данных на удаленный сервер
        """

        return f"{self._scp_cmd} {self._tar_package_name} {self._username}@{self._host}:{self._remote_temp_dir_path}/"

    def generate_unpack_tar_cmd(self) -> str:
        """
        Генерирует команду распаковки архива на удаленном сервере
        """
        return f"tar -xvzf {self._full_remote_tar_path} -C {self._remote_temp_dir_path}"

    def generate_check_tar_cmd(self) -> str:
        """
        Генерирует команду проверки архива на удаленном сервере
        """
        return f"tar -tzf {self._full_remote_tar_path}"

    def generate_copy_tags_cmd(self, tu_id: int) -> str:
        """
        Генерирует команду для копирования tags.txt с сервера во временную директорию текущего набора данных
        Файл tn{tu_id}_tags.txt копируется как tags.txt
        """
        source_path = f"{Im_const.CONFIG_PATH}/tn{tu_id}_tags.txt"
        target_path = f"{self._remote_temp_dir_path}/{Im_const.SANDBOX_TAGS}"
        return f"cp {source_path} {target_path}"


class ClickHouseCmdGenerator(BaseCmdGenerator):
    def __init__(self, username: str, host: str, configuration_file_name: str) -> None:
        super().__init__(username=username, host=host)
        self._configuration_file_name = configuration_file_name

    def generate_scp_config_file_cmd(self) -> str:
        """
        Генерирует команду копирования файла конфигурации со стенда
        """
        path_to_remote_configuration = self._generate_path_to_remote_configuration()
        return f"{self._scp_cmd} {self._username}@{self._host}:{path_to_remote_configuration} ."

    def generate_check_sensor_data_click_cmd(self, evo_id_pairs: List[tuple], table_name: str) -> str:
        """
        Генерирует команду проверки данных в ClickHouse по паре значений objectId и parameterId
        """
        sql_evo_id_pairs = self._generate_sql_evo_id_pairs(evo_id_pairs)
        return (
            f"echo \'SELECT COUNT(*) FROM {table_name} WHERE "
            f"({CH_const.OBJECT_ID_KEY_NAME}, {CH_const.PARAMETER_ID_KEY_NAME}) IN ({sql_evo_id_pairs})\' "
            f"| docker exec -i {CH_const.NAME_CONTAINER} clickhouse-client"
        )

    def generate_delete_clickhouse_keys_cmd(self, evo_id_pairs: List[tuple], table_name: str) -> str:
        """
        Генерирует команду удаления данных в ClickHouse по парам значений objectId и parameterId
        """
        sql_evo_id_pairs = self._generate_sql_evo_id_pairs(evo_id_pairs)
        return (
            f"echo \'DELETE FROM {table_name} WHERE "
            f"({CH_const.OBJECT_ID_KEY_NAME}, {CH_const.PARAMETER_ID_KEY_NAME}) IN ({sql_evo_id_pairs})\' "
            f"| docker exec -i {CH_const.NAME_CONTAINER} clickhouse-client"
        )

    @staticmethod
    def _generate_sql_evo_id_pairs(evo_id_pairs: List[tuple]) -> str:
        """
        Создает строку из списка пар значений evoObjectId и evoParameterId
        """
        return ", ".join("({}, {})".format(object_id, param_id) for object_id, param_id in evo_id_pairs)

    def _generate_path_to_remote_configuration(self) -> PurePosixPath:
        """
        Создает путь к файлу конфигурации конкретного стенда
        """
        return PurePosixPath(CH_const.CONFIG_PATH) / self._configuration_file_name


class SignalUnitConversionCmdGenerator(BaseCmdGenerator):
    def generate_scp_signal_rules_from_stand_cmd(self) -> str:
        """
        Генерирует команду копирования signal_unit_conversion_rules.json со стенда на runner
        """
        remote_path = self._generate_path_to_remote_signal_rules()
        return f"{self._scp_cmd} {self._username}@{self._host}:{remote_path} ."

    def generate_scp_signal_rules_to_stand_cmd(self, local_file_path: str) -> str:
        """
        Генерирует команду копирования локального файла на стенд
        """
        remote_path = self._generate_path_to_remote_signal_rules()
        return f"{self._scp_cmd} {local_file_path} {self._username}@{self._host}:{remote_path}"

    @staticmethod
    def _generate_path_to_remote_signal_rules() -> PurePosixPath:
        return PurePosixPath(Im_const.CONFIG_PATH) / Im_const.SIGNAL_UNIT_CONVERSION_RULES_FILE_NAME






















конфиг мен
import json
import logging
from typing import Any

from constants.architecture_constants import ImitatorConstants as Imitator_const
from utils.helpers.configuration_utils import extract_sensor_ids_by_address

logger = logging.getLogger(__name__)


class ConfigurationManager:
    """
    Читает локальную конфигурацию стенда и извлекает из нее данные для тестов.
    """

    def __init__(self, configuration_file_name: str) -> None:
        self._configuration_file_name = configuration_file_name

    def get_sensor_ids_by_address(self) -> dict[str, int]:
        """
        Возвращает словарь address: id из файла конфигурации.
        """
        configuration_json = self._read_configuration_file()
        return extract_sensor_ids_by_address(configuration_json)

    def _read_configuration_file(self) -> Any:
        """
        Чтение файла конфигурации, использует список кодировок.
        """
        error_msg = (
            f"[CONFIGURATION] [ERROR] Не удалось декодировать файл {self._configuration_file_name} "
            f"в кодировках {Imitator_const.DEFAULT_ENCODINGS}"
        )
        for encoding in Imitator_const.DEFAULT_ENCODINGS:
            try:
                with open(self._configuration_file_name, "r", encoding=encoding, errors="strict") as conf_file:
                    data = json.load(conf_file)
                    if not data:
                        error_msg = f"Пустой json (кодировка:{encoding}"
                        logger.error(error_msg)
                        raise ValueError(error_msg)
                    return data
            except UnicodeDecodeError:
                continue
            except Exception as error:
                logger.exception(error_msg)
                raise OSError(error_msg) from error
        logger.exception(error_msg)
        raise OSError(error_msg)















им дата ап


import logging
import tarfile
from pathlib import Path
from typing import List

from clients.http_client import HttpClient
from clients.subprocess_client import SubprocessClient
from constants.architecture_constants import HTTPClientConstants as Http_const
from constants.architecture_constants import ImitatorConstants as Im_const
from infra.cmd_generator import UploadImitatorDataCmdGenerator
from infra.path_generator import ImitatorDataPathGenerator
from models.http_models.attacments_list_testops_model import FileInfo, Items

logger = logging.getLogger(__name__)


class ImitatorDataUploader:
    """
    Класс загрузки набора данных на удаленный сервер, нужных для запуска имитатора
    Пример использования:
    uploader = ImitatorDataUploader(your_user, your_host, test_id, tu_id)
    uploader.upload_with_confirm() - для загрузки данных на удаленный сервер
    remote_temp_path = uploader.remote_temp_dir_path - для получения пути ко временной директории
    uploader.delete_with_confirm() - для удаления данных на удаленном сервере
    """

    def __init__(self, stand_client: SubprocessClient, test_data_id: int, test_data_name: str, tu_id: int) -> None:

        self._username = stand_client.username
        self._host = stand_client.host
        self._test_data_id = test_data_id
        self._test_data_name = test_data_name
        self._tu_id = tu_id  # ID ТУ для получения tags.txt с сервера
        self._http_client = HttpClient()
        self._stand_client = stand_client
        self._path_generator = ImitatorDataPathGenerator(test_data_id)
        self._cmd_generator = UploadImitatorDataCmdGenerator(self._username, self._host, self._path_generator)
        self._subprocess_client = UploadDataSubprocessClient(stand_client, self._cmd_generator, self._tu_id)
        self._tar_package_name = self._path_generator.tar_package_name
        self.remote_temp_dir_path = self._path_generator.remote_temp_dir_path

    def upload_with_confirm(self) -> None:
        """
        Выполняется основной сценарий загрузки данных на удаленный сервер
        """
        # 1. Загрузка архива на runner
        imitator_data_bytes = self._get_run_data_bytes()
        # 2. Сохранение архива на runner
        self._save_test_data_package(imitator_data_bytes)
        # 3. Проверка архива на runner
        if not self._is_tar_valid():
            logging.error(f"[DATA UPLOADER] [ERROR] Архив: {self._tar_package_name} поврежден на runner")
            raise ValueError("[DATA UPLOADER] [ERROR] При проверке архива на runner")
        # 4. Создание временной директории на удаленном сервере
        self._subprocess_client.create_remote_data_dir()
        # 5. Копирование архива во временную директорию на удаленный сервер
        self._subprocess_client.copy_tar_to_remote()
        # 6. Проверка целостности архива на удаленном сервере
        if not self._subprocess_client.is_remote_tar_valid():
            logging.error(
                f"[DATA UPLOADER] [ERROR] Архив: {self._tar_package_name} поврежден на удаленном сервере: {self._host}"
            )
            raise ValueError("[DATA UPLOADER] [ERROR] При проверке архива на удаленном сервере")
        # 7. Распаковка архива
        self._subprocess_client.unpack_remote_package()
        # 8. Копирование tags.txt с сервера во временную директорию текущего набора данных
        self._subprocess_client.copy_tags_from_server()
        # 9. Проверка данных
        if not self._subprocess_client.check_remote_unpack_data():
            logging.error(
                f"[DATA UPLOADER] [ERROR] При распаковке данных на удаленном сервере: "
                f"{self._host} Путь: {self.remote_temp_dir_path}"
            )
            raise ValueError("[DATA UPLOADER] [ERROR] При распаковке данных на удаленном сервере")
        logging.info(
            f"[DATA UPLOADER] [OK] Тестовые данные успешно загружены на удаленный сервер: "
            f"{self._host} Путь: {self.remote_temp_dir_path}"
        )

    def delete_with_confirm(self) -> None:
        """
        Удаление временной директории с удаленного сервера с проверкой удаления
        """
        self._subprocess_client.delete_remote_data_dir()
        if self._subprocess_client.check_remote_unpack_data():
            logging.error(
                f"[DATA UPLOADER] [ERROR] При удалении данных на удаленном сервере: "
                f"{self._host} Путь: {self.remote_temp_dir_path}"
            )
            raise ValueError("[DATA UPLOADER] [ERROR] При проверке удаления данных")
        logging.info(f"[DATA UPLOADER] [OK] Тестовые данные успешно удалены с удаленного сервера: {self._host}")

    def _get_test_data_attachment_id_by_name(self, attachments_list: dict) -> int:
        """
        Получает id архива данных для имитатора
        """
        parsed_attachments_list = Items(
            items=[FileInfo(**file) for file in attachments_list.get(Http_const.TESTOPS_ATTACHMENTS_KEY, [])]
        )
        attachment_id = next(
            (file.id for file in parsed_attachments_list.items if file.original_filename == self._test_data_name), None
        )
        return attachment_id

    def _get_run_data_bytes(self) -> bytes:
        """
        Получает данные через GET запрос к Testops
        :return: содержимое ответа на запрос
        """
        # Получает список вложений для test_data_id
        attachments_list = self._http_client.get_attachments_list_by_test_case_id(self._test_data_id)
        # Получает id архива данных
        attachment_id = self._get_test_data_attachment_id_by_name(attachments_list)
        run_data_bytes = self._http_client.get_test_case_attachment_by_id(self._test_data_id, attachment_id)
        return run_data_bytes

    def _is_tar_valid(self) -> bool:
        """
        Проверяет целостность архива на runner после скачивания с testops
        """
        req_files = {Im_const.SANDBOX_RULES}
        req_dir = Im_const.SANDBOX_DATA
        try:
            with tarfile.open(self._tar_package_name, "r:gz") as tar_file:
                names = set(tar_file.getnames())
                has_dir = any(name.startswith(req_dir) for name in names)
                has_files = req_files.issubset(names)
                if not has_dir or not has_files:
                    raise FileNotFoundError("[DATA UPLOADER] [ERROR] В архиве на runner отсутствуют необходимые файлы")
                return has_dir and has_files
        except tarfile.TarError:
            logging.exception("[DATA UPLOADER] [ERROR] Архив на runner поврежден")
            raise

    def _save_test_data_package(self, tar_bytes: bytes) -> None:
        """
        Сохраняет архив тестовых данных в рабочей директории runner
        :return: путь к архиву
        """
        file_path = Path(self._tar_package_name)
        try:
            with open(file_path, "wb") as tar_file:
                tar_file.write(tar_bytes)
            logging.info(f"[DATA UPLOADER] [OK] Архив: {self._tar_package_name} успешно сохранен на runner")
        except (FileNotFoundError, PermissionError, OSError):
            logging.exception("[DATA UPLOADER] [ERROR] При сохранении архива на runner")
            raise


class UploadDataSubprocessClient:
    """
    Выполняет команды в консоли для загрузки данных прогона для имитатора
    """

    def __init__(self, client: SubprocessClient, cmd_generator: UploadImitatorDataCmdGenerator, tu_id: int) -> None:
        self._client = client
        self._expected_files: List[str] = list(cmd_generator.expected_files)
        self._cmd_generator = cmd_generator
        self._tu_id = tu_id

    def create_remote_data_dir(self) -> None:
        """
        Создает временную директорию на удаленном сервере
        """
        create_dir_cmd = self._cmd_generator.generate_create_dir_cmd()
        self._client.run_cmd(create_dir_cmd)

    def delete_remote_data_dir(self) -> None:
        """
        Удаляет временную директорию на удаленном сервере
        """
        delete_dir_cmd = self._cmd_generator.generate_delete_dir_cmd()
        self._client.run_cmd(delete_dir_cmd)

    def copy_tar_to_remote(self) -> None:
        """
        Копирует архив во временную директорию на удаленном сервере
        """
        copy_cmd = self._cmd_generator.generate_copy_tar_to_remote_cmd()
        self._client.run_cmd(copy_cmd, timeout=Im_const.LONG_PROCESS_TIMEOUT_S, use_ssh=False)

    def unpack_remote_package(self) -> None:
        """
        Распаковывает архив во временную директорию на удаленном сервере
        """
        unpack_cmd = self._cmd_generator.generate_unpack_tar_cmd()
        self._client.run_cmd(unpack_cmd, timeout=Im_const.LONG_PROCESS_TIMEOUT_S)

    def copy_tags_from_server(self) -> None:
        """ """
        source_path = f"{Im_const.CONFIG_PATH}/tn{self._tu_id}_tags.txt"
        copy_tags_cmd = self._cmd_generator.generate_copy_tags_cmd(self._tu_id)
        try:
            self._client.run_cmd(copy_tags_cmd)
            logging.info(f"[DATA UPLOADER] [OK] tags.txt скопирован из {source_path}")
        except Exception as e:
            logging.error(f"[DATA UPLOADER] [ERROR] Не удалось скопировать {source_path}: {e}")
            raise RuntimeError(
                f"Не удалось скопировать tags.txt с сервера. Проверьте наличие файла {source_path}"
            ) from e

    def is_remote_tar_valid(self) -> bool:
        """
        Проверка целостности архива
        :return: результат проверки
        """
        check_tar_cmd = self._cmd_generator.generate_check_tar_cmd()
        result = self._client.run_cmd(check_tar_cmd, need_output=True)
        if not result:
            return False

        tar_list = result.split("\n")

        return all(file in tar_list for file in self._expected_files)

    def check_remote_unpack_data(self) -> bool:
        """
        Проверяет наличие директории с данными и сопутствующих файлов
        :return: результат проверки
        """
        check_cmd = self._cmd_generator.generate_check_remote_data_cmd()
        result = self._client.run_cmd(check_cmd, need_output=True)
        return result == Im_const.CMD_STATUS_OK




















им мен

import logging
import subprocess
from typing import Optional

from clients.subprocess_client import SubprocessClient
from constants.architecture_constants import ImitatorConstants as Im_const

logger = logging.getLogger(__name__)


class ImitatorManager:
    """
    Класс управления имитатором
    Для запуска имитатора:
    from clients.subprocess_client import SubprocessClient
    from infra.imitator_manager import ImitatorManager
    client = SubprocessClient(your_user, your_host)
    imitator_manager = ImitatorManager(client, your_command)
    imitator_manager.run_imitator()
    imitator_manager.log_imitator_stdout()
    imitator_manager.wait_and_stop_imitator()
    Для получения процесса с запущенным имитатором:
    imitator_process = imitator_manager.imitator_process
    Для остановки имитатора:
    imitator_manager.stop_imitator()
    Для получения процесса с запущенным имитатором
    """

    def __init__(self, client: SubprocessClient, imitator_run_cmd: str) -> None:
        self._client = client
        self._imitator_run_cmd = imitator_run_cmd
        self._logger: logging.getLogger() = logging.getLogger(self.__class__.__name__)
        self._setup_logger()
        self._imitator_process: Optional[subprocess.Popen] = None

    @property
    def imitator_process(self) -> Optional[subprocess.Popen]:
        return self._imitator_process

    def _setup_logger(self) -> None:
        """
        Настраивает отдельный логер для имитатора
        """
        # Запись логов имитатора в отдельный файл
        # Кодировка установлена под WIN, для gitlab нужно установить utf-8
        file_handler = logging.FileHandler(Im_const.IMITATOR_LOG_FILE_NAME, encoding=Im_const.WIN_ENCODING_CP1251)
        file_handler.setLevel(logging.INFO)
        formatter = logging.Formatter()
        file_handler.setFormatter(formatter)
        self._logger.addHandler(file_handler)
        # Отключает дублирование логов в консоль
        self._logger.propagate = False

    def run_imitator(self) -> None:
        """
        Запускает имитатор на удаленном сервере
        """
        if self._imitator_process is not None:
            logger.error("[IMITATOR] [ERROR] Имитатор уже запущен")
            raise
        process = self._client.exec_popen(self._imitator_run_cmd)
        if process.poll() is None:
            self._imitator_process = process
            logger.info("[IMITATOR] [OK] Имитатор запущен успешно")
        else:
            logger.error("[IMITATOR] [ERROR] Ошибка запуска имитатора")
            raise

    def log_imitator_stdout(self) -> None:
        """
        Записывает логи имитатора в файл с помощью отдельного логера
        """

        try:
            if self._imitator_process.poll() is None:
                for line in self._imitator_process.stdout:
                    self._logger.info(line)
            else:
                logger.error("[IMITATOR] [ERROR] Ошибка записи логов имитатора: Имитатор не запущен")
                raise
        except (OSError, ValueError):
            logging.exception("[IMITATOR] [ERROR] Ошибка при получении логов имитатора!")

    def _is_imitator_running(self) -> bool:
        """
        Проверяет наличие PID имитатора на стенде
        """
        result = self._client.run_cmd(Im_const.IMITATOR_CHECK_CMD, check=False, need_output=True)
        if result:
            logger.warning(f"[IMITATOR] [WARNING] Имитатор не остановлен! PID: {result}")
        else:
            logger.info("[IMITATOR] [OK] Имитатор остановлен успешно")
        return bool(result and result.strip())

    def stop_imitator(self) -> None:
        """
        Останавливает имитатор
        """
        try:
            self._client.terminate_process(self._imitator_process, timeout=Im_const.POPEN_WAIT_TIMOUT_S)
            if self._is_imitator_running():
                # Останавливает имитатор на стенде
                self._client.run_cmd(Im_const.IMITATOR_KILL_CMD)
                logger.info("[IMITATOR] [OK] Имитатор остановлен успешно")
            self._imitator_process = None
        except RuntimeError:
            logger.exception("[IMITATOR] [ERROR] Ошибка остановки имитатора")
            raise

    def wait_and_stop_imitator(self) -> None:
        """
        Ожидает окончания работы имитатора и завершает его работу
        """
        try:
            self._imitator_process.wait()

        except KeyboardInterrupt:
            # На случай остановки принудительно, через Ctrl+C например
            logging.exception("[IMITATOR] [ERROR] Принудительная остановка имитатора!")
            raise
        finally:
            self.stop_imitator()



























пас ген
import logging
from datetime import datetime
from pathlib import PurePosixPath  # Используется для корректного запуска на Windows

from constants.architecture_constants import ImitatorConstants as JC_const

logger = logging.getLogger(__name__)


class ImitatorDataPathGenerator:
    """
    Генерация путей для данных прогона
    """

    def __init__(self, test_data_id: int):
        self._test_data_id = test_data_id
        self._expected_files: list = [JC_const.SANDBOX_TAGS, JC_const.SANDBOX_RULES]
        self._test_package_name = self._generate_test_package_name()
        # Временное название архива
        self.tar_package_name = self._add_tar_extension()
        # Путь к временной директории
        self.remote_temp_dir_path = self._generate_remote_temp_dir_path()

    def _generate_test_package_name(self) -> str:
        """
        Создает уникальное имя для набора данных
        :return: уникальное имя для набора данных
        """
        time_now_str = datetime.now().strftime(JC_const.IMITATOR_TIME_FORMAT)
        return f"test_case_id_{self._test_data_id}_{time_now_str}"

    def _add_tar_extension(self) -> str:
        """
        Добавляет tar расширение к имени набора данных
        :return: уникальное имя архива набора данных
        """
        return f"{self._test_package_name}.tar.gz"

    def _generate_remote_temp_dir_path(self) -> str:
        """
        Создает путь к временной директории на удаленном сервере
        :return: путь к временной директории на удаленном сервере
        """
        remote_temp_dir_path = PurePosixPath(JC_const.AUTOTEST_DATA_PATH) / self._test_package_name
        return str(remote_temp_dir_path)

    def generate_full_remote_tar_path(self) -> str:
        """
        Создает путь к архиву на удаленном сервере
        :return: путь к архиву на удаленном сервере
        """
        full_remote_tar_path = PurePosixPath(self.remote_temp_dir_path) / self.tar_package_name
        return str(full_remote_tar_path)




















сигн юнит 
import json
import logging
import shutil
from pathlib import Path
from typing import Any

from clients.subprocess_client import SubprocessClient
from constants.architecture_constants import ClickhouseConstants as CH_const
from constants.architecture_constants import ImitatorConstants as Im_const
from constants.enums import MeasureConversionRule
from infra.cmd_generator import SignalUnitConversionCmdGenerator
from utils.helpers.signal_unit_conversion_utils import apply_measure_conversion_rule, conversion_rules_need_update

logger = logging.getLogger(__name__)


class SignalUnitConversionManager:
    """
    Управляет signal_unit_conversion_rules.json на стенде:
    - скачивает оригинал в original_conversion_rules/ перед прогоном набора
    - подкладывает изменённую версию в CONFIG_PATH (имя на сервере не меняется)
    - восстанавливает оригинал в teardown
    """

    def __init__(
        self,
        stand_client: SubprocessClient,
        measure_conversion_rule: MeasureConversionRule,
    ) -> None:
        self._stand_client = stand_client
        self._measure_conversion_rule = measure_conversion_rule
        self._cmd_generator = SignalUnitConversionCmdGenerator(stand_client.username, stand_client.host)
        self._local_file = Path(Im_const.SIGNAL_UNIT_CONVERSION_RULES_FILE_NAME)
        self._backup_file = (
            Path(Im_const.SIGNAL_UNIT_CONVERSION_RULES_BACKUP_DIR) / Im_const.SIGNAL_UNIT_CONVERSION_RULES_FILE_NAME
        )
        self._modified = False

    def setup_signal_unit_conversion_rules(self) -> None:
        """
        Скачивает файл со стенда, при необходимости меняет единицы и загружает обратно.
        """
        try:
            self._download_from_stand()
            rules_json = self._read_local_file()

            if not conversion_rules_need_update(rules_json, self._measure_conversion_rule):
                logger.info(
                    "[SETUP] [OK] signal_unit_conversion_rules.json уже настроен корректно для набора данных "
                    f"(правило {self._measure_conversion_rule.name})"
                )
                return

            self._save_backup()
            modified_rules = apply_measure_conversion_rule(rules_json, self._measure_conversion_rule)
            self._write_local_file(modified_rules)
            self._upload_to_stand(self._local_file)
            self._modified = True
            logger.info(
                "[SETUP] [OK] signal_unit_conversion_rules.json обновлён по правилу "
                f"{self._measure_conversion_rule.name}"
            )
        except Exception as error:
            error_msg = "[SETUP] [ERROR] Ошибка при подготовке signal_unit_conversion_rules.json"
            logger.exception(error_msg)
            raise RuntimeError(error_msg) from error

    def restore_signal_unit_conversion_rules(self) -> None:
        """
        Возвращает оригинальный signal_unit_conversion_rules.json на стенд
        """
        if not self._modified:
            logger.info("[TEARDOWN] [SKIP] signal_unit_conversion_rules.json не изменялся")
            return

        if not self._backup_file.exists():
            error_msg = f"[TEARDOWN] [ERROR] Оригинал signal_unit_conversion_rules.json не найден: {self._backup_file}"
            logger.error(error_msg)
            raise RuntimeError(error_msg)

        try:
            self._upload_to_stand(self._backup_file)
            self._modified = False
            logger.info(
                f"[TEARDOWN] [OK] signal_unit_conversion_rules.json восстановлен на стенде из {self._backup_file}"
            )
        except Exception as error:
            error_msg = "[TEARDOWN] [ERROR] Ошибка при восстановлении signal_unit_conversion_rules.json"
            logger.exception(error_msg)
            raise RuntimeError(error_msg) from error

    def _download_from_stand(self) -> None:
        copy_cmd = self._cmd_generator.generate_scp_signal_rules_from_stand_cmd()
        self._stand_client.run_cmd(copy_cmd, timeout=CH_const.LONG_PROCESS_TIMEOUT_S, use_ssh=False)
        if not self._local_file.exists():
            raise FileNotFoundError(f"Не удалось скачать {Im_const.SIGNAL_UNIT_CONVERSION_RULES_FILE_NAME} со стенда")

    def _save_backup(self) -> None:
        self._backup_file.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(self._local_file, self._backup_file)

    def _upload_to_stand(self, local_file: Path) -> None:
        upload_cmd = self._cmd_generator.generate_scp_signal_rules_to_stand_cmd(local_file.as_posix())
        self._stand_client.run_cmd(upload_cmd, timeout=CH_const.LONG_PROCESS_TIMEOUT_S, use_ssh=False)

    def _read_local_file(self) -> dict[str, Any]:
        error_msg = (
            f"[SETUP] [ERROR] Не удалось декодировать файл {self._local_file} "
            f"в кодировках {Im_const.DEFAULT_ENCODINGS}"
        )
        for encoding in Im_const.DEFAULT_ENCODINGS:
            try:
                with open(self._local_file, "r", encoding=encoding, errors="strict") as rules_file:
                    data = json.load(rules_file)
                    if not data:
                        raise ValueError(f"Пустой json (кодировка:{encoding})")
                    return data
            except UnicodeDecodeError:
                continue
            except json.JSONDecodeError as error:
                logger.exception(error_msg)
                raise OSError(error_msg) from error
        logger.exception(error_msg)
        raise OSError(error_msg)

    @staticmethod
    def _write_local_file(rules_json: dict[str, Any]) -> None:
        with open(
            Im_const.SIGNAL_UNIT_CONVERSION_RULES_FILE_NAME,
            "w",
            encoding=Im_const.ENCODING_UTF_8,
        ) as rules_file:
            json.dump(rules_json, rules_file, ensure_ascii=False, indent=2)
























стенд сет мен


import logging
import os
from urllib.parse import urlparse

from clients.subprocess_client import SubprocessClient
from constants.architecture_constants import EnvKeyConstants
from constants.architecture_constants import ImitatorConstants as Im_const
from constants.enums import TU, MeasureConversionRule
from infra.clickhouse_manager import ClickHouseManager
from infra.cmd_generator import ImitatorCmdGenerator
from infra.configuration_manager import ConfigurationManager
from infra.docker_manager import DockerContainerManager
from infra.imitator_data_uploader import ImitatorDataUploader
from infra.imitator_manager import ImitatorManager
from infra.redis_manager import RedisCleaner
from infra.signal_unit_conversion_manager import SignalUnitConversionManager

logger = logging.getLogger(__name__)


class StandSetupManager:
    """
    Подготовка стенда к запуску автотестов
    Для запуска имитатора:
    setup_manager = StandSetupManager(test_duration(minutes), test_data_id, test_data_name, tu_id)
    setup_manager.setup_stand_for_imitator_run()
    imitator_thread = threading.Thread(target=stand_manager.start_imitator, daemon=True)
    core_thread = threading.Thread(target=stand_manager.start_core)
    imitator_thread.start()
    time.sleep(20)
    core_thread.start()

    Доступ к времени старта имитатора для расчёта интервалов утечек:
    start_time = setup_manager.start_time  # datetime объект
    """

    def __init__(
        self,
        duration_m: float,  # Максимальное время работы имитатора в минутах
        test_data_id: int,  # id тест кейса из которого будут загружены данные
        test_data_name: str,  # Название архива данных имитатора для загрузки из TestOps
        tu_id: int,
        measure_conversion_rules: MeasureConversionRule | None = None,
        username: str = os.environ.get(EnvKeyConstants.SSH_USER_DEV),
        stand_name: str = os.environ.get(EnvKeyConstants.STAND_NAME),
    ) -> None:
        self._duration_m = duration_m
        self._test_data_id = test_data_id
        self._test_data_name = test_data_name
        self._tu_id = tu_id
        self._measure_conversion_rules = measure_conversion_rules
        self._username = username
        self._stand_name = stand_name
        self._configuration_file_name = self._get_configuration_file_name()
        self._server_ip = self._get_server_ip()  # Получает ip сервера из словаря
        self._init_clients()
        self._cmd_generator = self._choose_cmd_generator()
        self._final_cmd = self._cmd_generator.generate_final_imitator_cmd()
        # Экземпляр имитатор менеджера нужно создавать после генерации команды, отдельно от других клиентов
        self._imitator_manager = ImitatorManager(self._stand_client, self._final_cmd)

    @property
    def start_time(self):
        """
        Возвращает время старта имитатора как datetime объект.
        Используется для расчёта интервалов утечек в тестах:
        - leak_start_time = start_time + LEAK_START_INTERVAL
        - leak_end_time = start_time + LEAK_START_INTERVAL + ALLOWED_TIME_DIFF_SECONDS
        """
        return self._cmd_generator.start_time

    def setup_stand_for_imitator_run(self) -> None:
        """
        Обертка, в которой проходит полная подготовка стенда
        """
        try:
            if not os.environ.get("RUN_WITHOUT_TESTOPS", "False").lower() == "true":
                # При запуске с TestOps загружает данные для прогона
                self._uploader.upload_with_confirm()

            self.stop_all_containers()
            if self._signal_unit_conversion_manager is not None:
                self._signal_unit_conversion_manager.setup_signal_unit_conversion_rules()
            else:
                logger.info("[SETUP] [SKIP] measure_conversion_rules не задан для набора данных")
            self.clean_redis_and_clickhouse()
            self.start_containers_without_core()
        except Exception as error:
            error_msg = "[SETUP] [ERROR] Ошибка подготовки стенда к запуску имитатора"
            logger.exception(error_msg)
            raise RuntimeError(error_msg) from error

    def restore_signal_unit_conversion_rules(self) -> None:
        """
        Возвращает оригинальный signal_unit_conversion_rules.json на стенд.
        """
        self._signal_unit_conversion_manager.restore_signal_unit_conversion_rules()

    def clean_redis_and_clickhouse(self):
        """
        Чистит БД: Clickhouse и Redis
        """
        # Копирование файла конфигурации на runner
        self._clickhouse_manager.copy_configuration_file_from_stand()
        # Чистка ключей Redis
        self._redis_cleaner.delete_keys_with_check()
        # Чистка ключей ClickHouse
        self._clickhouse_manager.delete_clickhouse_keys_with_check()

    def get_sensor_ids_by_address(self) -> dict[str, int]:
        """
        Возвращает словарь address: id из конфигурации, скопированной на runner.
        """
        return self._configuration_manager.get_sensor_ids_by_address()

    def stop_all_containers(self):
        """
        Останавливает все контейнеры и чистит БД: Clickhouse и Redis
        """
        self._docker_manager.stop_all_lds_containers()

    def start_containers_without_core(self):
        """
        Запускает все контейнеры кроме core
        """
        # Запуск lds-layer-builder
        self._docker_manager.start_lds_layer_builder_containers()
        # Запуск lds-journals
        self._docker_manager.start_lds_journals_containers()
        # Запуск lds-web-app
        self._docker_manager.start_lds_web_app_containers()
        # Запуск lds-api-gw
        self._docker_manager.start_lds_api_gw_containers()
        # Запуск lds-reports
        self._docker_manager.start_lds_reports_containers()

    def start_imitator(self) -> None:
        """
        Запускает имитатор и собирает отдельно лог имитатора в файл
        """
        try:
            self._imitator_manager.run_imitator()
            self._imitator_manager.log_imitator_stdout()
        except Exception as error:
            error_msg = "[SETUP] [ERROR] Ошибка запуска имитатора"
            logger.exception(error_msg)
            raise RuntimeError(error_msg) from error

    def start_core(self) -> None:
        """
        Запускает core контейнеры
        """
        try:
            # Запуск CORE
            self._docker_manager.start_lds_core_containers()
        except Exception as error:
            error_msg = "[SETUP] [ERROR] Ошибка запуска CORE"
            logger.exception(error_msg)
            raise RuntimeError(error_msg) from error

    def stop_imitator_wrapper(self) -> None:
        """
        В teardown может вызываться даже если имитатор не запущен
        """
        try:
            if not self._imitator_manager.imitator_process:
                logger.info("[TEARDOWN] [SKIP] Имитатор не был запущен")
                return
            self._imitator_manager.wait_and_stop_imitator()
            logger.info("[TEARDOWN] [OK] Имитатор остановлен")
        except Exception as error:
            error_msg = "[TEARDOWN] [ERROR] Не удалось остановить имитатор"
            logger.exception(error_msg)
            raise RuntimeError(error_msg) from error

    def server_test_data_remover(self):
        """
        Может вызваться в teardown если загрузка не была выполнена
        """
        uploader = getattr(self, "_uploader", None)
        if uploader is None:
            logger.info("[TEARDOWN] [SKIP] uploader не инициализирован, удаление данных со стенда пропущено")
            return
        try:
            uploader.delete_with_confirm()
        except Exception as error:
            error_msg = "[TEARDOWN] [ERROR] Не удалось удалить данные с сервера"
            logger.exception(error_msg)
            raise RuntimeError(error_msg) from error

    @staticmethod
    def _parse_opc_target() -> tuple[str, int]:
        """
        Извлекает хост и порт OPC из переменной окружения OPC_URL.
        """
        opc_url = os.environ.get(EnvKeyConstants.OPC_URL)
        if not opc_url:
            raise RuntimeError(f"[SETUP] [ERROR] Переменная окружения {EnvKeyConstants.OPC_URL} не задана")

        parsed = urlparse(opc_url)
        if not parsed.hostname or not parsed.port:
            raise RuntimeError(
                f"[SETUP] [ERROR] Некорректное значение OPC_URL: '{opc_url}'. Ожидается формат вида opc.tcp://host:port"
            )

        return parsed.hostname, parsed.port

    def check_opc_server_status(self, timeout_s: int = 5) -> None:
        """
        Проверяет доступность OPC сервера с сервера стенда через /dev/tcp.
        """
        host, port = self._parse_opc_target()
        check_cmd = (
            f"if timeout {timeout_s} bash -lc 'cat < /dev/null > /dev/tcp/{host}/{port}'; "
            f"then echo {Im_const.CMD_STATUS_OK}; else echo {Im_const.CMD_STATUS_FAIL}; fi"
        )
        result = self._stand_client.run_cmd(check_cmd, need_output=True)
        if result != Im_const.CMD_STATUS_OK:
            raise RuntimeError(f"[SETUP] [ERROR] OPC сервер {host}:{port} недоступен с сервера стенда")

        logger.info(f"[SETUP] [OK] OPC сервер {host}:{port} доступен")

    def _get_server_ip(self) -> str:
        """
        Получает server ip из списка стендов
        :return: server ip
        """
        try:
            return Im_const.HOST_MAP.get(self._stand_name, {}).get(Im_const.SERVER_IP_KEY_NAME)

        except Exception as error:
            error_msg = f"[SETUP] [ERROR] Не удалось получить server ip для стенда: {self._stand_name}"
            logger.exception(error_msg)
            raise ValueError(error_msg) from error

    def _get_configuration_file_name(self) -> str:
        """
        Получает имя файла конфигурации
        """
        return TU.get_file_name_by_id(self._tu_id)

    def _choose_cmd_generator(self) -> ImitatorCmdGenerator:
        """
        Выбирает вариант генерации команды запуска имитатора, в зависимости от типа запуска
        """
        try:
            if os.environ.get("RUN_WITHOUT_TESTOPS", "False").lower() == "true":
                # Запуск без TestOps
                return ImitatorCmdGenerator(self._test_data_name, self._stand_name, self._duration_m)
            else:
                self._uploader = ImitatorDataUploader(
                    self._stand_client, self._test_data_id, self._test_data_name, self._tu_id
                )
                self._data_path = self._uploader.remote_temp_dir_path
                return ImitatorCmdGenerator(self._data_path, self._stand_name, self._duration_m)
        except Exception as error:
            error_msg = "[SETUP] [ERROR] Ошибка при выборе варианта генерации команды запуска имитатора"
            logger.exception(error_msg)
            raise RuntimeError(error_msg) from error

    def _init_clients(self) -> None:
        """
        Создает экземпляры необходимых для запуска клиентов
        """
        try:
            self._stand_client = SubprocessClient(self._username, self._server_ip)
            self._infra_client = SubprocessClient(self._username, Im_const.REDIS_STAND_ADDRESS)
            self._clickhouse_manager = ClickHouseManager(
                self._stand_client, self._infra_client, self._configuration_file_name
            )
            self._configuration_manager = ConfigurationManager(self._configuration_file_name)
            self._signal_unit_conversion_manager = SignalUnitConversionManager(
                self._stand_client, self._measure_conversion_rules
            )
            self._docker_manager = DockerContainerManager(self._stand_client)
            self._redis_cleaner = RedisCleaner(self._infra_client, self._stand_name)
        except Exception as error:
            error_msg = "[SETUP] [ERROR] Ошибка инициализации клиентов"
            logger.exception(error_msg)
            raise RuntimeError(error_msg) from error