Загрузка данных


def _ws_message_event_type(msg: Any) -> str:
    if isinstance(msg, list) and len(msg) > WS_Const.EVENT_TYPE_INDEX:
        return str(msg[WS_Const.EVENT_TYPE_INDEX])
    return type(msg).__name__


def _ws_message_invocation_id(msg: Any) -> str:
    if isinstance(msg, list) and len(msg) > WS_Const.INVOCATION_ID_INDEX:
        return str(msg[WS_Const.INVOCATION_ID_INDEX])
    return "-"


def _pformat_for_allure(obj: Any) -> str:
    return pprint.pformat(obj, width=120, sort_dicts=False)


def _dto_to_debug_dict(obj: Any) -> Any:
    if is_dataclass(obj):
        return {field.name: _dto_to_debug_dict(getattr(obj, field.name)) for field in fields(obj)}
    if isinstance(obj, Enum):
        return f"{obj.name}({obj.value})"
    if isinstance(obj, UUID):
        return str(obj)
    if isinstance(obj, datetime):
        return obj.isoformat()
    if isinstance(obj, list):
        return [_dto_to_debug_dict(item) for item in obj]
    if isinstance(obj, dict):
        return {key: _dto_to_debug_dict(value) for key, value in obj.items()}
    return obj


def _format_ws_message_dto(msg: Any, parser, event_type: str) -> Optional[str]:
    if not isinstance(msg, list):
        return None

    if event_type == ReportConst.REPORT_DATA_EXPORTED_NOTIFICATION:
        try:
            dto = parser.parse_report_data_exported_notification_msg(msg)
            return _pformat_for_allure(_dto_to_debug_dict(dto))
        except (ValueError, TypeError, KeyError, AttributeError) as error:
            return f"Ошибка парсинга DTO ({event_type}): {error}"

    if parser._find_reply_status_in_ws_msg(msg) is None:
        return None

    for parse_fn in (
        parser.parse_exported_files_list_msg,
        parser.parse_download_exported_data_msg,
    ):
        try:
            dto = parse_fn(msg)
            debug_dict = _dto_to_debug_dict(dto)
            reply_content = debug_dict.get("replyContent")
            if isinstance(reply_content, dict) and isinstance(reply_content.get("fileChunk"), (bytes, bytearray)):
                reply_content["fileChunk"] = f"<bytes len={len(reply_content['fileChunk'])}>"
            return _pformat_for_allure(debug_dict)
        except (ValueError, TypeError, KeyError, AttributeError):
            continue

    return None


def _attach_readable_ws_messages(
    collected_messages: List[Any],
    parser,
    summary_title: str,
    summary_lines: List[str],
    attachment_prefix: str = "WS",
) -> None:
    """Сводка + отдельный attach на каждое сообщение (frame и DTO при возможности парсинга)."""
    allure.attach(
        "\n".join(summary_lines),
        name=summary_title,
        attachment_type=allure.attachment_type.TEXT,
    )
    for index, msg in enumerate(collected_messages):
        event_type = _ws_message_event_type(msg)
        allure.attach(
            _pformat_for_allure(msg),
            name=f"{attachment_prefix} [{index}] frame — {event_type}",
            attachment_type=allure.attachment_type.TEXT,
        )
        dto_text = _format_ws_message_dto(msg, parser, event_type)
        if dto_text:
            allure.attach(
                dto_text,
                name=f"{attachment_prefix} [{index}] DTO — {event_type}",
                attachment_type=allure.attachment_type.TEXT,
            )


def _report_notification_reject_reason(notification: ReportDataExportedNotification) -> Optional[str]:
    if notification.replyStatus != ReplyStatus.OK.value:
        return f"replyStatus={notification.replyStatus} (ожидали {ReplyStatus.OK.value})"
    if notification.replyContent is None:
        return "replyContent=None"
    if notification.replyContent.exportStatus != ExportStatus.DONE:
        return (
            f"exportStatus={notification.replyContent.exportStatus} "
            f"(ожидали {ExportStatus.DONE})"
        )
    return None


