Загрузка данных
utils/helpers/asserts.py
from __future__ import annotations
import traceback
from typing import Any, List, Optional, TypeVar
import allure
from assertpy import assert_that
from pytest import fail
ObjectType = TypeVar("ObjectType")
class SoftAssertions:
"""
Контекстный менеджер для "мягких" сравнений.
Внутри теста используется так:
with SoftAssertions() as soft:
StepCheck(..., failures=soft).actual(...).expected(...).equal_to()
По выходу, если были ошибки — они прикрепляются к Allure и поднимается Aggregated AssertionError.
"""
def __init__(self) -> None:
self.failures: List[str] = []
def __enter__(self) -> List[str]:
return self.failures
def __exit__(self, exc_type, exc_val, exc_tb) -> Optional[bool]:
# Если внутри контекста появились внешние исключения (не связанные с проверками) — не подавляем их.
if exc_type is not None:
return False
if not self.failures:
return False
# Прикрепляем все собранные failure-traceback'и к Allure, чтобы их было удобно смотреть
joined = "\n\n".join(self.failures)
allure.attach(joined, name="soft assertion failures", attachment_type=allure.attachment_type.TEXT)
# Поднимаем итоговую ошибку, чтобы CI увидел падение теста
raise AssertionError("Soft assertion failures:\n\n" + joined)
class StepMessageBuilder:
"""
Составляет разные сообщения для allure.step под конкретный вид assert
"""
def __init__(self, check_step: str, field_name: str) -> None:
self.check_step = check_step
self.field_name = field_name
def _build_message(self, message_parts) -> str:
"""
Собирает сообщение из списка с нужным разделителем
"""
return "\n".join([self.check_step] + message_parts)
@staticmethod
def _format_val(val: Any) -> str:
"""
Вспомогательная функция для аккуратного форматирования значения в сообщении.
Она пытается использовать repr(), но на случай исключения возвращает str().
"""
try:
return str(val)
except TypeError:
return repr(val)
@staticmethod
def _item_count(val: Any) -> int:
"""
Считает количество элементов, если возможно
"""
return len(val) if hasattr(val, "__len__") else 0
def equal_to_msg(
self,
exp_value: Any,
act_value: Any,
) -> str:
message_parts = [
f"Ожидаемый результат: {self.field_name} = {self._format_val(exp_value)}",
f"Фактический результат: {self.field_name} = {self._format_val(act_value)}",
]
return self._build_message(message_parts)
def is_not_equal_to_msg(
self,
exp_value: Any,
act_value: Any,
) -> str:
message_parts = [
f"Ожидаемый результат: {self.field_name} = {self._format_val(exp_value)} не равно фактическому",
f"Фактический результат: {self.field_name} = {self._format_val(act_value)}",
]
return self._build_message(message_parts)
def is_not_none_msg(self, act_value: Any) -> str:
message_parts = [
f"Ожидаемый результат: {self.field_name} не пустое",
f"Фактический результат: {self.field_name} = {self._format_val(act_value)}",
]
return self._build_message(message_parts)
def is_not_empty_msg(self, act_value: Any) -> str:
item_count = self._item_count(act_value)
message_parts = [
f"Ожидаемый результат: {self.field_name} не пустое",
f"Фактический результат: количество элементов в {self.field_name} = {item_count}",
]
return self._build_message(message_parts)
def is_empty_msg(self, act_value: Any) -> str:
item_count = self._item_count(act_value)
message_parts = [
f"Ожидаемый результат: {self.field_name} пустое",
f"Фактический результат: количество элементов в {self.field_name} = {item_count}",
]
return self._build_message(message_parts)
def is_close_to_msg(self, exp_value: Any, act_value: Any, extra_info: Optional[Any] = None) -> str:
message_parts = [
f"Ожидаемый результат: {self.field_name} = {self._format_val(exp_value)}",
f"Фактический результат: {self.field_name} = {self._format_val(act_value)}",
]
if extra_info:
message_parts.append(f"Дополнительная информация: {self._format_val(extra_info)}")
return self._build_message(message_parts)
def is_less_than_msg(self, exp_value: Any, act_value: Any, extra_info: Optional[Any] = None) -> str:
message_parts = [
f"Ожидаемый результат: Значение в поле {self.field_name} < {self._format_val(exp_value)}",
f"Фактический результат: {self.field_name} = {self._format_val(act_value)}",
]
if extra_info:
message_parts.append(f"Дополнительная информация: {self._format_val(extra_info)}")
return self._build_message(message_parts)
def is_greater_than_msg(self, exp_value: Any, act_value: Any, extra_info: Optional[Any] = None) -> str:
message_parts = [
f"Ожидаемый результат: Значение в поле {self.field_name} > {self._format_val(exp_value)}",
f"Фактический результат: {self.field_name} = {self._format_val(act_value)}",
]
if extra_info:
message_parts.append(f"Дополнительная информация: {self._format_val(extra_info)}")
return self._build_message(message_parts)
def is_greater_than_or_equal_to_msg(self, exp_value: Any, act_value: Any, extra_info: Optional[Any] = None) -> str:
message_parts = [
f"Ожидаемый результат: Значение в поле {self.field_name} >= {self._format_val(exp_value)}",
f"Фактический результат: {self.field_name} = {self._format_val(act_value)}",
]
if extra_info:
message_parts.append(f"Дополнительная информация: {self._format_val(extra_info)}")
return self._build_message(message_parts)
def is_between_msg(self, act_value: Any, lower_bound: Any, upper_bound: Any) -> str:
message_parts = [
f"Ожидаемый результат: "
f"Значение в поле {self.field_name} должно быть в диапазоне [{lower_bound}, {upper_bound}]",
f"Фактический результат: {self.field_name} = {self._format_val(act_value)}",
]
return self._build_message(message_parts)
def does_not_contain_msg(self, objects_list: List[ObjectType], forbidden_object: ObjectType) -> str:
message_parts = [
f"Ожидаемый результат: Список элементов: {objects_list}",
f"Не содержит элемента: {forbidden_object}",
]
return self._build_message(message_parts)
def contains_msg(self, container: Any, expected_item: Any) -> str:
if isinstance(container, str):
message_parts = [
f"Ожидаемый результат: '{container}' содержит подстроку '{expected_item}'",
f"Фактический результат: {self.field_name} = {self._format_val(container)}",
]
else:
message_parts = [
f"Ожидаемый результат: список {self._format_val(container)} содержит элемент {expected_item}",
f"Фактический результат: {self.field_name} = {self._format_val(container)}",
]
return self._build_message(message_parts)
class StepCheck:
"""
Обёртка для проверки
Внутри всегда формируется единое сообщение и открывается allure.step.
"""
def __init__(self, check_step: str, field_name: str, failures: Optional[List[str]] = None):
# Сохраняем название проверки
self.check_step = check_step
self._field_name = field_name
# Поля для хранения expected/actual/extra перед вызовом метода-проверки
self._expected: Optional[Any] = None
self._actual: Optional[Any] = None
self._extra_info: Optional[str] = None
self._msg_builder = StepMessageBuilder(check_step, field_name)
# Хранение фейлов
self._failures = failures
def expected(self, value: Any) -> StepCheck:
"""Задаём ожидаемое значение и возвращаем self для цепочки вызовов"""
self._expected = value
return self
def actual(self, value: Any) -> StepCheck:
"""Задаём фактическое значение и возвращаем self"""
self._actual = value
return self
def extra(self, text: str) -> StepCheck:
"""Задаём дополнительный текст"""
self._extra_info = text
return self
def _handle_assertion(self, exc: AssertionError) -> None:
"""
Сохраняет traceback в список failures или перебрасывает дальше, если list не передан
"""
if self._failures is not None:
self._failures.append(traceback.format_exc())
else:
raise exc
def equal_to(self, expected: Optional[Any] = None) -> None:
"""
Выполняет проверку is_equal_to. Можно передать expected в метод или задать раньше через .expected(...)
"""
# Если expected пришёл в аргументе — сохраняем его
if expected is not None:
self._expected = expected
# Проверяем, что actual задан
if self._actual is None:
raise ValueError("Фактический результат должен быть заполнен при вызове equal_to()")
msg = self._msg_builder.equal_to_msg(self._expected, self._actual)
try:
with allure.step(msg):
# Бросаем AssertionError в момент выполнения шага, чтобы Allure увидел failed-step
assert_that(self._actual).described_as(msg).is_equal_to(self._expected)
except AssertionError as exc:
# Ловушка для исключения сразу после выхода из with - сохраняем traceback и продолжаем
self._handle_assertion(exc)
def is_not_equal_to(self, expected: Optional[Any] = None) -> None:
"""
Выполняет проверку is_not_equal_to. Можно передать expected в метод или задать раньше через .expected(...)
"""
# Если expected пришёл в аргументе — сохраняем его
if expected is not None:
self._expected = expected
# Проверяем, что actual задан
if self._actual is None:
raise ValueError("Фактический результат должен быть заполнен при вызове is_not_equal_to()")
msg = self._msg_builder.is_not_equal_to_msg(self._expected, self._actual)
try:
with allure.step(msg):
# Бросаем AssertionError в момент выполнения шага, чтобы Allure увидел failed-step
assert_that(self._actual).described_as(msg).is_not_equal_to(self._expected)
except AssertionError as exc:
# Ловушка для исключения сразу после выхода из with - сохраняем traceback и продолжаем
self._handle_assertion(exc)
def is_not_none(self) -> None:
"""Проверка на существование поля"""
msg = self._msg_builder.is_not_none_msg(self._actual)
try:
with allure.step(msg):
assert_that(self._actual).described_as(msg).is_not_none()
except AssertionError as exc:
# Ловушка для исключения сразу после выхода из with - сохраняем traceback и продолжаем
self._handle_assertion(exc)
def is_not_empty(self) -> None:
"""Проверка на не пустое значение"""
if self._actual is None:
fail("Фактический результат должен быть заполнен при вызове is_not_empty()")
msg = self._msg_builder.is_not_empty_msg(self._actual)
try:
with allure.step(msg):
assert_that(self._actual).described_as(msg).is_not_empty()
except AssertionError as exc:
self._handle_assertion(exc)
def is_empty(self) -> None:
"""Проверка на пустое значение"""
if self._actual is None:
fail("Фактический результат должен быть заполнен при вызове is_not_empty()")
msg = self._msg_builder.is_empty_msg(self._actual)
try:
with allure.step(msg):
assert_that(self._actual).described_as(msg).is_empty()
except AssertionError as exc:
self._handle_assertion(exc)
def is_close_to(self, expected: Any, allowed_diff: int | float, extra_info: Any) -> None:
"""
Проверка допуска
"""
if self._actual is None:
raise ValueError("Фактический результат должен быть заполнен при вызове is_close_to()")
self._expected = expected
msg = self._msg_builder.is_close_to_msg(expected, self._actual, extra_info)
try:
with allure.step(msg):
assert_that(self._actual).described_as(msg).is_close_to(expected, allowed_diff)
except AssertionError as exc:
self._handle_assertion(exc)
def is_less_than(self, threshold: Any, extra_info: Any = None) -> None:
"""Проверка, что значение меньше порога"""
if self._actual is None:
raise ValueError("Фактический результат должен быть заполнен при вызове is_less_than()")
msg = self._msg_builder.is_less_than_msg(self.check_step, self._actual, extra_info)
try:
with allure.step(msg):
assert_that(self._actual).described_as(msg).is_less_than(threshold)
except AssertionError as exc:
self._handle_assertion(exc)
def is_greater_than(self, threshold: Any, extra_info: Any = None) -> None:
"""Проверка, что значение строго больше порога"""
if self._actual is None:
raise ValueError("Фактический результат должен быть заполнен при вызове is_greater_than()")
msg = self._msg_builder.is_greater_than_msg(threshold, self._actual, extra_info)
try:
with allure.step(msg):
assert_that(self._actual).described_as(msg).is_greater_than(threshold)
except AssertionError as exc:
self._handle_assertion(exc)
def is_between(self, lower_bound: Any, upper_bound: Any) -> None:
"""Проверка, что значение в пределах установленных границ"""
if self._actual is None:
raise ValueError("Фактический результат должен быть заполнен при вызове is_between()")
msg = self._msg_builder.is_between_msg(self._actual, lower_bound, upper_bound)
try:
with allure.step(msg):
assert_that(self._actual).described_as(msg).is_between(lower_bound, upper_bound)
except AssertionError as exc:
self._handle_assertion(exc)
def is_greater_than_or_equal_to(self, threshold: Any, extra_info: Any = None) -> None:
"""Проверка, что значение больше или равно порогу"""
if self._actual is None:
raise ValueError("Фактический результат должен быть заполнен при вызове is_greater_than_or_equal_to_msg()")
msg = self._msg_builder.is_greater_than_or_equal_to_msg(threshold, self._actual, extra_info)
try:
with allure.step(msg):
assert_that(self._actual).described_as(msg).is_greater_than_or_equal_to(threshold)
except AssertionError as exc:
self._handle_assertion(exc)
def does_not_contain(self, objects_list: List[ObjectType], forbidden_object: ObjectType) -> None:
"""
Выполняет проверку does_not_contain.
"""
msg = self._msg_builder.does_not_contain_msg(objects_list, forbidden_object)
try:
with allure.step(msg):
assert_that(objects_list).described_as(msg).does_not_contain(forbidden_object)
except AssertionError as exc:
self._handle_assertion(exc)
def contains(self, container: Any, expected_item: Any) -> None:
"""Проверка, что container (список или строка) содержит expected_item."""
msg = self._msg_builder.contains_msg(container, expected_item)
try:
with allure.step(msg):
assert_that(container).described_as(msg).contains_msg(expected_item)
except AssertionError as exc:
self._handle_assertion(exc)
clients/websocket_client.py
import asyncio
import logging
import time
from datetime import datetime
from typing import Any, Callable, List, Optional
from zoneinfo import ZoneInfo
import msgpack
import websockets
from allure import attach, attachment_type
from constants.architecture_constants import WebSocketClientConstants as WS_Const
from utils.msgpack_utils.message_filters import is_desired_invocation_id, is_desired_type
from utils.msgpack_utils.msgpack_utils import encode_with_varint_prefix, parse_message
logger = logging.getLogger(__name__)
class WebSocketClient:
"""
Асинхронный ws-клиент для api-gateway по протоколу Async Api
"""
def __init__(
self,
host: str,
access_token: str,
reconnect_interval: float = WS_Const.DEFAULT_RECONNECT_INTERVAL,
):
self._host = host
self._access_token = access_token
self._reconnect_interval = reconnect_interval
self._ws_url = f"wss://{host.rstrip('/')}{WS_Const.WS_HUBS}"
self._buffer = b""
self._next_id = WS_Const.START_INVOCATION_ID
self._ws: websockets.ClientConnection | None = None
self.recv_queue: asyncio.Queue[Any] = asyncio.Queue()
self._recv_task: asyncio.Task | None = None
self._stop_event = asyncio.Event()
self._invocation_id: Optional[str] = None
self.suppress_recv_logging: bool = False
@property
def invocation_id(self):
return self._invocation_id
def clear_queue(self):
"""
Очищает очередь путем пересоздания экземпляра класса очереди
"""
# TODO разобраться почему не выполняется очистка очереди сообщений LDS-8599
while not self.recv_queue.empty():
try:
self.recv_queue.get_nowait()
self.recv_queue.task_done()
except asyncio.QueueEmpty as message_empty:
logger.info(f"Очередь сообщений очищена: {message_empty}")
break
async def __aenter__(self):
await self._connect_loop()
return self
async def __aexit__(self, exc_type, exc, tb):
self._stop_event.set()
if self._ws:
await self._ws.close()
if self._recv_task:
await self._recv_task
# TODO: дергать ручку завершения сессии в LDS-4083
async def _handshake(self) -> None:
payload = WS_Const.HANDSHAKE_MESSAGE + WS_Const.RS.decode()
logger.debug(
f"Отправлен handshake: {payload}",
)
await self._ws.send(payload)
buf = self._buffer
finish = time.monotonic() + WS_Const.HANDSHAKE_WAITING
while time.monotonic() < finish:
chunk = await self._ws.recv()
logger.debug(f"Ответ на handshake: {chunk}")
buf += chunk.encode() if isinstance(chunk, str) else chunk
if WS_Const.RS in buf:
return
raise TimeoutError("Handshake timeout: не получили сообщение с разделителем RS за указанное время")
async def _connect_loop(self) -> None:
"""
Цикл подключения с повторными попытками до наступления stop_event.
"""
while not self._stop_event.is_set():
try:
self.ws_request = f"{self._ws_url}/?access_token={self._access_token}"
logger.info(f"Попытка подключения по wss: {self._ws_url}/?access_token=...")
self._ws = await websockets.connect(
self.ws_request,
ping_interval=WS_Const.PING_INTERVAL,
ping_timeout=WS_Const.PING_TIMEOUT,
close_timeout=WS_Const.CLOSE_TIMEOUT,
)
# Handshake
await self._handshake()
# Запускаем приём в фоне
self._recv_task = asyncio.create_task(self._recv_loop())
logger.info("Websocket connected")
return
except ConnectionError:
logger.exception(
f"Websocket подключение не установлено, повтор подключения через: {self._reconnect_interval}"
)
await asyncio.sleep(self._reconnect_interval)
async def _recv_loop(self) -> None:
"""
Прием сообщений, парсинг и отправка в очередь.
"""
assert self._ws is not None
while not self._stop_event.is_set():
try:
chunk = await self._ws.recv()
if not self.suppress_recv_logging:
logger.debug(f"Сырые биты до обработки: {chunk[:100]}")
except websockets.ConnectionClosed as e:
logger.warning(f"WebSocket соединение разорвано: {e}")
return
result_message = parse_message(chunk)
str_message = str(result_message)
if not self.suppress_recv_logging:
logger.info(f"Обработанное сообщение от api-gateway: {str_message[:500]} полное сообщение в attach")
attach(
str_message,
name=f"Распакованное сообщение от api-gateway {datetime.now(ZoneInfo(WS_Const.ZONE_INFO))}",
attachment_type=attachment_type.TEXT,
)
await self.recv_queue.put(result_message)
async def invoke(self, target: str, args: list) -> None:
"""
Отправляет удаленный вызов invocation о websocket соединению
Метод формирует сообщение по протоколу SignalR, включая:
- типа сообщения
- заголовки
- уникальный идентификатор запроса
- имя целевого метода
- аргументы запроса
Сообщение запаковывается в messagepack и отправляется через текущее websocket соединение
"""
if not self._ws:
raise websockets.WebSocketException("Не установлено подключение по wss")
self._invocation_id = str(self._next_id)
self._next_id += 1
invocation = [
WS_Const.DEFAULT_SIGNALR_MESSAGE_TYPE,
WS_Const.DEFAULT_SIGNALR_MAP_HEADERS,
self._invocation_id,
target,
[args],
]
logger.info(f"Сообщение подготовлено к отправке: {invocation}")
payload = msgpack.packb(invocation, use_bin_type=True)
packet = encode_with_varint_prefix(payload)
logger.debug(f"Отправляем сообщение: {packet}")
await self._ws.send(packet)
async def invoke_stream(self, target: str, args: list) -> None:
"""
Отправляет streaming-вызов (StreamInvocation) по протоколу SignalR.
"""
if not self._ws:
raise websockets.WebSocketException("Не установлено подключение по wss")
self._invocation_id = str(self._next_id)
self._next_id += 1
invocation = [
WS_Const.STREAM_INVOCATION_MESSAGE_TYPE,
WS_Const.DEFAULT_SIGNALR_MAP_HEADERS,
self._invocation_id,
target,
[args],
]
logger.info(f"Streaming-сообщение подготовлено к отправке: {invocation}")
payload = msgpack.packb(invocation, use_bin_type=True)
packet = encode_with_varint_prefix(payload)
await self._ws.send(packet)
async def receive_by_type(self, message_type: str, timeout: WS_Const.FILTERING_TIMEOUT) -> List[Any]:
"""
Фильтрует сообщения по message_type
"""
try:
return await self._receive_by(filter_func=lambda msg: is_desired_type(msg, message_type), timeout=timeout)
except websockets.WebSocketException:
raise websockets.WebSocketException(f"Ошибка при фильтрации сообщений по {message_type}")
async def receive_by_invocation_id(
self, invocation_id: str, timeout: float = WS_Const.FILTERING_TIMEOUT
) -> List[Any]:
"""
Фильтрует сообщения по invocation_id
"""
try:
return await self._receive_by(
filter_func=lambda msg: is_desired_invocation_id(msg, invocation_id), timeout=timeout
)
except websockets.WebSocketException:
raise websockets.WebSocketException("Ошибка при фильтрации сообщений по invocation_id")
async def _receive_by(self, filter_func: Callable[[list], bool], timeout: float) -> List[Any]:
"""
Ждет и фильтрует сообщение по filter_func
"""
# 1) Единая точка вычисления дедлайна
deadline = time.monotonic() + timeout
while True:
# 2) Остаток времени до таймаута
remaining = deadline - time.monotonic()
if remaining <= 0:
raise asyncio.TimeoutError(f"Timeout при фильтрации сообщений {timeout:.1f} секунд")
try:
# 3) Получает сообщение
msg = await asyncio.wait_for(self.recv_queue.get(), timeout=remaining)
except asyncio.TimeoutError:
# 4) Явно перехватывает и пробрасывает свой TimeoutError
raise asyncio.TimeoutError(f"Timeout при фильтрации сообщений {timeout:.1f} секунд")
# 5) Фильтрация по filter_func
if isinstance(msg, list) and filter_func(msg):
return msg
constants/architecture_constants.py
import os
class ImitatorConstants:
TEST_SETTINGS_KEY_NAME: str = "test_settings"
IMITATOR_FLAGS_KEY_NAME: str = "imitator_flags"
IMITATOR_TIME_FORMAT: str = "%Y%m%dT%H%M%S"
IMITATOR_START_DELAY_S: int = 100
IMITATOR_FINISH_DELAY_MINUTE: float = 2.0
IMITATOR_CHECK_CMD: str = "pgrep -f Playground"
IMITATOR_KILL_CMD: str = "pkill -f Playground"
IMITATOR_PATH = "/data/imitator/lds-flow-playground-csv-latest"
IMITATOR_RUN_CMD: str = f"dotnet {IMITATOR_PATH}/TN.LDS.Flow.Playground.Application.dll"
IMITATOR_LOG_FILE_NAME: str = "imitator.log"
IMITATOR_KEY_NAME: str = "imitator_key"
SERVER_IP_KEY_NAME: str = "server_ip"
SANDBOX_PATH: str = "Sandbox_path" # Удалить при переработке json_config_model.py
SANDBOX_DATA: str = "data"
SANDBOX_RULES: str = "rules.txt"
SANDBOX_TAGS: str = "tags.txt"
STAND_ENV_NAMING: str = os.environ.get("STAND_NAME")[:-1]
CONFIG_PATH: str = f"/data/{STAND_ENV_NAMING}/configs"
SOURCE_TYPE_DEF_VALUE: str = "inflow"
SPEED_DEF_VALUE: int = 1
NS_DEF_VALUE: int = 2
KAFKA_OFFSET_EARLIEST: str = "earliest"
KAFKA_POLL_TIMEOUT_S: float = 1.0
KAFKA_SESSION_TIMEOUT_MS: int = 10000
TEST_ID_KEY: str = "test_id"
AUTOTEST_DATA_PATH: str = "/data/imitator/autotest_data"
POPEN_WAIT_TIMOUT_S: int = 5
LONG_PROCESS_TIMEOUT_S: int = 20
CMD_STATUS_OK: str = "OK"
CMD_STATUS_FAIL: str = "FAIL"
REDIS_STAND_ADDRESS: str = "10.7.49.210"
CORE_START_DELAY_S: int = 5
ENCODING_UTF_8: str = "utf-8"
ENCODING_UTF_8_SIG: str = "utf-8-sig"
ENCODING_LATIN_1: str = "latin-1"
WIN_ENCODING_CP866: str = "cp866" # Нужна только для запуска под WIN
WIN_ENCODING_CP1251: str = "cp1251" # Нужна только для запуска под WIN
OS_NAME_WIN: str = 'nt'
DEFAULT_ENCODINGS = [ENCODING_UTF_8_SIG, ENCODING_UTF_8, WIN_ENCODING_CP866, WIN_ENCODING_CP1251, ENCODING_LATIN_1]
HOST_MAP = {
"dev1": {IMITATOR_KEY_NAME: "DEV1_", SERVER_IP_KEY_NAME: "10.7.49.37"},
"dev2": {IMITATOR_KEY_NAME: "DEV2_", SERVER_IP_KEY_NAME: "10.7.49.38"},
"dev3": {IMITATOR_KEY_NAME: "DEV3_", SERVER_IP_KEY_NAME: "10.7.49.205"},
"test1": {IMITATOR_KEY_NAME: "TEST1_", SERVER_IP_KEY_NAME: "10.7.49.206"},
"test2": {IMITATOR_KEY_NAME: "TEST2_", SERVER_IP_KEY_NAME: "10.7.49.207"},
"test3": {IMITATOR_KEY_NAME: "TEST3_", SERVER_IP_KEY_NAME: "10.7.49.208"},
"test4": {IMITATOR_KEY_NAME: "TEST4_", SERVER_IP_KEY_NAME: "10.7.49.209"},
}
class ClickhouseConstants(ImitatorConstants):
CH_TABLE_NAMES: list = ["lds.records", "lds.records_lastvalue"]
EVO_OBJECT_ID_KEY_NAME: str = "evoObjectId"
EVO_PARAMETER_ID_KEY_NAME: str = "evoParameterId"
OBJECT_ID_KEY_NAME: str = "objectId"
PARAMETER_ID_KEY_NAME: str = "parameterId"
EVO_ID_PAIRS_CHUNK_SIZE: int = 450
NAME_CONTAINER: str = "clickhouse-2"
class DockerConstants:
HOSTNAME_CMD: str = "hostname"
STOP_CMD: str = "docker stop"
START_CMD: str = "docker start"
CHECK_STATUS_CMD: str = "docker inspect -f '{{.State.Status}}'"
RUNNING_STATUS: str = "running"
EXITED_STATUS: str = "exited"
CORE_CONTAINERS_GROUP: list = ["lds-core-node1", "lds-core-node2", "lds-core-node3"]
LB_CONTAINERS_GROUP: list = ["lds-layer-builder-node1", "lds-layer-builder-node2", "lds-layer-builder-node3"]
JOURNAL_CONTAINERS_GROUP: list = ["lds-journals-node1", "lds-journals-node2", "lds-journals-node3"]
WEB_APP_CONTAINERS_GROUP: list = ["lds-web-app-node1", "lds-web-app-node2", "lds-web-app-node3"]
API_GW_CONTAINERS_GROUP: list = ["lds-api-gw-node1", "lds-api-gw-node2", "lds-api-gw-node3"]
REPORTS_CONTAINERS_GROUP: list = ["lds-reports-node1", "lds-reports-node2", "lds-reports-node3"]
class RedisConstants:
LB_REDIS_KEY: str = "lds-layer-builder"
CORE_REDIS_KEY: str = "lds-core"
REDIS_KEY_FIND_CMD: str = "docker exec -i redis-redis-01-1-1 redis-cli KEYS"
REDIS_KEY_DEL_CMD: str = "| xargs -r docker exec -i redis-redis-01-1-1 redis-cli DEL"
class KeycloakClientConstants:
TOKEN_LEEWAY: int = 30
GRANT_TYPE: str = "password"
KEYCLOAK_HEADERS: dict = {"Content-Type": "application/x-www-form-urlencoded"}
TOKEN_KEY: str = "access_token"
ISSUED_AT_KEY: str = "issued_at"
EXPIRES_IN_KEY: str = "expires_in"
class TestOpsConstants:
TESTOPS_UPLOAD_ENDPOINT: str = "/upload"
TESTOPS_UPLOAD_ERROR_MSG: str = "Ошибка при загрузке файлов allure отчета"
TESTOPS_UPLOAD_RESPONSE_MSG_KEY: str = "message"
TESTOPS_UPLOAD_FILES_KEY: str = "files"
POST_METHOD: str = "post"
ALLURE_RESULTS_DIR_NAME: str = "allure-results"
GZIP_FILE_SIGNATURE: bytes = b'\x1f\x8b'
class HTTPClientConstants:
GET_METHOD: str = "get"
POST_METHOD: str = "post"
TESTOPS_UPLOAD_ENDPOINT: str = "/upload"
TESTOPS_ATTACHMENTS_LIST_ENDPOINT: str = "/test_cases/{test_case_id}/attachments"
TESTOPS_LOAD_ATTACHMENT_ENDPOINT: str = "/test_cases/{test_case_id}/attachments/{attachment_id}?download=1"
TESTOPS_ATTACHMENTS_KEY: str = "items"
TESTOPS_ATTACHMENT_FILENAME_KEY: str = "original_filename"
TESTOPS_ATTACHMENT_ID_KEY: str = "id"
TEST_ID_KEY: str = "test_id"
IMITATOR_RUN_DATA_FILENAME: str = "imitator_run_data.tar.gz" # Название архива данных для прогона
class WebSocketClientConstants:
RS: bytes = b'\x1E' # ASCII Record Separator
HANDSHAKE_WAITING: float | int = 5.0
HANDSHAKE_MESSAGE: str = "{\"protocol\":\"messagepack\",\"version\":1}"
WS_HUBS: str = "/hubs/ldsClientHub"
START_INVOCATION_ID: str = 1
DEFAULT_RECONNECT_INTERVAL: float | int = 5.0
PING_INTERVAL: int = 3
PING_TIMEOUT: int = 5
CLOSE_TIMEOUT: int = 30
DEFAULT_SIGNALR_MESSAGE_TYPE: int = 1 # invocation
STREAM_INVOCATION_MESSAGE_TYPE: int = 4 # StreamInvocation
STREAM_ITEM_MESSAGE_TYPE: int = 2 # StreamItem
COMPLETION_MESSAGE_TYPE: int = 3 # Completion
# Текст ошибки Completion при неуспешном streaming (SignalR CompletionWithDetail)
COMPLETION_ERROR_MESSAGE_INDEX: int = 4
DEFAULT_SIGNALR_MAP_HEADERS: dict = {}
EVENT_TYPE_INDEX = 3
INVOCATION_ID_INDEX = 2
SERVICE_NAME = "web-app"
COMPONENT = "lds"
ROOT_DOMAIN = "tn.tngrp.ru"
FILTERING_TIMEOUT: int | float = 10.0
ZONE_INFO: str = 'Europe/Moscow'
class MockConstants:
MOCK_DURATION: int = 60
MOCK_TEST_DATA_ID: int = 1
MOCK_TEST_DATA_NAME: str = "mock.tar.gz"
class EnvKeyConstants:
CONNECTION_HOST: str = "CONNECTION_HOST"
KEYCLOAK_URL: str = "KEYCLOAK_URL"
KEYCLOAK_CLIENT_ID: str = "KEYCLOAK_CLIENT_ID"
KEYCLOAK_CLIENT_SECRET: str = "KEYCLOAK_CLIENT_SECRET"
KEYCLOAK_USERNAME: str = "KEYCLOAK_USERNAME"
KEYCLOAK_PASSWORD: str = "KEYCLOAK_PASSWORD"
TESTOPS_BASE_URL: str = "TESTOPS_BASE_URL"
SSH_KEY_NAME: str = "SSH_KEY_NAME"
SSH_USER_DEV: str = "SSH_USER_DEV"
STAND_NAME: str = "STAND_NAME"
DATA_PATH: str = "DATA_PATH"
OPC_URL: str = "OPC_URL"
TU_ID: str = "TU_ID"
enums
from enum import Enum, IntEnum, IntFlag
from typing import Mapping
class TU(Enum):
YAROSLAVL_MOSCOW = (1, "Ярославль - Москва", "volga.json")
TIKHORETSK_NOVOROSSIYSK_2 = (2, "Тихорецк-Новороссийск-2", "tn2.json")
TIKHORETSK_NOVOROSSIYSK_3 = (3, "Тихорецк-Новороссийск-3", "tn3.json")
RODIONOVSKAYA_TIKHORETSKAYA = (4, "Родионовская–Тихорецкая", "lt3_rt.json")
TIKHORETSKAYA_GRUSHEVAYA = (5, "Тихорецкая-6-Грушовая", "tn4_t6g.json")
def __init__(self, tu_id: int, description: str, file_name: str) -> None:
self.id = tu_id
self.description = description
self.file_name = file_name
def __str__(self):
return f"{self.id} - {self.description}"
@classmethod
def get_file_name_by_id(cls, target_id: int) -> str:
for item in cls:
if item.id == target_id:
return item.file_name
raise ValueError(f"ТУ с id = {target_id} не найден")
class ReplyStatus(Enum):
OK = 200
BAD_REQUEST = 400
UNAUTHORIZED = 401
FORBIDDEN = 403
NOT_FOUND = 404
REQUEST_TIMEOUT = 408
CONFLICT = 409
PRECONDITION_FAILED = 412
RANGE_NOT_SATISFIABLE = 416
TOO_MANY_REQUESTS = 429
INTERNAL_SERVER_ERROR = 500
NOT_IMPLEMENTED = 501
SERVICE_UNAVAILABLE = 503
GATEWAY_TIMEOUT = 504
UNKNOWN_ERROR = 520
class ExportedDataType(IntEnum):
"""
Тип экспортируемых данных
"""
STATIONARY_STATUS_REPORT = 6
LDS_STATUS_REPORT = 5
LEAKS_REPORT = 4
REJECTED_REPORT = 7
def to_download_name(self) -> str:
"""Строковый тип для DownloadExportedDataRequest.exportedDataType"""
return _EXPORTED_DATA_TYPE_DOWNLOAD_NAMES[self]
_EXPORTED_DATA_TYPE_DOWNLOAD_NAMES = {
ExportedDataType.STATIONARY_STATUS_REPORT: "StationaryStatusReport",
ExportedDataType.LDS_STATUS_REPORT: "LdsStatusReport",
ExportedDataType.LEAKS_REPORT: "LeaksReport",
ExportedDataType.REJECTED_REPORT: "RejectedReport",
}
class ExportStatus(IntEnum):
"""Статус формирования отчёта в ReportDataExportedNotification.replyContent.exportStatus."""
NOT_READY = 0
DONE = 1
class StationaryStatus(Enum):
UNSTATIONARY = 1 # Нестационарный режим
STATIONARY = 2 # Стационарный режим
STOPPED = 3 # Режим остановленной перекачки
class LeakStatus(Enum):
CONFIRMED = 2
WAITING = 1
POSSIBLE = 3
class LeakLocationStatus(Enum):
NODATA = 1 # нет данных
LEFT_FROM_PUMP_STATION = 2 # Слева от МНС
RIGHT_FROM_PUMP_STATION = 3 # Справа от МНС
INSIDE_PUMP_STATION = 4 # Внутри МНС
INSIDE_NPS = 5 # Внутри НПС при неработающей/отсутствующей МНС
class FieldName(Enum):
SECTION_TYPE = "sectionType"
SIGNAL_TYPE = "signalType"
class FilterCriteriaType(Enum):
ONE_OF = "oneOf"
ALL_OF = "allOf"
class FilterCriteriaValue(Enum):
MASK = "mask"
MASK_REASON = "maskReason"
LEAK = "leak"
LEAK_COORDINATE = "leakCoordinate"
PUMPING_STATUS = "pumpingStatus"
FREE_FLOW = "freeFlow"
ACKNOWLEDGE = "acknowledge"
LEAK_TIME = "leakTime"
LEAK_VOLUME = "leakVolume"
LDS_STATUS = "ldsStatus"
CONTROLLED_SITES = "controlledSites"
LINEAR_PARTS = "linearParts"
SERVER_DOWN = "serverDown"
TIME_SYNCHRONIZATION_DISABLE = "timeSynchronizationDisable"
FREE_FLOW_START_COORDINATE = "freeFlowStartCoordinate"
class SortingParam(Enum):
OBJECT_NAME = "objectName"
ADDRESS = "address"
class SortingType(Enum):
ASCENDING = "ascending"
DESCENDING = "descending"
class Direction(Enum):
"""Направление прокрутки"""
PREV = 1
NEXT = 2
FIRST = 3
LAST = 4
class LdsStatus(Enum):
FAULTY = 1 # Неисправность
INITIALIZATION = 2 # Инициализация
DEGRADATION = 3 # Ухудшенные характеристики
SERVICEABLE = 4 # Исправность
class ConfirmationStatus(Enum):
FAULTY = 0 # Неисправность
AWAITING = 1 # Предварительная
NOT_CONFIRMED = 2 # Не подтверждена
CONFIRMED = 3 # Подтверждена
CONFIRMED_AND_LEAK_CLOSED = 4 # Завершена
class ReservedType(Enum):
FAULTY = 0 # Неисправность
STOP = 1 # Дифференциальный
STATIONARY_FLOW = 2 # Стационарный
UNSTATIONARY_FLOW = 3 # Модельный
BALANCE_IN_NPS = 4 # Баланс внутри НПС
CHANGED_IN_DECISION_MAKING = 5 # Стационарный + Изменено в АПР
CREATED_IN_DECISION_MAKING = 6 # Создано в АПР
class MessageType(IntFlag):
AUTHENTICATION = 1 # Вход в систему
REJECTION = 1 << 2 # Отбраковка сигналов
LDS_STATUS = 1 << 3 # Режим работы СОУ
INPUT_SIGNALS = 1 << 6 # Входные сигналы
PUMPING_STATUS = 1 << 7 # Режим работы МТ
MASKING_LDS = 1 << 8 # Маскирование СОУ
FREE_FLOWS = 1 << 9 # Самотечное течение
LEAKS = 1 << 10 # Утечка
class MessagePriority(IntFlag):
LOW = 1 # Прочее
COMMON = 1 << 1 # Информационное
MEDIUM = 1 << 2 # Значительное
HIGH = 1 << 3 # Важное
VERY_HIGH = 1 << 4 # Особой важности
class LdsStatusDegradation(IntFlag):
LEAK_ON_ADJACENT_DIAGNOSTIC_AREAS = 1 << 0 # Возникновение утечки на соседнем диагностическом участке
ADDITIVE_INJECTORS_OPERATION = 1 << 1 # Наличие ПТП
PIG_SENSOR_PASSAGE = 1 << 2 # Наличие СОД
TRIGGERING_EMERGENCY_RESET = 1 << 3 # Срабатывание аварийного сброса
STARTING_PUMPING_OUT_PUMPS = 1 << 4 # Работа насосов откачки
EXCEEDING_DISTANCE_BETWEEN_SERVICEABLE_PRESSURE_SENSORS = 1 << 5 # Расстояние между СИ давления более 50 км
FAULTY_PRESSURE_SENSORS_AT_PUMP_STATION_NODES = 1 << 6 # Отказ СИ давления на входе/выходе НПС
REJECTION_TEMPERATURE_SENSOR = 1 << 7 # Отказ СИ температуры
REJECTION_VISCOSITY_SENSOR = 1 << 8 # Отказ СИ вязкости
REJECTION_DENSITY_SENSOR = 1 << 9 # Отказ СИ плотности
GRAVITY_SECTION_IN_PUMPING_MODE = 1 << 10 # Наличие самотечного участка/участка с неполным сечением
ABSENCE_MIN_PRESSURE_SENSORS_REQUIRED_NUMBER = 1 << 11 # Менее 4 исправных СИ давления на разных КП ЛЧ и НПС
EXCEEDING_DISTANCE_BETWEEN_FLOW_METERS = 1 << 12 # Расстояние между СИ расхода на пути перекачки более 200 км
GRAVITY_SECTION_IN_STOPPED_PUMPING_MODE = 1 << 13 # Наличие самотечного участка в режиме остановленной перекачки
class LdsStatusFaulty(IntFlag):
NO_DATA_SOURCE_CONNECTION = 1 << 0 # Отсутствует связь серверного оборудования СОУ с источником «сырых» данных
ABSENCE_MIN_PRESSURE_SENSORS_REQUIRED_NUMBER = 1 << 1 # Менее 4 КП с достоверными СИ давления
ABSENCE_MIN_FLOW_METERS_REQUIRED_NUMBER = 1 << 2 # Недостоверность граничного СИ расхода
class LdsStatusInitialization(IntFlag):
ACCUMULATION_DATA = 1 << 0 # Накопление данных
EXITING_FAULTY_MODE = 1 << 1 # Выход СОУ из режима «Неисправна»
COLD_START_OF_SERVERS = 1 << 2 # Одновременный «холодный» запуск нескольких серверов СОУ
SWITCHING_SHUT_OFF_IN_STOPPED_PUMPING_MODE = 1 << 3 # Переключение запорной арматуры
USER_ACTION = 1 << 4 # По команде пользователя
class StationaryReason(IntFlag):
"""
Причины режима работы МТ: Стационар
"""
# Отклонения давления и расхода не превышают допустимых отклонений
PRESSURE_AND_FLOW_MOVING_AVERAGES_MEET_CRITERIA = 1 << 0
# Окончание периода времени после технологических переключений и отсутствия самотечного участка
ABSENCE_GRAVITY_SECTION_AND_TECHNOLOGICAL_SWITCHING = 1 << 1
class UnStationaryReason(IntFlag):
"""
Причины режима работы МТ: Нестационар
"""
# Пуск/остановка трубопровода; включение/отключение магистрального насоса; включение/отключение НПС
CHANGING_EQUIPMENT_STATUS = 1 << 0
# Начало/окончание работы насосов откачки емкостей на НПС и ЛЧ технологического участка
CHANGING_WORKING_OF_PUMPING_OUT_PUMPS = 1 << 1
# Изменение частоты вращения в ручном режиме и/или изменение уставки регулирования в автоматическом режиме
# работы МНА с ЧРП
CHANGING_MAIN_PUMPS_ROTATION_SPEED = 1 << 2
CHANGING_BLOCK_VALVES_STATUS = 1 << 3 # Полное или частичное открытие/закрытие задвижки
SWITCHING_TANKS = 1 << 4 # Переключение резервуаров
CHANGING_ACCEPTANCE_OR_DELIVERY_STATE = 1 << 5 # Начало или прекращение приема/сдачи нефти/нефтепродуктов
TRIGGERING_EMERGENCY_RESET_OR_PWSS_OPERATION = 1 << 6 # Задействование аварийного сброса
SAFETY_VALVES_ACTUATION = 1 << 7 # Срабатывание предохранительных клапанов
# Изменение уставки регулирования по давлению узлов регулирования давления, работающих в автоматическом режиме
# управления
CHANGING_PRESSURE_SETTING = 1 << 8
# Изменение процента открытия/закрытия заслонки узлов регулирования давления, работающих в ручном режиме управления
CHANGING_OPENING_PERCENTAGE_VALVE = 1 << 9
CHANGING_ADDITIVE_INJECTOR_STATUS_OR_FLOW = 1 << 10 # Начало/окончание ввода ПТП или изменение расхода вводимой ПТП
LEAK_END = 1 << 11 # Окончание утечки
# Наличие сигнала статуса «Открывается»/»Закрывается» запорной арматуры (не в режиме имитации),
# расположенной в точке, гидравлически связанной с рассматриваемым ДУ
TO_OPEN_OR_TO_CLOSE_STATUS = 1 << 12
# Нестационарный режим работы/отсутствие сигнала о режиме работы смежного ТУ, работающего
# в единой гидравлической системе с защищаемым ТУ
ADJACENT_TU = 1 << 13
COLD_START = 1 << 14 # Одновременный «холодный» запуск нескольких серверов СОУ
class StoppedPumpingReason(IntFlag):
"""
Причины режима работы МТ: Остановленный
"""
# На ДУ отсутствуют работающие НА, при этом показания СИ расхода не превышают 1 % от максимального значения
# диапазона измерений всех СИ расхода на технологическом участке
STOPPING_PUMPS = 1 << 0
CUTOFF_AREA = 1 << 1 # Участок отсечен запорной арматурой от подкачек/откачек
class RejectionCriteria(IntFlag):
"""Критерии отбраковки сигналов criteriaNames"""
QUALITY = 1 << 0 # qualityRejection
RANGE = 1 << 1 # rangeRejection
EMPTY = 1 << 2 # emptyRejection
TIME = 1 << 3 # timeRejection
CONSTANT_SIGNAL = 1 << 4 # constantSignalRejection
DISCHARGE = 1 << 5 # dischargeRejection
VTOR = 1 << 6 # VTORRejection
NEARBY = 1 << 7 # nearbyRejection
DIAGNOSTIC_INFO = 1 << 8 # diagnInfoRejection
@property
def backend_name(self) -> str:
names = {
"QUALITY": "qualityRejection",
"RANGE": "rangeRejection",
"EMPTY": "emptyRejection",
"TIME": "timeRejection",
"CONSTANT_SIGNAL": "constantSignalRejection",
"DISCHARGE": "dischargeRejection",
"SIGMA3": "sigma3Rejection",
"VTOR": "VTORRejection",
"NEARBY": "nearbyRejection",
"DIAGNOSTIC_INFO": "diagnInfoRejection",
}
return names.get(self.name or "", str(int(self)))
def __str__(self) -> str:
raw_value = int(self)
if raw_value == 0:
return "0"
active_flags = [flag.backend_name for flag in type(self) if flag.value and flag & self == flag]
if active_flags:
return f"{'|'.join(active_flags)} ({raw_value})"
return str(raw_value)
class RejectionSensorTag(Enum):
"""
Теги датчиков для тестов отбраковки (id, description=tag)
Айди тегов подставляется на ходу из текущей версии конфы с сервера
"""
KP_8_Pin = (0, "AK.CHTN.LU_TIHVEL.KP_8.SW_8-3.Pin") # nearby_pressure_pin range_upper_pressure range_lower_pres
NPS_TIH_5_Vmom = (0, "AK.CHTN.NPS_TIH_5.UZR_1.Vmom") # diagnostic_info_flowrange_upper_flow range_lower_flow
KP_8_Pout = (0, "AK.CHTN.LU_TIHVEL.KP_8.SW_8-3.Pout") # nearby_pressure_pout
KP_209_1_Pin = (0, "AK.CHTN.LU_VELKRIM.KP_209-1.SW_215-3-1.Pin") # empty_pressure
KP_7_Pin = (0, "AK.CHTN.LU_TIHVEL.KP_7.SW_6-3.Pin") # vtor_pressure
NPS_KRIM_P_Vmom = (0, "AK.CHTN.NPS_KRIM_P.UZR_1.Vmom") # empty_flow quality_flow
def __init__(self, sensor_id: int, description: str) -> None:
self.id = sensor_id
self.description = description
@classmethod
def update_ids_from_config(cls, sensor_ids_by_address: Mapping[str, int]) -> None:
"""
Обновляет sensor_id по tag из конфигурации стенда.
"""
missing_tags = []
for sensor in cls:
sensor_id = sensor_ids_by_address.get(sensor.description)
if sensor_id is None:
missing_tags.append(sensor.description)
continue
sensor.id = sensor_id
if missing_tags:
raise ValueError(f"Не найдены sensor_id для tags: {', '.join(missing_tags)}")
def __str__(self):
return f"{self.id} - {self.description}"
class UserActions(IntFlag):
USER_LOGIN = 1 # Вход пользователя
USER_EXIT = 1 << 1 # Выход пользователя
FAILED_USER_LOGIN = 1 << 2 # Неуспешная попытка входа пользователя
ALGORITHMS_REINITIALIZATION = 1 << 3 # Переинициализация алгоритмов
SIGNAL_MASK_SIM = 1 << 4 # Маскирование и имитация входных сигналов
LDS_MASKING = 1 << 5 # Маскирование СОУ
EXPORT = 1 << 6 # Экспорт и выгрузки
SETTINGS_CHANGE = 1 << 7 # Изменение настроек
LEAK_ACK = 1 << 8 # Квитирование сообщения об утечке
LEAK_REMOVE = 1 << 9 # Исключение неактивных утечек
LDS_ADMIN = 1 << 10 # Администрирование СОУ
PIG_CONTROL = 1 << 11 # Управление СОД
class SiteKpKp(Enum):
"""controlledSiteId, segmentId"""
TIXORECZKAYA_NOVOVELICHKOVSKAYA = {'controlledSiteId': 6012, 'segmentId': 6013}
NOVOVELICHKOVSKAYA_KRYMSKAYA = {'controlledSiteId': 6074, 'segmentId': 6075}
KRYMSKAYA_GRUSHOVAYA = {'controlledSiteId': 6220, 'segmentId': 6221}
BACKUP_ROUTE_BEJSUG = {'controlledSiteId': 6242, 'segmentId': 6243}
BACKUP_ROUTE_PONURA = {'controlledSiteId': 6076, 'segmentId': 6077}
BACKUP_ROUTE_KUBAN = {'controlledSiteId': 6088, 'segmentId': 6089}
NPZ_AFIPSKIJ = {'controlledSiteId': 6244, 'segmentId': 6245}
NPZ_ILINSKIJ = {'controlledSiteId': 6120, 'segmentId': 6121}
class SignalType(IntFlag):
"""Типы сигналов: режим МТ - 8, режим СОУ - 256, самотеки -16"""
REGLU = 1 << 3
REGSOU = 1 << 8
GRAVITYPIPE = 1 << 4
@property
def backend_name(self) -> str:
names = {
"REGLU": "PumpingStatus",
"REGSOU": "LdsStatus",
"GRAVITYPIPE": "FreeFlow",
}
return names.get(self.name or "", str(int(self)))
def __str__(self) -> str:
raw_value = int(self)
if raw_value == 0:
return "0"
active_flags = [flag.backend_name for flag in type(self) if flag.value and flag & self == flag]
if active_flags:
return f"{'|'.join(active_flags)} ({raw_value})"
return str(raw_value)
class GravityPipe(Enum):
expected_lds_status_gravity_true = (1, "Наличие самотека")
expected_lds_status_gravity_false = (0, "Отсутствие самотека")
def __init__(self, status_id: int, description: str) -> None:
self.id = status_id
self.description = description
def __str__(self):
return f"{self.id} - {self.description}"
test const
"""
Общие константы для тестов.
"""
from constants.enums import StationaryStatus
class BaseTN3Constants:
# ===== Константы для запросов журнала =====
COLUMN_SELECTION_DEF = [
'Time',
'User',
'MainPipeline',
'TechnologicalSection',
'TechnologicalObject',
'ControlPoint',
'Object',
'SignalName',
'Event',
'Value',
'MessageType',
'Tag',
'Status',
]
# ===== Типы сигналов и объектов =====
PRESSURE_SENSOR_OBJECT_TYPE = 2
FLOWMETER_OBJECT_TYPE = 3
PRESSURE_SIGNAL_TYPE = 1
FLOW_SIGNAL_TYPE = 4
# ===== Суффиксы адресов выходных сигналов =====
ADDRESS_SUFFIX_ACK_LEAK = "AckLeak"
ADDRESS_SUFFIX_LEAK = "Leak"
ADDRESS_SUFFIX_MASK = "Mask"
ADDRESS_SUFFIX_POINT_LEAK = "PointLeak"
ADDRESS_SUFFIX_Q_LEAK = "QLeak"
ADDRESS_SUFFIX_TIME_LEAK = "TimeLeak"
ADDRESS_SUFFIX_PUMPING_STATUS = "RegLU"
ADDRESS_SUFFIX_LDS_STATUS = "RegSOU"
# ===== Ключи поиска =====
LEAK_LINEAR_PART_ID_KEY = "id"
CONTROLLED_SITE_ID_AND_SEGMENT_ID = "controlledSiteId"
# ===== Ожидаемые значения выходных сигналов =====
OUTPUT_IS_ACK_LEAK = "1"
OUTPUT_IS_LEAK = "1"
OUTPUT_IS_NOT_LEAK = "0"
OUTPUT_IS_NOT_MASK = "0"
OUTPUT_IS_MASK = "1"
MASS_KG = 3600 # Коэффициент массы, нужно умножить, чтобы получить объем в м3/час
KGS_SM2 = 98066 # Коэффициент давления, нужно умножить, чтобы получить объем в кгс/см2
ALLOWED_VOLUME_DIFF = 0.3 # Относительная погрешность по объему
ALLOWED_DISTANCE_DIFF_METERS = 5000 # Погрешность координаты в метрах
KM_TO_METERS = 1000 # Перевод в метры
LEAK_START_INTERVAL = 2100 # Интервал от старта имитатора до первого обнаружения утечки - 35 минут по умолчанию
LEAK_LOCATION_STATUS = 1
# ===== Параметры выходных сигналов =====
OUTPUT_TEST_DELAY = 120 # Задержка для теста выходных сигналов в секундах
OUTPUT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ" # Формат времени для парсинга выходных сигналов
# ===== Параметры маскирования =====
IS_MASKED_TRUE = True
IS_MASKED_FALSE = False
# ===== Параметры имитации =====
PRESSURE_IMITATION_RANGE = (1, 40)
VOLUME_IMITATION_RANGE = (100, 2400)
GOOD_QUALITY_VAL = 1
# ===== Константы журнала =====
JOURNAL_EVENT_MASK = "Установка признака маскирования"
JOURNAL_EVENT_UNMASK = "Снятие признака маскирования"
JOURNAL_SIGNAL_PRESSURE = "Значение давления"
JOURNAL_SIGNAL_FLOW = "Расход"
JOURNAL_MESSAGE_TYPE_USER_ACTIONS = "Действия пользователя"
JOURNAL_STATUS_SUCCESS = "Успешно"
JOURNAL_EXPECTED_MSG_COUNT_PER_SIGNAL = 2
JOURNAL_MASK_PAGINATION_LIMIT = 10
JOURNAL_EVENT_POSSIBLE_LEAK = "Возможна утечка"
JOURNAL_EVENT_DETECTED_LEAK = "Утечка."
JOURNAL_MESSAGE_TYPE_LEAKS = "Утечки"
JOURNAL_EVENT_COMPLETED_LEAKS = "Утечка завершена"
JOURNAL_EXPECTED_MASK_MSG_TOTAL = 4
JOURNAL_MASK_EXPECTED_EVENTS = {"Установка признака маскирования", "Снятие признака маскирования"}
JOURNAL_MASK_EXPECTED_SIGNALS = {"Значение давления", "Расход"}
JOURNAL_PAGINATION_LIMIT = 10
JOURNAL_PAGINATION_REJECT_LIMIT = 20
JOURNAL_EVENT_LEAK_ACKNOWLEDGED = "Сообщение об утечке квитировано"
JOURNAL_EVENT_LDS_INIT_ACCUM_DATA = "СОУ в инициализации (Накопление данных)"
JOURNAL_EVENT_LDS_INIT_COLD_START = "СОУ в инициализации (Одновременный «холодный» запуск нескольких серверов СОУ)"
JOURNAL_MESSAGE_TYPE_LDS_STATUS = "Режим работы СОУ"
JOURNAL_MESSAGE_TYPE_REJECTION = "Отбраковка"
SEC_PER_MIN = 60
# ===== Параметры подтверждения =====
IS_ACKNOWLEDGED_FALSE = False
# ===== Параметры BalanceAlgorithmResults =====
BALANCE_ALGORITHM_POLL_INTERVAL = 15 # Интервал опроса подписки в секундах
BALANCE_ALGORITHM_TOTAL_WAIT = 300 # Общее время опроса в секундах
DEBALANCE_TOLERANCE = 0.25 # Допустимое отклонение дебаланса от порога 30%
# ===== Прочие константы =====
BASIC_MESSAGE_TIMEOUT = 10.0 # Таймаут ожидания сообщений в секундах
MASK_MESSAGE_TIMEOUT = 180.0 # Таймаут ожидания сообщений в секундах
PRECISION = 3 # Точность округления для координат
DIGITS_WITH_DOT_PATTERN = r'\d+(?:\.\d+)?' # Регулярное выражение для поиска чисел с точкой
DIAGNOSTIC_AREA_BASE_IDS = [2, 3, 4, 5, 7, 8] # Список ДУ с isBase = true из конфигурации Тн-3
REPRESENTATIVE_DIAGNOSTIC_AREA_IDS = [2, 3] # Список показательных ДУ для определения режима СОУ
ZONE_INFO: str = "Europe/Moscow"
SECONDS_PER_HOUR: int = 3600
CRITERIA_NAMES_FIELD: str = 'criteriaNames'
class ExportReportConstants:
"""Константы для теста формирования отчёта об утечках"""
# Максимальное ожидание уведомления о готовности отчёта
NOTIFICATION_TIMEOUT_SECONDS: float = 60.0
# Максимальное время ожидания появления отчёта в списке после уведомления
LIST_POLL_TOTAL_WAIT_SECONDS: float = 10.0
# Интервал между запросами getExportedFilesListRequest
LIST_POLL_INTERVAL_SECONDS: float = 10.0
# Таймаут получения ответа на скачивание
DOWNLOAD_TIMEOUT_SECONDS: float = 60.0
# ===== Имя файла отчёта =====
LEAKS_REPORT_NAME_PART: str = "Отчет об утечках" # подстрока в имени файла/отчёта
XLSX_EXTENSION: str = ".xlsx"
# Сигнатура zip-архива, используется для проверки формата файла по содержимому
ZIP_SIGNATURE: bytes = b'PK\x03\x04'
# ===== Формат даты/времени в отчёте =====
REPORT_DATETIME_FORMAT: str = "%d.%m.%Y %H:%M:%S"
# Регулярное выражение для извлечения двух дат из заголовка
REPORT_HEADER_PERIOD_PATTERN: str = (
r'Отчет об утечках с (?P<period_start>\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})'
r' по (?P<period_end>\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})'
)
# Регулярное выражение для извлечения двух дат из названия файла
REPORT_FILE_NAME_PERIOD_PATTERN: str = (
r'^Отчет об утечках (?P<tu>.+?) '
r'(?P<period_start>\d{2}\.\d{2}\.\d{4} \d{2}_\d{2}_\d{2})'
r' - '
r'(?P<period_end>\d{2}\.\d{2}\.\d{4} \d{2}_\d{2}_\d{2})'
r'\.xlsx$'
)
# Двойная шапка: первая строка - название отчёта с периодом, вторая - названия колонок
REPORT_TITLE_ROW: int = 1
REPORT_COLUMN_HEADERS_ROW: int = 2
REPORT_DATA_FIRST_ROW: int = 3
# ===== Названия колонок =====
COL_DATETIME: str = "Дата и время"
COL_OBJECT: str = "Объект"
COL_LDS_STATUS: str = "Режим работы СОУ"
COL_MASK_INFO: str = "Информация о маскировании"
COL_COORDINATE: str = "Координата"
COL_LEAK_VOLUME: str = "Объемный расход утечки"
COL_MT_MODE: str = "Режим работы МТ"
EXPECTED_COLUMN_HEADERS: list = [
COL_DATETIME,
COL_OBJECT,
COL_LDS_STATUS,
COL_MASK_INFO,
COL_COORDINATE,
COL_LEAK_VOLUME,
COL_MT_MODE,
]
LDS_STATUS_OK_TEXT: str = "СОУ исправна"
MASKING_NOT_MASKED_TEXT: str = "СОУ не замаскирована"
# ===== Маппинг StationaryStatus <-> текст в колонке "Режим работы МТ" =====
STATIONARY_STATUS_TO_REPORT_TEXT: dict = {
StationaryStatus.UNSTATIONARY.value: "Нестационарный режим работы МТ",
StationaryStatus.STATIONARY.value: "Стационарный режим работы МТ",
StationaryStatus.STOPPED.value: "Режим остановленной перекачки МТ",
}
# ===== Прочее =====
DEFAULT_SHEET_INDEX: int = 0
SUBSCRIBE_REPORTS_DATA_EXPORTED_REQUEST: str = "SubscribeReportsDataExportedRequest"
EXPORT_REPORTS_COMMAND_REQUEST: str = "ExportReportsCommandRequest"
REPORT_DATA_EXPORTED_NOTIFICATION: str = "ReportDataExportedNotification"
GET_EXPORTED_DATA_LIST_REQUEST: str = "GetExportedDataListRequest"
EXPORTED_DATA_LIST_LIMIT: int = 10
DOWNLOAD_EXPORTED_DATA_REQUEST: str = "DownloadExportedDataRequest"
# Допустимая погрешность при сравнении границ периода отчёта
REPORT_PERIOD_TOLERANCE_MINUTES: int = 1
# Формат даты/времени в имени скачиваемого xlsx-файла
REPORT_FILE_NAME_DATETIME_FORMAT: str = "%d.%m.%Y %H_%M_%S"
scena
"""
Сценарии тестов - функции-обёртки без
pytest маркеров.
Каждая функция содержит логику одного теста.
Pytest маркеры и allure декораторы применяются в тестовых файлах.
"""
import time
from datetime import datetime, timedelta
import allure
import pytest
from constants.enums import (
ConfirmationStatus,
Direction,
ExportedDataType,
ExportStatus,
GravityPipe,
LdsStatus,
LeakStatus,
MessageType,
ReplyStatus,
SignalType,
SiteKpKp,
StationaryStatus,
UserActions,
)
from constants.test_constants import BaseTN3Constants as TestConst
from constants.test_constants import ExportReportConstants as ReportConst
from models.get_messages_model import Filtering, FilteringObjects, Pagination
from test_config.models_for_tests import (
CaseData,
ExportLeaksReportState,
LDSStatusConfig,
LeakTestConfig,
SmokeSuiteConfig,
)
from utils.helpers import report_xlsx_utils as report_utils
from utils.helpers import ws_test_utils as t_utils
from utils.helpers.asserts import SoftAssertions, StepCheck
from utils.helpers.ws_message_parser import ws_message_parser as parser
from utils.helpers.ws_test_utils import get_value
async def basic_info(ws_client, cfg: SmokeSuiteConfig | LDSStatusConfig):
"""
Проверка базовой информации СОУ: список ТУ.
"""
with allure.step("Подключение по ws, получение и обработка сообщения типа: BasicInfoContent"):
payload = await t_utils.connect_and_get_msg(ws_client, "getBasicInfoRequest", [])
parsed_payload = parser.parse_basic_info_msg(payload)
expected_tu = [(cfg.tu_id, cfg.tu_name)]
actual_tu = [(tu.tuId, tu.tuName) for tu in parsed_payload.replyContent.basicInfo.tus if tu.tuId == cfg.tu_id]
with allure.step(f"Поверка наличия {cfg.tu_name} в списке доступных ТУ на сервере"):
# Критическая проверка: если нужного ТУ нет в BasicInfoContent — считаем что ТУ отключен (через Zookeeper)
# и прерываем весь прогон.
if expected_tu[0] not in actual_tu:
msg = (
f"ТУ отключен: в BasicInfoContent отсутствует ТУ для запущенного набора данных: "
f"tuId={cfg.tu_id}, tuName='{cfg.tu_name}', suite={cfg.suite_name}. "
f"Необходимо убедиться, что ТУ включен (Zookeeper) и перезапустить прогон."
)
allure.attach(
f"Ожидаемый ТУ: {expected_tu}\nПолученные ТУ: {actual_tu}",
name="Предварительная проверка: ТУ отключен",
attachment_type=allure.attachment_type.TEXT,
)
pytest.fail(msg, pytrace=False)
with SoftAssertions() as soft_failures:
StepCheck("Проверка статуса ответа", "replyStatus", soft_failures).actual(parsed_payload.replyStatus).expected(
ReplyStatus.OK.value
).equal_to()
StepCheck("Проверка наличия объектов в списке ТУ", "tus", soft_failures).actual(
parsed_payload.replyContent.basicInfo.tus
).is_not_empty()
StepCheck(
f"Проверка наличия ТУ: {cfg.tu_name} в списке ТУ ",
"(tuId, tuName)",
soft_failures,
).actual(
actual_tu
).expected(expected_tu).equal_to()
async def journal_info(ws_client):
"""
Проверка наличия сообщений в журнале.
"""
with allure.step("Подключение по ws, получение и обработка сообщения типа: MessagesInfoContent"):
request_body = t_utils.create_journal_req_body()
payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
parsed_payload = parser.parse_journal_msg(payload)
StepCheck("Проверка наличия сообщений в журнале", "messagesInfo").actual(
parsed_payload.replyContent.messagesInfo
).is_not_empty()
async def lds_status_initialization(ws_client, cfg: SmokeSuiteConfig):
"""
Проверка режима работы СОУ: Инициализация.
"""
with allure.step("Подключение по ws, получение и обработка сообщения типа: CommonSchemeContent"):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"CommonSchemeContent",
"SubscribeCommonSchemeRequest",
{'tuId': cfg.tu_id, 'additionalProperties': None},
)
parsed_payload = parser.parse_common_scheme_info_msg(payload)
# Получает список участков карты течения
flow_areas = parsed_payload.replyContent.flowAreas
# Получает самый протяженный участок карты течения
longest_flow_area = t_utils.get_longest_flow_area(flow_areas)
# Получает список ДУ
diagnostic_areas = longest_flow_area.diagnosticAreas
allure.attach(
f"Самый протяженный участок карты течений: {longest_flow_area}",
name="flowArea. Инициализация",
attachment_type=allure.attachment_type.TEXT,
)
# Получает коллекцию статусов списка ДУ
lds_status_set = {diagnostic_area.ldsStatus for diagnostic_area in diagnostic_areas}
# Определяет режим работы СОУ по приоритету
lds_status = t_utils.determine_lds_status_by_priority(lds_status_set)
StepCheck("Проверка режима работы СОУ", "ldsStatus").actual(lds_status).expected(
LdsStatus.INITIALIZATION.value
).equal_to()
async def diagnostics_of_signals_after_initialization(
ws_client,
cfg: SmokeSuiteConfig,
):
"""
Проверка выходных сигналов после окончания режима Инициализация по причине "холодного" пуска СОУ.
"""
with allure.step("Подписка на сигналы для участков"):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"OutputSignalsInfo",
"SubscribeOutputSignalsRequest",
{
'objects': {
'linearParts': [],
'controlledSites': [
SiteKpKp.TIXORECZKAYA_NOVOVELICHKOVSKAYA.value,
SiteKpKp.NOVOVELICHKOVSKAYA_KRYMSKAYA.value,
SiteKpKp.KRYMSKAYA_GRUSHOVAYA.value,
SiteKpKp.BACKUP_ROUTE_BEJSUG.value,
SiteKpKp.BACKUP_ROUTE_PONURA.value,
SiteKpKp.BACKUP_ROUTE_KUBAN.value,
SiteKpKp.NPZ_AFIPSKIJ.value,
SiteKpKp.NPZ_ILINSKIJ.value,
],
},
'signalTypes': 1023,
'tuId': cfg.tu_id,
'additionalProperties': None,
},
)
parsed_payload = parser.parse_output_signals_info_msg(payload)
controlled_site_dict = {
"controlled_site_first": SiteKpKp.TIXORECZKAYA_NOVOVELICHKOVSKAYA.value,
"controlled_site_second": SiteKpKp.NOVOVELICHKOVSKAYA_KRYMSKAYA.value,
"controlled_site_third": SiteKpKp.KRYMSKAYA_GRUSHOVAYA.value,
"controlled_site_fourth": SiteKpKp.BACKUP_ROUTE_BEJSUG.value,
"controlled_site_fifth": SiteKpKp.BACKUP_ROUTE_PONURA.value,
"controlled_site_sixth": SiteKpKp.BACKUP_ROUTE_KUBAN.value,
"controlled_site_seventh": SiteKpKp.NPZ_AFIPSKIJ.value,
"controlled_site_eight": SiteKpKp.NPZ_ILINSKIJ.value,
}
controlled_site_messages = {}
for name, key in controlled_site_dict.items():
controlled_site_messages[name] = t_utils.find_object_by_a_few_fields(
parsed_payload.replyContent.controlledSiteSignals, key
)
all_signals = {}
for site_name, site_message in controlled_site_messages.items():
signal_dict = {'pump': None, 'sou': None, 'gravity': None}
if site_message:
all_signals[site_name] = {
'pump': t_utils.get_signal(site_message, SignalType.REGLU),
'sou': t_utils.get_signal(site_message, SignalType.REGSOU),
'gravity': t_utils.get_signal(site_message, SignalType.GRAVITYPIPE),
}
else:
all_signals[site_name] = signal_dict
first_kp_kp = all_signals.get("controlled_site_first") or {}
if first_kp_kp:
first_site_signal_pump = get_value(first_kp_kp.get("pump"))
first_site_signal_sou = get_value(first_kp_kp.get("sou"))
first_site_signal_gravity = get_value(first_kp_kp.get("gravity"))
second_kp_kp = all_signals.get("controlled_site_second") or {}
if second_kp_kp:
second_site_signal_pump = get_value(second_kp_kp.get("pump"))
second_site_signal_sou = get_value(second_kp_kp.get("sou"))
second_site_signal_gravity = get_value(second_kp_kp.get("gravity"))
third_kp_kp = all_signals.get("controlled_site_third") or {}
if third_kp_kp:
third_site_signal_pump = get_value(third_kp_kp.get("pump"))
third_site_signal_sou = get_value(third_kp_kp.get("sou"))
third_site_signal_gravity = get_value(third_kp_kp.get("gravity"))
fourth_kp_kp = all_signals.get("controlled_site_fourth") or {}
if fourth_kp_kp:
fourth_site_signal_pump = get_value(fourth_kp_kp.get("pump"))
fourth_site_signal_sou = get_value(fourth_kp_kp.get("sou"))
fourth_site_signal_gravity = get_value(fourth_kp_kp.get("gravity"))
fifth_kp_kp = all_signals.get("controlled_site_fifth") or {}
if fifth_kp_kp:
fifth_site_signal_pump = get_value(fifth_kp_kp.get("pump"))
fifth_site_signal_sou = get_value(fifth_kp_kp.get("sou"))
fifth_site_signal_gravity = get_value(fifth_kp_kp.get("gravity"))
sixth_kp_kp = all_signals.get("controlled_site_sixth") or {}
if sixth_kp_kp:
sixth_site_signal_pump = get_value(sixth_kp_kp.get("pump"))
sixth_site_signal_sou = get_value(sixth_kp_kp.get("sou"))
sixth_site_signal_gravity = get_value(sixth_kp_kp.get("gravity"))
seventh_kp_kp = all_signals.get("controlled_site_seventh") or {}
if seventh_kp_kp:
seventh_site_signal_pump = get_value(seventh_kp_kp.get("pump"))
seventh_site_signal_sou = get_value(seventh_kp_kp.get("sou"))
seventh_site_signal_gravity = get_value(seventh_kp_kp.get("gravity"))
eighth_kp_kp = all_signals.get("controlled_site_eight") or {}
if eighth_kp_kp:
eight_site_signal_pump = get_value(eighth_kp_kp.get("pump"))
eight_site_signal_sou = get_value(eighth_kp_kp.get("sou"))
eight_site_signal_gravity = get_value(eighth_kp_kp.get("gravity"))
with SoftAssertions() as soft_failures:
StepCheck(
"Проверка сигнала - режим МТ на участке Тихорецкая-Нововеличковская",
"Режим МТ",
soft_failures,
).actual(first_site_signal_pump).expected(str(cfg.exp_tixoreczkaya_novovelichkovskaya_reg_lu)).equal_to()
StepCheck(
"Проверка сигнала - режим СОУ на участке Тихорецкая-Нововеличковская",
"Режим СОУ",
soft_failures,
).actual(first_site_signal_sou).expected(str(cfg.exp_tixoreczkaya_novovelichkovskaya_reg_sou)).equal_to()
StepCheck(
f"Проверка {GravityPipe.expected_lds_status_gravity_false.description} \n"
f"на участке Тихорецкая-Нововеличковская",
"Количество самотеков",
soft_failures,
).actual(first_site_signal_gravity).expected(str(GravityPipe.expected_lds_status_gravity_false.id)).equal_to()
StepCheck(
"Проверка сигнала - режим МТ на участке Нововеличковская-Крымская",
"Режим МТ",
soft_failures,
).actual(second_site_signal_pump).expected(str(cfg.exp_novovelichkovskaya_krymskaya_reg_lu)).equal_to()
StepCheck(
f"Проверка {GravityPipe.expected_lds_status_gravity_false.description}\n"
f"на участке Нововеличковская-Крымская",
"Количество самотеков",
soft_failures,
).actual(second_site_signal_gravity).expected(str(GravityPipe.expected_lds_status_gravity_false.id)).equal_to()
StepCheck(
"Проверка сигнала - режим СОУ на участке Нововеличковская-Крымская",
"Режим СОУ",
soft_failures,
).actual(second_site_signal_sou).expected(str(cfg.exp_novovelichkovskaya_krymskaya_reg_sou)).equal_to()
StepCheck(
"Проверка сигнала - режим МТ на участке Крымская-Грушовая",
"Режим МТ",
soft_failures,
).actual(
third_site_signal_pump
).expected(str(cfg.exp_krymskaya_grushovaya_reg_lu)).equal_to()
StepCheck(
f"Проверка {GravityPipe.expected_lds_status_gravity_true.description} на участке Крымская-Грушовая",
"Количество самотеков",
soft_failures,
).actual(third_site_signal_gravity).expected(str(GravityPipe.expected_lds_status_gravity_true.id)).equal_to()
StepCheck(
"Проверка сигнала - режим СОУ на участке Крымская-Грушовая",
"Режим СОУ",
soft_failures,
).actual(
third_site_signal_sou
).expected(str(cfg.exp_krymskaya_grushovaya_reg_sou)).equal_to()
StepCheck(
"Проверка сигнала - режим МТ на резервной нитке Бейсуг",
"Режим МТ",
soft_failures,
).actual(
fourth_site_signal_pump
).expected(str(cfg.exp_backup_route_bejsug_reg_lu)).equal_to()
StepCheck(
f"Проверка {GravityPipe.expected_lds_status_gravity_false.description} на резервной нитке Бейсуг",
"Количество самотеков",
soft_failures,
).actual(fourth_site_signal_gravity).expected(str(GravityPipe.expected_lds_status_gravity_false.id)).equal_to()
StepCheck(
"Проверка сигнала - режим СОУ на резервной нитке Бейсуг",
"Режим СОУ",
soft_failures,
).actual(
fourth_site_signal_sou
).expected(str(cfg.exp_backup_route_bejsug_reg_sou)).equal_to()
StepCheck(
"Проверка сигнала - режим МТ на резервной нитке Понура",
"Режим МТ",
soft_failures,
).actual(
fifth_site_signal_pump
).expected(str(cfg.exp_backup_route_ponura_reg_lu)).equal_to()
StepCheck(
"Проверка сигнала - режим СОУ на резервной нитке Понура",
"Режим СОУ",
soft_failures,
).actual(
fifth_site_signal_sou
).expected(str(cfg.exp_backup_route_ponura_reg_sou)).equal_to()
StepCheck(
f"Проверка {GravityPipe.expected_lds_status_gravity_false.description} на резервной нитке Понура",
"Количество самотеков",
soft_failures,
).actual(fifth_site_signal_gravity).expected(str(GravityPipe.expected_lds_status_gravity_false.id)).equal_to()
StepCheck(
"Проверка сигнала - режим МТ на резервной нитке Кубань",
"Режим МТ",
soft_failures,
).actual(
sixth_site_signal_pump
).expected(str(cfg.exp_backup_route_kuban_reg_lu)).equal_to()
StepCheck(
"Проверка сигнала - режим СОУ на резервной нитке Кубань",
"Режим СОУ",
soft_failures,
).actual(
sixth_site_signal_sou
).expected(str(cfg.exp_backup_route_kuban_reg_sou)).equal_to()
StepCheck(
f"Проверка {GravityPipe.expected_lds_status_gravity_false.description} на резервной нитке Кубань",
"Количество самотеков",
soft_failures,
).actual(sixth_site_signal_gravity).expected(str(GravityPipe.expected_lds_status_gravity_false.id)).equal_to()
StepCheck(
"Проверка сигнала - режим МТ на НПЗ Афипский",
"Режим МТ",
soft_failures,
).actual(
seventh_site_signal_pump
).expected(str(cfg.exp_npz_afipskij_reg_lu)).equal_to()
StepCheck(
"Проверка сигнала - режим СОУ на НПЗ Афипский",
"Режим СОУ",
soft_failures,
).actual(
seventh_site_signal_sou
).expected(str(cfg.exp_npz_afipskij_reg_sou)).equal_to()
StepCheck(
f"Проверка {GravityPipe.expected_lds_status_gravity_false.description} на НПЗ Афипский",
"Количество самотеков",
soft_failures,
).actual(seventh_site_signal_gravity).expected(str(GravityPipe.expected_lds_status_gravity_false.id)).equal_to()
StepCheck(
"Проверка сигнала - режим МТ на НПЗ Ильинский",
"Режим МТ",
soft_failures,
).actual(
eight_site_signal_pump
).expected(str(cfg.exp_npz_ilinskij_reg_lu)).equal_to()
StepCheck(
"Проверка сигнала - режим СОУ на НПЗ Ильинский",
"Режим СОУ",
soft_failures,
).actual(
eight_site_signal_sou
).expected(str(cfg.exp_npz_ilinskij_reg_sou)).equal_to()
StepCheck(
f"Проверка {GravityPipe.expected_lds_status_gravity_false.description} на НПЗ Ильинский",
"Количество самотеков",
soft_failures,
).actual(eight_site_signal_gravity).expected(str(GravityPipe.expected_lds_status_gravity_false.id)).equal_to()
async def lds_status_init_in_journal(ws_client, cfg: SmokeSuiteConfig, imitator_start_time):
"""
Проверка наличия записи в журнале о входе СОУ в режим Инициализация.
"""
with allure.step("Запрос сообщений журнала с фильтром messageTypes=LDS_STATUS"):
end_time = datetime.now()
request_body = t_utils.create_journal_req_body(
pagination=Pagination(limit=TestConst.JOURNAL_PAGINATION_LIMIT, direction=Direction.FIRST.value),
filtering=Filtering(messageTypes=int(MessageType.LDS_STATUS), objects=FilteringObjects(tuId=cfg.tu_id)),
)
payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
parsed_payload = parser.parse_journal_msg(payload)
messages_info = parsed_payload.replyContent.messagesInfo
StepCheck("Проверка наличия сообщений в журнале", "messagesInfo").actual(messages_info).is_not_empty()
with allure.step("Фильтрация сообщений по времени и technologicalSection"):
filter_start_msk = t_utils.localize_as_moscow(imitator_start_time)
filter_end_msk = t_utils.localize_as_moscow(end_time)
time_filtered = [
msg
for msg in messages_info
if filter_start_msk <= t_utils.ensure_moscow_timezone(msg.time) <= filter_end_msk
]
time_filtered.sort(key=lambda msg: t_utils.ensure_moscow_timezone(msg.time), reverse=True)
lds_msg = next(
(
msg
for msg in time_filtered
if msg.technologicalSection == cfg.tu_name and msg.event == TestConst.JOURNAL_EVENT_LDS_INIT_COLD_START
),
None,
)
allure.attach(
f"Всего получено сообщений: {len(messages_info)}\n"
f"После фильтрации по времени ({filter_start_msk} - {filter_end_msk}): {len(time_filtered)}\n"
f"проверка: найдено ли сообщение с technologicalSection='{cfg.tu_name}' "
f"и event='{TestConst.JOURNAL_EVENT_LDS_INIT_ACCUM_DATA}': {'True' if lds_msg else 'False'}",
name="Результат фильтрации сообщений журнала",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step(
f"Проверка: найдено ли сообщение с technologicalSection='{cfg.tu_name}' "
f"и event='{TestConst.JOURNAL_EVENT_LDS_INIT_COLD_START}'"
):
if lds_msg is None:
pytest.fail(
f"Сообщение с technologicalSection='{cfg.tu_name}' "
f"и event='{TestConst.JOURNAL_EVENT_LDS_INIT_COLD_START}' "
f"не найдено среди {len(time_filtered)} отфильтрованных по времени сообщений"
)
with allure.step("Проверка актуальности сообщения"):
msg_time_msk = t_utils.ensure_moscow_timezone(lds_msg.time)
start_time_msk = t_utils.localize_as_moscow(imitator_start_time)
StepCheck(
f"Проверка: время сообщения позднее времени старта имитатора {msg_time_msk} > {start_time_msk}",
"time",
).actual(msg_time_msk > start_time_msk).expected(True).equal_to()
with SoftAssertions() as soft_failures:
StepCheck("Проверка event", "event", soft_failures).actual(lds_msg.event).expected(
TestConst.JOURNAL_EVENT_LDS_INIT_COLD_START
).equal_to()
StepCheck("Проверка mainPipeline", "mainPipeline", soft_failures).actual(lds_msg.mainPipeline).expected(
cfg.main_pipeline
).equal_to()
StepCheck("Проверка technologicalSection", "technologicalSection", soft_failures).actual(
lds_msg.technologicalSection
).expected(cfg.tu_name).equal_to()
StepCheck("Проверка technologicalObject не пустой", "technologicalObject", soft_failures).actual(
lds_msg.technologicalObject
).is_not_none()
StepCheck("Проверка priority не пустой", "priority", soft_failures).actual(lds_msg.priority).is_not_none()
StepCheck("Проверка messageType", "messageType", soft_failures).actual(lds_msg.messageType).expected(
TestConst.JOURNAL_MESSAGE_TYPE_LDS_STATUS
).equal_to()
async def main_page_info(ws_client, cfg: SmokeSuiteConfig):
"""
Проверка установки режима МТ.
"""
with allure.step("Подключение по ws, получение и обработка сообщения типа: MainPageInfoContent"):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"MainPageInfoContent",
"subscribeMainPageInfoRequest",
{'tuIds': [cfg.tu_id], 'additionalProperties': None},
)
parsed_payload = parser.parse_main_page_msg(payload)
with SoftAssertions() as soft_failures:
StepCheck("Проверка id полученного ТУ", "tu_id", soft_failures).actual(
parsed_payload.replyContent.tuId
).expected(cfg.tu_id).equal_to()
StepCheck(
f"Проверка установки стационара для ТУ {cfg.tu_name}",
"stationary_status",
soft_failures,
).actual(
parsed_payload.replyContent.tuInfo.stationaryStatus
).expected(cfg.expected_stationary_status).equal_to()
async def main_page_info_signals(ws_client, cfg: SmokeSuiteConfig):
"""
Проверка счетчиков состояния сигналов
"""
with allure.step("Подключение по ws, получение и обработка сообщения типа: MainPageSignalsInfoContent"):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"MainPageSignalsInfoContent",
"subscribeMainPageSignalsInfoRequest",
{'tuIds': [cfg.tu_id], 'additionalProperties': None},
)
parsed_payload = parser.parse_main_page_signals_msg(payload)
with SoftAssertions() as soft_failures:
StepCheck("Проверка id полученного ТУ", "tu_id", soft_failures).actual(
parsed_payload.replyContent.tuId
).expected(cfg.tu_id).equal_to()
field_name = "numberOfRejectedSignals"
# Проверяет что количество отбракованных сигналов больше или равно ОР
StepCheck(
f"Проверка количества отбракованных сигналов ТУ {cfg.tu_name}",
field_name,
soft_failures,
).actual(
parsed_payload.replyContent.signalsInfo.numberOfRejectedSignals
).is_greater_than_or_equal_to(cfg.expected_main_page_signals[field_name])
field_name = "numberOfMaskedSignals"
StepCheck(
f"Проверка количества маскированных сигналов ТУ {cfg.tu_name}",
field_name,
soft_failures,
).actual(
parsed_payload.replyContent.signalsInfo.numberOfMaskedSignals
).expected(cfg.expected_main_page_signals[field_name]).equal_to()
field_name = "numberOfImitatedSignals"
StepCheck(
f"Проверка количества имитированных сигналов ТУ {cfg.tu_name}",
field_name,
soft_failures,
).actual(
parsed_payload.replyContent.signalsInfo.numberOfImitatedSignals
).expected(cfg.expected_main_page_signals[field_name]).equal_to()
async def main_page_info_unstationary(ws_client, cfg: SmokeSuiteConfig):
"""
Проверка установки режима Нестационар (для наборов с несколькими утечками).
Запускается после первой утечки, когда режим переходит в Нестационар.
"""
with allure.step("Подключение по ws, получение и обработка сообщения типа: MainPageInfoContent"):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"MainPageInfoContent",
"subscribeMainPageInfoRequest",
{'tuIds': [cfg.tu_id], 'additionalProperties': None},
)
parsed_payload = parser.parse_main_page_msg(payload)
with SoftAssertions() as soft_failures:
StepCheck("Проверка id полученного ТУ", "tu_id", soft_failures).actual(
parsed_payload.replyContent.tuId
).expected(cfg.tu_id).equal_to()
StepCheck(
f"Проверка установки режима Нестационар для ТУ {cfg.tu_name}",
"stationary_status",
soft_failures,
).actual(parsed_payload.replyContent.tuInfo.stationaryStatus).expected(
StationaryStatus.UNSTATIONARY.value
).equal_to()
async def leak_is_confirm_on_main_page(ws_client, cfg: SmokeSuiteConfig):
"""
MainPageInfoContent - проверка подтвержденной утечки на ЭФ Состояние МТ
"""
with allure.step("Подключение по ws, получение и обработка сообщения типа: MainPageInfoContent."):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"MainPageInfoContent",
"subscribeMainPageInfoRequest",
{'tuIds': [cfg.tu_id], 'additionalProperties': None},
)
parsed_payload = parser.parse_main_page_msg(payload)
main_page_leak_info = parsed_payload.replyContent.tuInfo.leaksInfo
confirm_leak = t_utils.find_object_by_field(main_page_leak_info, "leakStatus", LeakStatus.CONFIRMED.value)
StepCheck("Проверка подтвержденной утечки на ЭФ Состояние МТ", "leakStatus").actual(
confirm_leak.leakStatus
).expected(LeakStatus.CONFIRMED.value).equal_to()
async def leak_is_complete_on_main_page(ws_client, cfg: SmokeSuiteConfig):
"""
MainPageInfoContent - отсутствует подтвержденная утечка на ЭФ Состояние МТ
"""
with allure.step("Подключение по ws, получение и обработка сообщения типа: MainPageInfoContent."):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"MainPageInfoContent",
"subscribeMainPageInfoRequest",
{'tuIds': [cfg.tu_id], 'additionalProperties': None},
)
parsed_payload = parser.parse_main_page_msg(payload)
main_page_leak_info = parsed_payload.replyContent.tuInfo.leaksInfo
confirmed_and_closed_leaks = t_utils.find_confirmed_leaks_on_main_page(main_page_leak_info)
StepCheck("Проверка подтвержденной утечки на ЭФ Состояние МТ", "leakStatus").actual(
confirmed_and_closed_leaks
).is_empty()
async def imitate_senor_signal(ws_client, cfg: SmokeSuiteConfig, test_data: CaseData):
"""
Проверка имитации сигнала датчика.
"""
# Распаковка данных для теста
sensor_id = test_data.params.get("sensor_id")
sensor_val, sensor_quality = test_data.expected_result
with allure.step(f"Отправка сообщения и обработка ответа об имитации сигнала датчика с id: {sensor_id}"):
payload = await t_utils.connect_and_get_msg(
ws_client,
"ImitateSignalRequest",
{
'id': sensor_id,
'tuId': cfg.tu_id,
'imitateInfo': {
'value': str(sensor_val),
'quality': sensor_quality,
'additionalProperties': None,
},
'additionalProperties': None,
},
)
parsed_payload = parser.parse_imitate_signal_msg(payload)
sensor_imitate_reply_status = parsed_payload.replyStatus
StepCheck("Проверка кода ответа на запрос об имитации", "replyStatus").actual(
sensor_imitate_reply_status
).expected(ReplyStatus.OK.value).equal_to()
with allure.step(
"Подключение по ws, получение и обработка данных о статусе датчика из сообщения типа: InputSignalsContent"
):
time.sleep(cfg.basic_message_timeout)
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"InputSignalsContent",
"SubscribeInputSignalsRequest",
{
'signalIds': [sensor_id],
'tuId': cfg.tu_id,
'additionalProperties': None,
},
)
parsed_payload = parser.parse_input_signals_info_msg(payload)
sensor_data = parsed_payload.replyContent.inputSignals
sensor_imitate_data = t_utils.find_object_by_field(sensor_data, "id", sensor_id)
with allure.step(f"Отправка сообщения и обработка ответа о снятии имитации датчика с id: {sensor_id}"):
payload = await t_utils.connect_and_get_msg(
ws_client,
"UnimitateSignalRequest",
{'id': sensor_id, 'tuId': cfg.tu_id, 'additionalProperties': None},
)
parsed_payload = parser.parse_unimitate_signal_msg(payload)
sensor_unimitate_reply_status = parsed_payload.replyStatus
StepCheck("Проверка кода ответа на запрос о снятии имитации", "replyStatus").actual(
sensor_unimitate_reply_status
).expected(ReplyStatus.OK.value).equal_to()
with allure.step(
"Подключение по ws, получение и обработка данных о статусе датчика из сообщения типа: InputSignalsContent"
):
time.sleep(cfg.basic_message_timeout)
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"InputSignalsContent",
"SubscribeInputSignalsRequest",
{
'signalIds': [sensor_id],
'tuId': cfg.tu_id,
'additionalProperties': None,
},
)
parsed_payload = parser.parse_input_signals_info_msg(payload)
sensor_data = parsed_payload.replyContent.inputSignals
sensor_unimitate_data = t_utils.find_object_by_field(sensor_data, "id", sensor_id)
with SoftAssertions() as soft_failures:
StepCheck(f"Проверка имитации датчика с id: {sensor_id}", "isImitated", soft_failures).actual(
sensor_imitate_data.isImitated
).expected(True).equal_to()
StepCheck(f"Проверка показаний датчика с id: {sensor_id}", "value", soft_failures).actual(
sensor_imitate_data.imitation.value
).expected(sensor_val).equal_to()
StepCheck(f"Проверка качества сигнала датчика с id: {sensor_id}", "quality", soft_failures).actual(
sensor_imitate_data.quality
).expected(sensor_quality).equal_to()
StepCheck(f"Проверка снятия имитации датчика с id: {sensor_id}", "isImitated", soft_failures).actual(
sensor_unimitate_data.isImitated
).expected(False).equal_to()
async def mask_signal_msg(ws_client, cfg: SmokeSuiteConfig):
"""
Проверка маскирования датчиков.
"""
with allure.step("Подключение по ws, получение и обработка данных датчиков давления и расхода"):
payload = await t_utils.connect_and_get_msg(
ws_client,
"GetInputSignalsRequest",
{
'tuId': cfg.tu_id,
'sorting': None,
'filtering': None,
'columnsSelection': 512,
'search': None,
'additionalProperties': None,
},
)
parsed_payload = parser.parse_input_signals_msg(payload)
# Получает список датчиков давления
pressure_sensor_list = [
sensor
for sensor in parsed_payload.replyContent
if sensor.signalType == TestConst.PRESSURE_SIGNAL_TYPE
and sensor.objectType == TestConst.PRESSURE_SENSOR_OBJECT_TYPE
]
# Получает список расходомеров
flowmeter_list = [
sensor
for sensor in parsed_payload.replyContent
if sensor.signalType == TestConst.FLOW_SIGNAL_TYPE and sensor.objectType == TestConst.FLOWMETER_OBJECT_TYPE
]
# Случайно выбирает 1 расходомер и 1 датчик давления
pressure_sensor = t_utils.get_random_item(pressure_sensor_list)
flowmeter = t_utils.get_random_item(flowmeter_list)
with allure.step("Маскирование датчиков"):
with allure.step(
f"Отправка сообщения и обработка ответа о маскировании датчика давления с id: {pressure_sensor.id}"
):
payload = await t_utils.connect_and_get_msg(
ws_client,
"MaskSignalRequest",
{'id': pressure_sensor.id, 'tuId': cfg.tu_id, 'additionalProperties': None},
)
parsed_payload = parser.parse_mask_signal_msg(payload)
pressure_sensor_mask_reply_status = parsed_payload.replyStatus
StepCheck("Проверка кода ответа на запрос о маскировании", "replyStatus").actual(
pressure_sensor_mask_reply_status
).expected(ReplyStatus.OK.value).equal_to()
with allure.step(f"Отправка сообщения и обработка ответа о маскировании расходомера с id: {flowmeter.id}"):
payload = await t_utils.connect_and_get_msg(
ws_client,
"MaskSignalRequest",
{'id': flowmeter.id, 'tuId': cfg.tu_id, 'additionalProperties': None},
)
parsed_payload = parser.parse_mask_signal_msg(payload)
flowmeter_mask_reply_status = parsed_payload.replyStatus
StepCheck("Проверка кода ответа на запрос о маскировании", "replyStatus").actual(
flowmeter_mask_reply_status
).expected(ReplyStatus.OK.value).equal_to()
with allure.step(
"Подключение по ws, получение и обработка данных о статусе датчиков из сообщения типа: InputSignalsContent"
):
time.sleep(cfg.basic_message_timeout)
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"InputSignalsContent",
"SubscribeInputSignalsRequest",
{
'signalIds': [pressure_sensor.id, flowmeter.id],
'tuId': cfg.tu_id,
'additionalProperties': None,
},
)
parsed_payload = parser.parse_input_signals_info_msg(payload)
sensor_data = parsed_payload.replyContent.inputSignals
pressure_sensor_mask_data = t_utils.find_object_by_field(sensor_data, "id", pressure_sensor.id)
flowmeter_mask_data = t_utils.find_object_by_field(sensor_data, "id", flowmeter.id)
with allure.step("Снятие маскирования датчиков"):
with allure.step(
f"Отправка сообщения и обработка ответа о снятии маскирования датчика давления с id: {pressure_sensor.id}"
):
payload = await t_utils.connect_and_get_msg(
ws_client,
"UnmaskSignalRequest",
{'id': pressure_sensor.id, 'tuId': cfg.tu_id, 'additionalProperties': None},
)
parsed_payload = parser.parse_unmask_signal_msg(payload)
pressure_sensor_unmask_reply_status = parsed_payload.replyStatus
StepCheck("Проверка кода ответа на запрос о снятии маскирования", "replyStatus").actual(
pressure_sensor_unmask_reply_status
).expected(ReplyStatus.OK.value).equal_to()
with allure.step(
f"Отправка сообщения и обработка ответа о снятии маскирования расходомера с id: {flowmeter.id}"
):
payload = await t_utils.connect_and_get_msg(
ws_client,
"UnmaskSignalRequest",
{'id': flowmeter.id, 'tuId': cfg.tu_id, 'additionalProperties': None},
)
parsed_payload = parser.parse_unmask_signal_msg(payload)
flowmeter_unmask_reply_status = parsed_payload.replyStatus
StepCheck("Проверка кода ответа на запрос о снятии маскирования", "replyStatus").actual(
flowmeter_unmask_reply_status
).expected(ReplyStatus.OK.value).equal_to()
with allure.step(
"Подключение по ws, получение и обработка данных о статусе датчиков из сообщения типа: InputSignalsContent"
):
time.sleep(cfg.basic_message_timeout)
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"InputSignalsContent",
"SubscribeInputSignalsRequest",
{
'signalIds': [pressure_sensor.id, flowmeter.id],
'tuId': cfg.tu_id,
'additionalProperties': None,
},
)
parsed_payload = parser.parse_input_signals_info_msg(payload)
sensor_data = parsed_payload.replyContent.inputSignals
pressure_sensor_unmask_data = t_utils.find_object_by_field(sensor_data, "id", pressure_sensor.id)
flowmeter_unmask_data = t_utils.find_object_by_field(sensor_data, "id", flowmeter.id)
with SoftAssertions() as soft_failures:
StepCheck(
f"Проверка маскирования датчика давления с id: {pressure_sensor.id}", "isMasked", soft_failures
).actual(pressure_sensor_mask_data.isMasked).expected(True).equal_to()
StepCheck(f"Проверка маскирования расходомера с id: {flowmeter.id}", "isMasked", soft_failures).actual(
flowmeter_mask_data.isMasked
).expected(True).equal_to()
StepCheck(
f"Проверка снятия маскирования датчика давления с id: {pressure_sensor.id}", "isMasked", soft_failures
).actual(pressure_sensor_unmask_data.isMasked).expected(False).equal_to()
StepCheck(f"Проверка снятия маскирования расходомера с id: {flowmeter.id}", "isMasked", soft_failures).actual(
flowmeter_unmask_data.isMasked
).expected(False).equal_to()
async def mask_info_in_journal(ws_client, cfg: SmokeSuiteConfig, imitator_start_time):
"""
Проверка записей журнала о маскировании и размаскировании.
"""
with allure.step("Запрос сообщений журнала с фильтром userActions"):
end_time = datetime.now()
request_body = t_utils.create_journal_req_body(
pagination=Pagination(limit=TestConst.JOURNAL_MASK_PAGINATION_LIMIT, direction=Direction.FIRST.value),
filtering=Filtering(userActions=int(UserActions.SIGNAL_MASK_SIM), objects=FilteringObjects(tuId=cfg.tu_id)),
)
payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
parsed_payload = parser.parse_journal_msg(payload)
all_messages = parsed_payload.replyContent.messagesInfo
with allure.step("Фильтрация сообщений по событиям маскирования и временному диапазону"):
filter_start_msk = t_utils.localize_as_moscow(imitator_start_time)
filter_end_msk = t_utils.localize_as_moscow(end_time)
mask_unmask_msgs = [
msg
for msg in all_messages
if msg.event in TestConst.JOURNAL_MASK_EXPECTED_EVENTS
and msg.signalName in TestConst.JOURNAL_MASK_EXPECTED_SIGNALS
]
journal_messages = [
msg
for msg in mask_unmask_msgs
if filter_start_msk <= t_utils.ensure_moscow_timezone(msg.time) <= filter_end_msk
]
allure.attach(
f"Всего получено сообщений: {len(all_messages)}\n"
f"После фильтрации по event и signalName осталось сообщений: {len(mask_unmask_msgs)}\n"
f"После фильтрации по времени ({filter_start_msk} - {filter_end_msk}) "
f"осталось сообщений: {len(journal_messages)}",
name="Результат фильтрации сообщений журнала",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step("Группировка отфильтрованных сообщений"):
pressure_msgs = [msg for msg in journal_messages if msg.signalName == TestConst.JOURNAL_SIGNAL_PRESSURE]
flow_msgs = [msg for msg in journal_messages if msg.signalName == TestConst.JOURNAL_SIGNAL_FLOW]
mask_event_msgs = [msg for msg in journal_messages if msg.event == TestConst.JOURNAL_EVENT_MASK]
unmask_event_msgs = [msg for msg in journal_messages if msg.event == TestConst.JOURNAL_EVENT_UNMASK]
mask_signal_names = {msg.signalName for msg in mask_event_msgs}
unmask_signal_names = {msg.signalName for msg in unmask_event_msgs}
with SoftAssertions() as journal_soft_failures:
StepCheck(
"Проверка соответствия количества сообщений о действиях пользователя (снятие и установка "
"маскирования для датчиков давления и расходомеров)",
"total_count",
journal_soft_failures,
).actual(len(journal_messages)).expected(TestConst.JOURNAL_EXPECTED_MASK_MSG_TOTAL).equal_to()
StepCheck(
f"Проверка соответствия количества сообщений "
f"о действиях пользователя для датчиков давления - '{TestConst.JOURNAL_SIGNAL_PRESSURE}'",
"count",
journal_soft_failures,
).actual(len(pressure_msgs)).expected(TestConst.JOURNAL_EXPECTED_MSG_COUNT_PER_SIGNAL).equal_to()
StepCheck(
f"Проверка соответствия количества сообщений "
f"о действиях пользователя для расходомеров - '{TestConst.JOURNAL_SIGNAL_FLOW}'",
"count",
journal_soft_failures,
).actual(len(flow_msgs)).expected(TestConst.JOURNAL_EXPECTED_MSG_COUNT_PER_SIGNAL).equal_to()
StepCheck(
f"Проверка: событие '{TestConst.JOURNAL_EVENT_MASK}' содержит '{TestConst.JOURNAL_SIGNAL_PRESSURE}'",
"signalName",
journal_soft_failures,
).actual(TestConst.JOURNAL_SIGNAL_PRESSURE in mask_signal_names).expected(True).equal_to()
StepCheck(
f"Проверка: событие '{TestConst.JOURNAL_EVENT_MASK}' содержит '{TestConst.JOURNAL_SIGNAL_FLOW}'",
"signalName",
journal_soft_failures,
).actual(TestConst.JOURNAL_SIGNAL_FLOW in mask_signal_names).expected(True).equal_to()
StepCheck(
f"Проверка: событие '{TestConst.JOURNAL_EVENT_UNMASK}' содержит '{TestConst.JOURNAL_SIGNAL_PRESSURE}'",
"signalName",
journal_soft_failures,
).actual(TestConst.JOURNAL_SIGNAL_PRESSURE in unmask_signal_names).expected(True).equal_to()
StepCheck(
f"Проверка: событие '{TestConst.JOURNAL_EVENT_UNMASK}' содержит '{TestConst.JOURNAL_SIGNAL_FLOW}'",
"signalName",
journal_soft_failures,
).actual(TestConst.JOURNAL_SIGNAL_FLOW in unmask_signal_names).expected(True).equal_to()
for signal_name in [TestConst.JOURNAL_SIGNAL_PRESSURE, TestConst.JOURNAL_SIGNAL_FLOW]:
mask_msg_for_signal = next((msg for msg in mask_event_msgs if msg.signalName == signal_name), None)
unmask_msg_for_signal = next((msg for msg in unmask_event_msgs if msg.signalName == signal_name), None)
if mask_msg_for_signal and unmask_msg_for_signal:
StepCheck(
f"Проверка совпадения tag для '{signal_name}' между маскированием и снятием",
"tag",
journal_soft_failures,
).actual(mask_msg_for_signal.tag).expected(unmask_msg_for_signal.tag).equal_to()
for msg in journal_messages:
msg_label = f"{msg.event} - {msg.signalName}"
StepCheck(
f"Проверка user не пустой [{msg_label}]",
"user",
journal_soft_failures,
).actual(msg.user).is_not_none()
StepCheck(
f"Проверка mainPipeline [{msg_label}]",
"mainPipeline",
journal_soft_failures,
).actual(
msg.mainPipeline
).expected(cfg.main_pipeline).equal_to()
StepCheck(
f"Проверка object не пустой [{msg_label}]",
"object",
journal_soft_failures,
).actual(msg.object).is_not_none()
StepCheck(
f"Проверка technologicalObject не пустой [{msg_label}]",
"technologicalObject",
journal_soft_failures,
).actual(msg.technologicalObject).is_not_none()
StepCheck(
f"Проверка technologicalSection [{msg_label}]",
"technologicalSection",
journal_soft_failures,
).actual(msg.technologicalSection).expected(cfg.tu_name).equal_to()
StepCheck(
f"Проверка priority не пустой [{msg_label}]",
"priority",
journal_soft_failures,
).actual(msg.priority).is_not_none()
StepCheck(
f"Проверка messageType [{msg_label}]",
"messageType",
journal_soft_failures,
).actual(
msg.messageType
).expected(TestConst.JOURNAL_MESSAGE_TYPE_USER_ACTIONS).equal_to()
StepCheck(
f"Проверка status [{msg_label}]",
"status",
journal_soft_failures,
).actual(
msg.status
).expected(TestConst.JOURNAL_STATUS_SUCCESS).equal_to()
async def mask_du_on_mini_scheme(ws_client, cfg: SmokeSuiteConfig):
"""
Маскирование ДУ на мини-схеме
Проверка маскированного участка в выходных сигналах
"""
linear_part_id = cfg.linear_part_identifier_for_mask
mask_reason = cfg.mask_reason
with allure.step(
"Подключение по ws, отправка сообщения типа: MaskLdsRequest. Совершается действие - маскирование ДУ"
):
payload = (
await t_utils.connect_and_get_msg(
ws_client,
"MaskLdsRequest",
{
'tuId': cfg.tu_id,
'maskInfo': [
{
'linearPartId': linear_part_id,
'reason': mask_reason,
'additionalProperties': None,
}
],
'additionalProperties': None,
},
),
)
time.sleep(cfg.basic_message_timeout)
parsed_payload = parser.parse_unmask_lds_message(payload)
flowmeter_mask_reply_status = parsed_payload.replyStatus
with allure.step(f"Получение словаря для линейного участка с id: {linear_part_id}.\n" f"ЭФ Выходные сигналы."):
payload = await t_utils.connect_and_get_msg(
ws_client,
"GetOutputSignalsRequest",
{
'tuId': cfg.tu_id,
'filtering': None,
'search': None,
'sorting': None,
'additionalProperties': None,
},
)
parsed_payload = parser.parse_output_signals_msg(payload)
# Получение данных линейного участка утечки по id
leak_linear_part = t_utils.find_object_by_field(
parsed_payload.replyContent.linearPartSignals,
TestConst.LEAK_LINEAR_PART_ID_KEY,
linear_part_id,
)
with allure.step("Получение типов выходных сигналов из обработанных данных. ЭФ Выходные сигналы"):
leak_signals_list = leak_linear_part.signals
mask_signal_type = t_utils.find_signal_type_by_address_suffix(
leak_signals_list, TestConst.ADDRESS_SUFFIX_MASK
)
with allure.step(f"Получение данных выходных сигналов для линейного участка с id: {linear_part_id}\n"):
with allure.step(
"Получение сообщения с данными выходных сигналов типа: OutputSignalsInfo. "
"Получен результат маскирования ДУ на ЭФ Выходные сигнал"
):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"OutputSignalsInfo",
"SubscribeOutputSignalsRequest",
{
'objects': {
'linearParts': [{'linearPartId': linear_part_id}],
'controlledSites': [],
},
'signalTypes': 1023,
'tuId': cfg.tu_id,
'additionalProperties': None,
},
)
parsed_payload = parser.parse_output_signals_info_msg(payload)
leak_linear_part = t_utils.find_object_by_field(
parsed_payload.replyContent.linearPartSignals,
TestConst.LEAK_LINEAR_PART_ID_KEY,
linear_part_id,
)
with allure.step("Обработка полученных данных выходных сигналов"):
leak_signals_list = leak_linear_part.signals
mask_leak_value = t_utils.find_signal_val_by_signal_type(leak_signals_list, mask_signal_type)
with allure.step(
"Подключение по ws, получение и обработка сообщения типа: CommonSchemeContent. "
"Получен результат маскирования ДУ на ЭФ Схема"
):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"CommonSchemeContent",
"SubscribeCommonSchemeRequest",
{'tuId': cfg.tu_id, 'additionalProperties': None},
)
parsed_payload = parser.parse_common_scheme_info_msg(payload)
linear_parts = parsed_payload.replyContent.linearParts
mask_linear_part = next((lp for lp in linear_parts if lp.id == linear_part_id), None)
with allure.step(
"Подключение по ws, получение и обработка сообщения типа: MessagesInfo. "
"Получен результат маскирования ДУ на ЭФ Журнал"
):
request_body = t_utils.create_journal_req_body(
pagination=Pagination(limit=10, direction=Direction.FIRST.value),
filtering=Filtering(messageTypes=int(MessageType.MASKING_LDS), objects=FilteringObjects(tuId=cfg.tu_id)),
)
payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
parsed_payload = parser.parse_journal_msg(payload)
messages_info = parsed_payload.replyContent.messagesInfo
if cfg.technological_section:
mask_message = t_utils.find_object_by_field(
messages_info, "technologicalSection", cfg.technological_section
)
else:
mask_message = parsed_payload.replyContent.messagesInfo[0]
with allure.step(
"Подключение по ws, получение и обработка сообщения типа: MainPageInfoContent. "
"Получен результат маскирования ДУ на ЭФ Состояние МТ"
):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"MainPageInfoContent",
"subscribeMainPageInfoRequest",
{'tuIds': [cfg.tu_id], 'additionalProperties': None},
)
parsed_payload = parser.parse_main_page_msg(payload)
number_of_masked_lps = parsed_payload.replyContent.tuInfo.ldsStatus.numberOfMaskedLps
main_page_info_lds_status_obj = parsed_payload.replyContent.tuInfo.ldsStatus
mask_du_list = getattr(main_page_info_lds_status_obj, "maskedLps", None)
if mask_du_list:
masked_lps_name = next(iter(mask_du_list), None)
else:
masked_lps_name = None
# Проверки сообщений
with SoftAssertions() as soft_failures:
StepCheck(
"Проверка сигнала маскирования ДУ в выходных сигналах", TestConst.ADDRESS_SUFFIX_MASK, soft_failures
).actual(mask_leak_value).expected(TestConst.OUTPUT_IS_MASK).equal_to()
StepCheck("Проверка кода ответа на запрос о маскировании", "replyStatus", soft_failures).actual(
flowmeter_mask_reply_status
).expected(ReplyStatus.OK.value).equal_to()
StepCheck("Проверка статуса маскирования ДУ на схеме", "isMasked", soft_failures).actual(
mask_linear_part.isMasked
).expected(True).equal_to()
StepCheck("Проверка причины маски в журнале", "maskReason").actual(mask_linear_part.maskReason).expected(
cfg.mask_reason
).equal_to()
StepCheck("Проверка имени ТУ в журнале", "mainPipeline", soft_failures).actual(
mask_message.mainPipeline
).expected(cfg.main_pipe_line).equal_to()
StepCheck("Проверка имени ДУ в журнале", "technologicalObject", soft_failures).actual(
mask_message.technologicalObject
).expected(cfg.mask_du_name).equal_to()
StepCheck("Проверка события в журнале", "event", soft_failures).actual(mask_message.event).expected(
cfg.mask_du_event
).equal_to()
StepCheck("Проверка количества маскированных ДУ", "numberOfMaskedLps", soft_failures).actual(
number_of_masked_lps
).expected(cfg.mask_one_du).equal_to()
StepCheck("Проверка счетчика маски. ЭФ Состояние МТ", "Количество замаскированных ДУ", soft_failures).actual(
number_of_masked_lps
).expected(cfg.mask_one_du).equal_to()
StepCheck(
"Проверка имени маскированного ДУ. ЭФ Состояние МТ", "Наименование замаскированного ДУ", soft_failures
).actual(masked_lps_name).expected(cfg.mask_du_name).equal_to()
async def unmask_du_on_mini_scheme(ws_client, cfg: SmokeSuiteConfig):
"""
Размаскирование ДУ на мини-схеме
Проверка маскированного участка в выходных сигналах
"""
linear_part_id = cfg.linear_part_identifier_for_mask
unmask_reason = cfg.unmask_reason
with allure.step(
"Подключение по ws, отправка сообщения типа: UnmaskLdsRequest. Совершается действие - размаскирование ДУ"
):
payload = (
await t_utils.connect_and_get_msg(
ws_client,
"UnmaskLdsRequest",
{
'tuId': cfg.tu_id,
'maskInfo': [
{
'linearPartId': linear_part_id,
'reason': unmask_reason,
'additionalProperties': None,
}
],
'additionalProperties': None,
},
),
)
time.sleep(cfg.basic_message_timeout)
parsed_payload = parser.parse_unmask_lds_message(payload)
flowmeter_mask_reply_status = parsed_payload.replyStatus
with allure.step(f"Получение словаря для линейного участка с id: {linear_part_id}\n" f"ЭФ Выходные сигналы"):
payload = await t_utils.connect_and_get_msg(
ws_client,
"GetOutputSignalsRequest",
{
'tuId': cfg.tu_id,
'filtering': None,
'search': None,
'sorting': None,
'additionalProperties': None,
},
)
parsed_payload = parser.parse_output_signals_msg(payload)
# Получение данных линейного участка утечки по id
leak_linear_part = t_utils.find_object_by_field(
parsed_payload.replyContent.linearPartSignals,
TestConst.LEAK_LINEAR_PART_ID_KEY,
linear_part_id,
)
with allure.step("Получение типов выходных сигналов из обработанных данных. ЭФ Выходные сигналы"):
leak_signals_list = leak_linear_part.signals
mask_signal_type = t_utils.find_signal_type_by_address_suffix(
leak_signals_list, TestConst.ADDRESS_SUFFIX_MASK
)
with allure.step(f"Получение данных выходных сигналов для линейного участка с id: {linear_part_id}\n"):
with allure.step("Получен результат маскирования ДУ на ЭФ Выходные сигналы"):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"OutputSignalsInfo",
"SubscribeOutputSignalsRequest",
{
'objects': {
'linearParts': [{'linearPartId': linear_part_id}],
'controlledSites': [],
},
'signalTypes': 1023,
'tuId': cfg.tu_id,
'additionalProperties': None,
},
)
parsed_payload = parser.parse_output_signals_info_msg(payload)
leak_linear_part = t_utils.find_object_by_field(
parsed_payload.replyContent.linearPartSignals,
TestConst.LEAK_LINEAR_PART_ID_KEY,
linear_part_id,
)
with allure.step("Обработка полученных данных выходных сигналов"):
leak_signals_list = leak_linear_part.signals
mask_leak_value = t_utils.find_signal_val_by_signal_type(leak_signals_list, mask_signal_type)
with allure.step(
"Подключение по ws, получение и обработка сообщения типа: MessagesInfo. "
"Получен результат маскирования ДУ на ЭФ Журнал"
):
request_body = t_utils.create_journal_req_body(
pagination=Pagination(limit=10, direction=Direction.FIRST.value),
filtering=Filtering(messageTypes=int(MessageType.MASKING_LDS), objects=FilteringObjects(tuId=cfg.tu_id)),
)
payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
parsed_payload = parser.parse_journal_msg(payload)
messages_info = parsed_payload.replyContent.messagesInfo
if cfg.technological_section:
mask_message = t_utils.find_object_by_field(
messages_info, "technologicalSection", cfg.technological_section
)
else:
mask_message = parsed_payload.replyContent.messagesInfo[0]
with allure.step(
"Подключение по ws, получение и обработка сообщения типа: MainPageInfoContent. "
"Получен результат маскирования ДУ на ЭФ Состояние МТ"
):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"MainPageInfoContent",
"subscribeMainPageInfoRequest",
{'tuIds': [cfg.tu_id], 'additionalProperties': None},
)
parsed_payload = parser.parse_main_page_msg(payload)
number_of_masked_lps = parsed_payload.replyContent.tuInfo.ldsStatus.numberOfMaskedLps
main_page_info_lds_status_obj = parsed_payload.replyContent.tuInfo.ldsStatus
masked_lps_name = getattr(main_page_info_lds_status_obj, "maskedLps", None)
# Проверки сообщений
with SoftAssertions() as soft_failures:
StepCheck("Проверка кода ответа на запрос о размаскировании", "replyStatus", soft_failures).actual(
flowmeter_mask_reply_status
).expected(ReplyStatus.OK.value).equal_to()
StepCheck(
"Проверяем, что тег маскирования ДУ в выходных сигналах равен null",
TestConst.ADDRESS_SUFFIX_MASK,
soft_failures,
).actual(mask_leak_value).expected(TestConst.OUTPUT_IS_NOT_MASK).equal_to()
StepCheck("Проверяем имя ТУ в сообщении в журнале", "mainPipeline", soft_failures).actual(
mask_message.mainPipeline
).expected(cfg.main_pipe_line).equal_to()
StepCheck("Проверяем имя ДУ в сообщении в журнале", "technologicalObject", soft_failures).actual(
mask_message.technologicalObject
).expected(cfg.mask_du_name).equal_to()
StepCheck("Проверка события в сообщении в журнале", "event", soft_failures).actual(mask_message.event).expected(
cfg.unmask_du_event
).equal_to()
StepCheck("Проверка количества маскированных ДУ", "numberOfMaskedLps", soft_failures).actual(
number_of_masked_lps
).expected(cfg.not_mask_du).equal_to()
StepCheck("Проверка счетчика маски. ЭФ Состояние МТ", "Количество замаскированных ДУ", soft_failures).actual(
number_of_masked_lps
).expected(cfg.not_mask_du).equal_to()
StepCheck(
"Проверка отсутствия списка маскированных ДУ. ЭФ Состояние МТ",
"Отсутствуют замаскированные ДУ",
soft_failures,
).actual(masked_lps_name).is_none()
async def lds_status_initialization_out(ws_client, cfg: SmokeSuiteConfig):
"""
Проверка выхода СОУ из Инициализации.
"""
with allure.step("Подключение по ws, получение и обработка сообщения типа: CommonSchemeContent"):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"CommonSchemeContent",
"SubscribeCommonSchemeRequest",
{'tuId': cfg.tu_id, 'additionalProperties': None},
)
parsed_payload = parser.parse_common_scheme_info_msg(payload)
flow_areas = parsed_payload.replyContent.flowAreas
longest_flow_area = t_utils.get_longest_flow_area(flow_areas)
diagnostic_areas = longest_flow_area.diagnosticAreas
allure.attach(
f"Самый протяженный участок карты течений: {longest_flow_area}",
name="flowArea. Выход из Инициализации",
attachment_type=allure.attachment_type.TEXT,
)
lds_status_set = {diagnostic_area.ldsStatus for diagnostic_area in diagnostic_areas}
lds_status = t_utils.determine_lds_status_by_priority(lds_status_set)
StepCheck(
"Проверка: СОУ находится не в режиме 'Инициализация'",
"ldsStatus",
).actual(
lds_status
).expected(LdsStatus.INITIALIZATION.value).is_not_equal_to()
async def lds_status_init_out_in_journal(ws_client, cfg: SmokeSuiteConfig, imitator_start_time):
"""
Проверка наличия записи в журнале о выходе СОУ из режима Инициализация.
"""
with allure.step("Запрос сообщений журнала с фильтром messageTypes=LDS_STATUS"):
end_time = datetime.now()
request_body = t_utils.create_journal_req_body(
pagination=Pagination(limit=TestConst.JOURNAL_PAGINATION_LIMIT, direction=Direction.FIRST.value),
filtering=Filtering(messageTypes=int(MessageType.LDS_STATUS), objects=FilteringObjects(tuId=cfg.tu_id)),
)
payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
parsed_payload = parser.parse_journal_msg(payload)
messages_info = parsed_payload.replyContent.messagesInfo
StepCheck("Проверка наличия сообщений в журнале", "messagesInfo").actual(messages_info).is_not_empty()
with allure.step("Фильтрация сообщений по времени и technologicalSection"):
filter_start_msk = t_utils.localize_as_moscow(imitator_start_time)
filter_end_msk = t_utils.localize_as_moscow(end_time)
time_filtered = [
msg
for msg in messages_info
if filter_start_msk <= t_utils.ensure_moscow_timezone(msg.time) <= filter_end_msk
]
time_filtered.sort(key=lambda msg: t_utils.ensure_moscow_timezone(msg.time), reverse=True)
lds_msg = next(
(msg for msg in time_filtered if msg.technologicalSection == cfg.tu_name),
None,
)
allure.attach(
f"Всего получено сообщений: {len(messages_info)}\n"
f"После фильтрации по времени ({filter_start_msk} - {filter_end_msk}): {len(time_filtered)}\n"
f"Проверка: найдено ли сообщение с technologicalSection='{cfg.tu_name}': {'True' if lds_msg else 'False'}",
name="Результат фильтрации сообщений журнала",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step(f"Проверка: найдено ли сообщение с technologicalSection='{cfg.tu_name}'"):
if lds_msg is None:
pytest.fail(
f"Сообщение с technologicalSection='{cfg.tu_name}' "
f"не найдено среди {len(time_filtered)} отфильтрованных по времени сообщений"
)
with allure.step("Проверка актуальности сообщения"):
msg_time_msk = t_utils.ensure_moscow_timezone(lds_msg.time)
start_time_msk = t_utils.localize_as_moscow(imitator_start_time)
StepCheck(
f"Проверка: время сообщения позднее времени старта имитатора {msg_time_msk} > {start_time_msk}",
"time",
).actual(msg_time_msk > start_time_msk).expected(True).equal_to()
with SoftAssertions() as soft_failures:
StepCheck("Проверка: event не является Инициализацией", "event", soft_failures).actual(lds_msg.event).expected(
TestConst.JOURNAL_EVENT_LDS_INIT_ACCUM_DATA
).is_not_equal_to()
StepCheck("Проверка mainPipeline", "mainPipeline", soft_failures).actual(lds_msg.mainPipeline).expected(
cfg.main_pipeline
).equal_to()
StepCheck("Проверка technologicalSection", "technologicalSection", soft_failures).actual(
lds_msg.technologicalSection
).expected(cfg.tu_name).equal_to()
StepCheck("Проверка technologicalObject не пустой", "technologicalObject", soft_failures).actual(
lds_msg.technologicalObject
).is_not_none()
StepCheck("Проверка priority не пустой", "priority", soft_failures).actual(lds_msg.priority).is_not_none()
StepCheck("Проверка messageType", "messageType", soft_failures).actual(lds_msg.messageType).expected(
TestConst.JOURNAL_MESSAGE_TYPE_LDS_STATUS
).equal_to()
async def leaks_content(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig, imitator_start_time):
"""
Проверка утечки через сообщение LeaksContent.
"""
with allure.step("Подключение по ws и получение сообщения об утечке типа: LeaksContent"):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"LeaksContent",
"SubscribeLeaksRequest",
{'tuId': cfg.tu_id},
)
parsed_payload = parser.parse_leaks_content_msg(payload)
leaks_list_info = parsed_payload.replyContent.leaksListInfo
# Ищет подтвержденные утечки
confirmed_leaks_list = t_utils.find_confirmed_leaks(leaks_list_info)
first_leak_info = t_utils.find_leak_by_coordinate(confirmed_leaks_list, leak.coordinate_meters)
# Конвертируем время обнаружения в московское время
leak_detected_at = t_utils.ensure_moscow_timezone(first_leak_info.detectedAt)
leak_wait_start_time, leak_wait_end_time = t_utils.get_leak_time_window(
imitator_start_time,
leak.leak_start_interval_seconds,
leak.allowed_time_diff_seconds,
detected_at_tz=leak_detected_at.tzinfo,
)
leak_volume_m3 = t_utils.convert_leak_volume_m3(first_leak_info.leakVolume)
leak_coordinate_round = round(first_leak_info.leakCoordinate, cfg.precision)
with SoftAssertions() as soft_failures:
StepCheck("Проверка id полученного ТУ", "tu_id", soft_failures).actual(
parsed_payload.replyContent.tuId
).expected(cfg.tu_id).equal_to()
StepCheck("Проверка наличия названия участка утечки", "diagnosticAreaName", soft_failures).actual(
first_leak_info.diagnosticAreaName
).is_not_none()
StepCheck("Проверка статуса утечки", "confirmationStatus", soft_failures).actual(
first_leak_info.confirmationStatus
).expected(leak.expected_leak_status).equal_to()
StepCheck("Проверка источника события (алгоритм)", "type", soft_failures).actual(first_leak_info.type).expected(
leak.expected_algorithm_type
).equal_to()
StepCheck("Проверка наличия id утечки", "id", soft_failures).actual(first_leak_info.id).is_not_none()
StepCheck("Проверка координаты утечки", "leakCoordinate", soft_failures).actual(
leak_coordinate_round
).is_close_to(
leak.coordinate_meters,
cfg.allowed_distance_diff_meters,
f"значение допустимой погрешности координаты {cfg.allowed_distance_diff_meters}",
)
StepCheck("Проверка времени обнаружения утечки", "leakDetectedAt", soft_failures).actual(
leak_detected_at
).is_between(leak_wait_start_time, leak_wait_end_time)
StepCheck("Проверка объема утечки", "volume", soft_failures).actual(leak_volume_m3).is_close_to(
leak.volume_m3,
leak.allowed_volume_m3,
f"значение допустимой погрешности по объему {leak.allowed_volume_m3}",
)
async def possible_leak_in_journal(ws_client, cfg: SmokeSuiteConfig, imitator_start_time):
"""
Проверка наличия сообщения 'Возможна утечка' в журнале.
"""
with allure.step("Подключение по ws, получение и обработка сообщений журнала типа: MessagesInfoContent"):
end_time = datetime.now()
request_body = t_utils.create_journal_req_body(
pagination=Pagination(limit=TestConst.JOURNAL_PAGINATION_LIMIT, direction=Direction.FIRST.value),
filtering=Filtering(messageTypes=int(MessageType.LEAKS), objects=FilteringObjects(tuId=cfg.tu_id)),
)
payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
parsed_payload = parser.parse_journal_msg(payload)
messages_info = parsed_payload.replyContent.messagesInfo
StepCheck("Проверка наличия сообщений в журнале", "messagesInfo").actual(messages_info).is_not_empty()
with allure.step("Фильтрация сообщений по времени и technologicalSection"):
filter_start_msk = t_utils.localize_as_moscow(imitator_start_time)
filter_end_msk = t_utils.localize_as_moscow(end_time)
time_filtered = [
msg
for msg in messages_info
if filter_start_msk <= t_utils.ensure_moscow_timezone(msg.time) <= filter_end_msk
]
time_filtered.sort(key=lambda msg: t_utils.ensure_moscow_timezone(msg.time), reverse=True)
possible_leak_msg = next(
(
msg
for msg in time_filtered
if msg.technologicalSection == cfg.tu_name and msg.event == TestConst.JOURNAL_EVENT_POSSIBLE_LEAK
),
None,
)
allure.attach(
f"Всего получено сообщений: {len(messages_info)}\n"
f"После фильтрации по времени ({filter_start_msk} - {filter_end_msk}): {len(time_filtered)}\n"
f"Проверка: найдено ли сообщение с technologicalSection='{cfg.tu_name}' "
f"и event='{TestConst.JOURNAL_EVENT_POSSIBLE_LEAK}': {'True' if possible_leak_msg else 'False'}",
name="Результат фильтрации сообщений журнала",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step(
f"Проверка: найдено ли сообщение с technologicalSection='{cfg.tu_name}' "
f"и event='{TestConst.JOURNAL_EVENT_POSSIBLE_LEAK}'"
):
if possible_leak_msg is None:
pytest.fail(
f"Сообщение с technologicalSection='{cfg.tu_name}' "
f"и event='{TestConst.JOURNAL_EVENT_POSSIBLE_LEAK}' "
f"не найдено среди {len(time_filtered)} отфильтрованных по времени сообщений"
)
with SoftAssertions() as soft_failures:
StepCheck("Проверка статуса утечки в журнале", "event", soft_failures).actual(possible_leak_msg.event).expected(
TestConst.JOURNAL_EVENT_POSSIBLE_LEAK
).equal_to()
StepCheck("Проверка mainPipeline", "mainPipeline", soft_failures).actual(
possible_leak_msg.mainPipeline
).expected(cfg.main_pipeline).equal_to()
StepCheck("Проверка messageType", "messageType", soft_failures).actual(possible_leak_msg.messageType).expected(
TestConst.JOURNAL_MESSAGE_TYPE_LEAKS
).equal_to()
StepCheck("Проверка technologicalSection не пустой", "technologicalSection", soft_failures).actual(
possible_leak_msg.technologicalSection
).is_not_none()
StepCheck("Проверка technologicalObject не пустой", "technologicalObject", soft_failures).actual(
possible_leak_msg.technologicalObject
).is_not_none()
async def leak_info_in_journal(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig, imitator_start_time):
with allure.step("Подключение по ws, получение и обработка сообщения типа: MessagesInfoContent"):
request_body = t_utils.create_journal_req_body(
pagination=Pagination(limit=TestConst.JOURNAL_PAGINATION_LIMIT, direction=Direction.FIRST.value),
filtering=Filtering(messageTypes=int(MessageType.LEAKS), objects=FilteringObjects(tuId=cfg.tu_id)),
)
payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
end_time = datetime.now()
parsed_payload = parser.parse_journal_msg(payload)
messages_info = parsed_payload.replyContent.messagesInfo
StepCheck("Проверка наличия сообщений в журнале", "messagesInfo").actual(messages_info).is_not_empty()
with allure.step("Фильтрация сообщений по времени и technologicalSection"):
filter_start_msk = t_utils.localize_as_moscow(imitator_start_time)
filter_end_msk = t_utils.localize_as_moscow(end_time)
time_filtered = [
msg
for msg in messages_info
if filter_start_msk <= t_utils.ensure_moscow_timezone(msg.time) <= filter_end_msk
]
time_filtered.sort(key=lambda msg: t_utils.ensure_moscow_timezone(msg.time), reverse=True)
leak_message = next(
(
msg
for msg in time_filtered
if msg.technologicalSection == cfg.tu_name and TestConst.JOURNAL_EVENT_DETECTED_LEAK in msg.event
),
None,
)
allure.attach(
f"Всего получено сообщений: {len(messages_info)}\n"
f"После фильтрации по времени ({filter_start_msk} - {filter_end_msk}): {len(time_filtered)}\n",
name="Результат фильтрации сообщений журнала",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step("Первичная проверка после фильтрации"):
StepCheck(
f"Проверка: найдено ли сообщение с technologicalSection='{cfg.tu_name}' "
f"и event содержит подстроку подтвержденной утечки'{TestConst.JOURNAL_EVENT_DETECTED_LEAK}'",
"event",
).actual(leak_message).is_not_none()
leak_coordinate_km, leak_volume_m3 = t_utils.parse_journal_msg_value(leak_message.value)
leak_coordinate_round = round(leak_coordinate_km * TestConst.KM_TO_METERS, TestConst.PRECISION)
leak_message_time = t_utils.ensure_moscow_timezone(leak_message.time)
with SoftAssertions() as soft_failures:
StepCheck("Проверка полученного события event", "event", soft_failures).contains(
leak_message.event, TestConst.JOURNAL_EVENT_DETECTED_LEAK
)
StepCheck("Проверка полученного ТУ", "technologicalSection", soft_failures).actual(
leak_message.technologicalSection
).expected(cfg.tu_name).equal_to()
StepCheck("Проверка типа полученного сообщения", "messageType", soft_failures).actual(
leak_message.messageType
).expected(TestConst.JOURNAL_MESSAGE_TYPE_LEAKS).equal_to()
StepCheck("Проверка имени ДУ", "technologicalObject", soft_failures).actual(
leak_message.technologicalObject
).is_not_none()
StepCheck("Проверка координаты утечки", "leakCoordinate", soft_failures).actual(
leak_coordinate_round
).is_close_to(
leak.coordinate_meters,
cfg.allowed_distance_diff_meters,
f"значение допустимой погрешности координаты {cfg.allowed_distance_diff_meters}",
)
StepCheck("Проверка времени обнаружения утечки", "leakDetectedAt", soft_failures).actual(
leak_message_time
).is_between(filter_start_msk, filter_end_msk)
StepCheck("Проверка объема утечки", "volume", soft_failures).actual(leak_volume_m3).is_close_to(
leak.volume_m3,
leak.allowed_volume_m3,
f"значение допустимой погрешности по объему {leak.allowed_volume_m3}",
)
async def completed_leak_info_in_journal(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig, imitator_start_time):
"""
Проверка наличия сообщения 'Утечка завершена' в журнале.
"""
with allure.step("Подключение по ws, получение и обработка сообщения типа: MessagesInfoContent"):
request_body = t_utils.create_journal_req_body(
pagination=Pagination(limit=TestConst.JOURNAL_PAGINATION_LIMIT, direction=Direction.FIRST.value),
filtering=Filtering(messageTypes=int(MessageType.LEAKS), objects=FilteringObjects(tuId=cfg.tu_id)),
)
payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
end_time = datetime.now()
parsed_payload = parser.parse_journal_msg(payload)
messages_info = parsed_payload.replyContent.messagesInfo
StepCheck("Проверка наличия сообщений в журнале", "messagesInfo").actual(messages_info).is_not_empty()
with allure.step("Фильтрация сообщений по времени и technologicalSection"):
filter_start_msk = t_utils.localize_as_moscow(imitator_start_time)
filter_end_msk = t_utils.localize_as_moscow(end_time)
time_filtered = [
msg
for msg in messages_info
if filter_start_msk <= t_utils.ensure_moscow_timezone(msg.time) <= filter_end_msk
]
time_filtered.sort(key=lambda msg: t_utils.ensure_moscow_timezone(msg.time), reverse=True)
completed_leak_message = next(
(
msg
for msg in time_filtered
if msg.technologicalSection == cfg.tu_name and msg.event == TestConst.JOURNAL_EVENT_COMPLETED_LEAKS
),
None,
)
allure.attach(
f"Всего получено сообщений: {len(messages_info)}\n"
f"После фильтрации по времени ({filter_start_msk} - {filter_end_msk}): {len(time_filtered)}\n",
name="Результат фильтрации сообщений журнала",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step("Первичная проверка после фильтрации"):
StepCheck(
f"Проверка: найдено ли сообщение с technologicalSection='{cfg.tu_name}' "
f"и event='{TestConst.JOURNAL_EVENT_COMPLETED_LEAKS}'",
"event",
).actual(completed_leak_message).is_not_none()
leak_coordinate_km, leak_volume_m3 = t_utils.parse_journal_msg_value(completed_leak_message.value)
leak_coordinate_round = round(leak_coordinate_km * TestConst.KM_TO_METERS, TestConst.PRECISION)
leak_message_time = t_utils.ensure_moscow_timezone(completed_leak_message.time)
with SoftAssertions() as soft_failures:
StepCheck("Проверка статуса утечки в журнале", "event", soft_failures).actual(
completed_leak_message.event
).expected(TestConst.JOURNAL_EVENT_COMPLETED_LEAKS).equal_to()
StepCheck("Проверка полученного ТУ", "technologicalSection", soft_failures).actual(
completed_leak_message.technologicalSection
).expected(cfg.tu_name).equal_to()
StepCheck("Проверка координаты утечки", "leakCoordinate", soft_failures).actual(
leak_coordinate_round
).is_close_to(
leak.coordinate_meters,
cfg.allowed_distance_diff_meters,
f"значение допустимой погрешности координаты {cfg.allowed_distance_diff_meters}",
)
StepCheck("Проверка времени завершения утечки", "leakDetectedAt", soft_failures).actual(
leak_message_time
).is_between(filter_start_msk, filter_end_msk)
async def all_leaks_info(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig, imitator_start_time):
"""
Проверка сообщения AllLeaksInfo об утечке.
"""
with allure.step("Подключение по ws и получение сообщения об утечке типа: AllLeaksInfoContent"):
parsed_payload = await t_utils.connect_and_get_parsed_msg_by_tu_id(
cfg.tu_id,
ws_client,
"AllLeaksInfoContent",
"subscribeAllLeaksInfoRequest",
[],
)
StepCheck("Проверка наличия сообщения об утечке типа AllLeaksInfoContent", "leaksInfo").actual(
parsed_payload.replyContent.leaksInfo
).is_not_empty()
with allure.step("Обработка сообщения об утечке типа AllLeaksInfoContent"):
leaks_info = parsed_payload.replyContent.leaksInfo
# Если у утечки указано имя ДУ - ищем по нему, иначе берём первую
first_leak_info = t_utils.find_leak_by_coordinate(leaks_info, leak.coordinate_meters)
# Конвертируем время обнаружения в московское время
leak_detected_at = t_utils.ensure_moscow_timezone(first_leak_info.leakDetectedAt)
leak_wait_start_time, leak_wait_end_time = t_utils.get_leak_time_window(
imitator_start_time,
leak.leak_start_interval_seconds,
leak.allowed_time_diff_seconds,
detected_at_tz=leak_detected_at.tzinfo,
)
leak_volume_m3 = t_utils.convert_leak_volume_m3(first_leak_info.volume)
leak_coordinate_round = round(first_leak_info.leakCoordinate, cfg.precision)
with SoftAssertions() as soft_failures:
StepCheck("Проверка id полученного ТУ", "tu_id", soft_failures).actual(
parsed_payload.replyContent.tuId
).expected(cfg.tu_id).equal_to()
StepCheck("Проверка наличия названия участка утечки", "diagnosticAreaName", soft_failures).actual(
first_leak_info.diagnosticAreaName
).is_not_none()
StepCheck("Проверка статуса СОУ", "ldsStatus", soft_failures).actual(first_leak_info.ldsStatus).expected(
leak.expected_lds_status
).equal_to()
StepCheck("Проверка маскирования утечки", "isMasked", soft_failures).actual(first_leak_info.isMasked).expected(
False
).equal_to()
StepCheck("Проверка квитирования утечки", "isAcknowledged", soft_failures).actual(
first_leak_info.isAcknowledged
).expected(False).equal_to()
StepCheck("Проверка наличия id утечки", "id", soft_failures).actual(first_leak_info.id).is_not_none()
StepCheck("Проверка координаты утечки", "leakCoordinate", soft_failures).actual(
leak_coordinate_round
).is_close_to(
leak.coordinate_meters,
cfg.allowed_distance_diff_meters,
f"значение допустимой погрешности координаты {cfg.allowed_distance_diff_meters}",
)
StepCheck("Проверка времени обнаружения утечки", "leakDetectedAt", soft_failures).actual(
leak_detected_at
).is_between(leak_wait_start_time, leak_wait_end_time)
StepCheck("Проверка объема утечки", "volume", soft_failures).actual(leak_volume_m3).is_close_to(
leak.volume_m3,
leak.allowed_volume_m3,
f"значение допустимой погрешности по объему {leak.allowed_volume_m3}",
)
StepCheck("Проверка режима ТУ", "stationaryStatus", soft_failures).actual(
first_leak_info.stationaryStatus
).expected(leak.expected_stationary_status).equal_to()
async def all_leaks_is_empty(ws_client, cfg: SmokeSuiteConfig):
"""
Проверка отсутствия информации об утечке
"""
with allure.step("Подключение по ws и получение сообщения об утечке типа: AllLeaksInfoContent"):
parsed_payload = await t_utils.connect_and_get_parsed_msg_by_tu_id(
cfg.tu_id,
ws_client,
"AllLeaksInfoContent",
"subscribeAllLeaksInfoRequest",
[],
)
StepCheck("Проверка отсутствия информации об утечке в сообщении AllLeaksInfoContent", "leaksInfo").actual(
parsed_payload.replyContent.leaksInfo
).is_empty()
async def tu_leaks_info(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig, imitator_start_time):
"""
Проверка сообщения TuLeaksInfo об утечке.
"""
with allure.step("Подключение по ws и получение сообщения об утечке типа: TuLeaksInfoContent"):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"TuLeaksInfoContent",
"subscribeTuLeaksInfoRequest",
{'tuId': cfg.tu_id},
)
parsed_payload = parser.parse_tu_leaks_info_msg(payload)
StepCheck("Проверка наличия сообщения об утечке типа TuLeaksInfoContent", "leaksInfo").actual(
parsed_payload.replyContent.leaksInfo
).is_not_empty()
with allure.step("Обработка сообщения об утечке типа TuLeaksInfoContent"):
tu_leaks_info_list = parsed_payload.replyContent.leaksInfo
first_leak_info = t_utils.find_leak_by_coordinate(tu_leaks_info_list, leak.coordinate_meters)
# Конвертируем время обнаружения в московское время
leak_detected_at = t_utils.ensure_moscow_timezone(first_leak_info.leakDetectedAt)
leak_wait_start_time, leak_wait_end_time = t_utils.get_leak_time_window(
imitator_start_time,
leak.leak_start_interval_seconds,
leak.allowed_time_diff_seconds,
detected_at_tz=leak_detected_at.tzinfo,
)
leak_volume_m3 = t_utils.convert_leak_volume_m3(first_leak_info.volume)
leak_coordinate_round = round(first_leak_info.leakCoordinate, cfg.precision)
with SoftAssertions() as soft_failures:
StepCheck("Проверка id полученного ТУ", "tu_id", soft_failures).actual(
parsed_payload.replyContent.tuId
).expected(cfg.tu_id).equal_to()
StepCheck("Проверка наличия id участка утечки", "controlledSiteId", soft_failures).actual(
first_leak_info.controlledSiteId
).is_not_none()
StepCheck("Проверка статуса СОУ", "ldsStatus", soft_failures).actual(first_leak_info.ldsStatus).expected(
leak.expected_lds_status
).equal_to()
StepCheck("Проверка маскирования утечки", "isMasked", soft_failures).actual(first_leak_info.isMasked).expected(
False
).equal_to()
StepCheck("Проверка наличия pipeId в сообщении", "pipeId", soft_failures).actual(
first_leak_info.pipeId
).is_not_none()
StepCheck("Проверка наличия id утечки", "id", soft_failures).actual(first_leak_info.id).is_not_none()
StepCheck("Проверка координаты утечки", "leakCoordinate", soft_failures).actual(
leak_coordinate_round
).is_close_to(
leak.coordinate_meters,
cfg.allowed_distance_diff_meters,
f"значение допустимой погрешности координаты {cfg.allowed_distance_diff_meters}",
)
StepCheck("Проверка времени обнаружения утечки", "leakDetectedAt", soft_failures).actual(
leak_detected_at
).is_between(leak_wait_start_time, leak_wait_end_time)
StepCheck("Проверка объема утечки", "volume", soft_failures).actual(leak_volume_m3).is_close_to(
leak.volume_m3,
leak.allowed_volume_m3,
f"значение допустимой погрешности по объему {leak.allowed_volume_m3}",
)
StepCheck("Проверка режима ТУ", "stationaryStatus", soft_failures).actual(
first_leak_info.stationaryStatus
).expected(leak.expected_stationary_status).equal_to()
async def lds_status_during_leak(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig):
"""
Проверка режима работы СОУ во время утечки.
"""
with allure.step("Подключение по ws, получение и обработка сообщения типа: CommonSchemeContent"):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"CommonSchemeContent",
"SubscribeCommonSchemeRequest",
{'tuId': cfg.tu_id, 'additionalProperties': None},
)
parsed_payload = parser.parse_common_scheme_info_msg(payload)
flow_areas = parsed_payload.replyContent.flowAreas
status_config = leak.lds_status_during_leak_config
if status_config is None:
pytest.fail("Не задан leak.lds_status_during_leak_config для теста lds_status_during_leak")
leak_diagnostic_area = t_utils.find_diagnostic_area_by_pipe_id(
flow_areas, status_config.leak_diagnostic_area_pipe_id
)
if not leak_diagnostic_area:
pytest.fail(f"В сообщении не найден ДУ с id: {status_config.leak_diagnostic_area_id}")
# Формат конфига: status_config.in_neighbors / status_config.out_neighbors (dict[id] = expected_status)
in_neighbors: dict[int, int] = status_config.in_neighbors or {}
out_neighbors: dict[int, int] = status_config.out_neighbors or {}
all_neighbors = in_neighbors | out_neighbors
if not all_neighbors:
pytest.fail("Не заданы id, соседних с утечкой ДУ для теста lds_status_during_leak")
found_diagnostic_area_count = 0
with SoftAssertions() as soft_failures:
StepCheck(
f"Проверка режима работы СОУ на ДУ с утечкой, pipe_id ДУ: {status_config.leak_diagnostic_area_pipe_id}",
"ldsStatus",
soft_failures,
).actual(leak_diagnostic_area.ldsStatus).expected(status_config.leak_du_expected_lds_status).equal_to()
# Проверки соседних ДУ: поддерживаются 0. N соседей
for neighbor_pipe_id, expected_status in sorted(all_neighbors.items()):
diagnostic_area = t_utils.find_diagnostic_area_by_pipe_id(flow_areas, neighbor_pipe_id)
if diagnostic_area:
found_diagnostic_area_count += 1
StepCheck(
f"Проверка режима работы СОУ на соседнем ДУ, pipe_id ДУ: {neighbor_pipe_id}",
"ldsStatus",
soft_failures,
).actual(diagnostic_area.ldsStatus).expected(expected_status).equal_to()
if found_diagnostic_area_count == 0:
pytest.fail(f"Не найдены соседние с утечкой ДУ по pipe_id: {list(all_neighbors.keys())}")
async def acknowledge_leak_info(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig = None):
"""
Проверка квитирования утечки.
Для multi-leak наборов: после квитирования проверяется что утечка удалена из списка.
Для single-leak наборов: проверяется что список утечек пуст.
"""
with allure.step("Получение id утечки"):
with allure.step("Подключение по ws, получение и обработка сообщения об утечке типа: TuLeaksInfoContent"):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"TuLeaksInfoContent",
"subscribeTuLeaksInfoRequest",
{'tuId': cfg.tu_id},
)
parsed_payload = parser.parse_tu_leaks_info_msg(payload)
with allure.step("Получение id утечки из принятого сообщения типа: TuLeaksInfoContent"):
StepCheck("Проверка наличия сообщения об утечке", "leaksInfo").actual(
parsed_payload.replyContent.leaksInfo
).is_not_empty()
leaks_info = parsed_payload.replyContent.leaksInfo
leak_to_ack = t_utils.find_leak_by_coordinate(leaks_info, leak.coordinate_meters)
acknowledged_leak_id = leak_to_ack.id
with allure.step(
"Подключение по ws, отправка сообщения и обработка ответа о квитировании утечки типа: AcknowledgeLeakRequest"
):
payload = await t_utils.connect_and_get_msg(
ws_client,
"AcknowledgeLeakRequest",
{'leakId': str(acknowledged_leak_id), 'tuId': cfg.tu_id, 'additionalProperties': None},
)
parsed_payload = parser.parse_acknowledge_leak_msg(payload)
acknowledge_reply_status = parsed_payload.replyStatus
with allure.step(
"Подключение по ws и получение сообщения об утечке типа: AllLeaksInfoContent для проверки квитирования"
):
with allure.step("Очистка очереди websocket сообщений"):
ws_client.clear_queue()
time.sleep(cfg.basic_message_timeout)
parsed_payload = await t_utils.connect_and_get_parsed_msg_by_tu_id(
cfg.tu_id,
ws_client,
"AllLeaksInfoContent",
"subscribeAllLeaksInfoRequest",
[],
)
remaining_leaks = parsed_payload.replyContent.leaksInfo
remaining_leak_ids = [leak.id for leak in remaining_leaks] if remaining_leaks else []
StepCheck("Проверка кода ответа на запрос о квитировании", "replyStatus").actual(acknowledge_reply_status).expected(
ReplyStatus.OK.value
).equal_to()
# Проверяем что квитированная утечка исчезла из списка
StepCheck("Проверка отсутствия квитированной утечки в списке AllLeaksInfo", "id").does_not_contain(
remaining_leak_ids, acknowledged_leak_id
)
async def acknowledge_leak_in_journal(ws_client, cfg: SmokeSuiteConfig, imitator_start_time):
"""
Проверка записи в журнале о квитировании утечки.
"""
with allure.step("Запрос сообщений журнала с фильтром userActions=LEAK_ACK"):
end_time = datetime.now()
request_body = t_utils.create_journal_req_body(
pagination=Pagination(limit=TestConst.JOURNAL_PAGINATION_LIMIT, direction=Direction.FIRST.value),
filtering=Filtering(userActions=int(UserActions.LEAK_ACK), objects=FilteringObjects(tuId=cfg.tu_id)),
)
payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
parsed_payload = parser.parse_journal_msg(payload)
messages_info = parsed_payload.replyContent.messagesInfo
StepCheck("Проверка наличия сообщений в журнале", "messagesInfo").actual(messages_info).is_not_empty()
with allure.step("Фильтрация сообщений по времени и technologicalSection"):
filter_start_msk = t_utils.localize_as_moscow(imitator_start_time)
filter_end_msk = t_utils.localize_as_moscow(end_time)
time_filtered = [
msg
for msg in messages_info
if filter_start_msk <= t_utils.ensure_moscow_timezone(msg.time) <= filter_end_msk
]
time_filtered.sort(key=lambda msg: t_utils.ensure_moscow_timezone(msg.time), reverse=True)
ack_message = next(
(
msg
for msg in time_filtered
if msg.technologicalSection == cfg.tu_name and msg.event == TestConst.JOURNAL_EVENT_LEAK_ACKNOWLEDGED
),
None,
)
allure.attach(
f"Всего получено сообщений: {len(messages_info)}\n"
f"После фильтрации по времени ({filter_start_msk} - {filter_end_msk}): {len(time_filtered)}\n"
f"Проверка: найдено ли сообщение с technologicalSection='{cfg.tu_name}' "
f"и event='{TestConst.JOURNAL_EVENT_LEAK_ACKNOWLEDGED}': {'True' if ack_message else 'False'}",
name="Результат фильтрации сообщений журнала",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step(
f"Проверка: найдено ли сообщение с technologicalSection='{cfg.tu_name}' "
f"и event='{TestConst.JOURNAL_EVENT_LEAK_ACKNOWLEDGED}'"
):
if ack_message is None:
pytest.fail(
f"Сообщение с technologicalSection='{cfg.tu_name}' "
f"и event='{TestConst.JOURNAL_EVENT_LEAK_ACKNOWLEDGED}' "
f"не найдено среди {len(time_filtered)} отфильтрованных по времени сообщений"
)
with allure.step("Проверка актуальности сообщения"):
msg_time_msk = t_utils.ensure_moscow_timezone(ack_message.time)
start_time_msk = t_utils.localize_as_moscow(imitator_start_time)
StepCheck(
"Проверка: время сообщения позднее времени старта имитатора",
"time",
).actual(
msg_time_msk > start_time_msk
).expected(True).equal_to()
with SoftAssertions() as soft_failures:
StepCheck("Проверка event", "event", soft_failures).actual(ack_message.event).expected(
TestConst.JOURNAL_EVENT_LEAK_ACKNOWLEDGED
).equal_to()
StepCheck("Проверка mainPipeline", "mainPipeline", soft_failures).actual(ack_message.mainPipeline).expected(
cfg.main_pipeline
).equal_to()
StepCheck("Проверка technologicalSection", "technologicalSection", soft_failures).actual(
ack_message.technologicalSection
).expected(cfg.tu_name).equal_to()
StepCheck("Проверка technologicalObject не пустой", "technologicalObject", soft_failures).actual(
ack_message.technologicalObject
).is_not_none()
async def output_signals(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig, imitator_start_time):
"""
Проверка наличия данных об утечке в выходных сигналах.
"""
linear_part_id = leak.linear_part_id
with allure.step(f"Получение списка выходных сигналов для линейного участка с id: {linear_part_id}"):
payload = await t_utils.connect_and_get_msg(
ws_client,
"GetOutputSignalsRequest",
{
'tuId': cfg.tu_id,
'filtering': None,
'search': None,
'sorting': None,
'additionalProperties': None,
},
)
parsed_payload = parser.parse_output_signals_msg(payload)
# Получение данных линейного участка утечки по id
leak_linear_part = t_utils.find_object_by_field(
parsed_payload.replyContent.linearPartSignals,
TestConst.LEAK_LINEAR_PART_ID_KEY,
linear_part_id,
)
with allure.step("Получение типов выходных сигналов из обработанных данных"):
leak_signals_list = leak_linear_part.signals
ack_leak_signal_type = t_utils.find_signal_type_by_address_suffix(
leak_signals_list, TestConst.ADDRESS_SUFFIX_ACK_LEAK
)
leak_signal_type = t_utils.find_signal_type_by_address_suffix(
leak_signals_list, TestConst.ADDRESS_SUFFIX_LEAK
)
mask_signal_type = t_utils.find_signal_type_by_address_suffix(
leak_signals_list, TestConst.ADDRESS_SUFFIX_MASK
)
point_leak_signal_type = t_utils.find_signal_type_by_address_suffix(
leak_signals_list, TestConst.ADDRESS_SUFFIX_POINT_LEAK
)
q_leak_signal_type = t_utils.find_signal_type_by_address_suffix(
leak_signals_list, TestConst.ADDRESS_SUFFIX_Q_LEAK
)
time_leak_signal_type = t_utils.find_signal_type_by_address_suffix(
leak_signals_list, TestConst.ADDRESS_SUFFIX_TIME_LEAK
)
with allure.step(f"Получение данных выходных сигналов для линейного участка с id: {linear_part_id}"):
with allure.step("Получение сообщения с данными выходных сигналов типа: OutputSignalsInfo"):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"OutputSignalsInfo",
"SubscribeOutputSignalsRequest",
{
'objects': {
'linearParts': [{'linearPartId': linear_part_id}],
'controlledSites': [],
},
'signalTypes': 1023,
'tuId': cfg.tu_id,
'additionalProperties': None,
},
)
parsed_payload = parser.parse_output_signals_info_msg(payload)
leak_linear_part = t_utils.find_object_by_field(
parsed_payload.replyContent.linearPartSignals,
TestConst.LEAK_LINEAR_PART_ID_KEY,
linear_part_id,
)
with allure.step("Обработка полученных данных выходных сигналов"):
leak_signals_list = leak_linear_part.signals
ack_leak_value = t_utils.find_signal_val_by_signal_type(leak_signals_list, ack_leak_signal_type)
leak_value = t_utils.find_signal_val_by_signal_type(leak_signals_list, leak_signal_type)
mask_leak_value = t_utils.find_signal_val_by_signal_type(leak_signals_list, mask_signal_type)
point_leak_value = t_utils.find_signal_val_by_signal_type(leak_signals_list, point_leak_signal_type)
q_leak_leak_value = t_utils.find_signal_val_by_signal_type(leak_signals_list, q_leak_signal_type)
time_leak_value = t_utils.find_signal_val_by_signal_type(leak_signals_list, time_leak_signal_type).strip()
StepCheck("Проверка наличия времени утечки", TestConst.ADDRESS_SUFFIX_TIME_LEAK).actual(
time_leak_value
).is_not_none()
time_leak_value_datetime = t_utils.to_moscow_timezone(time_leak_value)
leak_wait_start_time, leak_wait_end_time = t_utils.get_leak_time_window(
imitator_start_time,
leak.leak_start_interval_seconds,
leak.output_allowed_time_diff_seconds,
detected_at_tz=time_leak_value_datetime.tzinfo,
)
q_leak_value_m3 = t_utils.convert_leak_volume_m3(float(q_leak_leak_value))
point_leak_value_round = round(float(point_leak_value), cfg.precision)
with SoftAssertions() as soft_failures:
StepCheck("Проверка сигнала квитирования утечки", TestConst.ADDRESS_SUFFIX_ACK_LEAK, soft_failures).actual(
ack_leak_value
).expected(TestConst.OUTPUT_IS_ACK_LEAK).equal_to()
StepCheck("Проверка сигнала наличия утечки", TestConst.ADDRESS_SUFFIX_LEAK, soft_failures).actual(
leak_value
).expected(TestConst.OUTPUT_IS_LEAK).equal_to()
StepCheck("Проверка сигнала маскирования утечки", TestConst.ADDRESS_SUFFIX_MASK, soft_failures).actual(
mask_leak_value
).expected(TestConst.OUTPUT_IS_NOT_MASK).equal_to()
StepCheck("Проверка сигнала координаты утечки", TestConst.ADDRESS_SUFFIX_POINT_LEAK, soft_failures).actual(
point_leak_value_round
).is_close_to(
leak.coordinate_meters,
cfg.allowed_distance_diff_meters,
f"значение допустимой погрешности координаты {cfg.allowed_distance_diff_meters}",
)
StepCheck("Проверка сигнала объема утечки", TestConst.ADDRESS_SUFFIX_Q_LEAK, soft_failures).actual(
q_leak_value_m3
).is_close_to(
leak.volume_m3,
leak.allowed_volume_m3,
f"значение допустимой погрешности по объему {leak.allowed_volume_m3}",
)
StepCheck("Проверка времени обнаружения утечки", TestConst.ADDRESS_SUFFIX_TIME_LEAK, soft_failures).actual(
time_leak_value_datetime
).is_between(leak_wait_start_time, leak_wait_end_time)
async def balance_algorithm_leak_waiting(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig, imitator_start_time):
"""
Проверка подозрения утечки через BalanceAlgorithmResults
Логика:
- Подписка на BalanceAlgorithmResults однократно
- Раз в BALANCE_ALGORITHM_POLL_INTERVAL секунд забираем из очереди свежее сообщение
- Собираем все diagnosticAreas (только из flowAreas с непустым списком)
- Проверяем, что на ДУ с утечкой хотя бы раз пришёл isLeakPossible=True
- Проверяем, что на всех остальных ДУ isLeakPossible всегда False
- Проверяем дебаланс на ДУ с будущей утечкой, дебаланс должен быть выше значения порога - 20%
"""
poll_interval = TestConst.BALANCE_ALGORITHM_POLL_INTERVAL
total_wait = TestConst.BALANCE_ALGORITHM_TOTAL_WAIT
end_time = imitator_start_time + timedelta(
seconds=leak.balance_algorithm_leak_waiting_test.offset * 60 + total_wait
)
with allure.step(
f"Подписка и сбор BalanceAlgorithmResults раз в {poll_interval} с, в течение {total_wait} с после начала утечки"
):
await t_utils.connect(
ws_client,
"SubscribeBalanceAlgorithmResultsRequest",
{'tuId': cfg.tu_id, 'additionalProperties': None},
)
collected_diagnostic_areas = await t_utils.poll_balance_algorithm_diagnostic_areas(
ws_client,
parser,
imitator_start_time,
end_time,
poll_interval,
)
if collected_diagnostic_areas is not None:
allure.attach(
str(collected_diagnostic_areas),
name="Тестируемый фрагмент ответа с бэка",
attachment_type=allure.attachment_type.TEXT,
)
diagnostic_area_names_with_possible = [
diagnostic_area.name for diagnostic_area in collected_diagnostic_areas if diagnostic_area.isLeakPossible
]
diagnostic_area_possible_leak = next(
(diagnostic_area for diagnostic_area in collected_diagnostic_areas if diagnostic_area.isLeakDetected),
None,
)
is_leak_possible_seen = any(diagnostic_area.isLeakPossible for diagnostic_area in collected_diagnostic_areas)
with SoftAssertions() as soft_failures:
StepCheck(
"Проверка: получен хотя бы один ДУ с подозрением на утечку",
"isLeakPossible",
soft_failures,
).actual(diagnostic_area_names_with_possible).is_not_empty()
StepCheck(
f"Проверка: на ДУ {str(diagnostic_area_names_with_possible)} бы раз за "
f"{TestConst.BALANCE_ALGORITHM_TOTAL_WAIT / TestConst.SEC_PER_MIN} минут приходил"
" статус 'подозрение на утечку': isLeakPossible=True",
"isLeakPossible",
soft_failures,
).actual(is_leak_possible_seen).expected(True).equal_to()
if leak.flow_rate_settings_threshold is not None and diagnostic_area_possible_leak is not None:
threshold = leak.flow_rate_settings_threshold
tolerance = TestConst.DEBALANCE_TOLERANCE
lower_bound = threshold * (1 - tolerance)
StepCheck(
f"Проверка значения дебаланса на ДУ name={diagnostic_area_possible_leak.name} с будущей утечкой"
f" в пределах {int(tolerance * 100)}% снизу от порогового значения по объему: {threshold}).",
"debalance",
soft_failures,
).actual(abs(diagnostic_area_possible_leak.debalance)).is_greater_than(lower_bound)
async def balance_algorithm_leak_detected(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig):
"""
Проверка наличия утечки (isLeakDetected) через BalanceAlgorithmResults.
Логика:
- Подписка на BalanceAlgorithmResultsContent
- Получение первого подходящего сообщения типа BalanceAlgorithmResultsContent
- Проверяем, что на ДУ с утечкой isLeakDetected=True
- Проверяем, что на всех остальных ДУ isLeakDetected=False
- Проверяем, что дебаланс на ДУ с утечкой > FLOW_RATE_SETTINGS_THRESHOLD
"""
with allure.step("Подписка и получение BalanceAlgorithmResultsContent"):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"BalanceAlgorithmResultsContent",
"SubscribeBalanceAlgorithmResultsRequest",
{'tuId': cfg.tu_id, 'additionalProperties': None},
)
parsed_payload = parser.parse_balance_algorithm_msg(payload)
reply_content = parsed_payload.replyContent
if not reply_content or not reply_content.flowAreas:
pytest.fail(
"В ответе с бэка в DTO BalanceAlgorithmResults отсутствуют flowAreas, "
"невозможно проверить наличие утечки"
)
all_diagnostic_areas = []
for flow_area in reply_content.flowAreas:
if flow_area.diagnosticAreas:
all_diagnostic_areas.extend(flow_area.diagnosticAreas)
if not all_diagnostic_areas:
pytest.fail(
"В ответе с бэка в DTO BalanceAlgorithmResults во всех flowAreas отсутствуют diagnosticAreas, "
"невозможно проверить наличие утечки"
)
leak_diagnostic_area = next(
(diagnostic_area for diagnostic_area in all_diagnostic_areas if diagnostic_area.isLeakDetected),
None,
)
if leak_diagnostic_area is None:
pytest.fail("Ни одного ДУ с утечкой не найдено в ответе BalanceAlgorithmResultsContent")
leak_diagnostic_area_name = leak_diagnostic_area.name
with SoftAssertions() as soft_failures:
StepCheck(
f"Проверка: на ДУ name={leak_diagnostic_area_name} обнаружена утечка",
"isLeakDetected",
soft_failures,
).actual(leak_diagnostic_area.isLeakDetected).expected(True).equal_to()
foreign_with_detected = [
diagnostic_area
for diagnostic_area in all_diagnostic_areas
if diagnostic_area.name != leak_diagnostic_area_name and diagnostic_area.isLeakDetected
]
if not cfg.has_multiple_leaks:
StepCheck(
"Проверка: на остальных ДУ не обнаружена утечка, "
f" количество ДУ с неправильным статусом: {len(foreign_with_detected)}, "
f"их id: {[diagnostic_area.id for diagnostic_area in foreign_with_detected]})",
"isLeakDetected_without_leak",
soft_failures,
).actual(len(foreign_with_detected)).expected(0).equal_to()
if leak.flow_rate_settings_threshold is not None:
threshold = leak.flow_rate_settings_threshold
StepCheck(
f"Дебаланс на ДУ name={leak_diagnostic_area_name} по модулю больше порога для данного режима МТ:"
f" {threshold}",
"debalance",
soft_failures,
).actual(abs(leak_diagnostic_area.debalance)).is_greater_than(threshold)
async def balance_algorithm_leak_completed(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig):
"""
Проверка отсутствия утечки (isLeakDetected) через BalanceAlgorithmResults.
Логика:
- Подписка на BalanceAlgorithmResultsContent.
- Получение первого подходящего сообщения типа BalanceAlgorithmResultsContent.
- Проверяем, что на всех ДУ флаг isLeakDetected=False.
- Проверяем, что дебаланс на всех ДУ < FLOW_RATE_SETTINGS_THRESHOLD.
"""
with allure.step("Подписка и получение BalanceAlgorithmResultsContent"):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"BalanceAlgorithmResultsContent",
"SubscribeBalanceAlgorithmResultsRequest",
{'tuId': cfg.tu_id, 'additionalProperties': None},
)
parsed_payload = parser.parse_balance_algorithm_msg(payload)
reply_content = parsed_payload.replyContent
if not reply_content or not reply_content.flowAreas:
pytest.fail(
"В ответе с бэка в DTO BalanceAlgorithmResults отсутствуют flowAreas, "
"невозможно проверить наличие/отсутствие утечки"
)
all_diagnostic_areas = []
for flow_area in reply_content.flowAreas:
if flow_area.diagnosticAreas:
all_diagnostic_areas.extend(flow_area.diagnosticAreas)
if not all_diagnostic_areas:
pytest.fail(
"В ответе с бэка в DTO BalanceAlgorithmResults во всех flowAreas отсутствуют diagnosticAreas, "
"невозможно проверить наличие/отсутствие утечки"
)
with SoftAssertions() as soft_failures:
for diagnostic_area in all_diagnostic_areas:
diagnostic_area_name = diagnostic_area.name
StepCheck(
f"Проверка: на ДУ {diagnostic_area_name} не должно быть утечки",
"isLeakDetected_without_leak",
soft_failures,
).actual(diagnostic_area.isLeakDetected).expected(False).equal_to()
async def the_leak_is_complete_on_kg(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig):
"""
Проверка факта завершения утечки на ЭФ КГ(табличное представление).
Логика:
LeaksContent - проверить, что утечка в статусе завершена
"""
with allure.step("Подключение по ws и получение сообщения об утечке типа: LeaksContent"):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"LeaksContent",
"SubscribeLeaksRequest",
{'tuId': cfg.tu_id},
)
parsed_payload = parser.parse_leaks_content_msg(payload)
leaks_list_info = parsed_payload.replyContent.leaksListInfo
complete_leak_info = t_utils.find_leak_by_coordinate(leaks_list_info, leak.coordinate_meters)
leak_coordinate_round = round(complete_leak_info.leakCoordinate, cfg.precision)
complete_leak = t_utils.find_object_by_field(
leaks_list_info, "confirmationStatus", ConfirmationStatus.CONFIRMED_AND_LEAK_CLOSED.value
)
with SoftAssertions() as soft_failures:
StepCheck("Проверка статуса утечки в КГ - завершена", "confirmationStatus", soft_failures).actual(
complete_leak.confirmationStatus
).expected(leak.expected_complete_leak_status).equal_to()
StepCheck("Проверка наличия названия участка утечки", "diagnosticAreaName", soft_failures).actual(
complete_leak_info.diagnosticAreaName
).is_not_none()
StepCheck("Проверка источника события (алгоритм)", "type", soft_failures).actual(
complete_leak_info.type
).expected(leak.expected_algorithm_type).equal_to()
StepCheck("Проверка координаты утечки", "leakCoordinate", soft_failures).actual(
leak_coordinate_round
).is_close_to(
leak.coordinate_meters,
cfg.allowed_distance_diff_meters,
f"значение допустимой погрешности координаты {cfg.allowed_distance_diff_meters}",
)
async def leak_is_complete_in_output_signals(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig):
"""OutputSignalsInfo - нет утечки в выходных сигналах"""
linear_part_id = leak.linear_part_id
with allure.step(f"Получение списка выходных сигналов для линейного участка с id: {linear_part_id}"):
payload = await t_utils.connect_and_get_msg(
ws_client,
"GetOutputSignalsRequest",
{
'tuId': cfg.tu_id,
'filtering': None,
'search': None,
'sorting': None,
'additionalProperties': None,
},
)
parsed_payload = parser.parse_output_signals_msg(payload)
# Получение данных линейного участка утечки по id
leak_linear_part = t_utils.find_object_by_field(
parsed_payload.replyContent.linearPartSignals,
TestConst.LEAK_LINEAR_PART_ID_KEY,
linear_part_id,
)
with allure.step("Получение типов выходных сигналов из обработанных данных"):
leak_signals_list = leak_linear_part.signals
leak_signal_type = t_utils.find_signal_type_by_address_suffix(
leak_signals_list, TestConst.ADDRESS_SUFFIX_LEAK
)
with allure.step(f"Получение данных выходных сигналов для линейного участка с id: {linear_part_id}"):
with allure.step("Получение сообщения с данными выходных сигналов типа: OutputSignalsInfo"):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"OutputSignalsInfo",
"SubscribeOutputSignalsRequest",
{
'objects': {
'linearParts': [{'linearPartId': linear_part_id}],
'controlledSites': [],
},
'signalTypes': 1023,
'tuId': cfg.tu_id,
'additionalProperties': None,
},
)
parsed_payload = parser.parse_output_signals_info_msg(payload)
leak_linear_part = t_utils.find_object_by_field(
parsed_payload.replyContent.linearPartSignals,
TestConst.LEAK_LINEAR_PART_ID_KEY,
linear_part_id,
)
with allure.step("Обработка полученных данных выходных сигналов"):
leak_signals_list = leak_linear_part.signals
leak_value = t_utils.find_signal_val_by_signal_type(leak_signals_list, leak_signal_type)
with SoftAssertions() as soft_failures:
StepCheck(
"Проверка отсутствия времени утечки в выходных сигналах",
TestConst.ADDRESS_SUFFIX_TIME_LEAK,
soft_failures,
).actual(leak_value).expected(TestConst.OUTPUT_IS_NOT_LEAK).equal_to()
StepCheck(
"Проверка отсутствия утечки в выходных сигналах",
TestConst.ADDRESS_SUFFIX_LEAK,
soft_failures,
).actual(leak_value).expected(TestConst.OUTPUT_IS_NOT_LEAK).equal_to()
StepCheck(
"Проверка отсутствия квитирования утечки в выходных сигналах",
TestConst.ADDRESS_SUFFIX_ACK_LEAK,
soft_failures,
).actual(leak_value).expected(TestConst.OUTPUT_IS_NOT_LEAK).equal_to()
StepCheck(
"Проверка отсутствия объема утечки в выходных сигналах",
TestConst.ADDRESS_SUFFIX_Q_LEAK,
soft_failures,
).actual(leak_value).expected(TestConst.OUTPUT_IS_NOT_LEAK).equal_to()
StepCheck(
"Проверка отсутствия координаты утечки в выходных сигналах",
TestConst.ADDRESS_SUFFIX_POINT_LEAK,
soft_failures,
).actual(leak_value).expected(TestConst.OUTPUT_IS_NOT_LEAK).equal_to()
async def complete_tu_leaks_info_content(ws_client, cfg: SmokeSuiteConfig):
"""
TuLeaksInfoContent - проверка отсутствия утечки на схеме
"""
with allure.step("Подключение по ws, получение и обработка сообщения об утечке типа: TuLeaksInfoContent"):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"TuLeaksInfoContent",
"subscribeTuLeaksInfoRequest",
{'tuId': cfg.tu_id},
)
parsed_payload = parser.parse_tu_leaks_info_msg(payload)
leak_on_scheme = parsed_payload.replyContent.leaksInfo
StepCheck("Проверка отсутствия утечки на схеме", "leaksInfo").actual(leak_on_scheme).is_empty()
async def export_leaks_report(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig, imitator_start_time: datetime):
"""
Сценарий формирования отчёта об утечках.
Этапы:
1. Подписка SubscribeReportsDataExportedRequest на пуш-нотификации.
2. Отправка ExportReportsCommandRequest с фильтром по времени
(start = старт имитатора, end = старт имитатора + offset теста).
3. Ожидание пуш-нотификации ReportDataExportedNotification о готовности отчёта.
4. Лонг-поллинг GetExportedDataListRequest до появления нашего отчёта в списке.
5. Отправка DownloadExportedDataRequest по id отчёта.
6. Получение fileChunk по ответу на скачивание.
7-10. Проверки: формат файла, имя, шапка xlsx, строка утечки.
Скачанный файл удаляется по завершению, прикладывается к Allure только при падении теста.
"""
actual_report_state = ExportLeaksReportState()
with allure.step("Подготовка параметров сценария формирования отчёта об утечках"):
actual_report_state.report_test = leak.export_leaks_report_test
StepCheck("В конфигурации задан export_leaks_report_test", "export_leaks_report_test").actual(
actual_report_state.report_test
).is_not_none()
actual_report_state.period_start = t_utils.localize_as_moscow(imitator_start_time)
actual_report_state.period_end = t_utils.localize_as_moscow(
imitator_start_time + timedelta(minutes=actual_report_state.report_test.offset)
)
actual_report_state.period_start_naive = report_utils.normalize_report_period_naive(
actual_report_state.period_start
)
actual_report_state.period_end_naive = report_utils.normalize_report_period_naive(
actual_report_state.period_end
)
actual_report_state.expected_mt_mode = ReportConst.STATIONARY_STATUS_TO_REPORT_TEXT.get(
leak.expected_stationary_status
)
actual_report_state.tu_description_lower = cfg.technological_unit.description.lower()
time_offset_hours = t_utils.report_time_offset_hours()
StepCheck(
f"Смещение timeOffset для запросов отчёта (часовой пояс {TestConst.ZONE_INFO})",
"time_offset_hours",
).actual(time_offset_hours).is_not_none()
actual_report_state.time_offset_hours = time_offset_hours
StepCheck(
"Задан ожидаемый текст режима МТ для отчёта",
"expected_mt_mode",
).actual(actual_report_state.expected_mt_mode).is_not_none()
allure.attach(
f"period.start={actual_report_state.period_start}\n"
f"period.end={actual_report_state.period_end}\n"
f"offset_minutes={actual_report_state.report_test.offset}",
name="Фильтр периода отчёта",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step(f"Этап 1. Подписка на пуш-нотификации ({ReportConst.SUBSCRIBE_REPORTS_DATA_EXPORTED_REQUEST})"):
await t_utils.connect(ws_client, ReportConst.SUBSCRIBE_REPORTS_DATA_EXPORTED_REQUEST, [])
with allure.step(f"Этап 2. Запрос формирования отчёта ({ReportConst.EXPORT_REPORTS_COMMAND_REQUEST})"):
request_payload = {
"tuId": cfg.tu_id,
"exportedDataTypes": [ExportedDataType.LEAKS_REPORT.value],
"timeOffset": actual_report_state.time_offset_hours,
"period": {
"start": t_utils.datetime_to_msgpack_timestamp(actual_report_state.period_start),
"end": t_utils.datetime_to_msgpack_timestamp(actual_report_state.period_end),
"additionalProperties": {},
},
}
await t_utils.connect(ws_client, ReportConst.EXPORT_REPORTS_COMMAND_REQUEST, request_payload)
with allure.step(
f"Этап 3. Ожидание пуш-нотификации {ReportConst.REPORT_DATA_EXPORTED_NOTIFICATION} о готовности отчёта"
):
actual_report_state.notification = await t_utils.poll_for_report_export_notification(
ws_client=ws_client,
parser=parser,
total_wait_seconds=ReportConst.NOTIFICATION_TIMEOUT_SECONDS,
poll_interval_seconds=ReportConst.LIST_POLL_INTERVAL_SECONDS,
)
with allure.step("Извлечение полей пуш-нотификации"):
notification = actual_report_state.notification
notification_reply_status = notification.replyStatus if notification else None
notification_reply_content = notification.replyContent if notification else None
notification_export_status = notification_reply_content.exportStatus if notification_reply_content else None
notification_error_message = (
(notification_reply_content.errorMessage or "") if notification_reply_content else ""
)
with allure.step(f"Этап 4. Лонг-поллинг {ReportConst.GET_EXPORTED_DATA_LIST_REQUEST} до появления отчёта в списке"):
actual_report_state.report_item = await t_utils.poll_for_exported_file(
ws_client=ws_client,
parser=parser,
list_limit=ReportConst.EXPORTED_DATA_LIST_LIMIT,
expected_data_type=ExportedDataType.LEAKS_REPORT,
name_substring=ReportConst.LEAKS_REPORT_NAME_PART,
tu_name_substring=cfg.technological_unit.description,
period_start=actual_report_state.period_start,
period_end=actual_report_state.period_end,
total_wait_seconds=ReportConst.LIST_POLL_TOTAL_WAIT_SECONDS,
poll_interval_seconds=ReportConst.LIST_POLL_INTERVAL_SECONDS,
)
with allure.step("Подготовка данных найденного отчёта в списке"):
report_item = actual_report_state.report_item
if report_item is not None:
allure.attach(
f"id={report_item.id}, name={report_item.name}, "
f"exportedDataType={report_item.exportedDataType}, "
f"start={t_utils.format_datetime_moscow(report_item.start)}, "
f"end={t_utils.format_datetime_moscow(report_item.end)}",
name="Найденный отчёт в списке",
attachment_type=allure.attachment_type.TEXT,
)
actual_report_state.report_file_name = report_utils.build_export_report_file_name(
cfg.technological_unit.description,
actual_report_state.period_start,
actual_report_state.period_end,
)
with allure.step("Проверка: отчёт найден в списке сформированных файлов"):
StepCheck("Отчёт найден в списке сформированных файлов", "report_item").actual(
actual_report_state.report_item
).is_not_none()
with allure.step(
f"Этап 5. Streaming-вызов {ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST} по "
f"id={actual_report_state.report_item.id}"
):
download_request = {
"exportedDataId": actual_report_state.report_item.id,
"exportedDataType": ExportedDataType.LEAKS_REPORT.to_download_name(),
"additionalProperties": None,
"timeOffset": actual_report_state.time_offset_hours,
}
download_purpose = (
f"скачивание xlsx-отчёта об утечках (exportedDataId={actual_report_state.report_item.id}) "
f"после формирования отчёта и выбора файла в списке GetExportedDataListRequest - "
f"выпадашка уведомлений на UI"
)
await t_utils.connect_stream(
ws_client,
ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST,
download_request,
purpose=download_purpose,
)
actual_report_state.download_invocation_id = ws_client.invocation_id
with allure.step("Этап 6. Получение fileChunk - скачивание отчёта по утечкам"):
actual_report_state.download_reply = await t_utils.receive_download_exported_data_reply(
ws_client=ws_client,
parser=parser,
invocation_id=actual_report_state.download_invocation_id,
request_name=ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST,
total_wait_seconds=ReportConst.DOWNLOAD_TIMEOUT_SECONDS,
purpose=download_purpose,
)
with allure.step("Извлечение данных ответа на скачивание"):
download_reply = actual_report_state.download_reply
download_reply_status = download_reply.replyStatus
has_download_reply_content = download_reply.replyContent is not None
actual_report_state.file_bytes = download_reply.replyContent.fileChunk if has_download_reply_content else None
is_xlsx_signature = (
report_utils.is_xlsx_file_bytes(actual_report_state.file_bytes) if actual_report_state.file_bytes else False
)
with allure.step("Проверка ответа на скачивание и формата xlsx"):
StepCheck("Проверка статуса ответа на скачивание", "replyStatus").actual(download_reply_status).expected(
ReplyStatus.OK.value
).equal_to()
StepCheck("Проверка наличия контента ответа на скачивание", "replyContent").actual(
has_download_reply_content
).expected(True).equal_to()
StepCheck("Проверка наличия байт файла", "fileChunk").actual(actual_report_state.file_bytes).is_not_empty()
StepCheck("Проверка xlsx (zip) сигнатуры файла", "file_signature").actual(is_xlsx_signature).expected(
True
).equal_to()
with allure.step("Подготовка данных для проверки имени файла отчёта"):
report_file_name = actual_report_state.report_file_name
report_file_name_lower = report_file_name.lower()
file_name_period_start, file_name_period_end = report_utils.parse_period_from_export_file_name(report_file_name)
period_start_lo, period_start_hi, period_end_lo, period_end_hi = report_utils.report_period_comparison_bounds(
actual_report_state.period_start_naive,
actual_report_state.period_end_naive,
)
has_xlsx_extension = report_utils.is_xlsx_extension(report_file_name)
leaks_report_name_part_lower = ReportConst.LEAKS_REPORT_NAME_PART.lower()
with allure.step("Этап 8. Сохранение, обработка и проверка отчета по утечкам"):
actual_report_state.temp_file_path = report_utils.save_report_bytes_to_temp_file(actual_report_state.file_bytes)
try:
with allure.step("Проверка: временный xlsx файл создан"):
StepCheck("Временный xlsx файл создан", "temp_file_path").actual(
actual_report_state.temp_file_path
).is_not_none()
with allure.step("Этап 9. Открытие xlsx и чтение шапки"):
actual_report_state.worksheet = report_utils.load_report_worksheet(actual_report_state.temp_file_path)
actual_report_state.title_info = report_utils.parse_report_title(
report_utils.get_report_title_cell(actual_report_state.worksheet)
)
allure.attach(
f"Шапка отчёта (raw): {actual_report_state.title_info.raw_title}\n"
f"period_start: {actual_report_state.title_info.period_start}\n"
f"period_end: {actual_report_state.title_info.period_end}",
name="Шапка отчёта (1-я строка)",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step("Подготовка данных шапки отчёта для проверки"):
title_info = actual_report_state.title_info
report_title_lower = title_info.raw_title.lower()
leaks_report_name_part_lower = ReportConst.LEAKS_REPORT_NAME_PART.lower()
column_headers = report_utils.get_report_column_headers(actual_report_state.worksheet)
period_start_lo, period_start_hi, period_end_lo, period_end_hi = (
report_utils.report_period_comparison_bounds(
actual_report_state.period_start_naive,
actual_report_state.period_end_naive,
)
)
header_period_start = title_info.period_start
header_period_end = title_info.period_end
with allure.step("Этап 10. Извлечение строк данных из отчёта"):
actual_report_state.data_rows = report_utils.iter_report_data_rows(actual_report_state.worksheet)
actual_report_state.target_row = report_utils.find_row_with_object(
actual_report_state.data_rows, cfg.technological_unit.description
)
allure.attach(
"\n".join(f"row#{row.row_index}: {row.cells}" for row in actual_report_state.data_rows),
name="Все строки данных отчёта",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step("Подготовка данных строки утечки для проверки"):
target_row = actual_report_state.target_row
leak_datetime_value = target_row.datetime_value if target_row else None
object_value_lower = target_row.object_value.lower() if target_row else ""
lds_status_value = target_row.lds_status.strip() if target_row else ""
masking_info_lower = target_row.masking_info.lower() if target_row else ""
leak_coordinate_meters = target_row.coordinate_meters if target_row else None
leak_volume_value = target_row.leak_volume if target_row else None
mt_mode_lower = target_row.mt_mode.lower() if target_row else ""
expected_mt_mode_lower = actual_report_state.expected_mt_mode.lower()
masking_not_masked_lower = ReportConst.MASKING_NOT_MASKED_TEXT.lower()
period_start_lo, period_start_hi, period_end_lo, period_end_hi = (
report_utils.report_period_comparison_bounds(
actual_report_state.period_start_naive,
actual_report_state.period_end_naive,
)
)
with allure.step("Проверка содержимого строки утечки"):
StepCheck("В отчёте есть хотя бы одна строка с данными", "data_rows").actual(
actual_report_state.data_rows
).is_not_empty()
StepCheck(
f"Строка с объектом, содержащим '{cfg.technological_unit.description}'",
ReportConst.COL_OBJECT,
).actual(actual_report_state.target_row).is_not_none()
with SoftAssertions() as soft_failures:
StepCheck(
"Время утечки в диапазоне [старт имитатора, старт + offset теста] (+-1 мин)",
ReportConst.COL_DATETIME,
soft_failures,
).actual(leak_datetime_value).is_between(period_start_lo, period_end_hi)
StepCheck(
f"Колонка '{ReportConst.COL_OBJECT}' содержит '{cfg.technological_unit.description}'",
ReportConst.COL_OBJECT,
soft_failures,
).contains(object_value_lower, actual_report_state.tu_description_lower)
StepCheck(
f"Колонка '{ReportConst.COL_LDS_STATUS}'",
ReportConst.COL_LDS_STATUS,
soft_failures,
).actual(
lds_status_value
).expected(ReportConst.LDS_STATUS_OK_TEXT).equal_to()
StepCheck(
f"Колонка '{ReportConst.COL_MASK_INFO}' содержит '{ReportConst.MASKING_NOT_MASKED_TEXT}'",
ReportConst.COL_MASK_INFO,
soft_failures,
).contains(masking_info_lower, masking_not_masked_lower)
StepCheck(
f"Колонка '{ReportConst.COL_COORDINATE}' (с погрешностью {cfg.allowed_distance_diff_meters} м)",
ReportConst.COL_COORDINATE,
soft_failures,
).actual(leak_coordinate_meters).is_close_to(
leak.coordinate_meters,
cfg.allowed_distance_diff_meters,
f"значение допустимой погрешности координаты {cfg.allowed_distance_diff_meters}",
)
StepCheck(
f"Колонка '{ReportConst.COL_LEAK_VOLUME}' не пустая",
ReportConst.COL_LEAK_VOLUME,
soft_failures,
).actual(leak_volume_value).is_not_none()
StepCheck(
f"Колонка '{ReportConst.COL_MT_MODE}' содержит '{actual_report_state.expected_mt_mode}'",
ReportConst.COL_MT_MODE,
soft_failures,
).contains(mt_mode_lower, expected_mt_mode_lower)
except Exception:
with allure.step("Прикрепление xlsx отчёта к Allure при падении теста"):
if actual_report_state.temp_file_path and actual_report_state.report_file_name:
report_utils.attach_report_file_to_allure(
actual_report_state.temp_file_path, actual_report_state.report_file_name
)
raise
finally:
with allure.step("Удаление временного xlsx файла"):
temp_path = actual_report_state.temp_file_path
if temp_path is not None:
try:
temp_path.unlink(missing_ok=True)
except OSError:
pass
with allure.step("Проверка имени файла отчёта"):
with SoftAssertions() as soft_failures:
StepCheck(f"Имя файла оканчивается на {ReportConst.XLSX_EXTENSION}", "file_name", soft_failures).actual(
has_xlsx_extension
).expected(True).equal_to()
StepCheck(
f"Имя файла содержит '{ReportConst.LEAKS_REPORT_NAME_PART}'", "file_name", soft_failures
).contains(report_file_name_lower, leaks_report_name_part_lower)
StepCheck(
f"Имя файла содержит описание ТУ '{cfg.technological_unit.description}'", "file_name", soft_failures
).contains(report_file_name_lower, actual_report_state.tu_description_lower)
StepCheck(
"Дата начала периода в имени файла совпадает с фильтром запроса (+-1 мин)",
"period_start_in_file_name",
soft_failures,
).actual(file_name_period_start).is_between(period_start_lo, period_start_hi)
StepCheck(
"Дата конца периода в имени файла совпадает с фильтром запроса (+-1 мин)",
"period_end_in_file_name",
soft_failures,
).actual(file_name_period_end).is_between(period_end_lo, period_end_hi)
with allure.step("Проверка двойной шапки отчёта"):
StepCheck("Лист xlsx открыт", "worksheet").actual(actual_report_state.worksheet).is_not_none()
with SoftAssertions() as soft_failures:
StepCheck(
f"В шапке отчёта присутствует '{ReportConst.LEAKS_REPORT_NAME_PART}'",
"report_title",
soft_failures,
).contains(report_title_lower, leaks_report_name_part_lower)
StepCheck(
"Время начала периода в шапке совпадает с фильтром запроса (+-1 мин)",
"period_start",
soft_failures,
).actual(header_period_start).is_between(period_start_lo, period_start_hi)
StepCheck(
"Время конца периода в шапке совпадает с фильтром запроса (+-1 мин)",
"period_end",
soft_failures,
).actual(header_period_end).is_between(period_end_lo, period_end_hi)
StepCheck(
"Названия колонок в шапке отчёта",
"column_headers",
soft_failures,
).actual(
column_headers
).expected(ReportConst.EXPECTED_COLUMN_HEADERS).equal_to()
with allure.step("Проверка пуш-нотификации о готовности отчёта"):
with SoftAssertions() as soft_failures:
StepCheck("Получена пуш-нотификация о готовности отчёта", "notification", soft_failures).actual(
actual_report_state.notification
).is_not_none()
StepCheck("Проверка статуса пуш-нотификации", "replyStatus", soft_failures).actual(
notification_reply_status
).expected(ReplyStatus.OK.value).equal_to()
StepCheck("Проверка наличия контента нотификации", "replyContent", soft_failures).actual(
notification_reply_content
).is_not_none()
StepCheck("Проверка exportStatus в нотификации", "exportStatus", soft_failures).actual(
notification_export_status
).expected(ExportStatus.DONE).equal_to()
StepCheck("В нотификации нет текста ошибки", "errorMessage", soft_failures).actual(
notification_error_message
).is_empty()
rep xlxs
"""
Утилиты для разбора xlsx-отчётов и проверки их формата.
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional
import allure
from openpyxl import load_workbook
from openpyxl.worksheet.worksheet import Worksheet
from constants.test_constants import BaseTN3Constants as TestConst
from constants.test_constants import ExportReportConstants as ReportConst
from utils.helpers.ws_test_utils import extract_first_number, localize_as_moscow
@dataclass
class ReportTitleInfo:
"""Разобранная шапка отчёта"""
raw_title: str
period_start: Optional[datetime] = None
period_end: Optional[datetime] = None
@dataclass
class LeakReportRow:
"""Разобранная строка данных по утечке"""
row_index: int
cells: Dict[str, str] = field(default_factory=dict)
@property
def datetime_value(self) -> Optional[datetime]:
return parse_report_datetime(self.cells.get(ReportConst.COL_DATETIME))
@property
def object_value(self) -> str:
return self.cells.get(ReportConst.COL_OBJECT, "")
@property
def lds_status(self) -> str:
return self.cells.get(ReportConst.COL_LDS_STATUS, "")
@property
def masking_info(self) -> str:
return self.cells.get(ReportConst.COL_MASK_INFO, "")
@property
def coordinate_meters(self) -> Optional[float]:
coordinate_km = extract_first_number(self.cells.get(ReportConst.COL_COORDINATE))
if coordinate_km is None:
return None
return coordinate_km * TestConst.KM_TO_METERS
@property
def leak_volume(self) -> Optional[float]:
return extract_first_number(self.cells.get(ReportConst.COL_LEAK_VOLUME))
@property
def mt_mode(self) -> str:
return self.cells.get(ReportConst.COL_MT_MODE, "")
def is_xlsx_file_bytes(file_bytes: Optional[bytes]) -> bool:
"""Проверяет zip-сигнатуру xlsx"""
if not file_bytes:
return False
return file_bytes.startswith(ReportConst.ZIP_SIGNATURE)
def is_xlsx_extension(file_name: str) -> bool:
"""Проверяет расширение .xlsx без учёта регистра."""
return file_name.lower().endswith(ReportConst.XLSX_EXTENSION)
def parse_report_datetime(value: object) -> Optional[datetime]:
"""Парсит дату/время из ячейки отчёта."""
if value is None:
return None
if isinstance(value, datetime):
return value
if isinstance(value, str):
try:
return datetime.strptime(value.strip(), ReportConst.REPORT_DATETIME_FORMAT)
except ValueError:
return None
return None
def _stringify_cell(value: object) -> str:
if value is None:
return ""
if isinstance(value, datetime):
return value.strftime(ReportConst.REPORT_DATETIME_FORMAT)
return str(value)
def normalize_report_period_naive(value: datetime) -> datetime:
"""Московское время без tzinfo и микросекунд - для сравнения периодов в отчёте."""
return localize_as_moscow(value).replace(microsecond=0, tzinfo=None)
def report_period_comparison_bounds(
period_start: datetime,
period_end: datetime,
tolerance_minutes: int = ReportConst.REPORT_PERIOD_TOLERANCE_MINUTES,
) -> tuple[datetime, datetime, datetime, datetime]:
"""
Границы периода с допуском +-tolerance_minutes для start и end отдельно.
Возвращает (start_lower, start_upper, end_lower, end_upper).
"""
start = normalize_report_period_naive(period_start)
end = normalize_report_period_naive(period_end)
delta = timedelta(minutes=tolerance_minutes)
return start - delta, start + delta, end - delta, end + delta
def build_export_report_file_name(
tu_description: str,
period_start: datetime,
period_end: datetime,
) -> str:
"""
Имя xlsx при скачивании: «Отчет об утечках Тихорецк-Новороссийск-3 DD.MM.YYYY HH_MM_SS - DD.MM.YYYY HH_MM_SS.xlsx».
"""
start_text = normalize_report_period_naive(period_start).strftime(ReportConst.REPORT_FILE_NAME_DATETIME_FORMAT)
end_text = normalize_report_period_naive(period_end).strftime(ReportConst.REPORT_FILE_NAME_DATETIME_FORMAT)
return (
f"{ReportConst.LEAKS_REPORT_NAME_PART} {tu_description} {start_text} - {end_text}"
f"{ReportConst.XLSX_EXTENSION}"
)
def parse_period_from_export_file_name(file_name: str) -> tuple[Optional[datetime], Optional[datetime]]:
"""Извлекает границы периода из имени скачанного xlsx-файла."""
match = re.search(ReportConst.REPORT_FILE_NAME_PERIOD_PATTERN, file_name.strip(), re.IGNORECASE)
if match is None:
return None, None
parse_format = ReportConst.REPORT_FILE_NAME_DATETIME_FORMAT.replace("_", ":")
def _parse_part(value: str) -> Optional[datetime]:
try:
return datetime.strptime(value.replace("_", ":"), parse_format)
except ValueError:
return None
return _parse_part(match.group("period_start")), _parse_part(match.group("period_end"))
def parse_report_title(title_raw: object) -> ReportTitleInfo:
"""
Парсит шапку отчёта с именованными группами period_start/period_end.
"""
title_str = _stringify_cell(title_raw)
match = re.search(ReportConst.REPORT_HEADER_PERIOD_PATTERN, title_str)
if match is None:
return ReportTitleInfo(raw_title=title_str)
return ReportTitleInfo(
raw_title=title_str,
period_start=parse_report_datetime(match.group("period_start")),
period_end=parse_report_datetime(match.group("period_end")),
)
def load_report_worksheet(file_path: Path) -> Optional[Worksheet]:
"""Открывает первый лист xlsx. При ошибке возвращает None."""
if not file_path.exists():
return None
try:
workbook = load_workbook(filename=str(file_path), read_only=True, data_only=True)
except Exception:
return None
sheet_names = workbook.sheetnames
if not sheet_names:
return None
return workbook[sheet_names[ReportConst.DEFAULT_SHEET_INDEX]]
def get_report_title_cell(worksheet: Worksheet) -> object:
return worksheet.cell(row=ReportConst.REPORT_TITLE_ROW, column=1).value
def get_report_column_headers(worksheet: Worksheet) -> List[str]:
"""Возвращает непустые заголовки колонок из строки REPORT_COLUMN_HEADERS_ROW."""
headers: List[str] = []
column_index = 1
while True:
cell_value = worksheet.cell(row=ReportConst.REPORT_COLUMN_HEADERS_ROW, column=column_index).value
if cell_value is None or not str(cell_value).strip():
break
headers.append(_stringify_cell(cell_value).strip())
column_index += 1
return headers
def build_column_cells(row_values: tuple, headers: List[str]) -> Dict[str, str]:
"""Собирает словарь {название колонки: значение ячейки} по строке данных."""
return {
header: _stringify_cell(row_values[column_index]) if column_index < len(row_values) else ""
for column_index, header in enumerate(headers)
}
def iter_report_data_rows(worksheet: Worksheet) -> List[LeakReportRow]:
"""
Возвращает строки данных по утечкам, начиная с REPORT_DATA_FIRST_ROW.
Пустые строки пропускаются.
"""
headers = get_report_column_headers(worksheet)
if not headers:
return []
rows: List[LeakReportRow] = []
for excel_row_index, row_values in enumerate(
worksheet.iter_rows(
min_row=ReportConst.REPORT_DATA_FIRST_ROW,
max_col=len(headers),
values_only=True,
),
start=ReportConst.REPORT_DATA_FIRST_ROW,
):
if not any(cell is not None and str(cell).strip() for cell in row_values):
continue
rows.append(
LeakReportRow(
row_index=excel_row_index,
cells=build_column_cells(row_values, headers),
)
)
return rows
def find_row_with_object(rows: List[LeakReportRow], object_substring: str) -> Optional[LeakReportRow]:
"""Ищет первую строку, где колонка 'Объект' содержит подстроку без учёта регистра"""
substring_lower = object_substring.lower()
for row in rows:
if substring_lower in row.object_value.lower():
return row
return None
def save_report_bytes_to_temp_file(file_bytes: bytes) -> Optional[Path]:
"""Сохраняет байты отчёта во временный xlsx-файл. При ошибке возвращает None."""
import tempfile
try:
with tempfile.NamedTemporaryFile(
suffix=ReportConst.XLSX_EXTENSION,
prefix="leaks_report_",
delete=False,
) as temp_file:
temp_file.write(file_bytes)
return Path(temp_file.name)
except OSError:
return None
def attach_report_file_to_allure(file_path: Path, file_name: str) -> None:
"""Прикладывает xlsx к Allure при падении теста"""
try:
xlsx_type = allure.attachment_type.XLSX
except AttributeError:
xlsx_type = None
if xlsx_type is not None:
allure.attach.file(
str(file_path),
name=file_name,
attachment_type=xlsx_type,
extension="xlsx",
)
return
try:
with file_path.open("rb") as raw_file:
allure.attach(raw_file.read(), name=file_name, extension="xlsx")
except OSError:
pass
ws test
from __future__ import annotations
import asyncio
import pprint
import random
import re
from dataclasses import asdict, is_dataclass
from datetime import datetime, timedelta, timezone
from enum import IntEnum, IntFlag
from typing import Any, List, Optional, Set, Type, TypeVar
from zoneinfo import ZoneInfo
import allure
from msgpack import Timestamp as MsgpackTimestamp
from pytest import fail
from clients.websocket_client import WebSocketClient
from constants.architecture_constants import WebSocketClientConstants as WS_Const
from constants.enums import (
ConfirmationStatus,
ExportStatus,
LdsStatus,
LdsStatusDegradation,
LdsStatusFaulty,
LdsStatusInitialization,
LeakStatus,
ReplyStatus,
StationaryReason,
StationaryStatus,
StoppedPumpingReason,
UnStationaryReason,
)
from constants.test_constants import BaseTN3Constants as TestConst
from constants.test_constants import ExportReportConstants as ReportConst
from models.export_reports_model import ReportDataExportedContent, ReportDataExportedNotification
from models.get_messages_model import GetMessagesRequest
from models.subscribe_all_leaks_info_model import SubscribeAllLeaksInfoReply
from models.subscribe_common_scheme_model import DiagnosticArea, FlowArea
from models.subscribe_leaks_model import Leak
from models.subscribe_main_page_info_model import MainPageLeakInfo
from utils.helpers.ws_message_parser import ws_message_parser
from utils.msgpack_utils.message_filters import is_desired_invocation_id, is_desired_type
ObjectType = TypeVar("ObjectType") # создает типовую переменную для поиска объектов в списке
RandomObjectType = TypeVar("RandomObjectType")
def convert_leak_volume_m3(volume: float) -> float:
"""
Преобразует объем утечки в м3/час
"""
# Округляет результат для читабельности
return round(volume * TestConst.MASS_KG, 3)
def datetime_minus_seconds(datetime_obj: datetime, delta_s: int) -> datetime:
"""
Вычитает время в секундах из datetime
"""
return (datetime_obj - timedelta(seconds=delta_s)).replace(microsecond=0)
def calculate_leak_start_time(imitator_start_time: datetime, leak_interval_seconds: int) -> datetime:
"""
Рассчитывает время начала утечки на основе времени старта имитатора.
:param imitator_start_time: datetime объект времени старта имитатора
:param leak_interval_seconds: интервал от старта до утечки в секундах (LEAK_START_INTERVAL)
:return: datetime время ожидаемого начала утечки
"""
if not imitator_start_time:
fail("Пришло пустое значение imitator_start_time")
return (imitator_start_time + timedelta(seconds=leak_interval_seconds)).replace(microsecond=0)
def calculate_leak_end_time(
imitator_start_time: datetime, leak_interval_seconds: int, allowed_diff_seconds: int
) -> datetime:
"""
Рассчитывает крайнее время обнаружения утечки (с учётом допустимой погрешности).
:param imitator_start_time: datetime объект времени старта имитатора
:param leak_interval_seconds: интервал от старта до утечки в секундах (LEAK_START_INTERVAL)
:param allowed_diff_seconds: допустимая погрешность времени обнаружения (ALLOWED_TIME_DIFF_SECONDS)
:return: datetime крайнее время обнаружения утечки
"""
if not imitator_start_time:
fail("Пришло пустое значение imitator_start_time")
total_seconds = leak_interval_seconds + allowed_diff_seconds
return (imitator_start_time + timedelta(seconds=total_seconds)).replace(microsecond=0)
def get_leak_time_window(
imitator_start_time: datetime, leak_interval_seconds: int, allowed_diff_seconds: int, detected_at_tz=None
) -> tuple[datetime, datetime]:
"""
Возвращает временное окно для проверки времени обнаружения утечки.
:param imitator_start_time: datetime объект времени старта имитатора
:param leak_interval_seconds: интервал от старта до утечки в секундах
:param allowed_diff_seconds: допустимая погрешность времени обнаружения
:param detected_at_tz: timezone из времени обнаружения утечки (опционально)
:return: tuple (leak_start_time, leak_end_time) для использования в is_between проверке
"""
leak_start = calculate_leak_start_time(imitator_start_time, leak_interval_seconds)
leak_end = calculate_leak_end_time(imitator_start_time, leak_interval_seconds, allowed_diff_seconds)
# Если передан timezone, применяем его к временам для корректного сравнения
if detected_at_tz:
leak_start = leak_start.replace(tzinfo=detected_at_tz)
leak_end = leak_end.replace(tzinfo=detected_at_tz)
return leak_start, leak_end
def ensure_moscow_timezone(input_datetime: datetime) -> None | datetime:
"""
Конвертирует datetime в московское время, если оно не в московской таймзоне.
:param input_datetime: datetime объект
:return: datetime в московской таймзоне
"""
if input_datetime is None:
return input_datetime
# Если datetime без timezone - считаем что это UTC
if input_datetime.tzinfo is None:
input_datetime = input_datetime.replace(tzinfo=timezone.utc)
# Конвертируем в московское время
return input_datetime.astimezone(ZoneInfo(TestConst.ZONE_INFO))
def report_time_offset_hours(tz_name: str = TestConst.ZONE_INFO) -> Optional[int]:
"""
Смещение часового пояса (часы от UTC) для поля timeOffset в запросах отчётов.
"""
now = datetime.now(ZoneInfo(tz_name))
utc_offset = now.utcoffset()
if utc_offset is None:
return None
return int(utc_offset.total_seconds() // TestConst.SECONDS_PER_HOUR)
def localize_as_moscow(input_datetime: datetime) -> None | datetime:
"""
Присваивает datetime московский часовой пояс без сдвига времени.
Если datetime уже имеет timezone - конвертирует в московское время.
"""
if input_datetime is None:
return input_datetime
moscow_tz = ZoneInfo(TestConst.ZONE_INFO)
if input_datetime.tzinfo is None:
return input_datetime.replace(tzinfo=moscow_tz)
return input_datetime.astimezone(moscow_tz)
def format_datetime_moscow(value: Optional[datetime]) -> str:
"""Строковое представление datetime в Europe/Moscow для вложений Allure."""
if value is None:
return "None"
return str(localize_as_moscow(value))
def get_rejection_time_window(
imitator_start_time: datetime,
start_seconds: int | float,
reserve_seconds: int | float = 0,
) -> tuple[datetime, datetime]:
"""
Возвращает временное окно для проверки сообщения об отбраковке.
"""
imitator_msk = localize_as_moscow(imitator_start_time)
range_start = imitator_msk + timedelta(seconds=start_seconds - reserve_seconds)
range_end = localize_as_moscow(datetime.now())
return range_start, range_end
def find_rejection_journal_message(
messages_info: List[ObjectType],
tag: str,
range_start: datetime,
range_end: datetime,
technological_section: str,
expected_event: str,
) -> tuple[list[ObjectType], ObjectType | None]:
"""
Фильтрует сообщения журнала по tag и временному диапазону,
затем ищет целевое сообщение по technologicalSection и event.
"""
time_filtered = [
msg for msg in messages_info if msg.tag == tag and range_start <= ensure_moscow_timezone(msg.time) <= range_end
]
time_filtered.sort(key=lambda msg: ensure_moscow_timezone(msg.time), reverse=True)
target_msg = next(
(
msg
for msg in time_filtered
if msg.technologicalSection == technological_section and msg.event.rstrip() == expected_event
),
None,
)
return time_filtered, target_msg
def get_random_item(item_list: List[RandomObjectType]) -> RandomObjectType:
"""
Получает случайный объект из списка
"""
if not item_list:
fail("Пустой список объектов")
try:
return random.choice(item_list)
except (TypeError, ValueError):
fail(f"Не удалось получить случайный элемент из списка: {item_list}")
def get_longest_flow_area(flow_areas: List[FlowArea]) -> FlowArea:
"""
Получает самый протяженный участок карты течения по количеству ДУ из списка всех участков
"""
if not flow_areas:
fail("Список flow_areas пустой")
try:
longest_flow_area = max(flow_areas, key=lambda flow_area: len(flow_area.diagnosticAreas))
return longest_flow_area
except (TypeError, ValueError):
fail(f"Не найден протяженный участок из списка flow_areas: {flow_areas}.")
def determine_lds_status_by_priority(lds_status_set: Set[int]) -> int:
"""
Определяет режим работы СОУ по приоритету и наличию режимов работы у ДУ на самом протяженном участки карты течений
"""
lds_status_priority = [
LdsStatus.FAULTY.value,
LdsStatus.INITIALIZATION.value,
LdsStatus.DEGRADATION.value,
LdsStatus.SERVICEABLE.value,
]
if not lds_status_set:
fail("Пустой список режимов СОУ ДУ")
try:
for status in lds_status_priority:
if status in lds_status_set:
return status
except (AttributeError, KeyError, RuntimeError, TypeError, ValueError):
fail("Не удалось определить режим работу СОУ.")
def find_signal_type_by_address_suffix(signals_list: list, address_suffix: str) -> int:
"""
Ищет в списке сигналов тип сигнала по части адреса
"""
if not signals_list:
fail("Пустой список сигналов")
try:
for sensor_signal in signals_list:
if sensor_signal.address is not None and str(sensor_signal.address).endswith(address_suffix):
return sensor_signal.signalType
fail(f"Не найден тип сигнала по части адреса: {address_suffix} из списка: {signals_list}")
except (AttributeError, KeyError, RuntimeError, TypeError, ValueError):
fail(f"Не найден тип сигнала по части адреса: {address_suffix} из списка: {signals_list}")
def find_signal_val_by_signal_type(signals_list: list, signal_type: int) -> str:
"""
Ищет в списке сигналов значение сигнала по типу
"""
if not signals_list:
fail("Пустой список сигналов")
try:
for sensor_signal in signals_list:
if sensor_signal.signalType is not None and sensor_signal.signalType == signal_type:
return sensor_signal.value
fail(f"Не найдено значение для типа сигнала: {signal_type} из списка: {signals_list}")
except (AttributeError, KeyError, RuntimeError, TypeError, ValueError):
fail(f"Не найдено значение для типа сигнала: {signal_type} из списка: {signals_list}")
def find_object_by_field(item_list: List[ObjectType], field_name: str, value: Any) -> ObjectType:
"""
Ищет объект в списке объектов по значению одного из полей объекта
"""
if not item_list:
fail("Список объектов пуст")
try:
return next((item for item in item_list if getattr(item, field_name) == value))
except Exception:
fail(f"Не найдено значение: {value} для поля: {field_name}, в списке: {item_list}.")
def find_object_by_a_few_fields(item_list: List[ObjectType], fields_dict: dict) -> ObjectType:
"""
Ищет объект в списке объектов по значениям нескольких полей
"""
if not item_list:
return None
return next(
(item for item in item_list if all(getattr(item, field) == value for field, value in fields_dict.items())), None
)
def get_signal(site_message, signal_type):
if not site_message:
return None
return next((s for s in site_message.signals if s.signalType == signal_type.value), None)
def get_value(obj):
return getattr(obj, "value", None)
def find_confirmed_leaks(item_list: List[Leak]) -> List[Leak]:
"""Ищет подтвержденные утечки"""
try:
return [
item
for item in item_list
if item.confirmationStatus == ConfirmationStatus.CONFIRMED.value and item.detectedAt is not None
]
except (AttributeError, KeyError, TypeError, ValueError):
return []
def find_confirmed_leaks_on_main_page(item_list: List[MainPageLeakInfo]) -> List[MainPageLeakInfo]:
"""Ищет подтвержденные утечки"""
try:
return [
item
for item in item_list
if item.leakStatus == LeakStatus.CONFIRMED.value and item.leakDetectedAt is not None
]
except (AttributeError, KeyError, TypeError, ValueError):
return []
def find_diagnostic_area_by_id(flow_areas: List[FlowArea], id_value: int) -> Optional[DiagnosticArea]:
"""
Ищет ДУ по id в списке участков карты течений, исключает дубликаты по количеству pipeIds
"""
candidates = []
if not flow_areas:
return None
try:
for flow_area in flow_areas:
for diagnostic_area in flow_area.diagnosticAreas:
if diagnostic_area.id == id_value:
candidates.append(diagnostic_area)
if not candidates:
return None
elif len(candidates) == 1:
return candidates[0]
else:
# Среди дубликатов ищет ДУ с наибольшим количеством pipeIds
return max(candidates, key=lambda candidate: len(candidate.pipeIds))
except (AttributeError, KeyError, RuntimeError, TypeError, ValueError):
return None
def find_diagnostic_area_by_pipe_id(flow_areas: List[FlowArea], pipe_id: int) -> Optional[DiagnosticArea]:
"""
Ищет ДУ по pipe id в списке участков карты течений, исключает дубликаты по количеству pipeIds
"""
candidates = []
if not flow_areas:
return None
try:
for flow_area in flow_areas:
for diagnostic_area in flow_area.diagnosticAreas:
if diagnostic_area.pipeIds and pipe_id in diagnostic_area.pipeIds:
candidates.append(diagnostic_area)
if not candidates:
return None
elif len(candidates) == 1:
return candidates[0]
else:
# Среди дубликатов ищет ДУ с наибольшим количеством pipeIds
return max(candidates, key=lambda candidate: len(candidate.pipeIds))
except (AttributeError, KeyError, RuntimeError, TypeError, ValueError):
return None
def find_diagnostic_areas_by_ids(flow_areas: List[FlowArea], id_list: List[int]) -> List[DiagnosticArea]:
"""
Получает список ДУ из списка flow_areas по списку id
"""
diagnostic_areas = [
result
for diagnostic_area_id in id_list
if (result := find_diagnostic_area_by_id(flow_areas, diagnostic_area_id)) is not None
]
return diagnostic_areas
def find_diagnostic_areas_by_pipe_ids(flow_areas: List[FlowArea], id_list: List[int]) -> List[DiagnosticArea]:
"""
Получает список ДУ из списка flow_areas по списку pipe id
"""
diagnostic_areas = [
result for pipe_id in id_list if (result := find_diagnostic_area_by_pipe_id(flow_areas, pipe_id)) is not None
]
return diagnostic_areas
def find_base_diagnostic_areas(flow_areas: List[FlowArea]) -> List[DiagnosticArea]:
"""
Получает список базовых ДУ из списка flow_areas
"""
return find_diagnostic_areas_by_ids(flow_areas, TestConst.DIAGNOSTIC_AREA_BASE_IDS)
def find_leak_by_coordinate(
leaks_list: List[ObjectType], expected_coordinate: float, tolerance: float = TestConst.ALLOWED_DISTANCE_DIFF_METERS
) -> ObjectType:
"""
Ищет утечку в списке по координатам с допустимой погрешностью
поднимает pytest.fail если список пуст или утечка не найдена
"""
if not leaks_list:
fail("Список утечек пуст")
for leak in leaks_list:
leak_coordinate = getattr(leak, "leakCoordinate")
if leak_coordinate is None or "":
continue
if abs(leak_coordinate - expected_coordinate) <= tolerance:
return leak
fail(
f"Не найдена утечка с координатой {expected_coordinate} +- {tolerance} м"
f"Список полученных утечек: {leaks_list}"
)
def to_moscow_timezone(date_str: str) -> datetime:
"""
Преобразует строку времени в московское время
"""
if not date_str or not date_str.strip():
fail("Пришло пустое значение для преобразования в московское время")
try:
if date_str.startswith(("'", '"', '')) or date_str.endswith(("'", '"', '')):
date_str = date_str.strip().strip("'").strip('"')
date_utc = datetime.strptime(date_str, TestConst.OUTPUT_TIME_FORMAT).replace(tzinfo=timezone.utc)
return date_utc.astimezone(ZoneInfo(TestConst.ZONE_INFO))
except (AttributeError, TypeError, ValueError):
fail(f"Не удалось преобразовать время в московское: {date_str}.")
def create_dict_from_dataclass(cls: Type, **kwargs) -> dict:
"""Создает словарь из экземпляра dataclass c нужными параметрами"""
if not is_dataclass(cls):
fail(f"{cls} не dataclass")
instance = cls(**kwargs)
return asdict(instance)
def datetime_to_msgpack_timestamp(dt: datetime) -> list:
"""Конвертирует datetime в формат [Timestamp(seconds, nanoseconds), tz_offset] для отправки на бэкенд."""
return [MsgpackTimestamp(seconds=int(dt.timestamp()), nanoseconds=0), 0]
def create_journal_req_body(**kwargs) -> dict:
"""Создает дефолтные параметры запроса к журналу"""
result = create_dict_from_dataclass(GetMessagesRequest, **kwargs)
period = result.get('periodTime')
if period:
for key in ('start', 'end'):
if isinstance(period.get(key), datetime):
period[key] = datetime_to_msgpack_timestamp(period[key])
return result
def extract_first_number(value: object) -> Optional[float]:
"""
Извлекает первое число из ячейки (int/float/str вида '55.55 км', '111.11 м3/ч').
"""
if value is None:
return None
if isinstance(value, (int, float)) and not isinstance(value, bool):
return float(value)
if isinstance(value, str):
matches = re.findall(TestConst.DIGITS_WITH_DOT_PATTERN, value)
if matches:
try:
return float(matches[0].replace(",", "."))
except ValueError:
return None
return None
def _attach_ws_poll_failure(
collected_messages: List[Any],
total_wait_seconds: float,
expected_message_type: str,
) -> None:
"""Краткая сводка и pprint каждого WS-сообщения при таймауте поллинга."""
allure.attach(
"\n".join(
[
f"Таймаут ожидания: {total_wait_seconds} с",
f"Ожидаемый тип сообщения: {expected_message_type}",
f"Всего сообщений за период поллинга: {len(collected_messages)}",
]
),
name="WS poll timeout",
attachment_type=allure.attachment_type.TEXT,
)
for msg in collected_messages:
allure.attach(
pprint.pformat(msg, width=120, sort_dicts=False),
name="received ws message",
attachment_type=allure.attachment_type.TEXT,
)
def _attach_ws_reply_parse_failure(
reply_payload: Optional[Any],
invocation_id: str,
request_name: str,
error: BaseException,
) -> None:
"""Прикрепляет к Allure ответ бэка при ошибке парсинга."""
allure.attach(
"\n".join(
[
f"Запрос: {request_name}",
f"invocation_id: {invocation_id}",
f"Ошибка: {error}",
]
),
name="WS parse failure",
attachment_type=allure.attachment_type.TEXT,
)
if reply_payload is not None:
allure.attach(
pprint.pformat(reply_payload, width=120, sort_dicts=False),
name="received ws message",
attachment_type=allure.attachment_type.TEXT,
)
def _drain_recv_queue(ws_client: WebSocketClient) -> List[Any]:
"""Забирает все сообщения из очереди ws без блокирующего receive_by_..."""
messages: List[Any] = []
while not ws_client.recv_queue.empty():
try:
messages.append(ws_client.recv_queue.get_nowait())
except asyncio.QueueEmpty:
break
return messages
def _find_valid_report_export_notification(
messages: List[Any],
parser,
notification_type: str,
) -> Optional[ReportDataExportedNotification]:
"""Ищет среди уже полученных сообщений успешную ReportDataExportedNotification."""
for msg in messages:
if not isinstance(msg, list) or not is_desired_type(msg, notification_type):
continue
try:
notification = parser.parse_report_data_exported_notification_msg(msg)
except (ValueError, TypeError, KeyError):
continue
if notification.replyStatus != ReplyStatus.OK.value:
continue
content: Optional[ReportDataExportedContent] = notification.replyContent
if content is None:
continue
if content.exportStatus != ExportStatus.DONE:
continue
return notification
return None
async def poll_for_report_export_notification(
ws_client: WebSocketClient,
parser,
total_wait_seconds: float,
poll_interval_seconds: float,
) -> Optional[Any]:
"""
Собирает сообщения из очереди ws и ищет ReportDataExportedNotification с успешным exportStatus.
При таймауте прикрепляет к Allure все полученные за период сообщения.
"""
deadline = asyncio.get_event_loop().time() + total_wait_seconds
collected_messages: List[Any] = []
ws_client.suppress_recv_logging = True
parser.suppress_recv_logging = True
try:
while asyncio.get_event_loop().time() < deadline:
await asyncio.sleep(poll_interval_seconds)
batch = _drain_recv_queue(ws_client)
collected_messages.extend(batch)
notification = _find_valid_report_export_notification(
batch, parser, ReportConst.REPORT_DATA_EXPORTED_NOTIFICATION
)
if notification is not None:
return notification
finally:
collected_messages.extend(_drain_recv_queue(ws_client))
ws_client.suppress_recv_logging = False
parser.suppress_recv_logging = False
_attach_ws_poll_failure(
collected_messages,
total_wait_seconds,
ReportConst.REPORT_DATA_EXPORTED_NOTIFICATION,
)
return None
def _find_ws_reply_by_invocation_id(messages: List[Any], invocation_id: str, parser) -> Optional[list]:
"""
Ищет последний ответ с заданным invocation_id и телом replyStatus.
"""
reply_payload = None
for msg in messages:
if not isinstance(msg, list) or not is_desired_invocation_id(msg, invocation_id):
continue
if parser.find_reply_status_in_ws_msg(msg):
reply_payload = msg
return reply_payload
async def poll_for_exported_file(
ws_client: WebSocketClient,
parser,
list_limit: int,
expected_data_type: Any,
name_substring: str,
tu_name_substring: str,
period_start: datetime,
period_end: datetime,
total_wait_seconds: float,
poll_interval_seconds: float,
period_tolerance_minutes: int = ReportConst.REPORT_PERIOD_TOLERANCE_MINUTES,
) -> Optional[Any]:
"""
Периодически шлёт GetExportedDataListRequest, забирает ответы из очереди
по invocation_id среди всех накопленных сообщений.
При таймауте или ошибке парсинга прикрепляет к Allure полученные ответы.
"""
deadline = asyncio.get_event_loop().time() + total_wait_seconds
last_items_count = -1
collected_messages: List[Any] = []
request_name = ReportConst.GET_EXPORTED_DATA_LIST_REQUEST
ws_client.suppress_recv_logging = True
parser.suppress_recv_logging = True
try:
while asyncio.get_event_loop().time() < deadline:
drained_before_request = _drain_recv_queue(ws_client)
collected_messages.extend(drained_before_request)
await connect(
ws_client,
request_name,
{"limit": list_limit},
)
invocation_id = ws_client.invocation_id
await asyncio.sleep(poll_interval_seconds)
batch = _drain_recv_queue(ws_client)
collected_messages.extend(batch)
list_reply_payload = _find_ws_reply_by_invocation_id(batch, invocation_id, parser)
if list_reply_payload is None:
continue
try:
parsed_payload = parser.parse_exported_data_list_msg(list_reply_payload)
except Exception as error:
_attach_ws_reply_parse_failure(list_reply_payload, invocation_id, request_name, error)
for msg in collected_messages:
allure.attach(
pprint.pformat(msg, width=120, sort_dicts=False),
name="received ws message",
attachment_type=allure.attachment_type.TEXT,
)
fail(f"Не удалось разобрать ответ на {request_name}: {error}")
items = []
if parsed_payload.replyContent is not None:
items = parsed_payload.replyContent.exportedData or []
if len(items) != last_items_count:
allure.attach(
"\n".join(
f"id={item.id}, name={item.name}, type={item.exportedDataType}, "
f"start={format_datetime_moscow(item.start)}, end={format_datetime_moscow(item.end)}"
for item in items
),
name=f"Список сформированных файлов (всего: {len(items)})",
attachment_type=allure.attachment_type.TEXT,
)
last_items_count = len(items)
match = find_matching_exported_item(
items=items,
expected_data_type=expected_data_type,
name_substring=name_substring,
tu_name_substring=tu_name_substring,
period_start=period_start,
period_end=period_end,
period_tolerance_minutes=period_tolerance_minutes,
)
if match is not None:
return match
finally:
collected_messages.extend(_drain_recv_queue(ws_client))
ws_client.suppress_recv_logging = False
parser.suppress_recv_logging = False
_attach_ws_poll_failure(
collected_messages,
total_wait_seconds,
request_name,
)
return None
def _normalize_report_period_datetime(value: datetime) -> datetime:
"""Приводит datetime периода отчёта к московскому времени без микросекунд."""
return localize_as_moscow(value).replace(microsecond=0)
def _exported_item_period_matches(
item_start: datetime,
item_end: datetime,
period_start: datetime,
period_end: datetime,
tolerance_minutes: int,
) -> bool:
"""Проверяет start/end элемента списка в пределах периода запроса +- tolerance_minutes."""
item_start_norm = _normalize_report_period_datetime(item_start)
item_end_norm = _normalize_report_period_datetime(item_end)
period_start_norm = _normalize_report_period_datetime(period_start)
period_end_norm = _normalize_report_period_datetime(period_end)
delta = timedelta(minutes=tolerance_minutes)
return (period_start_norm - delta) <= item_start_norm <= (period_start_norm + delta) and (
period_end_norm - delta
) <= item_end_norm <= (period_end_norm + delta)
def find_matching_exported_item(
items: List[Any],
expected_data_type: Any,
name_substring: str,
tu_name_substring: str,
period_start: datetime,
period_end: datetime,
period_tolerance_minutes: int = ReportConst.REPORT_PERIOD_TOLERANCE_MINUTES,
) -> Optional[Any]:
"""
Ищет элемент списка по типу, подстрокам в имени (отчёт + ТУ) и периоду start/end с допуском.
"""
name_substring_lower = name_substring.lower()
tu_name_lower = tu_name_substring.lower()
matched_items = []
for item in items:
if item.exportedDataType != expected_data_type:
continue
item_name_lower = (item.name or "").lower()
if name_substring_lower not in item_name_lower:
continue
if tu_name_lower not in item_name_lower:
continue
if item.start is None or item.end is None:
continue
if not _exported_item_period_matches(item.start, item.end, period_start, period_end, period_tolerance_minutes):
continue
matched_items.append(item)
if not matched_items:
return None
return max(matched_items, key=lambda exported_item: exported_item.id)
def parse_journal_msg_value(value: str) -> tuple:
"""Парсит поле value в сообщении журнала"""
try:
# ищет группы цифр с точкой в строке
matches = re.findall(TestConst.DIGITS_WITH_DOT_PATTERN, value)
coordinate, volume = (matches + [None, None])[:2]
if coordinate is not None:
try:
coordinate = float(coordinate)
except ValueError:
coordinate = None
if volume is not None:
try:
volume = float(volume)
except ValueError:
volume = None
return coordinate, volume
except (AttributeError, TypeError, ValueError):
return None, None
def parse_bit_flags(
value: int, enum_cls: Type[IntEnum | IntFlag], failures: Optional[List[str]] = None
) -> List[IntFlag]:
"""
Распаковка битовых флагов
"""
# 0 - это валидное состояние когда причин нет, в прошлой реализации тест бы падал если причин нет
# хотя это может быть ожидаемо, например при тестировании исправности или где-нибудь еще
if value == 0:
return []
found_flags = [flag for flag in enum_cls if value & flag.value]
known_bits = sum(flag.value for flag in found_flags)
if known_bits != value:
unknown_bits = value ^ known_bits
error_message = f"Неизвестные биты при распаковке {enum_cls.__name__}: {unknown_bits}"
if failures is not None:
failures.append(error_message)
else:
fail(f"Неизвестные биты при распаковке {enum_cls.__name__}: {unknown_bits}")
# та же сортировка только не цифры, а их текстовое значение
return sorted(found_flags, key=lambda flag: flag.value)
def get_reason_enum_by_lds_status(lds_status: int | LdsStatus, failures: Optional[List[str]] = None) -> Type[IntFlag]:
"""
Получение класса причин по режимам СОУ
"""
if isinstance(lds_status, int):
try:
lds_status = LdsStatus(lds_status)
except ValueError:
error_message = f"Неизвестный LdsStatus: {lds_status}"
if failures is not None:
failures.append(error_message)
else:
fail(error_message)
reason_by_lds_status = {
LdsStatus.FAULTY: LdsStatusFaulty,
LdsStatus.INITIALIZATION: LdsStatusInitialization,
LdsStatus.DEGRADATION: LdsStatusDegradation,
}
enum_class = reason_by_lds_status.get(lds_status)
if enum_class is None:
error_message = f"Для LdsStatus{lds_status.name} не определены причины"
if failures is not None:
failures.append(error_message)
else:
fail(error_message)
return enum_class
def get_reason_enum_by_stationary_status(
stationary_status: int | StationaryStatus, failures: Optional[List[str]] = None
) -> Type[IntFlag]:
"""
Получение класса причин по режимам МТ
"""
if isinstance(stationary_status, int):
try:
stationary_status = StationaryStatus(stationary_status)
except ValueError:
error_message = f"Неизвестный StationaryStatus: {stationary_status}"
if failures is not None:
failures.append(error_message)
else:
fail(error_message)
reason_by_stationary_status = {
StationaryStatus.STATIONARY: StationaryReason,
StationaryStatus.UNSTATIONARY: UnStationaryReason,
StationaryStatus.STOPPED: StoppedPumpingReason,
}
enum_class = reason_by_stationary_status.get(stationary_status)
if enum_class is None:
error_message = f"Для StationaryStatus{stationary_status.name} не определены причины"
if failures is not None:
failures.append(error_message)
else:
fail(error_message)
return enum_class
def parse_lds_status_reasons(lds_status: int, lds_status_reasons: int, failures: Optional[List[str]] = None):
"""
Получение списка ldsStatusReasons, соответствующего ldsStatus
"""
enum_cls = get_reason_enum_by_lds_status(lds_status, failures)
flags = parse_bit_flags(lds_status_reasons, enum_cls, failures)
return flags
def parse_stationary_status_reasons(
stationary_status: int, stationary_status_reasons: int, failures: Optional[List[str]] = None
):
"""
Получение списка stationaryStatusReasons, соответствующего stationaryStatus
"""
enum_cls = get_reason_enum_by_stationary_status(stationary_status, failures)
flags = parse_bit_flags(stationary_status_reasons, enum_cls, failures)
return flags
async def connect(ws_client: WebSocketClient, ws_invoke_type: str, ws_invoke_params: Any = None) -> None:
"""
Подключение к заданной подписке
"""
try:
with allure.step(f"Вызов {ws_invoke_type} c параметрами {ws_invoke_params}"):
await ws_client.invoke(ws_invoke_type, ws_invoke_params)
except (asyncio.TimeoutError, ConnectionError, ConnectionResetError, OSError) as error:
fail(f"Не удалось отправить сообщение типа: {ws_invoke_type} c параметрами {ws_invoke_params}. Ошибка: {error}")
async def connect_stream(
ws_client: WebSocketClient,
ws_invoke_type: str,
ws_invoke_params: Any = None,
purpose: str = "streaming-вызов WS",
) -> None:
"""
Streaming-вызов (StreamInvocation)
"""
try:
with allure.step(f"Streaming-вызов {ws_invoke_type} c параметрами {ws_invoke_params}"):
await ws_client.invoke_stream(ws_invoke_type, ws_invoke_params)
except (asyncio.TimeoutError, ConnectionError, ConnectionResetError, OSError) as error:
fail(
f"Не удалось выполнить {purpose} ({ws_invoke_type}, StreamInvocation). "
f"Параметры запроса: {ws_invoke_params}. Ошибка соединения: {error}"
)
def _stream_completion_error(msg: Any, invocation_id: str) -> Optional[str]:
"""Текст ошибки из ответа SignalR Completion для данного invocation_id, если есть."""
if not isinstance(msg, list):
return None
if msg[0] != WS_Const.COMPLETION_MESSAGE_TYPE:
return None
if not is_desired_invocation_id(msg, invocation_id):
return None
if len(msg) <= WS_Const.COMPLETION_ERROR_MESSAGE_INDEX:
return None
error_text = msg[WS_Const.COMPLETION_ERROR_MESSAGE_INDEX]
if isinstance(error_text, str):
return error_text
return None
async def receive_download_exported_data_reply(
ws_client: WebSocketClient,
parser,
invocation_id: str,
request_name: str,
total_wait_seconds: float,
poll_interval_seconds: float = 0.5,
purpose: str = "скачивании xlsx-отчёта после выбора файла в списке сформированных отчётов",
) -> Any:
"""
Ожидает StreamItem с fileChunk после streaming DownloadExportedDataRequest.
"""
deadline = asyncio.get_event_loop().time() + total_wait_seconds
collected_messages: List[Any] = []
ws_client.suppress_recv_logging = True
parser.suppress_recv_logging = True
try:
while asyncio.get_event_loop().time() < deadline:
await asyncio.sleep(poll_interval_seconds)
batch = _drain_recv_queue(ws_client)
collected_messages.extend(batch)
for msg in batch:
stream_error = _stream_completion_error(msg, invocation_id)
if stream_error:
_attach_ws_reply_parse_failure(msg, invocation_id, request_name, RuntimeError(stream_error))
fail(
f"При {purpose} бэк вернул Completion с ошибкой "
f"({request_name}, invocation_id={invocation_id}): {stream_error}"
)
for msg in batch:
if (
not isinstance(msg, list)
or msg[0] != WS_Const.STREAM_ITEM_MESSAGE_TYPE
or not is_desired_invocation_id(msg, invocation_id)
):
continue
if parser.find_reply_status_in_ws_msg(msg) is None:
continue
try:
return parser.parse_download_exported_data_msg(msg)
except Exception as error:
_attach_ws_reply_parse_failure(msg, invocation_id, request_name, error)
fail(
f"При {purpose} получен StreamItem ({request_name}, invocation_id={invocation_id}), "
f"но не удалось разобрать ответ с fileChunk: {error}"
)
finally:
collected_messages.extend(_drain_recv_queue(ws_client))
ws_client.suppress_recv_logging = False
parser.suppress_recv_logging = False
_attach_ws_poll_failure(collected_messages, total_wait_seconds, f"{request_name} (StreamItem)")
fail(
f"При {purpose} за {total_wait_seconds} с не получен StreamItem с fileChunk "
f"({request_name}, invocation_id={invocation_id}). Смотреть вложения received ws message"
)
async def connect_and_get_parsed_msg_by_tu_id(
tu_id: int,
ws_client: WebSocketClient,
ws_message_type: str,
ws_invoke_type: str,
ws_invoke_params: Any = None,
timeout: float = TestConst.BASIC_MESSAGE_TIMEOUT,
) -> SubscribeAllLeaksInfoReply:
"""
Подключается, ищет и парсит allLeaksInfo сообщение для конкретного ТУ
"""
await connect(ws_client, ws_invoke_type, ws_invoke_params)
async def get_parsed_msg():
"""
Ищет и парсит allLeaksInfo сообщение для конкретного ТУ
"""
while True:
payload = await ws_client.receive_by_type(ws_message_type, timeout=timeout)
parsed_payload = ws_message_parser.parse_all_leaks_info_msg(payload)
# Ищет сообщение с нужным ТУ
if parsed_payload.replyContent.tuId == tu_id:
return parsed_payload
try:
with allure.step(f"Получение сообщения с контентом типа: {ws_message_type} для ТУ {tu_id}"):
return await asyncio.wait_for(get_parsed_msg(), timeout=timeout)
except (asyncio.TimeoutError, ConnectionError, ConnectionResetError, OSError) as error:
fail(f"Не удалось получить сообщение allLeaksInfo для ТУ {tu_id}. Ошибка: {error}")
async def connect_and_get_msg(ws_client: WebSocketClient, ws_invoke_type: str, ws_invoke_params: Any = None) -> list:
"""
Подключение типа get к заданной подписке и получение сообщения с заданным типом контента
"""
await connect(ws_client, ws_invoke_type, ws_invoke_params)
invocation_id = ws_client.invocation_id
try:
with allure.step(f"Получение входящего сообщения c invocation_id: {invocation_id}"):
payload = await ws_client.receive_by_invocation_id(invocation_id)
return payload
except (asyncio.TimeoutError, ConnectionError, ConnectionResetError, OSError) as error:
fail(f"Не удалось получить сообщение типа: {ws_invoke_type}. Ошибка: {error}")
async def connect_and_subscribe_msg(
ws_client: WebSocketClient,
ws_message_type: str,
ws_invoke_type: str,
ws_invoke_params: Any = None,
timeout: float = TestConst.BASIC_MESSAGE_TIMEOUT,
) -> list:
"""
Подключение типа subscribe к заданной подписке и получение сообщения с заданным типом контента
"""
await connect(ws_client, ws_invoke_type, ws_invoke_params)
try:
with allure.step(f"Получение сообщения с контентом типа: {ws_message_type}"):
payload = await ws_client.receive_by_type(ws_message_type, timeout=timeout)
return payload
except (asyncio.TimeoutError, OSError, ConnectionError, ConnectionResetError) as error:
fail(f"Не удалось получить сообщение типа: {ws_invoke_type}. Ошибка: {error}")
async def poll_balance_algorithm_diagnostic_areas(
ws_client: WebSocketClient,
ws_parser: ws_message_parser,
imitator_start_time: datetime,
end_time: datetime,
poll_interval: float,
) -> list:
"""
Опрашивает очередь ws_client на наличие BalanceAlgorithmResultsContent,
собирает и возвращает все diagnosticAreas из flowAreas.
"""
collected_diagnostic_areas = []
ws_client.suppress_recv_logging = True
ws_parser.suppress_recv_logging = True
try:
while datetime.now(tz=imitator_start_time.tzinfo) < end_time:
await asyncio.sleep(poll_interval)
latest_msg = None
while not ws_client.recv_queue.empty():
try:
msg = ws_client.recv_queue.get_nowait()
except asyncio.QueueEmpty:
break
if isinstance(msg, list) and is_desired_type(msg, "BalanceAlgorithmResultsContent"):
latest_msg = msg
if latest_msg is None:
continue
parsed_payload = ws_message_parser.parse_balance_algorithm_msg(latest_msg)
reply_content = parsed_payload.replyContent
if reply_content and reply_content.flowAreas:
for flow_area in reply_content.flowAreas:
if flow_area.diagnosticAreas:
collected_diagnostic_areas.extend(flow_area.diagnosticAreas)
finally:
ws_client.suppress_recv_logging = False
ws_parser.suppress_recv_logging = False
return collected_diagnostic_areas
def get_leak_diagnostic_area_samples(
collected_diagnostic_areas: list,
leak_diagnostic_area_name: str,
total_wait: int,
) -> list:
"""
Проверяет наличие diagnosticAreas и возвращает подмножество
для ДУ с заданным leak_diagnostic_area_id. Падает, если данные не найдены.
"""
if not collected_diagnostic_areas:
fail(f"За {total_wait} секунд не пришло ни одной diagnosticArea в BalanceAlgorithmResultsContent")
leak_diagnostic_area_samples = [
diagnostic_area
for diagnostic_area in collected_diagnostic_areas
if diagnostic_area.name == leak_diagnostic_area_name
]
if not leak_diagnostic_area_samples:
fail(f"За {total_wait} секунд не пришло ни одного сообщения для ДУ с name={leak_diagnostic_area_name}.")
return leak_diagnostic_area_samples