conftest.py
from constants.architecture_constants import WebSocketClientConstants as WSCliConst
from constants.enums import RejectionSensorTag
"imitator_start_time": None, # datetime объект времени старта имитатора для расчёта интервалов утечек
}
def _update_rejection_sensor_ids(item, stand_manager: StandSetupManager) -> None:
"""
Для тестов отбраковки обновляет sensor_id по address из конфигурации стенда.
"""
if "rejection_case" not in getattr(item, "fixturenames", []):
return
sensor_ids_by_address = stand_manager.get_sensor_ids_by_address()
RejectionSensorTag.update_ids_from_config(sensor_ids_by_address)
try:
stand_manager.setup_stand_for_imitator_run()
except Exception as error:
pytest.exit(f"[SETUP] [ERROR] ошибка при подготовке стенда: {error}")
try:
_update_rejection_sensor_ids(item, stand_manager)
except Exception as error:
pytest.exit(f"[SETUP] [ERROR] ошибка обновления id датчиков отбраковки из конфигурации: {error}")
constants/enums.py
NPS_KRIM_P_Vmom = (30157, "AK.CHTN.NPS_KRIM_P.UZR_1.Vmom")
def __init__(self, sensor_id: int, description: str) -> None:
self.id = sensor_id
self.description = description
@classmethod
def update_ids_from_config(cls, sensor_ids_by_address: Mapping[str, int]) -> None:
"""
Обновляет sensor_id по tag из конфигурации стенда.
"""
missing_tags = []
for sensor in cls:
sensor_id = sensor_ids_by_address.get(sensor.description)
if sensor_id is None:
missing_tags.append(sensor.description)
continue
sensor.id = sensor_id
if missing_tags:
raise ValueError(f"Не найдены sensor_id для tags: {', '.join(missing_tags)}")
infra/clickhouse_manager.py