Загрузка данных
#!/usr/bin/env python3
import argparse
import copy
import json
import re
import sys
from pathlib import Path
# Whitelist of dictionary keys that trim_schema() copies into its output
# (their values are trimmed recursively). Keys outside this set are dropped
# unless trim_schema() handles them in a dedicated branch.
KEEP_SCHEMA_KEYS = {
    # References and schema composition.
    "$ref",
    "allOf",
    "anyOf",
    "oneOf",
    # Structural schema keywords.
    "additionalProperties",
    "enum",
    "format",
    "items",
    "nullable",
    "properties",
    "required",
    "type",
    # Parameter / media-type containers.
    "content",
    "in",
    "name",
    "schema",
}
# Lower-case path-item keys that trim_path_item() treats as operations.
HTTP_METHODS = {
    "delete",
    "get",
    "head",
    "options",
    "patch",
    "post",
    "put",
    "trace",
}
def die(message):
    """Print a fatal error to stderr and terminate the process.

    Args:
        message: Error text; it is written after an "[ERROR] " prefix.

    Raises:
        SystemExit: Always, with exit status 2.
    """
    line = "[ERROR] " + message
    print(line, file=sys.stderr)
    sys.exit(2)
def load_json(path):
    """Read *path* as UTF-8 encoded JSON and return its top-level object.

    Args:
        path: Location of the input document (anything ``Path`` accepts).

    Returns:
        The parsed document as a ``dict``.

    Every failure is reported through ``die`` (which exits with status 2):
    missing file, unreadable file (directory, permissions, other OS errors),
    bytes that are not valid UTF-8, invalid JSON, or a top-level value that
    is not a JSON object.
    """
    try:
        raw = Path(path).read_text(encoding="utf-8")
    except FileNotFoundError:
        die("input file not found: " + str(path))
    except OSError as exc:
        # Covers IsADirectoryError, PermissionError and similar, which would
        # otherwise surface as an unhandled traceback.
        die("cannot read input file " + str(path) + ": " + str(exc))
    except UnicodeDecodeError as exc:
        die("input is not valid UTF-8: " + str(exc))

    try:
        value = json.loads(raw)
    except json.JSONDecodeError as exc:
        die("input is not valid JSON: " + str(exc))

    if not isinstance(value, dict):
        die("input OpenAPI schema must be a JSON object")
    return value
def write_json(path, value):
    """Serialize *value* as pretty-printed JSON into the file at *path*.

    Missing parent directories are created. The output is UTF-8, indented by
    two spaces, keeps non-ASCII characters unescaped and ends with a newline.

    Args:
        path: Destination file path.
        value: JSON-serializable object to write.
    """
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(value, ensure_ascii=False, indent=2)
    target.write_text(payload + "\n", encoding="utf-8")
def compact_text(value, limit=360):
    """Collapse whitespace in *value* and cut it to at most *limit* characters.

    Args:
        value: Any object; it is converted with ``str``. ``None`` passes through.
        limit: Maximum number of characters kept before the "..." marker.

    Returns:
        ``None`` for ``None`` input; otherwise the text with every whitespace
        run replaced by one space and the ends stripped. Text longer than
        *limit* is cut to *limit* characters with "..." appended, so the
        result may be up to three characters longer than *limit*.
    """
    if value is None:
        return None
    flattened = re.sub(r"\s+", " ", str(value)).strip()
    return flattened if len(flattened) <= limit else flattened[:limit] + "..."
def selected_path_reasons(path, path_item):
    """Explain why an API path belongs in the extract.

    The path is matched case-insensitively against the unit read, create and
    update URL shapes. When one of them matches, the JSON text of *path_item*
    is also scanned for an assignee marker together with an update marker.

    Args:
        path: Path template string (the key in the ``paths`` object).
        path_item: The path item; must be JSON-serializable.

    Returns:
        A list of reason labels in the fixed order update, read, create,
        assignment-text. An empty list means the path is not selected.
    """
    normalized = path.lower()
    haystack = json.dumps(path_item, ensure_ascii=False, sort_keys=True).lower()

    is_update = (
        re.fullmatch(r"/rest/api/unit/v\d+(?:/\{[^/]+\})?/update/\{[^/]+\}", normalized)
        is not None
    )
    is_read = normalized in {"/rest/api/unit/v1/{code}", "/rest/api/unit/v2/{code}"}
    is_create = re.fullmatch(r"/rest/api/unit/v\d+/\{[^/]+\}/create", normalized) is not None

    labelled = (
        ("unit_update_path", is_update),
        ("unit_read_context", is_read),
        ("unit_create_context", is_create),
    )
    reasons = [label for label, hit in labelled if hit]

    # A non-empty list here means at least one of the three unit shapes matched.
    if reasons:
        has_assignee = any(
            marker in haystack for marker in ("assigned_to", "assignee", "исполн")
        )
        has_update = any(marker in haystack for marker in ("update", "patch"))
        if has_assignee and has_update:
            reasons.append("assignment_update_text")
    return reasons
def collect_refs(value):
    """Gather every local ``$ref`` target found anywhere inside *value*.

    Dicts and lists are walked to any depth. A reference counts only when
    the ``$ref`` value is a string beginning with "#/".

    Args:
        value: Arbitrary JSON-like data (dicts, lists, scalars).

    Returns:
        A set of reference strings; empty when nothing matches.
    """
    found = set()
    todo = [value]
    while todo:
        node = todo.pop()
        if isinstance(node, dict):
            target = node.get("$ref")
            if isinstance(target, str) and target.startswith("#/"):
                found.add(target)
            todo.extend(node.values())
        elif isinstance(node, list):
            todo.extend(node)
    return found
def pointer_parts(ref):
    """Split a reference string into its unescaped path segments.

    The first two characters of *ref* are discarded (presumably the "#/"
    prefix — callers should confirm), the remainder is split on "/", and
    in each segment "~1" becomes "/" and then "~0" becomes "~".

    Args:
        ref: Reference string such as "#/components/schemas/Name".

    Returns:
        A list of segment strings; never empty.
    """
    segments = []
    for token in ref[2:].split("/"):
        # "~1" must be decoded before "~0" so that "~01" yields "~1".
        token = token.replace("~1", "/")
        segments.append(token.replace("~0", "~"))
    return segments
def get_pointer(spec, ref):
    """Look up the value that *ref* points to inside *spec*.

    Only dict containers are traversed. If any step lands on a non-dict or
    the segment is missing, the lookup gives up.

    Args:
        spec: Root document to search.
        ref: Reference string understood by ``pointer_parts``.

    Returns:
        The referenced value, or ``None`` when the path cannot be followed.
    """
    node = spec
    for segment in pointer_parts(ref):
        if isinstance(node, dict) and segment in node:
            node = node[segment]
        else:
            return None
    return node
def set_pointer(root, ref, value):
    """Store *value* inside *root* at the location named by *ref*.

    Intermediate dicts are created on demand. *root* is modified in place.

    Args:
        root: Dict that receives the value.
        ref: Reference string understood by ``pointer_parts``.
        value: Object to place at the final segment.
    """
    *parents, leaf = pointer_parts(ref)
    container = root
    for segment in parents:
        container = container.setdefault(segment, {})
    container[leaf] = value
def trim_schema(value):
    """Return a reduced copy of a schema-like structure.

    Lists are trimmed element by element and scalars are returned as-is.
    For dicts, only these keys survive:

    * ``discriminator`` — copied verbatim;
    * ``properties`` / ``content`` holding a dict — every entry is kept under
      its own name and its value is trimmed;
    * keys listed in ``KEEP_SCHEMA_KEYS`` — value is trimmed recursively;
    * ``description`` / ``title`` — compacted with ``compact_text`` and kept
      only when the result is non-empty.

    Args:
        value: JSON-like data (dict, list or scalar).

    Returns:
        The trimmed structure; dicts and lists are new objects.
    """
    if isinstance(value, list):
        return [trim_schema(item) for item in value]
    if not isinstance(value, dict):
        return value

    result = {}
    for key, child in value.items():
        if key == "discriminator":
            # The discriminator's own fields (propertyName, mapping) are not
            # in the schema-key whitelist, so trimming it recursively would
            # always yield {}. Keep it intact instead.
            result[key] = copy.deepcopy(child)
        elif key == "properties" and isinstance(child, dict):
            # Entry names are property names, not schema keywords: keep all.
            result[key] = {
                prop_name: trim_schema(prop_schema)
                for prop_name, prop_schema in child.items()
            }
        elif key == "content" and isinstance(child, dict):
            # Entry names are media types: keep all.
            result[key] = {
                media_type: trim_schema(media)
                for media_type, media in child.items()
            }
        elif key in KEEP_SCHEMA_KEYS:
            result[key] = trim_schema(child)
        elif key in {"description", "title"}:
            text = compact_text(child)
            if text:
                result[key] = text
    return result
def trim_parameter(value):
    """Reduce a parameter object to its essential fields.

    Only ``name``, ``in``, ``required``, ``schema``, ``$ref`` and
    ``description`` are carried over, in that order. ``schema`` goes through
    ``trim_schema``, ``description`` through ``compact_text``, and the rest
    are deep-copied.

    Args:
        value: Parameter object; non-dict values are returned unchanged.

    Returns:
        A new dict with the retained fields, or *value* itself if not a dict.
    """
    if not isinstance(value, dict):
        return value

    trimmed = {}
    for field in ("name", "in", "required", "schema", "$ref", "description"):
        if field not in value:
            continue
        raw = value[field]
        if field == "schema":
            trimmed[field] = trim_schema(raw)
        elif field == "description":
            trimmed[field] = compact_text(raw)
        else:
            trimmed[field] = copy.deepcopy(raw)
    return trimmed
