Загрузка данных
#!/usr/bin/env python3
import argparse
import base64
import os
import hvac
from dotenv import load_dotenv
from pathlib import Path
from jinja2 import Environment, FileSystemLoader
# Mount point of the KV v2 secrets engine; read_secret passes it as
# ``mount_point`` on every read.
VAULT_MOUNT = "it-infradev"
# ============================================================
# LOGGING
# ============================================================
def info(msg: str):
    """Print *msg* to stdout on one line, prefixed with ``[OK] ``."""
    print("[OK] {}".format(msg))
# ============================================================
# VAULT LOGIN
# ============================================================
def vault_login():
    """Create an hvac client and authenticate it through the LDAP auth method.

    The Vault address and the LDAP credentials are taken from the
    ``VAULT_ADDR``, ``VAULT_USERNAME`` and ``VAULT_PASSWORD`` environment
    variables.

    Returns:
        The ``hvac.Client`` with its token set from the login response.

    Raises:
        KeyError: If one of the three environment variables is not set.
        RuntimeError: If the client does not report itself as authenticated
            after the login call.
    """
    environ = os.environ
    vault = hvac.Client(url=environ["VAULT_ADDR"])

    auth_response = vault.auth.ldap.login(
        username=environ["VAULT_USERNAME"],
        password=environ["VAULT_PASSWORD"],
    )
    # Use the token issued by the LDAP login for all subsequent requests.
    vault.token = auth_response["auth"]["client_token"]

    if vault.is_authenticated():
        return vault
    raise RuntimeError("Vault authentication failed")
# ============================================================
# VAULT READ
# ============================================================
def read_secret(client, path: str):
    """Read one secret from the KV v2 engine and return its payload.

    Args:
        client: Object exposing ``secrets.kv.v2.read_secret_version``
            (in this file, the client returned by ``vault_login``).
        path: Secret path below the ``VAULT_MOUNT`` mount point.

    Returns:
        Whatever the response holds under ``["data"]["data"]``.
    """
    kv_v2 = client.secrets.kv.v2
    envelope = kv_v2.read_secret_version(path=path, mount_point=VAULT_MOUNT)
    # The payload sits two levels deep in the response: data -> data.
    return envelope["data"]["data"]
# ============================================================
# WRITE HELPERS
# ============================================================
def write_text(path: Path, content: str):
    """Write *content* to *path* as UTF-8 text, creating parent directories.

    Args:
        path: Destination file; an existing file is overwritten.
        content: Text to write. Must not be ``None``.

    Raises:
        ValueError: If *content* is ``None``. Nothing is created on disk in
            that case.
    """
    # Validate first so a rejected call does not leave an empty directory
    # tree behind.
    if content is None:
        raise ValueError(f"Empty text for {path}")
    path.parent.mkdir(parents=True, exist_ok=True)
    # Explicit encoding: the bytes on disk must not depend on the locale of
    # the machine running the script.
    path.write_text(content, encoding="utf-8")
def write_binary(path: Path, content: bytes):
    """Write *content* to *path* as raw bytes, creating parent directories.

    Args:
        path: Destination file; an existing file is overwritten.
        content: Bytes to write. Must not be ``None``.

    Raises:
        ValueError: If *content* is ``None``. Nothing is created on disk in
            that case.
    """
    # Validate first so a rejected call does not leave an empty directory
    # tree behind.
    if content is None:
        raise ValueError(f"Empty binary for {path}")
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_bytes(content)
# ============================================================
# WEBSERVER BUILDER
# ============================================================
def prepare_webserver(build_dir: Path, webserver: dict):
    """Write the web server files into ``<build_dir>/webserver``.

    For each of the keys ``tls.crt`` and ``tls.key`` that holds a truthy
    value, the value is written as text under the same file name. A truthy
    ``infradev.keytab`` value is base64-decoded and written as bytes. The
    template ``mailarchive.conf.j2`` from ``apache-ma/conf/sites-enabled``
    (a path relative to the current working directory) is then rendered with
    every entry of *webserver* passed as a keyword argument, and the result
    is stored as ``vhost.conf``.

    Args:
        build_dir: Root output directory.
        webserver: Mapping of secret keys to values.
    """
    out_dir = build_dir / "webserver"
    out_dir.mkdir(parents=True, exist_ok=True)

    # Certificate and private key: both optional, skipped when absent/empty.
    for pem_name in ("tls.crt", "tls.key"):
        pem_body = webserver.get(pem_name)
        if pem_body:
            write_text(out_dir / pem_name, pem_body)

    # Keytab: stored base64-encoded in the secret, written as raw bytes.
    encoded_keytab = webserver.get("infradev.keytab")
    if encoded_keytab:
        write_binary(out_dir / "infradev.keytab", base64.b64decode(encoded_keytab))

    # Virtual host configuration rendered from the Jinja2 template.
    loader = FileSystemLoader(str(Path("apache-ma/conf/sites-enabled")))
    vhost_template = Environment(loader=loader, autoescape=False).get_template(
        "mailarchive.conf.j2"
    )
    write_text(out_dir / "vhost.conf", vhost_template.render(**webserver))

    info("WEB server configured")
# ============================================================
# MAIN
# ============================================================
def main():
    """Read the mailarchive secrets for one environment and write them to ``build/``.

    Steps, in order:
      1. Call ``load_dotenv()`` and parse the required ``--env`` option
         (``dev``, ``stage`` or ``prod``).
      2. Log in to Vault and read the ``django``, ``webserver``,
         ``k5-ma-client`` and ``db`` secrets below
         ``projects/mailarchive/<env>``.
      3. Write the Django ``.env``, the Kerberos principal and keytab, and
         ``pg_service.conf``, then hand the web server secret to
         ``prepare_webserver``.

    Each secret is validated immediately before it is written, so a failure
    in a later section leaves the files of the earlier sections on disk.

    Raises:
        RuntimeError: If a required key is missing or empty in the django,
            k5 or db secret.
    """
    load_dotenv()

    cli = argparse.ArgumentParser()
    cli.add_argument("--env", required=True, choices=["dev", "stage", "prod"])
    env = cli.parse_args().env

    client = vault_login()
    base = f"projects/mailarchive/{env}"

    # ---- read all secrets up front, in a fixed order ----
    django, webserver, k5, db = [
        read_secret(client, f"{base}/{name}")
        for name in ("django", "webserver", "k5-ma-client", "db")
    ]

    out_root = Path("build")

    # ---- Django ----
    dotenv_body = django.get(".env")
    if not dotenv_body:
        raise RuntimeError("Missing django .env")
    write_text(out_root / "django-ma/src" / ".env", dotenv_body)
    info("DJANGO env applied")

    # ---- Kerberos ----
    principal = k5.get("KRB_PRINCIPAL")
    encoded_keytab = k5.get("mailarchive.keytab")
    if not (principal and encoded_keytab):
        raise RuntimeError("Invalid K5 secret")
    # NOTE(review): the principal goes to kerberos-ma/ while the keytab goes
    # to k5-ma-client/ -- confirm that two different directories are intended.
    write_text(out_root / "kerberos-ma" / "KRB_PRINCIPAL", principal)
    write_binary(
        out_root / "k5-ma-client" / "mailarchive.keytab",
        base64.b64decode(encoded_keytab),
    )
    info("K5 kerberos applied")

    # ---- Database ----
    pg_service = db.get("pg_service.conf")
    if not pg_service:
        raise RuntimeError("Missing db pg_service.conf")
    write_text(out_root / "db" / "pg_service.conf", pg_service)
    info("DB config applied")

    # ---- Web server ----
    prepare_webserver(out_root, webserver)

    # ---- Summary ----
    info("ALL SECRETS MATERIALIZED SUCCESSFULLY")
    info(f"ENV: {env}")
    info(f"OUTPUT: {out_root.resolve()}")
# Script entry point: run main() only when the file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()