Загрузка данных


модель тестов

    expected_mt_mode: Optional[str] = None
    time_offset_hours: Optional[int] = None











вс клиент
        logger.debug(f"Отправляем сообщение: {packet}")
        await self._ws.send(packet)

    async def invoke_stream(self, target: str, args: list) -> None:
        """
        Отправляет streaming-вызов (StreamInvocation) по протоколу SignalR.
        """
        if not self._ws:
            raise websockets.WebSocketException("Не установлено подключение по wss")
        self._invocation_id = str(self._next_id)
        self._next_id += 1
        invocation = [
            WS_Const.STREAM_INVOCATION_MESSAGE_TYPE,
            WS_Const.DEFAULT_SIGNALR_MAP_HEADERS,
            self._invocation_id,
            target,
            [args],
        ]
        logger.info(f"Streaming-сообщение подготовлено к отправке: {invocation}")
        payload = msgpack.packb(invocation, use_bin_type=True)
        packet = encode_with_varint_prefix(payload)
        await self._ws.send(packet)














арх конст
    CLOSE_TIMEOUT: int = 30
    DEFAULT_SIGNALR_MESSAGE_TYPE: int = 1  # invocation
    STREAM_INVOCATION_MESSAGE_TYPE: int = 4  # StreamInvocation
    STREAM_ITEM_MESSAGE_TYPE: int = 2  # StreamItem
    COMPLETION_MESSAGE_TYPE: int = 3  # Completion
    # Текст ошибки в кадре Completion при неуспешном streaming (SignalR CompletionWithDetail)
    COMPLETION_ERROR_MESSAGE_INDEX: int = 4
    DEFAULT_SIGNALR_MAP_HEADERS: dict = {}






















тест конст
    ZONE_INFO: str = "Europe/Moscow"
    SECONDS_PER_HOUR: int = 3600



плюс удалить про час пояс





















