Загрузка данных


#!/usr/bin/env python3

import json
import ssl
import sys
import urllib.error
import urllib.request
from pathlib import Path


# Fill before running.
# Base URL of the tracker API; every request path is appended to it.
TT_BASE_URL = "https://portal.works.prod.localnet/swtr"
# Token sent in the "Authorization: Bearer ..." header of every request.
TT_TOKEN = ""
# Ticket identifier interpolated into the read and update URLs.
TT_TICKET = ""

# False: read author only. True: try PATCH shapes on a noise-safe ticket.
RUN_EXECUTOR_UPDATE = False
# True: before reading the ticket, fetch each OPENAPI_PATHS entry and dump
# what looks relevant to assignee updates.
RUN_OPENAPI_PROBE = False

# Use "createdBy" unless UI proves "reporter" is the business author.
AUTHOR_SOURCE = "createdBy"
# False makes requests use an SSL context that skips certificate verification.
# NOTE(review): presumably needed for an internal CA - confirm before reuse.
VERIFY_SSL = False
# Timeout handed to urlopen for every request.
TIMEOUT_SECONDS = 30

# Candidate locations (relative to TT_BASE_URL) tried by the OpenAPI probe.
OPENAPI_PATHS = [
    "v3/api-docs",
    "v3/api-docs/swagger-config",
    "v2/api-docs",
    "api-docs",
    "openapi.json",
    "swagger.json",
    "swagger-resources",
]


# Directory containing this script; all JSON dumps are written here.
OUT = Path(__file__).resolve().parent


def die(message):
    """Report a fatal error on stderr and terminate with exit status 1.

    Args:
        message: Reason for the failure (str); printed after "[ERROR] ".

    Raises:
        SystemExit: Always, with code 1.
    """
    line = "[ERROR] " + message
    print(line, file=sys.stderr)
    sys.exit(1)


def parse(text):
    """Decode a response body, tolerating empty and non-JSON content.

    Args:
        text: Response body as a string.

    Returns:
        ``{}`` when *text* is empty/falsy, the decoded JSON value when *text*
        is valid JSON, otherwise *text* itself unchanged.
    """
    if text:
        try:
            decoded = json.loads(text)
        except json.JSONDecodeError:
            # Not JSON (e.g. an HTML error page): hand the raw text back.
            return text
        return decoded
    return {}


def dump(name, value):
    """Write *value* as pretty-printed UTF-8 JSON to ``OUT / name``.

    Args:
        name: File name relative to the OUT directory.
        value: JSON-serialisable object to store.

    The file content ends with a newline, non-ASCII characters are kept
    as-is, and the written path is echoed to stdout.
    """
    target = OUT / name
    rendered = json.dumps(value, ensure_ascii=False, indent=2)
    target.write_text(rendered + "\n", encoding="utf-8")
    print("[OK] wrote " + str(target))


def tt(method, path, payload=None):
    """Send one JSON request to the tracker API and return the decoded reply.

    Args:
        method: HTTP verb, e.g. "GET" or "PATCH".
        path: Path relative to TT_BASE_URL; slashes at the join are normalised.
        payload: Optional JSON-serialisable request body; None sends no body.

    Returns:
        Tuple ``(http_status, body)``. ``body`` is whatever ``parse`` yields:
        decoded JSON, the raw text for non-JSON replies, or ``{}`` for an
        empty body. HTTP error statuses are returned the same way instead of
        being raised.

    Raises:
        SystemExit: Via ``die`` when the request fails below the HTTP level
            (DNS, connect, TLS, timeout), i.e. when there is no status to
            return.
    """
    url = TT_BASE_URL.rstrip("/") + "/" + path.lstrip("/")
    data = None if payload is None else json.dumps(payload, ensure_ascii=False).encode("utf-8")
    req = urllib.request.Request(
        url,
        data=data,
        method=method,
        headers={
            "Authorization": "Bearer " + TT_TOKEN,
            "accept": "application/json",
            "Content-Type": "application/json",
            "Cookie": "api_swtr_as21=true",
        },
    )
    # NOTE(review): ssl._create_unverified_context is a private stdlib helper;
    # it is kept so the unverified mode behaves exactly as before.
    context = None if VERIFY_SSL else ssl._create_unverified_context()
    try:
        with urllib.request.urlopen(req, timeout=TIMEOUT_SECONDS, context=context) as resp:
            return resp.status, parse(resp.read().decode("utf-8", errors="replace"))
    except urllib.error.HTTPError as err:
        # HTTPError wraps the still-open error response; read it, then close
        # it so the underlying connection is not leaked.
        try:
            return err.code, parse(err.read().decode("utf-8", errors="replace"))
        finally:
            err.close()
    except OSError as err:
        # URLError, socket timeouts and TLS errors are all OSError subclasses.
        # Report them in the script's usual error format rather than as a
        # raw traceback.
        die(f"{method} {url} failed: {err}")

