Загрузка данных


constants/enums.py
class TU(Enum):
    """Технологический участок. id - legacy для имитатора (tn{id}_tags.txt); при use_lds_configurator tu_id берётся из Администрирования."""




    DONE = 1


class SouAdminStatus(BaseStrEnum):
    """Статус СОУ в разделе Администрирование (GetBasicInfoAdminResponse)."""

    STOPPED = (1, 'СОУ выключена')
    RUNNING = (2, 'СОУ включена')

    def __new__(cls, value: int, report_text: str) -> "SouAdminStatus":
        member = object.__new__(cls)
        member._value_ = value
        member.report_text = report_text
        return member

    @classmethod
    def report_text_by_value(cls, status_value: int) -> str | None:
        """Текст статуса СОУ в Администрировании по числовому значению."""
        try:
            return cls(status_value).report_text
        except ValueError:
            return None

















constants/test_constants.py
внизу 
class LdsConfiguratorConstants:
    """Константы для setup/teardown через раздел Администрирование."""

    GET_BASIC_INFO_ADMIN_RETRIES: int = 3
    POLL_TIMEOUT_SECONDS: float = 120.0
    POLL_INTERVAL_SECONDS: float = 15.0
    MAIN_PAGE_SYNC_TIMEOUT_SECONDS: float = 30.0

    GET_BASIC_INFO_ADMIN_REQUEST: str = "GetBasicInfoAdminRequest"
    SUBSCRIBE_MAIN_PAGE_INFO_REQUEST: str = "subscribeMainPageInfoRequest"
    MAIN_PAGE_INFO_CONTENT: str = "MainPageInfoContent"
    STOP_LDS_REQUEST: str = "StopLdsRequest"
    LAUNCH_LDS_REQUEST: str = "LaunchLdsRequest"
    GET_TUS_INFORMATION_REQUEST: str = "GetTusInformationRequest"
























models/get_basic_info_admin_model.py
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, List, Optional

from constants.enums import ReplyStatus


@dataclass
class AdminTuInfo:
    """ТУ из ответа GetBasicInfoAdminResponse."""

    tuId: int
    tuName: str
    mnId: int
    mnName: str
    ostId: int
    ostName: str
    configurationVersion: int
    status: int


@dataclass
class AdminBasicInfo:
    tus: List[AdminTuInfo]
    appVersion: str
    appUpdatedAt: Any


@dataclass
class AdminBasicInfoContent:
    basicInfo: AdminBasicInfo


@dataclass
class GetBasicInfoAdminReply:
    replyStatus: ReplyStatus
    replyContent: Optional[AdminBasicInfoContent] = None
    replyErrors: Optional[object] = None


@dataclass
class GetBasicInfoAdminReplyMessage:
    payload: GetBasicInfoAdminReply
















models/get_basic_info_admin_model.py
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, List, Optional

from constants.enums import ReplyStatus


@dataclass
class AdminTuInfo:
    """ТУ из ответа GetBasicInfoAdminResponse."""

    tuId: int
    tuName: str
    mnId: int
    mnName: str
    ostId: int
    ostName: str
    configurationVersion: int
    status: int


@dataclass
class AdminBasicInfo:
    tus: List[AdminTuInfo]
    appVersion: str
    appUpdatedAt: Any


@dataclass
class AdminBasicInfoContent:
    basicInfo: AdminBasicInfo


@dataclass
class GetBasicInfoAdminReply:
    replyStatus: ReplyStatus
    replyContent: Optional[AdminBasicInfoContent] = None
    replyErrors: Optional[object] = None


@dataclass
class GetBasicInfoAdminReplyMessage:
    payload: GetBasicInfoAdminReply
























models/get_tus_information_model.py
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, List, Optional

from constants.enums import ReplyStatus


@dataclass
class TuInformation:
    tuId: int
    launchedBy: Optional[str]
    launchedAt: Optional[Any]
    actualConfigurationExists: bool


@dataclass
class TusInformationContent:
    tusInfo: List[TuInformation]


@dataclass
class GetTusInformationRequest:
    tuIds: List[int]
    additionalProperties: Optional[object] = None


@dataclass
class GetTusInformationReply:
    replyStatus: ReplyStatus
    replyContent: Optional[TusInformationContent] = None
    replyErrors: Optional[object] = None


@dataclass
class GetTusInformationReplyMessage:
    payload: GetTusInformationReply

















models/launch_lds_model.py
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Dict, List, Optional

from constants.enums import ReplyStatus