сценарий
async def export_leaks_report(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig, imitator_start_time: datetime):
    """
    Сценарий формирования отчёта об утечках.

    Этапы:
    1. Подписка SubscribeReportsDataExportedRequest на пуш-нотификации.
    2. Отправка ExportReportsCommandRequest с фильтром по времени
       (start = старт имитатора, end = старт имитатора + offset теста).
    3. Ожидание пуш-нотификации ReportDataExportedNotification о готовности отчёта.
    4. Лонг-поллинг GetExportedDataListRequest до появления нашего отчёта в списке.
    5. Отправка DownloadExportedDataRequest по id отчёта.
    6. Получение fileChunk по ответу на скачивание.
    7-10. Проверки: формат файла, имя, шапка xlsx, строка утечки.

    Скачанный файл удаляется по завершению, прикладывается к Allure только при падении теста.
    """
    report_state = ExportLeaksReportState()

    with allure.step("Подготовка параметров сценария формирования отчёта об утечках"):
        report_state.report_test = leak.export_leaks_report_test
        StepCheck("В конфигурации задан export_leaks_report_test", "export_leaks_report_test").actual(
            report_state.report_test
        ).is_not_none()

        report_state.period_start = t_utils.localize_as_moscow(imitator_start_time)
        report_state.period_end = t_utils.localize_as_moscow(
            imitator_start_time + timedelta(minutes=report_state.report_test.offset)
        )
        report_state.period_start_naive = report_state.period_start.replace(tzinfo=None)
        report_state.period_end_naive = report_state.period_end.replace(tzinfo=None)
        report_state.expected_mt_mode = ReportConst.STATIONARY_STATUS_TO_REPORT_TEXT.get(
            leak.expected_stationary_status
        )
        report_state.tu_description_lower = cfg.technological_unit.description.lower()
        time_offset_hours = t_utils.report_time_offset_hours()
        StepCheck(
            f"Смещение timeOffset для запросов отчёта (часовой пояс {TestConst.ZONE_INFO})",
            "time_offset_hours",
        ).actual(time_offset_hours).is_not_none()
        report_state.time_offset_hours = time_offset_hours

        StepCheck(
            "Задан ожидаемый текст режима МТ для отчёта",
            "expected_mt_mode",
        ).actual(report_state.expected_mt_mode).is_not_none()

        allure.attach(
            f"period.start={report_state.period_start}\n"
            f"period.end={report_state.period_end}\n"
            f"offset_minutes={report_state.report_test.offset}",
            name="Фильтр периода отчёта",
            attachment_type=allure.attachment_type.TEXT,
        )

    with allure.step(
        f"Этап 1. Подписка на пуш-нотификации ({ReportConst.SUBSCRIBE_REPORTS_DATA_EXPORTED_REQUEST})"
    ):
        await t_utils.connect(ws_client, ReportConst.SUBSCRIBE_REPORTS_DATA_EXPORTED_REQUEST, [])

    with allure.step(f"Этап 2. Запрос формирования отчёта ({ReportConst.EXPORT_REPORTS_COMMAND_REQUEST})"):
        request_payload = {
            "tuId": cfg.tu_id,
            "exportedDataTypes": [ExportedDataType.LEAKS_REPORT.value],
            "timeOffset": report_state.time_offset_hours,
            "period": {
                "start": t_utils.datetime_to_msgpack_timestamp(report_state.period_start),
                "end": t_utils.datetime_to_msgpack_timestamp(report_state.period_end),
                "additionalProperties": {},
            },
        }
        await t_utils.connect(ws_client, ReportConst.EXPORT_REPORTS_COMMAND_REQUEST, request_payload)

    with allure.step(
        f"Этап 3. Ожидание пуш-нотификации {ReportConst.REPORT_DATA_EXPORTED_NOTIFICATION} о готовности отчёта"
    ):
        report_state.notification = await t_utils.poll_for_report_export_notification(
            ws_client=ws_client,
            parser=parser,
            total_wait_seconds=ReportConst.NOTIFICATION_TIMEOUT_SECONDS,
            poll_interval_seconds=ReportConst.LIST_POLL_INTERVAL_SECONDS,
        )

    with allure.step("Проверка пуш-нотификации о готовности отчёта"):
        StepCheck("Получена пуш-нотификация о готовности отчёта", "notification").actual(
            report_state.notification
        ).is_not_none()
        StepCheck("Проверка статуса пуш-нотификации", "replyStatus").actual(
            report_state.notification.replyStatus if report_state.notification else None
        ).expected(ReplyStatus.OK.value).equal_to()
        StepCheck("Проверка наличия контента нотификации", "replyContent").actual(
            report_state.notification.replyContent if report_state.notification else None
        ).is_not_none()
        StepCheck("Проверка exportStatus в нотификации", "exportStatus").actual(
            report_state.notification.replyContent.exportStatus
            if report_state.notification and report_state.notification.replyContent
            else None
        ).expected(ExportStatus.DONE).equal_to()
        StepCheck("Проверка отсутствия ошибки в нотификации", "errorMessage").actual(
            (report_state.notification.replyContent.errorMessage or "")
            if report_state.notification and report_state.notification.replyContent
            else None
        ).expected("").equal_to()

    with allure.step(
        f"Этап 4. Лонг-поллинг {ReportConst.GET_EXPORTED_DATA_LIST_REQUEST} до появления отчёта в списке"
    ):
        report_state.report_item = await t_utils.poll_for_exported_file(
            ws_client=ws_client,
            parser=parser,
            list_limit=ReportConst.EXPORTED_DATA_LIST_LIMIT,
            expected_data_type=ExportedDataType.LEAKS_REPORT,
            name_substring=ReportConst.LEAKS_REPORT_NAME_PART,
            period_start=report_state.period_start,
            period_end=report_state.period_end,
            total_wait_seconds=ReportConst.LIST_POLL_TOTAL_WAIT_SECONDS,
            poll_interval_seconds=ReportConst.LIST_POLL_INTERVAL_SECONDS,
        )

    with allure.step("Проверка: отчёт найден в списке сформированных файлов"):
        StepCheck("Отчёт найден в списке сформированных файлов", "report_item").actual(
            report_state.report_item
        ).is_not_none()
        report_item = report_state.report_item
        allure.attach(
            f"id={report_item.id}, name={report_item.name}, "
            f"exportedDataType={report_item.exportedDataType}, "
            f"start={report_item.start}, end={report_item.end}",
            name="Найденный отчёт в списке",
            attachment_type=allure.attachment_type.TEXT,
        )
        report_state.report_file_name = report_item.name + ReportConst.XLSX_EXTENSION

    download_request = {
        "exportedDataId": report_state.report_item.id,
        "exportedDataType": ExportedDataType.LEAKS_REPORT.to_download_name(),
        "additionalProperties": None,
        "timeOffset": report_state.time_offset_hours,
    }

    download_purpose = (
        f"скачивание xlsx-отчёта об утечках (exportedDataId={report_state.report_item.id}) "
        f"после формирования отчёта и выбора файла в списке GetExportedDataListRequest - выпадашка уведомлений на UI"
    )

    with allure.step(
        f"Этап 5. Streaming-вызов {ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST} по id={report_state.report_item.id}"
    ):
        await t_utils.connect_stream(
            ws_client,
            ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST,
            download_request,
            purpose=download_purpose,
        )
        report_state.download_invocation_id = ws_client.invocation_id

    with allure.step("Этап 6. Получение fileChunk - скачивание отчёта по утечкам"):
        report_state.download_reply = await t_utils.receive_download_exported_data_reply(
            ws_client=ws_client,
            parser=parser,
            invocation_id=report_state.download_invocation_id,
            request_name=ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST,
            total_wait_seconds=ReportConst.DOWNLOAD_TIMEOUT_SECONDS,
            purpose=download_purpose,
        )



















вс тест утил
from constants.architecture_constants import WebSocketClientConstants as WS_Const















    # Конвертируем в московское время
    return input_datetime.astimezone(ZoneInfo(TestConst.ZONE_INFO))


def report_time_offset_hours(tz_name: str = TestConst.ZONE_INFO) -> Optional[int]:
    """
    Смещение часового пояса (часы от UTC) для поля timeOffset в запросах отчётов.
    """
    now = datetime.now(ZoneInfo(tz_name))
    utc_offset = now.utcoffset()
    if utc_offset is None:
        return None
    return int(utc_offset.total_seconds() // TestConst.SECONDS_PER_HOUR)























    except (asyncio.TimeoutError, ConnectionError, ConnectionResetError, OSError) as error:
        fail(f"Не удалось отправить сообщение типа: {ws_invoke_type} c параметрами {ws_invoke_params}. Ошибка: {error}")


async def connect_stream(
    ws_client: WebSocketClient,
    ws_invoke_type: str,
    ws_invoke_params: Any = None,
    purpose: str = "streaming-вызов WS",
) -> None:
    """
    Streaming-вызов (StreamInvocation)
    """
    try:
        with allure.step(f"Streaming-вызов {ws_invoke_type} c параметрами {ws_invoke_params}"):
            await ws_client.invoke_stream(ws_invoke_type, ws_invoke_params)
    except (asyncio.TimeoutError, ConnectionError, ConnectionResetError, OSError) as error:
        fail(
            f"Не удалось выполнить {purpose} ({ws_invoke_type}, StreamInvocation). "
            f"Параметры запроса: {ws_invoke_params}. Ошибка соединения: {error}"
        )


def _stream_completion_error(msg: Any, invocation_id: str) -> Optional[str]:
    """Текст ошибки из ответа SignalR Completion для данного invocation_id, если есть."""
    if not isinstance(msg, list):
        return None
    if msg[0] != WS_Const.COMPLETION_MESSAGE_TYPE:
        return None
    if not is_desired_invocation_id(msg, invocation_id):
        return None
    if len(msg) <= WS_Const.COMPLETION_ERROR_MESSAGE_INDEX:
        return None
    error_text = msg[WS_Const.COMPLETION_ERROR_MESSAGE_INDEX]
    if isinstance(error_text, str):
        return error_text
    return None


async def receive_download_exported_data_reply(
    ws_client: WebSocketClient,
    parser,
    invocation_id: str,
    request_name: str,
    total_wait_seconds: float,
    poll_interval_seconds: float = 0.5,
    purpose: str = "скачивании xlsx-отчёта после выбора файла в списке сформированных отчётов",
) -> Any:
    """
    Ожидает StreamItem с fileChunk после streaming DownloadExportedDataRequest.
    """
    deadline = asyncio.get_event_loop().time() + total_wait_seconds
    collected_messages: List[Any] = []
    ws_client.suppress_recv_logging = True
    parser.suppress_recv_logging = True
    try:
        while asyncio.get_event_loop().time() < deadline:
            await asyncio.sleep(poll_interval_seconds)
            batch = _drain_recv_queue(ws_client)
            collected_messages.extend(batch)

            for msg in batch:
                stream_error = _stream_completion_error(msg, invocation_id)
                if stream_error:
                    _attach_ws_reply_parse_failure(msg, invocation_id, request_name, RuntimeError(stream_error))
                    fail(
                        f"При {purpose} бэк вернул Completion с ошибкой "
                        f"({request_name}, invocation_id={invocation_id}): {stream_error}"
                    )

            for msg in batch:
                if (
                    not isinstance(msg, list)
                    or msg[0] != WS_Const.STREAM_ITEM_MESSAGE_TYPE
                    or not is_desired_invocation_id(msg, invocation_id)
                ):
                    continue
                if parser._find_reply_status_in_ws_msg(msg) is None:
                    continue
                try:
                    return parser.parse_download_exported_data_msg(msg)
                except Exception as error:
                    _attach_ws_reply_parse_failure(msg, invocation_id, request_name, error)
                    fail(
                        f"При {purpose} получен StreamItem ({request_name}, invocation_id={invocation_id}), "
                        f"но не удалось разобрать ответ с fileChunk: {error}"
                    )
    finally:
        collected_messages.extend(_drain_recv_queue(ws_client))
        ws_client.suppress_recv_logging = False
        parser.suppress_recv_logging = False

    _attach_ws_poll_failure(collected_messages, total_wait_seconds, f"{request_name} (StreamItem)")
    fail(
        f"При {purpose} за {total_wait_seconds} с не получен StreamItem с fileChunk "
        f"({request_name}, invocation_id={invocation_id}). Смотреть вложения received ws message"
    )