# Загрузка данных (data loading)
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import base64
import json
import os
import shutil
import tarfile
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
# Child ETL operations processed when no --operation flag is given.
DEFAULT_OPERATIONS = (
    "etl_inbox",
    "etl_ttr",
    "etl_ttcom",
)
# Artifact path, inside a child build, of that child's decision/event log.
DEFAULT_DECISION_LOG_ARTIFACT = ".apa_etl_work/state/etl_decisions_events.jsonl"
# Artifact path of the TTCOM TaskTracker action trace (only collected for etl_ttcom).
DEFAULT_TTCOM_TASKTRACKER_ACTION_LOG_ARTIFACT = (
    ".apa_etl_work/state/ttcom_tasktracker_actions.jsonl"
)
def utc_now() -> str:
    """Return the current UTC time as ISO-8601 with millisecond precision and a ``Z`` suffix."""
    stamp = datetime.now(timezone.utc).isoformat(timespec="milliseconds")
    # isoformat() renders UTC as "+00:00"; the logs use the compact "Z" designator instead.
    return stamp.replace("+00:00", "Z")
def parse_mapping(value: str) -> tuple[str, str]:
    """Split a ``NAME=VALUE`` command-line argument into ``(name, value)``.

    Only the first ``=`` separates the two parts. The name is whitespace-stripped; the value is
    returned verbatim (it may be empty or contain further ``=`` characters).

    Raises:
        argparse.ArgumentTypeError: if *value* has no ``=`` or the stripped name is empty.
    """
    head, separator, tail = value.partition("=")
    if separator == "":
        raise argparse.ArgumentTypeError(f"expected NAME=VALUE, got: {value}")
    key = head.strip()
    if key == "":
        raise argparse.ArgumentTypeError(f"mapping name is empty: {value}")
    return key, tail
def mappings(values: list[str] | None) -> dict[str, str]:
    """Turn repeated ``NAME=VALUE`` arguments into a dict; later duplicates of a name win.

    ``None`` or an empty list yields ``{}``. Malformed entries raise the error from
    ``parse_mapping``.
    """
    pairs = (parse_mapping(raw) for raw in (values or []))
    return dict(pairs)
def load_json(path: Path) -> dict[str, Any]:
    """Best-effort load of a JSON object from *path*.

    Returns ``{}`` when the file cannot be read or parsed, and also when the top-level JSON
    value is not an object. No exception escapes.
    """
    try:
        text = path.read_text(encoding="utf-8")
        parsed = json.loads(text)
    except Exception:
        return {}
    if isinstance(parsed, dict):
        return parsed
    return {}
def child_builds(values: list[str] | None) -> dict[str, dict[str, Any]]:
    """Load child-build metadata given as ``OPERATION=path.json`` arguments.

    Returns ``{operation: metadata}``. Unreadable files and non-object JSON map to ``{}`` (see
    ``load_json``). A later entry for the same operation replaces an earlier one.
    """
    builds: dict[str, dict[str, Any]] = {}
    for spec in values or ():
        name, json_path = parse_mapping(spec)
        builds[name] = load_json(Path(json_path))
    return builds
def artifact_url(child: dict[str, Any], artifact_path: str) -> str:
    """Build the URL of *artifact_path* under the child build's ``absolute_url``.

    Returns ``""`` when the child has no (non-blank) ``absolute_url``. Each path segment is
    percent-encoded separately, so the ``/`` separators are preserved.
    """
    build_url = str(child.get("absolute_url") or "").strip()
    if build_url == "":
        return ""
    segments = artifact_path.strip("/").split("/")
    encoded = "/".join(map(urllib.parse.quote, segments))
    return build_url.rstrip("/") + "/artifact/" + encoded
def auth_headers() -> dict[str, str]:
    """Return the ``Authorization`` header used for child-artifact requests, or ``{}``.

    Precedence:
    1. ``APA_ETL_CHILD_ARTIFACT_AUTH_HEADER`` (whitespace-stripped), used verbatim.
    2. HTTP Basic built from ``JENKINS_USER`` and ``JENKINS_TOKEN``; ``JENKINS_PASSWORD`` is
       used when the token is unset or empty.
    """
    explicit = os.environ.get("APA_ETL_CHILD_ARTIFACT_AUTH_HEADER", "").strip()
    if explicit:
        return {"Authorization": explicit}
    env = os.environ
    user = env.get("JENKINS_USER", "")
    secret = env.get("JENKINS_TOKEN", "") or env.get("JENKINS_PASSWORD", "")
    if not (user and secret):
        return {}
    credentials = base64.b64encode(f"{user}:{secret}".encode("utf-8")).decode("ascii")
    return {"Authorization": "Basic " + credentials}
def read_http_url(url: str, timeout: float) -> tuple[bool, str, str]:
    """Download *url* and return ``(ok, text, error)``.

    The request carries the headers from ``auth_headers()``. The body is decoded as UTF-8 with
    invalid bytes replaced. Errors raised while opening or reading the response are returned,
    not raised: an HTTP error status becomes ``"HTTP <code>"``, and any other exception becomes
    ``"<ExceptionType>: <message>"``.
    """
    req = urllib.request.Request(url, headers=auth_headers())
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            raw_body = resp.read()
    except urllib.error.HTTPError as http_error:
        # HTTPError subclasses URLError/OSError, so it must be matched before the generic handler.
        return False, "", f"HTTP {http_error.code}"
    except Exception as failure:
        return False, "", f"{type(failure).__name__}: {failure}"
    else:
        return True, raw_body.decode("utf-8", errors="replace"), ""
def read_local_file(path_text: str) -> tuple[bool, str, str]:
    """Read a local text file and return ``(ok, text, error)``.

    A path that is not an existing regular file yields ``"file not found"``. Read errors are
    reported as ``"<ExceptionType>: <message>"``. Undecodable UTF-8 bytes are replaced.
    """
    candidate = Path(path_text)
    if candidate.is_file():
        try:
            text = candidate.read_text(encoding="utf-8", errors="replace")
        except Exception as failure:
            return False, "", f"{type(failure).__name__}: {failure}"
        return True, text, ""
    return False, "", "file not found"
def read_source(source: str, timeout: float) -> tuple[bool, str, str]:
    """Read *source* and return ``(ok, text, error)``.

    ``http``/``https`` URLs are downloaded with *timeout*. ``file:`` URLs are converted to a
    local path. Anything else is treated as a plain filesystem path. An empty source yields
    ``"source not available"``.
    """
    if not source:
        return False, "", "source not available"
    parsed = urllib.parse.urlparse(source)
    scheme = parsed.scheme
    if scheme == "file":
        return read_local_file(urllib.request.url2pathname(parsed.path))
    if scheme in ("http", "https"):
        return read_http_url(source, timeout)
    return read_local_file(source)
def source_candidates(local_source: str, remote_source: str) -> list[str]:
    """Return the distinct non-empty sources in probe order (local first, then remote).

    When both are empty the result is ``[""]``, so callers still make one (failing) attempt.
    """
    # dict.fromkeys de-duplicates while keeping first-seen order.
    distinct = [candidate for candidate in dict.fromkeys((local_source, remote_source)) if candidate]
    return distinct or [""]
def chain_context(args: argparse.Namespace, operation: str, child: dict[str, Any], source: str) -> dict[str, Any]:
    """Describe where a log entry came from: parent job, operation, child build and source.

    Child-build fields are stringified, and a falsy value becomes ``""``. The exception is
    ``duration``, which is passed through unchanged (``""`` only when the key is absent).
    """

    def child_text(key: str) -> str:
        return str(child.get(key) or "")

    parent_info = {
        "job_name": args.parent_job_name,
        "job_invoke_id": args.parent_job_invoke_id,
        "job_instance_url": args.parent_job_url,
    }
    build_info = {
        "job_name": child_text("job_name"),
        "build_number": child_text("build_number"),
        "absolute_url": child_text("absolute_url"),
        "result": child_text("result"),
        "duration": child.get("duration", ""),
    }
    return {
        "chain_mode": args.chain_mode,
        "parent": parent_info,
        "operation": operation,
        "child_build": build_info,
        "source_artifact": source,
    }
def write_json(path: Path, data: Any) -> None:
    """Write *data* to *path* as pretty-printed (2-space, key-sorted) UTF-8 JSON plus a final newline."""
    rendered = json.dumps(data, sort_keys=True, indent=2, ensure_ascii=False)
    path.write_text(f"{rendered}\n", encoding="utf-8")
