Загрузка данных
asserts
def does_not_contain(self, objects_list: List[ObjectType], forbidden_object: ObjectType) -> str:
message_parts = [
f"Ожидаемый результат: Список элементов: {objects_list}",
f"Не содержит элемента: {forbidden_object}",
]
return self._build_message(message_parts)
def contains(self, container: Any, expected_item: Any) -> str:
if isinstance(container, str):
message_parts = [
f"Ожидаемый результат: '{container}' содержит подстроку '{expected_item}'",
f"Фактический результат: {self.field_name} = {self._format_val(container)}",
]
else:
message_parts = [
f"Ожидаемый результат: список {self._format_val(container)} содержит элемент {expected_item}",
f"Фактический результат: {self.field_name} = {self._format_val(container)}",
]
return self._build_message(message_parts)
тоже внизу
except AssertionError as exc:
self._handle_assertion(exc)
def contains(self, container: Any, expected_item: Any) -> None:
"""Проверка, что container (список или строка) содержит expected_item."""
msg = self._msg_builder.contains(container, expected_item)
try:
with allure.step(msg):
assert_that(container).described_as(msg).contains(expected_item)
except AssertionError as exc:
self._handle_assertion(exc)
тест конст
DOWNLOAD_EXPORTED_DATA_REQUEST: str = "DownloadExportedDataRequest"
# Допустимая погрешность при сравнении границ периода отчёта
REPORT_PERIOD_TOLERANCE_MINUTES: int = 1
# Формат даты/времени в имени скачиваемого xlsx-файла
REPORT_FILE_NAME_DATETIME_FORMAT: str = "%d.%m.%Y %H_%M_%S"
REPORT_HEADER_PERIOD_PATTERN: str = (
r'Отчет об утечках с (?P<period_start>\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})'
r' по (?P<period_end>\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2})'
)
REPORT_FILE_NAME_PERIOD_PATTERN: str = (
r'^Отчет об утечках (?P<tu>.+?) '
r'(?P<period_start>\d{2}\.\d{2}\.\d{4} \d{2}_\d{2}_\d{2})'
r' - '
r'(?P<period_end>\d{2}\.\d{2}\.\d{4} \d{2}_\d{2}_\d{2})'
r'\.xlsx$'
)
сценарий
async def export_leaks_report(ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig, imitator_start_time: datetime):
"""
Сценарий формирования отчёта об утечках.
Этапы:
1. Подписка SubscribeReportsDataExportedRequest на пуш-нотификации.
2. Отправка ExportReportsCommandRequest с фильтром по времени
(start = старт имитатора, end = старт имитатора + offset теста).
3. Ожидание пуш-нотификации ReportDataExportedNotification о готовности отчёта.
4. Лонг-поллинг GetExportedDataListRequest до появления нашего отчёта в списке.
5. Отправка DownloadExportedDataRequest по id отчёта.
6. Получение fileChunk по ответу на скачивание.
7-10. Проверки: формат файла, имя, шапка xlsx, строка утечки.
Скачанный файл удаляется по завершению, прикладывается к Allure только при падении теста.
"""
report_state = ExportLeaksReportState()
with allure.step("Подготовка параметров сценария формирования отчёта об утечках"):
report_state.report_test = leak.export_leaks_report_test
StepCheck("В конфигурации задан export_leaks_report_test", "export_leaks_report_test").actual(
report_state.report_test
).is_not_none()
report_state.period_start = t_utils.localize_as_moscow(imitator_start_time)
report_state.period_end = t_utils.localize_as_moscow(
imitator_start_time + timedelta(minutes=report_state.report_test.offset)
)
report_state.period_start_naive = report_utils.normalize_report_period_naive(report_state.period_start)
report_state.period_end_naive = report_utils.normalize_report_period_naive(report_state.period_end)
report_state.expected_mt_mode = ReportConst.STATIONARY_STATUS_TO_REPORT_TEXT.get(
leak.expected_stationary_status
)
report_state.tu_description_lower = cfg.technological_unit.description.lower()
time_offset_hours = t_utils.report_time_offset_hours()
StepCheck(
f"Смещение timeOffset для запросов отчёта (часовой пояс {TestConst.ZONE_INFO})",
"time_offset_hours",
).actual(time_offset_hours).is_not_none()
report_state.time_offset_hours = time_offset_hours
StepCheck(
"Задан ожидаемый текст режима МТ для отчёта",
"expected_mt_mode",
).actual(report_state.expected_mt_mode).is_not_none()
allure.attach(
f"period.start={report_state.period_start}\n"
f"period.end={report_state.period_end}\n"
f"offset_minutes={report_state.report_test.offset}",
name="Фильтр периода отчёта",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step(
f"Этап 1. Подписка на пуш-нотификации ({ReportConst.SUBSCRIBE_REPORTS_DATA_EXPORTED_REQUEST})"
):
await t_utils.connect(ws_client, ReportConst.SUBSCRIBE_REPORTS_DATA_EXPORTED_REQUEST, [])
with allure.step(f"Этап 2. Запрос формирования отчёта ({ReportConst.EXPORT_REPORTS_COMMAND_REQUEST})"):
request_payload = {
"tuId": cfg.tu_id,
"exportedDataTypes": [ExportedDataType.LEAKS_REPORT.value],
"timeOffset": report_state.time_offset_hours,
"period": {
"start": t_utils.datetime_to_msgpack_timestamp(report_state.period_start),
"end": t_utils.datetime_to_msgpack_timestamp(report_state.period_end),
"additionalProperties": {},
},
}
await t_utils.connect(ws_client, ReportConst.EXPORT_REPORTS_COMMAND_REQUEST, request_payload)
with allure.step(
f"Этап 3. Ожидание пуш-нотификации {ReportConst.REPORT_DATA_EXPORTED_NOTIFICATION} о готовности отчёта"
):
report_state.notification = await t_utils.poll_for_report_export_notification(
ws_client=ws_client,
parser=parser,
total_wait_seconds=ReportConst.NOTIFICATION_TIMEOUT_SECONDS,
poll_interval_seconds=ReportConst.LIST_POLL_INTERVAL_SECONDS,
)
with allure.step("Извлечение полей пуш-нотификации"):
notification = report_state.notification
notification_reply_status = notification.replyStatus if notification else None
notification_reply_content = notification.replyContent if notification else None
notification_export_status = (
notification_reply_content.exportStatus if notification_reply_content else None
)
notification_error_message = (
(notification_reply_content.errorMessage or "") if notification_reply_content else ""
)
with allure.step("Проверка пуш-нотификации о готовности отчёта"):
StepCheck("Получена пуш-нотификация о готовности отчёта", "notification").actual(
report_state.notification
).is_not_none()
StepCheck("Проверка статуса пуш-нотификации", "replyStatus").actual(notification_reply_status).expected(
ReplyStatus.OK.value
).equal_to()
StepCheck("Проверка наличия контента нотификации", "replyContent").actual(
notification_reply_content
).is_not_none()
StepCheck("Проверка exportStatus в нотификации", "exportStatus").actual(notification_export_status).expected(
ExportStatus.DONE
).equal_to()
StepCheck("В нотификации нет текста ошибки", "errorMessage").actual(notification_error_message).is_empty()
with allure.step(
f"Этап 4. Лонг-поллинг {ReportConst.GET_EXPORTED_DATA_LIST_REQUEST} до появления отчёта в списке"
):
report_state.report_item = await t_utils.poll_for_exported_file(
ws_client=ws_client,
parser=parser,
list_limit=ReportConst.EXPORTED_DATA_LIST_LIMIT,
expected_data_type=ExportedDataType.LEAKS_REPORT,
name_substring=ReportConst.LEAKS_REPORT_NAME_PART,
tu_name_substring=cfg.technological_unit.description,
period_start=report_state.period_start,
period_end=report_state.period_end,
total_wait_seconds=ReportConst.LIST_POLL_TOTAL_WAIT_SECONDS,
poll_interval_seconds=ReportConst.LIST_POLL_INTERVAL_SECONDS,
)
with allure.step("Подготовка данных найденного отчёта в списке"):
report_item = report_state.report_item
if report_item is not None:
allure.attach(
f"id={report_item.id}, name={report_item.name}, "
f"exportedDataType={report_item.exportedDataType}, "
f"start={report_item.start}, end={report_item.end}",
name="Найденный отчёт в списке",
attachment_type=allure.attachment_type.TEXT,
)
report_state.report_file_name = report_utils.build_export_report_file_name(
cfg.technological_unit.description,
report_state.period_start,
report_state.period_end,
)
with allure.step("Проверка: отчёт найден в списке сформированных файлов"):
StepCheck("Отчёт найден в списке сформированных файлов", "report_item").actual(
report_state.report_item
).is_not_none()
download_request = {
"exportedDataId": report_state.report_item.id,
"exportedDataType": ExportedDataType.LEAKS_REPORT.to_download_name(),
"additionalProperties": None,
"timeOffset": report_state.time_offset_hours,
}
download_purpose = (
f"скачивание xlsx-отчёта об утечках (exportedDataId={report_state.report_item.id}) "
f"после формирования отчёта и выбора файла в списке GetExportedDataListRequest - выпадашка уведомлений на UI"
)
with allure.step(
f"Этап 5. Streaming-вызов {ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST} по id={report_state.report_item.id}"
):
await t_utils.connect_stream(
ws_client,
ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST,
download_request,
purpose=download_purpose,
)
report_state.download_invocation_id = ws_client.invocation_id
with allure.step("Этап 6. Получение fileChunk - скачивание отчёта по утечкам"):
report_state.download_reply = await t_utils.receive_download_exported_data_reply(
ws_client=ws_client,
parser=parser,
invocation_id=report_state.download_invocation_id,
request_name=ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST,
total_wait_seconds=ReportConst.DOWNLOAD_TIMEOUT_SECONDS,
purpose=download_purpose,
)
with allure.step("Извлечение данных ответа на скачивание"):
download_reply = report_state.download_reply
download_reply_status = download_reply.replyStatus
has_download_reply_content = download_reply.replyContent is not None
report_state.file_bytes = (
download_reply.replyContent.fileChunk if has_download_reply_content else None
)
is_xlsx_signature = (
report_utils.is_xlsx_file_bytes(report_state.file_bytes) if report_state.file_bytes else False
)
with allure.step("Проверка ответа на скачивание и формата xlsx"):
StepCheck("Проверка статуса ответа на скачивание", "replyStatus").actual(download_reply_status).expected(
ReplyStatus.OK.value
).equal_to()
StepCheck("Проверка наличия контента ответа на скачивание", "replyContent").actual(
has_download_reply_content
).expected(True).equal_to()
StepCheck("Проверка наличия байт файла", "fileChunk").actual(report_state.file_bytes).is_not_empty()
StepCheck("Проверка xlsx (zip) сигнатуры файла", "file_signature").actual(is_xlsx_signature).expected(
True
).equal_to()
with allure.step("Подготовка данных для проверки имени файла отчёта"):
report_file_name = report_state.report_file_name
report_file_name_lower = report_file_name.lower()
file_name_period_start, file_name_period_end = report_utils.parse_period_from_export_file_name(report_file_name)
period_start_lo, period_start_hi, period_end_lo, period_end_hi = report_utils.report_period_comparison_bounds(
report_state.period_start_naive,
report_state.period_end_naive,
)
has_xlsx_extension = report_utils.is_xlsx_extension(report_file_name)
leaks_report_name_part_lower = ReportConst.LEAKS_REPORT_NAME_PART.lower()
with allure.step("Проверка имени файла отчёта"):
StepCheck(f"Имя файла оканчивается на {ReportConst.XLSX_EXTENSION}", "file_name").actual(
has_xlsx_extension
).expected(True).equal_to()
StepCheck(f"Имя файла содержит '{ReportConst.LEAKS_REPORT_NAME_PART}'", "file_name").contains(
report_file_name_lower, leaks_report_name_part_lower
)
StepCheck(
f"Имя файла содержит описание ТУ '{cfg.technological_unit.description}'",
"file_name",
).contains(report_file_name_lower, report_state.tu_description_lower)
StepCheck(
"Дата начала периода в имени файла совпадает с фильтром запроса (+-1 мин)",
"period_start_in_file_name",
).actual(file_name_period_start).is_between(period_start_lo, period_start_hi)
StepCheck(
"Дата конца периода в имени файла совпадает с фильтром запроса (+-1 мин)",
"period_end_in_file_name",
).actual(file_name_period_end).is_between(period_end_lo, period_end_hi)
with allure.step("Этап 8. Сохранение, обработка и проверка отчета по утечкам"):
report_state.temp_file_path = report_utils.save_report_bytes_to_temp_file(report_state.file_bytes)
try:
with allure.step("Проверка: временный xlsx файл создан"):
StepCheck("Временный xlsx файл создан", "temp_file_path").actual(report_state.temp_file_path).is_not_none()
with allure.step("Этап 9. Открытие xlsx и чтение шапки"):
report_state.worksheet = report_utils.load_report_worksheet(report_state.temp_file_path)
report_state.title_info = report_utils.parse_report_title(
report_utils.get_report_title_cell(report_state.worksheet)
)
allure.attach(
f"Шапка отчёта (raw): {report_state.title_info.raw_title}\n"
f"period_start: {report_state.title_info.period_start}\n"
f"period_end: {report_state.title_info.period_end}",
name="Шапка отчёта (1-я строка)",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step("Подготовка данных шапки отчёта для проверки"):
title_info = report_state.title_info
report_title_lower = title_info.raw_title.lower()
leaks_report_name_part_lower = ReportConst.LEAKS_REPORT_NAME_PART.lower()
column_headers = report_utils.get_report_column_headers(report_state.worksheet)
period_start_lo, period_start_hi, period_end_lo, period_end_hi = report_utils.report_period_comparison_bounds(
report_state.period_start_naive,
report_state.period_end_naive,
)
header_period_start = title_info.period_start
header_period_end = title_info.period_end
with allure.step("Проверка двойной шапки отчёта"):
StepCheck("Лист xlsx открыт", "worksheet").actual(report_state.worksheet).is_not_none()
with SoftAssertions() as soft_failures:
StepCheck(
f"В шапке отчёта присутствует '{ReportConst.LEAKS_REPORT_NAME_PART}'",
"report_title",
soft_failures,
).contains(report_title_lower, leaks_report_name_part_lower)
StepCheck(
"Время начала периода в шапке совпадает с фильтром запроса (+-1 мин)",
"period_start",
soft_failures,
).actual(header_period_start).is_between(period_start_lo, period_start_hi)
StepCheck(
"Время конца периода в шапке совпадает с фильтром запроса (+-1 мин)",
"period_end",
soft_failures,
).actual(header_period_end).is_between(period_end_lo, period_end_hi)
StepCheck(
"Названия колонок в шапке отчёта",
"column_headers",
soft_failures,
).actual(column_headers).expected(ReportConst.EXPECTED_COLUMN_HEADERS).equal_to()
with allure.step("Этап 10. Извлечение строк данных из отчёта"):
report_state.data_rows = report_utils.iter_report_data_rows(report_state.worksheet)
report_state.target_row = report_utils.find_row_with_object(
report_state.data_rows, cfg.technological_unit.description
)
allure.attach(
"\n".join(f"row#{row.row_index}: {row.cells}" for row in report_state.data_rows),
name="Все строки данных отчёта",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step("Подготовка данных строки утечки для проверки"):
target_row = report_state.target_row
leak_datetime_value = target_row.datetime_value if target_row else None
object_value_lower = target_row.object_value.lower() if target_row else ""
lds_status_value = target_row.lds_status.strip() if target_row else ""
masking_info_lower = target_row.masking_info.lower() if target_row else ""
leak_coordinate_meters = target_row.coordinate_meters if target_row else None
leak_volume_value = target_row.leak_volume if target_row else None
mt_mode_lower = target_row.mt_mode.lower() if target_row else ""
expected_mt_mode_lower = report_state.expected_mt_mode.lower()
masking_not_masked_lower = ReportConst.MASKING_NOT_MASKED_TEXT.lower()
period_start_lo, period_start_hi, period_end_lo, period_end_hi = report_utils.report_period_comparison_bounds(
report_state.period_start_naive,
report_state.period_end_naive,
)
with allure.step("Проверка содержимого строки утечки"):
StepCheck("В отчёте есть хотя бы одна строка с данными", "data_rows").actual(
report_state.data_rows
).is_not_empty()
StepCheck(
f"Строка с объектом, содержащим '{cfg.technological_unit.description}'",
ReportConst.COL_OBJECT,
).actual(report_state.target_row).is_not_none()
with SoftAssertions() as soft_failures:
StepCheck(
"Время утечки в диапазоне [старт имитатора, старт + offset теста] (+-1 мин)",
ReportConst.COL_DATETIME,
soft_failures,
).actual(leak_datetime_value).is_between(period_start_lo, period_end_hi)
StepCheck(
f"Колонка '{ReportConst.COL_OBJECT}' содержит '{cfg.technological_unit.description}'",
ReportConst.COL_OBJECT,
soft_failures,
).contains(object_value_lower, report_state.tu_description_lower)
StepCheck(
f"Колонка '{ReportConst.COL_LDS_STATUS}'",
ReportConst.COL_LDS_STATUS,
soft_failures,
).actual(lds_status_value).expected(ReportConst.LDS_STATUS_OK_TEXT).equal_to()
StepCheck(
f"Колонка '{ReportConst.COL_MASK_INFO}' содержит '{ReportConst.MASKING_NOT_MASKED_TEXT}'",
ReportConst.COL_MASK_INFO,
soft_failures,
).contains(masking_info_lower, masking_not_masked_lower)
StepCheck(
f"Колонка '{ReportConst.COL_COORDINATE}' (с погрешностью {cfg.allowed_distance_diff_meters} м)",
ReportConst.COL_COORDINATE,
soft_failures,
).actual(leak_coordinate_meters).is_close_to(
leak.coordinate_meters,
cfg.allowed_distance_diff_meters,
f"значение допустимой погрешности координаты {cfg.allowed_distance_diff_meters}",
)
StepCheck(
f"Колонка '{ReportConst.COL_LEAK_VOLUME}' не пустая",
ReportConst.COL_LEAK_VOLUME,
soft_failures,
).actual(leak_volume_value).is_not_none()
StepCheck(
f"Колонка '{ReportConst.COL_MT_MODE}' содержит '{report_state.expected_mt_mode}'",
ReportConst.COL_MT_MODE,
soft_failures,
).contains(mt_mode_lower, expected_mt_mode_lower)
except Exception:
with allure.step("Прикрепление xlsx отчёта к Allure при падении теста"):
if report_state.temp_file_path and report_state.report_file_name:
report_utils.attach_report_file_to_allure(report_state.temp_file_path, report_state.report_file_name)
raise
finally:
with allure.step("Удаление временного xlsx файла"):
temp_path = report_state.temp_file_path
if temp_path is not None:
try:
temp_path.unlink(missing_ok=True)
except OSError:
pass
report xlxs
"""
Утилиты для разбора xlsx-отчётов и проверки их формата.
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional
import allure
from openpyxl import load_workbook
from openpyxl.worksheet.worksheet import Worksheet
from constants.test_constants import BaseTN3Constants as TestConst
from constants.test_constants import ExportReportConstants as ReportConst
from utils.helpers.ws_test_utils import extract_first_number, localize_as_moscow
@dataclass
class ReportTitleInfo:
"""Разобранная шапка отчёта"""
raw_title: str
period_start: Optional[datetime] = None
period_end: Optional[datetime] = None
@dataclass
class LeakReportRow:
"""Разобранная строка данных по утечке"""
row_index: int
cells: Dict[str, str] = field(default_factory=dict)
@property
def datetime_value(self) -> Optional[datetime]:
return parse_report_datetime(self.cells.get(ReportConst.COL_DATETIME))
@property
def object_value(self) -> str:
return self.cells.get(ReportConst.COL_OBJECT, "")
@property
def lds_status(self) -> str:
return self.cells.get(ReportConst.COL_LDS_STATUS, "")
@property
def masking_info(self) -> str:
return self.cells.get(ReportConst.COL_MASK_INFO, "")
@property
def coordinate_meters(self) -> Optional[float]:
coordinate_km = extract_first_number(self.cells.get(ReportConst.COL_COORDINATE))
if coordinate_km is None:
return None
return coordinate_km * TestConst.KM_TO_METERS
@property
def leak_volume(self) -> Optional[float]:
return extract_first_number(self.cells.get(ReportConst.COL_LEAK_VOLUME))
@property
def mt_mode(self) -> str:
return self.cells.get(ReportConst.COL_MT_MODE, "")
def is_xlsx_file_bytes(file_bytes: Optional[bytes]) -> bool:
"""Проверяет zip-сигнатуру xlsx (PK\\x03\\x04)."""
if not file_bytes:
return False
return file_bytes.startswith(ReportConst.ZIP_SIGNATURE)
def is_xlsx_extension(file_name: str) -> bool:
"""Проверяет расширение .xlsx без учёта регистра."""
return file_name.lower().endswith(ReportConst.XLSX_EXTENSION)
def parse_report_datetime(value: object) -> Optional[datetime]:
"""Парсит дату/время из ячейки отчёта."""
if value is None:
return None
if isinstance(value, datetime):
return value
if isinstance(value, str):
try:
return datetime.strptime(value.strip(), ReportConst.REPORT_DATETIME_FORMAT)
except ValueError:
return None
return None
def _stringify_cell(value: object) -> str:
if value is None:
return ""
if isinstance(value, datetime):
return value.strftime(ReportConst.REPORT_DATETIME_FORMAT)
return str(value)
def normalize_report_period_naive(value: datetime) -> datetime:
"""Московское время без tzinfo и микросекунд - для сравнения периодов в отчёте."""
return localize_as_moscow(value).replace(microsecond=0, tzinfo=None)
def report_period_comparison_bounds(
period_start: datetime,
period_end: datetime,
tolerance_minutes: int = ReportConst.REPORT_PERIOD_TOLERANCE_MINUTES,
) -> tuple[datetime, datetime, datetime, datetime]:
"""
Границы периода с допуском +-tolerance_minutes для start и end отдельно.
Возвращает (start_lower, start_upper, end_lower, end_upper).
"""
start = normalize_report_period_naive(period_start)
end = normalize_report_period_naive(period_end)
delta = timedelta(minutes=tolerance_minutes)
return start - delta, start + delta, end - delta, end + delta
def build_export_report_file_name(
tu_description: str,
period_start: datetime,
period_end: datetime,
) -> str:
"""
Имя xlsx при скачивании: «Отчет об утечках Тихорецк-Новороссийск-3 DD.MM.YYYY HH_MM_SS - DD.MM.YYYY HH_MM_SS.xlsx».
"""
start_text = normalize_report_period_naive(period_start).strftime(ReportConst.REPORT_FILE_NAME_DATETIME_FORMAT)
end_text = normalize_report_period_naive(period_end).strftime(ReportConst.REPORT_FILE_NAME_DATETIME_FORMAT)
return (
f"{ReportConst.LEAKS_REPORT_NAME_PART} {tu_description} {start_text} - {end_text}"
f"{ReportConst.XLSX_EXTENSION}"
)
def parse_period_from_export_file_name(file_name: str) -> tuple[Optional[datetime], Optional[datetime]]:
"""Извлекает границы периода из имени скачанного xlsx-файла."""
match = re.search(ReportConst.REPORT_FILE_NAME_PERIOD_PATTERN, file_name.strip(), re.IGNORECASE)
if match is None:
return None, None
parse_format = ReportConst.REPORT_FILE_NAME_DATETIME_FORMAT.replace("_", ":")
def _parse_part(value: str) -> Optional[datetime]:
try:
return datetime.strptime(value.replace("_", ":"), parse_format)
except ValueError:
return None
return _parse_part(match.group("period_start")), _parse_part(match.group("period_end"))
def parse_report_title(title_raw: object) -> ReportTitleInfo:
"""
Парсит шапку отчёта с именованными группами period_start / period_end.
"""
title_str = _stringify_cell(title_raw)
match = re.search(ReportConst.REPORT_HEADER_PERIOD_PATTERN, title_str)
if match is None:
return ReportTitleInfo(raw_title=title_str)
return ReportTitleInfo(
raw_title=title_str,
period_start=parse_report_datetime(match.group("period_start")),
period_end=parse_report_datetime(match.group("period_end")),
)
def load_report_worksheet(file_path: Path) -> Optional[Worksheet]:
"""Открывает первый лист xlsx. При ошибке возвращает None."""
if not file_path.exists():
return None
try:
workbook = load_workbook(filename=str(file_path), read_only=True, data_only=True)
except Exception:
return None
sheet_names = workbook.sheetnames
if not sheet_names:
return None
return workbook[sheet_names[ReportConst.DEFAULT_SHEET_INDEX]]
def get_report_title_cell(worksheet: Worksheet) -> object:
return worksheet.cell(row=ReportConst.REPORT_TITLE_ROW, column=1).value
def get_report_column_headers(worksheet: Worksheet) -> List[str]:
"""Возвращает непустые заголовки колонок из строки REPORT_COLUMN_HEADERS_ROW."""
headers: List[str] = []
column_index = 1
while True:
cell_value = worksheet.cell(row=ReportConst.REPORT_COLUMN_HEADERS_ROW, column=column_index).value
if cell_value is None or not str(cell_value).strip():
break
headers.append(_stringify_cell(cell_value).strip())
column_index += 1
return headers
def build_column_cells(row_values: tuple, headers: List[str]) -> Dict[str, str]:
"""Собирает словарь {название колонки: значение ячейки} по строке данных."""
return {
header: _stringify_cell(row_values[column_index]) if column_index < len(row_values) else ""
for column_index, header in enumerate(headers)
}
def iter_report_data_rows(worksheet: Worksheet) -> List[LeakReportRow]:
"""
Возвращает строки данных по утечкам, начиная с REPORT_DATA_FIRST_ROW.
Пустые строки пропускаются.
"""
headers = get_report_column_headers(worksheet)
if not headers:
return []
rows: List[LeakReportRow] = []
for excel_row_index, row_values in enumerate(
worksheet.iter_rows(
min_row=ReportConst.REPORT_DATA_FIRST_ROW,
max_col=len(headers),
values_only=True,
),
start=ReportConst.REPORT_DATA_FIRST_ROW,
):
if not any(cell is not None and str(cell).strip() for cell in row_values):
continue
rows.append(
LeakReportRow(
row_index=excel_row_index,
cells=build_column_cells(row_values, headers),
)
)
return rows
def find_row_with_object(rows: List[LeakReportRow], object_substring: str) -> Optional[LeakReportRow]:
"""Ищет первую строку, где колонка «Объект» содержит подстроку без учёта регистра"""
substring_lower = object_substring.lower()
for row in rows:
if substring_lower in row.object_value.lower():
return row
return None
def save_report_bytes_to_temp_file(file_bytes: bytes) -> Optional[Path]:
"""Сохраняет байты отчёта во временный xlsx-файл. При ошибке возвращает None."""
import tempfile
try:
with tempfile.NamedTemporaryFile(
suffix=ReportConst.XLSX_EXTENSION,
prefix="leaks_report_",
delete=False,
) as temp_file:
temp_file.write(file_bytes)
return Path(temp_file.name)
except OSError:
return None
def attach_report_file_to_allure(file_path: Path, file_name: str) -> None:
"""Прикладывает xlsx к Allure при падении теста"""
try:
xlsx_type = allure.attachment_type.XLSX
except AttributeError:
xlsx_type = None
if xlsx_type is not None:
allure.attach.file(
str(file_path),
name=file_name,
attachment_type=xlsx_type,
extension="xlsx",
)
return
try:
with file_path.open("rb") as raw_file:
allure.attach(raw_file.read(), name=file_name, extension="xlsx")
except OSError:
pass
вс тест утил
from constants.architecture_constants import WebSocketClientConstants as WS_Const
return input_datetime.astimezone(ZoneInfo(TestConst.ZONE_INFO))
def report_time_offset_hours(tz_name: str = TestConst.ZONE_INFO) -> Optional[int]:
"""
Смещение часового пояса (часы от UTC) для поля timeOffset в запросах отчётов.
"""
now = datetime.now(ZoneInfo(tz_name))
utc_offset = now.utcoffset()
if utc_offset is None:
return None
return int(utc_offset.total_seconds() // TestConst.SECONDS_PER_HOUR)
def localize_as_moscow(input_datetime: datetime) -> None | datetime:
fail(f"Не удалось отправить сообщение типа: {ws_invoke_type} c параметрами {ws_invoke_params}. Ошибка: {error}")
async def connect_stream(
ws_client: WebSocketClient,
ws_invoke_type: str,
ws_invoke_params: Any = None,
purpose: str = "streaming-вызов WS",
) -> None:
"""
Streaming-вызов (StreamInvocation)
"""
try:
with allure.step(f"Streaming-вызов {ws_invoke_type} c параметрами {ws_invoke_params}"):
await ws_client.invoke_stream(ws_invoke_type, ws_invoke_params)
except (asyncio.TimeoutError, ConnectionError, ConnectionResetError, OSError) as error:
fail(
f"Не удалось выполнить {purpose} ({ws_invoke_type}, StreamInvocation). "
f"Параметры запроса: {ws_invoke_params}. Ошибка соединения: {error}"
)
def _stream_completion_error(msg: Any, invocation_id: str) -> Optional[str]:
"""Текст ошибки из ответа SignalR Completion для данного invocation_id, если есть."""
if not isinstance(msg, list):
return None
if msg[0] != WS_Const.COMPLETION_MESSAGE_TYPE:
return None
if not is_desired_invocation_id(msg, invocation_id):
return None
if len(msg) <= WS_Const.COMPLETION_ERROR_MESSAGE_INDEX:
return None
error_text = msg[WS_Const.COMPLETION_ERROR_MESSAGE_INDEX]
if isinstance(error_text, str):
return error_text
return None
async def receive_download_exported_data_reply(
ws_client: WebSocketClient,
parser,
invocation_id: str,
request_name: str,
total_wait_seconds: float,
poll_interval_seconds: float = 0.5,
purpose: str = "скачивании xlsx-отчёта после выбора файла в списке сформированных отчётов",
) -> Any:
"""
Ожидает StreamItem с fileChunk после streaming DownloadExportedDataRequest.
"""
deadline = asyncio.get_event_loop().time() + total_wait_seconds
collected_messages: List[Any] = []
ws_client.suppress_recv_logging = True
parser.suppress_recv_logging = True
try:
while asyncio.get_event_loop().time() < deadline:
await asyncio.sleep(poll_interval_seconds)
batch = _drain_recv_queue(ws_client)
collected_messages.extend(batch)
for msg in batch:
stream_error = _stream_completion_error(msg, invocation_id)
if stream_error:
_attach_ws_reply_parse_failure(msg, invocation_id, request_name, RuntimeError(stream_error))
fail(
f"При {purpose} бэк вернул Completion с ошибкой "
f"({request_name}, invocation_id={invocation_id}): {stream_error}"
)
for msg in batch:
if (
not isinstance(msg, list)
or msg[0] != WS_Const.STREAM_ITEM_MESSAGE_TYPE
or not is_desired_invocation_id(msg, invocation_id)
):
continue
if parser._find_reply_status_in_ws_msg(msg) is None:
continue
try:
return parser.parse_download_exported_data_msg(msg)
except Exception as error:
_attach_ws_reply_parse_failure(msg, invocation_id, request_name, error)
fail(
f"При {purpose} получен StreamItem ({request_name}, invocation_id={invocation_id}), "
f"но не удалось разобрать ответ с fileChunk: {error}"
)
finally:
collected_messages.extend(_drain_recv_queue(ws_client))
ws_client.suppress_recv_logging = False
parser.suppress_recv_logging = False
_attach_ws_poll_failure(collected_messages, total_wait_seconds, f"{request_name} (StreamItem)")
fail(
f"При {purpose} за {total_wait_seconds} с не получен StreamItem с fileChunk "
f"({request_name}, invocation_id={invocation_id}). Смотреть вложения received ws message"
)
async def connect_and_get_parsed_msg_by_tu_id(
async def poll_for_exported_file(
ws_client: WebSocketClient,
parser,
list_limit: int,
expected_data_type: Any,
name_substring: str,
tu_name_substring: str,
period_start: datetime,
period_end: datetime,
total_wait_seconds: float,
poll_interval_seconds: float,
period_tolerance_minutes: int = ReportConst.REPORT_PERIOD_TOLERANCE_MINUTES,
) -> Optional[Any]:
"""
Периодически шлёт GetExportedDataListRequest, забирает ответы из очереди
по invocation_id среди всех накопленных сообщений.
При таймауте или ошибке парсинга прикрепляет к Allure полученные ответы.
"""
deadline = asyncio.get_event_loop().time() + total_wait_seconds
last_items_count = -1
collected_messages: List[Any] = []
request_name = ReportConst.GET_EXPORTED_DATA_LIST_REQUEST
ws_client.suppress_recv_logging = True
parser.suppress_recv_logging = True
try:
while asyncio.get_event_loop().time() < deadline:
drained_before_request = _drain_recv_queue(ws_client)
collected_messages.extend(drained_before_request)
await connect(
ws_client,
request_name,
{"limit": list_limit},
)
invocation_id = ws_client.invocation_id
await asyncio.sleep(poll_interval_seconds)
batch = _drain_recv_queue(ws_client)
collected_messages.extend(batch)
list_reply_payload = _find_ws_reply_by_invocation_id(batch, invocation_id, parser)
if list_reply_payload is None:
continue
try:
parsed_payload = parser.parse_exported_data_list_msg(list_reply_payload)
except Exception as error:
_attach_ws_reply_parse_failure(list_reply_payload, invocation_id, request_name, error)
for msg in collected_messages:
allure.attach(
pprint.pformat(msg, width=120, sort_dicts=False),
name="received ws message",
attachment_type=allure.attachment_type.TEXT,
)
fail(f"Не удалось разобрать ответ на {request_name}: {error}")
items = []
if parsed_payload.replyContent is not None:
items = parsed_payload.replyContent.exportedData or []
if len(items) != last_items_count:
allure.attach(
"\n".join(
f"id={item.id}, name={item.name}, type={item.exportedDataType}, "
f"start={item.start}, end={item.end}"
for item in items
),
name=f"Список сформированных файлов (попытка, всего: {len(items)})",
attachment_type=allure.attachment_type.TEXT,
)
last_items_count = len(items)
match = find_matching_exported_item(
items=items,
expected_data_type=expected_data_type,
name_substring=name_substring,
tu_name_substring=tu_name_substring,
period_start=period_start,
period_end=period_end,
period_tolerance_minutes=period_tolerance_minutes,
)
if match is not None:
return match
finally:
collected_messages.extend(_drain_recv_queue(ws_client))
ws_client.suppress_recv_logging = False
parser.suppress_recv_logging = False
_attach_ws_poll_failure(
collected_messages,
total_wait_seconds,
request_name,
)
return None
def _normalize_report_period_datetime(value: datetime) -> datetime:
"""Приводит datetime периода отчёта к московскому времени без микросекунд."""
return localize_as_moscow(value).replace(microsecond=0)
def _exported_item_period_matches(
item_start: datetime,
item_end: datetime,
period_start: datetime,
period_end: datetime,
tolerance_minutes: int,
) -> bool:
"""Проверяет start/end элемента списка в пределах периода запроса +- tolerance_minutes."""
item_start_norm = _normalize_report_period_datetime(item_start)
item_end_norm = _normalize_report_period_datetime(item_end)
period_start_norm = _normalize_report_period_datetime(period_start)
period_end_norm = _normalize_report_period_datetime(period_end)
delta = timedelta(minutes=tolerance_minutes)
return (
(period_start_norm - delta) <= item_start_norm <= (period_start_norm + delta)
and (period_end_norm - delta) <= item_end_norm <= (period_end_norm + delta)
)
def find_matching_exported_item(
items: List[Any],
expected_data_type: Any,
name_substring: str,
tu_name_substring: str,
period_start: datetime,
period_end: datetime,
period_tolerance_minutes: int = ReportConst.REPORT_PERIOD_TOLERANCE_MINUTES,
) -> Optional[Any]:
"""
Ищет элемент списка по типу, подстрокам в имени (отчёт + ТУ) и периоду start/end с допуском.
"""
name_substring_lower = name_substring.lower()
tu_name_lower = tu_name_substring.lower()
matched_items = []
for item in items:
if item.exportedDataType != expected_data_type:
continue
item_name_lower = (item.name or "").lower()
if name_substring_lower not in item_name_lower:
continue
if tu_name_lower not in item_name_lower:
continue
if item.start is None or item.end is None:
continue
if not _exported_item_period_matches(
item.start, item.end, period_start, period_end, period_tolerance_minutes
):
continue
matched_items.append(item)
if not matched_items:
return None
return max(matched_items, key=lambda exported_item: exported_item.id)