Загрузка данных
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Export AD users from rs.corp OU to CSV and pack to passworded 7z archive.
Адаптировано под атрибуты: StreetAddress, telephoneNumber, HomePhone, otherHomePhone.
Использует LDAPS (SAMBA_DC_SERVER должен быть ldaps://... или ldap:// с use_starttls=True).
Пароли можно задавать через переменные окружения LDAP_USER и LDAP_PASS.
Пароль для архива из AD_EXPORT_PASS.
"""
import os
import sys
import csv
import getpass
import subprocess
import shutil # добавлен импорт для shutil.which
from datetime import datetime
from ldap3 import Server, Connection, ALL, SUBTREE, Tls
from ldap3.core.exceptions import LDAPException, LDAPBindError, LDAPSocketOpenError
# ---------- CONFIG ----------
# Укажите здесь FQDN контроллера (с ldaps:// если хотите SSL) или оставьте как есть
SAMBA_DC_SERVER = os.environ.get("SAMBA_DC_SERVER", "ldaps://ur-ynn-dc-01.rs.corp")
DOMAIN_NAME = os.environ.get("DOMAIN_NAME", "RS.CORP")
# Укажите базовую OU для поиска (для rs.corp) — замените на ваш DN OU
BASE_OU = os.environ.get("BASE_OU", "OU=Employees,OU=Users,DC=rs,DC=corp")
# Путь CSV и архива
OUTPUT_CSV_FILE = os.environ.get("OUTPUT_CSV_FILE", "/tmp/ad_users_rs_corp_export_{}.csv".format(datetime.now().strftime("%Y-%m-%d_%H-%M-%S")))
OUTPUT_ARCHIVE = os.environ.get("OUTPUT_ARCHIVE", "/tmp/ad_users_rs_corp_export.7z")
# Учетные данные: можно задать LDAP_USER и LDAP_PASS в окружении, иначе будет prompt
LDAP_USER = os.environ.get("LDAP_USER", f"Adm_Baturin-GA@{DOMAIN_NAME}")
LDAP_PASS = os.environ.get("LDAP_PASS", "")
# Пароль для архива в переменной AD_EXPORT_PASS
ARCHIVE_PASS_ENV = "AD_EXPORT_PASS"
# Атрибуты, которые нужно получить
ATTRIBUTES_TO_EXPORT = [
'cn', 'sAMAccountName',
'streetAddress', 'telephoneNumber', 'homePhone', 'otherHomePhone',
'userAccountControl', 'distinguishedName'
]
# page size для paged search
PAGED_SIZE = 1000
# ---------- /CONFIG ----------
# LDAP фильтр: объекты пользователей И НЕ ЗАБЛОКИРОВАННЫЕ
LDAP_FILTER = "(&(objectClass=user)(objectCategory=person)(!(userAccountControl:1.2.840.113556.1.4.803:=2)))"
def prompt_for_password_if_needed():
global LDAP_PASS
if not LDAP_PASS:
try:
LDAP_PASS = getpass.getpass(f"Введите пароль для {LDAP_USER}: ")
except Exception:
LDAP_PASS = ""
if not LDAP_PASS:
print("Ошибка: пароль не задан.", file=sys.stderr)
sys.exit(2)
def connect_ldap(server_uri, user, password):
"""
Подключение к LDAP/AD. Поддерживает ldaps:// URI (SSL).
"""
use_ssl = server_uri.lower().startswith("ldaps://")
# Если указана схема ldap://, ldap3 требует убрать префикс при создании Server
server_host = server_uri.split("://",1)[-1]
try:
server = Server(server_host, get_info=ALL, use_ssl=use_ssl)
# используем simple bind с переданными учетными данными
conn = Connection(server, user=user, password=password, auto_bind=True)
return conn
except LDAPBindError as e:
raise
except LDAPSocketOpenError as e:
raise
except LDAPException as e:
raise
def fetch_users_paged(conn, base_dn, search_filter, attributes, paged_size):
"""
Возвращает генератор словарей атрибутов пользователей, используя paged_search.
"""
try:
entries = conn.extend.standard.paged_search(
search_base=base_dn,
search_filter=search_filter,
search_scope=SUBTREE,
attributes=attributes,
paged_size=paged_size,
generator=True
)
for entry in entries:
if entry.get('type') != 'searchResEntry':
continue
attrs = entry.get('attributes', {})
yield attrs
except LDAPException as e:
raise
def is_disabled_by_uac(uac_value):
"""
Проверяет userAccountControl на бит ACCOUNTDISABLE (2).
uac_value может быть int или строкой или списком.
"""
try:
if isinstance(uac_value, (list, tuple)):
uac_value = uac_value[0] if uac_value else 0
val = int(uac_value) if uac_value not in (None, "") else 0
return bool(val & 2)
except Exception:
return False
def write_csv_header(path, fieldnames):
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, 'w', newline='', encoding='utf-8-sig') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames, delimiter=';', quoting=csv.QUOTE_MINIMAL)
writer.writeheader()
def append_csv(path, fieldnames, rowdict):
with open(path, 'a', newline='', encoding='utf-8-sig') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames, delimiter=';', quoting=csv.QUOTE_MINIMAL)
writer.writerow(rowdict)
def create_7z_archive(csv_path, archive_path, password_env):
passwd = os.environ.get(password_env, "")
if not shutil.which("7z"):
print("Ошибка: 7z не найден в PATH. Установите p7zip-full.", file=sys.stderr)
return False
if passwd:
cmd = ['7z','a','-t7z', f'-p{passwd}', '-mhe=on', archive_path, csv_path]
else:
cmd = ['7z','a', archive_path, csv_path]
res = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if res.returncode != 0:
print("7z failed:", res.stderr, file=sys.stderr)
return False
return True
def main():
print("="*70)
print(" EXPORT AD USERS (rs.corp) TO CSV + 7z")
print("="*70)
print(f"LDAP server: {SAMBA_DC_SERVER}")
print(f"Base OU: {BASE_OU}")
print(f"CSV output: {OUTPUT_CSV_FILE}")
print(f"Archive out: {OUTPUT_ARCHIVE}")
print(f"LDAP user: {LDAP_USER}")
print("="*70)
prompt_for_password_if_needed()
try:
print("\nПодключение к LDAP...")
conn = connect_ldap(SAMBA_DC_SERVER, LDAP_USER, LDAP_PASS)
print("Подключено.")
except LDAPBindError as e:
print(f"\nОшибка аутентификации: {e}", file=sys.stderr)
print("Проверьте LDAP_USER/LDAP_PASS и права.", file=sys.stderr)
sys.exit(3)
except LDAPSocketOpenError as e:
print(f"\nОшибка подключения к серверу: {e}", file=sys.stderr)
sys.exit(4)
except Exception as e:
print(f"\nОшибка LDAP: {e}", file=sys.stderr)
sys.exit(5)
# Пишем заголовок CSV
fieldnames = ['Domain','Name','SamAccountName','StreetAddress','telephoneNumber','HomePhone','otherHomePhone','distinguishedName']
write_csv_header(OUTPUT_CSV_FILE, fieldnames)
total = 0
skipped_disabled = 0
print("\nВыполняем поиск пользователей (paged)...")
try:
for attrs in fetch_users_paged(conn, BASE_OU, LDAP_FILTER, ATTRIBUTES_TO_EXPORT, PAGED_SIZE):
# attrs - dict: ключи как атрибуты
uac = attrs.get('userAccountControl', attrs.get('useraccountcontrol', 0))
if is_disabled_by_uac(uac):
skipped_disabled += 1
continue
name = attrs.get('cn','')
sam = attrs.get('sAMAccountName','')
street = attrs.get('streetAddress','') or ''
tel = attrs.get('telephoneNumber','') or ''
home = attrs.get('homePhone','') or ''
other = attrs.get('otherHomePhone','') or ''
dn = attrs.get('distinguishedName','') or ''
row = {
'Domain': 'rs.corp',
'Name': name,
'SamAccountName': sam,
'StreetAddress': street,
'telephoneNumber': tel,
'HomePhone': home,
'otherHomePhone': other,
'distinguishedName': dn
}
append_csv(OUTPUT_CSV_FILE, fieldnames, row)
total += 1
except Exception as e:
print(f"Ошибка при поиске/чтении записей: {e}", file=sys.stderr)
conn.unbind()
sys.exit(6)
conn.unbind()
print(f"\nЭкспорт завершён: {total} пользователей записано, {skipped_disabled} пропущено (disabled).")
print(f"CSV: {OUTPUT_CSV_FILE}")
# Создаём архив 7z
print("\nСоздаём запароленный архив (7z)...")
try:
ok = create_7z_archive(OUTPUT_CSV_FILE, OUTPUT_ARCHIVE, ARCHIVE_PASS_ENV)
if ok:
print(f"Архив создан: {OUTPUT_ARCHIVE}")
else:
print("Не удалось создать архив.", file=sys.stderr)
sys.exit(7)
except Exception as e:
print(f"Ошибка при архивации: {e}", file=sys.stderr)
sys.exit(8)
if __name__ == "__main__":
main()