Загрузка данных


енамки


    UNKNOWN_ERROR = 520


class ExportedDataType(IntEnum):
    """
    Тип экспортируемых данных
    """

    JOURNAL = 1
    LDS_MODES = 2
    LEAKS_REPORT = 4
    REJECTED_INPUT_DATA = 7

    def to_download_name(self) -> str:
        """Строковый тип для DownloadExportedDataRequest.exportedDataType"""
        return _EXPORTED_DATA_TYPE_DOWNLOAD_NAMES[self]


_EXPORTED_DATA_TYPE_DOWNLOAD_NAMES = {
    ExportedDataType.JOURNAL: "JournalReport",
    ExportedDataType.LDS_MODES: "LdsModesReport",
    ExportedDataType.LEAKS_REPORT: "LeaksReport",
    ExportedDataType.REJECTED_INPUT_DATA: "RejectedInputDataReport",
}


class ExportStatus(IntEnum):
    """Статус формирования отчёта в ReportDataExportedNotification.replyContent.exportStatus."""

    NOT_READY = 0
    DONE = 1











тест конст


class ExportReportConstants:
    """Константы для теста формирования отчёта об утечках"""

    # ===== Параметры запроса =====
    # Смещение часового пояса (часы) от UTC для отображения времени в отчёте (Москва = UTC+3)
    MOSCOW_TIME_OFFSET_HOURS: int = 3

    # ===== Таймауты и интервалы поллинга =====
    # Максимальное ожидание нотификации о готовности отчёта
    NOTIFICATION_TIMEOUT_SECONDS: float = 60.0
    # Максимальное время ожидания появления отчёта в списке после нотификации
    LIST_POLL_TOTAL_WAIT_SECONDS: float = 10.0
    # Интервал между запросами getExportedFilesListRequest
    LIST_POLL_INTERVAL_SECONDS: float = 10.0
    # Таймаут получения ответа на скачивание
    DOWNLOAD_TIMEOUT_SECONDS: float = 60.0

    # ===== Имя файла отчёта =====
    LEAKS_REPORT_NAME_PART: str = "Отчет об утечках"  # подстрока в имени файла/отчёта
    XLSX_EXTENSION: str = ".xlsx"
    # Сигнатура zip-архива, используется для проверки формата файла по содержимому
    ZIP_SIGNATURE: bytes = b'PK\x03\x04'

    # ===== Формат даты/времени в отчёте =====
    REPORT_DATETIME_FORMAT: str = "%d.%m.%Y %H:%M:%S"
    # Регулярное выражение для извлечения двух дат из заголовка
    # "Отчет об утечках с 20.05.2026 11:00:00 по 20.05.2026 12:52:02"
    REPORT_HEADER_PERIOD_PATTERN: str = (
        r'Отчет об утечках с (?P<period_start>\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})'
        r' по (?P<period_end>\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})'
    )

    # ===== Шапка таблицы отчёта об утечках =====
    # Двойная шапка: первая строка - название отчёта с периодом, вторая - названия колонок
    REPORT_TITLE_ROW: int = 1
    REPORT_COLUMN_HEADERS_ROW: int = 2
    REPORT_DATA_FIRST_ROW: int = 3

    # ===== Названия колонок =====
    COL_DATETIME: str = "Дата и время"
    COL_OBJECT: str = "Объект"
    COL_LDS_STATUS: str = "Режим работы СОУ"
    COL_MASK_INFO: str = "Информация о маскировании"
    COL_COORDINATE: str = "Координата"
    COL_LEAK_VOLUME: str = "Объемный расход утечки"
    COL_MT_MODE: str = "Режим работы МТ"

    EXPECTED_COLUMN_HEADERS: list = [
        COL_DATETIME,
        COL_OBJECT,
        COL_LDS_STATUS,
        COL_MASK_INFO,
        COL_COORDINATE,
        COL_LEAK_VOLUME,
        COL_MT_MODE,
    ]

    LDS_STATUS_OK_TEXT: str = "СОУ исправна"
    MASKING_NOT_MASKED_TEXT: str = "СОУ не замаскирована"

    # ===== Маппинг StationaryStatus <-> текст в колонке "Режим работы МТ" =====
    STATIONARY_STATUS_TO_REPORT_TEXT: dict = {
        StationaryStatus.UNSTATIONARY.value: "Нестационарный режим работы МТ",
        StationaryStatus.STATIONARY.value: "Стационарный режим работы МТ",
        StationaryStatus.STOPPED.value: "Режим остановленной перекачки МТ",
    }

    # ===== Прочее =====
    DEFAULT_SHEET_INDEX: int = 0



















