Загрузка данных


from __future__ import annotations

import asyncio
import random
import re
from dataclasses import asdict, is_dataclass
from datetime import datetime, timedelta, timezone
from enum import IntEnum, IntFlag
from typing import Any, List, Optional, Set, Type, TypeVar
from zoneinfo import ZoneInfo

import allure
from msgpack import Timestamp as MsgpackTimestamp
from pytest import fail

from clients.websocket_client import WebSocketClient
from constants.enums import (
    ConfirmationStatus,
    LdsStatus,
    LdsStatusDegradation,
    LdsStatusFaulty,
    LdsStatusInitialization,
    LeakStatus,
    StationaryReason,
    StationaryStatus,
    StoppedPumpingReason,
    UnStationaryReason,
)
from constants.test_constants import BaseTN3Constants as TestConst
from models.get_messages_model import GetMessagesRequest
from models.subscribe_all_leaks_info_model import SubscribeAllLeaksInfoReply
from models.subscribe_common_scheme_model import DiagnosticArea, FlowArea
from models.subscribe_leaks_model import Leak
from models.subscribe_main_page_info_model import MainPageLeakInfo
from utils.helpers.ws_message_parser import ws_message_parser
from utils.msgpack_utils.message_filters import is_desired_type

ObjectType = TypeVar("ObjectType")  # создает типовую переменную для поиска объектов в списке
RandomObjectType = TypeVar("RandomObjectType")


def convert_leak_volume_m3(volume: float) -> float:
    """
    Преобразует объем утечки в м3/час
    """
    #  Округляет результат для читабельности
    return round(volume * TestConst.MASS_KG, 3)


def datetime_minus_seconds(datetime_obj: datetime, delta_s: int) -> datetime:
    """
    Вычитает время в секундах из datetime
    """
    return (datetime_obj - timedelta(seconds=delta_s)).replace(microsecond=0)


def calculate_leak_start_time(imitator_start_time: datetime, leak_interval_seconds: int) -> datetime:
    """
    Рассчитывает время начала утечки на основе времени старта имитатора.

    :param imitator_start_time: datetime объект времени старта имитатора
    :param leak_interval_seconds: интервал от старта до утечки в секундах (LEAK_START_INTERVAL)
    :return: datetime время ожидаемого начала утечки
    """
    if not imitator_start_time:
        fail("Пришло пустое значение imitator_start_time")
    return (imitator_start_time + timedelta(seconds=leak_interval_seconds)).replace(microsecond=0)


def calculate_leak_end_time(
    imitator_start_time: datetime, leak_interval_seconds: int, allowed_diff_seconds: int
) -> datetime:
    """
    Рассчитывает крайнее время обнаружения утечки (с учётом допустимой погрешности).

    :param imitator_start_time: datetime объект времени старта имитатора
    :param leak_interval_seconds: интервал от старта до утечки в секундах (LEAK_START_INTERVAL)
    :param allowed_diff_seconds: допустимая погрешность времени обнаружения (ALLOWED_TIME_DIFF_SECONDS)
    :return: datetime крайнее время обнаружения утечки
    """
    if not imitator_start_time:
        fail("Пришло пустое значение imitator_start_time")
    total_seconds = leak_interval_seconds + allowed_diff_seconds
    return (imitator_start_time + timedelta(seconds=total_seconds)).replace(microsecond=0)


def get_leak_time_window(
    imitator_start_time: datetime, leak_interval_seconds: int, allowed_diff_seconds: int, detected_at_tz=None
) -> tuple[datetime, datetime]:
    """
    Возвращает временное окно для проверки времени обнаружения утечки.

    :param imitator_start_time: datetime объект времени старта имитатора
    :param leak_interval_seconds: интервал от старта до утечки в секундах
    :param allowed_diff_seconds: допустимая погрешность времени обнаружения
    :param detected_at_tz: timezone из времени обнаружения утечки (опционально)
    :return: tuple (leak_start_time, leak_end_time) для использования в is_between проверке
    """
    leak_start = calculate_leak_start_time(imitator_start_time, leak_interval_seconds)
    leak_end = calculate_leak_end_time(imitator_start_time, leak_interval_seconds, allowed_diff_seconds)

    # Если передан timezone, применяем его к временам для корректного сравнения
    if detected_at_tz:
        leak_start = leak_start.replace(tzinfo=detected_at_tz)
        leak_end = leak_end.replace(tzinfo=detected_at_tz)

    return leak_start, leak_end


def ensure_moscow_timezone(input_datetime: datetime) -> None | datetime:
    """
    Конвертирует datetime в московское время, если оно не в московской таймзоне.

    :param input_datetime: datetime объект
    :return: datetime в московской таймзоне
    """
    if input_datetime is None:
        return input_datetime

    # Если datetime без timezone - считаем что это UTC
    if input_datetime.tzinfo is None:
        input_datetime = input_datetime.replace(tzinfo=timezone.utc)

    # Конвертируем в московское время
    return input_datetime.astimezone(ZoneInfo(TestConst.ZONE_INFO))


def localize_as_moscow(input_datetime: datetime) -> None | datetime:
    """
    Присваивает datetime московский часовой пояс без сдвига времени.
    Если datetime уже имеет timezone - конвертирует в московское время.
    """
    if input_datetime is None:
        return input_datetime

    moscow_tz = ZoneInfo(TestConst.ZONE_INFO)
    if input_datetime.tzinfo is None:
        return input_datetime.replace(tzinfo=moscow_tz)
    return input_datetime.astimezone(moscow_tz)


def get_rejection_time_window(
    imitator_start_time: datetime,
    start_seconds: int | float,
    reserve_seconds: int | float = 0,
) -> tuple[datetime, datetime]:
    """
    Возвращает временное окно для проверки сообщения об отбраковке.
    """
    imitator_msk = localize_as_moscow(imitator_start_time)
    range_start = imitator_msk + timedelta(seconds=start_seconds - reserve_seconds)
    range_end = localize_as_moscow(datetime.now())
    return range_start, range_end


def find_rejection_journal_message(
    messages_info: List[ObjectType],
    tag: str,
    range_start: datetime,
    range_end: datetime,
    technological_section: str,
    expected_event: str,
) -> tuple[list[ObjectType], ObjectType | None]:
    """
    Фильтрует сообщения журнала по tag и временному диапазону,
    затем ищет целевое сообщение по technologicalSection и event.
    """
    time_filtered = [
        msg for msg in messages_info if msg.tag == tag and range_start <= ensure_moscow_timezone(msg.time) <= range_end
    ]
    time_filtered.sort(key=lambda msg: ensure_moscow_timezone(msg.time), reverse=True)

    target_msg = next(
        (
            msg
            for msg in time_filtered
            if msg.technologicalSection == technological_section and msg.event.rstrip() == expected_event
        ),
        None,
    )
    return time_filtered, target_msg


def get_random_item(item_list: List[RandomObjectType]) -> RandomObjectType:
    """
    Получает случайный объект из списка
    """
    if not item_list:
        fail("Пустой список объектов")
    try:
        return random.choice(item_list)
    except (TypeError, ValueError):
        fail(f"Не удалось получить случайный элемент из списка: {item_list}")


def get_longest_flow_area(flow_areas: List[FlowArea]) -> FlowArea:
    """
    Получает самый протяженный участок карты течения по количеству ДУ из списка всех участков
    """
    if not flow_areas:
        fail("Список flow_areas пустой")
    try:
        longest_flow_area = max(flow_areas, key=lambda flow_area: len(flow_area.diagnosticAreas))
        return longest_flow_area
    except (TypeError, ValueError):
        fail(f"Не найден протяженный участок из списка flow_areas: {flow_areas}.")


def determine_lds_status_by_priority(lds_status_set: Set[int]) -> int:
    """
    Определяет режим работы СОУ по приоритету и наличию режимов работы у ДУ на самом протяженном участки карты течений
    """
    lds_status_priority = [
        LdsStatus.FAULTY.value,
        LdsStatus.INITIALIZATION.value,
        LdsStatus.DEGRADATION.value,
        LdsStatus.SERVICEABLE.value,
    ]
    if not lds_status_set:
        fail("Пустой список режимов СОУ ДУ")
    try:
        for status in lds_status_priority:
            if status in lds_status_set:
                return status
    except (AttributeError, KeyError, RuntimeError, TypeError, ValueError):
        fail("Не удалось определить режим работу СОУ.")


def find_signal_type_by_address_suffix(signals_list: list, address_suffix: str) -> int:
    """
    Ищет в списке сигналов тип сигнала по части адреса
    """
    if not signals_list:
        fail("Пустой список сигналов")
    try:
        for sensor_signal in signals_list:
            if sensor_signal.address is not None and str(sensor_signal.address).endswith(address_suffix):
                return sensor_signal.signalType
        fail(f"Не найден тип сигнала по части адреса: {address_suffix} из списка: {signals_list}")
    except (AttributeError, KeyError, RuntimeError, TypeError, ValueError):
        fail(f"Не найден тип сигнала по части адреса: {address_suffix} из списка: {signals_list}")


def find_signal_val_by_signal_type(signals_list: list, signal_type: int) -> str:
    """
    Ищет в списке сигналов значение сигнала по типу
    """
    if not signals_list:
        fail("Пустой список сигналов")
    try:
        for sensor_signal in signals_list:
            if sensor_signal.signalType is not None and sensor_signal.signalType == signal_type:
                return sensor_signal.value
        fail(f"Не найдено значение для типа сигнала: {signal_type} из списка: {signals_list}")
    except (AttributeError, KeyError, RuntimeError, TypeError, ValueError):
        fail(f"Не найдено значение для типа сигнала: {signal_type} из списка: {signals_list}")


def find_object_by_field(item_list: List[ObjectType], field_name: str, value: Any) -> ObjectType:
    """
    Ищет объект в списке объектов по значению одного из полей объекта
    """
    if not item_list:
        fail("Список объектов пуст")
    try:
        return next((item for item in item_list if getattr(item, field_name) == value))
    except Exception:
        fail(f"Не найдено значение: {value} для поля: {field_name}, в списке: {item_list}.")


def find_confirmed_leaks(item_list: List[Leak]) -> List[Leak]:
    """Ищет подтвержденные утечки"""
    try:
        return [
            item
            for item in item_list
            if item.confirmationStatus == ConfirmationStatus.CONFIRMED.value and item.detectedAt is not None
        ]
    except (AttributeError, KeyError, TypeError, ValueError):
        return []


def find_confirmed_leaks_on_main_page(item_list: List[MainPageLeakInfo]) -> List[MainPageLeakInfo]:
    """Ищет подтвержденные утечки"""
    try:
        return [
            item
            for item in item_list
            if item.leakStatus == LeakStatus.CONFIRMED.value and item.leakDetectedAt is not None
        ]
    except (AttributeError, KeyError, TypeError, ValueError):
        return []


def find_diagnostic_area_by_id(flow_areas: List[FlowArea], id_value: int) -> Optional[DiagnosticArea]:
    """
    Ищет ДУ по id в списке участков карты течений, исключает дубликаты по количеству pipeIds
    """
    candidates = []
    if not flow_areas:
        return None
    try:
        for flow_area in flow_areas:
            for diagnostic_area in flow_area.diagnosticAreas:
                if diagnostic_area.id == id_value:
                    candidates.append(diagnostic_area)
        if not candidates:
            return None
        elif len(candidates) == 1:
            return candidates[0]
        else:
            # Среди дубликатов ищет ДУ с наибольшим количеством pipeIds
            return max(candidates, key=lambda candidate: len(candidate.pipeIds))
    except (AttributeError, KeyError, RuntimeError, TypeError, ValueError):
        return None


def find_diagnostic_area_by_pipe_id(flow_areas: List[FlowArea], pipe_id: int) -> Optional[DiagnosticArea]:
    """
    Ищет ДУ по pipe id в списке участков карты течений, исключает дубликаты по количеству pipeIds
    """
    candidates = []
    if not flow_areas:
        return None
    try:
        for flow_area in flow_areas:
            for diagnostic_area in flow_area.diagnosticAreas:
                if diagnostic_area.pipeIds and pipe_id in diagnostic_area.pipeIds:
                    candidates.append(diagnostic_area)
        if not candidates:
            return None
        elif len(candidates) == 1:
            return candidates[0]
        else:
            # Среди дубликатов ищет ДУ с наибольшим количеством pipeIds
            return max(candidates, key=lambda candidate: len(candidate.pipeIds))
    except (AttributeError, KeyError, RuntimeError, TypeError, ValueError):
        return None


def find_diagnostic_areas_by_ids(flow_areas: List[FlowArea], id_list: List[int]) -> List[DiagnosticArea]:
    """
    Получает список ДУ из списка flow_areas по списку id
    """
    diagnostic_areas = [
        find_diagnostic_area_by_id(flow_areas, diagnostic_area_id)
        for diagnostic_area_id in id_list
        if find_diagnostic_area_by_id(flow_areas, diagnostic_area_id) is not None
    ]
    return diagnostic_areas


def find_diagnostic_areas_by_pipe_ids(flow_areas: List[FlowArea], id_list: List[int]) -> List[DiagnosticArea]:
    """
    Получает список ДУ из списка flow_areas по списку pipe id
    """
    diagnostic_areas = [
        find_diagnostic_area_by_pipe_id(flow_areas, pipe_id)
        for pipe_id in id_list
        if find_diagnostic_area_by_id(flow_areas, pipe_id) is not None
    ]
    return diagnostic_areas


def find_base_diagnostic_areas(flow_areas: List[FlowArea]) -> List[DiagnosticArea]:
    """
    Получает список базовых ДУ из списка flow_areas
    """
    return find_diagnostic_areas_by_ids(flow_areas, TestConst.DIAGNOSTIC_AREA_BASE_IDS)


def find_representative_diagnostic_areas(flow_areas: List[FlowArea]) -> List[DiagnosticArea]:
    """
    Получает список показательных ДУ из списка flow_areas, для проверки режимов СОУ
    """
    return find_diagnostic_areas_by_ids(flow_areas, TestConst.REPRESENTATIVE_DIAGNOSTIC_AREA_IDS)


def find_leak_by_coordinate(
    leaks_list: List[ObjectType], expected_coordinate: float, tolerance: float = TestConst.ALLOWED_DISTANCE_DIFF_METERS
) -> ObjectType:
    """
    Ищет утечку в списке по координатам с допустимой погрешностью
    поднимает pytest.fail если список пуст или утечка не найдена
    """
    if not leaks_list:
        fail("Список утечек пуст")

    for leak in leaks_list:
        leak_coordinate = getattr(leak, "leakCoordinate")
        if leak_coordinate is None or "":
            continue
        if abs(leak_coordinate - expected_coordinate) <= tolerance:
            return leak

    fail(
        f"Не найдена утечка с координатой {expected_coordinate} +- {tolerance} м"
        f"Список полученных утечек: {leaks_list}"
    )


def to_moscow_timezone(date_str: str) -> datetime:
    """
    Преобразует строку времени в московское время
    """
    if not date_str or not date_str.strip():
        fail("Пришло пустое значение для преобразования в московское время")

    try:
        if date_str.startswith(("'", '"', '')) or date_str.endswith(("'", '"', '')):
            date_str = date_str.strip().strip("'").strip('"')

        date_utc = datetime.strptime(date_str, TestConst.OUTPUT_TIME_FORMAT).replace(tzinfo=timezone.utc)
        return date_utc.astimezone(ZoneInfo(TestConst.ZONE_INFO))

    except (AttributeError, TypeError, ValueError):
        fail(f"Не удалось преобразовать время в московское: {date_str}.")


def create_dict_from_dataclass(cls: Type, **kwargs) -> dict:
    """Создает словарь из экземпляра dataclass c нужными параметрами"""
    if not is_dataclass(cls):
        fail(f"{cls} не dataclass")
    instance = cls(**kwargs)
    return asdict(instance)


def datetime_to_msgpack_timestamp(dt: datetime) -> list:
    """Конвертирует datetime в формат [Timestamp(seconds, nanoseconds), tz_offset] для отправки на бэкенд."""
    return [MsgpackTimestamp(seconds=int(dt.timestamp()), nanoseconds=0), 0]


def create_journal_req_body(**kwargs) -> dict:
    """Создает дефолтные параметры запроса к журналу"""
    result = create_dict_from_dataclass(GetMessagesRequest, **kwargs)
    period = result.get('periodTime')
    if period:
        for key in ('start', 'end'):
            if isinstance(period.get(key), datetime):
                period[key] = datetime_to_msgpack_timestamp(period[key])
    return result


def parse_journal_msg_value(value: str) -> tuple:
    """Парсит поле value в сообщении журнала"""
    try:
        # ищет группы цифр с точкой в строке
        matches = re.findall(TestConst.DIGITS_WITH_DOT_PATTERN, value)
        coordinate, volume = (matches + [None, None])[:2]
        if coordinate is not None:
            try:
                coordinate = float(coordinate)
            except ValueError:
                coordinate = None
        if volume is not None:
            try:
                volume = float(volume)
            except ValueError:
                volume = None
        return coordinate, volume
    except (AttributeError, TypeError, ValueError):
        return None, None


def parse_bit_flags(
    value: int, enum_cls: Type[IntEnum | IntFlag], failures: Optional[List[str]] = None
) -> List[IntFlag]:
    """
    Распаковка битовых флагов
    """
    # 0 - это валидное состояние когда причин нет, в прошлой реализации тест бы падал если причин нет
    # хотя это может быть ожидаемо, например при тестировании исправности или где-нибудь еще
    if value == 0:
        return []

    found_flags = [flag for flag in enum_cls if value & flag.value]
    known_bits = sum(flag.value for flag in found_flags)

    if known_bits != value:
        unknown_bits = value ^ known_bits
        error_message = f"Неизвестные биты при распаковке {enum_cls.__name__}: {unknown_bits}"
        if failures is not None:
            failures.append(error_message)
        else:
            fail(f"Неизвестные биты при распаковке {enum_cls.__name__}: {unknown_bits}")

    # та же сортировка только не цифры, а их текстовое значение
    return sorted(found_flags, key=lambda flag: flag.value)


def get_reason_enum_by_lds_status(lds_status: int | LdsStatus, failures: Optional[List[str]] = None) -> Type[IntFlag]:
    """
    Получение класса причин по режимам СОУ
    """

    if isinstance(lds_status, int):
        try:
            lds_status = LdsStatus(lds_status)
        except ValueError:
            error_message = f"Неизвестный LdsStatus: {lds_status}"
            if failures is not None:
                failures.append(error_message)
            else:
                fail(error_message)

    reason_by_lds_status = {
        LdsStatus.FAULTY: LdsStatusFaulty,
        LdsStatus.INITIALIZATION: LdsStatusInitialization,
        LdsStatus.DEGRADATION: LdsStatusDegradation,
    }
    enum_class = reason_by_lds_status.get(lds_status)
    if enum_class is None:
        error_message = f"Для LdsStatus{lds_status.name} не определены причины"
        if failures is not None:
            failures.append(error_message)
        else:
            fail(error_message)
    return enum_class


def get_reason_enum_by_stationary_status(
    stationary_status: int | StationaryStatus, failures: Optional[List[str]] = None
) -> Type[IntFlag]:
    """
    Получение класса причин по режимам МТ
    """

    if isinstance(stationary_status, int):
        try:
            stationary_status = StationaryStatus(stationary_status)
        except ValueError:
            error_message = f"Неизвестный StationaryStatus: {stationary_status}"
            if failures is not None:
                failures.append(error_message)
            else:
                fail(error_message)

    reason_by_stationary_status = {
        StationaryStatus.STATIONARY: StationaryReason,
        StationaryStatus.UNSTATIONARY: UnStationaryReason,
        StationaryStatus.STOPPED: StoppedPumpingReason,
    }
    enum_class = reason_by_stationary_status.get(stationary_status)
    if enum_class is None:
        error_message = f"Для StationaryStatus{stationary_status.name} не определены причины"
        if failures is not None:
            failures.append(error_message)
        else:
            fail(error_message)
    return enum_class


def parse_lds_status_reasons(lds_status: int, lds_status_reasons: int, failures: Optional[List[str]] = None):
    """
    Получение списка ldsStatusReasons, соответствующего ldsStatus
    """
    enum_cls = get_reason_enum_by_lds_status(lds_status, failures)
    flags = parse_bit_flags(lds_status_reasons, enum_cls, failures)
    return flags


def parse_stationary_status_reasons(
    stationary_status: int, stationary_status_reasons: int, failures: Optional[List[str]] = None
):
    """
    Получение списка stationaryStatusReasons, соответствующего stationaryStatus
    """
    enum_cls = get_reason_enum_by_lds_status(stationary_status, failures)
    flags = parse_bit_flags(stationary_status_reasons, enum_cls, failures)
    return flags


async def connect(ws_client: WebSocketClient, ws_invoke_type: str, ws_invoke_params: Any = None) -> None:
    """
    Подключение к заданной подписке
    """
    try:
        with allure.step(f"Вызов {ws_invoke_type} c параметрами {ws_invoke_params}"):
            await ws_client.invoke(ws_invoke_type, ws_invoke_params)
    except (asyncio.TimeoutError, ConnectionError, ConnectionResetError, OSError) as error:
        fail(f"Не удалось отправить сообщение типа: {ws_invoke_type} c параметрами {ws_invoke_params}. Ошибка: {error}")


async def connect_and_get_parsed_msg_by_tu_id(
    tu_id: int,
    ws_client: WebSocketClient,
    ws_message_type: str,
    ws_invoke_type: str,
    ws_invoke_params: Any = None,
    timeout: float = TestConst.BASIC_MESSAGE_TIMEOUT,
) -> SubscribeAllLeaksInfoReply:
    """
    Подключается, ищет и парсит allLeaksInfo сообщение для конкретного ТУ
    """
    await connect(ws_client, ws_invoke_type, ws_invoke_params)

    async def get_parsed_msg():
        """
        Ищет и парсит allLeaksInfo сообщение для конкретного ТУ
        """
        while True:
            payload = await ws_client.receive_by_type(ws_message_type, timeout=timeout)
            parsed_payload = ws_message_parser.parse_all_leaks_info_msg(payload)
            #  Ищет сообщение с нужным ТУ
            if parsed_payload.replyContent.tuId == tu_id:
                return parsed_payload

    try:
        with allure.step(f"Получение сообщения с контентом типа: {ws_message_type} для ТУ {tu_id}"):
            return await asyncio.wait_for(get_parsed_msg(), timeout=timeout)
    except (asyncio.TimeoutError, ConnectionError, ConnectionResetError, OSError) as error:
        fail(f"Не удалось получить сообщение allLeaksInfo для ТУ {tu_id}. Ошибка: {error}")


