Загрузка данных
constants/enums.py
class TU(Enum):
"""Технологический участок. id - legacy для имитатора (tn{id}_tags.txt); при use_lds_configurator tu_id берётся из Администрирования."""
DONE = 1
class SouAdminStatus(BaseStrEnum):
"""Статус СОУ в разделе Администрирование (GetBasicInfoAdminResponse)."""
STOPPED = (1, 'СОУ выключена')
RUNNING = (2, 'СОУ включена')
def __new__(cls, value: int, report_text: str) -> "SouAdminStatus":
member = object.__new__(cls)
member._value_ = value
member.report_text = report_text
return member
@classmethod
def report_text_by_value(cls, status_value: int) -> str | None:
"""Текст статуса СОУ в Администрировании по числовому значению."""
try:
return cls(status_value).report_text
except ValueError:
return None
constants/test_constants.py
внизу
class LdsConfiguratorConstants:
"""Константы для setup/teardown через раздел Администрирование."""
GET_BASIC_INFO_ADMIN_RETRIES: int = 3
POLL_TIMEOUT_SECONDS: float = 120.0
POLL_INTERVAL_SECONDS: float = 15.0
MAIN_PAGE_SYNC_TIMEOUT_SECONDS: float = 30.0
GET_BASIC_INFO_ADMIN_REQUEST: str = "GetBasicInfoAdminRequest"
SUBSCRIBE_MAIN_PAGE_INFO_REQUEST: str = "subscribeMainPageInfoRequest"
MAIN_PAGE_INFO_CONTENT: str = "MainPageInfoContent"
STOP_LDS_REQUEST: str = "StopLdsRequest"
LAUNCH_LDS_REQUEST: str = "LaunchLdsRequest"
GET_TUS_INFORMATION_REQUEST: str = "GetTusInformationRequest"
models/get_basic_info_admin_model.py
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, List, Optional
from constants.enums import ReplyStatus
@dataclass
class AdminTuInfo:
"""ТУ из ответа GetBasicInfoAdminResponse."""
tuId: int
tuName: str
mnId: int
mnName: str
ostId: int
ostName: str
configurationVersion: int
status: int
@dataclass
class AdminBasicInfo:
tus: List[AdminTuInfo]
appVersion: str
appUpdatedAt: Any
@dataclass
class AdminBasicInfoContent:
basicInfo: AdminBasicInfo
@dataclass
class GetBasicInfoAdminReply:
replyStatus: ReplyStatus
replyContent: Optional[AdminBasicInfoContent] = None
replyErrors: Optional[object] = None
@dataclass
class GetBasicInfoAdminReplyMessage:
payload: GetBasicInfoAdminReply
models/get_basic_info_admin_model.py
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, List, Optional
from constants.enums import ReplyStatus
@dataclass
class AdminTuInfo:
"""ТУ из ответа GetBasicInfoAdminResponse."""
tuId: int
tuName: str
mnId: int
mnName: str
ostId: int
ostName: str
configurationVersion: int
status: int
@dataclass
class AdminBasicInfo:
tus: List[AdminTuInfo]
appVersion: str
appUpdatedAt: Any
@dataclass
class AdminBasicInfoContent:
basicInfo: AdminBasicInfo
@dataclass
class GetBasicInfoAdminReply:
replyStatus: ReplyStatus
replyContent: Optional[AdminBasicInfoContent] = None
replyErrors: Optional[object] = None
@dataclass
class GetBasicInfoAdminReplyMessage:
payload: GetBasicInfoAdminReply
models/get_tus_information_model.py
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, List, Optional
from constants.enums import ReplyStatus
@dataclass
class TuInformation:
tuId: int
launchedBy: Optional[str]
launchedAt: Optional[Any]
actualConfigurationExists: bool
@dataclass
class TusInformationContent:
tusInfo: List[TuInformation]
@dataclass
class GetTusInformationRequest:
tuIds: List[int]
additionalProperties: Optional[object] = None
@dataclass
class GetTusInformationReply:
replyStatus: ReplyStatus
replyContent: Optional[TusInformationContent] = None
replyErrors: Optional[object] = None
@dataclass
class GetTusInformationReplyMessage:
payload: GetTusInformationReply
models/launch_lds_model.py
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from constants.enums import ReplyStatus
@dataclass
class LaunchLdsRequest:
tuId: int
additionalProperties: Optional[object] = None
@dataclass
class LaunchLdsReply:
replyStatus: ReplyStatus
replyErrors: Optional[List[Dict[str, Any]]] = None
@dataclass
class LaunchLdsReplyMessage:
payload: LaunchLdsReply
models/stop_lds_model.py
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from constants.enums import ReplyStatus
@dataclass
class StopLdsRequest:
tuId: int
additionalProperties: Optional[object] = None
@dataclass
class StopLdsReply:
replyStatus: ReplyStatus
replyErrors: Optional[List[Dict[str, Any]]] = None
@dataclass
class StopLdsReplyMessage:
payload: StopLdsReply
в датасеты:
main_pipeline=MAIN_PIPELINE,
# ===== LDS Configurator =====
use_lds_configurator=True,
tu_name=TECHNOLOGICAL_UNIT.description,
lds_configurator_setup_test=CaseMarkers(test_case_id="", offset=0),
test_config/models_for_tests.py
# ===== Технологический участок =====
technological_unit: TU = TU.TIKHORETSK_NOVOROSSIYSK_3
# ===== LDS Configurator (Администрирование) =====
use_lds_configurator: bool = False
tu_name: str = ""
resolved_tu_id: Optional[int] = None
lds_configurator_setup_test: Optional[CaseMarkers] = None
# ===== Свойства для удобства =====
def __post_init__(self) -> None:
if not self.tu_name:
object.__setattr__(self, "tu_name", self.technological_unit.description)
@property
def tu_id(self) -> int:
"""ID технологического участка"""
if self.use_lds_configurator:
if self.resolved_tu_id is None:
raise RuntimeError(
"resolved_tu_id не установлен - выполнить lds_configurator_setup перед тестами сценария"
)
return self.resolved_tu_id
# legacy: захардкоженный id из enum TU (имитатор использует тот же id для tn{id}_tags.txt)
return self.technological_unit.id
@property
def infra_tu_id(self) -> int:
"""legacy: id ТУ для инфра-setup (имитатор, tags.txt, file_name)."""
return self.technological_unit.id
test_scenarios/__init__.py
import test_scenarios.smoke_scenarios as scenarios
from test_scenarios import lds_configurator_scenarios, lds_status_scenarios, rejected_scenarios
scenarios.export_mt_mode_report,
lds_configurator_scenarios.lds_configurator_setup,
lds_configurator_scenarios.lds_configurator_teardown,
test_scenarios/lds_configurator_scenarios.py
"""
Сценарии setup/teardown СОУ через раздел Администрирование (LDS Configurator).
"""
from __future__ import annotations
from typing import Any, Dict, Optional
import allure
from pytest import fail
from clients.websocket_client import WebSocketClient
from constants.enums import SouAdminStatus
from constants.test_constants import LdsConfiguratorConstants as LdsCfgConst
from test_config.models_for_tests import BaseSuiteConfig
from utils.helpers import lds_configurator_utils as lds_utils
from utils.helpers import ws_test_utils as t_utils
from utils.helpers.ws_message_parser import ws_message_parser as parser
def _save_group_state(group_state: Optional[Dict[str, Any]], cfg: BaseSuiteConfig, tu_id: int) -> None:
"""
Сохраняет resolved tu_id и флаги в group_state для teardown в conftest.
"""
if group_state is None:
return
group_state["use_lds_configurator"] = cfg.use_lds_configurator
group_state["resolved_tu_id"] = tu_id
group_state["tu_name"] = cfg.tu_name
async def lds_configurator_setup(
ws_client: WebSocketClient,
cfg: BaseSuiteConfig,
group_state: Optional[Dict[str, Any]] = None,
) -> None:
"""
Critical-stop сценарий: подготовка стенда и холодный запуск СОУ на конфигурации из Администрирования.
1. Получить tu_id по tu_name из Администрирования.
2. Сверить статус с ЭФ Состояние МТ (MainPageInfoContent).
3. При необходимости остановить уже запущенную СОУ.
4. Выполнить LaunchLdsRequest и дождаться готовности.
5. Подтвердить launchedAt после момента запуска.
"""
tu_id: int
sou_status: SouAdminStatus
is_on_main_page: bool
with allure.step(f"Шаг 1. Получение ТУ '{cfg.tu_name}' из Администрирования"):
admin_reply = await lds_utils.get_basic_info_admin_with_retry(ws_client, parser)
admin_tu = lds_utils.find_tu_by_name(admin_reply, cfg.tu_name)
lds_utils.validate_admin_tu(admin_tu)
sou_status = SouAdminStatus(admin_tu.status)
tu_id = admin_tu.tuId
cfg.resolved_tu_id = tu_id
_save_group_state(group_state, cfg, tu_id)
allure.attach(
f"tuId={tu_id}\n"
f"tuName={admin_tu.tuName!r}\n"
f"status={sou_status} ({SouAdminStatus.report_text_by_value(admin_tu.status)})",
name="Найденный ТУ",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step("Шаг 2. Сверка статуса СОУ между Администрированием и Состоянием МТ"):
is_on_main_page = await lds_utils.is_tu_present_on_main_page(ws_client, parser, tu_id)
lds_utils.check_sou_status_sync(sou_status, is_on_main_page, tu_id, cfg.tu_name)
launch_checkpoint = t_utils.moscow_now()
allure.attach(
t_utils.format_datetime_moscow(launch_checkpoint),
name="Момент фиксации времени перед холодным запуском",
attachment_type=allure.attachment_type.TEXT,
)
if sou_status == SouAdminStatus.RUNNING and is_on_main_page:
with allure.step("Шаг 3. Остановка СОУ перед перезапуском (уже была запущена)"):
await lds_utils.invoke_lds_command(ws_client, parser, LdsCfgConst.STOP_LDS_REQUEST, tu_id)
with allure.step("Проверка: СОУ выключена в Администрировании"):
if not await lds_utils.poll_admin_tu_status(ws_client, parser, tu_id, SouAdminStatus.STOPPED):
fail(
"Не удалось перезапустить СОУ: статус в Администрировании не стал 'выключена' за 2 минуты",
pytrace=False,
)
with allure.step("Проверка: ТУ исчезла из Состояния МТ"):
if not await lds_utils.poll_main_page_tu_presence(ws_client, tu_id, expect_present=False):
fail(
"Не удалось перезапустить СОУ: ТУ не исчезла из Состояния МТ за 2 минуты",
pytrace=False,
)
with allure.step("Шаг 4. Холодный запуск СОУ (LaunchLdsRequest)"):
await lds_utils.invoke_lds_command(ws_client, parser, LdsCfgConst.LAUNCH_LDS_REQUEST, tu_id)
with allure.step("Шаг 5. Ожидание включения СОУ в Администрировании"):
with allure.step("Проверка: статус стал 'включена'"):
if not await lds_utils.poll_admin_tu_status(ws_client, parser, tu_id, SouAdminStatus.RUNNING):
fail(
"Не удалось запустить СОУ: статус в Администрировании не стал 'включена' за 2 минуты",
pytrace=False,
)
with allure.step("Шаг 6. Ожидание появления ТУ в Состоянии МТ"):
with allure.step("Проверка: ТУ отображается в Состоянии МТ"):
if not await lds_utils.poll_main_page_tu_presence(ws_client, tu_id, expect_present=True):
fail(
"Не удалось запустить СОУ: ТУ не появилась в Состоянии МТ за 2 минуты",
pytrace=False,
)
with allure.step("Шаг 7. Подтверждение времени запуска (GetTusInformation)"):
await lds_utils.verify_launched_at(ws_client, parser, tu_id, launch_checkpoint)
async def lds_configurator_teardown(
ws_client: WebSocketClient,
tu_id: int,
tu_name: str,
) -> None:
"""
Teardown набора: остановка СОУ после прогона.
Некритичные отклонения оформляются как Allure ALERT без падения прогона.
Ошибки ws при StopLds логируются и не прерывают pytest session.
"""
with allure.step(f"Teardown. Проверка статуса СОУ (tuId={tu_id}, «{tu_name}»)"):
try:
admin_reply = await lds_utils.get_basic_info_admin(ws_client, parser)
except Exception as error:
lds_utils.attach_allure_alert(
f"Не удалось получить статус СОУ при teardown: {error}. "
f"tuId={tu_id}, tuName={tu_name!r}"
)
return
sou_status = lds_utils.get_admin_tu_status(admin_reply, tu_id)
if sou_status != SouAdminStatus.RUNNING:
lds_utils.attach_allure_alert(
f"СОУ не в статусе 'включена' при teardown (status={sou_status}), остановка пропущена. "
f"tuId={tu_id}, tuName='{tu_name}'"
)
return
with allure.step("Teardown. Остановка СОУ (StopLdsRequest)"):
try:
await lds_utils.invoke_lds_command(ws_client, parser, LdsCfgConst.STOP_LDS_REQUEST, tu_id)
except Exception as error:
lds_utils.attach_allure_alert(
f"Ошибка при StopLdsRequest: {error}. tuId={tu_id}, tuName={tu_name!r}"
)
return
with allure.step("Teardown. Ожидание выключения СОУ в Администрировании"):
if not await lds_utils.poll_admin_tu_status(ws_client, parser, tu_id, SouAdminStatus.STOPPED):
lds_utils.attach_allure_alert(
f"СОУ не выключилась за 2 минуты после StopLdsRequest. "
f"tuId={tu_id}, tuName={tu_name!r}. Проверить вручную."
)
tests/test_lds_status_regress.py
from test_config.models_for_tests import CaseMarkers, LDSStatusConfig
from test_scenarios import lds_configurator_scenarios
@pytest.mark.asyncio
@pytest.mark.critical_stop
async def test_lds_configurator_setup(
self, ws_client: WebSocketClient, config: LDSStatusConfig, request: pytest.FixtureRequest
) -> None:
"""[LdsConfigurator] Настройка и холодный запуск СОУ через Администрирование"""
tag = "LdsConfigurator"
title = f"[{tag}] Настройка и запуск СОУ. ЭФ: Администрирование"
_apply_allure_markers(config.lds_configurator_setup_test, tag, title)
await lds_configurator_scenarios.lds_configurator_setup(
ws_client, config, request.config.group_state
)
@pytest.mark.asyncio
async def test_lds_status_basic_info(self, ws_client: WebSocketClient, config: LDSStatusConfig) -> None:
tests/test_smoke.py
from test_config.models_for_tests import CaseMarkers, LeakTestConfig, SmokeSuiteConfig
from test_scenarios import lds_configurator_scenarios
@pytest.mark.asyncio
@pytest.mark.critical_stop
async def test_lds_configurator_setup(
self, ws_client: WebSocketClient, config: SmokeSuiteConfig, request: pytest.FixtureRequest
) -> None:
"""[LdsConfigurator] Настройка и холодный запуск СОУ через Администрирование"""
tag = "LdsConfigurator"
title = f"[{tag}] Настройка и холодный запуск СОУ. ЭФ: Администрирование"
_apply_allure_markers(config.lds_configurator_setup_test, tag, title)
await lds_configurator_scenarios.lds_configurator_setup(
ws_client, config, request.config.group_state
)
@pytest.mark.asyncio
async def test_basic_info(self, ws_client: WebSocketClient, config: SmokeSuiteConfig) -> None:
utils/helpers/lds_configurator_utils.py
"""
Вспомогательные функции setup/teardown СОУ через раздел Администрирование.
"""
from __future__ import annotations
import asyncio
import logging
from datetime import datetime
from typing import Any, Optional
import allure
from pytest import fail
from clients.websocket_client import WebSocketClient
from constants.enums import ReplyStatus, SouAdminStatus
from constants.test_constants import LdsConfiguratorConstants as LdsCfgConst
from models.get_basic_info_admin_model import AdminTuInfo, GetBasicInfoAdminReply
from models.get_tus_information_model import GetTusInformationReply
from utils.helpers import ws_test_utils as t_utils
from utils.helpers.ws_message_parser import WsMessageParser
from utils.msgpack_utils.message_filters import is_desired_type
logger = logging.getLogger(__name__)
def attach_allure_alert(message: str) -> None:
"""
Публикует предупреждение в Allure и лог без падения прогона.
Используется в teardown при некритичных отклонениях.
"""
allure.attach(message, name="ALERT", attachment_type=allure.attachment_type.TEXT)
logger.warning("[LDS_CONFIGURATOR] %s", message)
async def get_basic_info_admin(ws_client: WebSocketClient, parser: WsMessageParser) -> GetBasicInfoAdminReply:
"""
Выполняет GetBasicInfoAdminRequest и парсит ответ.
"""
payload = await t_utils.connect_and_get_msg(ws_client, LdsCfgConst.GET_BASIC_INFO_ADMIN_REQUEST, [])
return parser.parse_get_basic_info_admin_msg(payload)
async def get_basic_info_admin_with_retry(
ws_client: WebSocketClient,
parser: WsMessageParser,
retries: int = LdsCfgConst.GET_BASIC_INFO_ADMIN_RETRIES,
) -> GetBasicInfoAdminReply:
"""
Запрашивает GetBasicInfoAdminResponse с повторными попытками.
"""
last_error: Optional[BaseException] = None
for attempt in range(1, retries + 1):
with allure.step(f"Запрос списка ТУ в Администрировании - попытка {attempt} из {retries}"):
try:
return await get_basic_info_admin(ws_client, parser)
except (asyncio.TimeoutError, ConnectionError, ConnectionResetError, OSError, RuntimeError) as error:
last_error = error
if attempt < retries:
await asyncio.sleep(1)
with allure.step("Проверка: GetBasicInfoAdminResponse получен"):
fail(
f"Не удалось получить GetBasicInfoAdminResponse за {retries} попыток: {last_error}",
pytrace=False,
)
def find_tu_by_name(admin_reply: GetBasicInfoAdminReply, tu_name: str) -> AdminTuInfo:
"""
Ищет ТУ по точному совпадению tuName в ответе Администрирования.
"""
with allure.step(f"Поиск ТУ '{tu_name}' по точному совпадению tuName в GetBasicInfoAdminResponse"):
tus = admin_reply.replyContent.basicInfo.tus if admin_reply.replyContent else None
with allure.step("Проверка: в ответе есть список ТУ"):
if not tus:
fail(
f"GetBasicInfoAdminResponse не содержит списка ТУ (ожидался tuName='{tu_name}')",
pytrace=False,
)
for tu in tus:
if tu.tuName == tu_name:
return tu
available = [tu.tuName for tu in tus]
with allure.step("Проверка: ТУ из набора данных найден в Администрировании"):
fail(
f"ТУ '{tu_name}' не найден в GetBasicInfoAdminResponse. Доступные tu_name: {available}",
pytrace=False,
)
def validate_admin_tu(tu: AdminTuInfo) -> None:
"""
Проверяет обязательные поля AdminTuInfo и допустимость статуса СОУ.
"""
with allure.step(f"Валидация параметров ТУ '{tu.tuName}' (tuId={tu.tuId})"):
with allure.step("Проверка: tuId и mnId заполнены"):
if not tu.tuId:
fail(f"Некорректный tuId для ТУ '{tu.tuName}': {tu.tuId}", pytrace=False)
if not tu.mnId:
fail(f"Некорректный mnId для ТУ '{tu.tuName}': {tu.mnId}", pytrace=False)
with allure.step("Проверка: статус СОУ известен Администрированию"):
try:
SouAdminStatus(tu.status)
except ValueError:
fail(f"Неизвестный статус СОУ для ТУ '{tu.tuName}': {tu.status}", pytrace=False)
def _tu_id_in_main_page_message(msg: Any, tu_id: int) -> bool:
"""True, если WS-сообщение MainPageInfoContent содержит указанный tuId."""
if not isinstance(msg, list) or not is_desired_type(msg, LdsCfgConst.MAIN_PAGE_INFO_CONTENT):
return False
for item in msg:
if isinstance(item, dict) and item.get("replyContent", {}).get("tuId") == tu_id:
return True
if isinstance(item, list):
for elem in item:
if isinstance(elem, dict) and elem.get("replyContent", {}).get("tuId") == tu_id:
return True
return False
def _drain_recv_queue(ws_client: WebSocketClient) -> list[Any]:
"""Забирает все сообщения из очереди WS без блокирующего ожидания."""
messages: list[Any] = []
while not ws_client.recv_queue.empty():
try:
messages.append(ws_client.recv_queue.get_nowait())
except asyncio.QueueEmpty:
break
return messages
async def is_tu_present_on_main_page(
ws_client: WebSocketClient,
parser: WsMessageParser,
tu_id: int,
timeout: float = LdsCfgConst.MAIN_PAGE_SYNC_TIMEOUT_SECONDS,
) -> bool:
"""
Подписывается на MainPageInfoContent и определяет, отображается ли ТУ в Состоянии МТ.
"""
with allure.step(f"Подписка на Состояние МТ (MainPageInfoContent) для tuId={tu_id}"):
ws_client.clear_queue()
await t_utils.connect(
ws_client,
LdsCfgConst.SUBSCRIBE_MAIN_PAGE_INFO_REQUEST,
{"tuIds": [tu_id], "additionalProperties": None},
)
deadline = asyncio.get_running_loop().time() + timeout
while asyncio.get_running_loop().time() < deadline:
remaining = deadline - asyncio.get_running_loop().time()
if remaining <= 0:
break
try:
payload = await ws_client.receive_by_type(LdsCfgConst.MAIN_PAGE_INFO_CONTENT, timeout=remaining)
except asyncio.TimeoutError:
break
parsed = parser.parse_main_page_msg(payload)
if parsed.replyContent and parsed.replyContent.tuId == tu_id:
return True
return False
def check_sou_status_sync(
sou_status: SouAdminStatus,
is_on_main_page: bool,
tu_id: int,
tu_name: str,
) -> None:
"""
Сверяет статус СОУ в Администрировании и на ЭФ Состояние МТ.
"""
with allure.step(
f"Сверка статуса СОУ: ЭФ Администрирование vs ЭФ Состояние МТ (tuId={tu_id}, «{tu_name}»)"
):
expected_on_page = sou_status == SouAdminStatus.RUNNING
with allure.step("Проверка: статусы Администрирования и Состояния МТ согласованы"):
if is_on_main_page == expected_on_page:
return
admin_text = SouAdminStatus.report_text_by_value(int(sou_status))
page_text = "СОУ запущена" if is_on_main_page else "СОУ не запущена"
fail(
f"Рассинхронизация для ТУ '{tu_name}' (tuId={tu_id}): "
f"Администрирование - {admin_text} ({sou_status.value}); "
f"Главная страница - {page_text}. "
f"Статусы в разных подписках не совпадают.",
pytrace=False,
)
async def invoke_lds_command(
ws_client: WebSocketClient,
parser: WsMessageParser,
request_name: str,
tu_id: int,
) -> None:
"""
Отправляет StopLdsRequest или LaunchLdsRequest и ждёт Completion с replyStatus=200.
"""
with allure.step(f"Команда {request_name} для tuId={tu_id}"):
await t_utils.connect(ws_client, request_name, {"tuId": tu_id})
invocation_id = ws_client.invocation_id
payload = await ws_client.receive_by_invocation_id(invocation_id)
if request_name == LdsCfgConst.STOP_LDS_REQUEST:
reply = parser.parse_stop_lds_msg(payload)
else:
reply = parser.parse_launch_lds_msg(payload)
with allure.step(f"Проверка: {request_name} завершился успешно (replyStatus=200)"):
if reply.replyStatus != ReplyStatus.OK:
fail(
f"{request_name} завершился с replyStatus={reply.replyStatus}, "
f"ошибки: {reply.replyErrors}",
pytrace=False,
)
async def poll_admin_tu_status(
ws_client: WebSocketClient,
parser: WsMessageParser,
tu_id: int,
expected_status: SouAdminStatus,
total_wait_seconds: float = LdsCfgConst.POLL_TIMEOUT_SECONDS,
poll_interval_seconds: float = LdsCfgConst.POLL_INTERVAL_SECONDS,
) -> bool:
"""
Long-poll GetBasicInfoAdmin до смены статуса ТУ в Администрировании.
"""
status_label = SouAdminStatus.report_text_by_value(int(expected_status))
with allure.step(
f"Ожидание статуса '{status_label}' в Администрировании "
f"(tuId={tu_id}, таймаут {int(total_wait_seconds)} с)"
):
deadline = asyncio.get_running_loop().time() + total_wait_seconds
while asyncio.get_running_loop().time() < deadline:
admin_reply = await get_basic_info_admin(ws_client, parser)
tus = admin_reply.replyContent.basicInfo.tus if admin_reply.replyContent else []
tu = next((item for item in tus if item.tuId == tu_id), None)
if tu and tu.status == int(expected_status):
return True
await asyncio.sleep(poll_interval_seconds)
return False
async def poll_main_page_tu_presence(
ws_client: WebSocketClient,
tu_id: int,
expect_present: bool,
total_wait_seconds: float = LdsCfgConst.POLL_TIMEOUT_SECONDS,
poll_interval_seconds: float = LdsCfgConst.POLL_INTERVAL_SECONDS,
) -> bool:
"""
Long-poll MainPageInfoContent: ожидание появления или исчезновения ТУ в Состоянии МТ.
"""
action = "появления" if expect_present else "исчезновения"
with allure.step(
f"Ожидание {action} ТУ в Состоянии МТ "
f"(tuId={tu_id}, таймаут {int(total_wait_seconds)} с)"
):
ws_client.clear_queue()
await t_utils.connect(
ws_client,
LdsCfgConst.SUBSCRIBE_MAIN_PAGE_INFO_REQUEST,
{"tuIds": [tu_id], "additionalProperties": None},
)
deadline = asyncio.get_running_loop().time() + total_wait_seconds
while asyncio.get_running_loop().time() < deadline:
await asyncio.sleep(poll_interval_seconds)
batch = _drain_recv_queue(ws_client)
found = any(_tu_id_in_main_page_message(msg, tu_id) for msg in batch)
if expect_present and found:
return True
if not expect_present and not found:
return True
t_utils._attach_ws_poll_failure(
[],
total_wait_seconds,
f"{LdsCfgConst.MAIN_PAGE_INFO_CONTENT} tuId={tu_id} present={expect_present}",
)
return False
async def verify_launched_at(
ws_client: WebSocketClient,
parser: WsMessageParser,
tu_id: int,
launch_checkpoint: datetime,
) -> None:
"""
Проверяет, что launchedAt в GetTusInformation позже момента холодного запуска.
"""
with allure.step(f"Запрос GetTusInformation для tuId={tu_id}"):
payload = await t_utils.connect_and_get_msg(
ws_client,
LdsCfgConst.GET_TUS_INFORMATION_REQUEST,
{"tuIds": [tu_id]},
)
reply: GetTusInformationReply = parser.parse_get_tus_information_msg(payload)
tus_info = reply.replyContent.tusInfo if reply.replyContent else []
tu_info = next((item for item in tus_info if item.tuId == tu_id), None)
with allure.step("Проверка: в ответе есть информация о запуске ТУ"):
if tu_info is None:
fail(f"GetTusInformationResponse не содержит tuId={tu_id}", pytrace=False)
launched_at = parser.timestamp_to_datetime(tu_info.launchedAt)
with allure.step("Проверка: поле launchedAt заполнено"):
if launched_at is None:
fail(f"GetTusInformationResponse: launchedAt отсутствует для tuId={tu_id}", pytrace=False)
launched_at_msk = t_utils.localize_as_moscow(launched_at)
checkpoint_msk = t_utils.localize_as_moscow(launch_checkpoint)
allure.attach(
f"launchedAt: {t_utils.format_datetime_moscow(launched_at_msk)}\n"
f"checkpoint: {t_utils.format_datetime_moscow(checkpoint_msk)}",
name="Сравнение времени запуска СОУ",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step("Проверка: launchedAt позже момента команды 'Запустить СОУ'"):
if launched_at_msk <= checkpoint_msk:
fail(
f"Время запуска СОУ на бэкенде ({t_utils.format_datetime_moscow(launched_at_msk)}) "
f"не позже момента команды 'Запустить СОУ' "
f"({t_utils.format_datetime_moscow(checkpoint_msk)})",
pytrace=False,
)
def get_admin_tu_status(admin_reply: GetBasicInfoAdminReply, tu_id: int) -> Optional[SouAdminStatus]:
"""
Возвращает статус СОУ из GetBasicInfoAdmin для указанного tuId.
"""
tus = admin_reply.replyContent.basicInfo.tus if admin_reply.replyContent else []
tu = next((item for item in tus if item.tuId == tu_id), None)
if tu is None:
return None
try:
return SouAdminStatus(tu.status)
except ValueError:
return None
utils/helpers/ws_message_parser.py
return self._find_and_parse_message(data_class=DownloadExportedDataReply, data=data)
def parse_get_basic_info_admin_msg(self, data: list) -> GetBasicInfoAdminReply:
"""Парсит ответ GetBasicInfoAdminRequest."""
return self._find_and_parse_message(data_class=GetBasicInfoAdminReply, data=data)
def parse_launch_lds_msg(self, data: list) -> LaunchLdsReply:
"""Парсит Completion-ответ LaunchLdsRequest."""
return self._find_and_parse_message(data_class=LaunchLdsReply, data=data)
def parse_stop_lds_msg(self, data: list) -> StopLdsReply:
"""Парсит Completion-ответ StopLdsRequest."""
return self._find_and_parse_message(data_class=StopLdsReply, data=data)
def parse_get_tus_information_msg(self, data: list) -> GetTusInformationReply:
"""Парсит ответ GetTusInformationRequest."""
return self._find_and_parse_message(data_class=GetTusInformationReply, data=data)
return value if isinstance(value, ExportedDataType) else ExportedDataType(value)
def _to_reply_status(value: Any) -> ReplyStatus:
return value if isinstance(value, ReplyStatus) else ReplyStatus(value)
return Config(
type_hooks={
UUID: self.convert_to_uuid,
datetime: self.timestamp_to_datetime,
ExportStatus: _to_export_status,
ExportedDataType: _to_exported_data_type,
ReplyStatus: _to_reply_status,
}
)
utils/helpers/ws_test_utils.py
def moscow_now() -> datetime:
"""Текущее время в часовом поясе Europe/Moscow."""
return datetime.now(ZoneInfo(TestConst.ZONE_INFO))
conftest.py
import asyncio
from test_config.datasets import get_config_by_name
from test_scenarios import lds_configurator_scenarios
"imitator_start_time": None, # datetime объект времени старта имитатора для расчёта интервалов утечек
"use_lds_configurator": True,
"resolved_tu_id": None,
"tu_name": None,
SMOKE_SUITE_LEVEL_MAPPING = {
'test_lds_configurator_setup': 'lds_configurator_setup_test',
LDS_STATUS_SUITE_LEVEL_MAPPING = {
'test_lds_configurator_setup': 'lds_configurator_setup_test',
# Для suite-level тестов берём из config
if 'config' in params:
if test_name == 'test_lds_configurator_setup':
suite_config = params['config']
if not suite_config.use_lds_configurator:
return None
if not suite_config.tu_name.strip():
pytest.fail(
f"Набор '{suite_config.suite_name}': tu_name обязателен к заполнению в датасете набора при use_lds_configurator=True"
)
return suite_config.lds_configurator_setup_test
test_data_name = item.get_closest_marker("test_data_name").args[0]
# legacy: id из enum TU для имитатора (tn{id}_tags.txt), не resolved_tu_id из Администрирования
yield # pytest продолжит выполнение теста
def _run_lds_configurator_teardown_if_needed(cfg: dict) -> None:
"""
Выполняет WS-teardown СОУ при смене набора или завершении сессии.
Ошибки не прерывают pytest: логируются и оформляются как ALERT внутри сценария.
Состояние group_state сбрасывается в finally.
"""
if not cfg.get("use_lds_configurator") or not cfg.get("resolved_tu_id"):
return
tu_id = cfg["resolved_tu_id"]
tu_name = cfg.get("tu_name") or ""
async def _teardown() -> None:
ws_host = get_ws_host()
token = get_token()
async with WebSocketClient(ws_host, token) as client:
await lds_configurator_scenarios.lds_configurator_teardown(client, tu_id, tu_name)
try:
asyncio.run(_teardown())
except Exception:
logger.exception("[TEARDOWN] [ERROR] Ошибка LDS Configurator teardown для tuId=%s", tu_id)
finally:
cfg["resolved_tu_id"] = None
cfg["use_lds_configurator"] = False
cfg["tu_name"] = None
if next_suite != cfg["current_suite"]:
_run_lds_configurator_teardown_if_needed(cfg)
"""
В завершении сессии — отправляем единый Allure‑отчёт в TestOps.
"""
# 1) teardown стенда: LDS Configurator + остановка имитатора
try:
stand_manager = getattr(session.config, "group_state", {}).get("stand_manager")
group_state = getattr(session.config, "group_state", {})
_run_lds_configurator_teardown_if_needed(group_state)
stand_manager = group_state.get("stand_manager")
if stand_manager: