Загрузка данных


conftest.py


from constants.architecture_constants import WebSocketClientConstants as WSCliConst
from constants.enums import RejectionSensorTag


        "imitator_start_time": None,  # datetime объект времени старта имитатора для расчёта интервалов утечек
    }


def _update_rejection_sensor_ids(item, stand_manager: StandSetupManager) -> None:
    """
    Для тестов отбраковки обновляет sensor_id по address из конфигурации стенда.
    """
    if "rejection_case" not in getattr(item, "fixturenames", []):
        return

    sensor_ids_by_address = stand_manager.get_sensor_ids_by_address()
    RejectionSensorTag.update_ids_from_config(sensor_ids_by_address)





        try:
            stand_manager.setup_stand_for_imitator_run()
        except Exception as error:
            pytest.exit(f"[SETUP] [ERROR] ошибка при подготовке стенда: {error}")

        try:
            _update_rejection_sensor_ids(item, stand_manager)
        except Exception as error:
            pytest.exit(f"[SETUP] [ERROR] ошибка обновления id датчиков отбраковки из конфигурации: {error}")








constants/enums.py
    NPS_KRIM_P_Vmom = (30157, "AK.CHTN.NPS_KRIM_P.UZR_1.Vmom")

    def __init__(self, sensor_id: int, description: str) -> None:
        self.id = sensor_id
        self.description = description

    @classmethod
    def update_ids_from_config(cls, sensor_ids_by_address: Mapping[str, int]) -> None:
        """
        Обновляет sensor_id по tag из конфигурации стенда.
        """
        missing_tags = []
        for sensor in cls:
            sensor_id = sensor_ids_by_address.get(sensor.description)
            if sensor_id is None:
                missing_tags.append(sensor.description)
                continue
            sensor.id = sensor_id

        if missing_tags:
            raise ValueError(f"Не найдены sensor_id для tags: {', '.join(missing_tags)}")





infra/clickhouse_manager.py