@dataclass
class LaunchLdsRequest:
    tuId: int
    additionalProperties: Optional[object] = None


@dataclass
class LaunchLdsReply:
    replyStatus: ReplyStatus
    replyErrors: Optional[List[Dict[str, Any]]] = None


@dataclass
class LaunchLdsReplyMessage:
    payload: LaunchLdsReply



















models/stop_lds_model.py
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Dict, List, Optional

from constants.enums import ReplyStatus


@dataclass
class StopLdsRequest:
    tuId: int
    additionalProperties: Optional[object] = None


@dataclass
class StopLdsReply:
    replyStatus: ReplyStatus
    replyErrors: Optional[List[Dict[str, Any]]] = None


@dataclass
class StopLdsReplyMessage:
    payload: StopLdsReply















в датасеты:
    main_pipeline=MAIN_PIPELINE,
    # ===== LDS Configurator =====
    use_lds_configurator=True,
    tu_name=TECHNOLOGICAL_UNIT.description,
    lds_configurator_setup_test=CaseMarkers(test_case_id="", offset=0),













test_config/models_for_tests.py
    # ===== Технологический участок =====
    technological_unit: TU = TU.TIKHORETSK_NOVOROSSIYSK_3

    # ===== LDS Configurator (Администрирование) =====
    use_lds_configurator: bool = False
    tu_name: str = ""
    resolved_tu_id: Optional[int] = None
    lds_configurator_setup_test: Optional[CaseMarkers] = None







    # ===== Свойства для удобства =====
    def __post_init__(self) -> None:
        if not self.tu_name:
            object.__setattr__(self, "tu_name", self.technological_unit.description)

    @property
    def tu_id(self) -> int:
        """ID технологического участка"""
        if self.use_lds_configurator:
            if self.resolved_tu_id is None:
                raise RuntimeError(
                    "resolved_tu_id не установлен - выполнить lds_configurator_setup перед тестами сценария"
                )
            return self.resolved_tu_id
        # legacy: захардкоженный id из enum TU (имитатор использует тот же id для tn{id}_tags.txt)
        return self.technological_unit.id




    @property
    def infra_tu_id(self) -> int:
        """legacy: id ТУ для инфра-setup (имитатор, tags.txt, file_name)."""
        return self.technological_unit.id

















test_scenarios/__init__.py
import test_scenarios.smoke_scenarios as scenarios
from test_scenarios import lds_configurator_scenarios, lds_status_scenarios, rejected_scenarios





    scenarios.export_mt_mode_report,
    lds_configurator_scenarios.lds_configurator_setup,
    lds_configurator_scenarios.lds_configurator_teardown,

























test_scenarios/lds_configurator_scenarios.py
"""
Сценарии setup/teardown СОУ через раздел Администрирование (LDS Configurator).
"""

from __future__ import annotations

from typing import Any, Dict, Optional

import allure
from pytest import fail

from clients.websocket_client import WebSocketClient
from constants.enums import SouAdminStatus
from constants.test_constants import LdsConfiguratorConstants as LdsCfgConst
from test_config.models_for_tests import BaseSuiteConfig
from utils.helpers import lds_configurator_utils as lds_utils
from utils.helpers import ws_test_utils as t_utils
from utils.helpers.ws_message_parser import ws_message_parser as parser


def _save_group_state(group_state: Optional[Dict[str, Any]], cfg: BaseSuiteConfig, tu_id: int) -> None:
    """
    Сохраняет resolved tu_id и флаги в group_state для teardown в conftest.
    """
    if group_state is None:
        return
    group_state["use_lds_configurator"] = cfg.use_lds_configurator
    group_state["resolved_tu_id"] = tu_id
    group_state["tu_name"] = cfg.tu_name


