Загрузка данных



async def mode_mt_in_journal(ws_client, cfg: SmokeSuiteConfig, imitator_start_time, test_data: CaseData):
    """
    Проверка записей журнала о режиме мт
    """
    exp_mode_part_message, exp_reason_part_message = test_data.expected_result

    with allure.step(
        "Подключение по ws, получение и обработка сообщения типа: MessagesInfoContent" "Фильтр MessageType. ЭФ Журнал"
    ):
        request_body = t_utils.create_journal_req_body(
            pagination=Pagination(limit=TestConst.ALL_CONTROLLED_SITES),
            filtering=Filtering(messageTypes=int(MessageType.PUMPING_STATUS), objects=FilteringObjects(tuId=cfg.tu_id)),
        )
        payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
        parsed_payload = parser.parse_journal_msg(payload)
        messages_info = parsed_payload.replyContent.messagesInfo

        control_points_list = []
        for msg in messages_info:
            control_points_list.append(msg.controlPoint)
        unique_control_points = len(set(control_points_list))
        all_control_points_list = set()
        duplicate_control_points = set()
        for control_point in control_points_list:
            if control_point in all_control_points_list:
                duplicate_control_points.add(control_point)
            else:
                all_control_points_list.add(control_point)

    with allure.step("Фильтрация сообщений по времени"):
        end_time = datetime.now()

        filter_start_msk = t_utils.localize_as_moscow(imitator_start_time)
        filter_end_msk = t_utils.localize_as_moscow(end_time)

        time_filtered = [
            msg
            for msg in messages_info
            if filter_start_msk <= t_utils.ensure_moscow_timezone(msg.time) <= filter_end_msk
        ]
        time_filtered.sort(key=lambda msg: t_utils.ensure_moscow_timezone(msg.time), reverse=True)

    with allure.step("Создание и наполнение списков сообщений с фильтром по 'event'"):
        containers = defaultdict(list)
        for msg in time_filtered:
            event = msg.event
            if event in (
                TestConst.JOURNAL_MESSAGE_EVENT_STATIONARY,
                TestConst.JOURNAL_MESSAGE_EVENT_NOT_STATIONARY,
                TestConst.JOURNAL_MESSAGE_EVENT_STOP,
            ):
                containers[event].append(msg)
            else:
                containers["another"].append(msg)

        stati_list = list(containers[TestConst.JOURNAL_MESSAGE_EVENT_STATIONARY])
        not_stati_list = list(containers[TestConst.JOURNAL_MESSAGE_EVENT_NOT_STATIONARY])
        stop_list = list(containers[TestConst.JOURNAL_MESSAGE_EVENT_STOP])
        another_list = list(containers["another"])
        longest_list = max([stati_list, not_stati_list, stop_list, another_list], key=len)

        if longest_list:
            first_message = next(iter(longest_list))
            mode_part, reason_part = t_utils.parse_event(first_message.event)
            priority_message = first_message.priority
        else:
            priority_message = None
            mode_part, reason_part = None, None

    allure.attach(
        f"Всего получено сообщений: {len(messages_info)}\n"
        f"Получено сообщений с типовой причиной для стационарного режима перекачки:"
        f" {len(stati_list)}\n"
        f"Получено сообщений с типовой причиной для нестационарного режима перекачки:"
        f"{len(not_stati_list)}\n"
        f"Получено сообщений c типовой причиной для остановленного режима перекачки"
        f":{len(stop_list)}",
        name="Результат: количество сообщений с типовыми причинами",
        attachment_type=allure.attachment_type.TEXT,
    )

    with SoftAssertions() as soft_failures:
        StepCheck(
            "Проверка результата фильтрации сообщений о режиме МТ",
            "Общее кол-во сообщений о режиме МТ",
            soft_failures,
        ).actual(len(messages_info)).expected(TestConst.ALL_CONTROLLED_SITES).equal_to()
        StepCheck(
            "Проверка уникальности наименований участков КП-КП",
            "Кол-во уникальных участков",
            soft_failures,
        ).actual(unique_control_points).is_between(TestConst.MIN_COUNT_CONTROLLED_SITES, TestConst.ALL_CONTROLLED_SITES)
        StepCheck(
            "Проверка режима МТ на ЛЧ в наибольшей области связности.",
            "Режим МТ",
            soft_failures,
        ).actual(
            mode_part
        ).expected(exp_mode_part_message).equal_to()
        StepCheck(
            "Проверка причины режима МТ на ЛЧ в наибольшей области связности.", "Причина режима МТ", soft_failures
        ).contains(reason_part, exp_reason_part_message)
        StepCheck("Проверка значимости сообщения", "Важность", soft_failures).actual(priority_message).expected(
            MessagePriority.MEDIUM
        ).equal_to()
    if exp_reason_part_message not in reason_part:
        allure.attach(
            f"Список сообщений с отличной причиной режима: {another_list}",
            name=f"Причина режима отлична от ожидаемой - {exp_reason_part_message} ",
            attachment_type=allure.attachment_type.TEXT,
        )
    if unique_control_points != TestConst.ALL_CONTROLLED_SITES:
        allure.attach(
            f"Список дублей наименований участков КП-КП{list(duplicate_control_points)}",
            name="Выявленные дубли",
            attachment_type=allure.attachment_type.TEXT,
        )



def parse_event(event_value: str) -> tuple[str | None, str | None]:
    """
    Разделяет строку события на имя и причину, вложенную с скобки
    """
    if not event_value:
        return None, None

    # Поиск открывающей скобки
    open_idx = event_value.find('(')
    if open_idx == -1:
        return event_value.strip(), None

    # Поиск закрывающей скобки ПОСЛЕ открывающей
    close_idx = event_value.find(')', open_idx + 1)
    if close_idx == -1:
        return event_value.strip(), None

    mode_part = event_value[:open_idx].strip()
    reason_part = event_value[open_idx + 1 : close_idx].strip()  # noqa: E203
    return mode_part, reason_part