Загрузка данных



def _attach_ws_poll_failure(
    collected_messages: List[Any],
    total_wait_seconds: float,
    expected_message_type: str,
) -> None:
    """Краткая сводка и pprint каждого WS-сообщения при таймауте поллинга."""
    allure.attach(
        "\n".join(
            [
                f"Таймаут ожидания: {total_wait_seconds} с",
                f"Ожидаемый тип сообщения: {expected_message_type}",
                f"Всего сообщений за период поллинга: {len(collected_messages)}",
            ]
        ),
        name="WS poll timeout",
        attachment_type=allure.attachment_type.TEXT,
    )
    for msg in collected_messages:
        allure.attach(
            pprint.pformat(msg, width=120, sort_dicts=False),
            name="received ws message",
            attachment_type=allure.attachment_type.TEXT,
        )


def _drain_recv_queue(ws_client: WebSocketClient) -> List[Any]:
    """Забирает все сообщения из очереди ws без блокирующего receive_by_* (ping не теряются)."""
    messages: List[Any] = []
    while not ws_client.recv_queue.empty():
        try:
            messages.append(ws_client.recv_queue.get_nowait())
        except asyncio.QueueEmpty:
            break
    return messages


def _find_valid_report_export_notification(
    messages: List[Any],
    parser,
    notification_type: str,
) -> Optional[ReportDataExportedNotification]:
    """Ищет среди уже полученных сообщений успешную ReportDataExportedNotification."""
    for msg in messages:
        if not isinstance(msg, list) or not is_desired_type(msg, notification_type):
            continue
        try:
            notification = parser.parse_report_data_exported_notification_msg(msg)
        except (ValueError, TypeError, KeyError):
            continue
        if notification.replyStatus != ReplyStatus.OK.value:
            continue
        content: Optional[ReportDataExportedContent] = notification.replyContent
        if content is None:
            continue
        if content.exportStatus != ExportStatus.DONE:
            continue
        return notification
    return None


async def poll_for_report_export_notification(
    ws_client: WebSocketClient,
    parser,
    total_wait_seconds: float,
    poll_interval_seconds: float,
) -> Optional[Any]:
    """
    Собирает сообщения из очереди ws и ищет ReportDataExportedNotification с успешным exportStatus.
    При таймауте прикрепляет к Allure все полученные за период сообщения.
    """

    deadline = asyncio.get_event_loop().time() + total_wait_seconds
    collected_messages: List[Any] = []
    ws_client.suppress_recv_logging = True
    parser.suppress_recv_logging = True
    try:
        while asyncio.get_event_loop().time() < deadline:
            await asyncio.sleep(poll_interval_seconds)
            batch = _drain_recv_queue(ws_client)
            collected_messages.extend(batch)
            notification = _find_valid_report_export_notification(
                batch, parser, ReportConst.REPORT_DATA_EXPORTED_NOTIFICATION
            )
            if notification is not None:
                return notification
    finally:
        collected_messages.extend(_drain_recv_queue(ws_client))
        ws_client.suppress_recv_logging = False
        parser.suppress_recv_logging = False

    _attach_ws_poll_failure(
        collected_messages,
        total_wait_seconds,
        ReportConst.REPORT_DATA_EXPORTED_NOTIFICATION,
    )
    return None


async def poll_for_exported_file(
    ws_client: WebSocketClient,
    parser,
    tu_id: int,
    expected_data_type: Any,
    name_substring: str,
    period_start: datetime,
    period_end: datetime,
    total_wait_seconds: float,
    poll_interval_seconds: float,
) -> Optional[Any]:
    """
    Периодически шлёт getExportedFilesListRequest, забирает ответы из очереди
    по invocation_id среди всех накопленных сообщений.
    При таймауте прикрепляет к Allure все полученные за период сообщения.
    """

    deadline = asyncio.get_event_loop().time() + total_wait_seconds
    last_items_count = -1
    collected_messages: List[Any] = []
    ws_client.suppress_recv_logging = True
    parser.suppress_recv_logging = True
    try:
        while asyncio.get_event_loop().time() < deadline:
            drained_before_request = _drain_recv_queue(ws_client)
            collected_messages.extend(drained_before_request)
            await connect(
                ws_client,
                ReportConst.GET_EXPORTED_FILES_LIST_REQUEST,
                {"tuId": tu_id, "additionalProperties": None},
            )
            invocation_id = ws_client.invocation_id
            await asyncio.sleep(poll_interval_seconds)

            list_reply_payload = None
            for msg in _drain_recv_queue(ws_client):
                collected_messages.append(msg)
                if isinstance(msg, list) and is_desired_invocation_id(msg, invocation_id):
                    list_reply_payload = msg
                    break

            if list_reply_payload is None:
                continue

            parsed_payload = parser.parse_exported_files_list_msg(list_reply_payload)
            items = []
            if parsed_payload.replyContent is not None:
                items = parsed_payload.replyContent.exportedData or []

            if len(items) != last_items_count:
                allure.attach(
                    "\n".join(
                        f"id={item.id}, name={item.name}, type={item.exportedDataType}, "
                        f"start={item.start}, end={item.end}"
                        for item in items
                    ),
                    name=f"Список сформированных файлов (попытка, всего: {len(items)})",
                    attachment_type=allure.attachment_type.TEXT,
                )
                last_items_count = len(items)

            match = find_matching_exported_item(
                items=items,
                expected_data_type=expected_data_type,
                name_substring=name_substring,
                period_start=period_start,
                period_end=period_end,
            )
            if match is not None:
                return match
    finally:
        collected_messages.extend(_drain_recv_queue(ws_client))
        ws_client.suppress_recv_logging = False
        parser.suppress_recv_logging = False

    _attach_ws_poll_failure(
        collected_messages,
        total_wait_seconds,
        ReportConst.GET_EXPORTED_FILES_LIST_REQUEST,
    )
    return None