def _attach_report_export_poll_failure(
    collected_messages: List[Any],
    parser,
    notification_type: str,
    total_wait_seconds: float,
    debug_context: Optional[str] = None,
) -> None:
    """Прикрепляет к Allure сводку и сырые WS-сообщения при таймауте ожидания нотификации."""
    summary_lines = [
        f"Таймаут ожидания: {total_wait_seconds} с",
        f"Ожидаемый тип сообщения: {notification_type}",
        f"Всего сообщений за период поллинга: {len(collected_messages)}",
    ]
    if debug_context:
        summary_lines.append(f"Контекст запроса формирования отчёта: {debug_context}")

    if not collected_messages:
        summary_lines.append("")
        summary_lines.append("За время ожидания не получено ни одного WS-сообщения.")
    else:
        type_counts: dict[str, int] = {}
        notification_candidates = 0
        summary_lines.append("")
        summary_lines.append("Сводка по сообщениям:")
        for index, msg in enumerate(collected_messages):
            event_type = _ws_message_event_type(msg)
            type_counts[event_type] = type_counts.get(event_type, 0) + 1
            line = (
                f"[{index}] type={event_type!r} "
                f"invocation_id={_ws_message_invocation_id(msg)!r}"
            )
            if isinstance(msg, list) and is_desired_type(msg, notification_type):
                notification_candidates += 1
                try:
                    notification = parser.parse_report_data_exported_notification_msg(msg)
                    reject_reason = _report_notification_reject_reason(notification)
                    if reject_reason:
                        line += f" -> распознано, но отклонено: {reject_reason}"
                    else:
                        line += " -> распознано как валидная нотификация (не должно попасть сюда)"
                except (ValueError, TypeError, KeyError) as error:
                    line += f" -> ошибка парсинга: {error}"
            summary_lines.append(line)

        summary_lines.append("")
        summary_lines.append(
            f"Сообщений типа {notification_type!r}: {notification_candidates}"
        )
        summary_lines.append("Распределение по type:")
        for event_type, count in sorted(type_counts.items(), key=lambda item: item[0]):
            summary_lines.append(f"  {event_type}: {count}")

    _attach_readable_ws_messages(
        collected_messages=collected_messages,
        parser=parser,
        summary_title="Диагностика: ожидание ReportDataExportedNotification",
        summary_lines=summary_lines,
        attachment_prefix="WS notification poll",
    )


def _attach_exported_files_poll_failure(
    collected_messages: List[Any],
    parser,
    total_wait_seconds: float,
    tu_id: int,
    expected_data_type: Any,
    name_substring: str,
    period_start: datetime,
    period_end: datetime,
    last_invocation_id: Optional[str],
    debug_context: Optional[str] = None,
) -> None:
    """Прикрепляет к Allure сводку при таймауте лонг-поллинга списка файлов."""
    summary_lines = [
        f"Таймаут ожидания: {total_wait_seconds} с",
        f"tuId={tu_id}, тип отчёта={expected_data_type}, подстрока имени={name_substring!r}",
        f"период: {period_start} — {period_end}",
        f"Последний invocation_id запроса списка: {last_invocation_id}",
        f"Всего WS-сообщений за период: {len(collected_messages)}",
    ]
    if debug_context:
        summary_lines.append(f"Контекст: {debug_context}")

    if not collected_messages:
        summary_lines.append("За время ожидания не получено ни одного WS-сообщения.")
    else:
        summary_lines.append("")
        for index, msg in enumerate(collected_messages):
            summary_lines.append(
                f"[{index}] type={_ws_message_event_type(msg)!r} "
                f"invocation_id={_ws_message_invocation_id(msg)!r}"
                + (
                    " <- совпадает с последним запросом списка"
                    if last_invocation_id
                    and isinstance(msg, list)
                    and is_desired_invocation_id(msg, last_invocation_id)
                    else ""
                )
            )

    _attach_readable_ws_messages(
        collected_messages=collected_messages,
        parser=parser,
        summary_title="Диагностика: лонг-поллинг getExportedFilesListRequest",
        summary_lines=summary_lines,
        attachment_prefix="WS files list poll",
    )


def _drain_recv_queue(ws_client: WebSocketClient) -> List[Any]:
    """Забирает все сообщения из очереди ws без блокирующего receive_by_* (ping не теряются)."""
    messages: List[Any] = []
    while not ws_client.recv_queue.empty():
        try:
            messages.append(ws_client.recv_queue.get_nowait())
        except asyncio.QueueEmpty:
            break
    return messages


def _find_valid_report_export_notification(
    messages: List[Any],
    parser,
    notification_type: str,
) -> Optional[ReportDataExportedNotification]:
    """Ищет среди уже полученных сообщений успешную ReportDataExportedNotification."""
    for msg in messages:
        if not isinstance(msg, list) or not is_desired_type(msg, notification_type):
            continue
        try:
            notification = parser.parse_report_data_exported_notification_msg(msg)
        except (ValueError, TypeError, KeyError):
            continue
        if notification.replyStatus != ReplyStatus.OK.value:
            continue
        content: Optional[ReportDataExportedContent] = notification.replyContent
        if content is None:
            continue
        if content.exportStatus != ExportStatus.DONE:
            continue
        return notification
    return None