def refs(value):
    """Collect every local ``$ref`` target found anywhere inside *value*.

    Args:
        value: Arbitrarily nested structure of dicts, lists and scalars.

    Returns:
        Set of ``$ref`` strings that start with ``"#/"``. References in any
        other form (for example absolute URLs) are ignored.
    """
    if isinstance(value, list):
        own = set()
        children = value
    elif isinstance(value, dict):
        target = value.get("$ref")
        is_local = isinstance(target, str) and target.startswith("#/")
        own = {target} if is_local else set()
        children = value.values()
    else:
        # Scalars cannot contain references.
        return set()
    return own.union(*(refs(child) for child in children))


def _list_index(token, size):
    """Return *token* as a valid index into a list of *size* items, else None."""
    if not token or any(ch not in "0123456789" for ch in token):
        return None
    if len(token) > 1 and token[0] == "0":
        # RFC 6901 array indices carry no leading zeros.
        return None
    index = int(token)
    return index if index < size else None


def pointer(spec, ref):
    """Resolve a local JSON Pointer reference inside *spec*.

    Args:
        spec: Decoded JSON document (nested dicts/lists).
        ref: Reference of the form ``"#/a/b/0"``; the leading ``"#/"`` is
            stripped and the remainder is split on ``"/"``.

    Returns:
        The referenced value, or None when any step of the path is missing:
        an unknown key, an invalid or out-of-range array index, or a scalar
        reached before the path ends.
    """
    cur = spec
    for raw in ref[2:].split("/"):
        # Unescape order matters: "~1" first, then "~0", so that "~01"
        # decodes to "~1" and not to "/".
        part = raw.replace("~1", "/").replace("~0", "~")
        if isinstance(cur, dict):
            if part not in cur:
                return None
            cur = cur[part]
        elif isinstance(cur, list):
            # References may step through arrays (e.g. ".../parameters/0").
            index = _list_index(part, len(cur))
            if index is None:
                return None
            cur = cur[index]
        else:
            return None
    return cur


def openapi_probe():
    """Fetch each OPENAPI_PATHS candidate and dump what relates to assignees.

    For every candidate path a record with the path, the HTTP status and an
    ``is_openapi`` flag (body is a dict with a dict under ``"paths"``) is
    collected. For OpenAPI documents the record also carries the path items
    that mention the v1 update endpoint or an assignee keyword, together with
    the schemas those items reference. For other JSON bodies the first 2000
    characters of the re-serialised body are kept. All records are written to
    ``s46.openapi-probe.json``.
    """
    keywords = ("assigned_to", "assignee", "исполн")
    results = []
    for candidate in OPENAPI_PATHS:
        status, body = tt("GET", candidate)
        looks_like_spec = isinstance(body, dict) and isinstance(body.get("paths"), dict)
        record = {"path": candidate, "http_status": status, "is_openapi": looks_like_spec}
        if looks_like_spec:
            relevant = {}
            for api_path, path_item in body.get("paths", {}).items():
                haystack = json.dumps(path_item, ensure_ascii=False).lower()
                hits_endpoint = "unit/v1/update" in api_path.lower()
                if hits_endpoint or any(word in haystack for word in keywords):
                    relevant[api_path] = path_item
            # Gather the distinct references of all kept path items and
            # resolve them in sorted order so the dump is stable.
            wanted = set()
            for path_item in relevant.values():
                wanted |= refs(path_item)
            record["selected_paths"] = relevant
            record["selected_schemas"] = {ref: pointer(body, ref) for ref in sorted(wanted)}
        elif isinstance(body, (dict, list)):
            serialized = json.dumps(body, ensure_ascii=False)
            record["body_sample"] = serialized[:2000]
        results.append(record)
    dump("s46.openapi-probe.json", results)


def get_unit():
    """Read the configured ticket and return it as a dict.

    Returns:
        The decoded JSON object of the ticket.

    Raises:
        SystemExit: Via ``die`` when the HTTP status is not 2xx (the status
            and body are first dumped to ``s46.read-error.json``) or when
            the response body is not a JSON object.
    """
    status, body = tt("GET", f"rest/api/unit/v2/{TT_TICKET}?validatorEnabled=false")
    if not 200 <= status < 300:
        dump("s46.read-error.json", {"http_status": status, "body": body})
        die(f"ticket read failed: HTTP {status}")
    if isinstance(body, dict):
        return body
    die("ticket read response is not a JSON object")


