Загрузка данных



def normalize_ip(value: object) -> str:
    if pd.isna(value):
        return ""

    text = str(value).strip()
    if not text:
        return ""

    # Some exports contain several addresses in one cell.
    for separator in [",", ";", "\n", " "]:
        if separator in text:
            text = text.split(separator, 1)[0].strip()

    try:
        return str(ipaddress.ip_address(text))
    except ValueError:
        return text


def require_columns(df: pd.DataFrame, required: dict[str, str], file_label: str) -> None:
    missing = [col for col in required.values() if col not in df.columns]
    if missing:
        raise SystemExit(
            f"{file_label}: не найдены обязательные колонки: {', '.join(missing)}\n"
            f"Фактические колонки: {', '.join(map(str, df.columns))}"
        )


def find_one_file(patterns: list[str], label: str) -> Path:
    matches: list[Path] = []
    for pattern in patterns:
        matches.extend(Path(".").glob(pattern))

    matches = sorted(
        {
            path
            for path in matches
            if path.is_file() and not path.name.startswith("~$") and path.suffix.lower() in {".xlsx", ".xls"}
        }
    )

    if not matches:
        raise SystemExit(
            f"Не нашел файл {label} в текущей папке.\n"
            "Передайте файлы явно:\n"
            '  python3 compare_missing_assets.py "Host-AC.xlsx" "audit.xlsx"'
        )

    if len(matches) > 1:
        raise SystemExit(
            f"Нашел несколько файлов {label}: {', '.join(str(path) for path in matches)}\n"
            "Передайте нужный файл явно."
        )

    return matches[0]


def is_active_state(value: object) -> bool:
    if pd.isna(value):
        return False
    return str(value).strip().lower() in ACTIVE_STATES


def main() -> None:
    parser = argparse.ArgumentParser(
        description="Create an Excel file with active Host-AC assets missing from audit."
    )
    parser.add_argument("host_ac_xlsx", nargs="?", help="Source Excel file: Host-AC.xlsx")
    parser.add_argument("audit_xlsx", nargs="?", help="Audit Excel file: audit.xlsx")
    parser.add_argument(
        "output_xlsx",
        nargs="?",
        default="missing_assets.xlsx",
        help="Output Excel file. Default: missing_assets.xlsx",
    )
    parser.add_argument(
        "--all-states",
        action="store_true",
        help="Do not filter by VMState; compare all rows from Host-AC.",
    )
    args = parser.parse_args()

    if args.host_ac_xlsx and args.audit_xlsx:
        host_ac_path = Path(args.host_ac_xlsx)
        audit_path = Path(args.audit_xlsx)
    elif not args.host_ac_xlsx and not args.audit_xlsx:
        host_ac_path = find_one_file(["*Host-AC*.xlsx", "*Host-AC*.xls"], "Host-AC")
        audit_path = find_one_file(["*audit*.xlsx", "*audit*.xls", "*Audit*.xlsx", "*Audit*.xls"], "audit")
    else:
        raise SystemExit("Нужно передать оба файла или не передавать ни одного для авто-поиска.")

    output_path = Path(args.output_xlsx)

    host_ac = pd.read_excel(host_ac_path, dtype=str)
    audit = pd.read_excel(audit_path, dtype=str)

    require_columns(host_ac, HOST_AC_COLUMNS, str(host_ac_path))
    require_columns(audit, AUDIT_COLUMNS, str(audit_path))

    source = host_ac.copy()
    if not args.all_states:
        source = source[source[HOST_AC_COLUMNS["vm_state"]].map(is_active_state)].copy()

    source["_hostname_key"] = source[HOST_AC_COLUMNS["hostname"]].map(normalize_host)
    source["_vmname_key"] = source[HOST_AC_COLUMNS["vm_name"]].map(normalize_host)
    source["_ip_key"] = source[HOST_AC_COLUMNS["ip"]].map(normalize_ip)

    audit["_host_key"] = audit[AUDIT_COLUMNS["host"]].map(normalize_host)
    audit["_fqdn_key"] = audit[AUDIT_COLUMNS["fqdn"]].map(normalize_fqdn)
    audit["_fqdn_short_key"] = audit[AUDIT_COLUMNS["fqdn"]].map(normalize_host)
    audit["_ip_key"] = audit[AUDIT_COLUMNS["ip"]].map(normalize_ip)

    audit_hosts = set(audit["_host_key"]) | set(audit["_fqdn_key"]) | set(audit["_fqdn_short_key"])
    audit_hosts.discard("")
    audit_ips = set(audit["_ip_key"])
    audit_ips.discard("")

    def missing_reason(row: pd.Series) -> str:
        host_candidates = {row["_hostname_key"], row["_vmname_key"]}
        host_candidates.discard("")

        host_found = bool(host_candidates & audit_hosts)
        ip_found = bool(row["_ip_key"] and row["_ip_key"] in audit_ips)

        if host_found or ip_found:
            return ""
        if host_candidates and row["_ip_key"]:
            return "not found by hostname/vmname or ip"
        if host_candidates:
            return "not found by hostname/vmname; ip is empty"
        if row["_ip_key"]:
            return "not found by ip; hostname/vmname are empty"
        return "hostname/vmname and ip are empty"

    source["MissingReason"] = source.apply(missing_reason, axis=1)
    missing = source[source["MissingReason"] != ""].copy()

    helper_columns = ["_hostname_key", "_vmname_key", "_ip_key"]
    missing = missing.drop(columns=helper_columns)

    summary = pd.DataFrame(
        [
            ["Host-AC rows", len(host_ac)],
            ["Compared Host-AC rows", len(source)],
            ["Audit rows", len(audit)],
            ["Missing assets", len(missing)],
            ["Filter", "all VMState values" if args.all_states else f"VMState in {sorted(ACTIVE_STATES)}"],
        ],
        columns=["Metric", "Value"],
    )

    with pd.ExcelWriter(output_path, engine="openpyxl") as writer:
        missing.to_excel(writer, index=False, sheet_name="MissingAssets")
        summary.to_excel(writer, index=False, sheet_name="Summary")

        for sheet_name in ["MissingAssets", "Summary"]:
            ws = writer.book[sheet_name]
            ws.freeze_panes = "A2"
            ws.auto_filter.ref = ws.dimensions
            for column_cells in ws.columns:
                max_len = max(len(str(cell.value or "")) for cell in column_cells)
                ws.column_dimensions[column_cells[0].column_letter].width = min(max(max_len + 2, 12), 60)

    print(f"Done: {output_path}")
    print(f"Missing assets: {len(missing)}")


if __name__ == "__main__":
    main()