Загрузка данных
fh[
import os
class ImitatorConstants:
TEST_SETTINGS_KEY_NAME: str = "test_settings"
IMITATOR_FLAGS_KEY_NAME: str = "imitator_flags"
IMITATOR_TIME_FORMAT: str = "%Y%m%dT%H%M%S"
IMITATOR_START_DELAY_S: int = 100
IMITATOR_FINISH_DELAY_MINUTE: float = 2.0
IMITATOR_CHECK_CMD: str = "pgrep -f Playground"
IMITATOR_KILL_CMD: str = "pkill -f Playground"
IMITATOR_PATH = "/data/imitator/lds-flow-playground-csv-latest"
IMITATOR_RUN_CMD: str = f"dotnet {IMITATOR_PATH}/TN.LDS.Flow.Playground.Application.dll"
IMITATOR_LOG_FILE_NAME: str = "imitator.log"
IMITATOR_KEY_NAME: str = "imitator_key"
SERVER_IP_KEY_NAME: str = "server_ip"
SANDBOX_PATH: str = "Sandbox_path" # Удалить при переработке json_config_model.py
SANDBOX_DATA: str = "data"
SANDBOX_RULES: str = "rules.txt"
SANDBOX_TAGS: str = "tags.txt"
STAND_ENV_NAMING: str = os.environ.get("STAND_NAME")[:-1]
CONFIG_PATH: str = f"/data/{STAND_ENV_NAMING}/configs"
SIGNAL_UNIT_CONVERSION_RULES_FILE_NAME: str = "signal_unit_conversion_rules.json"
SIGNAL_UNIT_CONVERSION_RULES_BACKUP_DIR: str = "original_conversion_rules"
SOURCE_TYPE_DEF_VALUE: str = "inflow"
SPEED_DEF_VALUE: int = 1
NS_DEF_VALUE: int = 2
KAFKA_OFFSET_EARLIEST: str = "earliest"
KAFKA_POLL_TIMEOUT_S: float = 1.0
KAFKA_SESSION_TIMEOUT_MS: int = 10000
TEST_ID_KEY: str = "test_id"
AUTOTEST_DATA_PATH: str = "/data/imitator/autotest_data"
POPEN_WAIT_TIMOUT_S: int = 5
LONG_PROCESS_TIMEOUT_S: int = 20
CMD_STATUS_OK: str = "OK"
CMD_STATUS_FAIL: str = "FAIL"
REDIS_STAND_ADDRESS: str = "10.7.49.210"
CORE_START_DELAY_S: int = 5
ENCODING_UTF_8: str = "utf-8"
ENCODING_UTF_8_SIG: str = "utf-8-sig"
ENCODING_LATIN_1: str = "latin-1"
WIN_ENCODING_CP866: str = "cp866" # Нужна только для запуска под WIN
WIN_ENCODING_CP1251: str = "cp1251" # Нужна только для запуска под WIN
OS_NAME_WIN: str = 'nt'
DEFAULT_ENCODINGS = [ENCODING_UTF_8_SIG, ENCODING_UTF_8, WIN_ENCODING_CP866, WIN_ENCODING_CP1251, ENCODING_LATIN_1]
HOST_MAP = {
"dev1": {IMITATOR_KEY_NAME: "DEV1_", SERVER_IP_KEY_NAME: "10.7.49.37"},
"dev2": {IMITATOR_KEY_NAME: "DEV2_", SERVER_IP_KEY_NAME: "10.7.49.38"},
"dev3": {IMITATOR_KEY_NAME: "DEV3_", SERVER_IP_KEY_NAME: "10.7.49.205"},
"test1": {IMITATOR_KEY_NAME: "TEST1_", SERVER_IP_KEY_NAME: "10.7.49.206"},
"test2": {IMITATOR_KEY_NAME: "TEST2_", SERVER_IP_KEY_NAME: "10.7.49.207"},
"test3": {IMITATOR_KEY_NAME: "TEST3_", SERVER_IP_KEY_NAME: "10.7.49.208"},
"test4": {IMITATOR_KEY_NAME: "TEST4_", SERVER_IP_KEY_NAME: "10.7.49.209"},
}
class ClickhouseConstants(ImitatorConstants):
CH_TABLE_NAMES: list = ["lds.records", "lds.records_lastvalue"]
EVO_OBJECT_ID_KEY_NAME: str = "evoObjectId"
EVO_PARAMETER_ID_KEY_NAME: str = "evoParameterId"
OBJECT_ID_KEY_NAME: str = "objectId"
PARAMETER_ID_KEY_NAME: str = "parameterId"
EVO_ID_PAIRS_CHUNK_SIZE: int = 450
NAME_CONTAINER: str = "clickhouse-2"
class DockerConstants:
HOSTNAME_CMD: str = "hostname"
STOP_CMD: str = "docker stop"
START_CMD: str = "docker start"
CHECK_STATUS_CMD: str = "docker inspect -f '{{.State.Status}}'"
RUNNING_STATUS: str = "running"
EXITED_STATUS: str = "exited"
CORE_CONTAINERS_GROUP: list = ["lds-core-node1", "lds-core-node2", "lds-core-node3"]
LB_CONTAINERS_GROUP: list = ["lds-layer-builder-node1", "lds-layer-builder-node2", "lds-layer-builder-node3"]
JOURNAL_CONTAINERS_GROUP: list = ["lds-journals-node1", "lds-journals-node2", "lds-journals-node3"]
WEB_APP_CONTAINERS_GROUP: list = ["lds-web-app-node1", "lds-web-app-node2", "lds-web-app-node3"]
API_GW_CONTAINERS_GROUP: list = ["lds-api-gw-node1", "lds-api-gw-node2", "lds-api-gw-node3"]
REPORTS_CONTAINERS_GROUP: list = ["lds-reports-node1", "lds-reports-node2", "lds-reports-node3"]
class RedisConstants:
LB_REDIS_KEY: str = "lds-layer-builder"
CORE_REDIS_KEY: str = "lds-core"
REDIS_KEY_FIND_CMD: str = "docker exec -i redis-redis-01-1-1 redis-cli KEYS"
REDIS_KEY_DEL_CMD: str = "| xargs -r docker exec -i redis-redis-01-1-1 redis-cli DEL"
class KeycloakClientConstants:
TOKEN_LEEWAY: int = 30
GRANT_TYPE: str = "password"
KEYCLOAK_HEADERS: dict = {"Content-Type": "application/x-www-form-urlencoded"}
TOKEN_KEY: str = "access_token"
ISSUED_AT_KEY: str = "issued_at"
EXPIRES_IN_KEY: str = "expires_in"
class TestOpsConstants:
TESTOPS_UPLOAD_ENDPOINT: str = "/upload"
TESTOPS_UPLOAD_ERROR_MSG: str = "Ошибка при загрузке файлов allure отчета"
TESTOPS_UPLOAD_RESPONSE_MSG_KEY: str = "message"
TESTOPS_UPLOAD_FILES_KEY: str = "files"
POST_METHOD: str = "post"
ALLURE_RESULTS_DIR_NAME: str = "allure-results"
GZIP_FILE_SIGNATURE: bytes = b'\x1f\x8b'
class HTTPClientConstants:
GET_METHOD: str = "get"
POST_METHOD: str = "post"
TESTOPS_UPLOAD_ENDPOINT: str = "/upload"
TESTOPS_ATTACHMENTS_LIST_ENDPOINT: str = "/test_cases/{test_case_id}/attachments"
TESTOPS_LOAD_ATTACHMENT_ENDPOINT: str = "/test_cases/{test_case_id}/attachments/{attachment_id}?download=1"
TESTOPS_ATTACHMENTS_KEY: str = "items"
TESTOPS_ATTACHMENT_FILENAME_KEY: str = "original_filename"
TESTOPS_ATTACHMENT_ID_KEY: str = "id"
TEST_ID_KEY: str = "test_id"
IMITATOR_RUN_DATA_FILENAME: str = "imitator_run_data.tar.gz" # Название архива данных для прогона
class WebSocketClientConstants:
RS: bytes = b'\x1E' # ASCII Record Separator
HANDSHAKE_WAITING: float | int = 5.0
HANDSHAKE_MESSAGE: str = "{\"protocol\":\"messagepack\",\"version\":1}"
WS_HUBS: str = "/hubs/ldsClientHub"
START_INVOCATION_ID: str = 1
DEFAULT_RECONNECT_INTERVAL: float | int = 5.0
PING_INTERVAL: int = 3
PING_TIMEOUT: int = 5
CLOSE_TIMEOUT: int = 30
DEFAULT_SIGNALR_MESSAGE_TYPE: int = 1 # invocation
STREAM_INVOCATION_MESSAGE_TYPE: int = 4 # StreamInvocation
STREAM_ITEM_MESSAGE_TYPE: int = 2 # StreamItem
COMPLETION_MESSAGE_TYPE: int = 3 # Completion
# Текст ошибки Completion при неуспешном streaming (SignalR CompletionWithDetail)
COMPLETION_ERROR_MESSAGE_INDEX: int = 4
DEFAULT_SIGNALR_MAP_HEADERS: dict = {}
EVENT_TYPE_INDEX = 3
INVOCATION_ID_INDEX = 2
SERVICE_NAME = "web-app"
COMPONENT = "lds"
ROOT_DOMAIN = "tn.tngrp.ru"
FILTERING_TIMEOUT: int | float = 10.0
ZONE_INFO: str = 'Europe/Moscow'
class MockConstants:
MOCK_DURATION: int = 60
MOCK_TEST_DATA_ID: int = 1
MOCK_TEST_DATA_NAME: str = "mock.tar.gz"
class EnvKeyConstants:
CONNECTION_HOST: str = "CONNECTION_HOST"
KEYCLOAK_URL: str = "KEYCLOAK_URL"
KEYCLOAK_CLIENT_ID: str = "KEYCLOAK_CLIENT_ID"
KEYCLOAK_CLIENT_SECRET: str = "KEYCLOAK_CLIENT_SECRET"
KEYCLOAK_USERNAME: str = "KEYCLOAK_USERNAME"
KEYCLOAK_PASSWORD: str = "KEYCLOAK_PASSWORD"
TESTOPS_BASE_URL: str = "TESTOPS_BASE_URL"
SSH_KEY_NAME: str = "SSH_KEY_NAME"
SSH_USER_DEV: str = "SSH_USER_DEV"
STAND_NAME: str = "STAND_NAME"
DATA_PATH: str = "DATA_PATH"
OPC_URL: str = "OPC_URL"
TU_ID: str = "TU_ID"
енам
from enum import Enum, IntEnum, IntFlag
from typing import Mapping
class BaseStrEnum(Enum):
def __str__(self) -> str:
return f"{self.name} ({self.value})"
class BaseStrIntFlag(IntFlag):
def __str__(self) -> str:
raw_value = int(self)
if raw_value == 0:
return "0"
active_flags = [f"{flag.name} ({flag.value})" for flag in type(self) if flag.value and flag & self == flag]
if active_flags:
return f"{', '.join(active_flags)}"
return str(raw_value)
class BaseReasonEnum(IntFlag):
"""
.report_text - название на русском языке.
Вывод в формате report_text(value)
"""
def __new__(cls, value: int, report_text: str):
member = int.__new__(cls, value)
member._value_ = value
member.report_text = report_text
return member
def __str__(self) -> str:
raw_value = int(self)
if raw_value == 0:
return "0"
active_flags = [
f"{flag.report_text} ({flag.value})" for flag in type(self) if flag.value and flag & self == flag
]
if active_flags:
return f"{', '.join(active_flags)}"
return str(raw_value)
@classmethod
def report_text_by_value(cls, status_value: int) -> str | None:
"""Текст по числовому значению статуса"""
try:
return cls(status_value).report_text
except ValueError:
return None
class TU(Enum):
YAROSLAVL_MOSCOW = (1, "Ярославль - Москва", "volga.json")
TIKHORETSK_NOVOROSSIYSK_2 = (2, "Тихорецк-Новороссийск-2", "tn2.json")
TIKHORETSK_NOVOROSSIYSK_3 = (3, "Тихорецк-Новороссийск-3", "tn3.json")
RODIONOVSKAYA_TIKHORETSKAYA = (4, "Родионовская–Тихорецкая", "lt3_rt.json")
TIKHORETSKAYA_GRUSHEVAYA = (5, "Тихорецкая-6-Грушовая", "tn4_t6g.json")
def __init__(self, tu_id: int, description: str, file_name: str) -> None:
self.id = tu_id
self.description = description
self.file_name = file_name
def __str__(self):
return f"{self.id} - {self.description}"
@classmethod
def get_file_name_by_id(cls, target_id: int) -> str:
for item in cls:
if item.id == target_id:
return item.file_name
raise ValueError(f"ТУ с id = {target_id} не найден")
class ReplyStatus(Enum):
OK = 200
BAD_REQUEST = 400
UNAUTHORIZED = 401
FORBIDDEN = 403
NOT_FOUND = 404
REQUEST_TIMEOUT = 408
CONFLICT = 409
PRECONDITION_FAILED = 412
RANGE_NOT_SATISFIABLE = 416
TOO_MANY_REQUESTS = 429
INTERNAL_SERVER_ERROR = 500
NOT_IMPLEMENTED = 501
SERVICE_UNAVAILABLE = 503
GATEWAY_TIMEOUT = 504
UNKNOWN_ERROR = 520
class ExportedDataType(IntEnum):
"""
Тип экспортируемых данных
"""
STATIONARY_STATUS_REPORT = 6
LDS_STATUS_REPORT = 5
LEAKS_REPORT = 4
REJECTED_REPORT = 7
def to_download_name(self) -> str:
"""Строковый тип для DownloadExportedDataRequest.exportedDataType"""
return _EXPORTED_DATA_TYPE_DOWNLOAD_NAMES[self]
_EXPORTED_DATA_TYPE_DOWNLOAD_NAMES = {
ExportedDataType.STATIONARY_STATUS_REPORT: "MnStateReport",
ExportedDataType.LDS_STATUS_REPORT: "LdsStateReport",
ExportedDataType.LEAKS_REPORT: "LeaksReport",
ExportedDataType.REJECTED_REPORT: "RejectedSignalsReport",
}
class ExportStatus(IntEnum):
"""Статус формирования отчёта в ReportDataExportedNotification.replyContent.exportStatus."""
NOT_READY = 0
DONE = 1
class StationaryStatus(BaseStrEnum):
UNSTATIONARY = (1, 'Нестационарный режим работы МТ') # Нестационарный режим
STATIONARY = (2, 'Стационарный режим работы МТ') # Стационарный режим
STOPPED = (3, 'МТ в режиме остановленной перекачки') # Режим остановкленной перекачки
def __new__(cls, value: int, report_text: str) -> "StationaryStatus":
member = object.__new__(cls)
member._value_ = value
member.report_text = report_text
return member
@classmethod
def report_text_by_value(cls, status_value: int) -> str | None:
"""Текст режима СОУ для отчёта по числовому значению статуса"""
try:
return cls(status_value).report_text
except ValueError:
return None
class LeakStatus(BaseStrEnum):
CONFIRMED = 2
WAITING = 1
POSSIBLE = 3
class LeakLocationStatus(BaseStrEnum):
NODATA = 1 # нет данных
LEFT_FROM_PUMP_STATION = 2 # Слева от МНС
RIGHT_FROM_PUMP_STATION = 3 # Справа от МНС
INSIDE_PUMP_STATION = 4 # Внутри МНС
INSIDE_NPS = 5 # Внутри НПС при неработающей/отсутствующей МНС
class FieldName(Enum):
SECTION_TYPE = "sectionType"
SIGNAL_TYPE = "signalType"
class FilterCriteriaType(Enum):
ONE_OF = "oneOf"
ALL_OF = "allOf"
class FilterCriteriaValue(Enum):
MASK = "mask"
MASK_REASON = "maskReason"
LEAK = "leak"
LEAK_COORDINATE = "leakCoordinate"
PUMPING_STATUS = "pumpingStatus"
FREE_FLOW = "freeFlow"
ACKNOWLEDGE = "acknowledge"
LEAK_TIME = "leakTime"
LEAK_VOLUME = "leakVolume"
LDS_STATUS = "ldsStatus"
CONTROLLED_SITES = "controlledSites"
LINEAR_PARTS = "linearParts"
SERVER_DOWN = "serverDown"
TIME_SYNCHRONIZATION_DISABLE = "timeSynchronizationDisable"
FREE_FLOW_START_COORDINATE = "freeFlowStartCoordinate"
class SortingParam(Enum):
OBJECT_NAME = "objectName"
ADDRESS = "address"
class SortingType(Enum):
ASCENDING = "ascending"
DESCENDING = "descending"
class Direction(Enum):
"""Направление прокрутки"""
PREV = 1
NEXT = 2
FIRST = 3
LAST = 4
class LdsStatus(BaseStrEnum):
"""
Режим работы СОУ.
report_text - значение колонки 'Режим работы СОУ' в xlsx-отчёте об утечках.
"""
FAULTY = (1, "СОУ неисправна")
INITIALIZATION = (2, "СОУ в инициализации")
DEGRADATION = (3, "СОУ в ухудшенных характеристиках")
SERVICEABLE = (4, "СОУ исправна")
def __new__(cls, value: int, report_text: str) -> "LdsStatus":
member = object.__new__(cls)
member._value_ = value
member.report_text = report_text
return member
@classmethod
def report_text_by_value(cls, status_value: int) -> str | None:
"""Текст режима СОУ для отчёта по числовому значению статуса"""
try:
return cls(status_value).report_text
except ValueError:
return None
class ConfirmationStatus(BaseStrEnum):
FAULTY = 0 # Неисправность
AWAITING = 1 # Предварительная
NOT_CONFIRMED = 2 # Не подтверждена
CONFIRMED = 3 # Подтверждена
CONFIRMED_AND_LEAK_CLOSED = 4 # Завершена
class ReservedType(BaseStrEnum):
"""Алгоритмы СОУ"""
FAULTY = 0 # Неисправность
STOP = 1 # Дифференциальный
STATIONARY_FLOW = 2 # Стационарный
UNSTATIONARY_FLOW = 3 # Модельный
BALANCE_IN_NPS = 4 # Баланс внутри НПС
CHANGED_IN_DECISION_MAKING = 5 # Стационарный + Изменено в АПР
CREATED_IN_DECISION_MAKING = 6 # Создано в АПР
class MessageType(BaseStrIntFlag):
AUTHENTICATION = 1 # Вход в систему
REJECTION = 1 << 2 # Отбраковка сигналов
LDS_STATUS = 1 << 3 # Режим работы СОУ
INPUT_SIGNALS = 1 << 6 # Входные сигналы
PUMPING_STATUS = 1 << 7 # Режим работы МТ
MASKING_LDS = 1 << 8 # Маскирование СОУ
FREE_FLOWS = 1 << 9 # Самотечное течение
LEAKS = 1 << 10 # Утечка
class MessagePriority(BaseStrIntFlag):
LOW = 1 # Прочее
COMMON = 1 << 1 # Информационное
MEDIUM = 1 << 2 # Значительное
HIGH = 1 << 3 # Важное
VERY_HIGH = 1 << 4 # Особой важности
class LdsStatusDegradation(BaseReasonEnum):
"""
Причины режима работы СОУ: Ухудшение характеристик
"""
LEAK_ON_ADJACENT_DIAGNOSTIC_AREAS = (1 << 0, 'Утечка на соседнем диагностическом участке')
ADDITIVE_INJECTORS_OPERATION = (1 << 1, 'Наличие ПТП')
PIG_SENSOR_PASSAGE = (1 << 2, 'Наличие СОД')
TRIGGERING_EMERGENCY_RESET = (1 << 3, 'Срабатывание аварийного сброса или предохранительных клапанов')
STARTING_PUMPING_OUT_PUMPS = (1 << 4, 'Работа насосов откачки')
EXCEEDING_DISTANCE_BETWEEN_SERVICEABLE_PRESSURE_SENSORS = (
1 << 5,
'Расстояние между ближайшими исправными СИ давления на пути перекачки более 50 км',
)
FAULTY_PRESSURE_SENSORS_AT_PUMP_STATION_NODES = (1 << 6, 'Отказ СИ давления на входе/выходе НПС')
REJECTION_TEMPERATURE_SENSOR = (1 << 7, 'Отказ СИ температуры')
REJECTION_VISCOSITY_SENSOR = (1 << 8, 'Отказ СИ вязкости')
REJECTION_DENSITY_SENSOR = (1 << 9, 'Отказ СИ плотности')
GRAVITY_SECTION_IN_PUMPING_MODE = (1 << 10, 'Наличие самотечного участка/участка с неполным сечением')
ABSENCE_MIN_PRESSURE_SENSORS_REQUIRED_NUMBER = (1 << 11, 'Менее 4 исправных СИ давления на разных КП ЛЧ и НПС')
EXCEEDING_DISTANCE_BETWEEN_FLOW_METERS = (
1 << 12,
'Расстояние между ближайшими исправными СИ расхода на пути перекачки более 200 км',
)
GRAVITY_SECTION_IN_STOPPED_PUMPING_MODE = (
1 << 13,
'Наличие самотечного участка/участка с неполным сечением в режиме остановленной перекачки',
)
class LdsStatusFaulty(BaseReasonEnum):
"""
Причины режима работы СОУ: Неисправность
"""
NO_DATA_SOURCE_CONNECTION = (1 << 0, 'Потеря связи СОУ с СДКУ')
ABSENCE_MIN_PRESSURE_SENSORS_REQUIRED_NUMBER = (1 << 1, 'Менее 4 КП с достоверными СИ давления')
ABSENCE_MIN_FLOW_METERS_REQUIRED_NUMBER = (1 << 2, 'Недостоверность граничного СИ расхода')
class LdsStatusInitialization(BaseReasonEnum):
"""
Причины режима работы СОУ: Инициализация
"""
ACCUMULATION_DATA = (1 << 0, 'Накопление данных')
EXITING_FAULTY_MODE = (1 << 1, 'Выход СОУ из режима «Неисправна»')
COLD_START_OF_SERVERS = (1 << 2, 'Одновременный «холодный» запуск нескольких серверов СОУ')
SWITCHING_SHUT_OFF_IN_STOPPED_PUMPING_MODE = (
1 << 3,
'Переключение запорной арматуры в режиме остановленной перекачки',
)
USER_ACTION = (1 << 4, 'По команде пользователя')
class StationaryReason(BaseStrEnum):
"""
Причины режима работы МТ: Стационар для ЭФ Журнал
"""
# Отклонения давления и расхода не превышают допустимых отклонений
PRESSURE_AND_FLOW_MOVING_AVERAGES_MEET_CRITERIA = "Отклонения давления и расхода не превышают допустимых отклонений"
class UnStationaryReason(BaseStrIntFlag):
"""
Причины режима работы МТ: Нестационар
"""
# Пуск/остановка трубопровода; включение/отключение магистрального насоса; включение/отключение НПС
CHANGING_EQUIPMENT_STATUS = 1 << 0
# Начало/окончание работы насосов откачки емкостей на НПС и ЛЧ технологического участка
CHANGING_WORKING_OF_PUMPING_OUT_PUMPS = 1 << 1
# Изменение частоты вращения в ручном режиме и/или изменение уставки регулирования в автоматическом режиме
# работы МНА с ЧРП
CHANGING_MAIN_PUMPS_ROTATION_SPEED = 1 << 2
CHANGING_BLOCK_VALVES_STATUS = 1 << 3 # Полное или частичное открытие/закрытие задвижки
SWITCHING_TANKS = 1 << 4 # Переключение резервуаров
CHANGING_ACCEPTANCE_OR_DELIVERY_STATE = 1 << 5 # Начало или прекращение приема/сдачи нефти/нефтепродуктов
TRIGGERING_EMERGENCY_RESET_OR_PWSS_OPERATION = 1 << 6 # Задействование аварийного сброса
SAFETY_VALVES_ACTUATION = 1 << 7 # Срабатывание предохранительных клапанов
# Изменение уставки регулирования по давлению узлов регулирования давления, работающих в автоматическом режиме
# управления
CHANGING_PRESSURE_SETTING = 1 << 8
# Изменение процента открытия/закрытия заслонки узлов регулирования давления, работающих в ручном режиме управления
CHANGING_OPENING_PERCENTAGE_VALVE = 1 << 9
CHANGING_ADDITIVE_INJECTOR_STATUS_OR_FLOW = 1 << 10 # Начало/окончание ввода ПТП или изменение расхода вводимой ПТП
LEAK_END = 1 << 11 # Окончание утечки
# Наличие сигнала статуса «Открывается»/»Закрывается» запорной арматуры (не в режиме имитации),
# расположенной в точке, гидравлически связанной с рассматриваемым ДУ
TO_OPEN_OR_TO_CLOSE_STATUS = 1 << 12
# Нестационарный режим работы/отсутствие сигнала о режиме работы смежного ТУ, работающего
# в единой гидравлической системе с защищаемым ТУ
ADJACENT_TU = 1 << 13
COLD_START = 1 << 14 # Одновременный «холодный» запуск нескольких серверов СОУ
class StoppedPumpingReason(BaseStrIntFlag):
"""
Причины режима работы МТ: Остановленный
"""
# На ДУ отсутствуют работающие НА, при этом показания СИ расхода не превышают 1 % от максимального значения
# диапазона измерений всех СИ расхода на технологическом участке
STOPPING_PUMPS = 1 << 0
CUTOFF_AREA = 1 << 1 # Участок отсечен запорной арматурой от подкачек/откачек
class RejectionCriteria(IntFlag):
"""Критерии отбраковки сигналов criteriaNames"""
QUALITY = 1 << 0 # qualityRejection
RANGE = 1 << 1 # rangeRejection
EMPTY = 1 << 2 # emptyRejection
TIME = 1 << 3 # timeRejection
CONSTANT_SIGNAL = 1 << 4 # constantSignalRejection
DISCHARGE = 1 << 5 # dischargeRejection
VTOR = 1 << 6 # VTORRejection
NEARBY = 1 << 7 # nearbyRejection
DIAGNOSTIC_INFO = 1 << 8 # diagnInfoRejection
@property
def backend_name(self) -> str:
names = {
"QUALITY": "qualityRejection",
"RANGE": "rangeRejection",
"EMPTY": "emptyRejection",
"TIME": "timeRejection",
"CONSTANT_SIGNAL": "constantSignalRejection",
"DISCHARGE": "dischargeRejection",
"SIGMA3": "sigma3Rejection",
"VTOR": "VTORRejection",
"NEARBY": "nearbyRejection",
"DIAGNOSTIC_INFO": "diagnInfoRejection",
}
return names.get(self.name or "", str(int(self)))
def __str__(self) -> str:
raw_value = int(self)
if raw_value == 0:
return "0"
active_flags = [flag.backend_name for flag in type(self) if flag.value and flag & self == flag]
if active_flags:
return f"{'|'.join(active_flags)} ({raw_value})"
return str(raw_value)
class RejectionSensorTag(Enum):
"""
Теги датчиков для тестов отбраковки (id, description=tag)
Айди тегов подставляется на ходу из текущей версии конфы с сервера
"""
KP_8_Pin = (0, "AK.CHTN.LU_TIHVEL.KP_8.SW_8-3.Pin") # nearby_pressure_pin range_upper_pressure range_lower_pres
NPS_TIH_5_Vmom = (0, "AK.CHTN.NPS_TIH_5.UZR_1.Vmom") # diagnostic_info_flowrange_upper_flow range_lower_flow
KP_8_Pout = (0, "AK.CHTN.LU_TIHVEL.KP_8.SW_8-3.Pout") # nearby_pressure_pout
KP_209_1_Pin = (0, "AK.CHTN.LU_VELKRIM.KP_209-1.SW_215-3-1.Pin") # empty_pressure
KP_7_Pin = (0, "AK.CHTN.LU_TIHVEL.KP_7.SW_6-3.Pin") # vtor_pressure
NPS_KRIM_P_Vmom = (0, "AK.CHTN.NPS_KRIM_P.UZR_1.Vmom") # empty_flow quality_flow
def __init__(self, sensor_id: int, description: str) -> None:
self.id = sensor_id
self.description = description
@classmethod
def update_ids_from_config(cls, sensor_ids_by_address: Mapping[str, int]) -> None:
"""
Обновляет sensor_id по tag из конфигурации стенда.
"""
missing_tags = []
for sensor in cls:
sensor_id = sensor_ids_by_address.get(sensor.description)
if sensor_id is None:
missing_tags.append(sensor.description)
continue
sensor.id = sensor_id
if missing_tags:
raise ValueError(f"Не найдены sensor_id для tags: {', '.join(missing_tags)}")
def __str__(self):
return f"{self.id} - {self.description}"
class UserActions(IntFlag):
USER_LOGIN = 1 # Вход пользователя
USER_EXIT = 1 << 1 # Выход пользователя
FAILED_USER_LOGIN = 1 << 2 # Неуспешная попытка входа пользователя
ALGORITHMS_REINITIALIZATION = 1 << 3 # Переинициализация алгоритмов
SIGNAL_MASK_SIM = 1 << 4 # Маскирование и имитация входных сигналов
LDS_MASKING = 1 << 5 # Маскирование СОУ
EXPORT = 1 << 6 # Экспорт и выгрузки
SETTINGS_CHANGE = 1 << 7 # Изменение настроек
LEAK_ACK = 1 << 8 # Квитирование сообщения об утечке
LEAK_REMOVE = 1 << 9 # Исключение неактивных утечек
LDS_ADMIN = 1 << 10 # Администрирование СОУ
PIG_CONTROL = 1 << 11 # Управление СОД
class SiteKpKp(Enum):
"""controlledSiteId, segmentId"""
TIXORECZKAYA_NOVOVELICHKOVSKAYA = {'controlledSiteId': 6012, 'segmentId': 6013}
NOVOVELICHKOVSKAYA_KRYMSKAYA = {'controlledSiteId': 6074, 'segmentId': 6075}
KRYMSKAYA_GRUSHOVAYA = {'controlledSiteId': 6220, 'segmentId': 6221}
BACKUP_ROUTE_BEJSUG = {'controlledSiteId': 6242, 'segmentId': 6243}
BACKUP_ROUTE_PONURA = {'controlledSiteId': 6076, 'segmentId': 6077}
BACKUP_ROUTE_KUBAN = {'controlledSiteId': 6088, 'segmentId': 6089}
NPZ_AFIPSKIJ = {'controlledSiteId': 6244, 'segmentId': 6245}
NPZ_ILINSKIJ = {'controlledSiteId': 6120, 'segmentId': 6121}
class SignalType(IntFlag):
"""Типы сигналов: режим МТ - 8, режим СОУ - 256, самотеки -16"""
REGLU = 1 << 3
REGSOU = 1 << 8
GRAVITYPIPE = 1 << 4
@property
def backend_name(self) -> str:
names = {
"REGLU": "PumpingStatus",
"REGSOU": "LdsStatus",
"GRAVITYPIPE": "FreeFlow",
}
return names.get(self.name or "", str(int(self)))
def __str__(self) -> str:
raw_value = int(self)
if raw_value == 0:
return "0"
active_flags = [flag.backend_name for flag in type(self) if flag.value and flag & self == flag]
if active_flags:
return f"{'|'.join(active_flags)} ({raw_value})"
return str(raw_value)
class GravityPipe(Enum):
expected_lds_status_gravity_true = (1, "Наличие самотека")
expected_lds_status_gravity_false = (0, "Отсутствие самотека")
def __init__(self, status_id: int, description: str) -> None:
self.id = status_id
self.description = description
def __str__(self):
return f"{self.id} - {self.description}"
class MeasureConversionRule(Enum):
MPA_MEASURE = "MPA_MEASURE"
KG_CM_MEASURE = "KG_CM_MEASURE"
тест конст
"""
Общие константы для тестов.
"""
from constants.enums import StationaryStatus
class BaseTN3Constants:
# ===== Константы для запросов журнала =====
COLUMN_SELECTION_DEF = [
'Time',
'User',
'MainPipeline',
'TechnologicalSection',
'TechnologicalObject',
'ControlPoint',
'Object',
'SignalName',
'Event',
'Value',
'MessageType',
'Tag',
'Status',
]
# ===== Типы сигналов и объектов =====
PRESSURE_SENSOR_OBJECT_TYPE = 2
FLOWMETER_OBJECT_TYPE = 3
PRESSURE_SIGNAL_TYPE = 1
FLOW_SIGNAL_TYPE = 4
# ===== Суффиксы адресов выходных сигналов =====
ADDRESS_SUFFIX_ACK_LEAK = "AckLeak"
ADDRESS_SUFFIX_LEAK = "Leak"
ADDRESS_SUFFIX_MASK = "Mask"
ADDRESS_SUFFIX_POINT_LEAK = "PointLeak"
ADDRESS_SUFFIX_Q_LEAK = "QLeak"
ADDRESS_SUFFIX_TIME_LEAK = "TimeLeak"
ADDRESS_SUFFIX_PUMPING_STATUS = "RegLU"
ADDRESS_SUFFIX_LDS_STATUS = "RegSOU"
# ===== Ключи поиска =====
LEAK_LINEAR_PART_ID_KEY = "id"
CONTROLLED_SITE_ID_AND_SEGMENT_ID = "controlledSiteId"
# ===== Общее количество участков КП-КП =====
LIMIT_CONTROLLED_SITES = 500
COUNT_CONTROLLED_SITES = 114
# ===== Ожидаемые значения выходных сигналов =====
OUTPUT_IS_ACK_LEAK = "1"
OUTPUT_IS_LEAK = "1"
OUTPUT_IS_NOT_LEAK = "0"
OUTPUT_IS_NOT_MASK = "0"
OUTPUT_IS_MASK = "1"
MASS_KG = 3600 # Коэффициент массы, нужно умножить, чтобы получить объем в м3/час
KGS_SM2 = 98066 # Коэффициент давления, нужно умножить, чтобы получить объем в кгс/см2
ALLOWED_VOLUME_DIFF = 0.3 # Относительная погрешность по объему
ALLOWED_DISTANCE_DIFF_METERS = 5000 # Погрешность координаты в метрах
KM_TO_METERS = 1000 # Перевод в метры
LEAK_START_INTERVAL = 2100 # Интервал от старта имитатора до первого обнаружения утечки - 35 минут по умолчанию
LEAK_LOCATION_STATUS = 1
# ===== Параметры выходных сигналов =====
OUTPUT_TEST_DELAY = 120 # Задержка для теста выходных сигналов в секундах
OUTPUT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ" # Формат времени для парсинга выходных сигналов
# ===== Параметры маскирования =====
IS_MASKED_TRUE = True
IS_MASKED_FALSE = False
# ===== Параметры имитации =====
PRESSURE_IMITATION_RANGE = (1, 40)
VOLUME_IMITATION_RANGE = (100, 2400)
GOOD_QUALITY_VAL = 1
# ===== Константы журнала =====
JOURNAL_EVENT_MASK = "Установка признака маскирования"
JOURNAL_EVENT_UNMASK = "Снятие признака маскирования"
JOURNAL_SIGNAL_PRESSURE = "Значение давления"
JOURNAL_SIGNAL_FLOW = "Расход"
JOURNAL_MESSAGE_TYPE_USER_ACTIONS = "Действия пользователя"
JOURNAL_STATUS_SUCCESS = "Успешно"
JOURNAL_EXPECTED_MSG_COUNT_PER_SIGNAL = 2
JOURNAL_MASK_PAGINATION_LIMIT = 10
JOURNAL_EVENT_POSSIBLE_LEAK = "Возможна утечка"
JOURNAL_EVENT_DETECTED_LEAK = "Утечка."
JOURNAL_MESSAGE_TYPE_LEAKS = "Утечки"
JOURNAL_EVENT_COMPLETED_LEAKS = "Утечка завершена"
JOURNAL_EXPECTED_MASK_MSG_TOTAL = 4
JOURNAL_MASK_EXPECTED_EVENTS = {"Установка признака маскирования", "Снятие признака маскирования"}
JOURNAL_MASK_EXPECTED_SIGNALS = {"Значение давления", "Расход"}
JOURNAL_PAGINATION_LIMIT = 10
JOURNAL_PAGINATION_REJECT_LIMIT = 20
JOURNAL_PAGINATION_STATUS_LIMIT = 120
JOURNAL_STATUS_TOTAL_WAIT = 300 # Время установки режима в данных, в секундах
JOURNAL_EVENT_LEAK_ACKNOWLEDGED = "Сообщение об утечке квитировано"
JOURNAL_EVENT_LDS_INIT_ACCUM_DATA = "СОУ в инициализации (Накопление данных)"
JOURNAL_EVENT_LDS_INIT_COLD_START = "СОУ в инициализации (Одновременный «холодный» запуск нескольких серверов СОУ)"
JOURNAL_MESSAGE_TYPE_LDS_STATUS = "Режим работы СОУ"
JOURNAL_MESSAGE_TYPE_REJECTION = "Отбраковка"
JOURNAL_MESSAGE_EVENT_STATIONARY = (
"Стационарный режим работы МТ (Отклонения давления и расхода не превышают допустимых отклонений)"
)
JOURNAL_MESSAGE_EVENT_NOT_STATIONARY = (
"Нестационарный режим работы МТ (Одновременный «холодный» запуск нескольких серверов СОУ)"
)
JOURNAL_MESSAGE_EVENT_STOP = (
"МТ в режиме остановленной перекачки (На ДУ отсутствуют работающие НА, "
"при этом показания СИ расхода не превышают 1 % отмаксимального значения "
"диапазона измерений всех СИ расхода на технологическом участке)"
)
SEC_PER_MIN = 60
# ===== Параметры подтверждения =====
IS_ACKNOWLEDGED_FALSE = False
# ===== Параметры BalanceAlgorithmResults =====
BALANCE_ALGORITHM_POLL_INTERVAL = 15 # Интервал опроса подписки в секундах
BALANCE_ALGORITHM_TOTAL_WAIT = 300 # Общее время опроса в секундах
DEBALANCE_TOLERANCE = 0.25 # Допустимое отклонение дебаланса от порога 30%
# ===== Теги датчиков для маскирования и имитации =====
PRESSURE_SENSOR_ADDRESS = "AK.CHTN.LU_TIHVEL.KP_8.SW_8-3.Pout"
FLOWMETER_ADDRESS = "AK.CHTN.NPS_TIH_5.UZR_1.Vmom"
SENSOR_IDS_BY_ADDRESS = {}
# ===== Прочие константы =====
BASIC_MESSAGE_TIMEOUT = 10.0 # Таймаут ожидания сообщений в секундах
MASK_MESSAGE_TIMEOUT = 180.0 # Таймаут ожидания сообщений в секундах
PRECISION = 3 # Точность округления для координат
DIGITS_WITH_DOT_PATTERN = r'\d+(?:\.\d+)?' # Регулярное выражение для поиска чисел с точкой
DIAGNOSTIC_AREA_BASE_IDS = {
"Т-Н-3.НПС-5 «Тихорецкая».УЗР СИКН ТН-3 - Т-Н-3.НПС-5 «Тихорецкая».УЗР вых": (9992054907, (9992054908,)),
"Т-Н-3.НПС-5 «Тихорецкая».УЗР вых - Т-Н-3.УЗР НПС-3 «Нововеличковская».": (
9992054908,
(9992054907, 9992054909),
),
"Т-Н-3.УЗР НПС-3 «Нововеличковская». - Т-Н-3.НПС-2 «Крымская».УЗР СИКН Т-К": (
9992054909,
(9992054908, 9992054910, 9992054911),
),
"Т-Н-3.НПС-2 «Крымская».УЗР СИКН Т-К - Т-Н-3.НПС-2 «Крымская».УЗР вых": (9992054910, (9992054909, 9992054912)),
"Н-К.КП-0.УЗР 0км - Н-К.КП-30.УЗР 30км": (9992054911, (9992054909, 9992054913, 9992054914)),
"Т-Н-3.НПС-2 «Крымская».УЗР вых - Т-Н-3.НПС«Крымская».УЗР вых Камеры пуска": (
9992054912,
(9992054910, 9992054915),
),
"Н-К.КП-30.УЗР 30км - ПСП Афипский СИКН 1015 УЗР": (9992054913, (9992054911,)),
"Н-К.УП ИНПЗ.УЗР 4,3км - Н-К.ИНПЗ.УЗР СИКН 1019": (9992054914, (9992054911,)),
"Т-Н-3.НПС«Крымская».УЗР вых Камеры пуска - Т-Н-3.«Грушовая».УЗР-700": (9992054915, (9992054912,)),
}
REPRESENTATIVE_DIAGNOSTIC_AREA_IDS = [2, 3] # Список показательных ДУ для определения режима СОУ
ZONE_INFO: str = "Europe/Moscow"
SECONDS_PER_HOUR: int = 3600
CRITERIA_NAMES_FIELD: str = 'criteriaNames'
class ExportReportConstants:
"""Константы для теста формирования отчёта об утечках"""
# Максимальное ожидание уведомления о готовности отчёта
NOTIFICATION_TIMEOUT_SECONDS: float = 60.0
# Максимальное время ожидания появления отчёта в списке после уведомления
LIST_POLL_TOTAL_WAIT_SECONDS: float = 10.0
# Интервал между запросами getExportedFilesListRequest
LIST_POLL_INTERVAL_SECONDS: float = 10.0
# Таймаут получения ответа на скачивание
DOWNLOAD_TIMEOUT_SECONDS: float = 60.0
# ===== Имя файла отчёта =====
LEAKS_REPORT_NAME_PART: str = "Отчет об утечках" # подстрока в имени файла/отчёта
XLSX_EXTENSION: str = ".xlsx"
# Сигнатура zip-архива, используется для проверки формата файла по содержимому
ZIP_SIGNATURE: bytes = b'PK\x03\x04'
# ===== Формат даты/времени в отчёте =====
REPORT_DATETIME_FORMAT: str = "%d.%m.%Y %H:%M:%S"
# Регулярное выражение для извлечения двух дат из заголовка
REPORT_HEADER_PERIOD_PATTERN: str = (
r'Отчет об утечках с (?P<period_start>\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})'
r' по (?P<period_end>\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})'
)
# Регулярное выражение для извлечения двух дат из названия файла
REPORT_FILE_NAME_PERIOD_PATTERN: str = (
r'^Отчет об утечках (?P<tu>.+?) '
r'(?P<period_start>\d{2}\.\d{2}\.\d{4} \d{2}_\d{2}_\d{2})'
r' - '
r'(?P<period_end>\d{2}\.\d{2}\.\d{4} \d{2}_\d{2}_\d{2})'
r'\.xlsx$'
)
# Двойная шапка: первая строка - название отчёта с периодом, вторая - названия колонок
REPORT_TITLE_ROW: int = 1
REPORT_COLUMN_HEADERS_ROW: int = 2
REPORT_DATA_FIRST_ROW: int = 3
# ===== Названия колонок =====
COL_DATETIME: str = "Дата и время"
COL_OBJECT: str = "Объект"
COL_LDS_STATUS: str = "Режим работы СОУ"
COL_MASK_INFO: str = "Информация о маскировании"
COL_COORDINATE: str = "Координата"
COL_LEAK_VOLUME: str = "Объемный расход утечки"
COL_MT_MODE: str = "Режим работы МТ"
EXPECTED_COLUMN_HEADERS: list = [
COL_DATETIME,
COL_OBJECT,
COL_LDS_STATUS,
COL_MASK_INFO,
COL_COORDINATE,
COL_LEAK_VOLUME,
COL_MT_MODE,
]
MASKING_NOT_MASKED_TEXT: str = "СОУ не замаскирована"
# ===== Маппинг StationaryStatus <-> текст в колонке "Режим работы МТ" =====
STATIONARY_STATUS_TO_REPORT_TEXT: dict = {
StationaryStatus.UNSTATIONARY.value: "Нестационарный режим работы МТ",
StationaryStatus.STATIONARY.value: "Стационарный режим работы МТ",
StationaryStatus.STOPPED.value: "МТ в режиме остановленной перекачки",
}
# ===== Прочее =====
DEFAULT_SHEET_INDEX: int = 0
SUBSCRIBE_REPORTS_DATA_EXPORTED_REQUEST: str = "SubscribeReportsDataExportedRequest"
EXPORT_REPORTS_COMMAND_REQUEST: str = "ExportReportsCommandRequest"
REPORT_DATA_EXPORTED_NOTIFICATION: str = "ReportDataExportedNotification"
GET_EXPORTED_DATA_LIST_REQUEST: str = "GetExportedDataListRequest"
EXPORTED_DATA_LIST_LIMIT: int = 10
DOWNLOAD_EXPORTED_DATA_REQUEST: str = "DownloadExportedDataRequest"
# Допустимая погрешность при сравнении границ периода отчёта
REPORT_PERIOD_TOLERANCE_MINUTES: int = 1
# Формат даты/времени в имени скачиваемого xlsx-файла
REPORT_FILE_NAME_DATETIME_FORMAT: str = "%d.%m.%Y %H_%M_%S"
class ExportLdsStatusReportConstants:
"""Константы для теста формирования xlsx-отчёта о режиме работы СОУ"""
LDS_STATUS_REPORT_NAME_PART: str = "Отчет о режиме работы СОУ"
SECTION_NAMES: list[str] = [
"НПС-5 Тихорецкая - НПС-3 Нововеличковская",
"НПС-3 Нововеличковская - НПС-2 Крымская",
"НПС-2 Крымская - НПС Грушовая",
]
TOTAL_WORK_DURATION_LABEL: str = "Суммарное время работы:"
ZERO_DURATION_TEXT: str = "0:00:00"
TOTAL_DURATION_TOLERANCE_SECONDS: int = 5
# Число частей времени при split(':') - часы:минуты:секунды (1:02:51) и минуты:секунды (02:51)
DURATION_PARTS_COUNT_H_MM_SS: int = 3
DURATION_PARTS_COUNT_MM_SS: int = 2
REPORT_TITLE_ROW: int = 1
REPORT_COLUMN_HEADERS_ROW: int = 2
REPORT_DATA_FIRST_ROW: int = 3
COL_SECTION: str = "Наименование участка"
COL_FAULTY: str = "Неисправность"
COL_DEGRADATION: str = "В ухудшенных характеристиках"
COL_INITIALIZATION: str = "Инициализация"
COL_SERVICEABLE: str = "Исправность"
MODE_DURATION_COLUMNS: list = [
COL_FAULTY,
COL_DEGRADATION,
COL_INITIALIZATION,
COL_SERVICEABLE,
]
EXPECTED_COLUMN_HEADERS: list = [COL_SECTION, *MODE_DURATION_COLUMNS]
REPORT_HEADER_PERIOD_PATTERN: str = (
r'Отчет о режиме работы СОУ с (?P<period_start>\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})'
r' по (?P<period_end>\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})'
)
REPORT_FILE_NAME_PERIOD_PATTERN: str = (
r'^Отчет о режиме работы СОУ\. (?P<tu>.+?) '
r'(?P<period_start>\d{2}\.\d{2}\.\d{4} \d{2}_\d{2}_\d{2})'
r' - '
r'(?P<period_end>\d{2}\.\d{2}\.\d{4} \d{2}_\d{2}_\d{2})'
r'\.xlsx$'
)
class ExportRejectedReportConstants:
"""Константы для теста формирования xlsx-отчёта об отбракованных входных данных"""
REJECTED_REPORT_NAME_PART: str = "Отчет об отбракованных входных данных"
REJECTED_REPORT_NAME_PART_ALT: str = "Отчёт об отбракованных входных данных"
REPORT_TITLE_ROW: int = 1
REPORT_COLUMN_HEADERS_ROW: int = 2
REPORT_DATA_FIRST_ROW: int = 3
COL_DATETIME: str = "Дата и время"
COL_OBJECT: str = "Объект"
COL_EVENT: str = "Событие"
COL_VALUE: str = "Значение"
COL_DURATION: str = "Продолжительность отбраковки"
COL_TAG: str = "Тег сигнала"
EXPECTED_COLUMN_HEADERS: list = [
COL_DATETIME,
COL_OBJECT,
COL_EVENT,
COL_VALUE,
COL_DURATION,
COL_TAG,
]
REPORT_HEADER_PERIOD_PATTERN: str = (
r'[Оо]тч[её]т об отбракованных входных данных с '
r'(?P<period_start>\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})'
r' по (?P<period_end>\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})'
)
REPORT_FILE_NAME_PERIOD_PATTERN: str = (
r'^[Оо]тч[её]т об отбракованных входных данных (?P<tu>.+?) '
r'(?P<period_start>\d{2}\.\d{2}\.\d{4} \d{2}_\d{2}_\d{2})'
r' - '
r'(?P<period_end>\d{2}\.\d{2}\.\d{4} \d{2}_\d{2}_\d{2})'
r'\.xlsx$'
)
TIME_FILTER_TOLERANCE_SECONDS: int = 60
# Суффикс сигнала в колонке "Объект" отчёта (после последней точки в строке)
REPORT_SIGNAL_FLOW: str = "Расход"
REPORT_SIGNAL_PRESSURE: str = "Давление"
REPORT_SIGNAL_SUFFIX_BY_EXPECTED_NAME: dict = {
BaseTN3Constants.JOURNAL_SIGNAL_FLOW: REPORT_SIGNAL_FLOW,
BaseTN3Constants.JOURNAL_SIGNAL_PRESSURE: REPORT_SIGNAL_PRESSURE,
}
# Разбор колонки "Объект": участок трубопровода и суффикс сигнала разделяются последней точкой
OBJECT_SIGNAL_SEPARATOR: str = "."
OBJECT_SIGNAL_RSPLIT_MAXSPLIT: int = 1
REJECTED_REPORT_HEADER_TITLE_PART: str = "отчет об отбракованных входных данных с"
REJECTED_REPORT_HEADER_TITLE_PART_ALT: str = "отчёт об отбракованных входных данных с"
class MeasureUnitConstants:
MPA_MEASURE: str = "MPa"
KG_CM_MEASURE: str = "kgf/cm^2"
class ExportMtModeReportConstants:
"""Константы для теста формирования xlsx-отчёта о режиме работы МТ"""
MT_MODE_REPORT_NAME_PART: str = "Отчет о режиме работы МТ"
SECTION_NAMES: list[str] = [
"НПС-5 Тихорецкая - НПС-3 Нововеличковская",
"НПС-3 Нововеличковская - НПС-2 Крымская",
"НПС-2 Крымская - НПС Грушовая",
]
TOTAL_WORK_DURATION_LABEL: str = "Суммарное время работы:"
ZERO_DURATION_TEXT: str = "0:00:00"
TOTAL_DURATION_TOLERANCE_SECONDS: int = 5
DURATION_PARTS_COUNT_H_MM_SS: int = 3
DURATION_PARTS_COUNT_MM_SS: int = 2
REPORT_TITLE_ROW: int = 1
REPORT_COLUMN_HEADERS_ROW: int = 2
REPORT_DATA_FIRST_ROW: int = 3
COL_SECTION: str = "Наименование участка"
COL_STOPPED: str = "Остановленный"
COL_UNSTATIONARY: str = "Нестационарный"
COL_STATIONARY: str = "Стационарный"
MODE_DURATION_COLUMNS: list = [
COL_STOPPED,
COL_UNSTATIONARY,
COL_STATIONARY,
]
EXPECTED_COLUMN_HEADERS: list = [COL_SECTION, *MODE_DURATION_COLUMNS]
STATIONARY_STATUS_TO_COLUMN: dict = {
StationaryStatus.STOPPED.value: COL_STOPPED,
StationaryStatus.UNSTATIONARY.value: COL_UNSTATIONARY,
StationaryStatus.STATIONARY.value: COL_STATIONARY,
}
REPORT_HEADER_PERIOD_PATTERN: str = (
r'Отчет о режиме работы МТ с (?P<period_start>\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})'
r' по (?P<period_end>\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})'
)
REPORT_FILE_NAME_PERIOD_PATTERN: str = (
r'^Отчет о режиме работы МТ\. (?P<tu>.+?) '
r'(?P<period_start>\d{2}\.\d{2}\.\d{4} \d{2}_\d{2}_\d{2})'
r' - '
r'(?P<period_end>\d{2}\.\d{2}\.\d{4} \d{2}_\d{2}_\d{2})'
r'\.xlsx$'
)
кх мен
import json
import logging
from pathlib import Path
from typing import Any, List, Optional
from clients.subprocess_client import SubprocessClient
from constants.architecture_constants import ClickhouseConstants as CH_const
from infra.cmd_generator import ClickHouseCmdGenerator
logger = logging.getLogger(__name__)
class ClickHouseManager:
"""
Класс работы с clickhouse
Пример использования:
click_manager = ClickHouseManager(stand_client, infra_client)
click_manager.copy_configuration_file_from_stand() - для загрузки конфигурации со стенда
click_manager.delete_clickhouse_keys_with_check() - для удаления данных по определенным ключам
"""
def __init__(
self,
stand_client: SubprocessClient,
infra_client: SubprocessClient,
configuration_file_name: str,
) -> None:
self._stand_client = stand_client
self._infra_client = infra_client
self._configuration_file_name = configuration_file_name
self._username = stand_client.username
self._stand_host = stand_client.host
self._infra_host = infra_client.host
self._evo_id_pairs: List[tuple] = []
self._cmd_generator = ClickHouseCmdGenerator(self._username, self._stand_host, configuration_file_name)
def copy_configuration_file_from_stand(self) -> None:
"""
Копирует файл конфигурации со стенда в корень проекта
"""
copy_cmd = self._cmd_generator.generate_scp_config_file_cmd()
configuration_file_path = Path(self._configuration_file_name)
if not configuration_file_path.exists():
try:
self._stand_client.run_cmd(copy_cmd, timeout=CH_const.LONG_PROCESS_TIMEOUT_S, use_ssh=False)
logger.info(f"[CLICKHOUSE] [OK] файл: {self._configuration_file_name} успешно сохранен на runner")
except Exception as error:
error_msg = f"[CLICKHOUSE] [ERROR] При сохранении файла: {self._configuration_file_name} на runner"
logger.exception(error_msg)
raise RuntimeError(error_msg) from error
def delete_clickhouse_keys_with_check(self) -> None:
"""
Метод удаления данных по ключам с проверкой из ClickHouse командами
"""
evo_id_pairs_chunks = self._split_pairs_list()
for table_name in CH_const.CH_TABLE_NAMES:
for chunk in evo_id_pairs_chunks:
self._delete_clickhouse_keys(chunk, table_name)
self._check_clickhouse_keys(chunk, table_name)
logger.info(f"[CLICKHOUSE] [OK] Успех! Данные всех датчиков в таблице {table_name} удалены")
def _delete_clickhouse_keys(self, evo_id_pairs: List[tuple], table_name) -> None:
"""
Метод удаления данных по ключам из ClickHouse командой, используя clickhouse-client
"""
delete_cmd = self._cmd_generator.generate_delete_clickhouse_keys_cmd(evo_id_pairs, table_name)
try:
self._infra_client.run_cmd(delete_cmd)
except Exception as error:
error_msg = f"[CLICKHOUSE] [ERROR] При удалении данных в таблице командой: {delete_cmd}."
logger.exception(error_msg)
raise RuntimeError(error_msg) from error
def _check_clickhouse_keys(self, evo_id_pairs: List[tuple], table_name: str) -> None:
"""
Метод проверки удаления данных по ключам из ClickHouse командой, используя clickhouse-client
"""
check_cmd = self._cmd_generator.generate_check_sensor_data_click_cmd(evo_id_pairs, table_name)
try:
result = self._infra_client.run_cmd(check_cmd, need_output=True)
try:
result_int = int(result)
except (TypeError, ValueError) as error:
error_msg = (
f"[CLICKHOUSE] [ERROR] Результат: {result} проверки количества записей после удаления,"
" не является числом или равен None"
)
logger.exception(error_msg)
raise TypeError(error_msg) from error
if result_int != 0:
error_msg = f"[CLICKHOUSE] [ERROR] Осталось: {result} записей после удаления"
logger.error(error_msg)
raise RuntimeError(error_msg)
except Exception as error:
error_msg = f"[CLICKHOUSE] [ERROR] При проверке данных в таблице командой: {check_cmd}."
logger.exception(error_msg)
raise RuntimeError(error_msg) from error
def _extract_evo_id_pairs_from_configuration(self) -> None:
"""
Получение списка пар значений evoObjectId и evoParameterId из файла конфигурации
"""
configuration_json = self._read_configuration_file()
self._evo_id_pairs = self._extract_evo_id_pairs(configuration_json)
def _extract_evo_id_pairs(self, configuration_json: Any) -> List[tuple]:
"""
Получение списка уникальных пар значений evoObjectId и evoParameterId
"""
uniq_pairs = set()
evo_id_pairs = []
stack = [configuration_json]
try:
while stack:
current_element = stack.pop() # Забирает последний элемент списка
if isinstance(current_element, dict):
evo_id_pair = self._extract_evo_id_pair(current_element)
if evo_id_pair is not None and evo_id_pair not in uniq_pairs:
uniq_pairs.add(evo_id_pair)
evo_id_pairs.append(evo_id_pair)
# Добавляет все значения текущего элемента в список в обратном порядке
stack.extend(reversed(current_element.values()))
elif isinstance(current_element, list):
stack.extend(reversed(current_element))
except Exception as error:
error_msg = (
f"[CLICKHOUSE] [ERROR] При получении списка пар {CH_const.EVO_OBJECT_ID_KEY_NAME} и "
f"{CH_const.EVO_PARAMETER_ID_KEY_NAME} из конфигурации "
)
logger.exception(error_msg)
raise RuntimeError(error_msg) from error
if not evo_id_pairs:
error_msg = (
f"[CLICKHOUSE] [ERROR] Пустой список при получении списка пар {CH_const.EVO_OBJECT_ID_KEY_NAME} и "
f"{CH_const.EVO_PARAMETER_ID_KEY_NAME} из конфигурации "
)
logger.error(error_msg)
raise ValueError(error_msg)
return evo_id_pairs
@staticmethod
def _extract_evo_id_pair(element: dict) -> Optional[tuple]:
"""
Ищет пару значений evoObjectId и evoParameterId в словаре
"""
try:
evo_id = element[CH_const.EVO_OBJECT_ID_KEY_NAME]
param_id = element[CH_const.EVO_PARAMETER_ID_KEY_NAME]
if (isinstance(evo_id, int) and evo_id != 0) and (isinstance(param_id, int) and param_id != 0):
return evo_id, param_id
except (AttributeError, KeyError, TypeError):
pass
def _read_configuration_file(self) -> Any:
"""
Чтение файла конфигурации, использует список кодировок
"""
error_msg = (
f"[CLICKHOUSE] [ERROR] Не удалось декодировать файл {self._configuration_file_name} "
f"в кодировках {CH_const.DEFAULT_ENCODINGS}"
)
for encoding in CH_const.DEFAULT_ENCODINGS:
try:
with open(self._configuration_file_name, "r", encoding=encoding, errors="strict") as conf_file:
data = json.load(conf_file)
if not data:
error_msg = f"Пустой json (кодировка:{encoding}"
logger.error(error_msg)
raise ValueError(error_msg)
return data
except UnicodeDecodeError:
# следующая кодировка
continue
except Exception as error:
logger.exception(error_msg)
raise OSError(error_msg) from error
logger.exception(error_msg)
raise OSError(error_msg)
def _split_pairs_list(self) -> List[list]:
"""
Делит список пар на части
"""
if not self._evo_id_pairs:
self._extract_evo_id_pairs_from_configuration()
return [
# fmt: off
self._evo_id_pairs[i:i + CH_const.EVO_ID_PAIRS_CHUNK_SIZE]
# fmt: on
for i in range(0, len(self._evo_id_pairs), CH_const.EVO_ID_PAIRS_CHUNK_SIZE)
]
смд ген
import logging
import os
from datetime import datetime, timedelta
from pathlib import PurePosixPath
from typing import List
import allure
from constants.architecture_constants import ClickhouseConstants as CH_const
from constants.architecture_constants import EnvKeyConstants
from constants.architecture_constants import ImitatorConstants as Im_const
from infra.path_generator import ImitatorDataPathGenerator
logger = logging.getLogger(__name__)
class BaseCmdGenerator:
def __init__(self, username: str, host: str) -> None:
self._username = username
self._host = host
self._ssh_key_name: str = os.environ.get(EnvKeyConstants.SSH_KEY_NAME)
self._scp_cmd: str = ""
self._choose_scp_cmd_by_os()
def _choose_scp_cmd_by_os(self):
if os.name == Im_const.OS_NAME_WIN:
scp_cmd = f"scp -i {self._ssh_key_name}"
else:
scp_cmd = "scp"
self._scp_cmd = scp_cmd
class TimeProcessor:
"""
Класс для получения времени запуска и остановки имитатора
Для получения времени:
from utils.flag_generator import TimeProcessor
time_processor = TimeProcessor(test_duration_m)
start_time = time_processor.formatted_start_time
stop_time = time_processor.formatted_stop_time
start_time_dt = time_processor.start_time # datetime объект для расчётов
"""
def __init__(self, duration_m: float) -> None:
self._duration_m = duration_m
self._current_time: datetime = datetime.now()
self._start_time: datetime = self._add_time_delta(seconds=Im_const.IMITATOR_START_DELAY_S)
self._formatted_start_time: str = self._get_formatted_start_time()
self._formatted_stop_time: str = self._get_formatted_stop_time()
@property
def start_time(self) -> datetime:
"""Возвращает время старта имитатора как datetime объект для расчётов интервалов"""
return self._start_time
@property
def formatted_start_time(self) -> str:
return self._formatted_start_time
@property
def formatted_stop_time(self) -> str:
return self._formatted_stop_time
def _add_time_delta(self, minutes: float = 0, seconds: int = 0) -> datetime:
"""
:param minutes: время в минутах
:param seconds: время в секундах
:return: текущее время + добавленное время
"""
try:
delta = timedelta(minutes=minutes, seconds=seconds)
current_time_with_delta = self._current_time + delta
return current_time_with_delta
except (ValueError, TypeError):
logger.exception("[ERROR] Ошибка при добавлении времени")
raise
@staticmethod
def get_formatted_time(time) -> str:
"""
:param time: время в формате datetime
:return: время в формате строки, которую принимает имитатор
"""
try:
formatted_time = time.strftime(Im_const.IMITATOR_TIME_FORMAT)
return formatted_time
except (ValueError, TypeError):
logger.exception("[ERROR] Ошибка при форматировании времени")
raise
def _get_formatted_start_time(self) -> str:
"""
:return: время запуска имитатора строкой
"""
formatted_start_time = self.get_formatted_time(self._start_time)
return formatted_start_time
def _get_formatted_stop_time(self) -> str:
"""
Добавляет время прогона ко времени отсрочки пуска имитатора
:return: время остановки имитатора строкой
"""
stop_time = self._add_time_delta(minutes=self._duration_m, seconds=Im_const.IMITATOR_START_DELAY_S)
formatted_stop_time = self.get_formatted_time(stop_time)
return formatted_stop_time
class ImitatorCmdGenerator:
"""
Класс для получения команды запуска имитатора
Большая часть флагов формируется из дефолтных значений.
Для получения флагов:
from utils.imitator_cmd_generator import ImitatorCmdGenerator
cmd_generator = ImitatorCmdGenerator(path_to_test_data, host, test_duration_m)
final_cmd = cmd_generator.generate_final_imitator_cmd()
start_time_dt = cmd_generator.start_time # datetime объект для расчётов интервалов утечек
"""
def __init__(self, sandbox_path: str, stand_name: str, duration_m: float) -> None:
self._sandbox_path = sandbox_path
self._stand_name = stand_name
self._duration_m = duration_m
self._time_processor = TimeProcessor(self._duration_m)
self._source_type: str = Im_const.SOURCE_TYPE_DEF_VALUE
self._speed: int = Im_const.SPEED_DEF_VALUE
self._opcua: str = os.environ.get(EnvKeyConstants.OPC_URL)
self._ns: int = Im_const.NS_DEF_VALUE
self.final_cmd: str = ""
self._get_other_flags_values()
self._generate_flags()
@property
def start_time(self) -> datetime:
"""Возвращает время старта имитатора как datetime объект для расчётов интервалов утечек"""
return self._time_processor.start_time
def _generate_inner_test_data_path(self, sub_path: str) -> str:
"""
Добавляет название файла / директорию к пути хранения тестовых данных
:param sub_path: название файла / директорию
:return: полный путь к файлу / директории
"""
try:
result_path = PurePosixPath(self._sandbox_path) / sub_path
return str(result_path)
except (ValueError, TypeError, OSError):
logger.exception(f"[ERROR] Ошибка при создании пути к данным прогона. Данные: {sub_path}")
raise
def _generate_sandbox_paths(self) -> tuple[str, str, str]:
"""
:return: кортеж: пути к директории data и к файлам внутри директории с данными для запуска имитатора
"""
# Формирует пути к папке с логами датчиков и файлам
path_to_data = self._generate_inner_test_data_path(Im_const.SANDBOX_DATA)
path_to_rules = self._generate_inner_test_data_path(Im_const.SANDBOX_RULES)
path_to_tags = self._generate_inner_test_data_path(Im_const.SANDBOX_TAGS)
return path_to_data, path_to_rules, path_to_tags
def _get_target_host(self) -> str:
"""
Получает target для флага из списка стендов
:return: target для флага
"""
try:
return Im_const.HOST_MAP.get(self._stand_name, {}).get(Im_const.IMITATOR_KEY_NAME)
except KeyError:
logger.exception(f"[ERROR] Не удалось получить target для стенда: {self._stand_name}")
raise
def _get_other_flags_values(self):
"""
Метод получения значений флагов
"""
try:
self._path_to_data, self._path_to_rules, self._path_to_tags = self._generate_sandbox_paths()
self._start_time = self._time_processor.formatted_start_time
self._stop_time = self._time_processor.formatted_stop_time
self._target = self._get_target_host()
except (AttributeError, ValueError):
logger.exception("[ERROR] Ошибка при получении флагов")
raise
def _generate_flags(self) -> None:
"""
Метод получения флагов
:return: строку с флагами для запуска имитатора
"""
try:
# Формирует флаги
command_parts = [
f' --rules="{self._path_to_rules}"',
f'--source="{self._path_to_data}"',
f'--sourceType="{self._source_type}"',
f'--sourceTagTypes="{self._path_to_tags}"',
f'--startTime="{self._start_time}"',
f'--stopTime="{self._stop_time}"',
f'--speed={self._speed}',
f'--opcua="{self._opcua}"',
f'--ns={self._ns}',
]
if self._target:
command_parts.append(f'--target="{self._target}"')
self._flags = " ".join(command_parts)
except (ValueError, TypeError):
logger.exception("[ERROR] Ошибка при создании итоговой версии флагов")
raise
def generate_final_imitator_cmd(self) -> str:
"""
Собирает команду для запуска имитатора
:return: финальная команда запуска имитатора
"""
try:
with allure.step(f"Запуск имитатора данных с флагами {self._flags}"):
self.final_cmd = Im_const.IMITATOR_RUN_CMD + self._flags
return self.final_cmd
except (ValueError, TypeError):
logger.exception("[ERROR] Ошибка при создании итоговой команды для запуска")
raise
class UploadImitatorDataCmdGenerator(BaseCmdGenerator):
def __init__(self, username: str, host: str, path_generator: ImitatorDataPathGenerator) -> None:
super().__init__(username=username, host=host)
# Список ожидаемых файлов в архиве
self.expected_files: list = [Im_const.SANDBOX_RULES]
self._ssh_key_name: str = os.environ.get(EnvKeyConstants.SSH_KEY_NAME)
self._os_is_windows: bool = os.name == Im_const.OS_NAME_WIN
self._path_generator = path_generator
self._remote_temp_dir_path = self._path_generator.remote_temp_dir_path
self._tar_package_name = self._path_generator.tar_package_name
# Путь к архиву во временной директории на удаленном сервере
self._full_remote_tar_path = self._path_generator.generate_full_remote_tar_path()
def generate_check_remote_data_cmd(self) -> str:
"""
Создает команду проверки существования директории с данными и сопутствующих файлов
:return: команда для выполнения в консоли
"""
expected_dir = Im_const.SANDBOX_DATA
# Файлы для проверки после распаковки и копирования tags.txt
files_to_check = [Im_const.SANDBOX_RULES, Im_const.SANDBOX_TAGS]
check_dir_part = f"[ -d '{self._remote_temp_dir_path}/{expected_dir}' ]"
check_parts = [check_dir_part]
for file in files_to_check:
check_parts.append(f"[ -f '{self._remote_temp_dir_path}/{file}' ]")
condition = " && ".join(check_parts)
check_cmd = f"if {condition}; then echo {Im_const.CMD_STATUS_OK}; else echo {Im_const.CMD_STATUS_FAIL}; fi"
return check_cmd
def generate_create_dir_cmd(self) -> str:
"""
Генерирует команду создания временной директории
"""
return f"mkdir -p {self._remote_temp_dir_path}"
def generate_delete_dir_cmd(self) -> str:
"""
Генерирует команду удаления временной директории
"""
return f"rm -rf {self._remote_temp_dir_path}"
def generate_copy_tar_to_remote_cmd(self) -> str:
"""
Генерирует команду копирования данных на удаленный сервер
"""
return f"{self._scp_cmd} {self._tar_package_name} {self._username}@{self._host}:{self._remote_temp_dir_path}/"
def generate_unpack_tar_cmd(self) -> str:
"""
Генерирует команду распаковки архива на удаленном сервере
"""
return f"tar -xvzf {self._full_remote_tar_path} -C {self._remote_temp_dir_path}"
def generate_check_tar_cmd(self) -> str:
"""
Генерирует команду проверки архива на удаленном сервере
"""
return f"tar -tzf {self._full_remote_tar_path}"
def generate_copy_tags_cmd(self, tu_id: int) -> str:
"""
Генерирует команду для копирования tags.txt с сервера во временную директорию текущего набора данных
Файл tn{tu_id}_tags.txt копируется как tags.txt
"""
source_path = f"{Im_const.CONFIG_PATH}/tn{tu_id}_tags.txt"
target_path = f"{self._remote_temp_dir_path}/{Im_const.SANDBOX_TAGS}"
return f"cp {source_path} {target_path}"
class ClickHouseCmdGenerator(BaseCmdGenerator):
def __init__(self, username: str, host: str, configuration_file_name: str) -> None:
super().__init__(username=username, host=host)
self._configuration_file_name = configuration_file_name
def generate_scp_config_file_cmd(self) -> str:
"""
Генерирует команду копирования файла конфигурации со стенда
"""
path_to_remote_configuration = self._generate_path_to_remote_configuration()
return f"{self._scp_cmd} {self._username}@{self._host}:{path_to_remote_configuration} ."
def generate_check_sensor_data_click_cmd(self, evo_id_pairs: List[tuple], table_name: str) -> str:
"""
Генерирует команду проверки данных в ClickHouse по паре значений objectId и parameterId
"""
sql_evo_id_pairs = self._generate_sql_evo_id_pairs(evo_id_pairs)
return (
f"echo \'SELECT COUNT(*) FROM {table_name} WHERE "
f"({CH_const.OBJECT_ID_KEY_NAME}, {CH_const.PARAMETER_ID_KEY_NAME}) IN ({sql_evo_id_pairs})\' "
f"| docker exec -i {CH_const.NAME_CONTAINER} clickhouse-client"
)
def generate_delete_clickhouse_keys_cmd(self, evo_id_pairs: List[tuple], table_name: str) -> str:
"""
Генерирует команду удаления данных в ClickHouse по парам значений objectId и parameterId
"""
sql_evo_id_pairs = self._generate_sql_evo_id_pairs(evo_id_pairs)
return (
f"echo \'DELETE FROM {table_name} WHERE "
f"({CH_const.OBJECT_ID_KEY_NAME}, {CH_const.PARAMETER_ID_KEY_NAME}) IN ({sql_evo_id_pairs})\' "
f"| docker exec -i {CH_const.NAME_CONTAINER} clickhouse-client"
)
@staticmethod
def _generate_sql_evo_id_pairs(evo_id_pairs: List[tuple]) -> str:
"""
Создает строку из списка пар значений evoObjectId и evoParameterId
"""
return ", ".join("({}, {})".format(object_id, param_id) for object_id, param_id in evo_id_pairs)
def _generate_path_to_remote_configuration(self) -> PurePosixPath:
"""
Создает путь к файлу конфигурации конкретного стенда
"""
return PurePosixPath(CH_const.CONFIG_PATH) / self._configuration_file_name
class SignalUnitConversionCmdGenerator(BaseCmdGenerator):
def generate_scp_signal_rules_from_stand_cmd(self) -> str:
"""
Генерирует команду копирования signal_unit_conversion_rules.json со стенда на runner
"""
remote_path = self._generate_path_to_remote_signal_rules()
return f"{self._scp_cmd} {self._username}@{self._host}:{remote_path} ."
def generate_scp_signal_rules_to_stand_cmd(self, local_file_path: str) -> str:
"""
Генерирует команду копирования локального файла на стенд
"""
remote_path = self._generate_path_to_remote_signal_rules()
return f"{self._scp_cmd} {local_file_path} {self._username}@{self._host}:{remote_path}"
@staticmethod
def _generate_path_to_remote_signal_rules() -> PurePosixPath:
return PurePosixPath(Im_const.CONFIG_PATH) / Im_const.SIGNAL_UNIT_CONVERSION_RULES_FILE_NAME
конфиг мен
import json
import logging
from typing import Any
from constants.architecture_constants import ImitatorConstants as Imitator_const
from utils.helpers.configuration_utils import extract_sensor_ids_by_address
logger = logging.getLogger(__name__)
class ConfigurationManager:
"""
Читает локальную конфигурацию стенда и извлекает из нее данные для тестов.
"""
def __init__(self, configuration_file_name: str) -> None:
self._configuration_file_name = configuration_file_name
def get_sensor_ids_by_address(self) -> dict[str, int]:
"""
Возвращает словарь address: id из файла конфигурации.
"""
configuration_json = self._read_configuration_file()
return extract_sensor_ids_by_address(configuration_json)
def _read_configuration_file(self) -> Any:
"""
Чтение файла конфигурации, использует список кодировок.
"""
error_msg = (
f"[CONFIGURATION] [ERROR] Не удалось декодировать файл {self._configuration_file_name} "
f"в кодировках {Imitator_const.DEFAULT_ENCODINGS}"
)
for encoding in Imitator_const.DEFAULT_ENCODINGS:
try:
with open(self._configuration_file_name, "r", encoding=encoding, errors="strict") as conf_file:
data = json.load(conf_file)
if not data:
error_msg = f"Пустой json (кодировка:{encoding}"
logger.error(error_msg)
raise ValueError(error_msg)
return data
except UnicodeDecodeError:
continue
except Exception as error:
logger.exception(error_msg)
raise OSError(error_msg) from error
logger.exception(error_msg)
raise OSError(error_msg)
им дата ап
import logging
import tarfile
from pathlib import Path
from typing import List
from clients.http_client import HttpClient
from clients.subprocess_client import SubprocessClient
from constants.architecture_constants import HTTPClientConstants as Http_const
from constants.architecture_constants import ImitatorConstants as Im_const
from infra.cmd_generator import UploadImitatorDataCmdGenerator
from infra.path_generator import ImitatorDataPathGenerator
from models.http_models.attacments_list_testops_model import FileInfo, Items
logger = logging.getLogger(__name__)
class ImitatorDataUploader:
"""
Класс загрузки набора данных на удаленный сервер, нужных для запуска имитатора
Пример использования:
uploader = ImitatorDataUploader(your_user, your_host, test_id, tu_id)
uploader.upload_with_confirm() - для загрузки данных на удаленный сервер
remote_temp_path = uploader.remote_temp_dir_path - для получения пути ко временной директории
uploader.delete_with_confirm() - для удаления данных на удаленном сервере
"""
def __init__(self, stand_client: SubprocessClient, test_data_id: int, test_data_name: str, tu_id: int) -> None:
self._username = stand_client.username
self._host = stand_client.host
self._test_data_id = test_data_id
self._test_data_name = test_data_name
self._tu_id = tu_id # ID ТУ для получения tags.txt с сервера
self._http_client = HttpClient()
self._stand_client = stand_client
self._path_generator = ImitatorDataPathGenerator(test_data_id)
self._cmd_generator = UploadImitatorDataCmdGenerator(self._username, self._host, self._path_generator)
self._subprocess_client = UploadDataSubprocessClient(stand_client, self._cmd_generator, self._tu_id)
self._tar_package_name = self._path_generator.tar_package_name
self.remote_temp_dir_path = self._path_generator.remote_temp_dir_path
def upload_with_confirm(self) -> None:
"""
Выполняется основной сценарий загрузки данных на удаленный сервер
"""
# 1. Загрузка архива на runner
imitator_data_bytes = self._get_run_data_bytes()
# 2. Сохранение архива на runner
self._save_test_data_package(imitator_data_bytes)
# 3. Проверка архива на runner
if not self._is_tar_valid():
logging.error(f"[DATA UPLOADER] [ERROR] Архив: {self._tar_package_name} поврежден на runner")
raise ValueError("[DATA UPLOADER] [ERROR] При проверке архива на runner")
# 4. Создание временной директории на удаленном сервере
self._subprocess_client.create_remote_data_dir()
# 5. Копирование архива во временную директорию на удаленный сервер
self._subprocess_client.copy_tar_to_remote()
# 6. Проверка целостности архива на удаленном сервере
if not self._subprocess_client.is_remote_tar_valid():
logging.error(
f"[DATA UPLOADER] [ERROR] Архив: {self._tar_package_name} поврежден на удаленном сервере: {self._host}"
)
raise ValueError("[DATA UPLOADER] [ERROR] При проверке архива на удаленном сервере")
# 7. Распаковка архива
self._subprocess_client.unpack_remote_package()
# 8. Копирование tags.txt с сервера во временную директорию текущего набора данных
self._subprocess_client.copy_tags_from_server()
# 9. Проверка данных
if not self._subprocess_client.check_remote_unpack_data():
logging.error(
f"[DATA UPLOADER] [ERROR] При распаковке данных на удаленном сервере: "
f"{self._host} Путь: {self.remote_temp_dir_path}"
)
raise ValueError("[DATA UPLOADER] [ERROR] При распаковке данных на удаленном сервере")
logging.info(
f"[DATA UPLOADER] [OK] Тестовые данные успешно загружены на удаленный сервер: "
f"{self._host} Путь: {self.remote_temp_dir_path}"
)
def delete_with_confirm(self) -> None:
"""
Удаление временной директории с удаленного сервера с проверкой удаления
"""
self._subprocess_client.delete_remote_data_dir()
if self._subprocess_client.check_remote_unpack_data():
logging.error(
f"[DATA UPLOADER] [ERROR] При удалении данных на удаленном сервере: "
f"{self._host} Путь: {self.remote_temp_dir_path}"
)
raise ValueError("[DATA UPLOADER] [ERROR] При проверке удаления данных")
logging.info(f"[DATA UPLOADER] [OK] Тестовые данные успешно удалены с удаленного сервера: {self._host}")
def _get_test_data_attachment_id_by_name(self, attachments_list: dict) -> int:
"""
Получает id архива данных для имитатора
"""
parsed_attachments_list = Items(
items=[FileInfo(**file) for file in attachments_list.get(Http_const.TESTOPS_ATTACHMENTS_KEY, [])]
)
attachment_id = next(
(file.id for file in parsed_attachments_list.items if file.original_filename == self._test_data_name), None
)
return attachment_id
def _get_run_data_bytes(self) -> bytes:
"""
Получает данные через GET запрос к Testops
:return: содержимое ответа на запрос
"""
# Получает список вложений для test_data_id
attachments_list = self._http_client.get_attachments_list_by_test_case_id(self._test_data_id)
# Получает id архива данных
attachment_id = self._get_test_data_attachment_id_by_name(attachments_list)
run_data_bytes = self._http_client.get_test_case_attachment_by_id(self._test_data_id, attachment_id)
return run_data_bytes
def _is_tar_valid(self) -> bool:
"""
Проверяет целостность архива на runner после скачивания с testops
"""
req_files = {Im_const.SANDBOX_RULES}
req_dir = Im_const.SANDBOX_DATA
try:
with tarfile.open(self._tar_package_name, "r:gz") as tar_file:
names = set(tar_file.getnames())
has_dir = any(name.startswith(req_dir) for name in names)
has_files = req_files.issubset(names)
if not has_dir or not has_files:
raise FileNotFoundError("[DATA UPLOADER] [ERROR] В архиве на runner отсутствуют необходимые файлы")
return has_dir and has_files
except tarfile.TarError:
logging.exception("[DATA UPLOADER] [ERROR] Архив на runner поврежден")
raise
def _save_test_data_package(self, tar_bytes: bytes) -> None:
"""
Сохраняет архив тестовых данных в рабочей директории runner
:return: путь к архиву
"""
file_path = Path(self._tar_package_name)
try:
with open(file_path, "wb") as tar_file:
tar_file.write(tar_bytes)
logging.info(f"[DATA UPLOADER] [OK] Архив: {self._tar_package_name} успешно сохранен на runner")
except (FileNotFoundError, PermissionError, OSError):
logging.exception("[DATA UPLOADER] [ERROR] При сохранении архива на runner")
raise
class UploadDataSubprocessClient:
"""
Выполняет команды в консоли для загрузки данных прогона для имитатора
"""
def __init__(self, client: SubprocessClient, cmd_generator: UploadImitatorDataCmdGenerator, tu_id: int) -> None:
self._client = client
self._expected_files: List[str] = list(cmd_generator.expected_files)
self._cmd_generator = cmd_generator
self._tu_id = tu_id
def create_remote_data_dir(self) -> None:
"""
Создает временную директорию на удаленном сервере
"""
create_dir_cmd = self._cmd_generator.generate_create_dir_cmd()
self._client.run_cmd(create_dir_cmd)
def delete_remote_data_dir(self) -> None:
"""
Удаляет временную директорию на удаленном сервере
"""
delete_dir_cmd = self._cmd_generator.generate_delete_dir_cmd()
self._client.run_cmd(delete_dir_cmd)
def copy_tar_to_remote(self) -> None:
"""
Копирует архив во временную директорию на удаленном сервере
"""
copy_cmd = self._cmd_generator.generate_copy_tar_to_remote_cmd()
self._client.run_cmd(copy_cmd, timeout=Im_const.LONG_PROCESS_TIMEOUT_S, use_ssh=False)
def unpack_remote_package(self) -> None:
"""
Распаковывает архив во временную директорию на удаленном сервере
"""
unpack_cmd = self._cmd_generator.generate_unpack_tar_cmd()
self._client.run_cmd(unpack_cmd, timeout=Im_const.LONG_PROCESS_TIMEOUT_S)
def copy_tags_from_server(self) -> None:
""" """
source_path = f"{Im_const.CONFIG_PATH}/tn{self._tu_id}_tags.txt"
copy_tags_cmd = self._cmd_generator.generate_copy_tags_cmd(self._tu_id)
try:
self._client.run_cmd(copy_tags_cmd)
logging.info(f"[DATA UPLOADER] [OK] tags.txt скопирован из {source_path}")
except Exception as e:
logging.error(f"[DATA UPLOADER] [ERROR] Не удалось скопировать {source_path}: {e}")
raise RuntimeError(
f"Не удалось скопировать tags.txt с сервера. Проверьте наличие файла {source_path}"
) from e
def is_remote_tar_valid(self) -> bool:
"""
Проверка целостности архива
:return: результат проверки
"""
check_tar_cmd = self._cmd_generator.generate_check_tar_cmd()
result = self._client.run_cmd(check_tar_cmd, need_output=True)
if not result:
return False
tar_list = result.split("\n")
return all(file in tar_list for file in self._expected_files)
def check_remote_unpack_data(self) -> bool:
"""
Проверяет наличие директории с данными и сопутствующих файлов
:return: результат проверки
"""
check_cmd = self._cmd_generator.generate_check_remote_data_cmd()
result = self._client.run_cmd(check_cmd, need_output=True)
return result == Im_const.CMD_STATUS_OK
им мен
import logging
import subprocess
from typing import Optional
from clients.subprocess_client import SubprocessClient
from constants.architecture_constants import ImitatorConstants as Im_const
logger = logging.getLogger(__name__)
class ImitatorManager:
"""
Класс управления имитатором
Для запуска имитатора:
from clients.subprocess_client import SubprocessClient
from infra.imitator_manager import ImitatorManager
client = SubprocessClient(your_user, your_host)
imitator_manager = ImitatorManager(client, your_command)
imitator_manager.run_imitator()
imitator_manager.log_imitator_stdout()
imitator_manager.wait_and_stop_imitator()
Для получения процесса с запущенным имитатором:
imitator_process = imitator_manager.imitator_process
Для остановки имитатора:
imitator_manager.stop_imitator()
Для получения процесса с запущенным имитатором
"""
def __init__(self, client: SubprocessClient, imitator_run_cmd: str) -> None:
self._client = client
self._imitator_run_cmd = imitator_run_cmd
self._logger: logging.getLogger() = logging.getLogger(self.__class__.__name__)
self._setup_logger()
self._imitator_process: Optional[subprocess.Popen] = None
@property
def imitator_process(self) -> Optional[subprocess.Popen]:
return self._imitator_process
def _setup_logger(self) -> None:
"""
Настраивает отдельный логер для имитатора
"""
# Запись логов имитатора в отдельный файл
# Кодировка установлена под WIN, для gitlab нужно установить utf-8
file_handler = logging.FileHandler(Im_const.IMITATOR_LOG_FILE_NAME, encoding=Im_const.WIN_ENCODING_CP1251)
file_handler.setLevel(logging.INFO)
formatter = logging.Formatter()
file_handler.setFormatter(formatter)
self._logger.addHandler(file_handler)
# Отключает дублирование логов в консоль
self._logger.propagate = False
def run_imitator(self) -> None:
"""
Запускает имитатор на удаленном сервере
"""
if self._imitator_process is not None:
logger.error("[IMITATOR] [ERROR] Имитатор уже запущен")
raise
process = self._client.exec_popen(self._imitator_run_cmd)
if process.poll() is None:
self._imitator_process = process
logger.info("[IMITATOR] [OK] Имитатор запущен успешно")
else:
logger.error("[IMITATOR] [ERROR] Ошибка запуска имитатора")
raise
def log_imitator_stdout(self) -> None:
"""
Записывает логи имитатора в файл с помощью отдельного логера
"""
try:
if self._imitator_process.poll() is None:
for line in self._imitator_process.stdout:
self._logger.info(line)
else:
logger.error("[IMITATOR] [ERROR] Ошибка записи логов имитатора: Имитатор не запущен")
raise
except (OSError, ValueError):
logging.exception("[IMITATOR] [ERROR] Ошибка при получении логов имитатора!")
def _is_imitator_running(self) -> bool:
"""
Проверяет наличие PID имитатора на стенде
"""
result = self._client.run_cmd(Im_const.IMITATOR_CHECK_CMD, check=False, need_output=True)
if result:
logger.warning(f"[IMITATOR] [WARNING] Имитатор не остановлен! PID: {result}")
else:
logger.info("[IMITATOR] [OK] Имитатор остановлен успешно")
return bool(result and result.strip())
def stop_imitator(self) -> None:
"""
Останавливает имитатор
"""
try:
self._client.terminate_process(self._imitator_process, timeout=Im_const.POPEN_WAIT_TIMOUT_S)
if self._is_imitator_running():
# Останавливает имитатор на стенде
self._client.run_cmd(Im_const.IMITATOR_KILL_CMD)
logger.info("[IMITATOR] [OK] Имитатор остановлен успешно")
self._imitator_process = None
except RuntimeError:
logger.exception("[IMITATOR] [ERROR] Ошибка остановки имитатора")
raise
def wait_and_stop_imitator(self) -> None:
"""
Ожидает окончания работы имитатора и завершает его работу
"""
try:
self._imitator_process.wait()
except KeyboardInterrupt:
# На случай остановки принудительно, через Ctrl+C например
logging.exception("[IMITATOR] [ERROR] Принудительная остановка имитатора!")
raise
finally:
self.stop_imitator()
пас ген
import logging
from datetime import datetime
from pathlib import PurePosixPath # Используется для корректного запуска на Windows
from constants.architecture_constants import ImitatorConstants as JC_const
logger = logging.getLogger(__name__)
class ImitatorDataPathGenerator:
"""
Генерация путей для данных прогона
"""
def __init__(self, test_data_id: int):
self._test_data_id = test_data_id
self._expected_files: list = [JC_const.SANDBOX_TAGS, JC_const.SANDBOX_RULES]
self._test_package_name = self._generate_test_package_name()
# Временное название архива
self.tar_package_name = self._add_tar_extension()
# Путь к временной директории
self.remote_temp_dir_path = self._generate_remote_temp_dir_path()
def _generate_test_package_name(self) -> str:
"""
Создает уникальное имя для набора данных
:return: уникальное имя для набора данных
"""
time_now_str = datetime.now().strftime(JC_const.IMITATOR_TIME_FORMAT)
return f"test_case_id_{self._test_data_id}_{time_now_str}"
def _add_tar_extension(self) -> str:
"""
Добавляет tar расширение к имени набора данных
:return: уникальное имя архива набора данных
"""
return f"{self._test_package_name}.tar.gz"
def _generate_remote_temp_dir_path(self) -> str:
"""
Создает путь к временной директории на удаленном сервере
:return: путь к временной директории на удаленном сервере
"""
remote_temp_dir_path = PurePosixPath(JC_const.AUTOTEST_DATA_PATH) / self._test_package_name
return str(remote_temp_dir_path)
def generate_full_remote_tar_path(self) -> str:
"""
Создает путь к архиву на удаленном сервере
:return: путь к архиву на удаленном сервере
"""
full_remote_tar_path = PurePosixPath(self.remote_temp_dir_path) / self.tar_package_name
return str(full_remote_tar_path)
сигн юнит
import json
import logging
import shutil
from pathlib import Path
from typing import Any
from clients.subprocess_client import SubprocessClient
from constants.architecture_constants import ClickhouseConstants as CH_const
from constants.architecture_constants import ImitatorConstants as Im_const
from constants.enums import MeasureConversionRule
from infra.cmd_generator import SignalUnitConversionCmdGenerator
from utils.helpers.signal_unit_conversion_utils import apply_measure_conversion_rule, conversion_rules_need_update
logger = logging.getLogger(__name__)
class SignalUnitConversionManager:
"""
Управляет signal_unit_conversion_rules.json на стенде:
- скачивает оригинал в original_conversion_rules/ перед прогоном набора
- подкладывает изменённую версию в CONFIG_PATH (имя на сервере не меняется)
- восстанавливает оригинал в teardown
"""
def __init__(
self,
stand_client: SubprocessClient,
measure_conversion_rule: MeasureConversionRule,
) -> None:
self._stand_client = stand_client
self._measure_conversion_rule = measure_conversion_rule
self._cmd_generator = SignalUnitConversionCmdGenerator(stand_client.username, stand_client.host)
self._local_file = Path(Im_const.SIGNAL_UNIT_CONVERSION_RULES_FILE_NAME)
self._backup_file = (
Path(Im_const.SIGNAL_UNIT_CONVERSION_RULES_BACKUP_DIR) / Im_const.SIGNAL_UNIT_CONVERSION_RULES_FILE_NAME
)
self._modified = False
def setup_signal_unit_conversion_rules(self) -> None:
"""
Скачивает файл со стенда, при необходимости меняет единицы и загружает обратно.
"""
try:
self._download_from_stand()
rules_json = self._read_local_file()
if not conversion_rules_need_update(rules_json, self._measure_conversion_rule):
logger.info(
"[SETUP] [OK] signal_unit_conversion_rules.json уже настроен корректно для набора данных "
f"(правило {self._measure_conversion_rule.name})"
)
return
self._save_backup()
modified_rules = apply_measure_conversion_rule(rules_json, self._measure_conversion_rule)
self._write_local_file(modified_rules)
self._upload_to_stand(self._local_file)
self._modified = True
logger.info(
"[SETUP] [OK] signal_unit_conversion_rules.json обновлён по правилу "
f"{self._measure_conversion_rule.name}"
)
except Exception as error:
error_msg = "[SETUP] [ERROR] Ошибка при подготовке signal_unit_conversion_rules.json"
logger.exception(error_msg)
raise RuntimeError(error_msg) from error
def restore_signal_unit_conversion_rules(self) -> None:
"""
Возвращает оригинальный signal_unit_conversion_rules.json на стенд
"""
if not self._modified:
logger.info("[TEARDOWN] [SKIP] signal_unit_conversion_rules.json не изменялся")
return
if not self._backup_file.exists():
error_msg = f"[TEARDOWN] [ERROR] Оригинал signal_unit_conversion_rules.json не найден: {self._backup_file}"
logger.error(error_msg)
raise RuntimeError(error_msg)
try:
self._upload_to_stand(self._backup_file)
self._modified = False
logger.info(
f"[TEARDOWN] [OK] signal_unit_conversion_rules.json восстановлен на стенде из {self._backup_file}"
)
except Exception as error:
error_msg = "[TEARDOWN] [ERROR] Ошибка при восстановлении signal_unit_conversion_rules.json"
logger.exception(error_msg)
raise RuntimeError(error_msg) from error
def _download_from_stand(self) -> None:
copy_cmd = self._cmd_generator.generate_scp_signal_rules_from_stand_cmd()
self._stand_client.run_cmd(copy_cmd, timeout=CH_const.LONG_PROCESS_TIMEOUT_S, use_ssh=False)
if not self._local_file.exists():
raise FileNotFoundError(f"Не удалось скачать {Im_const.SIGNAL_UNIT_CONVERSION_RULES_FILE_NAME} со стенда")
def _save_backup(self) -> None:
self._backup_file.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(self._local_file, self._backup_file)
def _upload_to_stand(self, local_file: Path) -> None:
upload_cmd = self._cmd_generator.generate_scp_signal_rules_to_stand_cmd(local_file.as_posix())
self._stand_client.run_cmd(upload_cmd, timeout=CH_const.LONG_PROCESS_TIMEOUT_S, use_ssh=False)
def _read_local_file(self) -> dict[str, Any]:
error_msg = (
f"[SETUP] [ERROR] Не удалось декодировать файл {self._local_file} "
f"в кодировках {Im_const.DEFAULT_ENCODINGS}"
)
for encoding in Im_const.DEFAULT_ENCODINGS:
try:
with open(self._local_file, "r", encoding=encoding, errors="strict") as rules_file:
data = json.load(rules_file)
if not data:
raise ValueError(f"Пустой json (кодировка:{encoding})")
return data
except UnicodeDecodeError:
continue
except json.JSONDecodeError as error:
logger.exception(error_msg)
raise OSError(error_msg) from error
logger.exception(error_msg)
raise OSError(error_msg)
@staticmethod
def _write_local_file(rules_json: dict[str, Any]) -> None:
with open(
Im_const.SIGNAL_UNIT_CONVERSION_RULES_FILE_NAME,
"w",
encoding=Im_const.ENCODING_UTF_8,
) as rules_file:
json.dump(rules_json, rules_file, ensure_ascii=False, indent=2)
стенд сет мен
import logging
import os
from urllib.parse import urlparse
from clients.subprocess_client import SubprocessClient
from constants.architecture_constants import EnvKeyConstants
from constants.architecture_constants import ImitatorConstants as Im_const
from constants.enums import TU, MeasureConversionRule
from infra.clickhouse_manager import ClickHouseManager
from infra.cmd_generator import ImitatorCmdGenerator
from infra.configuration_manager import ConfigurationManager
from infra.docker_manager import DockerContainerManager
from infra.imitator_data_uploader import ImitatorDataUploader
from infra.imitator_manager import ImitatorManager
from infra.redis_manager import RedisCleaner
from infra.signal_unit_conversion_manager import SignalUnitConversionManager
logger = logging.getLogger(__name__)
class StandSetupManager:
"""
Подготовка стенда к запуску автотестов
Для запуска имитатора:
setup_manager = StandSetupManager(test_duration(minutes), test_data_id, test_data_name, tu_id)
setup_manager.setup_stand_for_imitator_run()
imitator_thread = threading.Thread(target=stand_manager.start_imitator, daemon=True)
core_thread = threading.Thread(target=stand_manager.start_core)
imitator_thread.start()
time.sleep(20)
core_thread.start()
Доступ к времени старта имитатора для расчёта интервалов утечек:
start_time = setup_manager.start_time # datetime объект
"""
def __init__(
self,
duration_m: float, # Максимальное время работы имитатора в минутах
test_data_id: int, # id тест кейса из которого будут загружены данные
test_data_name: str, # Название архива данных имитатора для загрузки из TestOps
tu_id: int,
measure_conversion_rules: MeasureConversionRule | None = None,
username: str = os.environ.get(EnvKeyConstants.SSH_USER_DEV),
stand_name: str = os.environ.get(EnvKeyConstants.STAND_NAME),
) -> None:
self._duration_m = duration_m
self._test_data_id = test_data_id
self._test_data_name = test_data_name
self._tu_id = tu_id
self._measure_conversion_rules = measure_conversion_rules
self._username = username
self._stand_name = stand_name
self._configuration_file_name = self._get_configuration_file_name()
self._server_ip = self._get_server_ip() # Получает ip сервера из словаря
self._init_clients()
self._cmd_generator = self._choose_cmd_generator()
self._final_cmd = self._cmd_generator.generate_final_imitator_cmd()
# Экземпляр имитатор менеджера нужно создавать после генерации команды, отдельно от других клиентов
self._imitator_manager = ImitatorManager(self._stand_client, self._final_cmd)
@property
def start_time(self):
"""
Возвращает время старта имитатора как datetime объект.
Используется для расчёта интервалов утечек в тестах:
- leak_start_time = start_time + LEAK_START_INTERVAL
- leak_end_time = start_time + LEAK_START_INTERVAL + ALLOWED_TIME_DIFF_SECONDS
"""
return self._cmd_generator.start_time
def setup_stand_for_imitator_run(self) -> None:
"""
Обертка, в которой проходит полная подготовка стенда
"""
try:
if not os.environ.get("RUN_WITHOUT_TESTOPS", "False").lower() == "true":
# При запуске с TestOps загружает данные для прогона
self._uploader.upload_with_confirm()
self.stop_all_containers()
if self._signal_unit_conversion_manager is not None:
self._signal_unit_conversion_manager.setup_signal_unit_conversion_rules()
else:
logger.info("[SETUP] [SKIP] measure_conversion_rules не задан для набора данных")
self.clean_redis_and_clickhouse()
self.start_containers_without_core()
except Exception as error:
error_msg = "[SETUP] [ERROR] Ошибка подготовки стенда к запуску имитатора"
logger.exception(error_msg)
raise RuntimeError(error_msg) from error
def restore_signal_unit_conversion_rules(self) -> None:
"""
Возвращает оригинальный signal_unit_conversion_rules.json на стенд.
"""
self._signal_unit_conversion_manager.restore_signal_unit_conversion_rules()
def clean_redis_and_clickhouse(self):
"""
Чистит БД: Clickhouse и Redis
"""
# Копирование файла конфигурации на runner
self._clickhouse_manager.copy_configuration_file_from_stand()
# Чистка ключей Redis
self._redis_cleaner.delete_keys_with_check()
# Чистка ключей ClickHouse
self._clickhouse_manager.delete_clickhouse_keys_with_check()
def get_sensor_ids_by_address(self) -> dict[str, int]:
"""
Возвращает словарь address: id из конфигурации, скопированной на runner.
"""
return self._configuration_manager.get_sensor_ids_by_address()
def stop_all_containers(self):
"""
Останавливает все контейнеры и чистит БД: Clickhouse и Redis
"""
self._docker_manager.stop_all_lds_containers()
def start_containers_without_core(self):
"""
Запускает все контейнеры кроме core
"""
# Запуск lds-layer-builder
self._docker_manager.start_lds_layer_builder_containers()
# Запуск lds-journals
self._docker_manager.start_lds_journals_containers()
# Запуск lds-web-app
self._docker_manager.start_lds_web_app_containers()
# Запуск lds-api-gw
self._docker_manager.start_lds_api_gw_containers()
# Запуск lds-reports
self._docker_manager.start_lds_reports_containers()
def start_imitator(self) -> None:
"""
Запускает имитатор и собирает отдельно лог имитатора в файл
"""
try:
self._imitator_manager.run_imitator()
self._imitator_manager.log_imitator_stdout()
except Exception as error:
error_msg = "[SETUP] [ERROR] Ошибка запуска имитатора"
logger.exception(error_msg)
raise RuntimeError(error_msg) from error
def start_core(self) -> None:
"""
Запускает core контейнеры
"""
try:
# Запуск CORE
self._docker_manager.start_lds_core_containers()
except Exception as error:
error_msg = "[SETUP] [ERROR] Ошибка запуска CORE"
logger.exception(error_msg)
raise RuntimeError(error_msg) from error
def stop_imitator_wrapper(self) -> None:
"""
В teardown может вызываться даже если имитатор не запущен
"""
try:
if not self._imitator_manager.imitator_process:
logger.info("[TEARDOWN] [SKIP] Имитатор не был запущен")
return
self._imitator_manager.wait_and_stop_imitator()
logger.info("[TEARDOWN] [OK] Имитатор остановлен")
except Exception as error:
error_msg = "[TEARDOWN] [ERROR] Не удалось остановить имитатор"
logger.exception(error_msg)
raise RuntimeError(error_msg) from error
def server_test_data_remover(self):
"""
Может вызваться в teardown если загрузка не была выполнена
"""
uploader = getattr(self, "_uploader", None)
if uploader is None:
logger.info("[TEARDOWN] [SKIP] uploader не инициализирован, удаление данных со стенда пропущено")
return
try:
uploader.delete_with_confirm()
except Exception as error:
error_msg = "[TEARDOWN] [ERROR] Не удалось удалить данные с сервера"
logger.exception(error_msg)
raise RuntimeError(error_msg) from error
@staticmethod
def _parse_opc_target() -> tuple[str, int]:
"""
Извлекает хост и порт OPC из переменной окружения OPC_URL.
"""
opc_url = os.environ.get(EnvKeyConstants.OPC_URL)
if not opc_url:
raise RuntimeError(f"[SETUP] [ERROR] Переменная окружения {EnvKeyConstants.OPC_URL} не задана")
parsed = urlparse(opc_url)
if not parsed.hostname or not parsed.port:
raise RuntimeError(
f"[SETUP] [ERROR] Некорректное значение OPC_URL: '{opc_url}'. Ожидается формат вида opc.tcp://host:port"
)
return parsed.hostname, parsed.port
def check_opc_server_status(self, timeout_s: int = 5) -> None:
"""
Проверяет доступность OPC сервера с сервера стенда через /dev/tcp.
"""
host, port = self._parse_opc_target()
check_cmd = (
f"if timeout {timeout_s} bash -lc 'cat < /dev/null > /dev/tcp/{host}/{port}'; "
f"then echo {Im_const.CMD_STATUS_OK}; else echo {Im_const.CMD_STATUS_FAIL}; fi"
)
result = self._stand_client.run_cmd(check_cmd, need_output=True)
if result != Im_const.CMD_STATUS_OK:
raise RuntimeError(f"[SETUP] [ERROR] OPC сервер {host}:{port} недоступен с сервера стенда")
logger.info(f"[SETUP] [OK] OPC сервер {host}:{port} доступен")
def _get_server_ip(self) -> str:
"""
Получает server ip из списка стендов
:return: server ip
"""
try:
return Im_const.HOST_MAP.get(self._stand_name, {}).get(Im_const.SERVER_IP_KEY_NAME)
except Exception as error:
error_msg = f"[SETUP] [ERROR] Не удалось получить server ip для стенда: {self._stand_name}"
logger.exception(error_msg)
raise ValueError(error_msg) from error
def _get_configuration_file_name(self) -> str:
"""
Получает имя файла конфигурации
"""
return TU.get_file_name_by_id(self._tu_id)
def _choose_cmd_generator(self) -> ImitatorCmdGenerator:
"""
Выбирает вариант генерации команды запуска имитатора, в зависимости от типа запуска
"""
try:
if os.environ.get("RUN_WITHOUT_TESTOPS", "False").lower() == "true":
# Запуск без TestOps
return ImitatorCmdGenerator(self._test_data_name, self._stand_name, self._duration_m)
else:
self._uploader = ImitatorDataUploader(
self._stand_client, self._test_data_id, self._test_data_name, self._tu_id
)
self._data_path = self._uploader.remote_temp_dir_path
return ImitatorCmdGenerator(self._data_path, self._stand_name, self._duration_m)
except Exception as error:
error_msg = "[SETUP] [ERROR] Ошибка при выборе варианта генерации команды запуска имитатора"
logger.exception(error_msg)
raise RuntimeError(error_msg) from error
def _init_clients(self) -> None:
"""
Создает экземпляры необходимых для запуска клиентов
"""
try:
self._stand_client = SubprocessClient(self._username, self._server_ip)
self._infra_client = SubprocessClient(self._username, Im_const.REDIS_STAND_ADDRESS)
self._clickhouse_manager = ClickHouseManager(
self._stand_client, self._infra_client, self._configuration_file_name
)
self._configuration_manager = ConfigurationManager(self._configuration_file_name)
self._signal_unit_conversion_manager = SignalUnitConversionManager(
self._stand_client, self._measure_conversion_rules
)
self._docker_manager = DockerContainerManager(self._stand_client)
self._redis_cleaner = RedisCleaner(self._infra_client, self._stand_name)
except Exception as error:
error_msg = "[SETUP] [ERROR] Ошибка инициализации клиентов"
logger.exception(error_msg)
raise RuntimeError(error_msg) from error