Загрузка данных
def _ws_message_event_type(msg: Any) -> str:
if isinstance(msg, list) and len(msg) > WS_Const.EVENT_TYPE_INDEX:
return str(msg[WS_Const.EVENT_TYPE_INDEX])
return type(msg).__name__
def _ws_message_invocation_id(msg: Any) -> str:
if isinstance(msg, list) and len(msg) > WS_Const.INVOCATION_ID_INDEX:
return str(msg[WS_Const.INVOCATION_ID_INDEX])
return "-"
def _pformat_for_allure(obj: Any) -> str:
return pprint.pformat(obj, width=120, sort_dicts=False)
def _dto_to_debug_dict(obj: Any) -> Any:
if is_dataclass(obj):
return {field.name: _dto_to_debug_dict(getattr(obj, field.name)) for field in fields(obj)}
if isinstance(obj, Enum):
return f"{obj.name}({obj.value})"
if isinstance(obj, UUID):
return str(obj)
if isinstance(obj, datetime):
return obj.isoformat()
if isinstance(obj, list):
return [_dto_to_debug_dict(item) for item in obj]
if isinstance(obj, dict):
return {key: _dto_to_debug_dict(value) for key, value in obj.items()}
return obj
def _format_ws_message_dto(msg: Any, parser, event_type: str) -> Optional[str]:
if not isinstance(msg, list):
return None
if event_type == ReportConst.REPORT_DATA_EXPORTED_NOTIFICATION:
try:
dto = parser.parse_report_data_exported_notification_msg(msg)
return _pformat_for_allure(_dto_to_debug_dict(dto))
except (ValueError, TypeError, KeyError, AttributeError) as error:
return f"Ошибка парсинга DTO ({event_type}): {error}"
if parser._find_reply_status_in_ws_msg(msg) is None:
return None
for parse_fn in (
parser.parse_exported_files_list_msg,
parser.parse_download_exported_data_msg,
):
try:
dto = parse_fn(msg)
debug_dict = _dto_to_debug_dict(dto)
reply_content = debug_dict.get("replyContent")
if isinstance(reply_content, dict) and isinstance(reply_content.get("fileChunk"), (bytes, bytearray)):
reply_content["fileChunk"] = f"<bytes len={len(reply_content['fileChunk'])}>"
return _pformat_for_allure(debug_dict)
except (ValueError, TypeError, KeyError, AttributeError):
continue
return None
def _attach_readable_ws_messages(
collected_messages: List[Any],
parser,
summary_title: str,
summary_lines: List[str],
attachment_prefix: str = "WS",
) -> None:
"""Сводка + отдельный attach на каждое сообщение (frame и DTO при возможности парсинга)."""
allure.attach(
"\n".join(summary_lines),
name=summary_title,
attachment_type=allure.attachment_type.TEXT,
)
for index, msg in enumerate(collected_messages):
event_type = _ws_message_event_type(msg)
allure.attach(
_pformat_for_allure(msg),
name=f"{attachment_prefix} [{index}] frame — {event_type}",
attachment_type=allure.attachment_type.TEXT,
)
dto_text = _format_ws_message_dto(msg, parser, event_type)
if dto_text:
allure.attach(
dto_text,
name=f"{attachment_prefix} [{index}] DTO — {event_type}",
attachment_type=allure.attachment_type.TEXT,
)
def _report_notification_reject_reason(notification: ReportDataExportedNotification) -> Optional[str]:
if notification.replyStatus != ReplyStatus.OK.value:
return f"replyStatus={notification.replyStatus} (ожидали {ReplyStatus.OK.value})"
if notification.replyContent is None:
return "replyContent=None"
if notification.replyContent.exportStatus != ExportStatus.DONE:
return (
f"exportStatus={notification.replyContent.exportStatus} "
f"(ожидали {ExportStatus.DONE})"
)
return None
def _attach_report_export_poll_failure(
collected_messages: List[Any],
parser,
notification_type: str,
total_wait_seconds: float,
debug_context: Optional[str] = None,
) -> None:
"""Прикрепляет к Allure сводку и сырые WS-сообщения при таймауте ожидания нотификации."""
summary_lines = [
f"Таймаут ожидания: {total_wait_seconds} с",
f"Ожидаемый тип сообщения: {notification_type}",
f"Всего сообщений за период поллинга: {len(collected_messages)}",
]
if debug_context:
summary_lines.append(f"Контекст запроса формирования отчёта: {debug_context}")
if not collected_messages:
summary_lines.append("")
summary_lines.append("За время ожидания не получено ни одного WS-сообщения.")
else:
type_counts: dict[str, int] = {}
notification_candidates = 0
summary_lines.append("")
summary_lines.append("Сводка по сообщениям:")
for index, msg in enumerate(collected_messages):
event_type = _ws_message_event_type(msg)
type_counts[event_type] = type_counts.get(event_type, 0) + 1
line = (
f"[{index}] type={event_type!r} "
f"invocation_id={_ws_message_invocation_id(msg)!r}"
)
if isinstance(msg, list) and is_desired_type(msg, notification_type):
notification_candidates += 1
try:
notification = parser.parse_report_data_exported_notification_msg(msg)
reject_reason = _report_notification_reject_reason(notification)
if reject_reason:
line += f" -> распознано, но отклонено: {reject_reason}"
else:
line += " -> распознано как валидная нотификация (не должно попасть сюда)"
except (ValueError, TypeError, KeyError) as error:
line += f" -> ошибка парсинга: {error}"
summary_lines.append(line)
summary_lines.append("")
summary_lines.append(
f"Сообщений типа {notification_type!r}: {notification_candidates}"
)
summary_lines.append("Распределение по type:")
for event_type, count in sorted(type_counts.items(), key=lambda item: item[0]):
summary_lines.append(f" {event_type}: {count}")
_attach_readable_ws_messages(
collected_messages=collected_messages,
parser=parser,
summary_title="Диагностика: ожидание ReportDataExportedNotification",
summary_lines=summary_lines,
attachment_prefix="WS notification poll",
)
def _attach_exported_files_poll_failure(
collected_messages: List[Any],
parser,
total_wait_seconds: float,
tu_id: int,
expected_data_type: Any,
name_substring: str,
period_start: datetime,
period_end: datetime,
last_invocation_id: Optional[str],
debug_context: Optional[str] = None,
) -> None:
"""Прикрепляет к Allure сводку при таймауте лонг-поллинга списка файлов."""
summary_lines = [
f"Таймаут ожидания: {total_wait_seconds} с",
f"tuId={tu_id}, тип отчёта={expected_data_type}, подстрока имени={name_substring!r}",
f"период: {period_start} — {period_end}",
f"Последний invocation_id запроса списка: {last_invocation_id}",
f"Всего WS-сообщений за период: {len(collected_messages)}",
]
if debug_context:
summary_lines.append(f"Контекст: {debug_context}")
if not collected_messages:
summary_lines.append("За время ожидания не получено ни одного WS-сообщения.")
else:
summary_lines.append("")
for index, msg in enumerate(collected_messages):
summary_lines.append(
f"[{index}] type={_ws_message_event_type(msg)!r} "
f"invocation_id={_ws_message_invocation_id(msg)!r}"
+ (
" <- совпадает с последним запросом списка"
if last_invocation_id
and isinstance(msg, list)
and is_desired_invocation_id(msg, last_invocation_id)
else ""
)
)
_attach_readable_ws_messages(
collected_messages=collected_messages,
parser=parser,
summary_title="Диагностика: лонг-поллинг getExportedFilesListRequest",
summary_lines=summary_lines,
attachment_prefix="WS files list poll",
)
def _drain_recv_queue(ws_client: WebSocketClient) -> List[Any]:
"""Забирает все сообщения из очереди ws без блокирующего receive_by_* (ping не теряются)."""
messages: List[Any] = []
while not ws_client.recv_queue.empty():
try:
messages.append(ws_client.recv_queue.get_nowait())
except asyncio.QueueEmpty:
break
return messages
def _find_valid_report_export_notification(
messages: List[Any],
parser,
notification_type: str,
) -> Optional[ReportDataExportedNotification]:
"""Ищет среди уже полученных сообщений успешную ReportDataExportedNotification."""
for msg in messages:
if not isinstance(msg, list) or not is_desired_type(msg, notification_type):
continue
try:
notification = parser.parse_report_data_exported_notification_msg(msg)
except (ValueError, TypeError, KeyError):
continue
if notification.replyStatus != ReplyStatus.OK.value:
continue
content: Optional[ReportDataExportedContent] = notification.replyContent
if content is None:
continue
if content.exportStatus != ExportStatus.DONE:
continue
return notification
return None
async def poll_for_report_export_notification(
ws_client: WebSocketClient,
parser,
total_wait_seconds: float,
poll_interval_seconds: float,
debug_context: Optional[str] = None,
) -> Optional[Any]:
"""
Собирает сообщения из очереди ws и ищет ReportDataExportedNotification с успешным exportStatus.
При таймауте прикрепляет к Allure все полученные за период сообщения.
"""
deadline = asyncio.get_event_loop().time() + total_wait_seconds
collected_messages: List[Any] = []
ws_client.suppress_recv_logging = True
parser.suppress_recv_logging = True
try:
while asyncio.get_event_loop().time() < deadline:
await asyncio.sleep(poll_interval_seconds)
batch = _drain_recv_queue(ws_client)
collected_messages.extend(batch)
notification = _find_valid_report_export_notification(
batch, parser, ReportConst.REPORT_DATA_EXPORTED_NOTIFICATION
)
if notification is not None:
return notification
finally:
collected_messages.extend(_drain_recv_queue(ws_client))
ws_client.suppress_recv_logging = False
parser.suppress_recv_logging = False
_attach_report_export_poll_failure(
collected_messages=collected_messages,
parser=parser,
notification_type=ReportConst.REPORT_DATA_EXPORTED_NOTIFICATION,
total_wait_seconds=total_wait_seconds,
debug_context=debug_context,
)
return None
async def poll_for_exported_file(
ws_client: WebSocketClient,
parser,
tu_id: int,
expected_data_type: Any,
name_substring: str,
period_start: datetime,
period_end: datetime,
total_wait_seconds: float,
poll_interval_seconds: float,
debug_context: Optional[str] = None,
) -> Optional[Any]:
"""
Периодически шлёт getExportedFilesListRequest, забирает ответы из очереди
по invocation_id среди всех накопленных сообщений.
При таймауте прикрепляет к Allure все полученные за период сообщения.
"""
deadline = asyncio.get_event_loop().time() + total_wait_seconds
last_items_count = -1
collected_messages: List[Any] = []
last_invocation_id: Optional[str] = None
ws_client.suppress_recv_logging = True
parser.suppress_recv_logging = True
try:
while asyncio.get_event_loop().time() < deadline:
drained_before_request = _drain_recv_queue(ws_client)
collected_messages.extend(drained_before_request)
await connect(
ws_client,
ReportConst.GET_EXPORTED_FILES_LIST_REQUEST,
{"tuId": tu_id, "additionalProperties": None},
)
invocation_id = ws_client.invocation_id
last_invocation_id = invocation_id
await asyncio.sleep(poll_interval_seconds)
list_reply_payload = None
for msg in _drain_recv_queue(ws_client):
collected_messages.append(msg)
if isinstance(msg, list) and is_desired_invocation_id(msg, invocation_id):
list_reply_payload = msg
break
if list_reply_payload is None:
continue
parsed_payload = parser.parse_exported_files_list_msg(list_reply_payload)
items = []
if parsed_payload.replyContent is not None:
items = parsed_payload.replyContent.exportedData or []
if len(items) != last_items_count:
allure.attach(
"\n".join(
f"id={item.id}, name={item.name}, type={item.exportedDataType}, "
f"start={item.start}, end={item.end}"
for item in items
),
name=f"Список сформированных файлов (попытка, всего: {len(items)})",
attachment_type=allure.attachment_type.TEXT,
)
last_items_count = len(items)
match = find_matching_exported_item(
items=items,
expected_data_type=expected_data_type,
name_substring=name_substring,
period_start=period_start,
period_end=period_end,
)
if match is not None:
return match
finally:
collected_messages.extend(_drain_recv_queue(ws_client))
ws_client.suppress_recv_logging = False
parser.suppress_recv_logging = False
_attach_exported_files_poll_failure(
collected_messages=collected_messages,
parser=parser,
total_wait_seconds=total_wait_seconds,
tu_id=tu_id,
expected_data_type=expected_data_type,
name_substring=name_substring,
period_start=period_start,
period_end=period_end,
last_invocation_id=last_invocation_id,
debug_context=debug_context,
)
return None