Загрузка данных
return leak_diagnostic_area_samples
def _drain_recv_queue(ws_client: WebSocketClient) -> List[Any]:
    """Pop every message currently waiting in the client's receive queue.

    Only non-blocking queue calls are used; the original note on this helper
    says this is done instead of the blocking ``receive_by_*`` calls so that
    pings are not lost.

    Args:
        ws_client: Client whose ``recv_queue`` is emptied.

    Returns:
        The removed messages in the order the queue yielded them; an empty
        list when nothing was waiting.
    """
    drained: List[Any] = []
    while True:
        if ws_client.recv_queue.empty():
            return drained
        try:
            item = ws_client.recv_queue.get_nowait()
        except asyncio.QueueEmpty:
            # The queue became empty between the check and the read.
            return drained
        drained.append(item)
def _find_valid_report_export_notification(
    messages: List[Any],
    parser,
    notification_type: str,
) -> Optional[Any]:
    """Return the first successful export notification found in *messages*.

    A message is considered only if it is a ``list`` and ``is_desired_type``
    accepts it for *notification_type*. It is then parsed with
    ``parser.parse_report_data_exported_notification_msg``; messages whose
    parsing raises ``ValueError``, ``TypeError`` or ``KeyError`` are skipped.

    Args:
        messages: Messages that have already been received.
        parser: Object providing ``parse_report_data_exported_notification_msg``.
        notification_type: Type identifier handed to ``is_desired_type``.

    Returns:
        The first parsed notification whose ``replyStatus`` equals
        ``ReplyStatus.OK.value``, whose ``replyContent`` is not ``None`` and
        whose ``replyContent.exportStatus`` equals ``ExportStatus.DONE``;
        ``None`` when no message qualifies.
    """
    from constants.enums import ExportStatus, ReplyStatus

    for candidate in messages:
        if not (
            isinstance(candidate, list)
            and is_desired_type(candidate, notification_type)
        ):
            continue

        try:
            parsed = parser.parse_report_data_exported_notification_msg(candidate)
        except (ValueError, TypeError, KeyError):
            # Malformed notification: ignore it and keep looking.
            continue

        # NOTE(review): replyStatus is compared with the enum's ``.value``
        # while exportStatus is compared with the enum member itself;
        # confirm the parser yields these representations.
        if parsed.replyStatus == ReplyStatus.OK.value:
            content = parsed.replyContent
            if content is not None and content.exportStatus == ExportStatus.DONE:
                return parsed

    return None
async def poll_for_report_export_notification(
    ws_client: WebSocketClient,
    parser,
    total_wait_seconds: float,
    poll_interval_seconds: float,
) -> Optional[Any]:
    """Poll the ws receive queue for a successful ReportDataExportedNotification.

    The queue is inspected immediately and then again after every
    ``poll_interval_seconds`` sleep for as long as the deadline had not
    passed when the sleep started. Every inspection drains the whole queue;
    drained messages that are not the wanted notification are not kept.

    While polling, ``suppress_recv_logging`` is set to ``True`` on both
    *ws_client* and *parser*; it is set back to ``False`` on every exit path.

    Args:
        ws_client: Client whose ``recv_queue`` is drained.
        parser: Parser passed on to the notification search.
        total_wait_seconds: Time budget, measured on the event-loop clock
            from the moment of the call.
        poll_interval_seconds: Pause between two queue inspections.

    Returns:
        The first notification accepted by
        ``_find_valid_report_export_notification``, or ``None`` if none was
        seen before the deadline.
    """
    from models.export_reports_model import REPORT_DATA_EXPORTED_NOTIFICATION

    loop = asyncio.get_event_loop()
    deadline = loop.time() + total_wait_seconds
    ws_client.suppress_recv_logging = True
    parser.suppress_recv_logging = True
    try:
        while True:
            # Inspect before sleeping: a notification that is already queued
            # is returned at once, and the queue is looked at at least once
            # even when the time budget is zero or negative.
            notification = _find_valid_report_export_notification(
                _drain_recv_queue(ws_client),
                parser,
                REPORT_DATA_EXPORTED_NOTIFICATION,
            )
            if notification is not None:
                return notification
            if loop.time() >= deadline:
                return None
            await asyncio.sleep(poll_interval_seconds)
    finally:
        ws_client.suppress_recv_logging = False
        parser.suppress_recv_logging = False
