Загрузка данных


"""
Утилиты для разбора xlsx-отчёта о режиме работы СОУ.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime, time, timedelta
from typing import Dict, List, Optional, Tuple

from openpyxl.worksheet.worksheet import Worksheet

from constants.test_constants import BaseTN3Constants as TestConst
from constants.test_constants import ExportLdsStatusReportConstants as LdsReportConst
from utils.helpers.report_xlsx_utils import (
    ReportTitleInfo,
    _stringify_cell,
    build_column_cells,
    get_report_column_headers,
    parse_report_title,
)


@dataclass
class LdsStatusReportSectionRow:
    """Строка участка с длительностями режимов СОУ."""

    row_index: int
    section_name: str
    cells: Dict[str, str] = field(default_factory=dict)

    @property
    def mode_durations_seconds(self) -> Dict[str, int]:
        return {
            column_name: parse_duration_seconds(self.cells.get(column_name)) or 0
            for column_name in LdsReportConst.MODE_DURATION_COLUMNS
        }

    @property
    def modes_sum_seconds(self) -> int:
        return sum(self.mode_durations_seconds.values())


@dataclass
class LdsStatusReportParsed:
    """Разобранный отчёт о режиме работы СОУ."""

    title_info: ReportTitleInfo
    column_headers: List[str]
    section_rows: List[LdsStatusReportSectionRow]
    total_duration_seconds: Optional[int] = None
    total_duration_raw: str = ""
    total_label_row_index: Optional[int] = None


def parse_duration_seconds(value: object) -> Optional[int]:
    """Парсит длительность из ячейки (H:MM:SS, MM:SS или time/timedelta из Excel)."""
    if value is None:
        return None
    if isinstance(value, timedelta):
        return int(value.total_seconds())
    if isinstance(value, time):
        return (
            value.hour * TestConst.SECONDS_PER_HOUR
            + value.minute * TestConst.SEC_PER_MIN
            + value.second
        )
    if isinstance(value, datetime):
        return (
            value.hour * TestConst.SECONDS_PER_HOUR
            + value.minute * TestConst.SEC_PER_MIN
            + value.second
        )

    duration_text = _stringify_cell(value).strip()
    if not duration_text:
        return None

    parts = duration_text.split(":")
    try:
        if len(parts) == LdsReportConst.DURATION_PARTS_COUNT_H_MM_SS:
            hours, minutes, seconds = (int(part) for part in parts)
            return (
                hours * TestConst.SECONDS_PER_HOUR
                + minutes * TestConst.SEC_PER_MIN
                + seconds
            )
        if len(parts) == LdsReportConst.DURATION_PARTS_COUNT_MM_SS:
            minutes, seconds = (int(part) for part in parts)
            return minutes * TestConst.SEC_PER_MIN + seconds
    except ValueError:
        return None
    return None


def is_duration_cell_filled(value: object) -> bool:
    """Ячейка с длительностью заполнена (допускается 0:00:00)."""
    return parse_duration_seconds(value) is not None


def format_duration_seconds(total_seconds: int) -> str:
    """Форматирует длительность в секундах в строку H:MM:SS (минуты и секунды с ведущим нулём)."""
    hours, remainder = divmod(total_seconds, TestConst.SECONDS_PER_HOUR)
    minutes, seconds = divmod(remainder, TestConst.SEC_PER_MIN)
    return f"{hours}:{minutes:02d}:{seconds:02d}"


def _find_total_work_duration(worksheet: Worksheet) -> Tuple[Optional[int], str, Optional[int]]:
    """
    Ищет строку "Суммарное время работы:" и парсит длительность рядом (в той же или следующей строке).

    Возвращает: (секунды, сырое значение ячейки, номер строки с меткой) или (None, "", None).
    """
    for row_index, row_values in enumerate(
        worksheet.iter_rows(min_row=LdsReportConst.REPORT_DATA_FIRST_ROW, values_only=True),
        start=LdsReportConst.REPORT_DATA_FIRST_ROW,
    ):
        for column_index, cell_value in enumerate(row_values):
            if cell_value is None:
                continue
            cell_text = _stringify_cell(cell_value).strip()
            if LdsReportConst.TOTAL_WORK_DURATION_LABEL not in cell_text:
                continue

            duration_candidates = []
            if column_index + 1 < len(row_values):
                duration_candidates.append(row_values[column_index + 1])
            if row_index + 1 <= worksheet.max_row:
                duration_candidates.append(
                    worksheet.cell(row=row_index + 1, column=column_index + 1).value
                )

            for candidate in duration_candidates:
                duration_seconds = parse_duration_seconds(candidate)
                if duration_seconds is not None:
                    return duration_seconds, _stringify_cell(candidate).strip(), row_index

            return None, "", row_index

    return None, "", None


def parse_lds_status_report_worksheet(
    worksheet: Worksheet,
    expected_section_names: List[str],
) -> LdsStatusReportParsed:
    """
    Разбирает лист xlsx-отчёта о режиме СОУ: шапка, колонки, строки участков и суммарное время.

    В section_rows попадают только участки из expected_section_names (без учёта регистра).
    """
    headers = get_report_column_headers(worksheet, LdsReportConst.REPORT_COLUMN_HEADERS_ROW)
    title_info = parse_report_title(
        worksheet.cell(row=LdsReportConst.REPORT_TITLE_ROW, column=1).value,
        LdsReportConst.REPORT_HEADER_PERIOD_PATTERN,
    )
    total_duration_seconds, total_duration_raw, total_label_row_index = _find_total_work_duration(worksheet)

    section_rows: List[LdsStatusReportSectionRow] = []
    expected_names_lower = {name.lower() for name in expected_section_names}

    for row_index, row_values in enumerate(
        worksheet.iter_rows(
            min_row=LdsReportConst.REPORT_DATA_FIRST_ROW,
            max_col=len(headers) if headers else 5,
            values_only=True,
        ),
        start=LdsReportConst.REPORT_DATA_FIRST_ROW,
    ):
        if total_label_row_index is not None and row_index >= total_label_row_index:
            break

        cells = build_column_cells(row_values, headers)
        section_name = cells.get(LdsReportConst.COL_SECTION, "").strip()
        if not section_name:
            continue
        if section_name.lower() not in expected_names_lower:
            continue

        section_rows.append(
            LdsStatusReportSectionRow(
                row_index=row_index,
                section_name=section_name,
                cells=cells,
            )
        )

    return LdsStatusReportParsed(
        title_info=title_info,
        column_headers=headers,
        section_rows=section_rows,
        total_duration_seconds=total_duration_seconds,
        total_duration_raw=total_duration_raw,
        total_label_row_index=total_label_row_index,
    )


def format_section_rows_for_allure(section_rows: List[LdsStatusReportSectionRow]) -> str:
    lines = []
    for row in section_rows:
        durations_text = ", ".join(
            f"{column}={format_duration_seconds(seconds)}"
            for column, seconds in row.mode_durations_seconds.items()
        )
        lines.append(
            f"row#{row.row_index}: {row.section_name} | sum={format_duration_seconds(row.modes_sum_seconds)} | "
            f"{durations_text}"
        )
    return "\n".join(lines)


__all__ = [
    "LdsStatusReportParsed",
    "LdsStatusReportSectionRow",
    "format_duration_seconds",
    "format_section_rows_for_allure",
    "is_duration_cell_filled",
    "parse_duration_seconds",
    "parse_lds_status_report_worksheet",
]





















report xlsx
"""
Утилиты для разбора xlsx-отчётов и проверки их формата.
"""

from __future__ import annotations

import re
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional

import allure
from openpyxl import load_workbook
from openpyxl.worksheet.worksheet import Worksheet

from constants.test_constants import BaseTN3Constants as TestConst
from constants.test_constants import ExportReportConstants as ReportConst
from utils.helpers.ws_test_utils import extract_first_number, localize_as_moscow


@dataclass
class ReportTitleInfo:
    """Разобранная шапка отчёта"""

    raw_title: str
    period_start: Optional[datetime] = None
    period_end: Optional[datetime] = None


@dataclass
class LeakReportRow:
    """Разобранная строка данных по утечке"""

    row_index: int
    cells: Dict[str, str] = field(default_factory=dict)

    @property
    def datetime_value(self) -> Optional[datetime]:
        return parse_report_datetime(self.cells.get(ReportConst.COL_DATETIME))

    @property
    def object_value(self) -> str:
        return self.cells.get(ReportConst.COL_OBJECT, "")

    @property
    def lds_status(self) -> str:
        return self.cells.get(ReportConst.COL_LDS_STATUS, "")

    @property
    def masking_info(self) -> str:
        return self.cells.get(ReportConst.COL_MASK_INFO, "")

    @property
    def coordinate_meters(self) -> Optional[float]:
        coordinate_km = extract_first_number(self.cells.get(ReportConst.COL_COORDINATE))
        if coordinate_km is None:
            return None
        return coordinate_km * TestConst.KM_TO_METERS

    @property
    def leak_volume(self) -> Optional[float]:
        return extract_first_number(self.cells.get(ReportConst.COL_LEAK_VOLUME))

    @property
    def mt_mode(self) -> str:
        return self.cells.get(ReportConst.COL_MT_MODE, "")


def is_xlsx_file_bytes(file_bytes: Optional[bytes]) -> bool:
    """Проверяет zip-сигнатуру xlsx"""
    if not file_bytes:
        return False
    return file_bytes.startswith(ReportConst.ZIP_SIGNATURE)


def is_xlsx_extension(file_name: str) -> bool:
    """Проверяет расширение .xlsx без учёта регистра."""
    return file_name.lower().endswith(ReportConst.XLSX_EXTENSION)


def parse_report_datetime(value: object) -> Optional[datetime]:
    """Парсит дату/время из ячейки отчёта."""
    if value is None:
        return None
    if isinstance(value, datetime):
        return value
    if isinstance(value, str):
        try:
            return datetime.strptime(value.strip(), ReportConst.REPORT_DATETIME_FORMAT)
        except ValueError:
            return None
    return None


def _stringify_cell(value: object) -> str:
    if value is None:
        return ""
    if isinstance(value, datetime):
        return value.strftime(ReportConst.REPORT_DATETIME_FORMAT)
    return str(value)


def normalize_report_period_naive(value: datetime) -> datetime:
    """Московское время без tzinfo и микросекунд - для сравнения периодов в отчёте."""
    return localize_as_moscow(value).replace(microsecond=0, tzinfo=None)


def report_period_comparison_bounds(
    period_start: datetime,
    period_end: datetime,
    tolerance_minutes: int = ReportConst.REPORT_PERIOD_TOLERANCE_MINUTES,
) -> tuple[datetime, datetime, datetime, datetime]:
    """
    Границы периода с допуском +-tolerance_minutes для start и end отдельно.
    Возвращает (start_lower, start_upper, end_lower, end_upper).
    """
    start = normalize_report_period_naive(period_start)
    end = normalize_report_period_naive(period_end)
    delta = timedelta(minutes=tolerance_minutes)
    return start - delta, start + delta, end - delta, end + delta


def build_export_report_file_name(
    tu_description: str,
    period_start: datetime,
    period_end: datetime,
    report_name_part: str = ReportConst.LEAKS_REPORT_NAME_PART,
    name_tu_separator: str = " ",
) -> str:
    """
    Имя xlsx при скачивании: '{название}{sep}{ТУ} DD.MM.YYYY HH_MM_SS - DD.MM.YYYY HH_MM_SS.xlsx'.
    По умолчанию - отчёт об утечках.
    """
    start_text = normalize_report_period_naive(period_start).strftime(ReportConst.REPORT_FILE_NAME_DATETIME_FORMAT)
    end_text = normalize_report_period_naive(period_end).strftime(ReportConst.REPORT_FILE_NAME_DATETIME_FORMAT)
    return (
        f"{report_name_part}{name_tu_separator}{tu_description} {start_text} - {end_text}"
        f"{ReportConst.XLSX_EXTENSION}"
    )


def parse_period_from_export_file_name(
    file_name: str,
    period_pattern: str | None = None,
) -> tuple[Optional[datetime], Optional[datetime]]:
    """Извлекает границы периода из имени скачанного xlsx-файла."""
    match = re.search(
        period_pattern or ReportConst.REPORT_FILE_NAME_PERIOD_PATTERN,
        file_name.strip(),
        re.IGNORECASE,
    )
    if match is None:
        return None, None

    parse_format = ReportConst.REPORT_FILE_NAME_DATETIME_FORMAT.replace("_", ":")

    def _parse_part(value: str) -> Optional[datetime]:
        try:
            return datetime.strptime(value.replace("_", ":"), parse_format)
        except ValueError:
            return None

    return _parse_part(match.group("period_start")), _parse_part(match.group("period_end"))


def parse_report_title(
    title_raw: object,
    header_period_pattern: str | None = None,
) -> ReportTitleInfo:
    """
    Парсит шапку отчёта с именованными группами period_start/period_end.
    """
    title_str = _stringify_cell(title_raw)
    pattern = header_period_pattern or ReportConst.REPORT_HEADER_PERIOD_PATTERN
    match = re.search(pattern, title_str)
    if match is None:
        return ReportTitleInfo(raw_title=title_str)

    return ReportTitleInfo(
        raw_title=title_str,
        period_start=parse_report_datetime(match.group("period_start")),
        period_end=parse_report_datetime(match.group("period_end")),
    )


def load_report_worksheet(file_path: Path) -> Optional[Worksheet]:
    """Открывает первый лист xlsx. При ошибке возвращает None."""
    if not file_path.exists():
        return None
    try:
        workbook = load_workbook(filename=str(file_path), read_only=True, data_only=True)
    except Exception:
        return None
    sheet_names = workbook.sheetnames
    if not sheet_names:
        return None
    return workbook[sheet_names[ReportConst.DEFAULT_SHEET_INDEX]]


def get_report_title_cell(worksheet: Worksheet) -> object:
    return worksheet.cell(row=ReportConst.REPORT_TITLE_ROW, column=1).value


def get_report_column_headers(
    worksheet: Worksheet,
    headers_row: int = ReportConst.REPORT_COLUMN_HEADERS_ROW,
) -> List[str]:
    """Возвращает непустые заголовки колонок из указанной строки шапки."""
    headers: List[str] = []
    column_index = 1
    while True:
        cell_value = worksheet.cell(row=headers_row, column=column_index).value
        if cell_value is None or not str(cell_value).strip():
            break
        headers.append(_stringify_cell(cell_value).strip())
        column_index += 1
    return headers


def build_column_cells(row_values: tuple, headers: List[str]) -> Dict[str, str]:
    """Собирает словарь {название колонки: значение ячейки} по строке данных."""
    return {
        header: _stringify_cell(row_values[column_index]) if column_index < len(row_values) else ""
        for column_index, header in enumerate(headers)
    }


def iter_report_data_rows(worksheet: Worksheet) -> List[LeakReportRow]:
    """
    Возвращает строки данных по утечкам, начиная с REPORT_DATA_FIRST_ROW.
    Пустые строки пропускаются.
    """
    headers = get_report_column_headers(worksheet)
    if not headers:
        return []

    rows: List[LeakReportRow] = []
    for excel_row_index, row_values in enumerate(
        worksheet.iter_rows(
            min_row=ReportConst.REPORT_DATA_FIRST_ROW,
            max_col=len(headers),
            values_only=True,
        ),
        start=ReportConst.REPORT_DATA_FIRST_ROW,
    ):
        if not any(cell is not None and str(cell).strip() for cell in row_values):
            continue
        rows.append(
            LeakReportRow(
                row_index=excel_row_index,
                cells=build_column_cells(row_values, headers),
            )
        )
    return rows


def find_row_with_object(rows: List[LeakReportRow], object_substring: str) -> Optional[LeakReportRow]:
    """Ищет первую строку, где колонка 'Объект' содержит подстроку без учёта регистра"""
    substring_lower = object_substring.lower()
    for row in rows:
        if substring_lower in row.object_value.lower():
            return row
    return None


def save_report_bytes_to_temp_file(
    file_bytes: bytes,
    prefix: str = "leaks_report_",
) -> Optional[Path]:
    """Сохраняет байты отчёта во временный xlsx-файл. При ошибке возвращает None."""
    import tempfile

    try:
        with tempfile.NamedTemporaryFile(
            suffix=ReportConst.XLSX_EXTENSION,
            prefix=prefix,
            delete=False,
        ) as temp_file:
            temp_file.write(file_bytes)
            return Path(temp_file.name)
    except OSError:
        return None


def attach_report_file_to_allure(file_path: Path, file_name: str) -> None:
    """Прикладывает xlsx к Allure при падении теста"""
    try:
        xlsx_type = allure.attachment_type.XLSX
    except AttributeError:
        xlsx_type = None

    if xlsx_type is not None:
        allure.attach.file(
            str(file_path),
            name=file_name,
            attachment_type=xlsx_type,
            extension="xlsx",
        )
        return

    try:
        with file_path.open("rb") as raw_file:
            allure.attach(raw_file.read(), name=file_name, extension="xlsx")
    except OSError:
        pass




























сценарий 

async def export_lds_status_report(
    ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig, imitator_start_time: datetime
):
    """
    Сценарий формирования xlsx-отчёта о режиме работы СОУ.
    """
    report_state = ExportLdsStatusReportState()

    with allure.step("Подготовка параметров сценария формирования отчёта о режиме работы СОУ"):
        report_state.report_test = leak.export_lds_status_report_test
        StepCheck("В конфигурации задан export_lds_status_report_test", "export_lds_status_report_test").actual(
            report_state.report_test
        ).is_not_none()
        report_state.period_start = t_utils.localize_as_moscow(imitator_start_time)
        report_state.period_end = t_utils.localize_as_moscow(
            imitator_start_time + timedelta(minutes=report_state.report_test.offset)
        )
        report_state.period_start_naive = report_utils.normalize_report_period_naive(report_state.period_start)
        report_state.period_end_naive = report_utils.normalize_report_period_naive(report_state.period_end)
        report_state.tu_description_lower = cfg.technological_unit.description.lower()
        time_offset_hours = t_utils.report_time_offset_hours()
        StepCheck(
            f"Смещение timeOffset для запросов отчёта (часовой пояс {TestConst.ZONE_INFO})",
            "time_offset_hours",
        ).actual(time_offset_hours).is_not_none()
        report_state.time_offset_hours = time_offset_hours

        allure.attach(
            f"period.start={report_state.period_start}\n"
            f"period.end={report_state.period_end}\n"
            f"offset_minutes={report_state.report_test.offset}\n"
            f"sections={LdsReportConst.SECTION_NAMES}",
            name="Фильтр периода отчёта о режиме СОУ",
            attachment_type=allure.attachment_type.TEXT,
        )

    with allure.step(f"Этап 1. Подписка на пуш-нотификации {ReportConst.SUBSCRIBE_REPORTS_DATA_EXPORTED_REQUEST}"):
        await t_utils.connect(ws_client, ReportConst.SUBSCRIBE_REPORTS_DATA_EXPORTED_REQUEST, [])

    with allure.step(f"Этап 2. Запрос формирования отчёта {ReportConst.EXPORT_REPORTS_COMMAND_REQUEST}"):
        request_payload = {
            "tuId": cfg.tu_id,
            "exportedDataTypes": [ExportedDataType.LDS_STATUS_REPORT.value],
            "timeOffset": report_state.time_offset_hours,
            "period": {
                "start": t_utils.datetime_to_msgpack_timestamp(report_state.period_start),
                "end": t_utils.datetime_to_msgpack_timestamp(report_state.period_end),
                "additionalProperties": {},
            },
        }
        await t_utils.connect(ws_client, ReportConst.EXPORT_REPORTS_COMMAND_REQUEST, request_payload)

    with allure.step(
        f"Этап 3. Ожидание пуш-нотификации {ReportConst.REPORT_DATA_EXPORTED_NOTIFICATION} о готовности отчёта"
    ):
        report_state.notification = await t_utils.poll_for_report_export_notification(
            ws_client=ws_client,
            parser=parser,
            total_wait_seconds=ReportConst.NOTIFICATION_TIMEOUT_SECONDS,
            poll_interval_seconds=ReportConst.LIST_POLL_INTERVAL_SECONDS,
        )

    with allure.step(f"Этап 4. Лонг-поллинг {ReportConst.GET_EXPORTED_DATA_LIST_REQUEST} до появления отчёта в списке"):
        report_state.report_item = await t_utils.poll_for_exported_file(
            ws_client=ws_client,
            parser=parser,
            list_limit=ReportConst.EXPORTED_DATA_LIST_LIMIT,
            expected_data_type=ExportedDataType.LDS_STATUS_REPORT,
            name_substring=LdsReportConst.LDS_STATUS_REPORT_NAME_PART,
            tu_name_substring=cfg.technological_unit.description,
            period_start=report_state.period_start,
            period_end=report_state.period_end,
            total_wait_seconds=ReportConst.LIST_POLL_TOTAL_WAIT_SECONDS,
            poll_interval_seconds=ReportConst.LIST_POLL_INTERVAL_SECONDS,
        )

    with allure.step("Подготовка данных найденного отчёта в списке"):
        report_item = report_state.report_item
        if report_item is not None:
            allure.attach(
                f"id={report_item.id}, name={report_item.name}, "
                f"exportedDataType={report_item.exportedDataType}, "
                f"start={t_utils.format_datetime_moscow(report_item.start)}, "
                f"end={t_utils.format_datetime_moscow(report_item.end)}",
                name="Найденный отчёт в списке",
                attachment_type=allure.attachment_type.TEXT,
            )
        report_state.report_file_name = report_utils.build_export_report_file_name(
            cfg.technological_unit.description,
            report_state.period_start,
            report_state.period_end,
            LdsReportConst.LDS_STATUS_REPORT_NAME_PART,
            ". ",
        )

    with allure.step("Проверка: отчёт найден в списке сформированных файлов"):
        StepCheck("Отчёт найден в списке сформированных файлов", "report_item").actual(
            report_state.report_item
        ).is_not_none()

    with allure.step(
        f"Этап 5. Streaming-вызов {ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST} по id={report_state.report_item.id}"
    ):
        download_request = {
            "exportedDataId": report_state.report_item.id,
            "exportedDataType": ExportedDataType.LDS_STATUS_REPORT.to_download_name(),
            "additionalProperties": None,
            "timeOffset": report_state.time_offset_hours,
        }
        download_purpose = (
            f"скачивание xlsx-отчёта о режиме СОУ (exportedDataId={report_state.report_item.id}) "
            f"после формирования отчёта и выбора файла в списке GetExportedDataListRequest"
        )
        await t_utils.connect_stream(
            ws_client,
            ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST,
            download_request,
            purpose=download_purpose,
        )
        report_state.download_invocation_id = ws_client.invocation_id

    with allure.step("Этап 6. Получение fileChunk - скачивание отчёта о режиме СОУ"):
        report_state.download_reply = await t_utils.receive_download_exported_data_reply(
            ws_client=ws_client,
            parser=parser,
            invocation_id=report_state.download_invocation_id,
            request_name=ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST,
            total_wait_seconds=ReportConst.DOWNLOAD_TIMEOUT_SECONDS,
            purpose=download_purpose,
        )

    with allure.step("Извлечение данных ответа на скачивание"):
        download_reply = report_state.download_reply
        download_reply_status = download_reply.replyStatus
        has_download_reply_content = download_reply.replyContent is not None
        report_state.file_bytes = download_reply.replyContent.fileChunk if has_download_reply_content else None
        is_xlsx_signature = (
            report_utils.is_xlsx_file_bytes(report_state.file_bytes) if report_state.file_bytes else False
        )

    with allure.step("Проверка ответа на скачивание и формата xlsx"):
        StepCheck("Проверка статуса ответа на скачивание", "replyStatus").actual(download_reply_status).expected(
            ReplyStatus.OK.value
        ).equal_to()
        StepCheck("Проверка наличия контента ответа на скачивание", "replyContent").actual(
            has_download_reply_content
        ).expected(True).equal_to()
        StepCheck("Проверка наличия байт файла", "fileChunk").actual(report_state.file_bytes).is_not_empty()
        StepCheck("Проверка xlsx (zip) сигнатуры файла", "file_signature").actual(is_xlsx_signature).expected(
            True
        ).equal_to()

    with allure.step("Подготовка данных для проверки имени файла отчёта"):
        report_file_name = report_state.report_file_name
        report_file_name_lower = report_file_name.lower()
        file_name_period_start, file_name_period_end = report_utils.parse_period_from_export_file_name(
            report_file_name,
            LdsReportConst.REPORT_FILE_NAME_PERIOD_PATTERN,
        )
        period_start_lo, period_start_hi, period_end_lo, period_end_hi = report_utils.report_period_comparison_bounds(
            report_state.period_start_naive,
            report_state.period_end_naive,
        )
        has_xlsx_extension = report_utils.is_xlsx_extension(report_file_name)
        lds_report_name_part_lower = LdsReportConst.LDS_STATUS_REPORT_NAME_PART.lower()

    try:
        with allure.step("Этап 7. Сохранение и разбор xlsx-отчёта о режиме СОУ"):
            report_state.temp_file_path = report_utils.save_report_bytes_to_temp_file(
                report_state.file_bytes,
                prefix="lds_status_report_",
            )
            StepCheck("Временный xlsx файл создан", "temp_file_path").actual(report_state.temp_file_path).is_not_none()
            report_state.worksheet = report_utils.load_report_worksheet(report_state.temp_file_path)
            report_state.parsed_report = lds_report_utils.parse_lds_status_report_worksheet(
                report_state.worksheet,
                LdsReportConst.SECTION_NAMES,
            )
            allure.attach(
                f"Шапка (raw): {report_state.parsed_report.title_info.raw_title}\n"
                f"period_start: {report_state.parsed_report.title_info.period_start}\n"
                f"period_end: {report_state.parsed_report.title_info.period_end}\n"
                f"total_duration: {report_state.parsed_report.total_duration_raw}",
                name="Шапка отчёта о режиме СОУ",
                attachment_type=allure.attachment_type.TEXT,
            )
            allure.attach(
                lds_report_utils.format_section_rows_for_allure(report_state.parsed_report.section_rows),
                name="Строки участков отчёта",
                attachment_type=allure.attachment_type.TEXT,
            )

        with allure.step("Подготовка данных таблицы отчёта для проверки"):
            parsed_report = report_state.parsed_report
            expected_section_names = LdsReportConst.SECTION_NAMES
            section_rows = parsed_report.section_rows
            total_duration_seconds = parsed_report.total_duration_seconds
            duration_tolerance = LdsReportConst.TOTAL_DURATION_TOLERANCE_SECONDS

        with allure.step("Проверка содержимого таблицы отчёта о режиме СОУ"):
            StepCheck("Лист xlsx открыт", "worksheet").actual(report_state.worksheet).is_not_none()
            with SoftAssertions() as soft_failures:
                StepCheck(
                    "Количество строк участков в отчёте",
                    "section_rows_count",
                    soft_failures,
                ).actual(len(section_rows)).expected(len(expected_section_names)).equal_to()

                for section_index, expected_section_name in enumerate(expected_section_names):
                    actual_section_name = (
                        section_rows[section_index].section_name if section_index < len(section_rows) else None
                    )
                    StepCheck(
                        f"Наименование участка #{section_index + 1}",
                        LdsReportConst.COL_SECTION,
                        soft_failures,
                    ).actual(actual_section_name).expected(expected_section_name).equal_to()

                for section_row in section_rows:
                    for column_name in LdsReportConst.MODE_DURATION_COLUMNS:
                        cell_value = section_row.cells.get(column_name)
                        StepCheck(
                            f"Длительность '{column_name}' для участка '{section_row.section_name}' заполнена",
                            column_name,
                            soft_failures,
                        ).actual(lds_report_utils.is_duration_cell_filled(cell_value)).expected(True).equal_to()

                StepCheck(
                    "В отчёте найдена строка 'Суммарное время работы:'",
                    "total_work_duration_label",
                    soft_failures,
                ).actual(parsed_report.total_label_row_index).is_not_none()
                StepCheck(
                    "Суммарное время работы в отчёте не нулевое",
                    "total_work_duration",
                    soft_failures,
                ).actual(total_duration_seconds).is_greater_than(0, LdsReportConst.ZERO_DURATION_TEXT)

                for section_row in section_rows:
                    duration_diff = abs(section_row.modes_sum_seconds - (total_duration_seconds or 0))
                    StepCheck(
                        f"Сумма режимов СОУ для '{section_row.section_name}' "
                        f"совпадает с суммарным временем (+-{duration_tolerance} с)",
                        "modes_sum_seconds",
                        soft_failures,
                    ).actual(duration_diff).is_less_than(
                        duration_tolerance + 1,
                        f"погрешность {duration_tolerance} с",
                    )

        with allure.step("Подготовка данных шапки отчёта для проверки"):
            title_info = parsed_report.title_info
            report_title_lower = title_info.raw_title.lower()
            lds_report_name_part_lower = LdsReportConst.LDS_STATUS_REPORT_NAME_PART.lower()
            column_headers = parsed_report.column_headers
            period_start_lo, period_start_hi, period_end_lo, period_end_hi = report_utils.report_period_comparison_bounds(
                report_state.period_start_naive,
                report_state.period_end_naive,
            )
            header_period_start = title_info.period_start
            header_period_end = title_info.period_end

        with allure.step("Проверка двойной шапки отчёта о режиме СОУ"):
            with SoftAssertions() as soft_failures:
                StepCheck(
                    f"В шапке отчёта присутствует '{LdsReportConst.LDS_STATUS_REPORT_NAME_PART}'",
                    "report_title",
                    soft_failures,
                ).contains(report_title_lower, lds_report_name_part_lower)
                StepCheck(
                    "Время начала периода в шапке совпадает с фильтром запроса (+-1 мин)",
                    "period_start",
                    soft_failures,
                ).actual(header_period_start).is_between(period_start_lo, period_start_hi)
                StepCheck(
                    "Время конца периода в шапке совпадает с фильтром запроса (+-1 мин)",
                    "period_end",
                    soft_failures,
                ).actual(header_period_end).is_between(period_end_lo, period_end_hi)
                StepCheck(
                    "Названия колонок в шапке отчёта",
                    "column_headers",
                    soft_failures,
                ).actual(column_headers).expected(LdsReportConst.EXPECTED_COLUMN_HEADERS).equal_to()

        with allure.step("Проверка имени файла отчёта о режиме СОУ"):
            with SoftAssertions() as soft_failures:
                StepCheck(f"Имя файла оканчивается на {ReportConst.XLSX_EXTENSION}", "file_name", soft_failures).actual(
                    has_xlsx_extension
                ).expected(True).equal_to()
                StepCheck(
                    f"Имя файла содержит '{LdsReportConst.LDS_STATUS_REPORT_NAME_PART}'",
                    "file_name",
                    soft_failures,
                ).contains(report_file_name_lower, lds_report_name_part_lower)
                StepCheck(
                    f"Имя файла содержит описание ТУ '{cfg.technological_unit.description}'",
                    "file_name",
                    soft_failures,
                ).contains(report_file_name_lower, report_state.tu_description_lower)
                StepCheck(
                    "Дата начала периода в имени файла совпадает с фильтром запроса (+-1 мин)",
                    "period_start_in_file_name",
                    soft_failures,
                ).actual(file_name_period_start).is_between(period_start_lo, period_start_hi)
                StepCheck(
                    "Дата конца периода в имени файла совпадает с фильтром запроса (+-1 мин)",
                    "period_end_in_file_name",
                    soft_failures,
                ).actual(file_name_period_end).is_between(period_end_lo, period_end_hi)

    except Exception:
        with allure.step("Прикрепление xlsx отчёта к Allure при падении теста"):
            if report_state.temp_file_path and report_state.report_file_name:
                report_utils.attach_report_file_to_allure(
                    report_state.temp_file_path, report_state.report_file_name
                )
        raise

    with allure.step("Проверка пуш-нотификации о готовности отчёта"):
        notification = report_state.notification
        notification_reply_status = notification.replyStatus if notification else None
        notification_reply_content = notification.replyContent if notification else None
        notification_export_status = (
            notification_reply_content.exportStatus if notification_reply_content else None
        )
        notification_error_message = (
            (notification_reply_content.errorMessage or "") if notification_reply_content else ""
        )
        with SoftAssertions() as soft_failures:
            StepCheck("Получена пуш-нотификация о готовности отчёта", "notification", soft_failures).actual(
                notification
            ).is_not_none()
            StepCheck("Проверка статуса пуш-нотификации", "replyStatus", soft_failures).actual(
                notification_reply_status
            ).expected(ReplyStatus.OK.value).equal_to()
            StepCheck("Проверка наличия контента нотификации", "replyContent", soft_failures).actual(
                notification_reply_content
            ).is_not_none()
            StepCheck("Проверка exportStatus в нотификации", "exportStatus", soft_failures).actual(
                notification_export_status
            ).expected(ExportStatus.DONE).equal_to()
            StepCheck("В нотификации нет текста ошибки", "errorMessage", soft_failures).actual(
                notification_error_message
            ).is_empty()