Загрузка данных
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import csv
import sys
import os
import getpass
import subprocess
from datetime import datetime
import ldap3
from ldap3 import Server, Connection, ALL, SUBTREE
from ldap3.core.exceptions import LDAPBindError, LDAPSocketOpenError, LDAPException
# =========================
# Конфигурационные переменные
# =========================
SAMBA_DC_SERVER = os.environ.get("SAMBA_DC_SERVER", "ldaps://ur-ynn-dc-01.rs.corp")
DOMAIN_NAME = os.environ.get("DOMAIN_NAME", "RS.CORP")
BASE_OU = os.environ.get(
"BASE_OU",
"OU=Employees,OU=Users,DC=rs,DC=corp"
)
OUTPUT_CSV_FILE = os.environ.get(
"OUTPUT_CSV_FILE",
datetime.now().strftime("/tmp/ad_users_rs_corp_%Y-%m-%d_%H-%M-%S.csv")
)
OUTPUT_ARCHIVE_FILE = os.environ.get(
"OUTPUT_ARCHIVE_FILE",
datetime.now().strftime("/tmp/ad_users_rs_corp_%Y-%m-%d_%H-%M-%S.7z")
)
LDAP_USER = os.environ.get("LDAP_USER", f"Adm_Baturin-GA@{DOMAIN_NAME}")
LDAP_PASS = os.environ.get("LDAP_PASS", "")
ARCHIVE_PASS_ENV = "AD_EXPORT_PASS"
# Атрибуты для экспорта
ATTRIBUTES_TO_EXPORT = [
"cn",
"sAMAccountName",
"streetAddress",
"telephoneNumber",
"homePhone",
"otherHomePhone",
"userAccountControl",
"distinguishedName",
]
# LDAP-фильтр: только пользователи, исключаем disabled
LDAP_FILTER = "(&(objectClass=user)(objectCategory=person)(!(userAccountControl:1.2.840.113556.1.4.803:=2)))"
PAGED_SIZE = 1000
# =========================
def get_all_nested_ous(conn, base_ou):
"""
Рекурсивно получает все вложенные OU из указанной базовой OU.
"""
all_ous = [base_ou]
try:
conn.search(
search_base=base_ou,
search_filter="(objectClass=organizationalUnit)",
search_scope=SUBTREE,
attributes=["distinguishedName"]
)
except Exception as e:
print(f"Ошибка при поиске OU в {base_ou}: {e}", file=sys.stderr)
return all_ous
for entry in conn.entries:
dn = entry.distinguishedName.value if hasattr(entry.distinguishedName, "value") else str(entry.distinguishedName)
if dn and dn != base_ou and dn not in all_ous:
all_ous.append(dn)
return all_ous
def is_disabled_by_uac(uac_value):
"""
Проверяет, заблокирован ли пользователь по userAccountControl.
disabled = bit 2 (ACCOUNTDISABLE)
"""
try:
if isinstance(uac_value, list):
uac_value = uac_value[0] if uac_value else 0
return bool(int(uac_value) & 2)
except Exception:
return False
def normalize_value(value):
"""
Приводит LDAP-значение к строке для CSV.
"""
if value is None:
return ""
if isinstance(value, list):
return "; ".join(str(v) for v in value if v is not None)
return str(value)
def get_users_from_ous(conn, ous_list):
"""
Получает всех не заблокированных пользователей из списка OU.
"""
all_users = []
for ou in ous_list:
try:
conn.search(
search_base=ou,
search_filter=LDAP_FILTER,
search_scope=SUBTREE,
attributes=ATTRIBUTES_TO_EXPORT
)
for entry in conn.entries:
user_data = {}
for attr in ATTRIBUTES_TO_EXPORT:
if hasattr(entry, attr):
raw_value = getattr(entry, attr).value if hasattr(getattr(entry, attr), "value") else getattr(entry, attr)
user_data[attr] = normalize_value(raw_value)
else:
user_data[attr] = ""
# Дополнительная защита, если фильтр на сервере вдруг не сработал
if is_disabled_by_uac(user_data.get("userAccountControl", "")):
continue
all_users.append(user_data)
except Exception as e:
print(f"Ошибка при поиске в OU {ou}: {e}", file=sys.stderr)
return all_users
def export_to_csv(users_data, output_file):
"""
Экспортирует данные в CSV.
"""
if not users_data:
print("Нет данных для экспорта", file=sys.stderr)
return False
try:
output_dir = os.path.dirname(output_file)
if output_dir and not os.path.exists(output_dir):
os.makedirs(output_dir, exist_ok=True)
fieldnames = [
"Domain",
"Name",
"SamAccountName",
"StreetAddress",
"telephoneNumber",
"HomePhone",
"otherHomePhone",
"distinguishedName",
]
with open(output_file, "w", newline="", encoding="utf-8-sig") as csvfile:
writer = csv.DictWriter(
csvfile,
fieldnames=fieldnames,
delimiter=";",
quoting=csv.QUOTE_MINIMAL
)
writer.writeheader()
for user in users_data:
row = {
"Domain": "rs.corp",
"Name": user.get("cn", ""),
"SamAccountName": user.get("sAMAccountName", ""),
"StreetAddress": user.get("streetAddress", ""),
"telephoneNumber": user.get("telephoneNumber", ""),
"HomePhone": user.get("homePhone", ""),
"otherHomePhone": user.get("otherHomePhone", ""),
"distinguishedName": user.get("distinguishedName", ""),
}
writer.writerow(row)
print(f"Данные успешно экспортированы в {output_file}")
print(f"Всего экспортировано пользователей: {len(users_data)}")
return True
except PermissionError:
print(f"Ошибка доступа: нет прав на запись в {output_file}", file=sys.stderr)
return False
except Exception as e:
print(f"Ошибка при записи CSV файла: {e}", file=sys.stderr)
return False
def create_archive(csv_file, archive_file):
"""
Создаёт запароленный архив 7z.
Пароль берётся из переменной окружения AD_EXPORT_PASS.
"""
archive_password = os.environ.get(ARCHIVE_PASS_ENV, "")
if not shutil_which("7z"):
print("Ошибка: команда 7z не найдена. Установите p7zip-full.", file=sys.stderr)
return False
if archive_password:
cmd = [
"7z", "a", "-t7z",
f"-p{archive_password}",
"-mhe=on",
archive_file,
csv_file
]
else:
print(f"Предупреждение: переменная {ARCHIVE_PASS_ENV} не задана, архив будет без пароля.", file=sys.stderr)
cmd = ["7z", "a", "-t7z", archive_file, csv_file]
try:
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if result.returncode != 0:
print(f"Ошибка при создании архива: {result.stderr}", file=sys.stderr)
return False
print(f"Архив успешно создан: {archive_file}")
return True
except Exception as e:
print(f"Ошибка при архивации: {e}", file=sys.stderr)
return False
def shutil_which(cmd):
import shutil
return shutil.which(cmd)
def print_banner():
"""
Выводит информационный баннер.
"""
print("=" * 70)
print(" ЭКСПОРТ ПОЛЬЗОВАТЕЛЕЙ AD В CSV + 7Z")
print("=" * 70)
print(f"Сервер LDAP: {SAMBA_DC_SERVER}")
print(f"Домен: {DOMAIN_NAME}")
print(f"Базовая OU: {BASE_OU}")
print(f"Выходной CSV: {OUTPUT_CSV_FILE}")
print(f"Выходной архив: {OUTPUT_ARCHIVE_FILE}")
print(f"LDAP-пользователь: {LDAP_USER}")
print("=" * 70)
def main():
"""
Основная функция.
"""
print_banner()
try:
import ldap3
except ImportError:
print("Ошибка: не установлена библиотека ldap3", file=sys.stderr)
print("Установите её командой: pip install ldap3", file=sys.stderr)
sys.exit(1)
global LDAP_PASS
if not LDAP_PASS:
print("\nАутентификация")
LDAP_PASS = getpass.getpass(f"Введите пароль для {LDAP_USER}: ")
if not LDAP_PASS:
print("Пароль не может быть пустым", file=sys.stderr)
sys.exit(1)
try:
print(f"\nПодключение к {SAMBA_DC_SERVER}...")
server = Server(SAMBA_DC_SERVER, get_info=ALL)
conn = Connection(server, user=LDAP_USER, password=LDAP_PASS, auto_bind=True)
print("Подключение успешно")
print(f"\nПоиск OU в {BASE_OU}...")
ous = get_all_nested_ous(conn, BASE_OU)
print(f"Найдено OU: {len(ous)}")
if ous:
print("\nНайденные OU:")
for i, ou in enumerate(ous, 1):
print(f" {i:2d}. {ou}")
else:
print("OU не найдены")
print("\nПоиск пользователей...")
users = get_users_from_ous(conn, ous)
print(f"Найдено пользователей: {len(users)}")
if users:
print(f"\nЭкспорт данных в {OUTPUT_CSV_FILE}...")
if export_to_csv(users, OUTPUT_CSV_FILE):
print("\nЭкспорт завершен успешно!")
if os.path.exists(OUTPUT_CSV_FILE):
file_size = os.path.getsize(OUTPUT_CSV_FILE)
print(f"Размер файла: {file_size:,} байт")
print("\nСоздание архива...")
if create_archive(OUTPUT_CSV_FILE, OUTPUT_ARCHIVE_FILE):
print("Архивация завершена успешно!")
else:
print("Архивация не выполнена", file=sys.stderr)
else:
print("Пользователи не найдены, CSV файл не создан")
conn.unbind()
print("\nСоединение закрыто")
except LDAPBindError as e:
print(f"\nОшибка аутентификации: {e}", file=sys.stderr)
print("Проверьте:")
print(" - правильность логина и пароля")
print(" - доступность домена")
print(" - доверие к сертификату LDAP/LDAPS")
sys.exit(1)
except LDAPSocketOpenError as e:
print(f"\nОшибка подключения к серверу: {e}", file=sys.stderr)
print("Проверьте:")
print(" - доступность сервера Samba DC")
print(" - правильность адреса сервера")
print(" - настройки файервола")
sys.exit(1)
except KeyboardInterrupt:
print("\n\nОперация прервана пользователем")
sys.exit(130)
except LDAPException as e:
print(f"\nLDAP ошибка: {e}", file=sys.stderr)
sys.exit(1)
except Exception as e:
print(f"\nНеожиданная ошибка: {e}", file=sys.stderr)
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":
main()