Загрузка данных
from typing import NamedTuple, Optional
class ReturnFromPostProcessor(NamedTuple):
is_operation_successful: bool
next_operation_id: Optional[str]
local_vars: dict
global_vars: dict
def post_processor(
response, operation, user_vars, listener, user_class, *args, **kwargs
) -> ReturnFromPostProcessor:
if hasattr(response, "status_code") and response.status_code == 200:
if hasattr(response, "headers") and ('set-cookie' in response.headers) and (len(response.headers["set-cookie"]) > 0):
try:
# Безопасная проверка JSON структуры success -> srpB
resp_json = response.json()
if not isinstance(resp_json, dict):
return ReturnFromPostProcessor(False, None, {}, {})
success_data = resp_json.get("success")
if not isinstance(success_data, dict):
return ReturnFromPostProcessor(False, None, {}, {})
prelogin = success_data.get("srpB")
conf = f"{user_class.environment.config=}"
index_header_set_cookies_prelogin_Split = conf.split("UserVar(name='index_header_set_cookies_prelogin', next_value_rule=<NextValueRule.NEVER: 'never'>, select_rule=None, value='")[1][:2]
index_header_set_cookies_prelogin = index_header_set_cookies_prelogin_Split.split("'")[0]
idx = int(index_header_set_cookies_prelogin)
if idx < 0:
# Проверяем тип, так как split() есть только у строк
cookie_data = response.headers["set-cookie"]
sbbid_session_id_Split = cookie_data[0].split(";")[0] if isinstance(cookie_data, list) else cookie_data.split(";")[0]
sbbid_session_id = sbbid_session_id_Split.split('=')[1]
else:
if len(response.headers["set-cookie"]) > idx:
sbbid_session_id_Split_List = response.headers["set-cookie"][idx]
sbbid_session_id_Split = sbbid_session_id_Split_List.split(';')[0]
sbbid_session_id = sbbid_session_id_Split.split('=')[1]
else:
return ReturnFromPostProcessor(False, None, {}, {})
return ReturnFromPostProcessor(True, 'UC29_login_AUTH', {"sbbid_session_id": sbbid_session_id_Split, "prelogin": prelogin}, {})
except (ValueError, KeyError, IndexError, AttributeError, TypeError):
pass
return ReturnFromPostProcessor(False, None, {}, {})
elif hasattr(response, "status_code") and response.status_code >= 400:
return ReturnFromPostProcessor(False, None, {}, {})
else:
return ReturnFromPostProcessor(False, None, {}, {})
from typing import NamedTuple, Optional
class ReturnFromPostProcessor(NamedTuple):
is_operation_successful: bool
next_operation_id: Optional[str]
local_vars: dict
global_vars: dict
def post_processor(
response, operation, user_vars, listener, user_class, *args, **kwargs
) -> ReturnFromPostProcessor:
if hasattr(response, "status_code") and response.status_code == 200:
return ReturnFromPostProcessor(True, 'UC29_create_AUTH', {}, {})
else:
return ReturnFromPostProcessor(False, None, {}, {})
from typing import NamedTuple, Optional
class ReturnFromPostProcessor(NamedTuple):
is_operation_successful: bool
next_operation_id: Optional[str]
local_vars: dict
global_vars: dict
def post_processor(
response, operation, user_vars, listener, user_class, *args, **kwargs
) -> ReturnFromPostProcessor:
if hasattr(response, "status_code") and response.status_code == 200:
# Проверяем, что в массиве кук достаточно элементов для индексов 1 и 2
if hasattr(response, "headers") and ('set-cookie' in response.headers) and (len(response.headers["set-cookie"]) > 2):
try:
resp_json = response.json()
p_csrfToken = resp_json.get("csrfToken") if isinstance(resp_json, dict) else None
TOKEN = response.headers["set-cookie"][1]
TOKEN_Split = TOKEN.split(';')[0]
SESSION = response.headers["set-cookie"][2]
SESSION_split = SESSION.split(';')[0]
return ReturnFromPostProcessor(True, 'UC29_decrypt_AUTH', {"p_csrfToken":p_csrfToken,"UFS-TOKEN": TOKEN_Split,"UFS-SESSION":SESSION_split}, {})
except (ValueError, KeyError, IndexError, AttributeError, TypeError):
pass
return ReturnFromPostProcessor(False, None, {}, {})
else:
return ReturnFromPostProcessor(False, None, {}, {})
from typing import NamedTuple, Optional
class ReturnFromPostProcessor(NamedTuple):
is_operation_successful: bool
next_operation_id: Optional[str]
local_vars: dict
global_vars: dict
def post_processor(
response, operation, user_vars, listener, user_class, *args, **kwargs
) -> ReturnFromPostProcessor:
if hasattr(response, "status_code") and response.status_code == 200:
try:
resp_json = response.json()
csrftoken = resp_json.get("data") if isinstance(resp_json, dict) else None
return ReturnFromPostProcessor(True, 'UC29_GET_contacts_WEB', {"csrftoken": csrftoken}, {})
except (ValueError, AttributeError, TypeError):
pass
return ReturnFromPostProcessor(False, None, {}, {})
else:
return ReturnFromPostProcessor(False, None, {}, {})