Загрузка данных
#!/usr/bin/env python3
import json
import ssl
import sys
import urllib.error
import urllib.request
from pathlib import Path
# Fill before running.
TT_BASE_URL = "https://portal.works.prod.localnet/swtr"
TT_TOKEN = ""
TT_TICKET = ""
# False: read author only. True: try PATCH shapes on a noise-safe ticket.
RUN_EXECUTOR_UPDATE = False
RUN_OPENAPI_PROBE = False
# Use "createdBy" unless UI proves "reporter" is the business author.
AUTHOR_SOURCE = "createdBy"
VERIFY_SSL = False
TIMEOUT_SECONDS = 30
OPENAPI_PATHS = [
"v3/api-docs",
"v3/api-docs/swagger-config",
"v2/api-docs",
"api-docs",
"openapi.json",
"swagger.json",
"swagger-resources",
]
OUT = Path(__file__).resolve().parent
def die(message):
print("[ERROR] " + message, file=sys.stderr)
raise SystemExit(1)
def parse(text):
if not text:
return {}
try:
return json.loads(text)
except json.JSONDecodeError:
return text
def dump(name, value):
path = OUT / name
path.write_text(json.dumps(value, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
print("[OK] wrote " + str(path))
def tt(method, path, payload=None):
data = None if payload is None else json.dumps(payload, ensure_ascii=False).encode("utf-8")
req = urllib.request.Request(
TT_BASE_URL.rstrip("/") + "/" + path.lstrip("/"),
data=data,
method=method,
headers={
"Authorization": "Bearer " + TT_TOKEN,
"accept": "application/json",
"Content-Type": "application/json",
"Cookie": "api_swtr_as21=true",
},
)
context = None if VERIFY_SSL else ssl._create_unverified_context()
try:
with urllib.request.urlopen(req, timeout=TIMEOUT_SECONDS, context=context) as resp:
return resp.status, parse(resp.read().decode("utf-8", errors="replace"))
except urllib.error.HTTPError as err:
return err.code, parse(err.read().decode("utf-8", errors="replace"))
def refs(value):
found = set()
if isinstance(value, dict):
ref = value.get("$ref")
if isinstance(ref, str) and ref.startswith("#/"):
found.add(ref)
for child in value.values():
found.update(refs(child))
elif isinstance(value, list):
for child in value:
found.update(refs(child))
return found
def pointer(spec, ref):
cur = spec
for part in ref[2:].split("/"):
part = part.replace("~1", "/").replace("~0", "~")
if not isinstance(cur, dict) or part not in cur:
return None
cur = cur[part]
return cur
def openapi_probe():
probes = []
for path in OPENAPI_PATHS:
status, body = tt("GET", path)
probe = {"path": path, "http_status": status, "is_openapi": isinstance(body, dict) and isinstance(body.get("paths"), dict)}
if probe["is_openapi"]:
selected = {}
for api_path, path_item in body.get("paths", {}).items():
text = json.dumps(path_item, ensure_ascii=False).lower()
if (
"unit/v1/update" in api_path.lower()
or "assigned_to" in text
or "assignee" in text
or "исполн" in text
):
selected[api_path] = path_item
ref_names = sorted(ref for item in selected.values() for ref in refs(item))
schemas = {ref: pointer(body, ref) for ref in ref_names}
probe["selected_paths"] = selected
probe["selected_schemas"] = schemas
elif isinstance(body, (dict, list)):
text = json.dumps(body, ensure_ascii=False)
probe["body_sample"] = text[:2000]
probes.append(probe)
dump("s46.openapi-probe.json", probes)
def get_unit():
status, body = tt("GET", f"rest/api/unit/v2/{TT_TICKET}?validatorEnabled=false")
if status < 200 or status >= 300:
dump("s46.read-error.json", {"http_status": status, "body": body})
die(f"ticket read failed: HTTP {status}")
if not isinstance(body, dict):
die("ticket read response is not a JSON object")
return body
def attr(unit, code):
for item in unit.get("attributes") or []:
if isinstance(item, dict) and item.get("code") == code:
return item
selected = unit.get("selected_attributes") or {}
if isinstance(selected, dict):
return selected.get(code)
return None
def login(value):
if not isinstance(value, dict):
return ""
value = value.get("value") if isinstance(value.get("value"), dict) else value
return str(value.get("login") or "")
def short_attr(item):
if not isinstance(item, dict):
return None
return {
"code": item.get("code"),
"name": item.get("name"),
"type": item.get("type"),
"value": item.get("value"),
"valueAsString": item.get("valueAsString"),
}
def attr_value(unit, code):
item = attr(unit, code)
if not isinstance(item, dict):
return None
return item.get("value")
def attr_value_or(unit, code, fallback):
value = attr_value(unit, code)
return fallback if value is None else value
def status_code(unit):
value = attr_value(unit, "workflow_status")
if isinstance(value, dict):
return value.get("code") or value.get("name") or ""
return value or ""
def status_object(unit):
value = attr_value(unit, "workflow_status")
if isinstance(value, dict):
return value
return {"code": value} if value else {}
def unit_code(value):
if isinstance(value, dict):
return value.get("code") or ""
return value or ""
def label_value(unit):
value = attr_value(unit, "label")
if isinstance(value, dict) and "sample" in value:
return value.get("sample") or []
return [] if value is None else value
def compact(unit):
assigned_to = attr(unit, "assigned_to")
reporter = attr(unit, "reporter")
attrs = [short_attr(attr(unit, c)) for c in ("assigned_to", "reporter", "workflow_status", "due_date", "label")]
return {
"code": unit.get("code"),
"summary": unit.get("summary"),
"description": unit.get("description"),
"space": unit.get("space"),
"suit": unit.get("suit"),
"createdBy": unit.get("createdBy"),
"updatedBy": unit.get("updatedBy"),
"author_candidate_createdBy_login": login(unit.get("createdBy")),
"attributes": [x for x in attrs if x],
"assigned_to_login": login(assigned_to),
"reporter_login": login(reporter),
}
def pick_author(c):
if AUTHOR_SOURCE == "createdBy":
return c["author_candidate_createdBy_login"]
if AUTHOR_SOURCE == "reporter":
return c["reporter_login"]
die('AUTHOR_SOURCE must be "createdBy" or "reporter"')
def flat_update_body(unit, author, workflow_status):
return {
"assigned_to": author,
"description": unit.get("description") or "",
"due_date": attr_value_or(unit, "due_date", None),
"label": label_value(unit),
"suit": unit_code(unit.get("suit")),
"summary": unit.get("summary") or "",
"workflow_status": workflow_status,
}
def update_attempts(unit, author):
by_status_code = flat_update_body(unit, author, status_code(unit))
by_status_object = flat_update_body(unit, author, status_object(unit))
return [
("v2_flat_required_status_code", f"rest/api/unit/v2/update/{TT_TICKET}", by_status_code),
("v2_flat_required_status_object", f"rest/api/unit/v2/update/{TT_TICKET}", by_status_object),
("v1_flat_required_status_code", f"rest/api/unit/v1/update/{TT_TICKET}", by_status_code),
("v1_flat_required_status_object", f"rest/api/unit/v1/update/{TT_TICKET}", by_status_object),
]
def main():
if not TT_TOKEN or not TT_TICKET:
die("fill TT_TOKEN and TT_TICKET at the top of the script")
if RUN_OPENAPI_PROBE:
openapi_probe()
before_unit = get_unit()
before = compact(before_unit)
dump("s46.author.compact.json", before)
author = pick_author(before)
if not author:
die("author login is empty")
print("[INFO] author_login=" + author)
print("[INFO] assigned_to_before=" + before["assigned_to_login"])
if not RUN_EXECUTOR_UPDATE:
print("[OK] read-only run complete")
return
attempts = []
for name, update_path, request_body in update_attempts(before_unit, author):
status, body = tt("PATCH", update_path, request_body)
after = compact(get_unit()) if 200 <= status < 300 else {}
matches = after.get("assigned_to_login") == author
attempts.append({
"name": name,
"path": update_path,
"http_status": status,
"ok": 200 <= status < 300,
"matches": matches,
"request_body": request_body,
"body": body,
"assigned_to_after": after.get("assigned_to_login", ""),
})
dump("s46.executor-update.attempts.json", attempts)
print("[INFO] " + name + " http=" + str(status) + " assigned_to_after=" + after.get("assigned_to_login", "") + " matches=" + str(matches))
if matches:
after["executor_update_matches_author"] = True
after["executor_update_payload_shape"] = name
dump("s46.after.compact.json", after)
print("[OK] executor update worked with " + name)
return
die("no tested PATCH shape changed assigned_to to author")
if __name__ == "__main__":
main()