Загрузка данных
тест редж
"""
Тесты отбраковки сигналов с датчиков давления и расходомеров.
Параметризация:
SUITE_PARAMS генерирует (config, rejection_case, case_number) для каждого RejectionTestCase
из каждого IsRejectedConfig.
Запуск:
pytest tests/test_is_rejected_regress.py --suites=is_rejected_regress
"""
from datetime import datetime
from typing import Any, List, Optional
import allure
import pytest
from clients.websocket_client import WebSocketClient
from test_config.datasets import ALL_IS_REJECTED_CONFIGS
from test_config.models_for_tests import CaseMarkers, IsRejectedConfig, RejectionTestCase
from test_scenarios import rejected_scenarios as scenarios
# ===== ГЕНЕРАЦИЯ ПАРАМЕТРОВ =====
def _get_suite_markers(config: IsRejectedConfig) -> List[pytest.MarkDecorator]:
"""Маркеры для тестового набора."""
return [
pytest.mark.test_suite_name(config.suite_name),
pytest.mark.test_suite_data_id(config.suite_data_id),
pytest.mark.test_data_name(config.archive_name),
pytest.mark.tu_id(config.technological_unit.id),
]
def _generate_rejection_params() -> List[Any]:
"""
Генерирует параметры для тестов отбраковки.
Один параметр на каждый (config, rejection_case, case_number).
"""
params = []
for config in ALL_IS_REJECTED_CONFIGS:
for rejection_case in config.rejection_cases:
param_id = f"{config.suite_name}_{rejection_case.name}"
params.append(
pytest.param(
config,
rejection_case,
id=param_id,
marks=_get_suite_markers(config),
)
)
return params
# ===== ПАРАМЕТРЫ ДЛЯ ТЕСТОВ =====
REJECTION_PARAMS: List[Any] = _generate_rejection_params()
def _generate_suite_params() -> List[Any]:
"""Генерирует параметры для suite-level тестов отбраковки (один параметр на набор)."""
params = []
for config in ALL_IS_REJECTED_CONFIGS:
params.append(
pytest.param(
config,
id=config.suite_name,
marks=_get_suite_markers(config),
)
)
return params
SUITE_PARAMS: List[Any] = _generate_suite_params()
# ===== ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ =====
def _apply_allure_markers(
test_config: Optional[CaseMarkers], tag: str, title: str, description: Optional[str] = None
) -> None:
"""Применяет allure-маркеры из конфига теста."""
if not test_config:
pytest.skip("Не заполнена конфигурация теста: тест пропущен")
allure.dynamic.tag(tag)
allure.dynamic.tag("REGRESS")
allure.dynamic.title(title)
if description:
allure.dynamic.description(description)
# ===== ТЕСТЫ ОТБРАКОВКИ =====
@pytest.mark.parametrize("config, rejection_case", REJECTION_PARAMS)
class TestIsRejectedScenarios:
"""
Тесты отбраковки сигналов.
Для каждого RejectionTestCase из конфига запускается 4 теста по одному на подписку.
"""
@pytest.mark.asyncio
async def test_rejection_input_signals(
self,
ws_client: WebSocketClient,
config: IsRejectedConfig,
rejection_case: RejectionTestCase,
) -> None:
"""[InputSignals] Проверка отбраковки во входных сигналах по подписке SubscribeInputSignalsRequest"""
sensor = rejection_case.sensor
tag = "InputSignals"
title = (
f"[{tag}] {rejection_case.expected_event} {sensor.description} (id={sensor.id}) "
f"({rejection_case.name}). ЭФ: Входные сигналы"
)
_apply_allure_markers(
rejection_case.rejection_input_signals_test,
tag,
title,
(
f"Проверка отбраковки сигнала {sensor.description} (id={sensor.id}), "
f"на наборе данных {config.suite_name},\n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Время проведения проверки: {rejection_case.rejection_input_signals_test.offset} мин.\n"
f"Тип отбраковки: {rejection_case.name}\n"
"Подписка: SubscribeInputSignalsRequest"
),
)
await scenarios.rejection_input_signals(ws_client, config, rejection_case)
@pytest.mark.asyncio
async def test_rejection_journal(
self,
ws_client: WebSocketClient,
config: IsRejectedConfig,
rejection_case: RejectionTestCase,
imitator_start_time: datetime,
) -> None:
"""[MessagesInfo] Проверка записи об отбраковке в журнале"""
sensor = rejection_case.sensor
tag = "MessagesInfo"
title = (
f"[{tag}] {rejection_case.expected_event} {sensor.description} (id={sensor.id}) "
f"({rejection_case.name}) в журнале. ЭФ: Журнал"
)
_apply_allure_markers(
rejection_case.rejection_journal_test,
tag,
title,
(
f"Проверка записи в журнале об отбраковке сигнала {sensor.description} (id={sensor.id}), "
f"на наборе данных {config.suite_name},\n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Время проведения проверки: {rejection_case.rejection_journal_test.offset} мин.\n"
f"Тип отбраковки: {rejection_case.name}\n"
"Синхронный запрос типа: GetMessagesRequest с фильтром messageTypes=REJECTION"
),
)
await scenarios.rejection_journal(ws_client, config, rejection_case, imitator_start_time)
@pytest.mark.asyncio
async def test_rejection_main_page(
self,
ws_client: WebSocketClient,
config: IsRejectedConfig,
rejection_case: RejectionTestCase,
) -> None:
"""[MainPageSignalsInfo] Проверка счетчика отбраковки на состоянии МТ"""
tag = "MainPageSignalsInfo"
title = (
f"[{tag}] {rejection_case.expected_event} - "
f"счетчик отбраковки на состоянии МТ показывает отбраковку"
f"({rejection_case.name}). ЭФ: Состояние МТ"
)
_apply_allure_markers(
rejection_case.rejection_main_page_test,
tag,
title,
(
f"Проверка количества отбракованных сигналов > 0 при отбраковке {rejection_case.name}, "
f"на наборе данных {config.suite_name},\n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Время проведения проверки: {rejection_case.rejection_main_page_test.offset} мин.\n"
"Подписка: subscribeMainPageSignalsInfoRequest"
),
)
await scenarios.rejection_main_page(ws_client, config)
@pytest.mark.asyncio
async def test_rejection_scheme_signals_state(
self,
ws_client: WebSocketClient,
config: IsRejectedConfig,
rejection_case: RejectionTestCase,
) -> None:
"""[SchemeSignalsState] Проверка отбраковки сигнала на схеме по подписке SubscribeSchemeSignalsStateRequest"""
sensor = rejection_case.sensor
tag = "SchemeSignalsState"
title = (
f"[{tag}] {rejection_case.expected_event} {sensor.description} (id={sensor.id}) "
f"({rejection_case.name}). ЭФ: Схема"
)
_apply_allure_markers(
rejection_case.rejection_scheme_signals_state_test,
tag,
title,
(
f"Проверка отбраковки сигнала {sensor.description} (id={sensor.id}), "
f"на наборе данных {config.suite_name},\n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Время проведения проверки: {rejection_case.rejection_scheme_signals_state_test.offset} мин.\n"
f"Тип отбраковки: {rejection_case.name}\n"
f"Ожидаемый criteriaNames: {rejection_case.expected_criteria_names}\n"
"Подписка: SubscribeSchemeSignalsStateRequest"
),
)
await scenarios.rejection_scheme_signals_state(ws_client, config, rejection_case)
@pytest.mark.parametrize("config", SUITE_PARAMS)
class TestIsRejectedReport:
"""Suite-level тест общего отчёта по отбраковкам"""
@pytest.mark.asyncio
async def test_rejection_report(
self,
ws_client: WebSocketClient,
config: IsRejectedConfig,
imitator_start_time: datetime,
) -> None:
"""[ExportReports] Проверка общего отчёта об отбракованных входных данных"""
tag = "ExportReports"
title = f"[{tag}] Проверка общего отчёта об отбракованных входных данных. " f"ЭФ: Выпадашка отчётов"
_apply_allure_markers(
config.rejection_report_test,
tag,
title,
(
f"Проверка формирования и содержимого xlsx-отчёта об отбракованных входных данных "
f"на наборе данных {config.suite_name},\n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Период отчёта: от старта имитатора до старта + "
f"{config.rejection_report_test.offset} мин.\n"
"Этапы сценария:\n"
"1) SubscribeReportsDataExportedRequest - подписка на пуш-нотификации\n"
"2) ExportReportsCommandRequest - запрос формирования отчёта (тип RejectedReport)\n"
"3) Ожидание ReportDataExportedNotification\n"
"4) Лонг-поллинг GetExportedDataListRequest - поиск отчёта в списке\n"
"5) DownloadExportedDataRequest (StreamInvocation) - скачивание по exportedDataId\n"
"6) Проверка xlsx: шапка, колонки, строки по каждому RejectionTestCase из набора\n"
"7) Проверка имени файла (.xlsx, название отчёта, ТУ, период +-1 мин)\n"
"Во вложениях Allure xlsx прикладывается только при падении теста"
),
)
await scenarios.export_rejection_report(ws_client, config, imitator_start_time)
тест смок
"""
Единый тестовый модуль для всех наборов данных (single-leak и multi-leak).
Архитектура параметризации:
- SUITE_PARAMS: тесты уровня набора (один раз на набор данных)
- LEAK_PARAMS: тесты уровня утечки (один раз на каждую утечку)
Для добавления нового набора:
1. Создать файл в test_config/datasets/
2. Тесты подхватятся автоматически
Запуск:
- Все тесты: pytest tests/test_smoke.py
- Один набор: pytest tests/test_smoke.py --suites=select_4
- Несколько наборов: pytest tests/test_smoke.py --suites=imitative_17,select_19_20
"""
import datetime
from typing import Any, List, Optional
import allure
import pytest
from clients.websocket_client import WebSocketClient
from constants.test_constants import BaseTN3Constants as Base_const
from test_config.datasets import ALL_SMOKE_CONFIGS
from test_config.models_for_tests import CaseMarkers, LeakTestConfig, SmokeSuiteConfig
from test_scenarios import lds_status_scenarios
from test_scenarios import smoke_scenarios as scenarios
# ===== ГЕНЕРАЦИЯ ПАРАМЕТРОВ =====
def _get_suite_markers(config: SmokeSuiteConfig) -> List[pytest.MarkDecorator]:
"""Маркеры для тестового набора."""
return [
pytest.mark.test_suite_name(config.suite_name),
pytest.mark.test_suite_data_id(config.suite_data_id),
pytest.mark.test_data_name(config.archive_name),
pytest.mark.tu_id(config.technological_unit.id),
]
def _generate_suite_params() -> List[Any]:
"""
Генерирует параметры для тестов уровня набора данных.
Один параметр на каждый config.
"""
return [
pytest.param(config, id=config.suite_name, marks=_get_suite_markers(config)) for config in ALL_SMOKE_CONFIGS
]
def _generate_leak_params() -> List[Any]:
"""
Генерирует параметры для тестов уровня утечки.
Для single-leak конфигов: один параметр (config, leak, 1)
Для multi-leak конфигов: N параметров (config, leak_n, n) для каждой утечки
"""
params: List[Any] = []
for config in ALL_SMOKE_CONFIGS:
# Собираем все утечки из конфига
if config.has_multiple_leaks:
leaks = config.leaks
elif config.leak:
leaks = [config.leak]
else:
continue # Нет утечек в конфиге
# Создаём параметр для каждой утечки
for index, leak in enumerate(leaks):
leak_number = index + 1
# ID для Allure/pytest: select_4 или select_19_20_leak_1
if len(leaks) > 1:
param_id = f"{config.suite_name}_leak_{leak_number}"
else:
param_id = config.suite_name
params.append(
pytest.param(
config,
leak,
leak_number,
id=param_id,
marks=_get_suite_markers(config),
)
)
return params
# ===== ПАРАМЕТРЫ ДЛЯ ТЕСТОВ =====
SUITE_PARAMS: List[Any] = _generate_suite_params()
LEAK_PARAMS: List[Any] = _generate_leak_params()
# ===== ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ =====
def _apply_allure_markers(test_config: CaseMarkers, tag: str, title: str, description: Optional[str] = None) -> None:
"""Применяет allure-маркеры из конфига теста."""
if not test_config:
pytest.skip("Не заполнена конфигурация теста: тест пропущен")
allure.dynamic.tag(tag)
allure.dynamic.tag("SMOKE")
allure.dynamic.title(title)
if description:
allure.dynamic.description(description)
# ===== ТЕСТЫ УРОВНЯ НАБОРА =====
# Запускаются один раз для каждого config
@pytest.mark.parametrize("config", SUITE_PARAMS)
class TestSuiteScenarios:
"""
Тесты уровня набора данных.
Запускаются один раз для каждого конфига.
"""
@pytest.mark.asyncio
@pytest.mark.critical_stop
async def test_basic_info(self, ws_client: WebSocketClient, config: SmokeSuiteConfig) -> None:
"""[BasicInfo] Проверка базовой информации СОУ: список ТУ"""
tag = "BasicInfo"
title = f"[{tag}] Проверка списка ТУ. ЭФ: Главная страница"
_apply_allure_markers(config.basic_info_test, tag, title)
await scenarios.basic_info(ws_client, config)
@pytest.mark.asyncio
async def test_journal_info(self, ws_client: WebSocketClient, config: SmokeSuiteConfig) -> None:
"""[MessagesInfo] Проверка наличия сообщений в журнале"""
tag = "MessagesInfo"
title = f"[{tag}] Проверка наличия сообщений в журнале. ЭФ: Журнал.Реальное время"
_apply_allure_markers(
config.journal_info_test,
tag,
title,
"Проверка наличия сообщений в журнале.\n" "Синхронный запрос типа: MessagesInfo",
)
await scenarios.journal_info(ws_client)
@pytest.mark.asyncio
async def test_lds_status_initialization(self, ws_client: WebSocketClient, config: SmokeSuiteConfig) -> None:
"""[CommonScheme] Проверка режима работы СОУ: Инициализация"""
tag = "CommonScheme"
title = f"[{tag}] Проверка режима работы СОУ: 'Инициализация'. ЭФ: Схема"
_apply_allure_markers(
config.lds_status_initialization_test,
tag,
title,
(
f"Проверка режима работы СОУ на наборе данных {config.suite_name}, \n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Время проведения проверки : {config.lds_status_initialization_test.offset} мин.\n"
"Подписка на сообщения типа: CommonScheme\n"
"Ожидаемый режим работы СОУ: Инициализация"
),
)
await scenarios.lds_status_initialization(ws_client, config)
@pytest.mark.asyncio
async def test_diagnostics_of_signals_after_initialization(
self, ws_client: WebSocketClient, config: SmokeSuiteConfig
) -> None:
"""[OutputSignalsInfo] Проверка выходных сигналов после выхода из инициализации"""
tag = "OutputSignalsInfo"
title = f"[{tag}] Проверка выходных сигналов после выхода из инициализации. ЭФ Диагностика сигналов"
_apply_allure_markers(
config.diagnostics_of_signals_after_initialization_test,
tag,
title,
(
f"Проверка состояния СОУ и перекачки на наборе данных {config.suite_name}, \n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Время проведения проверки : {config.diagnostics_of_signals_after_initialization_test.offset} мин.\n"
),
)
await scenarios.diagnostics_of_signals_after_initialization(ws_client, config)
@pytest.mark.asyncio
async def test_lds_status_init_in_journal(
self, ws_client: WebSocketClient, config: SmokeSuiteConfig, imitator_start_time: datetime
) -> None:
"""[MessagesInfo] Проверка записи в журнале о входе СОУ в Инициализацию"""
tag = "MessagesInfo"
title = f"[{tag}] Проверка записи в журнале: СОУ в Инициализации. ЭФ: Журнал. Реальное время"
description = (
f"Проверка записи в журнале о входе СОУ в Инициализацию на наборе данных {config.suite_name}, \n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Время проведения проверки: {config.lds_status_init_in_journal_test.offset} мин.\n"
"Синхронный запрос типа: MessagesInfo с фильтром messageTypes=LDS_STATUS\n"
)
_apply_allure_markers(config.lds_status_init_in_journal_test, tag, title, description)
await scenarios.lds_status_init_in_journal(ws_client, config, imitator_start_time)
@pytest.mark.asyncio
async def test_main_page_info(self, ws_client: WebSocketClient, config: SmokeSuiteConfig) -> None:
"""[MainPageInfo] Проверка установки режима МТ"""
tag = "MainPageInfo"
title = f"[{tag}] Проверка установки режима работы МТ: стационарный. ЭФ: Главная страница.Контент таблица по ТУ"
_apply_allure_markers(
config.main_page_info_test,
tag,
title,
(
f"Проверка установки режима работы МТ: стационарный на данных {config.suite_name}, \n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Время проведения проверки: {config.main_page_info_test.offset} мин.\n"
"Подписка на сообщения типа: MainPageInfo\n"
"Ожидаемый режим работы МТ: Стационарный"
),
)
await scenarios.main_page_info(ws_client, config)
@pytest.mark.asyncio
async def test_main_page_info_signals(self, ws_client: WebSocketClient, config: SmokeSuiteConfig) -> None:
"""[MainPageSignalsInfo] Проверка счетчиков состояния сигналов"""
tag = "MainPageSignalsInfo"
title = f"[{tag}] Проверка счетчиков состояния сигналов. ЭФ: Главная страница.Контент таблица по ТУ"
_apply_allure_markers(
config.main_page_info_signals_test,
tag,
title,
(
f"Проверка счетчиков состояния сигналов на данных {config.suite_name}, \n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Время проведения проверки: {config.main_page_info_signals_test.offset} мин.\n"
"Подписка на сообщения типа: MainPageSignalsInfo\n"
),
)
await scenarios.main_page_info_signals(ws_client, config)
@pytest.mark.asyncio
async def test_imitate_flowmeter_signal(self, ws_client: WebSocketClient, config: SmokeSuiteConfig) -> None:
"""[ImitateSignal] Проверка имитации расходомера"""
tag = "ImitateSignal"
title = f"[{tag}] Проверка имитации сигнала расходомера. ЭФ: Схема, Входные сигналы"
_apply_allure_markers(
config.imitate_flowmeter_signal_test,
tag,
title,
(
"Проверка работы имитации и снятия имитации c расходомера "
f"на наборе данных {config.suite_name}, \n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Время проведения проверки: {config.imitate_flowmeter_signal_test.offset} мин.\n"
"Синхронные запросы типа: ImitateSignalRequest, UnImitateSignalRequest\n"
"Подписка на сообщения типа: InputSignalsContent\n"
"Проверки:\n"
"Статус-код ответа на синхронный запрос имитации.\n"
"Значения в полях isImitated, quality и imitation.value расходомера "
"во входных сигналах после имитации.\n"
"Статус-код ответа на синхронный запрос снятия имитации.\n"
"Значение в поле isImitated расходомера во входных сигналах после снятия имитации.\n"
"Примечание: что бы не повлиять на проверки утечек, тест на имитацию выполняется во время "
"инициализации"
),
)
test_data = config.imitate_flowmeter_signal_test_data
await scenarios.imitate_sensor_signal(ws_client, config, test_data)
@pytest.mark.asyncio
async def test_imitate_pressure_sensor_signal(self, ws_client: WebSocketClient, config: SmokeSuiteConfig) -> None:
"""[ImitateSignal] Проверка имитации датчика давления"""
tag = "ImitateSignal"
title = f"[{tag}] Проверка имитации сигнала датчика давления. ЭФ: Схема, Входные сигналы"
_apply_allure_markers(
config.imitate_pressure_sensor_signal_test,
tag,
title,
(
"Проверка работы имитации и снятия имитации c датчика давления "
f"на наборе данных {config.suite_name}, \n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Время проведения проверки: {config.imitate_pressure_sensor_signal_test.offset} мин.\n"
"Синхронные запросы типа: ImitateSignalRequest, UnImitateSignalRequest\n"
"Подписка на сообщения типа: InputSignalsContent\n"
"Проверки:\n"
"Статус-код ответа на синхронный запрос имитации.\n"
"Значения в полях isImitated, quality и imitation.value датчика давления "
"во входных сигналах после имитации.\n"
"Статус-код ответа на синхронный запрос снятия имитации.\n"
"Значение в поле isImitated датчика давления во входных сигналах после снятия имитации.\n"
"Примечание: что бы не повлиять на проверки утечек, тест на имитацию выполняется во время "
"инициализации"
),
)
test_data = config.imitate_pressure_sensor_signal_test_data
await scenarios.imitate_sensor_signal(ws_client, config, test_data)
@pytest.mark.asyncio
async def test_mask_signal(self, ws_client: WebSocketClient, config: SmokeSuiteConfig) -> None:
"""[MaskSignal] Проверка маскирования датчиков"""
tag = "MaskSignal"
title = f"[{tag}] проверка маскирования датчиков. ЭФ: Схема"
_apply_allure_markers(
config.mask_signal_test,
tag,
title,
(
f"Проверка работы маскирования и снятия маскирования на наборе данных {config.suite_name}, \n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Время проведения проверки: {config.mask_signal_test.offset} мин.\n"
"Синхронные запросы типа: MaskSignalRequest, UnmaskSignalRequest\n"
"Подписка на сообщения типа: InputSignalsContent\n"
"Проверки:\n"
"Статус-код ответа на синхронный запрос маскирования.\n"
"Значение в поле isMasked сигнала во входных сигналах после маскирования.\n"
"Статус-код ответа на синхронный запрос снятия маскирования.\n"
"Значение в поле isMasked сигнала во входных сигналах после снятия маскирования.\n"
"Примечание: что бы не повлиять на проверки утечек, тест на маскирование выполняется во время "
"инициализации"
),
)
test_data = config.mask_signal_test_data
await scenarios.mask_signal_test(ws_client, config, test_data)
@pytest.mark.asyncio
async def test_mask_info_in_journal(
self, ws_client: WebSocketClient, config: SmokeSuiteConfig, imitator_start_time: datetime
) -> None:
"""[MessagesInfo] Проверка записей журнала о маскировании и размаскировании"""
tag = "MessagesInfo"
title = f"[{tag}] Проверка записей журнала о маскировании и размаскировании. ЭФ: Журнал.Реальное время"
description = (
f"Проверка записей журнала о маскировании и размаскировании на наборе данных {config.suite_name}, \n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Время проведения проверки: {config.mask_info_in_journal_test.offset} мин.\n"
"Синхронный запрос типа: MessagesInfo\n"
"Проверки: структура, фильтрация по времени, совпадение тегов между маскированием и снятием.\n"
"Примечание: тест выполняется после теста маскирования, чтобы записи успели попасть в журнал"
)
_apply_allure_markers(config.mask_info_in_journal_test, tag, title, description)
await scenarios.mask_info_in_journal(ws_client, config, imitator_start_time)
@pytest.mark.asyncio
async def test_mask_du_on_mini_scheme(self, ws_client: WebSocketClient, config: SmokeSuiteConfig) -> None:
"""Проверка маскирования ДУ"""
tag = "MaskDuOnMiniScheme"
title = f"[{tag}] проверка маскирования линейного участка. ЭФ: Схема"
description = (
f"Проверка маскирования линейного участка, \n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Время проведения проверки: {config.mask_signal_test.offset} мин.\n"
"Синхронные запросы типа: MaskLdsCommandRequest\n"
"Проверки:\n"
"Статус-код ответа на синхронный запрос маскирования.\n"
"Проверка маскирования ДУ в выходных сигналах.\n"
"Проверка маскирования на схеме.\n"
"Проверка сообщения о маскировании в журнале.\n"
)
_apply_allure_markers(config.mask_du_on_mini_scheme_test, tag, title, description)
await scenarios.mask_du_on_mini_scheme(ws_client, config)
@pytest.mark.asyncio
async def test_unmask_du_on_mini_scheme(self, ws_client: WebSocketClient, config: SmokeSuiteConfig) -> None:
"""Проверка размаскирования ДУ"""
tag = "UnmaskDuOnMiniScheme"
title = f"[{tag}] проверка снятия маскирования линейному участку. ЭФ: Схема"
description = (
f"Проверка снятия маскирования линейному участку, \n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Время проведения проверки: {config.unmask_du_on_mini_scheme_test.offset} мин.\n"
"Синхронные запросы типа: UnmaskDuOnMiniScheme\n"
"Проверки:\n"
"Статус-код ответа на синхронный запрос снятия маски.\n"
"Проверка тега с маской в выходных сигналах.\n"
"Проверка сообщения о снятии маскирования в журнале.\n"
)
_apply_allure_markers(config.unmask_du_on_mini_scheme_test, tag, title, description)
await scenarios.unmask_du_on_mini_scheme(ws_client, config)
@pytest.mark.asyncio
async def test_lds_status_initialization_out(self, ws_client: WebSocketClient, config: SmokeSuiteConfig) -> None:
"""[CommonScheme] Проверка выхода СОУ из Инициализации"""
tag = "CommonScheme"
title = f"[{tag}] Проверка выхода СОУ из Инициализации. ЭФ: Схема"
_apply_allure_markers(
config.lds_status_initialization_out_test,
tag,
title,
(
f"Проверка выхода СОУ из Инициализации на наборе данных {config.suite_name}, \n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Время проведения проверки: {config.lds_status_initialization_out_test.offset} мин.\n"
"Подписка на сообщения типа: CommonScheme\n"
"Ожидаемый режим работы СОУ: не Инициализация"
),
)
await scenarios.lds_status_initialization_out(ws_client, config)
@pytest.mark.asyncio
async def test_lds_status_init_out_in_journal(
self, ws_client: WebSocketClient, config: SmokeSuiteConfig, imitator_start_time: datetime
) -> None:
"""[MessagesInfo] Проверка записи в журнале о выходе СОУ из Инициализации"""
tag = "MessagesInfo"
title = f"[{tag}] Проверка записи в журнале: выход СОУ из Инициализации. ЭФ: Журнал. Реальное время"
description = (
f"Проверка записи в журнале о выходе СОУ из Инициализации на наборе данных {config.suite_name}, \n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Время проведения проверки: {config.lds_status_init_out_in_journal_test.offset} мин.\n"
"Синхронный запрос типа: MessagesInfo с фильтром messageTypes=LDS_STATUS\n"
)
_apply_allure_markers(config.lds_status_init_out_in_journal_test, tag, title, description)
await scenarios.lds_status_init_out_in_journal(ws_client, config, imitator_start_time)
@pytest.mark.asyncio
async def test_main_page_info_unstationary(self, ws_client: WebSocketClient, config: SmokeSuiteConfig) -> None:
"""[MainPageInfo] Проверка установки режима Нестационарный (для multi-leak)"""
tag = "MainPageInfo"
title = (
f"[{tag}] Проверка установки режима работы МТ: нестационарный. ЭФ: Главная страница. Контент таблица по ТУ"
)
_apply_allure_markers(
config.main_page_info_unstationary_test,
tag,
title,
(
f"Проверка установки режима работы МТ: нестационарный на наборе данных {config.suite_name}, \n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Время проведения проверки: {config.main_page_info_unstationary_test.offset} мин.\n"
"Подписка на сообщения типа: MainPageInfo\n"
"Ожидаемый режим работы МТ: Нестационарный"
),
)
await scenarios.main_page_info_unstationary(ws_client, config)
@pytest.mark.asyncio
async def test_mode_mt_in_journal(
self,
ws_client: WebSocketClient,
config: SmokeSuiteConfig,
imitator_start_time: datetime,
) -> None:
"""[MessagesInfo] Проверка наличия сообщения о режиме МТ в журнале"""
tag = "MessagesInfo"
title = f"[{tag}] Проверка сообщений о режиме МТ. ЭФ Журнал."
description = (
f"Проверка сообщений о режиме МТ на наборе данных {config.suite_name}, \n"
f"Время проведения проверки {config.mode_mt_in_journal_test.offset}"
)
_apply_allure_markers(config.mode_mt_in_journal_test, tag, title, description)
test_data = config.exp_mode_mt_message
await scenarios.mode_mt_in_journal(ws_client, config, imitator_start_time, test_data)
# ===== ТЕСТЫ УРОВНЯ УТЕЧКИ =====
# Запускаются для каждой утечки в конфиге
@pytest.mark.parametrize("config, leak, leak_number", LEAK_PARAMS)
class TestLeakScenarios:
"""
Тесты уровня утечки.
Для single-leak: запускаются один раз.
Для multi-leak: запускаются для каждой утечки отдельно.
"""
@pytest.mark.asyncio
async def test_all_leaks_info(
self,
ws_client: WebSocketClient,
config: SmokeSuiteConfig,
leak: LeakTestConfig,
leak_number: int,
imitator_start_time: datetime,
) -> None:
"""[AllLeaksInfo] Проверка утечки в пуше"""
tag = "AllLeaksInfo"
title = f"[{tag}] Проверка сообщения об утечке. ЭФ: Пуш-сообщение об утечке на всех ЭФ"
_apply_allure_markers(
leak.all_leaks_info_test,
tag,
title,
(
f"Проверка пуш-сообщения об утечке на наборе данных {config.suite_name}, \n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Время проведения проверки: {leak.all_leaks_info_test.offset} мин.\n"
"Подписка на сообщения типа: AllLeaksInfo\n"
f"Допустимое время обнаружения {leak.allowed_time_diff_minutes} мин. с момента начала утечки, "
f"т к для данных {config.suite_name} интенсивность утечки {leak.leak_rate_percentages}%.\n"
"Примечание: тесты сообщений об утечке должны выполняться раньше теста на квитирование"
),
)
if config.has_multiple_leaks:
allure.dynamic.title(f"{title} (утечка #{leak_number})")
await scenarios.all_leaks_info(ws_client, config, leak, imitator_start_time)
@pytest.mark.asyncio
async def test_all_leaks_is_empty(
self,
ws_client: WebSocketClient,
config: SmokeSuiteConfig,
leak: LeakTestConfig,
leak_number: int,
) -> None:
"""[AllLeaksInfo] Проверка отсутствия информации об утечке"""
tag = "AllLeaksInfo"
title = f"[{tag}] Проверка отсутствия информации об утечке в сообщении. ЭФ: Пуш-сообщение об утечке"
_apply_allure_markers(
leak.all_leaks_is_empty_test,
tag,
title,
(
"Проверка отсутствия информации об утечке в сообщении, \n"
f"Время проведения проверки: {leak.all_leaks_is_empty_test.offset} мин.\n"
"Подписка на сообщения типа: AllLeaksInfo\n"
"Примечание: проверяем, когда в КГ табличный вид список пуст"
),
)
if config.has_multiple_leaks:
allure.dynamic.title(f"{title} (утечка #{leak_number})")
await scenarios.all_leaks_is_empty(ws_client, config)
@pytest.mark.asyncio
async def test_leaks_content(
self,
ws_client: WebSocketClient,
config: SmokeSuiteConfig,
leak: LeakTestConfig,
leak_number: int,
imitator_start_time: datetime,
) -> None:
"""[LeaksContent] Проверка сообщения об утечке в таблице КГ"""
tag = "LeaksContent"
title = f"[{tag}] Проверка сообщения об утечке. ЭФ: КГ.Табличное представление"
_apply_allure_markers(
leak.leaks_content_test,
tag,
title,
(
f"Проверка сообщения об утечке в таблице КГ на наборе данных {config.suite_name}, \n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Время проведения проверки: {leak.leaks_content_test.offset} мин.\n"
"Подписка на сообщения типа: LeaksContent\n"
f"Допустимое время обнаружения {leak.allowed_time_diff_minutes} мин. с момента начала утечки, "
f"т к для данных {config.suite_name} интенсивность утечки {leak.leak_rate_percentages}%.\n"
"Примечание: тесты сообщений об утечке должны выполняться раньше теста на квитирование"
),
)
# Добавляем номер утечки в title для multi-leak
if config.has_multiple_leaks:
allure.dynamic.title(f"{title} (утечка #{leak_number})")
await scenarios.leaks_content(ws_client, config, leak, imitator_start_time)
@pytest.mark.asyncio
async def test_leak_info_in_journal(
self,
ws_client: WebSocketClient,
config: SmokeSuiteConfig,
leak: LeakTestConfig,
leak_number: int,
imitator_start_time: datetime,
) -> None:
"""[MessagesInfo] Проверка сообщения об утечке в журнале"""
tag = "MessagesInfo"
title = f"[{tag}] Проверка сообщения об утечке в журнале. ЭФ: Журнал.Реальное время"
_apply_allure_markers(
leak.leak_info_in_journal,
tag,
title,
(
f"Проверка сообщения об утечке в журнале на наборе данных {config.suite_name}, \n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Время проведения проверки: {leak.leak_info_in_journal.offset} мин.\n"
"Синхронный запрос типа: MessagesInfo\n"
f"Допустимое время обнаружения {leak.allowed_time_diff_minutes} мин. с момента начала утечки, "
f"т к для данных {config.suite_name} интенсивность утечки {leak.leak_rate_percentages}%.\n"
"Примечание: тесты сообщений об утечке должны выполняться раньше теста на квитирование"
),
)
# Добавляем номер утечки в title для multi-leak
if config.has_multiple_leaks:
allure.dynamic.title(f"{title} (утечка #{leak_number})")
await scenarios.leak_info_in_journal(ws_client, config, leak, imitator_start_time)
@pytest.mark.asyncio
async def test_completed_leak_info_in_journal(
self,
ws_client: WebSocketClient,
config: SmokeSuiteConfig,
leak: LeakTestConfig,
leak_number: int,
imitator_start_time: datetime,
) -> None:
"""[MessagesInfo] Проверка сообщения о завершенной утечке в журнале"""
tag = "MessagesInfo"
title = f"[{tag}] Проверка сообщения о завершенной утечке в журнале. ЭФ: Журнал.Реальное время"
description = (
f"Проверка сообщения о завершенной утечке в журнале на наборе данных {config.suite_name}, \n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Время проведения проверки: {leak.completed_leak_info_in_journal_test.offset} мин.\n"
"Синхронный запрос типа: MessagesInfo\n"
)
_apply_allure_markers(
leak.completed_leak_info_in_journal_test,
tag,
title,
description,
)
if config.has_multiple_leaks:
allure.dynamic.title(f"{title} (утечка #{leak_number})")
await scenarios.completed_leak_info_in_journal(ws_client, config, leak, imitator_start_time)
@pytest.mark.asyncio
async def test_possible_leak_in_journal(
self,
ws_client: WebSocketClient,
config: SmokeSuiteConfig,
leak: LeakTestConfig,
leak_number: int,
imitator_start_time: datetime,
) -> None:
"""[MessagesInfo] Проверка наличия сообщения 'Возможна утечка' в журнале"""
tag = "MessagesInfo"
title = f"[{tag}] Проверка сообщения 'Возможна утечка' в журнале. ЭФ: Журнал. Реальное время"
description = (
f"Проверка наличия сообщения 'Возможна утечка' в журнале на наборе данных {config.suite_name}, \n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Время проведения проверки: {leak.possible_leak_in_journal_test.offset} мин.\n"
"Синхронный запрос типа: MessagesInfo"
)
_apply_allure_markers(leak.possible_leak_in_journal_test, tag, title, description)
if config.has_multiple_leaks:
allure.dynamic.title(f"{title} (утечка #{leak_number})")
await scenarios.possible_leak_in_journal(ws_client, config, imitator_start_time)
@pytest.mark.asyncio
async def test_tu_leaks_info(
self,
ws_client: WebSocketClient,
config: SmokeSuiteConfig,
leak: LeakTestConfig,
leak_number: int,
imitator_start_time: datetime,
) -> None:
"""[TuLeaksInfo] Проверка утечки на ТУ"""
tag = "TuLeaksInfo"
title = f"[{tag}] Проверка сообщения об утечке. Сообщения на ЭФ: Схема, Минисхема, Гидроуклон"
_apply_allure_markers(
leak.tu_leaks_info_test,
tag,
title,
(
f"Проверка сообщения об утечке на схемах и гидроуклоне на наборе данных {config.suite_name}, \n"
f"на технологическом участке {leak.diagnostic_area_name}\n"
f"Время проведения проверки: {leak.tu_leaks_info_test.offset} мин.\n"
"Подписка на сообщения типа: TuLeaksInfo\n"
f"Допустимое время обнаружения {leak.allowed_time_diff_minutes} мин. с момента начала утечки, "
f"т к для данных {config.suite_name} интенсивность утечки {leak.leak_rate_percentages}%.\n"
"Примечание: тесты сообщений об утечке должны выполняться раньше теста на квитирование"
),
)
if config.has_multiple_leaks:
allure.dynamic.title(f"{title} (утечка #{leak_number})")
await scenarios.tu_leaks_info(ws_client, config, leak, imitator_start_time)
@pytest.mark.asyncio
async def test_lds_status_during_leak(
self,
ws_client: WebSocketClient,
config: SmokeSuiteConfig,
leak: LeakTestConfig,
leak_number: int,
) -> None:
"""[CommonScheme] Проверка режима работы СОУ во время утечки"""
tag = "CommonScheme"
title = f"[{tag}] Проверка режима работы СОУ во время утечки. ЭФ: Схема"
_apply_allure_markers(
leak.lds_status_during_leak_test,
tag,
title,
(
f"Проверка режима работы СОУ во время утечки на наборе данных {config.suite_name}, \n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Время проведения проверки: {leak.lds_status_during_leak_test.offset} мин.\n"
"Подписка на сообщения типа: CommonScheme\n"
"Примечание: проверка режимов СОУ во время утечки должна выполняться раньше теста на квитирование\n"
"В рамках данного теста проверяется режим СОУ на ДУ с утечкой и на соседних ДУ"
),
)
if config.has_multiple_leaks:
allure.dynamic.title(f"{title} (утечка #{leak_number})")
await scenarios.lds_status_during_leak(ws_client, config, leak)
@pytest.mark.asyncio
async def test_acknowledge_leak_info(
self, ws_client: WebSocketClient, config: SmokeSuiteConfig, leak: LeakTestConfig, leak_number: int
) -> None:
"""[AcknowledgeLeak] Проверка квитирования утечки"""
tag = "AcknowledgeLeak"
title = f"[{tag}] Проверка квитирования утечки. Отсутствие Пуш-сообщений об утечке на всех ЭФ"
_apply_allure_markers(
leak.acknowledge_leak_test,
tag,
title,
(
"Проверка квитирования утечки на наборе данных {config.suite_name}, \n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Время проведения проверки: {leak.acknowledge_leak_test.offset} мин.\n"
"Синхронный запрос типа: AcknowledgeLeak\n"
"Подписка на сообщения типа: TuLeaksInfo, AllLeaksInfo\n"
"Проверки:\n"
"Статус-код ответа на синхронный запрос о квитировании,\n"
"Отсутствие пуш-сообщений об утечках после квитирования"
),
)
if config.has_multiple_leaks:
allure.dynamic.title(f"{title} (утечка #{leak_number})")
await scenarios.acknowledge_leak_info(ws_client, config, leak)
@pytest.mark.asyncio
async def test_acknowledge_leak_in_journal(
self,
ws_client: WebSocketClient,
config: SmokeSuiteConfig,
leak: LeakTestConfig,
leak_number: int,
imitator_start_time: datetime,
) -> None:
"""[MessagesInfo] Проверка записи в журнале о квитировании утечки"""
tag = "MessagesInfo"
title = f"[{tag}] Проверка записи в журнале о квитировании утечки. ЭФ: Журнал. Реальное время"
description = (
f"Проверка записи о квитировании утечки в журнале на наборе данных {config.suite_name}, \n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Время проведения проверки: {leak.acknowledge_leak_in_journal_test.offset} мин.\n"
"Синхронный запрос типа: MessagesInfo с фильтром userActions=LEAK_ACK\n"
)
_apply_allure_markers(leak.acknowledge_leak_in_journal_test, tag, title, description)
if config.has_multiple_leaks:
allure.dynamic.title(f"{title} (утечка #{leak_number})")
await scenarios.acknowledge_leak_in_journal(ws_client, config, imitator_start_time)
@pytest.mark.asyncio
async def test_output_signals(
self,
ws_client: WebSocketClient,
config: SmokeSuiteConfig,
leak: LeakTestConfig,
leak_number: int,
imitator_start_time: datetime,
) -> None:
"""[OutputSignalsInfo] Проверка данных об утечке в выходных сигналах"""
tag = "OutputSignalsInfo"
title = (
f"[{tag}] Проверка наличия данных об утечке в выходных сигналах. ЭФ: Диагностика сигналов.Выходные "
f"сигналы"
)
_apply_allure_markers(
leak.output_signals_test,
tag,
title,
(
f"Проверка наличия данных об утечке в выходных сигналах на наборе данных {config.suite_name}, \n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Время проведения проверки: {leak.output_signals_test.offset} мин.\n"
"Синхронный запрос типа: GetOutputSignals\n"
"Подписка на сообщения типа: SubscribeOutputSignals\n"
f"Допустимое время обнаружения {leak.allowed_time_diff_minutes} мин. с момента начала утечки, "
f"т к для данных {config.suite_name} интенсивность утечки {leak.leak_rate_percentages}%.\n"
"Примечание: "
"В offset указано время проверок сообщения выходных сигналов + 1 минута "
"для корректной отработки проверок.\n"
"Данный тест так же проверяет квитирование, offset выставлять после запуска теста на квитирование "
"утечки"
),
)
if config.has_multiple_leaks:
allure.dynamic.title(f"{title} (утечка #{leak_number})")
await scenarios.output_signals(ws_client, config, leak, imitator_start_time)
@pytest.mark.asyncio
async def test_balance_algorithm_leak_waiting(
self,
ws_client: WebSocketClient,
config: SmokeSuiteConfig,
leak: LeakTestConfig,
leak_number: int,
imitator_start_time: datetime,
) -> None:
"""[BalanceAlgorithmResultsContent] Проверка сообщения 'подозрение на утечку' в таблице КГ"""
tag = "BalanceAlgorithmResultsContent"
title = f"[{tag}] Проверка сообщения 'подозрение на утечку'. ЭФ: КГ.Графическое представление"
_apply_allure_markers(
leak.balance_algorithm_leak_waiting_test,
tag,
title,
(
f"Проверка сообщения 'подозрение на утечку' в таблице КГ, \n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Время проведения проверки: "
f"{leak.balance_algorithm_leak_waiting_test.offset} - "
f"{leak.balance_algorithm_leak_waiting_test.offset + Base_const.BALANCE_ALGORITHM_TOTAL_WAIT / 60} "
f"мин.\n"
"Подписка на сообщения типа: BalanceAlgorithmResultsContent\n"
),
)
# Добавляем номер утечки в title для multi-leak
if config.has_multiple_leaks:
allure.dynamic.title(f"{title} (утечка #{leak_number})")
await scenarios.balance_algorithm_leak_waiting(ws_client, config, leak, imitator_start_time)
@pytest.mark.asyncio
async def test_balance_algorithm_leak_detected(
self,
ws_client: WebSocketClient,
config: SmokeSuiteConfig,
leak: LeakTestConfig,
leak_number: int,
) -> None:
"""[BalanceAlgorithmResultsContent] Проверка наличия утечки (isLeakDetected) в графическом представлении КГ"""
tag = "BalanceAlgorithmResultsContent"
title = f"[{tag}] Проверка наличия утечки (isLeakDetected). ЭФ: КГ Графическое представление"
_apply_allure_markers(
leak.balance_algorithm_leak_detected_test,
tag,
title,
(
f"Проверка наличия утечки (isLeakDetected) в графическом представлении КГ, \n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Время проведения проверки: {leak.balance_algorithm_leak_detected_test.offset} мин.\n"
"Подписка на сообщения типа: BalanceAlgorithmResultsContent\n"
),
)
if config.has_multiple_leaks:
allure.dynamic.title(f"{title} (утечка #{leak_number})")
await scenarios.balance_algorithm_leak_detected(ws_client, config, leak)
@pytest.mark.asyncio
async def test_balance_algorithm_leak_completed(
self,
ws_client: WebSocketClient,
config: SmokeSuiteConfig,
leak: LeakTestConfig,
leak_number: int,
) -> None:
"""
[BalanceAlgorithmResultsContent] Проверка отсутствия утечки (isLeakDetected) в графическом представлении КГ
"""
tag = "BalanceAlgorithmResultsContent"
title = (
f"[{tag}] Проверка отсутствия утечки (isLeakDetected) после завершения утечки. "
f"ЭФ: КГ Графическое представление"
)
_apply_allure_markers(
leak.balance_algorithm_leak_detected_test,
tag,
title,
(
f"Проверка отсутствия утечки (isLeakDetected) в графическом представлении КГ, \n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Время проведения проверки: {leak.balance_algorithm_leak_detected_test.offset} мин.\n"
"Подписка на сообщения типа: BalanceAlgorithmResultsContent\n"
),
)
if config.has_multiple_leaks:
allure.dynamic.title(f"{title} (утечка #{leak_number})")
await scenarios.balance_algorithm_leak_completed(ws_client, config)
@pytest.mark.asyncio
async def test_lds_status_after_confirming_leak(
self, ws_client: WebSocketClient, config: SmokeSuiteConfig, leak: LeakTestConfig, leak_number: int
) -> None:
"""
[CommonScheme] Проверка режима работы СОУ: Инициализация
Причина: Накопление данных
"""
tag = "CommonScheme"
title = f"[{tag}] Проверка режима работы СОУ: Инициализация, по причине: Накопление данных. ЭФ: Схема"
_apply_allure_markers(
leak.lds_status_after_confirming_leak_test,
tag,
title,
(
f"Проверка режима работы СОУ на выбранном ДУ, на наборе данных {config.suite_name}, \n"
f"на технологическом участке {config.technological_unit.description}\n"
"Время проведения проверки : "
f"{leak.lds_status_after_confirming_leak_test.offset} мин.\n"
"Подписка на сообщения типа: CommonScheme\n"
"Ожидаемый режим работы СОУ: Инициализация\n "
"Ожидаемая причина режима работы СОУ: Накопление данных"
),
)
if config.has_multiple_leaks:
allure.dynamic.title(f"{title} (утечка #{leak_number})")
test_data = leak.lds_status_after_confirming_leak_data
await lds_status_scenarios.lds_status_check_with_reasons(ws_client, config, test_data)
await lds_status_scenarios.lds_status_check_with_reasons(ws_client, config, test_data)
@pytest.mark.asyncio
async def test_lds_status_completed_leak(
self, ws_client: WebSocketClient, config: SmokeSuiteConfig, leak: LeakTestConfig, leak_number: int
) -> None:
"""
[CommonScheme] Проверка режима работы СОУ после завершения утечки: Инициализация
Причина: Накопление данных
"""
tag = "CommonScheme"
title = (
f"[{tag}] Проверка режима работы СОУ после завершения утечки: Инициализация"
"Причина: Накопление данных. ЭФ: Схема"
)
_apply_allure_markers(
leak.lds_status_after_confirming_leak_test,
tag,
title,
(
f"Проверка режима работы СОУ на выбранном ДУ, на наборе данных {config.suite_name}, \n"
f"на технологическом участке {config.technological_unit.description}\n"
"Время проведения проверки : "
f"{leak.lds_status_completed_leak_test.offset} мин.\n"
"Подписка на сообщения типа: CommonScheme\n"
"Ожидаемый режим работы СОУ: Инициализация\n "
"Ожидаемая причина режима работы СОУ: Накопление данных"
),
)
if config.has_multiple_leaks:
allure.dynamic.title(f"{title} (утечка #{leak_number})")
test_data = leak.lds_status_after_completed_leak_data
await lds_status_scenarios.lds_status_check_with_reasons(ws_client, config, test_data)
@pytest.mark.asyncio
async def test_leak_is_complete_on_main_page(
self,
ws_client: WebSocketClient,
config: SmokeSuiteConfig,
leak: LeakTestConfig,
leak_number: int,
) -> None:
"""Проверка отсутствия утечки на ЭФ Состояние МТ"""
tag = "MainPageInfoContent"
title = f"[{tag}] Проверка отсутствия утечки на ЭФ Состояние МТ."
_apply_allure_markers(
leak.leak_is_complete_on_main_page_test,
tag,
title,
(
f"Проверка отсутствия утечки на ЭФ Состояние МТ\n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Время проведения проверки: {leak.leak_is_complete_on_main_page_test.offset} мин.\n"
),
)
if config.has_multiple_leaks:
allure.dynamic.title(f"{title} (утечка #{leak_number})")
await scenarios.leak_is_complete_on_main_page(ws_client, config)
@pytest.mark.asyncio
async def test_leak_is_confirm_on_main_page(
self,
ws_client: WebSocketClient,
config: SmokeSuiteConfig,
leak: LeakTestConfig,
leak_number: int,
) -> None:
"""Проверка подтвержденной утечки на ЭФ Состояние МТ"""
tag = "MainPageInfoContent"
title = f"[{tag}] Проверка подтвержденной утечки на ЭФ Состояние МТ"
_apply_allure_markers(
leak.leak_is_confirm_on_main_page_test,
tag,
title,
(
f"Проверка наличия утечки на ЭФ Состояние МТ \n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Время проведения проверки: {leak.leak_is_confirm_on_main_page_test.offset} мин.\n"
),
)
if config.has_multiple_leaks:
allure.dynamic.title(f"{title} (утечка #{leak_number})")
await scenarios.leak_is_confirm_on_main_page(ws_client, config)
@pytest.mark.asyncio
async def test_the_leak_is_complete_on_kg(
self,
ws_client: WebSocketClient,
config: SmokeSuiteConfig,
leak: LeakTestConfig,
leak_number: int,
) -> None:
"""Проверка события завершения утечки в контроле герметичности"""
tag = "LeaksContent"
title = f"[{tag}] Проверка cобытия завершения утечки ЭФ КГ табличный вид."
_apply_allure_markers(
leak.the_leak_is_complete_on_kg_test,
tag,
title,
(
f"Проверка ЭФ КГ табличное представление, \n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Время проведения проверки: {leak.the_leak_is_complete_on_kg_test.offset} мин.\n"
),
)
if config.has_multiple_leaks:
allure.dynamic.title(f"{title} (утечка #{leak_number})")
await scenarios.the_leak_is_complete_on_kg(ws_client, config, leak)
@pytest.mark.asyncio
async def test_leak_is_complete_in_output_signals(
self,
ws_client: WebSocketClient,
config: SmokeSuiteConfig,
leak: LeakTestConfig,
leak_number: int,
) -> None:
"""Проверка отсутствия утечки в выходных сигналах"""
tag = "OutputSignalsInfo"
title = f"[{tag}] Проверка отсутствия утечки в выходных сигналах."
_apply_allure_markers(
leak.leak_is_complete_in_output_signals_test,
tag,
title,
(
f"Проверка ЭФ Выходные сигналы \n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Время проведения проверки: {leak.leak_is_complete_in_output_signals_test.offset} мин.\n"
),
)
if config.has_multiple_leaks:
allure.dynamic.title(f"{title} (утечка #{leak_number})")
await scenarios.leak_is_complete_in_output_signals(ws_client, config, leak)
@pytest.mark.asyncio
async def test_complete_tu_leaks_info_content(
self,
ws_client: WebSocketClient,
config: SmokeSuiteConfig,
leak: LeakTestConfig,
leak_number: int,
) -> None:
"""Проверка сообщения в журнале о завершении утечки"""
tag = "TuLeaksInfoContent"
title = f"[{tag}] Проверка отсутствия утечки на схеме"
_apply_allure_markers(
leak.complete_tu_leaks_info_content_test,
tag,
title,
(
f"Проверка ЭФ Схема, \n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Время проведения проверки: {leak.complete_tu_leaks_info_content_test.offset} мин.\n"
),
)
if config.has_multiple_leaks:
allure.dynamic.title(f"{title} (утечка #{leak_number})")
await scenarios.complete_tu_leaks_info_content(ws_client, config)
@pytest.mark.asyncio
async def test_export_leaks_report(
self,
ws_client: WebSocketClient,
config: SmokeSuiteConfig,
leak: LeakTestConfig,
leak_number: int,
imitator_start_time: datetime,
) -> None:
tag = "ExportReports"
title = f"[{tag}] Проверка формирования отчёта об утечках. ЭФ: Выпадашка отчётов"
_apply_allure_markers(
leak.export_leaks_report_test,
tag,
title,
(
f"Проверка формирования и содержимого xlsx-отчёта об утечках на наборе данных "
f"{config.suite_name},\n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Период отчёта: от старта имитатора до старта + сдвиг теста"
f"{leak.export_leaks_report_test.offset} мин.\n"
"Этапы сценария:\n"
"1) SubscribeReportsDataExportedRequest - подписка на пуш-нотификации\n"
"2) ExportReportsCommandRequest - запрос формирования отчёта (тип LeaksReport, фильтр по периоду)\n"
"3) Ожидание ReportDataExportedNotification - пуш-нотификация о формировании отчёта\n"
"4) Лонг-поллинг GetExportedDataListRequest - поиск отчёта в списке "
"(имя, ТУ, период start/end с погрешностью +-1 мин)\n"
"5) DownloadExportedDataRequest (StreamInvocation) - скачивание по exportedDataId\n"
"6) Приём fileChunk, проверка формата xlsx (zip-сигнатура, контент существует)\n"
"7) Проверка имени файла: .xlsx, 'Отчет об утечках', название ТУ, "
"диапазон дат в имени (+-1 мин)\n"
"8) Проверка шапки xlsx: заголовок с периодом (+-1 мин), названия колонок\n"
"9) Проверка строки утечки: дата в периоде, объект, режим СОУ, маскирование, "
"координата, объём, режим работы МТ\n"
"Во вложениях Allure xlsx прикладывается только при падении теста"
),
)
if config.has_multiple_leaks:
allure.dynamic.title(f"{title} (утечка #{leak_number})")
await scenarios.export_leaks_report(ws_client, config, leak, imitator_start_time)
@pytest.mark.asyncio
async def test_export_lds_status_report(
self,
ws_client: WebSocketClient,
config: SmokeSuiteConfig,
leak: LeakTestConfig,
leak_number: int,
imitator_start_time: datetime,
) -> None:
tag = "ExportReports"
title = f"[{tag}] Проверка формирования отчёта о режиме работы СОУ. ЭФ: Выпадашка отчётов"
_apply_allure_markers(
leak.export_lds_status_report_test,
tag,
title,
(
f"Проверка формирования и содержимого xlsx-отчёта о режиме работы СОУ на наборе данных "
f"{config.suite_name},\n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Период отчёта: от старта имитатора до старта + сдвиг теста "
f"{leak.export_lds_status_report_test.offset} мин.\n"
"Этапы сценария:\n"
"1) SubscribeReportsDataExportedRequest - подписка на пуш-нотификации\n"
"2) ExportReportsCommandRequest - запрос формирования отчёта (тип LdsStateReport, фильтр по периоду)\n"
"3) Ожидание ReportDataExportedNotification\n"
"4) Лонг-поллинг GetExportedDataListRequest - поиск отчёта в списке\n"
"5) DownloadExportedDataRequest (StreamInvocation) - скачивание по exportedDataId\n"
"6) Проверка xlsx: участки, длительности режимов СОУ, суммарное время работы\n"
"7) Проверка двойной шапки и названий колонок\n"
"8) Проверка имени файла (.xlsx, название отчёта, ТУ, период +-1 мин)\n"
"Во вложениях Allure xlsx прикладывается только при падении теста"
),
)
if config.has_multiple_leaks:
allure.dynamic.title(f"{title} (утечка #{leak_number})")
await scenarios.export_lds_status_report(ws_client, config, leak, imitator_start_time)
@pytest.mark.asyncio
async def test_export_mt_mode_report(
self,
ws_client: WebSocketClient,
config: SmokeSuiteConfig,
leak: LeakTestConfig,
leak_number: int,
imitator_start_time: datetime,
) -> None:
tag = "ExportReports"
title = f"[{tag}] Проверка формирования отчёта о режиме работы МТ. ЭФ: Выпадашка отчётов"
_apply_allure_markers(
leak.export_mt_mode_report_test,
tag,
title,
(
f"Проверка формирования и содержимого xlsx-отчёта о режиме работы МТ на наборе данных "
f"{config.suite_name},\n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Период отчёта: от старта имитатора до старта + сдвиг теста "
f"{leak.export_mt_mode_report_test.offset} мин.\n"
"Этапы сценария:\n"
"1) SubscribeReportsDataExportedRequest - подписка на пуш-нотификации\n"
"2) ExportReportsCommandRequest - запрос формирования отчёта (тип StationaryStatusReport)\n"
"3) Ожидание ReportDataExportedNotification\n"
"4) Лонг-поллинг GetExportedDataListRequest - поиск отчёта в списке\n"
"5) DownloadExportedDataRequest (StreamInvocation) - скачивание по exportedDataId\n"
"6) Проверка xlsx: участки, длительности режимов МТ, суммарное время, доминирующий режим\n"
"7) Проверка двойной шапки и названий колонок\n"
"8) Проверка имени файла (.xlsx, название отчёта, ТУ, период +-1 мин)\n"
"Во вложениях Allure xlsx прикладывается только при падении теста"
),
)
if config.has_multiple_leaks:
allure.dynamic.title(f"{title} (утечка #{leak_number})")
await scenarios.export_mt_mode_report(ws_client, config, leak, imitator_start_time)
ассерт
from __future__ import annotations
import traceback
from typing import Any, List, Optional, TypeVar
import allure
from assertpy import assert_that
from pytest import fail
ObjectType = TypeVar("ObjectType")
class SoftAssertions:
"""
Контекстный менеджер для "мягких" сравнений.
Внутри теста используется так:
with SoftAssertions() as soft:
StepCheck(..., failures=soft).actual(...).expected(...).equal_to()
По выходу, если были ошибки — они прикрепляются к Allure и поднимается Aggregated AssertionError.
"""
def __init__(self) -> None:
self.failures: List[str] = []
def __enter__(self) -> List[str]:
return self.failures
def __exit__(self, exc_type, exc_val, exc_tb) -> Optional[bool]:
# Если внутри контекста появились внешние исключения (не связанные с проверками) — не подавляем их.
if exc_type is not None:
return False
if not self.failures:
return False
# Прикрепляем все собранные failure-traceback'и к Allure, чтобы их было удобно смотреть
joined = "\n\n".join(self.failures)
allure.attach(joined, name="soft assertion failures", attachment_type=allure.attachment_type.TEXT)
# Поднимаем итоговую ошибку, чтобы CI увидел падение теста
raise AssertionError("Soft assertion failures:\n\n" + joined)
class StepMessageBuilder:
"""
Составляет разные сообщения для allure.step под конкретный вид assert
"""
def __init__(self, check_step: str, field_name: str) -> None:
self.check_step = check_step
self.field_name = field_name
def _build_message(self, message_parts) -> str:
"""
Собирает сообщение из списка с нужным разделителем
"""
return "\n".join([self.check_step] + message_parts)
@staticmethod
def _format_val(val: Any) -> str:
"""
Вспомогательная функция для аккуратного форматирования значения в сообщении.
Она пытается использовать repr(), но на случай исключения возвращает str().
"""
try:
return str(val)
except TypeError:
return repr(val)
@staticmethod
def _item_count(val: Any) -> int:
"""
Считает количество элементов, если возможно
"""
return len(val) if hasattr(val, "__len__") else 0
def equal_to_msg(
self,
exp_value: Any,
act_value: Any,
) -> str:
message_parts = [
f"Ожидаемый результат: {self.field_name} = {self._format_val(exp_value)}",
f"Фактический результат: {self.field_name} = {self._format_val(act_value)}",
]
return self._build_message(message_parts)
def is_not_equal_to_msg(
self,
exp_value: Any,
act_value: Any,
) -> str:
message_parts = [
f"Ожидаемый результат: {self.field_name} = {self._format_val(exp_value)} не равно фактическому",
f"Фактический результат: {self.field_name} = {self._format_val(act_value)}",
]
return self._build_message(message_parts)
def is_not_none_msg(self, act_value: Any) -> str:
message_parts = [
f"Ожидаемый результат: {self.field_name} не пустое",
f"Фактический результат: {self.field_name} = {self._format_val(act_value)}",
]
return self._build_message(message_parts)
def is_none_msg(self, act_value: Any) -> str:
message_parts = [
f"Ожидаемый результат: {self.field_name} пустое",
f"Фактический результат: {self.field_name} = {self._format_val(act_value)}",
]
return self._build_message(message_parts)
def is_not_empty_msg(self, act_value: Any) -> str:
item_count = self._item_count(act_value)
message_parts = [
f"Ожидаемый результат: {self.field_name} не пустое",
f"Фактический результат: количество элементов в {self.field_name} = {item_count}",
]
return self._build_message(message_parts)
def is_empty_msg(self, act_value: Any) -> str:
item_count = self._item_count(act_value)
message_parts = [
f"Ожидаемый результат: {self.field_name} пустое",
f"Фактический результат: количество элементов в {self.field_name} = {item_count}",
]
return self._build_message(message_parts)
def is_close_to_msg(self, exp_value: Any, act_value: Any, extra_info: Optional[Any] = None) -> str:
message_parts = [
f"Ожидаемый результат: {self.field_name} = {self._format_val(exp_value)}",
f"Фактический результат: {self.field_name} = {self._format_val(act_value)}",
]
if extra_info:
message_parts.append(f"Дополнительная информация: {self._format_val(extra_info)}")
return self._build_message(message_parts)
def is_less_than_msg(self, exp_value: Any, act_value: Any, extra_info: Optional[Any] = None) -> str:
message_parts = [
f"Ожидаемый результат: Значение в поле {self.field_name} < {self._format_val(exp_value)}",
f"Фактический результат: {self.field_name} = {self._format_val(act_value)}",
]
if extra_info:
message_parts.append(f"Дополнительная информация: {self._format_val(extra_info)}")
return self._build_message(message_parts)
def is_greater_than_msg(self, exp_value: Any, act_value: Any, extra_info: Optional[Any] = None) -> str:
message_parts = [
f"Ожидаемый результат: Значение в поле {self.field_name} > {self._format_val(exp_value)}",
f"Фактический результат: {self.field_name} = {self._format_val(act_value)}",
]
if extra_info:
message_parts.append(f"Дополнительная информация: {self._format_val(extra_info)}")
return self._build_message(message_parts)
def is_greater_than_or_equal_to_msg(self, exp_value: Any, act_value: Any, extra_info: Optional[Any] = None) -> str:
message_parts = [
f"Ожидаемый результат: Значение в поле {self.field_name} >= {self._format_val(exp_value)}",
f"Фактический результат: {self.field_name} = {self._format_val(act_value)}",
]
if extra_info:
message_parts.append(f"Дополнительная информация: {self._format_val(extra_info)}")
return self._build_message(message_parts)
def is_between_msg(self, act_value: Any, lower_bound: Any, upper_bound: Any) -> str:
message_parts = [
f"Ожидаемый результат: "
f"Значение в поле {self.field_name} должно быть в диапазоне [{lower_bound}, {upper_bound}]",
f"Фактический результат: {self.field_name} = {self._format_val(act_value)}",
]
return self._build_message(message_parts)
def does_not_contain_msg(self, objects_list: List[ObjectType], forbidden_object: ObjectType) -> str:
message_parts = [
f"Ожидаемый результат: Список элементов: {objects_list}",
f"Не содержит элемента: {forbidden_object}",
]
return self._build_message(message_parts)
def contains_msg(self, container: Any, expected_item: Any) -> str:
if isinstance(container, str):
message_parts = [f"Ожидаемый результат:\n строка: '{container}' содержит подстроку '{expected_item}'"]
else:
message_parts = [
f"Ожидаемый результат:\n список: {self._format_val(container)} содержит элемент {expected_item}"
]
return self._build_message(message_parts)
def is_true_with_details_msg(self, expected_text: str, actual_text: str) -> str:
message_parts = [
f"Ожидаемый результат: {expected_text}",
f"Фактический результат: {actual_text}",
]
return self._build_message(message_parts)
class StepCheck:
"""
Обёртка для проверки
Внутри всегда формируется единое сообщение и открывается allure.step.
"""
def __init__(self, check_step: str, field_name: str, failures: Optional[List[str]] = None):
# Сохраняем название проверки
self.check_step = check_step
self._field_name = field_name
# Поля для хранения expected/actual/extra перед вызовом метода-проверки
self._expected: Optional[Any] = None
self._actual: Optional[Any] = None
self._extra_info: Optional[str] = None
self._msg_builder = StepMessageBuilder(check_step, field_name)
# Хранение фейлов
self._failures = failures
def expected(self, value: Any) -> StepCheck:
"""Задаём ожидаемое значение и возвращаем self для цепочки вызовов"""
self._expected = value
return self
def actual(self, value: Any) -> StepCheck:
"""Задаём фактическое значение и возвращаем self"""
self._actual = value
return self
def extra(self, text: str) -> StepCheck:
"""Задаём дополнительный текст"""
self._extra_info = text
return self
def _handle_assertion(self, exc: AssertionError) -> None:
"""
Сохраняет traceback в список failures или перебрасывает дальше, если list не передан
"""
if self._failures is not None:
self._failures.append(traceback.format_exc())
else:
raise exc
def equal_to(self, expected: Optional[Any] = None) -> None:
"""
Выполняет проверку is_equal_to. Можно передать expected в метод или задать раньше через .expected(...)
"""
# Если expected пришёл в аргументе — сохраняем его
if expected is not None:
self._expected = expected
# Проверяем, что actual задан
if self._actual is None:
raise ValueError("Фактический результат должен быть заполнен при вызове equal_to()")
msg = self._msg_builder.equal_to_msg(self._expected, self._actual)
try:
with allure.step(msg):
# Бросаем AssertionError в момент выполнения шага, чтобы Allure увидел failed-step
assert_that(self._actual).described_as(msg).is_equal_to(self._expected)
except AssertionError as exc:
# Ловушка для исключения сразу после выхода из with - сохраняем traceback и продолжаем
self._handle_assertion(exc)
def is_not_equal_to(self, expected: Optional[Any] = None) -> None:
"""
Выполняет проверку is_not_equal_to. Можно передать expected в метод или задать раньше через .expected(...)
"""
# Если expected пришёл в аргументе — сохраняем его
if expected is not None:
self._expected = expected
# Проверяем, что actual задан
if self._actual is None:
raise ValueError("Фактический результат должен быть заполнен при вызове is_not_equal_to()")
msg = self._msg_builder.is_not_equal_to_msg(self._expected, self._actual)
try:
with allure.step(msg):
# Бросаем AssertionError в момент выполнения шага, чтобы Allure увидел failed-step
assert_that(self._actual).described_as(msg).is_not_equal_to(self._expected)
except AssertionError as exc:
# Ловушка для исключения сразу после выхода из with - сохраняем traceback и продолжаем
self._handle_assertion(exc)
def is_not_none(self) -> None:
"""Проверка на существование поля"""
msg = self._msg_builder.is_not_none_msg(self._actual)
try:
with allure.step(msg):
assert_that(self._actual).described_as(msg).is_not_none()
except AssertionError as exc:
# Ловушка для исключения сразу после выхода из with - сохраняем traceback и продолжаем
self._handle_assertion(exc)
def is_none(self) -> None:
"""Проверка на отсутствие поля"""
msg = self._msg_builder.is_none_msg(self._actual)
try:
with allure.step(msg):
assert_that(self._actual).described_as(msg).is_none()
except AssertionError as exc:
self._handle_assertion(exc)
def is_not_empty(self) -> None:
"""Проверка на не пустое значение"""
if self._actual is None:
fail("Фактический результат должен быть заполнен при вызове is_not_empty()")
msg = self._msg_builder.is_not_empty_msg(self._actual)
try:
with allure.step(msg):
assert_that(self._actual).described_as(msg).is_not_empty()
except AssertionError as exc:
self._handle_assertion(exc)
def is_empty(self) -> None:
"""Проверка на пустое значение"""
if self._actual is None:
fail("Фактический результат должен быть заполнен при вызове is_not_empty()")
msg = self._msg_builder.is_empty_msg(self._actual)
try:
with allure.step(msg):
assert_that(self._actual).described_as(msg).is_empty()
except AssertionError as exc:
self._handle_assertion(exc)
def is_close_to(self, expected: Any, allowed_diff: int | float, extra_info: Any) -> None:
"""
Проверка допуска
"""
if self._actual is None:
raise ValueError("Фактический результат должен быть заполнен при вызове is_close_to()")
self._expected = expected
msg = self._msg_builder.is_close_to_msg(expected, self._actual, extra_info)
try:
with allure.step(msg):
assert_that(self._actual).described_as(msg).is_close_to(expected, allowed_diff)
except AssertionError as exc:
self._handle_assertion(exc)
def is_less_than(self, threshold: Any, extra_info: Any = None) -> None:
"""Проверка, что значение меньше порога"""
if self._actual is None:
raise ValueError("Фактический результат должен быть заполнен при вызове is_less_than()")
msg = self._msg_builder.is_less_than_msg(threshold, self._actual, extra_info)
try:
with allure.step(msg):
assert_that(self._actual).described_as(msg).is_less_than(threshold)
except AssertionError as exc:
self._handle_assertion(exc)
def is_true_with_details(self, expected_text: str, actual_text: str) -> None:
"""Проверка булева условия с описанием ожидания и факта."""
if self._actual is None:
raise ValueError("Фактический результат должен быть заполнен при вызове is_true_with_details()")
msg = self._msg_builder.is_true_with_details_msg(expected_text, actual_text)
try:
with allure.step(msg):
assert_that(self._actual).described_as(msg).is_true()
except AssertionError as exc:
self._handle_assertion(exc)
def is_greater_than(self, threshold: Any, extra_info: Any = None) -> None:
"""Проверка, что значение строго больше порога"""
if self._actual is None:
raise ValueError("Фактический результат должен быть заполнен при вызове is_greater_than()")
msg = self._msg_builder.is_greater_than_msg(threshold, self._actual, extra_info)
try:
with allure.step(msg):
assert_that(self._actual).described_as(msg).is_greater_than(threshold)
except AssertionError as exc:
self._handle_assertion(exc)
def is_between(self, lower_bound: Any, upper_bound: Any) -> None:
"""Проверка, что значение в пределах установленных границ"""
if self._actual is None:
raise ValueError("Фактический результат должен быть заполнен при вызове is_between()")
msg = self._msg_builder.is_between_msg(self._actual, lower_bound, upper_bound)
try:
with allure.step(msg):
assert_that(self._actual).described_as(msg).is_between(lower_bound, upper_bound)
except AssertionError as exc:
self._handle_assertion(exc)
def is_greater_than_or_equal_to(self, threshold: Any, extra_info: Any = None) -> None:
"""Проверка, что значение больше или равно порогу"""
if self._actual is None:
raise ValueError("Фактический результат должен быть заполнен при вызове is_greater_than_or_equal_to_msg()")
msg = self._msg_builder.is_greater_than_or_equal_to_msg(threshold, self._actual, extra_info)
try:
with allure.step(msg):
assert_that(self._actual).described_as(msg).is_greater_than_or_equal_to(threshold)
except AssertionError as exc:
self._handle_assertion(exc)
def does_not_contain(self, objects_list: List[ObjectType], forbidden_object: ObjectType) -> None:
"""
Выполняет проверку does_not_contain.
"""
msg = self._msg_builder.does_not_contain_msg(objects_list, forbidden_object)
try:
with allure.step(msg):
assert_that(objects_list).described_as(msg).does_not_contain(forbidden_object)
except AssertionError as exc:
self._handle_assertion(exc)
def contains(self, container: Any, expected_item: Any) -> None:
"""Проверка, что container (список или строка) содержит expected_item."""
msg = self._msg_builder.contains_msg(container, expected_item)
try:
with allure.step(msg):
assert_that(container).described_as(msg).contains(expected_item)
except AssertionError as exc:
self._handle_assertion(exc)
редж реп
"""
Утилиты для разбора xlsx-отчёта об отбракованных входных данных.
"""
from __future__ import annotations
from dataclasses import dataclass, replace
from datetime import datetime, timedelta
from typing import Iterable, List, Optional, Tuple
from constants.enums import RejectionSensorTag
from constants.test_constants import ExportRejectedReportConstants as RejectedReportConst
from test_config.models_for_tests import RejectionReportRow, RejectionTestCase
from utils.helpers import report_xlsx_utils as report_utils
from utils.helpers.lds_status_report_xlsx_utils import format_duration_seconds, parse_duration_seconds
from utils.helpers.ws_test_utils import localize_as_moscow
MergeKey = Tuple[Optional[datetime], str, str, str, str]
@dataclass
class RejectionReportCaseCheck:
"""Подготовленные данные для проверки одного RejectionTestCase в xlsx-отчёте."""
case_label: str
tag_description: str
report_event: str
window_start: datetime
window_end: datetime
row_found: bool
found_row_summary: str
datetime_in_window: bool = False
datetime_actual_text: str = "(пусто)"
actual_duration_seconds: int = 0
expected_duration_seconds: int = 0
expected_duration_text: str = ""
pipe_section: str = ""
actual_signal_suffix: str = ""
expected_signal_suffix: str = ""
def prepare_rejection_report_case_checks(
monitored_rows: Iterable[RejectionReportRow],
rejection_cases: Iterable[RejectionTestCase],
imitator_start_time: datetime,
) -> List[RejectionReportCaseCheck]:
"""Собирает все вычисленные значения для проверки строк отчёта по кейсам набора."""
case_checks: List[RejectionReportCaseCheck] = []
for rejection_case in rejection_cases:
report_event = expected_event_to_report_event(rejection_case.expected_event)
window_start, window_end = get_case_time_window(imitator_start_time, rejection_case)
case_label = f"события '{report_event}' - {rejection_case.sensor.description}"
expected_signal_suffix = report_signal_suffix_by_expected_name(rejection_case.expected_signal_name)
raw_case_rows = filter_rows_for_rejection_case(monitored_rows, rejection_case, imitator_start_time)
merged_case_rows = merge_rejection_rows(raw_case_rows)
primary_row = select_primary_merged_row(merged_case_rows)
if primary_row is None:
case_checks.append(
RejectionReportCaseCheck(
case_label=case_label,
tag_description=rejection_case.sensor.description,
report_event=report_event,
window_start=window_start,
window_end=window_end,
row_found=False,
found_row_summary="строка не найдена",
expected_signal_suffix=expected_signal_suffix,
)
)
continue
merge_key = build_merge_key(primary_row)
expected_duration_seconds = sum_duration_for_merge_key(raw_case_rows, merge_key)
pipe_section, actual_signal_suffix = split_object_column(primary_row.object_value)
datetime_in_window = is_datetime_within_closed_interval(
primary_row.datetime_value,
window_start,
window_end,
)
case_checks.append(
RejectionReportCaseCheck(
case_label=case_label,
tag_description=rejection_case.sensor.description,
report_event=report_event,
window_start=window_start,
window_end=window_end,
row_found=True,
found_row_summary=(
f"{primary_row.tag_value} | {primary_row.event_value} | {primary_row.datetime_value}"
),
datetime_in_window=datetime_in_window,
datetime_actual_text=str(primary_row.datetime_value) if primary_row.datetime_value else "(пусто)",
actual_duration_seconds=primary_row.duration_seconds,
expected_duration_seconds=expected_duration_seconds,
expected_duration_text=format_duration_seconds(expected_duration_seconds),
pipe_section=pipe_section,
actual_signal_suffix=actual_signal_suffix,
expected_signal_suffix=expected_signal_suffix,
)
)
return case_checks
def expected_event_to_report_event(expected_event: str) -> str:
"""Преобразует формулировку события из журнала в формулировку отчёта."""
return expected_event.replace("Отбраковка", "Отбракован", 1)
def report_signal_suffix_by_expected_name(expected_signal_name: str) -> str:
"""Возвращает суффикс сигнала в колонке 'Объект' отчёта по expected_signal_name из кейса."""
return RejectedReportConst.REPORT_SIGNAL_SUFFIX_BY_EXPECTED_NAME.get(
expected_signal_name,
expected_signal_name,
)
def split_object_column(object_value: str) -> tuple[str, str]:
"""
Разбирает колонку "Объект":
- до последней точки - участок трубопровода (имя объекта);
- после последней точки - название сигнала.
"""
if not object_value:
return "", ""
if RejectedReportConst.OBJECT_SIGNAL_SEPARATOR not in object_value:
return object_value.strip(), ""
pipe_section, signal_suffix = object_value.rsplit(
RejectedReportConst.OBJECT_SIGNAL_SEPARATOR,
RejectedReportConst.OBJECT_SIGNAL_RSPLIT_MAXSPLIT,
)
return pipe_section.strip(), signal_suffix.strip()
def is_datetime_within_closed_interval(
value: datetime,
interval_start: datetime,
interval_end: datetime,
) -> bool:
"""True, если value (в Europe/Moscow) попадает в закрытый интервал [interval_start, interval_end]."""
localized_value = localize_as_moscow(value)
return interval_start <= localized_value <= interval_end
def report_header_contains_expected_title(raw_title: str) -> bool:
"""Проверяет, что первая строка шапки xlsx содержит ожидаемый заголовок отчёта."""
title_lower = raw_title.lower()
return (
RejectedReportConst.REJECTED_REPORT_HEADER_TITLE_PART in title_lower
or RejectedReportConst.REJECTED_REPORT_HEADER_TITLE_PART_ALT in title_lower
)
def build_merge_key(row: RejectionReportRow) -> MergeKey:
"""Ключ объединения строк с одинаковым содержимым, кроме длительности."""
return (
row.datetime_value,
row.object_value,
row.event_value,
row.value_text,
row.tag_value,
)
def parse_rejection_report_row(row_index: int, cells: dict[str, str]) -> RejectionReportRow:
"""Собирает RejectionReportRow из словаря ячеек строки отчёта."""
duration_seconds = parse_duration_seconds(cells.get(RejectedReportConst.COL_DURATION)) or 0
return RejectionReportRow(
row_index=row_index,
datetime_value=report_utils.parse_report_datetime(cells.get(RejectedReportConst.COL_DATETIME)),
object_value=(cells.get(RejectedReportConst.COL_OBJECT) or "").strip(),
event_value=(cells.get(RejectedReportConst.COL_EVENT) or "").strip(),
value_text=(cells.get(RejectedReportConst.COL_VALUE) or "").strip(),
duration_seconds=duration_seconds,
tag_value=(cells.get(RejectedReportConst.COL_TAG) or "").strip(),
)
def iter_rejection_report_rows(worksheet) -> List[RejectionReportRow]:
"""Возвращает строки данных отчёта, начиная с третьей строки листа."""
headers = report_utils.get_report_column_headers(
worksheet,
headers_row=RejectedReportConst.REPORT_COLUMN_HEADERS_ROW,
)
if not headers:
return []
rows: List[RejectionReportRow] = []
for excel_row_index, row_values in enumerate(
worksheet.iter_rows(
min_row=RejectedReportConst.REPORT_DATA_FIRST_ROW,
max_col=len(headers),
values_only=True,
),
start=RejectedReportConst.REPORT_DATA_FIRST_ROW,
):
if not any(cell is not None and str(cell).strip() for cell in row_values):
continue
cells = report_utils.build_column_cells(row_values, headers)
rows.append(parse_rejection_report_row(excel_row_index, cells))
return rows
def filter_rows_by_monitored_tags(
rows: Iterable[RejectionReportRow],
monitored_tags: Iterable[RejectionSensorTag],
) -> List[RejectionReportRow]:
"""Оставляет только строки с тегами из RejectionSensorTag."""
allowed_tags = {tag.description for tag in monitored_tags}
return [row for row in rows if row.tag_value in allowed_tags]
def get_case_time_window(
imitator_start_time: datetime,
rejection_case: RejectionTestCase,
tolerance_seconds: int = RejectedReportConst.TIME_FILTER_TOLERANCE_SECONDS,
) -> tuple[datetime, datetime]:
"""Возвращает окно фильтрации строк отчёта для конкретного RejectionTestCase."""
imitator_msk = localize_as_moscow(imitator_start_time)
window_start = imitator_msk + timedelta(seconds=rejection_case.time_range_start_s - tolerance_seconds)
window_end = imitator_msk + timedelta(seconds=rejection_case.time_range_end_s + tolerance_seconds)
return window_start, window_end
def filter_rows_for_rejection_case(
rows: Iterable[RejectionReportRow],
rejection_case: RejectionTestCase,
imitator_start_time: datetime,
) -> List[RejectionReportRow]:
"""Фильтрует строки отчёта по тегу, событию и временному окну RejectionTestCase."""
report_event = expected_event_to_report_event(rejection_case.expected_event)
window_start, window_end = get_case_time_window(imitator_start_time, rejection_case)
filtered_rows: List[RejectionReportRow] = []
for row in rows:
if row.tag_value != rejection_case.sensor.description:
continue
if row.event_value != report_event:
continue
if row.datetime_value is None:
continue
if not is_datetime_within_closed_interval(row.datetime_value, window_start, window_end):
continue
filtered_rows.append(row)
return filtered_rows
def merge_rejection_rows(rows: Iterable[RejectionReportRow]) -> List[RejectionReportRow]:
"""
Объединяет полностью идентичные строки, суммируя длительность отбраковки.
"""
merged_rows: dict[MergeKey, RejectionReportRow] = {}
for row in rows:
merge_key = build_merge_key(row)
if merge_key not in merged_rows:
merged_rows[merge_key] = replace(row)
continue
merged_rows[merge_key].duration_seconds += row.duration_seconds
return list(merged_rows.values())
def select_primary_merged_row(merged_rows: List[RejectionReportRow]) -> Optional[RejectionReportRow]:
"""Выбирает основную строку отбраковки - с максимальной суммарной длительностью."""
if not merged_rows:
return None
return max(merged_rows, key=lambda row: row.duration_seconds)
def sum_duration_for_merge_key(rows: Iterable[RejectionReportRow], merge_key: MergeKey) -> int:
"""Суммирует длительности всех сырых строк с одинаковым ключом объединения."""
return sum(row.duration_seconds for row in rows if build_merge_key(row) == merge_key)
def format_rejection_rows_for_allure(rows: Iterable[RejectionReportRow]) -> str:
"""Форматирует строки отчёта для вложения в Allure."""
lines = []
for row in rows:
duration_text = format_duration_seconds(row.duration_seconds)
lines.append(
f"row#{row.row_index}: {row.datetime_value} | {row.object_value} | "
f"{row.event_value} | {row.value_text} | {duration_text} | {row.tag_value}"
)
return "\n".join(lines) if lines else "(нет строк)"
реп эксел
"""
Утилиты для разбора xlsx-отчётов и проверки их формата.
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional
import allure
from openpyxl import load_workbook
from openpyxl.worksheet.worksheet import Worksheet
from constants.test_constants import BaseTN3Constants as TestConst
from constants.test_constants import ExportReportConstants as ReportConst
from utils.helpers.ws_test_utils import extract_first_number, localize_as_moscow
@dataclass
class ReportTitleInfo:
"""Разобранная шапка отчёта"""
raw_title: str
period_start: Optional[datetime] = None
period_end: Optional[datetime] = None
@dataclass
class LeakReportRow:
"""Разобранная строка данных по утечке"""
row_index: int
cells: Dict[str, str] = field(default_factory=dict)
@property
def datetime_value(self) -> Optional[datetime]:
return parse_report_datetime(self.cells.get(ReportConst.COL_DATETIME))
@property
def object_value(self) -> str:
return self.cells.get(ReportConst.COL_OBJECT, "")
@property
def lds_status(self) -> str:
return self.cells.get(ReportConst.COL_LDS_STATUS, "")
@property
def masking_info(self) -> str:
return self.cells.get(ReportConst.COL_MASK_INFO, "")
@property
def coordinate_meters(self) -> Optional[float]:
coordinate_km = extract_first_number(self.cells.get(ReportConst.COL_COORDINATE))
if coordinate_km is None:
return None
return coordinate_km * TestConst.KM_TO_METERS
@property
def leak_volume(self) -> Optional[float]:
return extract_first_number(self.cells.get(ReportConst.COL_LEAK_VOLUME))
@property
def mt_mode(self) -> str:
return self.cells.get(ReportConst.COL_MT_MODE, "")
def is_xlsx_file_bytes(file_bytes: Optional[bytes]) -> bool:
"""Проверяет zip-сигнатуру xlsx"""
if not file_bytes:
return False
return file_bytes.startswith(ReportConst.ZIP_SIGNATURE)
def is_xlsx_extension(file_name: str) -> bool:
"""Проверяет расширение .xlsx без учёта регистра."""
return file_name.lower().endswith(ReportConst.XLSX_EXTENSION)
def parse_report_datetime(value: object) -> Optional[datetime]:
"""Парсит дату/время из ячейки отчёта."""
if value is None:
return None
if isinstance(value, datetime):
return value
if isinstance(value, str):
try:
return datetime.strptime(value.strip(), ReportConst.REPORT_DATETIME_FORMAT)
except ValueError:
return None
return None
def _stringify_cell(value: object) -> str:
if value is None:
return ""
if isinstance(value, datetime):
return value.strftime(ReportConst.REPORT_DATETIME_FORMAT)
return str(value)
def normalize_report_period_naive(value: datetime) -> datetime:
"""Московское время без tzinfo и микросекунд - для сравнения периодов в отчёте."""
return localize_as_moscow(value).replace(microsecond=0, tzinfo=None)
def report_period_comparison_bounds(
period_start: datetime,
period_end: datetime,
tolerance_minutes: int = ReportConst.REPORT_PERIOD_TOLERANCE_MINUTES,
) -> tuple[datetime, datetime, datetime, datetime]:
"""
Границы периода с допуском +-tolerance_minutes для start и end отдельно.
Возвращает (start_lower, start_upper, end_lower, end_upper).
"""
start = normalize_report_period_naive(period_start)
end = normalize_report_period_naive(period_end)
delta = timedelta(minutes=tolerance_minutes)
return start - delta, start + delta, end - delta, end + delta
def build_export_report_file_name(
tu_description: str,
period_start: datetime,
period_end: datetime,
report_name_part: str = ReportConst.LEAKS_REPORT_NAME_PART,
name_tu_separator: str = " ",
) -> str:
"""
Имя xlsx при скачивании: '{название}{sep}{ТУ} DD.MM.YYYY HH_MM_SS - DD.MM.YYYY HH_MM_SS.xlsx'.
По умолчанию - отчёт об утечках.
"""
start_text = normalize_report_period_naive(period_start).strftime(ReportConst.REPORT_FILE_NAME_DATETIME_FORMAT)
end_text = normalize_report_period_naive(period_end).strftime(ReportConst.REPORT_FILE_NAME_DATETIME_FORMAT)
return (
f"{report_name_part}{name_tu_separator}{tu_description} {start_text} - {end_text}"
f"{ReportConst.XLSX_EXTENSION}"
)
def parse_period_from_export_file_name(
file_name: str,
period_pattern: str | None = None,
) -> tuple[Optional[datetime], Optional[datetime]]:
"""Извлекает границы периода из имени скачанного xlsx-файла."""
match = re.search(
period_pattern or ReportConst.REPORT_FILE_NAME_PERIOD_PATTERN,
file_name.strip(),
re.IGNORECASE,
)
if match is None:
return None, None
parse_format = ReportConst.REPORT_FILE_NAME_DATETIME_FORMAT.replace("_", ":")
def _parse_part(value: str) -> Optional[datetime]:
try:
return datetime.strptime(value.replace("_", ":"), parse_format)
except ValueError:
return None
return _parse_part(match.group("period_start")), _parse_part(match.group("period_end"))
def parse_report_title(
title_raw: object,
header_period_pattern: str | None = None,
) -> ReportTitleInfo:
"""
Парсит шапку отчёта с именованными группами period_start/period_end.
"""
title_str = _stringify_cell(title_raw)
pattern = header_period_pattern or ReportConst.REPORT_HEADER_PERIOD_PATTERN
match = re.search(pattern, title_str)
if match is None:
return ReportTitleInfo(raw_title=title_str)
return ReportTitleInfo(
raw_title=title_str,
period_start=parse_report_datetime(match.group("period_start")),
period_end=parse_report_datetime(match.group("period_end")),
)
def load_report_worksheet(file_path: Path) -> Optional[Worksheet]:
"""Открывает первый лист xlsx. При ошибке возвращает None."""
if not file_path.exists():
return None
try:
workbook = load_workbook(filename=str(file_path), read_only=True, data_only=True)
except Exception:
return None
sheet_names = workbook.sheetnames
if not sheet_names:
return None
return workbook[sheet_names[ReportConst.DEFAULT_SHEET_INDEX]]
def get_report_title_cell(worksheet: Worksheet) -> object:
return worksheet.cell(row=ReportConst.REPORT_TITLE_ROW, column=1).value
def get_report_column_headers(
worksheet: Worksheet,
headers_row: int = ReportConst.REPORT_COLUMN_HEADERS_ROW,
) -> List[str]:
"""Возвращает непустые заголовки колонок из указанной строки шапки."""
headers: List[str] = []
column_index = 1
while True:
cell_value = worksheet.cell(row=headers_row, column=column_index).value
if cell_value is None or not str(cell_value).strip():
break
headers.append(_stringify_cell(cell_value).strip())
column_index += 1
return headers
def build_column_cells(row_values: tuple, headers: List[str]) -> Dict[str, str]:
"""Собирает словарь {название колонки: значение ячейки} по строке данных."""
return {
header: _stringify_cell(row_values[column_index]) if column_index < len(row_values) else ""
for column_index, header in enumerate(headers)
}
def iter_report_data_rows(worksheet: Worksheet) -> List[LeakReportRow]:
"""
Возвращает строки данных по утечкам, начиная с REPORT_DATA_FIRST_ROW.
Пустые строки пропускаются.
"""
headers = get_report_column_headers(worksheet)
if not headers:
return []
rows: List[LeakReportRow] = []
for excel_row_index, row_values in enumerate(
worksheet.iter_rows(
min_row=ReportConst.REPORT_DATA_FIRST_ROW,
max_col=len(headers),
values_only=True,
),
start=ReportConst.REPORT_DATA_FIRST_ROW,
):
if not any(cell is not None and str(cell).strip() for cell in row_values):
continue
rows.append(
LeakReportRow(
row_index=excel_row_index,
cells=build_column_cells(row_values, headers),
)
)
return rows
def find_row_with_object(rows: List[LeakReportRow], object_substring: str) -> Optional[LeakReportRow]:
"""Ищет первую строку, где колонка 'Объект' содержит подстроку без учёта регистра"""
substring_lower = object_substring.lower()
for row in rows:
if substring_lower in row.object_value.lower():
return row
return None
def save_report_bytes_to_temp_file(
file_bytes: bytes,
prefix: str = "leaks_report_",
) -> Optional[Path]:
"""Сохраняет байты отчёта во временный xlsx-файл. При ошибке возвращает None."""
import tempfile
try:
with tempfile.NamedTemporaryFile(
suffix=ReportConst.XLSX_EXTENSION,
prefix=prefix,
delete=False,
) as temp_file:
temp_file.write(file_bytes)
return Path(temp_file.name)
except OSError:
return None
def attach_report_file_to_allure(file_path: Path, file_name: str) -> None:
"""Прикладывает xlsx к Allure при падении теста"""
try:
xlsx_type = allure.attachment_type.XLSX
except AttributeError:
xlsx_type = None
if xlsx_type is not None:
allure.attach.file(
str(file_path),
name=file_name,
attachment_type=xlsx_type,
extension="xlsx",
)
return
try:
with file_path.open("rb") as raw_file:
allure.attach(raw_file.read(), name=file_name, extension="xlsx")
except OSError:
pass
def read_worksheet_cell_value(
file_path: Path,
row: int,
column: int,
*,
data_only: bool = True,
sheet_index: int = ReportConst.DEFAULT_SHEET_INDEX,
) -> object:
"""
Читает значение ячейки из xlsx.
Для формул используем data_only=False, оказалось иначе openpyxl вернёт вычисленное значение.
"""
if not file_path.exists():
return None
try:
workbook = load_workbook(filename=str(file_path), read_only=True, data_only=data_only)
except Exception:
return None
sheet_names = workbook.sheetnames
if not sheet_names:
return None
worksheet = workbook[sheet_names[sheet_index]]
return worksheet.cell(row=row, column=column).value
def sum_duration_columns_across_rows(
section_rows: list,
mode_duration_columns: list[str],
) -> dict[str, int]:
"""Суммирует длительности по колонкам режимов для всех строк участков."""
totals = {column_name: 0 for column_name in mode_duration_columns}
for section_row in section_rows:
for column_name, duration_seconds in section_row.mode_durations_seconds.items():
totals[column_name] += duration_seconds
return totals
вс мс парс
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Type, TypeVar
from uuid import UUID
from zoneinfo import ZoneInfo
from allure import attach, attachment_type
from dacite import Config, DaciteError, from_dict
from msgpack import Timestamp
from pytest import fail
import models.subscribe_scheme_signals_state_model as signals_state_model
from constants.architecture_constants import WebSocketClientConstants
from constants.enums import ExportedDataType, ExportStatus
from models.acknowledge_leak_model import AcknowledgeLeakReply
from models.basic_info_model import BasicInfoReply
from models.export_reports_model import ReportDataExportedNotification
from models.get_exported_files_list_model import GetExportedDataListReply
from models.get_input_signals_model import GetInputSignalsReply
from models.get_messages_model import GetMessagesReply
from models.get_output_signals_model import GetOutputSignalsReply
from models.imitate_signal_model import ImitateSignalReply
from models.launch_pig_model import LaunchPigReply
from models.mask_lds_command_model import MaskLdsReply
from models.mask_signal_model import MaskSignalReply
from models.subscribe_all_leaks_info_model import SubscribeAllLeaksInfoReply
from models.subscribe_balance_algorithm_results_model import SubscribeBalanceAlgorithmResultsReply
from models.subscribe_common_scheme_model import SubscribeCommonSchemeReply
from models.subscribe_input_signals_model import InputSignal, SubscribeInputSignalsContent, SubscribeInputSignalsReply
from models.subscribe_leaks_model import SubscribeLeaksReply
from models.subscribe_main_page_info_model import SubscribeMainPageInfoReply
from models.subscribe_main_page_signals_info_model import SubscribeMainPageSignalsInfoReply
from models.subscribe_output_signals_model import SubscribeOutputSignalsReply
from models.subscribe_tu_leaks_info_model import SubscribeTuLeaksInfoReply
from models.unimitate_signal_model import UnimitateSignalReply
from models.unmask_lds_command_model import UnmaskLdsReply
from models.unmask_signal_model import UnmaskSignalReply
from models.upload_exported_file_model import DownloadExportedDataReply
MessageType = TypeVar("MessageType") # создает типовую переменную для парсинга сообщений
ContentType = TypeVar("ContentType")
_SIGNAL_DATA_POSITION = 1
_MIN_SIGNAL_TUPLE_LENGTH = 2
class WsMessageParser:
"""
Парсинг websocket сообщений
"""
def __init__(self, dacite_config: Config = None):
self._dacite_config = dacite_config or self._get_default_config()
self.suppress_recv_logging: bool = False
@staticmethod
def timestamp_to_datetime(value: Any) -> Optional[datetime]:
"""
Преобразует время из формата пары msgpack. Timestamp и tz в datetime с timezone
"""
try:
if value is None:
return None
if isinstance(value, list) and len(value) == 2:
time_timestamp, timezone_offset = value
if isinstance(time_timestamp, Timestamp):
datetime_timezone = timezone(timedelta(minutes=timezone_offset))
return datetime.fromtimestamp(time_timestamp.seconds, datetime_timezone)
if isinstance(value, Timestamp):
return datetime.fromtimestamp(value.seconds, tz=timezone.utc)
except (AttributeError, TypeError, ValueError) as error:
fail(f"Ошибка конвертации времени: {error}")
@staticmethod
def convert_to_uuid(value: Any) -> Optional[UUID]:
"""
Преобразует строку в UUID
"""
try:
if value is None:
return None
elif isinstance(value, UUID):
return value
elif isinstance(value, str):
return UUID(value)
except (AttributeError, TypeError, ValueError) as error:
fail(f"Ошибка конвертации UUID: {error}")
def parse_acknowledge_leak_msg(self, data: list) -> AcknowledgeLeakReply:
"""
Парсит acknowledgeLeak сообщение
"""
return self._find_and_parse_message(data_class=AcknowledgeLeakReply, data=data)
def parse_all_leaks_info_msg(self, data: list) -> SubscribeAllLeaksInfoReply:
"""
Парсит allLeaksInfo сообщение
"""
return self._find_and_parse_message(data_class=SubscribeAllLeaksInfoReply, data=data)
def parse_basic_info_msg(self, data: list) -> BasicInfoReply:
"""
Парсит basicInfo сообщение
"""
return self._find_and_parse_message(data_class=BasicInfoReply, data=data)
def parse_common_scheme_info_msg(self, data: list) -> SubscribeCommonSchemeReply:
"""
Парсит tuLeaksInfo сообщение
"""
return self._find_and_parse_message(data_class=SubscribeCommonSchemeReply, data=data)
def parse_imitate_signal_msg(self, data: list) -> ImitateSignalReply:
"""
Парсит сообщение ImitateSignal
"""
return self._find_and_parse_message(data_class=ImitateSignalReply, data=data)
def parse_input_signals_info_msg(self, data: list) -> SubscribeInputSignalsReply:
"""
Парсит сообщение InputSignalsInfo
"""
payload = self.find_reply_status_in_ws_msg(data)
reply_content = payload.get('replyContent')
input_signals_list = reply_content.get('inputSignals')
parsed_payload = SubscribeInputSignalsReply(
replyStatus=payload.get('replyStatus'),
replyErrors=payload.get('replyErrors'),
replyContent=SubscribeInputSignalsContent(
tuId=reply_content.get('tuId'),
inputSignals=[
self._parse_message(InputSignal, item[_SIGNAL_DATA_POSITION])
for item in input_signals_list
if self._is_valid_signal_tuple(item)
],
),
)
if parsed_payload.replyErrors:
fail(f"Ошибка в сообщении типа InputSignalsInfo: {parsed_payload.replyErrors}")
return parsed_payload
def parse_input_signals_msg(self, data: list) -> GetInputSignalsReply:
"""
Парсит сообщение getInputSignals
"""
return self._find_and_parse_message(data_class=GetInputSignalsReply, data=data)
def parse_journal_msg(self, data: list) -> GetMessagesReply:
"""
Парсит сообщение журнала messagesInfo
"""
return self._find_and_parse_message(data_class=GetMessagesReply, data=data)
def parse_leaks_content_msg(self, data: list) -> SubscribeLeaksReply:
"""
Парсит LeaksContent сообщение
"""
return self._find_and_parse_message(data_class=SubscribeLeaksReply, data=data)
def parse_main_page_msg(self, data: list) -> SubscribeMainPageInfoReply:
"""
Парсит сообщение mainPageInfo
"""
return self._find_and_parse_message(data_class=SubscribeMainPageInfoReply, data=data)
def parse_main_page_signals_msg(self, data: list) -> SubscribeMainPageSignalsInfoReply:
"""
Парсит сообщение mainPageSignalsInfo
"""
return self._find_and_parse_message(data_class=SubscribeMainPageSignalsInfoReply, data=data)
def parse_launch_pig_msg(self, data: list) -> LaunchPigReply:
"""
Парсит сообщение LaunchPig
"""
return self._find_and_parse_message(data_class=LaunchPigReply, data=data)
def parse_mask_signal_msg(self, data: list) -> MaskSignalReply:
"""
Парсит сообщение MaskSignal
"""
return self._find_and_parse_message(data_class=MaskSignalReply, data=data)
def parse_mask_lds_message(self, data: list) -> MaskLdsReply:
"""
Парсит сообщение maskLDSRequest
"""
return self._find_and_parse_message(data_class=MaskLdsReply, data=data)
def parse_unmask_lds_message(self, data: list) -> UnmaskLdsReply:
"""
Парсит сообщение UnmaskLDSRequest
"""
return self._find_and_parse_message(data_class=UnmaskLdsReply, data=data)
def parse_output_signals_info_msg(self, data: list) -> SubscribeOutputSignalsReply:
"""
Парсит OutputSignalsInfo сообщение
"""
return self._find_and_parse_message(data_class=SubscribeOutputSignalsReply, data=data)
def parse_output_signals_msg(self, data: list):
"""
Парсит getOutputSignals сообщение
"""
return self._find_and_parse_message(data_class=GetOutputSignalsReply, data=data)
def parse_balance_algorithm_msg(self, data: list) -> SubscribeBalanceAlgorithmResultsReply:
"""
Парсит BalanceAlgorithmResults сообщение
"""
return self._find_and_parse_message(data_class=SubscribeBalanceAlgorithmResultsReply, data=data)
def parse_tu_leaks_info_msg(self, data: list) -> SubscribeTuLeaksInfoReply:
"""
Парсит tuLeaksInfo сообщение
"""
return self._find_and_parse_message(data_class=SubscribeTuLeaksInfoReply, data=data)
def parse_scheme_signals_state_msg(self, data: list) -> signals_state_model.SchemeSignalsStateReply:
"""
Парсит сообщение SchemeSignalsStateContent.
signalsStates приходят как [[signal_type, data_dict], ...] - конвертируем кортежи в словари.
"""
payload = self.find_reply_status_in_ws_msg(data)
reply_content = payload.get('replyContent', {})
reply_content['signalsStates'] = [
item[_SIGNAL_DATA_POSITION]
for item in reply_content.get('signalsStates', [])
if self._is_valid_signal_tuple(item)
]
return self._parse_message(data_class=signals_state_model.SchemeSignalsStateReply, data=payload)
def parse_unimitate_signal_msg(self, data: list) -> UnimitateSignalReply:
"""
Парсит сообщение UnimitateSignal
"""
return self._find_and_parse_message(data_class=UnimitateSignalReply, data=data)
def parse_unmask_signal_msg(self, data: list) -> UnmaskSignalReply:
"""
Парсит сообщение UnmaskSignal
"""
return self._find_and_parse_message(data_class=UnmaskSignalReply, data=data)
def parse_report_data_exported_notification_msg(self, data: list) -> ReportDataExportedNotification:
"""
Парсит пуш-нотификацию ReportDataExportedNotification о готовности отчёта.
"""
return self._find_and_parse_message(data_class=ReportDataExportedNotification, data=data)
def parse_exported_data_list_msg(self, data: list) -> GetExportedDataListReply:
"""
Парсит ответ GetExportedDataListReply со списком сформированных файлов.
"""
return self._find_and_parse_message(data_class=GetExportedDataListReply, data=data)
def parse_download_exported_data_msg(self, data: list) -> DownloadExportedDataReply:
"""
Парсит ответ DownloadExportedDataReply со скачиваемым контентом файла (fileChunk).
"""
return self._find_and_parse_message(data_class=DownloadExportedDataReply, data=data)
def _find_and_parse_message(
self,
data_class: Type[ContentType],
data: List[Any],
config: Optional[Config] = None,
) -> ContentType:
"""
Ищет объект с replyStatus в ws сообщении и парсит его
"""
payload = self.find_reply_status_in_ws_msg(data)
parsed_payload = self._parse_message(data_class=data_class, data=payload, config=config)
return parsed_payload
def _parse_message(
self,
data_class: Type[MessageType],
data: dict,
config: Optional[Config] = None,
) -> MessageType:
"""
Универсальная функция парсинга сообщений
"""
data_class_name = data_class.__name__
if not data:
fail(f"Пустое сообщение типа: {data_class_name}")
error_message = data.get('replyErrors')
if error_message:
fail(f"Ошибка в сообщении типа: {data_class_name} текст ошибки: {error_message}")
try:
message = from_dict(
data_class=data_class, data=data, config=config or self._dacite_config # type: ignore[arg-type]
)
if not self.suppress_recv_logging:
attach(
str(message) + f" {datetime.now(ZoneInfo(WebSocketClientConstants.ZONE_INFO))}",
name=data_class_name,
attachment_type=attachment_type.TEXT,
)
return message
except DaciteError as error:
fail(f"Ошибка парсинга сообщения типа: {data_class_name} текст ошибки: {error}")
@staticmethod
def _is_valid_signal_tuple(item: Any) -> bool:
"""Проверяет, что элемент - кортеж/список [signal_type, signal_data_dict]."""
return (
isinstance(item, (list, tuple))
and len(item) >= _MIN_SIGNAL_TUPLE_LENGTH
and isinstance(item[_SIGNAL_DATA_POSITION], dict)
)
@staticmethod
def find_reply_status_in_ws_msg(data: List[Any]) -> Optional[Dict[str, Any]]:
"""
Ищет объект с replyStatus в ws сообщении
"""
if not data:
fail("Пустое сообщение")
try:
for item in reversed(data):
# 1) Если сам элемент — словарь с replyStatus
if isinstance(item, dict) and 'replyStatus' in item:
return item
# 2) Если элемент — список / кортеж — проверяем все элементы в нём
if isinstance(item, (list, tuple)):
for elem in item:
if isinstance(elem, dict) and 'replyStatus' in elem:
return elem
except (AttributeError, KeyError, TypeError, RuntimeError, ValueError):
fail("Не удалось найти replyStatus в сообщении")
def _get_default_config(self) -> Config:
"""
Получает конфиг с правилами обработки полей
"""
# TODO добавить strict=True, после выполнения задачи LDS-8792
def _to_export_status(value: Any) -> ExportStatus:
return value if isinstance(value, ExportStatus) else ExportStatus(value)
def _to_exported_data_type(value: Any) -> ExportedDataType:
return value if isinstance(value, ExportedDataType) else ExportedDataType(value)
return Config(
type_hooks={
UUID: self.convert_to_uuid,
datetime: self.timestamp_to_datetime,
ExportStatus: _to_export_status,
ExportedDataType: _to_exported_data_type,
}
)
# Создает экземпляр класса для удобства импорта
ws_message_parser = WsMessageParser()
вс тест ут
from __future__ import annotations
import asyncio
import pprint
import random
import re
from dataclasses import asdict, is_dataclass
from datetime import datetime, timedelta, timezone
from enum import IntEnum, IntFlag
from typing import Any, List, Optional, Set, Type, TypeVar
from zoneinfo import ZoneInfo
import allure
from msgpack import Timestamp as MsgpackTimestamp
from pytest import fail
from clients.websocket_client import WebSocketClient
from constants.architecture_constants import WebSocketClientConstants as WS_Const
from constants.enums import (
ConfirmationStatus,
ExportStatus,
LdsStatus,
LdsStatusDegradation,
LdsStatusFaulty,
LdsStatusInitialization,
LeakStatus,
ReplyStatus,
StationaryReason,
StationaryStatus,
StoppedPumpingReason,
UnStationaryReason,
)
from constants.test_constants import BaseTN3Constants as TestConst
from constants.test_constants import ExportReportConstants as ReportConst
from models.export_reports_model import ReportDataExportedContent, ReportDataExportedNotification
from models.get_messages_model import GetMessagesRequest
from models.subscribe_all_leaks_info_model import SubscribeAllLeaksInfoReply
from models.subscribe_common_scheme_model import DiagnosticArea, FlowArea
from models.subscribe_leaks_model import Leak
from models.subscribe_main_page_info_model import MainPageLeakInfo
from utils.helpers.ws_message_parser import ws_message_parser
from utils.msgpack_utils.message_filters import is_desired_invocation_id, is_desired_type
ObjectType = TypeVar("ObjectType") # создает типовую переменную для поиска объектов в списке
RandomObjectType = TypeVar("RandomObjectType")
Event = TypeVar("Event")
def convert_leak_volume_m3(volume: float) -> float:
"""
Преобразует объем утечки в м3/час
"""
# Округляет результат для читабельности
return round(volume * TestConst.MASS_KG, 3)
def datetime_minus_seconds(datetime_obj: datetime, delta_s: int) -> datetime:
"""
Вычитает время в секундах из datetime
"""
return (datetime_obj - timedelta(seconds=delta_s)).replace(microsecond=0)
def calculate_leak_start_time(imitator_start_time: datetime, leak_interval_seconds: int) -> Optional[datetime]:
"""
Рассчитывает время начала утечки на основе времени старта имитатора.
:param imitator_start_time: datetime объект времени старта имитатора
:param leak_interval_seconds: интервал от старта до утечки в секундах (LEAK_START_INTERVAL)
:return: datetime время ожидаемого начала утечки
"""
if not imitator_start_time:
return None
return (imitator_start_time + timedelta(seconds=leak_interval_seconds)).replace(microsecond=0)
def calculate_leak_end_time(
imitator_start_time: datetime, leak_interval_seconds: int, allowed_diff_seconds: int
) -> Optional[datetime]:
"""
Рассчитывает крайнее время обнаружения утечки (с учётом допустимой погрешности).
:param imitator_start_time: datetime объект времени старта имитатора
:param leak_interval_seconds: интервал от старта до утечки в секундах (LEAK_START_INTERVAL)
:param allowed_diff_seconds: допустимая погрешность времени обнаружения (ALLOWED_TIME_DIFF_SECONDS)
:return: datetime крайнее время обнаружения утечки
"""
if not imitator_start_time:
return None
total_seconds = leak_interval_seconds + allowed_diff_seconds
return (imitator_start_time + timedelta(seconds=total_seconds)).replace(microsecond=0)
def get_leak_time_window(
imitator_start_time: datetime, leak_interval_seconds: int, allowed_diff_seconds: int, detected_at_tz=None
) -> tuple[datetime, datetime]:
"""
Возвращает временное окно для проверки времени обнаружения утечки.
:param imitator_start_time: datetime объект времени старта имитатора
:param leak_interval_seconds: интервал от старта до утечки в секундах
:param allowed_diff_seconds: допустимая погрешность времени обнаружения
:param detected_at_tz: timezone из времени обнаружения утечки (опционально)
:return: tuple (leak_start_time, leak_end_time) для использования в is_between проверке
"""
leak_start = calculate_leak_start_time(imitator_start_time, leak_interval_seconds)
leak_end = calculate_leak_end_time(imitator_start_time, leak_interval_seconds, allowed_diff_seconds)
# Если передан timezone, применяем его к временам для корректного сравнения
if detected_at_tz:
leak_start = leak_start.replace(tzinfo=detected_at_tz)
leak_end = leak_end.replace(tzinfo=detected_at_tz)
return leak_start, leak_end
def ensure_moscow_timezone(input_datetime: datetime) -> None | datetime:
"""
Конвертирует datetime в московское время, если оно не в московской таймзоне.
:param input_datetime: datetime объект
:return: datetime в московской таймзоне
"""
if input_datetime is None:
return input_datetime
# Если datetime без timezone - считаем что это UTC
if input_datetime.tzinfo is None:
input_datetime = input_datetime.replace(tzinfo=timezone.utc)
# Конвертируем в московское время
return input_datetime.astimezone(ZoneInfo(TestConst.ZONE_INFO))
def report_time_offset_hours(tz_name: str = TestConst.ZONE_INFO) -> Optional[int]:
"""
Смещение часового пояса (часы от UTC) для поля timeOffset в запросах отчётов.
"""
now = datetime.now(ZoneInfo(tz_name))
utc_offset = now.utcoffset()
if utc_offset is None:
return None
return int(utc_offset.total_seconds() // TestConst.SECONDS_PER_HOUR)
def localize_as_moscow(input_datetime: datetime) -> None | datetime:
"""
Присваивает datetime московский часовой пояс без сдвига времени.
Если datetime уже имеет timezone - конвертирует в московское время.
"""
if input_datetime is None:
return input_datetime
moscow_tz = ZoneInfo(TestConst.ZONE_INFO)
if input_datetime.tzinfo is None:
return input_datetime.replace(tzinfo=moscow_tz)
return input_datetime.astimezone(moscow_tz)
def format_datetime_moscow(value: Optional[datetime]) -> str:
"""Строковое представление datetime в Europe/Moscow для вложений Allure."""
if value is None:
return "None"
return str(localize_as_moscow(value))
def get_rejection_time_window(
imitator_start_time: datetime,
start_seconds: int | float,
reserve_seconds: int | float = 0,
) -> tuple[datetime, datetime]:
"""
Возвращает временное окно для проверки сообщения об отбраковке.
"""
imitator_msk = localize_as_moscow(imitator_start_time)
range_start = imitator_msk + timedelta(seconds=start_seconds - reserve_seconds)
range_end = localize_as_moscow(datetime.now())
return range_start, range_end
def find_rejection_journal_message(
messages_info: List[ObjectType],
tag: str,
range_start: datetime,
range_end: datetime,
technological_section: str,
expected_event: str,
) -> tuple[list[ObjectType], ObjectType | None]:
"""
Фильтрует сообщения журнала по tag и временному диапазону,
затем ищет целевое сообщение по technologicalSection и event.
"""
time_filtered = [
msg for msg in messages_info if msg.tag == tag and range_start <= ensure_moscow_timezone(msg.time) <= range_end
]
time_filtered.sort(key=lambda msg: ensure_moscow_timezone(msg.time), reverse=True)
target_msg = next(
(
msg
for msg in time_filtered
if msg.technologicalSection == technological_section and msg.event.rstrip() == expected_event
),
None,
)
return time_filtered, target_msg
def get_random_item(item_list: List[RandomObjectType]) -> RandomObjectType:
"""
Получает случайный объект из списка
"""
if not item_list:
return None
try:
return random.choice(item_list)
except (TypeError, ValueError):
return None
def get_longest_flow_area(flow_areas: List[FlowArea]) -> Optional[FlowArea]:
"""
Получает самый протяженный участок карты течения по количеству ДУ из списка всех участков
"""
if not flow_areas:
return None
try:
longest_flow_area = max(flow_areas, key=lambda flow_area: len(flow_area.diagnosticAreas))
return longest_flow_area
except (TypeError, ValueError):
return None
def determine_lds_status_by_priority(lds_status_set: Set[int]) -> Optional[int]:
"""
Определяет режим работы СОУ по приоритету и наличию режимов работы у ДУ на самом протяженном участки карты течений
"""
lds_status_priority = [
LdsStatus.FAULTY.value,
LdsStatus.INITIALIZATION.value,
LdsStatus.DEGRADATION.value,
LdsStatus.SERVICEABLE.value,
]
if not lds_status_set:
return None
try:
for status in lds_status_priority:
if status in lds_status_set:
return status
except (AttributeError, KeyError, RuntimeError, TypeError, ValueError):
return None
def find_signal_type_by_address_suffix(signals_list: list, address_suffix: str) -> Optional[int]:
"""
Ищет в списке сигналов тип сигнала по части адреса
"""
if not signals_list:
return None
try:
for sensor_signal in signals_list:
if sensor_signal.address is not None and str(sensor_signal.address).endswith(address_suffix):
return sensor_signal.signalType
return None
except (AttributeError, KeyError, RuntimeError, TypeError, ValueError):
return None
def find_signal_val_by_signal_type(signals_list: list, signal_type: int) -> Optional[str]:
"""
Ищет в списке сигналов значение сигнала по типу
"""
if not signals_list:
return None
try:
for sensor_signal in signals_list:
if sensor_signal.signalType is not None and sensor_signal.signalType == signal_type:
return sensor_signal.value
return None
except (AttributeError, KeyError, RuntimeError, TypeError, ValueError):
return None
def find_object_by_field(item_list: List[ObjectType], field_name: str, value: Any) -> ObjectType:
"""
Ищет объект в списке объектов по значению одного из полей объекта
"""
if not item_list:
return None
try:
return next((item for item in item_list if getattr(item, field_name) == value))
except Exception:
return None
def find_object_by_a_few_fields(item_list: List[ObjectType], fields_dict: dict) -> ObjectType:
"""
Ищет объект в списке объектов по значениям нескольких полей
"""
if not item_list:
return None
return next(
(item for item in item_list if all(getattr(item, field) == value for field, value in fields_dict.items())), None
)
def parse_event(event_value: str) -> tuple[str | None, str | None]:
"""
Разделяет строку события на имя и причину, вложенную с скобки
"""
if not event_value or not isinstance(event_value, str):
return None, None
# Паттерн для "Состояние режима (Причина)"
pattern = r'^([^\(]+?)\s*\(([^\)]+)\)\s*$'
match = re.match(pattern, event_value)
if match:
mode_part = match.group(1).strip()
reason_part = match.group(2).strip()
return (mode_part if mode_part else None, reason_part if reason_part else None)
# Если значение события не соответствует паттерну, возвращаю текст, если нет текста, тогда None
return event_value, None
def get_signal(site_message, signal_type):
if not site_message:
return None
return next((s for s in site_message.signals if s.signalType == signal_type.value), None)
def get_value(obj):
return getattr(obj, "value", None)
def find_confirmed_leaks(item_list: List[Leak]) -> List[Leak]:
"""Ищет подтвержденные утечки"""
try:
return [
item
for item in item_list
if item.confirmationStatus == ConfirmationStatus.CONFIRMED.value and item.detectedAt is not None
]
except (AttributeError, KeyError, TypeError, ValueError):
return []
def find_confirmed_leaks_on_main_page(item_list: List[MainPageLeakInfo]) -> List[MainPageLeakInfo]:
"""Ищет подтвержденные утечки"""
try:
return [
item
for item in item_list
if item.leakStatus == LeakStatus.CONFIRMED.value and item.leakDetectedAt is not None
]
except (AttributeError, KeyError, TypeError, ValueError):
return []
def find_diagnostic_area_by_id(flow_areas: List[FlowArea], id_value: int) -> Optional[DiagnosticArea]:
"""
Ищет ДУ по id в списке участков карты течений, исключает дубликаты по количеству pipeIds
"""
candidates = []
if not flow_areas:
return None
try:
for flow_area in flow_areas:
for diagnostic_area in flow_area.diagnosticAreas:
if diagnostic_area.id == id_value:
candidates.append(diagnostic_area)
if not candidates:
return None
elif len(candidates) == 1:
return candidates[0]
else:
# Среди дубликатов ищет ДУ с наибольшим количеством pipeIds
return max(candidates, key=lambda candidate: len(candidate.pipeIds))
except (AttributeError, KeyError, RuntimeError, TypeError, ValueError):
return None
def find_diagnostic_area_by_pipe_id(flow_areas: List[FlowArea], pipe_id: int) -> Optional[DiagnosticArea]:
"""
Ищет ДУ по pipe id в списке участков карты течений, исключает дубликаты по количеству pipeIds
"""
candidates = []
if not flow_areas:
return None
try:
for flow_area in flow_areas:
for diagnostic_area in flow_area.diagnosticAreas:
if diagnostic_area.pipeIds and pipe_id in diagnostic_area.pipeIds:
candidates.append(diagnostic_area)
if not candidates:
return None
elif len(candidates) == 1:
return candidates[0]
else:
# Среди дубликатов ищет ДУ с наибольшим количеством pipeIds
return max(candidates, key=lambda candidate: len(candidate.pipeIds))
except (AttributeError, KeyError, RuntimeError, TypeError, ValueError):
return None
def find_diagnostic_areas_by_ids(flow_areas: List[FlowArea], id_list: List[int]) -> List[DiagnosticArea]:
"""
Получает список ДУ из списка flow_areas по списку id
"""
diagnostic_areas = [
result
for diagnostic_area_id in id_list
if (result := find_diagnostic_area_by_id(flow_areas, diagnostic_area_id)) is not None
]
return diagnostic_areas
def find_diagnostic_areas_by_pipe_ids(flow_areas: List[FlowArea], id_list: List[int]) -> List[DiagnosticArea]:
"""
Получает список ДУ из списка flow_areas по списку pipe id
"""
diagnostic_areas = [
result for pipe_id in id_list if (result := find_diagnostic_area_by_pipe_id(flow_areas, pipe_id)) is not None
]
return diagnostic_areas
def find_base_diagnostic_areas(flow_areas: List[FlowArea]) -> List[DiagnosticArea]:
"""
Получает список базовых ДУ из списка flow_areas
"""
return find_diagnostic_areas_by_ids(flow_areas, TestConst.DIAGNOSTIC_AREA_BASE_IDS)
def find_leak_by_coordinate(
leaks_list: List[ObjectType], expected_coordinate: float, tolerance: float = TestConst.ALLOWED_DISTANCE_DIFF_METERS
) -> ObjectType:
"""
Ищет утечку в списке по координатам с допустимой погрешностью
"""
if not leaks_list:
return None
for leak in leaks_list:
leak_coordinate = getattr(leak, "leakCoordinate")
if leak_coordinate is None or "":
continue
if abs(leak_coordinate - expected_coordinate) <= tolerance:
return leak
return None
def to_moscow_timezone(date_str: str) -> Optional[datetime]:
"""
Преобразует строку времени в московское время
"""
if not date_str or not date_str.strip():
return None
try:
if date_str.startswith(("'", '"', '')) or date_str.endswith(("'", '"', '')):
date_str = date_str.strip().strip("'").strip('"')
date_utc = datetime.strptime(date_str, TestConst.OUTPUT_TIME_FORMAT).replace(tzinfo=timezone.utc)
return date_utc.astimezone(ZoneInfo(TestConst.ZONE_INFO))
except (AttributeError, TypeError, ValueError):
return None
def create_dict_from_dataclass(cls: Type, **kwargs) -> Optional[dict]:
"""Создает словарь из экземпляра dataclass c нужными параметрами"""
if not is_dataclass(cls):
return None
instance = cls(**kwargs)
return asdict(instance)
def datetime_to_msgpack_timestamp(dt: datetime) -> list:
"""Конвертирует datetime в формат [Timestamp(seconds, nanoseconds), tz_offset] для отправки на бэкенд."""
return [MsgpackTimestamp(seconds=int(dt.timestamp()), nanoseconds=0), 0]
def create_journal_req_body(**kwargs) -> dict:
"""Создает дефолтные параметры запроса к журналу"""
result = create_dict_from_dataclass(GetMessagesRequest, **kwargs)
period = result.get('periodTime')
if period:
for key in ('start', 'end'):
if isinstance(period.get(key), datetime):
period[key] = datetime_to_msgpack_timestamp(period[key])
return result
def extract_first_number(value: object) -> Optional[float]:
"""
Извлекает первое число из ячейки (int/float/str вида '55.55 км', '111.11 м3/ч').
"""
if value is None:
return None
if isinstance(value, (int, float)) and not isinstance(value, bool):
return float(value)
if isinstance(value, str):
matches = re.findall(TestConst.DIGITS_WITH_DOT_PATTERN, value)
if matches:
try:
return float(matches[0].replace(",", "."))
except ValueError:
return None
return None
def _attach_ws_poll_failure(
collected_messages: List[Any],
total_wait_seconds: float,
expected_message_type: str,
) -> None:
"""Краткая сводка и pprint каждого WS-сообщения при таймауте поллинга."""
allure.attach(
"\n".join(
[
f"Таймаут ожидания: {total_wait_seconds} с",
f"Ожидаемый тип сообщения: {expected_message_type}",
f"Всего сообщений за период поллинга: {len(collected_messages)}",
]
),
name="WS poll timeout",
attachment_type=allure.attachment_type.TEXT,
)
for msg in collected_messages:
allure.attach(
pprint.pformat(msg, width=120, sort_dicts=False),
name="received ws message",
attachment_type=allure.attachment_type.TEXT,
)
def _attach_ws_reply_parse_failure(
reply_payload: Optional[Any],
invocation_id: str,
request_name: str,
error: BaseException,
) -> None:
"""Прикрепляет к Allure ответ бэка при ошибке парсинга."""
allure.attach(
"\n".join(
[
f"Запрос: {request_name}",
f"invocation_id: {invocation_id}",
f"Ошибка: {error}",
]
),
name="WS parse failure",
attachment_type=allure.attachment_type.TEXT,
)
if reply_payload is not None:
allure.attach(
pprint.pformat(reply_payload, width=120, sort_dicts=False),
name="received ws message",
attachment_type=allure.attachment_type.TEXT,
)
def _drain_recv_queue(ws_client: WebSocketClient) -> List[Any]:
"""Забирает все сообщения из очереди ws без блокирующего receive_by_..."""
messages: List[Any] = []
while not ws_client.recv_queue.empty():
try:
messages.append(ws_client.recv_queue.get_nowait())
except asyncio.QueueEmpty:
break
return messages
def _find_valid_report_export_notification(
messages: List[Any],
parser,
notification_type: str,
) -> Optional[ReportDataExportedNotification]:
"""Ищет среди уже полученных сообщений успешную ReportDataExportedNotification."""
for msg in messages:
if not isinstance(msg, list) or not is_desired_type(msg, notification_type):
continue
try:
notification = parser.parse_report_data_exported_notification_msg(msg)
except (ValueError, TypeError, KeyError):
continue
if notification.replyStatus != ReplyStatus.OK.value:
continue
content: Optional[ReportDataExportedContent] = notification.replyContent
if content is None:
continue
if content.exportStatus != ExportStatus.DONE:
continue
return notification
return None
async def poll_for_report_export_notification(
ws_client: WebSocketClient,
parser,
total_wait_seconds: float,
poll_interval_seconds: float,
) -> Optional[Any]:
"""
Собирает сообщения из очереди ws и ищет ReportDataExportedNotification с успешным exportStatus.
При таймауте прикрепляет к Allure все полученные за период сообщения.
"""
deadline = asyncio.get_event_loop().time() + total_wait_seconds
collected_messages: List[Any] = []
ws_client.suppress_recv_logging = True
parser.suppress_recv_logging = True
try:
while asyncio.get_event_loop().time() < deadline:
await asyncio.sleep(poll_interval_seconds)
batch = _drain_recv_queue(ws_client)
collected_messages.extend(batch)
notification = _find_valid_report_export_notification(
batch, parser, ReportConst.REPORT_DATA_EXPORTED_NOTIFICATION
)
if notification is not None:
return notification
finally:
collected_messages.extend(_drain_recv_queue(ws_client))
ws_client.suppress_recv_logging = False
parser.suppress_recv_logging = False
_attach_ws_poll_failure(
collected_messages,
total_wait_seconds,
ReportConst.REPORT_DATA_EXPORTED_NOTIFICATION,
)
return None
def _find_ws_reply_by_invocation_id(messages: List[Any], invocation_id: str, parser) -> Optional[list]:
"""
Ищет последний ответ с заданным invocation_id и телом replyStatus.
"""
reply_payload = None
for msg in messages:
if not isinstance(msg, list) or not is_desired_invocation_id(msg, invocation_id):
continue
if parser.find_reply_status_in_ws_msg(msg):
reply_payload = msg
return reply_payload
async def poll_for_exported_file(
ws_client: WebSocketClient,
parser,
list_limit: int,
expected_data_type: Any,
name_substring: str,
tu_name_substring: str,
period_start: datetime,
period_end: datetime,
total_wait_seconds: float,
poll_interval_seconds: float,
period_tolerance_minutes: int = ReportConst.REPORT_PERIOD_TOLERANCE_MINUTES,
) -> Optional[Any]:
"""
Периодически шлёт GetExportedDataListRequest, забирает ответы из очереди
по invocation_id среди всех накопленных сообщений.
При таймауте или ошибке парсинга прикрепляет к Allure полученные ответы.
"""
deadline = asyncio.get_event_loop().time() + total_wait_seconds
last_items_count = -1
collected_messages: List[Any] = []
request_name = ReportConst.GET_EXPORTED_DATA_LIST_REQUEST
ws_client.suppress_recv_logging = True
parser.suppress_recv_logging = True
try:
while asyncio.get_event_loop().time() < deadline:
drained_before_request = _drain_recv_queue(ws_client)
collected_messages.extend(drained_before_request)
await connect(
ws_client,
request_name,
{"limit": list_limit},
)
invocation_id = ws_client.invocation_id
await asyncio.sleep(poll_interval_seconds)
batch = _drain_recv_queue(ws_client)
collected_messages.extend(batch)
list_reply_payload = _find_ws_reply_by_invocation_id(batch, invocation_id, parser)
if list_reply_payload is None:
continue
try:
parsed_payload = parser.parse_exported_data_list_msg(list_reply_payload)
except Exception as error:
_attach_ws_reply_parse_failure(list_reply_payload, invocation_id, request_name, error)
for msg in collected_messages:
allure.attach(
pprint.pformat(msg, width=120, sort_dicts=False),
name="received ws message",
attachment_type=allure.attachment_type.TEXT,
)
fail(f"Не удалось разобрать ответ на {request_name}: {error}")
items = []
if parsed_payload.replyContent is not None:
items = parsed_payload.replyContent.exportedData or []
if len(items) != last_items_count:
allure.attach(
"\n".join(
f"id={item.id}, name={item.name}, type={item.exportedDataType}, "
f"start={format_datetime_moscow(item.start)}, end={format_datetime_moscow(item.end)}"
for item in items
),
name=f"Список сформированных файлов (всего: {len(items)})",
attachment_type=allure.attachment_type.TEXT,
)
last_items_count = len(items)
match = find_matching_exported_item(
items=items,
expected_data_type=expected_data_type,
name_substring=name_substring,
tu_name_substring=tu_name_substring,
period_start=period_start,
period_end=period_end,
period_tolerance_minutes=period_tolerance_minutes,
)
if match is not None:
return match
finally:
collected_messages.extend(_drain_recv_queue(ws_client))
ws_client.suppress_recv_logging = False
parser.suppress_recv_logging = False
_attach_ws_poll_failure(
collected_messages,
total_wait_seconds,
request_name,
)
return None
def _normalize_report_period_datetime(value: datetime) -> datetime:
"""Приводит datetime периода отчёта к московскому времени без микросекунд."""
return localize_as_moscow(value).replace(microsecond=0)
def _exported_item_period_matches(
item_start: datetime,
item_end: datetime,
period_start: datetime,
period_end: datetime,
tolerance_minutes: int,
) -> bool:
"""Проверяет start/end элемента списка в пределах периода запроса +- tolerance_minutes."""
item_start_norm = _normalize_report_period_datetime(item_start)
item_end_norm = _normalize_report_period_datetime(item_end)
period_start_norm = _normalize_report_period_datetime(period_start)
period_end_norm = _normalize_report_period_datetime(period_end)
delta = timedelta(minutes=tolerance_minutes)
return (period_start_norm - delta) <= item_start_norm <= (period_start_norm + delta) and (
period_end_norm - delta
) <= item_end_norm <= (period_end_norm + delta)
def _normalize_report_text_for_match(text: str) -> str:
return text.lower().replace("ё", "е")
def find_matching_exported_item(
items: List[Any],
expected_data_type: Any,
name_substring: str,
tu_name_substring: str,
period_start: datetime,
period_end: datetime,
period_tolerance_minutes: int = ReportConst.REPORT_PERIOD_TOLERANCE_MINUTES,
) -> Optional[Any]:
"""
Ищет элемент списка по типу, подстрокам в имени (отчёт + ТУ) и периоду start/end с допуском.
"""
name_substring_normalized = _normalize_report_text_for_match(name_substring)
tu_name_normalized = _normalize_report_text_for_match(tu_name_substring)
matched_items = []
for item in items:
if item.exportedDataType != expected_data_type:
continue
item_name_normalized = _normalize_report_text_for_match(item.name or "")
if name_substring_normalized not in item_name_normalized:
continue
if tu_name_normalized not in item_name_normalized:
continue
if item.start is None or item.end is None:
continue
if not _exported_item_period_matches(item.start, item.end, period_start, period_end, period_tolerance_minutes):
continue
matched_items.append(item)
if not matched_items:
return None
return max(matched_items, key=lambda exported_item: exported_item.id)
def parse_journal_msg_value(value: str) -> tuple:
"""Парсит поле value в сообщении журнала"""
try:
# ищет группы цифр с точкой в строке
matches = re.findall(TestConst.DIGITS_WITH_DOT_PATTERN, value)
coordinate, volume = (matches + [None, None])[:2]
if coordinate is not None:
try:
coordinate = float(coordinate)
except ValueError:
coordinate = None
if volume is not None:
try:
volume = float(volume)
except ValueError:
volume = None
return coordinate, volume
except (AttributeError, TypeError, ValueError):
return None, None
def parse_bit_flags(
value: int, enum_cls: Type[IntEnum | IntFlag], failures: Optional[List[str]] = None
) -> List[IntFlag]:
"""
Распаковка битовых флагов
"""
# 0 - это валидное состояние когда причин нет, в прошлой реализации тест бы падал если причин нет
# хотя это может быть ожидаемо, например при тестировании исправности или где-нибудь еще
if value == 0:
return []
found_flags = [flag for flag in enum_cls if value & flag.value]
known_bits = sum(flag.value for flag in found_flags)
if known_bits != value:
unknown_bits = value ^ known_bits
error_message = f"Неизвестные биты при распаковке {enum_cls.__name__}: {unknown_bits}"
if failures is not None:
failures.append(error_message)
else:
fail(f"Неизвестные биты при распаковке {enum_cls.__name__}: {unknown_bits}")
# та же сортировка только не цифры, а их текстовое значение
return sorted(found_flags, key=lambda flag: flag.value)
def get_reason_enum_by_lds_status(lds_status: int | LdsStatus, failures: Optional[List[str]] = None) -> Type[IntFlag]:
"""
Получение класса причин по режимам СОУ
"""
if isinstance(lds_status, int):
try:
lds_status = LdsStatus(lds_status)
except ValueError:
error_message = f"Неизвестный LdsStatus: {lds_status}"
if failures is not None:
failures.append(error_message)
else:
fail(error_message)
reason_by_lds_status = {
LdsStatus.FAULTY: LdsStatusFaulty,
LdsStatus.INITIALIZATION: LdsStatusInitialization,
LdsStatus.DEGRADATION: LdsStatusDegradation,
}
enum_class = reason_by_lds_status.get(lds_status)
if enum_class is None:
error_message = f"Для LdsStatus{lds_status.name} не определены причины"
if failures is not None:
failures.append(error_message)
else:
fail(error_message)
return enum_class
def get_reason_enum_by_stationary_status(
stationary_status: int | StationaryStatus, failures: Optional[List[str]] = None
) -> Type[IntFlag]:
"""
Получение класса причин по режимам МТ
"""
if isinstance(stationary_status, int):
try:
stationary_status = StationaryStatus(stationary_status)
except ValueError:
error_message = f"Неизвестный StationaryStatus: {stationary_status}"
if failures is not None:
failures.append(error_message)
else:
fail(error_message)
reason_by_stationary_status = {
StationaryStatus.STATIONARY: StationaryReason,
StationaryStatus.UNSTATIONARY: UnStationaryReason,
StationaryStatus.STOPPED: StoppedPumpingReason,
}
enum_class = reason_by_stationary_status.get(stationary_status)
if enum_class is None:
error_message = f"Для StationaryStatus{stationary_status.name} не определены причины"
if failures is not None:
failures.append(error_message)
else:
fail(error_message)
return enum_class
def parse_lds_status_reasons(lds_status: int, lds_status_reasons: int, failures: Optional[List[str]] = None):
"""
Получение списка ldsStatusReasons, соответствующего ldsStatus
"""
enum_cls = get_reason_enum_by_lds_status(lds_status, failures)
flags = enum_cls(lds_status_reasons)
if flags == str(lds_status_reasons):
error_message = f"Не удалось распаковать флаги: {lds_status_reasons} для {enum_cls.__name__}"
if failures is not None:
failures.append(error_message)
return flags
def parse_stationary_status_reasons(
stationary_status: int, stationary_status_reasons: int, failures: Optional[List[str]] = None
):
"""
Получение списка stationaryStatusReasons, соответствующего stationaryStatus
"""
enum_cls = get_reason_enum_by_stationary_status(stationary_status, failures)
flags = enum_cls(stationary_status_reasons)
if flags == str(stationary_status_reasons):
error_message = f"Не удалось распаковать флаги: {stationary_status_reasons} для {enum_cls.__name__}"
if failures is not None:
failures.append(error_message)
return flags
async def connect(ws_client: WebSocketClient, ws_invoke_type: str, ws_invoke_params: Any = None) -> None:
"""
Подключение к заданной подписке
"""
try:
with allure.step(f"Вызов {ws_invoke_type} c параметрами {ws_invoke_params}"):
await ws_client.invoke(ws_invoke_type, ws_invoke_params)
except (asyncio.TimeoutError, ConnectionError, ConnectionResetError, OSError) as error:
fail(f"Не удалось отправить сообщение типа: {ws_invoke_type} c параметрами {ws_invoke_params}. Ошибка: {error}")
async def connect_stream(
ws_client: WebSocketClient,
ws_invoke_type: str,
ws_invoke_params: Any = None,
purpose: str = "streaming-вызов WS",
) -> None:
"""
Streaming-вызов (StreamInvocation)
"""
try:
with allure.step(f"Streaming-вызов {ws_invoke_type} c параметрами {ws_invoke_params}"):
await ws_client.invoke_stream(ws_invoke_type, ws_invoke_params)
except (asyncio.TimeoutError, ConnectionError, ConnectionResetError, OSError) as error:
fail(
f"Не удалось выполнить {purpose} ({ws_invoke_type}, StreamInvocation). "
f"Параметры запроса: {ws_invoke_params}. Ошибка соединения: {error}"
)
def _stream_completion_error(msg: Any, invocation_id: str) -> Optional[str]:
"""Текст ошибки из ответа SignalR Completion для данного invocation_id, если есть."""
if not isinstance(msg, list):
return None
if msg[0] != WS_Const.COMPLETION_MESSAGE_TYPE:
return None
if not is_desired_invocation_id(msg, invocation_id):
return None
if len(msg) <= WS_Const.COMPLETION_ERROR_MESSAGE_INDEX:
return None
error_text = msg[WS_Const.COMPLETION_ERROR_MESSAGE_INDEX]
if isinstance(error_text, str):
return error_text
return None
async def receive_download_exported_data_reply(
ws_client: WebSocketClient,
parser,
invocation_id: str,
request_name: str,
total_wait_seconds: float,
poll_interval_seconds: float = 0.5,
purpose: str = "скачивании xlsx-отчёта после выбора файла в списке сформированных отчётов",
) -> Any:
"""
Ожидает StreamItem с fileChunk после streaming DownloadExportedDataRequest.
"""
deadline = asyncio.get_event_loop().time() + total_wait_seconds
collected_messages: List[Any] = []
ws_client.suppress_recv_logging = True
parser.suppress_recv_logging = True
try:
while asyncio.get_event_loop().time() < deadline:
await asyncio.sleep(poll_interval_seconds)
batch = _drain_recv_queue(ws_client)
collected_messages.extend(batch)
for msg in batch:
stream_error = _stream_completion_error(msg, invocation_id)
if stream_error:
_attach_ws_reply_parse_failure(msg, invocation_id, request_name, RuntimeError(stream_error))
fail(
f"При {purpose} бэк вернул Completion с ошибкой "
f"({request_name}, invocation_id={invocation_id}): {stream_error}"
)
for msg in batch:
if (
not isinstance(msg, list)
or msg[0] != WS_Const.STREAM_ITEM_MESSAGE_TYPE
or not is_desired_invocation_id(msg, invocation_id)
):
continue
if parser.find_reply_status_in_ws_msg(msg) is None:
continue
try:
return parser.parse_download_exported_data_msg(msg)
except Exception as error:
_attach_ws_reply_parse_failure(msg, invocation_id, request_name, error)
fail(
f"При {purpose} получен StreamItem ({request_name}, invocation_id={invocation_id}), "
f"но не удалось разобрать ответ с fileChunk: {error}"
)
finally:
collected_messages.extend(_drain_recv_queue(ws_client))
ws_client.suppress_recv_logging = False
parser.suppress_recv_logging = False
_attach_ws_poll_failure(collected_messages, total_wait_seconds, f"{request_name} (StreamItem)")
fail(
f"При {purpose} за {total_wait_seconds} с не получен StreamItem с fileChunk "
f"({request_name}, invocation_id={invocation_id}). Смотреть вложения received ws message"
)
async def connect_and_get_parsed_msg_by_tu_id(
tu_id: int,
ws_client: WebSocketClient,
ws_message_type: str,
ws_invoke_type: str,
ws_invoke_params: Any = None,
timeout: float = TestConst.BASIC_MESSAGE_TIMEOUT,
) -> SubscribeAllLeaksInfoReply:
"""
Подключается, ищет и парсит allLeaksInfo сообщение для конкретного ТУ
"""
await connect(ws_client, ws_invoke_type, ws_invoke_params)
async def get_parsed_msg():
"""
Ищет и парсит allLeaksInfo сообщение для конкретного ТУ
"""
while True:
payload = await ws_client.receive_by_type(ws_message_type, timeout=timeout)
parsed_payload = ws_message_parser.parse_all_leaks_info_msg(payload)
# Ищет сообщение с нужным ТУ
if parsed_payload.replyContent.tuId == tu_id:
return parsed_payload
try:
with allure.step(f"Получение сообщения с контентом типа: {ws_message_type} для ТУ {tu_id}"):
return await asyncio.wait_for(get_parsed_msg(), timeout=timeout)
except (asyncio.TimeoutError, ConnectionError, ConnectionResetError, OSError) as error:
fail(f"Не удалось получить сообщение allLeaksInfo для ТУ {tu_id}. Ошибка: {error}")
async def connect_and_get_msg(ws_client: WebSocketClient, ws_invoke_type: str, ws_invoke_params: Any = None) -> list:
"""
Подключение типа get к заданной подписке и получение сообщения с заданным типом контента
"""
await connect(ws_client, ws_invoke_type, ws_invoke_params)
invocation_id = ws_client.invocation_id
try:
with allure.step(f"Получение входящего сообщения c invocation_id: {invocation_id}"):
payload = await ws_client.receive_by_invocation_id(invocation_id)
return payload
except (asyncio.TimeoutError, ConnectionError, ConnectionResetError, OSError) as error:
fail(f"Не удалось получить сообщение типа: {ws_invoke_type}. Ошибка: {error}")
async def connect_and_subscribe_msg(
ws_client: WebSocketClient,
ws_message_type: str,
ws_invoke_type: str,
ws_invoke_params: Any = None,
timeout: float = TestConst.BASIC_MESSAGE_TIMEOUT,
) -> list:
"""
Подключение типа subscribe к заданной подписке и получение сообщения с заданным типом контента
"""
await connect(ws_client, ws_invoke_type, ws_invoke_params)
try:
with allure.step(f"Получение сообщения с контентом типа: {ws_message_type}"):
payload = await ws_client.receive_by_type(ws_message_type, timeout=timeout)
return payload
except (asyncio.TimeoutError, OSError, ConnectionError, ConnectionResetError) as error:
fail(f"Не удалось получить сообщение типа: {ws_invoke_type}. Ошибка: {error}")
async def poll_balance_algorithm_diagnostic_areas(
ws_client: WebSocketClient,
ws_parser: ws_message_parser,
imitator_start_time: datetime,
end_time: datetime,
poll_interval: float,
) -> list:
"""
Опрашивает очередь ws_client на наличие BalanceAlgorithmResultsContent,
собирает и возвращает все diagnosticAreas из flowAreas.
"""
collected_diagnostic_areas = []
ws_client.suppress_recv_logging = True
ws_parser.suppress_recv_logging = True
try:
while datetime.now(tz=imitator_start_time.tzinfo) < end_time:
await asyncio.sleep(poll_interval)
latest_msg = None
while not ws_client.recv_queue.empty():
try:
msg = ws_client.recv_queue.get_nowait()
except asyncio.QueueEmpty:
break
if isinstance(msg, list) and is_desired_type(msg, "BalanceAlgorithmResultsContent"):
latest_msg = msg
if latest_msg is None:
continue
parsed_payload = ws_message_parser.parse_balance_algorithm_msg(latest_msg)
reply_content = parsed_payload.replyContent
if reply_content and reply_content.flowAreas:
for flow_area in reply_content.flowAreas:
if flow_area.diagnosticAreas:
collected_diagnostic_areas.extend(flow_area.diagnosticAreas)
finally:
ws_client.suppress_recv_logging = False
ws_parser.suppress_recv_logging = False
return collected_diagnostic_areas
def get_leak_diagnostic_area_samples(
collected_diagnostic_areas: list,
leak_diagnostic_area_name: str,
total_wait: int,
) -> list:
"""
Проверяет наличие diagnosticAreas и возвращает подмножество
для ДУ с заданным leak_diagnostic_area_id. Падает, если данные не найдены.
"""
if not collected_diagnostic_areas:
fail(f"За {total_wait} секунд не пришло ни одной diagnosticArea в BalanceAlgorithmResultsContent")
leak_diagnostic_area_samples = [
diagnostic_area
for diagnostic_area in collected_diagnostic_areas
if diagnostic_area.name == leak_diagnostic_area_name
]
if not leak_diagnostic_area_samples:
fail(f"За {total_wait} секунд не пришло ни одного сообщения для ДУ с name={leak_diagnostic_area_name}.")
return leak_diagnostic_area_samples
конфтест
import glob
import os
import shutil
import threading
import time
import allure
import pytest
import pytest_asyncio
from clients.keycloak_clients import KeycloakAuthError, KeycloakClient
from clients.testops_client import AllureResultsUploader, logger
from clients.websocket_client import WebSocketClient
from constants.architecture_constants import EnvKeyConstants as EnvConst
from constants.architecture_constants import ImitatorConstants as ImConst
from constants.architecture_constants import WebSocketClientConstants as WSCliConst
from constants.enums import RejectionSensorTag
from constants.test_constants import BaseTN3Constants
from infra.stand_setup_manager import StandSetupManager
from test_config.datasets import get_config_by_name
def pytest_addoption(parser):
"""
Добавляет кастомные опции командной строки pytest.
"""
parser.addoption(
"--suites",
action="store",
default=None,
help="Запустить только указанные наборы данных. Пример: --suites=select_4,select_19_20",
)
def _find_config_by_suite_name(suite_name: str):
"""Находит конфиг по имени набора данных."""
try:
return get_config_by_name(suite_name)
except ValueError:
return None
@pytest.fixture(autouse=True)
def allure_suite_hierarchy(request):
"""
Автоматически устанавливает иерархию Allure для группировки тестов по наборам данных.
В Allure отчёте тесты группируются:
- Parent Suite: SingleLeakSuite / MultiLeakSuite (тип набора)
- Suite: select_4 / select_6 / ... (имя набора данных)
Работает как с параметризованными тестами (config в параметрах),
так и с обычными тестами (через маркер test_suite_name).
"""
config = None
suite_name = None
# Пробуем получить конфиг из параметризации
if hasattr(request, 'fixturenames') and 'config' in request.fixturenames:
try:
config = request.getfixturevalue('config')
suite_name = config.suite_name
except Exception:
pass
# Если не нашли, пробуем найти конфиг по маркеру test_suite_name
if not config:
marker = request.node.get_closest_marker('test_suite_name')
if marker:
suite_name = marker.args[0]
config = _find_config_by_suite_name(suite_name)
if config and suite_name:
parent_suite = "MultiLeakSuite" if config.has_multiple_leaks else "SingleLeakSuite"
allure.dynamic.parent_suite(parent_suite)
allure.dynamic.suite(suite_name)
def pytest_configure(config):
"""
Храним состояние сессии
"""
config.addinivalue_line("markers", "critical_stop: если тест упал, останавливаем дальнейшее выполнение сессии")
config.group_state = {
"current_suite": None,
"suite_start_time": None,
"stand_manager": None,
"imitator_start_time": None, # datetime объект времени старта имитатора для расчёта интервалов утечек
}
def _update_sensor_ids(stand_manager: StandSetupManager) -> None:
"""
Для тестов датчиков обновляет sensor_id по address из конфигурации стенда.
"""
sensor_ids_by_address = stand_manager.get_sensor_ids_by_address()
BaseTN3Constants.SENSOR_IDS_BY_ADDRESS.update(sensor_ids_by_address)
RejectionSensorTag.update_ids_from_config(sensor_ids_by_address)
@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_makereport(item, call):
"""
Делает падение критического теста с маркировкой critical_stop однозначным:
- рисуем fail для теста
- после него прекращаем запуск остальных тестов
"""
outcome = yield
report = outcome.get_result()
if report.when == "call" and report.failed and item.get_closest_marker("critical_stop"):
item.session.shouldstop = f"Критическая проверка упала: {item.nodeid}"
# ===== Маппинг имён тестов на атрибуты конфига для получения маркеров =====
# Используется для добавления offset и test_case_id маркеров во время сбора тестов
# Smoke-тесты уровня набора (маркеры из SmokeSuiteConfig)
SMOKE_SUITE_LEVEL_MAPPING = {
'test_basic_info': 'basic_info_test',
'test_journal_info': 'journal_info_test',
'test_imitate_pressure_sensor_signal': 'imitate_pressure_sensor_signal_test',
'test_imitate_flowmeter_signal': 'imitate_flowmeter_signal_test',
'test_lds_status_initialization': 'lds_status_initialization_test',
'test_lds_status_init_in_journal': 'lds_status_init_in_journal_test',
'test_main_page_info': 'main_page_info_test',
'test_main_page_info_signals': 'main_page_info_signals_test',
'test_mask_signal': 'mask_signal_test',
'test_mask_info_in_journal': 'mask_info_in_journal_test',
'test_lds_status_initialization_out': 'lds_status_initialization_out_test',
'test_lds_status_init_out_in_journal': 'lds_status_init_out_in_journal_test',
'test_main_page_info_unstationary': 'main_page_info_unstationary_test',
'test_mask_du_on_mini_scheme': 'mask_du_on_mini_scheme_test',
'test_unmask_du_on_mini_scheme': 'unmask_du_on_mini_scheme_test',
'test_lds_status_after_confirming_leak': 'lds_status_after_confirming_leak_test',
'test_lds_status_completed_leak': 'lds_status_completed_leak_test',
'test_diagnostics_of_signals_after_initialization': 'diagnostics_of_signals_after_initialization_test',
'test_mode_mt_in_journal': 'mode_mt_in_journal_test',
}
# Regress-тесты режимов СОУ (маркеры из LDSStatusConfig)
LDS_STATUS_SUITE_LEVEL_MAPPING = {
'test_lds_status_basic_info': 'lds_status_basic_info_test',
'test_lds_status_init_accumulation_data': 'init_accumulation_data_test',
'test_lds_status_init_accumulation_data_in_journal': 'init_accumulation_data_in_journal_test',
'test_lds_status_init_cold_start': 'init_cold_start_test',
'test_lds_status_init_cold_start_in_journal': 'init_cold_start_in_journal_test',
'test_lds_status_init_exiting_faulty': 'init_exiting_faulty_test',
'test_lds_status_init_switching_shut_off': 'init_switching_shut_off_test',
'test_lds_status_init_switching_shut_off_in_journal': 'init_switching_shut_off_in_journal_test',
'test_lds_status_serviceable_after_cold_start': 'serviceable_after_cold_start_test',
'test_lds_status_serviceable_after_cold_start_in_journal': 'serviceable_after_cold_start_in_journal_test',
'test_lds_status_serviceable_after_switching_shut_off': 'serviceable_after_switching_shut_off_test',
'test_lds_status_serviceable_after_switching_shut_off_in_journal': 'serviceable_after_switching_shut_off_in_journal_test', # noqa: E501
'test_lds_status_serviceable_after_deg_absence_min_pressure_sensors': 'serviceable_after_deg_absence_min_pressure_sensors_test', # noqa: E501
'test_lds_status_serviceable_after_deg_starting_pumping_out_pumps': 'serviceable_after_deg_starting_pumping_out_pumps_test', # noqa: E501
'test_lds_status_serviceable_after_deg_faulty_pressure_sensors_at_pump': 'serviceable_after_deg_faulty_pressure_sensors_at_pump_test', # noqa: E501
'test_lds_status_serviceable_after_deg_faulty_pressure_sensors_at_pump_in_journal': 'serviceable_after_deg_faulty_pressure_sensors_at_pump_in_journal_test', # noqa: E501
'test_lds_status_serviceable_after_faulty': 'serviceable_after_faulty_test',
'test_lds_status_degradation_additive_injectors_operation': 'deg_additive_injectors_operation_test',
'test_lds_status_degradation_exceeding_distance_between_pressure_sensors': 'deg_exceeding_distance_between_pressure_sensors_test', # noqa: E501
'test_lds_status_degradation_exceeding_distance_between_pressure_sensors_in_journal': 'deg_exceeding_distance_between_pressure_sensors_in_journal_test', # noqa: E501
'test_lds_status_degradation_absence_min_pressure_sensors': 'deg_absence_min_pressure_sensors_test',
'test_lds_status_degradation_faulty_pressure_sensors_at_pump_station': 'deg_faulty_pressure_sensors_at_pump_station_test', # noqa: E501
'test_lds_status_degradation_faulty_pressure_sensors_at_pump_station_in_journal': 'deg_faulty_pressure_sensors_at_pump_station_in_journal_test', # noqa: E501
'test_lds_status_degradation_gravity_section_pumping': 'deg_gravity_section_pumping_test',
'test_lds_status_degradation_gravity_section_pumping_in_stopping': 'deg_gravity_section_pumping_in_stopping_test',
'test_lds_status_degradation_gravity_section_pumping_in_stopping_in_journal': 'deg_gravity_section_pumping_in_stopping_in_journal_test', # noqa: E501
'test_lds_status_degradation_pig_sensor_passage': 'deg_pig_sensor_passage_test',
'test_lds_status_degradation_starting_pumping_out_pumps': 'deg_starting_pumping_out_pumps_test',
'test_lds_status_degradation_exceeding_distance_between_flow_meters': 'deg_exceeding_distance_between_flow_meters_test', # noqa: E501
'test_lds_status_degradation_rejection_temperature_sensor_on_du_2': 'deg_rejection_temperature_sensor_on_du_2_test',
'test_lds_status_degradation_rejection_temperature_sensor_on_du_3': 'deg_rejection_temperature_sensor_on_du_3_test',
'test_lds_status_degradation_rejection_temperature_sensor_on_du_5': 'deg_rejection_temperature_sensor_on_du_5_test',
'test_lds_status_degradation_rejection_density_and_viscosity_on_du_2': 'deg_rejection_density_and_viscosity_on_du_2_test', # noqa: E501
'test_lds_status_degradation_rejection_density_and_viscosity_on_du_3': 'deg_rejection_density_and_viscosity_on_du_3_test', # noqa: E501
'test_lds_status_degradation_rejection_density_and_viscosity_on_du_5': 'deg_rejection_density_and_viscosity_on_du_5_test', # noqa: E501
'test_lds_status_faulty_absence_min_flow_meters': 'faulty_absence_min_flow_meters_test',
'test_lds_status_faulty_absence_min_pressure_sensors': 'faulty_absence_min_pressure_sensors_test',
'test_lds_status_faulty_absence_min_pressure_sensors_in_journal': 'faulty_absence_min_pressure_sensors_in_journal_test', # noqa: E501
}
# Тесты уровня утечки (маркеры из LeakTestConfig - параметр leak)
LEAK_LEVEL_TEST_MAPPING = {
'test_leaks_content': 'leaks_content_test',
'test_all_leaks_info': 'all_leaks_info_test',
'test_tu_leaks_info': 'tu_leaks_info_test',
'test_leak_info_in_journal': 'leak_info_in_journal',
'test_possible_leak_in_journal': 'possible_leak_in_journal_test',
'test_acknowledge_leak_info': 'acknowledge_leak_test',
'test_acknowledge_leak_in_journal': 'acknowledge_leak_in_journal_test',
'test_output_signals': 'output_signals_test',
'test_lds_status_during_leak': 'lds_status_during_leak_test',
'test_balance_algorithm_leak_waiting': 'balance_algorithm_leak_waiting_test',
'test_balance_algorithm_leak_detected': 'balance_algorithm_leak_detected_test',
'test_the_leak_is_complete_on_kg': 'the_leak_is_complete_on_kg_test',
'test_leak_is_complete_in_output_signals': 'leak_is_complete_in_output_signals_test',
'test_complete_tu_leaks_info_content': 'complete_tu_leaks_info_content_test',
'test_all_leaks_is_empty': 'all_leaks_is_empty_test',
'test_leak_is_confirm_on_main_page': 'leak_is_confirm_on_main_page_test',
'test_leak_is_complete_on_main_page': 'leak_is_complete_on_main_page_test',
'test_balance_algorithm_leak_completed': 'balance_algorithm_leak_completed_test',
'test_completed_leak_info_in_journal': 'completed_leak_info_in_journal_test',
'test_export_leaks_report': 'export_leaks_report_test',
'test_export_lds_status_report': 'export_lds_status_report_test',
'test_export_mt_mode_report': 'export_mt_mode_report_test',
}
# Тесты уровня отбраковки (маркеры из RejectionTestCase - параметр rejection_case)
IS_REJECTED_LEVEL_TEST_MAPPING = {
'test_rejection_input_signals': 'rejection_input_signals_test',
'test_rejection_journal': 'rejection_journal_test',
'test_rejection_main_page': 'rejection_main_page_test',
'test_rejection_scheme_signals_state': 'rejection_scheme_signals_state_test',
}
# Suite-level тесты отбраковки (маркеры из IsRejectedConfig - параметр config)
IS_REJECTED_SUITE_LEVEL_MAPPING = {
'test_rejection_report': 'rejection_report_test',
}
# Мержим все вместе чтобы не переписывать логику коллектора айтемов (тестов)
SUITE_LEVEL_TEST_MAPPING = {**SMOKE_SUITE_LEVEL_MAPPING, **LDS_STATUS_SUITE_LEVEL_MAPPING}
def _get_test_markers_config(item, test_name):
"""
Получает конфигурацию с маркерами для теста.
Для leak-level тестов: маркеры берутся из параметра leak
Для suite-level тестов: маркеры берутся из config
:return: CaseMarkers объект или None
"""
if not hasattr(item, 'callspec'):
return None
params = item.callspec.params
# Проверяем, есть ли параметр leak (для leak-level тестов)
if 'leak' in params and test_name in LEAK_LEVEL_TEST_MAPPING:
leak = params['leak']
attr_name = LEAK_LEVEL_TEST_MAPPING[test_name]
return getattr(leak, attr_name, None)
# Проверяем, есть ли параметр rejection_case для тестов отбраковки
if 'rejection_case' in params and test_name in IS_REJECTED_LEVEL_TEST_MAPPING:
rejection_case = params['rejection_case']
attr_name = IS_REJECTED_LEVEL_TEST_MAPPING[test_name]
return getattr(rejection_case, attr_name, None)
# Suite-level тесты отбраковки (без rejection_case)
if 'config' in params and 'rejection_case' not in params and test_name in IS_REJECTED_SUITE_LEVEL_MAPPING:
suite_config = params['config']
attr_name = IS_REJECTED_SUITE_LEVEL_MAPPING[test_name]
return getattr(suite_config, attr_name, None)
# Для suite-level тестов берём из config
if 'config' in params:
if test_name in SMOKE_SUITE_LEVEL_MAPPING:
suite_config = params['config']
attr_name = SMOKE_SUITE_LEVEL_MAPPING[test_name]
return getattr(suite_config, attr_name, None)
if test_name in LDS_STATUS_SUITE_LEVEL_MAPPING:
suite_config = params['config']
attr_name = LDS_STATUS_SUITE_LEVEL_MAPPING[test_name]
return getattr(suite_config, attr_name, None)
return None
def pytest_collection_modifyitems(session, config, items):
"""
1. Фильтрует тесты по --suites (если указано)
2. Исключает тесты, у которых конфиг = None (тест отключён для этого набора данных)
3. Добавляет маркеры offset и test_case_id из конфига к каждому параметризованному тесту
4. Сортирует тесты по test_suite_name для группировки по наборам данных
"""
# Получаем список выбранных наборов из --suites
suites_option = config.getoption("--suites")
selected_suites = None
if suites_option:
# Парсим список наборов: "select_4,select_19_20" -> ["select_4", "select_19_20"]
selected_suites = [s.strip().lower() for s in suites_option.split(",")]
selected_items = []
deselected_items = []
for item in items:
# Фильтрация по --suites
if selected_suites:
suite_marker = item.get_closest_marker("test_suite_name")
if suite_marker:
suite_name = suite_marker.args[0].lower()
# Проверяем, содержит ли имя набора одну из выбранных подстрок
if not any(selected in suite_name for selected in selected_suites):
deselected_items.append(item)
continue
# Получаем имя функции теста (без параметров)
test_name = item.originalname or item.name.split('[')[0]
# Получаем конфиг с маркерами для теста
test_config = _get_test_markers_config(item, test_name)
if test_config is not None:
# Добавляем маркер offset
if hasattr(test_config, 'offset') and test_config.offset is not None:
item.add_marker(pytest.mark.offset(test_config.offset))
# Добавляем маркер test_case_id
if hasattr(test_config, 'test_case_id') and test_config.test_case_id is not None:
item.add_marker(pytest.mark.test_case_id(test_config.test_case_id))
elif (
test_name in SUITE_LEVEL_TEST_MAPPING
or test_name in LEAK_LEVEL_TEST_MAPPING
or test_name in IS_REJECTED_LEVEL_TEST_MAPPING
or test_name in IS_REJECTED_SUITE_LEVEL_MAPPING
): # noqa: E501
# Конфиг теста = None - исключаем тест из прогона
deselected_items.append(item)
continue
selected_items.append(item)
# Уведомляем pytest об исключённых тестах
if deselected_items:
config.hook.pytest_deselected(items=deselected_items)
# Заменяем список тестов на отфильтрованный
items[:] = selected_items
# Сортировка тестов по test_suite_name и offset
# Цель: обеспечить запуск тестов строго по offset строго внутри набора данных
# При равных offset сохраняем исходный порядок коллекции, чтобы порядок параметризации не перескакивал
def suite_offset_key(item):
"""
Сортировка тестов по test_suite_name и offset (без падения на None).
"""
test_suite_name_marker = item.get_closest_marker("test_suite_name")
test_suite_name = test_suite_name_marker.args[0] if test_suite_name_marker else ""
offset_marker = item.get_closest_marker("offset")
if offset_marker:
try:
offset_value = float(offset_marker.args[0])
except Exception:
offset_value = float("inf")
else:
offset_value = float("inf")
original_index = getattr(item, "_collection_index", 0)
# Возвращаем тройку ключей сортировки
# 1) test_suite_name - группировка по набору
# 2) offset_value - порядок внутри набора по времени
# 3) original_index - стабильность при равных offset
return test_suite_name, offset_value, original_index
# Сохраняем исходный порядок коллекции для стабильной сортировки
for index, item in enumerate(items):
item._collection_index = index
# по кортежу питон сортирует слева направо, благодаря этому сортировка по offset идет строго внутри test_suite_name
items.sort(key=suite_offset_key)
for item in items:
if hasattr(item, "_collection_index"):
delattr(item, "_collection_index")
@pytest.fixture(autouse=True)
def allure_tms_link(request):
"""
Allure TMS‑линки по test_case_id
"""
if test_case_id_marker := request.node.get_closest_marker("test_case_id"):
test_case_id = test_case_id_marker.args[0]
allure.dynamic.link(
f"https://{os.environ['TESTOPS_BASE_URL']}/testcases?selected_id={test_case_id}",
name=f"TestCase-{test_case_id}",
link_type="tms",
)
@pytest.fixture(autouse=True)
def offset_wait(request):
"""
Offset‑ожидание перед каждым тестом относительно фактического старта core
"""
if offset_marker := request.node.get_closest_marker("offset"):
offset_sec = float(offset_marker.args[0]) * 60
start = request.config.group_state["suite_start_time"] or 0
elapsed = time.monotonic() - start
to_wait = max(0, offset_sec - elapsed)
if to_wait:
time.sleep(to_wait)
def compute_imitator_duration(item, current_test_suite: str) -> float:
"""
Вычисляет длительность для имитатора (в минутах).
Правило:
- Собирает все тесты (item.session.items) с меткой test_suite_name == current_test_suite
- Извлекает все значения @pytest.mark.offset(...) (в минутах)
- Если offsets найдены: возвращает max(offsets) + IMITATOR_FINISH_DELAY задержка остановки имитатора
- Иначе: если у текущего item есть @pytest.mark.imitator_duration — используется как fallback и логируется
- Если ничего не найдено — pytest.fail с понятным текстом
"""
suite_items = [
suite_item
for suite_item in item.session.items
if (marker := suite_item.get_closest_marker("test_suite_name")) and marker.args[0] == current_test_suite
]
offsets = []
for suite_item in suite_items:
offset_marker = suite_item.get_closest_marker("offset")
if offset_marker:
try:
offsets.append(float(offset_marker.args[0]))
except Exception:
continue
if offsets:
max_offset = max(offsets)
imitator_duration = float(max_offset) + ImConst.IMITATOR_FINISH_DELAY_MINUTE
return imitator_duration
else:
# fallback- если все еще задан старый маркер imitator_duration, то используем его
if imitator_mark := item.get_closest_marker("imitator_duration"):
imitator_duration = float(imitator_mark.args[0])
logger.warning(
"[DEPRECATED] использован pytest.mark.imitator_duration()"
f"рекомендуется убрать и полагаться на max_offset + {ImConst.IMITATOR_FINISH_DELAY_MINUTE}"
)
return imitator_duration
pytest.fail(
"Не удалось вычислить imitator_duration: в тестовом модуле одновременно отсутствуют "
"и @pytest.mark.offset(), и pytest.mark.imitator_duration()"
)
@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_setup(item):
"""
Перезапуск имитатора при смене test_suite_name
"""
cfg = item.config.group_state
test_suite_marker = item.get_closest_marker("test_suite_name")
if not test_suite_marker:
pytest.fail("Тест без @pytest.mark.test_suite_name")
current_test_suite = test_suite_marker.args[0]
if current_test_suite != cfg["current_suite"]:
# stop old
if stand_manager := cfg["stand_manager"]:
stand_manager.stop_imitator_wrapper()
try:
stand_manager.restore_signal_unit_conversion_rules()
except Exception:
logger.exception(
"[ERROR] [SETUP] Ошибка при восстановлении signal_unit_conversion_rules.json "
"перед запуском нового набора"
)
if not os.environ.get("RUN_WITHOUT_TESTOPS", "False").lower() == "true":
# При запуске с TestOps удаляет данные прогона
stand_manager.server_test_data_remover()
# start new
cfg["current_suite"] = current_test_suite
cfg["suite_start_time"] = None
data_id = item.get_closest_marker("test_suite_data_id").args[0]
test_data_name = item.get_closest_marker("test_data_name").args[0]
tu_id = item.get_closest_marker("tu_id").args[0]
imitator_duration = compute_imitator_duration(item, current_test_suite)
suite_config = _find_config_by_suite_name(current_test_suite)
measure_conversion_rules = suite_config.measure_conversion_rules if suite_config is not None else None
stand_manager = StandSetupManager(
duration_m=imitator_duration,
test_data_id=data_id,
test_data_name=test_data_name,
tu_id=tu_id,
measure_conversion_rules=measure_conversion_rules,
)
cfg["stand_manager"] = stand_manager
try:
stand_manager.check_opc_server_status()
except RuntimeError as error:
msg = (
"[SETUP] [ERROR] OPC сервер недоступен. Имитатор и автотесты не запущены. "
f"Ошибка при проверке статуса OPC: {error}"
)
allure.attach(msg, name="OPC сервер недоступен", attachment_type=allure.attachment_type.TEXT)
pytest.exit(msg)
try:
stand_manager.setup_stand_for_imitator_run()
except Exception as error:
pytest.exit(f"[SETUP] [ERROR] ошибка при подготовке стенда: {error}")
try:
_update_sensor_ids(stand_manager)
except Exception as error:
pytest.exit(f"[SETUP] [ERROR] ошибка обновления id датчиков отбраковки из конфигурации: {error}")
imitator_thread = threading.Thread(
target=stand_manager.start_imitator, name=f"imitator->{current_test_suite}", daemon=True
)
core_thread = threading.Thread(target=stand_manager.start_core)
try:
imitator_thread.start()
except Exception as error:
pytest.exit(f"[SETUP] [ERROR] ошибка запуска имитатора: {error}")
time.sleep(ImConst.CORE_START_DELAY_S)
try:
cfg["suite_start_time"] = time.monotonic()
core_thread.start()
core_thread.join(timeout=5)
except Exception as error:
pytest.exit(f"[SETUP] [ERROR] ошибка запуска СORE контейнеров: {error}")
# Сохраняем время старта имитатора для расчёта интервалов утечек в тестах
cfg["imitator_start_time"] = stand_manager.start_time
yield # pytest продолжит выполнение теста
@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_teardown(item, nextitem):
"""
Teardown имитатора при выходе из группы
"""
yield
cfg = item.config.group_state
next_marker = nextitem.get_closest_marker("test_suite_name") if nextitem else None
next_suite = next_marker.args[0] if next_marker else None
if next_suite != cfg["current_suite"]:
if stand_manager := cfg["stand_manager"]:
stand_manager.stop_imitator_wrapper()
try:
stand_manager.restore_signal_unit_conversion_rules()
except Exception:
logger.exception("[ERROR] [TEARDOWN] Ошибка при восстановлении signal_unit_conversion_rules.json")
if not os.environ.get("RUN_WITHOUT_TESTOPS", "False").lower() == "true":
# При запуске с TestOps удаляет данные прогона
stand_manager.server_test_data_remover()
cfg["stand_manager"] = None
cfg["current_suite"] = None
cfg["suite_start_time"] = None
cfg["imitator_start_time"] = None
# опционально дождаться завершения потока (если не daemon) — безопасный join
imitator_thread = cfg.get("imitator_thread")
if imitator_thread and not getattr(imitator_thread, "daemon", False):
try:
imitator_thread.join(timeout=5)
except RuntimeError:
logger.exception("Ошибка при join() фона имитатора")
def get_ws_host() -> str:
instance = os.environ.get(EnvConst.STAND_NAME)
if not instance:
pytest.exit(f"Переменная окружения {EnvConst.STAND_NAME} не задана в .env")
ws_host = f"{WSCliConst.SERVICE_NAME}.{WSCliConst.COMPONENT}-{instance}.{WSCliConst.ROOT_DOMAIN}"
return ws_host
def get_token(max_retries: int = 3, backoff: float = 5.0) -> str:
"""
:param max_retries: сколько всего попыток (включая первую)
:param backoff: время в секундах между попытками
"""
last_exc = None
for attempt in range(1, max_retries + 1):
try:
keycloak = KeycloakClient(
url=os.environ.get(EnvConst.KEYCLOAK_URL),
client_id=os.environ.get(EnvConst.KEYCLOAK_CLIENT_ID),
client_secret=os.environ.get(EnvConst.KEYCLOAK_CLIENT_SECRET),
username=os.environ.get(EnvConst.KEYCLOAK_USERNAME),
password=os.environ.get(EnvConst.KEYCLOAK_PASSWORD),
)
token = keycloak.get_access_token()
if not token:
raise KeycloakAuthError("Получен пустой access token")
return token
except KeycloakAuthError as e:
last_exc = e
logger.warning(f"[{attempt}/{max_retries}] KeycloakAuthError: {e}. Повтор через {backoff} сек.")
except Exception as e:
last_exc = e
logger.warning(f"[{attempt}/{max_retries}] Неожиданная ошибка: {e}. Повтор через {backoff} сек.")
if attempt < max_retries:
time.sleep(backoff)
# все попытки исчерпаны
logger.error(f"Не удалось получить токен после {max_retries} попыток: {last_exc}")
pytest.fail(f"Не удалось получить токен после {max_retries} попыток: {last_exc}")
@pytest_asyncio.fixture
async def ws_client():
"""
Фикстура для работы с websocket клиентом
:return: Объект wss соединения
"""
ws_host = get_ws_host()
auth_token = get_token()
async with WebSocketClient(ws_host, auth_token) as client:
yield client
@pytest.fixture
def imitator_start_time(request):
"""
Фикстура для получения времени старта имитатора (datetime объект).
Используется для точного расчёта времени обнаружения утечек:
- leak_start_time = imitator_start_time + timedelta(seconds=LEAK_START_INTERVAL)
- leak_end_time = imitator_start_time + timedelta(seconds=LEAK_START_INTERVAL + ALLOWED_TIME_DIFF_SECONDS)
"""
start_time = request.config.group_state.get("imitator_start_time")
if start_time is None:
pytest.fail("imitator_start_time не установлен. Убедитесь что тест запущен после инициализации имитатора.")
return start_time
def pytest_sessionfinish(session, exitstatus):
"""
В завершении сессии — отправляем единый Allure‑отчёт в TestOps.
"""
# 1) teardown стенда: остановки имитатора и удаление временных папок на сервере
try:
stand_manager = getattr(session.config, "group_state", {}).get("stand_manager")
if stand_manager:
try:
stand_manager.stop_imitator_wrapper()
except Exception:
logger.exception("[ERROR] [TEARDOWN] Ошибка при остановке имитатора")
try:
stand_manager.restore_signal_unit_conversion_rules()
except Exception:
logger.exception("[ERROR] [TEARDOWN] Ошибка при восстановлении signal_unit_conversion_rules.json")
try:
stand_manager.server_test_data_remover()
except Exception:
logger.exception("[ERROR] [TEARDOWN] Ошибка при удалении тестового набора данных со стенда")
except Exception:
logger.exception("[ERROR] [TEARDOWN] Ошибка при получении stand_manager из group_state")
# 2) Выгрузка allure-results в TestOps
try:
uploader = AllureResultsUploader()
logger.info("[INFO] [TEARDOWN] Uploading Allure results to TestOps")
uploader.upload_allure_results()
except Exception:
logger.exception("[ERROR] [TEARDOWN] Ошибка при выгрузке allure-results в TestOps")
# 3) Удаление локальных архивов с данными
shutil.rmtree("allure-results")
project_root = os.path.dirname(os.path.abspath(__file__))
files_for_drop = glob.glob(os.path.join(project_root, "*.tar.gz"))
if not files_for_drop:
logger.warning("[WARNING] [TEARDOWN] Не нашлось архивов .tar.gz с данным для удаления")
else:
for file in files_for_drop:
os.remove(file)
@pytest.fixture()
def ws_params(request):
"""
Передает параметры для websocket в тест
"""
return request.param