async def poll_for_exported_file(
    ws_client: WebSocketClient,
    parser,
    tu_id: int,
    expected_data_type: Any,
    name_substring: str,
    period_start: datetime,
    period_end: datetime,
    total_wait_seconds: float,
    poll_interval_seconds: float,
) -> Optional[Any]:
    """Repeatedly request the exported-files list until a matching file shows up.

    Each round empties the receive queue, issues
    ``GET_EXPORTED_FILES_LIST_REQUEST`` through ``connect``, sleeps for
    ``poll_interval_seconds`` and then looks through everything queued in the
    meantime for the reply carrying the request's invocation id. A round
    without such a reply is simply retried. The listing is attached to the
    Allure report whenever the number of listed files differs from the
    previously attached one.

    While polling, ``suppress_recv_logging`` is set to ``True`` on both
    *ws_client* and *parser*; it is set back to ``False`` on every exit path.

    Args:
        ws_client: Client used to send the request and read replies.
        parser: Object providing ``parse_exported_files_list_msg``.
        tu_id: Value sent as ``tuId`` in the request.
        expected_data_type: Forwarded to ``find_matching_exported_item``.
        name_substring: Forwarded to ``find_matching_exported_item``.
        period_start: Forwarded to ``find_matching_exported_item``.
        period_end: Forwarded to ``find_matching_exported_item``.
        total_wait_seconds: Time budget on the event-loop clock; it is checked
            only before a new round starts.
        poll_interval_seconds: Pause between sending a request and reading
            its reply.

    Returns:
        The item returned by ``find_matching_exported_item``, or ``None`` if
        the deadline passes without a match.
    """
    from models.get_exported_files_list_model import GET_EXPORTED_FILES_LIST_REQUEST

    loop = asyncio.get_event_loop()
    deadline = loop.time() + total_wait_seconds
    previous_count = -1

    ws_client.suppress_recv_logging = True
    parser.suppress_recv_logging = True
    try:
        while loop.time() < deadline:
            # Discard whatever is queued so far; the result is not used.
            _drain_recv_queue(ws_client)
            await connect(
                ws_client,
                GET_EXPORTED_FILES_LIST_REQUEST,
                {"tuId": tu_id, "additionalProperties": None},
            )
            # Presumably the id assigned to the request just sent; verify
            # against ``connect``.
            request_id = ws_client.invocation_id
            await asyncio.sleep(poll_interval_seconds)

            reply = next(
                (
                    candidate
                    for candidate in _drain_recv_queue(ws_client)
                    if isinstance(candidate, list)
                    and is_desired_invocation_id(candidate, request_id)
                ),
                None,
            )
            if reply is None:
                # No reply for this request yet: start a new round.
                continue

            content = parser.parse_exported_files_list_msg(reply).replyContent
            if content is None:
                exported = []
            else:
                exported = content.exportedData or []

            total = len(exported)
            if total != previous_count:
                listing = "\n".join(
                    f"id={entry.id}, name={entry.name}, type={entry.exportedDataType}, "
                    f"start={entry.start}, end={entry.end}"
                    for entry in exported
                )
                allure.attach(
                    listing,
                    name=f"Список сформированных файлов (попытка, всего: {total})",
                    attachment_type=allure.attachment_type.TEXT,
                )
                previous_count = total

            found = find_matching_exported_item(
                items=exported,
                expected_data_type=expected_data_type,
                name_substring=name_substring,
                period_start=period_start,
                period_end=period_end,
            )
            if found is not None:
                return found
    finally:
        ws_client.suppress_recv_logging = False
        parser.suppress_recv_logging = False
    return None
def _normalize_report_period_datetime(value: datetime) -> datetime: