Загрузка данных


сцен

"""
Сценарии тестов - функции-обёртки без pytest маркеров.

Каждая функция содержит логику одного теста.
Pytest маркеры и allure декораторы применяются в тестовых файлах.
"""

from datetime import datetime, timedelta

import allure
import pytest

from constants.enums import (
    Direction,
    ExportedDataType,
    ExportStatus,
    MessageType,
    RejectionCriteria,
    RejectionSensorTag,
    ReplyStatus,
)
from constants.test_constants import BaseTN3Constants as TestConst
from constants.test_constants import ExportRejectedReportConstants as RejectedReportConst
from constants.test_constants import ExportReportConstants as ReportConst
from models.get_messages_model import Filtering, FilteringObjects, Pagination
from test_config.models_for_tests import ExportRejectedReportState, IsRejectedConfig, RejectionTestCase
from utils.helpers import rejection_report_xlsx_utils as rejection_report_utils
from utils.helpers import report_xlsx_utils as report_utils
from utils.helpers import ws_test_utils as t_utils
from utils.helpers.asserts import SoftAssertions, StepCheck
from utils.helpers.lds_status_report_xlsx_utils import format_duration_seconds
from utils.helpers.ws_message_parser import ws_message_parser as parser


# ===== Сценарии отбраковки сигналов =====
async def rejection_input_signals(ws_client, cfg: IsRejectedConfig, rejection_case: RejectionTestCase):
    """
    Проверка отбраковки сигнала по подписке SubscribeInputSignalsRequest.
    Проверяет isRejected=True для указанного датчика.
    """
    sensor = rejection_case.sensor
    with allure.step(
        f"Подключение по ws, получение данных InputSignalsContent для датчика {sensor.description} (id={sensor.id})"
    ):
        payload = await t_utils.connect_and_subscribe_msg(
            ws_client,
            "InputSignalsContent",
            "SubscribeInputSignalsRequest",
            {
                'signalIds': [sensor.id],
                'tuId': cfg.tu_id,
                'additionalProperties': None,
            },
        )
        parsed_payload = parser.parse_input_signals_info_msg(payload)
        sensor_data = parsed_payload.replyContent.inputSignals
        target_signal = t_utils.find_object_by_field(sensor_data, "id", sensor.id)

    with SoftAssertions() as soft_failures:
        StepCheck(
            f"Проверка отбраковки датчика {sensor.description} (id={sensor.id})", "isRejected", soft_failures
        ).actual(target_signal.isRejected).expected(True).equal_to()

        if rejection_case.expected_criteria_names:
            raw_criteria = (
                target_signal.rejection.get(TestConst.CRITERIA_NAMES_FIELD)
                if isinstance(target_signal.rejection, dict)
                else None
            )
            criteria = RejectionCriteria(raw_criteria) if raw_criteria is not None else None
            StepCheck(
                f"Проверка rejection.criteriaNames для {sensor.description} (id={sensor.id})",
                TestConst.CRITERIA_NAMES_FIELD,
                soft_failures,
            ).actual(criteria).expected(rejection_case.expected_criteria_names).equal_to()


async def rejection_journal(ws_client, cfg: IsRejectedConfig, rejection_case: RejectionTestCase, imitator_start_time):
    """
    Проверка наличия записи об отбраковке в журнале по GetMessagesRequest.
    """
    sensor = rejection_case.sensor
    with allure.step("Подготовка запроса и ожидаемого диапазона времени"):
        request_body = t_utils.create_journal_req_body(
            pagination=Pagination(limit=TestConst.JOURNAL_PAGINATION_REJECT_LIMIT, direction=Direction.FIRST.value),
            filtering=Filtering(
                messageTypes=int(MessageType.REJECTION),
                objects=FilteringObjects(tuId=cfg.tu_id),
            ),
        )
        range_start, range_end = t_utils.get_rejection_time_window(
            imitator_start_time=imitator_start_time,
            start_seconds=rejection_case.time_range_start_s,
            reserve_seconds=TestConst.SEC_PER_MIN,
        )

    with allure.step("Получение сообщений журнала с фильтром messageTypes=REJECTION"):
        payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
        parsed_payload = parser.parse_journal_msg(payload)
        messages_info = parsed_payload.replyContent.messagesInfo

    with allure.step("Проверка наличия сообщений в журнале"):
        StepCheck("Проверка наличия сообщений в журнале", "messagesInfo").actual(messages_info).is_not_empty()

    with allure.step(
        f"Подготовка сообщений к проверке по диапазону слоя данных "
        f"({rejection_case.time_range_start_s - TestConst.SEC_PER_MIN}-"
        f"{rejection_case.time_range_end_s + TestConst.SEC_PER_MIN} с от старта имитатора)"
    ):
        time_filtered, target_msg = t_utils.find_rejection_journal_message(
            messages_info=messages_info,
            tag=sensor.description,
            range_start=range_start,
            range_end=range_end,
            technological_section=cfg.tu_name,
            expected_event=rejection_case.expected_event,
        )

        allure.attach(
            f"Всего получено сообщений: {len(messages_info)}\n"
            f"Диапазон фильтрации: {range_start} - {range_end}\n"
            f"После фильтрации по tag='{sensor.description}' и времени: {len(time_filtered)}\n"
            f"Найдено ли сообщение с technologicalSection='{cfg.tu_name}' и событием {rejection_case.expected_event}: "
            f"{'True' if target_msg else 'False'}",
            name="Результат фильтрации сообщений журнала",
            attachment_type=allure.attachment_type.TEXT,
        )

    with allure.step(
        f"Проверка: найдено ли сообщение с tag='{sensor.description}' (id={sensor.id}) "
        f"в диапазоне {range_start}-{range_end} с"
    ):
        if target_msg is None:
            pytest.fail(
                f"Сообщение с tag='{sensor.description}' (id={sensor.id}) "
                f"и technologicalSection='{cfg.tu_name}' не найдено в диапазоне "
                f"{range_start} - {range_end} "
                f"(всего сообщений: {len(messages_info)}, после фильтрации: {len(time_filtered)})"
            )

    with SoftAssertions() as soft_failures:
        StepCheck("Проверка mainPipeline", "mainPipeline", soft_failures).actual(target_msg.mainPipeline).expected(
            cfg.main_pipeline
        ).equal_to()

        StepCheck("Проверка messageType", "messageType", soft_failures).actual(target_msg.messageType).expected(
            TestConst.JOURNAL_MESSAGE_TYPE_REJECTION
        ).equal_to()

        StepCheck("Проверка technologicalSection не пустой", "technologicalSection", soft_failures).actual(
            target_msg.technologicalSection
        ).is_not_none()

        StepCheck("Проверка technologicalObject не пустой", "technologicalObject", soft_failures).actual(
            target_msg.technologicalObject
        ).is_not_none()

        StepCheck(f"Проверка tag для {sensor.description} (id={sensor.id})", "tag", soft_failures).actual(
            target_msg.tag
        ).expected(sensor.description).equal_to()

        if rejection_case.expected_signal_name:
            StepCheck("Проверка signalName", "signalName", soft_failures).actual(target_msg.signalName).expected(
                rejection_case.expected_signal_name
            ).equal_to()

        if rejection_case.expected_event:
            StepCheck("Проверка event", "event", soft_failures).actual(
                (target_msg.event.rstrip() or "").strip()
            ).expected(rejection_case.expected_event).equal_to()


async def rejection_main_page(ws_client, cfg: IsRejectedConfig):
    """
    Проверка numberOfRejectedSignals > 0 по подписке subscribeMainPageSignalsInfoRequest.
    """
    with allure.step("Подключение по ws, получение и обработка сообщения типа: MainPageSignalsInfoContent"):
        payload = await t_utils.connect_and_subscribe_msg(
            ws_client,
            "MainPageSignalsInfoContent",
            "subscribeMainPageSignalsInfoRequest",
            {'tuIds': [cfg.tu_id], 'additionalProperties': None},
        )
        parsed_payload = parser.parse_main_page_signals_msg(payload)

    with SoftAssertions() as soft_failures:
        StepCheck("Проверка id полученного ТУ", "tu_id", soft_failures).actual(
            parsed_payload.replyContent.tuId
        ).expected(cfg.tu_id).equal_to()

        StepCheck(
            f"Проверка numberOfRejectedSignals > 0 для ТУ {cfg.tu_name}",
            "numberOfRejectedSignals",
            soft_failures,
        ).actual(parsed_payload.replyContent.signalsInfo.numberOfRejectedSignals).is_greater_than(0)


async def rejection_scheme_signals_state(ws_client, cfg: IsRejectedConfig, rejection_case: RejectionTestCase):
    """
    Проверка отбраковки сигнала по подписке SubscribeSchemeSignalsStateRequest.
    Проверяет isRejected, isMasked, isImitated и rejection.criteriaNames.
    Логирование больших ответов подавляется suppress_recv_logging.
    """
    sensor = rejection_case.sensor
    ws_client.suppress_recv_logging = True
    parser.suppress_recv_logging = True
    try:
        with allure.step(
            f"Подключение по ws, получение данных SchemeSignalsStateContent "
            f"для датчика {sensor.description} (id={sensor.id})"
        ):
            payload = await t_utils.connect_and_subscribe_msg(
                ws_client,
                "SchemeSignalsStateContent",
                "SubscribeSchemeSignalsStateRequest",
                {'tuId': cfg.tu_id},
            )
            parsed_payload = parser.parse_scheme_signals_state_msg(payload)
            signals = parsed_payload.replyContent.signalsStates

            target_signal = next(
                (signal for signal in signals if signal.id == sensor.id),
                None,
            )

            allure.attach(
                f"Всего сигналов получено: {len(signals)}\n"
                f"Поиск сигнала с id={sensor.id} ({sensor.description}): "
                f"{'Найден' if target_signal else 'Не найден'}",
                name="Результат поиска сигнала в SchemeSignalsState",
                attachment_type=allure.attachment_type.TEXT,
            )

            if target_signal is not None:
                allure.attach(
                    str(target_signal),
                    name=f"Тестируемый фрагмент ответа с бэка: сигнал id={sensor.id} ({sensor.description})",
                    attachment_type=allure.attachment_type.TEXT,
                )
    finally:
        ws_client.suppress_recv_logging = False
        parser.suppress_recv_logging = False

    with allure.step(f"Проверка: найден ли сигнал с id={sensor.id} ({sensor.description})"):
        if target_signal is None:
            pytest.fail(
                f"Сигнал с id={sensor.id} ({sensor.description}) " f"не найден среди {len(signals)} полученных сигналов"
            )

    with SoftAssertions() as soft_failures:
        StepCheck(f"Проверка isRejected для {sensor.description} (id={sensor.id})", "isRejected", soft_failures).actual(
            target_signal.isRejected
        ).expected(True).equal_to()

        StepCheck(f"Проверка isMasked для {sensor.description} (id={sensor.id})", "isMasked", soft_failures).actual(
            target_signal.isMasked
        ).expected(False).equal_to()

        StepCheck(f"Проверка isImitated для {sensor.description} (id={sensor.id})", "isImitated", soft_failures).actual(
            target_signal.isImitated
        ).expected(False).equal_to()

        if rejection_case.expected_criteria_names and target_signal.rejection is not None:
            raw_criteria = (
                target_signal.rejection.get(TestConst.CRITERIA_NAMES_FIELD)
                if isinstance(target_signal.rejection, dict)
                else None
            )
            criteria = RejectionCriteria(raw_criteria) if raw_criteria is not None else None
            StepCheck(
                f"Проверка rejection.criteriaNames для {sensor.description} (id={sensor.id})",
                TestConst.CRITERIA_NAMES_FIELD,
                soft_failures,
            ).actual(criteria).expected(rejection_case.expected_criteria_names).equal_to()


