Загрузка данных


#!/usr/bin/env python3

import argparse
import copy
import json
import re
import sys
from pathlib import Path


# Keys that trim_schema copies through (after trimming their values).
# Grouped by purpose; the grouping is cosmetic, a set has no order.
KEEP_SCHEMA_KEYS = {
    # references and composition
    "$ref", "allOf", "anyOf", "oneOf",
    # value typing
    "type", "format", "enum", "nullable",
    # object / array structure
    "properties", "additionalProperties", "required", "items",
    # parameter and media-type wrappers
    "name", "in", "schema", "content",
}
# Path-item keys that are treated as operations by trim_path_item.
HTTP_METHODS = {
    "get",
    "put",
    "post",
    "delete",
    "patch",
    "options",
    "head",
    "trace",
}


def die(message):
    """Report a fatal error on stderr and stop the program.

    Args:
        message: Text appended to the ``[ERROR] `` prefix (must be a str).

    Raises:
        SystemExit: Always, with exit code 2.
    """
    line = "[ERROR] " + message
    print(line, file=sys.stderr)
    sys.exit(2)


def load_json(path):
    """Read *path* as UTF-8 encoded JSON and return its top-level object.

    Every input problem is reported through ``die()`` so the user gets a
    one-line ``[ERROR]`` message instead of a traceback. That now includes
    unreadable paths (directory, permissions, ...) and files that are not
    valid UTF-8, in addition to missing files and malformed JSON.

    Args:
        path: Location of the JSON document (str or path-like).

    Returns:
        The parsed document, guaranteed to be a dict.

    Raises:
        SystemExit: Via ``die()`` when the file is missing, unreadable,
            not UTF-8, not valid JSON, or its top-level value is not an
            object.
    """
    try:
        raw = Path(path).read_text(encoding="utf-8")
    except FileNotFoundError:
        die("input file not found: " + str(path))
    except UnicodeDecodeError as exc:
        die("input is not valid UTF-8: " + str(exc))
    except OSError as exc:
        # Covers IsADirectoryError, PermissionError and other I/O failures.
        die("cannot read input file: " + str(exc))

    try:
        value = json.loads(raw)
    except json.JSONDecodeError as exc:
        die("input is not valid JSON: " + str(exc))

    if not isinstance(value, dict):
        die("input OpenAPI schema must be a JSON object")
    return value


def write_json(path, value):
    """Write *value* to *path* as indented UTF-8 JSON with a trailing newline.

    Missing parent directories are created first. Non-ASCII characters are
    written as-is rather than as ``\\uXXXX`` escapes.

    Args:
        path: Destination file (str or path-like).
        value: JSON-serializable object to store.
    """
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(value, ensure_ascii=False, indent=2)
    target.write_text(payload + "\n", encoding="utf-8")


def compact_text(value, limit=360):
    """Collapse whitespace in *value* and cap its length.

    Args:
        value: Anything convertible with ``str()``; ``None`` is passed through.
        limit: Maximum number of characters kept before ``...`` is appended.

    Returns:
        ``None`` for ``None`` input. Otherwise the text with every whitespace
        run replaced by one space and the ends stripped; if that is longer
        than *limit*, its first *limit* characters followed by ``...``.
    """
    if value is None:
        return None
    flattened = re.sub(r"\s+", " ", str(value))
    flattened = flattened.strip()
    return flattened if len(flattened) <= limit else flattened[:limit] + "..."


def selected_path_reasons(path, path_item):
    """Explain why an OpenAPI path belongs in the extract.

    The path is matched case-insensitively against the unit read, create and
    update URL shapes. For a matching path the serialized path item is also
    scanned for assignee-related wording combined with "update"/"patch".

    The path item is serialized only after the path itself has matched:
    the text is never consulted for other paths, so dumping every path item
    of a large specification was wasted work.

    Args:
        path: Path template, e.g. ``/rest/api/unit/v1/{code}``.
        path_item: The path item object stored under *path*.

    Returns:
        A list of reason tags in the order ``unit_update_path``,
        ``unit_read_context``, ``unit_create_context``,
        ``assignment_update_text``; empty when the path is not selected.
    """
    low_path = path.lower()
    unit_read = low_path in {"/rest/api/unit/v1/{code}", "/rest/api/unit/v2/{code}"}
    unit_create = bool(re.fullmatch(r"/rest/api/unit/v\d+/\{[^/]+\}/create", low_path))
    unit_update = bool(re.fullmatch(r"/rest/api/unit/v\d+(?:/\{[^/]+\})?/update/\{[^/]+\}", low_path))

    reasons = []
    if unit_update:
        reasons.append("unit_update_path")
    if unit_read:
        reasons.append("unit_read_context")
    if unit_create:
        reasons.append("unit_create_context")
    if not reasons:
        # Not a unit path: the text check below cannot apply.
        return reasons

    text = json.dumps(path_item, ensure_ascii=False, sort_keys=True).lower()
    mentions_assignee = "assigned_to" in text or "assignee" in text or "исполн" in text
    mentions_update = "update" in text or "patch" in text
    if mentions_assignee and mentions_update:
        reasons.append("assignment_update_text")
    return reasons