async def lds_configurator_setup(
    ws_client: WebSocketClient,
    cfg: BaseSuiteConfig,
    group_state: Optional[Dict[str, Any]] = None,
) -> None:
    """
    Critical-stop сценарий: подготовка стенда и холодный запуск СОУ на конфигурации из Администрирования.

    1. Получить tu_id по tu_name из Администрирования.
    2. Сверить статус с ЭФ Состояние МТ (MainPageInfoContent).
    3. При необходимости остановить уже запущенную СОУ.
    4. Выполнить LaunchLdsRequest и дождаться готовности.
    5. Подтвердить launchedAt после момента запуска.
    """
    tu_id: int
    sou_status: SouAdminStatus
    is_on_main_page: bool

    with allure.step(f"Шаг 1. Получение ТУ '{cfg.tu_name}' из Администрирования"):
        admin_reply = await lds_utils.get_basic_info_admin_with_retry(ws_client, parser)
        admin_tu = lds_utils.find_tu_by_name(admin_reply, cfg.tu_name)
        lds_utils.validate_admin_tu(admin_tu)
        sou_status = SouAdminStatus(admin_tu.status)
        tu_id = admin_tu.tuId
        cfg.resolved_tu_id = tu_id
        _save_group_state(group_state, cfg, tu_id)
        allure.attach(
            f"tuId={tu_id}\n"
            f"tuName={admin_tu.tuName!r}\n"
            f"status={sou_status} ({SouAdminStatus.report_text_by_value(admin_tu.status)})",
            name="Найденный ТУ",
            attachment_type=allure.attachment_type.TEXT,
        )

    with allure.step("Шаг 2. Сверка статуса СОУ между Администрированием и Состоянием МТ"):
        is_on_main_page = await lds_utils.is_tu_present_on_main_page(ws_client, parser, tu_id)
        lds_utils.check_sou_status_sync(sou_status, is_on_main_page, tu_id, cfg.tu_name)

    launch_checkpoint = t_utils.moscow_now()
    allure.attach(
        t_utils.format_datetime_moscow(launch_checkpoint),
        name="Момент фиксации времени перед холодным запуском",
        attachment_type=allure.attachment_type.TEXT,
    )

    if sou_status == SouAdminStatus.RUNNING and is_on_main_page:
        with allure.step("Шаг 3. Остановка СОУ перед перезапуском (уже была запущена)"):
            await lds_utils.invoke_lds_command(ws_client, parser, LdsCfgConst.STOP_LDS_REQUEST, tu_id)
            with allure.step("Проверка: СОУ выключена в Администрировании"):
                if not await lds_utils.poll_admin_tu_status(ws_client, parser, tu_id, SouAdminStatus.STOPPED):
                    fail(
                        "Не удалось перезапустить СОУ: статус в Администрировании не стал 'выключена' за 2 минуты",
                        pytrace=False,
                    )
            with allure.step("Проверка: ТУ исчезла из Состояния МТ"):
                if not await lds_utils.poll_main_page_tu_presence(ws_client, tu_id, expect_present=False):
                    fail(
                        "Не удалось перезапустить СОУ: ТУ не исчезла из Состояния МТ за 2 минуты",
                        pytrace=False,
                    )

    with allure.step("Шаг 4. Холодный запуск СОУ (LaunchLdsRequest)"):
        await lds_utils.invoke_lds_command(ws_client, parser, LdsCfgConst.LAUNCH_LDS_REQUEST, tu_id)

    with allure.step("Шаг 5. Ожидание включения СОУ в Администрировании"):
        with allure.step("Проверка: статус стал 'включена'"):
            if not await lds_utils.poll_admin_tu_status(ws_client, parser, tu_id, SouAdminStatus.RUNNING):
                fail(
                    "Не удалось запустить СОУ: статус в Администрировании не стал 'включена' за 2 минуты",
                    pytrace=False,
                )

    with allure.step("Шаг 6. Ожидание появления ТУ в Состоянии МТ"):
        with allure.step("Проверка: ТУ отображается в Состоянии МТ"):
            if not await lds_utils.poll_main_page_tu_presence(ws_client, tu_id, expect_present=True):
                fail(
                    "Не удалось запустить СОУ: ТУ не появилась в Состоянии МТ за 2 минуты",
                    pytrace=False,
                )

    with allure.step("Шаг 7. Подтверждение времени запуска (GetTusInformation)"):
        await lds_utils.verify_launched_at(ws_client, parser, tu_id, launch_checkpoint)


async def lds_configurator_teardown(
    ws_client: WebSocketClient,
    tu_id: int,
    tu_name: str,
) -> None:
    """
    Teardown набора: остановка СОУ после прогона.

    Некритичные отклонения оформляются как Allure ALERT без падения прогона.
    Ошибки ws при StopLds логируются и не прерывают pytest session.
    """
    with allure.step(f"Teardown. Проверка статуса СОУ (tuId={tu_id}, «{tu_name}»)"):
        try:
            admin_reply = await lds_utils.get_basic_info_admin(ws_client, parser)
        except Exception as error:
            lds_utils.attach_allure_alert(
                f"Не удалось получить статус СОУ при teardown: {error}. "
                f"tuId={tu_id}, tuName={tu_name!r}"
            )
            return

        sou_status = lds_utils.get_admin_tu_status(admin_reply, tu_id)
        if sou_status != SouAdminStatus.RUNNING:
            lds_utils.attach_allure_alert(
                f"СОУ не в статусе 'включена' при teardown (status={sou_status}), остановка пропущена. "
                f"tuId={tu_id}, tuName='{tu_name}'"
            )
            return

    with allure.step("Teardown. Остановка СОУ (StopLdsRequest)"):
        try:
            await lds_utils.invoke_lds_command(ws_client, parser, LdsCfgConst.STOP_LDS_REQUEST, tu_id)
        except Exception as error:
            lds_utils.attach_allure_alert(
                f"Ошибка при StopLdsRequest: {error}. tuId={tu_id}, tuName={tu_name!r}"
            )
            return

    with allure.step("Teardown. Ожидание выключения СОУ в Администрировании"):
        if not await lds_utils.poll_admin_tu_status(ws_client, parser, tu_id, SouAdminStatus.STOPPED):
            lds_utils.attach_allure_alert(
                f"СОУ не выключилась за 2 минуты после StopLdsRequest. "
                f"tuId={tu_id}, tuName={tu_name!r}. Проверить вручную."
            )


















tests/test_lds_status_regress.py

from test_config.models_for_tests import CaseMarkers, LDSStatusConfig
from test_scenarios import lds_configurator_scenarios





    @pytest.mark.asyncio
    @pytest.mark.critical_stop
    async def test_lds_configurator_setup(
        self, ws_client: WebSocketClient, config: LDSStatusConfig, request: pytest.FixtureRequest
    ) -> None:
        """[LdsConfigurator] Настройка и холодный запуск СОУ через Администрирование"""
        tag = "LdsConfigurator"
        title = f"[{tag}] Настройка и запуск СОУ. ЭФ: Администрирование"
        _apply_allure_markers(config.lds_configurator_setup_test, tag, title)
        await lds_configurator_scenarios.lds_configurator_setup(
            ws_client, config, request.config.group_state
        )

    @pytest.mark.asyncio
    async def test_lds_status_basic_info(self, ws_client: WebSocketClient, config: LDSStatusConfig) -> None:

















tests/test_smoke.py


from test_config.models_for_tests import CaseMarkers, LeakTestConfig, SmokeSuiteConfig
from test_scenarios import lds_configurator_scenarios





    @pytest.mark.asyncio
    @pytest.mark.critical_stop
    async def test_lds_configurator_setup(
        self, ws_client: WebSocketClient, config: SmokeSuiteConfig, request: pytest.FixtureRequest
    ) -> None:
        """[LdsConfigurator] Настройка и холодный запуск СОУ через Администрирование"""
        tag = "LdsConfigurator"
        title = f"[{tag}] Настройка и холодный запуск СОУ. ЭФ: Администрирование"
        _apply_allure_markers(config.lds_configurator_setup_test, tag, title)
        await lds_configurator_scenarios.lds_configurator_setup(
            ws_client, config, request.config.group_state
        )

    @pytest.mark.asyncio
    async def test_basic_info(self, ws_client: WebSocketClient, config: SmokeSuiteConfig) -> None:























utils/helpers/lds_configurator_utils.py
"""
Вспомогательные функции setup/teardown СОУ через раздел Администрирование.
"""

from __future__ import annotations

import asyncio
import logging
from datetime import datetime
from typing import Any, Optional

import allure
from pytest import fail

from clients.websocket_client import WebSocketClient
from constants.enums import ReplyStatus, SouAdminStatus
from constants.test_constants import LdsConfiguratorConstants as LdsCfgConst
from models.get_basic_info_admin_model import AdminTuInfo, GetBasicInfoAdminReply
from models.get_tus_information_model import GetTusInformationReply
from utils.helpers import ws_test_utils as t_utils
from utils.helpers.ws_message_parser import WsMessageParser
from utils.msgpack_utils.message_filters import is_desired_type

logger = logging.getLogger(__name__)


def attach_allure_alert(message: str) -> None:
    """
    Публикует предупреждение в Allure и лог без падения прогона.

    Используется в teardown при некритичных отклонениях.
    """
    allure.attach(message, name="ALERT", attachment_type=allure.attachment_type.TEXT)
    logger.warning("[LDS_CONFIGURATOR] %s", message)


