# Загрузка данных (data loading)
def normalize_ip(value: object) -> str:
    """Return a canonical comparison key for an IP-address cell.

    Some exports put several addresses in one cell; only the first one is
    used. Commas, semicolons and any whitespace (spaces, tabs, CR/LF) are
    treated as separators, and empty pieces are skipped, so a cell such as
    ``"; 10.0.0.1"`` still yields ``"10.0.0.1"``.

    Args:
        value: Raw cell value; may be a string or a missing value (NaN/None).

    Returns:
        The address in the canonical form produced by ``ipaddress`` when the
        first token parses as IPv4/IPv6, the raw first token when it does
        not, or ``""`` when the cell is missing or holds no token at all.
    """
    if pd.isna(value):
        return ""

    # Turning the punctuation separators into spaces lets a single
    # argument-less split() handle every separator at once and drop the
    # empty pieces that leading or doubled separators would produce.
    tokens = str(value).replace(",", " ").replace(";", " ").split()
    if not tokens:
        return ""

    candidate = tokens[0]
    try:
        return str(ipaddress.ip_address(candidate))
    except ValueError:
        # Not a parseable address: keep the raw token so that identical
        # strings on both sides can still be matched verbatim.
        return candidate
def require_columns(df: pd.DataFrame, required: dict[str, str], file_label: str) -> None:
    """Abort the program when *df* lacks any of the required columns.

    Args:
        df: Frame whose column labels are checked.
        required: Mapping whose values are the column names that must be
            present in *df*; the keys are not used here.
        file_label: Text placed at the start of the error message to
            identify the file being checked.

    Raises:
        SystemExit: If at least one required column is absent. The message
            lists the absent columns followed by the columns actually found.
    """
    absent = []
    for column_name in required.values():
        if column_name not in df.columns:
            absent.append(column_name)

    if not absent:
        return

    absent_text = ", ".join(absent)
    actual_text = ", ".join(map(str, df.columns))
    raise SystemExit(
        f"{file_label}: не найдены обязательные колонки: {absent_text}\n"
        f"Фактические колонки: {actual_text}"
    )
def find_one_file(patterns: list[str], label: str) -> Path:
    """Locate exactly one Excel file in the current directory.

    Every glob pattern is evaluated against the current working directory.
    A hit is kept only if it is a regular file, its name does not start
    with ``~$``, and its suffix is ``.xlsx`` or ``.xls`` (case-insensitive).
    Hits produced by more than one pattern are counted once.

    Args:
        patterns: Glob patterns to try.
        label: Name of the expected file, used in error messages.

    Returns:
        The single matching path, relative to the current directory.

    Raises:
        SystemExit: If no file or more than one file matches.
    """
    excel_suffixes = {".xlsx", ".xls"}
    here = Path(".")

    found: set[Path] = set()
    for pattern in patterns:
        for candidate in here.glob(pattern):
            if not candidate.is_file():
                continue
            if candidate.name.startswith("~$"):
                continue
            if candidate.suffix.lower() not in excel_suffixes:
                continue
            found.add(candidate)

    matches = sorted(found)
    if len(matches) == 1:
        return matches[0]

    if not matches:
        raise SystemExit(
            f"Не нашел файл {label} в текущей папке.\n"
            "Передайте файлы явно:\n"
            ' python3 compare_missing_assets.py "Host-AC.xlsx" "audit.xlsx"'
        )

    listing = ", ".join(str(path) for path in matches)
    raise SystemExit(
        f"Нашел несколько файлов {label}: {listing}\n"
        "Передайте нужный файл явно."
    )
def is_active_state(value: object) -> bool:
    """Tell whether a state cell counts as active.

    The value is converted to text, stripped of surrounding whitespace and
    lower-cased before it is looked up in ``ACTIVE_STATES`` (defined
    elsewhere in this file; presumably a collection of lower-case state
    names — verify there).

    Args:
        value: Raw cell value; may be a missing value (NaN/None).

    Returns:
        ``False`` for a missing value, otherwise whether the normalised
        text is contained in ``ACTIVE_STATES``.
    """
    return not pd.isna(value) and str(value).strip().lower() in ACTIVE_STATES
def main() -> None:
    """Write an Excel report of Host-AC assets that are absent from the audit.

    Command line: ``[host_ac_xlsx audit_xlsx [output_xlsx]] [--all-states]``.
    Either both input files are given, or neither, in which case each one is
    auto-discovered in the current directory. Unless ``--all-states`` is
    set, only Host-AC rows whose VMState passes ``is_active_state`` are
    compared.

    A Host-AC row counts as present in the audit when its hostname key or
    VM-name key equals any audit host/FQDN key, or when its IP key equals
    any audit IP key. All other rows are written to the ``MissingAssets``
    sheet with a ``MissingReason`` column; a ``Summary`` sheet holds the
    row counts and the filter that was applied.

    Raises:
        SystemExit: If only one input file is given, if auto-discovery does
            not find exactly one file, or if a required column is missing.
    """
    parser = argparse.ArgumentParser(
        description="Create an Excel file with active Host-AC assets missing from audit."
    )
    parser.add_argument("host_ac_xlsx", nargs="?", help="Source Excel file: Host-AC.xlsx")
    parser.add_argument("audit_xlsx", nargs="?", help="Audit Excel file: audit.xlsx")
    parser.add_argument(
        "output_xlsx",
        nargs="?",
        default="missing_assets.xlsx",
        help="Output Excel file. Default: missing_assets.xlsx",
    )
    parser.add_argument(
        "--all-states",
        action="store_true",
        help="Do not filter by VMState; compare all rows from Host-AC.",
    )
    args = parser.parse_args()

    if args.host_ac_xlsx and args.audit_xlsx:
        host_ac_path = Path(args.host_ac_xlsx)
        audit_path = Path(args.audit_xlsx)
    elif not args.host_ac_xlsx and not args.audit_xlsx:
        host_ac_path = find_one_file(["*Host-AC*.xlsx", "*Host-AC*.xls"], "Host-AC")
        audit_path = find_one_file(
            ["*audit*.xlsx", "*audit*.xls", "*Audit*.xlsx", "*Audit*.xls"], "audit"
        )
    else:
        raise SystemExit("Нужно передать оба файла или не передавать ни одного для авто-поиска.")
    output_path = Path(args.output_xlsx)

    # Everything is read as text so identifiers and addresses are compared
    # as written rather than being coerced to numbers.
    host_ac = pd.read_excel(host_ac_path, dtype=str)
    audit = pd.read_excel(audit_path, dtype=str)
    require_columns(host_ac, HOST_AC_COLUMNS, str(host_ac_path))
    require_columns(audit, AUDIT_COLUMNS, str(audit_path))

    source = host_ac.copy()
    if not args.all_states:
        source = source[source[HOST_AC_COLUMNS["vm_state"]].map(is_active_state)].copy()

    # Temporary comparison keys; the ones added to `source` are dropped
    # again before the report is written.
    source["_hostname_key"] = source[HOST_AC_COLUMNS["hostname"]].map(normalize_host)
    source["_vmname_key"] = source[HOST_AC_COLUMNS["vm_name"]].map(normalize_host)
    source["_ip_key"] = source[HOST_AC_COLUMNS["ip"]].map(normalize_ip)

    audit["_host_key"] = audit[AUDIT_COLUMNS["host"]].map(normalize_host)
    audit["_fqdn_key"] = audit[AUDIT_COLUMNS["fqdn"]].map(normalize_fqdn)
    # The FQDN column is also run through normalize_host (presumably to
    # obtain the short host name — see that helper) so it can match
    # Host-AC host keys.
    audit["_fqdn_short_key"] = audit[AUDIT_COLUMNS["fqdn"]].map(normalize_host)
    audit["_ip_key"] = audit[AUDIT_COLUMNS["ip"]].map(normalize_ip)

    audit_hosts = set(audit["_host_key"]) | set(audit["_fqdn_key"]) | set(audit["_fqdn_short_key"])
    audit_hosts.discard("")
    audit_ips = set(audit["_ip_key"])
    audit_ips.discard("")

    def missing_reason(hostname_key: str, vmname_key: str, ip_key: str) -> str:
        """Return "" when the asset is found in the audit, else the reason it is not."""
        host_candidates = {hostname_key, vmname_key}
        host_candidates.discard("")
        host_found = bool(host_candidates & audit_hosts)
        ip_found = bool(ip_key and ip_key in audit_ips)
        if host_found or ip_found:
            return ""
        if host_candidates and ip_key:
            return "not found by hostname/vmname or ip"
        if host_candidates:
            return "not found by hostname/vmname; ip is empty"
        if ip_key:
            return "not found by ip; hostname/vmname are empty"
        return "hostname/vmname and ip are empty"

    # Built with a plain comprehension instead of DataFrame.apply(axis=1):
    # on a frame with zero rows apply() returns a DataFrame rather than a
    # Series, so the single-column assignment breaks when the VMState
    # filter leaves nothing to compare.
    source["MissingReason"] = [
        missing_reason(hostname_key, vmname_key, ip_key)
        for hostname_key, vmname_key, ip_key in zip(
            source["_hostname_key"], source["_vmname_key"], source["_ip_key"]
        )
    ]

    missing = source[source["MissingReason"] != ""].copy()
    helper_columns = ["_hostname_key", "_vmname_key", "_ip_key"]
    missing = missing.drop(columns=helper_columns)

    summary = pd.DataFrame(
        [
            ["Host-AC rows", len(host_ac)],
            ["Compared Host-AC rows", len(source)],
            ["Audit rows", len(audit)],
            ["Missing assets", len(missing)],
            ["Filter", "all VMState values" if args.all_states else f"VMState in {sorted(ACTIVE_STATES)}"],
        ],
        columns=["Metric", "Value"],
    )

    with pd.ExcelWriter(output_path, engine="openpyxl") as writer:
        missing.to_excel(writer, index=False, sheet_name="MissingAssets")
        summary.to_excel(writer, index=False, sheet_name="Summary")
        for sheet_name in ["MissingAssets", "Summary"]:
            ws = writer.book[sheet_name]
            # Keep the header row visible and filterable.
            ws.freeze_panes = "A2"
            ws.auto_filter.ref = ws.dimensions
            for column_cells in ws.columns:
                # Width follows the longest cell text, clamped to 12..60.
                max_len = max(len(str(cell.value or "")) for cell in column_cells)
                ws.column_dimensions[column_cells[0].column_letter].width = min(max(max_len + 2, 12), 60)

    print(f"Done: {output_path}")
    print(f"Missing assets: {len(missing)}")
# Run the command-line entry point only when this file is executed as a
# script; importing it as a module does not start the comparison.
if __name__ == "__main__":
    main()