async def connect_and_get_msg(ws_client: WebSocketClient, ws_invoke_type: str, ws_invoke_params: Any = None) -> list:
    """
    Подключение типа get к заданной подписке и получение сообщения с заданным типом контента
    """
    await connect(ws_client, ws_invoke_type, ws_invoke_params)
    invocation_id = ws_client.invocation_id

    try:
        with allure.step(f"Получение входящего сообщения c invocation_id: {invocation_id}"):
            payload = await ws_client.receive_by_invocation_id(invocation_id)
        return payload
    except (asyncio.TimeoutError, ConnectionError, ConnectionResetError, OSError) as error:
        fail(f"Не удалось получить сообщение типа: {ws_invoke_type}. Ошибка: {error}")


async def connect_and_subscribe_msg(
    ws_client: WebSocketClient,
    ws_message_type: str,
    ws_invoke_type: str,
    ws_invoke_params: Any = None,
    timeout: float = TestConst.BASIC_MESSAGE_TIMEOUT,
) -> list:
    """
    Подключение типа subscribe к заданной подписке и получение сообщения с заданным типом контента
    """
    await connect(ws_client, ws_invoke_type, ws_invoke_params)
    try:
        with allure.step(f"Получение сообщения с контентом типа: {ws_message_type}"):
            payload = await ws_client.receive_by_type(ws_message_type, timeout=timeout)

        return payload
    except (asyncio.TimeoutError, OSError, ConnectionError, ConnectionResetError) as error:
        fail(f"Не удалось получить сообщение типа: {ws_invoke_type}. Ошибка: {error}")


async def poll_balance_algorithm_diagnostic_areas(
    ws_client: WebSocketClient,
    ws_parser: ws_message_parser,
    imitator_start_time: datetime,
    end_time: datetime,
    poll_interval: float,
) -> list:
    """
    Опрашивает очередь ws_client на наличие BalanceAlgorithmResultsContent,
    собирает и возвращает все diagnosticAreas из flowAreas.
    """
    collected_diagnostic_areas = []
    ws_client.suppress_recv_logging = True
    ws_parser.suppress_recv_logging = True
    try:
        while datetime.now(tz=imitator_start_time.tzinfo) < end_time:
            await asyncio.sleep(poll_interval)

            latest_msg = None
            while not ws_client.recv_queue.empty():
                try:
                    msg = ws_client.recv_queue.get_nowait()
                except asyncio.QueueEmpty:
                    break
                if isinstance(msg, list) and is_desired_type(msg, "BalanceAlgorithmResultsContent"):
                    latest_msg = msg

            if latest_msg is None:
                continue

            parsed_payload = ws_message_parser.parse_balance_algorithm_msg(latest_msg)
            reply_content = parsed_payload.replyContent
            if reply_content and reply_content.flowAreas:
                for flow_area in reply_content.flowAreas:
                    if flow_area.diagnosticAreas:
                        collected_diagnostic_areas.extend(flow_area.diagnosticAreas)
    finally:
        ws_client.suppress_recv_logging = False
        ws_parser.suppress_recv_logging = False

    return collected_diagnostic_areas


def get_leak_diagnostic_area_samples(
    collected_diagnostic_areas: list,
    leak_diagnostic_area_name: str,
    total_wait: int,
) -> list:
    """
    Проверяет наличие diagnosticAreas и возвращает подмножество
    для ДУ с заданным leak_diagnostic_area_id. Падает, если данные не найдены.
    """
    if not collected_diagnostic_areas:
        fail(f"За {total_wait} секунд не пришло ни одной diagnosticArea в BalanceAlgorithmResultsContent")

    leak_diagnostic_area_samples = [
        diagnostic_area
        for diagnostic_area in collected_diagnostic_areas
        if diagnostic_area.name == leak_diagnostic_area_name
    ]
    if not leak_diagnostic_area_samples:
        fail(f"За {total_wait} секунд не пришло ни одного сообщения для ДУ с name={leak_diagnostic_area_name}.")
    return leak_diagnostic_area_samples