Загрузка данных


utils/helpers/asserts.py

from __future__ import annotations

import traceback
from typing import Any, List, Optional, TypeVar

import allure
from assertpy import assert_that
from pytest import fail

ObjectType = TypeVar("ObjectType")


class SoftAssertions:
    """
    Контекстный менеджер для "мягких" сравнений.
    Внутри теста используется так:
        with SoftAssertions() as soft:
            StepCheck(..., failures=soft).actual(...).expected(...).equal_to()
    По выходу, если были ошибки — они прикрепляются к Allure и поднимается Aggregated AssertionError.
    """

    def __init__(self) -> None:
        self.failures: List[str] = []

    def __enter__(self) -> List[str]:
        return self.failures

    def __exit__(self, exc_type, exc_val, exc_tb) -> Optional[bool]:
        # Если внутри контекста появились внешние исключения (не связанные с проверками) — не подавляем их.
        if exc_type is not None:
            return False

        if not self.failures:
            return False

        # Прикрепляем все собранные failure-traceback'и к Allure, чтобы их было удобно смотреть
        joined = "\n\n".join(self.failures)
        allure.attach(joined, name="soft assertion failures", attachment_type=allure.attachment_type.TEXT)

        # Поднимаем итоговую ошибку, чтобы CI увидел падение теста
        raise AssertionError("Soft assertion failures:\n\n" + joined)


class StepMessageBuilder:
    """
    Составляет разные сообщения для allure.step под конкретный вид assert
    """

    def __init__(self, check_step: str, field_name: str) -> None:
        self.check_step = check_step
        self.field_name = field_name

    def _build_message(self, message_parts) -> str:
        """
        Собирает сообщение из списка с нужным разделителем
        """
        return "\n".join([self.check_step] + message_parts)

    @staticmethod
    def _format_val(val: Any) -> str:
        """
        Вспомогательная функция для аккуратного форматирования значения в сообщении.
        Она пытается использовать repr(), но на случай исключения возвращает str().
        """
        try:
            return str(val)
        except TypeError:
            return repr(val)

    @staticmethod
    def _item_count(val: Any) -> int:
        """
        Считает количество элементов, если возможно
        """
        return len(val) if hasattr(val, "__len__") else 0

    def equal_to_msg(
        self,
        exp_value: Any,
        act_value: Any,
    ) -> str:
        message_parts = [
            f"Ожидаемый результат: {self.field_name} = {self._format_val(exp_value)}",
            f"Фактический результат: {self.field_name} = {self._format_val(act_value)}",
        ]
        return self._build_message(message_parts)

    def is_not_equal_to_msg(
        self,
        exp_value: Any,
        act_value: Any,
    ) -> str:
        message_parts = [
            f"Ожидаемый результат: {self.field_name} = {self._format_val(exp_value)} не равно фактическому",
            f"Фактический результат: {self.field_name} = {self._format_val(act_value)}",
        ]
        return self._build_message(message_parts)

    def is_not_none_msg(self, act_value: Any) -> str:
        message_parts = [
            f"Ожидаемый результат: {self.field_name} не пустое",
            f"Фактический результат: {self.field_name} = {self._format_val(act_value)}",
        ]
        return self._build_message(message_parts)

    def is_not_empty_msg(self, act_value: Any) -> str:
        item_count = self._item_count(act_value)
        message_parts = [
            f"Ожидаемый результат: {self.field_name} не пустое",
            f"Фактический результат: количество элементов в {self.field_name} = {item_count}",
        ]
        return self._build_message(message_parts)

    def is_empty_msg(self, act_value: Any) -> str:
        item_count = self._item_count(act_value)
        message_parts = [
            f"Ожидаемый результат: {self.field_name} пустое",
            f"Фактический результат: количество элементов в {self.field_name} = {item_count}",
        ]
        return self._build_message(message_parts)

    def is_close_to_msg(self, exp_value: Any, act_value: Any, extra_info: Optional[Any] = None) -> str:
        message_parts = [
            f"Ожидаемый результат: {self.field_name} = {self._format_val(exp_value)}",
            f"Фактический результат: {self.field_name} = {self._format_val(act_value)}",
        ]
        if extra_info:
            message_parts.append(f"Дополнительная информация: {self._format_val(extra_info)}")
        return self._build_message(message_parts)

    def is_less_than_msg(self, exp_value: Any, act_value: Any, extra_info: Optional[Any] = None) -> str:
        message_parts = [
            f"Ожидаемый результат: Значение в поле {self.field_name} < {self._format_val(exp_value)}",
            f"Фактический результат: {self.field_name} = {self._format_val(act_value)}",
        ]
        if extra_info:
            message_parts.append(f"Дополнительная информация: {self._format_val(extra_info)}")
        return self._build_message(message_parts)

    def is_greater_than_msg(self, exp_value: Any, act_value: Any, extra_info: Optional[Any] = None) -> str:
        message_parts = [
            f"Ожидаемый результат: Значение в поле {self.field_name} > {self._format_val(exp_value)}",
            f"Фактический результат: {self.field_name} = {self._format_val(act_value)}",
        ]
        if extra_info:
            message_parts.append(f"Дополнительная информация: {self._format_val(extra_info)}")
        return self._build_message(message_parts)

    def is_greater_than_or_equal_to_msg(self, exp_value: Any, act_value: Any, extra_info: Optional[Any] = None) -> str:
        message_parts = [
            f"Ожидаемый результат: Значение в поле {self.field_name} >= {self._format_val(exp_value)}",
            f"Фактический результат: {self.field_name} = {self._format_val(act_value)}",
        ]
        if extra_info:
            message_parts.append(f"Дополнительная информация: {self._format_val(extra_info)}")
        return self._build_message(message_parts)

    def is_between_msg(self, act_value: Any, lower_bound: Any, upper_bound: Any) -> str:
        message_parts = [
            f"Ожидаемый результат: "
            f"Значение в поле {self.field_name} должно быть в диапазоне [{lower_bound}, {upper_bound}]",
            f"Фактический результат: {self.field_name} = {self._format_val(act_value)}",
        ]
        return self._build_message(message_parts)

    def does_not_contain_msg(self, objects_list: List[ObjectType], forbidden_object: ObjectType) -> str:
        message_parts = [
            f"Ожидаемый результат: Список элементов: {objects_list}",
            f"Не содержит элемента: {forbidden_object}",
        ]
        return self._build_message(message_parts)

    def contains_msg(self, container: Any, expected_item: Any) -> str:
        if isinstance(container, str):
            message_parts = [
                f"Ожидаемый результат: '{container}' содержит подстроку '{expected_item}'",
                f"Фактический результат: {self.field_name} = {self._format_val(container)}",
            ]
        else:
            message_parts = [
                f"Ожидаемый результат: список {self._format_val(container)} содержит элемент {expected_item}",
                f"Фактический результат: {self.field_name} = {self._format_val(container)}",
            ]
        return self._build_message(message_parts)


class StepCheck:
    """
    Обёртка для проверки
    Внутри всегда формируется единое сообщение и открывается allure.step.
    """

    def __init__(self, check_step: str, field_name: str, failures: Optional[List[str]] = None):
        # Сохраняем название проверки
        self.check_step = check_step
        self._field_name = field_name
        # Поля для хранения expected/actual/extra перед вызовом метода-проверки
        self._expected: Optional[Any] = None
        self._actual: Optional[Any] = None
        self._extra_info: Optional[str] = None
        self._msg_builder = StepMessageBuilder(check_step, field_name)
        # Хранение фейлов
        self._failures = failures

    def expected(self, value: Any) -> StepCheck:
        """Задаём ожидаемое значение и возвращаем self для цепочки вызовов"""
        self._expected = value
        return self

    def actual(self, value: Any) -> StepCheck:
        """Задаём фактическое значение и возвращаем self"""
        self._actual = value
        return self

    def extra(self, text: str) -> StepCheck:
        """Задаём дополнительный текст"""
        self._extra_info = text
        return self

    def _handle_assertion(self, exc: AssertionError) -> None:
        """
        Сохраняет traceback в список failures или перебрасывает дальше, если list не передан
        """
        if self._failures is not None:
            self._failures.append(traceback.format_exc())
        else:
            raise exc

    def equal_to(self, expected: Optional[Any] = None) -> None:
        """
        Выполняет проверку is_equal_to. Можно передать expected в метод или задать раньше через .expected(...)
        """
        # Если expected пришёл в аргументе — сохраняем его
        if expected is not None:
            self._expected = expected

        # Проверяем, что actual задан
        if self._actual is None:
            raise ValueError("Фактический результат должен быть заполнен при вызове equal_to()")

        msg = self._msg_builder.equal_to_msg(self._expected, self._actual)

        try:
            with allure.step(msg):
                # Бросаем AssertionError в момент выполнения шага, чтобы Allure увидел failed-step
                assert_that(self._actual).described_as(msg).is_equal_to(self._expected)
        except AssertionError as exc:
            # Ловушка для исключения сразу после выхода из with - сохраняем traceback и продолжаем
            self._handle_assertion(exc)

    def is_not_equal_to(self, expected: Optional[Any] = None) -> None:
        """
        Выполняет проверку is_not_equal_to. Можно передать expected в метод или задать раньше через .expected(...)
        """
        # Если expected пришёл в аргументе — сохраняем его
        if expected is not None:
            self._expected = expected

        # Проверяем, что actual задан
        if self._actual is None:
            raise ValueError("Фактический результат должен быть заполнен при вызове is_not_equal_to()")

        msg = self._msg_builder.is_not_equal_to_msg(self._expected, self._actual)

        try:
            with allure.step(msg):
                # Бросаем AssertionError в момент выполнения шага, чтобы Allure увидел failed-step
                assert_that(self._actual).described_as(msg).is_not_equal_to(self._expected)
        except AssertionError as exc:
            # Ловушка для исключения сразу после выхода из with - сохраняем traceback и продолжаем
            self._handle_assertion(exc)

    def is_not_none(self) -> None:
        """Проверка на существование поля"""
        msg = self._msg_builder.is_not_none_msg(self._actual)

        try:
            with allure.step(msg):
                assert_that(self._actual).described_as(msg).is_not_none()
        except AssertionError as exc:
            # Ловушка для исключения сразу после выхода из with - сохраняем traceback и продолжаем
            self._handle_assertion(exc)

    def is_not_empty(self) -> None:
        """Проверка на не пустое значение"""
        if self._actual is None:
            fail("Фактический результат должен быть заполнен при вызове is_not_empty()")

        msg = self._msg_builder.is_not_empty_msg(self._actual)

        try:
            with allure.step(msg):
                assert_that(self._actual).described_as(msg).is_not_empty()
        except AssertionError as exc:
            self._handle_assertion(exc)

    def is_empty(self) -> None:
        """Проверка на пустое значение"""
        if self._actual is None:
            fail("Фактический результат должен быть заполнен при вызове is_not_empty()")

        msg = self._msg_builder.is_empty_msg(self._actual)

        try:
            with allure.step(msg):
                assert_that(self._actual).described_as(msg).is_empty()
        except AssertionError as exc:
            self._handle_assertion(exc)

    def is_close_to(self, expected: Any, allowed_diff: int | float, extra_info: Any) -> None:
        """
        Проверка допуска
        """
        if self._actual is None:
            raise ValueError("Фактический результат должен быть заполнен при вызове is_close_to()")
        self._expected = expected

        msg = self._msg_builder.is_close_to_msg(expected, self._actual, extra_info)

        try:
            with allure.step(msg):
                assert_that(self._actual).described_as(msg).is_close_to(expected, allowed_diff)
        except AssertionError as exc:
            self._handle_assertion(exc)

    def is_less_than(self, threshold: Any, extra_info: Any = None) -> None:
        """Проверка, что значение меньше порога"""
        if self._actual is None:
            raise ValueError("Фактический результат должен быть заполнен при вызове is_less_than()")

        msg = self._msg_builder.is_less_than_msg(self.check_step, self._actual, extra_info)

        try:
            with allure.step(msg):
                assert_that(self._actual).described_as(msg).is_less_than(threshold)
        except AssertionError as exc:
            self._handle_assertion(exc)

    def is_greater_than(self, threshold: Any, extra_info: Any = None) -> None:
        """Проверка, что значение строго больше порога"""
        if self._actual is None:
            raise ValueError("Фактический результат должен быть заполнен при вызове is_greater_than()")

        msg = self._msg_builder.is_greater_than_msg(threshold, self._actual, extra_info)

        try:
            with allure.step(msg):
                assert_that(self._actual).described_as(msg).is_greater_than(threshold)
        except AssertionError as exc:
            self._handle_assertion(exc)

    def is_between(self, lower_bound: Any, upper_bound: Any) -> None:
        """Проверка, что значение в пределах установленных границ"""
        if self._actual is None:
            raise ValueError("Фактический результат должен быть заполнен при вызове is_between()")
        msg = self._msg_builder.is_between_msg(self._actual, lower_bound, upper_bound)

        try:
            with allure.step(msg):
                assert_that(self._actual).described_as(msg).is_between(lower_bound, upper_bound)
        except AssertionError as exc:
            self._handle_assertion(exc)

    def is_greater_than_or_equal_to(self, threshold: Any, extra_info: Any = None) -> None:
        """Проверка, что значение больше или равно порогу"""
        if self._actual is None:
            raise ValueError("Фактический результат должен быть заполнен при вызове is_greater_than_or_equal_to_msg()")

        msg = self._msg_builder.is_greater_than_or_equal_to_msg(threshold, self._actual, extra_info)

        try:
            with allure.step(msg):
                assert_that(self._actual).described_as(msg).is_greater_than_or_equal_to(threshold)
        except AssertionError as exc:
            self._handle_assertion(exc)

    def does_not_contain(self, objects_list: List[ObjectType], forbidden_object: ObjectType) -> None:
        """
        Выполняет проверку does_not_contain.
        """

        msg = self._msg_builder.does_not_contain_msg(objects_list, forbidden_object)

        try:
            with allure.step(msg):
                assert_that(objects_list).described_as(msg).does_not_contain(forbidden_object)
        except AssertionError as exc:
            self._handle_assertion(exc)

    def contains(self, container: Any, expected_item: Any) -> None:
        """Проверка, что container (список или строка) содержит expected_item."""
        msg = self._msg_builder.contains_msg(container, expected_item)

        try:
            with allure.step(msg):
                assert_that(container).described_as(msg).contains_msg(expected_item)
        except AssertionError as exc:
            self._handle_assertion(exc)


















clients/websocket_client.py
import asyncio
import logging
import time
from datetime import datetime
from typing import Any, Callable, List, Optional
from zoneinfo import ZoneInfo

import msgpack
import websockets
from allure import attach, attachment_type

from constants.architecture_constants import WebSocketClientConstants as WS_Const
from utils.msgpack_utils.message_filters import is_desired_invocation_id, is_desired_type
from utils.msgpack_utils.msgpack_utils import encode_with_varint_prefix, parse_message

logger = logging.getLogger(__name__)


class WebSocketClient:
    """
    Асинхронный ws-клиент для api-gateway по протоколу Async Api
    """

    def __init__(
        self,
        host: str,
        access_token: str,
        reconnect_interval: float = WS_Const.DEFAULT_RECONNECT_INTERVAL,
    ):
        self._host = host
        self._access_token = access_token
        self._reconnect_interval = reconnect_interval
        self._ws_url = f"wss://{host.rstrip('/')}{WS_Const.WS_HUBS}"
        self._buffer = b""
        self._next_id = WS_Const.START_INVOCATION_ID

        self._ws: websockets.ClientConnection | None = None
        self.recv_queue: asyncio.Queue[Any] = asyncio.Queue()
        self._recv_task: asyncio.Task | None = None
        self._stop_event = asyncio.Event()
        self._invocation_id: Optional[str] = None
        self.suppress_recv_logging: bool = False

    @property
    def invocation_id(self):
        return self._invocation_id

    def clear_queue(self):
        """
        Очищает очередь путем пересоздания экземпляра класса очереди
        """
        # TODO разобраться почему не выполняется очистка очереди сообщений LDS-8599
        while not self.recv_queue.empty():
            try:
                self.recv_queue.get_nowait()
                self.recv_queue.task_done()
            except asyncio.QueueEmpty as message_empty:
                logger.info(f"Очередь сообщений очищена: {message_empty}")
                break

    async def __aenter__(self):
        await self._connect_loop()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self._stop_event.set()
        if self._ws:
            await self._ws.close()
        if self._recv_task:
            await self._recv_task

        # TODO: дергать ручку завершения сессии в LDS-4083

    async def _handshake(self) -> None:
        payload = WS_Const.HANDSHAKE_MESSAGE + WS_Const.RS.decode()
        logger.debug(
            f"Отправлен handshake: {payload}",
        )
        await self._ws.send(payload)

        buf = self._buffer
        finish = time.monotonic() + WS_Const.HANDSHAKE_WAITING
        while time.monotonic() < finish:
            chunk = await self._ws.recv()
            logger.debug(f"Ответ на handshake: {chunk}")
            buf += chunk.encode() if isinstance(chunk, str) else chunk
            if WS_Const.RS in buf:
                return

        raise TimeoutError("Handshake timeout: не получили сообщение с разделителем RS за указанное время")

    async def _connect_loop(self) -> None:
        """
        Цикл подключения с повторными попытками до наступления stop_event.
        """
        while not self._stop_event.is_set():
            try:
                self.ws_request = f"{self._ws_url}/?access_token={self._access_token}"
                logger.info(f"Попытка подключения по wss: {self._ws_url}/?access_token=...")
                self._ws = await websockets.connect(
                    self.ws_request,
                    ping_interval=WS_Const.PING_INTERVAL,
                    ping_timeout=WS_Const.PING_TIMEOUT,
                    close_timeout=WS_Const.CLOSE_TIMEOUT,
                )
                # Handshake
                await self._handshake()
                # Запускаем приём в фоне
                self._recv_task = asyncio.create_task(self._recv_loop())
                logger.info("Websocket connected")
                return
            except ConnectionError:
                logger.exception(
                    f"Websocket подключение не установлено, повтор подключения через: {self._reconnect_interval}"
                )
                await asyncio.sleep(self._reconnect_interval)

    async def _recv_loop(self) -> None:
        """
        Прием сообщений, парсинг и отправка в очередь.
        """
        assert self._ws is not None

        while not self._stop_event.is_set():
            try:
                chunk = await self._ws.recv()
                if not self.suppress_recv_logging:
                    logger.debug(f"Сырые биты до обработки: {chunk[:100]}")
            except websockets.ConnectionClosed as e:
                logger.warning(f"WebSocket соединение разорвано: {e}")
                return

            result_message = parse_message(chunk)
            str_message = str(result_message)
            if not self.suppress_recv_logging:
                logger.info(f"Обработанное сообщение от api-gateway: {str_message[:500]} полное сообщение в attach")
                attach(
                    str_message,
                    name=f"Распакованное сообщение от api-gateway {datetime.now(ZoneInfo(WS_Const.ZONE_INFO))}",
                    attachment_type=attachment_type.TEXT,
                )
            await self.recv_queue.put(result_message)

    async def invoke(self, target: str, args: list) -> None:
        """
        Отправляет удаленный вызов invocation о websocket соединению

        Метод формирует сообщение по протоколу SignalR, включая:
        - типа сообщения
        - заголовки
        - уникальный идентификатор запроса
        - имя целевого метода
        - аргументы запроса

        Сообщение запаковывается в messagepack и отправляется через текущее websocket соединение
        """

        if not self._ws:
            raise websockets.WebSocketException("Не установлено подключение по wss")
        self._invocation_id = str(self._next_id)
        self._next_id += 1
        invocation = [
            WS_Const.DEFAULT_SIGNALR_MESSAGE_TYPE,
            WS_Const.DEFAULT_SIGNALR_MAP_HEADERS,
            self._invocation_id,
            target,
            [args],
        ]
        logger.info(f"Сообщение подготовлено к отправке: {invocation}")
        payload = msgpack.packb(invocation, use_bin_type=True)
        packet = encode_with_varint_prefix(payload)
        logger.debug(f"Отправляем сообщение: {packet}")
        await self._ws.send(packet)

    async def invoke_stream(self, target: str, args: list) -> None:
        """
        Отправляет streaming-вызов (StreamInvocation) по протоколу SignalR.
        """
        if not self._ws:
            raise websockets.WebSocketException("Не установлено подключение по wss")
        self._invocation_id = str(self._next_id)
        self._next_id += 1
        invocation = [
            WS_Const.STREAM_INVOCATION_MESSAGE_TYPE,
            WS_Const.DEFAULT_SIGNALR_MAP_HEADERS,
            self._invocation_id,
            target,
            [args],
        ]
        logger.info(f"Streaming-сообщение подготовлено к отправке: {invocation}")
        payload = msgpack.packb(invocation, use_bin_type=True)
        packet = encode_with_varint_prefix(payload)
        await self._ws.send(packet)

    async def receive_by_type(self, message_type: str, timeout: WS_Const.FILTERING_TIMEOUT) -> List[Any]:
        """
        Фильтрует сообщения по message_type
        """
        try:
            return await self._receive_by(filter_func=lambda msg: is_desired_type(msg, message_type), timeout=timeout)
        except websockets.WebSocketException:
            raise websockets.WebSocketException(f"Ошибка при фильтрации сообщений по {message_type}")

    async def receive_by_invocation_id(
        self, invocation_id: str, timeout: float = WS_Const.FILTERING_TIMEOUT
    ) -> List[Any]:
        """
        Фильтрует сообщения по invocation_id
        """
        try:
            return await self._receive_by(
                filter_func=lambda msg: is_desired_invocation_id(msg, invocation_id), timeout=timeout
            )
        except websockets.WebSocketException:
            raise websockets.WebSocketException("Ошибка при фильтрации сообщений по invocation_id")

    async def _receive_by(self, filter_func: Callable[[list], bool], timeout: float) -> List[Any]:
        """
        Ждет и фильтрует сообщение по filter_func
        """
        # 1) Единая точка вычисления дедлайна
        deadline = time.monotonic() + timeout

        while True:
            # 2) Остаток времени до таймаута
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise asyncio.TimeoutError(f"Timeout при фильтрации сообщений {timeout:.1f} секунд")

            try:
                # 3) Получает сообщение
                msg = await asyncio.wait_for(self.recv_queue.get(), timeout=remaining)
            except asyncio.TimeoutError:
                # 4) Явно перехватывает и пробрасывает свой TimeoutError
                raise asyncio.TimeoutError(f"Timeout при фильтрации сообщений {timeout:.1f} секунд")

            # 5) Фильтрация по filter_func
            if isinstance(msg, list) and filter_func(msg):
                return msg






















constants/architecture_constants.py
import os


class ImitatorConstants:
    TEST_SETTINGS_KEY_NAME: str = "test_settings"
    IMITATOR_FLAGS_KEY_NAME: str = "imitator_flags"
    IMITATOR_TIME_FORMAT: str = "%Y%m%dT%H%M%S"
    IMITATOR_START_DELAY_S: int = 100
    IMITATOR_FINISH_DELAY_MINUTE: float = 2.0
    IMITATOR_CHECK_CMD: str = "pgrep -f Playground"
    IMITATOR_KILL_CMD: str = "pkill -f Playground"
    IMITATOR_PATH = "/data/imitator/lds-flow-playground-csv-latest"
    IMITATOR_RUN_CMD: str = f"dotnet {IMITATOR_PATH}/TN.LDS.Flow.Playground.Application.dll"
    IMITATOR_LOG_FILE_NAME: str = "imitator.log"
    IMITATOR_KEY_NAME: str = "imitator_key"
    SERVER_IP_KEY_NAME: str = "server_ip"
    SANDBOX_PATH: str = "Sandbox_path"  # Удалить при переработке json_config_model.py
    SANDBOX_DATA: str = "data"
    SANDBOX_RULES: str = "rules.txt"
    SANDBOX_TAGS: str = "tags.txt"
    STAND_ENV_NAMING: str = os.environ.get("STAND_NAME")[:-1]
    CONFIG_PATH: str = f"/data/{STAND_ENV_NAMING}/configs"
    SOURCE_TYPE_DEF_VALUE: str = "inflow"
    SPEED_DEF_VALUE: int = 1
    NS_DEF_VALUE: int = 2
    KAFKA_OFFSET_EARLIEST: str = "earliest"
    KAFKA_POLL_TIMEOUT_S: float = 1.0
    KAFKA_SESSION_TIMEOUT_MS: int = 10000
    TEST_ID_KEY: str = "test_id"
    AUTOTEST_DATA_PATH: str = "/data/imitator/autotest_data"
    POPEN_WAIT_TIMOUT_S: int = 5
    LONG_PROCESS_TIMEOUT_S: int = 20
    CMD_STATUS_OK: str = "OK"
    CMD_STATUS_FAIL: str = "FAIL"
    REDIS_STAND_ADDRESS: str = "10.7.49.210"
    CORE_START_DELAY_S: int = 5
    ENCODING_UTF_8: str = "utf-8"
    ENCODING_UTF_8_SIG: str = "utf-8-sig"
    ENCODING_LATIN_1: str = "latin-1"
    WIN_ENCODING_CP866: str = "cp866"  # Нужна только для запуска под WIN
    WIN_ENCODING_CP1251: str = "cp1251"  # Нужна только для запуска под WIN
    OS_NAME_WIN: str = 'nt'
    DEFAULT_ENCODINGS = [ENCODING_UTF_8_SIG, ENCODING_UTF_8, WIN_ENCODING_CP866, WIN_ENCODING_CP1251, ENCODING_LATIN_1]

    HOST_MAP = {
        "dev1": {IMITATOR_KEY_NAME: "DEV1_", SERVER_IP_KEY_NAME: "10.7.49.37"},
        "dev2": {IMITATOR_KEY_NAME: "DEV2_", SERVER_IP_KEY_NAME: "10.7.49.38"},
        "dev3": {IMITATOR_KEY_NAME: "DEV3_", SERVER_IP_KEY_NAME: "10.7.49.205"},
        "test1": {IMITATOR_KEY_NAME: "TEST1_", SERVER_IP_KEY_NAME: "10.7.49.206"},
        "test2": {IMITATOR_KEY_NAME: "TEST2_", SERVER_IP_KEY_NAME: "10.7.49.207"},
        "test3": {IMITATOR_KEY_NAME: "TEST3_", SERVER_IP_KEY_NAME: "10.7.49.208"},
        "test4": {IMITATOR_KEY_NAME: "TEST4_", SERVER_IP_KEY_NAME: "10.7.49.209"},
    }


class ClickhouseConstants(ImitatorConstants):
    CH_TABLE_NAMES: list = ["lds.records", "lds.records_lastvalue"]
    EVO_OBJECT_ID_KEY_NAME: str = "evoObjectId"
    EVO_PARAMETER_ID_KEY_NAME: str = "evoParameterId"
    OBJECT_ID_KEY_NAME: str = "objectId"
    PARAMETER_ID_KEY_NAME: str = "parameterId"
    EVO_ID_PAIRS_CHUNK_SIZE: int = 450
    NAME_CONTAINER: str = "clickhouse-2"


class DockerConstants:
    HOSTNAME_CMD: str = "hostname"
    STOP_CMD: str = "docker stop"
    START_CMD: str = "docker start"
    CHECK_STATUS_CMD: str = "docker inspect -f '{{.State.Status}}'"
    RUNNING_STATUS: str = "running"
    EXITED_STATUS: str = "exited"
    CORE_CONTAINERS_GROUP: list = ["lds-core-node1", "lds-core-node2", "lds-core-node3"]
    LB_CONTAINERS_GROUP: list = ["lds-layer-builder-node1", "lds-layer-builder-node2", "lds-layer-builder-node3"]
    JOURNAL_CONTAINERS_GROUP: list = ["lds-journals-node1", "lds-journals-node2", "lds-journals-node3"]
    WEB_APP_CONTAINERS_GROUP: list = ["lds-web-app-node1", "lds-web-app-node2", "lds-web-app-node3"]
    API_GW_CONTAINERS_GROUP: list = ["lds-api-gw-node1", "lds-api-gw-node2", "lds-api-gw-node3"]
    REPORTS_CONTAINERS_GROUP: list = ["lds-reports-node1", "lds-reports-node2", "lds-reports-node3"]


class RedisConstants:
    LB_REDIS_KEY: str = "lds-layer-builder"
    CORE_REDIS_KEY: str = "lds-core"
    REDIS_KEY_FIND_CMD: str = "docker exec -i redis-redis-01-1-1 redis-cli KEYS"
    REDIS_KEY_DEL_CMD: str = "| xargs -r docker exec -i redis-redis-01-1-1 redis-cli DEL"


class KeycloakClientConstants:
    TOKEN_LEEWAY: int = 30
    GRANT_TYPE: str = "password"
    KEYCLOAK_HEADERS: dict = {"Content-Type": "application/x-www-form-urlencoded"}
    TOKEN_KEY: str = "access_token"
    ISSUED_AT_KEY: str = "issued_at"
    EXPIRES_IN_KEY: str = "expires_in"


class TestOpsConstants:
    TESTOPS_UPLOAD_ENDPOINT: str = "/upload"
    TESTOPS_UPLOAD_ERROR_MSG: str = "Ошибка при загрузке файлов allure отчета"
    TESTOPS_UPLOAD_RESPONSE_MSG_KEY: str = "message"
    TESTOPS_UPLOAD_FILES_KEY: str = "files"
    POST_METHOD: str = "post"
    ALLURE_RESULTS_DIR_NAME: str = "allure-results"
    GZIP_FILE_SIGNATURE: bytes = b'\x1f\x8b'


class HTTPClientConstants:
    GET_METHOD: str = "get"
    POST_METHOD: str = "post"
    TESTOPS_UPLOAD_ENDPOINT: str = "/upload"
    TESTOPS_ATTACHMENTS_LIST_ENDPOINT: str = "/test_cases/{test_case_id}/attachments"
    TESTOPS_LOAD_ATTACHMENT_ENDPOINT: str = "/test_cases/{test_case_id}/attachments/{attachment_id}?download=1"
    TESTOPS_ATTACHMENTS_KEY: str = "items"
    TESTOPS_ATTACHMENT_FILENAME_KEY: str = "original_filename"
    TESTOPS_ATTACHMENT_ID_KEY: str = "id"
    TEST_ID_KEY: str = "test_id"
    IMITATOR_RUN_DATA_FILENAME: str = "imitator_run_data.tar.gz"  # Название архива данных для прогона


class WebSocketClientConstants:
    RS: bytes = b'\x1E'  # ASCII Record Separator
    HANDSHAKE_WAITING: float | int = 5.0
    HANDSHAKE_MESSAGE: str = "{\"protocol\":\"messagepack\",\"version\":1}"
    WS_HUBS: str = "/hubs/ldsClientHub"
    START_INVOCATION_ID: str = 1
    DEFAULT_RECONNECT_INTERVAL: float | int = 5.0
    PING_INTERVAL: int = 3
    PING_TIMEOUT: int = 5
    CLOSE_TIMEOUT: int = 30
    DEFAULT_SIGNALR_MESSAGE_TYPE: int = 1  # invocation
    STREAM_INVOCATION_MESSAGE_TYPE: int = 4  # StreamInvocation
    STREAM_ITEM_MESSAGE_TYPE: int = 2  # StreamItem
    COMPLETION_MESSAGE_TYPE: int = 3  # Completion
    # Текст ошибки Completion при неуспешном streaming (SignalR CompletionWithDetail)
    COMPLETION_ERROR_MESSAGE_INDEX: int = 4
    DEFAULT_SIGNALR_MAP_HEADERS: dict = {}
    EVENT_TYPE_INDEX = 3
    INVOCATION_ID_INDEX = 2
    SERVICE_NAME = "web-app"
    COMPONENT = "lds"
    ROOT_DOMAIN = "tn.tngrp.ru"
    FILTERING_TIMEOUT: int | float = 10.0
    ZONE_INFO: str = 'Europe/Moscow'


class MockConstants:
    MOCK_DURATION: int = 60
    MOCK_TEST_DATA_ID: int = 1
    MOCK_TEST_DATA_NAME: str = "mock.tar.gz"


class EnvKeyConstants:
    CONNECTION_HOST: str = "CONNECTION_HOST"
    KEYCLOAK_URL: str = "KEYCLOAK_URL"
    KEYCLOAK_CLIENT_ID: str = "KEYCLOAK_CLIENT_ID"
    KEYCLOAK_CLIENT_SECRET: str = "KEYCLOAK_CLIENT_SECRET"
    KEYCLOAK_USERNAME: str = "KEYCLOAK_USERNAME"
    KEYCLOAK_PASSWORD: str = "KEYCLOAK_PASSWORD"
    TESTOPS_BASE_URL: str = "TESTOPS_BASE_URL"
    SSH_KEY_NAME: str = "SSH_KEY_NAME"
    SSH_USER_DEV: str = "SSH_USER_DEV"
    STAND_NAME: str = "STAND_NAME"
    DATA_PATH: str = "DATA_PATH"
    OPC_URL: str = "OPC_URL"
    TU_ID: str = "TU_ID"
































enums 
from enum import Enum, IntEnum, IntFlag
from typing import Mapping


class TU(Enum):
    YAROSLAVL_MOSCOW = (1, "Ярославль - Москва", "volga.json")
    TIKHORETSK_NOVOROSSIYSK_2 = (2, "Тихорецк-Новороссийск-2", "tn2.json")
    TIKHORETSK_NOVOROSSIYSK_3 = (3, "Тихорецк-Новороссийск-3", "tn3.json")
    RODIONOVSKAYA_TIKHORETSKAYA = (4, "Родионовская–Тихорецкая", "lt3_rt.json")
    TIKHORETSKAYA_GRUSHEVAYA = (5, "Тихорецкая-6-Грушовая", "tn4_t6g.json")

    def __init__(self, tu_id: int, description: str, file_name: str) -> None:
        self.id = tu_id
        self.description = description
        self.file_name = file_name

    def __str__(self):
        return f"{self.id} - {self.description}"

    @classmethod
    def get_file_name_by_id(cls, target_id: int) -> str:
        for item in cls:
            if item.id == target_id:
                return item.file_name
        raise ValueError(f"ТУ с id = {target_id} не найден")


class ReplyStatus(Enum):
    OK = 200
    BAD_REQUEST = 400
    UNAUTHORIZED = 401
    FORBIDDEN = 403
    NOT_FOUND = 404
    REQUEST_TIMEOUT = 408
    CONFLICT = 409
    PRECONDITION_FAILED = 412
    RANGE_NOT_SATISFIABLE = 416
    TOO_MANY_REQUESTS = 429
    INTERNAL_SERVER_ERROR = 500
    NOT_IMPLEMENTED = 501
    SERVICE_UNAVAILABLE = 503
    GATEWAY_TIMEOUT = 504
    UNKNOWN_ERROR = 520


class ExportedDataType(IntEnum):
    """
    Тип экспортируемых данных
    """

    STATIONARY_STATUS_REPORT = 6
    LDS_STATUS_REPORT = 5
    LEAKS_REPORT = 4
    REJECTED_REPORT = 7

    def to_download_name(self) -> str:
        """Строковый тип для DownloadExportedDataRequest.exportedDataType"""
        return _EXPORTED_DATA_TYPE_DOWNLOAD_NAMES[self]


_EXPORTED_DATA_TYPE_DOWNLOAD_NAMES = {
    ExportedDataType.STATIONARY_STATUS_REPORT: "StationaryStatusReport",
    ExportedDataType.LDS_STATUS_REPORT: "LdsStatusReport",
    ExportedDataType.LEAKS_REPORT: "LeaksReport",
    ExportedDataType.REJECTED_REPORT: "RejectedReport",
}


class ExportStatus(IntEnum):
    """Статус формирования отчёта в ReportDataExportedNotification.replyContent.exportStatus."""

    NOT_READY = 0
    DONE = 1


class StationaryStatus(Enum):
    UNSTATIONARY = 1  # Нестационарный режим
    STATIONARY = 2  # Стационарный режим
    STOPPED = 3  # Режим остановленной перекачки


class LeakStatus(Enum):
    CONFIRMED = 2
    WAITING = 1
    POSSIBLE = 3


class LeakLocationStatus(Enum):
    NODATA = 1  # нет данных
    LEFT_FROM_PUMP_STATION = 2  # Слева от МНС
    RIGHT_FROM_PUMP_STATION = 3  # Справа от МНС
    INSIDE_PUMP_STATION = 4  # Внутри МНС
    INSIDE_NPS = 5  # Внутри НПС при неработающей/отсутствующей МНС


class FieldName(Enum):
    SECTION_TYPE = "sectionType"
    SIGNAL_TYPE = "signalType"


class FilterCriteriaType(Enum):
    ONE_OF = "oneOf"
    ALL_OF = "allOf"


class FilterCriteriaValue(Enum):
    MASK = "mask"
    MASK_REASON = "maskReason"
    LEAK = "leak"
    LEAK_COORDINATE = "leakCoordinate"
    PUMPING_STATUS = "pumpingStatus"
    FREE_FLOW = "freeFlow"
    ACKNOWLEDGE = "acknowledge"
    LEAK_TIME = "leakTime"
    LEAK_VOLUME = "leakVolume"
    LDS_STATUS = "ldsStatus"
    CONTROLLED_SITES = "controlledSites"
    LINEAR_PARTS = "linearParts"
    SERVER_DOWN = "serverDown"
    TIME_SYNCHRONIZATION_DISABLE = "timeSynchronizationDisable"
    FREE_FLOW_START_COORDINATE = "freeFlowStartCoordinate"


class SortingParam(Enum):
    OBJECT_NAME = "objectName"
    ADDRESS = "address"


class SortingType(Enum):
    ASCENDING = "ascending"
    DESCENDING = "descending"


class Direction(Enum):
    """Направление прокрутки"""

    PREV = 1
    NEXT = 2
    FIRST = 3
    LAST = 4


class LdsStatus(Enum):
    FAULTY = 1  # Неисправность
    INITIALIZATION = 2  # Инициализация
    DEGRADATION = 3  # Ухудшенные характеристики
    SERVICEABLE = 4  # Исправность


class ConfirmationStatus(Enum):
    FAULTY = 0  # Неисправность
    AWAITING = 1  # Предварительная
    NOT_CONFIRMED = 2  # Не подтверждена
    CONFIRMED = 3  # Подтверждена
    CONFIRMED_AND_LEAK_CLOSED = 4  # Завершена


class ReservedType(Enum):
    FAULTY = 0  # Неисправность
    STOP = 1  # Дифференциальный
    STATIONARY_FLOW = 2  # Стационарный
    UNSTATIONARY_FLOW = 3  # Модельный
    BALANCE_IN_NPS = 4  # Баланс внутри НПС
    CHANGED_IN_DECISION_MAKING = 5  # Стационарный + Изменено в АПР
    CREATED_IN_DECISION_MAKING = 6  # Создано в АПР


class MessageType(IntFlag):
    AUTHENTICATION = 1  # Вход в систему
    REJECTION = 1 << 2  # Отбраковка сигналов
    LDS_STATUS = 1 << 3  # Режим работы СОУ
    INPUT_SIGNALS = 1 << 6  # Входные сигналы
    PUMPING_STATUS = 1 << 7  # Режим работы МТ
    MASKING_LDS = 1 << 8  # Маскирование СОУ
    FREE_FLOWS = 1 << 9  # Самотечное течение
    LEAKS = 1 << 10  # Утечка


class MessagePriority(IntFlag):
    LOW = 1  # Прочее
    COMMON = 1 << 1  # Информационное
    MEDIUM = 1 << 2  # Значительное
    HIGH = 1 << 3  # Важное
    VERY_HIGH = 1 << 4  # Особой важности


class LdsStatusDegradation(IntFlag):
    LEAK_ON_ADJACENT_DIAGNOSTIC_AREAS = 1 << 0  # Возникновение утечки на соседнем диагностическом участке
    ADDITIVE_INJECTORS_OPERATION = 1 << 1  # Наличие ПТП
    PIG_SENSOR_PASSAGE = 1 << 2  # Наличие СОД
    TRIGGERING_EMERGENCY_RESET = 1 << 3  # Срабатывание аварийного сброса
    STARTING_PUMPING_OUT_PUMPS = 1 << 4  # Работа насосов откачки
    EXCEEDING_DISTANCE_BETWEEN_SERVICEABLE_PRESSURE_SENSORS = 1 << 5  # Расстояние между СИ давления более 50 км
    FAULTY_PRESSURE_SENSORS_AT_PUMP_STATION_NODES = 1 << 6  # Отказ СИ давления на входе/выходе НПС
    REJECTION_TEMPERATURE_SENSOR = 1 << 7  # Отказ СИ температуры
    REJECTION_VISCOSITY_SENSOR = 1 << 8  # Отказ СИ вязкости
    REJECTION_DENSITY_SENSOR = 1 << 9  # Отказ СИ плотности
    GRAVITY_SECTION_IN_PUMPING_MODE = 1 << 10  # Наличие самотечного участка/участка с неполным сечением
    ABSENCE_MIN_PRESSURE_SENSORS_REQUIRED_NUMBER = 1 << 11  # Менее 4 исправных СИ давления на разных КП ЛЧ и НПС
    EXCEEDING_DISTANCE_BETWEEN_FLOW_METERS = 1 << 12  # Расстояние между СИ расхода на пути перекачки более 200 км
    GRAVITY_SECTION_IN_STOPPED_PUMPING_MODE = 1 << 13  # Наличие самотечного участка в режиме остановленной перекачки


class LdsStatusFaulty(IntFlag):
    NO_DATA_SOURCE_CONNECTION = 1 << 0  # Отсутствует связь серверного оборудования СОУ с источником «сырых» данных
    ABSENCE_MIN_PRESSURE_SENSORS_REQUIRED_NUMBER = 1 << 1  # Менее 4 КП с достоверными СИ давления
    ABSENCE_MIN_FLOW_METERS_REQUIRED_NUMBER = 1 << 2  # Недостоверность граничного СИ расхода


class LdsStatusInitialization(IntFlag):
    ACCUMULATION_DATA = 1 << 0  # Накопление данных
    EXITING_FAULTY_MODE = 1 << 1  # Выход СОУ из режима «Неисправна»
    COLD_START_OF_SERVERS = 1 << 2  # Одновременный «холодный» запуск нескольких серверов СОУ
    SWITCHING_SHUT_OFF_IN_STOPPED_PUMPING_MODE = 1 << 3  # Переключение запорной арматуры
    USER_ACTION = 1 << 4  # По команде пользователя


class StationaryReason(IntFlag):
    """
    Причины режима работы МТ: Стационар
    """

    # Отклонения давления и расхода не превышают допустимых отклонений
    PRESSURE_AND_FLOW_MOVING_AVERAGES_MEET_CRITERIA = 1 << 0
    # Окончание периода времени после технологических переключений и отсутствия самотечного участка
    ABSENCE_GRAVITY_SECTION_AND_TECHNOLOGICAL_SWITCHING = 1 << 1


class UnStationaryReason(IntFlag):
    """
    Причины режима работы МТ: Нестационар
    """

    # Пуск/остановка трубопровода; включение/отключение магистрального насоса; включение/отключение НПС
    CHANGING_EQUIPMENT_STATUS = 1 << 0
    # Начало/окончание работы насосов откачки емкостей на НПС и ЛЧ технологического участка
    CHANGING_WORKING_OF_PUMPING_OUT_PUMPS = 1 << 1
    # Изменение частоты вращения в ручном режиме и/или изменение уставки регулирования в автоматическом режиме
    # работы МНА с ЧРП
    CHANGING_MAIN_PUMPS_ROTATION_SPEED = 1 << 2
    CHANGING_BLOCK_VALVES_STATUS = 1 << 3  # Полное или частичное открытие/закрытие задвижки
    SWITCHING_TANKS = 1 << 4  # Переключение резервуаров
    CHANGING_ACCEPTANCE_OR_DELIVERY_STATE = 1 << 5  # Начало или прекращение приема/сдачи нефти/нефтепродуктов
    TRIGGERING_EMERGENCY_RESET_OR_PWSS_OPERATION = 1 << 6  # Задействование аварийного сброса
    SAFETY_VALVES_ACTUATION = 1 << 7  # Срабатывание предохранительных клапанов
    # Изменение уставки регулирования по давлению узлов регулирования давления, работающих в автоматическом режиме
    # управления
    CHANGING_PRESSURE_SETTING = 1 << 8
    # Изменение процента открытия/закрытия заслонки узлов регулирования давления, работающих в ручном режиме управления
    CHANGING_OPENING_PERCENTAGE_VALVE = 1 << 9
    CHANGING_ADDITIVE_INJECTOR_STATUS_OR_FLOW = 1 << 10  # Начало/окончание ввода ПТП или изменение расхода вводимой ПТП
    LEAK_END = 1 << 11  # Окончание утечки
    # Наличие сигнала статуса «Открывается»/»Закрывается» запорной арматуры (не в режиме имитации),
    # расположенной в точке, гидравлически связанной с рассматриваемым ДУ
    TO_OPEN_OR_TO_CLOSE_STATUS = 1 << 12
    # Нестационарный режим работы/отсутствие сигнала о режиме работы смежного ТУ, работающего
    # в единой гидравлической системе с защищаемым ТУ
    ADJACENT_TU = 1 << 13
    COLD_START = 1 << 14  # Одновременный «холодный» запуск нескольких серверов СОУ


class StoppedPumpingReason(IntFlag):
    """
    Причины режима работы МТ: Остановленный
    """

    # На ДУ отсутствуют работающие НА, при этом показания СИ расхода не превышают 1 % от максимального значения
    # диапазона измерений всех СИ расхода на технологическом участке
    STOPPING_PUMPS = 1 << 0
    CUTOFF_AREA = 1 << 1  # Участок отсечен запорной арматурой от подкачек/откачек


class RejectionCriteria(IntFlag):
    """Критерии отбраковки сигналов criteriaNames"""

    QUALITY = 1 << 0  # qualityRejection
    RANGE = 1 << 1  # rangeRejection
    EMPTY = 1 << 2  # emptyRejection
    TIME = 1 << 3  # timeRejection
    CONSTANT_SIGNAL = 1 << 4  # constantSignalRejection
    DISCHARGE = 1 << 5  # dischargeRejection
    VTOR = 1 << 6  # VTORRejection
    NEARBY = 1 << 7  # nearbyRejection
    DIAGNOSTIC_INFO = 1 << 8  # diagnInfoRejection

    @property
    def backend_name(self) -> str:
        names = {
            "QUALITY": "qualityRejection",
            "RANGE": "rangeRejection",
            "EMPTY": "emptyRejection",
            "TIME": "timeRejection",
            "CONSTANT_SIGNAL": "constantSignalRejection",
            "DISCHARGE": "dischargeRejection",
            "SIGMA3": "sigma3Rejection",
            "VTOR": "VTORRejection",
            "NEARBY": "nearbyRejection",
            "DIAGNOSTIC_INFO": "diagnInfoRejection",
        }
        return names.get(self.name or "", str(int(self)))

    def __str__(self) -> str:
        raw_value = int(self)
        if raw_value == 0:
            return "0"

        active_flags = [flag.backend_name for flag in type(self) if flag.value and flag & self == flag]
        if active_flags:
            return f"{'|'.join(active_flags)} ({raw_value})"

        return str(raw_value)


class RejectionSensorTag(Enum):
    """
    Теги датчиков для тестов отбраковки (id, description=tag)
    Айди тегов подставляется на ходу из текущей версии конфы с сервера
    """

    KP_8_Pin = (0, "AK.CHTN.LU_TIHVEL.KP_8.SW_8-3.Pin")  # nearby_pressure_pin range_upper_pressure range_lower_pres
    NPS_TIH_5_Vmom = (0, "AK.CHTN.NPS_TIH_5.UZR_1.Vmom")  # diagnostic_info_flowrange_upper_flow range_lower_flow
    KP_8_Pout = (0, "AK.CHTN.LU_TIHVEL.KP_8.SW_8-3.Pout")  # nearby_pressure_pout
    KP_209_1_Pin = (0, "AK.CHTN.LU_VELKRIM.KP_209-1.SW_215-3-1.Pin")  # empty_pressure
    KP_7_Pin = (0, "AK.CHTN.LU_TIHVEL.KP_7.SW_6-3.Pin")  # vtor_pressure
    NPS_KRIM_P_Vmom = (0, "AK.CHTN.NPS_KRIM_P.UZR_1.Vmom")  # empty_flow quality_flow

    def __init__(self, sensor_id: int, description: str) -> None:
        self.id = sensor_id
        self.description = description

    @classmethod
    def update_ids_from_config(cls, sensor_ids_by_address: Mapping[str, int]) -> None:
        """
        Обновляет sensor_id по tag из конфигурации стенда.
        """
        missing_tags = []
        for sensor in cls:
            sensor_id = sensor_ids_by_address.get(sensor.description)
            if sensor_id is None:
                missing_tags.append(sensor.description)
                continue
            sensor.id = sensor_id

        if missing_tags:
            raise ValueError(f"Не найдены sensor_id для tags: {', '.join(missing_tags)}")

    def __str__(self):
        return f"{self.id} - {self.description}"


class UserActions(IntFlag):
    USER_LOGIN = 1  # Вход пользователя
    USER_EXIT = 1 << 1  # Выход пользователя
    FAILED_USER_LOGIN = 1 << 2  # Неуспешная попытка входа пользователя
    ALGORITHMS_REINITIALIZATION = 1 << 3  # Переинициализация алгоритмов
    SIGNAL_MASK_SIM = 1 << 4  # Маскирование и имитация входных сигналов
    LDS_MASKING = 1 << 5  # Маскирование СОУ
    EXPORT = 1 << 6  # Экспорт и выгрузки
    SETTINGS_CHANGE = 1 << 7  # Изменение настроек
    LEAK_ACK = 1 << 8  # Квитирование сообщения об утечке
    LEAK_REMOVE = 1 << 9  # Исключение неактивных утечек
    LDS_ADMIN = 1 << 10  # Администрирование СОУ
    PIG_CONTROL = 1 << 11  # Управление СОД


class SiteKpKp(Enum):
    """controlledSiteId, segmentId"""

    TIXORECZKAYA_NOVOVELICHKOVSKAYA = {'controlledSiteId': 6012, 'segmentId': 6013}
    NOVOVELICHKOVSKAYA_KRYMSKAYA = {'controlledSiteId': 6074, 'segmentId': 6075}
    KRYMSKAYA_GRUSHOVAYA = {'controlledSiteId': 6220, 'segmentId': 6221}
    BACKUP_ROUTE_BEJSUG = {'controlledSiteId': 6242, 'segmentId': 6243}
    BACKUP_ROUTE_PONURA = {'controlledSiteId': 6076, 'segmentId': 6077}
    BACKUP_ROUTE_KUBAN = {'controlledSiteId': 6088, 'segmentId': 6089}
    NPZ_AFIPSKIJ = {'controlledSiteId': 6244, 'segmentId': 6245}
    NPZ_ILINSKIJ = {'controlledSiteId': 6120, 'segmentId': 6121}


class SignalType(IntFlag):
    """Типы сигналов: режим МТ - 8, режим СОУ - 256, самотеки -16"""

    REGLU = 1 << 3
    REGSOU = 1 << 8
    GRAVITYPIPE = 1 << 4

    @property
    def backend_name(self) -> str:
        names = {
            "REGLU": "PumpingStatus",
            "REGSOU": "LdsStatus",
            "GRAVITYPIPE": "FreeFlow",
        }
        return names.get(self.name or "", str(int(self)))

    def __str__(self) -> str:
        raw_value = int(self)
        if raw_value == 0:
            return "0"

        active_flags = [flag.backend_name for flag in type(self) if flag.value and flag & self == flag]
        if active_flags:
            return f"{'|'.join(active_flags)} ({raw_value})"

        return str(raw_value)


class GravityPipe(Enum):

    expected_lds_status_gravity_true = (1, "Наличие самотека")
    expected_lds_status_gravity_false = (0, "Отсутствие самотека")

    def __init__(self, status_id: int, description: str) -> None:
        self.id = status_id
        self.description = description

    def __str__(self):
        return f"{self.id} - {self.description}"























test  const
"""
Общие константы для тестов.
"""

from constants.enums import StationaryStatus


class BaseTN3Constants:
    # ===== Константы для запросов журнала =====
    COLUMN_SELECTION_DEF = [
        'Time',
        'User',
        'MainPipeline',
        'TechnologicalSection',
        'TechnologicalObject',
        'ControlPoint',
        'Object',
        'SignalName',
        'Event',
        'Value',
        'MessageType',
        'Tag',
        'Status',
    ]

    # ===== Типы сигналов и объектов =====
    PRESSURE_SENSOR_OBJECT_TYPE = 2
    FLOWMETER_OBJECT_TYPE = 3
    PRESSURE_SIGNAL_TYPE = 1
    FLOW_SIGNAL_TYPE = 4

    # ===== Суффиксы адресов выходных сигналов =====
    ADDRESS_SUFFIX_ACK_LEAK = "AckLeak"
    ADDRESS_SUFFIX_LEAK = "Leak"
    ADDRESS_SUFFIX_MASK = "Mask"
    ADDRESS_SUFFIX_POINT_LEAK = "PointLeak"
    ADDRESS_SUFFIX_Q_LEAK = "QLeak"
    ADDRESS_SUFFIX_TIME_LEAK = "TimeLeak"
    ADDRESS_SUFFIX_PUMPING_STATUS = "RegLU"
    ADDRESS_SUFFIX_LDS_STATUS = "RegSOU"

    # ===== Ключи поиска =====
    LEAK_LINEAR_PART_ID_KEY = "id"
    CONTROLLED_SITE_ID_AND_SEGMENT_ID = "controlledSiteId"

    # ===== Ожидаемые значения выходных сигналов =====
    OUTPUT_IS_ACK_LEAK = "1"
    OUTPUT_IS_LEAK = "1"
    OUTPUT_IS_NOT_LEAK = "0"
    OUTPUT_IS_NOT_MASK = "0"
    OUTPUT_IS_MASK = "1"

    MASS_KG = 3600  # Коэффициент массы, нужно умножить, чтобы получить объем в м3/час
    KGS_SM2 = 98066  # Коэффициент давления, нужно умножить, чтобы получить объем в кгс/см2
    ALLOWED_VOLUME_DIFF = 0.3  # Относительная погрешность по объему
    ALLOWED_DISTANCE_DIFF_METERS = 5000  # Погрешность координаты в метрах
    KM_TO_METERS = 1000  # Перевод в метры
    LEAK_START_INTERVAL = 2100  # Интервал от старта имитатора до первого обнаружения утечки - 35 минут по умолчанию
    LEAK_LOCATION_STATUS = 1

    # ===== Параметры выходных сигналов =====
    OUTPUT_TEST_DELAY = 120  # Задержка для теста выходных сигналов в секундах
    OUTPUT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"  # Формат времени для парсинга выходных сигналов

    # ===== Параметры маскирования =====
    IS_MASKED_TRUE = True
    IS_MASKED_FALSE = False

    # ===== Параметры имитации =====
    PRESSURE_IMITATION_RANGE = (1, 40)
    VOLUME_IMITATION_RANGE = (100, 2400)
    GOOD_QUALITY_VAL = 1

    # ===== Константы журнала =====
    JOURNAL_EVENT_MASK = "Установка признака маскирования"
    JOURNAL_EVENT_UNMASK = "Снятие признака маскирования"
    JOURNAL_SIGNAL_PRESSURE = "Значение давления"
    JOURNAL_SIGNAL_FLOW = "Расход"
    JOURNAL_MESSAGE_TYPE_USER_ACTIONS = "Действия пользователя"
    JOURNAL_STATUS_SUCCESS = "Успешно"
    JOURNAL_EXPECTED_MSG_COUNT_PER_SIGNAL = 2
    JOURNAL_MASK_PAGINATION_LIMIT = 10
    JOURNAL_EVENT_POSSIBLE_LEAK = "Возможна утечка"
    JOURNAL_EVENT_DETECTED_LEAK = "Утечка."
    JOURNAL_MESSAGE_TYPE_LEAKS = "Утечки"
    JOURNAL_EVENT_COMPLETED_LEAKS = "Утечка завершена"
    JOURNAL_EXPECTED_MASK_MSG_TOTAL = 4
    JOURNAL_MASK_EXPECTED_EVENTS = {"Установка признака маскирования", "Снятие признака маскирования"}
    JOURNAL_MASK_EXPECTED_SIGNALS = {"Значение давления", "Расход"}
    JOURNAL_PAGINATION_LIMIT = 10
    JOURNAL_PAGINATION_REJECT_LIMIT = 20
    JOURNAL_EVENT_LEAK_ACKNOWLEDGED = "Сообщение об утечке квитировано"
    JOURNAL_EVENT_LDS_INIT_ACCUM_DATA = "СОУ в инициализации (Накопление данных)"
    JOURNAL_EVENT_LDS_INIT_COLD_START = "СОУ в инициализации (Одновременный «холодный» запуск нескольких серверов СОУ)"
    JOURNAL_MESSAGE_TYPE_LDS_STATUS = "Режим работы СОУ"
    JOURNAL_MESSAGE_TYPE_REJECTION = "Отбраковка"
    SEC_PER_MIN = 60

    # ===== Параметры подтверждения =====
    IS_ACKNOWLEDGED_FALSE = False

    # ===== Параметры BalanceAlgorithmResults =====
    BALANCE_ALGORITHM_POLL_INTERVAL = 15  # Интервал опроса подписки в секундах
    BALANCE_ALGORITHM_TOTAL_WAIT = 300  # Общее время опроса в секундах
    DEBALANCE_TOLERANCE = 0.25  # Допустимое отклонение дебаланса от порога 30%

    # ===== Прочие константы =====
    BASIC_MESSAGE_TIMEOUT = 10.0  # Таймаут ожидания сообщений в секундах
    MASK_MESSAGE_TIMEOUT = 180.0  # Таймаут ожидания сообщений в секундах
    PRECISION = 3  # Точность округления для координат
    DIGITS_WITH_DOT_PATTERN = r'\d+(?:\.\d+)?'  # Регулярное выражение для поиска чисел с точкой
    DIAGNOSTIC_AREA_BASE_IDS = [2, 3, 4, 5, 7, 8]  # Список ДУ с isBase = true из конфигурации Тн-3
    REPRESENTATIVE_DIAGNOSTIC_AREA_IDS = [2, 3]  # Список показательных ДУ для определения режима СОУ
    ZONE_INFO: str = "Europe/Moscow"
    SECONDS_PER_HOUR: int = 3600
    CRITERIA_NAMES_FIELD: str = 'criteriaNames'


class ExportReportConstants:
    """Константы для теста формирования отчёта об утечках"""

    # Максимальное ожидание уведомления о готовности отчёта
    NOTIFICATION_TIMEOUT_SECONDS: float = 60.0
    # Максимальное время ожидания появления отчёта в списке после уведомления
    LIST_POLL_TOTAL_WAIT_SECONDS: float = 10.0
    # Интервал между запросами getExportedFilesListRequest
    LIST_POLL_INTERVAL_SECONDS: float = 10.0
    # Таймаут получения ответа на скачивание
    DOWNLOAD_TIMEOUT_SECONDS: float = 60.0

    # ===== Имя файла отчёта =====
    LEAKS_REPORT_NAME_PART: str = "Отчет об утечках"  # подстрока в имени файла/отчёта
    XLSX_EXTENSION: str = ".xlsx"
    # Сигнатура zip-архива, используется для проверки формата файла по содержимому
    ZIP_SIGNATURE: bytes = b'PK\x03\x04'

    # ===== Формат даты/времени в отчёте =====
    REPORT_DATETIME_FORMAT: str = "%d.%m.%Y %H:%M:%S"
    # Регулярное выражение для извлечения двух дат из заголовка
    REPORT_HEADER_PERIOD_PATTERN: str = (
        r'Отчет об утечках с (?P<period_start>\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})'
        r' по (?P<period_end>\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})'
    )
    # Регулярное выражение для извлечения двух дат из названия файла
    REPORT_FILE_NAME_PERIOD_PATTERN: str = (
        r'^Отчет об утечках (?P<tu>.+?) '
        r'(?P<period_start>\d{2}\.\d{2}\.\d{4} \d{2}_\d{2}_\d{2})'
        r' - '
        r'(?P<period_end>\d{2}\.\d{2}\.\d{4} \d{2}_\d{2}_\d{2})'
        r'\.xlsx$'
    )

    # Двойная шапка: первая строка - название отчёта с периодом, вторая - названия колонок
    REPORT_TITLE_ROW: int = 1
    REPORT_COLUMN_HEADERS_ROW: int = 2
    REPORT_DATA_FIRST_ROW: int = 3

    # ===== Названия колонок =====
    COL_DATETIME: str = "Дата и время"
    COL_OBJECT: str = "Объект"
    COL_LDS_STATUS: str = "Режим работы СОУ"
    COL_MASK_INFO: str = "Информация о маскировании"
    COL_COORDINATE: str = "Координата"
    COL_LEAK_VOLUME: str = "Объемный расход утечки"
    COL_MT_MODE: str = "Режим работы МТ"

    EXPECTED_COLUMN_HEADERS: list = [
        COL_DATETIME,
        COL_OBJECT,
        COL_LDS_STATUS,
        COL_MASK_INFO,
        COL_COORDINATE,
        COL_LEAK_VOLUME,
        COL_MT_MODE,
    ]

    LDS_STATUS_OK_TEXT: str = "СОУ исправна"
    MASKING_NOT_MASKED_TEXT: str = "СОУ не замаскирована"

    # ===== Маппинг StationaryStatus <-> текст в колонке "Режим работы МТ" =====
    STATIONARY_STATUS_TO_REPORT_TEXT: dict = {
        StationaryStatus.UNSTATIONARY.value: "Нестационарный режим работы МТ",
        StationaryStatus.STATIONARY.value: "Стационарный режим работы МТ",
        StationaryStatus.STOPPED.value: "Режим остановленной перекачки МТ",
    }

    # ===== Прочее =====
    DEFAULT_SHEET_INDEX: int = 0

    SUBSCRIBE_REPORTS_DATA_EXPORTED_REQUEST: str = "SubscribeReportsDataExportedRequest"
    EXPORT_REPORTS_COMMAND_REQUEST: str = "ExportReportsCommandRequest"
    REPORT_DATA_EXPORTED_NOTIFICATION: str = "ReportDataExportedNotification"
    GET_EXPORTED_DATA_LIST_REQUEST: str = "GetExportedDataListRequest"
    EXPORTED_DATA_LIST_LIMIT: int = 10
    DOWNLOAD_EXPORTED_DATA_REQUEST: str = "DownloadExportedDataRequest"

    # Допустимая погрешность при сравнении границ периода отчёта
    REPORT_PERIOD_TOLERANCE_MINUTES: int = 1
    # Формат даты/времени в имени скачиваемого xlsx-файла
    REPORT_FILE_NAME_DATETIME_FORMAT: str = "%d.%m.%Y %H_%M_%S"

































