Загрузка данных
модел тестов
"""
Датаклассы для конфигурации тестовых наборов.
Архитектура:
- TestSuiteConfig - главный конфиг набора, содержит всё для запуска тестов
- LeakTestConfig - конфиг утечки с параметрами и тест-кейсами
- TestCaseMarkers - маркеры для allure и pytest
Принцип: один файл конфига select_xx.py в папке datasets = один набор данных.
"""
from dataclasses import asdict, dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional
from constants.enums import (
TU,
ConfirmationStatus,
LdsStatus,
MeasureConversionRule,
RejectionCriteria,
RejectionSensorTag,
ReservedType,
StationaryStatus,
)
from constants.test_constants import BaseTN3Constants
from models.export_reports_model import ReportDataExportedNotification
from models.get_exported_files_list_model import ExportedDataItem
from models.subscribe_main_page_signals_info_model import SignalsInfo
from models.upload_exported_file_model import DownloadExportedDataReply
from utils.helpers.report_xlsx_utils import LeakReportRow, ReportTitleInfo
@dataclass
class BaseSuiteConfig:
"""
Структура:
1. Метаданные набора (имя, id, архив)
2. Технологический участок (из enum TU)
"""
# ===== Метаданные набора =====
suite_name: str
suite_data_id: int
archive_name: str = "" # Автоматически вычисляется из suite_name
# ===== Технологический участок =====
technological_unit: TU = TU.TIKHORETSK_NOVOROSSIYSK_3
# ===== Правила конвертации единиц измерения давления на стенде =====
measure_conversion_rules: Optional[MeasureConversionRule] = None
# ===== Общие константы (можно переопределить) =====
allowed_distance_diff_meters: int = BaseTN3Constants.ALLOWED_DISTANCE_DIFF_METERS
precision: int = BaseTN3Constants.PRECISION
basic_message_timeout: float = BaseTN3Constants.BASIC_MESSAGE_TIMEOUT
mask_message_timeout: float = BaseTN3Constants.MASK_MESSAGE_TIMEOUT
mask_du_name: Optional[str] = None
main_pipe_line: Optional[str] = None
mask_du_event: Optional[str] = None
unmask_du_event: Optional[str] = None
# ===== Свойства для удобства =====
@property
def tu_id(self) -> int:
"""ID технологического участка"""
return self.technological_unit.id
@property
def tu_name(self) -> str:
"""Название технологического участка"""
return self.technological_unit.description
@property
def has_multiple_leaks(self) -> bool:
return False
@dataclass
class CaseData:
"""
Данные тест-кейса.
"""
name: str = ""
params: Optional[Dict[str, Any]] = None
expected_result: Optional[Any] = None
description: str = ""
@dataclass
class CaseMarkers:
"""
Маркеры тест-кейса для pytest и allure.
"""
test_case_id: str
offset: float
@dataclass
class DiagnosticAreaStatusConfig:
"""
Конфигурация ожидаемых статусов СОУ для диагностического участка.
Используется в тесте lds_status_during_leak.
"""
leak_diagnostic_area_name: str
leak_du_expected_lds_status: Any
neighbors_du_expected_lds_status: Any
@dataclass
class LeakTestConfig:
"""
Полная конфигурация утечки со всеми параметрами и тест-кейсами.
Все данные для тестов утечки:
- Параметры утечки (координата, объём)
- Временные интервалы
- Ожидаемые значения
- Маркеры тестов (AllLeaksInfo, TuLeaksInfo, и т.д.)
"""
# ===== Идентификаторы =====
control_site_id: Optional[int] = None
diagnostic_area_id: Optional[int] = None
diagnostic_area_name: Optional[str] = None
linear_part_id: Optional[int] = None
technological_object: Optional[str] = None
message_event_leak_completion: Optional[str] = None
# ===== Параметры утечки =====
coordinate_meters: float = None
volume_m3: float = None
max_pumping_m3: int = 2500 # Производительность(максимальная перекачка)
flow_rate_settings_threshold: Optional[float] = None # Порог объема дебаланса для текущего ДУ в текущем режиме
# ===== Временные интервалы (секунды) =====
leak_start_interval_seconds: int = BaseTN3Constants.LEAK_START_INTERVAL
allowed_time_diff_seconds: int = 0 # Допустимое время обнаружения
output_test_delay_seconds: int = BaseTN3Constants.OUTPUT_TEST_DELAY
# ===== Ожидаемые статусы =====
expected_lds_status: Any = LdsStatus.SERVICEABLE
# Режим СОУ в xlsx export_leaks_report (колонка 'Режим работы СОУ')
expected_lds_status_in_leaks_report: Optional[int] = None
expected_report_stationary_status: int = StationaryStatus.STATIONARY.value
expected_stationary_status: Any = StationaryStatus.STATIONARY
expected_algorithm_type: Any = ReservedType.STATIONARY_FLOW
expected_leak_status: Any = ConfirmationStatus.CONFIRMED
expected_complete_leak_status: Any = ConfirmationStatus.CONFIRMED_AND_LEAK_CLOSED
# ===== Конфигурация статусов СОУ во время утечки =====
lds_status_during_leak_config: Optional[DiagnosticAreaStatusConfig] = None
# ===== Данные тест-кейсов =====
lds_status_after_confirming_leak_data: Optional[CaseData] = None
lds_status_after_completed_leak_data: Optional[CaseData] = None
# ===== Тест-кейсы для этой утечки =====
balance_algorithm_leak_waiting_test: Optional[CaseMarkers] = None
balance_algorithm_leak_detected_test: Optional[CaseMarkers] = None
leaks_content_test: Optional[CaseMarkers] = None
all_leaks_info_test: Optional[CaseMarkers] = None
all_leaks_is_empty_test: Optional[CaseMarkers] = None
tu_leaks_info_test: Optional[CaseMarkers] = None
leak_info_in_journal: Optional[CaseMarkers] = None
possible_leak_in_journal_test: Optional[CaseMarkers] = None
acknowledge_leak_test: Optional[CaseMarkers] = None
acknowledge_leak_in_journal_test: Optional[CaseMarkers] = None
output_signals_test: Optional[CaseMarkers] = None
lds_status_during_leak_test: Optional[CaseMarkers] = None
lds_status_after_confirming_leak_test: Optional[CaseMarkers] = None
lds_status_completed_leak_test: Optional[CaseMarkers] = None
the_leak_is_complete_on_kg_test: Optional[CaseMarkers] = None
leak_is_complete_in_output_signals_test: Optional[CaseMarkers] = None
leak_is_complete_on_main_page_test: Optional[CaseMarkers] = None
leak_is_confirm_on_main_page_test: Optional[CaseMarkers] = None
complete_tu_leaks_info_content_test: Optional[CaseMarkers] = None
completed_leak_info_in_journal_test: Optional[CaseMarkers] = None
balance_algorithm_leak_completed_test: Optional[CaseMarkers] = None
export_leaks_report_test: Optional[CaseMarkers] = None
export_lds_status_report_test: Optional[CaseMarkers] = None
export_mt_mode_report_test: Optional[CaseMarkers] = None
@property
def leak_diagnostic_area_id(self) -> Optional[int]:
"""ID диагностического участка с утечкой из lds_status_during_leak_config"""
if self.lds_status_during_leak_config is not None:
return self.lds_status_during_leak_config.leak_diagnostic_area_id
return None
@property
def allowed_volume_m3(self) -> float:
"""Допустимая погрешность объёма"""
return self.volume_m3 * BaseTN3Constants.ALLOWED_VOLUME_DIFF
@property
def leak_rate_percentages(self) -> float:
"""Интенсивность утечки в процентах"""
return round((self.volume_m3 / self.max_pumping_m3) * 100, 2)
@property
def allowed_time_diff_minutes(self) -> float:
"""Допустимое время обнаружения утечки в минутах"""
return round(self.allowed_time_diff_seconds / 60, 2)
@property
def output_allowed_time_diff_seconds(self) -> int:
"""Допустимое время для теста выходных сигналов"""
return self.allowed_time_diff_seconds + self.output_test_delay_seconds
@dataclass
class SmokeSuiteConfig(BaseSuiteConfig):
"""
Полная конфигурация тестового набора.
Один конфиг = один набор данных = один файл в test_config/datasets/
Структура:
1. Базовые тесты с маркерами
2. Конфигурации утечек (LeakTestConfig)
"""
# ===== Ожидаемый статусы для main_page_info =====
expected_stationary_status: Any = StationaryStatus.STATIONARY
expected_main_page_signals: dict = field(default_factory=lambda: asdict(SignalsInfo()))
# ===== Название Магистрального Нефтепровода =====
main_pipeline: Optional[str] = None
# ===== Ожидаемые переменные при маскировании ДУ =====
mask_reason: Optional[str] = None
unmask_reason: Optional[str] = None
mask_one_du: Optional[int] = None
not_mask_du: Optional[int] = None
linear_part_identifier_for_mask: Optional[int] = None
technological_section: Optional[str] = None
imitate_flowmeter_signal_test_data: Optional[CaseData] = None
imitate_pressure_sensor_signal_test_data: Optional[CaseData] = None
# дефолтные значения для датчиков маскирования
mask_signal_test_data: Optional[CaseData] = CaseData(
params={
"pressure_sensor_address": BaseTN3Constants.PRESSURE_SENSOR_ADDRESS,
"flowmeter_address": BaseTN3Constants.FLOWMETER_ADDRESS,
}
)
# ===== Ожидаемые переменные для проверок сообщений о режимах =====
exp_mode_mt_message: Optional[CaseData] = None
# ----- Ожидаемые статусы для проверки режимов на ЭФ Диагностика сигналов -----
exp_tixoreczkaya_novovelichkovskaya_reg_lu: Optional[int] = None
exp_tixoreczkaya_novovelichkovskaya_reg_sou: Optional[int] = None
exp_novovelichkovskaya_krymskaya_reg_lu: Optional[int] = None
exp_novovelichkovskaya_krymskaya_reg_sou: Optional[int] = None
exp_krymskaya_grushovaya_reg_lu: Optional[int] = None
exp_krymskaya_grushovaya_reg_sou: Optional[int] = None
exp_backup_route_bejsug_reg_lu: Optional[int] = None
exp_backup_route_bejsug_reg_sou: Optional[int] = None
exp_backup_route_ponura_reg_lu: Optional[int] = None
exp_backup_route_ponura_reg_sou: Optional[int] = None
exp_backup_route_kuban_reg_lu: Optional[int] = None
exp_backup_route_kuban_reg_sou: Optional[int] = None
exp_npz_afipskij_reg_lu: Optional[int] = None
exp_npz_afipskij_reg_sou: Optional[int] = None
exp_npz_ilinskij_reg_lu: Optional[int] = None
exp_npz_ilinskij_reg_sou: Optional[int] = None
# ===== Базовые тесты =====
basic_info_test: Optional[CaseMarkers] = None
imitate_flowmeter_signal_test: Optional[CaseMarkers] = None
imitate_pressure_sensor_signal_test: Optional[CaseMarkers] = None
journal_info_test: Optional[CaseMarkers] = None
lds_status_initialization_test: Optional[CaseMarkers] = None
lds_status_init_in_journal_test: Optional[CaseMarkers] = None
main_page_info_test: Optional[CaseMarkers] = None
main_page_info_signals_test: Optional[CaseMarkers] = None
mask_signal_test: Optional[CaseMarkers] = None
mask_info_in_journal_test: Optional[CaseMarkers] = None
lds_status_initialization_out_test: Optional[CaseMarkers] = None
lds_status_init_out_in_journal_test: Optional[CaseMarkers] = None
mask_du_on_mini_scheme_test: Optional[CaseMarkers] = None
unmask_du_on_mini_scheme_test: Optional[CaseMarkers] = None
diagnostics_of_signals_after_initialization_test: Optional[CaseMarkers] = None
mode_mt_in_journal_test: Optional[CaseMarkers] = None
# ===== Конфигурации утечек =====
# Для наборов с одной утечкой
leak: Optional[LeakTestConfig] = None
# Для наборов с несколькими утечками (select_19_20)
leaks: list[LeakTestConfig] = field(default_factory=list)
# Участки в xlsx-отчёте о режиме работы СОУ (export_lds_status_report)
lds_status_report_section_names: list[str] = field(default_factory=list)
# ===== Дополнительные тесты для двух утечек =====
main_page_info_unstationary_test: Optional[CaseMarkers] = None
def get_leak(self, index: int = 0) -> Optional[LeakTestConfig]:
"""Получить конфигурацию утечки по индексу"""
if self.leak and index == 0:
return self.leak
if self.leaks and index < len(self.leaks):
return self.leaks[index]
return None
@property
def has_multiple_leaks(self) -> bool:
"""Проверить, есть ли несколько утечек"""
return len(self.leaks) > 1
@property
def allowed_volume_diff(self) -> float:
"""Относительная погрешность по объёму"""
return BaseTN3Constants.ALLOWED_VOLUME_DIFF
@dataclass
class LDSStatusConfig(BaseSuiteConfig):
"""
Полная конфигурация тестового набора.
Один конфиг = один набор данных = один файл в test_config/datasets/
Структура:
1. Данные для тестов(параметры и ожидаемый результат)
2. Тесты с маркерами
"""
# ===== Название Магистрального Нефтепровода =====
main_pipeline: Optional[str] = None
# ===== Данные для тестов =====
init_accumulation_data_test_data: Optional[CaseData] = None
init_accumulation_data_in_journal_test_data: Optional[CaseData] = None
init_cold_start_test_data: Optional[CaseData] = None
init_exiting_faulty_test_data: Optional[CaseData] = None
init_switching_shut_off_test_data: Optional[CaseData] = None
init_switching_shut_off_in_journal_test_data: Optional[CaseData] = None
serviceable_all_test_data: Optional[CaseData] = None
serviceable_all_in_journal_test_data: Optional[CaseData] = None
serviceable_after_switching_shut_off_test_data: Optional[CaseData] = None
serviceable_after_switching_shut_off_in_journal_test_data: Optional[CaseData] = None
serviceable_after_deg_faulty_pressure_sensors_at_pump_test_data: Optional[CaseData] = None
serviceable_after_deg_faulty_pressure_sensors_at_pump_in_journal_test_data: Optional[CaseData] = None
deg_faulty_pressure_sensors_at_pump_station_test_data: Optional[CaseData] = None
deg_faulty_pressure_sensors_at_pump_station_in_journal_test_data: Optional[CaseData] = None
deg_additive_injectors_operation_test_data: Optional[CaseData] = None
deg_absence_min_pressure_sensors_test_data: Optional[CaseData] = None
deg_exceeding_distance_between_pressure_sensors_test_data: Optional[CaseData] = None
deg_exceeding_distance_between_pressure_sensors_in_journal_test_data: Optional[CaseData] = None
deg_gravity_section_pumping_test_data: Optional[CaseData] = None
deg_gravity_section_pumping_in_stopping_test_data: Optional[CaseData] = None
deg_gravity_section_pumping_in_stopping_in_journal_test_data: Optional[CaseData] = None
deg_pig_sensor_passage_test_data: Optional[CaseData] = None
deg_starting_pumping_out_pumps_test_data: Optional[CaseData] = None
deg_exceeding_distance_between_flow_meters_test_data: Optional[CaseData] = None
deg_rejection_temperature_sensor_on_du_2_test_data: Optional[CaseData] = None
deg_rejection_temperature_sensor_on_du_3_test_data: Optional[CaseData] = None
deg_rejection_temperature_sensor_on_du_5_test_data: Optional[CaseData] = None
deg_rejection_density_and_viscosity_on_du_2_test_data: Optional[CaseData] = None
deg_rejection_density_and_viscosity_on_du_3_test_data: Optional[CaseData] = None
deg_rejection_density_and_viscosity_on_du_5_test_data: Optional[CaseData] = None
faulty_absence_min_flow_meters_test_data: Optional[CaseData] = None
faulty_absence_min_pressure_sensors_test_data: Optional[CaseData] = None
faulty_absence_min_pressure_sensors_in_journal_test_data: Optional[CaseData] = None
# ===== Тесты =====
lds_status_basic_info_test: Optional[CaseMarkers] = None
init_accumulation_data_test: Optional[CaseMarkers] = None
init_accumulation_data_in_journal_test: Optional[CaseMarkers] = None
init_cold_start_test: Optional[CaseMarkers] = None
init_cold_start_in_journal_test: Optional[CaseMarkers] = None
init_exiting_faulty_test: Optional[CaseMarkers] = None
init_switching_shut_off_test: Optional[CaseMarkers] = None
init_switching_shut_off_in_journal_test: Optional[CaseMarkers] = None
serviceable_after_cold_start_test: Optional[CaseMarkers] = None
serviceable_after_cold_start_in_journal_test: Optional[CaseMarkers] = None
serviceable_after_switching_shut_off_test: Optional[CaseMarkers] = None
serviceable_after_switching_shut_off_in_journal_test: Optional[CaseMarkers] = None
serviceable_after_deg_absence_min_pressure_sensors_test: Optional[CaseMarkers] = None
serviceable_after_deg_starting_pumping_out_pumps_test: Optional[CaseMarkers] = None
serviceable_after_deg_faulty_pressure_sensors_at_pump_test: Optional[CaseMarkers] = None
serviceable_after_deg_faulty_pressure_sensors_at_pump_in_journal_test: Optional[CaseMarkers] = None
serviceable_after_faulty_test: Optional[CaseMarkers] = None
deg_additive_injectors_operation_test: Optional[CaseMarkers] = None
deg_exceeding_distance_between_pressure_sensors_test: Optional[CaseMarkers] = None
deg_exceeding_distance_between_pressure_sensors_in_journal_test: Optional[CaseMarkers] = None
deg_absence_min_pressure_sensors_test: Optional[CaseMarkers] = None
deg_faulty_pressure_sensors_at_pump_station_test: Optional[CaseMarkers] = None
deg_faulty_pressure_sensors_at_pump_station_in_journal_test: Optional[CaseMarkers] = None
deg_gravity_section_pumping_test: Optional[CaseMarkers] = None
deg_gravity_section_pumping_in_stopping_test: Optional[CaseMarkers] = None
deg_gravity_section_pumping_in_stopping_in_journal_test: Optional[CaseMarkers] = None
deg_pig_sensor_passage_test: Optional[CaseMarkers] = None
deg_starting_pumping_out_pumps_test: Optional[CaseMarkers] = None
deg_exceeding_distance_between_flow_meters_test: Optional[CaseMarkers] = None
deg_rejection_temperature_sensor_on_du_2_test: Optional[CaseMarkers] = None
deg_rejection_temperature_sensor_on_du_3_test: Optional[CaseMarkers] = None
deg_rejection_temperature_sensor_on_du_5_test: Optional[CaseMarkers] = None
deg_rejection_density_and_viscosity_on_du_2_test: Optional[CaseMarkers] = None
deg_rejection_density_and_viscosity_on_du_3_test: Optional[CaseMarkers] = None
deg_rejection_density_and_viscosity_on_du_5_test: Optional[CaseMarkers] = None
faulty_absence_min_flow_meters_test: Optional[CaseMarkers] = None
faulty_absence_min_pressure_sensors_test: Optional[CaseMarkers] = None
faulty_absence_min_pressure_sensors_in_journal_test: Optional[CaseMarkers] = None
@dataclass
class RejectionTestCase:
"""
Описание одного события отбраковки для тестирования.
Содержит:
- Тег и id датчика (из RejectionSensorTag)
- Ожидаемые значения для проверок журнала и схемы
- Маркеры (offset и test_case_id)
"""
name: str = ""
sensor: RejectionSensorTag = ""
expected_event: str = ""
expected_signal_name: str = ""
expected_criteria_names: Optional[RejectionCriteria] = None
expected_is_rejected: bool = True
time_range_start_s: float = 0
time_range_end_s: float = 0
rejection_input_signals_test: Optional[CaseMarkers] = None
rejection_journal_test: Optional[CaseMarkers] = None
rejection_main_page_test: Optional[CaseMarkers] = None
rejection_scheme_signals_state_test: Optional[CaseMarkers] = None
@dataclass
class RejectionReportRow:
"""Разобранная строка отчёта об отбракованных входных данных."""
row_index: int
datetime_value: Optional[datetime] = None
object_value: str = ""
event_value: str = ""
value_text: str = ""
duration_seconds: int = 0
tag_value: str = ""
@dataclass
class IsRejectedConfig(BaseSuiteConfig):
"""
Конфигурация тестового набора отбраковки сигналов.
Структура:
1. Название МН
2. Список случаев отбраковки (RejectionTestCase)
"""
main_pipeline: str = ""
rejection_cases: list[RejectionTestCase] = field(default_factory=list)
rejection_report_test: Optional[CaseMarkers] = None
@dataclass
class ExportLeaksReportState:
"""
Состояние сценария формирования xlsx-отчёта об утечках между allure-шагами.
Заполняется по ходу export_leaks_report в smoke_scenarios.
"""
report_test: Optional[CaseMarkers] = None
period_start: Optional[datetime] = None
period_end: Optional[datetime] = None
period_start_naive: Optional[datetime] = None
period_end_naive: Optional[datetime] = None
expected_mt_mode: Optional[str] = None
expected_lds_status_text: Optional[str] = None
time_offset_hours: Optional[int] = None
tu_description_lower: str = ""
notification: Optional[ReportDataExportedNotification] = None
report_item: Optional[ExportedDataItem] = None
report_file_name: str = ""
download_invocation_id: Optional[str] = None
download_payload: Optional[list] = None
download_reply: Optional[DownloadExportedDataReply] = None
file_bytes: Optional[bytes] = None
temp_file_path: Optional[Path] = None
worksheet: Any = None
title_info: Optional[ReportTitleInfo] = None
data_rows: list[LeakReportRow] = field(default_factory=list)
target_row: Optional[LeakReportRow] = None
@dataclass
class ExportLdsStatusReportState:
"""Состояние сценария формирования xlsx-отчёта о режиме работы СОУ."""
report_test: Optional[CaseMarkers] = None
period_start: Optional[datetime] = None
period_end: Optional[datetime] = None
period_start_naive: Optional[datetime] = None
period_end_naive: Optional[datetime] = None
time_offset_hours: Optional[int] = None
tu_description_lower: str = ""
notification: Optional[ReportDataExportedNotification] = None
report_item: Optional[ExportedDataItem] = None
report_file_name: str = ""
download_invocation_id: Optional[str] = None
download_reply: Optional[DownloadExportedDataReply] = None
file_bytes: Optional[bytes] = None
temp_file_path: Optional[Path] = None
worksheet: Any = None
parsed_report: Any = None
@dataclass
class ExportRejectedReportState:
"""
Состояние сценария формирования xlsx-отчёта об отбракованных входных данных.
Поля с префиксом expected_ - из конфигурации теста, actual_ - из ответов бэка и разбора xlsx.
"""
# --- expected: конфигурация теста и расчётные ожидания ---
expected_report_test: Optional[CaseMarkers] = None
expected_period_start: Optional[datetime] = None
expected_period_end: Optional[datetime] = None
expected_period_start_naive: Optional[datetime] = None
expected_period_end_naive: Optional[datetime] = None
expected_tu_description_lower: str = ""
expected_file_name: str = ""
# --- actual: ответы бэка ---
actual_time_offset_hours: Optional[int] = None
actual_notification: Optional[ReportDataExportedNotification] = None
actual_report_item: Optional[ExportedDataItem] = None
actual_download_invocation_id: Optional[str] = None
actual_download_reply: Optional[DownloadExportedDataReply] = None
actual_file_bytes: Optional[bytes] = None
# --- actual: разбор xlsx ---
actual_temp_file_path: Optional[Path] = None
actual_worksheet: Any = None
actual_title_info: Optional[ReportTitleInfo] = None
actual_data_rows: list[RejectionReportRow] = field(default_factory=list)
actual_monitored_tag_rows: list[RejectionReportRow] = field(default_factory=list)
actual_header_column_headers: list[str] = field(default_factory=list)
actual_header_period_start: Optional[datetime] = None
actual_header_period_end: Optional[datetime] = None
actual_header_contains_expected_title: bool = False
actual_case_checks: list = field(default_factory=list)
@dataclass
class ExportMtModeReportState:
"""
Состояние сценария формирования xlsx-отчёта о режиме работы МТ.
Поля с префиксом expected_ - из конфигурации теста, actual_ - из ответов бэка и разбора xlsx.
"""
# --- expected: конфигурация теста и расчётные ожидания ---
expected_report_test: Optional[CaseMarkers] = None
expected_period_start: Optional[datetime] = None
expected_period_end: Optional[datetime] = None
expected_period_start_naive: Optional[datetime] = None
expected_period_end_naive: Optional[datetime] = None
expected_tu_description_lower: str = ""
expected_file_name: str = ""
expected_section_names: list[str] = field(default_factory=list)
expected_dominant_mode_column: str = ""
# --- actual: ответы бэка ---
actual_time_offset_hours: Optional[int] = None
actual_notification: Optional[ReportDataExportedNotification] = None
actual_report_item: Optional[ExportedDataItem] = None
actual_download_invocation_id: Optional[str] = None
actual_download_reply: Optional[DownloadExportedDataReply] = None
actual_file_bytes: Optional[bytes] = None
# --- actual: разбор xlsx ---
actual_temp_file_path: Optional[Path] = None
actual_worksheet: Any = None
actual_parsed_report: Any = None
сцен ини
"""
Модуль сценариев тестов.
Содержит функции-обёртки для каждого типа теста.
Функции называются так же, как тесты, но без префикса test_.
Для добавления нового теста необходимо добавить в список __all__ название этого теста scenarios.your_new_test
"""
import test_scenarios.smoke_scenarios as scenarios
from test_scenarios import lds_status_scenarios, rejected_scenarios
__all__ = [
scenarios.basic_info,
scenarios.journal_info,
scenarios.imitate_sensor_signal,
scenarios.lds_status_initialization,
scenarios.lds_status_init_in_journal,
scenarios.main_page_info,
scenarios.main_page_info_signals,
scenarios.mask_info_in_journal,
scenarios.mask_signal_test,
scenarios.lds_status_initialization_out,
scenarios.lds_status_init_out_in_journal,
scenarios.all_leaks_info,
scenarios.leak_info_in_journal,
scenarios.possible_leak_in_journal,
scenarios.tu_leaks_info,
scenarios.lds_status_during_leak,
scenarios.acknowledge_leak_info,
scenarios.acknowledge_leak_in_journal,
scenarios.output_signals,
scenarios.balance_algorithm_leak_completed,
scenarios.completed_leak_info_in_journal,
scenarios.mode_mt_in_journal,
rejected_scenarios.rejection_input_signals,
rejected_scenarios.rejection_journal,
rejected_scenarios.rejection_main_page,
rejected_scenarios.rejection_scheme_signals_state,
lds_status_scenarios.lds_status_check_with_reasons,
scenarios.export_leaks_report,
scenarios.export_lds_status_report,
scenarios.export_mt_mode_report,
]
редж сцен
"""
Сценарии тестов - функции-обёртки без pytest маркеров.
Каждая функция содержит логику одного теста.
Pytest маркеры и allure декораторы применяются в тестовых файлах.
"""
from datetime import datetime, timedelta
import allure
import pytest
from constants.enums import (
Direction,
ExportedDataType,
ExportStatus,
MessageType,
RejectionCriteria,
RejectionSensorTag,
ReplyStatus,
)
from constants.test_constants import BaseTN3Constants as TestConst
from constants.test_constants import ExportRejectedReportConstants as RejectedReportConst
from constants.test_constants import ExportReportConstants as ReportConst
from models.get_messages_model import Filtering, FilteringObjects, Pagination
from test_config.models_for_tests import ExportRejectedReportState, IsRejectedConfig, RejectionTestCase
from utils.helpers import rejection_report_xlsx_utils as rejection_report_utils
from utils.helpers import report_xlsx_utils as report_utils
from utils.helpers import ws_test_utils as t_utils
from utils.helpers.asserts import SoftAssertions, StepCheck
from utils.helpers.ws_message_parser import ws_message_parser as parser
# ===== Сценарии отбраковки сигналов =====
async def rejection_input_signals(ws_client, cfg: IsRejectedConfig, rejection_case: RejectionTestCase):
"""
Проверка отбраковки сигнала по подписке SubscribeInputSignalsRequest.
Проверяет isRejected=True для указанного датчика.
"""
sensor = rejection_case.sensor
with allure.step(
f"Подключение по ws, получение данных InputSignalsContent для датчика {sensor.description} (id={sensor.id})"
):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"InputSignalsContent",
"SubscribeInputSignalsRequest",
{
'signalIds': [sensor.id],
'tuId': cfg.tu_id,
'additionalProperties': None,
},
)
parsed_payload = parser.parse_input_signals_info_msg(payload)
sensor_data = parsed_payload.replyContent.inputSignals
target_signal = t_utils.find_object_by_field(sensor_data, "id", sensor.id)
with SoftAssertions() as soft_failures:
StepCheck(
f"Проверка отбраковки датчика {sensor.description} (id={sensor.id})", "isRejected", soft_failures
).actual(target_signal.isRejected).expected(True).equal_to()
if rejection_case.expected_criteria_names:
raw_criteria = (
target_signal.rejection.get(TestConst.CRITERIA_NAMES_FIELD)
if isinstance(target_signal.rejection, dict)
else None
)
criteria = RejectionCriteria(raw_criteria) if raw_criteria is not None else None
StepCheck(
f"Проверка rejection.criteriaNames для {sensor.description} (id={sensor.id})",
TestConst.CRITERIA_NAMES_FIELD,
soft_failures,
).actual(criteria).expected(rejection_case.expected_criteria_names).equal_to()
async def rejection_journal(ws_client, cfg: IsRejectedConfig, rejection_case: RejectionTestCase, imitator_start_time):
"""
Проверка наличия записи об отбраковке в журнале по GetMessagesRequest.
"""
sensor = rejection_case.sensor
expected_event = rejection_case.expected_event
with allure.step("Подготовка запроса и ожидаемого диапазона времени"):
request_body = t_utils.create_journal_req_body(
pagination=Pagination(limit=TestConst.JOURNAL_PAGINATION_REJECT_LIMIT, direction=Direction.FIRST.value),
filtering=Filtering(
messageTypes=int(MessageType.REJECTION),
objects=FilteringObjects(tuId=cfg.tu_id),
),
)
range_start, range_end = t_utils.get_rejection_time_window(
imitator_start_time=imitator_start_time,
start_seconds=rejection_case.time_range_start_s,
reserve_seconds=TestConst.SEC_PER_MIN,
)
with allure.step("Получение сообщений журнала с фильтром messageTypes=REJECTION"):
payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
parsed_payload = parser.parse_journal_msg(payload)
messages_info = parsed_payload.replyContent.messagesInfo
with allure.step("Проверка наличия сообщений в журнале"):
StepCheck("Проверка наличия сообщений в журнале", "messagesInfo").actual(messages_info).is_not_empty()
with allure.step(
f"Подготовка сообщений к проверке по диапазону слоя данных "
f"({rejection_case.time_range_start_s - TestConst.SEC_PER_MIN}-"
f"{rejection_case.time_range_end_s + TestConst.SEC_PER_MIN} с от старта имитатора)"
):
time_filtered, target_msg = t_utils.find_rejection_journal_message(
messages_info=messages_info,
tag=sensor.description,
range_start=range_start,
range_end=range_end,
technological_section=cfg.tu_name,
expected_event=expected_event,
)
allure.attach(
f"Всего получено сообщений: {len(messages_info)}\n"
f"Диапазон фильтрации: {range_start} - {range_end}\n"
f"После фильтрации по tag='{sensor.description}' и времени: {len(time_filtered)}\n"
f"Найдено ли сообщение с technologicalSection='{cfg.tu_name}' и событием {rejection_case.expected_event}: "
f"{'True' if target_msg else 'False'}",
name="Результат фильтрации сообщений журнала",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step(
f"Проверка: найдено ли сообщение с tag='{sensor.description}' (id={sensor.id}) "
f"в диапазоне {range_start}-{range_end} с"
):
if target_msg is None:
pytest.fail(
f"Сообщение с tag='{sensor.description}' (id={sensor.id}) "
f"и technologicalSection='{cfg.tu_name}' не найдено в диапазоне "
f"{range_start} - {range_end} "
f"(всего сообщений: {len(messages_info)}, после фильтрации: {len(time_filtered)})"
)
with SoftAssertions() as soft_failures:
StepCheck("Проверка mainPipeline", "mainPipeline", soft_failures).actual(target_msg.mainPipeline).expected(
cfg.main_pipeline
).equal_to()
StepCheck("Проверка messageType", "messageType", soft_failures).actual(target_msg.messageType).expected(
TestConst.JOURNAL_MESSAGE_TYPE_REJECTION
).equal_to()
StepCheck("Проверка technologicalSection не пустой", "technologicalSection", soft_failures).actual(
target_msg.technologicalSection
).is_not_none()
StepCheck("Проверка technologicalObject не пустой", "technologicalObject", soft_failures).actual(
target_msg.technologicalObject
).is_not_none()
StepCheck(f"Проверка tag для {sensor.description} (id={sensor.id})", "tag", soft_failures).actual(
target_msg.tag
).expected(sensor.description).equal_to()
if rejection_case.expected_signal_name:
StepCheck("Проверка signalName", "signalName", soft_failures).actual(target_msg.signalName).expected(
rejection_case.expected_signal_name
).equal_to()
if expected_event:
StepCheck("Проверка event", "event", soft_failures).actual(
(target_msg.event.rstrip() or "").strip()
).expected(expected_event).equal_to()
async def rejection_main_page(ws_client, cfg: IsRejectedConfig):
"""
Проверка numberOfRejectedSignals > 0 по подписке subscribeMainPageSignalsInfoRequest.
"""
with allure.step("Подключение по ws, получение и обработка сообщения типа: MainPageSignalsInfoContent"):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"MainPageSignalsInfoContent",
"subscribeMainPageSignalsInfoRequest",
{'tuIds': [cfg.tu_id], 'additionalProperties': None},
)
parsed_payload = parser.parse_main_page_signals_msg(payload)
with SoftAssertions() as soft_failures:
StepCheck("Проверка id полученного ТУ", "tu_id", soft_failures).actual(
parsed_payload.replyContent.tuId
).expected(cfg.tu_id).equal_to()
StepCheck(
f"Проверка numberOfRejectedSignals > 0 для ТУ {cfg.tu_name}",
"numberOfRejectedSignals",
soft_failures,
).actual(parsed_payload.replyContent.signalsInfo.numberOfRejectedSignals).is_greater_than(0)
async def rejection_scheme_signals_state(ws_client, cfg: IsRejectedConfig, rejection_case: RejectionTestCase):
"""
Проверка отбраковки сигнала по подписке SubscribeSchemeSignalsStateRequest.
Проверяет isRejected, isMasked, isImitated и rejection.criteriaNames.
Логирование больших ответов подавляется suppress_recv_logging.
"""
sensor = rejection_case.sensor
ws_client.suppress_recv_logging = True
parser.suppress_recv_logging = True
try:
with allure.step(
f"Подключение по ws, получение данных SchemeSignalsStateContent "
f"для датчика {sensor.description} (id={sensor.id})"
):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"SchemeSignalsStateContent",
"SubscribeSchemeSignalsStateRequest",
{'tuId': cfg.tu_id},
)
parsed_payload = parser.parse_scheme_signals_state_msg(payload)
signals = parsed_payload.replyContent.signalsStates
target_signal = next(
(signal for signal in signals if signal.id == sensor.id),
None,
)
allure.attach(
f"Всего сигналов получено: {len(signals)}\n"
f"Поиск сигнала с id={sensor.id} ({sensor.description}): "
f"{'Найден' if target_signal else 'Не найден'}",
name="Результат поиска сигнала в SchemeSignalsState",
attachment_type=allure.attachment_type.TEXT,
)
if target_signal is not None:
allure.attach(
str(target_signal),
name=f"Тестируемый фрагмент ответа с бэка: сигнал id={sensor.id} ({sensor.description})",
attachment_type=allure.attachment_type.TEXT,
)
finally:
ws_client.suppress_recv_logging = False
parser.suppress_recv_logging = False
with allure.step(f"Проверка: найден ли сигнал с id={sensor.id} ({sensor.description})"):
if target_signal is None:
pytest.fail(
f"Сигнал с id={sensor.id} ({sensor.description}) " f"не найден среди {len(signals)} полученных сигналов"
)
with SoftAssertions() as soft_failures:
StepCheck(f"Проверка isRejected для {sensor.description} (id={sensor.id})", "isRejected", soft_failures).actual(
target_signal.isRejected
).expected(rejection_case.expected_is_rejected).equal_to()
StepCheck(f"Проверка isMasked для {sensor.description} (id={sensor.id})", "isMasked", soft_failures).actual(
target_signal.isMasked
).expected(False).equal_to()
StepCheck(f"Проверка isImitated для {sensor.description} (id={sensor.id})", "isImitated", soft_failures).actual(
target_signal.isImitated
).expected(False).equal_to()
if rejection_case.expected_criteria_names and target_signal.rejection is not None:
raw_criteria = (
target_signal.rejection.get(TestConst.CRITERIA_NAMES_FIELD)
if isinstance(target_signal.rejection, dict)
else None
)
criteria = RejectionCriteria(raw_criteria) if raw_criteria is not None else None
StepCheck(
f"Проверка rejection.criteriaNames для {sensor.description} (id={sensor.id})",
TestConst.CRITERIA_NAMES_FIELD,
soft_failures,
).actual(criteria).expected(rejection_case.expected_criteria_names).equal_to()
async def export_rejection_report(ws_client, cfg: IsRejectedConfig, imitator_start_time: datetime):
"""
Сценарий формирования общего xlsx-отчёта об отбракованных входных данных.
Этапы:
1. Подписка SubscribeReportsDataExportedRequest на пуш-нотификации.
2. ExportReportsCommandRequest с периодом от старта имитатора до offset теста.
3. Ожидание ReportDataExportedNotification.
4. Лонг-поллинг GetExportedDataListRequest до появления отчёта в списке.
5. DownloadExportedDataRequest и получение fileChunk.
6. Проверка формата, имени файла, двойной шапки и строк по RejectionTestCase.
"""
report_state = ExportRejectedReportState()
with allure.step("Подготовка параметров сценария формирования отчёта об отбракованных входных данных"):
# offset теста задаёт конец периода отчёта - после всех отдельных проверок отбраковок
report_state.expected_report_test = cfg.rejection_report_test
StepCheck("В конфигурации задан rejection_report_test", "rejection_report_test").actual(
report_state.expected_report_test
).is_not_none()
report_state.expected_period_start = t_utils.localize_as_moscow(imitator_start_time)
report_state.expected_period_end = t_utils.localize_as_moscow(
imitator_start_time + timedelta(minutes=report_state.expected_report_test.offset)
)
report_state.expected_period_start_naive = report_utils.normalize_report_period_naive(
report_state.expected_period_start
)
report_state.expected_period_end_naive = report_utils.normalize_report_period_naive(
report_state.expected_period_end
)
report_state.expected_tu_description_lower = cfg.technological_unit.description.lower()
report_state.expected_file_name = report_utils.build_export_report_file_name(
cfg.technological_unit.description,
report_state.expected_period_start,
report_state.expected_period_end,
RejectedReportConst.REJECTED_REPORT_NAME_PART,
" ",
)
time_offset_hours = t_utils.report_time_offset_hours()
StepCheck(
f"Смещение timeOffset для запросов отчёта (часовой пояс {TestConst.ZONE_INFO})",
"time_offset_hours",
).actual(time_offset_hours).is_not_none()
report_state.actual_time_offset_hours = time_offset_hours
allure.attach(
f"period.start={report_state.expected_period_start}\n"
f"period.end={report_state.expected_period_end}\n"
f"offset_minutes={report_state.expected_report_test.offset}",
name="Фильтр периода отчёта об отбраковках",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step(f"Этап 1. Подписка на пуш-нотификации ({ReportConst.SUBSCRIBE_REPORTS_DATA_EXPORTED_REQUEST})"):
# без подписки не придёт уведомление о готовности отчёта
await t_utils.connect(ws_client, ReportConst.SUBSCRIBE_REPORTS_DATA_EXPORTED_REQUEST, [])
with allure.step(f"Этап 2. Запрос формирования отчёта ({ReportConst.EXPORT_REPORTS_COMMAND_REQUEST})"):
request_payload = {
"tuId": cfg.tu_id,
"exportedDataTypes": [ExportedDataType.REJECTED_REPORT.value],
"timeOffset": report_state.actual_time_offset_hours,
"period": {
"start": t_utils.datetime_to_msgpack_timestamp(report_state.expected_period_start),
"end": t_utils.datetime_to_msgpack_timestamp(report_state.expected_period_end),
"additionalProperties": {},
},
}
await t_utils.connect(ws_client, ReportConst.EXPORT_REPORTS_COMMAND_REQUEST, request_payload)
with allure.step(
f"Этап 3. Ожидание пуш-нотификации {ReportConst.REPORT_DATA_EXPORTED_NOTIFICATION} о готовности отчёта"
):
report_state.actual_notification = await t_utils.poll_for_report_export_notification(
ws_client=ws_client,
parser=parser,
total_wait_seconds=ReportConst.NOTIFICATION_TIMEOUT_SECONDS,
poll_interval_seconds=ReportConst.LIST_POLL_INTERVAL_SECONDS,
)
with allure.step(f"Этап 4. Лонг-поллинг {ReportConst.GET_EXPORTED_DATA_LIST_REQUEST} до появления отчёта в списке"):
report_state.actual_report_item = await t_utils.poll_for_exported_file(
ws_client=ws_client,
parser=parser,
list_limit=ReportConst.EXPORTED_DATA_LIST_LIMIT,
expected_data_type=ExportedDataType.REJECTED_REPORT,
name_substring=RejectedReportConst.REJECTED_REPORT_NAME_PART,
tu_name_substring=cfg.technological_unit.description,
period_start=report_state.expected_period_start,
period_end=report_state.expected_period_end,
total_wait_seconds=ReportConst.LIST_POLL_TOTAL_WAIT_SECONDS,
poll_interval_seconds=ReportConst.LIST_POLL_INTERVAL_SECONDS,
)
with allure.step("Подготовка данных найденного отчёта в списке"):
report_item = report_state.actual_report_item
if report_item is not None:
allure.attach(
f"id={report_item.id}, name={report_item.name}, "
f"exportedDataType={report_item.exportedDataType}, "
f"start={t_utils.format_datetime_moscow(report_item.start)}, "
f"end={t_utils.format_datetime_moscow(report_item.end)}",
name="Найденный отчёт в списке",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step("Проверка: отчёт найден в списке сформированных файлов"):
StepCheck("Отчёт найден в списке сформированных файлов", "report_item").actual(
report_state.actual_report_item
).is_not_none()
with allure.step(
f"Этап 5. Streaming-вызов {ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST} "
f"по id={report_state.actual_report_item.id}"
):
download_request = {
"exportedDataId": report_state.actual_report_item.id,
"exportedDataType": ExportedDataType.REJECTED_REPORT.to_download_name(),
"additionalProperties": None,
"timeOffset": report_state.actual_time_offset_hours,
}
download_purpose = (
f"скачивание xlsx-отчёта об отбракованных входных данных "
f"(exportedDataId={report_state.actual_report_item.id}) "
f"после формирования отчёта и выбора файла в списке GetExportedDataListRequest"
)
await t_utils.connect_stream(
ws_client,
ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST,
download_request,
purpose=download_purpose,
)
report_state.actual_download_invocation_id = ws_client.invocation_id
with allure.step("Этап 6. Получение fileChunk - скачивание отчёта об отбракованных входных данных"):
report_state.actual_download_reply = await t_utils.receive_download_exported_data_reply(
ws_client=ws_client,
parser=parser,
invocation_id=report_state.actual_download_invocation_id,
request_name=ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST,
total_wait_seconds=ReportConst.DOWNLOAD_TIMEOUT_SECONDS,
purpose=download_purpose,
)
with allure.step("Извлечение данных ответа на скачивание"):
download_reply = report_state.actual_download_reply
download_reply_status = download_reply.replyStatus
has_download_reply_content = download_reply.replyContent is not None
report_state.actual_file_bytes = (
download_reply.replyContent.fileChunk if has_download_reply_content else None
)
is_xlsx_signature = (
report_utils.is_xlsx_file_bytes(report_state.actual_file_bytes)
if report_state.actual_file_bytes
else False
)
with allure.step("Проверка ответа на скачивание и формата xlsx"):
StepCheck("Проверка статуса ответа на скачивание", "replyStatus").actual(download_reply_status).expected(
ReplyStatus.OK.value
).equal_to()
StepCheck("Проверка наличия контента ответа на скачивание", "replyContent").actual(
has_download_reply_content
).expected(True).equal_to()
StepCheck("Проверка наличия байт файла", "fileChunk").actual(report_state.actual_file_bytes).is_not_empty()
StepCheck("Проверка xlsx (zip) сигнатуры файла", "file_signature").actual(is_xlsx_signature).expected(
True
).equal_to()
with allure.step("Подготовка данных для проверки имени файла отчёта"):
# имя файла берём из ответа бэка (список сформированных отчётов), не из шапки xlsx
actual_file_name = report_state.actual_report_item.name if report_state.actual_report_item else ""
actual_file_name_lower = actual_file_name.lower()
file_name_period_start, file_name_period_end = report_utils.parse_period_from_export_file_name(
actual_file_name,
RejectedReportConst.REPORT_FILE_NAME_PERIOD_PATTERN,
)
period_start_lo, period_start_hi, period_end_lo, period_end_hi = (
report_utils.report_period_comparison_bounds(
report_state.expected_period_start_naive,
report_state.expected_period_end_naive,
)
)
has_xlsx_extension = report_utils.is_xlsx_extension(actual_file_name)
rejected_report_file_name_part_lower = RejectedReportConst.REJECTED_REPORT_NAME_PART.lower()
rejected_report_file_name_part_alt_lower = RejectedReportConst.REJECTED_REPORT_NAME_PART_ALT.lower()
try:
with allure.step("Этап 7. Сохранение и разбор xlsx-отчёта об отбракованных входных данных"):
report_state.actual_temp_file_path = report_utils.save_report_bytes_to_temp_file(
report_state.actual_file_bytes,
prefix="rejected_report_",
)
StepCheck("Временный xlsx файл создан", "temp_file_path").actual(
report_state.actual_temp_file_path
).is_not_none()
report_state.actual_worksheet = report_utils.load_report_worksheet(report_state.actual_temp_file_path)
report_state.actual_title_info = report_utils.parse_report_title(
report_utils.get_report_title_cell(report_state.actual_worksheet),
RejectedReportConst.REPORT_HEADER_PERIOD_PATTERN,
)
allure.attach(
f"Шапка (raw): {report_state.actual_title_info.raw_title}\n"
f"period_start: {report_state.actual_title_info.period_start}\n"
f"period_end: {report_state.actual_title_info.period_end}",
name="Первая строка шапки xlsx-отчёта",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step("Этап 8. Извлечение строк данных из отчёта"):
report_state.actual_data_rows = rejection_report_utils.iter_rejection_report_rows(
report_state.actual_worksheet
)
report_state.actual_monitored_tag_rows = rejection_report_utils.filter_rows_by_monitored_tags(
report_state.actual_data_rows,
RejectionSensorTag,
)
allure.attach(
rejection_report_utils.format_rejection_rows_for_allure(report_state.actual_monitored_tag_rows),
name="Строки отчёта по тегам RejectionSensorTag",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step("Подготовка данных шапки xlsx для проверки"):
title_info = report_state.actual_title_info
report_state.actual_header_column_headers = report_utils.get_report_column_headers(
report_state.actual_worksheet,
headers_row=RejectedReportConst.REPORT_COLUMN_HEADERS_ROW,
)
report_state.actual_header_period_start = title_info.period_start
report_state.actual_header_period_end = title_info.period_end
report_state.actual_header_contains_expected_title = (
rejection_report_utils.report_header_contains_expected_title(title_info.raw_title)
)
with allure.step("Подготовка данных для проверки строк отчёта по RejectionTestCase"):
report_state.actual_case_checks = rejection_report_utils.prepare_rejection_report_case_checks(
report_state.actual_monitored_tag_rows,
cfg.rejection_cases,
imitator_start_time,
)
with allure.step("Проверка первой строки шапки xlsx-отчёта"):
StepCheck("Лист xlsx открыт", "worksheet").actual(report_state.actual_worksheet).is_not_none()
with SoftAssertions() as soft_failures:
StepCheck(
"Первая строка шапки содержит заголовок отчёта об отбракованных входных данных",
"report_title",
soft_failures,
).actual(report_state.actual_header_contains_expected_title).expected(True).equal_to()
StepCheck(
"Время начала периода в первой строке шапки совпадает с фильтром запроса (+-1 мин)",
"period_start",
soft_failures,
).actual(report_state.actual_header_period_start).is_between(period_start_lo, period_start_hi)
StepCheck(
"Время конца периода в первой строке шапки совпадает с фильтром запроса (+-1 мин)",
"period_end",
soft_failures,
).actual(report_state.actual_header_period_end).is_between(period_end_lo, period_end_hi)
StepCheck(
"Названия колонок во второй строке шапки отчёта",
"column_headers",
soft_failures,
).actual(
report_state.actual_header_column_headers
).expected(RejectedReportConst.EXPECTED_COLUMN_HEADERS).equal_to()
with allure.step("Проверка строк отчёта по каждому RejectionTestCase из конфигурации набора"):
with SoftAssertions() as soft_failures:
for case_check in report_state.actual_case_checks:
StepCheck(
f"В отчёте найдена отбраковка для {case_check.case_label} в интервале времени "
f"{case_check.window_start} - {case_check.window_end}",
RejectedReportConst.COL_TAG,
soft_failures,
).actual(case_check.row_found).is_true_with_details(
expected_text=(
f"найдена строка с тегом {case_check.tag_description} "
f"и событием '{case_check.report_event}'"
),
actual_text=case_check.found_row_summary,
)
if not case_check.row_found:
continue
StepCheck(
f"Для {case_check.case_label} время получения отбраковки в допустимом диапазоне",
RejectedReportConst.COL_DATETIME,
soft_failures,
).actual(case_check.datetime_in_window).is_true_with_details(
expected_text=(f"дата и время в диапазоне {case_check.window_start} — {case_check.window_end}"),
actual_text=case_check.datetime_actual_text,
)
StepCheck(
f"Для {case_check.case_label} суммарная продолжительность отбраковки "
f"({case_check.expected_duration_text}) совпадает",
RejectedReportConst.COL_DURATION,
soft_failures,
).actual(case_check.actual_duration_seconds).expected(
case_check.expected_duration_seconds
).equal_to()
StepCheck(
f"Для {case_check.case_label} участок трубопровода в колонке "
f"'{RejectedReportConst.COL_OBJECT}' не пустой",
RejectedReportConst.COL_OBJECT,
soft_failures,
).actual(case_check.pipe_section).is_not_empty()
StepCheck(
f"Для {case_check.case_label} после последней точки в колонке "
f"'{RejectedReportConst.COL_OBJECT}' указан сигнал '{case_check.expected_signal_suffix}'",
RejectedReportConst.COL_OBJECT,
soft_failures,
).actual(case_check.actual_signal_suffix).expected(case_check.expected_signal_suffix).equal_to()
except Exception:
with allure.step("Прикрепление xlsx отчёта к Allure при падении теста"):
attachment_name = (
report_state.actual_report_item.name
if report_state.actual_report_item and report_state.actual_report_item.name
else report_state.expected_file_name
)
if report_state.actual_temp_file_path and attachment_name:
report_utils.attach_report_file_to_allure(report_state.actual_temp_file_path, attachment_name)
raise
with allure.step("Проверка имени файла отчёта из списка сформированных файлов"):
with SoftAssertions() as soft_failures:
StepCheck(f"Имя файла оканчивается на {ReportConst.XLSX_EXTENSION}", "file_name", soft_failures).actual(
has_xlsx_extension
).expected(True).equal_to()
StepCheck(
"Имя файла из ответа бэка содержит название отчёта об отбракованных входных данных",
"file_name",
soft_failures,
).actual(
rejected_report_file_name_part_lower in actual_file_name_lower
or rejected_report_file_name_part_alt_lower in actual_file_name_lower
).expected(
True
).equal_to()
StepCheck(
f"Имя файла содержит описание ТУ '{cfg.technological_unit.description}'",
"file_name",
soft_failures,
).contains(actual_file_name_lower, report_state.expected_tu_description_lower)
StepCheck(
"Дата начала периода в имени файла совпадает с фильтром запроса (+-1 мин)",
"period_start_in_file_name",
soft_failures,
).actual(file_name_period_start).is_between(period_start_lo, period_start_hi)
StepCheck(
"Дата конца периода в имени файла совпадает с фильтром запроса (+-1 мин)",
"period_end_in_file_name",
soft_failures,
).actual(file_name_period_end).is_between(period_end_lo, period_end_hi)
with allure.step("Проверка пуш-нотификации о готовности отчёта"):
notification = report_state.actual_notification
notification_reply_status = notification.replyStatus if notification else None
notification_reply_content = notification.replyContent if notification else None
notification_export_status = notification_reply_content.exportStatus if notification_reply_content else None
notification_error_message = (
(notification_reply_content.errorMessage or "") if notification_reply_content else ""
)
with SoftAssertions() as soft_failures:
StepCheck("Получена пуш-нотификация о готовности отчёта", "notification", soft_failures).actual(
report_state.actual_notification
).is_not_none()
StepCheck("Проверка статуса пуш-нотификации", "replyStatus", soft_failures).actual(
notification_reply_status
).expected(ReplyStatus.OK.value).equal_to()
StepCheck("Проверка наличия контента нотификации", "replyContent", soft_failures).actual(
notification_reply_content
).is_not_none()
StepCheck("Проверка exportStatus в нотификации", "exportStatus", soft_failures).actual(
notification_export_status
).expected(ExportStatus.DONE).equal_to()
StepCheck("В нотификации нет текста ошибки", "errorMessage", soft_failures).actual(
notification_error_message
).is_empty()
сцен
"""
Сценарии тестов - функции-обёртки без
pytest маркеров.
Каждая функция содержит логику одного теста.
Pytest маркеры и allure декораторы применяются в тестовых файлах.
"""
import time
from collections import defaultdict
from datetime import datetime, timedelta
import allure
import pytest
from constants.enums import (
ConfirmationStatus,
Direction,
ExportedDataType,
ExportStatus,
GravityPipe,
LdsStatus,
LeakStatus,
MessagePriority,
MessageType,
ReplyStatus,
ReservedType,
SignalType,
SiteKpKp,
StationaryStatus,
UserActions,
)
from constants.test_constants import BaseTN3Constants as TestConst
from constants.test_constants import ExportLdsStatusReportConstants as LdsReportConst
from constants.test_constants import ExportMtModeReportConstants as MtReportConst
from constants.test_constants import ExportReportConstants as ReportConst
from models.get_messages_model import Filtering, FilteringObjects, Pagination
from test_config.models_for_tests import (
CaseData,
ExportLdsStatusReportState,
ExportLeaksReportState,
ExportMtModeReportState,
LDSStatusConfig,
LeakTestConfig,
SmokeSuiteConfig,
)
from utils.helpers import lds_status_report_xlsx_utils as lds_report_utils
from utils.helpers import mt_mode_report_xlsx_utils as mt_report_utils
from utils.helpers import report_xlsx_utils as report_utils
from utils.helpers import ws_test_utils as t_utils
from utils.helpers.asserts import SoftAssertions, StepCheck
from utils.helpers.ws_message_parser import ws_message_parser as parser
from utils.helpers.ws_test_utils import get_value
async def basic_info(ws_client, cfg: SmokeSuiteConfig | LDSStatusConfig):
"""
Проверка базовой информации СОУ: список ТУ.
"""
with allure.step("Подключение по ws, получение и обработка сообщения типа: BasicInfoContent"):
payload = await t_utils.connect_and_get_msg(ws_client, "getBasicInfoRequest", [])
parsed_payload = parser.parse_basic_info_msg(payload)
expected_tu = [(cfg.tu_id, cfg.tu_name)]
with allure.step("Извлечение и подготовка данных для проверки"):
tus = getattr(getattr(parsed_payload.replyContent, 'basicInfo', None), 'tus', None)
actual_tu = [(tu.tuId, tu.tuName) for tu in tus if tu.tuId == cfg.tu_id]
StepCheck("Проверка наличия данных с базовой информацией СОУ", "tus").actual(actual_tu).is_not_none()
with allure.step(f"Поверка наличия {cfg.tu_name} в списке доступных ТУ на сервере"):
# Критическая проверка: если нужного ТУ нет в BasicInfoContent — считаем что ТУ отключен (через Zookeeper)
# и прерываем весь прогон.
if expected_tu[0] not in actual_tu:
msg = (
f"ТУ отключен: в BasicInfoContent отсутствует ТУ для запущенного набора данных: "
f"tuId={cfg.tu_id}, tuName='{cfg.tu_name}', suite={cfg.suite_name}. "
f"Необходимо убедиться, что ТУ включен (Zookeeper) и перезапустить прогон."
)
allure.attach(
f"Ожидаемый ТУ: {expected_tu}\nПолученные ТУ: {actual_tu}",
name="Предварительная проверка: ТУ отключен",
attachment_type=allure.attachment_type.TEXT,
)
pytest.fail(msg, pytrace=False)
with SoftAssertions() as soft_failures:
StepCheck("Проверка статуса ответа", "replyStatus", soft_failures).actual(parsed_payload.replyStatus).expected(
ReplyStatus.OK.value
).equal_to()
StepCheck("Проверка наличия объектов в списке ТУ", "tus", soft_failures).actual(
parsed_payload.replyContent.basicInfo.tus
).is_not_empty()
StepCheck(
f"Проверка наличия ТУ: {cfg.tu_name} в списке ТУ ",
"(tuId, tuName)",
soft_failures,
).actual(
actual_tu
).expected(expected_tu).equal_to()
async def journal_info(ws_client):
"""
Проверка наличия сообщений в журнале.
"""
with allure.step("Подключение по ws, получение и обработка сообщения типа: MessagesInfoContent"):
request_body = t_utils.create_journal_req_body()
payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
parsed_payload = parser.parse_journal_msg(payload)
messages_info = getattr(parsed_payload.replyContent, 'messagesInfo', [])
StepCheck("Проверка наличия сообщений в журнале", "messagesInfo").actual(messages_info).is_not_empty()
async def lds_status_initialization(ws_client, cfg: SmokeSuiteConfig):
"""
Проверка режима работы СОУ: Инициализация.
"""
with allure.step("Подключение по ws, получение и обработка сообщения типа: CommonSchemeContent"):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"CommonSchemeContent",
"SubscribeCommonSchemeRequest",
{'tuId': cfg.tu_id, 'additionalProperties': None},
)
parsed_payload = parser.parse_common_scheme_info_msg(payload)
with allure.step("Извлечение и подготовка данных для проверки"):
# Получает список участков карты течения
flow_areas = getattr(parsed_payload.replyContent, 'flowAreas', [])
# Получает самый протяженный участок карты течения
longest_flow_area = t_utils.get_longest_flow_area(flow_areas)
# Получает список ДУ
diagnostic_areas = getattr(longest_flow_area, 'diagnosticAreas', [])
StepCheck("Проверка наличия данных диагностических участков", "diagnosticAreas").actual(
diagnostic_areas
).is_not_empty()
allure.attach(
f"Самый протяженный участок карты течений: {longest_flow_area}",
name="flowArea. Инициализация",
attachment_type=allure.attachment_type.TEXT,
)
# Получает коллекцию статусов списка ДУ
lds_status_set = {diagnostic_area.ldsStatus for diagnostic_area in diagnostic_areas}
# Определяет режим работы СОУ по приоритету
lds_status_int = t_utils.determine_lds_status_by_priority(lds_status_set)
lds_status = LdsStatus(lds_status_int) if lds_status_int else None
StepCheck("Проверка режима работы СОУ", "ldsStatus").actual(lds_status).expected(
LdsStatus.INITIALIZATION
).equal_to()
async def diagnostics_of_signals_after_initialization(
ws_client,
cfg: SmokeSuiteConfig,
):
"""
Проверка выходных сигналов после окончания режима Инициализация по причине "холодного" пуска СОУ.
"""
with allure.step("Подключение по ws, получение и обработка сообщения типа: OutputSignalsInfo"):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"OutputSignalsInfo",
"SubscribeOutputSignalsRequest",
{
'objects': {
'linearParts': [],
'controlledSites': [
SiteKpKp.TIXORECZKAYA_NOVOVELICHKOVSKAYA.value,
SiteKpKp.NOVOVELICHKOVSKAYA_KRYMSKAYA.value,
SiteKpKp.KRYMSKAYA_GRUSHOVAYA.value,
SiteKpKp.BACKUP_ROUTE_BEJSUG.value,
SiteKpKp.BACKUP_ROUTE_PONURA.value,
SiteKpKp.BACKUP_ROUTE_KUBAN.value,
SiteKpKp.NPZ_AFIPSKIJ.value,
SiteKpKp.NPZ_ILINSKIJ.value,
],
},
'signalTypes': 1023,
'tuId': cfg.tu_id,
'additionalProperties': None,
},
)
parsed_payload = parser.parse_output_signals_info_msg(payload)
with allure.step("Извлечение и подготовка данных для проверки"):
controlled_site_dict = {
"controlled_site_first": SiteKpKp.TIXORECZKAYA_NOVOVELICHKOVSKAYA.value,
"controlled_site_second": SiteKpKp.NOVOVELICHKOVSKAYA_KRYMSKAYA.value,
"controlled_site_third": SiteKpKp.KRYMSKAYA_GRUSHOVAYA.value,
"controlled_site_fourth": SiteKpKp.BACKUP_ROUTE_BEJSUG.value,
"controlled_site_fifth": SiteKpKp.BACKUP_ROUTE_PONURA.value,
"controlled_site_sixth": SiteKpKp.BACKUP_ROUTE_KUBAN.value,
"controlled_site_seventh": SiteKpKp.NPZ_AFIPSKIJ.value,
"controlled_site_eight": SiteKpKp.NPZ_ILINSKIJ.value,
}
controlled_site_messages = {}
for name, key in controlled_site_dict.items():
controlled_site_messages[name] = t_utils.find_object_by_a_few_fields(
parsed_payload.replyContent.controlledSiteSignals, key
)
all_signals = {}
for site_name, site_message in controlled_site_messages.items():
signal_dict = {'pump': None, 'sou': None, 'gravity': None}
if site_message:
all_signals[site_name] = {
'pump': t_utils.get_signal(site_message, SignalType.REGLU),
'sou': t_utils.get_signal(site_message, SignalType.REGSOU),
'gravity': t_utils.get_signal(site_message, SignalType.GRAVITYPIPE),
}
else:
all_signals[site_name] = signal_dict
first_kp_kp = all_signals.get("controlled_site_first") or {}
if first_kp_kp:
first_site_signal_pump = get_value(first_kp_kp.get("pump"))
first_site_signal_sou = get_value(first_kp_kp.get("sou"))
first_site_signal_gravity = get_value(first_kp_kp.get("gravity"))
second_kp_kp = all_signals.get("controlled_site_second") or {}
if second_kp_kp:
second_site_signal_pump = get_value(second_kp_kp.get("pump"))
second_site_signal_sou = get_value(second_kp_kp.get("sou"))
second_site_signal_gravity = get_value(second_kp_kp.get("gravity"))
third_kp_kp = all_signals.get("controlled_site_third") or {}
if third_kp_kp:
third_site_signal_pump = get_value(third_kp_kp.get("pump"))
third_site_signal_sou = get_value(third_kp_kp.get("sou"))
third_site_signal_gravity = get_value(third_kp_kp.get("gravity"))
fourth_kp_kp = all_signals.get("controlled_site_fourth") or {}
if fourth_kp_kp:
fourth_site_signal_pump = get_value(fourth_kp_kp.get("pump"))
fourth_site_signal_sou = get_value(fourth_kp_kp.get("sou"))
fourth_site_signal_gravity = get_value(fourth_kp_kp.get("gravity"))
fifth_kp_kp = all_signals.get("controlled_site_fifth") or {}
if fifth_kp_kp:
fifth_site_signal_pump = get_value(fifth_kp_kp.get("pump"))
fifth_site_signal_sou = get_value(fifth_kp_kp.get("sou"))
fifth_site_signal_gravity = get_value(fifth_kp_kp.get("gravity"))
sixth_kp_kp = all_signals.get("controlled_site_sixth") or {}
if sixth_kp_kp:
sixth_site_signal_pump = get_value(sixth_kp_kp.get("pump"))
sixth_site_signal_sou = get_value(sixth_kp_kp.get("sou"))
sixth_site_signal_gravity = get_value(sixth_kp_kp.get("gravity"))
seventh_kp_kp = all_signals.get("controlled_site_seventh") or {}
if seventh_kp_kp:
seventh_site_signal_pump = get_value(seventh_kp_kp.get("pump"))
seventh_site_signal_sou = get_value(seventh_kp_kp.get("sou"))
seventh_site_signal_gravity = get_value(seventh_kp_kp.get("gravity"))
eighth_kp_kp = all_signals.get("controlled_site_eight") or {}
if eighth_kp_kp:
eight_site_signal_pump = get_value(eighth_kp_kp.get("pump"))
eight_site_signal_sou = get_value(eighth_kp_kp.get("sou"))
eight_site_signal_gravity = get_value(eighth_kp_kp.get("gravity"))
with SoftAssertions() as soft_failures:
StepCheck(
"Проверка сигнала - режим МТ на участке Тихорецкая-Нововеличковская",
"Режим МТ",
soft_failures,
).actual(first_site_signal_pump).expected(str(cfg.exp_tixoreczkaya_novovelichkovskaya_reg_lu)).equal_to()
StepCheck(
"Проверка сигнала - режим СОУ на участке Тихорецкая-Нововеличковская",
"Режим СОУ",
soft_failures,
).actual(first_site_signal_sou).expected(str(cfg.exp_tixoreczkaya_novovelichkovskaya_reg_sou)).equal_to()
StepCheck(
f"Проверка {GravityPipe.expected_lds_status_gravity_false.description} \n"
f"на участке Тихорецкая-Нововеличковская",
"Количество самотеков",
soft_failures,
).actual(first_site_signal_gravity).expected(str(GravityPipe.expected_lds_status_gravity_false.id)).equal_to()
StepCheck(
"Проверка сигнала - режим МТ на участке Нововеличковская-Крымская",
"Режим МТ",
soft_failures,
).actual(second_site_signal_pump).expected(str(cfg.exp_novovelichkovskaya_krymskaya_reg_lu)).equal_to()
StepCheck(
f"Проверка {GravityPipe.expected_lds_status_gravity_false.description}\n"
f"на участке Нововеличковская-Крымская",
"Количество самотеков",
soft_failures,
).actual(second_site_signal_gravity).expected(str(GravityPipe.expected_lds_status_gravity_false.id)).equal_to()
StepCheck(
"Проверка сигнала - режим СОУ на участке Нововеличковская-Крымская",
"Режим СОУ",
soft_failures,
).actual(second_site_signal_sou).expected(str(cfg.exp_novovelichkovskaya_krymskaya_reg_sou)).equal_to()
StepCheck(
"Проверка сигнала - режим МТ на участке Крымская-Грушовая",
"Режим МТ",
soft_failures,
).actual(
third_site_signal_pump
).expected(str(cfg.exp_krymskaya_grushovaya_reg_lu)).equal_to()
StepCheck(
f"Проверка {GravityPipe.expected_lds_status_gravity_true.description} на участке Крымская-Грушовая",
"Количество самотеков",
soft_failures,
).actual(third_site_signal_gravity).expected(str(GravityPipe.expected_lds_status_gravity_true.id)).equal_to()
StepCheck(
"Проверка сигнала - режим СОУ на участке Крымская-Грушовая",
"Режим СОУ",
soft_failures,
).actual(
third_site_signal_sou
).expected(str(cfg.exp_krymskaya_grushovaya_reg_sou)).equal_to()
StepCheck(
"Проверка сигнала - режим МТ на резервной нитке Бейсуг",
"Режим МТ",
soft_failures,
).actual(
fourth_site_signal_pump
).expected(str(cfg.exp_backup_route_bejsug_reg_lu)).equal_to()
StepCheck(
f"Проверка {GravityPipe.expected_lds_status_gravity_false.description} на резервной нитке Бейсуг",
"Количество самотеков",
soft_failures,
).actual(fourth_site_signal_gravity).expected(str(GravityPipe.expected_lds_status_gravity_false.id)).equal_to()
StepCheck(
"Проверка сигнала - режим СОУ на резервной нитке Бейсуг",
"Режим СОУ",
soft_failures,
).actual(
fourth_site_signal_sou
).expected(str(cfg.exp_backup_route_bejsug_reg_sou)).equal_to()
StepCheck(
"Проверка сигнала - режим МТ на резервной нитке Понура",
"Режим МТ",
soft_failures,
).actual(
fifth_site_signal_pump
).expected(str(cfg.exp_backup_route_ponura_reg_lu)).equal_to()
StepCheck(
"Проверка сигнала - режим СОУ на резервной нитке Понура",
"Режим СОУ",
soft_failures,
).actual(
fifth_site_signal_sou
).expected(str(cfg.exp_backup_route_ponura_reg_sou)).equal_to()
StepCheck(
f"Проверка {GravityPipe.expected_lds_status_gravity_false.description} на резервной нитке Понура",
"Количество самотеков",
soft_failures,
).actual(fifth_site_signal_gravity).expected(str(GravityPipe.expected_lds_status_gravity_false.id)).equal_to()
StepCheck(
"Проверка сигнала - режим МТ на резервной нитке Кубань",
"Режим МТ",
soft_failures,
).actual(
sixth_site_signal_pump
).expected(str(cfg.exp_backup_route_kuban_reg_lu)).equal_to()
StepCheck(
"Проверка сигнала - режим СОУ на резервной нитке Кубань",
"Режим СОУ",
soft_failures,
).actual(
sixth_site_signal_sou
).expected(str(cfg.exp_backup_route_kuban_reg_sou)).equal_to()
StepCheck(
f"Проверка {GravityPipe.expected_lds_status_gravity_false.description} на резервной нитке Кубань",
"Количество самотеков",
soft_failures,
).actual(sixth_site_signal_gravity).expected(str(GravityPipe.expected_lds_status_gravity_false.id)).equal_to()
StepCheck(
"Проверка сигнала - режим МТ на НПЗ Афипский",
"Режим МТ",
soft_failures,
).actual(
seventh_site_signal_pump
).expected(str(cfg.exp_npz_afipskij_reg_lu)).equal_to()
StepCheck(
"Проверка сигнала - режим СОУ на НПЗ Афипский",
"Режим СОУ",
soft_failures,
).actual(
seventh_site_signal_sou
).expected(str(cfg.exp_npz_afipskij_reg_sou)).equal_to()
StepCheck(
f"Проверка {GravityPipe.expected_lds_status_gravity_false.description} на НПЗ Афипский",
"Количество самотеков",
soft_failures,
).actual(seventh_site_signal_gravity).expected(str(GravityPipe.expected_lds_status_gravity_false.id)).equal_to()
StepCheck(
"Проверка сигнала - режим МТ на НПЗ Ильинский",
"Режим МТ",
soft_failures,
).actual(
eight_site_signal_pump
).expected(str(cfg.exp_npz_ilinskij_reg_lu)).equal_to()
StepCheck(
"Проверка сигнала - режим СОУ на НПЗ Ильинский",
"Режим СОУ",
soft_failures,
).actual(
eight_site_signal_sou
).expected(str(cfg.exp_npz_ilinskij_reg_sou)).equal_to()
StepCheck(
f"Проверка {GravityPipe.expected_lds_status_gravity_false.description} на НПЗ Ильинский",
"Количество самотеков",
soft_failures,
).actual(eight_site_signal_gravity).expected(str(GravityPipe.expected_lds_status_gravity_false.id)).equal_to()
async def lds_status_init_in_journal(ws_client, cfg: SmokeSuiteConfig | LDSStatusConfig, imitator_start_time):
"""
Проверка наличия записи в журнале о входе СОУ в режим Инициализация.
"""
with allure.step("Запрос сообщений журнала с фильтром messageTypes=LDS_STATUS"):
end_time = datetime.now()
request_body = t_utils.create_journal_req_body(
pagination=Pagination(limit=TestConst.JOURNAL_PAGINATION_LIMIT, direction=Direction.FIRST.value),
filtering=Filtering(messageTypes=int(MessageType.LDS_STATUS), objects=FilteringObjects(tuId=cfg.tu_id)),
)
payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
parsed_payload = parser.parse_journal_msg(payload)
with allure.step("Извлечение и подготовка данных для проверки"):
messages_info = getattr(parsed_payload.replyContent, 'messagesInfo', [])
StepCheck("Проверка наличия сообщений в журнале", "messagesInfo").actual(messages_info).is_not_empty()
with allure.step("Фильтрация сообщений по времени и technologicalSection"):
filter_start_msk = t_utils.localize_as_moscow(imitator_start_time)
filter_end_msk = t_utils.localize_as_moscow(end_time)
time_filtered = [
msg
for msg in messages_info
if filter_start_msk <= t_utils.ensure_moscow_timezone(msg.time) <= filter_end_msk
]
time_filtered.sort(key=lambda msg: t_utils.ensure_moscow_timezone(msg.time), reverse=True)
lds_msg = next(
(
msg
for msg in time_filtered
if msg.technologicalSection == cfg.tu_name and msg.event == TestConst.JOURNAL_EVENT_LDS_INIT_COLD_START
),
None,
)
allure.attach(
f"Всего получено сообщений: {len(messages_info)}\n"
f"После фильтрации по времени ({filter_start_msk} - {filter_end_msk}): {len(time_filtered)}\n"
f"проверка: найдено ли сообщение с technologicalSection='{cfg.tu_name}' "
f"и event='{TestConst.JOURNAL_EVENT_LDS_INIT_ACCUM_DATA}': {'True' if lds_msg else 'False'}",
name="Результат фильтрации сообщений журнала",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step(
f"Проверка: найдено ли сообщение с technologicalSection='{cfg.tu_name}' "
f"и event='{TestConst.JOURNAL_EVENT_LDS_INIT_COLD_START}'"
):
if lds_msg is None:
pytest.fail(
f"Сообщение с technologicalSection='{cfg.tu_name}' "
f"и event='{TestConst.JOURNAL_EVENT_LDS_INIT_COLD_START}' "
f"не найдено среди {len(time_filtered)} отфильтрованных по времени сообщений"
)
with allure.step("Проверка актуальности сообщения"):
msg_time_msk = t_utils.ensure_moscow_timezone(lds_msg.time)
start_time_msk = t_utils.localize_as_moscow(imitator_start_time)
StepCheck(
f"Проверка: время сообщения позднее времени старта имитатора {msg_time_msk} > {start_time_msk}",
"time",
).actual(msg_time_msk > start_time_msk).expected(True).equal_to()
with SoftAssertions() as soft_failures:
StepCheck("Проверка event", "event", soft_failures).actual(lds_msg.event).expected(
TestConst.JOURNAL_EVENT_LDS_INIT_COLD_START
).equal_to()
StepCheck("Проверка mainPipeline", "mainPipeline", soft_failures).actual(lds_msg.mainPipeline).expected(
cfg.main_pipeline
).equal_to()
StepCheck("Проверка technologicalSection", "technologicalSection", soft_failures).actual(
lds_msg.technologicalSection
).expected(cfg.tu_name).equal_to()
StepCheck("Проверка technologicalObject не пустой", "technologicalObject", soft_failures).actual(
lds_msg.technologicalObject
).is_not_none()
StepCheck("Проверка priority не пустой", "priority", soft_failures).actual(lds_msg.priority).is_not_none()
StepCheck("Проверка messageType", "messageType", soft_failures).actual(lds_msg.messageType).expected(
TestConst.JOURNAL_MESSAGE_TYPE_LDS_STATUS
).equal_to()
async def main_page_info(ws_client, cfg: SmokeSuiteConfig):
"""
Проверка установки режима МТ.
"""
with allure.step("Подключение по ws, получение и обработка сообщения типа: MainPageInfoContent"):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"MainPageInfoContent",
"subscribeMainPageInfoRequest",
{'tuIds': [cfg.tu_id], 'additionalProperties': None},
)
parsed_payload = parser.parse_main_page_msg(payload)
with allure.step("Извлечение и подготовка данных для проверки"):
tu_info = getattr(parsed_payload.replyContent, 'tuInfo', None)
StepCheck("Проверка наличия данных по ТУ", "tuInfo").actual(tu_info).is_not_none()
main_pipeline_stationary_status = (
StationaryStatus(tu_info.stationaryStatus) if tu_info.stationaryStatus else None
)
with SoftAssertions() as soft_failures:
StepCheck("Проверка id полученного ТУ", "tu_id", soft_failures).actual(
parsed_payload.replyContent.tuId
).expected(cfg.tu_id).equal_to()
StepCheck(
f"Проверка установки стационара для ТУ {cfg.tu_name}",
"stationaryStatus",
soft_failures,
).actual(
main_pipeline_stationary_status
).expected(cfg.expected_stationary_status).equal_to()
async def main_page_info_signals(ws_client, cfg: SmokeSuiteConfig):
"""
Проверка счетчиков состояния сигналов
"""
with allure.step("Подключение по ws, получение и обработка сообщения типа: MainPageSignalsInfoContent"):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"MainPageSignalsInfoContent",
"subscribeMainPageSignalsInfoRequest",
{'tuIds': [cfg.tu_id], 'additionalProperties': None},
)
parsed_payload = parser.parse_main_page_signals_msg(payload)
with allure.step("Извлечение и подготовка данных для проверки"):
signals_info = getattr(parsed_payload.replyContent, 'signalsInfo', None)
StepCheck("Проверка данных сигналов ТУ", "signalsInfo").actual(signals_info).is_not_none()
with SoftAssertions() as soft_failures:
StepCheck("Проверка id полученного ТУ", "tu_id", soft_failures).actual(
parsed_payload.replyContent.tuId
).expected(cfg.tu_id).equal_to()
field_name = "numberOfRejectedSignals"
# Проверяет что количество отбракованных сигналов больше или равно ОР
StepCheck(
f"Проверка количества отбракованных сигналов ТУ {cfg.tu_name}",
field_name,
soft_failures,
).actual(
signals_info.numberOfRejectedSignals
).is_greater_than_or_equal_to(cfg.expected_main_page_signals[field_name])
field_name = "numberOfMaskedSignals"
StepCheck(
f"Проверка количества маскированных сигналов ТУ {cfg.tu_name}",
field_name,
soft_failures,
).actual(
signals_info.numberOfMaskedSignals
).expected(cfg.expected_main_page_signals[field_name]).equal_to()
field_name = "numberOfImitatedSignals"
StepCheck(
f"Проверка количества имитированных сигналов ТУ {cfg.tu_name}",
field_name,
soft_failures,
).actual(
signals_info.numberOfImitatedSignals
).expected(cfg.expected_main_page_signals[field_name]).equal_to()
async def main_page_info_unstationary(ws_client, cfg: SmokeSuiteConfig):
"""
Проверка установки режима Нестационар (для наборов с несколькими утечками).
Запускается после первой утечки, когда режим переходит в Нестационар.
"""
with allure.step("Подключение по ws, получение и обработка сообщения типа: MainPageInfoContent"):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"MainPageInfoContent",
"subscribeMainPageInfoRequest",
{'tuIds': [cfg.tu_id], 'additionalProperties': None},
)
parsed_payload = parser.parse_main_page_msg(payload)
with allure.step("Извлечение и подготовка данных для проверки"):
tu_info = getattr(parsed_payload.replyContent, 'tuInfo', None)
StepCheck("Проверка наличия данных по ТУ", "tuInfo").actual(tu_info).is_not_none()
main_pipeline_stationary_status = (
StationaryStatus(tu_info.stationaryStatus) if tu_info.stationaryStatus else None
)
with SoftAssertions() as soft_failures:
StepCheck("Проверка id полученного ТУ", "tu_id", soft_failures).actual(
parsed_payload.replyContent.tuId
).expected(cfg.tu_id).equal_to()
StepCheck(
f"Проверка установки режима Нестационар для ТУ {cfg.tu_name}",
"stationaryStatus",
soft_failures,
).actual(main_pipeline_stationary_status).expected(StationaryStatus.UNSTATIONARY).equal_to()
async def leak_is_confirm_on_main_page(ws_client, cfg: SmokeSuiteConfig):
"""
MainPageInfoContent - проверка подтвержденной утечки на ЭФ Состояние МТ
"""
with allure.step("Подключение по ws, получение и обработка сообщения типа: MainPageInfoContent."):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"MainPageInfoContent",
"subscribeMainPageInfoRequest",
{'tuIds': [cfg.tu_id], 'additionalProperties': None},
)
parsed_payload = parser.parse_main_page_msg(payload)
with allure.step("Извлечение и подготовка данных для проверки"):
leaks_info = getattr(getattr(parsed_payload.replyContent, 'tuInfo', None), 'leaksInfo', [])
StepCheck("Проверка наличия списка сообщений об утечках", "leakStatus").actual(leaks_info).is_not_empty()
confirm_leak = t_utils.find_object_by_field(leaks_info, "leakStatus", LeakStatus.CONFIRMED.value)
lds_status = LeakStatus(confirm_leak.leakStatus) if confirm_leak.leakStatus else None
StepCheck("Проверка подтвержденной утечки на ЭФ Состояние МТ", "leakStatus").actual(lds_status).expected(
LeakStatus.CONFIRMED
).equal_to()
async def leak_is_complete_on_main_page(ws_client, cfg: SmokeSuiteConfig):
"""
MainPageInfoContent - отсутствует подтвержденная утечка на ЭФ Состояние МТ
"""
with allure.step("Подключение по ws, получение и обработка сообщения типа: MainPageInfoContent."):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"MainPageInfoContent",
"subscribeMainPageInfoRequest",
{'tuIds': [cfg.tu_id], 'additionalProperties': None},
)
parsed_payload = parser.parse_main_page_msg(payload)
with allure.step("Извлечение и подготовка данных для проверки"):
main_page_leak_info = parsed_payload.replyContent.tuInfo.leaksInfo
confirmed_and_closed_leaks = t_utils.find_confirmed_leaks_on_main_page(main_page_leak_info)
StepCheck("Проверка подтвержденной утечки на ЭФ Состояние МТ", "leakStatus").actual(
confirmed_and_closed_leaks
).is_empty()
async def imitate_sensor_signal(ws_client, cfg: SmokeSuiteConfig, test_data: CaseData):
"""
Проверка имитации сигнала датчика.
"""
# Распаковка данных для теста
sensor_address = test_data.params.get("sensor_address")
sensor_val, sensor_quality = test_data.expected_result
# Получение актуального id датчика
sensor_id = TestConst.SENSOR_IDS_BY_ADDRESS.get(sensor_address)
with allure.step(f"Отправка сообщения и обработка ответа об имитации сигнала датчика с id: {sensor_id}"):
payload = await t_utils.connect_and_get_msg(
ws_client,
"ImitateSignalRequest",
{
'id': sensor_id,
'tuId': cfg.tu_id,
'imitateInfo': {
'value': str(sensor_val),
'quality': sensor_quality,
'additionalProperties': None,
},
'additionalProperties': None,
},
)
parsed_payload = parser.parse_imitate_signal_msg(payload)
sensor_imitate_reply_status = parsed_payload.replyStatus
StepCheck("Проверка кода ответа на запрос об имитации", "replyStatus").actual(
sensor_imitate_reply_status
).expected(ReplyStatus.OK.value).equal_to()
with allure.step(
"Подключение по ws, получение и обработка данных о статусе датчика из сообщения типа: InputSignalsContent"
):
time.sleep(cfg.basic_message_timeout)
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"InputSignalsContent",
"SubscribeInputSignalsRequest",
{
'signalIds': [sensor_id],
'tuId': cfg.tu_id,
'additionalProperties': None,
},
)
parsed_payload = parser.parse_input_signals_info_msg(payload)
with allure.step("Извлечение и подготовка данных для проверки имитации"):
sensor_data = getattr(parsed_payload.replyContent, 'inputSignals', [])
sensor_imitate_data = t_utils.find_object_by_field(sensor_data, "id", sensor_id)
StepCheck("Проверка наличия данных для проверки имитации", "inputSignals").actual(
sensor_imitate_data
).is_not_none()
with allure.step(f"Отправка сообщения и обработка ответа о снятии имитации датчика с id: {sensor_id}"):
payload = await t_utils.connect_and_get_msg(
ws_client,
"UnimitateSignalRequest",
{'id': sensor_id, 'tuId': cfg.tu_id, 'additionalProperties': None},
)
parsed_payload = parser.parse_unimitate_signal_msg(payload)
sensor_unimitate_reply_status = parsed_payload.replyStatus
StepCheck("Проверка кода ответа на запрос о снятии имитации", "replyStatus").actual(
sensor_unimitate_reply_status
).expected(ReplyStatus.OK.value).equal_to()
with allure.step(
"Подключение по ws, получение и обработка данных о статусе датчика из сообщения типа: InputSignalsContent"
):
time.sleep(cfg.basic_message_timeout)
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"InputSignalsContent",
"SubscribeInputSignalsRequest",
{
'signalIds': [sensor_id],
'tuId': cfg.tu_id,
'additionalProperties': None,
},
)
parsed_payload = parser.parse_input_signals_info_msg(payload)
with allure.step("Извлечение и подготовка данных для проверки снятия имитации"):
sensor_data = getattr(parsed_payload.replyContent, 'inputSignals', [])
sensor_unimitate_data = t_utils.find_object_by_field(sensor_data, "id", sensor_id)
StepCheck("Проверка наличия данных для проверки снятия имитации", "inputSignals").actual(
sensor_unimitate_data
).is_not_none()
with SoftAssertions() as soft_failures:
StepCheck(f"Проверка имитации датчика с id: {sensor_id}", "isImitated", soft_failures).actual(
sensor_imitate_data.isImitated
).expected(True).equal_to()
StepCheck(f"Проверка показаний датчика с id: {sensor_id}", "value", soft_failures).actual(
sensor_imitate_data.imitation.value
).expected(sensor_val).equal_to()
StepCheck(f"Проверка качества сигнала датчика с id: {sensor_id}", "quality", soft_failures).actual(
sensor_imitate_data.quality
).expected(sensor_quality).equal_to()
StepCheck(f"Проверка снятия имитации датчика с id: {sensor_id}", "isImitated", soft_failures).actual(
sensor_unimitate_data.isImitated
).expected(False).equal_to()
async def mask_signal_test(ws_client, cfg: SmokeSuiteConfig, test_data: CaseData):
"""
Проверка маскирования датчиков.
"""
if not test_data:
pytest.fail("Данные датчиков отсутствуют")
pressure_sensor_address = test_data.params.get("pressure_sensor_address")
flowmeter_address = test_data.params.get("flowmeter_address")
# Получение актуальных id датчиков
pressure_sensor_id = TestConst.SENSOR_IDS_BY_ADDRESS.get(pressure_sensor_address)
flowmeter_id = TestConst.SENSOR_IDS_BY_ADDRESS.get(flowmeter_address)
with allure.step("Маскирование датчиков"):
with allure.step(
f"Отправка сообщения и обработка ответа о маскировании датчика давления с id: {pressure_sensor_id}"
):
payload = await t_utils.connect_and_get_msg(
ws_client,
"MaskSignalRequest",
{'id': pressure_sensor_id, 'tuId': cfg.tu_id, 'additionalProperties': None},
)
parsed_payload = parser.parse_mask_signal_msg(payload)
pressure_sensor_mask_reply_status = parsed_payload.replyStatus
StepCheck("Проверка кода ответа на запрос о маскировании", "replyStatus").actual(
pressure_sensor_mask_reply_status
).expected(ReplyStatus.OK.value).equal_to()
with allure.step(f"Отправка сообщения и обработка ответа о маскировании расходомера с id: {flowmeter_id}"):
payload = await t_utils.connect_and_get_msg(
ws_client,
"MaskSignalRequest",
{'id': flowmeter_id, 'tuId': cfg.tu_id, 'additionalProperties': None},
)
parsed_payload = parser.parse_mask_signal_msg(payload)
flowmeter_mask_reply_status = parsed_payload.replyStatus
StepCheck("Проверка кода ответа на запрос о маскировании", "replyStatus").actual(
flowmeter_mask_reply_status
).expected(ReplyStatus.OK.value).equal_to()
with allure.step(
"Подключение по ws, получение и обработка данных о статусе датчиков из сообщения типа: InputSignalsContent"
):
time.sleep(cfg.basic_message_timeout)
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"InputSignalsContent",
"SubscribeInputSignalsRequest",
{
'signalIds': [pressure_sensor_id, flowmeter_id],
'tuId': cfg.tu_id,
'additionalProperties': None,
},
)
parsed_payload = parser.parse_input_signals_info_msg(payload)
with allure.step("Извлечение и подготовка данных для проверки маскирования"):
sensor_data = getattr(parsed_payload.replyContent, 'inputSignals', [])
pressure_sensor_mask_data = t_utils.find_object_by_field(sensor_data, "id", pressure_sensor_id)
flowmeter_mask_data = t_utils.find_object_by_field(sensor_data, "id", flowmeter_id)
with SoftAssertions() as soft_failures:
StepCheck("Проверка наличия данных о маскировании датчика давления", "inputSignals", soft_failures).actual(
pressure_sensor_mask_data
).is_not_none()
StepCheck("Проверка наличия данных о маскировании расходомера", "inputSignals", soft_failures).actual(
flowmeter_mask_data
).is_not_none()
with allure.step("Снятие маскирования датчиков"):
with allure.step(
f"Отправка сообщения и обработка ответа о снятии маскирования датчика давления с id: {pressure_sensor_id}"
):
payload = await t_utils.connect_and_get_msg(
ws_client,
"UnmaskSignalRequest",
{'id': pressure_sensor_id, 'tuId': cfg.tu_id, 'additionalProperties': None},
)
parsed_payload = parser.parse_unmask_signal_msg(payload)
pressure_sensor_unmask_reply_status = parsed_payload.replyStatus
StepCheck("Проверка кода ответа на запрос о снятии маскирования", "replyStatus").actual(
pressure_sensor_unmask_reply_status
).expected(ReplyStatus.OK.value).equal_to()
with allure.step(
f"Отправка сообщения и обработка ответа о снятии маскирования расходомера с id: {flowmeter_id}"
):
payload = await t_utils.connect_and_get_msg(
ws_client,
"UnmaskSignalRequest",
{'id': flowmeter_id, 'tuId': cfg.tu_id, 'additionalProperties': None},
)
parsed_payload = parser.parse_unmask_signal_msg(payload)
flowmeter_unmask_reply_status = parsed_payload.replyStatus
StepCheck("Проверка кода ответа на запрос о снятии маскирования", "replyStatus").actual(
flowmeter_unmask_reply_status
).expected(ReplyStatus.OK.value).equal_to()
with allure.step(
"Подключение по ws, получение и обработка данных о статусе датчиков из сообщения типа: InputSignalsContent"
):
time.sleep(cfg.basic_message_timeout)
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"InputSignalsContent",
"SubscribeInputSignalsRequest",
{
'signalIds': [pressure_sensor_id, flowmeter_id],
'tuId': cfg.tu_id,
'additionalProperties': None,
},
)
parsed_payload = parser.parse_input_signals_info_msg(payload)
with allure.step("Извлечение и подготовка данных для проверки снятия маскирования"):
sensor_data = getattr(parsed_payload.replyContent, 'inputSignals', [])
pressure_sensor_unmask_data = t_utils.find_object_by_field(sensor_data, "id", pressure_sensor_id)
flowmeter_unmask_data = t_utils.find_object_by_field(sensor_data, "id", flowmeter_id)
with SoftAssertions() as soft_failures:
StepCheck(
"Проверка наличия данных о снятии маскировании датчика давления", "inputSignals", soft_failures
).actual(pressure_sensor_unmask_data).is_not_none()
StepCheck(
"Проверка наличия данных о снятии маскировании расходомера", "inputSignals", soft_failures
).actual(flowmeter_unmask_data).is_not_none()
with SoftAssertions() as soft_failures:
StepCheck(
f"Проверка маскирования датчика давления с id: {pressure_sensor_id}", "isMasked", soft_failures
).actual(pressure_sensor_mask_data.isMasked).expected(True).equal_to()
StepCheck(f"Проверка маскирования расходомера с id: {flowmeter_id}", "isMasked", soft_failures).actual(
flowmeter_mask_data.isMasked
).expected(True).equal_to()
StepCheck(
f"Проверка снятия маскирования датчика давления с id: {pressure_sensor_id}", "isMasked", soft_failures
).actual(pressure_sensor_unmask_data.isMasked).expected(False).equal_to()
StepCheck(f"Проверка снятия маскирования расходомера с id: {flowmeter_id}", "isMasked", soft_failures).actual(
flowmeter_unmask_data.isMasked
).expected(False).equal_to()
async def mask_info_in_journal(ws_client, cfg: SmokeSuiteConfig, imitator_start_time):
"""
Проверка записей журнала о маскировании и размаскировании.
"""
with allure.step("Запрос сообщений журнала с фильтром userActions"):
end_time = datetime.now()
request_body = t_utils.create_journal_req_body(
pagination=Pagination(limit=TestConst.JOURNAL_MASK_PAGINATION_LIMIT, direction=Direction.FIRST.value),
filtering=Filtering(userActions=int(UserActions.SIGNAL_MASK_SIM), objects=FilteringObjects(tuId=cfg.tu_id)),
)
payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
parsed_payload = parser.parse_journal_msg(payload)
with allure.step("Извлечение и подготовка данных для проверки"):
messages_info = getattr(parsed_payload.replyContent, 'messagesInfo', [])
StepCheck("Проверка наличия сообщений в журнале", "messagesInfo").actual(messages_info).is_not_empty()
with allure.step("Фильтрация сообщений по событиям маскирования и временному диапазону"):
filter_start_msk = t_utils.localize_as_moscow(imitator_start_time)
filter_end_msk = t_utils.localize_as_moscow(end_time)
mask_unmask_msgs = [
msg
for msg in messages_info
if msg.event in TestConst.JOURNAL_MASK_EXPECTED_EVENTS
and msg.signalName in TestConst.JOURNAL_MASK_EXPECTED_SIGNALS
]
journal_messages = [
msg
for msg in mask_unmask_msgs
if filter_start_msk <= t_utils.ensure_moscow_timezone(msg.time) <= filter_end_msk
]
allure.attach(
f"Всего получено сообщений: {len(messages_info)}\n"
f"После фильтрации по event и signalName осталось сообщений: {len(mask_unmask_msgs)}\n"
f"После фильтрации по времени ({filter_start_msk} - {filter_end_msk}) "
f"осталось сообщений: {len(journal_messages)}",
name="Результат фильтрации сообщений журнала",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step("Группировка отфильтрованных сообщений"):
pressure_msgs = [msg for msg in journal_messages if msg.signalName == TestConst.JOURNAL_SIGNAL_PRESSURE]
flow_msgs = [msg for msg in journal_messages if msg.signalName == TestConst.JOURNAL_SIGNAL_FLOW]
mask_event_msgs = [msg for msg in journal_messages if msg.event == TestConst.JOURNAL_EVENT_MASK]
unmask_event_msgs = [msg for msg in journal_messages if msg.event == TestConst.JOURNAL_EVENT_UNMASK]
mask_signal_names = {msg.signalName for msg in mask_event_msgs}
unmask_signal_names = {msg.signalName for msg in unmask_event_msgs}
with SoftAssertions() as journal_soft_failures:
StepCheck(
"Проверка соответствия количества сообщений о действиях пользователя (снятие и установка "
"маскирования для датчиков давления и расходомеров)",
"total_count",
journal_soft_failures,
).actual(len(journal_messages)).expected(TestConst.JOURNAL_EXPECTED_MASK_MSG_TOTAL).equal_to()
StepCheck(
f"Проверка соответствия количества сообщений "
f"о действиях пользователя для датчиков давления - '{TestConst.JOURNAL_SIGNAL_PRESSURE}'",
"count",
journal_soft_failures,
).actual(len(pressure_msgs)).expected(TestConst.JOURNAL_EXPECTED_MSG_COUNT_PER_SIGNAL).equal_to()
StepCheck(
f"Проверка соответствия количества сообщений "
f"о действиях пользователя для расходомеров - '{TestConst.JOURNAL_SIGNAL_FLOW}'",
"count",
journal_soft_failures,
).actual(len(flow_msgs)).expected(TestConst.JOURNAL_EXPECTED_MSG_COUNT_PER_SIGNAL).equal_to()
StepCheck(
f"Проверка: событие '{TestConst.JOURNAL_EVENT_MASK}' содержит '{TestConst.JOURNAL_SIGNAL_PRESSURE}'",
"signalName",
journal_soft_failures,
).actual(TestConst.JOURNAL_SIGNAL_PRESSURE in mask_signal_names).expected(True).equal_to()
StepCheck(
f"Проверка: событие '{TestConst.JOURNAL_EVENT_MASK}' содержит '{TestConst.JOURNAL_SIGNAL_FLOW}'",
"signalName",
journal_soft_failures,
).actual(TestConst.JOURNAL_SIGNAL_FLOW in mask_signal_names).expected(True).equal_to()
StepCheck(
f"Проверка: событие '{TestConst.JOURNAL_EVENT_UNMASK}' содержит '{TestConst.JOURNAL_SIGNAL_PRESSURE}'",
"signalName",
journal_soft_failures,
).actual(TestConst.JOURNAL_SIGNAL_PRESSURE in unmask_signal_names).expected(True).equal_to()
StepCheck(
f"Проверка: событие '{TestConst.JOURNAL_EVENT_UNMASK}' содержит '{TestConst.JOURNAL_SIGNAL_FLOW}'",
"signalName",
journal_soft_failures,
).actual(TestConst.JOURNAL_SIGNAL_FLOW in unmask_signal_names).expected(True).equal_to()
for signal_name in [TestConst.JOURNAL_SIGNAL_PRESSURE, TestConst.JOURNAL_SIGNAL_FLOW]:
mask_msg_for_signal = next((msg for msg in mask_event_msgs if msg.signalName == signal_name), None)
unmask_msg_for_signal = next((msg for msg in unmask_event_msgs if msg.signalName == signal_name), None)
if mask_msg_for_signal and unmask_msg_for_signal:
StepCheck(
f"Проверка совпадения tag для '{signal_name}' между маскированием и снятием",
"tag",
journal_soft_failures,
).actual(mask_msg_for_signal.tag).expected(unmask_msg_for_signal.tag).equal_to()
for msg in journal_messages:
msg_label = f"{msg.event} - {msg.signalName}"
StepCheck(
f"Проверка user не пустой [{msg_label}]",
"user",
journal_soft_failures,
).actual(msg.user).is_not_none()
StepCheck(
f"Проверка mainPipeline [{msg_label}]",
"mainPipeline",
journal_soft_failures,
).actual(
msg.mainPipeline
).expected(cfg.main_pipeline).equal_to()
StepCheck(
f"Проверка object не пустой [{msg_label}]",
"object",
journal_soft_failures,
).actual(msg.object).is_not_none()
StepCheck(
f"Проверка technologicalObject не пустой [{msg_label}]",
"technologicalObject",
journal_soft_failures,
).actual(msg.technologicalObject).is_not_none()
StepCheck(
f"Проверка technologicalSection [{msg_label}]",
"technologicalSection",
journal_soft_failures,
).actual(msg.technologicalSection).expected(cfg.tu_name).equal_to()
StepCheck(
f"Проверка priority не пустой [{msg_label}]",
"priority",
journal_soft_failures,
).actual(msg.priority).is_not_none()
StepCheck(
f"Проверка messageType [{msg_label}]",
"messageType",
journal_soft_failures,
).actual(
msg.messageType
).expected(TestConst.JOURNAL_MESSAGE_TYPE_USER_ACTIONS).equal_to()
StepCheck(
f"Проверка status [{msg_label}]",
"status",
journal_soft_failures,
).actual(
msg.status
).expected(TestConst.JOURNAL_STATUS_SUCCESS).equal_to()
async def mask_du_on_mini_scheme(ws_client, cfg: SmokeSuiteConfig):
"""
Маскирование ДУ на мини-схеме
Проверка маскированного участка в выходных сигналах
"""
linear_part_id = cfg.linear_part_identifier_for_mask
mask_reason = cfg.mask_reason
with allure.step(
"Подключение по ws, отправка сообщения типа: MaskLdsRequest. Совершается действие - маскирование ДУ"
):
payload = (
await t_utils.connect_and_get_msg(
ws_client,
"MaskLdsRequest",
{
'tuId': cfg.tu_id,
'maskInfo': [
{
'linearPartId': linear_part_id,
'reason': mask_reason,
'additionalProperties': None,
}
],
'additionalProperties': None,
},
),
)
time.sleep(cfg.basic_message_timeout)
parsed_payload = parser.parse_unmask_lds_message(payload)
mask_du_reply_status = parsed_payload.replyStatus
with allure.step("Подключение по ws, получение и обработка данных сообщений для теста"):
with allure.step(f"Получение словаря для линейного участка с id: {linear_part_id}.\n" f"ЭФ Выходные сигналы."):
payload = await t_utils.connect_and_get_msg(
ws_client,
"GetOutputSignalsRequest",
{
'tuId': cfg.tu_id,
'filtering': None,
'search': None,
'sorting': None,
'additionalProperties': None,
},
)
parsed_payload = parser.parse_output_signals_msg(payload)
# Получение данных линейного участка утечки по id
with allure.step(
"Извлечение и подготовка данных типов выходных сигналов из обработанных данных ЭФ Выходные сигналы"
):
leak_linear_part = t_utils.find_object_by_field(
parsed_payload.replyContent.linearPartSignals,
TestConst.LEAK_LINEAR_PART_ID_KEY,
linear_part_id,
)
leak_signals_list = leak_linear_part.signals
mask_signal_type = t_utils.find_signal_type_by_address_suffix(
leak_signals_list, TestConst.ADDRESS_SUFFIX_MASK
)
StepCheck(
"Проверка получения типов выходных сигналов в ЭФ Выходные сигналы", "linearPartSignals"
).actual(mask_signal_type).is_not_none()
with allure.step(
"Получение сообщения типа: OutputSignalsInfo. "
f"С данными выходных сигналов для линейного участка с id: {linear_part_id}\n"
):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"OutputSignalsInfo",
"SubscribeOutputSignalsRequest",
{
'objects': {
'linearParts': [{'linearPartId': linear_part_id}],
'controlledSites': [],
},
'signalTypes': 1023,
'tuId': cfg.tu_id,
'additionalProperties': None,
},
)
parsed_payload = parser.parse_output_signals_info_msg(payload)
with allure.step("Извлечение и подготовка данных для проверки маскирования ДУ ЭФ Выходные сигналы"):
leak_linear_part = t_utils.find_object_by_field(
parsed_payload.replyContent.linearPartSignals,
TestConst.LEAK_LINEAR_PART_ID_KEY,
linear_part_id,
)
StepCheck("Проверка наличия данных о маскировании в ЭФ Выходные сигналы", "linearPartSignals").actual(
leak_linear_part
).is_not_none()
leak_signals_list = leak_linear_part.signals
mask_leak_value = t_utils.find_signal_val_by_signal_type(leak_signals_list, mask_signal_type)
with allure.step("Подключение по ws, получение и обработка сообщения типа: CommonSchemeContent. ЭФ Схема"):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"CommonSchemeContent",
"SubscribeCommonSchemeRequest",
{'tuId': cfg.tu_id, 'additionalProperties': None},
)
parsed_payload = parser.parse_common_scheme_info_msg(payload)
with allure.step("Извлечение и подготовка данных для проверки маскирования ДУ ЭФ Схема"):
linear_parts = parsed_payload.replyContent.linearParts
mask_linear_part = next((lp for lp in linear_parts if lp.id == linear_part_id), None)
with allure.step("Подключение по ws, получение и обработка сообщения типа: MessagesInfo. ЭФ Журнал"):
request_body = t_utils.create_journal_req_body(
pagination=Pagination(limit=10, direction=Direction.FIRST.value),
filtering=Filtering(
messageTypes=int(MessageType.MASKING_LDS), objects=FilteringObjects(tuId=cfg.tu_id)
),
)
payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
parsed_payload = parser.parse_journal_msg(payload)
with allure.step("Извлечение и подготовка данных для проверки маскирования ДУ ЭФ Журнал"):
messages_info = getattr(parsed_payload.replyContent, 'messagesInfo', None)
if cfg.technological_section:
journal_mask_message = t_utils.find_object_by_field(
messages_info, "technologicalSection", cfg.technological_section
)
else:
journal_mask_message = parsed_payload.replyContent.messagesInfo[0]
StepCheck("Проверка наличия данных о маскировании в журнале", "technologicalSection").actual(
journal_mask_message
).is_not_none()
with allure.step(
"Подключение по ws, получение и обработка сообщения типа: MainPageInfoContent. "
"Получен результат маскирования ДУ на ЭФ Состояние МТ"
):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"MainPageInfoContent",
"subscribeMainPageInfoRequest",
{'tuIds': [cfg.tu_id], 'additionalProperties': None},
)
parsed_payload = parser.parse_main_page_msg(payload)
with allure.step("Извлечение и подготовка данных для проверки маскирования ДУ ЭФ Состояние МТ"):
lds_status = getattr(getattr(parsed_payload.replyContent, "tuInfo", None), 'ldsStatus', None)
number_of_masked_lps = getattr(lds_status, 'numberOfMaskedLps', None)
mask_du_list = getattr(lds_status, "maskedLps", None)
if mask_du_list:
masked_lps_name = next(iter(mask_du_list), None)
else:
masked_lps_name = None
# Проверки сообщений
with SoftAssertions() as soft_failures:
StepCheck("Проверка кода ответа на запрос о маскировании ДУ", "replyStatus", soft_failures).actual(
mask_du_reply_status
).expected(ReplyStatus.OK.value).equal_to()
StepCheck(
"Проверка сигнала маскирования ДУ в выходных сигналах", TestConst.ADDRESS_SUFFIX_MASK, soft_failures
).actual(mask_leak_value).expected(TestConst.OUTPUT_IS_MASK).equal_to()
StepCheck("Проверка статуса маскирования ДУ на схеме", "isMasked", soft_failures).actual(
mask_linear_part.isMasked
).expected(True).equal_to()
StepCheck("Проверка причины маски на схеме", "maskReason").actual(mask_linear_part.maskReason).expected(
cfg.mask_reason
).equal_to()
StepCheck("Проверка имени ТУ в журнале", "mainPipeline", soft_failures).actual(
journal_mask_message.mainPipeline
).expected(cfg.main_pipe_line).equal_to()
StepCheck("Проверка имени ДУ в журнале", "technologicalObject", soft_failures).actual(
journal_mask_message.technologicalObject
).expected(cfg.mask_du_name).equal_to()
StepCheck("Проверка события в журнале", "event", soft_failures).actual(journal_mask_message.event).expected(
cfg.mask_du_event
).equal_to()
StepCheck("Проверка количества маскированных ДУ. ЭФ Состояние МТ", "numberOfMaskedLps", soft_failures).actual(
number_of_masked_lps
).expected(cfg.mask_one_du).equal_to()
StepCheck("Проверка имени маскированного ДУ. ЭФ Состояние МТ", "maskedLps", soft_failures).actual(
masked_lps_name
).expected(cfg.mask_du_name).equal_to()
async def unmask_du_on_mini_scheme(ws_client, cfg: SmokeSuiteConfig):
"""
Размаскирование ДУ на мини-схеме
Проверка маскированного участка в выходных сигналах
"""
linear_part_id = cfg.linear_part_identifier_for_mask
unmask_reason = cfg.unmask_reason
with allure.step(
"Подключение по ws, отправка сообщения типа: UnmaskLdsRequest. Совершается действие - размаскирование ДУ"
):
payload = (
await t_utils.connect_and_get_msg(
ws_client,
"UnmaskLdsRequest",
{
'tuId': cfg.tu_id,
'maskInfo': [
{
'linearPartId': linear_part_id,
'reason': unmask_reason,
'additionalProperties': None,
}
],
'additionalProperties': None,
},
),
)
time.sleep(cfg.basic_message_timeout)
parsed_payload = parser.parse_unmask_lds_message(payload)
unmask_du_reply_status = parsed_payload.replyStatus
with allure.step("Подключение по ws, получение и обработка данных сообщений для теста"):
with allure.step(f"Получение словаря для линейного участка с id: {linear_part_id}\n" f"ЭФ Выходные сигналы"):
payload = await t_utils.connect_and_get_msg(
ws_client,
"GetOutputSignalsRequest",
{
'tuId': cfg.tu_id,
'filtering': None,
'search': None,
'sorting': None,
'additionalProperties': None,
},
)
with allure.step("Извлечение и подготовка данных типов выходных сигналов ЭФ Выходные сигналы"):
parsed_payload = parser.parse_output_signals_msg(payload)
# Получение данных линейного участка утечки по id
leak_linear_part = t_utils.find_object_by_field(
parsed_payload.replyContent.linearPartSignals,
TestConst.LEAK_LINEAR_PART_ID_KEY,
linear_part_id,
)
leak_signals_list = leak_linear_part.signals
mask_signal_type = t_utils.find_signal_type_by_address_suffix(
leak_signals_list, TestConst.ADDRESS_SUFFIX_MASK
)
StepCheck("Проверка наличия данных типов выходных сигналов ЭФ Выходные сигналы", "signalType").actual(
mask_signal_type
).is_not_none()
with allure.step(
"Получение сообщения типа: OutputSignalsInfo. "
f"С данными выходных сигналов для линейного участка с id: {linear_part_id}\n"
):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"OutputSignalsInfo",
"SubscribeOutputSignalsRequest",
{
'objects': {
'linearParts': [{'linearPartId': linear_part_id}],
'controlledSites': [],
},
'signalTypes': 1023,
'tuId': cfg.tu_id,
'additionalProperties': None,
},
)
parsed_payload = parser.parse_output_signals_info_msg(payload)
with allure.step("Извлечение и подготовка данных для проверки снятия маскирования ДУ ЭФ Выходные сигналы"):
leak_linear_part = t_utils.find_object_by_field(
parsed_payload.replyContent.linearPartSignals,
TestConst.LEAK_LINEAR_PART_ID_KEY,
linear_part_id,
)
StepCheck(
"Проверка наличия данных о снятии маскирования в ЭФ Выходные сигналы", "linearPartSignals"
).actual(leak_linear_part).is_not_none()
leak_signals_list = leak_linear_part.signals
mask_leak_value = t_utils.find_signal_val_by_signal_type(leak_signals_list, mask_signal_type)
with allure.step(
"Подключение по ws, получение и обработка сообщения типа: MessagesInfo. "
"Получен результат маскирования ДУ на ЭФ Журнал"
):
request_body = t_utils.create_journal_req_body(
pagination=Pagination(limit=10, direction=Direction.FIRST.value),
filtering=Filtering(
messageTypes=int(MessageType.MASKING_LDS), objects=FilteringObjects(tuId=cfg.tu_id)
),
)
payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
parsed_payload = parser.parse_journal_msg(payload)
with allure.step("Извлечение и подготовка данных для проверки снятия маскирования ДУ ЭФ Журнал"):
messages_info = getattr(parsed_payload.replyContent, 'messagesInfo', None)
if cfg.technological_section:
journal_unmask_message = t_utils.find_object_by_field(
messages_info, "technologicalSection", cfg.technological_section
)
else:
journal_unmask_message = parsed_payload.replyContent.messagesInfo[0]
StepCheck("Проверка наличия данных о снятии маскирования в журнале", "technologicalSection").actual(
journal_unmask_message
).is_not_none()
with allure.step(
"Подключение по ws, получение и обработка сообщения типа: MainPageInfoContent. "
"Получен результат маскирования ДУ на ЭФ Состояние МТ"
):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"MainPageInfoContent",
"subscribeMainPageInfoRequest",
{'tuIds': [cfg.tu_id], 'additionalProperties': None},
)
parsed_payload = parser.parse_main_page_msg(payload)
lds_status = getattr(getattr(parsed_payload.replyContent, "tuInfo", None), 'ldsStatus', None)
number_of_masked_lps = getattr(lds_status, 'numberOfMaskedLps', None)
masked_lps_name = getattr(lds_status, "maskedLps", None)
# Проверки сообщений
with SoftAssertions() as soft_failures:
StepCheck("Проверка кода ответа на запрос о размаскировании", "replyStatus", soft_failures).actual(
unmask_du_reply_status
).expected(ReplyStatus.OK.value).equal_to()
StepCheck(
"Проверяем, что тег маскирования ДУ в выходных сигналах равен null",
TestConst.ADDRESS_SUFFIX_MASK,
soft_failures,
).actual(mask_leak_value).expected(TestConst.OUTPUT_IS_NOT_MASK).equal_to()
StepCheck("Проверяем имя ТУ в сообщении в журнале", "mainPipeline", soft_failures).actual(
journal_unmask_message.mainPipeline
).expected(cfg.main_pipe_line).equal_to()
StepCheck("Проверяем имя ДУ в сообщении в журнале", "technologicalObject", soft_failures).actual(
journal_unmask_message.technologicalObject
).expected(cfg.mask_du_name).equal_to()
StepCheck("Проверка события в сообщении в журнале", "event", soft_failures).actual(
journal_unmask_message.event
).expected(cfg.unmask_du_event).equal_to()
StepCheck("Проверка количества маскированных ДУ", "numberOfMaskedLps", soft_failures).actual(
number_of_masked_lps
).expected(cfg.not_mask_du).equal_to()
StepCheck("Проверка счетчика маски. ЭФ Состояние МТ", "Количество замаскированных ДУ", soft_failures).actual(
number_of_masked_lps
).expected(cfg.not_mask_du).equal_to()
StepCheck(
"Проверка отсутствия списка маскированных ДУ. ЭФ Состояние МТ",
"Отсутствуют замаскированные ДУ",
soft_failures,
).actual(masked_lps_name).is_none()
async def lds_status_initialization_out(ws_client, cfg: SmokeSuiteConfig):
"""
Проверка выхода СОУ из Инициализации.
"""
with allure.step("Подключение по ws, получение и обработка сообщения типа: CommonSchemeContent"):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"CommonSchemeContent",
"SubscribeCommonSchemeRequest",
{'tuId': cfg.tu_id, 'additionalProperties': None},
)
parsed_payload = parser.parse_common_scheme_info_msg(payload)
with allure.step("Извлечение и подготовка данных для проверки"):
flow_areas = getattr(parsed_payload.replyContent, 'flowAreas', None)
longest_flow_area = t_utils.get_longest_flow_area(flow_areas)
diagnostic_areas = getattr(longest_flow_area, 'diagnosticAreas', [])
StepCheck("Проверка наличия данных диагностических участков", "diagnosticAreas").actual(
diagnostic_areas
).is_not_empty()
allure.attach(
f"Самый протяженный участок карты течений: {longest_flow_area}",
name="flowArea. Выход из Инициализации",
attachment_type=allure.attachment_type.TEXT,
)
lds_status_set = {diagnostic_area.ldsStatus for diagnostic_area in diagnostic_areas}
lds_status_int = t_utils.determine_lds_status_by_priority(lds_status_set)
lds_status = LdsStatus(lds_status_int) if lds_status_int else None
StepCheck(
"Проверка: СОУ находится не в режиме 'Инициализация'",
"ldsStatus",
).actual(
lds_status
).expected(LdsStatus.INITIALIZATION).is_not_equal_to()
async def lds_status_init_out_in_journal(ws_client, cfg: SmokeSuiteConfig, imitator_start_time):
"""
Проверка наличия записи в журнале о выходе СОУ из режима Инициализация.
"""
with allure.step("Запрос сообщений журнала с фильтром messageTypes=LDS_STATUS"):
end_time = datetime.now()
request_body = t_utils.create_journal_req_body(
pagination=Pagination(limit=TestConst.JOURNAL_PAGINATION_LIMIT, direction=Direction.FIRST.value),
filtering=Filtering(messageTypes=int(MessageType.LDS_STATUS), objects=FilteringObjects(tuId=cfg.tu_id)),
)
payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
parsed_payload = parser.parse_journal_msg(payload)
with allure.step("Извлечение и подготовка данных для проверки"):
messages_info = getattr(parsed_payload.replyContent, 'messagesInfo', [])
StepCheck("Проверка наличия сообщений в журнале", "messagesInfo").actual(messages_info).is_not_empty()
with allure.step("Фильтрация сообщений по времени и technologicalSection"):
filter_start_msk = t_utils.localize_as_moscow(imitator_start_time)
filter_end_msk = t_utils.localize_as_moscow(end_time)
time_filtered = [
msg
for msg in messages_info
if filter_start_msk <= t_utils.ensure_moscow_timezone(msg.time) <= filter_end_msk
]
time_filtered.sort(key=lambda msg: t_utils.ensure_moscow_timezone(msg.time), reverse=True)
lds_msg = next(
(msg for msg in time_filtered if msg.technologicalSection == cfg.tu_name),
None,
)
allure.attach(
f"Всего получено сообщений: {len(messages_info)}\n"
f"После фильтрации по времени ({filter_start_msk} - {filter_end_msk}): {len(time_filtered)}\n"
f"Проверка: найдено ли сообщение с technologicalSection='{cfg.tu_name}': {'True' if lds_msg else 'False'}",
name="Результат фильтрации сообщений журнала",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step(f"Проверка: найдено ли сообщение с technologicalSection='{cfg.tu_name}'"):
if lds_msg is None:
pytest.fail(
f"Сообщение с technologicalSection='{cfg.tu_name}' "
f"не найдено среди {len(time_filtered)} отфильтрованных по времени сообщений"
)
with allure.step("Проверка актуальности сообщения"):
msg_time_msk = t_utils.ensure_moscow_timezone(lds_msg.time)
start_time_msk = t_utils.localize_as_moscow(imitator_start_time)
StepCheck(
f"Проверка: время сообщения позднее времени старта имитатора {msg_time_msk} > {start_time_msk}",
"time",
).actual(msg_time_msk > start_time_msk).expected(True).equal_to()
with SoftAssertions() as soft_failures:
StepCheck("Проверка: event не является Инициализацией", "event", soft_failures).actual(lds_msg.event).expected(
TestConst.JOURNAL_EVENT_LDS_INIT_ACCUM_DATA
).is_not_equal_to()
StepCheck("Проверка mainPipeline", "mainPipeline", soft_failures).actual(lds_msg.mainPipeline).expected(
cfg.main_pipeline
).equal_to()
StepCheck("Проверка technologicalSection", "technologicalSection", soft_failures).actual(
lds_msg.technologicalSection
).expected(cfg.tu_name).equal_to()
StepCheck("Проверка technologicalObject не пустой", "technologicalObject", soft_failures).actual(
lds_msg.technologicalObject
).is_not_none()
StepCheck("Проверка priority не пустой", "priority", soft_failures).actual(lds_msg.priority).is_not_none()
StepCheck("Проверка messageType", "messageType", soft_failures).actual(lds_msg.messageType).expected(
TestConst.JOURNAL_MESSAGE_TYPE_LDS_STATUS
).equal_to()
async def leaks_content(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig, imitator_start_time):
"""
Проверка утечки через сообщение LeaksContent.
"""
with allure.step("Подключение по ws и получение сообщения об утечке типа: LeaksContent"):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"LeaksContent",
"SubscribeLeaksRequest",
{'tuId': cfg.tu_id},
)
parsed_payload = parser.parse_leaks_content_msg(payload)
with allure.step("Извлечение и подготовка данных для проверки"):
leaks_list_info = getattr(parsed_payload.replyContent, 'leaksListInfo', [])
# Ищет подтвержденные утечки
confirmed_leaks_list = t_utils.find_confirmed_leaks(leaks_list_info)
first_leak_info = t_utils.find_leak_by_coordinate(confirmed_leaks_list, leak.coordinate_meters)
StepCheck("Проверка наличия собщения", "Leak").actual(first_leak_info).is_not_none()
# Конвертируем время обнаружения в московское время
leak_detected_at = t_utils.ensure_moscow_timezone(first_leak_info.detectedAt)
leak_wait_start_time, leak_wait_end_time = t_utils.get_leak_time_window(
imitator_start_time,
leak.leak_start_interval_seconds,
leak.allowed_time_diff_seconds,
detected_at_tz=leak_detected_at.tzinfo,
)
leak_volume_m3 = t_utils.convert_leak_volume_m3(first_leak_info.leakVolume)
leak_coordinate_round = round(first_leak_info.leakCoordinate, cfg.precision)
leak_algorithm_type = ReservedType(first_leak_info.type) if first_leak_info.type else None
leak_confirmation_status = (
ConfirmationStatus(first_leak_info.confirmationStatus) if first_leak_info.confirmationStatus else None
)
with SoftAssertions() as soft_failures:
StepCheck("Проверка id полученного ТУ", "tu_id", soft_failures).actual(
parsed_payload.replyContent.tuId
).expected(cfg.tu_id).equal_to()
StepCheck("Проверка наличия названия участка утечки", "diagnosticAreaName", soft_failures).actual(
first_leak_info.diagnosticAreaName
).is_not_none()
StepCheck("Проверка статуса утечки", "confirmationStatus", soft_failures).actual(
leak_confirmation_status
).expected(leak.expected_leak_status).equal_to()
StepCheck("Проверка источника события (алгоритм)", "type", soft_failures).actual(leak_algorithm_type).expected(
leak.expected_algorithm_type
).equal_to()
StepCheck("Проверка наличия id утечки", "id", soft_failures).actual(first_leak_info.id).is_not_none()
StepCheck("Проверка координаты утечки", "leakCoordinate", soft_failures).actual(
leak_coordinate_round
).is_close_to(
leak.coordinate_meters,
cfg.allowed_distance_diff_meters,
f"значение допустимой погрешности координаты {cfg.allowed_distance_diff_meters}",
)
StepCheck("Проверка времени обнаружения утечки", "leakDetectedAt", soft_failures).actual(
leak_detected_at
).is_between(leak_wait_start_time, leak_wait_end_time)
StepCheck("Проверка объема утечки", "volume", soft_failures).actual(leak_volume_m3).is_close_to(
leak.volume_m3,
leak.allowed_volume_m3,
f"значение допустимой погрешности по объему {leak.allowed_volume_m3}",
)
async def possible_leak_in_journal(ws_client, cfg: SmokeSuiteConfig, imitator_start_time):
"""
Проверка наличия сообщения 'Возможна утечка' в журнале.
"""
with allure.step("Подключение по ws, получение и обработка сообщений журнала типа: MessagesInfoContent"):
end_time = datetime.now()
request_body = t_utils.create_journal_req_body(
pagination=Pagination(limit=TestConst.JOURNAL_PAGINATION_LIMIT, direction=Direction.FIRST.value),
filtering=Filtering(messageTypes=int(MessageType.LEAKS), objects=FilteringObjects(tuId=cfg.tu_id)),
)
payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
parsed_payload = parser.parse_journal_msg(payload)
with allure.step("Извлечение и подготовка данных для проверки"):
messages_info = getattr(parsed_payload.replyContent, 'messagesInfo', [])
StepCheck("Проверка наличия сообщений в журнале", "messagesInfo").actual(messages_info).is_not_empty()
with allure.step("Фильтрация сообщений по времени и technologicalSection"):
filter_start_msk = t_utils.localize_as_moscow(imitator_start_time)
filter_end_msk = t_utils.localize_as_moscow(end_time)
time_filtered = [
msg
for msg in messages_info
if filter_start_msk <= t_utils.ensure_moscow_timezone(msg.time) <= filter_end_msk
]
time_filtered.sort(key=lambda msg: t_utils.ensure_moscow_timezone(msg.time), reverse=True)
possible_leak_msg = next(
(
msg
for msg in time_filtered
if msg.technologicalSection == cfg.tu_name and msg.event == TestConst.JOURNAL_EVENT_POSSIBLE_LEAK
),
None,
)
allure.attach(
f"Всего получено сообщений: {len(messages_info)}\n"
f"После фильтрации по времени ({filter_start_msk} - {filter_end_msk}): {len(time_filtered)}\n"
f"Проверка: найдено ли сообщение с technologicalSection='{cfg.tu_name}' "
f"и event='{TestConst.JOURNAL_EVENT_POSSIBLE_LEAK}': {'True' if possible_leak_msg else 'False'}",
name="Результат фильтрации сообщений журнала",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step(
f"Проверка: найдено ли сообщение с technologicalSection='{cfg.tu_name}' "
f"и event='{TestConst.JOURNAL_EVENT_POSSIBLE_LEAK}'"
):
if possible_leak_msg is None:
pytest.fail(
f"Сообщение с technologicalSection='{cfg.tu_name}' "
f"и event='{TestConst.JOURNAL_EVENT_POSSIBLE_LEAK}' "
f"не найдено среди {len(time_filtered)} отфильтрованных по времени сообщений"
)
with SoftAssertions() as soft_failures:
StepCheck("Проверка статуса утечки в журнале", "event", soft_failures).actual(possible_leak_msg.event).expected(
TestConst.JOURNAL_EVENT_POSSIBLE_LEAK
).equal_to()
StepCheck("Проверка mainPipeline", "mainPipeline", soft_failures).actual(
possible_leak_msg.mainPipeline
).expected(cfg.main_pipeline).equal_to()
StepCheck("Проверка messageType", "messageType", soft_failures).actual(possible_leak_msg.messageType).expected(
TestConst.JOURNAL_MESSAGE_TYPE_LEAKS
).equal_to()
StepCheck("Проверка technologicalSection не пустой", "technologicalSection", soft_failures).actual(
possible_leak_msg.technologicalSection
).is_not_none()
StepCheck("Проверка technologicalObject не пустой", "technologicalObject", soft_failures).actual(
possible_leak_msg.technologicalObject
).is_not_none()
async def leak_info_in_journal(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig, imitator_start_time):
with allure.step("Подключение по ws, получение и обработка сообщения типа: MessagesInfoContent"):
request_body = t_utils.create_journal_req_body(
pagination=Pagination(limit=TestConst.JOURNAL_PAGINATION_LIMIT, direction=Direction.FIRST.value),
filtering=Filtering(messageTypes=int(MessageType.LEAKS), objects=FilteringObjects(tuId=cfg.tu_id)),
)
payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
end_time = datetime.now()
parsed_payload = parser.parse_journal_msg(payload)
with allure.step("Извлечение и подготовка данных для проверки"):
messages_info = getattr(parsed_payload.replyContent, 'messagesInfo', [])
StepCheck("Проверка наличия сообщений в журнале", "messagesInfo").actual(messages_info).is_not_empty()
with allure.step("Фильтрация сообщений по времени и technologicalSection"):
filter_start_msk = t_utils.localize_as_moscow(imitator_start_time)
filter_end_msk = t_utils.localize_as_moscow(end_time)
time_filtered = [
msg
for msg in messages_info
if filter_start_msk <= t_utils.ensure_moscow_timezone(msg.time) <= filter_end_msk
]
time_filtered.sort(key=lambda msg: t_utils.ensure_moscow_timezone(msg.time), reverse=True)
leak_message = next(
(
msg
for msg in time_filtered
if msg.technologicalSection == cfg.tu_name and TestConst.JOURNAL_EVENT_DETECTED_LEAK in msg.event
),
None,
)
allure.attach(
f"Всего получено сообщений: {len(messages_info)}\n"
f"После фильтрации по времени ({filter_start_msk} - {filter_end_msk}): {len(time_filtered)}\n",
name="Результат фильтрации сообщений журнала",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step("Первичная проверка после фильтрации"):
StepCheck(
f"Проверка: найдено ли сообщение с technologicalSection='{cfg.tu_name}' "
f"и event содержит подстроку подтвержденной утечки'{TestConst.JOURNAL_EVENT_DETECTED_LEAK}'",
"event",
).actual(leak_message).is_not_none()
leak_coordinate_km, leak_volume_m3 = t_utils.parse_journal_msg_value(leak_message.value)
leak_coordinate_round = round(leak_coordinate_km * TestConst.KM_TO_METERS, TestConst.PRECISION)
leak_message_time = t_utils.ensure_moscow_timezone(leak_message.time)
with SoftAssertions() as soft_failures:
StepCheck("Проверка полученного события event", "event", soft_failures).contains(
leak_message.event, TestConst.JOURNAL_EVENT_DETECTED_LEAK
)
StepCheck("Проверка полученного ТУ", "technologicalSection", soft_failures).actual(
leak_message.technologicalSection
).expected(cfg.tu_name).equal_to()
StepCheck("Проверка типа полученного сообщения", "messageType", soft_failures).actual(
leak_message.messageType
).expected(TestConst.JOURNAL_MESSAGE_TYPE_LEAKS).equal_to()
StepCheck("Проверка имени ДУ", "technologicalObject", soft_failures).actual(
leak_message.technologicalObject
).is_not_none()
StepCheck("Проверка координаты утечки", "leakCoordinate", soft_failures).actual(
leak_coordinate_round
).is_close_to(
leak.coordinate_meters,
cfg.allowed_distance_diff_meters,
f"значение допустимой погрешности координаты {cfg.allowed_distance_diff_meters}",
)
StepCheck("Проверка времени обнаружения утечки", "leakDetectedAt", soft_failures).actual(
leak_message_time
).is_between(filter_start_msk, filter_end_msk)
StepCheck("Проверка объема утечки", "volume", soft_failures).actual(leak_volume_m3).is_close_to(
leak.volume_m3,
leak.allowed_volume_m3,
f"значение допустимой погрешности по объему {leak.allowed_volume_m3}",
)
async def completed_leak_info_in_journal(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig, imitator_start_time):
"""
Проверка наличия сообщения 'Утечка завершена' в журнале.
"""
with allure.step("Подключение по ws, получение и обработка сообщения типа: MessagesInfoContent"):
request_body = t_utils.create_journal_req_body(
pagination=Pagination(limit=TestConst.JOURNAL_PAGINATION_LIMIT, direction=Direction.FIRST.value),
filtering=Filtering(messageTypes=int(MessageType.LEAKS), objects=FilteringObjects(tuId=cfg.tu_id)),
)
payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
end_time = datetime.now()
parsed_payload = parser.parse_journal_msg(payload)
with allure.step("Извлечение и подготовка данных для проверки"):
messages_info = getattr(parsed_payload.replyContent, 'messagesInfo', [])
StepCheck("Проверка наличия сообщений в журнале", "messagesInfo").actual(messages_info).is_not_empty()
with allure.step("Фильтрация сообщений по времени и technologicalSection"):
filter_start_msk = t_utils.localize_as_moscow(imitator_start_time)
filter_end_msk = t_utils.localize_as_moscow(end_time)
time_filtered = [
msg
for msg in messages_info
if filter_start_msk <= t_utils.ensure_moscow_timezone(msg.time) <= filter_end_msk
]
time_filtered.sort(key=lambda msg: t_utils.ensure_moscow_timezone(msg.time), reverse=True)
completed_leak_message = next(
(
msg
for msg in time_filtered
if msg.technologicalSection == cfg.tu_name and msg.event == TestConst.JOURNAL_EVENT_COMPLETED_LEAKS
),
None,
)
allure.attach(
f"Всего получено сообщений: {len(messages_info)}\n"
f"После фильтрации по времени ({filter_start_msk} - {filter_end_msk}): {len(time_filtered)}\n",
name="Результат фильтрации сообщений журнала",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step("Первичная проверка после фильтрации"):
StepCheck(
f"Проверка: найдено ли сообщение с technologicalSection='{cfg.tu_name}' "
f"и event='{TestConst.JOURNAL_EVENT_COMPLETED_LEAKS}'",
"event",
).actual(completed_leak_message).is_not_none()
leak_coordinate_km, leak_volume_m3 = t_utils.parse_journal_msg_value(completed_leak_message.value)
leak_coordinate_round = round(leak_coordinate_km * TestConst.KM_TO_METERS, TestConst.PRECISION)
leak_message_time = t_utils.ensure_moscow_timezone(completed_leak_message.time)
with SoftAssertions() as soft_failures:
StepCheck("Проверка статуса утечки в журнале", "event", soft_failures).actual(
completed_leak_message.event
).expected(TestConst.JOURNAL_EVENT_COMPLETED_LEAKS).equal_to()
StepCheck("Проверка полученного ТУ", "technologicalSection", soft_failures).actual(
completed_leak_message.technologicalSection
).expected(cfg.tu_name).equal_to()
StepCheck("Проверка координаты утечки", "leakCoordinate", soft_failures).actual(
leak_coordinate_round
).is_close_to(
leak.coordinate_meters,
cfg.allowed_distance_diff_meters,
f"значение допустимой погрешности координаты {cfg.allowed_distance_diff_meters}",
)
StepCheck("Проверка времени завершения утечки", "leakDetectedAt", soft_failures).actual(
leak_message_time
).is_between(filter_start_msk, filter_end_msk)
async def all_leaks_info(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig, imitator_start_time):
"""
Проверка сообщения AllLeaksInfo об утечке.
"""
with allure.step("Подключение по ws и получение сообщения об утечке типа: AllLeaksInfoContent"):
parsed_payload = await t_utils.connect_and_get_parsed_msg_by_tu_id(
cfg.tu_id,
ws_client,
"AllLeaksInfoContent",
"subscribeAllLeaksInfoRequest",
[],
)
with allure.step("Извлечение и подготовка данных для проверки"):
leaks_info = getattr(parsed_payload.replyContent, 'leaksInfo', [])
first_leak_info = t_utils.find_leak_by_coordinate(leaks_info, leak.coordinate_meters)
StepCheck("Проверка наличия сообщения об утечке типа AllLeaksInfoContent", "leaksInfo").actual(
first_leak_info
).is_not_none()
# Конвертируем время обнаружения в московское время
leak_detected_at = t_utils.ensure_moscow_timezone(first_leak_info.leakDetectedAt)
leak_wait_start_time, leak_wait_end_time = t_utils.get_leak_time_window(
imitator_start_time,
leak.leak_start_interval_seconds,
leak.allowed_time_diff_seconds,
detected_at_tz=leak_detected_at.tzinfo,
)
leak_volume_m3 = t_utils.convert_leak_volume_m3(first_leak_info.volume)
leak_coordinate_round = round(first_leak_info.leakCoordinate, cfg.precision)
leak_lds_status = LdsStatus(first_leak_info.ldsStatus) if first_leak_info.ldsStatus else None
leak_stationary_status = (
StationaryStatus(first_leak_info.stationaryStatus) if first_leak_info.stationaryStatus else None
)
with SoftAssertions() as soft_failures:
StepCheck("Проверка id полученного ТУ", "tu_id", soft_failures).actual(
parsed_payload.replyContent.tuId
).expected(cfg.tu_id).equal_to()
StepCheck("Проверка наличия названия участка утечки", "diagnosticAreaName", soft_failures).actual(
first_leak_info.diagnosticAreaName
).is_not_none()
StepCheck("Проверка статуса СОУ", "ldsStatus", soft_failures).actual(leak_lds_status).expected(
leak.expected_lds_status
).equal_to()
StepCheck("Проверка маскирования утечки", "isMasked", soft_failures).actual(first_leak_info.isMasked).expected(
False
).equal_to()
StepCheck("Проверка квитирования утечки", "isAcknowledged", soft_failures).actual(
first_leak_info.isAcknowledged
).expected(False).equal_to()
StepCheck("Проверка наличия id утечки", "id", soft_failures).actual(first_leak_info.id).is_not_none()
StepCheck("Проверка координаты утечки", "leakCoordinate", soft_failures).actual(
leak_coordinate_round
).is_close_to(
leak.coordinate_meters,
cfg.allowed_distance_diff_meters,
f"значение допустимой погрешности координаты {cfg.allowed_distance_diff_meters}",
)
StepCheck("Проверка времени обнаружения утечки", "leakDetectedAt", soft_failures).actual(
leak_detected_at
).is_between(leak_wait_start_time, leak_wait_end_time)
StepCheck("Проверка объема утечки", "volume", soft_failures).actual(leak_volume_m3).is_close_to(
leak.volume_m3,
leak.allowed_volume_m3,
f"значение допустимой погрешности по объему {leak.allowed_volume_m3}",
)
StepCheck("Проверка режима ТУ", "stationaryStatus", soft_failures).actual(leak_stationary_status).expected(
leak.expected_stationary_status
).equal_to()
async def all_leaks_is_empty(ws_client, cfg: SmokeSuiteConfig):
"""
Проверка отсутствия информации об утечке
"""
with allure.step("Подключение по ws и получение сообщения об утечке типа: AllLeaksInfoContent"):
parsed_payload = await t_utils.connect_and_get_parsed_msg_by_tu_id(
cfg.tu_id,
ws_client,
"AllLeaksInfoContent",
"subscribeAllLeaksInfoRequest",
[],
)
with allure.step("Извлечение и подготовка данных для проверки"):
leaks_info = getattr(parsed_payload.replyContent, 'leaksInfo', [])
StepCheck("Проверка отсутствия информации об утечке в сообщении AllLeaksInfoContent", "leaksInfo").actual(
leaks_info
).is_empty()
async def tu_leaks_info(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig, imitator_start_time):
"""
Проверка сообщения TuLeaksInfo об утечке.
"""
with allure.step("Подключение по ws и получение сообщения об утечке типа: TuLeaksInfoContent"):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"TuLeaksInfoContent",
"subscribeTuLeaksInfoRequest",
{'tuId': cfg.tu_id},
)
parsed_payload = parser.parse_tu_leaks_info_msg(payload)
with allure.step("Извлечение и подготовка данных для проверки"):
tu_leaks_info_list = getattr(parsed_payload.replyContent, 'leaksInfo', [])
first_leak_info = t_utils.find_leak_by_coordinate(tu_leaks_info_list, leak.coordinate_meters)
StepCheck("Проверка наличия сообщения об утечке типа TuLeaksInfoContent", "leaksInfo").actual(
first_leak_info
).is_not_none()
# Конвертируем время обнаружения в московское время
leak_detected_at = t_utils.ensure_moscow_timezone(first_leak_info.leakDetectedAt)
leak_wait_start_time, leak_wait_end_time = t_utils.get_leak_time_window(
imitator_start_time,
leak.leak_start_interval_seconds,
leak.allowed_time_diff_seconds,
detected_at_tz=leak_detected_at.tzinfo,
)
leak_volume_m3 = t_utils.convert_leak_volume_m3(first_leak_info.volume)
leak_coordinate_round = round(first_leak_info.leakCoordinate, cfg.precision)
leak_lds_status = LdsStatus(first_leak_info.ldsStatus) if first_leak_info.ldsStatus else None
leak_stationary_status = (
StationaryStatus(first_leak_info.stationaryStatus) if first_leak_info.stationaryStatus else None
)
with SoftAssertions() as soft_failures:
StepCheck("Проверка id полученного ТУ", "tu_id", soft_failures).actual(
parsed_payload.replyContent.tuId
).expected(cfg.tu_id).equal_to()
StepCheck("Проверка наличия id участка утечки", "controlledSiteId", soft_failures).actual(
first_leak_info.controlledSiteId
).is_not_none()
StepCheck("Проверка статуса СОУ", "ldsStatus", soft_failures).actual(leak_lds_status).expected(
leak.expected_lds_status
).equal_to()
StepCheck("Проверка маскирования утечки", "isMasked", soft_failures).actual(first_leak_info.isMasked).expected(
False
).equal_to()
StepCheck("Проверка наличия pipeId в сообщении", "pipeId", soft_failures).actual(
first_leak_info.pipeId
).is_not_none()
StepCheck("Проверка наличия id утечки", "id", soft_failures).actual(first_leak_info.id).is_not_none()
StepCheck("Проверка координаты утечки", "leakCoordinate", soft_failures).actual(
leak_coordinate_round
).is_close_to(
leak.coordinate_meters,
cfg.allowed_distance_diff_meters,
f"значение допустимой погрешности координаты {cfg.allowed_distance_diff_meters}",
)
StepCheck("Проверка времени обнаружения утечки", "leakDetectedAt", soft_failures).actual(
leak_detected_at
).is_between(leak_wait_start_time, leak_wait_end_time)
StepCheck("Проверка объема утечки", "volume", soft_failures).actual(leak_volume_m3).is_close_to(
leak.volume_m3,
leak.allowed_volume_m3,
f"значение допустимой погрешности по объему {leak.allowed_volume_m3}",
)
StepCheck("Проверка режима ТУ", "stationaryStatus", soft_failures).actual(leak_stationary_status).expected(
leak.expected_stationary_status
).equal_to()
async def lds_status_during_leak(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig):
"""
Проверка режима работы СОУ во время утечки.
"""
with allure.step("Подключение по ws, получение и обработка сообщения типа: CommonSchemeContent"):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"CommonSchemeContent",
"SubscribeCommonSchemeRequest",
{'tuId': cfg.tu_id, 'additionalProperties': None},
)
parsed_payload = parser.parse_common_scheme_info_msg(payload)
with allure.step("Извлечение и подготовка данных для проверки"):
flow_areas = getattr(parsed_payload.replyContent, 'flowAreas', [])
status_config = leak.lds_status_during_leak_config
if status_config is None:
pytest.fail("Не задан leak.lds_status_during_leak_config для теста lds_status_during_leak")
leak_diagnostic_area_name = status_config.leak_diagnostic_area_name
leak_diagnostic_area_id, neighbor_ids = TestConst.DIAGNOSTIC_AREA_BASE_IDS.get(
leak_diagnostic_area_name, (None, None)
)
leak_diagnostic_area = t_utils.find_diagnostic_area_by_id(flow_areas, leak_diagnostic_area_id)
leak_lds_status_int = getattr(leak_diagnostic_area, 'ldsStatus', None)
leak_lds_status = LdsStatus(leak_lds_status_int) if leak_lds_status_int else None
with SoftAssertions() as soft_failures:
StepCheck(
f"Проверка режима работы СОУ на ДУ с утечкой, pipe_id ДУ: {leak_diagnostic_area_id}",
"ldsStatus",
soft_failures,
).actual(leak_lds_status).expected(status_config.leak_du_expected_lds_status).equal_to()
if neighbor_ids:
found_neighbor_count = 0
for neighbor_id in neighbor_ids:
neighbor_area = t_utils.find_diagnostic_area_by_id(flow_areas, neighbor_id)
neighbor_lds_status_int = getattr(neighbor_area, 'ldsStatus', None)
neighbor_lds_status = LdsStatus(neighbor_lds_status_int) if neighbor_lds_status_int else None
if neighbor_lds_status:
found_neighbor_count += 1
StepCheck(
f"Проверка режима работы СОУ на соседнем ДУ, id ДУ: {neighbor_id}",
"ldsStatus",
soft_failures,
).actual(neighbor_lds_status).expected(status_config.neighbors_du_expected_lds_status).equal_to()
if found_neighbor_count == 0:
pytest.fail(f"Не найдены соседние с утечкой ДУ по _id: {neighbor_ids}")
async def acknowledge_leak_info(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig = None):
"""
Проверка квитирования утечки.
Для multi-leak наборов: после квитирования проверяется что утечка удалена из списка.
Для single-leak наборов: проверяется что список утечек пуст.
"""
with allure.step("Получение id утечки"):
with allure.step("Подключение по ws, получение и обработка сообщения об утечке типа: TuLeaksInfoContent"):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"TuLeaksInfoContent",
"subscribeTuLeaksInfoRequest",
{'tuId': cfg.tu_id},
)
parsed_payload = parser.parse_tu_leaks_info_msg(payload)
with allure.step("Извлечение и подготовка данных для получения id утечки"):
leaks_info = getattr(parsed_payload.replyContent, 'leaksInfo', [])
leak_to_ack = t_utils.find_leak_by_coordinate(leaks_info, leak.coordinate_meters)
StepCheck("Проверка наличия сообщения об утечке", "leaksInfo").actual(leak_to_ack).is_not_none()
acknowledged_leak_id = leak_to_ack.id
with allure.step(
"Подключение по ws, отправка сообщения и обработка ответа о квитировании утечки типа: AcknowledgeLeakRequest"
):
payload = await t_utils.connect_and_get_msg(
ws_client,
"AcknowledgeLeakRequest",
{'leakId': str(acknowledged_leak_id), 'tuId': cfg.tu_id, 'additionalProperties': None},
)
parsed_payload = parser.parse_acknowledge_leak_msg(payload)
acknowledge_reply_status = parsed_payload.replyStatus
with allure.step(
"Подключение по ws и получение сообщения об утечке типа: AllLeaksInfoContent для проверки квитирования"
):
with allure.step("Очистка очереди websocket сообщений"):
ws_client.clear_queue()
time.sleep(cfg.basic_message_timeout)
parsed_payload = await t_utils.connect_and_get_parsed_msg_by_tu_id(
cfg.tu_id,
ws_client,
"AllLeaksInfoContent",
"subscribeAllLeaksInfoRequest",
[],
)
with allure.step("Извлечение и подготовка данных для проверки"):
remaining_leaks = getattr(parsed_payload.replyContent, 'leaksInfo', None)
remaining_leak_ids = [leak.id for leak in remaining_leaks] if remaining_leaks else []
StepCheck("Проверка кода ответа на запрос о квитировании", "replyStatus").actual(acknowledge_reply_status).expected(
ReplyStatus.OK.value
).equal_to()
# Проверяем что квитированная утечка исчезла из списка
StepCheck("Проверка отсутствия квитированной утечки в списке AllLeaksInfo", "id").does_not_contain(
remaining_leak_ids, acknowledged_leak_id
)
async def acknowledge_leak_in_journal(ws_client, cfg: SmokeSuiteConfig, imitator_start_time):
"""
Проверка записи в журнале о квитировании утечки.
"""
with allure.step("Запрос сообщений журнала с фильтром userActions=LEAK_ACK"):
end_time = datetime.now()
request_body = t_utils.create_journal_req_body(
pagination=Pagination(limit=TestConst.JOURNAL_PAGINATION_LIMIT, direction=Direction.FIRST.value),
filtering=Filtering(userActions=int(UserActions.LEAK_ACK), objects=FilteringObjects(tuId=cfg.tu_id)),
)
payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
parsed_payload = parser.parse_journal_msg(payload)
with allure.step("Извлечение и подготовка данных для проверки"):
messages_info = getattr(parsed_payload.replyContent, 'messagesInfo', [])
StepCheck("Проверка наличия сообщений в журнале", "messagesInfo").actual(messages_info).is_not_empty()
with allure.step("Фильтрация сообщений по времени и technologicalSection"):
filter_start_msk = t_utils.localize_as_moscow(imitator_start_time)
filter_end_msk = t_utils.localize_as_moscow(end_time)
time_filtered = [
msg
for msg in messages_info
if filter_start_msk <= t_utils.ensure_moscow_timezone(msg.time) <= filter_end_msk
]
time_filtered.sort(key=lambda msg: t_utils.ensure_moscow_timezone(msg.time), reverse=True)
ack_message = next(
(
msg
for msg in time_filtered
if msg.technologicalSection == cfg.tu_name and msg.event == TestConst.JOURNAL_EVENT_LEAK_ACKNOWLEDGED
),
None,
)
allure.attach(
f"Всего получено сообщений: {len(messages_info)}\n"
f"После фильтрации по времени ({filter_start_msk} - {filter_end_msk}): {len(time_filtered)}\n"
f"Проверка: найдено ли сообщение с technologicalSection='{cfg.tu_name}' "
f"и event='{TestConst.JOURNAL_EVENT_LEAK_ACKNOWLEDGED}': {'True' if ack_message else 'False'}",
name="Результат фильтрации сообщений журнала",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step(
f"Проверка: найдено ли сообщение с technologicalSection='{cfg.tu_name}' "
f"и event='{TestConst.JOURNAL_EVENT_LEAK_ACKNOWLEDGED}'"
):
if ack_message is None:
pytest.fail(
f"Сообщение с technologicalSection='{cfg.tu_name}' "
f"и event='{TestConst.JOURNAL_EVENT_LEAK_ACKNOWLEDGED}' "
f"не найдено среди {len(time_filtered)} отфильтрованных по времени сообщений"
)
with allure.step("Проверка актуальности сообщения"):
msg_time_msk = t_utils.ensure_moscow_timezone(ack_message.time)
start_time_msk = t_utils.localize_as_moscow(imitator_start_time)
StepCheck(
"Проверка: время сообщения позднее времени старта имитатора",
"time",
).actual(
msg_time_msk > start_time_msk
).expected(True).equal_to()
with SoftAssertions() as soft_failures:
StepCheck("Проверка event", "event", soft_failures).actual(ack_message.event).expected(
TestConst.JOURNAL_EVENT_LEAK_ACKNOWLEDGED
).equal_to()
StepCheck("Проверка mainPipeline", "mainPipeline", soft_failures).actual(ack_message.mainPipeline).expected(
cfg.main_pipeline
).equal_to()
StepCheck("Проверка technologicalSection", "technologicalSection", soft_failures).actual(
ack_message.technologicalSection
).expected(cfg.tu_name).equal_to()
StepCheck("Проверка technologicalObject не пустой", "technologicalObject", soft_failures).actual(
ack_message.technologicalObject
).is_not_none()
async def output_signals(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig, imitator_start_time):
"""
Проверка наличия данных об утечке в выходных сигналах.
"""
linear_part_id = leak.linear_part_id
with allure.step(f"Получение списка выходных сигналов для линейного участка с id: {linear_part_id}"):
payload = await t_utils.connect_and_get_msg(
ws_client,
"GetOutputSignalsRequest",
{
'tuId': cfg.tu_id,
'filtering': None,
'search': None,
'sorting': None,
'additionalProperties': None,
},
)
parsed_payload = parser.parse_output_signals_msg(payload)
# Получение данных линейного участка утечки по id
with allure.step("Извлечение и подготовка данных для списка выходных сигналов"):
linear_part_signals = getattr(parsed_payload.replyContent, 'linearPartSignals', [])
leak_linear_part = t_utils.find_object_by_field(
linear_part_signals,
TestConst.LEAK_LINEAR_PART_ID_KEY,
linear_part_id,
)
StepCheck("Проверка наличия данных выходных сигналов", "linearPartSignals").actual(
leak_linear_part
).is_not_none()
with allure.step("Получение типов выходных сигналов из обработанных данных"):
leak_signals_list = leak_linear_part.signals
ack_leak_signal_type = t_utils.find_signal_type_by_address_suffix(
leak_signals_list, TestConst.ADDRESS_SUFFIX_ACK_LEAK
)
leak_signal_type = t_utils.find_signal_type_by_address_suffix(
leak_signals_list, TestConst.ADDRESS_SUFFIX_LEAK
)
mask_signal_type = t_utils.find_signal_type_by_address_suffix(
leak_signals_list, TestConst.ADDRESS_SUFFIX_MASK
)
point_leak_signal_type = t_utils.find_signal_type_by_address_suffix(
leak_signals_list, TestConst.ADDRESS_SUFFIX_POINT_LEAK
)
q_leak_signal_type = t_utils.find_signal_type_by_address_suffix(
leak_signals_list, TestConst.ADDRESS_SUFFIX_Q_LEAK
)
time_leak_signal_type = t_utils.find_signal_type_by_address_suffix(
leak_signals_list, TestConst.ADDRESS_SUFFIX_TIME_LEAK
)
with allure.step(f"Получение данных выходных сигналов для линейного участка с id: {linear_part_id}"):
with allure.step("Получение сообщения с данными выходных сигналов типа: OutputSignalsInfo"):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"OutputSignalsInfo",
"SubscribeOutputSignalsRequest",
{
'objects': {
'linearParts': [{'linearPartId': linear_part_id}],
'controlledSites': [],
},
'signalTypes': 1023,
'tuId': cfg.tu_id,
'additionalProperties': None,
},
)
parsed_payload = parser.parse_output_signals_info_msg(payload)
with allure.step("Извлечение и подготовка данных Выходных сигналов для проверки"):
linear_part_signals = getattr(parsed_payload.replyContent, 'linearPartSignals', [])
leak_linear_part = t_utils.find_object_by_field(
linear_part_signals,
TestConst.LEAK_LINEAR_PART_ID_KEY,
linear_part_id,
)
leak_signals_list = getattr(leak_linear_part, 'signals', [])
StepCheck(
f"Проверка наличия данных выходных сигналов линейного участка {linear_part_id}", "linearPartSignals"
).actual(leak_signals_list).is_not_empty()
ack_leak_value = t_utils.find_signal_val_by_signal_type(leak_signals_list, ack_leak_signal_type)
leak_value = t_utils.find_signal_val_by_signal_type(leak_signals_list, leak_signal_type)
mask_leak_value = t_utils.find_signal_val_by_signal_type(leak_signals_list, mask_signal_type)
point_leak_value = t_utils.find_signal_val_by_signal_type(leak_signals_list, point_leak_signal_type)
q_leak_leak_value = t_utils.find_signal_val_by_signal_type(leak_signals_list, q_leak_signal_type)
time_leak_value = t_utils.find_signal_val_by_signal_type(leak_signals_list, time_leak_signal_type).strip()
if not time_leak_value:
pytest.fail(f"В данных линейного участка: {leak_signals_list} \n Отсутствуют данные об утечке")
time_leak_value_datetime = t_utils.to_moscow_timezone(time_leak_value)
leak_wait_start_time, leak_wait_end_time = t_utils.get_leak_time_window(
imitator_start_time,
leak.leak_start_interval_seconds,
leak.output_allowed_time_diff_seconds,
detected_at_tz=time_leak_value_datetime.tzinfo,
)
q_leak_value_m3 = t_utils.convert_leak_volume_m3(float(q_leak_leak_value))
point_leak_value_round = round(float(point_leak_value), cfg.precision)
with SoftAssertions() as soft_failures:
StepCheck("Проверка сигнала квитирования утечки", TestConst.ADDRESS_SUFFIX_ACK_LEAK, soft_failures).actual(
ack_leak_value
).expected(TestConst.OUTPUT_IS_ACK_LEAK).equal_to()
StepCheck("Проверка сигнала наличия утечки", TestConst.ADDRESS_SUFFIX_LEAK, soft_failures).actual(
leak_value
).expected(TestConst.OUTPUT_IS_LEAK).equal_to()
StepCheck("Проверка сигнала маскирования утечки", TestConst.ADDRESS_SUFFIX_MASK, soft_failures).actual(
mask_leak_value
).expected(TestConst.OUTPUT_IS_NOT_MASK).equal_to()
StepCheck("Проверка сигнала координаты утечки", TestConst.ADDRESS_SUFFIX_POINT_LEAK, soft_failures).actual(
point_leak_value_round
).is_close_to(
leak.coordinate_meters,
cfg.allowed_distance_diff_meters,
f"значение допустимой погрешности координаты {cfg.allowed_distance_diff_meters}",
)
StepCheck("Проверка сигнала объема утечки", TestConst.ADDRESS_SUFFIX_Q_LEAK, soft_failures).actual(
q_leak_value_m3
).is_close_to(
leak.volume_m3,
leak.allowed_volume_m3,
f"значение допустимой погрешности по объему {leak.allowed_volume_m3}",
)
StepCheck("Проверка времени обнаружения утечки", TestConst.ADDRESS_SUFFIX_TIME_LEAK, soft_failures).actual(
time_leak_value_datetime
).is_between(leak_wait_start_time, leak_wait_end_time)
async def balance_algorithm_leak_waiting(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig, imitator_start_time):
"""
Проверка подозрения утечки через BalanceAlgorithmResults
Логика:
- Подписка на BalanceAlgorithmResults однократно
- Раз в BALANCE_ALGORITHM_POLL_INTERVAL секунд забираем из очереди свежее сообщение
- Собираем все diagnosticAreas (только из flowAreas с непустым списком)
- Проверяем, что на ДУ с утечкой хотя бы раз пришёл isLeakPossible=True
- Проверяем, что на всех остальных ДУ isLeakPossible всегда False
- Проверяем дебаланс на ДУ с будущей утечкой, дебаланс должен быть выше значения порога - 20%
"""
poll_interval = TestConst.BALANCE_ALGORITHM_POLL_INTERVAL
total_wait = TestConst.BALANCE_ALGORITHM_TOTAL_WAIT
end_time = imitator_start_time + timedelta(
seconds=leak.balance_algorithm_leak_waiting_test.offset * 60 + total_wait
)
with allure.step(
f"Подписка и сбор BalanceAlgorithmResults раз в {poll_interval} с, в течение {total_wait} с после начала утечки"
):
await t_utils.connect(
ws_client,
"SubscribeBalanceAlgorithmResultsRequest",
{'tuId': cfg.tu_id, 'additionalProperties': None},
)
collected_diagnostic_areas = await t_utils.poll_balance_algorithm_diagnostic_areas(
ws_client,
parser,
imitator_start_time,
end_time,
poll_interval,
)
if collected_diagnostic_areas is not None:
allure.attach(
str(collected_diagnostic_areas),
name="Тестируемый фрагмент ответа с бэка",
attachment_type=allure.attachment_type.TEXT,
)
diagnostic_area_names_with_possible = [
diagnostic_area.name for diagnostic_area in collected_diagnostic_areas if diagnostic_area.isLeakPossible
]
diagnostic_area_possible_leak = next(
(diagnostic_area for diagnostic_area in collected_diagnostic_areas if diagnostic_area.isLeakDetected),
None,
)
is_leak_possible_seen = any(diagnostic_area.isLeakPossible for diagnostic_area in collected_diagnostic_areas)
with SoftAssertions() as soft_failures:
StepCheck(
"Проверка: получен хотя бы один ДУ с подозрением на утечку",
"isLeakPossible",
soft_failures,
).actual(diagnostic_area_names_with_possible).is_not_empty()
StepCheck(
f"Проверка: на ДУ {str(diagnostic_area_names_with_possible)} бы раз за "
f"{TestConst.BALANCE_ALGORITHM_TOTAL_WAIT / TestConst.SEC_PER_MIN} минут приходил"
" статус 'подозрение на утечку': isLeakPossible=True",
"isLeakPossible",
soft_failures,
).actual(is_leak_possible_seen).expected(True).equal_to()
if leak.flow_rate_settings_threshold is not None and diagnostic_area_possible_leak is not None:
threshold = leak.flow_rate_settings_threshold
tolerance = TestConst.DEBALANCE_TOLERANCE
lower_bound = threshold * (1 - tolerance)
StepCheck(
f"Проверка значения дебаланса на ДУ name={diagnostic_area_possible_leak.name} с будущей утечкой"
f" в пределах {int(tolerance * 100)}% снизу от порогового значения по объему: {threshold}).",
"debalance",
soft_failures,
).actual(abs(diagnostic_area_possible_leak.debalance)).is_greater_than(lower_bound)
async def balance_algorithm_leak_detected(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig):
"""
Проверка наличия утечки (isLeakDetected) через BalanceAlgorithmResults.
Логика:
- Подписка на BalanceAlgorithmResultsContent
- Получение первого подходящего сообщения типа BalanceAlgorithmResultsContent
- Проверяем, что на ДУ с утечкой isLeakDetected=True
- Проверяем, что на всех остальных ДУ isLeakDetected=False
- Проверяем, что дебаланс на ДУ с утечкой > FLOW_RATE_SETTINGS_THRESHOLD
"""
with allure.step("Подписка и получение BalanceAlgorithmResultsContent"):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"BalanceAlgorithmResultsContent",
"SubscribeBalanceAlgorithmResultsRequest",
{'tuId': cfg.tu_id, 'additionalProperties': None},
)
parsed_payload = parser.parse_balance_algorithm_msg(payload)
reply_content = parsed_payload.replyContent
if not reply_content or not reply_content.flowAreas:
pytest.fail(
"В ответе с бэка в DTO BalanceAlgorithmResults отсутствуют flowAreas, "
"невозможно проверить наличие утечки"
)
all_diagnostic_areas = []
flow_areas = getattr(reply_content, 'flowAreas', [])
for flow_area in flow_areas:
if flow_area.diagnosticAreas:
all_diagnostic_areas.extend(flow_area.diagnosticAreas)
if not all_diagnostic_areas:
pytest.fail(
"В ответе с бэка в DTO BalanceAlgorithmResults во всех flowAreas отсутствуют diagnosticAreas, "
"невозможно проверить наличие утечки"
)
leak_diagnostic_area = next(
(diagnostic_area for diagnostic_area in all_diagnostic_areas if diagnostic_area.isLeakDetected),
None,
)
if leak_diagnostic_area is None:
pytest.fail("Ни одного ДУ с утечкой не найдено в ответе BalanceAlgorithmResultsContent")
leak_diagnostic_area_name = leak_diagnostic_area.name
with SoftAssertions() as soft_failures:
StepCheck(
f"Проверка: на ДУ name={leak_diagnostic_area_name} обнаружена утечка",
"isLeakDetected",
soft_failures,
).actual(leak_diagnostic_area.isLeakDetected).expected(True).equal_to()
foreign_with_detected = [
diagnostic_area
for diagnostic_area in all_diagnostic_areas
if diagnostic_area.name != leak_diagnostic_area_name and diagnostic_area.isLeakDetected
]
if not cfg.has_multiple_leaks:
StepCheck(
"Проверка: на остальных ДУ не обнаружена утечка, "
f" количество ДУ с неправильным статусом: {len(foreign_with_detected)}, "
f"их id: {[diagnostic_area.id for diagnostic_area in foreign_with_detected]})",
"isLeakDetected_without_leak",
soft_failures,
).actual(len(foreign_with_detected)).expected(0).equal_to()
if leak.flow_rate_settings_threshold is not None:
threshold = leak.flow_rate_settings_threshold
StepCheck(
f"Дебаланс на ДУ name={leak_diagnostic_area_name} по модулю больше порога для данного режима МТ:"
f" {threshold}",
"debalance",
soft_failures,
).actual(abs(leak_diagnostic_area.debalance)).is_greater_than(threshold)
async def balance_algorithm_leak_completed(ws_client, cfg: SmokeSuiteConfig):
"""
Проверка отсутствия утечки (isLeakDetected) через BalanceAlgorithmResults.
Логика:
- Подписка на BalanceAlgorithmResultsContent.
- Получение первого подходящего сообщения типа BalanceAlgorithmResultsContent.
- Проверяем, что на всех ДУ флаг isLeakDetected=False.
- Проверяем, что дебаланс на всех ДУ < FLOW_RATE_SETTINGS_THRESHOLD.
"""
with allure.step("Подписка и получение BalanceAlgorithmResultsContent"):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"BalanceAlgorithmResultsContent",
"SubscribeBalanceAlgorithmResultsRequest",
{'tuId': cfg.tu_id, 'additionalProperties': None},
)
parsed_payload = parser.parse_balance_algorithm_msg(payload)
reply_content = parsed_payload.replyContent
if not reply_content or not reply_content.flowAreas:
pytest.fail(
"В ответе с бэка в DTO BalanceAlgorithmResults отсутствуют flowAreas, "
"невозможно проверить наличие/отсутствие утечки"
)
all_diagnostic_areas = []
flow_areas = getattr(reply_content, 'flowAreas', [])
for flow_area in flow_areas:
if flow_area.diagnosticAreas:
all_diagnostic_areas.extend(flow_area.diagnosticAreas)
if not all_diagnostic_areas:
pytest.fail(
"В ответе с бэка в DTO BalanceAlgorithmResults во всех flowAreas отсутствуют diagnosticAreas, "
"невозможно проверить наличие/отсутствие утечки"
)
with SoftAssertions() as soft_failures:
for diagnostic_area in all_diagnostic_areas:
diagnostic_area_name = diagnostic_area.name
StepCheck(
f"Проверка: на ДУ {diagnostic_area_name} не должно быть утечки",
"isLeakDetected_without_leak",
soft_failures,
).actual(diagnostic_area.isLeakDetected).expected(False).equal_to()
async def the_leak_is_complete_on_kg(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig):
"""
Проверка факта завершения утечки на ЭФ КГ(табличное представление).
Логика:
LeaksContent - проверить, что утечка в статусе завершена
"""
with allure.step("Подключение по ws и получение сообщения об утечке типа: LeaksContent"):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"LeaksContent",
"SubscribeLeaksRequest",
{'tuId': cfg.tu_id},
)
parsed_payload = parser.parse_leaks_content_msg(payload)
with allure.step("Извлечение и подготовка данных для проверки"):
leaks_list_info = getattr(parsed_payload.replyContent, 'leaksListInfo', None)
complete_leak_info = t_utils.find_leak_by_coordinate(leaks_list_info, leak.coordinate_meters)
leak_coordinate_round = round(complete_leak_info.leakCoordinate, cfg.precision)
complete_leak = t_utils.find_object_by_field(
leaks_list_info, "confirmationStatus", ConfirmationStatus.CONFIRMED_AND_LEAK_CLOSED.value
)
leak_algorithm_type = ConfirmationStatus(complete_leak_info.type) if complete_leak_info.type else None
leak_confirmation_status = (
ConfirmationStatus(complete_leak.confirmationStatus) if complete_leak.confirmationStatus else None
)
with SoftAssertions() as soft_failures:
StepCheck("Проверка статуса утечки в КГ - завершена", "confirmationStatus", soft_failures).actual(
leak_confirmation_status
).expected(leak.expected_complete_leak_status).equal_to()
StepCheck("Проверка наличия названия участка утечки", "diagnosticAreaName", soft_failures).actual(
complete_leak_info.diagnosticAreaName
).is_not_none()
StepCheck("Проверка источника события (алгоритм)", "type", soft_failures).actual(leak_algorithm_type).expected(
leak.expected_algorithm_type
).equal_to()
StepCheck("Проверка координаты утечки", "leakCoordinate", soft_failures).actual(
leak_coordinate_round
).is_close_to(
leak.coordinate_meters,
cfg.allowed_distance_diff_meters,
f"значение допустимой погрешности координаты {cfg.allowed_distance_diff_meters}",
)
async def leak_is_complete_in_output_signals(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig):
"""OutputSignalsInfo - нет утечки в выходных сигналах"""
linear_part_id = leak.linear_part_id
with allure.step(f"Получение списка выходных сигналов для линейного участка с id: {linear_part_id}"):
payload = await t_utils.connect_and_get_msg(
ws_client,
"GetOutputSignalsRequest",
{
'tuId': cfg.tu_id,
'filtering': None,
'search': None,
'sorting': None,
'additionalProperties': None,
},
)
parsed_payload = parser.parse_output_signals_msg(payload)
# Получение данных линейного участка утечки по id
with allure.step("Извлечение и подготовка данных для проверки"):
linear_part_signals = getattr(parsed_payload.replyContent, 'linearPartSignals', [])
leak_linear_part = t_utils.find_object_by_field(
linear_part_signals,
TestConst.LEAK_LINEAR_PART_ID_KEY,
linear_part_id,
)
StepCheck("Проверка наличия данных выходных сигналов", "linearPartSignals").actual(
leak_linear_part
).is_not_none()
leak_signals_list = leak_linear_part.signals
leak_signal_type = t_utils.find_signal_type_by_address_suffix(
leak_signals_list, TestConst.ADDRESS_SUFFIX_LEAK
)
with allure.step(f"Получение данных выходных сигналов для линейного участка с id: {linear_part_id}"):
with allure.step("Получение сообщения с данными выходных сигналов типа: OutputSignalsInfo"):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"OutputSignalsInfo",
"SubscribeOutputSignalsRequest",
{
'objects': {
'linearParts': [{'linearPartId': linear_part_id}],
'controlledSites': [],
},
'signalTypes': 1023,
'tuId': cfg.tu_id,
'additionalProperties': None,
},
)
parsed_payload = parser.parse_output_signals_info_msg(payload)
with allure.step("Извлечение и подготовка данных для проверки"):
linear_part_signals = getattr(parsed_payload.replyContent, 'linearPartSignals', [])
leak_linear_part = t_utils.find_object_by_field(
linear_part_signals,
TestConst.LEAK_LINEAR_PART_ID_KEY,
linear_part_id,
)
StepCheck("Проверка наличия данных выходных сигналов", "linearPartSignals").actual(
leak_linear_part
).is_not_none()
leak_signals_list = getattr(leak_linear_part, 'signals', [])
leak_value = t_utils.find_signal_val_by_signal_type(leak_signals_list, leak_signal_type)
with SoftAssertions() as soft_failures:
StepCheck(
"Проверка отсутствия времени утечки в выходных сигналах",
TestConst.ADDRESS_SUFFIX_TIME_LEAK,
soft_failures,
).actual(leak_value).expected(TestConst.OUTPUT_IS_NOT_LEAK).equal_to()
StepCheck(
"Проверка отсутствия утечки в выходных сигналах",
TestConst.ADDRESS_SUFFIX_LEAK,
soft_failures,
).actual(leak_value).expected(TestConst.OUTPUT_IS_NOT_LEAK).equal_to()
StepCheck(
"Проверка отсутствия квитирования утечки в выходных сигналах",
TestConst.ADDRESS_SUFFIX_ACK_LEAK,
soft_failures,
).actual(leak_value).expected(TestConst.OUTPUT_IS_NOT_LEAK).equal_to()
StepCheck(
"Проверка отсутствия объема утечки в выходных сигналах",
TestConst.ADDRESS_SUFFIX_Q_LEAK,
soft_failures,
).actual(leak_value).expected(TestConst.OUTPUT_IS_NOT_LEAK).equal_to()
StepCheck(
"Проверка отсутствия координаты утечки в выходных сигналах",
TestConst.ADDRESS_SUFFIX_POINT_LEAK,
soft_failures,
).actual(leak_value).expected(TestConst.OUTPUT_IS_NOT_LEAK).equal_to()
async def complete_tu_leaks_info_content(ws_client, cfg: SmokeSuiteConfig):
"""
TuLeaksInfoContent - проверка отсутствия утечки на схеме
"""
with allure.step("Подключение по ws, получение и обработка сообщения об утечке типа: TuLeaksInfoContent"):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"TuLeaksInfoContent",
"subscribeTuLeaksInfoRequest",
{'tuId': cfg.tu_id},
)
parsed_payload = parser.parse_tu_leaks_info_msg(payload)
with allure.step("Извлечение и подготовка данных для проверки"):
leak_on_scheme = getattr(parsed_payload.replyContent, 'leaksInfo', [])
StepCheck("Проверка отсутствия утечки на схеме", "leaksInfo").actual(leak_on_scheme).is_empty()
async def export_leaks_report(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig, imitator_start_time: datetime):
"""
Сценарий формирования отчёта об утечках.
Этапы:
1. Подписка SubscribeReportsDataExportedRequest на пуш-нотификации.
2. Отправка ExportReportsCommandRequest с фильтром по времени
(start = старт имитатора, end = старт имитатора + offset теста).
3. Ожидание пуш-нотификации ReportDataExportedNotification о готовности отчёта.
4. Лонг-поллинг GetExportedDataListRequest до появления нашего отчёта в списке.
5. Отправка DownloadExportedDataRequest по id отчёта.
6. Получение fileChunk по ответу на скачивание.
7-10. Проверки: формат файла, имя, шапка xlsx, строка утечки.
Скачанный файл удаляется по завершению, прикладывается к Allure только при падении теста.
"""
# Создаем универсальный словарь для сбора фактических результатов ответов с бэка
actual_report_state = ExportLeaksReportState()
with allure.step("Подготовка параметров сценария формирования отчёта об утечках"):
actual_report_state.report_test = leak.export_leaks_report_test
StepCheck("В конфигурации задан export_leaks_report_test", "export_leaks_report_test").actual(
actual_report_state.report_test
).is_not_none()
actual_report_state.period_start = t_utils.localize_as_moscow(imitator_start_time)
actual_report_state.period_end = t_utils.localize_as_moscow(
imitator_start_time + timedelta(minutes=actual_report_state.report_test.offset)
)
actual_report_state.period_start_naive = report_utils.normalize_report_period_naive(
actual_report_state.period_start
)
actual_report_state.period_end_naive = report_utils.normalize_report_period_naive(
actual_report_state.period_end
)
actual_report_state.expected_mt_mode = ReportConst.STATIONARY_STATUS_TO_REPORT_TEXT.get(
leak.expected_report_stationary_status
)
actual_report_state.expected_lds_status_text = LdsStatus.report_text_by_value(
leak.expected_lds_status_in_leaks_report
)
actual_report_state.tu_description_lower = cfg.technological_unit.description.lower()
time_offset_hours = t_utils.report_time_offset_hours()
StepCheck(
f"Смещение timeOffset для запросов отчёта (часовой пояс {TestConst.ZONE_INFO})",
"time_offset_hours",
).actual(time_offset_hours).is_not_none()
actual_report_state.time_offset_hours = time_offset_hours
StepCheck(
"Задан ожидаемый текст режима МТ для отчёта",
"expected_mt_mode",
).actual(actual_report_state.expected_mt_mode).is_not_none()
allure.attach(
f"period.start={actual_report_state.period_start}\n"
f"period.end={actual_report_state.period_end}\n"
f"offset_minutes={actual_report_state.report_test.offset}",
name="Фильтр периода отчёта",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step(f"Этап 1. Подписка на пуш-нотификации ({ReportConst.SUBSCRIBE_REPORTS_DATA_EXPORTED_REQUEST})"):
await t_utils.connect(ws_client, ReportConst.SUBSCRIBE_REPORTS_DATA_EXPORTED_REQUEST, [])
with allure.step(f"Этап 2. Запрос формирования отчёта ({ReportConst.EXPORT_REPORTS_COMMAND_REQUEST})"):
request_payload = {
"tuId": cfg.tu_id,
"exportedDataTypes": [ExportedDataType.LEAKS_REPORT.value],
"timeOffset": actual_report_state.time_offset_hours,
"period": {
"start": t_utils.datetime_to_msgpack_timestamp(actual_report_state.period_start),
"end": t_utils.datetime_to_msgpack_timestamp(actual_report_state.period_end),
"additionalProperties": {},
},
}
await t_utils.connect(ws_client, ReportConst.EXPORT_REPORTS_COMMAND_REQUEST, request_payload)
with allure.step(
f"Этап 3. Ожидание пуш-нотификации {ReportConst.REPORT_DATA_EXPORTED_NOTIFICATION} о готовности отчёта"
):
actual_report_state.notification = await t_utils.poll_for_report_export_notification(
ws_client=ws_client,
parser=parser,
total_wait_seconds=ReportConst.NOTIFICATION_TIMEOUT_SECONDS,
poll_interval_seconds=ReportConst.LIST_POLL_INTERVAL_SECONDS,
)
with allure.step("Извлечение полей пуш-нотификации"):
notification = actual_report_state.notification
notification_reply_status = notification.replyStatus if notification else None
notification_reply_content = notification.replyContent if notification else None
notification_export_status = notification_reply_content.exportStatus if notification_reply_content else None
notification_error_message = (
(notification_reply_content.errorMessage or "") if notification_reply_content else ""
)
with allure.step(f"Этап 4. Лонг-поллинг {ReportConst.GET_EXPORTED_DATA_LIST_REQUEST} до появления отчёта в списке"):
actual_report_state.report_item = await t_utils.poll_for_exported_file(
ws_client=ws_client,
parser=parser,
list_limit=ReportConst.EXPORTED_DATA_LIST_LIMIT,
expected_data_type=ExportedDataType.LEAKS_REPORT,
name_substring=ReportConst.LEAKS_REPORT_NAME_PART,
tu_name_substring=cfg.technological_unit.description,
period_start=actual_report_state.period_start,
period_end=actual_report_state.period_end,
total_wait_seconds=ReportConst.LIST_POLL_TOTAL_WAIT_SECONDS,
poll_interval_seconds=ReportConst.LIST_POLL_INTERVAL_SECONDS,
)
with allure.step("Подготовка данных найденного отчёта в списке"):
report_item = actual_report_state.report_item
if report_item is not None:
allure.attach(
f"id={report_item.id}, name={report_item.name}, "
f"exportedDataType={report_item.exportedDataType}, "
f"start={t_utils.format_datetime_moscow(report_item.start)}, "
f"end={t_utils.format_datetime_moscow(report_item.end)}",
name="Найденный отчёт в списке",
attachment_type=allure.attachment_type.TEXT,
)
actual_report_state.report_file_name = report_utils.build_export_report_file_name(
cfg.technological_unit.description,
actual_report_state.period_start,
actual_report_state.period_end,
)
with allure.step("Проверка: отчёт найден в списке сформированных файлов"):
StepCheck("Отчёт найден в списке сформированных файлов", "report_item").actual(
actual_report_state.report_item
).is_not_none()
with allure.step(
f"Этап 5. Streaming-вызов {ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST} по "
f"id={actual_report_state.report_item.id}"
):
download_request = {
"exportedDataId": actual_report_state.report_item.id,
"exportedDataType": ExportedDataType.LEAKS_REPORT.to_download_name(),
"additionalProperties": None,
"timeOffset": actual_report_state.time_offset_hours,
}
download_purpose = (
f"скачивание xlsx-отчёта об утечках (exportedDataId={actual_report_state.report_item.id}) "
f"после формирования отчёта и выбора файла в списке GetExportedDataListRequest - "
f"выпадашка уведомлений на UI"
)
await t_utils.connect_stream(
ws_client,
ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST,
download_request,
purpose=download_purpose,
)
actual_report_state.download_invocation_id = ws_client.invocation_id
with allure.step("Этап 6. Получение fileChunk - скачивание отчёта по утечкам"):
actual_report_state.download_reply = await t_utils.receive_download_exported_data_reply(
ws_client=ws_client,
parser=parser,
invocation_id=actual_report_state.download_invocation_id,
request_name=ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST,
total_wait_seconds=ReportConst.DOWNLOAD_TIMEOUT_SECONDS,
purpose=download_purpose,
)
with allure.step("Извлечение данных ответа на скачивание"):
download_reply = actual_report_state.download_reply
download_reply_status = download_reply.replyStatus
has_download_reply_content = download_reply.replyContent is not None
actual_report_state.file_bytes = download_reply.replyContent.fileChunk if has_download_reply_content else None
is_xlsx_signature = (
report_utils.is_xlsx_file_bytes(actual_report_state.file_bytes) if actual_report_state.file_bytes else False
)
with allure.step("Проверка ответа на скачивание и формата xlsx"):
StepCheck("Проверка статуса ответа на скачивание", "replyStatus").actual(download_reply_status).expected(
ReplyStatus.OK.value
).equal_to()
StepCheck("Проверка наличия контента ответа на скачивание", "replyContent").actual(
has_download_reply_content
).expected(True).equal_to()
StepCheck("Проверка наличия байт файла", "fileChunk").actual(actual_report_state.file_bytes).is_not_empty()
StepCheck("Проверка xlsx (zip) сигнатуры файла", "file_signature").actual(is_xlsx_signature).expected(
True
).equal_to()
with allure.step("Подготовка данных для проверки имени файла отчёта"):
report_file_name = actual_report_state.report_file_name
report_file_name_lower = report_file_name.lower()
file_name_period_start, file_name_period_end = report_utils.parse_period_from_export_file_name(report_file_name)
period_start_lo, period_start_hi, period_end_lo, period_end_hi = report_utils.report_period_comparison_bounds(
actual_report_state.period_start_naive,
actual_report_state.period_end_naive,
)
has_xlsx_extension = report_utils.is_xlsx_extension(report_file_name)
leaks_report_name_part_lower = ReportConst.LEAKS_REPORT_NAME_PART.lower()
with allure.step("Этап 8. Сохранение, обработка и проверка отчета по утечкам"):
actual_report_state.temp_file_path = report_utils.save_report_bytes_to_temp_file(actual_report_state.file_bytes)
try:
with allure.step("Проверка: временный xlsx файл создан"):
StepCheck("Временный xlsx файл создан", "temp_file_path").actual(
actual_report_state.temp_file_path
).is_not_none()
with allure.step("Этап 9. Открытие xlsx и чтение шапки"):
actual_report_state.worksheet = report_utils.load_report_worksheet(actual_report_state.temp_file_path)
actual_report_state.title_info = report_utils.parse_report_title(
report_utils.get_report_title_cell(actual_report_state.worksheet)
)
allure.attach(
f"Шапка отчёта (raw): {actual_report_state.title_info.raw_title}\n"
f"period_start: {actual_report_state.title_info.period_start}\n"
f"period_end: {actual_report_state.title_info.period_end}",
name="Шапка отчёта (1-я строка)",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step("Подготовка данных шапки отчёта для проверки"):
title_info = actual_report_state.title_info
report_title_lower = title_info.raw_title.lower()
leaks_report_name_part_lower = ReportConst.LEAKS_REPORT_NAME_PART.lower()
column_headers = report_utils.get_report_column_headers(actual_report_state.worksheet)
period_start_lo, period_start_hi, period_end_lo, period_end_hi = (
report_utils.report_period_comparison_bounds(
actual_report_state.period_start_naive,
actual_report_state.period_end_naive,
)
)
header_period_start = title_info.period_start
header_period_end = title_info.period_end
with allure.step("Этап 10. Извлечение строк данных из отчёта"):
actual_report_state.data_rows = report_utils.iter_report_data_rows(actual_report_state.worksheet)
actual_report_state.target_row = report_utils.find_row_with_object(
actual_report_state.data_rows, cfg.technological_unit.description
)
allure.attach(
"\n".join(f"row#{row.row_index}: {row.cells}" for row in actual_report_state.data_rows),
name="Все строки данных отчёта",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step("Подготовка данных строки утечки для проверки"):
target_row = actual_report_state.target_row
leak_datetime_value = target_row.datetime_value if target_row else None
object_value_lower = target_row.object_value.lower() if target_row else ""
lds_status_value = target_row.lds_status.strip() if target_row else ""
masking_info_lower = target_row.masking_info.lower() if target_row else ""
leak_coordinate_meters = target_row.coordinate_meters if target_row else None
leak_volume_value = target_row.leak_volume if target_row else None
mt_mode_lower = target_row.mt_mode.lower() if target_row else ""
expected_mt_mode_lower = actual_report_state.expected_mt_mode.lower()
expected_lds_status_lower = actual_report_state.expected_lds_status_text.lower()
masking_not_masked_lower = ReportConst.MASKING_NOT_MASKED_TEXT.lower()
period_start_lo, period_start_hi, period_end_lo, period_end_hi = (
report_utils.report_period_comparison_bounds(
actual_report_state.period_start_naive,
actual_report_state.period_end_naive,
)
)
with allure.step("Проверка содержимого строки утечки"):
StepCheck("В отчёте есть хотя бы одна строка с данными", "data_rows").actual(
actual_report_state.data_rows
).is_not_empty()
StepCheck(
f"Строка с объектом, содержащим '{cfg.technological_unit.description}'",
ReportConst.COL_OBJECT,
).actual(actual_report_state.target_row).is_not_none()
with SoftAssertions() as soft_failures:
StepCheck(
"Время утечки в диапазоне [старт имитатора, старт + offset теста] (+-1 мин)",
ReportConst.COL_DATETIME,
soft_failures,
).actual(leak_datetime_value).is_between(period_start_lo, period_end_hi)
StepCheck(
f"Колонка '{ReportConst.COL_OBJECT}' содержит '{cfg.technological_unit.description}'",
ReportConst.COL_OBJECT,
soft_failures,
).contains(object_value_lower, actual_report_state.tu_description_lower)
StepCheck(
f"Колонка '{ReportConst.COL_LDS_STATUS}' содержит "
f"'{actual_report_state.expected_lds_status_text}'",
ReportConst.COL_LDS_STATUS,
soft_failures,
).contains(lds_status_value.lower(), expected_lds_status_lower)
StepCheck(
f"Колонка '{ReportConst.COL_MASK_INFO}' содержит '{ReportConst.MASKING_NOT_MASKED_TEXT}'",
ReportConst.COL_MASK_INFO,
soft_failures,
).contains(masking_info_lower, masking_not_masked_lower)
StepCheck(
f"Колонка '{ReportConst.COL_COORDINATE}' (с погрешностью {cfg.allowed_distance_diff_meters} м)",
ReportConst.COL_COORDINATE,
soft_failures,
).actual(leak_coordinate_meters).is_close_to(
leak.coordinate_meters,
cfg.allowed_distance_diff_meters,
f"значение допустимой погрешности координаты {cfg.allowed_distance_diff_meters}",
)
StepCheck(
f"Колонка '{ReportConst.COL_LEAK_VOLUME}' не пустая",
ReportConst.COL_LEAK_VOLUME,
soft_failures,
).actual(leak_volume_value).is_not_none()
StepCheck(
f"Колонка '{ReportConst.COL_MT_MODE}' содержит '{actual_report_state.expected_mt_mode}'",
ReportConst.COL_MT_MODE,
soft_failures,
).contains(mt_mode_lower, expected_mt_mode_lower)
except Exception:
with allure.step("Прикрепление xlsx отчёта к Allure при падении теста"):
if actual_report_state.temp_file_path and actual_report_state.report_file_name:
report_utils.attach_report_file_to_allure(
actual_report_state.temp_file_path, actual_report_state.report_file_name
)
raise
with allure.step("Проверка имени файла отчёта"):
with SoftAssertions() as soft_failures:
StepCheck(f"Имя файла оканчивается на {ReportConst.XLSX_EXTENSION}", "file_name", soft_failures).actual(
has_xlsx_extension
).expected(True).equal_to()
StepCheck(
f"Имя файла содержит '{ReportConst.LEAKS_REPORT_NAME_PART}'", "file_name", soft_failures
).contains(report_file_name_lower, leaks_report_name_part_lower)
StepCheck(
f"Имя файла содержит описание ТУ '{cfg.technological_unit.description}'", "file_name", soft_failures
).contains(report_file_name_lower, actual_report_state.tu_description_lower)
StepCheck(
"Дата начала периода в имени файла совпадает с фильтром запроса (+-1 мин)",
"period_start_in_file_name",
soft_failures,
).actual(file_name_period_start).is_between(period_start_lo, period_start_hi)
StepCheck(
"Дата конца периода в имени файла совпадает с фильтром запроса (+-1 мин)",
"period_end_in_file_name",
soft_failures,
).actual(file_name_period_end).is_between(period_end_lo, period_end_hi)
with allure.step("Проверка двойной шапки отчёта"):
StepCheck("Лист xlsx открыт", "worksheet").actual(actual_report_state.worksheet).is_not_none()
with SoftAssertions() as soft_failures:
StepCheck(
f"В шапке отчёта присутствует '{ReportConst.LEAKS_REPORT_NAME_PART}'",
"report_title",
soft_failures,
).contains(report_title_lower, leaks_report_name_part_lower)
StepCheck(
"Время начала периода в шапке совпадает с фильтром запроса (+-1 мин)",
"period_start",
soft_failures,
).actual(header_period_start).is_between(period_start_lo, period_start_hi)
StepCheck(
"Время конца периода в шапке совпадает с фильтром запроса (+-1 мин)",
"period_end",
soft_failures,
).actual(header_period_end).is_between(period_end_lo, period_end_hi)
StepCheck(
"Названия колонок в шапке отчёта",
"column_headers",
soft_failures,
).actual(
column_headers
).expected(ReportConst.EXPECTED_COLUMN_HEADERS).equal_to()
with allure.step("Проверка пуш-нотификации о готовности отчёта"):
with SoftAssertions() as soft_failures:
StepCheck("Получена пуш-нотификация о готовности отчёта", "notification", soft_failures).actual(
actual_report_state.notification
).is_not_none()
StepCheck("Проверка статуса пуш-нотификации", "replyStatus", soft_failures).actual(
notification_reply_status
).expected(ReplyStatus.OK.value).equal_to()
StepCheck("Проверка наличия контента нотификации", "replyContent", soft_failures).actual(
notification_reply_content
).is_not_none()
StepCheck("Проверка exportStatus в нотификации", "exportStatus", soft_failures).actual(
notification_export_status
).expected(ExportStatus.DONE).equal_to()
StepCheck("В нотификации нет текста ошибки", "errorMessage", soft_failures).actual(
notification_error_message
).is_empty()
async def export_lds_status_report(
ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig, imitator_start_time: datetime
):
"""
Сценарий формирования xlsx-отчёта о режиме работы СОУ.
"""
report_state = ExportLdsStatusReportState()
with allure.step("Подготовка параметров сценария формирования отчёта о режиме работы СОУ"):
report_state.report_test = leak.export_lds_status_report_test
StepCheck("В конфигурации задан export_lds_status_report_test", "export_lds_status_report_test").actual(
report_state.report_test
).is_not_none()
report_state.period_start = t_utils.localize_as_moscow(imitator_start_time)
report_state.period_end = t_utils.localize_as_moscow(
imitator_start_time + timedelta(minutes=report_state.report_test.offset)
)
report_state.period_start_naive = report_utils.normalize_report_period_naive(report_state.period_start)
report_state.period_end_naive = report_utils.normalize_report_period_naive(report_state.period_end)
report_state.tu_description_lower = cfg.technological_unit.description.lower()
time_offset_hours = t_utils.report_time_offset_hours()
StepCheck(
f"Смещение timeOffset для запросов отчёта (часовой пояс {TestConst.ZONE_INFO})",
"time_offset_hours",
).actual(time_offset_hours).is_not_none()
report_state.time_offset_hours = time_offset_hours
allure.attach(
f"period.start={report_state.period_start}\n"
f"period.end={report_state.period_end}\n"
f"offset_minutes={report_state.report_test.offset}\n"
f"sections={LdsReportConst.SECTION_NAMES}",
name="Фильтр периода отчёта о режиме СОУ",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step(f"Этап 1. Подписка на пуш-нотификации {ReportConst.SUBSCRIBE_REPORTS_DATA_EXPORTED_REQUEST}"):
await t_utils.connect(ws_client, ReportConst.SUBSCRIBE_REPORTS_DATA_EXPORTED_REQUEST, [])
with allure.step(f"Этап 2. Запрос формирования отчёта {ReportConst.EXPORT_REPORTS_COMMAND_REQUEST}"):
request_payload = {
"tuId": cfg.tu_id,
"exportedDataTypes": [ExportedDataType.LDS_STATUS_REPORT.value],
"timeOffset": report_state.time_offset_hours,
"period": {
"start": t_utils.datetime_to_msgpack_timestamp(report_state.period_start),
"end": t_utils.datetime_to_msgpack_timestamp(report_state.period_end),
"additionalProperties": {},
},
}
await t_utils.connect(ws_client, ReportConst.EXPORT_REPORTS_COMMAND_REQUEST, request_payload)
with allure.step(
f"Этап 3. Ожидание пуш-нотификации {ReportConst.REPORT_DATA_EXPORTED_NOTIFICATION} о готовности отчёта"
):
report_state.notification = await t_utils.poll_for_report_export_notification(
ws_client=ws_client,
parser=parser,
total_wait_seconds=ReportConst.NOTIFICATION_TIMEOUT_SECONDS,
poll_interval_seconds=ReportConst.LIST_POLL_INTERVAL_SECONDS,
)
with allure.step(f"Этап 4. Лонг-поллинг {ReportConst.GET_EXPORTED_DATA_LIST_REQUEST} до появления отчёта в списке"):
report_state.report_item = await t_utils.poll_for_exported_file(
ws_client=ws_client,
parser=parser,
list_limit=ReportConst.EXPORTED_DATA_LIST_LIMIT,
expected_data_type=ExportedDataType.LDS_STATUS_REPORT,
name_substring=LdsReportConst.LDS_STATUS_REPORT_NAME_PART,
tu_name_substring=cfg.technological_unit.description,
period_start=report_state.period_start,
period_end=report_state.period_end,
total_wait_seconds=ReportConst.LIST_POLL_TOTAL_WAIT_SECONDS,
poll_interval_seconds=ReportConst.LIST_POLL_INTERVAL_SECONDS,
)
with allure.step("Подготовка данных найденного отчёта в списке"):
report_item = report_state.report_item
if report_item is not None:
allure.attach(
f"id={report_item.id}, name={report_item.name}, "
f"exportedDataType={report_item.exportedDataType}, "
f"start={t_utils.format_datetime_moscow(report_item.start)}, "
f"end={t_utils.format_datetime_moscow(report_item.end)}",
name="Найденный отчёт в списке",
attachment_type=allure.attachment_type.TEXT,
)
report_state.report_file_name = report_utils.build_export_report_file_name(
cfg.technological_unit.description,
report_state.period_start,
report_state.period_end,
LdsReportConst.LDS_STATUS_REPORT_NAME_PART,
". ",
)
with allure.step("Проверка: отчёт найден в списке сформированных файлов"):
StepCheck("Отчёт найден в списке сформированных файлов", "report_item").actual(
report_state.report_item
).is_not_none()
with allure.step(
f"Этап 5. Streaming-вызов {ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST} по id={report_state.report_item.id}"
):
download_request = {
"exportedDataId": report_state.report_item.id,
"exportedDataType": ExportedDataType.LDS_STATUS_REPORT.to_download_name(),
"additionalProperties": None,
"timeOffset": report_state.time_offset_hours,
}
download_purpose = (
f"скачивание xlsx-отчёта о режиме СОУ (exportedDataId={report_state.report_item.id}) "
f"после формирования отчёта и выбора файла в списке GetExportedDataListRequest"
)
await t_utils.connect_stream(
ws_client,
ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST,
download_request,
purpose=download_purpose,
)
report_state.download_invocation_id = ws_client.invocation_id
with allure.step("Этап 6. Получение fileChunk - скачивание отчёта о режиме СОУ"):
report_state.download_reply = await t_utils.receive_download_exported_data_reply(
ws_client=ws_client,
parser=parser,
invocation_id=report_state.download_invocation_id,
request_name=ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST,
total_wait_seconds=ReportConst.DOWNLOAD_TIMEOUT_SECONDS,
purpose=download_purpose,
)
with allure.step("Извлечение данных ответа на скачивание"):
download_reply = report_state.download_reply
download_reply_status = download_reply.replyStatus
has_download_reply_content = download_reply.replyContent is not None
report_state.file_bytes = download_reply.replyContent.fileChunk if has_download_reply_content else None
is_xlsx_signature = (
report_utils.is_xlsx_file_bytes(report_state.file_bytes) if report_state.file_bytes else False
)
with allure.step("Проверка ответа на скачивание и формата xlsx"):
StepCheck("Проверка статуса ответа на скачивание", "replyStatus").actual(download_reply_status).expected(
ReplyStatus.OK.value
).equal_to()
StepCheck("Проверка наличия контента ответа на скачивание", "replyContent").actual(
has_download_reply_content
).expected(True).equal_to()
StepCheck("Проверка наличия байт файла", "fileChunk").actual(report_state.file_bytes).is_not_empty()
StepCheck("Проверка xlsx (zip) сигнатуры файла", "file_signature").actual(is_xlsx_signature).expected(
True
).equal_to()
with allure.step("Подготовка данных для проверки имени файла отчёта"):
report_file_name = report_state.report_file_name
report_file_name_lower = report_file_name.lower()
file_name_period_start, file_name_period_end = report_utils.parse_period_from_export_file_name(
report_file_name,
LdsReportConst.REPORT_FILE_NAME_PERIOD_PATTERN,
)
period_start_lo, period_start_hi, period_end_lo, period_end_hi = report_utils.report_period_comparison_bounds(
report_state.period_start_naive,
report_state.period_end_naive,
)
has_xlsx_extension = report_utils.is_xlsx_extension(report_file_name)
lds_report_name_part_lower = LdsReportConst.LDS_STATUS_REPORT_NAME_PART.lower()
try:
with allure.step("Этап 7. Сохранение и разбор xlsx-отчёта о режиме СОУ"):
report_state.temp_file_path = report_utils.save_report_bytes_to_temp_file(
report_state.file_bytes,
prefix="lds_status_report_",
)
StepCheck("Временный xlsx файл создан", "temp_file_path").actual(report_state.temp_file_path).is_not_none()
report_state.worksheet = report_utils.load_report_worksheet(report_state.temp_file_path)
report_state.parsed_report = lds_report_utils.parse_lds_status_report_worksheet(
report_state.worksheet,
LdsReportConst.SECTION_NAMES,
)
allure.attach(
f"Шапка (raw): {report_state.parsed_report.title_info.raw_title}\n"
f"period_start: {report_state.parsed_report.title_info.period_start}\n"
f"period_end: {report_state.parsed_report.title_info.period_end}\n"
f"total_duration: {report_state.parsed_report.total_duration_raw}",
name="Шапка отчёта о режиме СОУ",
attachment_type=allure.attachment_type.TEXT,
)
allure.attach(
lds_report_utils.format_section_rows_for_allure(report_state.parsed_report.section_rows),
name="Строки участков отчёта",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step("Подготовка данных таблицы отчёта для проверки"):
parsed_report = report_state.parsed_report
expected_section_names = LdsReportConst.SECTION_NAMES
section_rows = parsed_report.section_rows
total_duration_seconds = parsed_report.total_duration_seconds
duration_tolerance = LdsReportConst.TOTAL_DURATION_TOLERANCE_SECONDS
with allure.step("Проверка содержимого таблицы отчёта о режиме СОУ"):
StepCheck("Лист xlsx открыт", "worksheet").actual(report_state.worksheet).is_not_none()
with SoftAssertions() as soft_failures:
StepCheck(
"Количество строк участков в отчёте",
"section_rows_count",
soft_failures,
).actual(
len(section_rows)
).expected(len(expected_section_names)).equal_to()
for section_index, expected_section_name in enumerate(expected_section_names):
actual_section_name = (
section_rows[section_index].section_name if section_index < len(section_rows) else None
)
StepCheck(
f"Наименование участка #{section_index + 1}",
LdsReportConst.COL_SECTION,
soft_failures,
).actual(actual_section_name).expected(expected_section_name).equal_to()
for section_row in section_rows:
for column_name in LdsReportConst.MODE_DURATION_COLUMNS:
cell_value = section_row.cells.get(column_name)
StepCheck(
f"Длительность '{column_name}' для участка '{section_row.section_name}' заполнена",
column_name,
soft_failures,
).actual(lds_report_utils.is_duration_cell_filled(cell_value)).expected(True).equal_to()
StepCheck(
"В отчёте найдена строка 'Суммарное время работы:'",
"total_work_duration_label",
soft_failures,
).actual(parsed_report.total_label_row_index).is_not_none()
StepCheck(
"Суммарное время работы в отчёте не нулевое",
"total_work_duration",
soft_failures,
).actual(
total_duration_seconds
).is_greater_than(0, LdsReportConst.ZERO_DURATION_TEXT)
for section_row in section_rows:
duration_diff = abs(section_row.modes_sum_seconds - (total_duration_seconds or 0))
StepCheck(
f"Сумма режимов СОУ для '{section_row.section_name}' "
f"совпадает с суммарным временем (+-{duration_tolerance} с)",
"modes_sum_seconds",
soft_failures,
).actual(duration_diff).is_less_than(
duration_tolerance + 1,
f"погрешность {duration_tolerance} с",
)
with allure.step("Подготовка данных шапки отчёта для проверки"):
title_info = parsed_report.title_info
report_title_lower = title_info.raw_title.lower()
column_headers = parsed_report.column_headers
header_period_start = title_info.period_start
header_period_end = title_info.period_end
with allure.step("Проверка двойной шапки отчёта о режиме СОУ"):
with SoftAssertions() as soft_failures:
StepCheck(
f"В шапке отчёта присутствует '{LdsReportConst.LDS_STATUS_REPORT_NAME_PART}'",
"report_title",
soft_failures,
).contains(report_title_lower, lds_report_name_part_lower)
StepCheck(
"Время начала периода в шапке совпадает с фильтром запроса (+-1 мин)",
"period_start",
soft_failures,
).actual(header_period_start).is_between(period_start_lo, period_start_hi)
StepCheck(
"Время конца периода в шапке совпадает с фильтром запроса (+-1 мин)",
"period_end",
soft_failures,
).actual(header_period_end).is_between(period_end_lo, period_end_hi)
StepCheck(
"Названия колонок в шапке отчёта",
"column_headers",
soft_failures,
).actual(
column_headers
).expected(LdsReportConst.EXPECTED_COLUMN_HEADERS).equal_to()
with allure.step("Проверка имени файла отчёта о режиме СОУ"):
with SoftAssertions() as soft_failures:
StepCheck(f"Имя файла оканчивается на {ReportConst.XLSX_EXTENSION}", "file_name", soft_failures).actual(
has_xlsx_extension
).expected(True).equal_to()
StepCheck(
f"Имя файла содержит '{LdsReportConst.LDS_STATUS_REPORT_NAME_PART}'",
"file_name",
soft_failures,
).contains(report_file_name_lower, lds_report_name_part_lower)
StepCheck(
f"Имя файла содержит описание ТУ '{cfg.technological_unit.description}'",
"file_name",
soft_failures,
).contains(report_file_name_lower, report_state.tu_description_lower)
StepCheck(
"Дата начала периода в имени файла совпадает с фильтром запроса (+-1 мин)",
"period_start_in_file_name",
soft_failures,
).actual(file_name_period_start).is_between(period_start_lo, period_start_hi)
StepCheck(
"Дата конца периода в имени файла совпадает с фильтром запроса (+-1 мин)",
"period_end_in_file_name",
soft_failures,
).actual(file_name_period_end).is_between(period_end_lo, period_end_hi)
except Exception:
with allure.step("Прикрепление xlsx отчёта к Allure при падении теста"):
if report_state.temp_file_path and report_state.report_file_name:
report_utils.attach_report_file_to_allure(report_state.temp_file_path, report_state.report_file_name)
raise
with allure.step("Проверка пуш-нотификации о готовности отчёта"):
notification = report_state.notification
notification_reply_status = notification.replyStatus if notification else None
notification_reply_content = notification.replyContent if notification else None
notification_export_status = notification_reply_content.exportStatus if notification_reply_content else None
notification_error_message = (
(notification_reply_content.errorMessage or "") if notification_reply_content else ""
)
with SoftAssertions() as soft_failures:
StepCheck("Получена пуш-нотификация о готовности отчёта", "notification", soft_failures).actual(
notification
).is_not_none()
StepCheck("Проверка статуса пуш-нотификации", "replyStatus", soft_failures).actual(
notification_reply_status
).expected(ReplyStatus.OK.value).equal_to()
StepCheck("Проверка наличия контента нотификации", "replyContent", soft_failures).actual(
notification_reply_content
).is_not_none()
StepCheck("Проверка exportStatus в нотификации", "exportStatus", soft_failures).actual(
notification_export_status
).expected(ExportStatus.DONE).equal_to()
StepCheck("В нотификации нет текста ошибки", "errorMessage", soft_failures).actual(
notification_error_message
).is_empty()
async def mode_mt_in_journal(ws_client, cfg: SmokeSuiteConfig, imitator_start_time, test_data: CaseData):
"""
Проверка записей журнала о режиме мт
"""
exp_mode_part_message, exp_reason_part_message, exp_priority_message = test_data.expected_result
with allure.step(
"Подключение по ws, получение и обработка сообщения типа: MessagesInfoContent" "Фильтр MessageType. ЭФ Журнал"
):
request_body = t_utils.create_journal_req_body(
pagination=Pagination(limit=TestConst.LIMIT_CONTROLLED_SITES),
filtering=Filtering(messageTypes=int(MessageType.PUMPING_STATUS), objects=FilteringObjects(tuId=cfg.tu_id)),
)
payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
parsed_payload = parser.parse_journal_msg(payload)
messages_info = parsed_payload.replyContent.messagesInfo
# Фильтрация сообщений по времени
end_time = datetime.now()
filter_start_msk = t_utils.localize_as_moscow(imitator_start_time)
filter_end_msk = t_utils.localize_as_moscow(end_time)
messages_time_filtered = [
msg
for msg in messages_info
if filter_start_msk <= t_utils.ensure_moscow_timezone(msg.time) <= filter_end_msk
]
messages_time_filtered.sort(key=lambda msg: t_utils.ensure_moscow_timezone(msg.time), reverse=True)
# Фильтрация уникальных наименований участков КП-КП
control_points_list = []
for msg in messages_time_filtered:
control_points_list.append(msg.controlPoint)
unique_control_points_list = set(control_points_list)
count_unique_control_points = len(unique_control_points_list)
# Собираю список сообщений с уникальными наименованиями участков КП_КП
already_been = set() # Создание пустого множества для хранения отобранных сообщений
filter_messages_with_unique_control_points = [] # пустой список
for msg in messages_time_filtered:
if msg.controlPoint in unique_control_points_list: # условие1: наименование КП-КП в списке
if msg.controlPoint not in already_been: # условие2: сообщение еще не в already_been
filter_messages_with_unique_control_points.append(
msg
) # действие1: добавляется сообщение в filter_messages_with_unique_control_points
already_been.add(msg.controlPoint) # действие2: добавляется КП-КП во множество, для условия 2
allure.attach(
f" Список: {filter_messages_with_unique_control_points}",
name="Результат фильтрации сообщений журнала",
attachment_type=allure.attachment_type.TEXT,
)
# Создание и наполнение списков сообщений с фильтром по 'event'
containers = defaultdict(list)
for msg in filter_messages_with_unique_control_points:
event = msg.event
if event in (
TestConst.JOURNAL_MESSAGE_EVENT_STATIONARY,
TestConst.JOURNAL_MESSAGE_EVENT_NOT_STATIONARY,
TestConst.JOURNAL_MESSAGE_EVENT_STOP,
):
containers[event].append(msg)
else:
containers["another"].append(msg)
stationary_status_list = list(containers[TestConst.JOURNAL_MESSAGE_EVENT_STATIONARY])
unstationary_status_list = list(containers[TestConst.JOURNAL_MESSAGE_EVENT_NOT_STATIONARY])
stopped_status_list = list(containers[TestConst.JOURNAL_MESSAGE_EVENT_STOP])
another_event_list = list(containers["another"])
most_long_event_list = max(
[unstationary_status_list, stationary_status_list, stopped_status_list, another_event_list], key=len
)
first_message = next(iter(most_long_event_list)) if most_long_event_list else None
if first_message:
priority_message = MessagePriority(first_message.priority) if first_message.priority else None
mode_part, reason_part = t_utils.parse_event(getattr(first_message, "event", None))
else:
priority_message = None
mode_part, reason_part = None, None
with SoftAssertions() as soft_failures:
StepCheck(
"Проверка результата фильтрации сообщений о режиме МТ",
"Кол-во сообщений о режиме МТ с уникальными наименованиями участков КП-КП",
soft_failures,
).actual(count_unique_control_points).expected(TestConst.COUNT_CONTROLLED_SITES).equal_to()
StepCheck(
"Проверка режима МТ на ЛЧ в наибольшей области связности.",
"Режим МТ",
soft_failures,
).actual(
mode_part
).expected(exp_mode_part_message).equal_to()
if reason_part:
StepCheck(
"Проверка причины режима МТ на ЛЧ в наибольшей области связности.", "Причина режима МТ", soft_failures
).contains(reason_part, exp_reason_part_message)
StepCheck("Проверка значимости сообщения", "Важность", soft_failures).actual(priority_message).expected(
exp_priority_message
).equal_to()
async def export_mt_mode_report(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig, imitator_start_time: datetime):
"""
Сценарий формирования xlsx-отчёта о режиме работы МТ.
Этапы:
1. Подписка SubscribeReportsDataExportedRequest на пуш-нотификации.
2. Отправка ExportReportsCommandRequest (тип StationaryStatusReport, фильтр по периоду).
3. Ожидание ReportDataExportedNotification о готовности отчёта.
4. Лонг-поллинг GetExportedDataListRequest до появления отчёта в списке.
5. DownloadExportedDataRequest и получение fileChunk.
6. Проверка xlsx: участки, длительности режимов МТ, основной режим.
7. Проверка двойной шапки и имени файла.
Скачанный файл удаляется по завершению, прикладывается к Allure только при падении теста.
"""
report_state = ExportMtModeReportState()
with allure.step("Подготовка параметров сценария формирования отчёта о режиме работы МТ"):
report_state.expected_report_test = leak.export_mt_mode_report_test
StepCheck("В конфигурации задан export_mt_mode_report_test", "export_mt_mode_report_test").actual(
report_state.expected_report_test
).is_not_none()
report_state.expected_period_start = t_utils.localize_as_moscow(imitator_start_time)
report_state.expected_period_end = t_utils.localize_as_moscow(
imitator_start_time + timedelta(minutes=report_state.expected_report_test.offset)
)
report_state.expected_period_start_naive = report_utils.normalize_report_period_naive(
report_state.expected_period_start
)
report_state.expected_period_end_naive = report_utils.normalize_report_period_naive(
report_state.expected_period_end
)
report_state.expected_tu_description_lower = cfg.technological_unit.description.lower()
report_state.expected_section_names = list(MtReportConst.SECTION_NAMES)
report_state.expected_dominant_mode_column = MtReportConst.STATIONARY_STATUS_TO_COLUMN.get(
leak.expected_report_stationary_status
)
report_state.expected_file_name = report_utils.build_export_report_file_name(
cfg.technological_unit.description,
report_state.expected_period_start,
report_state.expected_period_end,
MtReportConst.MT_MODE_REPORT_NAME_PART,
". ",
)
time_offset_hours = t_utils.report_time_offset_hours()
StepCheck(
f"Смещение timeOffset для запросов отчёта (часовой пояс {TestConst.ZONE_INFO})",
"time_offset_hours",
).actual(time_offset_hours).is_not_none()
report_state.actual_time_offset_hours = time_offset_hours
StepCheck(
"Для набора задан ожидаемый доминирующий режим МТ",
"expected_dominant_mode_column",
).actual(report_state.expected_dominant_mode_column).is_not_none()
allure.attach(
f"period.start={report_state.expected_period_start}\n"
f"period.end={report_state.expected_period_end}\n"
f"offset_minutes={report_state.expected_report_test.offset}\n"
f"sections={report_state.expected_section_names}\n"
f"expected_dominant_mode_column={report_state.expected_dominant_mode_column}",
name="Фильтр периода отчёта о режиме МТ",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step(f"Этап 1. Подписка на пуш-нотификации {ReportConst.SUBSCRIBE_REPORTS_DATA_EXPORTED_REQUEST}"):
await t_utils.connect(ws_client, ReportConst.SUBSCRIBE_REPORTS_DATA_EXPORTED_REQUEST, [])
with allure.step(f"Этап 2. Запрос формирования отчёта {ReportConst.EXPORT_REPORTS_COMMAND_REQUEST}"):
request_payload = {
"tuId": cfg.tu_id,
"exportedDataTypes": [ExportedDataType.STATIONARY_STATUS_REPORT.value],
"timeOffset": report_state.actual_time_offset_hours,
"period": {
"start": t_utils.datetime_to_msgpack_timestamp(report_state.expected_period_start),
"end": t_utils.datetime_to_msgpack_timestamp(report_state.expected_period_end),
"additionalProperties": {},
},
}
await t_utils.connect(ws_client, ReportConst.EXPORT_REPORTS_COMMAND_REQUEST, request_payload)
with allure.step(
f"Этап 3. Ожидание пуш-нотификации {ReportConst.REPORT_DATA_EXPORTED_NOTIFICATION} о готовности отчёта"
):
report_state.actual_notification = await t_utils.poll_for_report_export_notification(
ws_client=ws_client,
parser=parser,
total_wait_seconds=ReportConst.NOTIFICATION_TIMEOUT_SECONDS,
poll_interval_seconds=ReportConst.LIST_POLL_INTERVAL_SECONDS,
)
with allure.step(f"Этап 4. Лонг-поллинг {ReportConst.GET_EXPORTED_DATA_LIST_REQUEST} до появления отчёта в списке"):
report_state.actual_report_item = await t_utils.poll_for_exported_file(
ws_client=ws_client,
parser=parser,
list_limit=ReportConst.EXPORTED_DATA_LIST_LIMIT,
expected_data_type=ExportedDataType.STATIONARY_STATUS_REPORT,
name_substring=MtReportConst.MT_MODE_REPORT_NAME_PART,
tu_name_substring=cfg.technological_unit.description,
period_start=report_state.expected_period_start,
period_end=report_state.expected_period_end,
total_wait_seconds=ReportConst.LIST_POLL_TOTAL_WAIT_SECONDS,
poll_interval_seconds=ReportConst.LIST_POLL_INTERVAL_SECONDS,
)
with allure.step("Подготовка данных найденного отчёта в списке"):
report_item = report_state.actual_report_item
if report_item is not None:
allure.attach(
f"id={report_item.id}, name={report_item.name}, "
f"exportedDataType={report_item.exportedDataType}, "
f"start={t_utils.format_datetime_moscow(report_item.start)}, "
f"end={t_utils.format_datetime_moscow(report_item.end)}",
name="Найденный отчёт в списке",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step("Проверка: отчёт найден в списке сформированных файлов"):
StepCheck("Отчёт найден в списке сформированных файлов", "report_item").actual(
report_state.actual_report_item
).is_not_none()
with allure.step(
f"Этап 5. Streaming-вызов {ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST} "
f"по id={report_state.actual_report_item.id}"
):
download_request = {
"exportedDataId": report_state.actual_report_item.id,
"exportedDataType": ExportedDataType.STATIONARY_STATUS_REPORT.to_download_name(),
"additionalProperties": None,
"timeOffset": report_state.actual_time_offset_hours,
}
download_purpose = (
f"скачивание xlsx-отчёта о режиме МТ (exportedDataId={report_state.actual_report_item.id}) "
f"после формирования отчёта и выбора файла в списке GetExportedDataListRequest"
)
await t_utils.connect_stream(
ws_client,
ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST,
download_request,
purpose=download_purpose,
)
report_state.actual_download_invocation_id = ws_client.invocation_id
with allure.step("Этап 6. Получение fileChunk - скачивание отчёта о режиме МТ"):
report_state.actual_download_reply = await t_utils.receive_download_exported_data_reply(
ws_client=ws_client,
parser=parser,
invocation_id=report_state.actual_download_invocation_id,
request_name=ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST,
total_wait_seconds=ReportConst.DOWNLOAD_TIMEOUT_SECONDS,
purpose=download_purpose,
)
with allure.step("Извлечение данных ответа на скачивание"):
download_reply = report_state.actual_download_reply
download_reply_status = download_reply.replyStatus
has_download_reply_content = download_reply.replyContent is not None
report_state.actual_file_bytes = download_reply.replyContent.fileChunk if has_download_reply_content else None
is_xlsx_signature = (
report_utils.is_xlsx_file_bytes(report_state.actual_file_bytes) if report_state.actual_file_bytes else False
)
with allure.step("Проверка ответа на скачивание и формата xlsx"):
StepCheck("Проверка статуса ответа на скачивание", "replyStatus").actual(download_reply_status).expected(
ReplyStatus.OK.value
).equal_to()
StepCheck("Проверка наличия контента ответа на скачивание", "replyContent").actual(
has_download_reply_content
).expected(True).equal_to()
StepCheck("Проверка наличия байт файла", "fileChunk").actual(report_state.actual_file_bytes).is_not_empty()
StepCheck("Проверка xlsx (zip) сигнатуры файла", "file_signature").actual(is_xlsx_signature).expected(
True
).equal_to()
with allure.step("Подготовка данных для проверки имени файла отчёта"):
report_file_name = report_state.expected_file_name
report_file_name_lower = report_file_name.lower()
file_name_period_start, file_name_period_end = report_utils.parse_period_from_export_file_name(
report_file_name,
MtReportConst.REPORT_FILE_NAME_PERIOD_PATTERN,
)
period_start_lo, period_start_hi, period_end_lo, period_end_hi = report_utils.report_period_comparison_bounds(
report_state.expected_period_start_naive,
report_state.expected_period_end_naive,
)
has_xlsx_extension = report_utils.is_xlsx_extension(report_file_name)
mt_report_name_part_lower = MtReportConst.MT_MODE_REPORT_NAME_PART.lower()
try:
with allure.step("Этап 7. Сохранение и разбор xlsx-отчёта о режиме МТ"):
report_state.actual_temp_file_path = report_utils.save_report_bytes_to_temp_file(
report_state.actual_file_bytes,
prefix="mt_mode_report_",
)
StepCheck("Временный xlsx файл создан", "temp_file_path").actual(
report_state.actual_temp_file_path
).is_not_none()
report_state.actual_worksheet = report_utils.load_report_worksheet(report_state.actual_temp_file_path)
report_state.actual_parsed_report = mt_report_utils.parse_mt_mode_report_worksheet(
report_state.actual_worksheet,
report_state.expected_section_names,
)
parsed_report = report_state.actual_parsed_report
allure.attach(
f"Шапка (raw): {parsed_report.title_info.raw_title}\n"
f"period_start: {parsed_report.title_info.period_start}\n"
f"period_end: {parsed_report.title_info.period_end}\n"
f"total_duration: {parsed_report.total_duration_raw}",
name="Шапка отчёта о режиме МТ",
attachment_type=allure.attachment_type.TEXT,
)
allure.attach(
lds_report_utils.format_section_rows_for_allure(parsed_report.section_rows),
name="Строки участков отчёта",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step("Подготовка данных таблицы отчёта для проверки"):
parsed_report = report_state.actual_parsed_report
section_rows = parsed_report.section_rows
total_duration_seconds = parsed_report.total_duration_seconds
duration_tolerance = MtReportConst.TOTAL_DURATION_TOLERANCE_SECONDS
mode_totals = mt_report_utils.sum_duration_columns_across_rows(
section_rows,
MtReportConst.MODE_DURATION_COLUMNS,
)
with allure.step("Проверка содержимого таблицы отчёта о режиме МТ"):
StepCheck("Лист xlsx открыт", "worksheet").actual(report_state.actual_worksheet).is_not_none()
with SoftAssertions() as soft_failures:
StepCheck(
"Количество строк участков в отчёте",
"section_rows_count",
soft_failures,
).actual(
len(section_rows)
).expected(len(report_state.expected_section_names)).equal_to()
for section_index, expected_section_name in enumerate(report_state.expected_section_names):
actual_section_name = (
section_rows[section_index].section_name if section_index < len(section_rows) else None
)
StepCheck(
f"Наименование участка #{section_index + 1}",
MtReportConst.COL_SECTION,
soft_failures,
).actual(actual_section_name).expected(expected_section_name).equal_to()
for section_row in section_rows:
for column_name in MtReportConst.MODE_DURATION_COLUMNS:
cell_value = section_row.cells.get(column_name)
duration_text = (
section_row.cells.get(column_name) or ""
).strip() or MtReportConst.ZERO_DURATION_TEXT
StepCheck(
f"Участок '{section_row.section_name}': колонка '{column_name}'",
"длительность",
soft_failures,
).actual(mt_report_utils.is_duration_cell_filled(cell_value)).is_true_with_details(
expected_text="указано время в формате H:MM:SS (допускается 0:00:00)",
actual_text=duration_text,
)
total_duration_text = parsed_report.total_duration_raw or lds_report_utils.format_duration_seconds(
total_duration_seconds or 0
)
total_label_row = parsed_report.total_label_row_index
StepCheck(
"В отчёте присутствует строка 'Суммарное время работы:'",
"структура отчёта",
soft_failures,
).actual(total_label_row is not None).is_true_with_details(
expected_text=f"найдена строка с текстом '{MtReportConst.TOTAL_WORK_DURATION_LABEL}'",
actual_text=(
f"строка {total_label_row} содержит '{MtReportConst.TOTAL_WORK_DURATION_LABEL}'"
if total_label_row is not None
else "строка не найдена"
),
)
StepCheck(
"Суммарное время работы в отчёте больше нуля",
"суммарное время",
soft_failures,
).actual((total_duration_seconds or 0) > 0).is_true_with_details(
expected_text="суммарное время больше 0:00:00",
actual_text=total_duration_text,
)
for section_row in section_rows:
duration_diff = abs(section_row.modes_sum_seconds - (total_duration_seconds or 0))
section_sum_text = lds_report_utils.format_duration_seconds(section_row.modes_sum_seconds)
diff_text = lds_report_utils.format_duration_seconds(duration_diff)
sums_match = duration_diff < duration_tolerance + 1
StepCheck(
f"Участок '{section_row.section_name}': сумма режимов МТ совпадает с общим временем "
f"(±{duration_tolerance} с)",
"согласованность длительностей",
soft_failures,
).actual(sums_match).is_true_with_details(
expected_text=(
f"сумма режимов ({section_sum_text}) равна суммарному времени ({total_duration_text}), "
f"погрешность не более {duration_tolerance} с"
),
actual_text=(
f"сумма режимов: {section_sum_text}, суммарное время: {total_duration_text}, "
f"разница: {diff_text} ({duration_diff} с)"
),
)
dominant_column = report_state.expected_dominant_mode_column
dominant_total = mode_totals.get(dominant_column, 0)
max_column = max(mode_totals, key=mode_totals.get)
max_total = mode_totals[max_column]
all_modes_text = ", ".join(
f"'{column_name}': {lds_report_utils.format_duration_seconds(total_seconds)}"
for column_name, total_seconds in mode_totals.items()
)
StepCheck(
f"Режим '{dominant_column}' имеет максимальное суммарное время по всем участкам",
"доминирующий режим МТ",
soft_failures,
).actual(
mt_report_utils.is_expected_dominant_mode_column(mode_totals, dominant_column)
).is_true_with_details(
expected_text=(
f"'{dominant_column}': {lds_report_utils.format_duration_seconds(dominant_total)} - "
f"наибольшее время; по всем участкам: {all_modes_text}"
),
actual_text=(
f"наибольшее время у режима '{max_column}': "
f"{lds_report_utils.format_duration_seconds(max_total)}"
),
)
allure.attach(
"\n".join(
f"{column_name}: {lds_report_utils.format_duration_seconds(total_seconds)}"
for column_name, total_seconds in mode_totals.items()
),
name="Суммарные длительности режимов МТ по всем участкам",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step("Подготовка данных шапки отчёта для проверки"):
title_info = parsed_report.title_info
report_title_lower = title_info.raw_title.lower()
column_headers = parsed_report.column_headers
header_period_start = title_info.period_start
header_period_end = title_info.period_end
with allure.step("Проверка двойной шапки отчёта о режиме МТ"):
with SoftAssertions() as soft_failures:
StepCheck(
f"В шапке отчёта присутствует '{MtReportConst.MT_MODE_REPORT_NAME_PART}'",
"report_title",
soft_failures,
).contains(report_title_lower, mt_report_name_part_lower)
StepCheck(
"Время начала периода в шапке совпадает с фильтром запроса (+-1 мин)",
"period_start",
soft_failures,
).actual(header_period_start).is_between(period_start_lo, period_start_hi)
StepCheck(
"Время конца периода в шапке совпадает с фильтром запроса (+-1 мин)",
"period_end",
soft_failures,
).actual(header_period_end).is_between(period_end_lo, period_end_hi)
StepCheck(
"Названия колонок в шапке отчёта",
"column_headers",
soft_failures,
).actual(
column_headers
).expected(MtReportConst.EXPECTED_COLUMN_HEADERS).equal_to()
with allure.step("Проверка имени файла отчёта о режиме МТ"):
with SoftAssertions() as soft_failures:
StepCheck(f"Имя файла оканчивается на {ReportConst.XLSX_EXTENSION}", "file_name", soft_failures).actual(
has_xlsx_extension
).expected(True).equal_to()
StepCheck(
f"Имя файла содержит '{MtReportConst.MT_MODE_REPORT_NAME_PART}'",
"file_name",
soft_failures,
).contains(report_file_name_lower, mt_report_name_part_lower)
StepCheck(
f"Имя файла содержит описание ТУ '{cfg.technological_unit.description}'",
"file_name",
soft_failures,
).contains(report_file_name_lower, report_state.expected_tu_description_lower)
StepCheck(
"Дата начала периода в имени файла совпадает с фильтром запроса (+-1 мин)",
"period_start_in_file_name",
soft_failures,
).actual(file_name_period_start).is_between(period_start_lo, period_start_hi)
StepCheck(
"Дата конца периода в имени файла совпадает с фильтром запроса (+-1 мин)",
"period_end_in_file_name",
soft_failures,
).actual(file_name_period_end).is_between(period_end_lo, period_end_hi)
except Exception:
with allure.step("Прикрепление xlsx отчёта к Allure при падении теста"):
if report_state.actual_temp_file_path and report_state.expected_file_name:
report_utils.attach_report_file_to_allure(
report_state.actual_temp_file_path,
report_state.expected_file_name,
)
raise
with allure.step("Проверка пуш-нотификации о готовности отчёта"):
notification = report_state.actual_notification
notification_reply_status = notification.replyStatus if notification else None
notification_reply_content = notification.replyContent if notification else None
notification_export_status = notification_reply_content.exportStatus if notification_reply_content else None
notification_error_message = (
(notification_reply_content.errorMessage or "") if notification_reply_content else ""
)
with SoftAssertions() as soft_failures:
StepCheck("Получена пуш-нотификация о готовности отчёта", "notification", soft_failures).actual(
notification
).is_not_none()
StepCheck("Проверка статуса пуш-нотификации", "replyStatus", soft_failures).actual(
notification_reply_status
).expected(ReplyStatus.OK.value).equal_to()
StepCheck("Проверка наличия контента нотификации", "replyContent", soft_failures).actual(
notification_reply_content
).is_not_none()
StepCheck("Проверка exportStatus в нотификации", "exportStatus", soft_failures).actual(
notification_export_status
).expected(ExportStatus.DONE).equal_to()
StepCheck("В нотификации нет текста ошибки", "errorMessage", soft_failures).actual(
notification_error_message
).is_empty()