def collect_refs(value):
    """Gather every local ``$ref`` target found anywhere inside *value*.

    Dicts and lists are walked to any depth. Only string ``$ref`` values
    that start with ``#/`` are collected.

    Args:
        value: Arbitrary JSON-like data.

    Returns:
        A set of reference strings (empty when none are found).
    """
    found = set()
    todo = [value]
    while todo:
        node = todo.pop()
        if isinstance(node, dict):
            target = node.get("$ref")
            if isinstance(target, str) and target.startswith("#/"):
                found.add(target)
            todo.extend(node.values())
        elif isinstance(node, list):
            todo.extend(node)
    return found


def pointer_parts(ref):
    """Split a ``#/...`` reference into its unescaped path segments.

    The first two characters of *ref* are dropped, the rest is split on
    ``/`` and each segment has ``~1`` turned into ``/`` and then ``~0``
    turned into ``~``.

    Args:
        ref: Reference string such as ``#/components/schemas/Pet``.

    Returns:
        A list with one string per segment.
    """
    tokens = []
    for raw in ref[2:].split("/"):
        # "~1" is decoded before "~0" so that "~01" yields "~1", not "/".
        tokens.append(raw.replace("~1", "/").replace("~0", "~"))
    return tokens


def get_pointer(spec, ref):
    """Look up the value that *ref* points to inside *spec*.

    Only dicts are traversed.

    Args:
        spec: Root document.
        ref: Local reference string, split with ``pointer_parts``.

    Returns:
        The referenced value, or ``None`` when a segment is missing or an
        intermediate value is not a dict.
    """
    node = spec
    for token in pointer_parts(ref):
        if isinstance(node, dict) and token in node:
            node = node[token]
        else:
            return None
    return node


def set_pointer(root, ref, value):
    """Store *value* inside *root* at the location described by *ref*.

    Intermediate dicts are created on demand; an existing entry at the
    final segment is overwritten.

    Args:
        root: Dict that is modified in place.
        ref: Local reference string, split with ``pointer_parts``.
        value: Object to place at the target location.
    """
    *parents, leaf = pointer_parts(ref)
    container = root
    for token in parents:
        container = container.setdefault(token, {})
    container[leaf] = value


def trim_schema(value):
    """Return a reduced copy of a schema-like structure.

    Lists are trimmed element by element and non-container values are
    returned unchanged. For dicts:

    * ``properties`` and ``content`` maps keep all their keys (these are
      property names / media types, not schema keywords) with trimmed values;
    * ``discriminator`` is copied verbatim;
    * keys listed in ``KEEP_SCHEMA_KEYS`` are kept with trimmed values;
    * ``description`` and ``title`` are kept in compacted form when the
      compacted text is non-empty;
    * every other key is dropped.

    Args:
        value: Schema, request body, response, or any nested part of one.

    Returns:
        The trimmed structure; the input is not modified.
    """
    if isinstance(value, list):
        return [trim_schema(item) for item in value]
    if not isinstance(value, dict):
        return value

    result = {}
    for key, child in value.items():
        if key in {"properties", "content"} and isinstance(child, dict):
            # The keys of these maps are caller-chosen names and must not be
            # filtered against the keyword whitelist.
            result[key] = {name: trim_schema(sub) for name, sub in child.items()}
        elif key == "discriminator":
            # The discriminator's own keys (propertyName, mapping) are not in
            # KEEP_SCHEMA_KEYS, so trimming it would always leave an empty
            # object. Copy it as-is instead.
            result[key] = copy.deepcopy(child)
        elif key in KEEP_SCHEMA_KEYS:
            result[key] = trim_schema(child)
        elif key in {"description", "title"}:
            text = compact_text(child)
            if text:
                result[key] = text
    return result


def trim_parameter(value):
    """Reduce a parameter object to the fields relevant for the extract.

    Only ``name``, ``in``, ``required``, ``schema``, ``$ref`` and
    ``description`` are kept, in that order. ``schema`` goes through
    ``trim_schema``, ``description`` through ``compact_text``, and the
    remaining fields are deep-copied.

    Args:
        value: Parameter object; anything that is not a dict is returned
            unchanged.

    Returns:
        A new dict with the retained fields, or *value* itself when it is
        not a dict.
    """
    if not isinstance(value, dict):
        return value

    trimmed = {}
    for key in ("name", "in", "required", "schema", "$ref", "description"):
        if key not in value:
            continue
        field = value[key]
        if key == "schema":
            trimmed[key] = trim_schema(field)
        elif key == "description":
            trimmed[key] = compact_text(field)
        else:
            trimmed[key] = copy.deepcopy(field)
    return trimmed