scena
"""
Сценарии тестов - функции-обёртки без
pytest маркеров.

Каждая функция содержит логику одного теста.
Pytest маркеры и allure декораторы применяются в тестовых файлах.
"""

import time
from datetime import datetime, timedelta

import allure
import pytest

from constants.enums import (
    ConfirmationStatus,
    Direction,
    ExportedDataType,
    ExportStatus,
    GravityPipe,
    LdsStatus,
    LeakStatus,
    MessageType,
    ReplyStatus,
    SignalType,
    SiteKpKp,
    StationaryStatus,
    UserActions,
)
from constants.test_constants import BaseTN3Constants as TestConst
from constants.test_constants import ExportReportConstants as ReportConst
from models.get_messages_model import Filtering, FilteringObjects, Pagination
from test_config.models_for_tests import (
    CaseData,
    ExportLeaksReportState,
    LDSStatusConfig,
    LeakTestConfig,
    SmokeSuiteConfig,
)
from utils.helpers import report_xlsx_utils as report_utils
from utils.helpers import ws_test_utils as t_utils
from utils.helpers.asserts import SoftAssertions, StepCheck
from utils.helpers.ws_message_parser import ws_message_parser as parser
from utils.helpers.ws_test_utils import get_value


async def basic_info(ws_client, cfg: SmokeSuiteConfig | LDSStatusConfig):
    """
    Проверка базовой информации СОУ: список ТУ.
    """
    with allure.step("Подключение по ws, получение и обработка сообщения типа: BasicInfoContent"):
        payload = await t_utils.connect_and_get_msg(ws_client, "getBasicInfoRequest", [])
        parsed_payload = parser.parse_basic_info_msg(payload)
        expected_tu = [(cfg.tu_id, cfg.tu_name)]
        actual_tu = [(tu.tuId, tu.tuName) for tu in parsed_payload.replyContent.basicInfo.tus if tu.tuId == cfg.tu_id]

    with allure.step(f"Поверка наличия {cfg.tu_name} в списке доступных ТУ на сервере"):
        # Критическая проверка: если нужного ТУ нет в BasicInfoContent — считаем что ТУ отключен (через Zookeeper)
        # и прерываем весь прогон.
        if expected_tu[0] not in actual_tu:
            msg = (
                f"ТУ отключен: в BasicInfoContent отсутствует ТУ для запущенного набора данных: "
                f"tuId={cfg.tu_id}, tuName='{cfg.tu_name}', suite={cfg.suite_name}. "
                f"Необходимо убедиться, что ТУ включен (Zookeeper) и перезапустить прогон."
            )
            allure.attach(
                f"Ожидаемый ТУ: {expected_tu}\nПолученные ТУ: {actual_tu}",
                name="Предварительная проверка: ТУ отключен",
                attachment_type=allure.attachment_type.TEXT,
            )
            pytest.fail(msg, pytrace=False)

    with SoftAssertions() as soft_failures:
        StepCheck("Проверка статуса ответа", "replyStatus", soft_failures).actual(parsed_payload.replyStatus).expected(
            ReplyStatus.OK.value
        ).equal_to()

        StepCheck("Проверка наличия объектов в списке ТУ", "tus", soft_failures).actual(
            parsed_payload.replyContent.basicInfo.tus
        ).is_not_empty()

        StepCheck(
            f"Проверка наличия ТУ: {cfg.tu_name} в списке ТУ ",
            "(tuId, tuName)",
            soft_failures,
        ).actual(
            actual_tu
        ).expected(expected_tu).equal_to()


async def journal_info(ws_client):
    """
    Проверка наличия сообщений в журнале.
    """
    with allure.step("Подключение по ws, получение и обработка сообщения типа: MessagesInfoContent"):
        request_body = t_utils.create_journal_req_body()
        payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
        parsed_payload = parser.parse_journal_msg(payload)

    StepCheck("Проверка наличия сообщений в журнале", "messagesInfo").actual(
        parsed_payload.replyContent.messagesInfo
    ).is_not_empty()


async def lds_status_initialization(ws_client, cfg: SmokeSuiteConfig):
    """
    Проверка режима работы СОУ: Инициализация.
    """
    with allure.step("Подключение по ws, получение и обработка сообщения типа: CommonSchemeContent"):
        payload = await t_utils.connect_and_subscribe_msg(
            ws_client,
            "CommonSchemeContent",
            "SubscribeCommonSchemeRequest",
            {'tuId': cfg.tu_id, 'additionalProperties': None},
        )
        parsed_payload = parser.parse_common_scheme_info_msg(payload)
        # Получает список участков карты течения
        flow_areas = parsed_payload.replyContent.flowAreas
        # Получает самый протяженный участок карты течения
        longest_flow_area = t_utils.get_longest_flow_area(flow_areas)
        # Получает список ДУ
        diagnostic_areas = longest_flow_area.diagnosticAreas
        allure.attach(
            f"Самый протяженный участок карты течений: {longest_flow_area}",
            name="flowArea. Инициализация",
            attachment_type=allure.attachment_type.TEXT,
        )
        # Получает коллекцию статусов списка ДУ
        lds_status_set = {diagnostic_area.ldsStatus for diagnostic_area in diagnostic_areas}
        # Определяет режим работы СОУ по приоритету
        lds_status = t_utils.determine_lds_status_by_priority(lds_status_set)

    StepCheck("Проверка режима работы СОУ", "ldsStatus").actual(lds_status).expected(
        LdsStatus.INITIALIZATION.value
    ).equal_to()


async def diagnostics_of_signals_after_initialization(
    ws_client,
    cfg: SmokeSuiteConfig,
):
    """
    Проверка выходных сигналов после окончания режима Инициализация по причине "холодного" пуска  СОУ.

    """

    with allure.step("Подписка на сигналы для участков"):
        payload = await t_utils.connect_and_subscribe_msg(
            ws_client,
            "OutputSignalsInfo",
            "SubscribeOutputSignalsRequest",
            {
                'objects': {
                    'linearParts': [],
                    'controlledSites': [
                        SiteKpKp.TIXORECZKAYA_NOVOVELICHKOVSKAYA.value,
                        SiteKpKp.NOVOVELICHKOVSKAYA_KRYMSKAYA.value,
                        SiteKpKp.KRYMSKAYA_GRUSHOVAYA.value,
                        SiteKpKp.BACKUP_ROUTE_BEJSUG.value,
                        SiteKpKp.BACKUP_ROUTE_PONURA.value,
                        SiteKpKp.BACKUP_ROUTE_KUBAN.value,
                        SiteKpKp.NPZ_AFIPSKIJ.value,
                        SiteKpKp.NPZ_ILINSKIJ.value,
                    ],
                },
                'signalTypes': 1023,
                'tuId': cfg.tu_id,
                'additionalProperties': None,
            },
        )

        parsed_payload = parser.parse_output_signals_info_msg(payload)
        controlled_site_dict = {
            "controlled_site_first": SiteKpKp.TIXORECZKAYA_NOVOVELICHKOVSKAYA.value,
            "controlled_site_second": SiteKpKp.NOVOVELICHKOVSKAYA_KRYMSKAYA.value,
            "controlled_site_third": SiteKpKp.KRYMSKAYA_GRUSHOVAYA.value,
            "controlled_site_fourth": SiteKpKp.BACKUP_ROUTE_BEJSUG.value,
            "controlled_site_fifth": SiteKpKp.BACKUP_ROUTE_PONURA.value,
            "controlled_site_sixth": SiteKpKp.BACKUP_ROUTE_KUBAN.value,
            "controlled_site_seventh": SiteKpKp.NPZ_AFIPSKIJ.value,
            "controlled_site_eight": SiteKpKp.NPZ_ILINSKIJ.value,
        }

        controlled_site_messages = {}
        for name, key in controlled_site_dict.items():
            controlled_site_messages[name] = t_utils.find_object_by_a_few_fields(
                parsed_payload.replyContent.controlledSiteSignals, key
            )

        all_signals = {}
        for site_name, site_message in controlled_site_messages.items():
            signal_dict = {'pump': None, 'sou': None, 'gravity': None}
            if site_message:
                all_signals[site_name] = {
                    'pump': t_utils.get_signal(site_message, SignalType.REGLU),
                    'sou': t_utils.get_signal(site_message, SignalType.REGSOU),
                    'gravity': t_utils.get_signal(site_message, SignalType.GRAVITYPIPE),
                }
            else:
                all_signals[site_name] = signal_dict

        first_kp_kp = all_signals.get("controlled_site_first") or {}
        if first_kp_kp:
            first_site_signal_pump = get_value(first_kp_kp.get("pump"))
            first_site_signal_sou = get_value(first_kp_kp.get("sou"))
            first_site_signal_gravity = get_value(first_kp_kp.get("gravity"))

        second_kp_kp = all_signals.get("controlled_site_second") or {}
        if second_kp_kp:
            second_site_signal_pump = get_value(second_kp_kp.get("pump"))
            second_site_signal_sou = get_value(second_kp_kp.get("sou"))
            second_site_signal_gravity = get_value(second_kp_kp.get("gravity"))

        third_kp_kp = all_signals.get("controlled_site_third") or {}
        if third_kp_kp:
            third_site_signal_pump = get_value(third_kp_kp.get("pump"))
            third_site_signal_sou = get_value(third_kp_kp.get("sou"))
            third_site_signal_gravity = get_value(third_kp_kp.get("gravity"))

        fourth_kp_kp = all_signals.get("controlled_site_fourth") or {}
        if fourth_kp_kp:
            fourth_site_signal_pump = get_value(fourth_kp_kp.get("pump"))
            fourth_site_signal_sou = get_value(fourth_kp_kp.get("sou"))
            fourth_site_signal_gravity = get_value(fourth_kp_kp.get("gravity"))

        fifth_kp_kp = all_signals.get("controlled_site_fifth") or {}
        if fifth_kp_kp:
            fifth_site_signal_pump = get_value(fifth_kp_kp.get("pump"))
            fifth_site_signal_sou = get_value(fifth_kp_kp.get("sou"))
            fifth_site_signal_gravity = get_value(fifth_kp_kp.get("gravity"))

        sixth_kp_kp = all_signals.get("controlled_site_sixth") or {}
        if sixth_kp_kp:
            sixth_site_signal_pump = get_value(sixth_kp_kp.get("pump"))
            sixth_site_signal_sou = get_value(sixth_kp_kp.get("sou"))
            sixth_site_signal_gravity = get_value(sixth_kp_kp.get("gravity"))

        seventh_kp_kp = all_signals.get("controlled_site_seventh") or {}
        if seventh_kp_kp:
            seventh_site_signal_pump = get_value(seventh_kp_kp.get("pump"))
            seventh_site_signal_sou = get_value(seventh_kp_kp.get("sou"))
            seventh_site_signal_gravity = get_value(seventh_kp_kp.get("gravity"))

        eighth_kp_kp = all_signals.get("controlled_site_eight") or {}
        if eighth_kp_kp:
            eight_site_signal_pump = get_value(eighth_kp_kp.get("pump"))
            eight_site_signal_sou = get_value(eighth_kp_kp.get("sou"))
            eight_site_signal_gravity = get_value(eighth_kp_kp.get("gravity"))

    with SoftAssertions() as soft_failures:
        StepCheck(
            "Проверка сигнала - режим МТ на участке Тихорецкая-Нововеличковская",
            "Режим МТ",
            soft_failures,
        ).actual(first_site_signal_pump).expected(str(cfg.exp_tixoreczkaya_novovelichkovskaya_reg_lu)).equal_to()
        StepCheck(
            "Проверка сигнала - режим СОУ на участке Тихорецкая-Нововеличковская",
            "Режим СОУ",
            soft_failures,
        ).actual(first_site_signal_sou).expected(str(cfg.exp_tixoreczkaya_novovelichkovskaya_reg_sou)).equal_to()
        StepCheck(
            f"Проверка {GravityPipe.expected_lds_status_gravity_false.description} \n"
            f"на участке Тихорецкая-Нововеличковская",
            "Количество самотеков",
            soft_failures,
        ).actual(first_site_signal_gravity).expected(str(GravityPipe.expected_lds_status_gravity_false.id)).equal_to()
        StepCheck(
            "Проверка сигнала - режим МТ на участке Нововеличковская-Крымская",
            "Режим МТ",
            soft_failures,
        ).actual(second_site_signal_pump).expected(str(cfg.exp_novovelichkovskaya_krymskaya_reg_lu)).equal_to()
        StepCheck(
            f"Проверка {GravityPipe.expected_lds_status_gravity_false.description}\n"
            f"на участке Нововеличковская-Крымская",
            "Количество самотеков",
            soft_failures,
        ).actual(second_site_signal_gravity).expected(str(GravityPipe.expected_lds_status_gravity_false.id)).equal_to()
        StepCheck(
            "Проверка сигнала - режим СОУ на участке Нововеличковская-Крымская",
            "Режим СОУ",
            soft_failures,
        ).actual(second_site_signal_sou).expected(str(cfg.exp_novovelichkovskaya_krymskaya_reg_sou)).equal_to()
        StepCheck(
            "Проверка сигнала - режим МТ на участке Крымская-Грушовая",
            "Режим МТ",
            soft_failures,
        ).actual(
            third_site_signal_pump
        ).expected(str(cfg.exp_krymskaya_grushovaya_reg_lu)).equal_to()
        StepCheck(
            f"Проверка {GravityPipe.expected_lds_status_gravity_true.description} на участке Крымская-Грушовая",
            "Количество самотеков",
            soft_failures,
        ).actual(third_site_signal_gravity).expected(str(GravityPipe.expected_lds_status_gravity_true.id)).equal_to()
        StepCheck(
            "Проверка сигнала - режим СОУ на участке Крымская-Грушовая",
            "Режим СОУ",
            soft_failures,
        ).actual(
            third_site_signal_sou
        ).expected(str(cfg.exp_krymskaya_grushovaya_reg_sou)).equal_to()
        StepCheck(
            "Проверка сигнала - режим МТ на резервной нитке Бейсуг",
            "Режим МТ",
            soft_failures,
        ).actual(
            fourth_site_signal_pump
        ).expected(str(cfg.exp_backup_route_bejsug_reg_lu)).equal_to()
        StepCheck(
            f"Проверка {GravityPipe.expected_lds_status_gravity_false.description} на резервной нитке Бейсуг",
            "Количество самотеков",
            soft_failures,
        ).actual(fourth_site_signal_gravity).expected(str(GravityPipe.expected_lds_status_gravity_false.id)).equal_to()
        StepCheck(
            "Проверка сигнала - режим СОУ на резервной нитке Бейсуг",
            "Режим СОУ",
            soft_failures,
        ).actual(
            fourth_site_signal_sou
        ).expected(str(cfg.exp_backup_route_bejsug_reg_sou)).equal_to()
        StepCheck(
            "Проверка сигнала - режим МТ на резервной нитке Понура",
            "Режим МТ",
            soft_failures,
        ).actual(
            fifth_site_signal_pump
        ).expected(str(cfg.exp_backup_route_ponura_reg_lu)).equal_to()
        StepCheck(
            "Проверка сигнала - режим СОУ на резервной нитке Понура",
            "Режим СОУ",
            soft_failures,
        ).actual(
            fifth_site_signal_sou
        ).expected(str(cfg.exp_backup_route_ponura_reg_sou)).equal_to()
        StepCheck(
            f"Проверка {GravityPipe.expected_lds_status_gravity_false.description} на резервной нитке Понура",
            "Количество самотеков",
            soft_failures,
        ).actual(fifth_site_signal_gravity).expected(str(GravityPipe.expected_lds_status_gravity_false.id)).equal_to()
        StepCheck(
            "Проверка сигнала - режим МТ на резервной нитке Кубань",
            "Режим МТ",
            soft_failures,
        ).actual(
            sixth_site_signal_pump
        ).expected(str(cfg.exp_backup_route_kuban_reg_lu)).equal_to()
        StepCheck(
            "Проверка сигнала - режим СОУ на резервной нитке Кубань",
            "Режим СОУ",
            soft_failures,
        ).actual(
            sixth_site_signal_sou
        ).expected(str(cfg.exp_backup_route_kuban_reg_sou)).equal_to()
        StepCheck(
            f"Проверка {GravityPipe.expected_lds_status_gravity_false.description} на резервной нитке Кубань",
            "Количество самотеков",
            soft_failures,
        ).actual(sixth_site_signal_gravity).expected(str(GravityPipe.expected_lds_status_gravity_false.id)).equal_to()
        StepCheck(
            "Проверка сигнала - режим МТ на НПЗ Афипский",
            "Режим МТ",
            soft_failures,
        ).actual(
            seventh_site_signal_pump
        ).expected(str(cfg.exp_npz_afipskij_reg_lu)).equal_to()
        StepCheck(
            "Проверка сигнала - режим СОУ на НПЗ Афипский",
            "Режим СОУ",
            soft_failures,
        ).actual(
            seventh_site_signal_sou
        ).expected(str(cfg.exp_npz_afipskij_reg_sou)).equal_to()
        StepCheck(
            f"Проверка {GravityPipe.expected_lds_status_gravity_false.description} на НПЗ Афипский",
            "Количество самотеков",
            soft_failures,
        ).actual(seventh_site_signal_gravity).expected(str(GravityPipe.expected_lds_status_gravity_false.id)).equal_to()
        StepCheck(
            "Проверка сигнала - режим МТ на НПЗ Ильинский",
            "Режим МТ",
            soft_failures,
        ).actual(
            eight_site_signal_pump
        ).expected(str(cfg.exp_npz_ilinskij_reg_lu)).equal_to()
        StepCheck(
            "Проверка сигнала - режим СОУ на НПЗ Ильинский",
            "Режим СОУ",
            soft_failures,
        ).actual(
            eight_site_signal_sou
        ).expected(str(cfg.exp_npz_ilinskij_reg_sou)).equal_to()
        StepCheck(
            f"Проверка {GravityPipe.expected_lds_status_gravity_false.description} на НПЗ Ильинский",
            "Количество самотеков",
            soft_failures,
        ).actual(eight_site_signal_gravity).expected(str(GravityPipe.expected_lds_status_gravity_false.id)).equal_to()


async def lds_status_init_in_journal(ws_client, cfg: SmokeSuiteConfig, imitator_start_time):
    """
    Проверка наличия записи в журнале о входе СОУ в режим Инициализация.
    """
    with allure.step("Запрос сообщений журнала с фильтром messageTypes=LDS_STATUS"):
        end_time = datetime.now()
        request_body = t_utils.create_journal_req_body(
            pagination=Pagination(limit=TestConst.JOURNAL_PAGINATION_LIMIT, direction=Direction.FIRST.value),
            filtering=Filtering(messageTypes=int(MessageType.LDS_STATUS), objects=FilteringObjects(tuId=cfg.tu_id)),
        )
        payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
        parsed_payload = parser.parse_journal_msg(payload)
        messages_info = parsed_payload.replyContent.messagesInfo

        StepCheck("Проверка наличия сообщений в журнале", "messagesInfo").actual(messages_info).is_not_empty()

    with allure.step("Фильтрация сообщений по времени и technologicalSection"):
        filter_start_msk = t_utils.localize_as_moscow(imitator_start_time)
        filter_end_msk = t_utils.localize_as_moscow(end_time)

        time_filtered = [
            msg
            for msg in messages_info
            if filter_start_msk <= t_utils.ensure_moscow_timezone(msg.time) <= filter_end_msk
        ]
        time_filtered.sort(key=lambda msg: t_utils.ensure_moscow_timezone(msg.time), reverse=True)

        lds_msg = next(
            (
                msg
                for msg in time_filtered
                if msg.technologicalSection == cfg.tu_name and msg.event == TestConst.JOURNAL_EVENT_LDS_INIT_COLD_START
            ),
            None,
        )

        allure.attach(
            f"Всего получено сообщений: {len(messages_info)}\n"
            f"После фильтрации по времени ({filter_start_msk} - {filter_end_msk}): {len(time_filtered)}\n"
            f"проверка: найдено ли сообщение с technologicalSection='{cfg.tu_name}' "
            f"и event='{TestConst.JOURNAL_EVENT_LDS_INIT_ACCUM_DATA}': {'True' if lds_msg else 'False'}",
            name="Результат фильтрации сообщений журнала",
            attachment_type=allure.attachment_type.TEXT,
        )

        with allure.step(
            f"Проверка: найдено ли сообщение с technologicalSection='{cfg.tu_name}' "
            f"и event='{TestConst.JOURNAL_EVENT_LDS_INIT_COLD_START}'"
        ):
            if lds_msg is None:
                pytest.fail(
                    f"Сообщение с technologicalSection='{cfg.tu_name}' "
                    f"и event='{TestConst.JOURNAL_EVENT_LDS_INIT_COLD_START}' "
                    f"не найдено среди {len(time_filtered)} отфильтрованных по времени сообщений"
                )

    with allure.step("Проверка актуальности сообщения"):
        msg_time_msk = t_utils.ensure_moscow_timezone(lds_msg.time)
        start_time_msk = t_utils.localize_as_moscow(imitator_start_time)

        StepCheck(
            f"Проверка: время сообщения позднее времени старта имитатора {msg_time_msk} > {start_time_msk}",
            "time",
        ).actual(msg_time_msk > start_time_msk).expected(True).equal_to()

    with SoftAssertions() as soft_failures:
        StepCheck("Проверка event", "event", soft_failures).actual(lds_msg.event).expected(
            TestConst.JOURNAL_EVENT_LDS_INIT_COLD_START
        ).equal_to()

        StepCheck("Проверка mainPipeline", "mainPipeline", soft_failures).actual(lds_msg.mainPipeline).expected(
            cfg.main_pipeline
        ).equal_to()

        StepCheck("Проверка technologicalSection", "technologicalSection", soft_failures).actual(
            lds_msg.technologicalSection
        ).expected(cfg.tu_name).equal_to()

        StepCheck("Проверка technologicalObject не пустой", "technologicalObject", soft_failures).actual(
            lds_msg.technologicalObject
        ).is_not_none()

        StepCheck("Проверка priority не пустой", "priority", soft_failures).actual(lds_msg.priority).is_not_none()

        StepCheck("Проверка messageType", "messageType", soft_failures).actual(lds_msg.messageType).expected(
            TestConst.JOURNAL_MESSAGE_TYPE_LDS_STATUS
        ).equal_to()


async def main_page_info(ws_client, cfg: SmokeSuiteConfig):
    """
    Проверка установки режима МТ.
    """
    with allure.step("Подключение по ws, получение и обработка сообщения типа: MainPageInfoContent"):
        payload = await t_utils.connect_and_subscribe_msg(
            ws_client,
            "MainPageInfoContent",
            "subscribeMainPageInfoRequest",
            {'tuIds': [cfg.tu_id], 'additionalProperties': None},
        )
        parsed_payload = parser.parse_main_page_msg(payload)

    with SoftAssertions() as soft_failures:
        StepCheck("Проверка id полученного ТУ", "tu_id", soft_failures).actual(
            parsed_payload.replyContent.tuId
        ).expected(cfg.tu_id).equal_to()

        StepCheck(
            f"Проверка установки стационара для ТУ {cfg.tu_name}",
            "stationary_status",
            soft_failures,
        ).actual(
            parsed_payload.replyContent.tuInfo.stationaryStatus
        ).expected(cfg.expected_stationary_status).equal_to()


async def main_page_info_signals(ws_client, cfg: SmokeSuiteConfig):
    """
    Проверка счетчиков состояния сигналов
    """
    with allure.step("Подключение по ws, получение и обработка сообщения типа: MainPageSignalsInfoContent"):
        payload = await t_utils.connect_and_subscribe_msg(
            ws_client,
            "MainPageSignalsInfoContent",
            "subscribeMainPageSignalsInfoRequest",
            {'tuIds': [cfg.tu_id], 'additionalProperties': None},
        )
        parsed_payload = parser.parse_main_page_signals_msg(payload)
    with SoftAssertions() as soft_failures:
        StepCheck("Проверка id полученного ТУ", "tu_id", soft_failures).actual(
            parsed_payload.replyContent.tuId
        ).expected(cfg.tu_id).equal_to()
        field_name = "numberOfRejectedSignals"
        # Проверяет что количество отбракованных сигналов больше или равно ОР
        StepCheck(
            f"Проверка количества отбракованных сигналов ТУ {cfg.tu_name}",
            field_name,
            soft_failures,
        ).actual(
            parsed_payload.replyContent.signalsInfo.numberOfRejectedSignals
        ).is_greater_than_or_equal_to(cfg.expected_main_page_signals[field_name])
        field_name = "numberOfMaskedSignals"
        StepCheck(
            f"Проверка количества маскированных сигналов ТУ {cfg.tu_name}",
            field_name,
            soft_failures,
        ).actual(
            parsed_payload.replyContent.signalsInfo.numberOfMaskedSignals
        ).expected(cfg.expected_main_page_signals[field_name]).equal_to()
        field_name = "numberOfImitatedSignals"
        StepCheck(
            f"Проверка количества имитированных сигналов ТУ {cfg.tu_name}",
            field_name,
            soft_failures,
        ).actual(
            parsed_payload.replyContent.signalsInfo.numberOfImitatedSignals
        ).expected(cfg.expected_main_page_signals[field_name]).equal_to()


async def main_page_info_unstationary(ws_client, cfg: SmokeSuiteConfig):
    """
    Проверка установки режима Нестационар (для наборов с несколькими утечками).
    Запускается после первой утечки, когда режим переходит в Нестационар.
    """
    with allure.step("Подключение по ws, получение и обработка сообщения типа: MainPageInfoContent"):
        payload = await t_utils.connect_and_subscribe_msg(
            ws_client,
            "MainPageInfoContent",
            "subscribeMainPageInfoRequest",
            {'tuIds': [cfg.tu_id], 'additionalProperties': None},
        )
        parsed_payload = parser.parse_main_page_msg(payload)

    with SoftAssertions() as soft_failures:
        StepCheck("Проверка id полученного ТУ", "tu_id", soft_failures).actual(
            parsed_payload.replyContent.tuId
        ).expected(cfg.tu_id).equal_to()

        StepCheck(
            f"Проверка установки режима Нестационар для ТУ {cfg.tu_name}",
            "stationary_status",
            soft_failures,
        ).actual(parsed_payload.replyContent.tuInfo.stationaryStatus).expected(
            StationaryStatus.UNSTATIONARY.value
        ).equal_to()


async def leak_is_confirm_on_main_page(ws_client, cfg: SmokeSuiteConfig):
    """
    MainPageInfoContent - проверка подтвержденной утечки на ЭФ Состояние МТ
    """
    with allure.step("Подключение по ws, получение и обработка сообщения типа: MainPageInfoContent."):
        payload = await t_utils.connect_and_subscribe_msg(
            ws_client,
            "MainPageInfoContent",
            "subscribeMainPageInfoRequest",
            {'tuIds': [cfg.tu_id], 'additionalProperties': None},
        )
        parsed_payload = parser.parse_main_page_msg(payload)
        main_page_leak_info = parsed_payload.replyContent.tuInfo.leaksInfo
        confirm_leak = t_utils.find_object_by_field(main_page_leak_info, "leakStatus", LeakStatus.CONFIRMED.value)

    StepCheck("Проверка подтвержденной утечки на ЭФ Состояние МТ", "leakStatus").actual(
        confirm_leak.leakStatus
    ).expected(LeakStatus.CONFIRMED.value).equal_to()


async def leak_is_complete_on_main_page(ws_client, cfg: SmokeSuiteConfig):
    """
    MainPageInfoContent - отсутствует подтвержденная утечка на ЭФ Состояние МТ
    """
    with allure.step("Подключение по ws, получение и обработка сообщения типа: MainPageInfoContent."):
        payload = await t_utils.connect_and_subscribe_msg(
            ws_client,
            "MainPageInfoContent",
            "subscribeMainPageInfoRequest",
            {'tuIds': [cfg.tu_id], 'additionalProperties': None},
        )
        parsed_payload = parser.parse_main_page_msg(payload)
        main_page_leak_info = parsed_payload.replyContent.tuInfo.leaksInfo
        confirmed_and_closed_leaks = t_utils.find_confirmed_leaks_on_main_page(main_page_leak_info)

    StepCheck("Проверка подтвержденной утечки на ЭФ Состояние МТ", "leakStatus").actual(
        confirmed_and_closed_leaks
    ).is_empty()


async def imitate_senor_signal(ws_client, cfg: SmokeSuiteConfig, test_data: CaseData):
    """
    Проверка имитации сигнала датчика.
    """
    # Распаковка данных для теста
    sensor_id = test_data.params.get("sensor_id")
    sensor_val, sensor_quality = test_data.expected_result

    with allure.step(f"Отправка сообщения и обработка ответа об имитации сигнала датчика с id: {sensor_id}"):
        payload = await t_utils.connect_and_get_msg(
            ws_client,
            "ImitateSignalRequest",
            {
                'id': sensor_id,
                'tuId': cfg.tu_id,
                'imitateInfo': {
                    'value': str(sensor_val),
                    'quality': sensor_quality,
                    'additionalProperties': None,
                },
                'additionalProperties': None,
            },
        )
        parsed_payload = parser.parse_imitate_signal_msg(payload)
        sensor_imitate_reply_status = parsed_payload.replyStatus

        StepCheck("Проверка кода ответа на запрос об имитации", "replyStatus").actual(
            sensor_imitate_reply_status
        ).expected(ReplyStatus.OK.value).equal_to()

    with allure.step(
        "Подключение по ws, получение и обработка данных о статусе датчика из сообщения типа: InputSignalsContent"
    ):
        time.sleep(cfg.basic_message_timeout)
        payload = await t_utils.connect_and_subscribe_msg(
            ws_client,
            "InputSignalsContent",
            "SubscribeInputSignalsRequest",
            {
                'signalIds': [sensor_id],
                'tuId': cfg.tu_id,
                'additionalProperties': None,
            },
        )
        parsed_payload = parser.parse_input_signals_info_msg(payload)
        sensor_data = parsed_payload.replyContent.inputSignals
        sensor_imitate_data = t_utils.find_object_by_field(sensor_data, "id", sensor_id)
    with allure.step(f"Отправка сообщения и обработка ответа о снятии имитации датчика с id: {sensor_id}"):
        payload = await t_utils.connect_and_get_msg(
            ws_client,
            "UnimitateSignalRequest",
            {'id': sensor_id, 'tuId': cfg.tu_id, 'additionalProperties': None},
        )
        parsed_payload = parser.parse_unimitate_signal_msg(payload)
        sensor_unimitate_reply_status = parsed_payload.replyStatus

        StepCheck("Проверка кода ответа на запрос о снятии имитации", "replyStatus").actual(
            sensor_unimitate_reply_status
        ).expected(ReplyStatus.OK.value).equal_to()

    with allure.step(
        "Подключение по ws, получение и обработка данных о статусе датчика из сообщения типа: InputSignalsContent"
    ):
        time.sleep(cfg.basic_message_timeout)
        payload = await t_utils.connect_and_subscribe_msg(
            ws_client,
            "InputSignalsContent",
            "SubscribeInputSignalsRequest",
            {
                'signalIds': [sensor_id],
                'tuId': cfg.tu_id,
                'additionalProperties': None,
            },
        )
        parsed_payload = parser.parse_input_signals_info_msg(payload)
        sensor_data = parsed_payload.replyContent.inputSignals
        sensor_unimitate_data = t_utils.find_object_by_field(sensor_data, "id", sensor_id)

    with SoftAssertions() as soft_failures:
        StepCheck(f"Проверка имитации датчика с id: {sensor_id}", "isImitated", soft_failures).actual(
            sensor_imitate_data.isImitated
        ).expected(True).equal_to()
        StepCheck(f"Проверка показаний датчика с id: {sensor_id}", "value", soft_failures).actual(
            sensor_imitate_data.imitation.value
        ).expected(sensor_val).equal_to()
        StepCheck(f"Проверка качества сигнала датчика с id: {sensor_id}", "quality", soft_failures).actual(
            sensor_imitate_data.quality
        ).expected(sensor_quality).equal_to()
        StepCheck(f"Проверка снятия имитации датчика с id: {sensor_id}", "isImitated", soft_failures).actual(
            sensor_unimitate_data.isImitated
        ).expected(False).equal_to()


async def mask_signal_msg(ws_client, cfg: SmokeSuiteConfig):
    """
    Проверка маскирования датчиков.
    """
    with allure.step("Подключение по ws, получение и обработка данных датчиков давления и расхода"):
        payload = await t_utils.connect_and_get_msg(
            ws_client,
            "GetInputSignalsRequest",
            {
                'tuId': cfg.tu_id,
                'sorting': None,
                'filtering': None,
                'columnsSelection': 512,
                'search': None,
                'additionalProperties': None,
            },
        )

        parsed_payload = parser.parse_input_signals_msg(payload)
        # Получает список датчиков давления
        pressure_sensor_list = [
            sensor
            for sensor in parsed_payload.replyContent
            if sensor.signalType == TestConst.PRESSURE_SIGNAL_TYPE
            and sensor.objectType == TestConst.PRESSURE_SENSOR_OBJECT_TYPE
        ]
        # Получает список расходомеров
        flowmeter_list = [
            sensor
            for sensor in parsed_payload.replyContent
            if sensor.signalType == TestConst.FLOW_SIGNAL_TYPE and sensor.objectType == TestConst.FLOWMETER_OBJECT_TYPE
        ]
        # Случайно выбирает 1 расходомер и 1 датчик давления
        pressure_sensor = t_utils.get_random_item(pressure_sensor_list)
        flowmeter = t_utils.get_random_item(flowmeter_list)

    with allure.step("Маскирование датчиков"):
        with allure.step(
            f"Отправка сообщения и обработка ответа о маскировании датчика давления с id: {pressure_sensor.id}"
        ):
            payload = await t_utils.connect_and_get_msg(
                ws_client,
                "MaskSignalRequest",
                {'id': pressure_sensor.id, 'tuId': cfg.tu_id, 'additionalProperties': None},
            )
            parsed_payload = parser.parse_mask_signal_msg(payload)
            pressure_sensor_mask_reply_status = parsed_payload.replyStatus

            StepCheck("Проверка кода ответа на запрос о маскировании", "replyStatus").actual(
                pressure_sensor_mask_reply_status
            ).expected(ReplyStatus.OK.value).equal_to()

        with allure.step(f"Отправка сообщения и обработка ответа о маскировании расходомера с id: {flowmeter.id}"):
            payload = await t_utils.connect_and_get_msg(
                ws_client,
                "MaskSignalRequest",
                {'id': flowmeter.id, 'tuId': cfg.tu_id, 'additionalProperties': None},
            )
            parsed_payload = parser.parse_mask_signal_msg(payload)
            flowmeter_mask_reply_status = parsed_payload.replyStatus

            StepCheck("Проверка кода ответа на запрос о маскировании", "replyStatus").actual(
                flowmeter_mask_reply_status
            ).expected(ReplyStatus.OK.value).equal_to()

    with allure.step(
        "Подключение по ws, получение и обработка данных о статусе датчиков из сообщения типа: InputSignalsContent"
    ):
        time.sleep(cfg.basic_message_timeout)
        payload = await t_utils.connect_and_subscribe_msg(
            ws_client,
            "InputSignalsContent",
            "SubscribeInputSignalsRequest",
            {
                'signalIds': [pressure_sensor.id, flowmeter.id],
                'tuId': cfg.tu_id,
                'additionalProperties': None,
            },
        )
        parsed_payload = parser.parse_input_signals_info_msg(payload)
        sensor_data = parsed_payload.replyContent.inputSignals
        pressure_sensor_mask_data = t_utils.find_object_by_field(sensor_data, "id", pressure_sensor.id)
        flowmeter_mask_data = t_utils.find_object_by_field(sensor_data, "id", flowmeter.id)

    with allure.step("Снятие маскирования датчиков"):
        with allure.step(
            f"Отправка сообщения и обработка ответа о снятии маскирования датчика давления с id: {pressure_sensor.id}"
        ):
            payload = await t_utils.connect_and_get_msg(
                ws_client,
                "UnmaskSignalRequest",
                {'id': pressure_sensor.id, 'tuId': cfg.tu_id, 'additionalProperties': None},
            )
            parsed_payload = parser.parse_unmask_signal_msg(payload)
            pressure_sensor_unmask_reply_status = parsed_payload.replyStatus

            StepCheck("Проверка кода ответа на запрос о снятии маскирования", "replyStatus").actual(
                pressure_sensor_unmask_reply_status
            ).expected(ReplyStatus.OK.value).equal_to()

        with allure.step(
            f"Отправка сообщения и обработка ответа о снятии маскирования расходомера с id: {flowmeter.id}"
        ):
            payload = await t_utils.connect_and_get_msg(
                ws_client,
                "UnmaskSignalRequest",
                {'id': flowmeter.id, 'tuId': cfg.tu_id, 'additionalProperties': None},
            )
            parsed_payload = parser.parse_unmask_signal_msg(payload)
            flowmeter_unmask_reply_status = parsed_payload.replyStatus

            StepCheck("Проверка кода ответа на запрос о снятии маскирования", "replyStatus").actual(
                flowmeter_unmask_reply_status
            ).expected(ReplyStatus.OK.value).equal_to()

    with allure.step(
        "Подключение по ws, получение и обработка данных о статусе датчиков из сообщения типа: InputSignalsContent"
    ):
        time.sleep(cfg.basic_message_timeout)
        payload = await t_utils.connect_and_subscribe_msg(
            ws_client,
            "InputSignalsContent",
            "SubscribeInputSignalsRequest",
            {
                'signalIds': [pressure_sensor.id, flowmeter.id],
                'tuId': cfg.tu_id,
                'additionalProperties': None,
            },
        )
        parsed_payload = parser.parse_input_signals_info_msg(payload)
        sensor_data = parsed_payload.replyContent.inputSignals
        pressure_sensor_unmask_data = t_utils.find_object_by_field(sensor_data, "id", pressure_sensor.id)
        flowmeter_unmask_data = t_utils.find_object_by_field(sensor_data, "id", flowmeter.id)

    with SoftAssertions() as soft_failures:
        StepCheck(
            f"Проверка маскирования датчика давления с id: {pressure_sensor.id}", "isMasked", soft_failures
        ).actual(pressure_sensor_mask_data.isMasked).expected(True).equal_to()
        StepCheck(f"Проверка маскирования расходомера с id: {flowmeter.id}", "isMasked", soft_failures).actual(
            flowmeter_mask_data.isMasked
        ).expected(True).equal_to()
        StepCheck(
            f"Проверка снятия маскирования датчика давления с id: {pressure_sensor.id}", "isMasked", soft_failures
        ).actual(pressure_sensor_unmask_data.isMasked).expected(False).equal_to()
        StepCheck(f"Проверка снятия маскирования расходомера с id: {flowmeter.id}", "isMasked", soft_failures).actual(
            flowmeter_unmask_data.isMasked
        ).expected(False).equal_to()


async def mask_info_in_journal(ws_client, cfg: SmokeSuiteConfig, imitator_start_time):
    """
    Проверка записей журнала о маскировании и размаскировании.
    """
    with allure.step("Запрос сообщений журнала с фильтром userActions"):
        end_time = datetime.now()
        request_body = t_utils.create_journal_req_body(
            pagination=Pagination(limit=TestConst.JOURNAL_MASK_PAGINATION_LIMIT, direction=Direction.FIRST.value),
            filtering=Filtering(userActions=int(UserActions.SIGNAL_MASK_SIM), objects=FilteringObjects(tuId=cfg.tu_id)),
        )
        payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
        parsed_payload = parser.parse_journal_msg(payload)
        all_messages = parsed_payload.replyContent.messagesInfo

    with allure.step("Фильтрация сообщений по событиям маскирования и временному диапазону"):
        filter_start_msk = t_utils.localize_as_moscow(imitator_start_time)
        filter_end_msk = t_utils.localize_as_moscow(end_time)

        mask_unmask_msgs = [
            msg
            for msg in all_messages
            if msg.event in TestConst.JOURNAL_MASK_EXPECTED_EVENTS
            and msg.signalName in TestConst.JOURNAL_MASK_EXPECTED_SIGNALS
        ]

        journal_messages = [
            msg
            for msg in mask_unmask_msgs
            if filter_start_msk <= t_utils.ensure_moscow_timezone(msg.time) <= filter_end_msk
        ]

        allure.attach(
            f"Всего получено сообщений: {len(all_messages)}\n"
            f"После фильтрации по event и signalName осталось сообщений: {len(mask_unmask_msgs)}\n"
            f"После фильтрации по времени ({filter_start_msk} - {filter_end_msk}) "
            f"осталось сообщений: {len(journal_messages)}",
            name="Результат фильтрации сообщений журнала",
            attachment_type=allure.attachment_type.TEXT,
        )

    with allure.step("Группировка отфильтрованных сообщений"):
        pressure_msgs = [msg for msg in journal_messages if msg.signalName == TestConst.JOURNAL_SIGNAL_PRESSURE]
        flow_msgs = [msg for msg in journal_messages if msg.signalName == TestConst.JOURNAL_SIGNAL_FLOW]

        mask_event_msgs = [msg for msg in journal_messages if msg.event == TestConst.JOURNAL_EVENT_MASK]
        unmask_event_msgs = [msg for msg in journal_messages if msg.event == TestConst.JOURNAL_EVENT_UNMASK]
        mask_signal_names = {msg.signalName for msg in mask_event_msgs}
        unmask_signal_names = {msg.signalName for msg in unmask_event_msgs}

    with SoftAssertions() as journal_soft_failures:
        StepCheck(
            "Проверка соответствия количества сообщений о действиях пользователя (снятие и установка "
            "маскирования для датчиков давления и расходомеров)",
            "total_count",
            journal_soft_failures,
        ).actual(len(journal_messages)).expected(TestConst.JOURNAL_EXPECTED_MASK_MSG_TOTAL).equal_to()

        StepCheck(
            f"Проверка соответствия количества сообщений "
            f"о действиях пользователя для датчиков давления - '{TestConst.JOURNAL_SIGNAL_PRESSURE}'",
            "count",
            journal_soft_failures,
        ).actual(len(pressure_msgs)).expected(TestConst.JOURNAL_EXPECTED_MSG_COUNT_PER_SIGNAL).equal_to()

        StepCheck(
            f"Проверка соответствия количества сообщений "
            f"о действиях пользователя для расходомеров - '{TestConst.JOURNAL_SIGNAL_FLOW}'",
            "count",
            journal_soft_failures,
        ).actual(len(flow_msgs)).expected(TestConst.JOURNAL_EXPECTED_MSG_COUNT_PER_SIGNAL).equal_to()

        StepCheck(
            f"Проверка: событие '{TestConst.JOURNAL_EVENT_MASK}' содержит '{TestConst.JOURNAL_SIGNAL_PRESSURE}'",
            "signalName",
            journal_soft_failures,
        ).actual(TestConst.JOURNAL_SIGNAL_PRESSURE in mask_signal_names).expected(True).equal_to()

        StepCheck(
            f"Проверка: событие '{TestConst.JOURNAL_EVENT_MASK}' содержит '{TestConst.JOURNAL_SIGNAL_FLOW}'",
            "signalName",
            journal_soft_failures,
        ).actual(TestConst.JOURNAL_SIGNAL_FLOW in mask_signal_names).expected(True).equal_to()

        StepCheck(
            f"Проверка: событие '{TestConst.JOURNAL_EVENT_UNMASK}' содержит '{TestConst.JOURNAL_SIGNAL_PRESSURE}'",
            "signalName",
            journal_soft_failures,
        ).actual(TestConst.JOURNAL_SIGNAL_PRESSURE in unmask_signal_names).expected(True).equal_to()

        StepCheck(
            f"Проверка: событие '{TestConst.JOURNAL_EVENT_UNMASK}' содержит '{TestConst.JOURNAL_SIGNAL_FLOW}'",
            "signalName",
            journal_soft_failures,
        ).actual(TestConst.JOURNAL_SIGNAL_FLOW in unmask_signal_names).expected(True).equal_to()

        for signal_name in [TestConst.JOURNAL_SIGNAL_PRESSURE, TestConst.JOURNAL_SIGNAL_FLOW]:
            mask_msg_for_signal = next((msg for msg in mask_event_msgs if msg.signalName == signal_name), None)
            unmask_msg_for_signal = next((msg for msg in unmask_event_msgs if msg.signalName == signal_name), None)

            if mask_msg_for_signal and unmask_msg_for_signal:
                StepCheck(
                    f"Проверка совпадения tag для '{signal_name}' между маскированием и снятием",
                    "tag",
                    journal_soft_failures,
                ).actual(mask_msg_for_signal.tag).expected(unmask_msg_for_signal.tag).equal_to()

        for msg in journal_messages:
            msg_label = f"{msg.event} - {msg.signalName}"

            StepCheck(
                f"Проверка user не пустой [{msg_label}]",
                "user",
                journal_soft_failures,
            ).actual(msg.user).is_not_none()

            StepCheck(
                f"Проверка mainPipeline [{msg_label}]",
                "mainPipeline",
                journal_soft_failures,
            ).actual(
                msg.mainPipeline
            ).expected(cfg.main_pipeline).equal_to()

            StepCheck(
                f"Проверка object не пустой [{msg_label}]",
                "object",
                journal_soft_failures,
            ).actual(msg.object).is_not_none()

            StepCheck(
                f"Проверка technologicalObject не пустой [{msg_label}]",
                "technologicalObject",
                journal_soft_failures,
            ).actual(msg.technologicalObject).is_not_none()

            StepCheck(
                f"Проверка technologicalSection [{msg_label}]",
                "technologicalSection",
                journal_soft_failures,
            ).actual(msg.technologicalSection).expected(cfg.tu_name).equal_to()

            StepCheck(
                f"Проверка priority не пустой [{msg_label}]",
                "priority",
                journal_soft_failures,
            ).actual(msg.priority).is_not_none()

            StepCheck(
                f"Проверка messageType [{msg_label}]",
                "messageType",
                journal_soft_failures,
            ).actual(
                msg.messageType
            ).expected(TestConst.JOURNAL_MESSAGE_TYPE_USER_ACTIONS).equal_to()

            StepCheck(
                f"Проверка status [{msg_label}]",
                "status",
                journal_soft_failures,
            ).actual(
                msg.status
            ).expected(TestConst.JOURNAL_STATUS_SUCCESS).equal_to()


async def mask_du_on_mini_scheme(ws_client, cfg: SmokeSuiteConfig):
    """
    Маскирование ДУ на мини-схеме
    Проверка маскированного участка в выходных сигналах

    """
    linear_part_id = cfg.linear_part_identifier_for_mask
    mask_reason = cfg.mask_reason

    with allure.step(
        "Подключение по ws, отправка сообщения типа: MaskLdsRequest. Совершается действие - маскирование ДУ"
    ):
        payload = (
            await t_utils.connect_and_get_msg(
                ws_client,
                "MaskLdsRequest",
                {
                    'tuId': cfg.tu_id,
                    'maskInfo': [
                        {
                            'linearPartId': linear_part_id,
                            'reason': mask_reason,
                            'additionalProperties': None,
                        }
                    ],
                    'additionalProperties': None,
                },
            ),
        )
        time.sleep(cfg.basic_message_timeout)
        parsed_payload = parser.parse_unmask_lds_message(payload)
        flowmeter_mask_reply_status = parsed_payload.replyStatus

    with allure.step(f"Получение словаря для линейного участка с id: {linear_part_id}.\n" f"ЭФ Выходные сигналы."):
        payload = await t_utils.connect_and_get_msg(
            ws_client,
            "GetOutputSignalsRequest",
            {
                'tuId': cfg.tu_id,
                'filtering': None,
                'search': None,
                'sorting': None,
                'additionalProperties': None,
            },
        )
        parsed_payload = parser.parse_output_signals_msg(payload)
        # Получение данных линейного участка утечки по id
        leak_linear_part = t_utils.find_object_by_field(
            parsed_payload.replyContent.linearPartSignals,
            TestConst.LEAK_LINEAR_PART_ID_KEY,
            linear_part_id,
        )

        with allure.step("Получение типов выходных сигналов из обработанных данных. ЭФ Выходные сигналы"):
            leak_signals_list = leak_linear_part.signals

            mask_signal_type = t_utils.find_signal_type_by_address_suffix(
                leak_signals_list, TestConst.ADDRESS_SUFFIX_MASK
            )

    with allure.step(f"Получение данных выходных сигналов для линейного участка с id: {linear_part_id}\n"):
        with allure.step(
            "Получение сообщения с данными выходных сигналов типа: OutputSignalsInfo. "
            "Получен результат маскирования ДУ на ЭФ Выходные сигнал"
        ):
            payload = await t_utils.connect_and_subscribe_msg(
                ws_client,
                "OutputSignalsInfo",
                "SubscribeOutputSignalsRequest",
                {
                    'objects': {
                        'linearParts': [{'linearPartId': linear_part_id}],
                        'controlledSites': [],
                    },
                    'signalTypes': 1023,
                    'tuId': cfg.tu_id,
                    'additionalProperties': None,
                },
            )
            parsed_payload = parser.parse_output_signals_info_msg(payload)
            leak_linear_part = t_utils.find_object_by_field(
                parsed_payload.replyContent.linearPartSignals,
                TestConst.LEAK_LINEAR_PART_ID_KEY,
                linear_part_id,
            )
        with allure.step("Обработка полученных данных выходных сигналов"):
            leak_signals_list = leak_linear_part.signals
            mask_leak_value = t_utils.find_signal_val_by_signal_type(leak_signals_list, mask_signal_type)

    with allure.step(
        "Подключение по ws, получение и обработка сообщения типа: CommonSchemeContent. "
        "Получен результат маскирования ДУ на ЭФ Схема"
    ):
        payload = await t_utils.connect_and_subscribe_msg(
            ws_client,
            "CommonSchemeContent",
            "SubscribeCommonSchemeRequest",
            {'tuId': cfg.tu_id, 'additionalProperties': None},
        )
        parsed_payload = parser.parse_common_scheme_info_msg(payload)
        linear_parts = parsed_payload.replyContent.linearParts
        mask_linear_part = next((lp for lp in linear_parts if lp.id == linear_part_id), None)

    with allure.step(
        "Подключение по ws, получение и обработка сообщения типа: MessagesInfo. "
        "Получен результат маскирования ДУ на ЭФ Журнал"
    ):
        request_body = t_utils.create_journal_req_body(
            pagination=Pagination(limit=10, direction=Direction.FIRST.value),
            filtering=Filtering(messageTypes=int(MessageType.MASKING_LDS), objects=FilteringObjects(tuId=cfg.tu_id)),
        )
        payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
        parsed_payload = parser.parse_journal_msg(payload)
        messages_info = parsed_payload.replyContent.messagesInfo

        if cfg.technological_section:
            mask_message = t_utils.find_object_by_field(
                messages_info, "technologicalSection", cfg.technological_section
            )
        else:
            mask_message = parsed_payload.replyContent.messagesInfo[0]

    with allure.step(
        "Подключение по ws, получение и обработка сообщения типа: MainPageInfoContent. "
        "Получен результат маскирования ДУ на ЭФ Состояние МТ"
    ):
        payload = await t_utils.connect_and_subscribe_msg(
            ws_client,
            "MainPageInfoContent",
            "subscribeMainPageInfoRequest",
            {'tuIds': [cfg.tu_id], 'additionalProperties': None},
        )
        parsed_payload = parser.parse_main_page_msg(payload)
        number_of_masked_lps = parsed_payload.replyContent.tuInfo.ldsStatus.numberOfMaskedLps
        main_page_info_lds_status_obj = parsed_payload.replyContent.tuInfo.ldsStatus
        mask_du_list = getattr(main_page_info_lds_status_obj, "maskedLps", None)
        if mask_du_list:
            masked_lps_name = next(iter(mask_du_list), None)
        else:
            masked_lps_name = None

    # Проверки сообщений
    with SoftAssertions() as soft_failures:
        StepCheck(
            "Проверка сигнала маскирования ДУ в выходных сигналах", TestConst.ADDRESS_SUFFIX_MASK, soft_failures
        ).actual(mask_leak_value).expected(TestConst.OUTPUT_IS_MASK).equal_to()
        StepCheck("Проверка кода ответа на запрос о маскировании", "replyStatus", soft_failures).actual(
            flowmeter_mask_reply_status
        ).expected(ReplyStatus.OK.value).equal_to()
        StepCheck("Проверка статуса маскирования ДУ на схеме", "isMasked", soft_failures).actual(
            mask_linear_part.isMasked
        ).expected(True).equal_to()
        StepCheck("Проверка причины маски в журнале", "maskReason").actual(mask_linear_part.maskReason).expected(
            cfg.mask_reason
        ).equal_to()
        StepCheck("Проверка имени ТУ в журнале", "mainPipeline", soft_failures).actual(
            mask_message.mainPipeline
        ).expected(cfg.main_pipe_line).equal_to()
        StepCheck("Проверка имени ДУ в журнале", "technologicalObject", soft_failures).actual(
            mask_message.technologicalObject
        ).expected(cfg.mask_du_name).equal_to()
        StepCheck("Проверка события в журнале", "event", soft_failures).actual(mask_message.event).expected(
            cfg.mask_du_event
        ).equal_to()
        StepCheck("Проверка количества маскированных ДУ", "numberOfMaskedLps", soft_failures).actual(
            number_of_masked_lps
        ).expected(cfg.mask_one_du).equal_to()
        StepCheck("Проверка счетчика маски. ЭФ Состояние МТ", "Количество замаскированных ДУ", soft_failures).actual(
            number_of_masked_lps
        ).expected(cfg.mask_one_du).equal_to()
        StepCheck(
            "Проверка имени маскированного ДУ. ЭФ Состояние МТ", "Наименование замаскированного ДУ", soft_failures
        ).actual(masked_lps_name).expected(cfg.mask_du_name).equal_to()


async def unmask_du_on_mini_scheme(ws_client, cfg: SmokeSuiteConfig):
    """
    Размаскирование ДУ на мини-схеме
    Проверка маскированного участка в выходных сигналах

    """
    linear_part_id = cfg.linear_part_identifier_for_mask
    unmask_reason = cfg.unmask_reason

    with allure.step(
        "Подключение по ws, отправка сообщения типа: UnmaskLdsRequest. Совершается действие - размаскирование ДУ"
    ):
        payload = (
            await t_utils.connect_and_get_msg(
                ws_client,
                "UnmaskLdsRequest",
                {
                    'tuId': cfg.tu_id,
                    'maskInfo': [
                        {
                            'linearPartId': linear_part_id,
                            'reason': unmask_reason,
                            'additionalProperties': None,
                        }
                    ],
                    'additionalProperties': None,
                },
            ),
        )
        time.sleep(cfg.basic_message_timeout)
        parsed_payload = parser.parse_unmask_lds_message(payload)
        flowmeter_mask_reply_status = parsed_payload.replyStatus

    with allure.step(f"Получение словаря для линейного участка с id: {linear_part_id}\n" f"ЭФ Выходные сигналы"):
        payload = await t_utils.connect_and_get_msg(
            ws_client,
            "GetOutputSignalsRequest",
            {
                'tuId': cfg.tu_id,
                'filtering': None,
                'search': None,
                'sorting': None,
                'additionalProperties': None,
            },
        )
        parsed_payload = parser.parse_output_signals_msg(payload)
        # Получение данных линейного участка утечки по id
        leak_linear_part = t_utils.find_object_by_field(
            parsed_payload.replyContent.linearPartSignals,
            TestConst.LEAK_LINEAR_PART_ID_KEY,
            linear_part_id,
        )

        with allure.step("Получение типов выходных сигналов из обработанных данных. ЭФ Выходные сигналы"):
            leak_signals_list = leak_linear_part.signals

            mask_signal_type = t_utils.find_signal_type_by_address_suffix(
                leak_signals_list, TestConst.ADDRESS_SUFFIX_MASK
            )

    with allure.step(f"Получение данных выходных сигналов для линейного участка с id: {linear_part_id}\n"):
        with allure.step("Получен результат маскирования ДУ на ЭФ Выходные сигналы"):
            payload = await t_utils.connect_and_subscribe_msg(
                ws_client,
                "OutputSignalsInfo",
                "SubscribeOutputSignalsRequest",
                {
                    'objects': {
                        'linearParts': [{'linearPartId': linear_part_id}],
                        'controlledSites': [],
                    },
                    'signalTypes': 1023,
                    'tuId': cfg.tu_id,
                    'additionalProperties': None,
                },
            )
            parsed_payload = parser.parse_output_signals_info_msg(payload)
            leak_linear_part = t_utils.find_object_by_field(
                parsed_payload.replyContent.linearPartSignals,
                TestConst.LEAK_LINEAR_PART_ID_KEY,
                linear_part_id,
            )

        with allure.step("Обработка полученных данных выходных сигналов"):
            leak_signals_list = leak_linear_part.signals
            mask_leak_value = t_utils.find_signal_val_by_signal_type(leak_signals_list, mask_signal_type)

    with allure.step(
        "Подключение по ws, получение и обработка сообщения типа: MessagesInfo. "
        "Получен результат маскирования ДУ на ЭФ Журнал"
    ):
        request_body = t_utils.create_journal_req_body(
            pagination=Pagination(limit=10, direction=Direction.FIRST.value),
            filtering=Filtering(messageTypes=int(MessageType.MASKING_LDS), objects=FilteringObjects(tuId=cfg.tu_id)),
        )
        payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
        parsed_payload = parser.parse_journal_msg(payload)
        messages_info = parsed_payload.replyContent.messagesInfo

        if cfg.technological_section:
            mask_message = t_utils.find_object_by_field(
                messages_info, "technologicalSection", cfg.technological_section
            )
        else:
            mask_message = parsed_payload.replyContent.messagesInfo[0]

    with allure.step(
        "Подключение по ws, получение и обработка сообщения типа: MainPageInfoContent. "
        "Получен результат маскирования ДУ на ЭФ Состояние МТ"
    ):
        payload = await t_utils.connect_and_subscribe_msg(
            ws_client,
            "MainPageInfoContent",
            "subscribeMainPageInfoRequest",
            {'tuIds': [cfg.tu_id], 'additionalProperties': None},
        )
        parsed_payload = parser.parse_main_page_msg(payload)
        number_of_masked_lps = parsed_payload.replyContent.tuInfo.ldsStatus.numberOfMaskedLps
        main_page_info_lds_status_obj = parsed_payload.replyContent.tuInfo.ldsStatus
        masked_lps_name = getattr(main_page_info_lds_status_obj, "maskedLps", None)

    # Проверки сообщений
    with SoftAssertions() as soft_failures:
        StepCheck("Проверка кода ответа на запрос о размаскировании", "replyStatus", soft_failures).actual(
            flowmeter_mask_reply_status
        ).expected(ReplyStatus.OK.value).equal_to()
        StepCheck(
            "Проверяем, что тег маскирования ДУ в выходных сигналах равен null",
            TestConst.ADDRESS_SUFFIX_MASK,
            soft_failures,
        ).actual(mask_leak_value).expected(TestConst.OUTPUT_IS_NOT_MASK).equal_to()
        StepCheck("Проверяем имя ТУ в сообщении в журнале", "mainPipeline", soft_failures).actual(
            mask_message.mainPipeline
        ).expected(cfg.main_pipe_line).equal_to()
        StepCheck("Проверяем имя ДУ в сообщении в журнале", "technologicalObject", soft_failures).actual(
            mask_message.technologicalObject
        ).expected(cfg.mask_du_name).equal_to()
        StepCheck("Проверка события в сообщении в журнале", "event", soft_failures).actual(mask_message.event).expected(
            cfg.unmask_du_event
        ).equal_to()
        StepCheck("Проверка количества маскированных ДУ", "numberOfMaskedLps", soft_failures).actual(
            number_of_masked_lps
        ).expected(cfg.not_mask_du).equal_to()
        StepCheck("Проверка счетчика маски. ЭФ Состояние МТ", "Количество замаскированных ДУ", soft_failures).actual(
            number_of_masked_lps
        ).expected(cfg.not_mask_du).equal_to()
        StepCheck(
            "Проверка отсутствия списка маскированных ДУ. ЭФ Состояние МТ",
            "Отсутствуют замаскированные ДУ",
            soft_failures,
        ).actual(masked_lps_name).is_none()


async def lds_status_initialization_out(ws_client, cfg: SmokeSuiteConfig):
    """
    Проверка выхода СОУ из Инициализации.
    """
    with allure.step("Подключение по ws, получение и обработка сообщения типа: CommonSchemeContent"):
        payload = await t_utils.connect_and_subscribe_msg(
            ws_client,
            "CommonSchemeContent",
            "SubscribeCommonSchemeRequest",
            {'tuId': cfg.tu_id, 'additionalProperties': None},
        )

        parsed_payload = parser.parse_common_scheme_info_msg(payload)
        flow_areas = parsed_payload.replyContent.flowAreas
        longest_flow_area = t_utils.get_longest_flow_area(flow_areas)
        diagnostic_areas = longest_flow_area.diagnosticAreas
        allure.attach(
            f"Самый протяженный участок карты течений: {longest_flow_area}",
            name="flowArea. Выход из Инициализации",
            attachment_type=allure.attachment_type.TEXT,
        )
        lds_status_set = {diagnostic_area.ldsStatus for diagnostic_area in diagnostic_areas}
        lds_status = t_utils.determine_lds_status_by_priority(lds_status_set)

    StepCheck(
        "Проверка: СОУ находится не в режиме 'Инициализация'",
        "ldsStatus",
    ).actual(
        lds_status
    ).expected(LdsStatus.INITIALIZATION.value).is_not_equal_to()


