Загрузка данных


сабпр


import logging
import os
import subprocess
from typing import Optional

from constants.architecture_constants import EnvKeyConstants
from constants.architecture_constants import ImitatorConstants as Im_const

logger = logging.getLogger(__name__)


class SubprocessClient:
    """
    Клиент для выполнения команд в консоли, с автоматической оберткой в ssh команды
    Для выполнения команды:
    from utils.ssh_manager import SubprocessClient
    client = SubprocessClient(your_user, your_host)
    client.run_cmd(your_command)

    """

    def __init__(self, remote_username: str, remote_host: str) -> None:
        self._username = remote_username
        self._host = remote_host
        self._ssh_key_name = os.environ.get(EnvKeyConstants.SSH_KEY_NAME)

    @property
    def username(self):
        return self._username

    @property
    def host(self):
        return self._host

    def _wrap_ssh_cmd(self, cmd: str, use_ssh: bool = True) -> str:
        """
        Обертка в ssh команду
        :param cmd: команда
        :param use_ssh: нужна ли обертка
        :return: команда
        """

        if use_ssh:
            if os.name == Im_const.OS_NAME_WIN:
                # Для запуска под WIN требуется добавить в команду путь к ключу
                return f"ssh -i {self._ssh_key_name} {self._username}@{self._host} \"{cmd}\""
            else:
                return f'ssh {self._username}@{self._host} "{cmd}"'
        return cmd

    def _exec_run(
        self,
        cmd: str,
        check: bool = True,
        timeout: int = None,
        use_ssh: bool = True,
    ) -> subprocess.CompletedProcess:
        """
        Выполняет команду в консоли и возвращает результат
        :param cmd: команда для выполнения в консоли
        :return: результат выполнения команды
        """
        final_cmd = self._wrap_ssh_cmd(cmd, use_ssh)
        logging.info(f"[RUN] Выполняю команду: {final_cmd[:200]}")
        try:
            return subprocess.run(
                final_cmd,
                # Запуск команды через оболочку
                shell=True,
                # Аргумент check выбросит исключение если придет ответ не "0"
                check=check,
                capture_output=True,
                encoding=self._get_encoding(),
                timeout=timeout,
            )
        except subprocess.TimeoutExpired:
            logger.exception(f"[RUN] [ERROR] Команда превысила таймаут: {final_cmd}")
            raise
        except subprocess.CalledProcessError as err:
            output_error = err.stderr.strip()
            logging.error(f"[RUN] [ERROR] Ошибка выполнения команды: {output_error}. Код ошибки: {err.returncode}")
            raise

    def exec_popen(self, cmd, use_ssh: bool = True) -> Optional[subprocess.Popen]:
        """
        Выполняет команду в консоли.
        Используется для запуска долгих процессов
        :param cmd: команда для выполнения в консоли
        :param use_ssh: нужна ли ssh обертка
        :return: процесс выполнения команды
        """
        final_cmd = self._wrap_ssh_cmd(cmd, use_ssh)
        logging.info(f"[POPEN] выполняю команду: {final_cmd}")
        try:
            return subprocess.Popen(
                final_cmd,
                # Запуск команды через оболочку
                shell=True,
                stdout=subprocess.PIPE,
                # Объединяет вывод и вывод ошибок в один поток
                stderr=subprocess.STDOUT,
                text=True,
                # Буферизация строк
                bufsize=1,
                # При адаптации для запуска в gitlab нужно убрать кодировку
                encoding=Im_const.ENCODING_UTF_8,
                # Замена знаков, которые были декодированы с ошибкой
                errors="replace",
            )

        except (FileNotFoundError, OSError):
            logging.exception(f"[POPEN] [ERROR] Ошибка выполнения команды {cmd}")

    def run_cmd(
        self, cmd: str, check: bool = True, timeout: int = None, need_output: bool = False, use_ssh: bool = True
    ) -> Optional[str]:
        """
        Выполняет команду через subprocess.run с логированием или возвратом результата выполнения команды
        :param cmd: команда
        :param check: проверяет что код ответа 0
        :param timeout: таймаут на выполнение(опционально)
        :param need_output: нужно ли вернуть вывод после выполнения команды
        :param use_ssh: нужна ли ssh обертка
        :return: Вывод
        """
        result = self._exec_run(cmd, check, timeout, use_ssh)
        logging.info(f"[RUN] [OK] Команда выполнена успешно: {cmd[:200]}")
        if need_output:
            output = result.stdout.strip()
            return output
        if result.stdout:
            logging.debug(f"[RUN] [STDOUT]\n{result.stdout.strip()}")
        if result.stderr:
            logging.warning(f"[RUN] [STDERR]\n{result.stderr.strip()}")
        return None

    @staticmethod
    def terminate_process(process: subprocess.Popen, timeout: float) -> None:
        """
        Завершает процесс.
        :param process: Процесс, который требуется остановить
        :param timeout: Время на остановку процесс
        """
        if process is None:
            logger.warning("[POPEN] terminate_process вызван при пустом process")
            return

        try:
            if process.poll is None:
                process.terminate()
                process.wait(timeout=timeout)
                logger.info("[POPEN] [OK] Процесс завершился успешно")
        except subprocess.TimeoutExpired:
            logger.exception("[POPEN] [ERROR] Процесс не завершился, принудительное уничтожение процесса")
            try:
                process.kill()
                process.wait()
            except RuntimeError:
                logger.exception("[POPEN] [ERROR] Ошибка завершения процесса")
                raise
        except Exception:
            logger.exception("[POPEN] [ERROR] Неожиданная ошибка при завершении процесса")
            raise

    @staticmethod
    def _get_encoding() -> Optional[str]:
        """
        Временная функция. Будет удалена во время адаптации скрипта к gitlab runner
        Получает кодировку консоли или применяет дефолтную для windows
        """

        return os.device_encoding(1) or Im_const.WIN_ENCODING_CP866




















вс




import asyncio
import logging
import time
from datetime import datetime
from typing import Any, Callable, List, Optional
from zoneinfo import ZoneInfo

import msgpack
import websockets
from allure import attach, attachment_type

from constants.architecture_constants import WebSocketClientConstants as WS_Const
from utils.msgpack_utils.message_filters import is_desired_invocation_id, is_desired_type
from utils.msgpack_utils.msgpack_utils import encode_with_varint_prefix, parse_message

logger = logging.getLogger(__name__)


class WebSocketClient:
    """
    Асинхронный ws-клиент для api-gateway по протоколу Async Api
    """

    def __init__(
        self,
        host: str,
        access_token: str,
        reconnect_interval: float = WS_Const.DEFAULT_RECONNECT_INTERVAL,
    ):
        self._host = host
        self._access_token = access_token
        self._reconnect_interval = reconnect_interval
        self._ws_url = f"wss://{host.rstrip('/')}{WS_Const.WS_HUBS}"
        self._buffer = b""
        self._next_id = WS_Const.START_INVOCATION_ID

        self._ws: websockets.ClientConnection | None = None
        self.recv_queue: asyncio.Queue[Any] = asyncio.Queue()
        self._recv_task: asyncio.Task | None = None
        self._stop_event = asyncio.Event()
        self._invocation_id: Optional[str] = None
        self.suppress_recv_logging: bool = False

    @property
    def invocation_id(self):
        return self._invocation_id

    def clear_queue(self):
        """
        Очищает очередь путем пересоздания экземпляра класса очереди
        """
        # TODO разобраться почему не выполняется очистка очереди сообщений LDS-8599
        while not self.recv_queue.empty():
            try:
                self.recv_queue.get_nowait()
                self.recv_queue.task_done()
            except asyncio.QueueEmpty as message_empty:
                logger.info(f"Очередь сообщений очищена: {message_empty}")
                break

    async def __aenter__(self):
        await self._connect_loop()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self._stop_event.set()
        if self._ws:
            await self._ws.close()
        if self._recv_task:
            await self._recv_task

        # TODO: дергать ручку завершения сессии в LDS-4083

    async def _handshake(self) -> None:
        payload = WS_Const.HANDSHAKE_MESSAGE + WS_Const.RS.decode()
        logger.debug(
            f"Отправлен handshake: {payload}",
        )
        await self._ws.send(payload)

        buf = self._buffer
        finish = time.monotonic() + WS_Const.HANDSHAKE_WAITING
        while time.monotonic() < finish:
            chunk = await self._ws.recv()
            logger.debug(f"Ответ на handshake: {chunk}")
            buf += chunk.encode() if isinstance(chunk, str) else chunk
            if WS_Const.RS in buf:
                return

        raise TimeoutError("Handshake timeout: не получили сообщение с разделителем RS за указанное время")

    async def _connect_loop(self) -> None:
        """
        Цикл подключения с повторными попытками до наступления stop_event.
        """
        while not self._stop_event.is_set():
            try:
                self.ws_request = f"{self._ws_url}/?access_token={self._access_token}"
                logger.info(f"Попытка подключения по wss: {self._ws_url}/?access_token=...")
                self._ws = await websockets.connect(
                    self.ws_request,
                    ping_interval=WS_Const.PING_INTERVAL,
                    ping_timeout=WS_Const.PING_TIMEOUT,
                    close_timeout=WS_Const.CLOSE_TIMEOUT,
                )
                # Handshake
                await self._handshake()
                # Запускаем приём в фоне
                self._recv_task = asyncio.create_task(self._recv_loop())
                logger.info("Websocket connected")
                return
            except ConnectionError:
                logger.exception(
                    f"Websocket подключение не установлено, повтор подключения через: {self._reconnect_interval}"
                )
                await asyncio.sleep(self._reconnect_interval)

    async def _recv_loop(self) -> None:
        """
        Прием сообщений, парсинг и отправка в очередь.
        """
        assert self._ws is not None

        while not self._stop_event.is_set():
            try:
                chunk = await self._ws.recv()
                if not self.suppress_recv_logging:
                    logger.debug(f"Сырые биты до обработки: {chunk[:100]}")
            except websockets.ConnectionClosed as e:
                logger.warning(f"WebSocket соединение разорвано: {e}")
                return

            result_message = parse_message(chunk)
            str_message = str(result_message)
            if not self.suppress_recv_logging:
                logger.info(f"Обработанное сообщение от api-gateway: {str_message[:500]} полное сообщение в attach")
                attach(
                    str_message,
                    name=f"Распакованное сообщение от api-gateway {datetime.now(ZoneInfo(WS_Const.ZONE_INFO))}",
                    attachment_type=attachment_type.TEXT,
                )
            await self.recv_queue.put(result_message)

    async def invoke(self, target: str, args: list) -> None:
        """
        Отправляет удаленный вызов invocation о websocket соединению

        Метод формирует сообщение по протоколу SignalR, включая:
        - типа сообщения
        - заголовки
        - уникальный идентификатор запроса
        - имя целевого метода
        - аргументы запроса

        Сообщение запаковывается в messagepack и отправляется через текущее websocket соединение
        """

        if not self._ws:
            raise websockets.WebSocketException("Не установлено подключение по wss")
        self._invocation_id = str(self._next_id)
        self._next_id += 1
        invocation = [
            WS_Const.DEFAULT_SIGNALR_MESSAGE_TYPE,
            WS_Const.DEFAULT_SIGNALR_MAP_HEADERS,
            self._invocation_id,
            target,
            [args],
        ]
        logger.info(f"Сообщение подготовлено к отправке: {invocation}")
        payload = msgpack.packb(invocation, use_bin_type=True)
        packet = encode_with_varint_prefix(payload)
        logger.debug(f"Отправляем сообщение: {packet}")
        await self._ws.send(packet)

    async def invoke_stream(self, target: str, args: list) -> None:
        """
        Отправляет streaming-вызов (StreamInvocation) по протоколу SignalR.
        """
        if not self._ws:
            raise websockets.WebSocketException("Не установлено подключение по wss")
        self._invocation_id = str(self._next_id)
        self._next_id += 1
        invocation = [
            WS_Const.STREAM_INVOCATION_MESSAGE_TYPE,
            WS_Const.DEFAULT_SIGNALR_MAP_HEADERS,
            self._invocation_id,
            target,
            [args],
        ]
        logger.info(f"Streaming-сообщение подготовлено к отправке: {invocation}")
        payload = msgpack.packb(invocation, use_bin_type=True)
        packet = encode_with_varint_prefix(payload)
        await self._ws.send(packet)

    async def receive_by_type(self, message_type: str, timeout: WS_Const.FILTERING_TIMEOUT) -> List[Any]:
        """
        Фильтрует сообщения по message_type
        """
        try:
            return await self._receive_by(filter_func=lambda msg: is_desired_type(msg, message_type), timeout=timeout)
        except websockets.WebSocketException:
            raise websockets.WebSocketException(f"Ошибка при фильтрации сообщений по {message_type}")

    async def receive_by_invocation_id(
        self, invocation_id: str, timeout: float = WS_Const.FILTERING_TIMEOUT
    ) -> List[Any]:
        """
        Фильтрует сообщения по invocation_id
        """
        try:
            return await self._receive_by(
                filter_func=lambda msg: is_desired_invocation_id(msg, invocation_id), timeout=timeout
            )
        except websockets.WebSocketException:
            raise websockets.WebSocketException("Ошибка при фильтрации сообщений по invocation_id")

    async def _receive_by(self, filter_func: Callable[[list], bool], timeout: float) -> List[Any]:
        """
        Ждет и фильтрует сообщение по filter_func
        """
        # 1) Единая точка вычисления дедлайна
        deadline = time.monotonic() + timeout

        while True:
            # 2) Остаток времени до таймаута
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise asyncio.TimeoutError(f"Timeout при фильтрации сообщений {timeout:.1f} секунд")

            try:
                # 3) Получает сообщение
                msg = await asyncio.wait_for(self.recv_queue.get(), timeout=remaining)
            except asyncio.TimeoutError:
                # 4) Явно перехватывает и пробрасывает свой TimeoutError
                raise asyncio.TimeoutError(f"Timeout при фильтрации сообщений {timeout:.1f} секунд")

            # 5) Фильтрация по filter_func
            if isinstance(msg, list) and filter_func(msg):
                return msg
























арх конст

import os


class ImitatorConstants:
    TEST_SETTINGS_KEY_NAME: str = "test_settings"
    IMITATOR_FLAGS_KEY_NAME: str = "imitator_flags"
    IMITATOR_TIME_FORMAT: str = "%Y%m%dT%H%M%S"
    IMITATOR_START_DELAY_S: int = 100
    IMITATOR_FINISH_DELAY_MINUTE: float = 2.0
    IMITATOR_CHECK_CMD: str = "pgrep -f Playground"
    IMITATOR_KILL_CMD: str = "pkill -f Playground"
    IMITATOR_PATH = "/data/imitator/lds-flow-playground-csv-latest"
    IMITATOR_RUN_CMD: str = f"dotnet {IMITATOR_PATH}/TN.LDS.Flow.Playground.Application.dll"
    IMITATOR_LOG_FILE_NAME: str = "imitator.log"
    IMITATOR_KEY_NAME: str = "imitator_key"
    SERVER_IP_KEY_NAME: str = "server_ip"
    SANDBOX_PATH: str = "Sandbox_path"  # Удалить при переработке json_config_model.py
    SANDBOX_DATA: str = "data"
    SANDBOX_RULES: str = "rules.txt"
    SANDBOX_TAGS: str = "tags.txt"
    STAND_ENV_NAMING: str = os.environ.get("STAND_NAME")[:-1]
    CONFIG_PATH: str = f"/data/{STAND_ENV_NAMING}/configs"
    SIGNAL_UNIT_CONVERSION_RULES_FILE_NAME: str = "signal_unit_conversion_rules.json"
    SIGNAL_UNIT_CONVERSION_RULES_BACKUP_DIR: str = "original_conversion_rules"
    SOURCE_TYPE_DEF_VALUE: str = "inflow"
    SPEED_DEF_VALUE: int = 1
    NS_DEF_VALUE: int = 2
    KAFKA_OFFSET_EARLIEST: str = "earliest"
    KAFKA_POLL_TIMEOUT_S: float = 1.0
    KAFKA_SESSION_TIMEOUT_MS: int = 10000
    TEST_ID_KEY: str = "test_id"
    AUTOTEST_DATA_PATH: str = "/data/imitator/autotest_data"
    POPEN_WAIT_TIMOUT_S: int = 5
    LONG_PROCESS_TIMEOUT_S: int = 20
    CMD_STATUS_OK: str = "OK"
    CMD_STATUS_FAIL: str = "FAIL"
    REDIS_STAND_ADDRESS: str = "10.7.49.210"
    CORE_START_DELAY_S: int = 5
    ENCODING_UTF_8: str = "utf-8"
    ENCODING_UTF_8_SIG: str = "utf-8-sig"
    ENCODING_LATIN_1: str = "latin-1"
    WIN_ENCODING_CP866: str = "cp866"  # Нужна только для запуска под WIN
    WIN_ENCODING_CP1251: str = "cp1251"  # Нужна только для запуска под WIN
    OS_NAME_WIN: str = 'nt'
    DEFAULT_ENCODINGS = [ENCODING_UTF_8_SIG, ENCODING_UTF_8, WIN_ENCODING_CP866, WIN_ENCODING_CP1251, ENCODING_LATIN_1]

    HOST_MAP = {
        "dev1": {IMITATOR_KEY_NAME: "DEV1_", SERVER_IP_KEY_NAME: "10.7.49.37"},
        "dev2": {IMITATOR_KEY_NAME: "DEV2_", SERVER_IP_KEY_NAME: "10.7.49.38"},
        "dev3": {IMITATOR_KEY_NAME: "DEV3_", SERVER_IP_KEY_NAME: "10.7.49.205"},
        "test1": {IMITATOR_KEY_NAME: "TEST1_", SERVER_IP_KEY_NAME: "10.7.49.206"},
        "test2": {IMITATOR_KEY_NAME: "TEST2_", SERVER_IP_KEY_NAME: "10.7.49.207"},
        "test3": {IMITATOR_KEY_NAME: "TEST3_", SERVER_IP_KEY_NAME: "10.7.49.208"},
        "test4": {IMITATOR_KEY_NAME: "TEST4_", SERVER_IP_KEY_NAME: "10.7.49.209"},
    }