async def get_basic_info_admin(ws_client: WebSocketClient, parser: WsMessageParser) -> GetBasicInfoAdminReply:
    """
    Выполняет GetBasicInfoAdminRequest и парсит ответ.
    """
    payload = await t_utils.connect_and_get_msg(ws_client, LdsCfgConst.GET_BASIC_INFO_ADMIN_REQUEST, [])
    return parser.parse_get_basic_info_admin_msg(payload)


async def get_basic_info_admin_with_retry(
    ws_client: WebSocketClient,
    parser: WsMessageParser,
    retries: int = LdsCfgConst.GET_BASIC_INFO_ADMIN_RETRIES,
) -> GetBasicInfoAdminReply:
    """
    Запрашивает GetBasicInfoAdminResponse с повторными попытками.
    """
    last_error: Optional[BaseException] = None
    for attempt in range(1, retries + 1):
        with allure.step(f"Запрос списка ТУ в Администрировании - попытка {attempt} из {retries}"):
            try:
                return await get_basic_info_admin(ws_client, parser)
            except (asyncio.TimeoutError, ConnectionError, ConnectionResetError, OSError, RuntimeError) as error:
                last_error = error
                if attempt < retries:
                    await asyncio.sleep(1)

    with allure.step("Проверка: GetBasicInfoAdminResponse получен"):
        fail(
            f"Не удалось получить GetBasicInfoAdminResponse за {retries} попыток: {last_error}",
            pytrace=False,
        )


def find_tu_by_name(admin_reply: GetBasicInfoAdminReply, tu_name: str) -> AdminTuInfo:
    """
    Ищет ТУ по точному совпадению tuName в ответе Администрирования.
    """
    with allure.step(f"Поиск ТУ '{tu_name}' по точному совпадению tuName в GetBasicInfoAdminResponse"):
        tus = admin_reply.replyContent.basicInfo.tus if admin_reply.replyContent else None
        with allure.step("Проверка: в ответе есть список ТУ"):
            if not tus:
                fail(
                    f"GetBasicInfoAdminResponse не содержит списка ТУ (ожидался tuName='{tu_name}')",
                    pytrace=False,
                )
        for tu in tus:
            if tu.tuName == tu_name:
                return tu
        available = [tu.tuName for tu in tus]
        with allure.step("Проверка: ТУ из набора данных найден в Администрировании"):
            fail(
                f"ТУ '{tu_name}' не найден в GetBasicInfoAdminResponse. Доступные tu_name: {available}",
                pytrace=False,
            )


def validate_admin_tu(tu: AdminTuInfo) -> None:
    """
    Проверяет обязательные поля AdminTuInfo и допустимость статуса СОУ.
    """
    with allure.step(f"Валидация параметров ТУ '{tu.tuName}' (tuId={tu.tuId})"):
        with allure.step("Проверка: tuId и mnId заполнены"):
            if not tu.tuId:
                fail(f"Некорректный tuId для ТУ '{tu.tuName}': {tu.tuId}", pytrace=False)
            if not tu.mnId:
                fail(f"Некорректный mnId для ТУ '{tu.tuName}': {tu.mnId}", pytrace=False)
        with allure.step("Проверка: статус СОУ известен Администрированию"):
            try:
                SouAdminStatus(tu.status)
            except ValueError:
                fail(f"Неизвестный статус СОУ для ТУ '{tu.tuName}': {tu.status}", pytrace=False)


def _tu_id_in_main_page_message(msg: Any, tu_id: int) -> bool:
    """True, если WS-сообщение MainPageInfoContent содержит указанный tuId."""
    if not isinstance(msg, list) or not is_desired_type(msg, LdsCfgConst.MAIN_PAGE_INFO_CONTENT):
        return False
    for item in msg:
        if isinstance(item, dict) and item.get("replyContent", {}).get("tuId") == tu_id:
            return True
        if isinstance(item, list):
            for elem in item:
                if isinstance(elem, dict) and elem.get("replyContent", {}).get("tuId") == tu_id:
                    return True
    return False


def _drain_recv_queue(ws_client: WebSocketClient) -> list[Any]:
    """Забирает все сообщения из очереди WS без блокирующего ожидания."""
    messages: list[Any] = []
    while not ws_client.recv_queue.empty():
        try:
            messages.append(ws_client.recv_queue.get_nowait())
        except asyncio.QueueEmpty:
            break
    return messages