async def lds_status_init_out_in_journal(ws_client, cfg: SmokeSuiteConfig, imitator_start_time):
    """
    Проверка наличия записи в журнале о выходе СОУ из режима Инициализация.
    """
    with allure.step("Запрос сообщений журнала с фильтром messageTypes=LDS_STATUS"):
        end_time = datetime.now()
        request_body = t_utils.create_journal_req_body(
            pagination=Pagination(limit=TestConst.JOURNAL_PAGINATION_LIMIT, direction=Direction.FIRST.value),
            filtering=Filtering(messageTypes=int(MessageType.LDS_STATUS), objects=FilteringObjects(tuId=cfg.tu_id)),
        )
        payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
        parsed_payload = parser.parse_journal_msg(payload)
        messages_info = parsed_payload.replyContent.messagesInfo

        StepCheck("Проверка наличия сообщений в журнале", "messagesInfo").actual(messages_info).is_not_empty()

    with allure.step("Фильтрация сообщений по времени и technologicalSection"):
        filter_start_msk = t_utils.localize_as_moscow(imitator_start_time)
        filter_end_msk = t_utils.localize_as_moscow(end_time)

        time_filtered = [
            msg
            for msg in messages_info
            if filter_start_msk <= t_utils.ensure_moscow_timezone(msg.time) <= filter_end_msk
        ]
        time_filtered.sort(key=lambda msg: t_utils.ensure_moscow_timezone(msg.time), reverse=True)

        lds_msg = next(
            (msg for msg in time_filtered if msg.technologicalSection == cfg.tu_name),
            None,
        )

        allure.attach(
            f"Всего получено сообщений: {len(messages_info)}\n"
            f"После фильтрации по времени ({filter_start_msk} - {filter_end_msk}): {len(time_filtered)}\n"
            f"Проверка: найдено ли сообщение с technologicalSection='{cfg.tu_name}': {'True' if lds_msg else 'False'}",
            name="Результат фильтрации сообщений журнала",
            attachment_type=allure.attachment_type.TEXT,
        )

        with allure.step(f"Проверка: найдено ли сообщение с technologicalSection='{cfg.tu_name}'"):
            if lds_msg is None:
                pytest.fail(
                    f"Сообщение с technologicalSection='{cfg.tu_name}' "
                    f"не найдено среди {len(time_filtered)} отфильтрованных по времени сообщений"
                )

    with allure.step("Проверка актуальности сообщения"):
        msg_time_msk = t_utils.ensure_moscow_timezone(lds_msg.time)
        start_time_msk = t_utils.localize_as_moscow(imitator_start_time)

        StepCheck(
            f"Проверка: время сообщения позднее времени старта имитатора {msg_time_msk} > {start_time_msk}",
            "time",
        ).actual(msg_time_msk > start_time_msk).expected(True).equal_to()

    with SoftAssertions() as soft_failures:
        StepCheck("Проверка: event не является Инициализацией", "event", soft_failures).actual(lds_msg.event).expected(
            TestConst.JOURNAL_EVENT_LDS_INIT_ACCUM_DATA
        ).is_not_equal_to()

        StepCheck("Проверка mainPipeline", "mainPipeline", soft_failures).actual(lds_msg.mainPipeline).expected(
            cfg.main_pipeline
        ).equal_to()

        StepCheck("Проверка technologicalSection", "technologicalSection", soft_failures).actual(
            lds_msg.technologicalSection
        ).expected(cfg.tu_name).equal_to()

        StepCheck("Проверка technologicalObject не пустой", "technologicalObject", soft_failures).actual(
            lds_msg.technologicalObject
        ).is_not_none()

        StepCheck("Проверка priority не пустой", "priority", soft_failures).actual(lds_msg.priority).is_not_none()

        StepCheck("Проверка messageType", "messageType", soft_failures).actual(lds_msg.messageType).expected(
            TestConst.JOURNAL_MESSAGE_TYPE_LDS_STATUS
        ).equal_to()


async def leaks_content(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig, imitator_start_time):
    """
    Проверка утечки через сообщение LeaksContent.
    """
    with allure.step("Подключение по ws и получение сообщения об утечке типа: LeaksContent"):
        payload = await t_utils.connect_and_subscribe_msg(
            ws_client,
            "LeaksContent",
            "SubscribeLeaksRequest",
            {'tuId': cfg.tu_id},
        )
        parsed_payload = parser.parse_leaks_content_msg(payload)
        leaks_list_info = parsed_payload.replyContent.leaksListInfo
        # Ищет подтвержденные утечки
        confirmed_leaks_list = t_utils.find_confirmed_leaks(leaks_list_info)
        first_leak_info = t_utils.find_leak_by_coordinate(confirmed_leaks_list, leak.coordinate_meters)

        # Конвертируем время обнаружения в московское время
        leak_detected_at = t_utils.ensure_moscow_timezone(first_leak_info.detectedAt)
        leak_wait_start_time, leak_wait_end_time = t_utils.get_leak_time_window(
            imitator_start_time,
            leak.leak_start_interval_seconds,
            leak.allowed_time_diff_seconds,
            detected_at_tz=leak_detected_at.tzinfo,
        )
        leak_volume_m3 = t_utils.convert_leak_volume_m3(first_leak_info.leakVolume)
        leak_coordinate_round = round(first_leak_info.leakCoordinate, cfg.precision)

    with SoftAssertions() as soft_failures:
        StepCheck("Проверка id полученного ТУ", "tu_id", soft_failures).actual(
            parsed_payload.replyContent.tuId
        ).expected(cfg.tu_id).equal_to()

        StepCheck("Проверка наличия названия участка утечки", "diagnosticAreaName", soft_failures).actual(
            first_leak_info.diagnosticAreaName
        ).is_not_none()

        StepCheck("Проверка статуса утечки", "confirmationStatus", soft_failures).actual(
            first_leak_info.confirmationStatus
        ).expected(leak.expected_leak_status).equal_to()

        StepCheck("Проверка источника события (алгоритм)", "type", soft_failures).actual(first_leak_info.type).expected(
            leak.expected_algorithm_type
        ).equal_to()

        StepCheck("Проверка наличия id утечки", "id", soft_failures).actual(first_leak_info.id).is_not_none()

        StepCheck("Проверка координаты утечки", "leakCoordinate", soft_failures).actual(
            leak_coordinate_round
        ).is_close_to(
            leak.coordinate_meters,
            cfg.allowed_distance_diff_meters,
            f"значение допустимой погрешности координаты {cfg.allowed_distance_diff_meters}",
        )

        StepCheck("Проверка времени обнаружения утечки", "leakDetectedAt", soft_failures).actual(
            leak_detected_at
        ).is_between(leak_wait_start_time, leak_wait_end_time)

        StepCheck("Проверка объема утечки", "volume", soft_failures).actual(leak_volume_m3).is_close_to(
            leak.volume_m3,
            leak.allowed_volume_m3,
            f"значение допустимой погрешности по объему {leak.allowed_volume_m3}",
        )


async def possible_leak_in_journal(ws_client, cfg: SmokeSuiteConfig, imitator_start_time):
    """
    Проверка наличия сообщения 'Возможна утечка' в журнале.
    """
    with allure.step("Подключение по ws, получение и обработка сообщений журнала типа: MessagesInfoContent"):
        end_time = datetime.now()
        request_body = t_utils.create_journal_req_body(
            pagination=Pagination(limit=TestConst.JOURNAL_PAGINATION_LIMIT, direction=Direction.FIRST.value),
            filtering=Filtering(messageTypes=int(MessageType.LEAKS), objects=FilteringObjects(tuId=cfg.tu_id)),
        )
        payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
        parsed_payload = parser.parse_journal_msg(payload)
        messages_info = parsed_payload.replyContent.messagesInfo

        StepCheck("Проверка наличия сообщений в журнале", "messagesInfo").actual(messages_info).is_not_empty()

    with allure.step("Фильтрация сообщений по времени и technologicalSection"):
        filter_start_msk = t_utils.localize_as_moscow(imitator_start_time)
        filter_end_msk = t_utils.localize_as_moscow(end_time)

        time_filtered = [
            msg
            for msg in messages_info
            if filter_start_msk <= t_utils.ensure_moscow_timezone(msg.time) <= filter_end_msk
        ]
        time_filtered.sort(key=lambda msg: t_utils.ensure_moscow_timezone(msg.time), reverse=True)

        possible_leak_msg = next(
            (
                msg
                for msg in time_filtered
                if msg.technologicalSection == cfg.tu_name and msg.event == TestConst.JOURNAL_EVENT_POSSIBLE_LEAK
            ),
            None,
        )

        allure.attach(
            f"Всего получено сообщений: {len(messages_info)}\n"
            f"После фильтрации по времени ({filter_start_msk} - {filter_end_msk}): {len(time_filtered)}\n"
            f"Проверка: найдено ли сообщение с technologicalSection='{cfg.tu_name}' "
            f"и event='{TestConst.JOURNAL_EVENT_POSSIBLE_LEAK}': {'True' if possible_leak_msg else 'False'}",
            name="Результат фильтрации сообщений журнала",
            attachment_type=allure.attachment_type.TEXT,
        )

        with allure.step(
            f"Проверка: найдено ли сообщение с technologicalSection='{cfg.tu_name}' "
            f"и event='{TestConst.JOURNAL_EVENT_POSSIBLE_LEAK}'"
        ):
            if possible_leak_msg is None:
                pytest.fail(
                    f"Сообщение с technologicalSection='{cfg.tu_name}' "
                    f"и event='{TestConst.JOURNAL_EVENT_POSSIBLE_LEAK}' "
                    f"не найдено среди {len(time_filtered)} отфильтрованных по времени сообщений"
                )

    with SoftAssertions() as soft_failures:
        StepCheck("Проверка статуса утечки в журнале", "event", soft_failures).actual(possible_leak_msg.event).expected(
            TestConst.JOURNAL_EVENT_POSSIBLE_LEAK
        ).equal_to()

        StepCheck("Проверка mainPipeline", "mainPipeline", soft_failures).actual(
            possible_leak_msg.mainPipeline
        ).expected(cfg.main_pipeline).equal_to()

        StepCheck("Проверка messageType", "messageType", soft_failures).actual(possible_leak_msg.messageType).expected(
            TestConst.JOURNAL_MESSAGE_TYPE_LEAKS
        ).equal_to()

        StepCheck("Проверка technologicalSection не пустой", "technologicalSection", soft_failures).actual(
            possible_leak_msg.technologicalSection
        ).is_not_none()

        StepCheck("Проверка technologicalObject не пустой", "technologicalObject", soft_failures).actual(
            possible_leak_msg.technologicalObject
        ).is_not_none()


async def leak_info_in_journal(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig, imitator_start_time):
    with allure.step("Подключение по ws, получение и обработка сообщения типа: MessagesInfoContent"):
        request_body = t_utils.create_journal_req_body(
            pagination=Pagination(limit=TestConst.JOURNAL_PAGINATION_LIMIT, direction=Direction.FIRST.value),
            filtering=Filtering(messageTypes=int(MessageType.LEAKS), objects=FilteringObjects(tuId=cfg.tu_id)),
        )
        payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
        end_time = datetime.now()
        parsed_payload = parser.parse_journal_msg(payload)
        messages_info = parsed_payload.replyContent.messagesInfo

        StepCheck("Проверка наличия сообщений в журнале", "messagesInfo").actual(messages_info).is_not_empty()

        with allure.step("Фильтрация сообщений по времени и technologicalSection"):
            filter_start_msk = t_utils.localize_as_moscow(imitator_start_time)
            filter_end_msk = t_utils.localize_as_moscow(end_time)

            time_filtered = [
                msg
                for msg in messages_info
                if filter_start_msk <= t_utils.ensure_moscow_timezone(msg.time) <= filter_end_msk
            ]
            time_filtered.sort(key=lambda msg: t_utils.ensure_moscow_timezone(msg.time), reverse=True)

            leak_message = next(
                (
                    msg
                    for msg in time_filtered
                    if msg.technologicalSection == cfg.tu_name and TestConst.JOURNAL_EVENT_DETECTED_LEAK in msg.event
                ),
                None,
            )

            allure.attach(
                f"Всего получено сообщений: {len(messages_info)}\n"
                f"После фильтрации по времени ({filter_start_msk} - {filter_end_msk}): {len(time_filtered)}\n",
                name="Результат фильтрации сообщений журнала",
                attachment_type=allure.attachment_type.TEXT,
            )

    with allure.step("Первичная проверка после фильтрации"):

        StepCheck(
            f"Проверка: найдено ли сообщение с technologicalSection='{cfg.tu_name}' "
            f"и event содержит подстроку подтвержденной утечки'{TestConst.JOURNAL_EVENT_DETECTED_LEAK}'",
            "event",
        ).actual(leak_message).is_not_none()

        leak_coordinate_km, leak_volume_m3 = t_utils.parse_journal_msg_value(leak_message.value)
        leak_coordinate_round = round(leak_coordinate_km * TestConst.KM_TO_METERS, TestConst.PRECISION)
        leak_message_time = t_utils.ensure_moscow_timezone(leak_message.time)

    with SoftAssertions() as soft_failures:

        StepCheck("Проверка полученного события event", "event", soft_failures).contains(
            leak_message.event, TestConst.JOURNAL_EVENT_DETECTED_LEAK
        )

        StepCheck("Проверка полученного ТУ", "technologicalSection", soft_failures).actual(
            leak_message.technologicalSection
        ).expected(cfg.tu_name).equal_to()

        StepCheck("Проверка типа полученного сообщения", "messageType", soft_failures).actual(
            leak_message.messageType
        ).expected(TestConst.JOURNAL_MESSAGE_TYPE_LEAKS).equal_to()

        StepCheck("Проверка имени ДУ", "technologicalObject", soft_failures).actual(
            leak_message.technologicalObject
        ).is_not_none()

        StepCheck("Проверка координаты утечки", "leakCoordinate", soft_failures).actual(
            leak_coordinate_round
        ).is_close_to(
            leak.coordinate_meters,
            cfg.allowed_distance_diff_meters,
            f"значение допустимой погрешности координаты {cfg.allowed_distance_diff_meters}",
        )

        StepCheck("Проверка времени обнаружения утечки", "leakDetectedAt", soft_failures).actual(
            leak_message_time
        ).is_between(filter_start_msk, filter_end_msk)

        StepCheck("Проверка объема утечки", "volume", soft_failures).actual(leak_volume_m3).is_close_to(
            leak.volume_m3,
            leak.allowed_volume_m3,
            f"значение допустимой погрешности по объему {leak.allowed_volume_m3}",
        )


async def completed_leak_info_in_journal(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig, imitator_start_time):
    """
    Проверка наличия сообщения 'Утечка завершена' в журнале.
    """
    with allure.step("Подключение по ws, получение и обработка сообщения типа: MessagesInfoContent"):
        request_body = t_utils.create_journal_req_body(
            pagination=Pagination(limit=TestConst.JOURNAL_PAGINATION_LIMIT, direction=Direction.FIRST.value),
            filtering=Filtering(messageTypes=int(MessageType.LEAKS), objects=FilteringObjects(tuId=cfg.tu_id)),
        )
        payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
        end_time = datetime.now()
        parsed_payload = parser.parse_journal_msg(payload)
        messages_info = parsed_payload.replyContent.messagesInfo

        StepCheck("Проверка наличия сообщений в журнале", "messagesInfo").actual(messages_info).is_not_empty()

        with allure.step("Фильтрация сообщений по времени и technologicalSection"):
            filter_start_msk = t_utils.localize_as_moscow(imitator_start_time)
            filter_end_msk = t_utils.localize_as_moscow(end_time)

            time_filtered = [
                msg
                for msg in messages_info
                if filter_start_msk <= t_utils.ensure_moscow_timezone(msg.time) <= filter_end_msk
            ]
            time_filtered.sort(key=lambda msg: t_utils.ensure_moscow_timezone(msg.time), reverse=True)

            completed_leak_message = next(
                (
                    msg
                    for msg in time_filtered
                    if msg.technologicalSection == cfg.tu_name and msg.event == TestConst.JOURNAL_EVENT_COMPLETED_LEAKS
                ),
                None,
            )

            allure.attach(
                f"Всего получено сообщений: {len(messages_info)}\n"
                f"После фильтрации по времени ({filter_start_msk} - {filter_end_msk}): {len(time_filtered)}\n",
                name="Результат фильтрации сообщений журнала",
                attachment_type=allure.attachment_type.TEXT,
            )

    with allure.step("Первичная проверка после фильтрации"):

        StepCheck(
            f"Проверка: найдено ли сообщение с technologicalSection='{cfg.tu_name}' "
            f"и event='{TestConst.JOURNAL_EVENT_COMPLETED_LEAKS}'",
            "event",
        ).actual(completed_leak_message).is_not_none()

        leak_coordinate_km, leak_volume_m3 = t_utils.parse_journal_msg_value(completed_leak_message.value)
        leak_coordinate_round = round(leak_coordinate_km * TestConst.KM_TO_METERS, TestConst.PRECISION)
        leak_message_time = t_utils.ensure_moscow_timezone(completed_leak_message.time)

    with SoftAssertions() as soft_failures:

        StepCheck("Проверка статуса утечки в журнале", "event", soft_failures).actual(
            completed_leak_message.event
        ).expected(TestConst.JOURNAL_EVENT_COMPLETED_LEAKS).equal_to()

        StepCheck("Проверка полученного ТУ", "technologicalSection", soft_failures).actual(
            completed_leak_message.technologicalSection
        ).expected(cfg.tu_name).equal_to()

        StepCheck("Проверка координаты утечки", "leakCoordinate", soft_failures).actual(
            leak_coordinate_round
        ).is_close_to(
            leak.coordinate_meters,
            cfg.allowed_distance_diff_meters,
            f"значение допустимой погрешности координаты {cfg.allowed_distance_diff_meters}",
        )

        StepCheck("Проверка времени завершения утечки", "leakDetectedAt", soft_failures).actual(
            leak_message_time
        ).is_between(filter_start_msk, filter_end_msk)


async def all_leaks_info(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig, imitator_start_time):
    """
    Проверка сообщения AllLeaksInfo об утечке.
    """
    with allure.step("Подключение по ws и получение сообщения об утечке типа: AllLeaksInfoContent"):
        parsed_payload = await t_utils.connect_and_get_parsed_msg_by_tu_id(
            cfg.tu_id,
            ws_client,
            "AllLeaksInfoContent",
            "subscribeAllLeaksInfoRequest",
            [],
        )

        StepCheck("Проверка наличия сообщения об утечке типа AllLeaksInfoContent", "leaksInfo").actual(
            parsed_payload.replyContent.leaksInfo
        ).is_not_empty()

    with allure.step("Обработка сообщения об утечке типа AllLeaksInfoContent"):
        leaks_info = parsed_payload.replyContent.leaksInfo
        # Если у утечки указано имя ДУ - ищем по нему, иначе берём первую
        first_leak_info = t_utils.find_leak_by_coordinate(leaks_info, leak.coordinate_meters)

        # Конвертируем время обнаружения в московское время
        leak_detected_at = t_utils.ensure_moscow_timezone(first_leak_info.leakDetectedAt)
        leak_wait_start_time, leak_wait_end_time = t_utils.get_leak_time_window(
            imitator_start_time,
            leak.leak_start_interval_seconds,
            leak.allowed_time_diff_seconds,
            detected_at_tz=leak_detected_at.tzinfo,
        )
        leak_volume_m3 = t_utils.convert_leak_volume_m3(first_leak_info.volume)
        leak_coordinate_round = round(first_leak_info.leakCoordinate, cfg.precision)

    with SoftAssertions() as soft_failures:
        StepCheck("Проверка id полученного ТУ", "tu_id", soft_failures).actual(
            parsed_payload.replyContent.tuId
        ).expected(cfg.tu_id).equal_to()

        StepCheck("Проверка наличия названия участка утечки", "diagnosticAreaName", soft_failures).actual(
            first_leak_info.diagnosticAreaName
        ).is_not_none()

        StepCheck("Проверка статуса СОУ", "ldsStatus", soft_failures).actual(first_leak_info.ldsStatus).expected(
            leak.expected_lds_status
        ).equal_to()

        StepCheck("Проверка маскирования утечки", "isMasked", soft_failures).actual(first_leak_info.isMasked).expected(
            False
        ).equal_to()

        StepCheck("Проверка квитирования утечки", "isAcknowledged", soft_failures).actual(
            first_leak_info.isAcknowledged
        ).expected(False).equal_to()

        StepCheck("Проверка наличия id утечки", "id", soft_failures).actual(first_leak_info.id).is_not_none()

        StepCheck("Проверка координаты утечки", "leakCoordinate", soft_failures).actual(
            leak_coordinate_round
        ).is_close_to(
            leak.coordinate_meters,
            cfg.allowed_distance_diff_meters,
            f"значение допустимой погрешности координаты {cfg.allowed_distance_diff_meters}",
        )

        StepCheck("Проверка времени обнаружения утечки", "leakDetectedAt", soft_failures).actual(
            leak_detected_at
        ).is_between(leak_wait_start_time, leak_wait_end_time)

        StepCheck("Проверка объема утечки", "volume", soft_failures).actual(leak_volume_m3).is_close_to(
            leak.volume_m3,
            leak.allowed_volume_m3,
            f"значение допустимой погрешности по объему {leak.allowed_volume_m3}",
        )

        StepCheck("Проверка режима ТУ", "stationaryStatus", soft_failures).actual(
            first_leak_info.stationaryStatus
        ).expected(leak.expected_stationary_status).equal_to()


async def all_leaks_is_empty(ws_client, cfg: SmokeSuiteConfig):
    """
    Проверка отсутствия информации об утечке
    """
    with allure.step("Подключение по ws и получение сообщения об утечке типа: AllLeaksInfoContent"):
        parsed_payload = await t_utils.connect_and_get_parsed_msg_by_tu_id(
            cfg.tu_id,
            ws_client,
            "AllLeaksInfoContent",
            "subscribeAllLeaksInfoRequest",
            [],
        )

    StepCheck("Проверка отсутствия информации об утечке в сообщении AllLeaksInfoContent", "leaksInfo").actual(
        parsed_payload.replyContent.leaksInfo
    ).is_empty()


async def tu_leaks_info(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig, imitator_start_time):
    """
    Проверка сообщения TuLeaksInfo об утечке.
    """
    with allure.step("Подключение по ws и получение сообщения об утечке типа: TuLeaksInfoContent"):
        payload = await t_utils.connect_and_subscribe_msg(
            ws_client,
            "TuLeaksInfoContent",
            "subscribeTuLeaksInfoRequest",
            {'tuId': cfg.tu_id},
        )
        parsed_payload = parser.parse_tu_leaks_info_msg(payload)

        StepCheck("Проверка наличия сообщения об утечке типа TuLeaksInfoContent", "leaksInfo").actual(
            parsed_payload.replyContent.leaksInfo
        ).is_not_empty()

    with allure.step("Обработка сообщения об утечке типа TuLeaksInfoContent"):
        tu_leaks_info_list = parsed_payload.replyContent.leaksInfo

        first_leak_info = t_utils.find_leak_by_coordinate(tu_leaks_info_list, leak.coordinate_meters)

        # Конвертируем время обнаружения в московское время
        leak_detected_at = t_utils.ensure_moscow_timezone(first_leak_info.leakDetectedAt)
        leak_wait_start_time, leak_wait_end_time = t_utils.get_leak_time_window(
            imitator_start_time,
            leak.leak_start_interval_seconds,
            leak.allowed_time_diff_seconds,
            detected_at_tz=leak_detected_at.tzinfo,
        )
        leak_volume_m3 = t_utils.convert_leak_volume_m3(first_leak_info.volume)
        leak_coordinate_round = round(first_leak_info.leakCoordinate, cfg.precision)

    with SoftAssertions() as soft_failures:
        StepCheck("Проверка id полученного ТУ", "tu_id", soft_failures).actual(
            parsed_payload.replyContent.tuId
        ).expected(cfg.tu_id).equal_to()

        StepCheck("Проверка наличия id участка утечки", "controlledSiteId", soft_failures).actual(
            first_leak_info.controlledSiteId
        ).is_not_none()

        StepCheck("Проверка статуса СОУ", "ldsStatus", soft_failures).actual(first_leak_info.ldsStatus).expected(
            leak.expected_lds_status
        ).equal_to()

        StepCheck("Проверка маскирования утечки", "isMasked", soft_failures).actual(first_leak_info.isMasked).expected(
            False
        ).equal_to()

        StepCheck("Проверка наличия pipeId в сообщении", "pipeId", soft_failures).actual(
            first_leak_info.pipeId
        ).is_not_none()

        StepCheck("Проверка наличия id утечки", "id", soft_failures).actual(first_leak_info.id).is_not_none()

        StepCheck("Проверка координаты утечки", "leakCoordinate", soft_failures).actual(
            leak_coordinate_round
        ).is_close_to(
            leak.coordinate_meters,
            cfg.allowed_distance_diff_meters,
            f"значение допустимой погрешности координаты {cfg.allowed_distance_diff_meters}",
        )

        StepCheck("Проверка времени обнаружения утечки", "leakDetectedAt", soft_failures).actual(
            leak_detected_at
        ).is_between(leak_wait_start_time, leak_wait_end_time)

        StepCheck("Проверка объема утечки", "volume", soft_failures).actual(leak_volume_m3).is_close_to(
            leak.volume_m3,
            leak.allowed_volume_m3,
            f"значение допустимой погрешности по объему {leak.allowed_volume_m3}",
        )

        StepCheck("Проверка режима ТУ", "stationaryStatus", soft_failures).actual(
            first_leak_info.stationaryStatus
        ).expected(leak.expected_stationary_status).equal_to()


async def lds_status_during_leak(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig):
    """
    Проверка режима работы СОУ во время утечки.
    """
    with allure.step("Подключение по ws, получение и обработка сообщения типа: CommonSchemeContent"):
        payload = await t_utils.connect_and_subscribe_msg(
            ws_client,
            "CommonSchemeContent",
            "SubscribeCommonSchemeRequest",
            {'tuId': cfg.tu_id, 'additionalProperties': None},
        )

    parsed_payload = parser.parse_common_scheme_info_msg(payload)
    flow_areas = parsed_payload.replyContent.flowAreas

    status_config = leak.lds_status_during_leak_config
    if status_config is None:
        pytest.fail("Не задан leak.lds_status_during_leak_config для теста lds_status_during_leak")

    leak_diagnostic_area = t_utils.find_diagnostic_area_by_pipe_id(
        flow_areas, status_config.leak_diagnostic_area_pipe_id
    )
    if not leak_diagnostic_area:
        pytest.fail(f"В сообщении не найден ДУ с id: {status_config.leak_diagnostic_area_id}")

    # Формат конфига: status_config.in_neighbors / status_config.out_neighbors (dict[id] = expected_status)
    in_neighbors: dict[int, int] = status_config.in_neighbors or {}
    out_neighbors: dict[int, int] = status_config.out_neighbors or {}
    all_neighbors = in_neighbors | out_neighbors
    if not all_neighbors:
        pytest.fail("Не заданы id, соседних с утечкой ДУ для теста lds_status_during_leak")
    found_diagnostic_area_count = 0

    with SoftAssertions() as soft_failures:
        StepCheck(
            f"Проверка режима работы СОУ на ДУ с утечкой, pipe_id ДУ: {status_config.leak_diagnostic_area_pipe_id}",
            "ldsStatus",
            soft_failures,
        ).actual(leak_diagnostic_area.ldsStatus).expected(status_config.leak_du_expected_lds_status).equal_to()

        # Проверки соседних ДУ: поддерживаются 0. N соседей
        for neighbor_pipe_id, expected_status in sorted(all_neighbors.items()):
            diagnostic_area = t_utils.find_diagnostic_area_by_pipe_id(flow_areas, neighbor_pipe_id)
            if diagnostic_area:
                found_diagnostic_area_count += 1
                StepCheck(
                    f"Проверка режима работы СОУ на соседнем ДУ, pipe_id ДУ: {neighbor_pipe_id}",
                    "ldsStatus",
                    soft_failures,
                ).actual(diagnostic_area.ldsStatus).expected(expected_status).equal_to()
        if found_diagnostic_area_count == 0:
            pytest.fail(f"Не найдены соседние с утечкой ДУ по pipe_id: {list(all_neighbors.keys())}")


async def acknowledge_leak_info(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig = None):
    """
    Проверка квитирования утечки.

    Для multi-leak наборов: после квитирования проверяется что утечка удалена из списка.
    Для single-leak наборов: проверяется что список утечек пуст.
    """
    with allure.step("Получение id утечки"):
        with allure.step("Подключение по ws, получение и обработка сообщения об утечке типа: TuLeaksInfoContent"):
            payload = await t_utils.connect_and_subscribe_msg(
                ws_client,
                "TuLeaksInfoContent",
                "subscribeTuLeaksInfoRequest",
                {'tuId': cfg.tu_id},
            )
            parsed_payload = parser.parse_tu_leaks_info_msg(payload)

        with allure.step("Получение id утечки из принятого сообщения типа: TuLeaksInfoContent"):
            StepCheck("Проверка наличия сообщения об утечке", "leaksInfo").actual(
                parsed_payload.replyContent.leaksInfo
            ).is_not_empty()

            leaks_info = parsed_payload.replyContent.leaksInfo

            leak_to_ack = t_utils.find_leak_by_coordinate(leaks_info, leak.coordinate_meters)

            acknowledged_leak_id = leak_to_ack.id

    with allure.step(
        "Подключение по ws, отправка сообщения и обработка ответа о квитировании утечки типа: AcknowledgeLeakRequest"
    ):
        payload = await t_utils.connect_and_get_msg(
            ws_client,
            "AcknowledgeLeakRequest",
            {'leakId': str(acknowledged_leak_id), 'tuId': cfg.tu_id, 'additionalProperties': None},
        )
        parsed_payload = parser.parse_acknowledge_leak_msg(payload)
        acknowledge_reply_status = parsed_payload.replyStatus

    with allure.step(
        "Подключение по ws и получение сообщения об утечке типа: AllLeaksInfoContent для проверки квитирования"
    ):
        with allure.step("Очистка очереди websocket сообщений"):
            ws_client.clear_queue()
        time.sleep(cfg.basic_message_timeout)
        parsed_payload = await t_utils.connect_and_get_parsed_msg_by_tu_id(
            cfg.tu_id,
            ws_client,
            "AllLeaksInfoContent",
            "subscribeAllLeaksInfoRequest",
            [],
        )
        remaining_leaks = parsed_payload.replyContent.leaksInfo
        remaining_leak_ids = [leak.id for leak in remaining_leaks] if remaining_leaks else []

    StepCheck("Проверка кода ответа на запрос о квитировании", "replyStatus").actual(acknowledge_reply_status).expected(
        ReplyStatus.OK.value
    ).equal_to()

    # Проверяем что квитированная утечка исчезла из списка
    StepCheck("Проверка отсутствия квитированной утечки в списке AllLeaksInfo", "id").does_not_contain(
        remaining_leak_ids, acknowledged_leak_id
    )


async def acknowledge_leak_in_journal(ws_client, cfg: SmokeSuiteConfig, imitator_start_time):
    """
    Проверка записи в журнале о квитировании утечки.
    """
    with allure.step("Запрос сообщений журнала с фильтром userActions=LEAK_ACK"):
        end_time = datetime.now()
        request_body = t_utils.create_journal_req_body(
            pagination=Pagination(limit=TestConst.JOURNAL_PAGINATION_LIMIT, direction=Direction.FIRST.value),
            filtering=Filtering(userActions=int(UserActions.LEAK_ACK), objects=FilteringObjects(tuId=cfg.tu_id)),
        )
        payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
        parsed_payload = parser.parse_journal_msg(payload)
        messages_info = parsed_payload.replyContent.messagesInfo

        StepCheck("Проверка наличия сообщений в журнале", "messagesInfo").actual(messages_info).is_not_empty()

    with allure.step("Фильтрация сообщений по времени и technologicalSection"):
        filter_start_msk = t_utils.localize_as_moscow(imitator_start_time)
        filter_end_msk = t_utils.localize_as_moscow(end_time)

        time_filtered = [
            msg
            for msg in messages_info
            if filter_start_msk <= t_utils.ensure_moscow_timezone(msg.time) <= filter_end_msk
        ]
        time_filtered.sort(key=lambda msg: t_utils.ensure_moscow_timezone(msg.time), reverse=True)

        ack_message = next(
            (
                msg
                for msg in time_filtered
                if msg.technologicalSection == cfg.tu_name and msg.event == TestConst.JOURNAL_EVENT_LEAK_ACKNOWLEDGED
            ),
            None,
        )

        allure.attach(
            f"Всего получено сообщений: {len(messages_info)}\n"
            f"После фильтрации по времени ({filter_start_msk} - {filter_end_msk}): {len(time_filtered)}\n"
            f"Проверка: найдено ли сообщение с technologicalSection='{cfg.tu_name}' "
            f"и event='{TestConst.JOURNAL_EVENT_LEAK_ACKNOWLEDGED}': {'True' if ack_message else 'False'}",
            name="Результат фильтрации сообщений журнала",
            attachment_type=allure.attachment_type.TEXT,
        )

        with allure.step(
            f"Проверка: найдено ли сообщение с technologicalSection='{cfg.tu_name}' "
            f"и event='{TestConst.JOURNAL_EVENT_LEAK_ACKNOWLEDGED}'"
        ):
            if ack_message is None:
                pytest.fail(
                    f"Сообщение с technologicalSection='{cfg.tu_name}' "
                    f"и event='{TestConst.JOURNAL_EVENT_LEAK_ACKNOWLEDGED}' "
                    f"не найдено среди {len(time_filtered)} отфильтрованных по времени сообщений"
                )

    with allure.step("Проверка актуальности сообщения"):
        msg_time_msk = t_utils.ensure_moscow_timezone(ack_message.time)
        start_time_msk = t_utils.localize_as_moscow(imitator_start_time)

        StepCheck(
            "Проверка: время сообщения позднее времени старта имитатора",
            "time",
        ).actual(
            msg_time_msk > start_time_msk
        ).expected(True).equal_to()

    with SoftAssertions() as soft_failures:
        StepCheck("Проверка event", "event", soft_failures).actual(ack_message.event).expected(
            TestConst.JOURNAL_EVENT_LEAK_ACKNOWLEDGED
        ).equal_to()

        StepCheck("Проверка mainPipeline", "mainPipeline", soft_failures).actual(ack_message.mainPipeline).expected(
            cfg.main_pipeline
        ).equal_to()

        StepCheck("Проверка technologicalSection", "technologicalSection", soft_failures).actual(
            ack_message.technologicalSection
        ).expected(cfg.tu_name).equal_to()

        StepCheck("Проверка technologicalObject не пустой", "technologicalObject", soft_failures).actual(
            ack_message.technologicalObject
        ).is_not_none()


async def output_signals(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig, imitator_start_time):
    """
    Проверка наличия данных об утечке в выходных сигналах.
    """
    linear_part_id = leak.linear_part_id

    with allure.step(f"Получение списка выходных сигналов для линейного участка с id: {linear_part_id}"):
        payload = await t_utils.connect_and_get_msg(
            ws_client,
            "GetOutputSignalsRequest",
            {
                'tuId': cfg.tu_id,
                'filtering': None,
                'search': None,
                'sorting': None,
                'additionalProperties': None,
            },
        )
        parsed_payload = parser.parse_output_signals_msg(payload)
        # Получение данных линейного участка утечки по id
        leak_linear_part = t_utils.find_object_by_field(
            parsed_payload.replyContent.linearPartSignals,
            TestConst.LEAK_LINEAR_PART_ID_KEY,
            linear_part_id,
        )

        with allure.step("Получение типов выходных сигналов из обработанных данных"):
            leak_signals_list = leak_linear_part.signals
            ack_leak_signal_type = t_utils.find_signal_type_by_address_suffix(
                leak_signals_list, TestConst.ADDRESS_SUFFIX_ACK_LEAK
            )
            leak_signal_type = t_utils.find_signal_type_by_address_suffix(
                leak_signals_list, TestConst.ADDRESS_SUFFIX_LEAK
            )
            mask_signal_type = t_utils.find_signal_type_by_address_suffix(
                leak_signals_list, TestConst.ADDRESS_SUFFIX_MASK
            )
            point_leak_signal_type = t_utils.find_signal_type_by_address_suffix(
                leak_signals_list, TestConst.ADDRESS_SUFFIX_POINT_LEAK
            )
            q_leak_signal_type = t_utils.find_signal_type_by_address_suffix(
                leak_signals_list, TestConst.ADDRESS_SUFFIX_Q_LEAK
            )
            time_leak_signal_type = t_utils.find_signal_type_by_address_suffix(
                leak_signals_list, TestConst.ADDRESS_SUFFIX_TIME_LEAK
            )

    with allure.step(f"Получение данных выходных сигналов для линейного участка с id: {linear_part_id}"):
        with allure.step("Получение сообщения с данными выходных сигналов типа: OutputSignalsInfo"):
            payload = await t_utils.connect_and_subscribe_msg(
                ws_client,
                "OutputSignalsInfo",
                "SubscribeOutputSignalsRequest",
                {
                    'objects': {
                        'linearParts': [{'linearPartId': linear_part_id}],
                        'controlledSites': [],
                    },
                    'signalTypes': 1023,
                    'tuId': cfg.tu_id,
                    'additionalProperties': None,
                },
            )
            parsed_payload = parser.parse_output_signals_info_msg(payload)
            leak_linear_part = t_utils.find_object_by_field(
                parsed_payload.replyContent.linearPartSignals,
                TestConst.LEAK_LINEAR_PART_ID_KEY,
                linear_part_id,
            )

        with allure.step("Обработка полученных данных выходных сигналов"):
            leak_signals_list = leak_linear_part.signals
            ack_leak_value = t_utils.find_signal_val_by_signal_type(leak_signals_list, ack_leak_signal_type)
            leak_value = t_utils.find_signal_val_by_signal_type(leak_signals_list, leak_signal_type)
            mask_leak_value = t_utils.find_signal_val_by_signal_type(leak_signals_list, mask_signal_type)
            point_leak_value = t_utils.find_signal_val_by_signal_type(leak_signals_list, point_leak_signal_type)
            q_leak_leak_value = t_utils.find_signal_val_by_signal_type(leak_signals_list, q_leak_signal_type)
            time_leak_value = t_utils.find_signal_val_by_signal_type(leak_signals_list, time_leak_signal_type).strip()

            StepCheck("Проверка наличия времени утечки", TestConst.ADDRESS_SUFFIX_TIME_LEAK).actual(
                time_leak_value
            ).is_not_none()

            time_leak_value_datetime = t_utils.to_moscow_timezone(time_leak_value)
            leak_wait_start_time, leak_wait_end_time = t_utils.get_leak_time_window(
                imitator_start_time,
                leak.leak_start_interval_seconds,
                leak.output_allowed_time_diff_seconds,
                detected_at_tz=time_leak_value_datetime.tzinfo,
            )
            q_leak_value_m3 = t_utils.convert_leak_volume_m3(float(q_leak_leak_value))
            point_leak_value_round = round(float(point_leak_value), cfg.precision)

    with SoftAssertions() as soft_failures:
        StepCheck("Проверка сигнала квитирования утечки", TestConst.ADDRESS_SUFFIX_ACK_LEAK, soft_failures).actual(
            ack_leak_value
        ).expected(TestConst.OUTPUT_IS_ACK_LEAK).equal_to()

        StepCheck("Проверка сигнала наличия утечки", TestConst.ADDRESS_SUFFIX_LEAK, soft_failures).actual(
            leak_value
        ).expected(TestConst.OUTPUT_IS_LEAK).equal_to()

        StepCheck("Проверка сигнала маскирования утечки", TestConst.ADDRESS_SUFFIX_MASK, soft_failures).actual(
            mask_leak_value
        ).expected(TestConst.OUTPUT_IS_NOT_MASK).equal_to()

        StepCheck("Проверка сигнала координаты утечки", TestConst.ADDRESS_SUFFIX_POINT_LEAK, soft_failures).actual(
            point_leak_value_round
        ).is_close_to(
            leak.coordinate_meters,
            cfg.allowed_distance_diff_meters,
            f"значение допустимой погрешности координаты {cfg.allowed_distance_diff_meters}",
        )

        StepCheck("Проверка сигнала объема утечки", TestConst.ADDRESS_SUFFIX_Q_LEAK, soft_failures).actual(
            q_leak_value_m3
        ).is_close_to(
            leak.volume_m3,
            leak.allowed_volume_m3,
            f"значение допустимой погрешности по объему {leak.allowed_volume_m3}",
        )

        StepCheck("Проверка времени обнаружения утечки", TestConst.ADDRESS_SUFFIX_TIME_LEAK, soft_failures).actual(
            time_leak_value_datetime
        ).is_between(leak_wait_start_time, leak_wait_end_time)


async def balance_algorithm_leak_waiting(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig, imitator_start_time):
    """
    Проверка подозрения утечки через BalanceAlgorithmResults

    Логика:
    - Подписка на BalanceAlgorithmResults однократно
    - Раз в BALANCE_ALGORITHM_POLL_INTERVAL секунд забираем из очереди свежее сообщение
    - Собираем все diagnosticAreas (только из flowAreas с непустым списком)
    - Проверяем, что на ДУ с утечкой хотя бы раз пришёл isLeakPossible=True
    - Проверяем, что на всех остальных ДУ isLeakPossible всегда False
    - Проверяем дебаланс на ДУ с будущей утечкой, дебаланс должен быть выше значения порога - 20%
    """
    poll_interval = TestConst.BALANCE_ALGORITHM_POLL_INTERVAL
    total_wait = TestConst.BALANCE_ALGORITHM_TOTAL_WAIT
    end_time = imitator_start_time + timedelta(
        seconds=leak.balance_algorithm_leak_waiting_test.offset * 60 + total_wait
    )

    with allure.step(
        f"Подписка и сбор BalanceAlgorithmResults раз в {poll_interval} с, в течение {total_wait} с после начала утечки"
    ):
        await t_utils.connect(
            ws_client,
            "SubscribeBalanceAlgorithmResultsRequest",
            {'tuId': cfg.tu_id, 'additionalProperties': None},
        )

        collected_diagnostic_areas = await t_utils.poll_balance_algorithm_diagnostic_areas(
            ws_client,
            parser,
            imitator_start_time,
            end_time,
            poll_interval,
        )

        if collected_diagnostic_areas is not None:
            allure.attach(
                str(collected_diagnostic_areas),
                name="Тестируемый фрагмент ответа с бэка",
                attachment_type=allure.attachment_type.TEXT,
            )

        diagnostic_area_names_with_possible = [
            diagnostic_area.name for diagnostic_area in collected_diagnostic_areas if diagnostic_area.isLeakPossible
        ]

        diagnostic_area_possible_leak = next(
            (diagnostic_area for diagnostic_area in collected_diagnostic_areas if diagnostic_area.isLeakDetected),
            None,
        )

        is_leak_possible_seen = any(diagnostic_area.isLeakPossible for diagnostic_area in collected_diagnostic_areas)

    with SoftAssertions() as soft_failures:

        StepCheck(
            "Проверка: получен хотя бы один ДУ с подозрением на утечку",
            "isLeakPossible",
            soft_failures,
        ).actual(diagnostic_area_names_with_possible).is_not_empty()

        StepCheck(
            f"Проверка: на ДУ {str(diagnostic_area_names_with_possible)} бы раз за "
            f"{TestConst.BALANCE_ALGORITHM_TOTAL_WAIT / TestConst.SEC_PER_MIN} минут приходил"
            " статус 'подозрение на утечку': isLeakPossible=True",
            "isLeakPossible",
            soft_failures,
        ).actual(is_leak_possible_seen).expected(True).equal_to()

        if leak.flow_rate_settings_threshold is not None and diagnostic_area_possible_leak is not None:
            threshold = leak.flow_rate_settings_threshold
            tolerance = TestConst.DEBALANCE_TOLERANCE
            lower_bound = threshold * (1 - tolerance)

            StepCheck(
                f"Проверка значения дебаланса на ДУ name={diagnostic_area_possible_leak.name} с будущей утечкой"
                f" в пределах {int(tolerance * 100)}% снизу от порогового значения по объему: {threshold}).",
                "debalance",
                soft_failures,
            ).actual(abs(diagnostic_area_possible_leak.debalance)).is_greater_than(lower_bound)


async def balance_algorithm_leak_detected(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig):
    """
    Проверка наличия утечки (isLeakDetected) через BalanceAlgorithmResults.

    Логика:
    - Подписка на BalanceAlgorithmResultsContent
    - Получение первого подходящего сообщения типа BalanceAlgorithmResultsContent
    - Проверяем, что на ДУ с утечкой isLeakDetected=True
    - Проверяем, что на всех остальных ДУ isLeakDetected=False
    - Проверяем, что дебаланс на ДУ с утечкой > FLOW_RATE_SETTINGS_THRESHOLD
    """

    with allure.step("Подписка и получение BalanceAlgorithmResultsContent"):
        payload = await t_utils.connect_and_subscribe_msg(
            ws_client,
            "BalanceAlgorithmResultsContent",
            "SubscribeBalanceAlgorithmResultsRequest",
            {'tuId': cfg.tu_id, 'additionalProperties': None},
        )

        parsed_payload = parser.parse_balance_algorithm_msg(payload)
        reply_content = parsed_payload.replyContent
        if not reply_content or not reply_content.flowAreas:
            pytest.fail(
                "В ответе с бэка в DTO BalanceAlgorithmResults отсутствуют flowAreas, "
                "невозможно проверить наличие утечки"
            )

        all_diagnostic_areas = []
        for flow_area in reply_content.flowAreas:
            if flow_area.diagnosticAreas:
                all_diagnostic_areas.extend(flow_area.diagnosticAreas)

        if not all_diagnostic_areas:
            pytest.fail(
                "В ответе с бэка в DTO BalanceAlgorithmResults во всех flowAreas отсутствуют diagnosticAreas, "
                "невозможно проверить наличие утечки"
            )

        leak_diagnostic_area = next(
            (diagnostic_area for diagnostic_area in all_diagnostic_areas if diagnostic_area.isLeakDetected),
            None,
        )

        if leak_diagnostic_area is None:
            pytest.fail("Ни одного ДУ с утечкой не найдено в ответе BalanceAlgorithmResultsContent")

        leak_diagnostic_area_name = leak_diagnostic_area.name

    with SoftAssertions() as soft_failures:
        StepCheck(
            f"Проверка: на ДУ name={leak_diagnostic_area_name} обнаружена утечка",
            "isLeakDetected",
            soft_failures,
        ).actual(leak_diagnostic_area.isLeakDetected).expected(True).equal_to()

        foreign_with_detected = [
            diagnostic_area
            for diagnostic_area in all_diagnostic_areas
            if diagnostic_area.name != leak_diagnostic_area_name and diagnostic_area.isLeakDetected
        ]

        if not cfg.has_multiple_leaks:
            StepCheck(
                "Проверка: на остальных ДУ не обнаружена утечка, "
                f" количество ДУ с неправильным статусом: {len(foreign_with_detected)}, "
                f"их id: {[diagnostic_area.id for diagnostic_area in foreign_with_detected]})",
                "isLeakDetected_without_leak",
                soft_failures,
            ).actual(len(foreign_with_detected)).expected(0).equal_to()

        if leak.flow_rate_settings_threshold is not None:
            threshold = leak.flow_rate_settings_threshold
            StepCheck(
                f"Дебаланс на ДУ name={leak_diagnostic_area_name} по модулю больше порога для данного режима МТ:"
                f" {threshold}",
                "debalance",
                soft_failures,
            ).actual(abs(leak_diagnostic_area.debalance)).is_greater_than(threshold)


async def balance_algorithm_leak_completed(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig):
    """
    Проверка отсутствия утечки (isLeakDetected) через BalanceAlgorithmResults.

    Логика:
    - Подписка на BalanceAlgorithmResultsContent.
    - Получение первого подходящего сообщения типа BalanceAlgorithmResultsContent.
    - Проверяем, что на всех ДУ флаг isLeakDetected=False.
    - Проверяем, что дебаланс на всех ДУ < FLOW_RATE_SETTINGS_THRESHOLD.
    """

    with allure.step("Подписка и получение BalanceAlgorithmResultsContent"):
        payload = await t_utils.connect_and_subscribe_msg(
            ws_client,
            "BalanceAlgorithmResultsContent",
            "SubscribeBalanceAlgorithmResultsRequest",
            {'tuId': cfg.tu_id, 'additionalProperties': None},
        )

        parsed_payload = parser.parse_balance_algorithm_msg(payload)
        reply_content = parsed_payload.replyContent
        if not reply_content or not reply_content.flowAreas:
            pytest.fail(
                "В ответе с бэка в DTO BalanceAlgorithmResults отсутствуют flowAreas, "
                "невозможно проверить наличие/отсутствие утечки"
            )

        all_diagnostic_areas = []
        for flow_area in reply_content.flowAreas:
            if flow_area.diagnosticAreas:
                all_diagnostic_areas.extend(flow_area.diagnosticAreas)

        if not all_diagnostic_areas:
            pytest.fail(
                "В ответе с бэка в DTO BalanceAlgorithmResults во всех flowAreas отсутствуют diagnosticAreas, "
                "невозможно проверить наличие/отсутствие утечки"
            )

    with SoftAssertions() as soft_failures:

        for diagnostic_area in all_diagnostic_areas:
            diagnostic_area_name = diagnostic_area.name
            StepCheck(
                f"Проверка: на ДУ {diagnostic_area_name} не должно быть утечки",
                "isLeakDetected_without_leak",
                soft_failures,
            ).actual(diagnostic_area.isLeakDetected).expected(False).equal_to()


async def the_leak_is_complete_on_kg(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig):
    """
    Проверка факта завершения утечки на ЭФ КГ(табличное представление).

    Логика:
    LeaksContent - проверить, что утечка в статусе завершена
    """
    with allure.step("Подключение по ws и получение сообщения об утечке типа: LeaksContent"):
        payload = await t_utils.connect_and_subscribe_msg(
            ws_client,
            "LeaksContent",
            "SubscribeLeaksRequest",
            {'tuId': cfg.tu_id},
        )
        parsed_payload = parser.parse_leaks_content_msg(payload)
        leaks_list_info = parsed_payload.replyContent.leaksListInfo
        complete_leak_info = t_utils.find_leak_by_coordinate(leaks_list_info, leak.coordinate_meters)
        leak_coordinate_round = round(complete_leak_info.leakCoordinate, cfg.precision)
        complete_leak = t_utils.find_object_by_field(
            leaks_list_info, "confirmationStatus", ConfirmationStatus.CONFIRMED_AND_LEAK_CLOSED.value
        )

    with SoftAssertions() as soft_failures:
        StepCheck("Проверка статуса утечки в КГ - завершена", "confirmationStatus", soft_failures).actual(
            complete_leak.confirmationStatus
        ).expected(leak.expected_complete_leak_status).equal_to()
        StepCheck("Проверка наличия названия участка утечки", "diagnosticAreaName", soft_failures).actual(
            complete_leak_info.diagnosticAreaName
        ).is_not_none()
        StepCheck("Проверка источника события (алгоритм)", "type", soft_failures).actual(
            complete_leak_info.type
        ).expected(leak.expected_algorithm_type).equal_to()
        StepCheck("Проверка координаты утечки", "leakCoordinate", soft_failures).actual(
            leak_coordinate_round
        ).is_close_to(
            leak.coordinate_meters,
            cfg.allowed_distance_diff_meters,
            f"значение допустимой погрешности координаты {cfg.allowed_distance_diff_meters}",
        )


async def leak_is_complete_in_output_signals(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig):
    """OutputSignalsInfo - нет утечки в выходных сигналах"""
    linear_part_id = leak.linear_part_id

    with allure.step(f"Получение списка выходных сигналов для линейного участка с id: {linear_part_id}"):
        payload = await t_utils.connect_and_get_msg(
            ws_client,
            "GetOutputSignalsRequest",
            {
                'tuId': cfg.tu_id,
                'filtering': None,
                'search': None,
                'sorting': None,
                'additionalProperties': None,
            },
        )
        parsed_payload = parser.parse_output_signals_msg(payload)
        # Получение данных линейного участка утечки по id
        leak_linear_part = t_utils.find_object_by_field(
            parsed_payload.replyContent.linearPartSignals,
            TestConst.LEAK_LINEAR_PART_ID_KEY,
            linear_part_id,
        )

        with allure.step("Получение типов выходных сигналов из обработанных данных"):
            leak_signals_list = leak_linear_part.signals
            leak_signal_type = t_utils.find_signal_type_by_address_suffix(
                leak_signals_list, TestConst.ADDRESS_SUFFIX_LEAK
            )

    with allure.step(f"Получение данных выходных сигналов для линейного участка с id: {linear_part_id}"):
        with allure.step("Получение сообщения с данными выходных сигналов типа: OutputSignalsInfo"):
            payload = await t_utils.connect_and_subscribe_msg(
                ws_client,
                "OutputSignalsInfo",
                "SubscribeOutputSignalsRequest",
                {
                    'objects': {
                        'linearParts': [{'linearPartId': linear_part_id}],
                        'controlledSites': [],
                    },
                    'signalTypes': 1023,
                    'tuId': cfg.tu_id,
                    'additionalProperties': None,
                },
            )
            parsed_payload = parser.parse_output_signals_info_msg(payload)
            leak_linear_part = t_utils.find_object_by_field(
                parsed_payload.replyContent.linearPartSignals,
                TestConst.LEAK_LINEAR_PART_ID_KEY,
                linear_part_id,
            )

        with allure.step("Обработка полученных данных выходных сигналов"):
            leak_signals_list = leak_linear_part.signals
            leak_value = t_utils.find_signal_val_by_signal_type(leak_signals_list, leak_signal_type)

    with SoftAssertions() as soft_failures:
        StepCheck(
            "Проверка отсутствия времени утечки в выходных сигналах",
            TestConst.ADDRESS_SUFFIX_TIME_LEAK,
            soft_failures,
        ).actual(leak_value).expected(TestConst.OUTPUT_IS_NOT_LEAK).equal_to()
        StepCheck(
            "Проверка отсутствия утечки в выходных сигналах",
            TestConst.ADDRESS_SUFFIX_LEAK,
            soft_failures,
        ).actual(leak_value).expected(TestConst.OUTPUT_IS_NOT_LEAK).equal_to()
        StepCheck(
            "Проверка отсутствия квитирования утечки в выходных сигналах",
            TestConst.ADDRESS_SUFFIX_ACK_LEAK,
            soft_failures,
        ).actual(leak_value).expected(TestConst.OUTPUT_IS_NOT_LEAK).equal_to()
        StepCheck(
            "Проверка отсутствия объема утечки в выходных сигналах",
            TestConst.ADDRESS_SUFFIX_Q_LEAK,
            soft_failures,
        ).actual(leak_value).expected(TestConst.OUTPUT_IS_NOT_LEAK).equal_to()
        StepCheck(
            "Проверка отсутствия координаты утечки в выходных сигналах",
            TestConst.ADDRESS_SUFFIX_POINT_LEAK,
            soft_failures,
        ).actual(leak_value).expected(TestConst.OUTPUT_IS_NOT_LEAK).equal_to()


async def complete_tu_leaks_info_content(ws_client, cfg: SmokeSuiteConfig):
    """
    TuLeaksInfoContent - проверка отсутствия утечки на схеме
    """
    with allure.step("Подключение по ws, получение и обработка сообщения об утечке типа: TuLeaksInfoContent"):
        payload = await t_utils.connect_and_subscribe_msg(
            ws_client,
            "TuLeaksInfoContent",
            "subscribeTuLeaksInfoRequest",
            {'tuId': cfg.tu_id},
        )
        parsed_payload = parser.parse_tu_leaks_info_msg(payload)
        leak_on_scheme = parsed_payload.replyContent.leaksInfo

    StepCheck("Проверка отсутствия утечки на схеме", "leaksInfo").actual(leak_on_scheme).is_empty()


async def export_leaks_report(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig, imitator_start_time: datetime):
    """
    Сценарий формирования отчёта об утечках.

    Этапы:
    1. Подписка SubscribeReportsDataExportedRequest на пуш-нотификации.
    2. Отправка ExportReportsCommandRequest с фильтром по времени
       (start = старт имитатора, end = старт имитатора + offset теста).
    3. Ожидание пуш-нотификации ReportDataExportedNotification о готовности отчёта.
    4. Лонг-поллинг GetExportedDataListRequest до появления нашего отчёта в списке.
    5. Отправка DownloadExportedDataRequest по id отчёта.
    6. Получение fileChunk по ответу на скачивание.
    7-10. Проверки: формат файла, имя, шапка xlsx, строка утечки.

    Скачанный файл удаляется по завершению, прикладывается к Allure только при падении теста.
    """
    actual_report_state = ExportLeaksReportState()

    with allure.step("Подготовка параметров сценария формирования отчёта об утечках"):
        actual_report_state.report_test = leak.export_leaks_report_test
        StepCheck("В конфигурации задан export_leaks_report_test", "export_leaks_report_test").actual(
            actual_report_state.report_test
        ).is_not_none()

        actual_report_state.period_start = t_utils.localize_as_moscow(imitator_start_time)
        actual_report_state.period_end = t_utils.localize_as_moscow(
            imitator_start_time + timedelta(minutes=actual_report_state.report_test.offset)
        )
        actual_report_state.period_start_naive = report_utils.normalize_report_period_naive(
            actual_report_state.period_start
        )
        actual_report_state.period_end_naive = report_utils.normalize_report_period_naive(
            actual_report_state.period_end
        )
        actual_report_state.expected_mt_mode = ReportConst.STATIONARY_STATUS_TO_REPORT_TEXT.get(
            leak.expected_stationary_status
        )
        actual_report_state.tu_description_lower = cfg.technological_unit.description.lower()
        time_offset_hours = t_utils.report_time_offset_hours()
        StepCheck(
            f"Смещение timeOffset для запросов отчёта (часовой пояс {TestConst.ZONE_INFO})",
            "time_offset_hours",
        ).actual(time_offset_hours).is_not_none()
        actual_report_state.time_offset_hours = time_offset_hours

        StepCheck(
            "Задан ожидаемый текст режима МТ для отчёта",
            "expected_mt_mode",
        ).actual(actual_report_state.expected_mt_mode).is_not_none()

        allure.attach(
            f"period.start={actual_report_state.period_start}\n"
            f"period.end={actual_report_state.period_end}\n"
            f"offset_minutes={actual_report_state.report_test.offset}",
            name="Фильтр периода отчёта",
            attachment_type=allure.attachment_type.TEXT,
        )

    with allure.step(f"Этап 1. Подписка на пуш-нотификации ({ReportConst.SUBSCRIBE_REPORTS_DATA_EXPORTED_REQUEST})"):
        await t_utils.connect(ws_client, ReportConst.SUBSCRIBE_REPORTS_DATA_EXPORTED_REQUEST, [])

    with allure.step(f"Этап 2. Запрос формирования отчёта ({ReportConst.EXPORT_REPORTS_COMMAND_REQUEST})"):
        request_payload = {
            "tuId": cfg.tu_id,
            "exportedDataTypes": [ExportedDataType.LEAKS_REPORT.value],
            "timeOffset": actual_report_state.time_offset_hours,
            "period": {
                "start": t_utils.datetime_to_msgpack_timestamp(actual_report_state.period_start),
                "end": t_utils.datetime_to_msgpack_timestamp(actual_report_state.period_end),
                "additionalProperties": {},
            },
        }
        await t_utils.connect(ws_client, ReportConst.EXPORT_REPORTS_COMMAND_REQUEST, request_payload)

    with allure.step(
        f"Этап 3. Ожидание пуш-нотификации {ReportConst.REPORT_DATA_EXPORTED_NOTIFICATION} о готовности отчёта"
    ):
        actual_report_state.notification = await t_utils.poll_for_report_export_notification(
            ws_client=ws_client,
            parser=parser,
            total_wait_seconds=ReportConst.NOTIFICATION_TIMEOUT_SECONDS,
            poll_interval_seconds=ReportConst.LIST_POLL_INTERVAL_SECONDS,
        )

    with allure.step("Извлечение полей пуш-нотификации"):
        notification = actual_report_state.notification
        notification_reply_status = notification.replyStatus if notification else None
        notification_reply_content = notification.replyContent if notification else None
        notification_export_status = notification_reply_content.exportStatus if notification_reply_content else None
        notification_error_message = (
            (notification_reply_content.errorMessage or "") if notification_reply_content else ""
        )

    with allure.step(f"Этап 4. Лонг-поллинг {ReportConst.GET_EXPORTED_DATA_LIST_REQUEST} до появления отчёта в списке"):
        actual_report_state.report_item = await t_utils.poll_for_exported_file(
            ws_client=ws_client,
            parser=parser,
            list_limit=ReportConst.EXPORTED_DATA_LIST_LIMIT,
            expected_data_type=ExportedDataType.LEAKS_REPORT,
            name_substring=ReportConst.LEAKS_REPORT_NAME_PART,
            tu_name_substring=cfg.technological_unit.description,
            period_start=actual_report_state.period_start,
            period_end=actual_report_state.period_end,
            total_wait_seconds=ReportConst.LIST_POLL_TOTAL_WAIT_SECONDS,
            poll_interval_seconds=ReportConst.LIST_POLL_INTERVAL_SECONDS,
        )

    with allure.step("Подготовка данных найденного отчёта в списке"):
        report_item = actual_report_state.report_item
        if report_item is not None:
            allure.attach(
                f"id={report_item.id}, name={report_item.name}, "
                f"exportedDataType={report_item.exportedDataType}, "
                f"start={t_utils.format_datetime_moscow(report_item.start)}, "
                f"end={t_utils.format_datetime_moscow(report_item.end)}",
                name="Найденный отчёт в списке",
                attachment_type=allure.attachment_type.TEXT,
            )
        actual_report_state.report_file_name = report_utils.build_export_report_file_name(
            cfg.technological_unit.description,
            actual_report_state.period_start,
            actual_report_state.period_end,
        )

    with allure.step("Проверка: отчёт найден в списке сформированных файлов"):
        StepCheck("Отчёт найден в списке сформированных файлов", "report_item").actual(
            actual_report_state.report_item
        ).is_not_none()

    with allure.step(
        f"Этап 5. Streaming-вызов {ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST} по "
        f"id={actual_report_state.report_item.id}"
    ):
        download_request = {
            "exportedDataId": actual_report_state.report_item.id,
            "exportedDataType": ExportedDataType.LEAKS_REPORT.to_download_name(),
            "additionalProperties": None,
            "timeOffset": actual_report_state.time_offset_hours,
        }

        download_purpose = (
            f"скачивание xlsx-отчёта об утечках (exportedDataId={actual_report_state.report_item.id}) "
            f"после формирования отчёта и выбора файла в списке GetExportedDataListRequest - "
            f"выпадашка уведомлений на UI"
        )

        await t_utils.connect_stream(
            ws_client,
            ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST,
            download_request,
            purpose=download_purpose,
        )
        actual_report_state.download_invocation_id = ws_client.invocation_id

    with allure.step("Этап 6. Получение fileChunk - скачивание отчёта по утечкам"):
        actual_report_state.download_reply = await t_utils.receive_download_exported_data_reply(
            ws_client=ws_client,
            parser=parser,
            invocation_id=actual_report_state.download_invocation_id,
            request_name=ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST,
            total_wait_seconds=ReportConst.DOWNLOAD_TIMEOUT_SECONDS,
            purpose=download_purpose,
        )

    with allure.step("Извлечение данных ответа на скачивание"):
        download_reply = actual_report_state.download_reply
        download_reply_status = download_reply.replyStatus
        has_download_reply_content = download_reply.replyContent is not None
        actual_report_state.file_bytes = download_reply.replyContent.fileChunk if has_download_reply_content else None
        is_xlsx_signature = (
            report_utils.is_xlsx_file_bytes(actual_report_state.file_bytes) if actual_report_state.file_bytes else False
        )

    with allure.step("Проверка ответа на скачивание и формата xlsx"):
        StepCheck("Проверка статуса ответа на скачивание", "replyStatus").actual(download_reply_status).expected(
            ReplyStatus.OK.value
        ).equal_to()
        StepCheck("Проверка наличия контента ответа на скачивание", "replyContent").actual(
            has_download_reply_content
        ).expected(True).equal_to()
        StepCheck("Проверка наличия байт файла", "fileChunk").actual(actual_report_state.file_bytes).is_not_empty()
        StepCheck("Проверка xlsx (zip) сигнатуры файла", "file_signature").actual(is_xlsx_signature).expected(
            True
        ).equal_to()

    with allure.step("Подготовка данных для проверки имени файла отчёта"):
        report_file_name = actual_report_state.report_file_name
        report_file_name_lower = report_file_name.lower()
        file_name_period_start, file_name_period_end = report_utils.parse_period_from_export_file_name(report_file_name)
        period_start_lo, period_start_hi, period_end_lo, period_end_hi = report_utils.report_period_comparison_bounds(
            actual_report_state.period_start_naive,
            actual_report_state.period_end_naive,
        )
        has_xlsx_extension = report_utils.is_xlsx_extension(report_file_name)
        leaks_report_name_part_lower = ReportConst.LEAKS_REPORT_NAME_PART.lower()

    with allure.step("Этап 8. Сохранение, обработка и проверка отчета по утечкам"):
        actual_report_state.temp_file_path = report_utils.save_report_bytes_to_temp_file(actual_report_state.file_bytes)

    try:
        with allure.step("Проверка: временный xlsx файл создан"):
            StepCheck("Временный xlsx файл создан", "temp_file_path").actual(
                actual_report_state.temp_file_path
            ).is_not_none()

        with allure.step("Этап 9. Открытие xlsx и чтение шапки"):
            actual_report_state.worksheet = report_utils.load_report_worksheet(actual_report_state.temp_file_path)
            actual_report_state.title_info = report_utils.parse_report_title(
                report_utils.get_report_title_cell(actual_report_state.worksheet)
            )
            allure.attach(
                f"Шапка отчёта (raw): {actual_report_state.title_info.raw_title}\n"
                f"period_start: {actual_report_state.title_info.period_start}\n"
                f"period_end: {actual_report_state.title_info.period_end}",
                name="Шапка отчёта (1-я строка)",
                attachment_type=allure.attachment_type.TEXT,
            )

        with allure.step("Подготовка данных шапки отчёта для проверки"):
            title_info = actual_report_state.title_info
            report_title_lower = title_info.raw_title.lower()
            leaks_report_name_part_lower = ReportConst.LEAKS_REPORT_NAME_PART.lower()
            column_headers = report_utils.get_report_column_headers(actual_report_state.worksheet)
            period_start_lo, period_start_hi, period_end_lo, period_end_hi = (
                report_utils.report_period_comparison_bounds(
                    actual_report_state.period_start_naive,
                    actual_report_state.period_end_naive,
                )
            )
            header_period_start = title_info.period_start
            header_period_end = title_info.period_end

        with allure.step("Этап 10. Извлечение строк данных из отчёта"):
            actual_report_state.data_rows = report_utils.iter_report_data_rows(actual_report_state.worksheet)
            actual_report_state.target_row = report_utils.find_row_with_object(
                actual_report_state.data_rows, cfg.technological_unit.description
            )
            allure.attach(
                "\n".join(f"row#{row.row_index}: {row.cells}" for row in actual_report_state.data_rows),
                name="Все строки данных отчёта",
                attachment_type=allure.attachment_type.TEXT,
            )

        with allure.step("Подготовка данных строки утечки для проверки"):
            target_row = actual_report_state.target_row
            leak_datetime_value = target_row.datetime_value if target_row else None
            object_value_lower = target_row.object_value.lower() if target_row else ""
            lds_status_value = target_row.lds_status.strip() if target_row else ""
            masking_info_lower = target_row.masking_info.lower() if target_row else ""
            leak_coordinate_meters = target_row.coordinate_meters if target_row else None
            leak_volume_value = target_row.leak_volume if target_row else None
            mt_mode_lower = target_row.mt_mode.lower() if target_row else ""
            expected_mt_mode_lower = actual_report_state.expected_mt_mode.lower()
            masking_not_masked_lower = ReportConst.MASKING_NOT_MASKED_TEXT.lower()
            period_start_lo, period_start_hi, period_end_lo, period_end_hi = (
                report_utils.report_period_comparison_bounds(
                    actual_report_state.period_start_naive,
                    actual_report_state.period_end_naive,
                )
            )

        with allure.step("Проверка содержимого строки утечки"):
            StepCheck("В отчёте есть хотя бы одна строка с данными", "data_rows").actual(
                actual_report_state.data_rows
            ).is_not_empty()
            StepCheck(
                f"Строка с объектом, содержащим '{cfg.technological_unit.description}'",
                ReportConst.COL_OBJECT,
            ).actual(actual_report_state.target_row).is_not_none()

            with SoftAssertions() as soft_failures:
                StepCheck(
                    "Время утечки в диапазоне [старт имитатора, старт + offset теста] (+-1 мин)",
                    ReportConst.COL_DATETIME,
                    soft_failures,
                ).actual(leak_datetime_value).is_between(period_start_lo, period_end_hi)

                StepCheck(
                    f"Колонка '{ReportConst.COL_OBJECT}' содержит '{cfg.technological_unit.description}'",
                    ReportConst.COL_OBJECT,
                    soft_failures,
                ).contains(object_value_lower, actual_report_state.tu_description_lower)

                StepCheck(
                    f"Колонка '{ReportConst.COL_LDS_STATUS}'",
                    ReportConst.COL_LDS_STATUS,
                    soft_failures,
                ).actual(
                    lds_status_value
                ).expected(ReportConst.LDS_STATUS_OK_TEXT).equal_to()

                StepCheck(
                    f"Колонка '{ReportConst.COL_MASK_INFO}' содержит '{ReportConst.MASKING_NOT_MASKED_TEXT}'",
                    ReportConst.COL_MASK_INFO,
                    soft_failures,
                ).contains(masking_info_lower, masking_not_masked_lower)

                StepCheck(
                    f"Колонка '{ReportConst.COL_COORDINATE}' (с погрешностью {cfg.allowed_distance_diff_meters} м)",
                    ReportConst.COL_COORDINATE,
                    soft_failures,
                ).actual(leak_coordinate_meters).is_close_to(
                    leak.coordinate_meters,
                    cfg.allowed_distance_diff_meters,
                    f"значение допустимой погрешности координаты {cfg.allowed_distance_diff_meters}",
                )

                StepCheck(
                    f"Колонка '{ReportConst.COL_LEAK_VOLUME}' не пустая",
                    ReportConst.COL_LEAK_VOLUME,
                    soft_failures,
                ).actual(leak_volume_value).is_not_none()

                StepCheck(
                    f"Колонка '{ReportConst.COL_MT_MODE}' содержит '{actual_report_state.expected_mt_mode}'",
                    ReportConst.COL_MT_MODE,
                    soft_failures,
                ).contains(mt_mode_lower, expected_mt_mode_lower)
    except Exception:
        with allure.step("Прикрепление xlsx отчёта к Allure при падении теста"):
            if actual_report_state.temp_file_path and actual_report_state.report_file_name:
                report_utils.attach_report_file_to_allure(
                    actual_report_state.temp_file_path, actual_report_state.report_file_name
                )
        raise
    finally:
        with allure.step("Удаление временного xlsx файла"):
            temp_path = actual_report_state.temp_file_path
            if temp_path is not None:
                try:
                    temp_path.unlink(missing_ok=True)
                except OSError:
                    pass

    with allure.step("Проверка имени файла отчёта"):
        with SoftAssertions() as soft_failures:
            StepCheck(f"Имя файла оканчивается на {ReportConst.XLSX_EXTENSION}", "file_name", soft_failures).actual(
                has_xlsx_extension
            ).expected(True).equal_to()
            StepCheck(
                f"Имя файла содержит '{ReportConst.LEAKS_REPORT_NAME_PART}'", "file_name", soft_failures
            ).contains(report_file_name_lower, leaks_report_name_part_lower)
            StepCheck(
                f"Имя файла содержит описание ТУ '{cfg.technological_unit.description}'", "file_name", soft_failures
            ).contains(report_file_name_lower, actual_report_state.tu_description_lower)
            StepCheck(
                "Дата начала периода в имени файла совпадает с фильтром запроса (+-1 мин)",
                "period_start_in_file_name",
                soft_failures,
            ).actual(file_name_period_start).is_between(period_start_lo, period_start_hi)
            StepCheck(
                "Дата конца периода в имени файла совпадает с фильтром запроса (+-1 мин)",
                "period_end_in_file_name",
                soft_failures,
            ).actual(file_name_period_end).is_between(period_end_lo, period_end_hi)

    with allure.step("Проверка двойной шапки отчёта"):
        StepCheck("Лист xlsx открыт", "worksheet").actual(actual_report_state.worksheet).is_not_none()
        with SoftAssertions() as soft_failures:
            StepCheck(
                f"В шапке отчёта присутствует '{ReportConst.LEAKS_REPORT_NAME_PART}'",
                "report_title",
                soft_failures,
            ).contains(report_title_lower, leaks_report_name_part_lower)

            StepCheck(
                "Время начала периода в шапке совпадает с фильтром запроса (+-1 мин)",
                "period_start",
                soft_failures,
            ).actual(header_period_start).is_between(period_start_lo, period_start_hi)
            StepCheck(
                "Время конца периода в шапке совпадает с фильтром запроса (+-1 мин)",
                "period_end",
                soft_failures,
            ).actual(header_period_end).is_between(period_end_lo, period_end_hi)
            StepCheck(
                "Названия колонок в шапке отчёта",
                "column_headers",
                soft_failures,
            ).actual(
                column_headers
            ).expected(ReportConst.EXPECTED_COLUMN_HEADERS).equal_to()

    with allure.step("Проверка пуш-нотификации о готовности отчёта"):
        with SoftAssertions() as soft_failures:
            StepCheck("Получена пуш-нотификация о готовности отчёта", "notification", soft_failures).actual(
                actual_report_state.notification
            ).is_not_none()
            StepCheck("Проверка статуса пуш-нотификации", "replyStatus", soft_failures).actual(
                notification_reply_status
            ).expected(ReplyStatus.OK.value).equal_to()
            StepCheck("Проверка наличия контента нотификации", "replyContent", soft_failures).actual(
                notification_reply_content
            ).is_not_none()
            StepCheck("Проверка exportStatus в нотификации", "exportStatus", soft_failures).actual(
                notification_export_status
            ).expected(ExportStatus.DONE).equal_to()
            StepCheck("В нотификации нет текста ошибки", "errorMessage", soft_failures).actual(
                notification_error_message
            ).is_empty()





























rep xlxs
"""
Утилиты для разбора xlsx-отчётов и проверки их формата.
"""

from __future__ import annotations

import re
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional

import allure
from openpyxl import load_workbook
from openpyxl.worksheet.worksheet import Worksheet

from constants.test_constants import BaseTN3Constants as TestConst
from constants.test_constants import ExportReportConstants as ReportConst
from utils.helpers.ws_test_utils import extract_first_number, localize_as_moscow


@dataclass
class ReportTitleInfo:
    """Разобранная шапка отчёта"""

    raw_title: str
    period_start: Optional[datetime] = None
    period_end: Optional[datetime] = None


@dataclass
class LeakReportRow:
    """Разобранная строка данных по утечке"""

    row_index: int
    cells: Dict[str, str] = field(default_factory=dict)

    @property
    def datetime_value(self) -> Optional[datetime]:
        return parse_report_datetime(self.cells.get(ReportConst.COL_DATETIME))

    @property
    def object_value(self) -> str:
        return self.cells.get(ReportConst.COL_OBJECT, "")

    @property
    def lds_status(self) -> str:
        return self.cells.get(ReportConst.COL_LDS_STATUS, "")

    @property
    def masking_info(self) -> str:
        return self.cells.get(ReportConst.COL_MASK_INFO, "")

    @property
    def coordinate_meters(self) -> Optional[float]:
        coordinate_km = extract_first_number(self.cells.get(ReportConst.COL_COORDINATE))
        if coordinate_km is None:
            return None
        return coordinate_km * TestConst.KM_TO_METERS

    @property
    def leak_volume(self) -> Optional[float]:
        return extract_first_number(self.cells.get(ReportConst.COL_LEAK_VOLUME))

    @property
    def mt_mode(self) -> str:
        return self.cells.get(ReportConst.COL_MT_MODE, "")


def is_xlsx_file_bytes(file_bytes: Optional[bytes]) -> bool:
    """Проверяет zip-сигнатуру xlsx"""
    if not file_bytes:
        return False
    return file_bytes.startswith(ReportConst.ZIP_SIGNATURE)


def is_xlsx_extension(file_name: str) -> bool:
    """Проверяет расширение .xlsx без учёта регистра."""
    return file_name.lower().endswith(ReportConst.XLSX_EXTENSION)


def parse_report_datetime(value: object) -> Optional[datetime]:
    """Парсит дату/время из ячейки отчёта."""
    if value is None:
        return None
    if isinstance(value, datetime):
        return value
    if isinstance(value, str):
        try:
            return datetime.strptime(value.strip(), ReportConst.REPORT_DATETIME_FORMAT)
        except ValueError:
            return None
    return None


def _stringify_cell(value: object) -> str:
    if value is None:
        return ""
    if isinstance(value, datetime):
        return value.strftime(ReportConst.REPORT_DATETIME_FORMAT)
    return str(value)


def normalize_report_period_naive(value: datetime) -> datetime:
    """Московское время без tzinfo и микросекунд - для сравнения периодов в отчёте."""
    return localize_as_moscow(value).replace(microsecond=0, tzinfo=None)


def report_period_comparison_bounds(
    period_start: datetime,
    period_end: datetime,
    tolerance_minutes: int = ReportConst.REPORT_PERIOD_TOLERANCE_MINUTES,
) -> tuple[datetime, datetime, datetime, datetime]:
    """
    Границы периода с допуском +-tolerance_minutes для start и end отдельно.
    Возвращает (start_lower, start_upper, end_lower, end_upper).
    """
    start = normalize_report_period_naive(period_start)
    end = normalize_report_period_naive(period_end)
    delta = timedelta(minutes=tolerance_minutes)
    return start - delta, start + delta, end - delta, end + delta


def build_export_report_file_name(
    tu_description: str,
    period_start: datetime,
    period_end: datetime,
) -> str:
    """
    Имя xlsx при скачивании: «Отчет об утечках Тихорецк-Новороссийск-3 DD.MM.YYYY HH_MM_SS - DD.MM.YYYY HH_MM_SS.xlsx».
    """
    start_text = normalize_report_period_naive(period_start).strftime(ReportConst.REPORT_FILE_NAME_DATETIME_FORMAT)
    end_text = normalize_report_period_naive(period_end).strftime(ReportConst.REPORT_FILE_NAME_DATETIME_FORMAT)
    return (
        f"{ReportConst.LEAKS_REPORT_NAME_PART} {tu_description} {start_text} - {end_text}"
        f"{ReportConst.XLSX_EXTENSION}"
    )


def parse_period_from_export_file_name(file_name: str) -> tuple[Optional[datetime], Optional[datetime]]:
    """Извлекает границы периода из имени скачанного xlsx-файла."""
    match = re.search(ReportConst.REPORT_FILE_NAME_PERIOD_PATTERN, file_name.strip(), re.IGNORECASE)
    if match is None:
        return None, None

    parse_format = ReportConst.REPORT_FILE_NAME_DATETIME_FORMAT.replace("_", ":")

    def _parse_part(value: str) -> Optional[datetime]:
        try:
            return datetime.strptime(value.replace("_", ":"), parse_format)
        except ValueError:
            return None

    return _parse_part(match.group("period_start")), _parse_part(match.group("period_end"))


def parse_report_title(title_raw: object) -> ReportTitleInfo:
    """
    Парсит шапку отчёта с именованными группами period_start/period_end.
    """
    title_str = _stringify_cell(title_raw)
    match = re.search(ReportConst.REPORT_HEADER_PERIOD_PATTERN, title_str)
    if match is None:
        return ReportTitleInfo(raw_title=title_str)

    return ReportTitleInfo(
        raw_title=title_str,
        period_start=parse_report_datetime(match.group("period_start")),
        period_end=parse_report_datetime(match.group("period_end")),
    )


def load_report_worksheet(file_path: Path) -> Optional[Worksheet]:
    """Открывает первый лист xlsx. При ошибке возвращает None."""
    if not file_path.exists():
        return None
    try:
        workbook = load_workbook(filename=str(file_path), read_only=True, data_only=True)
    except Exception:
        return None
    sheet_names = workbook.sheetnames
    if not sheet_names:
        return None
    return workbook[sheet_names[ReportConst.DEFAULT_SHEET_INDEX]]


def get_report_title_cell(worksheet: Worksheet) -> object:
    return worksheet.cell(row=ReportConst.REPORT_TITLE_ROW, column=1).value


def get_report_column_headers(worksheet: Worksheet) -> List[str]:
    """Возвращает непустые заголовки колонок из строки REPORT_COLUMN_HEADERS_ROW."""
    headers: List[str] = []
    column_index = 1
    while True:
        cell_value = worksheet.cell(row=ReportConst.REPORT_COLUMN_HEADERS_ROW, column=column_index).value
        if cell_value is None or not str(cell_value).strip():
            break
        headers.append(_stringify_cell(cell_value).strip())
        column_index += 1
    return headers


def build_column_cells(row_values: tuple, headers: List[str]) -> Dict[str, str]:
    """Собирает словарь {название колонки: значение ячейки} по строке данных."""
    return {
        header: _stringify_cell(row_values[column_index]) if column_index < len(row_values) else ""
        for column_index, header in enumerate(headers)
    }


def iter_report_data_rows(worksheet: Worksheet) -> List[LeakReportRow]:
    """
    Возвращает строки данных по утечкам, начиная с REPORT_DATA_FIRST_ROW.
    Пустые строки пропускаются.
    """
    headers = get_report_column_headers(worksheet)
    if not headers:
        return []

    rows: List[LeakReportRow] = []
    for excel_row_index, row_values in enumerate(
        worksheet.iter_rows(
            min_row=ReportConst.REPORT_DATA_FIRST_ROW,
            max_col=len(headers),
            values_only=True,
        ),
        start=ReportConst.REPORT_DATA_FIRST_ROW,
    ):
        if not any(cell is not None and str(cell).strip() for cell in row_values):
            continue
        rows.append(
            LeakReportRow(
                row_index=excel_row_index,
                cells=build_column_cells(row_values, headers),
            )
        )
    return rows


def find_row_with_object(rows: List[LeakReportRow], object_substring: str) -> Optional[LeakReportRow]:
    """Ищет первую строку, где колонка 'Объект' содержит подстроку без учёта регистра"""
    substring_lower = object_substring.lower()
    for row in rows:
        if substring_lower in row.object_value.lower():
            return row
    return None


def save_report_bytes_to_temp_file(file_bytes: bytes) -> Optional[Path]:
    """Сохраняет байты отчёта во временный xlsx-файл. При ошибке возвращает None."""
    import tempfile

    try:
        with tempfile.NamedTemporaryFile(
            suffix=ReportConst.XLSX_EXTENSION,
            prefix="leaks_report_",
            delete=False,
        ) as temp_file:
            temp_file.write(file_bytes)
            return Path(temp_file.name)
    except OSError:
        return None


def attach_report_file_to_allure(file_path: Path, file_name: str) -> None:
    """Прикладывает xlsx к Allure при падении теста"""
    try:
        xlsx_type = allure.attachment_type.XLSX
    except AttributeError:
        xlsx_type = None

    if xlsx_type is not None:
        allure.attach.file(
            str(file_path),
            name=file_name,
            attachment_type=xlsx_type,
            extension="xlsx",
        )
        return

    try:
        with file_path.open("rb") as raw_file:
            allure.attach(raw_file.read(), name=file_name, extension="xlsx")
    except OSError:
        pass























ws test
from __future__ import annotations

import asyncio
import pprint
import random
import re
from dataclasses import asdict, is_dataclass
from datetime import datetime, timedelta, timezone
from enum import IntEnum, IntFlag
from typing import Any, List, Optional, Set, Type, TypeVar
from zoneinfo import ZoneInfo

import allure
from msgpack import Timestamp as MsgpackTimestamp
from pytest import fail

from clients.websocket_client import WebSocketClient
from constants.architecture_constants import WebSocketClientConstants as WS_Const
from constants.enums import (
    ConfirmationStatus,
    ExportStatus,
    LdsStatus,
    LdsStatusDegradation,
    LdsStatusFaulty,
    LdsStatusInitialization,
    LeakStatus,
    ReplyStatus,
    StationaryReason,
    StationaryStatus,
    StoppedPumpingReason,
    UnStationaryReason,
)
from constants.test_constants import BaseTN3Constants as TestConst
from constants.test_constants import ExportReportConstants as ReportConst
from models.export_reports_model import ReportDataExportedContent, ReportDataExportedNotification
from models.get_messages_model import GetMessagesRequest
from models.subscribe_all_leaks_info_model import SubscribeAllLeaksInfoReply
from models.subscribe_common_scheme_model import DiagnosticArea, FlowArea
from models.subscribe_leaks_model import Leak
from models.subscribe_main_page_info_model import MainPageLeakInfo
from utils.helpers.ws_message_parser import ws_message_parser
from utils.msgpack_utils.message_filters import is_desired_invocation_id, is_desired_type

ObjectType = TypeVar("ObjectType")  # создает типовую переменную для поиска объектов в списке
RandomObjectType = TypeVar("RandomObjectType")


def convert_leak_volume_m3(volume: float) -> float:
    """
    Преобразует объем утечки в м3/час
    """
    #  Округляет результат для читабельности
    return round(volume * TestConst.MASS_KG, 3)


def datetime_minus_seconds(datetime_obj: datetime, delta_s: int) -> datetime:
    """
    Вычитает время в секундах из datetime
    """
    return (datetime_obj - timedelta(seconds=delta_s)).replace(microsecond=0)


def calculate_leak_start_time(imitator_start_time: datetime, leak_interval_seconds: int) -> datetime:
    """
    Рассчитывает время начала утечки на основе времени старта имитатора.

    :param imitator_start_time: datetime объект времени старта имитатора
    :param leak_interval_seconds: интервал от старта до утечки в секундах (LEAK_START_INTERVAL)
    :return: datetime время ожидаемого начала утечки
    """
    if not imitator_start_time:
        fail("Пришло пустое значение imitator_start_time")
    return (imitator_start_time + timedelta(seconds=leak_interval_seconds)).replace(microsecond=0)


def calculate_leak_end_time(
    imitator_start_time: datetime, leak_interval_seconds: int, allowed_diff_seconds: int
) -> datetime:
    """
    Рассчитывает крайнее время обнаружения утечки (с учётом допустимой погрешности).

    :param imitator_start_time: datetime объект времени старта имитатора
    :param leak_interval_seconds: интервал от старта до утечки в секундах (LEAK_START_INTERVAL)
    :param allowed_diff_seconds: допустимая погрешность времени обнаружения (ALLOWED_TIME_DIFF_SECONDS)
    :return: datetime крайнее время обнаружения утечки
    """
    if not imitator_start_time:
        fail("Пришло пустое значение imitator_start_time")
    total_seconds = leak_interval_seconds + allowed_diff_seconds
    return (imitator_start_time + timedelta(seconds=total_seconds)).replace(microsecond=0)


def get_leak_time_window(
    imitator_start_time: datetime, leak_interval_seconds: int, allowed_diff_seconds: int, detected_at_tz=None
) -> tuple[datetime, datetime]:
    """
    Возвращает временное окно для проверки времени обнаружения утечки.

    :param imitator_start_time: datetime объект времени старта имитатора
    :param leak_interval_seconds: интервал от старта до утечки в секундах
    :param allowed_diff_seconds: допустимая погрешность времени обнаружения
    :param detected_at_tz: timezone из времени обнаружения утечки (опционально)
    :return: tuple (leak_start_time, leak_end_time) для использования в is_between проверке
    """
    leak_start = calculate_leak_start_time(imitator_start_time, leak_interval_seconds)
    leak_end = calculate_leak_end_time(imitator_start_time, leak_interval_seconds, allowed_diff_seconds)

    # Если передан timezone, применяем его к временам для корректного сравнения
    if detected_at_tz:
        leak_start = leak_start.replace(tzinfo=detected_at_tz)
        leak_end = leak_end.replace(tzinfo=detected_at_tz)

    return leak_start, leak_end


def ensure_moscow_timezone(input_datetime: datetime) -> None | datetime:
    """
    Конвертирует datetime в московское время, если оно не в московской таймзоне.

    :param input_datetime: datetime объект
    :return: datetime в московской таймзоне
    """
    if input_datetime is None:
        return input_datetime

    # Если datetime без timezone - считаем что это UTC
    if input_datetime.tzinfo is None:
        input_datetime = input_datetime.replace(tzinfo=timezone.utc)

    # Конвертируем в московское время
    return input_datetime.astimezone(ZoneInfo(TestConst.ZONE_INFO))


def report_time_offset_hours(tz_name: str = TestConst.ZONE_INFO) -> Optional[int]:
    """
    Смещение часового пояса (часы от UTC) для поля timeOffset в запросах отчётов.
    """
    now = datetime.now(ZoneInfo(tz_name))
    utc_offset = now.utcoffset()
    if utc_offset is None:
        return None
    return int(utc_offset.total_seconds() // TestConst.SECONDS_PER_HOUR)


def localize_as_moscow(input_datetime: datetime) -> None | datetime:
    """
    Присваивает datetime московский часовой пояс без сдвига времени.
    Если datetime уже имеет timezone - конвертирует в московское время.
    """
    if input_datetime is None:
        return input_datetime

    moscow_tz = ZoneInfo(TestConst.ZONE_INFO)
    if input_datetime.tzinfo is None:
        return input_datetime.replace(tzinfo=moscow_tz)
    return input_datetime.astimezone(moscow_tz)


def format_datetime_moscow(value: Optional[datetime]) -> str:
    """Строковое представление datetime в Europe/Moscow для вложений Allure."""
    if value is None:
        return "None"
    return str(localize_as_moscow(value))


def get_rejection_time_window(
    imitator_start_time: datetime,
    start_seconds: int | float,
    reserve_seconds: int | float = 0,
) -> tuple[datetime, datetime]:
    """
    Возвращает временное окно для проверки сообщения об отбраковке.
    """
    imitator_msk = localize_as_moscow(imitator_start_time)
    range_start = imitator_msk + timedelta(seconds=start_seconds - reserve_seconds)
    range_end = localize_as_moscow(datetime.now())
    return range_start, range_end


def find_rejection_journal_message(
    messages_info: List[ObjectType],
    tag: str,
    range_start: datetime,
    range_end: datetime,
    technological_section: str,
    expected_event: str,
) -> tuple[list[ObjectType], ObjectType | None]:
    """
    Фильтрует сообщения журнала по tag и временному диапазону,
    затем ищет целевое сообщение по technologicalSection и event.
    """
    time_filtered = [
        msg for msg in messages_info if msg.tag == tag and range_start <= ensure_moscow_timezone(msg.time) <= range_end
    ]
    time_filtered.sort(key=lambda msg: ensure_moscow_timezone(msg.time), reverse=True)

    target_msg = next(
        (
            msg
            for msg in time_filtered
            if msg.technologicalSection == technological_section and msg.event.rstrip() == expected_event
        ),
        None,
    )
    return time_filtered, target_msg


def get_random_item(item_list: List[RandomObjectType]) -> RandomObjectType:
    """
    Получает случайный объект из списка
    """
    if not item_list:
        fail("Пустой список объектов")
    try:
        return random.choice(item_list)
    except (TypeError, ValueError):
        fail(f"Не удалось получить случайный элемент из списка: {item_list}")


def get_longest_flow_area(flow_areas: List[FlowArea]) -> FlowArea:
    """
    Получает самый протяженный участок карты течения по количеству ДУ из списка всех участков
    """
    if not flow_areas:
        fail("Список flow_areas пустой")
    try:
        longest_flow_area = max(flow_areas, key=lambda flow_area: len(flow_area.diagnosticAreas))
        return longest_flow_area
    except (TypeError, ValueError):
        fail(f"Не найден протяженный участок из списка flow_areas: {flow_areas}.")


def determine_lds_status_by_priority(lds_status_set: Set[int]) -> int:
    """
    Определяет режим работы СОУ по приоритету и наличию режимов работы у ДУ на самом протяженном участки карты течений
    """
    lds_status_priority = [
        LdsStatus.FAULTY.value,
        LdsStatus.INITIALIZATION.value,
        LdsStatus.DEGRADATION.value,
        LdsStatus.SERVICEABLE.value,
    ]
    if not lds_status_set:
        fail("Пустой список режимов СОУ ДУ")
    try:
        for status in lds_status_priority:
            if status in lds_status_set:
                return status
    except (AttributeError, KeyError, RuntimeError, TypeError, ValueError):
        fail("Не удалось определить режим работу СОУ.")


def find_signal_type_by_address_suffix(signals_list: list, address_suffix: str) -> int:
    """
    Ищет в списке сигналов тип сигнала по части адреса
    """
    if not signals_list:
        fail("Пустой список сигналов")
    try:
        for sensor_signal in signals_list:
            if sensor_signal.address is not None and str(sensor_signal.address).endswith(address_suffix):
                return sensor_signal.signalType
        fail(f"Не найден тип сигнала по части адреса: {address_suffix} из списка: {signals_list}")
    except (AttributeError, KeyError, RuntimeError, TypeError, ValueError):
        fail(f"Не найден тип сигнала по части адреса: {address_suffix} из списка: {signals_list}")


def find_signal_val_by_signal_type(signals_list: list, signal_type: int) -> str:
    """
    Ищет в списке сигналов значение сигнала по типу
    """
    if not signals_list:
        fail("Пустой список сигналов")
    try:
        for sensor_signal in signals_list:
            if sensor_signal.signalType is not None and sensor_signal.signalType == signal_type:
                return sensor_signal.value
        fail(f"Не найдено значение для типа сигнала: {signal_type} из списка: {signals_list}")
    except (AttributeError, KeyError, RuntimeError, TypeError, ValueError):
        fail(f"Не найдено значение для типа сигнала: {signal_type} из списка: {signals_list}")


def find_object_by_field(item_list: List[ObjectType], field_name: str, value: Any) -> ObjectType:
    """
    Ищет объект в списке объектов по значению одного из полей объекта
    """
    if not item_list:
        fail("Список объектов пуст")
    try:
        return next((item for item in item_list if getattr(item, field_name) == value))
    except Exception:
        fail(f"Не найдено значение: {value} для поля: {field_name}, в списке: {item_list}.")


def find_object_by_a_few_fields(item_list: List[ObjectType], fields_dict: dict) -> ObjectType:
    """
    Ищет объект в списке объектов по значениям нескольких полей
    """
    if not item_list:
        return None

    return next(
        (item for item in item_list if all(getattr(item, field) == value for field, value in fields_dict.items())), None
    )


def get_signal(site_message, signal_type):
    if not site_message:
        return None
    return next((s for s in site_message.signals if s.signalType == signal_type.value), None)


def get_value(obj):
    return getattr(obj, "value", None)


def find_confirmed_leaks(item_list: List[Leak]) -> List[Leak]:
    """Ищет подтвержденные утечки"""
    try:
        return [
            item
            for item in item_list
            if item.confirmationStatus == ConfirmationStatus.CONFIRMED.value and item.detectedAt is not None
        ]
    except (AttributeError, KeyError, TypeError, ValueError):
        return []


def find_confirmed_leaks_on_main_page(item_list: List[MainPageLeakInfo]) -> List[MainPageLeakInfo]:
    """Ищет подтвержденные утечки"""
    try:
        return [
            item
            for item in item_list
            if item.leakStatus == LeakStatus.CONFIRMED.value and item.leakDetectedAt is not None
        ]
    except (AttributeError, KeyError, TypeError, ValueError):
        return []


def find_diagnostic_area_by_id(flow_areas: List[FlowArea], id_value: int) -> Optional[DiagnosticArea]:
    """
    Ищет ДУ по id в списке участков карты течений, исключает дубликаты по количеству pipeIds
    """
    candidates = []
    if not flow_areas:
        return None
    try:
        for flow_area in flow_areas:
            for diagnostic_area in flow_area.diagnosticAreas:
                if diagnostic_area.id == id_value:
                    candidates.append(diagnostic_area)
        if not candidates:
            return None
        elif len(candidates) == 1:
            return candidates[0]
        else:
            # Среди дубликатов ищет ДУ с наибольшим количеством pipeIds
            return max(candidates, key=lambda candidate: len(candidate.pipeIds))
    except (AttributeError, KeyError, RuntimeError, TypeError, ValueError):
        return None


def find_diagnostic_area_by_pipe_id(flow_areas: List[FlowArea], pipe_id: int) -> Optional[DiagnosticArea]:
    """
    Ищет ДУ по pipe id в списке участков карты течений, исключает дубликаты по количеству pipeIds
    """
    candidates = []
    if not flow_areas:
        return None
    try:
        for flow_area in flow_areas:
            for diagnostic_area in flow_area.diagnosticAreas:
                if diagnostic_area.pipeIds and pipe_id in diagnostic_area.pipeIds:
                    candidates.append(diagnostic_area)
        if not candidates:
            return None
        elif len(candidates) == 1:
            return candidates[0]
        else:
            # Среди дубликатов ищет ДУ с наибольшим количеством pipeIds
            return max(candidates, key=lambda candidate: len(candidate.pipeIds))
    except (AttributeError, KeyError, RuntimeError, TypeError, ValueError):
        return None


def find_diagnostic_areas_by_ids(flow_areas: List[FlowArea], id_list: List[int]) -> List[DiagnosticArea]:
    """
    Получает список ДУ из списка flow_areas по списку id
    """
    diagnostic_areas = [
        result
        for diagnostic_area_id in id_list
        if (result := find_diagnostic_area_by_id(flow_areas, diagnostic_area_id)) is not None
    ]

    return diagnostic_areas


def find_diagnostic_areas_by_pipe_ids(flow_areas: List[FlowArea], id_list: List[int]) -> List[DiagnosticArea]:
    """
    Получает список ДУ из списка flow_areas по списку pipe id
    """
    diagnostic_areas = [
        result for pipe_id in id_list if (result := find_diagnostic_area_by_pipe_id(flow_areas, pipe_id)) is not None
    ]

    return diagnostic_areas


def find_base_diagnostic_areas(flow_areas: List[FlowArea]) -> List[DiagnosticArea]:
    """
    Получает список базовых ДУ из списка flow_areas
    """
    return find_diagnostic_areas_by_ids(flow_areas, TestConst.DIAGNOSTIC_AREA_BASE_IDS)


def find_leak_by_coordinate(
    leaks_list: List[ObjectType], expected_coordinate: float, tolerance: float = TestConst.ALLOWED_DISTANCE_DIFF_METERS
) -> ObjectType:
    """
    Ищет утечку в списке по координатам с допустимой погрешностью
    поднимает pytest.fail если список пуст или утечка не найдена
    """
    if not leaks_list:
        fail("Список утечек пуст")

    for leak in leaks_list:
        leak_coordinate = getattr(leak, "leakCoordinate")
        if leak_coordinate is None or "":
            continue
        if abs(leak_coordinate - expected_coordinate) <= tolerance:
            return leak

    fail(
        f"Не найдена утечка с координатой {expected_coordinate} +- {tolerance} м"
        f"Список полученных утечек: {leaks_list}"
    )


def to_moscow_timezone(date_str: str) -> datetime:
    """
    Преобразует строку времени в московское время
    """
    if not date_str or not date_str.strip():
        fail("Пришло пустое значение для преобразования в московское время")

    try:
        if date_str.startswith(("'", '"', '')) or date_str.endswith(("'", '"', '')):
            date_str = date_str.strip().strip("'").strip('"')

        date_utc = datetime.strptime(date_str, TestConst.OUTPUT_TIME_FORMAT).replace(tzinfo=timezone.utc)
        return date_utc.astimezone(ZoneInfo(TestConst.ZONE_INFO))

    except (AttributeError, TypeError, ValueError):
        fail(f"Не удалось преобразовать время в московское: {date_str}.")


def create_dict_from_dataclass(cls: Type, **kwargs) -> dict:
    """Создает словарь из экземпляра dataclass c нужными параметрами"""
    if not is_dataclass(cls):
        fail(f"{cls} не dataclass")
    instance = cls(**kwargs)
    return asdict(instance)


def datetime_to_msgpack_timestamp(dt: datetime) -> list:
    """Конвертирует datetime в формат [Timestamp(seconds, nanoseconds), tz_offset] для отправки на бэкенд."""
    return [MsgpackTimestamp(seconds=int(dt.timestamp()), nanoseconds=0), 0]


def create_journal_req_body(**kwargs) -> dict:
    """Создает дефолтные параметры запроса к журналу"""
    result = create_dict_from_dataclass(GetMessagesRequest, **kwargs)
    period = result.get('periodTime')
    if period:
        for key in ('start', 'end'):
            if isinstance(period.get(key), datetime):
                period[key] = datetime_to_msgpack_timestamp(period[key])
    return result


def extract_first_number(value: object) -> Optional[float]:
    """
    Извлекает первое число из ячейки (int/float/str вида '55.55 км', '111.11 м3/ч').
    """
    if value is None:
        return None
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        return float(value)
    if isinstance(value, str):
        matches = re.findall(TestConst.DIGITS_WITH_DOT_PATTERN, value)
        if matches:
            try:
                return float(matches[0].replace(",", "."))
            except ValueError:
                return None
    return None


def _attach_ws_poll_failure(
    collected_messages: List[Any],
    total_wait_seconds: float,
    expected_message_type: str,
) -> None:
    """Краткая сводка и pprint каждого WS-сообщения при таймауте поллинга."""
    allure.attach(
        "\n".join(
            [
                f"Таймаут ожидания: {total_wait_seconds} с",
                f"Ожидаемый тип сообщения: {expected_message_type}",
                f"Всего сообщений за период поллинга: {len(collected_messages)}",
            ]
        ),
        name="WS poll timeout",
        attachment_type=allure.attachment_type.TEXT,
    )
    for msg in collected_messages:
        allure.attach(
            pprint.pformat(msg, width=120, sort_dicts=False),
            name="received ws message",
            attachment_type=allure.attachment_type.TEXT,
        )


def _attach_ws_reply_parse_failure(
    reply_payload: Optional[Any],
    invocation_id: str,
    request_name: str,
    error: BaseException,
) -> None:
    """Прикрепляет к Allure ответ бэка при ошибке парсинга."""
    allure.attach(
        "\n".join(
            [
                f"Запрос: {request_name}",
                f"invocation_id: {invocation_id}",
                f"Ошибка: {error}",
            ]
        ),
        name="WS parse failure",
        attachment_type=allure.attachment_type.TEXT,
    )
    if reply_payload is not None:
        allure.attach(
            pprint.pformat(reply_payload, width=120, sort_dicts=False),
            name="received ws message",
            attachment_type=allure.attachment_type.TEXT,
        )


def _drain_recv_queue(ws_client: WebSocketClient) -> List[Any]:
    """Забирает все сообщения из очереди ws без блокирующего receive_by_..."""
    messages: List[Any] = []
    while not ws_client.recv_queue.empty():
        try:
            messages.append(ws_client.recv_queue.get_nowait())
        except asyncio.QueueEmpty:
            break
    return messages


def _find_valid_report_export_notification(
    messages: List[Any],
    parser,
    notification_type: str,
) -> Optional[ReportDataExportedNotification]:
    """Ищет среди уже полученных сообщений успешную ReportDataExportedNotification."""
    for msg in messages:
        if not isinstance(msg, list) or not is_desired_type(msg, notification_type):
            continue
        try:
            notification = parser.parse_report_data_exported_notification_msg(msg)
        except (ValueError, TypeError, KeyError):
            continue
        if notification.replyStatus != ReplyStatus.OK.value:
            continue
        content: Optional[ReportDataExportedContent] = notification.replyContent
        if content is None:
            continue
        if content.exportStatus != ExportStatus.DONE:
            continue
        return notification
    return None


async def poll_for_report_export_notification(
    ws_client: WebSocketClient,
    parser,
    total_wait_seconds: float,
    poll_interval_seconds: float,
) -> Optional[Any]:
    """
    Собирает сообщения из очереди ws и ищет ReportDataExportedNotification с успешным exportStatus.
    При таймауте прикрепляет к Allure все полученные за период сообщения.
    """

    deadline = asyncio.get_event_loop().time() + total_wait_seconds
    collected_messages: List[Any] = []
    ws_client.suppress_recv_logging = True
    parser.suppress_recv_logging = True
    try:
        while asyncio.get_event_loop().time() < deadline:
            await asyncio.sleep(poll_interval_seconds)
            batch = _drain_recv_queue(ws_client)
            collected_messages.extend(batch)
            notification = _find_valid_report_export_notification(
                batch, parser, ReportConst.REPORT_DATA_EXPORTED_NOTIFICATION
            )
            if notification is not None:
                return notification
    finally:
        collected_messages.extend(_drain_recv_queue(ws_client))
        ws_client.suppress_recv_logging = False
        parser.suppress_recv_logging = False

    _attach_ws_poll_failure(
        collected_messages,
        total_wait_seconds,
        ReportConst.REPORT_DATA_EXPORTED_NOTIFICATION,
    )
    return None


def _find_ws_reply_by_invocation_id(messages: List[Any], invocation_id: str, parser) -> Optional[list]:
    """
    Ищет последний ответ с заданным invocation_id и телом replyStatus.
    """
    reply_payload = None
    for msg in messages:
        if not isinstance(msg, list) or not is_desired_invocation_id(msg, invocation_id):
            continue
        if parser.find_reply_status_in_ws_msg(msg):
            reply_payload = msg
    return reply_payload


async def poll_for_exported_file(
    ws_client: WebSocketClient,
    parser,
    list_limit: int,
    expected_data_type: Any,
    name_substring: str,
    tu_name_substring: str,
    period_start: datetime,
    period_end: datetime,
    total_wait_seconds: float,
    poll_interval_seconds: float,
    period_tolerance_minutes: int = ReportConst.REPORT_PERIOD_TOLERANCE_MINUTES,
) -> Optional[Any]:
    """
    Периодически шлёт GetExportedDataListRequest, забирает ответы из очереди
    по invocation_id среди всех накопленных сообщений.
    При таймауте или ошибке парсинга прикрепляет к Allure полученные ответы.
    """

    deadline = asyncio.get_event_loop().time() + total_wait_seconds
    last_items_count = -1
    collected_messages: List[Any] = []
    request_name = ReportConst.GET_EXPORTED_DATA_LIST_REQUEST
    ws_client.suppress_recv_logging = True
    parser.suppress_recv_logging = True
    try:
        while asyncio.get_event_loop().time() < deadline:
            drained_before_request = _drain_recv_queue(ws_client)
            collected_messages.extend(drained_before_request)
            await connect(
                ws_client,
                request_name,
                {"limit": list_limit},
            )
            invocation_id = ws_client.invocation_id
            await asyncio.sleep(poll_interval_seconds)

            batch = _drain_recv_queue(ws_client)
            collected_messages.extend(batch)
            list_reply_payload = _find_ws_reply_by_invocation_id(batch, invocation_id, parser)

            if list_reply_payload is None:
                continue

            try:
                parsed_payload = parser.parse_exported_data_list_msg(list_reply_payload)
            except Exception as error:
                _attach_ws_reply_parse_failure(list_reply_payload, invocation_id, request_name, error)
                for msg in collected_messages:
                    allure.attach(
                        pprint.pformat(msg, width=120, sort_dicts=False),
                        name="received ws message",
                        attachment_type=allure.attachment_type.TEXT,
                    )
                fail(f"Не удалось разобрать ответ на {request_name}: {error}")

            items = []
            if parsed_payload.replyContent is not None:
                items = parsed_payload.replyContent.exportedData or []

            if len(items) != last_items_count:
                allure.attach(
                    "\n".join(
                        f"id={item.id}, name={item.name}, type={item.exportedDataType}, "
                        f"start={format_datetime_moscow(item.start)}, end={format_datetime_moscow(item.end)}"
                        for item in items
                    ),
                    name=f"Список сформированных файлов (всего: {len(items)})",
                    attachment_type=allure.attachment_type.TEXT,
                )
                last_items_count = len(items)

            match = find_matching_exported_item(
                items=items,
                expected_data_type=expected_data_type,
                name_substring=name_substring,
                tu_name_substring=tu_name_substring,
                period_start=period_start,
                period_end=period_end,
                period_tolerance_minutes=period_tolerance_minutes,
            )
            if match is not None:
                return match
    finally:
        collected_messages.extend(_drain_recv_queue(ws_client))
        ws_client.suppress_recv_logging = False
        parser.suppress_recv_logging = False

    _attach_ws_poll_failure(
        collected_messages,
        total_wait_seconds,
        request_name,
    )
    return None


def _normalize_report_period_datetime(value: datetime) -> datetime:
    """Приводит datetime периода отчёта к московскому времени без микросекунд."""
    return localize_as_moscow(value).replace(microsecond=0)


def _exported_item_period_matches(
    item_start: datetime,
    item_end: datetime,
    period_start: datetime,
    period_end: datetime,
    tolerance_minutes: int,
) -> bool:
    """Проверяет start/end элемента списка в пределах периода запроса +- tolerance_minutes."""
    item_start_norm = _normalize_report_period_datetime(item_start)
    item_end_norm = _normalize_report_period_datetime(item_end)
    period_start_norm = _normalize_report_period_datetime(period_start)
    period_end_norm = _normalize_report_period_datetime(period_end)
    delta = timedelta(minutes=tolerance_minutes)
    return (period_start_norm - delta) <= item_start_norm <= (period_start_norm + delta) and (
        period_end_norm - delta
    ) <= item_end_norm <= (period_end_norm + delta)


def find_matching_exported_item(
    items: List[Any],
    expected_data_type: Any,
    name_substring: str,
    tu_name_substring: str,
    period_start: datetime,
    period_end: datetime,
    period_tolerance_minutes: int = ReportConst.REPORT_PERIOD_TOLERANCE_MINUTES,
) -> Optional[Any]:
    """
    Ищет элемент списка по типу, подстрокам в имени (отчёт + ТУ) и периоду start/end с допуском.
    """
    name_substring_lower = name_substring.lower()
    tu_name_lower = tu_name_substring.lower()

    matched_items = []
    for item in items:
        if item.exportedDataType != expected_data_type:
            continue
        item_name_lower = (item.name or "").lower()
        if name_substring_lower not in item_name_lower:
            continue
        if tu_name_lower not in item_name_lower:
            continue
        if item.start is None or item.end is None:
            continue
        if not _exported_item_period_matches(item.start, item.end, period_start, period_end, period_tolerance_minutes):
            continue
        matched_items.append(item)

    if not matched_items:
        return None
    return max(matched_items, key=lambda exported_item: exported_item.id)


def parse_journal_msg_value(value: str) -> tuple:
    """Парсит поле value в сообщении журнала"""
    try:
        # ищет группы цифр с точкой в строке
        matches = re.findall(TestConst.DIGITS_WITH_DOT_PATTERN, value)
        coordinate, volume = (matches + [None, None])[:2]
        if coordinate is not None:
            try:
                coordinate = float(coordinate)
            except ValueError:
                coordinate = None
        if volume is not None:
            try:
                volume = float(volume)
            except ValueError:
                volume = None
        return coordinate, volume
    except (AttributeError, TypeError, ValueError):
        return None, None


def parse_bit_flags(
    value: int, enum_cls: Type[IntEnum | IntFlag], failures: Optional[List[str]] = None
) -> List[IntFlag]:
    """
    Распаковка битовых флагов
    """
    # 0 - это валидное состояние когда причин нет, в прошлой реализации тест бы падал если причин нет
    # хотя это может быть ожидаемо, например при тестировании исправности или где-нибудь еще
    if value == 0:
        return []

    found_flags = [flag for flag in enum_cls if value & flag.value]
    known_bits = sum(flag.value for flag in found_flags)

    if known_bits != value:
        unknown_bits = value ^ known_bits
        error_message = f"Неизвестные биты при распаковке {enum_cls.__name__}: {unknown_bits}"
        if failures is not None:
            failures.append(error_message)
        else:
            fail(f"Неизвестные биты при распаковке {enum_cls.__name__}: {unknown_bits}")

    # та же сортировка только не цифры, а их текстовое значение
    return sorted(found_flags, key=lambda flag: flag.value)


def get_reason_enum_by_lds_status(lds_status: int | LdsStatus, failures: Optional[List[str]] = None) -> Type[IntFlag]:
    """
    Получение класса причин по режимам СОУ
    """

    if isinstance(lds_status, int):
        try:
            lds_status = LdsStatus(lds_status)
        except ValueError:
            error_message = f"Неизвестный LdsStatus: {lds_status}"
            if failures is not None:
                failures.append(error_message)
            else:
                fail(error_message)

    reason_by_lds_status = {
        LdsStatus.FAULTY: LdsStatusFaulty,
        LdsStatus.INITIALIZATION: LdsStatusInitialization,
        LdsStatus.DEGRADATION: LdsStatusDegradation,
    }
    enum_class = reason_by_lds_status.get(lds_status)
    if enum_class is None:
        error_message = f"Для LdsStatus{lds_status.name} не определены причины"
        if failures is not None:
            failures.append(error_message)
        else:
            fail(error_message)
    return enum_class


def get_reason_enum_by_stationary_status(
    stationary_status: int | StationaryStatus, failures: Optional[List[str]] = None
) -> Type[IntFlag]:
    """
    Получение класса причин по режимам МТ
    """

    if isinstance(stationary_status, int):
        try:
            stationary_status = StationaryStatus(stationary_status)
        except ValueError:
            error_message = f"Неизвестный StationaryStatus: {stationary_status}"
            if failures is not None:
                failures.append(error_message)
            else:
                fail(error_message)

    reason_by_stationary_status = {
        StationaryStatus.STATIONARY: StationaryReason,
        StationaryStatus.UNSTATIONARY: UnStationaryReason,
        StationaryStatus.STOPPED: StoppedPumpingReason,
    }
    enum_class = reason_by_stationary_status.get(stationary_status)
    if enum_class is None:
        error_message = f"Для StationaryStatus{stationary_status.name} не определены причины"
        if failures is not None:
            failures.append(error_message)
        else:
            fail(error_message)
    return enum_class


def parse_lds_status_reasons(lds_status: int, lds_status_reasons: int, failures: Optional[List[str]] = None):
    """
    Получение списка ldsStatusReasons, соответствующего ldsStatus
    """
    enum_cls = get_reason_enum_by_lds_status(lds_status, failures)
    flags = parse_bit_flags(lds_status_reasons, enum_cls, failures)
    return flags


def parse_stationary_status_reasons(
    stationary_status: int, stationary_status_reasons: int, failures: Optional[List[str]] = None
):
    """
    Получение списка stationaryStatusReasons, соответствующего stationaryStatus
    """
    enum_cls = get_reason_enum_by_stationary_status(stationary_status, failures)
    flags = parse_bit_flags(stationary_status_reasons, enum_cls, failures)
    return flags


async def connect(ws_client: WebSocketClient, ws_invoke_type: str, ws_invoke_params: Any = None) -> None:
    """
    Подключение к заданной подписке
    """
    try:
        with allure.step(f"Вызов {ws_invoke_type} c параметрами {ws_invoke_params}"):
            await ws_client.invoke(ws_invoke_type, ws_invoke_params)
    except (asyncio.TimeoutError, ConnectionError, ConnectionResetError, OSError) as error:
        fail(f"Не удалось отправить сообщение типа: {ws_invoke_type} c параметрами {ws_invoke_params}. Ошибка: {error}")


async def connect_stream(
    ws_client: WebSocketClient,
    ws_invoke_type: str,
    ws_invoke_params: Any = None,
    purpose: str = "streaming-вызов WS",
) -> None:
    """
    Streaming-вызов (StreamInvocation)
    """
    try:
        with allure.step(f"Streaming-вызов {ws_invoke_type} c параметрами {ws_invoke_params}"):
            await ws_client.invoke_stream(ws_invoke_type, ws_invoke_params)
    except (asyncio.TimeoutError, ConnectionError, ConnectionResetError, OSError) as error:
        fail(
            f"Не удалось выполнить {purpose} ({ws_invoke_type}, StreamInvocation). "
            f"Параметры запроса: {ws_invoke_params}. Ошибка соединения: {error}"
        )


def _stream_completion_error(msg: Any, invocation_id: str) -> Optional[str]:
    """Текст ошибки из ответа SignalR Completion для данного invocation_id, если есть."""
    if not isinstance(msg, list):
        return None
    if msg[0] != WS_Const.COMPLETION_MESSAGE_TYPE:
        return None
    if not is_desired_invocation_id(msg, invocation_id):
        return None
    if len(msg) <= WS_Const.COMPLETION_ERROR_MESSAGE_INDEX:
        return None
    error_text = msg[WS_Const.COMPLETION_ERROR_MESSAGE_INDEX]
    if isinstance(error_text, str):
        return error_text
    return None


async def receive_download_exported_data_reply(
    ws_client: WebSocketClient,
    parser,
    invocation_id: str,
    request_name: str,
    total_wait_seconds: float,
    poll_interval_seconds: float = 0.5,
    purpose: str = "скачивании xlsx-отчёта после выбора файла в списке сформированных отчётов",
) -> Any:
    """
    Ожидает StreamItem с fileChunk после streaming DownloadExportedDataRequest.
    """
    deadline = asyncio.get_event_loop().time() + total_wait_seconds
    collected_messages: List[Any] = []
    ws_client.suppress_recv_logging = True
    parser.suppress_recv_logging = True
    try:
        while asyncio.get_event_loop().time() < deadline:
            await asyncio.sleep(poll_interval_seconds)
            batch = _drain_recv_queue(ws_client)
            collected_messages.extend(batch)

            for msg in batch:
                stream_error = _stream_completion_error(msg, invocation_id)
                if stream_error:
                    _attach_ws_reply_parse_failure(msg, invocation_id, request_name, RuntimeError(stream_error))
                    fail(
                        f"При {purpose} бэк вернул Completion с ошибкой "
                        f"({request_name}, invocation_id={invocation_id}): {stream_error}"
                    )

            for msg in batch:
                if (
                    not isinstance(msg, list)
                    or msg[0] != WS_Const.STREAM_ITEM_MESSAGE_TYPE
                    or not is_desired_invocation_id(msg, invocation_id)
                ):
                    continue
                if parser.find_reply_status_in_ws_msg(msg) is None:
                    continue
                try:
                    return parser.parse_download_exported_data_msg(msg)
                except Exception as error:
                    _attach_ws_reply_parse_failure(msg, invocation_id, request_name, error)
                    fail(
                        f"При {purpose} получен StreamItem ({request_name}, invocation_id={invocation_id}), "
                        f"но не удалось разобрать ответ с fileChunk: {error}"
                    )
    finally:
        collected_messages.extend(_drain_recv_queue(ws_client))
        ws_client.suppress_recv_logging = False
        parser.suppress_recv_logging = False

    _attach_ws_poll_failure(collected_messages, total_wait_seconds, f"{request_name} (StreamItem)")
    fail(
        f"При {purpose} за {total_wait_seconds} с не получен StreamItem с fileChunk "
        f"({request_name}, invocation_id={invocation_id}). Смотреть вложения received ws message"
    )


async def connect_and_get_parsed_msg_by_tu_id(
    tu_id: int,
    ws_client: WebSocketClient,
    ws_message_type: str,
    ws_invoke_type: str,
    ws_invoke_params: Any = None,
    timeout: float = TestConst.BASIC_MESSAGE_TIMEOUT,
) -> SubscribeAllLeaksInfoReply:
    """
    Подключается, ищет и парсит allLeaksInfo сообщение для конкретного ТУ
    """
    await connect(ws_client, ws_invoke_type, ws_invoke_params)

    async def get_parsed_msg():
        """
        Ищет и парсит allLeaksInfo сообщение для конкретного ТУ
        """
        while True:
            payload = await ws_client.receive_by_type(ws_message_type, timeout=timeout)
            parsed_payload = ws_message_parser.parse_all_leaks_info_msg(payload)
            #  Ищет сообщение с нужным ТУ
            if parsed_payload.replyContent.tuId == tu_id:
                return parsed_payload

    try:
        with allure.step(f"Получение сообщения с контентом типа: {ws_message_type} для ТУ {tu_id}"):
            return await asyncio.wait_for(get_parsed_msg(), timeout=timeout)
    except (asyncio.TimeoutError, ConnectionError, ConnectionResetError, OSError) as error:
        fail(f"Не удалось получить сообщение allLeaksInfo для ТУ {tu_id}. Ошибка: {error}")


async def connect_and_get_msg(ws_client: WebSocketClient, ws_invoke_type: str, ws_invoke_params: Any = None) -> list:
    """
    Подключение типа get к заданной подписке и получение сообщения с заданным типом контента
    """
    await connect(ws_client, ws_invoke_type, ws_invoke_params)
    invocation_id = ws_client.invocation_id

    try:
        with allure.step(f"Получение входящего сообщения c invocation_id: {invocation_id}"):
            payload = await ws_client.receive_by_invocation_id(invocation_id)
        return payload
    except (asyncio.TimeoutError, ConnectionError, ConnectionResetError, OSError) as error:
        fail(f"Не удалось получить сообщение типа: {ws_invoke_type}. Ошибка: {error}")


async def connect_and_subscribe_msg(
    ws_client: WebSocketClient,
    ws_message_type: str,
    ws_invoke_type: str,
    ws_invoke_params: Any = None,
    timeout: float = TestConst.BASIC_MESSAGE_TIMEOUT,
) -> list:
    """
    Подключение типа subscribe к заданной подписке и получение сообщения с заданным типом контента
    """
    await connect(ws_client, ws_invoke_type, ws_invoke_params)
    try:
        with allure.step(f"Получение сообщения с контентом типа: {ws_message_type}"):
            payload = await ws_client.receive_by_type(ws_message_type, timeout=timeout)

        return payload
    except (asyncio.TimeoutError, OSError, ConnectionError, ConnectionResetError) as error:
        fail(f"Не удалось получить сообщение типа: {ws_invoke_type}. Ошибка: {error}")


async def poll_balance_algorithm_diagnostic_areas(
    ws_client: WebSocketClient,
    ws_parser: ws_message_parser,
    imitator_start_time: datetime,
    end_time: datetime,
    poll_interval: float,
) -> list:
    """
    Опрашивает очередь ws_client на наличие BalanceAlgorithmResultsContent,
    собирает и возвращает все diagnosticAreas из flowAreas.
    """
    collected_diagnostic_areas = []
    ws_client.suppress_recv_logging = True
    ws_parser.suppress_recv_logging = True
    try:
        while datetime.now(tz=imitator_start_time.tzinfo) < end_time:
            await asyncio.sleep(poll_interval)

            latest_msg = None
            while not ws_client.recv_queue.empty():
                try:
                    msg = ws_client.recv_queue.get_nowait()
                except asyncio.QueueEmpty:
                    break
                if isinstance(msg, list) and is_desired_type(msg, "BalanceAlgorithmResultsContent"):
                    latest_msg = msg

            if latest_msg is None:
                continue

            parsed_payload = ws_message_parser.parse_balance_algorithm_msg(latest_msg)
            reply_content = parsed_payload.replyContent
            if reply_content and reply_content.flowAreas:
                for flow_area in reply_content.flowAreas:
                    if flow_area.diagnosticAreas:
                        collected_diagnostic_areas.extend(flow_area.diagnosticAreas)
    finally:
        ws_client.suppress_recv_logging = False
        ws_parser.suppress_recv_logging = False

    return collected_diagnostic_areas


def get_leak_diagnostic_area_samples(
    collected_diagnostic_areas: list,
    leak_diagnostic_area_name: str,
    total_wait: int,
) -> list:
    """
    Проверяет наличие diagnosticAreas и возвращает подмножество
    для ДУ с заданным leak_diagnostic_area_id. Падает, если данные не найдены.
    """
    if not collected_diagnostic_areas:
        fail(f"За {total_wait} секунд не пришло ни одной diagnosticArea в BalanceAlgorithmResultsContent")

    leak_diagnostic_area_samples = [
        diagnostic_area
        for diagnostic_area in collected_diagnostic_areas
        if diagnostic_area.name == leak_diagnostic_area_name
    ]
    if not leak_diagnostic_area_samples:
        fail(f"За {total_wait} секунд не пришло ни одного сообщения для ДУ с name={leak_diagnostic_area_name}.")
    return leak_diagnostic_area_samples