#!/usr/bin/env python3
# Data loading / enrichment script (stray pasted line converted to a comment).
"""Enrich MP VM/FSTEC Excel exports with system ownership data and split by AC."""
from __future__ import annotations
import argparse
import datetime as dt
import re
import sys
from pathlib import Path
from typing import Any, Optional
import pandas as pd
# Placeholder written into every enrichment column when no reference match exists.
NOT_FOUND = "Не найдено"
# Candidate report columns (checked in this order) that may hold the host FQDN.
FQDN_COLUMNS = ["host.Fqdn", "Имя", "FQDN", "Hostname", "host.fqdn"]
# Columns copied from the reference workbook onto the report (system + owner).
ENRICHMENT_COLUMNS = ["ГАС", "Владелец ГАС", "АС", "Владелец АС"]
# Raw MP VM / FSTEC export column names -> human-readable Russian headers.
# A rename is applied only when the technical name is present and the target
# name is not already taken (see rename_technical_columns).
COLUMN_RENAME_MAP = {
"@Host": "Актив",
"host.Fqdn": "Имя",
"host.IpAddress": "Сетевой адрес",
"host.@vulners.Status": "Статус уязвимости",
"Host.OsName": "Наименование ОС",
"Host.OsVersion": "Версия ОС",
"Host.@AuditTime": "Дата и время последнего аудита",
"Host.@Vulners.CVEs": "Уязвимость",
"Host.@Vulners.SeverityRating": "Уровень опасности уязвимости",
"Host.@Vulners.IssueTime": "Дата публикации паспорта уязвимости",
"Host.@Vulners.Description": "Описание уязвимости",
"Host.@Vulners.VulnerableEntity.Name": "Название уязвимой сущности",
"Host.@Vulners.VulnerableEntity.Version": "Версия уязвимой сущности",
"Host.@Vulners.VulnerableEntity.Path": "Путь уязвимой сущности",
"Host.@Vulners.Patch": "Патч",
"Host.@Vulners.HowToFix": "Как исправить",
}
def read_table(path: Path) -> pd.DataFrame:
    """Load a tabular file into a DataFrame.

    Supports .xlsx/.xls (via read_excel) and .csv (delimiter sniffed,
    BOM-tolerant utf-8-sig). Raises ValueError for any other extension.
    """
    extension = path.suffix.lower()
    if extension in {".xlsx", ".xls"}:
        return pd.read_excel(path)
    if extension == ".csv":
        # sep=None + python engine lets pandas sniff the delimiter.
        return pd.read_csv(path, sep=None, engine="python", encoding="utf-8-sig")
    raise ValueError(f"Неподдерживаемый формат: {path.suffix}. Нужен .xlsx, .xls или .csv")
def normalize_value(value: Any) -> str:
    """Coerce *value* to a clean string.

    NaN/None and the textual markers "nan"/"none"/"null" (any case)
    collapse to the empty string; everything else is str()-ed and stripped.
    """
    if pd.isna(value):
        return ""
    text = str(value).strip()
    return "" if text.casefold() in {"nan", "none", "null"} else text
def normalize_key(value: Any) -> str:
    """Build a case-insensitive matching key from *value*.

    Same cleanup as normalize_value (NaN/"nan"/"none"/"null" -> ""),
    then folded to lower case for comparison.
    """
    if pd.isna(value):
        return ""
    text = str(value).strip()
    if text.casefold() in {"nan", "none", "null"}:
        return ""
    return text.casefold()
def safe_filename(value: Any) -> str:
    """Turn *value* into a short filesystem-safe name (fallback: "EMPTY").

    Replaces Windows-forbidden characters with "_", collapses whitespace,
    and truncates to 90 characters.
    """
    raw = "" if pd.isna(value) else str(value).strip()
    if raw.casefold() in {"nan", "none", "null"}:
        raw = ""
    cleaned = raw or "EMPTY"
    cleaned = re.sub(r'[\\/*?:"<>|]', "_", cleaned)
    cleaned = re.sub(r"\s+", " ", cleaned).strip()
    return cleaned[:90] or "EMPTY"