def attr(unit, code):
    """Look up a ticket attribute by its code.

    Args:
        unit: Ticket dict.
        code: Attribute code to find.

    Returns:
        The first dict in ``unit["attributes"]`` whose ``"code"`` equals
        *code*. Failing that, the entry for *code* in
        ``unit["selected_attributes"]`` when that value is a dict.
        Otherwise None.
    """
    listed = unit.get("attributes") or []
    match = next(
        (entry for entry in listed if isinstance(entry, dict) and entry.get("code") == code),
        None,
    )
    if match is not None:
        return match
    fallback = unit.get("selected_attributes") or {}
    return fallback.get(code) if isinstance(fallback, dict) else None


def login(value):
    """Extract a login string from a user object or a user-valued attribute.

    Args:
        value: Either a dict with a ``"login"`` key or a dict whose
            ``"value"`` entry is such a dict. Anything else is accepted.

    Returns:
        The login converted to str, or ``""`` when *value* is not a dict or
        no non-empty login is present.
    """
    if isinstance(value, dict):
        nested = value.get("value")
        # Attributes wrap the user under "value"; plain users do not.
        holder = nested if isinstance(nested, dict) else value
        return str(holder.get("login") or "")
    return ""


def short_attr(item):
    """Reduce an attribute dict to its identifying and value fields.

    Args:
        item: Attribute dict, or any other value.

    Returns:
        A dict with the keys ``code``, ``name``, ``type``, ``value`` and
        ``valueAsString`` (missing ones map to None), or None when *item* is
        not a dict.
    """
    if isinstance(item, dict):
        kept = ("code", "name", "type", "value", "valueAsString")
        return {key: item.get(key) for key in kept}
    return None


def attr_value(unit, code):
    """Return the ``"value"`` of the attribute *code*, or None.

    None is returned both when the attribute is not found as a dict and when
    it has no ``"value"`` entry.
    """
    found = attr(unit, code)
    return found.get("value") if isinstance(found, dict) else None


def attr_value_or(unit, code, fallback):
    """Return the value of attribute *code*, or *fallback* when it is None.

    Falsy but non-None values (``0``, ``""``, ``[]``) are returned as they
    are.
    """
    current = attr_value(unit, code)
    if current is not None:
        return current
    return fallback


def status_code(unit):
    """Return the ticket's workflow status as a scalar.

    For a dict-valued ``workflow_status`` this is its ``"code"``, else its
    ``"name"``, else ``""``. For any other value it is the value itself, or
    ``""`` when that value is falsy.
    """
    raw = attr_value(unit, "workflow_status")
    if not isinstance(raw, dict):
        return raw or ""
    return raw.get("code") or raw.get("name") or ""


def status_object(unit):
    """Return the ticket's workflow status in object form.

    A dict-valued ``workflow_status`` is returned unchanged. A truthy scalar
    is wrapped as ``{"code": value}``. A missing or falsy value gives ``{}``.
    """
    raw = attr_value(unit, "workflow_status")
    if isinstance(raw, dict):
        return raw
    if raw:
        return {"code": raw}
    return {}


def unit_code(value):
    """Return the code of a reference that is either a dict or a scalar.

    Args:
        value: Dict with a ``"code"`` entry, or the code itself.

    Returns:
        ``value["code"]`` for dicts, otherwise *value*. Falsy results are
        replaced by ``""``.
    """
    candidate = value.get("code") if isinstance(value, dict) else value
    return candidate or ""


def label_value(unit):
    """Return the ticket's ``label`` attribute value in plain form.

    Returns:
        ``[]`` when the value is None. When the value is a dict with a
        ``"sample"`` key, that entry, or ``[]`` if the entry is falsy.
        Otherwise the value unchanged.
    """
    raw = attr_value(unit, "label")
    if raw is None:
        return []
    if isinstance(raw, dict) and "sample" in raw:
        return raw.get("sample") or []
    return raw


def compact(unit):
    """Build a small summary of a ticket for logging and comparison.

    Args:
        unit: Full ticket dict.

    Returns:
        Dict with selected top-level fields copied from *unit*, the login
        derived from ``createdBy``, shortened forms of the assignee,
        reporter, status, due-date and label attributes (those that exist),
        and the assignee and reporter logins.
    """
    copied = ("code", "summary", "description", "space", "suit", "createdBy", "updatedBy")
    tracked = ("assigned_to", "reporter", "workflow_status", "due_date", "label")

    summary = {key: unit.get(key) for key in copied}
    summary["author_candidate_createdBy_login"] = login(unit.get("createdBy"))

    shortened = []
    for code in tracked:
        brief = short_attr(attr(unit, code))
        if brief:
            shortened.append(brief)
    summary["attributes"] = shortened

    summary["assigned_to_login"] = login(attr(unit, "assigned_to"))
    summary["reporter_login"] = login(attr(unit, "reporter"))
    return summary