class ClickhouseConstants(ImitatorConstants):
    CH_TABLE_NAMES: list = ["lds.records", "lds.records_lastvalue"]
    EVO_OBJECT_ID_KEY_NAME: str = "evoObjectId"
    EVO_PARAMETER_ID_KEY_NAME: str = "evoParameterId"
    OBJECT_ID_KEY_NAME: str = "objectId"
    PARAMETER_ID_KEY_NAME: str = "parameterId"
    EVO_ID_PAIRS_CHUNK_SIZE: int = 450
    NAME_CONTAINER: str = "clickhouse-2"


class DockerConstants:
    HOSTNAME_CMD: str = "hostname"
    STOP_CMD: str = "docker stop"
    START_CMD: str = "docker start"
    CHECK_STATUS_CMD: str = "docker inspect -f '{{.State.Status}}'"
    RUNNING_STATUS: str = "running"
    EXITED_STATUS: str = "exited"
    CORE_CONTAINERS_GROUP: list = ["lds-core-node1", "lds-core-node2", "lds-core-node3"]
    LB_CONTAINERS_GROUP: list = ["lds-layer-builder-node1", "lds-layer-builder-node2", "lds-layer-builder-node3"]
    JOURNAL_CONTAINERS_GROUP: list = ["lds-journals-node1", "lds-journals-node2", "lds-journals-node3"]
    WEB_APP_CONTAINERS_GROUP: list = ["lds-web-app-node1", "lds-web-app-node2", "lds-web-app-node3"]
    API_GW_CONTAINERS_GROUP: list = ["lds-api-gw-node1", "lds-api-gw-node2", "lds-api-gw-node3"]
    REPORTS_CONTAINERS_GROUP: list = ["lds-reports-node1", "lds-reports-node2", "lds-reports-node3"]


class RedisConstants:
    LB_REDIS_KEY: str = "lds-layer-builder"
    CORE_REDIS_KEY: str = "lds-core"
    REDIS_KEY_FIND_CMD: str = "docker exec -i redis-redis-01-1-1 redis-cli KEYS"
    REDIS_KEY_DEL_CMD: str = "| xargs -r docker exec -i redis-redis-01-1-1 redis-cli DEL"


class KeycloakClientConstants:
    TOKEN_LEEWAY: int = 30
    GRANT_TYPE: str = "password"
    KEYCLOAK_HEADERS: dict = {"Content-Type": "application/x-www-form-urlencoded"}
    TOKEN_KEY: str = "access_token"
    ISSUED_AT_KEY: str = "issued_at"
    EXPIRES_IN_KEY: str = "expires_in"


class TestOpsConstants:
    TESTOPS_UPLOAD_ENDPOINT: str = "/upload"
    TESTOPS_UPLOAD_ERROR_MSG: str = "Ошибка при загрузке файлов allure отчета"
    TESTOPS_UPLOAD_RESPONSE_MSG_KEY: str = "message"
    TESTOPS_UPLOAD_FILES_KEY: str = "files"
    POST_METHOD: str = "post"
    ALLURE_RESULTS_DIR_NAME: str = "allure-results"
    GZIP_FILE_SIGNATURE: bytes = b'\x1f\x8b'


class HTTPClientConstants:
    GET_METHOD: str = "get"
    POST_METHOD: str = "post"
    TESTOPS_UPLOAD_ENDPOINT: str = "/upload"
    TESTOPS_ATTACHMENTS_LIST_ENDPOINT: str = "/test_cases/{test_case_id}/attachments"
    TESTOPS_LOAD_ATTACHMENT_ENDPOINT: str = "/test_cases/{test_case_id}/attachments/{attachment_id}?download=1"
    TESTOPS_ATTACHMENTS_KEY: str = "items"
    TESTOPS_ATTACHMENT_FILENAME_KEY: str = "original_filename"
    TESTOPS_ATTACHMENT_ID_KEY: str = "id"
    TEST_ID_KEY: str = "test_id"
    IMITATOR_RUN_DATA_FILENAME: str = "imitator_run_data.tar.gz"  # Название архива данных для прогона


class WebSocketClientConstants:
    RS: bytes = b'\x1E'  # ASCII Record Separator
    HANDSHAKE_WAITING: float | int = 5.0
    HANDSHAKE_MESSAGE: str = "{\"protocol\":\"messagepack\",\"version\":1}"
    WS_HUBS: str = "/hubs/ldsClientHub"
    START_INVOCATION_ID: str = 1
    DEFAULT_RECONNECT_INTERVAL: float | int = 5.0
    PING_INTERVAL: int = 3
    PING_TIMEOUT: int = 5
    CLOSE_TIMEOUT: int = 30
    DEFAULT_SIGNALR_MESSAGE_TYPE: int = 1  # invocation
    STREAM_INVOCATION_MESSAGE_TYPE: int = 4  # StreamInvocation
    STREAM_ITEM_MESSAGE_TYPE: int = 2  # StreamItem
    COMPLETION_MESSAGE_TYPE: int = 3  # Completion
    # Текст ошибки Completion при неуспешном streaming (SignalR CompletionWithDetail)
    COMPLETION_ERROR_MESSAGE_INDEX: int = 4
    DEFAULT_SIGNALR_MAP_HEADERS: dict = {}
    EVENT_TYPE_INDEX = 3
    INVOCATION_ID_INDEX = 2
    SERVICE_NAME = "web-app"
    COMPONENT = "lds"
    ROOT_DOMAIN = "tn.tngrp.ru"
    FILTERING_TIMEOUT: int | float = 10.0
    ZONE_INFO: str = 'Europe/Moscow'


class MockConstants:
    MOCK_DURATION: int = 60
    MOCK_TEST_DATA_ID: int = 1
    MOCK_TEST_DATA_NAME: str = "mock.tar.gz"


class EnvKeyConstants:
    CONNECTION_HOST: str = "CONNECTION_HOST"
    KEYCLOAK_URL: str = "KEYCLOAK_URL"
    KEYCLOAK_CLIENT_ID: str = "KEYCLOAK_CLIENT_ID"
    KEYCLOAK_CLIENT_SECRET: str = "KEYCLOAK_CLIENT_SECRET"
    KEYCLOAK_USERNAME: str = "KEYCLOAK_USERNAME"
    KEYCLOAK_PASSWORD: str = "KEYCLOAK_PASSWORD"
    TESTOPS_BASE_URL: str = "TESTOPS_BASE_URL"
    SSH_KEY_NAME: str = "SSH_KEY_NAME"
    SSH_USER_DEV: str = "SSH_USER_DEV"
    STAND_NAME: str = "STAND_NAME"
    DATA_PATH: str = "DATA_PATH"
    OPC_URL: str = "OPC_URL"
    TU_ID: str = "TU_ID"













енам
from enum import Enum, IntEnum, IntFlag
from typing import Mapping


class BaseStrEnum(Enum):
    def __str__(self) -> str:
        return f"{self.name} ({self.value})"


class BaseStrIntFlag(IntFlag):
    def __str__(self) -> str:
        raw_value = int(self)
        if raw_value == 0:
            return "0"

        active_flags = [f"{flag.name} ({flag.value})" for flag in type(self) if flag.value and flag & self == flag]
        if active_flags:
            return f"{', '.join(active_flags)}"

        return str(raw_value)


class BaseReasonEnum(IntFlag):
    """
    .report_text - название на русском языке.
    Вывод в формате report_text(value)
    """

    def __new__(cls, value: int, report_text: str):
        member = int.__new__(cls, value)
        member._value_ = value
        member.report_text = report_text
        return member

    def __str__(self) -> str:
        raw_value = int(self)
        if raw_value == 0:
            return "0"

        active_flags = [
            f"{flag.report_text} ({flag.value})" for flag in type(self) if flag.value and flag & self == flag
        ]
        if active_flags:
            return f"{', '.join(active_flags)}"

        return str(raw_value)

    @classmethod
    def report_text_by_value(cls, status_value: int) -> str | None:
        """Текст по числовому значению статуса"""
        try:
            return cls(status_value).report_text
        except ValueError:
            return None


class TU(Enum):
    YAROSLAVL_MOSCOW = (1, "Ярославль - Москва", "volga.json")
    TIKHORETSK_NOVOROSSIYSK_2 = (2, "Тихорецк-Новороссийск-2", "tn2.json")
    TIKHORETSK_NOVOROSSIYSK_3 = (3, "Тихорецк-Новороссийск-3", "tn3.json")
    RODIONOVSKAYA_TIKHORETSKAYA = (4, "Родионовская–Тихорецкая", "lt3_rt.json")
    TIKHORETSKAYA_GRUSHEVAYA = (5, "Тихорецкая-6-Грушовая", "tn4_t6g.json")

    def __init__(self, tu_id: int, description: str, file_name: str) -> None:
        self.id = tu_id
        self.description = description
        self.file_name = file_name

    def __str__(self):
        return f"{self.id} - {self.description}"

    @classmethod
    def get_file_name_by_id(cls, target_id: int) -> str:
        for item in cls:
            if item.id == target_id:
                return item.file_name
        raise ValueError(f"ТУ с id = {target_id} не найден")


class ReplyStatus(Enum):
    OK = 200
    BAD_REQUEST = 400
    UNAUTHORIZED = 401
    FORBIDDEN = 403
    NOT_FOUND = 404
    REQUEST_TIMEOUT = 408
    CONFLICT = 409
    PRECONDITION_FAILED = 412
    RANGE_NOT_SATISFIABLE = 416
    TOO_MANY_REQUESTS = 429
    INTERNAL_SERVER_ERROR = 500
    NOT_IMPLEMENTED = 501
    SERVICE_UNAVAILABLE = 503
    GATEWAY_TIMEOUT = 504
    UNKNOWN_ERROR = 520


class ExportedDataType(IntEnum):
    """
    Тип экспортируемых данных
    """

    STATIONARY_STATUS_REPORT = 6
    LDS_STATUS_REPORT = 5
    LEAKS_REPORT = 4
    REJECTED_REPORT = 7

    def to_download_name(self) -> str:
        """Строковый тип для DownloadExportedDataRequest.exportedDataType"""
        return _EXPORTED_DATA_TYPE_DOWNLOAD_NAMES[self]


_EXPORTED_DATA_TYPE_DOWNLOAD_NAMES = {
    ExportedDataType.STATIONARY_STATUS_REPORT: "StationaryStatusReport",
    ExportedDataType.LDS_STATUS_REPORT: "LdsStateReport",
    ExportedDataType.LEAKS_REPORT: "LeaksReport",
    ExportedDataType.REJECTED_REPORT: "RejectedSignalsReport",
}


class ExportStatus(IntEnum):
    """Статус формирования отчёта в ReportDataExportedNotification.replyContent.exportStatus."""

    NOT_READY = 0
    DONE = 1


class StationaryStatus(BaseStrEnum):
    UNSTATIONARY = (1, 'Нестационарный режим работы МТ')  # Нестационарный режим
    STATIONARY = (2, 'Стационарный режим работы МТ')  # Стационарный режим
    STOPPED = (3, 'МТ в режиме остановленной перекачки')  # Режим остановкленной перекачки

    def __new__(cls, value: int, report_text: str) -> "StationaryStatus":
        member = object.__new__(cls)
        member._value_ = value
        member.report_text = report_text
        return member

    @classmethod
    def report_text_by_value(cls, status_value: int) -> str | None:
        """Текст режима СОУ для отчёта по числовому значению статуса"""
        try:
            return cls(status_value).report_text
        except ValueError:
            return None


class LeakStatus(BaseStrEnum):
    CONFIRMED = 2
    WAITING = 1
    POSSIBLE = 3


class LeakLocationStatus(BaseStrEnum):
    NODATA = 1  # нет данных
    LEFT_FROM_PUMP_STATION = 2  # Слева от МНС
    RIGHT_FROM_PUMP_STATION = 3  # Справа от МНС
    INSIDE_PUMP_STATION = 4  # Внутри МНС
    INSIDE_NPS = 5  # Внутри НПС при неработающей/отсутствующей МНС


class FieldName(Enum):
    SECTION_TYPE = "sectionType"
    SIGNAL_TYPE = "signalType"


class FilterCriteriaType(Enum):
    ONE_OF = "oneOf"
    ALL_OF = "allOf"


class FilterCriteriaValue(Enum):
    MASK = "mask"
    MASK_REASON = "maskReason"
    LEAK = "leak"
    LEAK_COORDINATE = "leakCoordinate"
    PUMPING_STATUS = "pumpingStatus"
    FREE_FLOW = "freeFlow"
    ACKNOWLEDGE = "acknowledge"
    LEAK_TIME = "leakTime"
    LEAK_VOLUME = "leakVolume"
    LDS_STATUS = "ldsStatus"
    CONTROLLED_SITES = "controlledSites"
    LINEAR_PARTS = "linearParts"
    SERVER_DOWN = "serverDown"
    TIME_SYNCHRONIZATION_DISABLE = "timeSynchronizationDisable"
    FREE_FLOW_START_COORDINATE = "freeFlowStartCoordinate"


class SortingParam(Enum):
    OBJECT_NAME = "objectName"
    ADDRESS = "address"


class SortingType(Enum):
    ASCENDING = "ascending"
    DESCENDING = "descending"


class Direction(Enum):
    """Направление прокрутки"""

    PREV = 1
    NEXT = 2
    FIRST = 3
    LAST = 4


class LdsStatus(BaseStrEnum):
    """
    Режим работы СОУ.
    report_text - значение колонки 'Режим работы СОУ' в xlsx-отчёте об утечках.
    """

    FAULTY = (1, "СОУ неисправна")
    INITIALIZATION = (2, "СОУ в инициализации")
    DEGRADATION = (3, "СОУ в ухудшенных характеристиках")
    SERVICEABLE = (4, "СОУ исправна")

    def __new__(cls, value: int, report_text: str) -> "LdsStatus":
        member = object.__new__(cls)
        member._value_ = value
        member.report_text = report_text
        return member

    @classmethod
    def report_text_by_value(cls, status_value: int) -> str | None:
        """Текст режима СОУ для отчёта по числовому значению статуса"""
        try:
            return cls(status_value).report_text
        except ValueError:
            return None


class ConfirmationStatus(BaseStrEnum):
    FAULTY = 0  # Неисправность
    AWAITING = 1  # Предварительная
    NOT_CONFIRMED = 2  # Не подтверждена
    CONFIRMED = 3  # Подтверждена
    CONFIRMED_AND_LEAK_CLOSED = 4  # Завершена


class ReservedType(BaseStrEnum):
    """Алгоритмы СОУ"""

    FAULTY = 0  # Неисправность
    STOP = 1  # Дифференциальный
    STATIONARY_FLOW = 2  # Стационарный
    UNSTATIONARY_FLOW = 3  # Модельный
    BALANCE_IN_NPS = 4  # Баланс внутри НПС
    CHANGED_IN_DECISION_MAKING = 5  # Стационарный + Изменено в АПР
    CREATED_IN_DECISION_MAKING = 6  # Создано в АПР


class MessageType(BaseStrIntFlag):
    AUTHENTICATION = 1  # Вход в систему
    REJECTION = 1 << 2  # Отбраковка сигналов
    LDS_STATUS = 1 << 3  # Режим работы СОУ
    INPUT_SIGNALS = 1 << 6  # Входные сигналы
    PUMPING_STATUS = 1 << 7  # Режим работы МТ
    MASKING_LDS = 1 << 8  # Маскирование СОУ
    FREE_FLOWS = 1 << 9  # Самотечное течение
    LEAKS = 1 << 10  # Утечка


class MessagePriority(BaseStrIntFlag):
    LOW = 1  # Прочее
    COMMON = 1 << 1  # Информационное
    MEDIUM = 1 << 2  # Значительное
    HIGH = 1 << 3  # Важное
    VERY_HIGH = 1 << 4  # Особой важности


class LdsStatusDegradation(BaseReasonEnum):
    """
    Причины режима работы СОУ: Ухудшение характеристик
    """

    LEAK_ON_ADJACENT_DIAGNOSTIC_AREAS = (1 << 0, 'Утечка на соседнем диагностическом участке')
    ADDITIVE_INJECTORS_OPERATION = (1 << 1, 'Наличие ПТП')
    PIG_SENSOR_PASSAGE = (1 << 2, 'Наличие СОД')
    TRIGGERING_EMERGENCY_RESET = (1 << 3, 'Срабатывание аварийного сброса или предохранительных клапанов')
    STARTING_PUMPING_OUT_PUMPS = (1 << 4, 'Работа насосов откачки')
    EXCEEDING_DISTANCE_BETWEEN_SERVICEABLE_PRESSURE_SENSORS = (
        1 << 5,
        'Расстояние между ближайшими исправными СИ давления на пути перекачки более 50 км',
    )
    FAULTY_PRESSURE_SENSORS_AT_PUMP_STATION_NODES = (1 << 6, 'Отказ СИ давления на входе/выходе НПС')
    REJECTION_TEMPERATURE_SENSOR = (1 << 7, 'Отказ СИ температуры')
    REJECTION_VISCOSITY_SENSOR = (1 << 8, 'Отказ СИ вязкости')
    REJECTION_DENSITY_SENSOR = (1 << 9, 'Отказ СИ плотности')
    GRAVITY_SECTION_IN_PUMPING_MODE = (1 << 10, 'Наличие самотечного участка/участка с неполным сечением')
    ABSENCE_MIN_PRESSURE_SENSORS_REQUIRED_NUMBER = (1 << 11, 'Менее 4 исправных СИ давления на разных КП ЛЧ и НПС')
    EXCEEDING_DISTANCE_BETWEEN_FLOW_METERS = (
        1 << 12,
        'Расстояние между ближайшими исправными СИ расхода на пути перекачки более 200 км',
    )
    GRAVITY_SECTION_IN_STOPPED_PUMPING_MODE = (
        1 << 13,
        'Наличие самотечного участка/участка с неполным сечением в режиме остановленной перекачки',
    )


class LdsStatusFaulty(BaseReasonEnum):
    """
    Причины режима работы СОУ: Неисправность
    """

    NO_DATA_SOURCE_CONNECTION = (1 << 0, 'Потеря связи СОУ с СДКУ')
    ABSENCE_MIN_PRESSURE_SENSORS_REQUIRED_NUMBER = (1 << 1, 'Менее 4 КП с достоверными СИ давления')
    ABSENCE_MIN_FLOW_METERS_REQUIRED_NUMBER = (1 << 2, 'Недостоверность граничного СИ расхода')


class LdsStatusInitialization(BaseReasonEnum):
    """
    Причины режима работы СОУ: Инициализация
    """

    ACCUMULATION_DATA = (1 << 0, 'Накопление данных')
    EXITING_FAULTY_MODE = (1 << 1, 'Выход СОУ из режима «Неисправна»')
    COLD_START_OF_SERVERS = (1 << 2, 'Одновременный «холодный» запуск нескольких серверов СОУ')
    SWITCHING_SHUT_OFF_IN_STOPPED_PUMPING_MODE = (
        1 << 3,
        'Переключение запорной арматуры в режиме остановленной перекачки',
    )
    USER_ACTION = (1 << 4, 'По команде пользователя')


class StationaryReason(BaseStrEnum):
    """
    Причины режима работы МТ: Стационар для ЭФ Журнал
    """

    # Отклонения давления и расхода не превышают допустимых отклонений
    PRESSURE_AND_FLOW_MOVING_AVERAGES_MEET_CRITERIA = "Отклонения давления и расхода не превышают допустимых отклонений"


class UnStationaryReason(BaseStrIntFlag):
    """
    Причины режима работы МТ: Нестационар
    """

    # Пуск/остановка трубопровода; включение/отключение магистрального насоса; включение/отключение НПС
    CHANGING_EQUIPMENT_STATUS = 1 << 0
    # Начало/окончание работы насосов откачки емкостей на НПС и ЛЧ технологического участка
    CHANGING_WORKING_OF_PUMPING_OUT_PUMPS = 1 << 1
    # Изменение частоты вращения в ручном режиме и/или изменение уставки регулирования в автоматическом режиме
    # работы МНА с ЧРП
    CHANGING_MAIN_PUMPS_ROTATION_SPEED = 1 << 2
    CHANGING_BLOCK_VALVES_STATUS = 1 << 3  # Полное или частичное открытие/закрытие задвижки
    SWITCHING_TANKS = 1 << 4  # Переключение резервуаров
    CHANGING_ACCEPTANCE_OR_DELIVERY_STATE = 1 << 5  # Начало или прекращение приема/сдачи нефти/нефтепродуктов
    TRIGGERING_EMERGENCY_RESET_OR_PWSS_OPERATION = 1 << 6  # Задействование аварийного сброса
    SAFETY_VALVES_ACTUATION = 1 << 7  # Срабатывание предохранительных клапанов
    # Изменение уставки регулирования по давлению узлов регулирования давления, работающих в автоматическом режиме
    # управления
    CHANGING_PRESSURE_SETTING = 1 << 8
    # Изменение процента открытия/закрытия заслонки узлов регулирования давления, работающих в ручном режиме управления
    CHANGING_OPENING_PERCENTAGE_VALVE = 1 << 9
    CHANGING_ADDITIVE_INJECTOR_STATUS_OR_FLOW = 1 << 10  # Начало/окончание ввода ПТП или изменение расхода вводимой ПТП
    LEAK_END = 1 << 11  # Окончание утечки
    # Наличие сигнала статуса «Открывается»/»Закрывается» запорной арматуры (не в режиме имитации),
    # расположенной в точке, гидравлически связанной с рассматриваемым ДУ
    TO_OPEN_OR_TO_CLOSE_STATUS = 1 << 12
    # Нестационарный режим работы/отсутствие сигнала о режиме работы смежного ТУ, работающего
    # в единой гидравлической системе с защищаемым ТУ
    ADJACENT_TU = 1 << 13
    COLD_START = 1 << 14  # Одновременный «холодный» запуск нескольких серверов СОУ


class StoppedPumpingReason(BaseStrIntFlag):
    """
    Причины режима работы МТ: Остановленный
    """

    # На ДУ отсутствуют работающие НА, при этом показания СИ расхода не превышают 1 % от максимального значения
    # диапазона измерений всех СИ расхода на технологическом участке
    STOPPING_PUMPS = 1 << 0
    CUTOFF_AREA = 1 << 1  # Участок отсечен запорной арматурой от подкачек/откачек


class RejectionCriteria(IntFlag):
    """Критерии отбраковки сигналов criteriaNames"""

    QUALITY = 1 << 0  # qualityRejection
    RANGE = 1 << 1  # rangeRejection
    EMPTY = 1 << 2  # emptyRejection
    TIME = 1 << 3  # timeRejection
    CONSTANT_SIGNAL = 1 << 4  # constantSignalRejection
    DISCHARGE = 1 << 5  # dischargeRejection
    VTOR = 1 << 6  # VTORRejection
    NEARBY = 1 << 7  # nearbyRejection
    DIAGNOSTIC_INFO = 1 << 8  # diagnInfoRejection

    @property
    def backend_name(self) -> str:
        names = {
            "QUALITY": "qualityRejection",
            "RANGE": "rangeRejection",
            "EMPTY": "emptyRejection",
            "TIME": "timeRejection",
            "CONSTANT_SIGNAL": "constantSignalRejection",
            "DISCHARGE": "dischargeRejection",
            "SIGMA3": "sigma3Rejection",
            "VTOR": "VTORRejection",
            "NEARBY": "nearbyRejection",
            "DIAGNOSTIC_INFO": "diagnInfoRejection",
        }
        return names.get(self.name or "", str(int(self)))

    def __str__(self) -> str:
        raw_value = int(self)
        if raw_value == 0:
            return "0"

        active_flags = [flag.backend_name for flag in type(self) if flag.value and flag & self == flag]
        if active_flags:
            return f"{'|'.join(active_flags)} ({raw_value})"

        return str(raw_value)


class RejectionSensorTag(Enum):
    """
    Теги датчиков для тестов отбраковки (id, description=tag)
    Айди тегов подставляется на ходу из текущей версии конфы с сервера
    """

    KP_8_Pin = (0, "AK.CHTN.LU_TIHVEL.KP_8.SW_8-3.Pin")  # nearby_pressure_pin range_upper_pressure range_lower_pres
    NPS_TIH_5_Vmom = (0, "AK.CHTN.NPS_TIH_5.UZR_1.Vmom")  # diagnostic_info_flowrange_upper_flow range_lower_flow
    KP_8_Pout = (0, "AK.CHTN.LU_TIHVEL.KP_8.SW_8-3.Pout")  # nearby_pressure_pout
    KP_209_1_Pin = (0, "AK.CHTN.LU_VELKRIM.KP_209-1.SW_215-3-1.Pin")  # empty_pressure
    KP_7_Pin = (0, "AK.CHTN.LU_TIHVEL.KP_7.SW_6-3.Pin")  # vtor_pressure
    NPS_KRIM_P_Vmom = (0, "AK.CHTN.NPS_KRIM_P.UZR_1.Vmom")  # empty_flow quality_flow

    def __init__(self, sensor_id: int, description: str) -> None:
        self.id = sensor_id
        self.description = description

    @classmethod
    def update_ids_from_config(cls, sensor_ids_by_address: Mapping[str, int]) -> None:
        """
        Обновляет sensor_id по tag из конфигурации стенда.
        """
        missing_tags = []
        for sensor in cls:
            sensor_id = sensor_ids_by_address.get(sensor.description)
            if sensor_id is None:
                missing_tags.append(sensor.description)
                continue
            sensor.id = sensor_id

        if missing_tags:
            raise ValueError(f"Не найдены sensor_id для tags: {', '.join(missing_tags)}")

    def __str__(self):
        return f"{self.id} - {self.description}"


class UserActions(IntFlag):
    USER_LOGIN = 1  # Вход пользователя
    USER_EXIT = 1 << 1  # Выход пользователя
    FAILED_USER_LOGIN = 1 << 2  # Неуспешная попытка входа пользователя
    ALGORITHMS_REINITIALIZATION = 1 << 3  # Переинициализация алгоритмов
    SIGNAL_MASK_SIM = 1 << 4  # Маскирование и имитация входных сигналов
    LDS_MASKING = 1 << 5  # Маскирование СОУ
    EXPORT = 1 << 6  # Экспорт и выгрузки
    SETTINGS_CHANGE = 1 << 7  # Изменение настроек
    LEAK_ACK = 1 << 8  # Квитирование сообщения об утечке
    LEAK_REMOVE = 1 << 9  # Исключение неактивных утечек
    LDS_ADMIN = 1 << 10  # Администрирование СОУ
    PIG_CONTROL = 1 << 11  # Управление СОД


class SiteKpKp(Enum):
    """controlledSiteId, segmentId"""

    TIXORECZKAYA_NOVOVELICHKOVSKAYA = {'controlledSiteId': 6012, 'segmentId': 6013}
    NOVOVELICHKOVSKAYA_KRYMSKAYA = {'controlledSiteId': 6074, 'segmentId': 6075}
    KRYMSKAYA_GRUSHOVAYA = {'controlledSiteId': 6220, 'segmentId': 6221}
    BACKUP_ROUTE_BEJSUG = {'controlledSiteId': 6242, 'segmentId': 6243}
    BACKUP_ROUTE_PONURA = {'controlledSiteId': 6076, 'segmentId': 6077}
    BACKUP_ROUTE_KUBAN = {'controlledSiteId': 6088, 'segmentId': 6089}
    NPZ_AFIPSKIJ = {'controlledSiteId': 6244, 'segmentId': 6245}
    NPZ_ILINSKIJ = {'controlledSiteId': 6120, 'segmentId': 6121}


class SignalType(IntFlag):
    """Типы сигналов: режим МТ - 8, режим СОУ - 256, самотеки -16"""

    REGLU = 1 << 3
    REGSOU = 1 << 8
    GRAVITYPIPE = 1 << 4

    @property
    def backend_name(self) -> str:
        names = {
            "REGLU": "PumpingStatus",
            "REGSOU": "LdsStatus",
            "GRAVITYPIPE": "FreeFlow",
        }
        return names.get(self.name or "", str(int(self)))

    def __str__(self) -> str:
        raw_value = int(self)
        if raw_value == 0:
            return "0"

        active_flags = [flag.backend_name for flag in type(self) if flag.value and flag & self == flag]
        if active_flags:
            return f"{'|'.join(active_flags)} ({raw_value})"

        return str(raw_value)


class GravityPipe(Enum):
    expected_lds_status_gravity_true = (1, "Наличие самотека")
    expected_lds_status_gravity_false = (0, "Отсутствие самотека")

    def __init__(self, status_id: int, description: str) -> None:
        self.id = status_id
        self.description = description

    def __str__(self):
        return f"{self.id} - {self.description}"


class MeasureConversionRule(Enum):
    MPA_MEASURE = "MPA_MEASURE"
    KG_CM_MEASURE = "KG_CM_MEASURE"


























конст 

"""
Общие константы для тестов.
"""

from constants.enums import StationaryStatus


class BaseTN3Constants:
    # ===== Константы для запросов журнала =====
    COLUMN_SELECTION_DEF = [
        'Time',
        'User',
        'MainPipeline',
        'TechnologicalSection',
        'TechnologicalObject',
        'ControlPoint',
        'Object',
        'SignalName',
        'Event',
        'Value',
        'MessageType',
        'Tag',
        'Status',
    ]

    # ===== Типы сигналов и объектов =====
    PRESSURE_SENSOR_OBJECT_TYPE = 2
    FLOWMETER_OBJECT_TYPE = 3
    PRESSURE_SIGNAL_TYPE = 1
    FLOW_SIGNAL_TYPE = 4

    # ===== Суффиксы адресов выходных сигналов =====
    ADDRESS_SUFFIX_ACK_LEAK = "AckLeak"
    ADDRESS_SUFFIX_LEAK = "Leak"
    ADDRESS_SUFFIX_MASK = "Mask"
    ADDRESS_SUFFIX_POINT_LEAK = "PointLeak"
    ADDRESS_SUFFIX_Q_LEAK = "QLeak"
    ADDRESS_SUFFIX_TIME_LEAK = "TimeLeak"
    ADDRESS_SUFFIX_PUMPING_STATUS = "RegLU"
    ADDRESS_SUFFIX_LDS_STATUS = "RegSOU"

    # ===== Ключи поиска =====
    LEAK_LINEAR_PART_ID_KEY = "id"
    CONTROLLED_SITE_ID_AND_SEGMENT_ID = "controlledSiteId"

    # ===== Общее количество участков КП-КП =====
    LIMIT_CONTROLLED_SITES = 500
    COUNT_CONTROLLED_SITES = 114

    # ===== Ожидаемые значения выходных сигналов =====
    OUTPUT_IS_ACK_LEAK = "1"
    OUTPUT_IS_LEAK = "1"
    OUTPUT_IS_NOT_LEAK = "0"
    OUTPUT_IS_NOT_MASK = "0"
    OUTPUT_IS_MASK = "1"

    MASS_KG = 3600  # Коэффициент массы, нужно умножить, чтобы получить объем в м3/час
    KGS_SM2 = 98066  # Коэффициент давления, нужно умножить, чтобы получить объем в кгс/см2
    ALLOWED_VOLUME_DIFF = 0.3  # Относительная погрешность по объему
    ALLOWED_DISTANCE_DIFF_METERS = 5000  # Погрешность координаты в метрах
    KM_TO_METERS = 1000  # Перевод в метры
    LEAK_START_INTERVAL = 2100  # Интервал от старта имитатора до первого обнаружения утечки - 35 минут по умолчанию
    LEAK_LOCATION_STATUS = 1

    # ===== Параметры выходных сигналов =====
    OUTPUT_TEST_DELAY = 120  # Задержка для теста выходных сигналов в секундах
    OUTPUT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"  # Формат времени для парсинга выходных сигналов

    # ===== Параметры маскирования =====
    IS_MASKED_TRUE = True
    IS_MASKED_FALSE = False

    # ===== Параметры имитации =====
    PRESSURE_IMITATION_RANGE = (1, 40)
    VOLUME_IMITATION_RANGE = (100, 2400)
    GOOD_QUALITY_VAL = 1

    # ===== Константы журнала =====
    JOURNAL_EVENT_MASK = "Установка признака маскирования"
    JOURNAL_EVENT_UNMASK = "Снятие признака маскирования"
    JOURNAL_SIGNAL_PRESSURE = "Значение давления"
    JOURNAL_SIGNAL_FLOW = "Расход"
    JOURNAL_MESSAGE_TYPE_USER_ACTIONS = "Действия пользователя"
    JOURNAL_STATUS_SUCCESS = "Успешно"
    JOURNAL_EXPECTED_MSG_COUNT_PER_SIGNAL = 2
    JOURNAL_MASK_PAGINATION_LIMIT = 10
    JOURNAL_EVENT_POSSIBLE_LEAK = "Возможна утечка"
    JOURNAL_EVENT_DETECTED_LEAK = "Утечка."
    JOURNAL_MESSAGE_TYPE_LEAKS = "Утечки"
    JOURNAL_EVENT_COMPLETED_LEAKS = "Утечка завершена"
    JOURNAL_EXPECTED_MASK_MSG_TOTAL = 4
    JOURNAL_MASK_EXPECTED_EVENTS = {"Установка признака маскирования", "Снятие признака маскирования"}
    JOURNAL_MASK_EXPECTED_SIGNALS = {"Значение давления", "Расход"}
    JOURNAL_PAGINATION_LIMIT = 10
    JOURNAL_PAGINATION_REJECT_LIMIT = 20
    JOURNAL_PAGINATION_STATUS_LIMIT = 120
    JOURNAL_STATUS_TOTAL_WAIT = 300  # Время установки режима в данных, в секундах
    JOURNAL_EVENT_LEAK_ACKNOWLEDGED = "Сообщение об утечке квитировано"
    JOURNAL_EVENT_LDS_INIT_ACCUM_DATA = "СОУ в инициализации (Накопление данных)"
    JOURNAL_EVENT_LDS_INIT_COLD_START = "СОУ в инициализации (Одновременный «холодный» запуск нескольких серверов СОУ)"
    JOURNAL_MESSAGE_TYPE_LDS_STATUS = "Режим работы СОУ"
    JOURNAL_MESSAGE_TYPE_REJECTION = "Отбраковка"
    JOURNAL_MESSAGE_EVENT_STATIONARY = (
        "Стационарный режим работы МТ (Отклонения давления и расхода не превышают допустимых отклонений)"
    )
    JOURNAL_MESSAGE_EVENT_NOT_STATIONARY = (
        "Нестационарный режим работы МТ (Одновременный «холодный» запуск нескольких серверов СОУ)"
    )
    JOURNAL_MESSAGE_EVENT_STOP = (
        "МТ в режиме остановленной перекачки (На ДУ отсутствуют работающие НА, "
        "при этом показания СИ расхода не превышают 1 % отмаксимального значения "
        "диапазона измерений всех СИ расхода на технологическом участке)"
    )
    SEC_PER_MIN = 60

    # ===== Параметры подтверждения =====
    IS_ACKNOWLEDGED_FALSE = False

    # ===== Параметры BalanceAlgorithmResults =====
    BALANCE_ALGORITHM_POLL_INTERVAL = 15  # Интервал опроса подписки в секундах
    BALANCE_ALGORITHM_TOTAL_WAIT = 300  # Общее время опроса в секундах
    DEBALANCE_TOLERANCE = 0.25  # Допустимое отклонение дебаланса от порога 30%

    # ===== Теги датчиков для маскирования и имитации =====
    PRESSURE_SENSOR_ADDRESS = "AK.CHTN.LU_TIHVEL.KP_8.SW_8-3.Pout"
    FLOWMETER_ADDRESS = "AK.CHTN.NPS_TIH_5.UZR_1.Vmom"
    SENSOR_IDS_BY_ADDRESS = {}

    # ===== Прочие константы =====
    BASIC_MESSAGE_TIMEOUT = 10.0  # Таймаут ожидания сообщений в секундах
    MASK_MESSAGE_TIMEOUT = 180.0  # Таймаут ожидания сообщений в секундах
    PRECISION = 3  # Точность округления для координат
    DIGITS_WITH_DOT_PATTERN = r'\d+(?:\.\d+)?'  # Регулярное выражение для поиска чисел с точкой
    DIAGNOSTIC_AREA_BASE_IDS = {
        "Т-Н-3.НПС-5 «Тихорецкая».УЗР СИКН ТН-3 - Т-Н-3.НПС-5 «Тихорецкая».УЗР вых": (9992054907, (9992054908,)),
        "Т-Н-3.НПС-5 «Тихорецкая».УЗР вых - Т-Н-3.УЗР НПС-3 «Нововеличковская».": (
            9992054908,
            (9992054907, 9992054909),
        ),
        "Т-Н-3.УЗР НПС-3 «Нововеличковская». - Т-Н-3.НПС-2 «Крымская».УЗР СИКН Т-К": (
            9992054909,
            (9992054908, 9992054910, 9992054911),
        ),
        "Т-Н-3.НПС-2 «Крымская».УЗР СИКН Т-К - Т-Н-3.НПС-2 «Крымская».УЗР вых": (9992054910, (9992054909, 9992054912)),
        "Н-К.КП-0.УЗР 0км - Н-К.КП-30.УЗР 30км": (9992054911, (9992054909, 9992054913, 9992054914)),
        "Т-Н-3.НПС-2 «Крымская».УЗР вых - Т-Н-3.НПС«Крымская».УЗР вых Камеры пуска": (
            9992054912,
            (9992054910, 9992054915),
        ),
        "Н-К.КП-30.УЗР 30км - ПСП Афипский СИКН 1015 УЗР": (9992054913, (9992054911,)),
        "Н-К.УП ИНПЗ.УЗР 4,3км - Н-К.ИНПЗ.УЗР СИКН 1019": (9992054914, (9992054911,)),
        "Т-Н-3.НПС«Крымская».УЗР вых Камеры пуска - Т-Н-3.«Грушовая».УЗР-700": (9992054915, (9992054912,)),
    }
    REPRESENTATIVE_DIAGNOSTIC_AREA_IDS = [2, 3]  # Список показательных ДУ для определения режима СОУ
    ZONE_INFO: str = "Europe/Moscow"
    SECONDS_PER_HOUR: int = 3600
    CRITERIA_NAMES_FIELD: str = 'criteriaNames'


class ExportReportConstants:
    """Константы для теста формирования отчёта об утечках"""

    # Максимальное ожидание уведомления о готовности отчёта
    NOTIFICATION_TIMEOUT_SECONDS: float = 60.0
    # Максимальное время ожидания появления отчёта в списке после уведомления
    LIST_POLL_TOTAL_WAIT_SECONDS: float = 10.0
    # Интервал между запросами getExportedFilesListRequest
    LIST_POLL_INTERVAL_SECONDS: float = 10.0
    # Таймаут получения ответа на скачивание
    DOWNLOAD_TIMEOUT_SECONDS: float = 60.0

    # ===== Имя файла отчёта =====
    LEAKS_REPORT_NAME_PART: str = "Отчет об утечках"  # подстрока в имени файла/отчёта
    XLSX_EXTENSION: str = ".xlsx"
    # Сигнатура zip-архива, используется для проверки формата файла по содержимому
    ZIP_SIGNATURE: bytes = b'PK\x03\x04'

    # ===== Формат даты/времени в отчёте =====
    REPORT_DATETIME_FORMAT: str = "%d.%m.%Y %H:%M:%S"
    # Регулярное выражение для извлечения двух дат из заголовка
    REPORT_HEADER_PERIOD_PATTERN: str = (
        r'Отчет об утечках с (?P<period_start>\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})'
        r' по (?P<period_end>\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})'
    )
    # Регулярное выражение для извлечения двух дат из названия файла
    REPORT_FILE_NAME_PERIOD_PATTERN: str = (
        r'^Отчет об утечках (?P<tu>.+?) '
        r'(?P<period_start>\d{2}\.\d{2}\.\d{4} \d{2}_\d{2}_\d{2})'
        r' - '
        r'(?P<period_end>\d{2}\.\d{2}\.\d{4} \d{2}_\d{2}_\d{2})'
        r'\.xlsx$'
    )

    # Двойная шапка: первая строка - название отчёта с периодом, вторая - названия колонок
    REPORT_TITLE_ROW: int = 1
    REPORT_COLUMN_HEADERS_ROW: int = 2
    REPORT_DATA_FIRST_ROW: int = 3

    # ===== Названия колонок =====
    COL_DATETIME: str = "Дата и время"
    COL_OBJECT: str = "Объект"
    COL_LDS_STATUS: str = "Режим работы СОУ"
    COL_MASK_INFO: str = "Информация о маскировании"
    COL_COORDINATE: str = "Координата"
    COL_LEAK_VOLUME: str = "Объемный расход утечки"
    COL_MT_MODE: str = "Режим работы МТ"

    EXPECTED_COLUMN_HEADERS: list = [
        COL_DATETIME,
        COL_OBJECT,
        COL_LDS_STATUS,
        COL_MASK_INFO,
        COL_COORDINATE,
        COL_LEAK_VOLUME,
        COL_MT_MODE,
    ]

    MASKING_NOT_MASKED_TEXT: str = "СОУ не замаскирована"

    # ===== Маппинг StationaryStatus <-> текст в колонке "Режим работы МТ" =====
    STATIONARY_STATUS_TO_REPORT_TEXT: dict = {
        StationaryStatus.UNSTATIONARY.value: "Нестационарный режим работы МТ",
        StationaryStatus.STATIONARY.value: "Стационарный режим работы МТ",
        StationaryStatus.STOPPED.value: "МТ в режиме остановленной перекачки",
    }

    # ===== Прочее =====
    DEFAULT_SHEET_INDEX: int = 0

    SUBSCRIBE_REPORTS_DATA_EXPORTED_REQUEST: str = "SubscribeReportsDataExportedRequest"
    EXPORT_REPORTS_COMMAND_REQUEST: str = "ExportReportsCommandRequest"
    REPORT_DATA_EXPORTED_NOTIFICATION: str = "ReportDataExportedNotification"
    GET_EXPORTED_DATA_LIST_REQUEST: str = "GetExportedDataListRequest"
    EXPORTED_DATA_LIST_LIMIT: int = 10
    DOWNLOAD_EXPORTED_DATA_REQUEST: str = "DownloadExportedDataRequest"

    # Допустимая погрешность при сравнении границ периода отчёта
    REPORT_PERIOD_TOLERANCE_MINUTES: int = 1
    # Формат даты/времени в имени скачиваемого xlsx-файла
    REPORT_FILE_NAME_DATETIME_FORMAT: str = "%d.%m.%Y %H_%M_%S"


class ExportLdsStatusReportConstants:
    """Константы для теста формирования xlsx-отчёта о режиме работы СОУ"""

    LDS_STATUS_REPORT_NAME_PART: str = "Отчет о режиме работы СОУ"
    SECTION_NAMES: list[str] = [
        "НПС-5 Тихорецкая - НПС-3 Нововеличковская",
        "НПС-3 Нововеличковская - НПС-2 Крымская",
        "НПС-2 Крымская - НПС Грушовая",
    ]
    TOTAL_WORK_DURATION_LABEL: str = "Суммарное время работы:"
    ZERO_DURATION_TEXT: str = "0:00:00"
    TOTAL_DURATION_TOLERANCE_SECONDS: int = 5
    # Число частей времени при split(':') - часы:минуты:секунды (1:02:51) и минуты:секунды (02:51)
    DURATION_PARTS_COUNT_H_MM_SS: int = 3
    DURATION_PARTS_COUNT_MM_SS: int = 2

    REPORT_TITLE_ROW: int = 1
    REPORT_COLUMN_HEADERS_ROW: int = 2
    REPORT_DATA_FIRST_ROW: int = 3

    COL_SECTION: str = "Наименование участка"
    COL_FAULTY: str = "Неисправность"
    COL_DEGRADATION: str = "В ухудшенных характеристиках"
    COL_INITIALIZATION: str = "Инициализация"
    COL_SERVICEABLE: str = "Исправность"

    MODE_DURATION_COLUMNS: list = [
        COL_FAULTY,
        COL_DEGRADATION,
        COL_INITIALIZATION,
        COL_SERVICEABLE,
    ]

    EXPECTED_COLUMN_HEADERS: list = [COL_SECTION, *MODE_DURATION_COLUMNS]

    REPORT_HEADER_PERIOD_PATTERN: str = (
        r'Отчет о режиме работы СОУ с (?P<period_start>\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})'
        r' по (?P<period_end>\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})'
    )
    REPORT_FILE_NAME_PERIOD_PATTERN: str = (
        r'^Отчет о режиме работы СОУ\. (?P<tu>.+?) '
        r'(?P<period_start>\d{2}\.\d{2}\.\d{4} \d{2}_\d{2}_\d{2})'
        r' - '
        r'(?P<period_end>\d{2}\.\d{2}\.\d{4} \d{2}_\d{2}_\d{2})'
        r'\.xlsx$'
    )


class ExportRejectedReportConstants:
    """Константы для теста формирования xlsx-отчёта об отбракованных входных данных"""

    REJECTED_REPORT_NAME_PART: str = "Отчет об отбракованных входных данных"
    REJECTED_REPORT_NAME_PART_ALT: str = "Отчёт об отбракованных входных данных"

    REPORT_TITLE_ROW: int = 1
    REPORT_COLUMN_HEADERS_ROW: int = 2
    REPORT_DATA_FIRST_ROW: int = 3

    COL_DATETIME: str = "Дата и время"
    COL_OBJECT: str = "Объект"
    COL_EVENT: str = "Событие"
    COL_VALUE: str = "Значение"
    COL_DURATION: str = "Продолжительность отбраковки"
    COL_TAG: str = "Тег сигнала"

    EXPECTED_COLUMN_HEADERS: list = [
        COL_DATETIME,
        COL_OBJECT,
        COL_EVENT,
        COL_VALUE,
        COL_DURATION,
        COL_TAG,
    ]

    REPORT_HEADER_PERIOD_PATTERN: str = (
        r'[Оо]тч[её]т об отбракованных входных данных с '
        r'(?P<period_start>\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})'
        r' по (?P<period_end>\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})'
    )
    REPORT_FILE_NAME_PERIOD_PATTERN: str = (
        r'^[Оо]тч[её]т об отбракованных входных данных (?P<tu>.+?) '
        r'(?P<period_start>\d{2}\.\d{2}\.\d{4} \d{2}_\d{2}_\d{2})'
        r' - '
        r'(?P<period_end>\d{2}\.\d{2}\.\d{4} \d{2}_\d{2}_\d{2})'
        r'\.xlsx$'
    )

    TIME_FILTER_TOLERANCE_SECONDS: int = 60

    # Суффикс сигнала в колонке "Объект" отчёта (после последней точки в строке)
    REPORT_SIGNAL_FLOW: str = "Расход"
    REPORT_SIGNAL_PRESSURE: str = "Давление"
    REPORT_SIGNAL_SUFFIX_BY_EXPECTED_NAME: dict = {
        BaseTN3Constants.JOURNAL_SIGNAL_FLOW: REPORT_SIGNAL_FLOW,
        BaseTN3Constants.JOURNAL_SIGNAL_PRESSURE: REPORT_SIGNAL_PRESSURE,
    }

    # Разбор колонки "Объект": участок трубопровода и суффикс сигнала разделяются последней точкой
    OBJECT_SIGNAL_SEPARATOR: str = "."
    OBJECT_SIGNAL_RSPLIT_MAXSPLIT: int = 1

    REJECTED_REPORT_HEADER_TITLE_PART: str = "отчет об отбракованных входных данных с"
    REJECTED_REPORT_HEADER_TITLE_PART_ALT: str = "отчёт об отбракованных входных данных с"


class MeasureUnitConstants:
    MPA_MEASURE: str = "MPa"
    KG_CM_MEASURE: str = "kgf/cm^2"