Загрузка данных
модель тестов
expected_mt_mode: Optional[str] = None
time_offset_hours: Optional[int] = None
вс клиент
logger.debug(f"Отправляем сообщение: {packet}")
await self._ws.send(packet)
async def invoke_stream(self, target: str, args: list) -> None:
"""
Отправляет streaming-вызов (StreamInvocation) по протоколу SignalR.
"""
if not self._ws:
raise websockets.WebSocketException("Не установлено подключение по wss")
self._invocation_id = str(self._next_id)
self._next_id += 1
invocation = [
WS_Const.STREAM_INVOCATION_MESSAGE_TYPE,
WS_Const.DEFAULT_SIGNALR_MAP_HEADERS,
self._invocation_id,
target,
[args],
]
logger.info(f"Streaming-сообщение подготовлено к отправке: {invocation}")
payload = msgpack.packb(invocation, use_bin_type=True)
packet = encode_with_varint_prefix(payload)
await self._ws.send(packet)
арх конст
CLOSE_TIMEOUT: int = 30
DEFAULT_SIGNALR_MESSAGE_TYPE: int = 1 # invocation
STREAM_INVOCATION_MESSAGE_TYPE: int = 4 # StreamInvocation
STREAM_ITEM_MESSAGE_TYPE: int = 2 # StreamItem
COMPLETION_MESSAGE_TYPE: int = 3 # Completion
# Текст ошибки в кадре Completion при неуспешном streaming (SignalR CompletionWithDetail)
COMPLETION_ERROR_MESSAGE_INDEX: int = 4
DEFAULT_SIGNALR_MAP_HEADERS: dict = {}
тест конст
ZONE_INFO: str = "Europe/Moscow"
SECONDS_PER_HOUR: int = 3600
плюс удалить про час пояс
сценарий
async def export_leaks_report(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig, imitator_start_time: datetime):
"""
Сценарий формирования отчёта об утечках.
Этапы:
1. Подписка SubscribeReportsDataExportedRequest на пуш-нотификации.
2. Отправка ExportReportsCommandRequest с фильтром по времени
(start = старт имитатора, end = старт имитатора + offset теста).
3. Ожидание пуш-нотификации ReportDataExportedNotification о готовности отчёта.
4. Лонг-поллинг GetExportedDataListRequest до появления нашего отчёта в списке.
5. Отправка DownloadExportedDataRequest по id отчёта.
6. Получение fileChunk по ответу на скачивание.
7-10. Проверки: формат файла, имя, шапка xlsx, строка утечки.
Скачанный файл удаляется по завершению, прикладывается к Allure только при падении теста.
"""
report_state = ExportLeaksReportState()
with allure.step("Подготовка параметров сценария формирования отчёта об утечках"):
report_state.report_test = leak.export_leaks_report_test
StepCheck("В конфигурации задан export_leaks_report_test", "export_leaks_report_test").actual(
report_state.report_test
).is_not_none()
report_state.period_start = t_utils.localize_as_moscow(imitator_start_time)
report_state.period_end = t_utils.localize_as_moscow(
imitator_start_time + timedelta(minutes=report_state.report_test.offset)
)
report_state.period_start_naive = report_state.period_start.replace(tzinfo=None)
report_state.period_end_naive = report_state.period_end.replace(tzinfo=None)
report_state.expected_mt_mode = ReportConst.STATIONARY_STATUS_TO_REPORT_TEXT.get(
leak.expected_stationary_status
)
report_state.tu_description_lower = cfg.technological_unit.description.lower()
time_offset_hours = t_utils.report_time_offset_hours()
StepCheck(
f"Смещение timeOffset для запросов отчёта (часовой пояс {TestConst.ZONE_INFO})",
"time_offset_hours",
).actual(time_offset_hours).is_not_none()
report_state.time_offset_hours = time_offset_hours
StepCheck(
"Задан ожидаемый текст режима МТ для отчёта",
"expected_mt_mode",
).actual(report_state.expected_mt_mode).is_not_none()
allure.attach(
f"period.start={report_state.period_start}\n"
f"period.end={report_state.period_end}\n"
f"offset_minutes={report_state.report_test.offset}",
name="Фильтр периода отчёта",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step(
f"Этап 1. Подписка на пуш-нотификации ({ReportConst.SUBSCRIBE_REPORTS_DATA_EXPORTED_REQUEST})"
):
await t_utils.connect(ws_client, ReportConst.SUBSCRIBE_REPORTS_DATA_EXPORTED_REQUEST, [])
with allure.step(f"Этап 2. Запрос формирования отчёта ({ReportConst.EXPORT_REPORTS_COMMAND_REQUEST})"):
request_payload = {
"tuId": cfg.tu_id,
"exportedDataTypes": [ExportedDataType.LEAKS_REPORT.value],
"timeOffset": report_state.time_offset_hours,
"period": {
"start": t_utils.datetime_to_msgpack_timestamp(report_state.period_start),
"end": t_utils.datetime_to_msgpack_timestamp(report_state.period_end),
"additionalProperties": {},
},
}
await t_utils.connect(ws_client, ReportConst.EXPORT_REPORTS_COMMAND_REQUEST, request_payload)
with allure.step(
f"Этап 3. Ожидание пуш-нотификации {ReportConst.REPORT_DATA_EXPORTED_NOTIFICATION} о готовности отчёта"
):
report_state.notification = await t_utils.poll_for_report_export_notification(
ws_client=ws_client,
parser=parser,
total_wait_seconds=ReportConst.NOTIFICATION_TIMEOUT_SECONDS,
poll_interval_seconds=ReportConst.LIST_POLL_INTERVAL_SECONDS,
)
with allure.step("Проверка пуш-нотификации о готовности отчёта"):
StepCheck("Получена пуш-нотификация о готовности отчёта", "notification").actual(
report_state.notification
).is_not_none()
StepCheck("Проверка статуса пуш-нотификации", "replyStatus").actual(
report_state.notification.replyStatus if report_state.notification else None
).expected(ReplyStatus.OK.value).equal_to()
StepCheck("Проверка наличия контента нотификации", "replyContent").actual(
report_state.notification.replyContent if report_state.notification else None
).is_not_none()
StepCheck("Проверка exportStatus в нотификации", "exportStatus").actual(
report_state.notification.replyContent.exportStatus
if report_state.notification and report_state.notification.replyContent
else None
).expected(ExportStatus.DONE).equal_to()
StepCheck("Проверка отсутствия ошибки в нотификации", "errorMessage").actual(
(report_state.notification.replyContent.errorMessage or "")
if report_state.notification and report_state.notification.replyContent
else None
).expected("").equal_to()
with allure.step(
f"Этап 4. Лонг-поллинг {ReportConst.GET_EXPORTED_DATA_LIST_REQUEST} до появления отчёта в списке"
):
report_state.report_item = await t_utils.poll_for_exported_file(
ws_client=ws_client,
parser=parser,
list_limit=ReportConst.EXPORTED_DATA_LIST_LIMIT,
expected_data_type=ExportedDataType.LEAKS_REPORT,
name_substring=ReportConst.LEAKS_REPORT_NAME_PART,
period_start=report_state.period_start,
period_end=report_state.period_end,
total_wait_seconds=ReportConst.LIST_POLL_TOTAL_WAIT_SECONDS,
poll_interval_seconds=ReportConst.LIST_POLL_INTERVAL_SECONDS,
)
with allure.step("Проверка: отчёт найден в списке сформированных файлов"):
StepCheck("Отчёт найден в списке сформированных файлов", "report_item").actual(
report_state.report_item
).is_not_none()
report_item = report_state.report_item
allure.attach(
f"id={report_item.id}, name={report_item.name}, "
f"exportedDataType={report_item.exportedDataType}, "
f"start={report_item.start}, end={report_item.end}",
name="Найденный отчёт в списке",
attachment_type=allure.attachment_type.TEXT,
)
report_state.report_file_name = report_item.name + ReportConst.XLSX_EXTENSION
download_request = {
"exportedDataId": report_state.report_item.id,
"exportedDataType": ExportedDataType.LEAKS_REPORT.to_download_name(),
"additionalProperties": None,
"timeOffset": report_state.time_offset_hours,
}
download_purpose = (
f"скачивание xlsx-отчёта об утечках (exportedDataId={report_state.report_item.id}) "
f"после формирования отчёта и выбора файла в списке GetExportedDataListRequest - выпадашка уведомлений на UI"
)
with allure.step(
f"Этап 5. Streaming-вызов {ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST} по id={report_state.report_item.id}"
):
await t_utils.connect_stream(
ws_client,
ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST,
download_request,
purpose=download_purpose,
)
report_state.download_invocation_id = ws_client.invocation_id
with allure.step("Этап 6. Получение fileChunk - скачивание отчёта по утечкам"):
report_state.download_reply = await t_utils.receive_download_exported_data_reply(
ws_client=ws_client,
parser=parser,
invocation_id=report_state.download_invocation_id,
request_name=ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST,
total_wait_seconds=ReportConst.DOWNLOAD_TIMEOUT_SECONDS,
purpose=download_purpose,
)
вс тест утил
from constants.architecture_constants import WebSocketClientConstants as WS_Const
# Конвертируем в московское время
return input_datetime.astimezone(ZoneInfo(TestConst.ZONE_INFO))
def report_time_offset_hours(tz_name: str = TestConst.ZONE_INFO) -> Optional[int]:
"""
Смещение часового пояса (часы от UTC) для поля timeOffset в запросах отчётов.
"""
now = datetime.now(ZoneInfo(tz_name))
utc_offset = now.utcoffset()
if utc_offset is None:
return None
return int(utc_offset.total_seconds() // TestConst.SECONDS_PER_HOUR)
except (asyncio.TimeoutError, ConnectionError, ConnectionResetError, OSError) as error:
fail(f"Не удалось отправить сообщение типа: {ws_invoke_type} c параметрами {ws_invoke_params}. Ошибка: {error}")
async def connect_stream(
ws_client: WebSocketClient,
ws_invoke_type: str,
ws_invoke_params: Any = None,
purpose: str = "streaming-вызов WS",
) -> None:
"""
Streaming-вызов (StreamInvocation)
"""
try:
with allure.step(f"Streaming-вызов {ws_invoke_type} c параметрами {ws_invoke_params}"):
await ws_client.invoke_stream(ws_invoke_type, ws_invoke_params)
except (asyncio.TimeoutError, ConnectionError, ConnectionResetError, OSError) as error:
fail(
f"Не удалось выполнить {purpose} ({ws_invoke_type}, StreamInvocation). "
f"Параметры запроса: {ws_invoke_params}. Ошибка соединения: {error}"
)
def _stream_completion_error(msg: Any, invocation_id: str) -> Optional[str]:
"""Текст ошибки из ответа SignalR Completion для данного invocation_id, если есть."""
if not isinstance(msg, list):
return None
if msg[0] != WS_Const.COMPLETION_MESSAGE_TYPE:
return None
if not is_desired_invocation_id(msg, invocation_id):
return None
if len(msg) <= WS_Const.COMPLETION_ERROR_MESSAGE_INDEX:
return None
error_text = msg[WS_Const.COMPLETION_ERROR_MESSAGE_INDEX]
if isinstance(error_text, str):
return error_text
return None
async def receive_download_exported_data_reply(
ws_client: WebSocketClient,
parser,
invocation_id: str,
request_name: str,
total_wait_seconds: float,
poll_interval_seconds: float = 0.5,
purpose: str = "скачивании xlsx-отчёта после выбора файла в списке сформированных отчётов",
) -> Any:
"""
Ожидает StreamItem с fileChunk после streaming DownloadExportedDataRequest.
"""
deadline = asyncio.get_event_loop().time() + total_wait_seconds
collected_messages: List[Any] = []
ws_client.suppress_recv_logging = True
parser.suppress_recv_logging = True
try:
while asyncio.get_event_loop().time() < deadline:
await asyncio.sleep(poll_interval_seconds)
batch = _drain_recv_queue(ws_client)
collected_messages.extend(batch)
for msg in batch:
stream_error = _stream_completion_error(msg, invocation_id)
if stream_error:
_attach_ws_reply_parse_failure(msg, invocation_id, request_name, RuntimeError(stream_error))
fail(
f"При {purpose} бэк вернул Completion с ошибкой "
f"({request_name}, invocation_id={invocation_id}): {stream_error}"
)
for msg in batch:
if (
not isinstance(msg, list)
or msg[0] != WS_Const.STREAM_ITEM_MESSAGE_TYPE
or not is_desired_invocation_id(msg, invocation_id)
):
continue
if parser._find_reply_status_in_ws_msg(msg) is None:
continue
try:
return parser.parse_download_exported_data_msg(msg)
except Exception as error:
_attach_ws_reply_parse_failure(msg, invocation_id, request_name, error)
fail(
f"При {purpose} получен StreamItem ({request_name}, invocation_id={invocation_id}), "
f"но не удалось разобрать ответ с fileChunk: {error}"
)
finally:
collected_messages.extend(_drain_recv_queue(ws_client))
ws_client.suppress_recv_logging = False
parser.suppress_recv_logging = False
_attach_ws_poll_failure(collected_messages, total_wait_seconds, f"{request_name} (StreamItem)")
fail(
f"При {purpose} за {total_wait_seconds} с не получен StreamItem с fileChunk "
f"({request_name}, invocation_id={invocation_id}). Смотреть вложения received ws message"
)