Загрузка данных


    return leak_diagnostic_area_samples


def _drain_recv_queue(ws_client: WebSocketClient) -> List[Any]:
    """Забирает все сообщения из очереди ws без блокирующего receive_by_* (ping не теряются)."""
    messages: List[Any] = []
    while not ws_client.recv_queue.empty():
        try:
            messages.append(ws_client.recv_queue.get_nowait())
        except asyncio.QueueEmpty:
            break
    return messages


def _find_valid_report_export_notification(
    messages: List[Any],
    parser,
    notification_type: str,
) -> Optional[Any]:
    """Ищет среди уже полученных сообщений успешную ReportDataExportedNotification."""
    from constants.enums import ExportStatus, ReplyStatus

    for msg in messages:
        if not isinstance(msg, list) or not is_desired_type(msg, notification_type):
            continue
        try:
            notification = parser.parse_report_data_exported_notification_msg(msg)
        except (ValueError, TypeError, KeyError):
            continue
        if (
            notification.replyStatus == ReplyStatus.OK.value
            and notification.replyContent is not None
            and notification.replyContent.exportStatus == ExportStatus.DONE
        ):
            return notification
    return None


async def poll_for_report_export_notification(
    ws_client: WebSocketClient,
    parser,
    total_wait_seconds: float,
    poll_interval_seconds: float,
) -> Optional[Any]:
    """
    Собирает сообщения из очереди ws и ищет ReportDataExportedNotification с успешным exportStatus.
    """
    from models.export_reports_model import REPORT_DATA_EXPORTED_NOTIFICATION

    deadline = asyncio.get_event_loop().time() + total_wait_seconds
    ws_client.suppress_recv_logging = True
    parser.suppress_recv_logging = True
    try:
        while asyncio.get_event_loop().time() < deadline:
            await asyncio.sleep(poll_interval_seconds)
            batch = _drain_recv_queue(ws_client)
            notification = _find_valid_report_export_notification(
                batch, parser, REPORT_DATA_EXPORTED_NOTIFICATION
            )
            if notification is not None:
                return notification
    finally:
        ws_client.suppress_recv_logging = False
        parser.suppress_recv_logging = False

    return None


async def poll_for_exported_file(
    ws_client: WebSocketClient,
    parser,
    tu_id: int,
    expected_data_type: Any,
    name_substring: str,
    period_start: datetime,
    period_end: datetime,
    total_wait_seconds: float,
    poll_interval_seconds: float,
) -> Optional[Any]:
    """
    Периодически шлёт getExportedFilesListRequest, забирает ответы из очереди
    по invocation_id среди всех накопленных сообщений.
    """
    from models.get_exported_files_list_model import GET_EXPORTED_FILES_LIST_REQUEST

    deadline = asyncio.get_event_loop().time() + total_wait_seconds
    last_items_count = -1
    ws_client.suppress_recv_logging = True
    parser.suppress_recv_logging = True
    try:
        while asyncio.get_event_loop().time() < deadline:
            _drain_recv_queue(ws_client)
            await connect(
                ws_client,
                GET_EXPORTED_FILES_LIST_REQUEST,
                {"tuId": tu_id, "additionalProperties": None},
            )
            invocation_id = ws_client.invocation_id
            await asyncio.sleep(poll_interval_seconds)

            list_reply_payload = None
            for msg in _drain_recv_queue(ws_client):
                if isinstance(msg, list) and is_desired_invocation_id(msg, invocation_id):
                    list_reply_payload = msg
                    break

            if list_reply_payload is None:
                continue

            parsed_payload = parser.parse_exported_files_list_msg(list_reply_payload)
            items = []
            if parsed_payload.replyContent is not None:
                items = parsed_payload.replyContent.exportedData or []

            if len(items) != last_items_count:
                allure.attach(
                    "\n".join(
                        f"id={item.id}, name={item.name}, type={item.exportedDataType}, "
                        f"start={item.start}, end={item.end}"
                        for item in items
                    ),
                    name=f"Список сформированных файлов (попытка, всего: {len(items)})",
                    attachment_type=allure.attachment_type.TEXT,
                )
                last_items_count = len(items)

            match = find_matching_exported_item(
                items=items,
                expected_data_type=expected_data_type,
                name_substring=name_substring,
                period_start=period_start,
                period_end=period_end,
            )
            if match is not None:
                return match
    finally:
        ws_client.suppress_recv_logging = False
        parser.suppress_recv_logging = False

    return None


def _normalize_report_period_datetime(value: datetime) -> datetime: