Загрузка данных
#!/usr/bin/env python3
import json
import ssl
import sys
import urllib.error
import urllib.request
from pathlib import Path
# Fill before running.
TT_BASE_URL = "https://portal.works.prod.localnet/swtr"
TT_TOKEN = ""
TT_TICKET = ""
# False: read author only. True: try PATCH shapes on a noise-safe ticket.
RUN_EXECUTOR_UPDATE = False
RUN_OPENAPI_PROBE = False
# Use "createdBy" unless UI proves "reporter" is the business author.
AUTHOR_SOURCE = "createdBy"
VERIFY_SSL = False
TIMEOUT_SECONDS = 30
OPENAPI_PATHS = [
"v3/api-docs",
"v3/api-docs/swagger-config",
"v2/api-docs",
"api-docs",
"openapi.json",
"swagger.json",
"swagger-resources",
]
OUT = Path(__file__).resolve().parent
HTTP_METHODS = ("get", "put", "post", "delete", "patch", "options", "head", "trace")
def die(message):
print("[ERROR] " + message, file=sys.stderr)
raise SystemExit(1)
def parse(text):
if not text:
return {}
try:
return json.loads(text)
except json.JSONDecodeError:
return text
def dump(name, value):
path = OUT / name
path.write_text(json.dumps(value, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
print("[OK] wrote " + str(path))
def tt(method, path, payload=None):
data = None if payload is None else json.dumps(payload, ensure_ascii=False).encode("utf-8")
req = urllib.request.Request(
TT_BASE_URL.rstrip("/") + "/" + path.lstrip("/"),
data=data,
method=method,
headers={
"Authorization": "Bearer " + TT_TOKEN,
"accept": "application/json",
"Content-Type": "application/json",
"Cookie": "api_swtr_as21=true",
},
)
context = None if VERIFY_SSL else ssl._create_unverified_context()
try:
with urllib.request.urlopen(req, timeout=TIMEOUT_SECONDS, context=context) as resp:
return resp.status, parse(resp.read().decode("utf-8", errors="replace"))
except urllib.error.HTTPError as err:
return err.code, parse(err.read().decode("utf-8", errors="replace"))
def compact_text(value, limit=240):
if value is None:
return ""
text = " ".join(str(value).split())
return text[:limit] + "..." if len(text) > limit else text
def path_summary(api_path, path_item):
item = {"path": api_path, "descriptions": []}
if not isinstance(path_item, dict):
return item
for method in HTTP_METHODS:
operation = path_item.get(method)
if not isinstance(operation, dict):
continue
description = {
"method": method.upper(),
"summary": compact_text(operation.get("summary")),
"description": compact_text(operation.get("description")),
}
item["descriptions"].append({k: v for k, v in description.items() if v})
return item
def is_interesting_path(api_path, path_item):
low_path = api_path.lower()
text = json.dumps(path_item, ensure_ascii=False).lower()
return (
"/rest/api/unit/v1/update/" in low_path
or "/rest/api/unit/v2/update/" in low_path
or "assigned_to" in text
or "assignee" in text
or "исполн" in text
)
def openapi_probe():
probes = []
for path in OPENAPI_PATHS:
status, body = tt("GET", path)
probe = {"path": path, "http_status": status, "is_openapi": isinstance(body, dict) and isinstance(body.get("paths"), dict)}
if probe["is_openapi"]:
selected = []
for api_path, path_item in body.get("paths", {}).items():
if is_interesting_path(api_path, path_item):
selected.append(path_summary(api_path, path_item))
probe["openapi"] = body.get("openapi") or body.get("swagger")
info = body.get("info") if isinstance(body.get("info"), dict) else {}
probe["title"] = info.get("title", "")
probe["version"] = info.get("version", "")
probe["selected_path_count"] = len(selected)
probe["selected_paths"] = selected
elif isinstance(body, dict):
probe["body_keys"] = sorted(str(k) for k in body.keys())
elif isinstance(body, list):
probe["body_type"] = "list"
probe["body_count"] = len(body)
probes.append(probe)
dump("s46.openapi-probe.json", probes)
def get_unit():
status, body = tt("GET", f"rest/api/unit/v2/{TT_TICKET}?validatorEnabled=false")
if status < 200 or status >= 300:
dump("s46.read-error.json", {"http_status": status, "body": body})
die(f"ticket read failed: HTTP {status}")
if not isinstance(body, dict):
die("ticket read response is not a JSON object")
return body
def attr(unit, code):
for item in unit.get("attributes") or []:
if isinstance(item, dict) and item.get("code") == code:
return item
selected = unit.get("selected_attributes") or {}
if isinstance(selected, dict):
return selected.get(code)
return None
def login(value):
if not isinstance(value, dict):
return ""
value = value.get("value") if isinstance(value.get("value"), dict) else value
return str(value.get("login") or "")
def short_attr(item):
if not isinstance(item, dict):
return None
return {
"code": item.get("code"),
"name": item.get("name"),
"type": item.get("type"),
"value": item.get("value"),
"valueAsString": item.get("valueAsString"),
}
def attr_value(unit, code):
item = attr(unit, code)
if not isinstance(item, dict):
return None
return item.get("value")
def attr_value_or(unit, code, fallback):
value = attr_value(unit, code)
return fallback if value is None else value
def status_code(unit):
value = attr_value(unit, "workflow_status")
if isinstance(value, dict):
return value.get("code") or value.get("name") or ""
return value or ""
def status_object(unit):
value = attr_value(unit, "workflow_status")
if isinstance(value, dict):
return value
return {"code": value} if value else {}
def unit_code(value):
if isinstance(value, dict):
return value.get("code") or ""
return value or ""
def label_value(unit):
value = attr_value(unit, "label")
if isinstance(value, dict) and "sample" in value:
return value.get("sample") or []
return [] if value is None else value
def compact(unit):
assigned_to = attr(unit, "assigned_to")
reporter = attr(unit, "reporter")
attrs = [short_attr(attr(unit, c)) for c in ("assigned_to", "reporter", "workflow_status", "due_date", "label")]
return {
"code": unit.get("code"),
"summary": unit.get("summary"),
"description": unit.get("description"),
"space": unit.get("space"),
"suit": unit.get("suit"),
"createdBy": unit.get("createdBy"),
"updatedBy": unit.get("updatedBy"),
"author_candidate_createdBy_login": login(unit.get("createdBy")),
"attributes": [x for x in attrs if x],
"assigned_to_login": login(assigned_to),
"reporter_login": login(reporter),
}
def pick_author(c):
if AUTHOR_SOURCE == "createdBy":
return c["author_candidate_createdBy_login"]
if AUTHOR_SOURCE == "reporter":
return c["reporter_login"]
die('AUTHOR_SOURCE must be "createdBy" or "reporter"')
def flat_update_body(unit, author, workflow_status):
return {
"assigned_to": author,
"description": unit.get("description") or "",
"due_date": attr_value_or(unit, "due_date", None),
"label": label_value(unit),
"suit": unit_code(unit.get("suit")),
"summary": unit.get("summary") or "",
"workflow_status": workflow_status,
}
def update_attempts(unit, author):
by_status_code = flat_update_body(unit, author, status_code(unit))
by_status_object = flat_update_body(unit, author, status_object(unit))
v2_path = f"rest/api/unit/v2/update/{TT_TICKET}"
v1_path = f"rest/api/unit/v1/update/{TT_TICKET}"
return [
("v2_attributes_dict_string", v2_path, {"attributes": {"assigned_to": author}}),
("v2_attributes_dict_user", v2_path, {"attributes": {"assigned_to": {"login": author}}}),
("v2_top_assigned_to_string", v2_path, {"assigned_to": author}),
("v2_flat_required_status_code", v2_path, by_status_code),
("v2_flat_required_status_object", v2_path, by_status_object),
("v1_flat_required_status_code", v1_path, by_status_code),
("v1_flat_required_status_object", v1_path, by_status_object),
]
def main():
if not TT_TOKEN or not TT_TICKET:
die("fill TT_TOKEN and TT_TICKET at the top of the script")
if RUN_OPENAPI_PROBE:
openapi_probe()
before_unit = get_unit()
before = compact(before_unit)
dump("s46.author.compact.json", before)
author = pick_author(before)
if not author:
die("author login is empty")
print("[INFO] author_login=" + author)
print("[INFO] assigned_to_before=" + before["assigned_to_login"])
if not RUN_EXECUTOR_UPDATE:
print("[OK] read-only run complete")
return
attempts = []
for name, update_path, request_body in update_attempts(before_unit, author):
status, body = tt("PATCH", update_path, request_body)
after = compact(get_unit()) if 200 <= status < 300 else {}
matches = after.get("assigned_to_login") == author
attempts.append({
"name": name,
"path": update_path,
"http_status": status,
"ok": 200 <= status < 300,
"matches": matches,
"request_body": request_body,
"body": body,
"assigned_to_after": after.get("assigned_to_login", ""),
})
dump("s46.executor-update.attempts.json", attempts)
print("[INFO] " + name + " http=" + str(status) + " assigned_to_after=" + after.get("assigned_to_login", "") + " matches=" + str(matches))
if matches:
after["executor_update_matches_author"] = True
after["executor_update_payload_shape"] = name
dump("s46.after.compact.json", after)
print("[OK] executor update worked with " + name)
return
die("no tested PATCH shape changed assigned_to to author")
if __name__ == "__main__":
main()