def build_name_prefix(vr: Optional[str], point: Optional[str], bdu: Optional[str]) -> str:
    """Compose the "ВР пN BDU" pieces into one filesystem-safe filename prefix.

    Empty/blank pieces are skipped; the point number gets a Cyrillic "п"
    prefix unless it already starts with one.
    """
    parts: list[str] = []
    vr_value = normalize_value(vr)
    if vr_value:
        parts.append(vr_value)
    point_value = normalize_value(point)
    if point_value:
        if not point_value.casefold().startswith("п"):
            point_value = f"п{point_value}"
        parts.append(point_value)
    bdu_value = normalize_value(bdu)
    if bdu_value:
        parts.append(bdu_value)
    return safe_filename(" ".join(parts))
def find_first_column(df: pd.DataFrame, candidates: list[str]) -> str | None:
exact_columns = {str(column): str(column) for column in df.columns}
lower_columns = {str(column).casefold(): str(column) for column in df.columns}
for candidate in candidates:
if candidate in exact_columns:
return exact_columns[candidate]
for candidate in candidates:
column = lower_columns.get(candidate.casefold())
if column is not None:
return column
return None
def get_reference_columns(reference: pd.DataFrame) -> dict[str, str]:
    """Map each enrichment column name to the actual reference column.

    Resolution is by name first; otherwise falls back to the fixed
    positional layout (enrichment data at columns 4..7). Raises ValueError
    when a column can be resolved neither way.
    """
    resolved: dict[str, str] = {}
    for position, expected in enumerate(ENRICHMENT_COLUMNS, start=4):
        found = find_first_column(reference, [expected])
        if found is None and len(reference.columns) > position:
            found = str(reference.columns[position])
        if found is None:
            raise ValueError(
                f"В справочнике не найден столбец '{expected}'. "
                "Ожидаемый порядок: Hostname, Ip, OS, Version, ГАС, Владелец ГАС, АС, Владелец АС"
            )
        resolved[expected] = found
    return resolved
def validate_inputs(report: pd.DataFrame, reference: pd.DataFrame) -> tuple[str, dict[str, str]]:
    """Check both frames are usable; return (report FQDN column, reference column map).

    Raises ValueError when the report lacks a recognizable FQDN column or
    the reference lacks the mandatory 'Hostname' column.
    """
    fqdn_column = find_first_column(report, FQDN_COLUMNS)
    if fqdn_column is None:
        raise ValueError("В основном файле не найден FQDN-столбец. Поддерживаются: " + ", ".join(FQDN_COLUMNS))
    reference_names = {str(column) for column in reference.columns}
    if "Hostname" not in reference_names:
        raise ValueError("В справочнике нет столбца 'Hostname'")
    return fqdn_column, get_reference_columns(reference)
def rename_technical_columns(df: pd.DataFrame) -> tuple[pd.DataFrame, list[str]]:
    """Rename raw export columns to human-readable headers via COLUMN_RENAME_MAP.

    A rename is applied only when the technical name exists and the target
    name is not already a column (never overwrites). Returns the frame and
    the sorted list of new names that were applied.
    """
    present = {str(column) for column in df.columns}
    applicable: dict[str, str] = {}
    for source_name, target_name in COLUMN_RENAME_MAP.items():
        if source_name in present and target_name not in present:
            applicable[source_name] = target_name
    if not applicable:
        return df, []
    return df.rename(columns=applicable), sorted(applicable.values())
def move_enrichment_columns_to_start(df: pd.DataFrame) -> pd.DataFrame:
    """Reorder *df* so the enrichment columns (where present) come first."""
    leading = [name for name in ENRICHMENT_COLUMNS if name in df.columns]
    trailing = [name for name in df.columns if name not in leading]
    return df[leading + trailing]
def enrich_report(report: pd.DataFrame, reference: pd.DataFrame) -> tuple[pd.DataFrame, dict[str, Any]]:
    """Join system-ownership columns from *reference* onto *report* by FQDN.

    Matching strategy per report row:
      1. exact case-insensitive match on the reference 'Hostname';
      2. otherwise the first reference row whose hostname CONTAINS the
         report key as a substring (e.g. short name vs fully-qualified);
      3. otherwise every enrichment column gets NOT_FOUND.

    Returns the enriched frame (technical columns renamed, enrichment
    columns moved to the front) and a stats dict used by write_summary.
    """
    fqdn_column, reference_columns = validate_inputs(report, reference)
    prepared_reference = reference.copy()
    # Precompute a normalized key column; rows with an empty key are unusable.
    prepared_reference["_hostname_key"] = prepared_reference["Hostname"].map(normalize_key)
    prepared_reference = prepared_reference[prepared_reference["_hostname_key"] != ""]
    # Exact-match lookup table: first occurrence wins for duplicated hostnames.
    exact_reference = (
        prepared_reference
        .drop_duplicates("_hostname_key", keep="first")
        .set_index("_hostname_key")[list(reference_columns.values())]
        .to_dict("index")
    )
    enrichment_values: dict[str, list[str]] = {column: [] for column in ENRICHMENT_COLUMNS}
    matched_exact = 0
    matched_contains = 0
    empty_fqdn = 0
    for host in report[fqdn_column]:
        host_key = normalize_key(host)
        if not host_key:
            # No usable FQDN on this row: mark all enrichment columns not found.
            for column in ENRICHMENT_COLUMNS:
                enrichment_values[column].append(NOT_FOUND)
            empty_fqdn += 1
            continue
        # matched_row is either a plain dict (exact hit) or a pandas Series
        # (contains hit); both support [column] access below.
        matched_row = exact_reference.get(host_key)
        if matched_row is not None:
            matched_exact += 1
        else:
            # Fallback: substring scan — report key inside the reference hostname.
            contains_match = prepared_reference[
                prepared_reference["_hostname_key"].str.contains(host_key, na=False, regex=False)
            ]
            if not contains_match.empty:
                matched_row = contains_match.iloc[0]
                matched_contains += 1
        for output_column, reference_column in reference_columns.items():
            if matched_row is None:
                enrichment_values[output_column].append(NOT_FOUND)
            else:
                # Blank reference cells also degrade to NOT_FOUND.
                enrichment_values[output_column].append(normalize_value(matched_row[reference_column]) or NOT_FOUND)
    result = report.copy()
    for column in ENRICHMENT_COLUMNS:
        result[column] = enrichment_values[column]
    result, renamed_columns = rename_technical_columns(result)
    result = move_enrichment_columns_to_start(result)
    stats = {
        "total_rows": len(result),
        "fqdn_column": fqdn_column,
        "matched_exact": matched_exact,
        "matched_contains": matched_contains,
        "empty_fqdn": empty_fqdn,
        # NOT_FOUND count in the АС column is the canonical unmatched counter.
        "not_found": enrichment_values["АС"].count(NOT_FOUND),
        "renamed_columns": renamed_columns,
    }
    return result, stats
def make_output_dir(input_path: Path, output_dir: Optional[Path]) -> Path:
    """Return the directory for results, creating it on disk.

    An explicit *output_dir* is created with parents (existing is fine).
    Otherwise a fresh microsecond-timestamped sibling of *input_path* is
    created (exist_ok=False, so collisions raise instead of mixing runs).
    """
    if output_dir is None:
        stamp = dt.datetime.now().strftime("%Y%m%d-%H%M%S-%f")
        generated = input_path.parent / f"{input_path.stem}_enriched_{stamp}"
        generated.mkdir(parents=True, exist_ok=False)
        return generated
    output_dir.mkdir(parents=True, exist_ok=True)
    return output_dir
def split_by_ac(enriched: pd.DataFrame, output_dir: Path, name_prefix: str) -> list[Path]:
    """Write one numbered Excel file per АС group; return the created paths."""
    created: list[Path] = []
    frame = enriched.copy()
    # Normalize the group key so blank/NaN АС values share one NOT_FOUND bucket.
    frame["АС"] = frame["АС"].map(lambda value: normalize_value(value) or NOT_FOUND)
    for order, (ac_value, subset) in enumerate(frame.groupby("АС", dropna=False), start=1):
        ac_name = safe_filename(ac_value)
        if name_prefix:
            base_name = f"{order:02d}_{name_prefix} АС_{ac_name}.xlsx"
        else:
            base_name = f"{order:02d}_{ac_name}.xlsx"
        target = output_dir / base_name
        subset.to_excel(target, index=False)
        created.append(target)
    return created
def write_summary(
    output_dir: Path,
    input_path: Path,
    reference_path: Path,
    stats: dict[str, Any],
    split_paths: list[Path],
    vr: Optional[str],
    point: Optional[str],
    bdu: Optional[str],
) -> Path:
    """Write a human-readable summary.txt into *output_dir*; return its path."""
    header = [
        "Итог обработки Excel",
        f"Дата: {dt.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        f"Основной файл: {input_path}",
        f"Справочник: {reference_path}",
        f"ВР: {normalize_value(vr) or '-'}",
        f"Пункт: {normalize_value(point) or '-'}",
        f"BDU: {normalize_value(bdu) or '-'}",
        "",
        f"Всего строк: {stats['total_rows']}",
        f"Столбец для матчинга: {stats['fqdn_column']}",
        f"Точных совпадений: {stats['matched_exact']}",
        f"Совпадений через поиск в Hostname: {stats['matched_contains']}",
        f"Пустых FQDN: {stats['empty_fqdn']}",
        f"Не найдено: {stats['not_found']}",
        f"Переименовано колонок: {len(stats['renamed_columns'])}",
        "",
        "Файлы по АС:",
    ]
    file_lines = [f"- {split_path.name}" for split_path in split_paths]
    summary_path = output_dir / "summary.txt"
    summary_path.write_text("\n".join(header + file_lines) + "\n", encoding="utf-8")
    return summary_path
def parse_args(argv: list[str]) -> argparse.Namespace:
    """Build the CLI parser and parse *argv* (argv excludes the program name)."""
    parser = argparse.ArgumentParser(description="Enrich Excel/CSV report with Host-AC.xlsx and split by AC.")
    parser.add_argument("input", type=Path, help="Основной Excel/CSV отчет")
    # Optional path-valued flags.
    path_options = {
        "--reference": "Справочник Host-AC.xlsx",
        "--output-dir": "Папка для результата",
    }
    for flag, description in path_options.items():
        parser.add_argument(flag, type=Path, help=description)
    # Optional free-text flags used only for output file naming.
    text_options = {
        "--vr": "Номер ВР для имен файлов, например ВР-111",
        "--point": "Пункт для имен файлов, например 1 или п1",
        "--bdu": "BDU для имен файлов, например BDU:123-12312",
    }
    for flag, description in text_options.items():
        parser.add_argument(flag, help=description)
    return parser.parse_args(argv)
def resolve_reference(input_path: Path, reference_path: Optional[Path]) -> Path:
    """Pick the Host-AC.xlsx reference path.

    An explicit *reference_path* wins. Otherwise the first existing default
    location is used (next to the report, next to the script, then CWD).
    Falls back to the input-adjacent path even when nothing exists, so the
    caller can report a meaningful "not found" message.
    """
    if reference_path is not None:
        return reference_path
    script_dir = Path(__file__).resolve().parent
    default_locations = (
        input_path.parent / "Host-AC.xlsx",
        script_dir / "Host-AC.xlsx",
        Path.cwd() / "Host-AC.xlsx",
    )
    return next(
        (candidate for candidate in default_locations if candidate.exists()),
        default_locations[0],
    )
def main(argv: list[str]) -> int:
    """CLI entry point: enrich the report, split by АС, write a summary.

    Returns a process exit code: 0 on success, 2 when an input file is missing.
    """
    args = parse_args(argv)
    reference_path = resolve_reference(args.input, args.reference)
    # Validate inputs up front so the user gets a clear message, not a traceback.
    if not args.input.exists():
        print(f"Основной файл не найден: {args.input}", file=sys.stderr)
        return 2
    if not reference_path.exists():
        print(f"Справочник не найден: {reference_path}", file=sys.stderr)
        print("Положите Host-AC.xlsx рядом с отчетом или укажите путь через --reference.", file=sys.stderr)
        return 2
    enriched, stats = enrich_report(read_table(args.input), read_table(reference_path))
    output_dir = make_output_dir(args.input, args.output_dir)
    name_prefix = build_name_prefix(args.vr, args.point, args.bdu)
    all_filename = f"00_{name_prefix} All_AC.xlsx" if name_prefix else "00_All_AC.xlsx"
    all_path = output_dir / all_filename
    enriched.to_excel(all_path, index=False)
    split_paths = split_by_ac(enriched, output_dir, name_prefix)
    summary_path = write_summary(output_dir, args.input, reference_path, stats, split_paths, args.vr, args.point, args.bdu)
    print(f"Готово: {output_dir}")
    print(f"Общий файл: {all_path}")
    print(f"Файлов по АС: {len(split_paths)}")
    print(f"Отчет: {summary_path}")
    return 0
if __name__ == "__main__":
    # Delegate to main(); its return value becomes the process exit status.
    sys.exit(main(sys.argv[1:]))