def pick_author(c):
    """Select the author login from a compact ticket summary.

    Args:
        c: Summary dict holding ``author_candidate_createdBy_login`` and
            ``reporter_login``.

    Returns:
        The login chosen by AUTHOR_SOURCE.

    Raises:
        SystemExit: Via ``die`` when AUTHOR_SOURCE is neither "createdBy"
            nor "reporter".
    """
    if AUTHOR_SOURCE not in ("createdBy", "reporter"):
        die('AUTHOR_SOURCE must be "createdBy" or "reporter"')
    if AUTHOR_SOURCE == "reporter":
        return c["reporter_login"]
    return c["author_candidate_createdBy_login"]


def flat_update_body(unit, author, workflow_status):
    """Assemble a flat update payload that sets the assignee to *author*.

    Args:
        unit: Current ticket dict; supplies the values of the other fields.
        author: Login to place in ``assigned_to``.
        workflow_status: Status in whichever representation is being tried.

    Returns:
        Dict with ``assigned_to``, ``description``, ``due_date``, ``label``,
        ``suit``, ``summary`` and ``workflow_status``, in that order.
    """
    body = {"assigned_to": author}
    body["description"] = unit.get("description") or ""
    body["due_date"] = attr_value_or(unit, "due_date", None)
    body["label"] = label_value(unit)
    body["suit"] = unit_code(unit.get("suit"))
    body["summary"] = unit.get("summary") or ""
    body["workflow_status"] = workflow_status
    return body


def update_attempts(unit, author):
    """List the PATCH variants to try, in the order they should be tried.

    Args:
        unit: Current ticket dict.
        author: Login to assign.

    Returns:
        List of ``(name, path, body)`` tuples: the v2 endpoint with the
        status as a code and then as an object, followed by the same two
        bodies against the v1 endpoint.
    """
    code_body = flat_update_body(unit, author, status_code(unit))
    object_body = flat_update_body(unit, author, status_object(unit))
    v2_path = f"rest/api/unit/v2/update/{TT_TICKET}"
    v1_path = f"rest/api/unit/v1/update/{TT_TICKET}"

    attempts = []
    attempts.append(("v2_flat_required_status_code", v2_path, code_body))
    attempts.append(("v2_flat_required_status_object", v2_path, object_body))
    attempts.append(("v1_flat_required_status_code", v1_path, code_body))
    attempts.append(("v1_flat_required_status_object", v1_path, object_body))
    return attempts


def main():
    """Run the script: read the ticket, report the author, optionally update.

    Steps:
        1. Refuse to run without TT_TOKEN and TT_TICKET.
        2. Optionally probe for an OpenAPI description.
        3. Read the ticket, dump its compact form and print the author and
           the current assignee.
        4. Stop here unless RUN_EXECUTOR_UPDATE is set.
        5. Try each PATCH variant in turn. After every attempt the ticket is
           re-read (on a 2xx reply) and the attempt log is rewritten. The
           first variant after which the assignee equals the author ends
           the run.

    Raises:
        SystemExit: Via ``die`` on missing configuration, an empty author
            login, or when no variant made the assignee equal the author.
    """
    if not (TT_TOKEN and TT_TICKET):
        die("fill TT_TOKEN and TT_TICKET at the top of the script")

    if RUN_OPENAPI_PROBE:
        openapi_probe()

    original_unit = get_unit()
    snapshot = compact(original_unit)
    dump("s46.author.compact.json", snapshot)

    author = pick_author(snapshot)
    if not author:
        die("author login is empty")
    print("[INFO] author_login=" + author)
    print("[INFO] assigned_to_before=" + snapshot["assigned_to_login"])

    if not RUN_EXECUTOR_UPDATE:
        print("[OK] read-only run complete")
        return

    history = []
    for name, update_path, request_body in update_attempts(original_unit, author):
        status, body = tt("PATCH", update_path, request_body)
        succeeded = 200 <= status < 300
        # Only a successful PATCH is worth re-reading the ticket for.
        after = compact(get_unit()) if succeeded else {}
        assignee_after = after.get("assigned_to_login", "")
        matches = after.get("assigned_to_login") == author
        history.append({
            "name": name,
            "path": update_path,
            "http_status": status,
            "ok": succeeded,
            "matches": matches,
            "request_body": request_body,
            "body": body,
            "assigned_to_after": assignee_after,
        })
        # Rewritten after every attempt so a later failure keeps the log.
        dump("s46.executor-update.attempts.json", history)
        print(
            "[INFO] " + name
            + " http=" + str(status)
            + " assigned_to_after=" + assignee_after
            + " matches=" + str(matches)
        )
        if not matches:
            continue
        after["executor_update_matches_author"] = True
        after["executor_update_payload_shape"] = name
        dump("s46.after.compact.json", after)
        print("[OK] executor update worked with " + name)
        return

    die("no tested PATCH shape changed assigned_to to author")


# Run the workflow only when executed as a script, not when imported.
if __name__ == "__main__":
    main()