async def is_tu_present_on_main_page(
    ws_client: WebSocketClient,
    parser: WsMessageParser,
    tu_id: int,
    timeout: float = LdsCfgConst.MAIN_PAGE_SYNC_TIMEOUT_SECONDS,
) -> bool:
    """
    Подписывается на MainPageInfoContent и определяет, отображается ли ТУ в Состоянии МТ.
    """
    with allure.step(f"Подписка на Состояние МТ (MainPageInfoContent) для tuId={tu_id}"):
        ws_client.clear_queue()
        await t_utils.connect(
            ws_client,
            LdsCfgConst.SUBSCRIBE_MAIN_PAGE_INFO_REQUEST,
            {"tuIds": [tu_id], "additionalProperties": None},
        )

    deadline = asyncio.get_running_loop().time() + timeout
    while asyncio.get_running_loop().time() < deadline:
        remaining = deadline - asyncio.get_running_loop().time()
        if remaining <= 0:
            break
        try:
            payload = await ws_client.receive_by_type(LdsCfgConst.MAIN_PAGE_INFO_CONTENT, timeout=remaining)
        except asyncio.TimeoutError:
            break
        parsed = parser.parse_main_page_msg(payload)
        if parsed.replyContent and parsed.replyContent.tuId == tu_id:
            return True
    return False


def check_sou_status_sync(
    sou_status: SouAdminStatus,
    is_on_main_page: bool,
    tu_id: int,
    tu_name: str,
) -> None:
    """
    Сверяет статус СОУ в Администрировании и на ЭФ Состояние МТ.
    """
    with allure.step(
        f"Сверка статуса СОУ: ЭФ Администрирование vs ЭФ Состояние МТ (tuId={tu_id}, «{tu_name}»)"
    ):
        expected_on_page = sou_status == SouAdminStatus.RUNNING
        with allure.step("Проверка: статусы Администрирования и Состояния МТ согласованы"):
            if is_on_main_page == expected_on_page:
                return
            admin_text = SouAdminStatus.report_text_by_value(int(sou_status))
            page_text = "СОУ запущена" if is_on_main_page else "СОУ не запущена"
            fail(
                f"Рассинхронизация для ТУ '{tu_name}' (tuId={tu_id}): "
                f"Администрирование - {admin_text} ({sou_status.value}); "
                f"Главная страница - {page_text}. "
                f"Статусы в разных подписках не совпадают.",
                pytrace=False,
            )


async def invoke_lds_command(
    ws_client: WebSocketClient,
    parser: WsMessageParser,
    request_name: str,
    tu_id: int,
) -> None:
    """
    Отправляет StopLdsRequest или LaunchLdsRequest и ждёт Completion с replyStatus=200.
    """
    with allure.step(f"Команда {request_name} для tuId={tu_id}"):
        await t_utils.connect(ws_client, request_name, {"tuId": tu_id})
        invocation_id = ws_client.invocation_id
        payload = await ws_client.receive_by_invocation_id(invocation_id)
        if request_name == LdsCfgConst.STOP_LDS_REQUEST:
            reply = parser.parse_stop_lds_msg(payload)
        else:
            reply = parser.parse_launch_lds_msg(payload)
        with allure.step(f"Проверка: {request_name} завершился успешно (replyStatus=200)"):
            if reply.replyStatus != ReplyStatus.OK:
                fail(
                    f"{request_name} завершился с replyStatus={reply.replyStatus}, "
                    f"ошибки: {reply.replyErrors}",
                    pytrace=False,
                )


async def poll_admin_tu_status(
    ws_client: WebSocketClient,
    parser: WsMessageParser,
    tu_id: int,
    expected_status: SouAdminStatus,
    total_wait_seconds: float = LdsCfgConst.POLL_TIMEOUT_SECONDS,
    poll_interval_seconds: float = LdsCfgConst.POLL_INTERVAL_SECONDS,
) -> bool:
    """
    Long-poll GetBasicInfoAdmin до смены статуса ТУ в Администрировании.
    """
    status_label = SouAdminStatus.report_text_by_value(int(expected_status))
    with allure.step(
        f"Ожидание статуса '{status_label}' в Администрировании "
        f"(tuId={tu_id}, таймаут {int(total_wait_seconds)} с)"
    ):
        deadline = asyncio.get_running_loop().time() + total_wait_seconds
        while asyncio.get_running_loop().time() < deadline:
            admin_reply = await get_basic_info_admin(ws_client, parser)
            tus = admin_reply.replyContent.basicInfo.tus if admin_reply.replyContent else []
            tu = next((item for item in tus if item.tuId == tu_id), None)
            if tu and tu.status == int(expected_status):
                return True
            await asyncio.sleep(poll_interval_seconds)
        return False