def status_event(
    args: argparse.Namespace,
    operation: str,
    child: dict[str, Any],
    source: str,
    status: str,
    reason: str,
    log_kind: str = "decision_events",
) -> dict[str, Any]:
    """Build the ``chain.child_action_log.collect`` event that records one log fetch.

    *status* is reported both as the event ``outcome`` and as ``request.status``. The request
    key is ``"<operation>:<log_kind>"``.
    """
    request_info = {
        "request_kind": "child_action_log",
        "request_key": f"{operation}:{log_kind}",
        "status": status,
    }
    event: dict[str, Any] = {"schema": 1, "event_time_utc": utc_now(), "etl_job": "etl_all"}
    event["event_type"] = "chain.child_action_log.collect"
    event["decision"] = "collect_child_action_log"
    event["reason"] = reason
    event["outcome"] = status
    event["request"] = request_info
    event["details"] = {"log_kind": log_kind}
    event["chain"] = chain_context(args, operation, child, source)
    return event
def _invalid_line_entry(
    args: argparse.Namespace,
    operation: str,
    child: dict[str, Any],
    source: str,
    line_number: int,
    line: str,
    reason: str,
) -> dict[str, Any]:
    """Build the failure event that replaces a child log line which is not a JSON object."""
    return {
        "schema": 1,
        "event_time_utc": utc_now(),
        "etl_job": "etl_all",
        "event_type": "chain.child_action_log.invalid_json",
        "decision": "read_child_action_log_line",
        "reason": reason,
        "outcome": "failed",
        "request": {
            "request_kind": "child_action_log",
            "request_key": operation,
            "status": "invalid_json",
        },
        "chain": chain_context(args, operation, child, source),
        "source_line_number": line_number,
        # Keep the offending text so the bad line can be inspected from the combined log.
        "raw_line": line,
    }


def combined_entry(
    args: argparse.Namespace,
    operation: str,
    child: dict[str, Any],
    source: str,
    line_number: int,
    line: str,
) -> dict[str, Any]:
    """Convert one line of a child log into an entry for the combined chain log.

    A line holding a JSON object is returned as a shallow copy with two changes:
    - ``chain`` becomes the line's own ``chain`` object (if any), updated with
      ``chain_context(...)``, whose keys win on conflict;
    - ``source_line_number`` is set.

    Otherwise, a ``chain.child_action_log.invalid_json`` failure event is returned. Its reason
    is ``"invalid_json: <parser message>"`` for unparsable text, or
    ``"json_line_is_not_object"`` for valid JSON that is not an object.
    """
    try:
        data = json.loads(line)
    except json.JSONDecodeError as exc:
        return _invalid_line_entry(args, operation, child, source, line_number, line, f"invalid_json: {exc.msg}")
    if not isinstance(data, dict):
        return _invalid_line_entry(args, operation, child, source, line_number, line, "json_line_is_not_object")
    result = dict(data)
    existing_chain = result.get("chain")
    chain = dict(existing_chain) if isinstance(existing_chain, dict) else {}
    chain.update(chain_context(args, operation, child, source))
    result["chain"] = chain
    result["source_line_number"] = line_number
    return result
def write_jsonl_entry(handle: Any, entry: dict[str, Any]) -> None:
    """Append *entry* to the text stream *handle* as one compact, key-sorted JSON line."""
    serialized = json.dumps(entry, sort_keys=True, ensure_ascii=False, separators=(",", ":"))
    handle.write(serialized)
    handle.write("\n")
def create_tarball(output_dir: Path, tar_gz: Path) -> None:
    """Pack everything under *output_dir* into the gzip-compressed tarball *tar_gz*.

    Members are stored as ``<output_dir.name>/<relative path>`` in sorted path order, each
    exactly once. The parent directory of *tar_gz* is created if needed, and an existing file
    or symlink at *tar_gz* is removed first.
    """
    tar_gz.parent.mkdir(parents=True, exist_ok=True)
    if tar_gz.exists() or tar_gz.is_symlink():
        tar_gz.unlink()
    prefix = Path(output_dir.name)
    with tarfile.open(tar_gz, "w:gz") as tar:
        for path in sorted(output_dir.rglob("*")):
            # rglob already yields every nested file. recursive=False stops tar.add from also
            # descending into directories, which would archive their contents a second time.
            tar.add(path, arcname=str(prefix / path.relative_to(output_dir)), recursive=False)
