Загрузка данных
GET_EXPORTED_DATA_LIST_REQUEST: str = "GetExportedDataListRequest"
EXPORTED_DATA_LIST_LIMIT: int = 10
models\get_exported_files_list_model.py
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional
from constants.enums import ExportedDataType
@dataclass
class ReplyErrors:
reason: str
errorType: Optional[str] = None
location: Optional[str] = None
@dataclass
class ExportedDataItem:
id: int
name: str
exportedDataType: ExportedDataType
start: Optional[datetime] = None
end: Optional[datetime] = None
@dataclass
class ExportedDataListContent:
"""Контент ответа со списком сформированных файлов."""
exportedData: List[ExportedDataItem]
@dataclass
class GetExportedDataListRequest:
"""Запрос списка сформированных файлов (выпадающий список на UI)."""
limit: int
@dataclass
class GetExportedDataListReply:
"""Ответ со списком сформированных файлов."""
replyStatus: int
replyContent: Optional[ExportedDataListContent] = None
replyErrors: Optional[ReplyErrors] = None
вс мс парсер
return self._find_and_parse_message(data_class=ReportDataExportedNotification, data=data)
def parse_exported_data_list_msg(self, data: list) -> GetExportedDataListReply:
"""
Парсит ответ GetExportedDataListReply со списком сформированных файлов.
"""
return self._find_and_parse_message(data_class=GetExportedDataListReply, data=data)
цы test utils
attachment_type=allure.attachment_type.TEXT,
)
def _attach_ws_reply_parse_failure(
reply_payload: Optional[Any],
invocation_id: str,
request_name: str,
error: BaseException,
) -> None:
"""Прикрепляет к Allure ответ бэка при ошибке парсинга."""
allure.attach(
"\n".join(
[
f"Запрос: {request_name}",
f"invocation_id: {invocation_id}",
f"Ошибка: {error}",
]
),
name="WS parse failure",
attachment_type=allure.attachment_type.TEXT,
)
if reply_payload is not None:
allure.attach(
pprint.pformat(reply_payload, width=120, sort_dicts=False),
name="received ws message",
attachment_type=allure.attachment_type.TEXT,
)
def _drain_recv_queue(ws_client: WebSocketClient) -> List[Any]:
async def poll_for_exported_file(
ws_client: WebSocketClient,
parser,
list_limit: int,
expected_data_type: Any,
name_substring: str,
period_start: datetime,
period_end: datetime,
total_wait_seconds: float,
poll_interval_seconds: float,
) -> Optional[Any]:
"""
Периодически шлёт GetExportedDataListRequest, забирает ответы из очереди
по invocation_id среди всех накопленных сообщений.
При таймауте или ошибке парсинга прикрепляет к Allure полученные ответы.
"""
deadline = asyncio.get_event_loop().time() + total_wait_seconds
last_items_count = -1
collected_messages: List[Any] = []
request_name = ReportConst.GET_EXPORTED_DATA_LIST_REQUEST
ws_client.suppress_recv_logging = True
parser.suppress_recv_logging = True
try:
while asyncio.get_event_loop().time() < deadline:
drained_before_request = _drain_recv_queue(ws_client)
collected_messages.extend(drained_before_request)
await connect(
ws_client,
request_name,
{"limit": list_limit},
)
invocation_id = ws_client.invocation_id
await asyncio.sleep(poll_interval_seconds)
batch = _drain_recv_queue(ws_client)
collected_messages.extend(batch)
list_reply_payload = _find_ws_reply_by_invocation_id(batch, invocation_id, parser)
if list_reply_payload is None:
continue
try:
parsed_payload = parser.parse_exported_data_list_msg(list_reply_payload)
except Exception as error:
_attach_ws_reply_parse_failure(list_reply_payload, invocation_id, request_name, error)
for msg in collected_messages:
allure.attach(
pprint.pformat(msg, width=120, sort_dicts=False),
name="received ws message",
attachment_type=allure.attachment_type.TEXT,
)
fail(f"Не удалось разобрать ответ на {request_name}: {error}")
items = []
if parsed_payload.replyContent is not None:
items = parsed_payload.replyContent.exportedData or []
if len(items) != last_items_count:
allure.attach(
"\n".join(
f"id={item.id}, name={item.name}, type={item.exportedDataType}, "
f"start={item.start}, end={item.end}"
for item in items
),
name=f"Список сформированных файлов (попытка, всего: {len(items)})",
attachment_type=allure.attachment_type.TEXT,
)
last_items_count = len(items)
match = find_matching_exported_item(
items=items,
expected_data_type=expected_data_type,
name_substring=name_substring,
period_start=period_start,
period_end=period_end,
)
if match is not None:
return match
finally:
collected_messages.extend(_drain_recv_queue(ws_client))
ws_client.suppress_recv_logging = False
parser.suppress_recv_logging = False
_attach_ws_poll_failure(
collected_messages,
total_wait_seconds,
request_name,
)
return None