в датасете
        output_signals_test=CaseMarkers(test_case_id="33", offset=61),
        # ----- Тест ExportReports -----
        export_leaks_report_test=CaseMarkers(test_case_id="", offset=62),

















сценариос инит

    scenarios.output_signals,
    scenarios.export_leaks_report,
























сам сценарий

async def export_leaks_report(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig, imitator_start_time: datetime):
    """
    Сценарий формирования отчёта об утечках.

    Этапы:
    1. Отправка ExportReportsCommandRequest с фильтром по времени
       (start = старт имитатора, end = старт имитатора + offset теста).
    2. Ожидание пуш-нотификации ReportDataExportedNotification о готовности отчёта.
    3. Лонг-поллинг getExportedFilesListRequest до появления нашего отчёта в списке.
    4. Отправка DownloadExportedDataRequest по id отчёта.
    5. Скачивание xlsx файла, проверка формата (zip-сигнатура, имя с .xlsx и подстрокой).
    6. Открытие xlsx, проверка двойной шапки (название отчёта + период, названия колонок).
    7. Проверка содержимого строки утечки: дата, объект, режим СОУ, маскирование,
       координата, объём, режим работы МТ (по leak.expected_stationary_status).

    Скачанный файл удаляется по завершению, прикладывается к Allure только при падении теста.
    """
    report_state = ExportLeaksReportState()

    with allure.step("Подготовка параметров сценария формирования отчёта об утечках"):
        report_state.report_test = leak.export_leaks_report_test
        StepCheck("В конфигурации задан export_leaks_report_test", "export_leaks_report_test").actual(
            report_state.report_test
        ).is_not_none()

        report_state.period_start = t_utils.localize_as_moscow(imitator_start_time)
        report_state.period_end = t_utils.localize_as_moscow(
            imitator_start_time + timedelta(minutes=report_state.report_test.offset)
        )
        report_state.period_start_naive = report_state.period_start.replace(tzinfo=None)
        report_state.period_end_naive = report_state.period_end.replace(tzinfo=None)
        report_state.expected_mt_mode = ReportConst.STATIONARY_STATUS_TO_REPORT_TEXT.get(
            leak.expected_stationary_status
        )
        report_state.tu_description_lower = cfg.technological_unit.description.lower()

        StepCheck(
            "Задан ожидаемый текст режима МТ для отчёта",
            "expected_mt_mode",
        ).actual(report_state.expected_mt_mode).is_not_none()

        allure.attach(
            f"period.start={report_state.period_start}\n"
            f"period.end={report_state.period_end}\n"
            f"offset_minutes={report_state.report_test.offset}",
            name="Фильтр периода отчёта",
            attachment_type=allure.attachment_type.TEXT,
        )

    with allure.step(f"Этап 1. Запрос формирования отчёта ({EXPORT_REPORTS_COMMAND_REQUEST})"):
        request_payload = {
            "tuId": cfg.tu_id,
            "exportedDataTypes": [ExportedDataType.LEAKS_REPORT.value],
            "timeOffset": ReportConst.MOSCOW_TIME_OFFSET_HOURS,
            "period": {
                "start": t_utils.datetime_to_msgpack_timestamp(report_state.period_start),
                "end": t_utils.datetime_to_msgpack_timestamp(report_state.period_end),
                "additionalProperties": {},
            },
        }
        await t_utils.connect(ws_client, EXPORT_REPORTS_COMMAND_REQUEST, request_payload)

    with allure.step(
        f"Этап 2. Ожидание пуш-нотификации {REPORT_DATA_EXPORTED_NOTIFICATION} о готовности отчёта"
    ):
        report_state.notification = await t_utils.poll_for_report_export_notification(
            ws_client=ws_client,
            parser=parser,
            total_wait_seconds=ReportConst.NOTIFICATION_TIMEOUT_SECONDS,
            poll_interval_seconds=ReportConst.LIST_POLL_INTERVAL_SECONDS,
        )
        StepCheck("Получена пуш-нотификация о готовности отчёта", "notification").actual(
            report_state.notification
        ).is_not_none()

        notification = report_state.notification
        StepCheck("Проверка статуса пуш-нотификации", "replyStatus").actual(
            notification.replyStatus
        ).expected(ReplyStatus.OK.value).equal_to()
        StepCheck("Проверка наличия контента нотификации", "replyContent").actual(
            notification.replyContent
        ).is_not_none()
        StepCheck("Проверка exportStatus в нотификации", "exportStatus").actual(
            notification.replyContent.exportStatus
        ).expected(ExportStatus.DONE).equal_to()
        StepCheck("Проверка отсутствия ошибки в нотификации", "errorMessage").actual(
            notification.replyContent.errorMessage or ""
        ).expected("").equal_to()

    with allure.step(
        f"Этап 3. Лонг-поллинг {GET_EXPORTED_FILES_LIST_REQUEST} до появления отчёта в списке"
    ):
        report_state.report_item = await t_utils.poll_for_exported_file(
            ws_client=ws_client,
            parser=parser,
            tu_id=cfg.tu_id,
            expected_data_type=ExportedDataType.LEAKS_REPORT,
            name_substring=ReportConst.LEAKS_REPORT_NAME_PART,
            period_start=report_state.period_start,
            period_end=report_state.period_end,
            total_wait_seconds=ReportConst.LIST_POLL_TOTAL_WAIT_SECONDS,
            poll_interval_seconds=ReportConst.LIST_POLL_INTERVAL_SECONDS,
        )
        StepCheck("Отчёт найден в списке сформированных файлов", "report_item").actual(
            report_state.report_item
        ).is_not_none()

        report_item = report_state.report_item
        allure.attach(
            f"id={report_item.id}, name={report_item.name}, "
            f"exportedDataType={report_item.exportedDataType}, "
            f"start={report_item.start}, end={report_item.end}",
            name="Найденный отчёт в списке",
            attachment_type=allure.attachment_type.TEXT,
        )
        report_state.report_file_name = report_item.name + ReportConst.XLSX_EXTENSION

    with allure.step(
        f"Этап 4. Отправка {DOWNLOAD_EXPORTED_DATA_REQUEST} по id={report_state.report_item.id}"
    ):
        download_request = {
            "exportedDataId": report_state.report_item.id,
            "exportedDataType": ExportedDataType.LEAKS_REPORT.to_download_name(),
            "additionalProperties": None,
            "timeOffset": ReportConst.MOSCOW_TIME_OFFSET_HOURS,
        }
        await t_utils.connect(ws_client, DOWNLOAD_EXPORTED_DATA_REQUEST, download_request)
        report_state.download_invocation_id = ws_client.invocation_id

    with allure.step("Этап 5. Получение fileChunk и проверка формата xlsx"):
        try:
            report_state.download_payload = await ws_client.receive_by_invocation_id(
                report_state.download_invocation_id, timeout=ReportConst.DOWNLOAD_TIMEOUT_SECONDS
            )
        except (TimeoutError, OSError) as error:
            pytest.fail(
                f"Не получили ответ на {DOWNLOAD_EXPORTED_DATA_REQUEST} "
                f"за {ReportConst.DOWNLOAD_TIMEOUT_SECONDS} секунд. Ошибка: {error}"
            )

        report_state.download_reply = parser.parse_download_exported_data_msg(
            report_state.download_payload
        )
        download_reply = report_state.download_reply

        StepCheck("Проверка статуса ответа на скачивание", "replyStatus").actual(
            download_reply.replyStatus
        ).expected(ReplyStatus.OK.value).equal_to()
        StepCheck("Проверка наличия контента ответа на скачивание", "replyContent").actual(
            download_reply.replyContent
        ).is_not_none()

        report_state.file_bytes = download_reply.replyContent.fileChunk
        StepCheck("Проверка наличия байт файла", "fileChunk").actual(report_state.file_bytes).is_not_empty()
        StepCheck("Проверка xlsx (zip) сигнатуры файла", "file_signature").actual(
            report_utils.is_xlsx_file_bytes(report_state.file_bytes)
        ).expected(True).equal_to()

    with allure.step("Этап 6. Проверка имени файла отчёта"):
        report_file_name_lower = report_state.report_file_name.lower()
        StepCheck(
            f"Имя файла оканчивается на {ReportConst.XLSX_EXTENSION}", "file_name"
        ).actual(report_utils.is_xlsx_extension(report_state.report_file_name)).expected(True).equal_to()
        StepCheck(
            f"Имя файла содержит '{ReportConst.LEAKS_REPORT_NAME_PART}'", "file_name"
        ).actual(
            ReportConst.LEAKS_REPORT_NAME_PART.lower() in report_file_name_lower
        ).expected(True).equal_to()
        StepCheck(
            f"Имя файла содержит описание ТУ '{cfg.technological_unit.description}'",
            "file_name",
        ).actual(report_state.tu_description_lower in report_file_name_lower).expected(True).equal_to()

    with allure.step("Этап 7. Сохранение xlsx во временный файл"):
        report_state.temp_file_path = report_utils.save_report_bytes_to_temp_file(report_state.file_bytes)
        StepCheck("Временный xlsx файл создан", "temp_file_path").actual(
            report_state.temp_file_path
        ).is_not_none()

    try:
        with allure.step("Этап 8. Открытие xlsx и проверка двойной шапки"):
            report_state.worksheet = report_utils.load_report_worksheet(report_state.temp_file_path)
            StepCheck("Лист xlsx открыт", "worksheet").actual(report_state.worksheet).is_not_none()

            title_info = report_utils.parse_report_title(
                report_utils.get_report_title_cell(report_state.worksheet)
            )
            allure.attach(
                f"Шапка отчёта (raw): {title_info.raw_title}\n"
                f"period_start: {title_info.period_start}\n"
                f"period_end: {title_info.period_end}",
                name="Шапка отчёта (1-я строка)",
                attachment_type=allure.attachment_type.TEXT,
            )

            with SoftAssertions() as soft_failures:
                StepCheck(
                    f"В шапке отчёта присутствует '{ReportConst.LEAKS_REPORT_NAME_PART}'",
                    "report_title",
                    soft_failures,
                ).actual(
                    ReportConst.LEAKS_REPORT_NAME_PART.lower() in title_info.raw_title.lower()
                ).expected(True).equal_to()

                StepCheck(
                    "Время начала периода в шапке совпадает с фильтром запроса",
                    "period_start",
                    soft_failures,
                ).actual(title_info.period_start).expected(report_state.period_start_naive).equal_to()
                StepCheck(
                    "Время конца периода в шапке совпадает с фильтром запроса",
                    "period_end",
                    soft_failures,
                ).actual(title_info.period_end).expected(report_state.period_end_naive).equal_to()
                StepCheck(
                    "Названия колонок в шапке отчёта",
                    "column_headers",
                    soft_failures,
                ).actual(report_utils.get_report_column_headers(report_state.worksheet)).expected(
                    ReportConst.EXPECTED_COLUMN_HEADERS
                ).equal_to()

        with allure.step("Этап 9. Проверка содержимого строки утечки"):
            report_state.data_rows = report_utils.iter_report_data_rows(report_state.worksheet)
            StepCheck("В отчёте есть хотя бы одна строка с данными", "data_rows").actual(
                report_state.data_rows
            ).is_not_empty()

            report_state.target_row = report_utils.find_row_with_object(
                report_state.data_rows, cfg.technological_unit.description
            )
            allure.attach(
                "\n".join(
                    f"row#{row.row_index}: {row.cells}" for row in report_state.data_rows
                ),
                name="Все строки данных отчёта",
                attachment_type=allure.attachment_type.TEXT,
            )
            StepCheck(
                f"Строка с объектом, содержащим '{cfg.technological_unit.description}'",
                ReportConst.COL_OBJECT,
            ).actual(report_state.target_row).is_not_none()

            target_row = report_state.target_row
            with SoftAssertions() as soft_failures:
                StepCheck(
                    "Время утечки в диапазоне [старт имитатора, старт + offset теста]",
                    ReportConst.COL_DATETIME,
                    soft_failures,
                ).actual(target_row.datetime_value).is_between(
                    report_state.period_start_naive, report_state.period_end_naive
                )

                StepCheck(
                    f"Колонка '{ReportConst.COL_OBJECT}' содержит "
                    f"'{cfg.technological_unit.description}'",
                    ReportConst.COL_OBJECT,
                    soft_failures,
                ).actual(
                    report_state.tu_description_lower in target_row.object_value.lower()
                ).expected(True).equal_to()

                StepCheck(
                    f"Колонка '{ReportConst.COL_LDS_STATUS}'",
                    ReportConst.COL_LDS_STATUS,
                    soft_failures,
                ).actual(target_row.lds_status.strip()).expected(ReportConst.LDS_STATUS_OK_TEXT).equal_to()

                StepCheck(
                    f"Колонка '{ReportConst.COL_MASK_INFO}' содержит "
                    f"'{ReportConst.MASKING_NOT_MASKED_TEXT}'",
                    ReportConst.COL_MASK_INFO,
                    soft_failures,
                ).actual(
                    ReportConst.MASKING_NOT_MASKED_TEXT.lower() in target_row.masking_info.lower()
                ).expected(True).equal_to()

                StepCheck(
                    f"Колонка '{ReportConst.COL_COORDINATE}' (с погрешностью "
                    f"{cfg.allowed_distance_diff_meters} м)",
                    ReportConst.COL_COORDINATE,
                    soft_failures,
                ).actual(target_row.coordinate_meters).is_close_to(
                    leak.coordinate_meters,
                    cfg.allowed_distance_diff_meters,
                    f"значение допустимой погрешности координаты {cfg.allowed_distance_diff_meters}",
                )

                StepCheck(
                    f"Колонка '{ReportConst.COL_LEAK_VOLUME}' не пустая",
                    ReportConst.COL_LEAK_VOLUME,
                    soft_failures,
                ).actual(target_row.leak_volume).is_not_none()

                StepCheck(
                    f"Колонка '{ReportConst.COL_MT_MODE}' содержит '{report_state.expected_mt_mode}'",
                    ReportConst.COL_MT_MODE,
                    soft_failures,
                ).actual(
                    report_state.expected_mt_mode.lower() in target_row.mt_mode.lower()
                ).expected(True).equal_to()
    except Exception:
        with allure.step("Прикрепление xlsx отчёта к Allure при падении теста"):
            if report_state.temp_file_path and report_state.report_file_name:
                report_utils.attach_report_file_to_allure(
                    report_state.temp_file_path, report_state.report_file_name
                )
        raise
    finally:
        with allure.step("Удаление временного xlsx файла"):
            temp_path = report_state.temp_file_path
            if temp_path is not None:
                try:
                    temp_path.unlink(missing_ok=True)
                except OSError:
                    pass


































