Загрузка данных
енамки
UNKNOWN_ERROR = 520
class ExportedDataType(IntEnum):
"""
Тип экспортируемых данных
"""
JOURNAL = 1
LDS_MODES = 2
LEAKS_REPORT = 4
REJECTED_INPUT_DATA = 7
def to_download_name(self) -> str:
"""Строковый тип для DownloadExportedDataRequest.exportedDataType"""
return _EXPORTED_DATA_TYPE_DOWNLOAD_NAMES[self]
_EXPORTED_DATA_TYPE_DOWNLOAD_NAMES = {
ExportedDataType.JOURNAL: "JournalReport",
ExportedDataType.LDS_MODES: "LdsModesReport",
ExportedDataType.LEAKS_REPORT: "LeaksReport",
ExportedDataType.REJECTED_INPUT_DATA: "RejectedInputDataReport",
}
class ExportStatus(IntEnum):
"""Статус формирования отчёта в ReportDataExportedNotification.replyContent.exportStatus."""
NOT_READY = 0
DONE = 1
тест конст
class ExportReportConstants:
"""Константы для теста формирования отчёта об утечках"""
# ===== Параметры запроса =====
# Смещение часового пояса (часы) от UTC для отображения времени в отчёте (Москва = UTC+3)
MOSCOW_TIME_OFFSET_HOURS: int = 3
# ===== Таймауты и интервалы поллинга =====
# Максимальное ожидание нотификации о готовности отчёта
NOTIFICATION_TIMEOUT_SECONDS: float = 60.0
# Максимальное время ожидания появления отчёта в списке после нотификации
LIST_POLL_TOTAL_WAIT_SECONDS: float = 10.0
# Интервал между запросами getExportedFilesListRequest
LIST_POLL_INTERVAL_SECONDS: float = 10.0
# Таймаут получения ответа на скачивание
DOWNLOAD_TIMEOUT_SECONDS: float = 60.0
# ===== Имя файла отчёта =====
LEAKS_REPORT_NAME_PART: str = "Отчет об утечках" # подстрока в имени файла/отчёта
XLSX_EXTENSION: str = ".xlsx"
# Сигнатура zip-архива, используется для проверки формата файла по содержимому
ZIP_SIGNATURE: bytes = b'PK\x03\x04'
# ===== Формат даты/времени в отчёте =====
REPORT_DATETIME_FORMAT: str = "%d.%m.%Y %H:%M:%S"
# Регулярное выражение для извлечения двух дат из заголовка
# "Отчет об утечках с 20.05.2026 11:00:00 по 20.05.2026 12:52:02"
REPORT_HEADER_PERIOD_PATTERN: str = (
r'Отчет об утечках с (?P<period_start>\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})'
r' по (?P<period_end>\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})'
)
# ===== Шапка таблицы отчёта об утечках =====
# Двойная шапка: первая строка - название отчёта с периодом, вторая - названия колонок
REPORT_TITLE_ROW: int = 1
REPORT_COLUMN_HEADERS_ROW: int = 2
REPORT_DATA_FIRST_ROW: int = 3
# ===== Названия колонок =====
COL_DATETIME: str = "Дата и время"
COL_OBJECT: str = "Объект"
COL_LDS_STATUS: str = "Режим работы СОУ"
COL_MASK_INFO: str = "Информация о маскировании"
COL_COORDINATE: str = "Координата"
COL_LEAK_VOLUME: str = "Объемный расход утечки"
COL_MT_MODE: str = "Режим работы МТ"
EXPECTED_COLUMN_HEADERS: list = [
COL_DATETIME,
COL_OBJECT,
COL_LDS_STATUS,
COL_MASK_INFO,
COL_COORDINATE,
COL_LEAK_VOLUME,
COL_MT_MODE,
]
LDS_STATUS_OK_TEXT: str = "СОУ исправна"
MASKING_NOT_MASKED_TEXT: str = "СОУ не замаскирована"
# ===== Маппинг StationaryStatus <-> текст в колонке "Режим работы МТ" =====
STATIONARY_STATUS_TO_REPORT_TEXT: dict = {
StationaryStatus.UNSTATIONARY.value: "Нестационарный режим работы МТ",
StationaryStatus.STATIONARY.value: "Стационарный режим работы МТ",
StationaryStatus.STOPPED.value: "Режим остановленной перекачки МТ",
}
# ===== Прочее =====
DEFAULT_SHEET_INDEX: int = 0
в датасете
output_signals_test=CaseMarkers(test_case_id="33", offset=61),
# ----- Тест ExportReports -----
export_leaks_report_test=CaseMarkers(test_case_id="", offset=62),
сценариос инит
scenarios.output_signals,
scenarios.export_leaks_report,
сам сценарий
async def export_leaks_report(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig, imitator_start_time: datetime):
"""
Сценарий формирования отчёта об утечках.
Этапы:
1. Отправка ExportReportsCommandRequest с фильтром по времени
(start = старт имитатора, end = старт имитатора + offset теста).
2. Ожидание пуш-нотификации ReportDataExportedNotification о готовности отчёта.
3. Лонг-поллинг getExportedFilesListRequest до появления нашего отчёта в списке.
4. Отправка DownloadExportedDataRequest по id отчёта.
5. Скачивание xlsx файла, проверка формата (zip-сигнатура, имя с .xlsx и подстрокой).
6. Открытие xlsx, проверка двойной шапки (название отчёта + период, названия колонок).
7. Проверка содержимого строки утечки: дата, объект, режим СОУ, маскирование,
координата, объём, режим работы МТ (по leak.expected_stationary_status).
Скачанный файл удаляется по завершению, прикладывается к Allure только при падении теста.
"""
report_state = ExportLeaksReportState()
with allure.step("Подготовка параметров сценария формирования отчёта об утечках"):
report_state.report_test = leak.export_leaks_report_test
StepCheck("В конфигурации задан export_leaks_report_test", "export_leaks_report_test").actual(
report_state.report_test
).is_not_none()
report_state.period_start = t_utils.localize_as_moscow(imitator_start_time)
report_state.period_end = t_utils.localize_as_moscow(
imitator_start_time + timedelta(minutes=report_state.report_test.offset)
)
report_state.period_start_naive = report_state.period_start.replace(tzinfo=None)
report_state.period_end_naive = report_state.period_end.replace(tzinfo=None)
report_state.expected_mt_mode = ReportConst.STATIONARY_STATUS_TO_REPORT_TEXT.get(
leak.expected_stationary_status
)
report_state.tu_description_lower = cfg.technological_unit.description.lower()
StepCheck(
"Задан ожидаемый текст режима МТ для отчёта",
"expected_mt_mode",
).actual(report_state.expected_mt_mode).is_not_none()
allure.attach(
f"period.start={report_state.period_start}\n"
f"period.end={report_state.period_end}\n"
f"offset_minutes={report_state.report_test.offset}",
name="Фильтр периода отчёта",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step(f"Этап 1. Запрос формирования отчёта ({EXPORT_REPORTS_COMMAND_REQUEST})"):
request_payload = {
"tuId": cfg.tu_id,
"exportedDataTypes": [ExportedDataType.LEAKS_REPORT.value],
"timeOffset": ReportConst.MOSCOW_TIME_OFFSET_HOURS,
"period": {
"start": t_utils.datetime_to_msgpack_timestamp(report_state.period_start),
"end": t_utils.datetime_to_msgpack_timestamp(report_state.period_end),
"additionalProperties": {},
},
}
await t_utils.connect(ws_client, EXPORT_REPORTS_COMMAND_REQUEST, request_payload)
with allure.step(
f"Этап 2. Ожидание пуш-нотификации {REPORT_DATA_EXPORTED_NOTIFICATION} о готовности отчёта"
):
report_state.notification = await t_utils.poll_for_report_export_notification(
ws_client=ws_client,
parser=parser,
total_wait_seconds=ReportConst.NOTIFICATION_TIMEOUT_SECONDS,
poll_interval_seconds=ReportConst.LIST_POLL_INTERVAL_SECONDS,
)
StepCheck("Получена пуш-нотификация о готовности отчёта", "notification").actual(
report_state.notification
).is_not_none()
notification = report_state.notification
StepCheck("Проверка статуса пуш-нотификации", "replyStatus").actual(
notification.replyStatus
).expected(ReplyStatus.OK.value).equal_to()
StepCheck("Проверка наличия контента нотификации", "replyContent").actual(
notification.replyContent
).is_not_none()
StepCheck("Проверка exportStatus в нотификации", "exportStatus").actual(
notification.replyContent.exportStatus
).expected(ExportStatus.DONE).equal_to()
StepCheck("Проверка отсутствия ошибки в нотификации", "errorMessage").actual(
notification.replyContent.errorMessage or ""
).expected("").equal_to()
with allure.step(
f"Этап 3. Лонг-поллинг {GET_EXPORTED_FILES_LIST_REQUEST} до появления отчёта в списке"
):
report_state.report_item = await t_utils.poll_for_exported_file(
ws_client=ws_client,
parser=parser,
tu_id=cfg.tu_id,
expected_data_type=ExportedDataType.LEAKS_REPORT,
name_substring=ReportConst.LEAKS_REPORT_NAME_PART,
period_start=report_state.period_start,
period_end=report_state.period_end,
total_wait_seconds=ReportConst.LIST_POLL_TOTAL_WAIT_SECONDS,
poll_interval_seconds=ReportConst.LIST_POLL_INTERVAL_SECONDS,
)
StepCheck("Отчёт найден в списке сформированных файлов", "report_item").actual(
report_state.report_item
).is_not_none()
report_item = report_state.report_item
allure.attach(
f"id={report_item.id}, name={report_item.name}, "
f"exportedDataType={report_item.exportedDataType}, "
f"start={report_item.start}, end={report_item.end}",
name="Найденный отчёт в списке",
attachment_type=allure.attachment_type.TEXT,
)
report_state.report_file_name = report_item.name + ReportConst.XLSX_EXTENSION
with allure.step(
f"Этап 4. Отправка {DOWNLOAD_EXPORTED_DATA_REQUEST} по id={report_state.report_item.id}"
):
download_request = {
"exportedDataId": report_state.report_item.id,
"exportedDataType": ExportedDataType.LEAKS_REPORT.to_download_name(),
"additionalProperties": None,
"timeOffset": ReportConst.MOSCOW_TIME_OFFSET_HOURS,
}
await t_utils.connect(ws_client, DOWNLOAD_EXPORTED_DATA_REQUEST, download_request)
report_state.download_invocation_id = ws_client.invocation_id
with allure.step("Этап 5. Получение fileChunk и проверка формата xlsx"):
try:
report_state.download_payload = await ws_client.receive_by_invocation_id(
report_state.download_invocation_id, timeout=ReportConst.DOWNLOAD_TIMEOUT_SECONDS
)
except (TimeoutError, OSError) as error:
pytest.fail(
f"Не получили ответ на {DOWNLOAD_EXPORTED_DATA_REQUEST} "
f"за {ReportConst.DOWNLOAD_TIMEOUT_SECONDS} секунд. Ошибка: {error}"
)
report_state.download_reply = parser.parse_download_exported_data_msg(
report_state.download_payload
)
download_reply = report_state.download_reply
StepCheck("Проверка статуса ответа на скачивание", "replyStatus").actual(
download_reply.replyStatus
).expected(ReplyStatus.OK.value).equal_to()
StepCheck("Проверка наличия контента ответа на скачивание", "replyContent").actual(
download_reply.replyContent
).is_not_none()
report_state.file_bytes = download_reply.replyContent.fileChunk
StepCheck("Проверка наличия байт файла", "fileChunk").actual(report_state.file_bytes).is_not_empty()
StepCheck("Проверка xlsx (zip) сигнатуры файла", "file_signature").actual(
report_utils.is_xlsx_file_bytes(report_state.file_bytes)
).expected(True).equal_to()
with allure.step("Этап 6. Проверка имени файла отчёта"):
report_file_name_lower = report_state.report_file_name.lower()
StepCheck(
f"Имя файла оканчивается на {ReportConst.XLSX_EXTENSION}", "file_name"
).actual(report_utils.is_xlsx_extension(report_state.report_file_name)).expected(True).equal_to()
StepCheck(
f"Имя файла содержит '{ReportConst.LEAKS_REPORT_NAME_PART}'", "file_name"
).actual(
ReportConst.LEAKS_REPORT_NAME_PART.lower() in report_file_name_lower
).expected(True).equal_to()
StepCheck(
f"Имя файла содержит описание ТУ '{cfg.technological_unit.description}'",
"file_name",
).actual(report_state.tu_description_lower in report_file_name_lower).expected(True).equal_to()
with allure.step("Этап 7. Сохранение xlsx во временный файл"):
report_state.temp_file_path = report_utils.save_report_bytes_to_temp_file(report_state.file_bytes)
StepCheck("Временный xlsx файл создан", "temp_file_path").actual(
report_state.temp_file_path
).is_not_none()
try:
with allure.step("Этап 8. Открытие xlsx и проверка двойной шапки"):
report_state.worksheet = report_utils.load_report_worksheet(report_state.temp_file_path)
StepCheck("Лист xlsx открыт", "worksheet").actual(report_state.worksheet).is_not_none()
title_info = report_utils.parse_report_title(
report_utils.get_report_title_cell(report_state.worksheet)
)
allure.attach(
f"Шапка отчёта (raw): {title_info.raw_title}\n"
f"period_start: {title_info.period_start}\n"
f"period_end: {title_info.period_end}",
name="Шапка отчёта (1-я строка)",
attachment_type=allure.attachment_type.TEXT,
)
with SoftAssertions() as soft_failures:
StepCheck(
f"В шапке отчёта присутствует '{ReportConst.LEAKS_REPORT_NAME_PART}'",
"report_title",
soft_failures,
).actual(
ReportConst.LEAKS_REPORT_NAME_PART.lower() in title_info.raw_title.lower()
).expected(True).equal_to()
StepCheck(
"Время начала периода в шапке совпадает с фильтром запроса",
"period_start",
soft_failures,
).actual(title_info.period_start).expected(report_state.period_start_naive).equal_to()
StepCheck(
"Время конца периода в шапке совпадает с фильтром запроса",
"period_end",
soft_failures,
).actual(title_info.period_end).expected(report_state.period_end_naive).equal_to()
StepCheck(
"Названия колонок в шапке отчёта",
"column_headers",
soft_failures,
).actual(report_utils.get_report_column_headers(report_state.worksheet)).expected(
ReportConst.EXPECTED_COLUMN_HEADERS
).equal_to()
with allure.step("Этап 9. Проверка содержимого строки утечки"):
report_state.data_rows = report_utils.iter_report_data_rows(report_state.worksheet)
StepCheck("В отчёте есть хотя бы одна строка с данными", "data_rows").actual(
report_state.data_rows
).is_not_empty()
report_state.target_row = report_utils.find_row_with_object(
report_state.data_rows, cfg.technological_unit.description
)
allure.attach(
"\n".join(
f"row#{row.row_index}: {row.cells}" for row in report_state.data_rows
),
name="Все строки данных отчёта",
attachment_type=allure.attachment_type.TEXT,
)
StepCheck(
f"Строка с объектом, содержащим '{cfg.technological_unit.description}'",
ReportConst.COL_OBJECT,
).actual(report_state.target_row).is_not_none()
target_row = report_state.target_row
with SoftAssertions() as soft_failures:
StepCheck(
"Время утечки в диапазоне [старт имитатора, старт + offset теста]",
ReportConst.COL_DATETIME,
soft_failures,
).actual(target_row.datetime_value).is_between(
report_state.period_start_naive, report_state.period_end_naive
)
StepCheck(
f"Колонка '{ReportConst.COL_OBJECT}' содержит "
f"'{cfg.technological_unit.description}'",
ReportConst.COL_OBJECT,
soft_failures,
).actual(
report_state.tu_description_lower in target_row.object_value.lower()
).expected(True).equal_to()
StepCheck(
f"Колонка '{ReportConst.COL_LDS_STATUS}'",
ReportConst.COL_LDS_STATUS,
soft_failures,
).actual(target_row.lds_status.strip()).expected(ReportConst.LDS_STATUS_OK_TEXT).equal_to()
StepCheck(
f"Колонка '{ReportConst.COL_MASK_INFO}' содержит "
f"'{ReportConst.MASKING_NOT_MASKED_TEXT}'",
ReportConst.COL_MASK_INFO,
soft_failures,
).actual(
ReportConst.MASKING_NOT_MASKED_TEXT.lower() in target_row.masking_info.lower()
).expected(True).equal_to()
StepCheck(
f"Колонка '{ReportConst.COL_COORDINATE}' (с погрешностью "
f"{cfg.allowed_distance_diff_meters} м)",
ReportConst.COL_COORDINATE,
soft_failures,
).actual(target_row.coordinate_meters).is_close_to(
leak.coordinate_meters,
cfg.allowed_distance_diff_meters,
f"значение допустимой погрешности координаты {cfg.allowed_distance_diff_meters}",
)
StepCheck(
f"Колонка '{ReportConst.COL_LEAK_VOLUME}' не пустая",
ReportConst.COL_LEAK_VOLUME,
soft_failures,
).actual(target_row.leak_volume).is_not_none()
StepCheck(
f"Колонка '{ReportConst.COL_MT_MODE}' содержит '{report_state.expected_mt_mode}'",
ReportConst.COL_MT_MODE,
soft_failures,
).actual(
report_state.expected_mt_mode.lower() in target_row.mt_mode.lower()
).expected(True).equal_to()
except Exception:
with allure.step("Прикрепление xlsx отчёта к Allure при падении теста"):
if report_state.temp_file_path and report_state.report_file_name:
report_utils.attach_report_file_to_allure(
report_state.temp_file_path, report_state.report_file_name
)
raise
finally:
with allure.step("Удаление временного xlsx файла"):
temp_path = report_state.temp_file_path
if temp_path is not None:
try:
temp_path.unlink(missing_ok=True)
except OSError:
pass
тест смоук
@pytest.mark.asyncio
async def test_export_leaks_report(
self,
ws_client: WebSocketClient,
config: SmokeSuiteConfig,
leak: LeakTestConfig,
leak_number: int,
imitator_start_time: datetime,
) -> None:
"""[ExportReports] Проверка формирования и содержимого xlsx отчёта об утечках"""
tag = "ExportReports"
title = f"[{tag}] Проверка формирования отчёта об утечках. ЭФ: Выпадашка отчётов"
_apply_allure_markers(
leak.export_leaks_report_test,
tag,
title,
(
f"Проверка формирования и содержимого xlsx отчёта об утечках на наборе данных "
f"{config.suite_name},\n"
f"на технологическом участке {config.technological_unit.description}\n"
f"Время проведения проверки: {leak.export_leaks_report_test.offset} мин.\n"
"Этапы сценария:\n"
"1) Отправка ExportReportsCommandRequest с фильтром по времени "
"(start = старт имитатора, end = старт + offset теста)\n"
"2) Ожидание пуш-нотификации ReportDataExportedNotification\n"
"3) Лонг-поллинг getExportedFilesListRequest до появления отчёта в списке\n"
"4) Отправка DownloadExportedDataRequest и приём fileChunk\n"
"5) Проверка имени файла (.xlsx, 'Отчет об утечках', описание ТУ)\n"
"6) Проверка двойной шапки xlsx (название + период, названия колонок)\n"
"7) Проверка содержимого строки утечки: дата, объект, режим СОУ, маскирование, "
"координата, объём, режим работы МТ"
),
)
if config.has_multiple_leaks:
allure.dynamic.title(f"{title} (утечка #{leak_number})")
await scenarios.export_leaks_report(ws_client, config, leak, imitator_start_time)
вс мс парсер
return self._find_and_parse_message(data_class=UnmaskSignalReply, data=data)
def parse_report_data_exported_notification_msg(self, data: list) -> ReportDataExportedNotification:
"""
Парсит пуш-нотификацию ReportDataExportedNotification о готовности отчёта.
"""
return self._find_and_parse_message(data_class=ReportDataExportedNotification, data=data)
def parse_exported_files_list_msg(self, data: list) -> GetExportedFilesListReply:
"""
Парсит ответ getExportedFilesListReply со списком сформированных файлов.
"""
return self._find_and_parse_message(data_class=GetExportedFilesListReply, data=data)
def parse_download_exported_data_msg(self, data: list) -> DownloadExportedDataReply:
"""
Парсит ответ DownloadExportedDataReply со скачиваемым контентом файла (fileChunk).
"""
return self._find_and_parse_message(data_class=DownloadExportedDataReply, data=data)
def _get_default_config(self) -> Config:
"""
Получает конфиг с правилами обработки полей
"""
# TODO добавить strict=True, после выполнения задачи LDS-8792
def _to_export_status(value: Any) -> ExportStatus:
return value if isinstance(value, ExportStatus) else ExportStatus(value)
def _to_exported_data_type(value: Any) -> ExportedDataType:
return value if isinstance(value, ExportedDataType) else ExportedDataType(value)
return Config(
type_hooks={
UUID: self.convert_to_uuid,
datetime: self.timestamp_to_datetime,
ExportStatus: _to_export_status,
ExportedDataType: _to_exported_data_type,
}
)
вс тест утилс
period[key] = datetime_to_msgpack_timestamp(period[key])
return result
def extract_first_number(value: object) -> Optional[float]:
"""
Извлекает первое число из ячейки (int/float/str вида '55.89 км', '111.46 м³/ч').
"""
if value is None:
return None
if isinstance(value, (int, float)) and not isinstance(value, bool):
return float(value)
if isinstance(value, str):
matches = re.findall(TestConst.DIGITS_WITH_DOT_PATTERN, value)
if matches:
try:
return float(matches[0].replace(",", "."))
except ValueError:
return None
return None
return leak_diagnostic_area_samples
async def poll_for_report_export_notification(
ws_client: WebSocketClient,
parser,
total_wait_seconds: float,
poll_interval_seconds: float,
) -> Optional[Any]:
"""
Лонг-поллит очередь ws до появления ReportDataExportedNotification с успешным exportStatus.
"""
from constants.enums import ExportStatus, ReplyStatus
from models.export_reports_model import REPORT_DATA_EXPORTED_NOTIFICATION
deadline = asyncio.get_event_loop().time() + total_wait_seconds
while asyncio.get_event_loop().time() < deadline:
remaining = deadline - asyncio.get_event_loop().time()
try:
payload = await ws_client.receive_by_type(
REPORT_DATA_EXPORTED_NOTIFICATION,
timeout=min(poll_interval_seconds * 5, remaining),
)
notification = parser.parse_report_data_exported_notification_msg(payload)
if (
notification.replyStatus == ReplyStatus.OK.value
and notification.replyContent is not None
and notification.replyContent.exportStatus == ExportStatus.DONE
):
return notification
except (asyncio.TimeoutError, TimeoutError, OSError):
pass
await asyncio.sleep(poll_interval_seconds)
return None
async def poll_for_exported_file(
ws_client: WebSocketClient,
parser,
tu_id: int,
expected_data_type: Any,
name_substring: str,
period_start: datetime,
period_end: datetime,
total_wait_seconds: float,
poll_interval_seconds: float,
) -> Optional[Any]:
"""
Лонг-поллит getExportedFilesListRequest пока в списке не появится наш отчёт.
"""
from models.get_exported_files_list_model import GET_EXPORTED_FILES_LIST_REQUEST
deadline = asyncio.get_event_loop().time() + total_wait_seconds
last_items_count = -1
while asyncio.get_event_loop().time() < deadline:
await connect(
ws_client,
GET_EXPORTED_FILES_LIST_REQUEST,
{"tuId": tu_id, "additionalProperties": None},
)
invocation_id = ws_client.invocation_id
try:
payload = await ws_client.receive_by_invocation_id(
invocation_id, timeout=poll_interval_seconds * 5
)
except (asyncio.TimeoutError, TimeoutError, OSError):
await asyncio.sleep(poll_interval_seconds)
continue
parsed_payload = parser.parse_exported_files_list_msg(payload)
items = []
if parsed_payload.replyContent is not None:
items = parsed_payload.replyContent.exportedData or []
if len(items) != last_items_count:
allure.attach(
"\n".join(
f"id={item.id}, name={item.name}, type={item.exportedDataType}, "
f"start={item.start}, end={item.end}"
for item in items
),
name=f"Список сформированных файлов (попытка, всего: {len(items)})",
attachment_type=allure.attachment_type.TEXT,
)
last_items_count = len(items)
match = find_matching_exported_item(
items=items,
expected_data_type=expected_data_type,
name_substring=name_substring,
period_start=period_start,
period_end=period_end,
)
if match is not None:
return match
await asyncio.sleep(poll_interval_seconds)
return None
def _normalize_report_period_datetime(value: datetime) -> datetime:
"""Приводит datetime периода отчёта к московскому времени без микросекунд."""
return localize_as_moscow(value).replace(microsecond=0)
def find_matching_exported_item(
items: List[Any],
expected_data_type: Any,
name_substring: str,
period_start: datetime,
period_end: datetime,
) -> Optional[Any]:
"""
Ищет элемент списка по типу, подстроке имени и точному совпадению периода start/end.
"""
name_substring_lower = name_substring.lower()
period_start_expected = _normalize_report_period_datetime(period_start)
period_end_expected = _normalize_report_period_datetime(period_end)
matched_items = []
for item in items:
if item.exportedDataType != expected_data_type:
continue
if name_substring_lower not in (item.name or "").lower():
continue
if item.start is None or item.end is None:
continue
item_start = _normalize_report_period_datetime(item.start)
item_end = _normalize_report_period_datetime(item.end)
if item_start != period_start_expected or item_end != period_end_expected:
continue
matched_items.append(item)
if not matched_items:
return None
return max(matched_items, key=lambda exported_item: exported_item.id)