async def poll_main_page_tu_presence(
    ws_client: WebSocketClient,
    tu_id: int,
    expect_present: bool,
    total_wait_seconds: float = LdsCfgConst.POLL_TIMEOUT_SECONDS,
    poll_interval_seconds: float = LdsCfgConst.POLL_INTERVAL_SECONDS,
) -> bool:
    """
    Long-poll MainPageInfoContent: ожидание появления или исчезновения ТУ в Состоянии МТ.
    """
    action = "появления" if expect_present else "исчезновения"
    with allure.step(
        f"Ожидание {action} ТУ в Состоянии МТ "
        f"(tuId={tu_id}, таймаут {int(total_wait_seconds)} с)"
    ):
        ws_client.clear_queue()
        await t_utils.connect(
            ws_client,
            LdsCfgConst.SUBSCRIBE_MAIN_PAGE_INFO_REQUEST,
            {"tuIds": [tu_id], "additionalProperties": None},
        )
        deadline = asyncio.get_running_loop().time() + total_wait_seconds
        while asyncio.get_running_loop().time() < deadline:
            await asyncio.sleep(poll_interval_seconds)
            batch = _drain_recv_queue(ws_client)
            found = any(_tu_id_in_main_page_message(msg, tu_id) for msg in batch)
            if expect_present and found:
                return True
            if not expect_present and not found:
                return True

        t_utils._attach_ws_poll_failure(
            [],
            total_wait_seconds,
            f"{LdsCfgConst.MAIN_PAGE_INFO_CONTENT} tuId={tu_id} present={expect_present}",
        )
        return False


async def verify_launched_at(
    ws_client: WebSocketClient,
    parser: WsMessageParser,
    tu_id: int,
    launch_checkpoint: datetime,
) -> None:
    """
    Проверяет, что launchedAt в GetTusInformation позже момента холодного запуска.
    """
    with allure.step(f"Запрос GetTusInformation для tuId={tu_id}"):
        payload = await t_utils.connect_and_get_msg(
            ws_client,
            LdsCfgConst.GET_TUS_INFORMATION_REQUEST,
            {"tuIds": [tu_id]},
        )
        reply: GetTusInformationReply = parser.parse_get_tus_information_msg(payload)
        tus_info = reply.replyContent.tusInfo if reply.replyContent else []
        tu_info = next((item for item in tus_info if item.tuId == tu_id), None)

        with allure.step("Проверка: в ответе есть информация о запуске ТУ"):
            if tu_info is None:
                fail(f"GetTusInformationResponse не содержит tuId={tu_id}", pytrace=False)

        launched_at = parser.timestamp_to_datetime(tu_info.launchedAt)
        with allure.step("Проверка: поле launchedAt заполнено"):
            if launched_at is None:
                fail(f"GetTusInformationResponse: launchedAt отсутствует для tuId={tu_id}", pytrace=False)

        launched_at_msk = t_utils.localize_as_moscow(launched_at)
        checkpoint_msk = t_utils.localize_as_moscow(launch_checkpoint)
        allure.attach(
            f"launchedAt: {t_utils.format_datetime_moscow(launched_at_msk)}\n"
            f"checkpoint: {t_utils.format_datetime_moscow(checkpoint_msk)}",
            name="Сравнение времени запуска СОУ",
            attachment_type=allure.attachment_type.TEXT,
        )

        with allure.step("Проверка: launchedAt позже момента команды 'Запустить СОУ'"):
            if launched_at_msk <= checkpoint_msk:
                fail(
                    f"Время запуска СОУ на бэкенде ({t_utils.format_datetime_moscow(launched_at_msk)}) "
                    f"не позже момента команды 'Запустить СОУ' "
                    f"({t_utils.format_datetime_moscow(checkpoint_msk)})",
                    pytrace=False,
                )


def get_admin_tu_status(admin_reply: GetBasicInfoAdminReply, tu_id: int) -> Optional[SouAdminStatus]:
    """
    Возвращает статус СОУ из GetBasicInfoAdmin для указанного tuId.
    """
    tus = admin_reply.replyContent.basicInfo.tus if admin_reply.replyContent else []
    tu = next((item for item in tus if item.tuId == tu_id), None)
    if tu is None:
        return None
    try:
        return SouAdminStatus(tu.status)
    except ValueError:
        return None






























