Загрузка данных


asserts 
    def does_not_contain(self, objects_list: List[ObjectType], forbidden_object: ObjectType) -> str:
        message_parts = [
            f"Ожидаемый результат: Список элементов: {objects_list}",
            f"Не содержит элемента: {forbidden_object}",
        ]
        return self._build_message(message_parts)

    def contains(self, container: Any, expected_item: Any) -> str:
        if isinstance(container, str):
            message_parts = [
                f"Ожидаемый результат: '{container}' содержит подстроку '{expected_item}'",
                f"Фактический результат: {self.field_name} = {self._format_val(container)}",
            ]
        else:
            message_parts = [
                f"Ожидаемый результат: список {self._format_val(container)} содержит элемент {expected_item}",
                f"Фактический результат: {self.field_name} = {self._format_val(container)}",
            ]
        return self._build_message(message_parts)






тоже внизу


        except AssertionError as exc:
            self._handle_assertion(exc)

    def contains(self, container: Any, expected_item: Any) -> None:
        """Проверка, что container (список или строка) содержит expected_item."""
        msg = self._msg_builder.contains(container, expected_item)

        try:
            with allure.step(msg):
                assert_that(container).described_as(msg).contains(expected_item)
        except AssertionError as exc:
            self._handle_assertion(exc)















тест конст
    DOWNLOAD_EXPORTED_DATA_REQUEST: str = "DownloadExportedDataRequest"

    # Допустимая погрешность при сравнении границ периода отчёта
    REPORT_PERIOD_TOLERANCE_MINUTES: int = 1
    # Формат даты/времени в имени скачиваемого xlsx-файла
    REPORT_FILE_NAME_DATETIME_FORMAT: str = "%d.%m.%Y %H_%M_%S"




    REPORT_HEADER_PERIOD_PATTERN: str = (
        r'Отчет об утечках с (?P<period_start>\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})'
        r' по (?P<period_end>\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})'
    )
    REPORT_FILE_NAME_PERIOD_PATTERN: str = (
        r'^Отчет об утечках (?P<tu>.+?) '
        r'(?P<period_start>\d{2}\.\d{2}\.\d{4} \d{2}_\d{2}_\d{2})'
        r' - '
        r'(?P<period_end>\d{2}\.\d{2}\.\d{4} \d{2}_\d{2}_\d{2})'
        r'\.xlsx$'
    )

























сценарий
async def export_leaks_report(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig, imitator_start_time: datetime):
    """
    Сценарий формирования отчёта об утечках.

    Этапы:
    1. Подписка SubscribeReportsDataExportedRequest на пуш-нотификации.
    2. Отправка ExportReportsCommandRequest с фильтром по времени
       (start = старт имитатора, end = старт имитатора + offset теста).
    3. Ожидание пуш-нотификации ReportDataExportedNotification о готовности отчёта.
    4. Лонг-поллинг GetExportedDataListRequest до появления нашего отчёта в списке.
    5. Отправка DownloadExportedDataRequest по id отчёта.
    6. Получение fileChunk по ответу на скачивание.
    7-10. Проверки: формат файла, имя, шапка xlsx, строка утечки.

    Скачанный файл удаляется по завершению, прикладывается к Allure только при падении теста.
    """
    report_state = ExportLeaksReportState()

    with allure.step("Подготовка параметров сценария формирования отчёта об утечках"):
        report_state.report_test = leak.export_leaks_report_test
        StepCheck("В конфигурации задан export_leaks_report_test", "export_leaks_report_test").actual(
            report_state.report_test
        ).is_not_none()

        report_state.period_start = t_utils.localize_as_moscow(imitator_start_time)
        report_state.period_end = t_utils.localize_as_moscow(
            imitator_start_time + timedelta(minutes=report_state.report_test.offset)
        )
        report_state.period_start_naive = report_utils.normalize_report_period_naive(report_state.period_start)
        report_state.period_end_naive = report_utils.normalize_report_period_naive(report_state.period_end)
        report_state.expected_mt_mode = ReportConst.STATIONARY_STATUS_TO_REPORT_TEXT.get(
            leak.expected_stationary_status
        )
        report_state.tu_description_lower = cfg.technological_unit.description.lower()
        time_offset_hours = t_utils.report_time_offset_hours()
        StepCheck(
            f"Смещение timeOffset для запросов отчёта (часовой пояс {TestConst.ZONE_INFO})",
            "time_offset_hours",
        ).actual(time_offset_hours).is_not_none()
        report_state.time_offset_hours = time_offset_hours

        StepCheck(
            "Задан ожидаемый текст режима МТ для отчёта",
            "expected_mt_mode",
        ).actual(report_state.expected_mt_mode).is_not_none()

        allure.attach(
            f"period.start={report_state.period_start}\n"
            f"period.end={report_state.period_end}\n"
            f"offset_minutes={report_state.report_test.offset}",
            name="Фильтр периода отчёта",
            attachment_type=allure.attachment_type.TEXT,
        )

    with allure.step(
        f"Этап 1. Подписка на пуш-нотификации ({ReportConst.SUBSCRIBE_REPORTS_DATA_EXPORTED_REQUEST})"
    ):
        await t_utils.connect(ws_client, ReportConst.SUBSCRIBE_REPORTS_DATA_EXPORTED_REQUEST, [])

    with allure.step(f"Этап 2. Запрос формирования отчёта ({ReportConst.EXPORT_REPORTS_COMMAND_REQUEST})"):
        request_payload = {
            "tuId": cfg.tu_id,
            "exportedDataTypes": [ExportedDataType.LEAKS_REPORT.value],
            "timeOffset": report_state.time_offset_hours,
            "period": {
                "start": t_utils.datetime_to_msgpack_timestamp(report_state.period_start),
                "end": t_utils.datetime_to_msgpack_timestamp(report_state.period_end),
                "additionalProperties": {},
            },
        }
        await t_utils.connect(ws_client, ReportConst.EXPORT_REPORTS_COMMAND_REQUEST, request_payload)

    with allure.step(
        f"Этап 3. Ожидание пуш-нотификации {ReportConst.REPORT_DATA_EXPORTED_NOTIFICATION} о готовности отчёта"
    ):
        report_state.notification = await t_utils.poll_for_report_export_notification(
            ws_client=ws_client,
            parser=parser,
            total_wait_seconds=ReportConst.NOTIFICATION_TIMEOUT_SECONDS,
            poll_interval_seconds=ReportConst.LIST_POLL_INTERVAL_SECONDS,
        )

    with allure.step("Извлечение полей пуш-нотификации"):
        notification = report_state.notification
        notification_reply_status = notification.replyStatus if notification else None
        notification_reply_content = notification.replyContent if notification else None
        notification_export_status = (
            notification_reply_content.exportStatus if notification_reply_content else None
        )
        notification_error_message = (
            (notification_reply_content.errorMessage or "") if notification_reply_content else ""
        )

    with allure.step("Проверка пуш-нотификации о готовности отчёта"):
        StepCheck("Получена пуш-нотификация о готовности отчёта", "notification").actual(
            report_state.notification
        ).is_not_none()
        StepCheck("Проверка статуса пуш-нотификации", "replyStatus").actual(notification_reply_status).expected(
            ReplyStatus.OK.value
        ).equal_to()
        StepCheck("Проверка наличия контента нотификации", "replyContent").actual(
            notification_reply_content
        ).is_not_none()
        StepCheck("Проверка exportStatus в нотификации", "exportStatus").actual(notification_export_status).expected(
            ExportStatus.DONE
        ).equal_to()
        StepCheck("В нотификации нет текста ошибки", "errorMessage").actual(notification_error_message).is_empty()

    with allure.step(
        f"Этап 4. Лонг-поллинг {ReportConst.GET_EXPORTED_DATA_LIST_REQUEST} до появления отчёта в списке"
    ):
        report_state.report_item = await t_utils.poll_for_exported_file(
            ws_client=ws_client,
            parser=parser,
            list_limit=ReportConst.EXPORTED_DATA_LIST_LIMIT,
            expected_data_type=ExportedDataType.LEAKS_REPORT,
            name_substring=ReportConst.LEAKS_REPORT_NAME_PART,
            tu_name_substring=cfg.technological_unit.description,
            period_start=report_state.period_start,
            period_end=report_state.period_end,
            total_wait_seconds=ReportConst.LIST_POLL_TOTAL_WAIT_SECONDS,
            poll_interval_seconds=ReportConst.LIST_POLL_INTERVAL_SECONDS,
        )

    with allure.step("Подготовка данных найденного отчёта в списке"):
        report_item = report_state.report_item
        if report_item is not None:
            allure.attach(
                f"id={report_item.id}, name={report_item.name}, "
                f"exportedDataType={report_item.exportedDataType}, "
                f"start={report_item.start}, end={report_item.end}",
                name="Найденный отчёт в списке",
                attachment_type=allure.attachment_type.TEXT,
            )
        report_state.report_file_name = report_utils.build_export_report_file_name(
            cfg.technological_unit.description,
            report_state.period_start,
            report_state.period_end,
        )

    with allure.step("Проверка: отчёт найден в списке сформированных файлов"):
        StepCheck("Отчёт найден в списке сформированных файлов", "report_item").actual(
            report_state.report_item
        ).is_not_none()

    download_request = {
        "exportedDataId": report_state.report_item.id,
        "exportedDataType": ExportedDataType.LEAKS_REPORT.to_download_name(),
        "additionalProperties": None,
        "timeOffset": report_state.time_offset_hours,
    }

    download_purpose = (
        f"скачивание xlsx-отчёта об утечках (exportedDataId={report_state.report_item.id}) "
        f"после формирования отчёта и выбора файла в списке GetExportedDataListRequest - выпадашка уведомлений на UI"
    )

    with allure.step(
        f"Этап 5. Streaming-вызов {ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST} по id={report_state.report_item.id}"
    ):
        await t_utils.connect_stream(
            ws_client,
            ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST,
            download_request,
            purpose=download_purpose,
        )
        report_state.download_invocation_id = ws_client.invocation_id

    with allure.step("Этап 6. Получение fileChunk - скачивание отчёта по утечкам"):
        report_state.download_reply = await t_utils.receive_download_exported_data_reply(
            ws_client=ws_client,
            parser=parser,
            invocation_id=report_state.download_invocation_id,
            request_name=ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST,
            total_wait_seconds=ReportConst.DOWNLOAD_TIMEOUT_SECONDS,
            purpose=download_purpose,
        )

    with allure.step("Извлечение данных ответа на скачивание"):
        download_reply = report_state.download_reply
        download_reply_status = download_reply.replyStatus
        has_download_reply_content = download_reply.replyContent is not None
        report_state.file_bytes = (
            download_reply.replyContent.fileChunk if has_download_reply_content else None
        )
        is_xlsx_signature = (
            report_utils.is_xlsx_file_bytes(report_state.file_bytes) if report_state.file_bytes else False
        )

    with allure.step("Проверка ответа на скачивание и формата xlsx"):
        StepCheck("Проверка статуса ответа на скачивание", "replyStatus").actual(download_reply_status).expected(
            ReplyStatus.OK.value
        ).equal_to()
        StepCheck("Проверка наличия контента ответа на скачивание", "replyContent").actual(
            has_download_reply_content
        ).expected(True).equal_to()
        StepCheck("Проверка наличия байт файла", "fileChunk").actual(report_state.file_bytes).is_not_empty()
        StepCheck("Проверка xlsx (zip) сигнатуры файла", "file_signature").actual(is_xlsx_signature).expected(
            True
        ).equal_to()

    with allure.step("Подготовка данных для проверки имени файла отчёта"):
        report_file_name = report_state.report_file_name
        report_file_name_lower = report_file_name.lower()
        file_name_period_start, file_name_period_end = report_utils.parse_period_from_export_file_name(report_file_name)
        period_start_lo, period_start_hi, period_end_lo, period_end_hi = report_utils.report_period_comparison_bounds(
            report_state.period_start_naive,
            report_state.period_end_naive,
        )
        has_xlsx_extension = report_utils.is_xlsx_extension(report_file_name)
        leaks_report_name_part_lower = ReportConst.LEAKS_REPORT_NAME_PART.lower()

    with allure.step("Проверка имени файла отчёта"):
        StepCheck(f"Имя файла оканчивается на {ReportConst.XLSX_EXTENSION}", "file_name").actual(
            has_xlsx_extension
        ).expected(True).equal_to()
        StepCheck(f"Имя файла содержит '{ReportConst.LEAKS_REPORT_NAME_PART}'", "file_name").contains(
            report_file_name_lower, leaks_report_name_part_lower
        )
        StepCheck(
            f"Имя файла содержит описание ТУ '{cfg.technological_unit.description}'",
            "file_name",
        ).contains(report_file_name_lower, report_state.tu_description_lower)
        StepCheck(
            "Дата начала периода в имени файла совпадает с фильтром запроса (+-1 мин)",
            "period_start_in_file_name",
        ).actual(file_name_period_start).is_between(period_start_lo, period_start_hi)
        StepCheck(
            "Дата конца периода в имени файла совпадает с фильтром запроса (+-1 мин)",
            "period_end_in_file_name",
        ).actual(file_name_period_end).is_between(period_end_lo, period_end_hi)

    with allure.step("Этап 8. Сохранение, обработка и проверка отчета по утечкам"):
        report_state.temp_file_path = report_utils.save_report_bytes_to_temp_file(report_state.file_bytes)

    try:
        with allure.step("Проверка: временный xlsx файл создан"):
            StepCheck("Временный xlsx файл создан", "temp_file_path").actual(report_state.temp_file_path).is_not_none()

        with allure.step("Этап 9. Открытие xlsx и чтение шапки"):
            report_state.worksheet = report_utils.load_report_worksheet(report_state.temp_file_path)
            report_state.title_info = report_utils.parse_report_title(
                report_utils.get_report_title_cell(report_state.worksheet)
            )
            allure.attach(
                f"Шапка отчёта (raw): {report_state.title_info.raw_title}\n"
                f"period_start: {report_state.title_info.period_start}\n"
                f"period_end: {report_state.title_info.period_end}",
                name="Шапка отчёта (1-я строка)",
                attachment_type=allure.attachment_type.TEXT,
            )

        with allure.step("Подготовка данных шапки отчёта для проверки"):
            title_info = report_state.title_info
            report_title_lower = title_info.raw_title.lower()
            leaks_report_name_part_lower = ReportConst.LEAKS_REPORT_NAME_PART.lower()
            column_headers = report_utils.get_report_column_headers(report_state.worksheet)
            period_start_lo, period_start_hi, period_end_lo, period_end_hi = report_utils.report_period_comparison_bounds(
                report_state.period_start_naive,
                report_state.period_end_naive,
            )
            header_period_start = title_info.period_start
            header_period_end = title_info.period_end

        with allure.step("Проверка двойной шапки отчёта"):
            StepCheck("Лист xlsx открыт", "worksheet").actual(report_state.worksheet).is_not_none()
            with SoftAssertions() as soft_failures:
                StepCheck(
                    f"В шапке отчёта присутствует '{ReportConst.LEAKS_REPORT_NAME_PART}'",
                    "report_title",
                    soft_failures,
                ).contains(report_title_lower, leaks_report_name_part_lower)

                StepCheck(
                    "Время начала периода в шапке совпадает с фильтром запроса (+-1 мин)",
                    "period_start",
                    soft_failures,
                ).actual(header_period_start).is_between(period_start_lo, period_start_hi)
                StepCheck(
                    "Время конца периода в шапке совпадает с фильтром запроса (+-1 мин)",
                    "period_end",
                    soft_failures,
                ).actual(header_period_end).is_between(period_end_lo, period_end_hi)
                StepCheck(
                    "Названия колонок в шапке отчёта",
                    "column_headers",
                    soft_failures,
                ).actual(column_headers).expected(ReportConst.EXPECTED_COLUMN_HEADERS).equal_to()

        with allure.step("Этап 10. Извлечение строк данных из отчёта"):
            report_state.data_rows = report_utils.iter_report_data_rows(report_state.worksheet)
            report_state.target_row = report_utils.find_row_with_object(
                report_state.data_rows, cfg.technological_unit.description
            )
            allure.attach(
                "\n".join(f"row#{row.row_index}: {row.cells}" for row in report_state.data_rows),
                name="Все строки данных отчёта",
                attachment_type=allure.attachment_type.TEXT,
            )

        with allure.step("Подготовка данных строки утечки для проверки"):
            target_row = report_state.target_row
            leak_datetime_value = target_row.datetime_value if target_row else None
            object_value_lower = target_row.object_value.lower() if target_row else ""
            lds_status_value = target_row.lds_status.strip() if target_row else ""
            masking_info_lower = target_row.masking_info.lower() if target_row else ""
            leak_coordinate_meters = target_row.coordinate_meters if target_row else None
            leak_volume_value = target_row.leak_volume if target_row else None
            mt_mode_lower = target_row.mt_mode.lower() if target_row else ""
            expected_mt_mode_lower = report_state.expected_mt_mode.lower()
            masking_not_masked_lower = ReportConst.MASKING_NOT_MASKED_TEXT.lower()
            period_start_lo, period_start_hi, period_end_lo, period_end_hi = report_utils.report_period_comparison_bounds(
                report_state.period_start_naive,
                report_state.period_end_naive,
            )

        with allure.step("Проверка содержимого строки утечки"):
            StepCheck("В отчёте есть хотя бы одна строка с данными", "data_rows").actual(
                report_state.data_rows
            ).is_not_empty()
            StepCheck(
                f"Строка с объектом, содержащим '{cfg.technological_unit.description}'",
                ReportConst.COL_OBJECT,
            ).actual(report_state.target_row).is_not_none()

            with SoftAssertions() as soft_failures:
                StepCheck(
                    "Время утечки в диапазоне [старт имитатора, старт + offset теста] (+-1 мин)",
                    ReportConst.COL_DATETIME,
                    soft_failures,
                ).actual(leak_datetime_value).is_between(period_start_lo, period_end_hi)

                StepCheck(
                    f"Колонка '{ReportConst.COL_OBJECT}' содержит '{cfg.technological_unit.description}'",
                    ReportConst.COL_OBJECT,
                    soft_failures,
                ).contains(object_value_lower, report_state.tu_description_lower)

                StepCheck(
                    f"Колонка '{ReportConst.COL_LDS_STATUS}'",
                    ReportConst.COL_LDS_STATUS,
                    soft_failures,
                ).actual(lds_status_value).expected(ReportConst.LDS_STATUS_OK_TEXT).equal_to()

                StepCheck(
                    f"Колонка '{ReportConst.COL_MASK_INFO}' содержит '{ReportConst.MASKING_NOT_MASKED_TEXT}'",
                    ReportConst.COL_MASK_INFO,
                    soft_failures,
                ).contains(masking_info_lower, masking_not_masked_lower)

                StepCheck(
                    f"Колонка '{ReportConst.COL_COORDINATE}' (с погрешностью {cfg.allowed_distance_diff_meters} м)",
                    ReportConst.COL_COORDINATE,
                    soft_failures,
                ).actual(leak_coordinate_meters).is_close_to(
                    leak.coordinate_meters,
                    cfg.allowed_distance_diff_meters,
                    f"значение допустимой погрешности координаты {cfg.allowed_distance_diff_meters}",
                )

                StepCheck(
                    f"Колонка '{ReportConst.COL_LEAK_VOLUME}' не пустая",
                    ReportConst.COL_LEAK_VOLUME,
                    soft_failures,
                ).actual(leak_volume_value).is_not_none()

                StepCheck(
                    f"Колонка '{ReportConst.COL_MT_MODE}' содержит '{report_state.expected_mt_mode}'",
                    ReportConst.COL_MT_MODE,
                    soft_failures,
                ).contains(mt_mode_lower, expected_mt_mode_lower)
    except Exception:
        with allure.step("Прикрепление xlsx отчёта к Allure при падении теста"):
            if report_state.temp_file_path and report_state.report_file_name:
                report_utils.attach_report_file_to_allure(report_state.temp_file_path, report_state.report_file_name)
        raise
    finally:
        with allure.step("Удаление временного xlsx файла"):
            temp_path = report_state.temp_file_path
            if temp_path is not None:
                try:
                    temp_path.unlink(missing_ok=True)
                except OSError:
                    pass






































report xlxs 
"""
Утилиты для разбора xlsx-отчётов и проверки их формата.
"""

from __future__ import annotations

import re
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional

import allure
from openpyxl import load_workbook
from openpyxl.worksheet.worksheet import Worksheet

from constants.test_constants import BaseTN3Constants as TestConst
from constants.test_constants import ExportReportConstants as ReportConst
from utils.helpers.ws_test_utils import extract_first_number, localize_as_moscow


@dataclass
class ReportTitleInfo:
    """Разобранная шапка отчёта"""

    raw_title: str
    period_start: Optional[datetime] = None
    period_end: Optional[datetime] = None


@dataclass
class LeakReportRow:
    """Разобранная строка данных по утечке"""

    row_index: int
    cells: Dict[str, str] = field(default_factory=dict)

    @property
    def datetime_value(self) -> Optional[datetime]:
        return parse_report_datetime(self.cells.get(ReportConst.COL_DATETIME))

    @property
    def object_value(self) -> str:
        return self.cells.get(ReportConst.COL_OBJECT, "")

    @property
    def lds_status(self) -> str:
        return self.cells.get(ReportConst.COL_LDS_STATUS, "")

    @property
    def masking_info(self) -> str:
        return self.cells.get(ReportConst.COL_MASK_INFO, "")

    @property
    def coordinate_meters(self) -> Optional[float]:
        coordinate_km = extract_first_number(self.cells.get(ReportConst.COL_COORDINATE))
        if coordinate_km is None:
            return None
        return coordinate_km * TestConst.KM_TO_METERS

    @property
    def leak_volume(self) -> Optional[float]:
        return extract_first_number(self.cells.get(ReportConst.COL_LEAK_VOLUME))

    @property
    def mt_mode(self) -> str:
        return self.cells.get(ReportConst.COL_MT_MODE, "")


def is_xlsx_file_bytes(file_bytes: Optional[bytes]) -> bool:
    """Проверяет zip-сигнатуру xlsx (PK\\x03\\x04)."""
    if not file_bytes:
        return False
    return file_bytes.startswith(ReportConst.ZIP_SIGNATURE)


def is_xlsx_extension(file_name: str) -> bool:
    """Проверяет расширение .xlsx без учёта регистра."""
    return file_name.lower().endswith(ReportConst.XLSX_EXTENSION)


def parse_report_datetime(value: object) -> Optional[datetime]:
    """Парсит дату/время из ячейки отчёта."""
    if value is None:
        return None
    if isinstance(value, datetime):
        return value
    if isinstance(value, str):
        try:
            return datetime.strptime(value.strip(), ReportConst.REPORT_DATETIME_FORMAT)
        except ValueError:
            return None
    return None


def _stringify_cell(value: object) -> str:
    if value is None:
        return ""
    if isinstance(value, datetime):
        return value.strftime(ReportConst.REPORT_DATETIME_FORMAT)
    return str(value)


def normalize_report_period_naive(value: datetime) -> datetime:
    """Московское время без tzinfo и микросекунд - для сравнения периодов в отчёте."""
    return localize_as_moscow(value).replace(microsecond=0, tzinfo=None)


def report_period_comparison_bounds(
    period_start: datetime,
    period_end: datetime,
    tolerance_minutes: int = ReportConst.REPORT_PERIOD_TOLERANCE_MINUTES,
) -> tuple[datetime, datetime, datetime, datetime]:
    """
    Границы периода с допуском +-tolerance_minutes для start и end отдельно.
    Возвращает (start_lower, start_upper, end_lower, end_upper).
    """
    start = normalize_report_period_naive(period_start)
    end = normalize_report_period_naive(period_end)
    delta = timedelta(minutes=tolerance_minutes)
    return start - delta, start + delta, end - delta, end + delta


def build_export_report_file_name(
    tu_description: str,
    period_start: datetime,
    period_end: datetime,
) -> str:
    """
    Имя xlsx при скачивании: «Отчет об утечках Тихорецк-Новороссийск-3 DD.MM.YYYY HH_MM_SS - DD.MM.YYYY HH_MM_SS.xlsx».
    """
    start_text = normalize_report_period_naive(period_start).strftime(ReportConst.REPORT_FILE_NAME_DATETIME_FORMAT)
    end_text = normalize_report_period_naive(period_end).strftime(ReportConst.REPORT_FILE_NAME_DATETIME_FORMAT)
    return (
        f"{ReportConst.LEAKS_REPORT_NAME_PART} {tu_description} {start_text} - {end_text}"
        f"{ReportConst.XLSX_EXTENSION}"
    )


def parse_period_from_export_file_name(file_name: str) -> tuple[Optional[datetime], Optional[datetime]]:
    """Извлекает границы периода из имени скачанного xlsx-файла."""
    match = re.search(ReportConst.REPORT_FILE_NAME_PERIOD_PATTERN, file_name.strip(), re.IGNORECASE)
    if match is None:
        return None, None

    parse_format = ReportConst.REPORT_FILE_NAME_DATETIME_FORMAT.replace("_", ":")

    def _parse_part(value: str) -> Optional[datetime]:
        try:
            return datetime.strptime(value.replace("_", ":"), parse_format)
        except ValueError:
            return None

    return _parse_part(match.group("period_start")), _parse_part(match.group("period_end"))


def parse_report_title(title_raw: object) -> ReportTitleInfo:
    """
    Парсит шапку отчёта с именованными группами period_start / period_end.
    """
    title_str = _stringify_cell(title_raw)
    match = re.search(ReportConst.REPORT_HEADER_PERIOD_PATTERN, title_str)
    if match is None:
        return ReportTitleInfo(raw_title=title_str)

    return ReportTitleInfo(
        raw_title=title_str,
        period_start=parse_report_datetime(match.group("period_start")),
        period_end=parse_report_datetime(match.group("period_end")),
    )


def load_report_worksheet(file_path: Path) -> Optional[Worksheet]:
    """Открывает первый лист xlsx. При ошибке возвращает None."""
    if not file_path.exists():
        return None
    try:
        workbook = load_workbook(filename=str(file_path), read_only=True, data_only=True)
    except Exception:
        return None
    sheet_names = workbook.sheetnames
    if not sheet_names:
        return None
    return workbook[sheet_names[ReportConst.DEFAULT_SHEET_INDEX]]


def get_report_title_cell(worksheet: Worksheet) -> object:
    return worksheet.cell(row=ReportConst.REPORT_TITLE_ROW, column=1).value


def get_report_column_headers(worksheet: Worksheet) -> List[str]:
    """Возвращает непустые заголовки колонок из строки REPORT_COLUMN_HEADERS_ROW."""
    headers: List[str] = []
    column_index = 1
    while True:
        cell_value = worksheet.cell(row=ReportConst.REPORT_COLUMN_HEADERS_ROW, column=column_index).value
        if cell_value is None or not str(cell_value).strip():
            break
        headers.append(_stringify_cell(cell_value).strip())
        column_index += 1
    return headers


def build_column_cells(row_values: tuple, headers: List[str]) -> Dict[str, str]:
    """Собирает словарь {название колонки: значение ячейки} по строке данных."""
    return {
        header: _stringify_cell(row_values[column_index]) if column_index < len(row_values) else ""
        for column_index, header in enumerate(headers)
    }


def iter_report_data_rows(worksheet: Worksheet) -> List[LeakReportRow]:
    """
    Возвращает строки данных по утечкам, начиная с REPORT_DATA_FIRST_ROW.
    Пустые строки пропускаются.
    """
    headers = get_report_column_headers(worksheet)
    if not headers:
        return []

    rows: List[LeakReportRow] = []
    for excel_row_index, row_values in enumerate(
        worksheet.iter_rows(
            min_row=ReportConst.REPORT_DATA_FIRST_ROW,
            max_col=len(headers),
            values_only=True,
        ),
        start=ReportConst.REPORT_DATA_FIRST_ROW,
    ):
        if not any(cell is not None and str(cell).strip() for cell in row_values):
            continue
        rows.append(
            LeakReportRow(
                row_index=excel_row_index,
                cells=build_column_cells(row_values, headers),
            )
        )
    return rows


def find_row_with_object(rows: List[LeakReportRow], object_substring: str) -> Optional[LeakReportRow]:
    """Ищет первую строку, где колонка «Объект» содержит подстроку без учёта регистра"""
    substring_lower = object_substring.lower()
    for row in rows:
        if substring_lower in row.object_value.lower():
            return row
    return None


def save_report_bytes_to_temp_file(file_bytes: bytes) -> Optional[Path]:
    """Сохраняет байты отчёта во временный xlsx-файл. При ошибке возвращает None."""
    import tempfile

    try:
        with tempfile.NamedTemporaryFile(
            suffix=ReportConst.XLSX_EXTENSION,
            prefix="leaks_report_",
            delete=False,
        ) as temp_file:
            temp_file.write(file_bytes)
            return Path(temp_file.name)
    except OSError:
        return None


def attach_report_file_to_allure(file_path: Path, file_name: str) -> None:
    """Прикладывает xlsx к Allure при падении теста"""
    try:
        xlsx_type = allure.attachment_type.XLSX
    except AttributeError:
        xlsx_type = None

    if xlsx_type is not None:
        allure.attach.file(
            str(file_path),
            name=file_name,
            attachment_type=xlsx_type,
            extension="xlsx",
        )
        return

    try:
        with file_path.open("rb") as raw_file:
            allure.attach(raw_file.read(), name=file_name, extension="xlsx")
    except OSError:
        pass


































вс тест утил

from constants.architecture_constants import WebSocketClientConstants as WS_Const



    return input_datetime.astimezone(ZoneInfo(TestConst.ZONE_INFO))


def report_time_offset_hours(tz_name: str = TestConst.ZONE_INFO) -> Optional[int]:
    """
    Смещение часового пояса (часы от UTC) для поля timeOffset в запросах отчётов.
    """
    now = datetime.now(ZoneInfo(tz_name))
    utc_offset = now.utcoffset()
    if utc_offset is None:
        return None
    return int(utc_offset.total_seconds() // TestConst.SECONDS_PER_HOUR)


def localize_as_moscow(input_datetime: datetime) -> None | datetime:















        fail(f"Не удалось отправить сообщение типа: {ws_invoke_type} c параметрами {ws_invoke_params}. Ошибка: {error}")


async def connect_stream(
    ws_client: WebSocketClient,
    ws_invoke_type: str,
    ws_invoke_params: Any = None,
    purpose: str = "streaming-вызов WS",
) -> None:
    """
    Streaming-вызов (StreamInvocation)
    """
    try:
        with allure.step(f"Streaming-вызов {ws_invoke_type} c параметрами {ws_invoke_params}"):
            await ws_client.invoke_stream(ws_invoke_type, ws_invoke_params)
    except (asyncio.TimeoutError, ConnectionError, ConnectionResetError, OSError) as error:
        fail(
            f"Не удалось выполнить {purpose} ({ws_invoke_type}, StreamInvocation). "
            f"Параметры запроса: {ws_invoke_params}. Ошибка соединения: {error}"
        )


def _stream_completion_error(msg: Any, invocation_id: str) -> Optional[str]:
    """Текст ошибки из ответа SignalR Completion для данного invocation_id, если есть."""
    if not isinstance(msg, list):
        return None
    if msg[0] != WS_Const.COMPLETION_MESSAGE_TYPE:
        return None
    if not is_desired_invocation_id(msg, invocation_id):
        return None
    if len(msg) <= WS_Const.COMPLETION_ERROR_MESSAGE_INDEX:
        return None
    error_text = msg[WS_Const.COMPLETION_ERROR_MESSAGE_INDEX]
    if isinstance(error_text, str):
        return error_text
    return None


async def receive_download_exported_data_reply(
    ws_client: WebSocketClient,
    parser,
    invocation_id: str,
    request_name: str,
    total_wait_seconds: float,
    poll_interval_seconds: float = 0.5,
    purpose: str = "скачивании xlsx-отчёта после выбора файла в списке сформированных отчётов",
) -> Any:
    """
    Ожидает StreamItem с fileChunk после streaming DownloadExportedDataRequest.
    """
    deadline = asyncio.get_event_loop().time() + total_wait_seconds
    collected_messages: List[Any] = []
    ws_client.suppress_recv_logging = True
    parser.suppress_recv_logging = True
    try:
        while asyncio.get_event_loop().time() < deadline:
            await asyncio.sleep(poll_interval_seconds)
            batch = _drain_recv_queue(ws_client)
            collected_messages.extend(batch)

            for msg in batch:
                stream_error = _stream_completion_error(msg, invocation_id)
                if stream_error:
                    _attach_ws_reply_parse_failure(msg, invocation_id, request_name, RuntimeError(stream_error))
                    fail(
                        f"При {purpose} бэк вернул Completion с ошибкой "
                        f"({request_name}, invocation_id={invocation_id}): {stream_error}"
                    )

            for msg in batch:
                if (
                    not isinstance(msg, list)
                    or msg[0] != WS_Const.STREAM_ITEM_MESSAGE_TYPE
                    or not is_desired_invocation_id(msg, invocation_id)
                ):
                    continue
                if parser._find_reply_status_in_ws_msg(msg) is None:
                    continue
                try:
                    return parser.parse_download_exported_data_msg(msg)
                except Exception as error:
                    _attach_ws_reply_parse_failure(msg, invocation_id, request_name, error)
                    fail(
                        f"При {purpose} получен StreamItem ({request_name}, invocation_id={invocation_id}), "
                        f"но не удалось разобрать ответ с fileChunk: {error}"
                    )
    finally:
        collected_messages.extend(_drain_recv_queue(ws_client))
        ws_client.suppress_recv_logging = False
        parser.suppress_recv_logging = False

    _attach_ws_poll_failure(collected_messages, total_wait_seconds, f"{request_name} (StreamItem)")
    fail(
        f"При {purpose} за {total_wait_seconds} с не получен StreamItem с fileChunk "
        f"({request_name}, invocation_id={invocation_id}). Смотреть вложения received ws message"
    )


async def connect_and_get_parsed_msg_by_tu_id(














async def poll_for_exported_file(
    ws_client: WebSocketClient,
    parser,
    list_limit: int,
    expected_data_type: Any,
    name_substring: str,
    tu_name_substring: str,
    period_start: datetime,
    period_end: datetime,
    total_wait_seconds: float,
    poll_interval_seconds: float,
    period_tolerance_minutes: int = ReportConst.REPORT_PERIOD_TOLERANCE_MINUTES,
) -> Optional[Any]:
    """
    Периодически шлёт GetExportedDataListRequest, забирает ответы из очереди
    по invocation_id среди всех накопленных сообщений.
    При таймауте или ошибке парсинга прикрепляет к Allure полученные ответы.
    """

    deadline = asyncio.get_event_loop().time() + total_wait_seconds
    last_items_count = -1
    collected_messages: List[Any] = []
    request_name = ReportConst.GET_EXPORTED_DATA_LIST_REQUEST
    ws_client.suppress_recv_logging = True
    parser.suppress_recv_logging = True
    try:
        while asyncio.get_event_loop().time() < deadline:
            drained_before_request = _drain_recv_queue(ws_client)
            collected_messages.extend(drained_before_request)
            await connect(
                ws_client,
                request_name,
                {"limit": list_limit},
            )
            invocation_id = ws_client.invocation_id
            await asyncio.sleep(poll_interval_seconds)

            batch = _drain_recv_queue(ws_client)
            collected_messages.extend(batch)
            list_reply_payload = _find_ws_reply_by_invocation_id(batch, invocation_id, parser)

            if list_reply_payload is None:
                continue

            try:
                parsed_payload = parser.parse_exported_data_list_msg(list_reply_payload)
            except Exception as error:
                _attach_ws_reply_parse_failure(list_reply_payload, invocation_id, request_name, error)
                for msg in collected_messages:
                    allure.attach(
                        pprint.pformat(msg, width=120, sort_dicts=False),
                        name="received ws message",
                        attachment_type=allure.attachment_type.TEXT,
                    )
                fail(f"Не удалось разобрать ответ на {request_name}: {error}")

            items = []
            if parsed_payload.replyContent is not None:
                items = parsed_payload.replyContent.exportedData or []

            if len(items) != last_items_count:
                allure.attach(
                    "\n".join(
                        f"id={item.id}, name={item.name}, type={item.exportedDataType}, "
                        f"start={item.start}, end={item.end}"
                        for item in items
                    ),
                    name=f"Список сформированных файлов (попытка, всего: {len(items)})",
                    attachment_type=allure.attachment_type.TEXT,
                )
                last_items_count = len(items)

            match = find_matching_exported_item(
                items=items,
                expected_data_type=expected_data_type,
                name_substring=name_substring,
                tu_name_substring=tu_name_substring,
                period_start=period_start,
                period_end=period_end,
                period_tolerance_minutes=period_tolerance_minutes,
            )
            if match is not None:
                return match
    finally:
        collected_messages.extend(_drain_recv_queue(ws_client))
        ws_client.suppress_recv_logging = False
        parser.suppress_recv_logging = False

    _attach_ws_poll_failure(
        collected_messages,
        total_wait_seconds,
        request_name,
    )
    return None


def _normalize_report_period_datetime(value: datetime) -> datetime:
    """Приводит datetime периода отчёта к московскому времени без микросекунд."""
    return localize_as_moscow(value).replace(microsecond=0)


def _exported_item_period_matches(
    item_start: datetime,
    item_end: datetime,
    period_start: datetime,
    period_end: datetime,
    tolerance_minutes: int,
) -> bool:
    """Проверяет start/end элемента списка в пределах периода запроса +- tolerance_minutes."""
    item_start_norm = _normalize_report_period_datetime(item_start)
    item_end_norm = _normalize_report_period_datetime(item_end)
    period_start_norm = _normalize_report_period_datetime(period_start)
    period_end_norm = _normalize_report_period_datetime(period_end)
    delta = timedelta(minutes=tolerance_minutes)
    return (
        (period_start_norm - delta) <= item_start_norm <= (period_start_norm + delta)
        and (period_end_norm - delta) <= item_end_norm <= (period_end_norm + delta)
    )


def find_matching_exported_item(
    items: List[Any],
    expected_data_type: Any,
    name_substring: str,
    tu_name_substring: str,
    period_start: datetime,
    period_end: datetime,
    period_tolerance_minutes: int = ReportConst.REPORT_PERIOD_TOLERANCE_MINUTES,
) -> Optional[Any]:
    """
    Ищет элемент списка по типу, подстрокам в имени (отчёт + ТУ) и периоду start/end с допуском.
    """
    name_substring_lower = name_substring.lower()
    tu_name_lower = tu_name_substring.lower()

    matched_items = []
    for item in items:
        if item.exportedDataType != expected_data_type:
            continue
        item_name_lower = (item.name or "").lower()
        if name_substring_lower not in item_name_lower:
            continue
        if tu_name_lower not in item_name_lower:
            continue
        if item.start is None or item.end is None:
            continue
        if not _exported_item_period_matches(
            item.start, item.end, period_start, period_end, period_tolerance_minutes
        ):
            continue
        matched_items.append(item)

    if not matched_items:
        return None
    return max(matched_items, key=lambda exported_item: exported_item.id)