Загрузка данных


#!/usr/bin/env python3

import json
import ssl
import sys
import urllib.error
import urllib.request
from pathlib import Path


# Connection settings. Fill TT_TOKEN and TT_TICKET before running; main()
# refuses to start while either is empty.
TT_BASE_URL = "https://portal.works.prod.localnet/swtr"  # every request path is appended to this
TT_TOKEN = ""  # sent as the "Authorization: Bearer ..." credential
TT_TICKET = ""  # ticket code interpolated into the unit read/update paths

# False: read author only. True: try PATCH shapes on a noise-safe ticket.
RUN_EXECUTOR_UPDATE = False
# True: additionally fetch every entry of OPENAPI_PATHS and dump a summary.
RUN_OPENAPI_PROBE = False

# Use "createdBy" unless UI proves "reporter" is the business author.
AUTHOR_SOURCE = "createdBy"
# False makes every request use an SSL context that skips certificate verification.
VERIFY_SSL = False
# Timeout passed to urlopen for each request.
TIMEOUT_SECONDS = 30

# Candidate paths (relative to TT_BASE_URL) requested by openapi_probe().
OPENAPI_PATHS = [
    "v3/api-docs",
    "v3/api-docs/swagger-config",
    "v2/api-docs",
    "api-docs",
    "openapi.json",
    "swagger.json",
    "swagger-resources",
]


# Directory containing this script; dump() writes its JSON files here.
OUT = Path(__file__).resolve().parent
# Lower-case operation keys that path_summary() looks up in a path item.
HTTP_METHODS = ("get", "put", "post", "delete", "patch", "options", "head", "trace")


def die(message):
    """Report a fatal error on stderr and stop the script with exit status 1.

    Raises:
        SystemExit: always, with code 1.
    """
    line = "[ERROR] " + message
    print(line, file=sys.stderr)
    sys.exit(1)


def parse(text):
    """Decode a response body as JSON where possible.

    Returns an empty dict for falsy input, the decoded JSON value when the
    text is valid JSON, and the text itself when decoding fails.
    """
    if text:
        try:
            decoded = json.loads(text)
        except json.JSONDecodeError:
            # Not JSON: hand the raw text back so the caller can still record it.
            decoded = text
        return decoded
    return {}


def dump(name, value):
    """Write value as indented UTF-8 JSON to OUT/name and print the path written."""
    target = OUT / name
    rendered = json.dumps(value, ensure_ascii=False, indent=2)
    target.write_text(rendered + "\n", encoding="utf-8")
    print(f"[OK] wrote {target}")


def tt(method, path, payload=None):
    """Send one HTTP request below TT_BASE_URL and return (status, parsed body).

    Args:
        method: HTTP method name handed to urllib.
        path: path relative to TT_BASE_URL; surrounding slashes are normalized.
        payload: optional object sent as a UTF-8 JSON request body.

    Returns:
        A (status, body) tuple. HTTP error responses are returned the same
        way (error code plus parsed error body) instead of being raised.
    """
    url = TT_BASE_URL.rstrip("/") + "/" + path.lstrip("/")
    body = None
    if payload is not None:
        body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    headers = {
        "Authorization": "Bearer " + TT_TOKEN,
        "accept": "application/json",
        "Content-Type": "application/json",
        "Cookie": "api_swtr_as21=true",
    }
    request = urllib.request.Request(url, data=body, method=method, headers=headers)

    # With VERIFY_SSL off, certificate checks are skipped; otherwise urllib's
    # default context is used.
    if VERIFY_SSL:
        ssl_context = None
    else:
        ssl_context = ssl._create_unverified_context()

    try:
        with urllib.request.urlopen(request, timeout=TIMEOUT_SECONDS, context=ssl_context) as response:
            raw = response.read()
            return response.status, parse(raw.decode("utf-8", errors="replace"))
    except urllib.error.HTTPError as http_error:
        raw = http_error.read()
        return http_error.code, parse(raw.decode("utf-8", errors="replace"))


def compact_text(value, limit=240):
    """Collapse whitespace in str(value) and cap the result at limit characters.

    None becomes an empty string. Text longer than limit is cut to limit
    characters and suffixed with "...".
    """
    if value is None:
        return ""
    collapsed = " ".join(str(value).split())
    if len(collapsed) <= limit:
        return collapsed
    return collapsed[:limit] + "..."


def path_summary(api_path, path_item):
    """Summarize the operations of one OpenAPI path item.

    Returns a dict with the path and a "descriptions" list holding, for every
    key of HTTP_METHODS whose value is a dict, the upper-cased method plus the
    non-empty compacted summary/description texts. A non-dict path_item yields
    an empty "descriptions" list.
    """
    summary = {"path": api_path, "descriptions": []}
    if isinstance(path_item, dict):
        for verb in HTTP_METHODS:
            operation = path_item.get(verb)
            if isinstance(operation, dict):
                fields = (
                    ("method", verb.upper()),
                    ("summary", compact_text(operation.get("summary"))),
                    ("description", compact_text(operation.get("description"))),
                )
                # Empty texts are dropped so the dump stays compact.
                summary["descriptions"].append({key: text for key, text in fields if text})
    return summary


def is_interesting_path(api_path, path_item):
    """Tell whether an OpenAPI path is relevant to the assignee-update probe.

    A path qualifies when its lower-cased URL contains one of the unit update
    prefixes, or when the lower-cased JSON of its path item mentions one of
    the assignee markers.
    """
    lowered_path = api_path.lower()
    serialized = json.dumps(path_item, ensure_ascii=False).lower()
    update_prefixes = ("/rest/api/unit/v1/update/", "/rest/api/unit/v2/update/")
    if any(prefix in lowered_path for prefix in update_prefixes):
        return True
    markers = ("assigned_to", "assignee", "исполн")
    return any(marker in serialized for marker in markers)


def openapi_probe():
    """Request every entry of OPENAPI_PATHS and dump what each one returned.

    For a response that looks like an OpenAPI document (a dict whose "paths"
    is a dict) the entry records version, title and the summaries of the
    interesting paths. For other dict or list bodies only a short description
    of their shape is kept. The result list is written to
    "s46.openapi-probe.json".
    """
    results = []
    for path in OPENAPI_PATHS:
        status, body = tt("GET", path)
        body_is_dict = isinstance(body, dict)
        looks_like_spec = body_is_dict and isinstance(body.get("paths"), dict)
        entry = {"path": path, "http_status": status, "is_openapi": looks_like_spec}

        if looks_like_spec:
            selected = [
                path_summary(api_path, path_item)
                for api_path, path_item in body.get("paths", {}).items()
                if is_interesting_path(api_path, path_item)
            ]
            info = body.get("info")
            if not isinstance(info, dict):
                info = {}
            # Keyword order fixes the key order of the dumped JSON.
            entry.update(
                openapi=body.get("openapi") or body.get("swagger"),
                title=info.get("title", ""),
                version=info.get("version", ""),
                selected_path_count=len(selected),
                selected_paths=selected,
            )
        elif body_is_dict:
            entry["body_keys"] = sorted(str(key) for key in body)
        elif isinstance(body, list):
            entry["body_type"] = "list"
            entry["body_count"] = len(body)

        results.append(entry)
    dump("s46.openapi-probe.json", results)


def get_unit():
    """Fetch the configured ticket and return it as a dict.

    On a non-2xx status the response is dumped to "s46.read-error.json" and
    the script exits; it also exits when the body is not a JSON object.
    """
    status, body = tt("GET", f"rest/api/unit/v2/{TT_TICKET}?validatorEnabled=false")
    if not 200 <= status < 300:
        dump("s46.read-error.json", {"http_status": status, "body": body})
        die(f"ticket read failed: HTTP {status}")
    if isinstance(body, dict):
        return body
    die("ticket read response is not a JSON object")


def attr(unit, code):
    """Look up an attribute of a unit by its code.

    The first dict in unit["attributes"] whose "code" equals code wins.
    Otherwise the value stored under code in unit["selected_attributes"] is
    returned when that field is a dict. Returns None when nothing matches.
    """
    candidates = (
        entry
        for entry in unit.get("attributes") or []
        if isinstance(entry, dict) and entry.get("code") == code
    )
    found = next(candidates, None)
    if found is not None:
        return found
    fallback = unit.get("selected_attributes") or {}
    return fallback.get(code) if isinstance(fallback, dict) else None


def login(value):
    """Extract a login string from a user dict or from a dict wrapping one.

    When value["value"] is itself a dict, the login is read from that inner
    dict; otherwise it is read from value directly. Non-dict input and a
    missing or empty login both produce "".
    """
    if not isinstance(value, dict):
        return ""
    inner = value.get("value")
    holder = inner if isinstance(inner, dict) else value
    return str(holder.get("login") or "")


def short_attr(item):
    """Reduce an attribute dict to its code, name, type, value and valueAsString.

    Missing fields come back as None. Non-dict input returns None.
    """
    if isinstance(item, dict):
        kept_fields = ("code", "name", "type", "value", "valueAsString")
        return {field: item.get(field) for field in kept_fields}
    return None


def attr_value(unit, code):
    """Return the "value" field of the attribute found by attr(), or None.

    None is also returned when the lookup result is not a dict.
    """
    found = attr(unit, code)
    return found.get("value") if isinstance(found, dict) else None


def attr_value_or(unit, code, fallback):
    """Return the attribute value for code, or fallback when that value is None."""
    found = attr_value(unit, code)
    if found is not None:
        return found
    return fallback


def status_code(unit):
    """Return the scalar form of the unit's "workflow_status" attribute value.

    A dict value yields its "code", else its "name", else "". Any other value
    is returned as is, with falsy values replaced by "".
    """
    status = attr_value(unit, "workflow_status")
    if not isinstance(status, dict):
        return status or ""
    return status.get("code") or status.get("name") or ""


def status_object(unit):
    """Return the unit's "workflow_status" attribute value in dict form.

    A dict value is returned unchanged, a truthy non-dict value is wrapped as
    {"code": value}, and anything falsy becomes an empty dict.
    """
    status = attr_value(unit, "workflow_status")
    if isinstance(status, dict):
        return status
    if status:
        return {"code": status}
    return {}


def unit_code(value):
    """Return value["code"] for a dict, or value itself otherwise; falsy results become ""."""
    raw = value.get("code") if isinstance(value, dict) else value
    return raw or ""


def label_value(unit):
    """Return the unit's "label" attribute value in the form used for updates.

    None becomes []. A dict carrying a "sample" key yields that entry (or []
    when it is falsy). Every other value is returned unchanged.
    """
    raw = attr_value(unit, "label")
    if raw is None:
        return []
    if isinstance(raw, dict) and "sample" in raw:
        return raw.get("sample") or []
    return raw


def compact(unit):
    """Build the trimmed view of a unit that the script dumps and compares.

    The result carries selected top-level fields, the shortened forms of the
    tracked attributes that exist, and the logins derived from createdBy,
    assigned_to and reporter.
    """
    top_level_fields = ("code", "summary", "description", "space", "suit", "createdBy", "updatedBy")
    tracked_codes = ("assigned_to", "reporter", "workflow_status", "due_date", "label")

    # Keys are inserted in the order they should appear in the dumped JSON.
    view = {field: unit.get(field) for field in top_level_fields}
    view["author_candidate_createdBy_login"] = login(unit.get("createdBy"))
    shortened = (short_attr(attr(unit, code)) for code in tracked_codes)
    view["attributes"] = [entry for entry in shortened if entry]
    view["assigned_to_login"] = login(attr(unit, "assigned_to"))
    view["reporter_login"] = login(attr(unit, "reporter"))
    return view


def pick_author(c):
    """Return the author login from a compact view according to AUTHOR_SOURCE.

    Exits the script when AUTHOR_SOURCE is neither "createdBy" nor "reporter".
    """
    if AUTHOR_SOURCE not in ("createdBy", "reporter"):
        die('AUTHOR_SOURCE must be "createdBy" or "reporter"')
    if AUTHOR_SOURCE == "createdBy":
        key = "author_candidate_createdBy_login"
    else:
        key = "reporter_login"
    return c[key]


def flat_update_body(unit, author, workflow_status):
    """Build a flat update payload that assigns author and echoes current fields.

    Besides "assigned_to" and the given workflow_status, the payload repeats
    the unit's description, due_date, label, suit and summary.
    """
    # Keyword order fixes the key order of the JSON payload.
    return dict(
        assigned_to=author,
        description=unit.get("description") or "",
        due_date=attr_value_or(unit, "due_date", None),
        label=label_value(unit),
        suit=unit_code(unit.get("suit")),
        summary=unit.get("summary") or "",
        workflow_status=workflow_status,
    )


def update_attempts(unit, author):
    """List the (name, path, payload) PATCH variants to try, in order.

    Three minimal v2 payloads come first, followed by the flat payloads (with
    the status as a code and as an object) against the v2 and then the v1
    update path.
    """
    v2_path = f"rest/api/unit/v2/update/{TT_TICKET}"
    v1_path = f"rest/api/unit/v1/update/{TT_TICKET}"
    flat_with_code = flat_update_body(unit, author, status_code(unit))
    flat_with_object = flat_update_body(unit, author, status_object(unit))

    attempts = [
        ("v2_attributes_dict_string", v2_path, {"attributes": {"assigned_to": author}}),
        ("v2_attributes_dict_user", v2_path, {"attributes": {"assigned_to": {"login": author}}}),
        ("v2_top_assigned_to_string", v2_path, {"assigned_to": author}),
    ]
    attempts.extend([
        ("v2_flat_required_status_code", v2_path, flat_with_code),
        ("v2_flat_required_status_object", v2_path, flat_with_object),
    ])
    attempts.extend([
        ("v1_flat_required_status_code", v1_path, flat_with_code),
        ("v1_flat_required_status_object", v1_path, flat_with_object),
    ])
    return attempts


def main():
    """Read the ticket author and optionally probe PATCH shapes that assign it.

    Steps:
      1. Exit unless TT_TOKEN and TT_TICKET are filled in.
      2. When RUN_OPENAPI_PROBE is set, run openapi_probe().
      3. Read the ticket, dump its compact view to "s46.author.compact.json"
         and pick the author login according to AUTHOR_SOURCE.
      4. When RUN_EXECUTOR_UPDATE is off, stop here (read-only run).
      5. Otherwise try each payload from update_attempts() in order, re-read
         the ticket after every 2xx response, and stop at the first attempt
         after which assigned_to equals the author. Progress is dumped to
         "s46.executor-update.attempts.json" after every attempt; the final
         state goes to "s46.after.compact.json".

    Exits with status 1 (via die) on configuration errors, read failures, an
    empty author login, a ticket already assigned to the author, or when no
    payload shape worked.
    """
    if not TT_TOKEN or not TT_TICKET:
        die("fill TT_TOKEN and TT_TICKET at the top of the script")

    if RUN_OPENAPI_PROBE:
        openapi_probe()

    before_unit = get_unit()
    before = compact(before_unit)
    dump("s46.author.compact.json", before)

    author = pick_author(before)
    if not author:
        die("author login is empty")
    print("[INFO] author_login=" + author)
    print("[INFO] assigned_to_before=" + before["assigned_to_login"])

    if not RUN_EXECUTOR_UPDATE:
        print("[OK] read-only run complete")
        return

    # Success is detected by comparing assigned_to after the PATCH with the
    # author. If they are already equal, the first payload that merely returns
    # 2xx (even one the server ignores) would be reported as working, so stop
    # before sending any PATCH.
    if before["assigned_to_login"] == author:
        die(
            "assigned_to already equals author before any PATCH; "
            "the probe cannot tell which payload shape works - use a ticket with a different assignee"
        )

    attempts = []
    for name, update_path, request_body in update_attempts(before_unit, author):
        status, body = tt("PATCH", update_path, request_body)
        ok = 200 <= status < 300
        # Only re-read the ticket when the server accepted the request.
        after = compact(get_unit()) if ok else {}
        assigned_to_after = after.get("assigned_to_login", "")
        matches = after.get("assigned_to_login") == author
        attempts.append({
            "name": name,
            "path": update_path,
            "http_status": status,
            "ok": ok,
            "matches": matches,
            "request_body": request_body,
            "body": body,
            "assigned_to_after": assigned_to_after,
        })
        # Rewritten after every attempt so partial progress survives a later failure.
        dump("s46.executor-update.attempts.json", attempts)
        print("[INFO] " + name + " http=" + str(status) + " assigned_to_after=" + assigned_to_after + " matches=" + str(matches))
        if matches:
            after["executor_update_matches_author"] = True
            after["executor_update_payload_shape"] = name
            dump("s46.after.compact.json", after)
            print("[OK] executor update worked with " + name)
            return

    die("no tested PATCH shape changed assigned_to to author")


# Run main() only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()