# Функция принимает 4 параметра:
# 1. Объект операции (то, вы создавали здесь в UI). Формат операции и ее ключи можно посмотреть в итоговом JSON
# 2. Словарь, имя переменной: объект UserVar с локальными переменными потока
# 3. Список созданных листенеров, например: Kafka, WS, gRPC
# 4. Объект класса UserClass
#
# И должна возвращать 5 параметров:
# 1. Тело запроса (None, если изменять тело в пре-процессоре нет необходимости)
# 2. Заголовки запроса (None, если изменять заголовки в пре-процессоре нет необходимости)
# 3. Словарь, "переменная": "значение" (может быть пустым - {}) для локальных переменных потока
# 4. Словарь, "переменная": "значение" (может быть пустым - {}) для глобальных переменных теста
# 5. True, если необходимо пропустить текущую операцию, False – пропуск не требуется
from typing import NamedTuple, Optional, Union
from uuid import uuid4
from requests import get
from datetime import datetime
class ReturnFromPreProcessor(NamedTuple):
body: Optional[Union[bytes, str]]
headers: Optional[dict[str, str]]
local_vars: dict[str, str]
global_vars: dict[str, str]
need_skip_operation: bool
def pre_processor(operation, user_vars, listeners, user_class, *args, **kwargs) -> ReturnFromPreProcessor:
# ...
# return ReturnFromPreProcessor(
# "body-from-pre-processor",
# {"X-HEADER-FROM-PRE-P": "VALUE!"},
# {"hello-local-var": "from pre p"},
# {},
# False,
# )
boid = str(uuid4())
rqUid = str(uuid4())
date = str(datetime.now().isoformat())
return ReturnFromPreProcessor(
None,
None,
{"boid": boid, "rqUid": rqUid, "date": date},
{},
False,
)