def trim_operation(operation):
    """Build a reduced copy of an operation object.

    Keeps ``operationId`` and ``tags`` (deep-copied), ``summary`` and
    ``description`` (compacted), ``parameters`` (each via ``trim_parameter``),
    ``requestBody`` (via ``trim_schema``) and a filtered ``responses`` map.
    Only response codes starting with "2" plus "default", "400" and "404"
    are retained.

    Args:
        operation: Operation dict.

    Returns:
        A new dict containing only the retained keys.
    """
    trimmed = {}
    for field in ("operationId", "summary", "description", "tags"):
        if field not in operation:
            continue
        if field in ("summary", "description"):
            trimmed[field] = compact_text(operation[field])
        else:
            trimmed[field] = copy.deepcopy(operation[field])

    if "parameters" in operation:
        # A null "parameters" value is normalized to an empty list.
        trimmed["parameters"] = [
            trim_parameter(param) for param in operation.get("parameters") or []
        ]

    if "requestBody" in operation:
        trimmed["requestBody"] = trim_schema(operation["requestBody"])

    if "responses" in operation:
        extra_codes = ("default", "400", "404")
        trimmed["responses"] = {
            code: trim_schema(response)
            for code, response in (operation.get("responses") or {}).items()
            if str(code).startswith("2") or str(code) in extra_codes
        }
    return trimmed
def trim_path_item(path_item):
    """Build a reduced copy of a path item.

    Keeps the path-level ``parameters`` list (each entry via
    ``trim_parameter``) and every key in ``HTTP_METHODS`` whose value is a
    dict (via ``trim_operation``). All other keys are dropped.

    Args:
        path_item: Path item dict.

    Returns:
        A new dict with the retained entries in their original order.
    """
    trimmed = {}
    for key, value in path_item.items():
        if key == "parameters":
            # A null "parameters" value is normalized to an empty list.
            trimmed[key] = [trim_parameter(param) for param in value or []]
            continue
        if key in HTTP_METHODS and isinstance(value, dict):
            trimmed[key] = trim_operation(value)
    return trimmed
def build_extract(spec):
    """Assemble the minimal extract document from a full specification.

    Steps:
      1. Keep every path for which ``selected_path_reasons`` returns at least
         one reason, trimmed with ``trim_path_item``.
      2. Starting from the references inside the kept paths, repeatedly
         resolve the smallest outstanding reference, trim its target, store
         it under the same pointer, and queue any references it contains.
         References that do not resolve are skipped but still counted.
      3. Return the kept paths, the resolved ``components`` / ``definitions``
         subtrees, and an ``_extract_info`` summary.

    Args:
        spec: Parsed specification dict.

    Returns:
        The extract as a dict. Exits through ``die`` when ``paths`` is not
        an object.
    """
    all_paths = spec.get("paths")
    if not isinstance(all_paths, dict):
        die("input OpenAPI schema has no object-valued paths")

    kept_paths = {}
    why_kept = {}
    for route, item in all_paths.items():
        if not isinstance(item, dict):
            continue
        matched = selected_path_reasons(route, item)
        if not matched:
            continue
        kept_paths[route] = trim_path_item(item)
        why_kept[route] = matched

    queue = set()
    for item in kept_paths.values():
        queue |= collect_refs(item)

    resolved_tree = {}
    visited = set()
    while queue:
        # Always take the lexicographically smallest reference so the
        # insertion order of the output is deterministic.
        ref = min(queue)
        queue.discard(ref)
        if ref in visited:
            continue
        visited.add(ref)
        target = get_pointer(spec, ref)
        if target is None:
            continue
        slim = trim_schema(target)
        set_pointer(resolved_tree, ref, slim)
        queue |= collect_refs(slim) - visited

    extract_info = {
        "purpose": "Minimal Sprint 46 TaskTracker assigned_to update schema extract",
        "selected_path_count": len(kept_paths),
        "selected_paths": why_kept,
        "schema_ref_count": len(visited),
        "note": "Examples and unrelated paths are removed. Descriptions are compacted. Schema refs are followed from selected paths only.",
    }
    return {
        "openapi": spec.get("openapi"),
        "swagger": spec.get("swagger"),
        "info": spec.get("info"),
        "paths": kept_paths,
        "components": resolved_tree.get("components", {}),
        "definitions": resolved_tree.get("definitions", {}),
        "_extract_info": extract_info,
    }
def main():
    """Command-line entry point.

    Parses the required ``--input`` and ``--output`` options, builds the
    extract from the input file, writes it to the output file, and prints a
    one-line summary of the extract's ``_extract_info`` to stdout.
    """
    cli = argparse.ArgumentParser()
    option_help = (
        ("--input", "Huge TaskTracker OpenAPI JSON schema."),
        ("--output", "Minimal extract JSON to write."),
    )
    for flag, text in option_help:
        cli.add_argument(flag, required=True, help=text)
    options = cli.parse_args()

    result = build_extract(load_json(options.input))
    write_json(options.output, result)

    summary = json.dumps(result["_extract_info"], ensure_ascii=False, sort_keys=True)
    print("S46_TT_OPENAPI_MIN_EXTRACT " + summary)


if __name__ == "__main__":
    main()