Загрузка данных
"""
Утилиты для разбора xlsx-отчёта о режиме работы СОУ.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime, time, timedelta
from typing import Dict, List, Optional, Tuple
from openpyxl.worksheet.worksheet import Worksheet
from constants.test_constants import BaseTN3Constants as TestConst
from constants.test_constants import ExportLdsStatusReportConstants as LdsReportConst
from utils.helpers.report_xlsx_utils import (
ReportTitleInfo,
_stringify_cell,
build_column_cells,
get_report_column_headers,
parse_report_title,
)
@dataclass
class LdsStatusReportSectionRow:
"""Строка участка с длительностями режимов СОУ."""
row_index: int
section_name: str
cells: Dict[str, str] = field(default_factory=dict)
@property
def mode_durations_seconds(self) -> Dict[str, int]:
return {
column_name: parse_duration_seconds(self.cells.get(column_name)) or 0
for column_name in LdsReportConst.MODE_DURATION_COLUMNS
}
@property
def modes_sum_seconds(self) -> int:
return sum(self.mode_durations_seconds.values())
@dataclass
class LdsStatusReportParsed:
"""Разобранный отчёт о режиме работы СОУ."""
title_info: ReportTitleInfo
column_headers: List[str]
section_rows: List[LdsStatusReportSectionRow]
total_duration_seconds: Optional[int] = None
total_duration_raw: str = ""
total_label_row_index: Optional[int] = None
def parse_duration_seconds(value: object) -> Optional[int]:
"""Парсит длительность из ячейки (H:MM:SS, MM:SS или time/timedelta из Excel)."""
if value is None:
return None
if isinstance(value, timedelta):
return int(value.total_seconds())
if isinstance(value, time):
return (
value.hour * TestConst.SECONDS_PER_HOUR
+ value.minute * TestConst.SEC_PER_MIN
+ value.second
)
if isinstance(value, datetime):
return (
value.hour * TestConst.SECONDS_PER_HOUR
+ value.minute * TestConst.SEC_PER_MIN
+ value.second
)
duration_text = _stringify_cell(value).strip()
if not duration_text:
return None
parts = duration_text.split(":")
try:
if len(parts) == LdsReportConst.DURATION_PARTS_COUNT_H_MM_SS:
hours, minutes, seconds = (int(part) for part in parts)
return (
hours * TestConst.SECONDS_PER_HOUR
+ minutes * TestConst.SEC_PER_MIN
+ seconds
)
if len(parts) == LdsReportConst.DURATION_PARTS_COUNT_MM_SS:
minutes, seconds = (int(part) for part in parts)
return minutes * TestConst.SEC_PER_MIN + seconds
except ValueError:
return None
return None
def is_duration_cell_filled(value: object) -> bool:
"""Ячейка с длительностью заполнена (допускается 0:00:00)."""
return parse_duration_seconds(value) is not None
def format_duration_seconds(total_seconds: int) -> str:
"""Форматирует длительность в секундах в строку H:MM:SS (минуты и секунды с ведущим нулём)."""
hours, remainder = divmod(total_seconds, TestConst.SECONDS_PER_HOUR)
minutes, seconds = divmod(remainder, TestConst.SEC_PER_MIN)
return f"{hours}:{minutes:02d}:{seconds:02d}"
def _find_total_work_duration(worksheet: Worksheet) -> Tuple[Optional[int], str, Optional[int]]:
"""
Ищет строку "Суммарное время работы:" и парсит длительность рядом (в той же или следующей строке).
Возвращает: (секунды, сырое значение ячейки, номер строки с меткой) или (None, "", None).
"""
for row_index, row_values in enumerate(
worksheet.iter_rows(min_row=LdsReportConst.REPORT_DATA_FIRST_ROW, values_only=True),
start=LdsReportConst.REPORT_DATA_FIRST_ROW,
):
for column_index, cell_value in enumerate(row_values):
if cell_value is None:
continue
cell_text = _stringify_cell(cell_value).strip()
if LdsReportConst.TOTAL_WORK_DURATION_LABEL not in cell_text:
continue
duration_candidates = []
if column_index + 1 < len(row_values):
duration_candidates.append(row_values[column_index + 1])
if row_index + 1 <= worksheet.max_row:
duration_candidates.append(
worksheet.cell(row=row_index + 1, column=column_index + 1).value
)
for candidate in duration_candidates:
duration_seconds = parse_duration_seconds(candidate)
if duration_seconds is not None:
return duration_seconds, _stringify_cell(candidate).strip(), row_index
return None, "", row_index
return None, "", None
def parse_lds_status_report_worksheet(
worksheet: Worksheet,
expected_section_names: List[str],
) -> LdsStatusReportParsed:
"""
Разбирает лист xlsx-отчёта о режиме СОУ: шапка, колонки, строки участков и суммарное время.
В section_rows попадают только участки из expected_section_names (без учёта регистра).
"""
headers = get_report_column_headers(worksheet, LdsReportConst.REPORT_COLUMN_HEADERS_ROW)
title_info = parse_report_title(
worksheet.cell(row=LdsReportConst.REPORT_TITLE_ROW, column=1).value,
LdsReportConst.REPORT_HEADER_PERIOD_PATTERN,
)
total_duration_seconds, total_duration_raw, total_label_row_index = _find_total_work_duration(worksheet)
section_rows: List[LdsStatusReportSectionRow] = []
expected_names_lower = {name.lower() for name in expected_section_names}
for row_index, row_values in enumerate(
worksheet.iter_rows(
min_row=LdsReportConst.REPORT_DATA_FIRST_ROW,
max_col=len(headers) if headers else 5,
values_only=True,
),
start=LdsReportConst.REPORT_DATA_FIRST_ROW,
):
if total_label_row_index is not None and row_index >= total_label_row_index:
break
cells = build_column_cells(row_values, headers)
section_name = cells.get(LdsReportConst.COL_SECTION, "").strip()
if not section_name:
continue
if section_name.lower() not in expected_names_lower:
continue
section_rows.append(
LdsStatusReportSectionRow(
row_index=row_index,
section_name=section_name,
cells=cells,
)
)
return LdsStatusReportParsed(
title_info=title_info,
column_headers=headers,
section_rows=section_rows,
total_duration_seconds=total_duration_seconds,
total_duration_raw=total_duration_raw,
total_label_row_index=total_label_row_index,
)
def format_section_rows_for_allure(section_rows: List[LdsStatusReportSectionRow]) -> str:
lines = []
for row in section_rows:
durations_text = ", ".join(
f"{column}={format_duration_seconds(seconds)}"
for column, seconds in row.mode_durations_seconds.items()
)
lines.append(
f"row#{row.row_index}: {row.section_name} | sum={format_duration_seconds(row.modes_sum_seconds)} | "
f"{durations_text}"
)
return "\n".join(lines)
__all__ = [
"LdsStatusReportParsed",
"LdsStatusReportSectionRow",
"format_duration_seconds",
"format_section_rows_for_allure",
"is_duration_cell_filled",
"parse_duration_seconds",
"parse_lds_status_report_worksheet",
]
report xlsx
"""
Утилиты для разбора xlsx-отчётов и проверки их формата.
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional
import allure
from openpyxl import load_workbook
from openpyxl.worksheet.worksheet import Worksheet
from constants.test_constants import BaseTN3Constants as TestConst
from constants.test_constants import ExportReportConstants as ReportConst
from utils.helpers.ws_test_utils import extract_first_number, localize_as_moscow
@dataclass
class ReportTitleInfo:
"""Разобранная шапка отчёта"""
raw_title: str
period_start: Optional[datetime] = None
period_end: Optional[datetime] = None
@dataclass
class LeakReportRow:
"""Разобранная строка данных по утечке"""
row_index: int
cells: Dict[str, str] = field(default_factory=dict)
@property
def datetime_value(self) -> Optional[datetime]:
return parse_report_datetime(self.cells.get(ReportConst.COL_DATETIME))
@property
def object_value(self) -> str:
return self.cells.get(ReportConst.COL_OBJECT, "")
@property
def lds_status(self) -> str:
return self.cells.get(ReportConst.COL_LDS_STATUS, "")
@property
def masking_info(self) -> str:
return self.cells.get(ReportConst.COL_MASK_INFO, "")
@property
def coordinate_meters(self) -> Optional[float]:
coordinate_km = extract_first_number(self.cells.get(ReportConst.COL_COORDINATE))
if coordinate_km is None:
return None
return coordinate_km * TestConst.KM_TO_METERS
@property
def leak_volume(self) -> Optional[float]:
return extract_first_number(self.cells.get(ReportConst.COL_LEAK_VOLUME))
@property
def mt_mode(self) -> str:
return self.cells.get(ReportConst.COL_MT_MODE, "")
def is_xlsx_file_bytes(file_bytes: Optional[bytes]) -> bool:
"""Проверяет zip-сигнатуру xlsx"""
if not file_bytes:
return False
return file_bytes.startswith(ReportConst.ZIP_SIGNATURE)
def is_xlsx_extension(file_name: str) -> bool:
"""Проверяет расширение .xlsx без учёта регистра."""
return file_name.lower().endswith(ReportConst.XLSX_EXTENSION)
def parse_report_datetime(value: object) -> Optional[datetime]:
"""Парсит дату/время из ячейки отчёта."""
if value is None:
return None
if isinstance(value, datetime):
return value
if isinstance(value, str):
try:
return datetime.strptime(value.strip(), ReportConst.REPORT_DATETIME_FORMAT)
except ValueError:
return None
return None
def _stringify_cell(value: object) -> str:
if value is None:
return ""
if isinstance(value, datetime):
return value.strftime(ReportConst.REPORT_DATETIME_FORMAT)
return str(value)
def normalize_report_period_naive(value: datetime) -> datetime:
"""Московское время без tzinfo и микросекунд - для сравнения периодов в отчёте."""
return localize_as_moscow(value).replace(microsecond=0, tzinfo=None)
def report_period_comparison_bounds(
period_start: datetime,
period_end: datetime,
tolerance_minutes: int = ReportConst.REPORT_PERIOD_TOLERANCE_MINUTES,
) -> tuple[datetime, datetime, datetime, datetime]:
"""
Границы периода с допуском +-tolerance_minutes для start и end отдельно.
Возвращает (start_lower, start_upper, end_lower, end_upper).
"""
start = normalize_report_period_naive(period_start)
end = normalize_report_period_naive(period_end)
delta = timedelta(minutes=tolerance_minutes)
return start - delta, start + delta, end - delta, end + delta
def build_export_report_file_name(
tu_description: str,
period_start: datetime,
period_end: datetime,
report_name_part: str = ReportConst.LEAKS_REPORT_NAME_PART,
name_tu_separator: str = " ",
) -> str:
"""
Имя xlsx при скачивании: '{название}{sep}{ТУ} DD.MM.YYYY HH_MM_SS - DD.MM.YYYY HH_MM_SS.xlsx'.
По умолчанию - отчёт об утечках.
"""
start_text = normalize_report_period_naive(period_start).strftime(ReportConst.REPORT_FILE_NAME_DATETIME_FORMAT)
end_text = normalize_report_period_naive(period_end).strftime(ReportConst.REPORT_FILE_NAME_DATETIME_FORMAT)
return (
f"{report_name_part}{name_tu_separator}{tu_description} {start_text} - {end_text}"
f"{ReportConst.XLSX_EXTENSION}"
)
def parse_period_from_export_file_name(
file_name: str,
period_pattern: str | None = None,
) -> tuple[Optional[datetime], Optional[datetime]]:
"""Извлекает границы периода из имени скачанного xlsx-файла."""
match = re.search(
period_pattern or ReportConst.REPORT_FILE_NAME_PERIOD_PATTERN,
file_name.strip(),
re.IGNORECASE,
)
if match is None:
return None, None
parse_format = ReportConst.REPORT_FILE_NAME_DATETIME_FORMAT.replace("_", ":")
def _parse_part(value: str) -> Optional[datetime]:
try:
return datetime.strptime(value.replace("_", ":"), parse_format)
except ValueError:
return None
return _parse_part(match.group("period_start")), _parse_part(match.group("period_end"))
def parse_report_title(
title_raw: object,
header_period_pattern: str | None = None,
) -> ReportTitleInfo:
"""
Парсит шапку отчёта с именованными группами period_start/period_end.
"""
title_str = _stringify_cell(title_raw)
pattern = header_period_pattern or ReportConst.REPORT_HEADER_PERIOD_PATTERN
match = re.search(pattern, title_str)
if match is None:
return ReportTitleInfo(raw_title=title_str)
return ReportTitleInfo(
raw_title=title_str,
period_start=parse_report_datetime(match.group("period_start")),
period_end=parse_report_datetime(match.group("period_end")),
)
def load_report_worksheet(file_path: Path) -> Optional[Worksheet]:
"""Открывает первый лист xlsx. При ошибке возвращает None."""
if not file_path.exists():
return None
try:
workbook = load_workbook(filename=str(file_path), read_only=True, data_only=True)
except Exception:
return None
sheet_names = workbook.sheetnames
if not sheet_names:
return None
return workbook[sheet_names[ReportConst.DEFAULT_SHEET_INDEX]]
def get_report_title_cell(worksheet: Worksheet) -> object:
return worksheet.cell(row=ReportConst.REPORT_TITLE_ROW, column=1).value
def get_report_column_headers(
worksheet: Worksheet,
headers_row: int = ReportConst.REPORT_COLUMN_HEADERS_ROW,
) -> List[str]:
"""Возвращает непустые заголовки колонок из указанной строки шапки."""
headers: List[str] = []
column_index = 1
while True:
cell_value = worksheet.cell(row=headers_row, column=column_index).value
if cell_value is None or not str(cell_value).strip():
break
headers.append(_stringify_cell(cell_value).strip())
column_index += 1
return headers
def build_column_cells(row_values: tuple, headers: List[str]) -> Dict[str, str]:
"""Собирает словарь {название колонки: значение ячейки} по строке данных."""
return {
header: _stringify_cell(row_values[column_index]) if column_index < len(row_values) else ""
for column_index, header in enumerate(headers)
}
def iter_report_data_rows(worksheet: Worksheet) -> List[LeakReportRow]:
"""
Возвращает строки данных по утечкам, начиная с REPORT_DATA_FIRST_ROW.
Пустые строки пропускаются.
"""
headers = get_report_column_headers(worksheet)
if not headers:
return []
rows: List[LeakReportRow] = []
for excel_row_index, row_values in enumerate(
worksheet.iter_rows(
min_row=ReportConst.REPORT_DATA_FIRST_ROW,
max_col=len(headers),
values_only=True,
),
start=ReportConst.REPORT_DATA_FIRST_ROW,
):
if not any(cell is not None and str(cell).strip() for cell in row_values):
continue
rows.append(
LeakReportRow(
row_index=excel_row_index,
cells=build_column_cells(row_values, headers),
)
)
return rows
def find_row_with_object(rows: List[LeakReportRow], object_substring: str) -> Optional[LeakReportRow]:
"""Ищет первую строку, где колонка 'Объект' содержит подстроку без учёта регистра"""
substring_lower = object_substring.lower()
for row in rows:
if substring_lower in row.object_value.lower():
return row
return None
def save_report_bytes_to_temp_file(
file_bytes: bytes,
prefix: str = "leaks_report_",
) -> Optional[Path]:
"""Сохраняет байты отчёта во временный xlsx-файл. При ошибке возвращает None."""
import tempfile
try:
with tempfile.NamedTemporaryFile(
suffix=ReportConst.XLSX_EXTENSION,
prefix=prefix,
delete=False,
) as temp_file:
temp_file.write(file_bytes)
return Path(temp_file.name)
except OSError:
return None
def attach_report_file_to_allure(file_path: Path, file_name: str) -> None:
"""Прикладывает xlsx к Allure при падении теста"""
try:
xlsx_type = allure.attachment_type.XLSX
except AttributeError:
xlsx_type = None
if xlsx_type is not None:
allure.attach.file(
str(file_path),
name=file_name,
attachment_type=xlsx_type,
extension="xlsx",
)
return
try:
with file_path.open("rb") as raw_file:
allure.attach(raw_file.read(), name=file_name, extension="xlsx")
except OSError:
pass
сценарий
async def export_lds_status_report(
ws_client, cfg: SmokeSuiteConfig, leak: LeakTestConfig, imitator_start_time: datetime
):
"""
Сценарий формирования xlsx-отчёта о режиме работы СОУ.
"""
report_state = ExportLdsStatusReportState()
with allure.step("Подготовка параметров сценария формирования отчёта о режиме работы СОУ"):
report_state.report_test = leak.export_lds_status_report_test
StepCheck("В конфигурации задан export_lds_status_report_test", "export_lds_status_report_test").actual(
report_state.report_test
).is_not_none()
report_state.period_start = t_utils.localize_as_moscow(imitator_start_time)
report_state.period_end = t_utils.localize_as_moscow(
imitator_start_time + timedelta(minutes=report_state.report_test.offset)
)
report_state.period_start_naive = report_utils.normalize_report_period_naive(report_state.period_start)
report_state.period_end_naive = report_utils.normalize_report_period_naive(report_state.period_end)
report_state.tu_description_lower = cfg.technological_unit.description.lower()
time_offset_hours = t_utils.report_time_offset_hours()
StepCheck(
f"Смещение timeOffset для запросов отчёта (часовой пояс {TestConst.ZONE_INFO})",
"time_offset_hours",
).actual(time_offset_hours).is_not_none()
report_state.time_offset_hours = time_offset_hours
allure.attach(
f"period.start={report_state.period_start}\n"
f"period.end={report_state.period_end}\n"
f"offset_minutes={report_state.report_test.offset}\n"
f"sections={LdsReportConst.SECTION_NAMES}",
name="Фильтр периода отчёта о режиме СОУ",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step(f"Этап 1. Подписка на пуш-нотификации {ReportConst.SUBSCRIBE_REPORTS_DATA_EXPORTED_REQUEST}"):
await t_utils.connect(ws_client, ReportConst.SUBSCRIBE_REPORTS_DATA_EXPORTED_REQUEST, [])
with allure.step(f"Этап 2. Запрос формирования отчёта {ReportConst.EXPORT_REPORTS_COMMAND_REQUEST}"):
request_payload = {
"tuId": cfg.tu_id,
"exportedDataTypes": [ExportedDataType.LDS_STATUS_REPORT.value],
"timeOffset": report_state.time_offset_hours,
"period": {
"start": t_utils.datetime_to_msgpack_timestamp(report_state.period_start),
"end": t_utils.datetime_to_msgpack_timestamp(report_state.period_end),
"additionalProperties": {},
},
}
await t_utils.connect(ws_client, ReportConst.EXPORT_REPORTS_COMMAND_REQUEST, request_payload)
with allure.step(
f"Этап 3. Ожидание пуш-нотификации {ReportConst.REPORT_DATA_EXPORTED_NOTIFICATION} о готовности отчёта"
):
report_state.notification = await t_utils.poll_for_report_export_notification(
ws_client=ws_client,
parser=parser,
total_wait_seconds=ReportConst.NOTIFICATION_TIMEOUT_SECONDS,
poll_interval_seconds=ReportConst.LIST_POLL_INTERVAL_SECONDS,
)
with allure.step(f"Этап 4. Лонг-поллинг {ReportConst.GET_EXPORTED_DATA_LIST_REQUEST} до появления отчёта в списке"):
report_state.report_item = await t_utils.poll_for_exported_file(
ws_client=ws_client,
parser=parser,
list_limit=ReportConst.EXPORTED_DATA_LIST_LIMIT,
expected_data_type=ExportedDataType.LDS_STATUS_REPORT,
name_substring=LdsReportConst.LDS_STATUS_REPORT_NAME_PART,
tu_name_substring=cfg.technological_unit.description,
period_start=report_state.period_start,
period_end=report_state.period_end,
total_wait_seconds=ReportConst.LIST_POLL_TOTAL_WAIT_SECONDS,
poll_interval_seconds=ReportConst.LIST_POLL_INTERVAL_SECONDS,
)
with allure.step("Подготовка данных найденного отчёта в списке"):
report_item = report_state.report_item
if report_item is not None:
allure.attach(
f"id={report_item.id}, name={report_item.name}, "
f"exportedDataType={report_item.exportedDataType}, "
f"start={t_utils.format_datetime_moscow(report_item.start)}, "
f"end={t_utils.format_datetime_moscow(report_item.end)}",
name="Найденный отчёт в списке",
attachment_type=allure.attachment_type.TEXT,
)
report_state.report_file_name = report_utils.build_export_report_file_name(
cfg.technological_unit.description,
report_state.period_start,
report_state.period_end,
LdsReportConst.LDS_STATUS_REPORT_NAME_PART,
". ",
)
with allure.step("Проверка: отчёт найден в списке сформированных файлов"):
StepCheck("Отчёт найден в списке сформированных файлов", "report_item").actual(
report_state.report_item
).is_not_none()
with allure.step(
f"Этап 5. Streaming-вызов {ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST} по id={report_state.report_item.id}"
):
download_request = {
"exportedDataId": report_state.report_item.id,
"exportedDataType": ExportedDataType.LDS_STATUS_REPORT.to_download_name(),
"additionalProperties": None,
"timeOffset": report_state.time_offset_hours,
}
download_purpose = (
f"скачивание xlsx-отчёта о режиме СОУ (exportedDataId={report_state.report_item.id}) "
f"после формирования отчёта и выбора файла в списке GetExportedDataListRequest"
)
await t_utils.connect_stream(
ws_client,
ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST,
download_request,
purpose=download_purpose,
)
report_state.download_invocation_id = ws_client.invocation_id
with allure.step("Этап 6. Получение fileChunk - скачивание отчёта о режиме СОУ"):
report_state.download_reply = await t_utils.receive_download_exported_data_reply(
ws_client=ws_client,
parser=parser,
invocation_id=report_state.download_invocation_id,
request_name=ReportConst.DOWNLOAD_EXPORTED_DATA_REQUEST,
total_wait_seconds=ReportConst.DOWNLOAD_TIMEOUT_SECONDS,
purpose=download_purpose,
)
with allure.step("Извлечение данных ответа на скачивание"):
download_reply = report_state.download_reply
download_reply_status = download_reply.replyStatus
has_download_reply_content = download_reply.replyContent is not None
report_state.file_bytes = download_reply.replyContent.fileChunk if has_download_reply_content else None
is_xlsx_signature = (
report_utils.is_xlsx_file_bytes(report_state.file_bytes) if report_state.file_bytes else False
)
with allure.step("Проверка ответа на скачивание и формата xlsx"):
StepCheck("Проверка статуса ответа на скачивание", "replyStatus").actual(download_reply_status).expected(
ReplyStatus.OK.value
).equal_to()
StepCheck("Проверка наличия контента ответа на скачивание", "replyContent").actual(
has_download_reply_content
).expected(True).equal_to()
StepCheck("Проверка наличия байт файла", "fileChunk").actual(report_state.file_bytes).is_not_empty()
StepCheck("Проверка xlsx (zip) сигнатуры файла", "file_signature").actual(is_xlsx_signature).expected(
True
).equal_to()
with allure.step("Подготовка данных для проверки имени файла отчёта"):
report_file_name = report_state.report_file_name
report_file_name_lower = report_file_name.lower()
file_name_period_start, file_name_period_end = report_utils.parse_period_from_export_file_name(
report_file_name,
LdsReportConst.REPORT_FILE_NAME_PERIOD_PATTERN,
)
period_start_lo, period_start_hi, period_end_lo, period_end_hi = report_utils.report_period_comparison_bounds(
report_state.period_start_naive,
report_state.period_end_naive,
)
has_xlsx_extension = report_utils.is_xlsx_extension(report_file_name)
lds_report_name_part_lower = LdsReportConst.LDS_STATUS_REPORT_NAME_PART.lower()
try:
with allure.step("Этап 7. Сохранение и разбор xlsx-отчёта о режиме СОУ"):
report_state.temp_file_path = report_utils.save_report_bytes_to_temp_file(
report_state.file_bytes,
prefix="lds_status_report_",
)
StepCheck("Временный xlsx файл создан", "temp_file_path").actual(report_state.temp_file_path).is_not_none()
report_state.worksheet = report_utils.load_report_worksheet(report_state.temp_file_path)
report_state.parsed_report = lds_report_utils.parse_lds_status_report_worksheet(
report_state.worksheet,
LdsReportConst.SECTION_NAMES,
)
allure.attach(
f"Шапка (raw): {report_state.parsed_report.title_info.raw_title}\n"
f"period_start: {report_state.parsed_report.title_info.period_start}\n"
f"period_end: {report_state.parsed_report.title_info.period_end}\n"
f"total_duration: {report_state.parsed_report.total_duration_raw}",
name="Шапка отчёта о режиме СОУ",
attachment_type=allure.attachment_type.TEXT,
)
allure.attach(
lds_report_utils.format_section_rows_for_allure(report_state.parsed_report.section_rows),
name="Строки участков отчёта",
attachment_type=allure.attachment_type.TEXT,
)
with allure.step("Подготовка данных таблицы отчёта для проверки"):
parsed_report = report_state.parsed_report
expected_section_names = LdsReportConst.SECTION_NAMES
section_rows = parsed_report.section_rows
total_duration_seconds = parsed_report.total_duration_seconds
duration_tolerance = LdsReportConst.TOTAL_DURATION_TOLERANCE_SECONDS
with allure.step("Проверка содержимого таблицы отчёта о режиме СОУ"):
StepCheck("Лист xlsx открыт", "worksheet").actual(report_state.worksheet).is_not_none()
with SoftAssertions() as soft_failures:
StepCheck(
"Количество строк участков в отчёте",
"section_rows_count",
soft_failures,
).actual(len(section_rows)).expected(len(expected_section_names)).equal_to()
for section_index, expected_section_name in enumerate(expected_section_names):
actual_section_name = (
section_rows[section_index].section_name if section_index < len(section_rows) else None
)
StepCheck(
f"Наименование участка #{section_index + 1}",
LdsReportConst.COL_SECTION,
soft_failures,
).actual(actual_section_name).expected(expected_section_name).equal_to()
for section_row in section_rows:
for column_name in LdsReportConst.MODE_DURATION_COLUMNS:
cell_value = section_row.cells.get(column_name)
StepCheck(
f"Длительность '{column_name}' для участка '{section_row.section_name}' заполнена",
column_name,
soft_failures,
).actual(lds_report_utils.is_duration_cell_filled(cell_value)).expected(True).equal_to()
StepCheck(
"В отчёте найдена строка 'Суммарное время работы:'",
"total_work_duration_label",
soft_failures,
).actual(parsed_report.total_label_row_index).is_not_none()
StepCheck(
"Суммарное время работы в отчёте не нулевое",
"total_work_duration",
soft_failures,
).actual(total_duration_seconds).is_greater_than(0, LdsReportConst.ZERO_DURATION_TEXT)
for section_row in section_rows:
duration_diff = abs(section_row.modes_sum_seconds - (total_duration_seconds or 0))
StepCheck(
f"Сумма режимов СОУ для '{section_row.section_name}' "
f"совпадает с суммарным временем (+-{duration_tolerance} с)",
"modes_sum_seconds",
soft_failures,
).actual(duration_diff).is_less_than(
duration_tolerance + 1,
f"погрешность {duration_tolerance} с",
)
with allure.step("Подготовка данных шапки отчёта для проверки"):
title_info = parsed_report.title_info
report_title_lower = title_info.raw_title.lower()
lds_report_name_part_lower = LdsReportConst.LDS_STATUS_REPORT_NAME_PART.lower()
column_headers = parsed_report.column_headers
period_start_lo, period_start_hi, period_end_lo, period_end_hi = report_utils.report_period_comparison_bounds(
report_state.period_start_naive,
report_state.period_end_naive,
)
header_period_start = title_info.period_start
header_period_end = title_info.period_end
with allure.step("Проверка двойной шапки отчёта о режиме СОУ"):
with SoftAssertions() as soft_failures:
StepCheck(
f"В шапке отчёта присутствует '{LdsReportConst.LDS_STATUS_REPORT_NAME_PART}'",
"report_title",
soft_failures,
).contains(report_title_lower, lds_report_name_part_lower)
StepCheck(
"Время начала периода в шапке совпадает с фильтром запроса (+-1 мин)",
"period_start",
soft_failures,
).actual(header_period_start).is_between(period_start_lo, period_start_hi)
StepCheck(
"Время конца периода в шапке совпадает с фильтром запроса (+-1 мин)",
"period_end",
soft_failures,
).actual(header_period_end).is_between(period_end_lo, period_end_hi)
StepCheck(
"Названия колонок в шапке отчёта",
"column_headers",
soft_failures,
).actual(column_headers).expected(LdsReportConst.EXPECTED_COLUMN_HEADERS).equal_to()
with allure.step("Проверка имени файла отчёта о режиме СОУ"):
with SoftAssertions() as soft_failures:
StepCheck(f"Имя файла оканчивается на {ReportConst.XLSX_EXTENSION}", "file_name", soft_failures).actual(
has_xlsx_extension
).expected(True).equal_to()
StepCheck(
f"Имя файла содержит '{LdsReportConst.LDS_STATUS_REPORT_NAME_PART}'",
"file_name",
soft_failures,
).contains(report_file_name_lower, lds_report_name_part_lower)
StepCheck(
f"Имя файла содержит описание ТУ '{cfg.technological_unit.description}'",
"file_name",
soft_failures,
).contains(report_file_name_lower, report_state.tu_description_lower)
StepCheck(
"Дата начала периода в имени файла совпадает с фильтром запроса (+-1 мин)",
"period_start_in_file_name",
soft_failures,
).actual(file_name_period_start).is_between(period_start_lo, period_start_hi)
StepCheck(
"Дата конца периода в имени файла совпадает с фильтром запроса (+-1 мин)",
"period_end_in_file_name",
soft_failures,
).actual(file_name_period_end).is_between(period_end_lo, period_end_hi)
except Exception:
with allure.step("Прикрепление xlsx отчёта к Allure при падении теста"):
if report_state.temp_file_path and report_state.report_file_name:
report_utils.attach_report_file_to_allure(
report_state.temp_file_path, report_state.report_file_name
)
raise
with allure.step("Проверка пуш-нотификации о готовности отчёта"):
notification = report_state.notification
notification_reply_status = notification.replyStatus if notification else None
notification_reply_content = notification.replyContent if notification else None
notification_export_status = (
notification_reply_content.exportStatus if notification_reply_content else None
)
notification_error_message = (
(notification_reply_content.errorMessage or "") if notification_reply_content else ""
)
with SoftAssertions() as soft_failures:
StepCheck("Получена пуш-нотификация о готовности отчёта", "notification", soft_failures).actual(
notification
).is_not_none()
StepCheck("Проверка статуса пуш-нотификации", "replyStatus", soft_failures).actual(
notification_reply_status
).expected(ReplyStatus.OK.value).equal_to()
StepCheck("Проверка наличия контента нотификации", "replyContent", soft_failures).actual(
notification_reply_content
).is_not_none()
StepCheck("Проверка exportStatus в нотификации", "exportStatus", soft_failures).actual(
notification_export_status
).expected(ExportStatus.DONE).equal_to()
StepCheck("В нотификации нет текста ошибки", "errorMessage", soft_failures).actual(
notification_error_message
).is_empty()