Загрузка данных
from typing import NamedTuple, Optional
import re
class ReturnFromPostProcessor(NamedTuple):
is_operation_successful: bool
next_operation_id: Optional[str]
local_vars: dict
global_vars: dict
def post_processor(
response, operation, user_vars, listener, user_class, *args, **kwargs
) -> ReturnFromPostProcessor:
if hasattr(response, "status_code") and response.status_code == 200:
if hasattr(response, "headers") and ('set-cookie' in response.headers) and (len(response.headers["set-cookie"]) > 1):
try:
# Безопасно извлекаем JSON и проверяем структуру success
resp_json = response.json()
if not isinstance(resp_json, dict):
return ReturnFromPostProcessor(False, None, {}, {})
success_data = resp_json.get("success")
if not isinstance(success_data, dict):
return ReturnFromPostProcessor(False, None, {}, {})
sessionId = success_data.get("sbbolSessionId")
# Проверяем наличие обязательного заголовка csrftoken
if "csrftoken" not in response.headers:
return ReturnFromPostProcessor(False, None, {}, {})
csrftoken = response.headers["csrftoken"]
# Логика разбора конфигурации (кавычки исправлены на прямые)
conf = f"{user_class.environment.config=}"
position = conf.find("UC51_prelogin_mob_AUTH_pp.py")
if position == -1:
return ReturnFromPostProcessor(False, None, {}, {})
pacing = conf[position:position+130]
pacing = pacing.split(",")[3]
position = pacing.find("'")
pacing = pacing[position+1:len(pacing)-1]
position = pacing.find("-")
numA = pacing[0:position]
numB = pacing[position+1:len(pacing)]
iteration = 0
total_iterations = 60
# Исправлены кавычки в split
index_header_set_cookies_login = conf.split("UserVar(name='index_header_set_cookies_login', next_value_rule=<NextValueRule.NEVER: 'never'>, select_rule=None, value='")[1][:1]
idx = int(index_header_set_cookies_login)
# Проверяем, что индексов в set-cookie достаточно
if len(response.headers["set-cookie"]) > idx + 1:
TOKEN = response.headers["set-cookie"][idx]
TOKEN_Split = TOKEN.split(';')[0]
SESSION = response.headers["set-cookie"][idx+1]
SESSION_split = SESSION.split(';')[0]
return ReturnFromPostProcessor(True, 'UC51_POST_rest_wholesale_client', {"numA":numA, "numB":numB, "iteration":iteration, "total_iterations":total_iterations, "UFS-TOKEN": TOKEN_Split,"UFS-SESSION":SESSION_split,"csrftoken":csrftoken,"sessionId":sessionId}, {})
except (ValueError, KeyError, IndexError, AttributeError, TypeError):
# Любые реальные ошибки парсинга прерывают операцию
pass
return ReturnFromPostProcessor(False, None, {}, {})
elif hasattr(response, "status_code") and response.status_code >= 400:
return ReturnFromPostProcessor(False, None, {}, {})
else:
return ReturnFromPostProcessor(False, None, {}, {})