async def export_rejection_report(ws_client, cfg: IsRejectedConfig, imitator_start_time: datetime):
    """
    Сценарий формирования общего xlsx-отчёта об отбракованных входных данных.

    Этапы:
    1. Подписка SubscribeReportsDataExportedRequest на пуш-нотификации.
    2. ExportReportsCommandRequest с периодом от старта имитатора до offset теста.
    3. Ожидание ReportDataExportedNotification.
    4. Лонг-поллинг GetExportedDataListRequest до появления отчёта в списке.
    5. DownloadExportedDataRequest и получение fileChunk.
    6. Проверка формата, имени файла, двойной шапки и строк по RejectionTestCase.
    """
    report_state = ExportRejectedReportState()

    with allure.step("Подготовка параметров сценария формирования отчёта об отбракованных входных данных"):
        # offset теста задаёт конец периода отчёта - после всех отдельных проверок отбраковок
        report_state.expected_report_test = cfg.rejection_report_test
        StepCheck("В конфигурации задан rejection_report_test", "rejection_report_test").actual(
            report_state.expected_report_test
        ).is_not_none()

        report_state.expected_period_start = t_utils.localize_as_moscow(imitator_start_time)
        report_state.expected_period_end = t_utils.localize_as_moscow(
            imitator_start_time + timedelta(minutes=report_state.expected_report_test.offset)
        )
        report_state.expected_period_start_naive = report_utils.normalize_report_period_naive(
            report_state.expected_period_start
        )
        report_state.expected_period_end_naive = report_utils.normalize_report_period_naive(
            report_state.expected_period_end
        )
        report_state.expected_tu_description_lower = cfg.technological_unit.description.lower()
        report_state.expected_file_name = report_utils.build_export_report_file_name(
            cfg.technological_unit.description,
            report_state.expected_period_start,
            report_state.expected_period_end,
            RejectedReportConst.REJECTED_REPORT_NAME_PART,
            " ",
        )
        time_offset_hours = t_utils.report_time_offset_hours()
        StepCheck(
            f"Смещение timeOffset для запросов отчёта (часовой пояс {TestConst.ZONE_INFO})",
            "time_offset_hours",
        ).actual(time_offset_hours).is_not_none()
        report_state.actual_time_offset_hours = time_offset_hours

        allure.attach(
            f"period.start={report_state.expected_period_start}\n"
            f"period.end={report_state.expected_period_end}\n"
            f"offset_minutes={report_state.expected_report_test.offset}",
            name="Фильтр периода отчёта об отбраковках",
            attachment_type=allure.attachment_type.TEXT,
        )

    with allure.step(
        f"Этап 1. Подписка на пуш-нотификации ({ReportConst.SUBSCRIBE_REPORTS_DATA_EXPORTED_REQUEST})"
    ):
        # без подписки не придёт уведомление о готовности отчёта
        await t_utils.connect(ws_client, ReportConst.SUBSCRIBE_REPORTS_DATA_EXPORTED_REQUEST, [])

    with allure.step(f"Этап 2. Запрос формирования отчёта ({ReportConst.EXPORT_REPORTS_COMMAND_REQUEST})"):
        request_payload = {
            "tuId": cfg.tu_id,
            "exportedDataTypes": [ExportedDataType.REJECTED_REPORT.value],
            "timeOffset": report_state.actual_time_offset_hours,
            "period": {
                "start": t_utils.datetime_to_msgpack_timestamp(report_state.expected_period_start),
                "end": t_utils.datetime_to_msgpack_timestamp(report_state.expected_period_end),
                "additionalProperties": {},
            },
        }
        await t_utils.connect(ws_client, ReportConst.EXPORT_REPORTS_COMMAND_REQUEST, request_payload)

    with allure.step(
        f"Этап 3. Ожидание пуш-нотификации {ReportConst.REPORT_DATA_EXPORTED_NOTIFICATION} о готовности отчёта"
    ):
        report_state.actual_notification = await t_utils.poll_for_report_export_notification(
            ws_client=ws_client,
            parser=parser,
            total_wait_seconds=ReportConst.NOTIFICATION_TIMEOUT_SECONDS,
            poll_interval_seconds=ReportConst.LIST_POLL_INTERVAL_SECONDS,
        )

    with allure.step(f"Этап 4. Лонг-поллинг {ReportConst.GET_EXPORTED_DATA_LIST_REQUEST} до появления отчёта в списке"):
        report_state.actual_report_item = await t_utils.poll_for_exported_file(
            ws_client=ws_client,
            parser=parser,
            list_limit=ReportConst.EXPORTED_DATA_LIST_LIMIT,
            expected_data_type=ExportedDataType.REJECTED_REPORT,
            name_substring=RejectedReportConst.REJECTED_REPORT_NAME_PART,
            tu_name_substring=cfg.technological_unit.description,
            period_start=report_state.expected_period_start,
            period_end=report_state.expected_period_end,
            total_wait_seconds=ReportConst.LIST_POLL_TOTAL_WAIT_SECONDS,
            poll_interval_seconds=ReportConst.LIST_POLL_INTERVAL_SECONDS,
        )

        with allure.step("Подготовка данных найденного отчёта в списке"):
            report_item = report_state.actual_report_item
            if report_item is not None:
                allure.attach(
                    f"id={report_item.id}, name={report_item.name}, "
                    f"exportedDataType={report_item.exportedDataType}, "
                    f"start={t_utils.format_datetime_moscow(report_item.start)}, "
                    f"end={t_utils.format_datetime_moscow(report_item.end)}",
                    name="Найденный отчёт в списке",
                    attachment_type=allure.attachment_type.TEXT,
                )

        with allure.step("Проверка: отчёт найден в списке сформированных файлов"):
            StepCheck("Отчёт найден в списке сформированных файлов", "report_item").actual(
                report_state.actual_report_item
            ).is_not_none()

    with allure.step(
        f"Этап 5. Streaming-вызов {ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST} "
        f"по id={report_state.actual_report_item.id}"
    ):
        download_request = {
            "exportedDataId": report_state.actual_report_item.id,
            "exportedDataType": ExportedDataType.REJECTED_REPORT.to_download_name(),
            "additionalProperties": None,
            "timeOffset": report_state.actual_time_offset_hours,
        }
        download_purpose = (
            f"скачивание xlsx-отчёта об отбракованных входных данных "
            f"(exportedDataId={report_state.actual_report_item.id}) "
            f"после формирования отчёта и выбора файла в списке GetExportedDataListRequest"
        )
        await t_utils.connect_stream(
            ws_client,
            ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST,
            download_request,
            purpose=download_purpose,
        )
        report_state.actual_download_invocation_id = ws_client.invocation_id

    with allure.step("Этап 6. Получение fileChunk - скачивание отчёта об отбракованных входных данных"):
        report_state.actual_download_reply = await t_utils.receive_download_exported_data_reply(
            ws_client=ws_client,
            parser=parser,
            invocation_id=report_state.actual_download_invocation_id,
            request_name=ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST,
            total_wait_seconds=ReportConst.DOWNLOAD_TIMEOUT_SECONDS,
            purpose=download_purpose,
        )

        with allure.step("Извлечение данных ответа на скачивание"):
            download_reply = report_state.actual_download_reply
            download_reply_status = download_reply.replyStatus
            has_download_reply_content = download_reply.replyContent is not None
            report_state.actual_file_bytes = (
                download_reply.replyContent.fileChunk if has_download_reply_content else None
            )
            is_xlsx_signature = (
                report_utils.is_xlsx_file_bytes(report_state.actual_file_bytes)
                if report_state.actual_file_bytes
                else False
            )

        with allure.step("Проверка ответа на скачивание и формата xlsx"):
            StepCheck("Проверка статуса ответа на скачивание", "replyStatus").actual(download_reply_status).expected(
                ReplyStatus.OK.value
            ).equal_to()
            StepCheck("Проверка наличия контента ответа на скачивание", "replyContent").actual(
                has_download_reply_content
            ).expected(True).equal_to()
            StepCheck("Проверка наличия байт файла", "fileChunk").actual(report_state.actual_file_bytes).is_not_empty()
            StepCheck("Проверка xlsx (zip) сигнатуры файла", "file_signature").actual(is_xlsx_signature).expected(
                True
            ).equal_to()

        with allure.step("Подготовка данных для проверки имени файла отчёта"):
            # имя файла берём из ответа бэка (список сформированных отчётов), не из шапки xlsx
            actual_file_name = report_state.actual_report_item.name if report_state.actual_report_item else ""
            actual_file_name_lower = actual_file_name.lower()
            file_name_period_start, file_name_period_end = report_utils.parse_period_from_export_file_name(
                actual_file_name,
                RejectedReportConst.REPORT_FILE_NAME_PERIOD_PATTERN,
            )
            period_start_lo, period_start_hi, period_end_lo, period_end_hi = report_utils.report_period_comparison_bounds(
                report_state.expected_period_start_naive,
                report_state.expected_period_end_naive,
            )
            has_xlsx_extension = report_utils.is_xlsx_extension(actual_file_name)
            rejected_report_file_name_part_lower = RejectedReportConst.REJECTED_REPORT_NAME_PART.lower()
            rejected_report_file_name_part_alt_lower = RejectedReportConst.REJECTED_REPORT_NAME_PART_ALT.lower()

    try:
        with allure.step("Этап 7. Сохранение и разбор xlsx-отчёта об отбракованных входных данных"):
            report_state.actual_temp_file_path = report_utils.save_report_bytes_to_temp_file(
                report_state.actual_file_bytes,
                prefix="rejected_report_",
            )
            StepCheck("Временный xlsx файл создан", "temp_file_path").actual(
                report_state.actual_temp_file_path
            ).is_not_none()
            report_state.actual_worksheet = report_utils.load_report_worksheet(report_state.actual_temp_file_path)
            report_state.actual_title_info = report_utils.parse_report_title(
                report_utils.get_report_title_cell(report_state.actual_worksheet),
                RejectedReportConst.REPORT_HEADER_PERIOD_PATTERN,
            )
            allure.attach(
                f"Шапка (raw): {report_state.actual_title_info.raw_title}\n"
                f"period_start: {report_state.actual_title_info.period_start}\n"
                f"period_end: {report_state.actual_title_info.period_end}",
                name="Первая строка шапки xlsx-отчёта",
                attachment_type=allure.attachment_type.TEXT,
            )

        with allure.step("Этап 8. Извлечение строк данных из отчёта"):
            report_state.actual_data_rows = rejection_report_utils.iter_rejection_report_rows(
                report_state.actual_worksheet
            )
            report_state.actual_monitored_tag_rows = rejection_report_utils.filter_rows_by_monitored_tags(
                report_state.actual_data_rows,
                RejectionSensorTag,
            )
            allure.attach(
                rejection_report_utils.format_rejection_rows_for_allure(report_state.actual_monitored_tag_rows),
                name="Строки отчёта по тегам RejectionSensorTag",
                attachment_type=allure.attachment_type.TEXT,
            )

            with allure.step("Подготовка данных шапки xlsx для проверки"):
                title_info = report_state.actual_title_info
                column_headers = report_utils.get_report_column_headers(
                    report_state.actual_worksheet,
                    headers_row=RejectedReportConst.REPORT_COLUMN_HEADERS_ROW,
                )
                header_period_start = title_info.period_start
                header_period_end = title_info.period_end
                header_contains_expected_title = rejection_report_utils.report_header_contains_expected_title(
                    title_info.raw_title
                )

        with allure.step("Проверка первой строки шапки xlsx-отчёта"):
            StepCheck("Лист xlsx открыт", "worksheet").actual(report_state.actual_worksheet).is_not_none()
            with SoftAssertions() as soft_failures:
                StepCheck(
                    "Первая строка шапки содержит заголовок отчёта об отбракованных входных данных",
                    "report_title",
                    soft_failures,
                ).actual(header_contains_expected_title).expected(True).equal_to()
                StepCheck(
                    "Время начала периода в первой строке шапки совпадает с фильтром запроса (+-1 мин)",
                    "period_start",
                    soft_failures,
                ).actual(header_period_start).is_between(period_start_lo, period_start_hi)
                StepCheck(
                    "Время конца периода в первой строке шапки совпадает с фильтром запроса (+-1 мин)",
                    "period_end",
                    soft_failures,
                ).actual(header_period_end).is_between(period_end_lo, period_end_hi)
                StepCheck(
                    "Названия колонок во второй строке шапки отчёта",
                    "column_headers",
                    soft_failures,
                ).actual(column_headers).expected(RejectedReportConst.EXPECTED_COLUMN_HEADERS).equal_to()

        with allure.step("Проверка строк отчёта по каждому RejectionTestCase из конфигурации набора"):
            # Проверяем только установление отбраковок: каждый кейс из набора должен быть в таблице
            with SoftAssertions() as soft_failures:
                for rejection_case in cfg.rejection_cases:
                    report_event = rejection_report_utils.expected_event_to_report_event(rejection_case.expected_event)
                    window_start, window_end = rejection_report_utils.get_case_time_window(
                        imitator_start_time,
                        rejection_case,
                    )
                    raw_case_rows = rejection_report_utils.filter_rows_for_rejection_case(
                        report_state.actual_monitored_tag_rows,
                        rejection_case,
                        imitator_start_time,
                    )
                    merged_case_rows = rejection_report_utils.merge_rejection_rows(raw_case_rows)
                    primary_row = rejection_report_utils.select_primary_merged_row(merged_case_rows)
                    expected_signal_suffix = rejection_report_utils.report_signal_suffix_by_expected_name(
                        rejection_case.expected_signal_name,
                    )
                    case_label = (
                        f"события '{report_event}' - {rejection_case.sensor.description}"
                    )

                    StepCheck(
                        f"В отчёте найдена отбраковка для {case_label} в интервале времени "
                        f"{window_start} - {window_end}",
                        RejectedReportConst.COL_TAG,
                        soft_failures,
                    ).actual(primary_row).is_not_none()

                    if primary_row is None:
                        continue

                    merge_key = rejection_report_utils.build_merge_key(primary_row)
                    expected_duration_seconds = rejection_report_utils.sum_duration_for_merge_key(
                        raw_case_rows,
                        merge_key,
                    )
                    pipe_section, actual_signal_suffix = rejection_report_utils.split_object_column(
                        primary_row.object_value
                    )

                    StepCheck(
                        f"Для {case_label} время получения отбраковки в допустимом диапазоне",
                        RejectedReportConst.COL_DATETIME,
                        soft_failures,
                    ).actual(primary_row.datetime_value).is_between(window_start, window_end)

                    StepCheck(
                        f"Для {case_label} суммарная продолжительность отбраковки "
                        f"({format_duration_seconds(expected_duration_seconds)}) совпадает",
                        RejectedReportConst.COL_DURATION,
                        soft_failures,
                    ).actual(primary_row.duration_seconds).expected(expected_duration_seconds).equal_to()

                    StepCheck(
                        f"Для {case_label} участок трубопровода в колонке '{RejectedReportConst.COL_OBJECT}' не пустой",
                        RejectedReportConst.COL_OBJECT,
                        soft_failures,
                    ).actual(pipe_section).is_not_empty()

                    StepCheck(
                        f"Для {case_label} после последней точки в колонке '{RejectedReportConst.COL_OBJECT}' "
                        f"указан сигнал '{expected_signal_suffix}'",
                        RejectedReportConst.COL_OBJECT,
                        soft_failures,
                    ).actual(actual_signal_suffix).expected(expected_signal_suffix).equal_to()

    except Exception:
        with allure.step("Прикрепление xlsx отчёта к Allure при падении теста"):
            attachment_name = (
                report_state.actual_report_item.name
                if report_state.actual_report_item and report_state.actual_report_item.name
                else report_state.expected_file_name
            )
            if report_state.actual_temp_file_path and attachment_name:
                report_utils.attach_report_file_to_allure(report_state.actual_temp_file_path, attachment_name)
        raise

    with allure.step("Проверка имени файла отчёта из списка сформированных файлов"):
        with SoftAssertions() as soft_failures:
            StepCheck(f"Имя файла оканчивается на {ReportConst.XLSX_EXTENSION}", "file_name", soft_failures).actual(
                has_xlsx_extension
            ).expected(True).equal_to()
            StepCheck(
                "Имя файла из ответа бэка содержит название отчёта об отбракованных входных данных",
                "file_name",
                soft_failures,
            ).actual(
                rejected_report_file_name_part_lower in actual_file_name_lower
                or rejected_report_file_name_part_alt_lower in actual_file_name_lower
            ).expected(True).equal_to()
            StepCheck(
                f"Имя файла содержит описание ТУ '{cfg.technological_unit.description}'",
                "file_name",
                soft_failures,
            ).contains(actual_file_name_lower, report_state.expected_tu_description_lower)
            StepCheck(
                "Дата начала периода в имени файла совпадает с фильтром запроса (+-1 мин)",
                "period_start_in_file_name",
                soft_failures,
            ).actual(file_name_period_start).is_between(period_start_lo, period_start_hi)
            StepCheck(
                "Дата конца периода в имени файла совпадает с фильтром запроса (+-1 мин)",
                "period_end_in_file_name",
                soft_failures,
            ).actual(file_name_period_end).is_between(period_end_lo, period_end_hi)

    with allure.step("Проверка пуш-нотификации о готовности отчёта"):
        notification = report_state.actual_notification
        notification_reply_status = notification.replyStatus if notification else None
        notification_reply_content = notification.replyContent if notification else None
        notification_export_status = notification_reply_content.exportStatus if notification_reply_content else None
        notification_error_message = (
            (notification_reply_content.errorMessage or "") if notification_reply_content else ""
        )
        with SoftAssertions() as soft_failures:
            StepCheck("Получена пуш-нотификация о готовности отчёта", "notification", soft_failures).actual(
                report_state.actual_notification
            ).is_not_none()
            StepCheck("Проверка статуса пуш-нотификации", "replyStatus", soft_failures).actual(
                notification_reply_status
            ).expected(ReplyStatus.OK.value).equal_to()
            StepCheck("Проверка наличия контента нотификации", "replyContent", soft_failures).actual(
                notification_reply_content
            ).is_not_none()
            StepCheck("Проверка exportStatus в нотификации", "exportStatus", soft_failures).actual(
                notification_export_status
            ).expected(ExportStatus.DONE).equal_to()
            StepCheck("В нотификации нет текста ошибки", "errorMessage", soft_failures).actual(
                notification_error_message
            ).is_empty()














конфтест
            or test_name in IS_REJECTED_LEVEL_TEST_MAPPING
            or test_name in IS_REJECTED_SUITE_LEVEL_MAPPING



    # Проверяем, есть ли параметр rejection_case для тестов отбраковки
    if 'rejection_case' in params and test_name in IS_REJECTED_LEVEL_TEST_MAPPING:
        rejection_case = params['rejection_case']
        attr_name = IS_REJECTED_LEVEL_TEST_MAPPING[test_name]
        return getattr(rejection_case, attr_name, None)

    # Suite-level тесты отбраковки (без rejection_case)
    if 'config' in params and 'rejection_case' not in params and test_name in IS_REJECTED_SUITE_LEVEL_MAPPING:
        suite_config = params['config']
        attr_name = IS_REJECTED_SUITE_LEVEL_MAPPING[test_name]
        return getattr(suite_config, attr_name, None)





# Тесты уровня отбраковки (маркеры из RejectionTestCase - параметр rejection_case)
IS_REJECTED_LEVEL_TEST_MAPPING = {
    'test_rejection_input_signals': 'rejection_input_signals_test',
    'test_rejection_journal': 'rejection_journal_test',
    'test_rejection_main_page': 'rejection_main_page_test',
    'test_rejection_scheme_signals_state': 'rejection_scheme_signals_state_test',
}

# Suite-level тесты отбраковки (маркеры из IsRejectedConfig - параметр config)
IS_REJECTED_SUITE_LEVEL_MAPPING = {
    'test_rejection_report': 'rejection_report_test',
}

















test const 



class ExportRejectedReportConstants:
    """Константы для теста формирования xlsx-отчёта об отбракованных входных данных"""

    REJECTED_REPORT_NAME_PART: str = "Отчет об отбракованных входных данных"
    REJECTED_REPORT_NAME_PART_ALT: str = "Отчёт об отбракованных входных данных"

    REPORT_TITLE_ROW: int = 1
    REPORT_COLUMN_HEADERS_ROW: int = 2
    REPORT_DATA_FIRST_ROW: int = 3

    COL_DATETIME: str = "Дата и время"
    COL_OBJECT: str = "Объект"
    COL_EVENT: str = "Событие"
    COL_VALUE: str = "Значение"
    COL_DURATION: str = "Продолжительность отбраковки"
    COL_TAG: str = "Тег сигнала"

    EXPECTED_COLUMN_HEADERS: list = [
        COL_DATETIME,
        COL_OBJECT,
        COL_EVENT,
        COL_VALUE,
        COL_DURATION,
        COL_TAG,
    ]

    REPORT_HEADER_PERIOD_PATTERN: str = (
        r'[Оо]тч[её]т об отбракованных входных данных с '
        r'(?P<period_start>\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})'
        r' по (?P<period_end>\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})'
    )
    REPORT_FILE_NAME_PERIOD_PATTERN: str = (
        r'^[Оо]тч[её]т об отбракованных входных данных (?P<tu>.+?) '
        r'(?P<period_start>\d{2}\.\d{2}\.\d{4} \d{2}_\d{2}_\d{2})'
        r' - '
        r'(?P<period_end>\d{2}\.\d{2}\.\d{4} \d{2}_\d{2}_\d{2})'
        r'\.xlsx$'
    )

    TIME_FILTER_TOLERANCE_SECONDS: int = 60

    # Суффикс сигнала в колонке «Объект» отчёта (после последней точки в строке)
    REPORT_SIGNAL_FLOW: str = "Расход"
    REPORT_SIGNAL_PRESSURE: str = "Давление"
    REPORT_SIGNAL_SUFFIX_BY_EXPECTED_NAME: dict = {
        BaseTN3Constants.JOURNAL_SIGNAL_FLOW: REPORT_SIGNAL_FLOW,
        BaseTN3Constants.JOURNAL_SIGNAL_PRESSURE: REPORT_SIGNAL_PRESSURE,
    }

    # Разбор колонки «Объект»: участок трубопровода и суффикс сигнала разделяются последней точкой
    OBJECT_SIGNAL_SEPARATOR: str = "."
    OBJECT_SIGNAL_RSPLIT_MAXSPLIT: int = 1

    REJECTED_REPORT_HEADER_TITLE_PART: str = "отчет об отбракованных входных данных с"
    REJECTED_REPORT_HEADER_TITLE_PART_ALT: str = "отчёт об отбракованных входных данных с"



















iдатасет
    main_pipeline=MAIN_PIPELINE,
    rejection_report_test=CaseMarkers(test_case_id="210", offset=70),























модели тестов


@dataclass
class ExportRejectedReportState:
    """
    Состояние сценария формирования xlsx-отчёта об отбракованных входных данных.
    Поля с префиксом expected_ - из конфигурации теста, actual_ - из ответов бэка и разбора xlsx.
    """

    # --- expected: конфигурация теста и расчётные ожидания ---
    expected_report_test: Optional[CaseMarkers] = None
    expected_period_start: Optional[datetime] = None
    expected_period_end: Optional[datetime] = None
    expected_period_start_naive: Optional[datetime] = None
    expected_period_end_naive: Optional[datetime] = None
    expected_tu_description_lower: str = ""
    expected_file_name: str = ""

    # --- actual: ответы бэка ---
    actual_time_offset_hours: Optional[int] = None
    actual_notification: Optional[ReportDataExportedNotification] = None
    actual_report_item: Optional[ExportedDataItem] = None
    actual_download_invocation_id: Optional[str] = None
    actual_download_reply: Optional[DownloadExportedDataReply] = None
    actual_file_bytes: Optional[bytes] = None

    # --- actual: разбор xlsx ---
    actual_temp_file_path: Optional[Path] = None
    actual_worksheet: Any = None
    actual_title_info: Optional[ReportTitleInfo] = None
    actual_data_rows: list[RejectionReportRow] = field(default_factory=list)
    actual_monitored_tag_rows: list[RejectionReportRow] = field(default_factory=list)






























test rej

# ===== ПАРАМЕТРЫ ДЛЯ ТЕСТОВ =====
REJECTION_PARAMS: List[Any] = _generate_rejection_params()


def _generate_suite_params() -> List[Any]:
    """Генерирует параметры для suite-level тестов отбраковки (один параметр на набор)."""
    params = []
    for config in ALL_IS_REJECTED_CONFIGS:
        params.append(
            pytest.param(
                config,
                id=config.suite_name,
                marks=_get_suite_markers(config),
            )
        )
    return params


SUITE_PARAMS: List[Any] = _generate_suite_params()








@pytest.mark.parametrize("config", SUITE_PARAMS)
class TestIsRejectedReport:
    """Suite-level тест общего отчёта по отбраковкам"""

    @pytest.mark.asyncio
    async def test_rejection_report(
        self,
        ws_client: WebSocketClient,
        config: IsRejectedConfig,
        imitator_start_time: datetime,
    ) -> None:
        """[ExportReports] Проверка общего отчёта об отбракованных входных данных"""
        tag = "ExportReports"
        title = (
            f"[{tag}] Проверка общего отчёта об отбракованных входных данных. "
            f"ЭФ: Выпадашка отчётов"
        )
        _apply_allure_markers(
            config.rejection_report_test,
            tag,
            title,
            (
                f"Проверка формирования и содержимого xlsx-отчёта об отбракованных входных данных "
                f"на наборе данных {config.suite_name},\n"
                f"на технологическом участке {config.technological_unit.description}\n"
                f"Период отчёта: от старта имитатора до старта + "
                f"{config.rejection_report_test.offset} мин.\n"
                "Этапы сценария:\n"
                "1) SubscribeReportsDataExportedRequest - подписка на пуш-нотификации\n"
                "2) ExportReportsCommandRequest - запрос формирования отчёта (тип RejectedReport)\n"
                "3) Ожидание ReportDataExportedNotification\n"
                "4) Лонг-поллинг GetExportedDataListRequest - поиск отчёта в списке\n"
                "5) DownloadExportedDataRequest (StreamInvocation) - скачивание по exportedDataId\n"
                "6) Проверка xlsx: шапка, колонки, строки по каждому RejectionTestCase из набора\n"
                "7) Проверка имени файла (.xlsx, название отчёта, ТУ, период +-1 мин)\n"
                "Во вложениях Allure xlsx прикладывается только при падении теста"
            ),
        )
        await scenarios.export_rejection_report(ws_client, config, imitator_start_time)

























utils\helpers\rejection_report_xlsx_utils.py
"""
Утилиты для разбора xlsx-отчёта об отбракованных входных данных.
"""

from __future__ import annotations

from dataclasses import replace
from datetime import datetime, timedelta
from typing import Iterable, List, Optional, Tuple

from constants.enums import RejectionSensorTag
from constants.test_constants import ExportRejectedReportConstants as RejectedReportConst
from test_config.models_for_tests import RejectionReportRow, RejectionTestCase
from utils.helpers import report_xlsx_utils as report_utils
from utils.helpers.lds_status_report_xlsx_utils import format_duration_seconds, parse_duration_seconds
from utils.helpers.ws_test_utils import localize_as_moscow

MergeKey = Tuple[Optional[datetime], str, str, str, str]


def expected_event_to_report_event(expected_event: str) -> str:
    """Преобразует формулировку события из журнала в формулировку отчёта."""
    return expected_event.replace("Отбраковка", "Отбракован", 1)


def report_signal_suffix_by_expected_name(expected_signal_name: str) -> str:
    """Возвращает суффикс сигнала в колонке 'Объект' отчёта по expected_signal_name из кейса."""
    return RejectedReportConst.REPORT_SIGNAL_SUFFIX_BY_EXPECTED_NAME.get(
        expected_signal_name,
        expected_signal_name,
    )


def split_object_column(object_value: str) -> tuple[str, str]:
    """
    Разбирает колонку «Объект»:
    - до последней точки - участок трубопровода (имя объекта);
    - после последней точки - название сигнала.
    """
    if not object_value:
        return "", ""
    if RejectedReportConst.OBJECT_SIGNAL_SEPARATOR not in object_value:
        return object_value.strip(), ""

    pipe_section, signal_suffix = object_value.rsplit(
        RejectedReportConst.OBJECT_SIGNAL_SEPARATOR,
        RejectedReportConst.OBJECT_SIGNAL_RSPLIT_MAXSPLIT,
    )
    return pipe_section.strip(), signal_suffix.strip()


def is_datetime_within_closed_interval(
    value: datetime,
    interval_start: datetime,
    interval_end: datetime,
) -> bool:
    """True, если value (в Europe/Moscow) попадает в закрытый интервал [interval_start, interval_end]."""
    localized_value = localize_as_moscow(value)
    return interval_start <= localized_value <= interval_end


def report_header_contains_expected_title(raw_title: str) -> bool:
    """Проверяет, что первая строка шапки xlsx содержит ожидаемый заголовок отчёта."""
    title_lower = raw_title.lower()
    return (
        RejectedReportConst.REJECTED_REPORT_HEADER_TITLE_PART in title_lower
        or RejectedReportConst.REJECTED_REPORT_HEADER_TITLE_PART_ALT in title_lower
    )


def build_merge_key(row: RejectionReportRow) -> MergeKey:
    """Ключ объединения строк с одинаковым содержимым, кроме длительности."""
    return (
        row.datetime_value,
        row.object_value,
        row.event_value,
        row.value_text,
        row.tag_value,
    )


def parse_rejection_report_row(row_index: int, cells: dict[str, str]) -> RejectionReportRow:
    """Собирает RejectionReportRow из словаря ячеек строки отчёта."""
    duration_seconds = parse_duration_seconds(cells.get(RejectedReportConst.COL_DURATION)) or 0
    return RejectionReportRow(
        row_index=row_index,
        datetime_value=report_utils.parse_report_datetime(cells.get(RejectedReportConst.COL_DATETIME)),
        object_value=(cells.get(RejectedReportConst.COL_OBJECT) or "").strip(),
        event_value=(cells.get(RejectedReportConst.COL_EVENT) or "").strip(),
        value_text=(cells.get(RejectedReportConst.COL_VALUE) or "").strip(),
        duration_seconds=duration_seconds,
        tag_value=(cells.get(RejectedReportConst.COL_TAG) or "").strip(),
    )


def iter_rejection_report_rows(worksheet) -> List[RejectionReportRow]:
    """Возвращает строки данных отчёта, начиная с третьей строки листа."""
    headers = report_utils.get_report_column_headers(
        worksheet,
        headers_row=RejectedReportConst.REPORT_COLUMN_HEADERS_ROW,
    )
    if not headers:
        return []

    rows: List[RejectionReportRow] = []
    for excel_row_index, row_values in enumerate(
        worksheet.iter_rows(
            min_row=RejectedReportConst.REPORT_DATA_FIRST_ROW,
            max_col=len(headers),
            values_only=True,
        ),
        start=RejectedReportConst.REPORT_DATA_FIRST_ROW,
    ):
        if not any(cell is not None and str(cell).strip() for cell in row_values):
            continue
        cells = report_utils.build_column_cells(row_values, headers)
        rows.append(parse_rejection_report_row(excel_row_index, cells))
    return rows


def filter_rows_by_monitored_tags(
    rows: Iterable[RejectionReportRow],
    monitored_tags: Iterable[RejectionSensorTag],
) -> List[RejectionReportRow]:
    """Оставляет только строки с тегами из RejectionSensorTag."""
    allowed_tags = {tag.description for tag in monitored_tags}
    return [row for row in rows if row.tag_value in allowed_tags]


def get_case_time_window(
    imitator_start_time: datetime,
    rejection_case: RejectionTestCase,
    tolerance_seconds: int = RejectedReportConst.TIME_FILTER_TOLERANCE_SECONDS,
) -> tuple[datetime, datetime]:
    """Возвращает окно фильтрации строк отчёта для конкретного RejectionTestCase."""
    imitator_msk = localize_as_moscow(imitator_start_time)
    window_start = imitator_msk + timedelta(seconds=rejection_case.time_range_start_s - tolerance_seconds)
    window_end = imitator_msk + timedelta(seconds=rejection_case.time_range_end_s + tolerance_seconds)
    return window_start, window_end


def filter_rows_for_rejection_case(
    rows: Iterable[RejectionReportRow],
    rejection_case: RejectionTestCase,
    imitator_start_time: datetime,
) -> List[RejectionReportRow]:
    """Фильтрует строки отчёта по тегу, событию и временному окну RejectionTestCase."""
    report_event = expected_event_to_report_event(rejection_case.expected_event)
    window_start, window_end = get_case_time_window(imitator_start_time, rejection_case)
    filtered_rows: List[RejectionReportRow] = []

    for row in rows:
        if row.tag_value != rejection_case.sensor.description:
            continue
        if row.event_value != report_event:
            continue
        if row.datetime_value is None:
            continue
        if not is_datetime_within_closed_interval(row.datetime_value, window_start, window_end):
            continue
        filtered_rows.append(row)

    return filtered_rows


def merge_rejection_rows(rows: Iterable[RejectionReportRow]) -> List[RejectionReportRow]:
    """
    Объединяет полностью идентичные строки, суммируя длительность отбраковки.
    """
    merged_rows: dict[MergeKey, RejectionReportRow] = {}
    for row in rows:
        merge_key = build_merge_key(row)
        if merge_key not in merged_rows:
            merged_rows[merge_key] = replace(row)
            continue
        merged_rows[merge_key].duration_seconds += row.duration_seconds

    return list(merged_rows.values())


def select_primary_merged_row(merged_rows: List[RejectionReportRow]) -> Optional[RejectionReportRow]:
    """Выбирает основную строку отбраковки - с максимальной суммарной длительностью."""
    if not merged_rows:
        return None
    return max(merged_rows, key=lambda row: row.duration_seconds)


def sum_duration_for_merge_key(rows: Iterable[RejectionReportRow], merge_key: MergeKey) -> int:
    """Суммирует длительности всех сырых строк с одинаковым ключом объединения."""
    return sum(row.duration_seconds for row in rows if build_merge_key(row) == merge_key)


def format_rejection_rows_for_allure(rows: Iterable[RejectionReportRow]) -> str:
    """Форматирует строки отчёта для вложения в Allure."""
    lines = []
    for row in rows:
        duration_text = format_duration_seconds(row.duration_seconds)
        lines.append(
            f"row#{row.row_index}: {row.datetime_value} | {row.object_value} | "
            f"{row.event_value} | {row.value_text} | {duration_text} | {row.tag_value}"
        )
    return "\n".join(lines) if lines else "(нет строк)"