Загрузка данных


async def rejection_journal(ws_client, cfg: IsRejectedConfig, rejection_case: RejectionTestCase, imitator_start_time):
    """
    Проверка наличия записи об отбраковке в журнале по GetMessagesRequest.
    """
    sensor = rejection_case.sensor
    with allure.step("Подготовка запроса и ожидаемого диапазона времени"):
        request_body = t_utils.create_journal_req_body(
            pagination=Pagination(limit=TestConst.JOURNAL_PAGINATION_REJECT_LIMIT, direction=Direction.FIRST.value),
            filtering=Filtering(
                messageTypes=int(MessageType.REJECTION),
                objects=FilteringObjects(tuId=cfg.tu_id),
            ),
        )
        range_start, range_end = t_utils.get_rejection_time_window(
            imitator_start_time=imitator_start_time,
            start_seconds=rejection_case.time_range_start_s,
            end_seconds=rejection_case.time_range_end_s,
            margin_seconds=TestConst.SEC_PER_MIN,
        )

    with allure.step("Получение сообщений журнала с фильтром messageTypes=REJECTION"):
        payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
        parsed_payload = parser.parse_journal_msg(payload)
        messages_info = parsed_payload.replyContent.messagesInfo

    with allure.step("Проверка наличия сообщений в журнале"):
        StepCheck("Проверка наличия сообщений в журнале", "messagesInfo").actual(messages_info).is_not_empty()

    with allure.step(
        f"Подготовка сообщений к проверке по диапазону слоя данных "
        f"({rejection_case.time_range_start_s - TestConst.SEC_PER_MIN}-"
        f"{rejection_case.time_range_end_s + TestConst.SEC_PER_MIN} с от старта имитатора)"
    ):
        time_filtered, target_msg = t_utils.find_rejection_journal_message(
            messages_info=messages_info,
            tag=sensor.description,
            range_start=range_start,
            range_end=range_end,
            technological_section=cfg.tu_name,
            expected_event=rejection_case.expected_event,
        )

        allure.attach(
            f"Всего получено сообщений: {len(messages_info)}\n"
            f"Диапазон фильтрации: {range_start} - {range_end}\n"
            f"После фильтрации по tag='{sensor.description}' и времени: {len(time_filtered)}\n"
            f"Найдено ли сообщение с technologicalSection='{cfg.tu_name}' и событием {rejection_case.expected_event}: "
            f"{'True' if target_msg else 'False'}",
            name="Результат фильтрации сообщений журнала",
            attachment_type=allure.attachment_type.TEXT,
        )

    with allure.step(
        f"Проверка: найдено ли сообщение с tag='{sensor.description}' (id={sensor.id}) "
        f"в диапазоне {range_start}-{range_end} с"
    ):
        if target_msg is None:
            pytest.fail(
                f"Сообщение с tag='{sensor.description}' (id={sensor.id}) "
                f"и technologicalSection='{cfg.tu_name}' не найдено в диапазоне "
                f"{range_start} - {range_end} "
                f"(всего сообщений: {len(messages_info)}, после фильтрации: {len(time_filtered)})"
            )

    with SoftAssertions() as soft_failures:
        StepCheck("Проверка mainPipeline", "mainPipeline", soft_failures).actual(target_msg.mainPipeline).expected(
            cfg.main_pipeline
        ).equal_to()

        StepCheck("Проверка messageType", "messageType", soft_failures).actual(target_msg.messageType).expected(
            TestConst.JOURNAL_MESSAGE_TYPE_REJECTION
        ).equal_to()

        StepCheck("Проверка technologicalSection не пустой", "technologicalSection", soft_failures).actual(
            target_msg.technologicalSection
        ).is_not_none()

        StepCheck("Проверка technologicalObject не пустой", "technologicalObject", soft_failures).actual(
            target_msg.technologicalObject
        ).is_not_none()

        StepCheck(f"Проверка tag для {sensor.description} (id={sensor.id})", "tag", soft_failures).actual(
            target_msg.tag
        ).expected(sensor.description).equal_to()

        if rejection_case.expected_signal_name:
            StepCheck("Проверка signalName", "signalName", soft_failures).actual(target_msg.signalName).expected(
                rejection_case.expected_signal_name
            ).equal_to()

        if rejection_case.expected_event:
            StepCheck("Проверка event", "event", soft_failures).actual(
                (target_msg.event.rstrip() or "").strip()
            ).expected(rejection_case.expected_event).equal_to()





















    return input_datetime.astimezone(moscow_tz)


def get_rejection_time_window(
    imitator_start_time: datetime, start_seconds: int | float, end_seconds: int | float, margin_seconds: int | float = 0
) -> tuple[datetime, datetime]:
    """
    Возвращает временное окно для проверки сообщения об отбраковке.
    """
    if not imitator_start_time:
        fail("Пришло пустое значение imitator_start_time")

    imitator_msk = localize_as_moscow(imitator_start_time)
    range_start = imitator_msk + timedelta(seconds=start_seconds - margin_seconds)
    range_end = imitator_msk + timedelta(seconds=end_seconds + margin_seconds)
    return range_start, range_end


def find_rejection_journal_message(
    messages_info: List[ObjectType],
    tag: str,
    range_start: datetime,
    range_end: datetime,
    technological_section: str,
    expected_event: str,
) -> tuple[list[ObjectType], ObjectType | None]:
    """
    Фильтрует сообщения журнала по tag и временному диапазону,
    затем ищет целевое сообщение по technologicalSection и event.
    """
    if not messages_info:
        fail("Список сообщений журнала пуст")

    time_filtered = [
        msg for msg in messages_info if msg.tag == tag and range_start <= ensure_moscow_timezone(msg.time) <= range_end
    ]
    time_filtered.sort(key=lambda msg: ensure_moscow_timezone(msg.time), reverse=True)

    target_msg = next(
        (
            msg
            for msg in time_filtered
            if msg.technologicalSection == technological_section and msg.event.rstrip() == expected_event
        ),
        None,
    )
    return time_filtered, target_msg