async def poll_for_report_export_notification(
    ws_client: WebSocketClient,
    parser,
    total_wait_seconds: float,
    poll_interval_seconds: float,
    debug_context: Optional[str] = None,
) -> Optional[Any]:
    """
    Собирает сообщения из очереди ws и ищет ReportDataExportedNotification с успешным exportStatus.
    При таймауте прикрепляет к Allure все полученные за период сообщения.
    """

    deadline = asyncio.get_event_loop().time() + total_wait_seconds
    collected_messages: List[Any] = []
    ws_client.suppress_recv_logging = True
    parser.suppress_recv_logging = True
    try:
        while asyncio.get_event_loop().time() < deadline:
            await asyncio.sleep(poll_interval_seconds)
            batch = _drain_recv_queue(ws_client)
            collected_messages.extend(batch)
            notification = _find_valid_report_export_notification(
                batch, parser, ReportConst.REPORT_DATA_EXPORTED_NOTIFICATION
            )
            if notification is not None:
                return notification
    finally:
        collected_messages.extend(_drain_recv_queue(ws_client))
        ws_client.suppress_recv_logging = False
        parser.suppress_recv_logging = False

    _attach_report_export_poll_failure(
        collected_messages=collected_messages,
        parser=parser,
        notification_type=ReportConst.REPORT_DATA_EXPORTED_NOTIFICATION,
        total_wait_seconds=total_wait_seconds,
        debug_context=debug_context,
    )
    return None


async def poll_for_exported_file(
    ws_client: WebSocketClient,
    parser,
    tu_id: int,
    expected_data_type: Any,
    name_substring: str,
    period_start: datetime,
    period_end: datetime,
    total_wait_seconds: float,
    poll_interval_seconds: float,
    debug_context: Optional[str] = None,
) -> Optional[Any]:
    """
    Периодически шлёт getExportedFilesListRequest, забирает ответы из очереди
    по invocation_id среди всех накопленных сообщений.
    При таймауте прикрепляет к Allure все полученные за период сообщения.
    """

    deadline = asyncio.get_event_loop().time() + total_wait_seconds
    last_items_count = -1
    collected_messages: List[Any] = []
    last_invocation_id: Optional[str] = None
    ws_client.suppress_recv_logging = True
    parser.suppress_recv_logging = True
    try:
        while asyncio.get_event_loop().time() < deadline:
            drained_before_request = _drain_recv_queue(ws_client)
            collected_messages.extend(drained_before_request)
            await connect(
                ws_client,
                ReportConst.GET_EXPORTED_FILES_LIST_REQUEST,
                {"tuId": tu_id, "additionalProperties": None},
            )
            invocation_id = ws_client.invocation_id
            last_invocation_id = invocation_id
            await asyncio.sleep(poll_interval_seconds)

            list_reply_payload = None
            for msg in _drain_recv_queue(ws_client):
                collected_messages.append(msg)
                if isinstance(msg, list) and is_desired_invocation_id(msg, invocation_id):
                    list_reply_payload = msg
                    break

            if list_reply_payload is None:
                continue

            parsed_payload = parser.parse_exported_files_list_msg(list_reply_payload)
            items = []
            if parsed_payload.replyContent is not None:
                items = parsed_payload.replyContent.exportedData or []

            if len(items) != last_items_count:
                allure.attach(
                    "\n".join(
                        f"id={item.id}, name={item.name}, type={item.exportedDataType}, "
                        f"start={item.start}, end={item.end}"
                        for item in items
                    ),
                    name=f"Список сформированных файлов (попытка, всего: {len(items)})",
                    attachment_type=allure.attachment_type.TEXT,
                )
                last_items_count = len(items)

            match = find_matching_exported_item(
                items=items,
                expected_data_type=expected_data_type,
                name_substring=name_substring,
                period_start=period_start,
                period_end=period_end,
            )
            if match is not None:
                return match
    finally:
        collected_messages.extend(_drain_recv_queue(ws_client))
        ws_client.suppress_recv_logging = False
        parser.suppress_recv_logging = False

    _attach_exported_files_poll_failure(
        collected_messages=collected_messages,
        parser=parser,
        total_wait_seconds=total_wait_seconds,
        tu_id=tu_id,
        expected_data_type=expected_data_type,
        name_substring=name_substring,
        period_start=period_start,
        period_end=period_end,
        last_invocation_id=last_invocation_id,
        debug_context=debug_context,
    )
    return None