#!/usr/bin/env python3
"""Enrich MP VM/FSTEC Excel exports with system ownership data and split by AC."""

from __future__ import annotations

import argparse
import datetime as dt
import re
import sys
from pathlib import Path
from typing import Any, Optional

import pandas as pd


# Placeholder written into enrichment columns when no reference match exists.
NOT_FOUND = "Не найдено"
# Candidate report columns (checked in order) that may hold the host FQDN.
FQDN_COLUMNS = ["host.Fqdn", "Имя", "FQDN", "Hostname", "host.fqdn"]
# Ownership columns copied from the Host-AC reference into the report.
ENRICHMENT_COLUMNS = ["ГАС", "Владелец ГАС", "АС", "Владелец АС"]

# Raw MP VM/FSTEC export column names -> human-readable Russian headers.
COLUMN_RENAME_MAP = {
    "@Host": "Актив",
    "host.Fqdn": "Имя",
    "host.IpAddress": "Сетевой адрес",
    "host.@vulners.Status": "Статус уязвимости",
    "Host.OsName": "Наименование ОС",
    "Host.OsVersion": "Версия ОС",
    "Host.@AuditTime": "Дата и время последнего аудита",
    "Host.@Vulners.CVEs": "Уязвимость",
    "Host.@Vulners.SeverityRating": "Уровень опасности уязвимости",
    "Host.@Vulners.IssueTime": "Дата публикации паспорта уязвимости",
    "Host.@Vulners.Description": "Описание уязвимости",
    "Host.@Vulners.VulnerableEntity.Name": "Название уязвимой сущности",
    "Host.@Vulners.VulnerableEntity.Version": "Версия уязвимой сущности",
    "Host.@Vulners.VulnerableEntity.Path": "Путь уязвимой сущности",
    "Host.@Vulners.Patch": "Патч",
    "Host.@Vulners.HowToFix": "Как исправить",
}


def read_table(path: Path) -> pd.DataFrame:
    """Load a .csv/.xlsx/.xls report into a DataFrame.

    CSV delimiters are auto-sniffed (sep=None with the python engine) and a
    UTF-8 BOM is tolerated via utf-8-sig. Raises ValueError for any other
    file extension.
    """
    extension = path.suffix.lower()
    if extension in {".xlsx", ".xls"}:
        return pd.read_excel(path)
    if extension == ".csv":
        return pd.read_csv(path, sep=None, engine="python", encoding="utf-8-sig")
    raise ValueError(f"Неподдерживаемый формат: {path.suffix}. Нужен .xlsx, .xls или .csv")


def normalize_value(value: Any) -> str:
    """Convert a cell to a stripped string; NaN and None-like markers become ""."""
    if pd.isna(value):
        return ""
    text = str(value).strip()
    # Treat textual null markers the same as real missing values.
    return "" if text.casefold() in {"nan", "none", "null"} else text


def normalize_key(value: Any) -> str:
    """Derive a case-insensitive matching key from the normalized cell value."""
    normalized = normalize_value(value)
    return normalized.casefold()


def safe_filename(value: Any) -> str:
    """Sanitize a value for use inside a file name, capped at 90 characters.

    Forbidden filesystem characters become underscores, runs of whitespace
    collapse to a single space, and an empty result falls back to "EMPTY".
    """
    cleaned = normalize_value(value)
    if not cleaned:
        cleaned = "EMPTY"
    cleaned = re.sub(r'[\\/*?:"<>|]', "_", cleaned)
    cleaned = re.sub(r"\s+", " ", cleaned).strip()
    truncated = cleaned[:90]
    return truncated if truncated else "EMPTY"


def build_name_prefix(vr: Optional[str], point: Optional[str], bdu: Optional[str]) -> str:
    """Build the "ВР пN BDU" file-name prefix from the optional CLI arguments.

    Empty/missing pieces are skipped; the result is filesystem-sanitized.
    """
    pieces: list[str] = []

    vr_value = normalize_value(vr)
    if vr_value:
        pieces.append(vr_value)

    point_value = normalize_value(point)
    if point_value:
        # Prepend "п" unless the user already typed it (case-insensitive).
        if not point_value.casefold().startswith("п"):
            point_value = f"п{point_value}"
        pieces.append(point_value)

    bdu_value = normalize_value(bdu)
    if bdu_value:
        pieces.append(bdu_value)

    return safe_filename(" ".join(pieces))


def find_first_column(df: pd.DataFrame, candidates: list[str]) -> str | None:
    """Return the first candidate column present in df, or None.

    Exact-name matches for any candidate win over case-insensitive matches;
    within each pass, candidate order decides.
    """
    names = [str(column) for column in df.columns]
    exact_names = set(names)
    by_casefold = {name.casefold(): name for name in names}

    for candidate in candidates:
        if candidate in exact_names:
            return candidate
    for candidate in candidates:
        match = by_casefold.get(candidate.casefold())
        if match is not None:
            return match
    return None


def get_reference_columns(reference: pd.DataFrame) -> dict[str, str]:
    """Map each enrichment column name to the actual reference-sheet column.

    Columns are looked up by name first; otherwise the fixed positional
    layout is assumed (enrichment data starts at column index 4). Raises
    ValueError when a column can be resolved neither way.
    """
    resolved: dict[str, str] = {}
    for position, wanted in enumerate(ENRICHMENT_COLUMNS, start=4):
        actual = find_first_column(reference, [wanted])
        if actual is None and len(reference.columns) > position:
            # Positional fallback for reference files without headers we expect.
            actual = str(reference.columns[position])
        if actual is None:
            raise ValueError(
                f"В справочнике не найден столбец '{wanted}'. "
                "Ожидаемый порядок: Hostname, Ip, OS, Version, ГАС, Владелец ГАС, АС, Владелец АС"
            )
        resolved[wanted] = actual
    return resolved


def validate_inputs(report: pd.DataFrame, reference: pd.DataFrame) -> tuple[str, dict[str, str]]:
    """Validate both tables; return (report FQDN column, reference column mapping).

    Raises ValueError when the report has no recognizable FQDN column or the
    reference lacks the mandatory 'Hostname' column.
    """
    fqdn_column = find_first_column(report, FQDN_COLUMNS)
    if fqdn_column is None:
        raise ValueError("В основном файле не найден FQDN-столбец. Поддерживаются: " + ", ".join(FQDN_COLUMNS))
    reference_names = {str(column) for column in reference.columns}
    if "Hostname" not in reference_names:
        raise ValueError("В справочнике нет столбца 'Hostname'")
    return fqdn_column, get_reference_columns(reference)


def rename_technical_columns(df: pd.DataFrame) -> tuple[pd.DataFrame, list[str]]:
    """Rename raw MP export columns to human-readable Russian headers.

    A column is renamed only when it exists and its target name is not
    already taken. Returns the (possibly renamed) frame and the sorted list
    of new names that were applied; the frame is returned unchanged when
    nothing applies.
    """
    present = {str(column) for column in df.columns}
    applicable: dict[str, str] = {}
    for raw_name, readable_name in COLUMN_RENAME_MAP.items():
        if raw_name in present and readable_name not in present:
            applicable[raw_name] = readable_name
    if not applicable:
        return df, []
    return df.rename(columns=applicable), sorted(applicable.values())


def move_enrichment_columns_to_start(df: pd.DataFrame) -> pd.DataFrame:
    """Reorder columns so the enrichment columns lead; the rest keep their order."""
    leading = [name for name in ENRICHMENT_COLUMNS if name in df.columns]
    trailing = [name for name in df.columns if name not in leading]
    return df[leading + trailing]


