Загрузка данных
def _attach_ws_poll_failure(
collected_messages: List[Any],
total_wait_seconds: float,
expected_message_type: str,
) -> None:
"""Краткая сводка и pprint каждого WS-сообщения при таймауте поллинга."""
allure.attach(
"\n".join(
[
f"Таймаут ожидания: {total_wait_seconds} с",
f"Ожидаемый тип сообщения: {expected_message_type}",
f"Всего сообщений за период поллинга: {len(collected_messages)}",
]
),
name="WS poll timeout",
attachment_type=allure.attachment_type.TEXT,
)
for msg in collected_messages:
allure.attach(
pprint.pformat(msg, width=120, sort_dicts=False),
name="received ws message",
attachment_type=allure.attachment_type.TEXT,
)
def _drain_recv_queue(ws_client: WebSocketClient) -> List[Any]:
"""Забирает все сообщения из очереди ws без блокирующего receive_by_* (ping не теряются)."""
messages: List[Any] = []
while not ws_client.recv_queue.empty():
try:
messages.append(ws_client.recv_queue.get_nowait())
except asyncio.QueueEmpty:
break
return messages
def _find_valid_report_export_notification(
messages: List[Any],
parser,
notification_type: str,
) -> Optional[ReportDataExportedNotification]:
"""Ищет среди уже полученных сообщений успешную ReportDataExportedNotification."""
for msg in messages:
if not isinstance(msg, list) or not is_desired_type(msg, notification_type):
continue
try:
notification = parser.parse_report_data_exported_notification_msg(msg)
except (ValueError, TypeError, KeyError):
continue
if notification.replyStatus != ReplyStatus.OK.value:
continue
content: Optional[ReportDataExportedContent] = notification.replyContent
if content is None:
continue
if content.exportStatus != ExportStatus.DONE:
continue
return notification
return None
async def poll_for_report_export_notification(
ws_client: WebSocketClient,
parser,
total_wait_seconds: float,
poll_interval_seconds: float,
) -> Optional[Any]:
"""
Собирает сообщения из очереди ws и ищет ReportDataExportedNotification с успешным exportStatus.
При таймауте прикрепляет к Allure все полученные за период сообщения.
"""
deadline = asyncio.get_event_loop().time() + total_wait_seconds
collected_messages: List[Any] = []
ws_client.suppress_recv_logging = True
parser.suppress_recv_logging = True
try:
while asyncio.get_event_loop().time() < deadline:
await asyncio.sleep(poll_interval_seconds)
batch = _drain_recv_queue(ws_client)
collected_messages.extend(batch)
notification = _find_valid_report_export_notification(
batch, parser, ReportConst.REPORT_DATA_EXPORTED_NOTIFICATION
)
if notification is not None:
return notification
finally:
collected_messages.extend(_drain_recv_queue(ws_client))
ws_client.suppress_recv_logging = False
parser.suppress_recv_logging = False
_attach_ws_poll_failure(
collected_messages,
total_wait_seconds,
ReportConst.REPORT_DATA_EXPORTED_NOTIFICATION,
)
return None
async def poll_for_exported_file(
ws_client: WebSocketClient,
parser,
tu_id: int,
expected_data_type: Any,
name_substring: str,
period_start: datetime,
period_end: datetime,
total_wait_seconds: float,
poll_interval_seconds: float,
) -> Optional[Any]:
"""
Периодически шлёт getExportedFilesListRequest, забирает ответы из очереди
по invocation_id среди всех накопленных сообщений.
При таймауте прикрепляет к Allure все полученные за период сообщения.
"""
deadline = asyncio.get_event_loop().time() + total_wait_seconds
last_items_count = -1
collected_messages: List[Any] = []
ws_client.suppress_recv_logging = True
parser.suppress_recv_logging = True
try:
while asyncio.get_event_loop().time() < deadline:
drained_before_request = _drain_recv_queue(ws_client)
collected_messages.extend(drained_before_request)
await connect(
ws_client,
ReportConst.GET_EXPORTED_FILES_LIST_REQUEST,
{"tuId": tu_id, "additionalProperties": None},
)
invocation_id = ws_client.invocation_id
await asyncio.sleep(poll_interval_seconds)
list_reply_payload = None
for msg in _drain_recv_queue(ws_client):
collected_messages.append(msg)
if isinstance(msg, list) and is_desired_invocation_id(msg, invocation_id):
list_reply_payload = msg
break
if list_reply_payload is None:
continue
parsed_payload = parser.parse_exported_files_list_msg(list_reply_payload)
items = []
if parsed_payload.replyContent is not None:
items = parsed_payload.replyContent.exportedData or []
if len(items) != last_items_count:
allure.attach(
"\n".join(
f"id={item.id}, name={item.name}, type={item.exportedDataType}, "
f"start={item.start}, end={item.end}"
for item in items
),
name=f"Список сформированных файлов (попытка, всего: {len(items)})",
attachment_type=allure.attachment_type.TEXT,
)
last_items_count = len(items)
match = find_matching_exported_item(
items=items,
expected_data_type=expected_data_type,
name_substring=name_substring,
period_start=period_start,
period_end=period_end,
)
if match is not None:
return match
finally:
collected_messages.extend(_drain_recv_queue(ws_client))
ws_client.suppress_recv_logging = False
parser.suppress_recv_logging = False
_attach_ws_poll_failure(
collected_messages,
total_wait_seconds,
ReportConst.GET_EXPORTED_FILES_LIST_REQUEST,
)
return None