Загрузка данных
async def rejection_journal(ws_client, cfg: IsRejectedConfig, rejection_case: RejectionTestCase, imitator_start_time):
"""
Проверка наличия записи об отбраковке в журнале по GetMessagesRequest.
"""
sensor = rejection_case.sensor
with allure.step("Подготовка запроса и ожидаемого диапазона времени"):
request_body = t_utils.create_journal_req_body(
pagination=Pagination(limit=TestConst.JOURNAL_PAGINATION_REJECT_LIMIT, direction=Direction.FIRST.value),
filtering=Filtering(
messageTypes=int(MessageType.REJECTION),
objects=FilteringObjects(tuId=cfg.tu_id),
),
)
range_start, range_end = t_utils.get_rejection_time_window(
imitator_start_time=imitator_start_time,
start_seconds=rejection_case.time_range_start_s,
end_seconds=rejection_case.time_range_end_s,
margin_seconds=TestConst.SEC_PER_MIN,
)
with allure.step("Получение сообщений журнала с фильтром messageTypes=REJECTION"):
payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
parsed_payload = parser.parse_journal_msg(payload)
messages_info = parsed_payload.replyContent.messagesInfo
with allure.step("Проверка наличия сообщений в журнале"):
StepCheck("Проверка наличия сообщений в журнале", "messagesInfo").actual(messages_info).is_not_empty()
with allure.step(
f"Подготовка сообщений к проверке по диапазону слоя данных "
f"({rejection_case.time_range_start_s - TestConst.SEC_PER_MIN}-"
f"{rejection_case.time_range_end_s + TestConst.SEC_PER_MIN} с от старта имитатора)"
):
time_filtered, target_msg = t_utils.find_rejection_journal_message(
messages_info=messages_info,
tag=sensor.description,
range_start=range_start,
range_end=range_end,
technological_section=cfg.tu_name,
expected_event=rejection_case.expected_event,
)
allure.attach(
f"Всего получено сообщений: {len(messages_info)}\n"
f"Диапазон фильтрации: {range_start} - {range_end}\n"
f"После фильтрации по tag='{sensor.description}' и времени: {len(time_filtered)}\n"
f"Найдено ли сообщение с technologicalSection='{cfg.tu_name}' и событием {rejection_case.expected_event}: "
f"{'True' if target_msg else 'False'}",
name="Результат фильтрации сообщений журнала",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step(
f"Проверка: найдено ли сообщение с tag='{sensor.description}' (id={sensor.id}) "
f"в диапазоне {range_start}-{range_end} с"
):
if target_msg is None:
pytest.fail(
f"Сообщение с tag='{sensor.description}' (id={sensor.id}) "
f"и technologicalSection='{cfg.tu_name}' не найдено в диапазоне "
f"{range_start} - {range_end} "
f"(всего сообщений: {len(messages_info)}, после фильтрации: {len(time_filtered)})"
)
with SoftAssertions() as soft_failures:
StepCheck("Проверка mainPipeline", "mainPipeline", soft_failures).actual(target_msg.mainPipeline).expected(
cfg.main_pipeline
).equal_to()
StepCheck("Проверка messageType", "messageType", soft_failures).actual(target_msg.messageType).expected(
TestConst.JOURNAL_MESSAGE_TYPE_REJECTION
).equal_to()
StepCheck("Проверка technologicalSection не пустой", "technologicalSection", soft_failures).actual(
target_msg.technologicalSection
).is_not_none()
StepCheck("Проверка technologicalObject не пустой", "technologicalObject", soft_failures).actual(
target_msg.technologicalObject
).is_not_none()
StepCheck(f"Проверка tag для {sensor.description} (id={sensor.id})", "tag", soft_failures).actual(
target_msg.tag
).expected(sensor.description).equal_to()
if rejection_case.expected_signal_name:
StepCheck("Проверка signalName", "signalName", soft_failures).actual(target_msg.signalName).expected(
rejection_case.expected_signal_name
).equal_to()
if rejection_case.expected_event:
StepCheck("Проверка event", "event", soft_failures).actual(
(target_msg.event.rstrip() or "").strip()
).expected(rejection_case.expected_event).equal_to()
return input_datetime.astimezone(moscow_tz)
def get_rejection_time_window(
imitator_start_time: datetime, start_seconds: int | float, end_seconds: int | float, margin_seconds: int | float = 0
) -> tuple[datetime, datetime]:
"""
Возвращает временное окно для проверки сообщения об отбраковке.
"""
if not imitator_start_time:
fail("Пришло пустое значение imitator_start_time")
imitator_msk = localize_as_moscow(imitator_start_time)
range_start = imitator_msk + timedelta(seconds=start_seconds - margin_seconds)
range_end = imitator_msk + timedelta(seconds=end_seconds + margin_seconds)
return range_start, range_end
def find_rejection_journal_message(
messages_info: List[ObjectType],
tag: str,
range_start: datetime,
range_end: datetime,
technological_section: str,
expected_event: str,
) -> tuple[list[ObjectType], ObjectType | None]:
"""
Фильтрует сообщения журнала по tag и временному диапазону,
затем ищет целевое сообщение по technologicalSection и event.
"""
if not messages_info:
fail("Список сообщений журнала пуст")
time_filtered = [
msg for msg in messages_info if msg.tag == tag and range_start <= ensure_moscow_timezone(msg.time) <= range_end
]
time_filtered.sort(key=lambda msg: ensure_moscow_timezone(msg.time), reverse=True)
target_msg = next(
(
msg
for msg in time_filtered
if msg.technologicalSection == technological_section and msg.event.rstrip() == expected_event
),
None,
)
return time_filtered, target_msg