тест смоук

    @pytest.mark.asyncio
    async def test_export_leaks_report(
        self,
        ws_client: WebSocketClient,
        config: SmokeSuiteConfig,
        leak: LeakTestConfig,
        leak_number: int,
        imitator_start_time: datetime,
    ) -> None:
        """[ExportReports] Проверка формирования и содержимого xlsx отчёта об утечках"""
        tag = "ExportReports"
        title = f"[{tag}] Проверка формирования отчёта об утечках. ЭФ: Выпадашка отчётов"
        _apply_allure_markers(
            leak.export_leaks_report_test,
            tag,
            title,
            (
                f"Проверка формирования и содержимого xlsx отчёта об утечках на наборе данных "
                f"{config.suite_name},\n"
                f"на технологическом участке {config.technological_unit.description}\n"
                f"Время проведения проверки: {leak.export_leaks_report_test.offset} мин.\n"
                "Этапы сценария:\n"
                "1) Отправка ExportReportsCommandRequest с фильтром по времени "
                "(start = старт имитатора, end = старт + offset теста)\n"
                "2) Ожидание пуш-нотификации ReportDataExportedNotification\n"
                "3) Лонг-поллинг getExportedFilesListRequest до появления отчёта в списке\n"
                "4) Отправка DownloadExportedDataRequest и приём fileChunk\n"
                "5) Проверка имени файла (.xlsx, 'Отчет об утечках', описание ТУ)\n"
                "6) Проверка двойной шапки xlsx (название + период, названия колонок)\n"
                "7) Проверка содержимого строки утечки: дата, объект, режим СОУ, маскирование, "
                "координата, объём, режим работы МТ"
            ),
        )
        if config.has_multiple_leaks:
            allure.dynamic.title(f"{title} (утечка #{leak_number})")
        await scenarios.export_leaks_report(ws_client, config, leak, imitator_start_time)




















вс мс парсер
        return self._find_and_parse_message(data_class=UnmaskSignalReply, data=data)

    def parse_report_data_exported_notification_msg(self, data: list) -> ReportDataExportedNotification:
        """
        Парсит пуш-нотификацию ReportDataExportedNotification о готовности отчёта.
        """
        return self._find_and_parse_message(data_class=ReportDataExportedNotification, data=data)

    def parse_exported_files_list_msg(self, data: list) -> GetExportedFilesListReply:
        """
        Парсит ответ getExportedFilesListReply со списком сформированных файлов.
        """
        return self._find_and_parse_message(data_class=GetExportedFilesListReply, data=data)

    def parse_download_exported_data_msg(self, data: list) -> DownloadExportedDataReply:
        """
        Парсит ответ DownloadExportedDataReply со скачиваемым контентом файла (fileChunk).
        """
        return self._find_and_parse_message(data_class=DownloadExportedDataReply, data=data)










    def _get_default_config(self) -> Config:
        """
        Получает конфиг с правилами обработки полей
        """
        # TODO добавить strict=True, после выполнения задачи LDS-8792
        def _to_export_status(value: Any) -> ExportStatus:
            return value if isinstance(value, ExportStatus) else ExportStatus(value)

        def _to_exported_data_type(value: Any) -> ExportedDataType:
            return value if isinstance(value, ExportedDataType) else ExportedDataType(value)

        return Config(
            type_hooks={
                UUID: self.convert_to_uuid,
                datetime: self.timestamp_to_datetime,
                ExportStatus: _to_export_status,
                ExportedDataType: _to_exported_data_type,
            }
        )
































вс тест утилс

                period[key] = datetime_to_msgpack_timestamp(period[key])
    return result


def extract_first_number(value: object) -> Optional[float]:
    """
    Извлекает первое число из ячейки (int/float/str вида '55.89 км', '111.46 м³/ч').
    """
    if value is None:
        return None
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        return float(value)
    if isinstance(value, str):
        matches = re.findall(TestConst.DIGITS_WITH_DOT_PATTERN, value)
        if matches:
            try:
                return float(matches[0].replace(",", "."))
            except ValueError:
                return None
    return None




    return leak_diagnostic_area_samples


async def poll_for_report_export_notification(
    ws_client: WebSocketClient,
    parser,
    total_wait_seconds: float,
    poll_interval_seconds: float,
) -> Optional[Any]:
    """
    Лонг-поллит очередь ws до появления ReportDataExportedNotification с успешным exportStatus.
    """
    from constants.enums import ExportStatus, ReplyStatus
    from models.export_reports_model import REPORT_DATA_EXPORTED_NOTIFICATION

    deadline = asyncio.get_event_loop().time() + total_wait_seconds

    while asyncio.get_event_loop().time() < deadline:
        remaining = deadline - asyncio.get_event_loop().time()
        try:
            payload = await ws_client.receive_by_type(
                REPORT_DATA_EXPORTED_NOTIFICATION,
                timeout=min(poll_interval_seconds * 5, remaining),
            )
            notification = parser.parse_report_data_exported_notification_msg(payload)
            if (
                notification.replyStatus == ReplyStatus.OK.value
                and notification.replyContent is not None
                and notification.replyContent.exportStatus == ExportStatus.DONE
            ):
                return notification
        except (asyncio.TimeoutError, TimeoutError, OSError):
            pass

        await asyncio.sleep(poll_interval_seconds)

    return None


async def poll_for_exported_file(
    ws_client: WebSocketClient,
    parser,
    tu_id: int,
    expected_data_type: Any,
    name_substring: str,
    period_start: datetime,
    period_end: datetime,
    total_wait_seconds: float,
    poll_interval_seconds: float,
) -> Optional[Any]:
    """
    Лонг-поллит getExportedFilesListRequest пока в списке не появится наш отчёт.
    """
    from models.get_exported_files_list_model import GET_EXPORTED_FILES_LIST_REQUEST

    deadline = asyncio.get_event_loop().time() + total_wait_seconds
    last_items_count = -1

    while asyncio.get_event_loop().time() < deadline:
        await connect(
            ws_client,
            GET_EXPORTED_FILES_LIST_REQUEST,
            {"tuId": tu_id, "additionalProperties": None},
        )
        invocation_id = ws_client.invocation_id
        try:
            payload = await ws_client.receive_by_invocation_id(
                invocation_id, timeout=poll_interval_seconds * 5
            )
        except (asyncio.TimeoutError, TimeoutError, OSError):
            await asyncio.sleep(poll_interval_seconds)
            continue

        parsed_payload = parser.parse_exported_files_list_msg(payload)
        items = []
        if parsed_payload.replyContent is not None:
            items = parsed_payload.replyContent.exportedData or []

        if len(items) != last_items_count:
            allure.attach(
                "\n".join(
                    f"id={item.id}, name={item.name}, type={item.exportedDataType}, "
                    f"start={item.start}, end={item.end}"
                    for item in items
                ),
                name=f"Список сформированных файлов (попытка, всего: {len(items)})",
                attachment_type=allure.attachment_type.TEXT,
            )
            last_items_count = len(items)

        match = find_matching_exported_item(
            items=items,
            expected_data_type=expected_data_type,
            name_substring=name_substring,
            period_start=period_start,
            period_end=period_end,
        )
        if match is not None:
            return match

        await asyncio.sleep(poll_interval_seconds)

    return None


def _normalize_report_period_datetime(value: datetime) -> datetime:
    """Приводит datetime периода отчёта к московскому времени без микросекунд."""
    return localize_as_moscow(value).replace(microsecond=0)


def find_matching_exported_item(
    items: List[Any],
    expected_data_type: Any,
    name_substring: str,
    period_start: datetime,
    period_end: datetime,
) -> Optional[Any]:
    """
    Ищет элемент списка по типу, подстроке имени и точному совпадению периода start/end.
    """
    name_substring_lower = name_substring.lower()
    period_start_expected = _normalize_report_period_datetime(period_start)
    period_end_expected = _normalize_report_period_datetime(period_end)

    matched_items = []
    for item in items:
        if item.exportedDataType != expected_data_type:
            continue
        if name_substring_lower not in (item.name or "").lower():
            continue
        if item.start is None or item.end is None:
            continue
        item_start = _normalize_report_period_datetime(item.start)
        item_end = _normalize_report_period_datetime(item.end)
        if item_start != period_start_expected or item_end != period_end_expected:
            continue
        matched_items.append(item)

    if not matched_items:
        return None
    return max(matched_items, key=lambda exported_item: exported_item.id)