def _read_first_available(
    sources: list[str],
    timeout: float,
) -> tuple[bool, str, str, list[dict[str, str]]]:
    """Try *sources* in order until one reads successfully.

    Returns ``(ok, content, source, attempts)``. *source* is the successful candidate, or the
    last one tried (``""`` if *sources* is empty). *attempts* records every probe in order.
    """
    source = sources[-1] if sources else ""
    attempts: list[dict[str, str]] = []
    for candidate in sources:
        source = candidate
        ok, content, error = read_source(candidate, timeout)
        attempts.append(
            {
                "source_artifact": candidate,
                "status": "collected" if ok else "missing",
                "reason": "child action log collected" if ok else error,
            }
        )
        if ok:
            return True, content, source, attempts
    return False, "", source, attempts


def _jsonl_lines(content: str) -> list[tuple[int, str]]:
    """Return ``(line_number, line)`` for every non-blank line of JSON Lines *content*.

    Records are separated by the newline character only; a trailing carriage return (CRLF
    input) is dropped. ``str.splitlines()`` must not be used here: it also breaks on U+2028,
    U+2029 and U+0085, and ``json.dumps(..., ensure_ascii=False)`` writes those characters
    unescaped inside string values. Line numbers count blank lines, so they match the source.
    """
    numbered = (
        (number, raw.rstrip("\r"))
        for number, raw in enumerate(content.split("\n"), start=1)
    )
    return [(number, line) for number, line in numbered if line.strip()]


def collect_log_artifact(
    args: argparse.Namespace,
    combined: Any,
    output_dir: Path,
    manifest: dict[str, Any],
    *,
    operation: str,
    child: dict[str, Any],
    sources: list[str],
    output_file_name: str,
    log_kind: str,
    emit_missing: bool,
) -> None:
    """Fetch one child log and record it in the combined log, *output_dir* and *manifest*.

    The function does the following:
    - probes *sources* in order;
    - writes a ``collect`` status event to *combined*;
    - on success, saves the raw log as ``output_dir/output_file_name`` and appends each
      non-blank line to *combined* via ``combined_entry``;
    - on failure with *emit_missing*, writes ``<operation>.<log_kind>.fetch_error.json``;
    - finally, appends one item to ``manifest["artifacts"]``.

    Nothing happens when every source is empty and *emit_missing* is false.
    """
    if not any(sources) and not emit_missing:
        return
    ok, content, source, source_attempts = _read_first_available(sources, args.http_timeout)
    raw_output = output_dir / output_file_name
    fetch_status = "collected" if ok else "missing"
    if ok:
        reason = "child action log collected"
    else:
        reason = "; ".join(
            f"{attempt['source_artifact'] or '(empty source)'}: {attempt['reason']}"
            for attempt in source_attempts
        )
    write_jsonl_entry(combined, status_event(args, operation, child, source, fetch_status, reason, log_kind))
    manifest_item: dict[str, Any] = {
        "operation": operation,
        "log_kind": log_kind,
        "status": fetch_status,
        "reason": reason,
        "source_artifact": source,
        "source_attempts": source_attempts,
        # Manifest paths are relative to output_dir's parent, i.e. they include output_dir's name.
        "raw_output": str(raw_output.relative_to(output_dir.parent)) if ok else "",
        "child_build": child,
    }
    if ok:
        raw_output.write_text(content, encoding="utf-8")
        line_count = 0
        for line_number, line in _jsonl_lines(content):
            line_count += 1
            write_jsonl_entry(
                combined,
                combined_entry(args, operation, child, source, line_number, line),
            )
        manifest_item["line_count"] = line_count
    elif emit_missing:
        error_path = output_dir / f"{operation}.{log_kind}.fetch_error.json"
        write_json(
            error_path,
            {
                "schema": 1,
                "kind": "etl_child_action_log_fetch_error",
                "generated_at_utc": utc_now(),
                "operation": operation,
                "log_kind": log_kind,
                "source_artifact": source,
                "source_attempts": source_attempts,
                "reason": reason,
                "child_build": child,
            },
        )
        manifest_item["error_output"] = str(error_path.relative_to(output_dir.parent))
    manifest["artifacts"].append(manifest_item)
def collect(args: argparse.Namespace) -> int:
    """Gather child action logs into one combined JSONL file plus an artifact directory.

    Steps:
    - recreate ``args.output_dir`` from scratch;
    - write the combined log to ``args.output_jsonl``;
    - describe every fetched (or missing) artifact in ``<output_dir>/manifest.json``;
    - tar the directory when ``args.tar_gz`` is set.

    Each operation gets its decision/event log. ``etl_ttcom`` additionally gets its TaskTracker
    action trace, but only when a local path or artifact path was supplied for it. A log that
    cannot be read is recorded in the manifest rather than failing the run. Returns 0.
    """
    output_dir = Path(args.output_dir)
    if output_dir.exists():
        shutil.rmtree(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    output_jsonl = Path(args.output_jsonl)
    output_jsonl.parent.mkdir(parents=True, exist_ok=True)

    children = child_builds(args.child_build)
    local_logs = mappings(args.local_log)
    artifact_paths = mappings(args.artifact_path)
    local_tt_logs = mappings(args.local_tasktracker_action_log)
    tt_artifact_paths = mappings(args.tasktracker_action_artifact_path)
    operations = list(args.operation or DEFAULT_OPERATIONS)

    parent_info = {
        "job_name": args.parent_job_name,
        "job_invoke_id": args.parent_job_invoke_id,
        "job_instance_url": args.parent_job_url,
    }
    manifest: dict[str, Any] = {
        "schema": 1,
        "kind": "etl_chain_action_log_manifest",
        "generated_at_utc": utc_now(),
        "chain_mode": args.chain_mode,
        "parent": parent_info,
        "artifacts": [],
    }

    with output_jsonl.open("w", encoding="utf-8") as combined:
        for operation in operations:
            child = children.get(operation, {})
            decision_artifact = artifact_paths.get(operation, DEFAULT_DECISION_LOG_ARTIFACT)
            decision_sources = source_candidates(
                local_logs.get(operation, ""),
                artifact_url(child, decision_artifact),
            )
            collect_log_artifact(
                args,
                combined,
                output_dir,
                manifest,
                operation=operation,
                child=child,
                sources=decision_sources,
                output_file_name=f"{operation}.etl_decisions_events.jsonl",
                log_kind="decision_events",
                emit_missing=True,
            )

            tasktracker_requested = operation in local_tt_logs or operation in tt_artifact_paths
            if operation != "etl_ttcom" or not tasktracker_requested:
                continue
            tt_artifact = tt_artifact_paths.get(operation, DEFAULT_TTCOM_TASKTRACKER_ACTION_LOG_ARTIFACT)
            tt_sources = source_candidates(
                local_tt_logs.get(operation, ""),
                artifact_url(child, tt_artifact),
            )
            collect_log_artifact(
                args,
                combined,
                output_dir,
                manifest,
                operation=operation,
                child=child,
                sources=tt_sources,
                output_file_name="etl_ttcom.ttcom_tasktracker_actions.jsonl",
                log_kind="ttcom_tasktracker_actions",
                emit_missing=True,
            )

    write_json(output_dir / "manifest.json", manifest)
    if args.tar_gz:
        create_tarball(output_dir, Path(args.tar_gz))
    return 0
def main() -> int:
    """Parse the command line, run ``collect`` and return its exit status."""
    cli = argparse.ArgumentParser(description="Collect child ETL decision/event logs into a chain action log.")
    add = cli.add_argument
    add("--chain-mode", choices=("remote", "local"), required=True)
    add("--output-dir", required=True)
    add("--output-jsonl", required=True)
    # Optional string flags that default to "" (tarball path and parent-job identity).
    for flag in ("--tar-gz", "--parent-job-name", "--parent-job-invoke-id", "--parent-job-url"):
        add(flag, default="")
    add("--operation", action="append", default=[])
    add("--child-build", action="append", default=[], help="Operation child metadata as OPERATION=path.json")
    add("--local-log", action="append", default=[], help="Local action log source as OPERATION=path")
    add("--artifact-path", action="append", default=[], help="Child artifact path override as OPERATION=path")
    add(
        "--local-tasktracker-action-log",
        action="append",
        default=[],
        help="Local TTCOM TaskTracker HTTP/action trace source as OPERATION=path",
    )
    add(
        "--tasktracker-action-artifact-path",
        action="append",
        default=[],
        help="Child TTCOM TaskTracker HTTP/action trace artifact path as OPERATION=path",
    )
    add("--http-timeout", type=float, default=10.0)
    parsed_args = cli.parse_args()
    return collect(parsed_args)
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    exit_code = main()
    raise SystemExit(exit_code)