def enrich_report(report: pd.DataFrame, reference: pd.DataFrame) -> tuple[pd.DataFrame, dict[str, Any]]:
    """Attach ГАС/АС ownership columns to the report and collect match stats.

    Each report row is matched against the reference by normalized hostname:
    an exact key lookup first, then a substring search inside the reference
    hostnames (first hit wins). Unmatched or blank hosts get NOT_FOUND.
    Returns the enriched frame (renamed, enrichment columns first) plus a
    stats dict used by the summary writer.
    """
    fqdn_column, reference_columns = validate_inputs(report, reference)

    ref = reference.copy()
    ref["_hostname_key"] = ref["Hostname"].map(normalize_key)
    # Rows without a usable hostname can never match anything.
    ref = ref[ref["_hostname_key"] != ""]

    # key -> {reference column -> value}; first occurrence wins on duplicates.
    exact_lookup = (
        ref.drop_duplicates("_hostname_key", keep="first")
        .set_index("_hostname_key")[list(reference_columns.values())]
        .to_dict("index")
    )

    columns_out: dict[str, list[str]] = {name: [] for name in ENRICHMENT_COLUMNS}
    exact_hits = 0
    contains_hits = 0
    blank_hosts = 0

    for raw_host in report[fqdn_column]:
        key = normalize_key(raw_host)
        if not key:
            blank_hosts += 1
            for name in ENRICHMENT_COLUMNS:
                columns_out[name].append(NOT_FOUND)
            continue

        row = exact_lookup.get(key)
        if row is not None:
            exact_hits += 1
        else:
            # Fallback: the report host may be a fragment of a reference hostname.
            candidates = ref[ref["_hostname_key"].str.contains(key, na=False, regex=False)]
            if not candidates.empty:
                row = candidates.iloc[0]
                contains_hits += 1

        for out_name, ref_name in reference_columns.items():
            if row is None:
                columns_out[out_name].append(NOT_FOUND)
            else:
                columns_out[out_name].append(normalize_value(row[ref_name]) or NOT_FOUND)

    enriched = report.copy()
    for name in ENRICHMENT_COLUMNS:
        enriched[name] = columns_out[name]

    enriched, renamed = rename_technical_columns(enriched)
    enriched = move_enrichment_columns_to_start(enriched)

    stats = {
        "total_rows": len(enriched),
        "fqdn_column": fqdn_column,
        "matched_exact": exact_hits,
        "matched_contains": contains_hits,
        "empty_fqdn": blank_hosts,
        # Counts rows whose АС stayed NOT_FOUND (unmatched, blank, or empty value).
        "not_found": columns_out["АС"].count(NOT_FOUND),
        "renamed_columns": renamed,
    }
    return enriched, stats


def make_output_dir(input_path: Path, output_dir: Optional[Path]) -> Path:
    """Return the directory results are written to, creating it if needed.

    An explicit directory is created with exist_ok=True; otherwise a fresh
    timestamped sibling of the input file is created and must not pre-exist.
    """
    if output_dir is None:
        stamp = dt.datetime.now().strftime("%Y%m%d-%H%M%S-%f")
        generated = input_path.parent / f"{input_path.stem}_enriched_{stamp}"
        generated.mkdir(parents=True, exist_ok=False)
        return generated
    output_dir.mkdir(parents=True, exist_ok=True)
    return output_dir


def split_by_ac(enriched: pd.DataFrame, output_dir: Path, name_prefix: str) -> list[Path]:
    """Write one numbered .xlsx per distinct АС value; return the created paths.

    Blank/NaN АС values are bucketed under NOT_FOUND before grouping, and the
    two-digit index prefix keeps file names unique even after sanitization.
    """
    frame = enriched.copy()
    frame["АС"] = frame["АС"].map(lambda raw: normalize_value(raw) or NOT_FOUND)

    created: list[Path] = []
    for number, (ac_value, rows) in enumerate(frame.groupby("АС", dropna=False), start=1):
        ac_name = safe_filename(ac_value)
        if name_prefix:
            filename = f"{number:02d}_{name_prefix} АС_{ac_name}.xlsx"
        else:
            filename = f"{number:02d}_{ac_name}.xlsx"
        destination = output_dir / filename
        rows.to_excel(destination, index=False)
        created.append(destination)
    return created


def write_summary(
    output_dir: Path,
    input_path: Path,
    reference_path: Path,
    stats: dict[str, Any],
    split_paths: list[Path],
    vr: Optional[str],
    point: Optional[str],
    bdu: Optional[str],
) -> Path:
    """Write summary.txt (run parameters, match statistics, produced files).

    Returns the path of the written summary file.
    """
    content = [
        "Итог обработки Excel",
        f"Дата: {dt.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        f"Основной файл: {input_path}",
        f"Справочник: {reference_path}",
        f"ВР: {normalize_value(vr) or '-'}",
        f"Пункт: {normalize_value(point) or '-'}",
        f"BDU: {normalize_value(bdu) or '-'}",
        "",
        f"Всего строк: {stats['total_rows']}",
        f"Столбец для матчинга: {stats['fqdn_column']}",
        f"Точных совпадений: {stats['matched_exact']}",
        f"Совпадений через поиск в Hostname: {stats['matched_contains']}",
        f"Пустых FQDN: {stats['empty_fqdn']}",
        f"Не найдено: {stats['not_found']}",
        f"Переименовано колонок: {len(stats['renamed_columns'])}",
        "",
        "Файлы по АС:",
    ]
    for split_path in split_paths:
        content.append(f"- {split_path.name}")
    summary_path = output_dir / "summary.txt"
    summary_path.write_text("\n".join(content) + "\n", encoding="utf-8")
    return summary_path


def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse command-line arguments for the enrichment script."""
    parser = argparse.ArgumentParser(description="Enrich Excel/CSV report with Host-AC.xlsx and split by AC.")
    parser.add_argument("input", type=Path, help="Основной Excel/CSV отчет")
    parser.add_argument("--reference", type=Path, help="Справочник Host-AC.xlsx")
    parser.add_argument("--output-dir", type=Path, help="Папка для результата")
    # Plain string options used only to build output file names.
    for flag, description in (
        ("--vr", "Номер ВР для имен файлов, например ВР-111"),
        ("--point", "Пункт для имен файлов, например 1 или п1"),
        ("--bdu", "BDU для имен файлов, например BDU:123-12312"),
    ):
        parser.add_argument(flag, help=description)
    return parser.parse_args(argv)


def resolve_reference(input_path: Path, reference_path: Optional[Path]) -> Path:
    """Pick the Host-AC.xlsx reference: explicit path wins, then known locations.

    Searches next to the input file, next to this script, then the current
    working directory. When nothing exists, the input-file sibling is still
    returned so the caller can print a helpful "not found" message.
    """
    if reference_path is not None:
        return reference_path

    script_dir = Path(__file__).resolve().parent
    search_order = (
        input_path.parent / "Host-AC.xlsx",
        script_dir / "Host-AC.xlsx",
        Path.cwd() / "Host-AC.xlsx",
    )
    existing = next((candidate for candidate in search_order if candidate.exists()), None)
    return existing if existing is not None else search_order[0]


def main(argv: list[str]) -> int:
    """CLI entry point: read, enrich, split by АС, write summary.

    Returns 0 on success, 2 when an input file is missing.
    """
    args = parse_args(argv)
    reference_path = resolve_reference(args.input, args.reference)

    if not args.input.exists():
        print(f"Основной файл не найден: {args.input}", file=sys.stderr)
        return 2
    if not reference_path.exists():
        print(f"Справочник не найден: {reference_path}", file=sys.stderr)
        print("Положите Host-AC.xlsx рядом с отчетом или укажите путь через --reference.", file=sys.stderr)
        return 2

    enriched, stats = enrich_report(read_table(args.input), read_table(reference_path))

    output_dir = make_output_dir(args.input, args.output_dir)
    name_prefix = build_name_prefix(args.vr, args.point, args.bdu)
    if name_prefix:
        all_filename = f"00_{name_prefix} All_AC.xlsx"
    else:
        all_filename = "00_All_AC.xlsx"
    all_path = output_dir / all_filename
    enriched.to_excel(all_path, index=False)

    split_paths = split_by_ac(enriched, output_dir, name_prefix)
    summary_path = write_summary(
        output_dir, args.input, reference_path, stats, split_paths, args.vr, args.point, args.bdu
    )

    print(f"Готово: {output_dir}")
    print(f"Общий файл: {all_path}")
    print(f"Файлов по АС: {len(split_paths)}")
    print(f"Отчет: {summary_path}")
    return 0


# Script entry point: propagate main()'s exit code to the shell.
if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))