def trim_operation(operation):
    """Reduce an operation object to what the extract needs.

    Kept fields: ``operationId`` and ``tags`` (deep-copied), ``summary`` and
    ``description`` (compacted), ``parameters`` (each trimmed with
    ``trim_parameter``), ``requestBody`` (trimmed with ``trim_schema``) and
    ``responses``. Of the responses only codes starting with ``2`` and the
    codes ``default``, ``400`` and ``404`` survive.

    Args:
        operation: Operation object (dict).

    Returns:
        A new dict containing only the retained, trimmed fields.
    """
    trimmed = {}
    for key in ("operationId", "summary", "description", "tags"):
        if key not in operation:
            continue
        if key in {"summary", "description"}:
            trimmed[key] = compact_text(operation[key])
        else:
            trimmed[key] = copy.deepcopy(operation[key])

    if "parameters" in operation:
        # A null "parameters" value is treated as an empty list.
        params = []
        for item in operation.get("parameters") or []:
            params.append(trim_parameter(item))
        trimmed["parameters"] = params

    if "requestBody" in operation:
        trimmed["requestBody"] = trim_schema(operation["requestBody"])

    if "responses" in operation:
        wanted_codes = {"default", "400", "404"}
        trimmed["responses"] = {
            code: trim_schema(response)
            for code, response in (operation.get("responses") or {}).items()
            if str(code).startswith("2") or str(code) in wanted_codes
        }
    return trimmed


def trim_path_item(path_item):
    """Reduce a path item to its operations and shared parameters.

    Entries whose key is in ``HTTP_METHODS`` and whose value is a dict are
    trimmed with ``trim_operation``. A ``parameters`` entry is trimmed item
    by item with ``trim_parameter``. Everything else is dropped.

    Args:
        path_item: Path item object (dict).

    Returns:
        A new dict with the retained entries in their original order.
    """
    trimmed = {}
    for key, entry in path_item.items():
        if key in HTTP_METHODS and isinstance(entry, dict):
            trimmed[key] = trim_operation(entry)
            continue
        if key == "parameters":
            # A null "parameters" value is treated as an empty list.
            trimmed[key] = [trim_parameter(param) for param in entry or []]
    return trimmed


def build_extract(spec):
    """Build the minimal extract document from a full OpenAPI specification.

    Steps:
      1. Keep every path for which ``selected_path_reasons`` returns a
         non-empty list, trimmed with ``trim_path_item``.
      2. Starting from the refs found in those trimmed paths, resolve each
         local ``$ref`` against *spec*, trim the target, store it at the same
         pointer location, and continue with the refs found in the trimmed
         target until no new refs remain.
      3. Return the selected paths together with the collected
         ``components`` / ``definitions`` and a small ``_extract_info`` block.

    Refs are processed in ascending string order, which keeps the output
    deterministic. Refs that cannot be resolved are skipped but still counted
    in ``schema_ref_count``.

    Args:
        spec: Parsed OpenAPI document (dict).

    Returns:
        The extract as a JSON-serializable dict.

    Raises:
        SystemExit: Via ``die()`` when ``spec["paths"]`` is not a dict.
    """
    source_paths = spec.get("paths")
    if not isinstance(source_paths, dict):
        die("input OpenAPI schema has no object-valued paths")

    selected = {}
    reasons = {}
    for path, path_item in source_paths.items():
        if not isinstance(path_item, dict):
            continue
        why = selected_path_reasons(path, path_item)
        if why:
            selected[path] = trim_path_item(path_item)
            reasons[path] = why

    pending = set()
    for path_item in selected.values():
        pending.update(collect_refs(path_item))

    components = {}
    seen = set()
    while pending:
        # min() picks the same element as sorted(pending)[0] without
        # re-sorting the whole worklist on every iteration.
        ref = min(pending)
        pending.remove(ref)
        # pending never contains an already-seen ref: new refs are filtered
        # against `seen` before they are queued below.
        seen.add(ref)
        value = get_pointer(spec, ref)
        if value is None:
            continue
        trimmed = trim_schema(value)
        set_pointer(components, ref, trimmed)
        pending.update(collect_refs(trimmed) - seen)

    return {
        "openapi": spec.get("openapi"),
        "swagger": spec.get("swagger"),
        "info": spec.get("info"),
        "paths": selected,
        "components": components.get("components", {}),
        "definitions": components.get("definitions", {}),
        "_extract_info": {
            "purpose": "Minimal Sprint 46 TaskTracker assigned_to update schema extract",
            "selected_path_count": len(selected),
            "selected_paths": reasons,
            "schema_ref_count": len(seen),
            "note": "Examples and unrelated paths are removed. Descriptions are compacted. Schema refs are followed from selected paths only.",
        },
    }


def main():
    """Command-line entry point: read the spec, extract, write, report.

    Requires ``--input`` and ``--output``. After the extract has been
    written, one summary line with the ``_extract_info`` block is printed to
    stdout.
    """
    parser = argparse.ArgumentParser()
    for flag, help_text in (
        ("--input", "Huge TaskTracker OpenAPI JSON schema."),
        ("--output", "Minimal extract JSON to write."),
    ):
        parser.add_argument(flag, required=True, help=help_text)
    options = parser.parse_args()

    spec = load_json(options.input)
    extract = build_extract(spec)
    write_json(options.output, extract)

    summary = json.dumps(extract["_extract_info"], ensure_ascii=False, sort_keys=True)
    print("S46_TT_OPENAPI_MIN_EXTRACT " + summary)


# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()