Загрузка данных
тетс конст
BASIC_MESSAGE_TIMEOUT = 10.0 # Таймаут ожидания сообщений в секундах
SUBSCRIBE_MESSAGE_POLL_ATTEMPTS = 3 # Число чтений из потока подписки до отказа
вс тест утил
from enum import IntEnum, IntFlag
from typing import Any, Callable, List, Optional, Set, Type, TypeVar
Event = TypeVar("Event")
ParsedPayloadType = TypeVar("ParsedPayloadType")
SignalType = TypeVar("SignalType")
после коннект и гет мес
fail(f"Не удалось получить сообщение типа: {ws_invoke_type}. Ошибка: {error}")
async def connect_and_poll_subscribed_signal(
ws_client: WebSocketClient,
ws_message_type: str,
ws_invoke_type: str,
ws_invoke_params: Any,
sensor_id: int,
sensor_description: str,
parse_payload: Callable[[list], ParsedPayloadType],
extract_signals: Callable[[ParsedPayloadType], List[SignalType]],
*,
max_messages: int = TestConst.SUBSCRIBE_MESSAGE_POLL_ATTEMPTS,
timeout: float = TestConst.BASIC_MESSAGE_TIMEOUT,
) -> tuple[ParsedPayloadType, SignalType]:
"""
Подписывается один раз и читает до max_messages сообщений из потока,
пока не найдёт сигнал с нужным id.
"""
await connect(ws_client, ws_invoke_type, ws_invoke_params)
received_ids: Set[int] = set()
try:
for attempt in range(1, max_messages + 1):
with allure.step(
f"Получение сообщения с контентом типа: {ws_message_type} — попытка {attempt} из {max_messages}"
):
payload = await ws_client.receive_by_type(ws_message_type, timeout=timeout)
parsed = parse_payload(payload)
signals = extract_signals(parsed) or []
for signal in signals:
signal_id = getattr(signal, "id", None)
if signal_id is not None:
received_ids.add(signal_id)
target_signal = find_object_by_field(signals, "id", sensor_id)
if target_signal is not None:
allure.attach(
f"Сигнал id={sensor_id} ({sensor_description}) найден на попытке {attempt} из {max_messages}\n"
f"Всего сигналов в сообщении: {len(signals)}\n"
f"Полученные id: {sorted(received_ids)}",
name=f"Результат поиска сигнала в {ws_message_type}",
attachment_type=allure.attachment_type.TEXT,
)
return parsed, target_signal
received_ids_text = ", ".join(str(signal_id) for signal_id in sorted(received_ids)) or "нет"
fail(
f"Информация по датчику {sensor_description} (id={sensor_id}) не пришла в ответе {ws_message_type} "
f"после {max_messages} попыток чтения из потока подписки. Полученные id: {received_ids_text}"
)
except (asyncio.TimeoutError, OSError, ConnectionError, ConnectionResetError) as error:
fail(
f"Не удалось получить сообщение типа: {ws_message_type} для датчика {sensor_description} "
f"(id={sensor_id}). Ошибка: {error}"
)
редж сцен
заменить
payload = await t_utils.connect_and_subscribe_msg(
на
_parsed_payload, target_signal = await t_utils.connect_and_poll_subscribed_signal(
ws_client,
"InputSignalsContent",
"SubscribeInputSignalsRequest",
{
'signalIds': [sensor.id],
'tuId': cfg.tu_id,
'additionalProperties': None,
},
sensor.id,
sensor.description,
parser.parse_input_signals_info_msg,
lambda parsed: parsed.replyContent.inputSignals,
)
далее буртаь старое
parsed_payload = parser.parse_input_signals_info_msg(payload)
sensor_data = parsed_payload.replyContent.inputSignals
target_signal = t_utils.find_object_by_field(sensor_data, "id", sensor.id)
тут с удалением тоже
try:
with allure.step(
f"Подключение по ws, получение данных SchemeSignalsStateContent "
f"для датчика {sensor.description} (id={sensor.id})"
):
_parsed_payload, target_signal = await t_utils.connect_and_poll_subscribed_signal(
ws_client,
"SchemeSignalsStateContent",
"SubscribeSchemeSignalsStateRequest",
{'tuId': cfg.tu_id},
sensor.id,
sensor.description,
parser.parse_scheme_signals_state_msg,
lambda parsed: parsed.replyContent.signalsStates,
)
allure.attach(
str(target_signal),
name=f"Тестируемый фрагмент ответа с бэка: сигнал id={sensor.id} ({sensor.description})",
attachment_type=allure.attachment_type.TEXT,
)
finally:
ws_client.suppress_recv_logging = False
parser.suppress_recv_logging = False
with SoftAssertions() as soft_failures: