Загрузка данных
async def rejection_journal(ws_client, cfg: IsRejectedConfig, rejection_case: RejectionTestCase, imitator_start_time):
"""
Проверка наличия записи об отбраковке в журнале по GetMessagesRequest.
"""
sensor = rejection_case.sensor
with allure.step("Запрос сообщений журнала с фильтром messageTypes=REJECTION"):
request_body = t_utils.create_journal_req_body(
pagination=Pagination(limit=TestConst.JOURNAL_PAGINATION_REJECT_LIMIT, direction=Direction.FIRST.value),
filtering=Filtering(
messageTypes=int(MessageType.REJECTION),
objects=FilteringObjects(tuId=cfg.tu_id),
),
)
payload = await t_utils.connect_and_get_msg(ws_client, "GetMessagesRequest", request_body)
parsed_payload = parser.parse_journal_msg(payload)
messages_info = parsed_payload.replyContent.messagesInfo
StepCheck("Проверка наличия сообщений в журнале", "messagesInfo").actual(messages_info).is_not_empty()
with allure.step(
f"Фильтрация сообщений по диапазону слоя данных "
f"({rejection_case.time_range_start_s - TestConst.SEC_PER_MIN}-"
f"{rejection_case.time_range_end_s + TestConst.SEC_PER_MIN} с от старта имитатора)"
):
imitator_msk = t_utils.localize_as_moscow(imitator_start_time)
range_start = imitator_msk + timedelta(seconds=rejection_case.time_range_start_s - TestConst.SEC_PER_MIN)
range_end = imitator_msk + timedelta(seconds=rejection_case.time_range_end_s + TestConst.SEC_PER_MIN)
time_filtered = [
msg
for msg in messages_info
if msg.tag == sensor.description and range_start <= t_utils.ensure_moscow_timezone(msg.time) <= range_end
]
time_filtered.sort(key=lambda msg: t_utils.ensure_moscow_timezone(msg.time), reverse=True)
target_msg = next(
(
msg
for msg in time_filtered
if msg.technologicalSection == cfg.tu_name and msg.event.rstrip() == rejection_case.expected_event
),
None,
)
allure.attach(
f"Всего получено сообщений: {len(messages_info)}\n"
f"Диапазон фильтрации: {range_start} - {range_end}\n"
f"После фильтрации по tag='{sensor.description}' и времени: {len(time_filtered)}\n"
f"Найдено ли сообщение с technologicalSection='{cfg.tu_name}' и событием {rejection_case.expected_event}: "
f"{'True' if target_msg else 'False'}",
name="Результат фильтрации сообщений журнала",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step(
f"Проверка: найдено ли сообщение с tag='{sensor.description}' (id={sensor.id}) "
f"в диапазоне {range_start}-{range_end} с"
):
if target_msg is None:
pytest.fail(
f"Сообщение с tag='{sensor.description}' (id={sensor.id}) "
f"и technologicalSection='{cfg.tu_name}' не найдено в диапазоне "
f"{range_start} - {range_end} "
f"(всего сообщений: {len(messages_info)}, после фильтрации: {len(time_filtered)})"
)
with SoftAssertions() as soft_failures:
StepCheck("Проверка mainPipeline", "mainPipeline", soft_failures).actual(target_msg.mainPipeline).expected(
cfg.main_pipeline
).equal_to()
StepCheck("Проверка messageType", "messageType", soft_failures).actual(target_msg.messageType).expected(
TestConst.JOURNAL_MESSAGE_TYPE_REJECTION
).equal_to()
StepCheck("Проверка technologicalSection не пустой", "technologicalSection", soft_failures).actual(
target_msg.technologicalSection
).is_not_none()
StepCheck("Проверка technologicalObject не пустой", "technologicalObject", soft_failures).actual(
target_msg.technologicalObject
).is_not_none()
StepCheck(f"Проверка tag для {sensor.description} (id={sensor.id})", "tag", soft_failures).actual(
target_msg.tag
).expected(sensor.description).equal_to()
if rejection_case.expected_signal_name:
StepCheck("Проверка signalName", "signalName", soft_failures).actual(target_msg.signalName).expected(
rejection_case.expected_signal_name
).equal_to()
if rejection_case.expected_event:
StepCheck("Проверка event", "event", soft_failures).actual(
(target_msg.event.rstrip() or "").strip()
).expected(rejection_case.expected_event).equal_to()