utils/helpers/ws_message_parser.py



        return self._find_and_parse_message(data_class=DownloadExportedDataReply, data=data)

    def parse_get_basic_info_admin_msg(self, data: list) -> GetBasicInfoAdminReply:
        """Парсит ответ GetBasicInfoAdminRequest."""
        return self._find_and_parse_message(data_class=GetBasicInfoAdminReply, data=data)

    def parse_launch_lds_msg(self, data: list) -> LaunchLdsReply:
        """Парсит Completion-ответ LaunchLdsRequest."""
        return self._find_and_parse_message(data_class=LaunchLdsReply, data=data)

    def parse_stop_lds_msg(self, data: list) -> StopLdsReply:
        """Парсит Completion-ответ StopLdsRequest."""
        return self._find_and_parse_message(data_class=StopLdsReply, data=data)

    def parse_get_tus_information_msg(self, data: list) -> GetTusInformationReply:
        """Парсит ответ GetTusInformationRequest."""
        return self._find_and_parse_message(data_class=GetTusInformationReply, data=data)












            return value if isinstance(value, ExportedDataType) else ExportedDataType(value)

        def _to_reply_status(value: Any) -> ReplyStatus:
            return value if isinstance(value, ReplyStatus) else ReplyStatus(value)

        return Config(
            type_hooks={
                UUID: self.convert_to_uuid,
                datetime: self.timestamp_to_datetime,
                ExportStatus: _to_export_status,
                ExportedDataType: _to_exported_data_type,
                ReplyStatus: _to_reply_status,
            }
        )



























utils/helpers/ws_test_utils.py
def moscow_now() -> datetime:
    """Текущее время в часовом поясе Europe/Moscow."""
    return datetime.now(ZoneInfo(TestConst.ZONE_INFO))


























conftest.py
import asyncio





from test_config.datasets import get_config_by_name
from test_scenarios import lds_configurator_scenarios




        "imitator_start_time": None,  # datetime объект времени старта имитатора для расчёта интервалов утечек
        "use_lds_configurator": True,
        "resolved_tu_id": None,
        "tu_name": None,




SMOKE_SUITE_LEVEL_MAPPING = {
    'test_lds_configurator_setup': 'lds_configurator_setup_test',






LDS_STATUS_SUITE_LEVEL_MAPPING = {
    'test_lds_configurator_setup': 'lds_configurator_setup_test',







    # Для suite-level тестов берём из config
    if 'config' in params:
        if test_name == 'test_lds_configurator_setup':
            suite_config = params['config']
            if not suite_config.use_lds_configurator:
                return None
            if not suite_config.tu_name.strip():
                pytest.fail(
                    f"Набор '{suite_config.suite_name}': tu_name обязателен к заполнению в датасете набора при use_lds_configurator=True"
                )
            return suite_config.lds_configurator_setup_test




        test_data_name = item.get_closest_marker("test_data_name").args[0]
        # legacy: id из enum TU для имитатора (tn{id}_tags.txt), не resolved_tu_id из Администрирования











    yield  # pytest продолжит выполнение теста


def _run_lds_configurator_teardown_if_needed(cfg: dict) -> None:
    """
    Выполняет WS-teardown СОУ при смене набора или завершении сессии.

    Ошибки не прерывают pytest: логируются и оформляются как ALERT внутри сценария.
    Состояние group_state сбрасывается в finally.
    """
    if not cfg.get("use_lds_configurator") or not cfg.get("resolved_tu_id"):
        return

    tu_id = cfg["resolved_tu_id"]
    tu_name = cfg.get("tu_name") or ""

    async def _teardown() -> None:
        ws_host = get_ws_host()
        token = get_token()
        async with WebSocketClient(ws_host, token) as client:
            await lds_configurator_scenarios.lds_configurator_teardown(client, tu_id, tu_name)

    try:
        asyncio.run(_teardown())
    except Exception:
        logger.exception("[TEARDOWN] [ERROR] Ошибка LDS Configurator teardown для tuId=%s", tu_id)
    finally:
        cfg["resolved_tu_id"] = None
        cfg["use_lds_configurator"] = False
        cfg["tu_name"] = None






    if next_suite != cfg["current_suite"]:
        _run_lds_configurator_teardown_if_needed(cfg)




 
    """
    В завершении сессии — отправляем единый Allure‑отчёт в TestOps.
    """
    # 1) teardown стенда: LDS Configurator + остановка имитатора
    try:
        stand_manager = getattr(session.config, "group_state", {}).get("stand_manager")
        group_state = getattr(session.config, "group_state", {})
        _run_lds_configurator_teardown_if_needed(group_state)
        stand_manager = group_state.get("stand_manager")
        if stand_manager: