Загрузка данных
async def diagnostics_of_signals_after_initialization(
ws_client,
cfg: SmokeSuiteConfig,
):
"""
Проверка выходных сигналов после окончания режим Инициализация по причине "холодного" пуска СОУ.
"""
with allure.step("Подписка на сигналы для участков"):
payload = await t_utils.connect_and_subscribe_msg(
ws_client,
"OutputSignalsInfo",
"SubscribeOutputSignalsRequest",
{
'objects': {
'linearParts': [],
'controlledSites': [
SiteKpKp.TIXORECZKAYA_NOVOVELICHKOVSKAYA.value,
SiteKpKp.NOVOVELICHKOVSKAYA_KRYMSKAYA.value,
SiteKpKp.KRYMSKAYA_GRUSHOVAYA.value,
SiteKpKp.BACKUP_ROUTE_BEJSUG.value,
SiteKpKp.BACKUP_ROUTE_PONURA.value,
SiteKpKp.BACKUP_ROUTE_KUBAN.value,
SiteKpKp.NPZ_AFIPSKIJ.value,
SiteKpKp.NPZ_ILINSKIJ.value,
],
},
'signalTypes': 1023,
'tuId': cfg.tu_id,
'additionalProperties': None,
},
)
parsed_payload = parser.parse_output_signals_info_msg(payload)
controlled_site_dict = {
"controlled_site_first": SiteKpKp.TIXORECZKAYA_NOVOVELICHKOVSKAYA.value,
"controlled_site_second": SiteKpKp.NOVOVELICHKOVSKAYA_KRYMSKAYA.value,
"controlled_site_third": SiteKpKp.KRYMSKAYA_GRUSHOVAYA.value,
"controlled_site_fourth": SiteKpKp.BACKUP_ROUTE_BEJSUG.value,
"controlled_site_fifth": SiteKpKp.BACKUP_ROUTE_PONURA.value,
"controlled_site_sixth": SiteKpKp.BACKUP_ROUTE_KUBAN.value,
"controlled_site_seventh": SiteKpKp.NPZ_AFIPSKIJ.value,
"controlled_site_eight": SiteKpKp.NPZ_ILINSKIJ.value,
}
controlled_site_messages = {}
for name, key in controlled_site_dict.items():
controlled_site_messages[name] = t_utils.find_object_by_a_few_fields(
parsed_payload.replyContent.controlledSiteSignals, key
)
all_signals = {}
for site_name, site_message in controlled_site_messages.items():
signal_dict = {'pump': None, 'sou': None, 'gravity': None}
if site_message:
all_signals[site_name] = {
'pump': t_utils.get_signal(site_message, SignalType.REGLU),
'sou': t_utils.get_signal(site_message, SignalType.REGSOU),
'gravity': t_utils.get_signal(site_message, SignalType.GRAVITYPIPE),
}
else:
all_signals[site_name] = signal_dict
first_kp_kp = all_signals.get("controlled_site_first")
if first_kp_kp:
first_site_signal_pump = get_value(first_kp_kp.get("pump"))
first_site_signal_sou = get_value(first_kp_kp.get("sou"))
first_site_signal_gravity = get_value(first_kp_kp.get("gravity"))
second_kp_kp = all_signals.get("controlled_site_first")
if second_kp_kp:
second_site_signal_pump = get_value(second_kp_kp.get("pump"))
second_site_signal_sou = get_value(second_kp_kp.get("sou"))
second_site_signal_gravity = get_value(second_kp_kp.get("gravity"))
third_kp_kp = all_signals.get("controlled_site_third")
if third_kp_kp:
third_site_signal_pump = get_value(third_kp_kp.get("pump"))
third_site_signal_sou = get_value(third_kp_kp.get("sou"))
third_site_signal_gravity = get_value(third_kp_kp.get("gravity"))
fourth_kp_kp = all_signals.get("controlled_site_fourth")
if fourth_kp_kp:
fourth_site_signal_pump = get_value(fourth_kp_kp.get("pump"))
fourth_site_signal_sou = get_value(fourth_kp_kp.get("sou"))
fourth_site_signal_gravity = get_value(fourth_kp_kp.get("gravity"))
fifth_kp_kp = all_signals.get("controlled_site_fifth")
if fifth_kp_kp:
fifth_site_signal_pump = get_value(fifth_kp_kp.get("pump"))
fifth_site_signal_sou = get_value(fifth_kp_kp.get("sou"))
fifth_site_signal_gravity = get_value(fifth_kp_kp.get("gravity"))
sixth_kp_kp = all_signals.get("controlled_site_sixth")
if sixth_kp_kp:
sixth_site_signal_pump = get_value(sixth_kp_kp.get("pump"))
sixth_site_signal_sou = get_value(sixth_kp_kp.get("sou"))
sixth_site_signal_gravity = get_value(sixth_kp_kp.get("gravity"))
seventh_kp_kp = all_signals.get("controlled_site_seventh")
if seventh_kp_kp:
seventh_site_signal_pump = get_value(seventh_kp_kp.get("pump"))
seventh_site_signal_sou = get_value(seventh_kp_kp.get("sou"))
seventh_site_signal_gravity = get_value(seventh_kp_kp.get("gravity"))
eighth_kp_kp = all_signals.get("controlled_site_eight")
if eighth_kp_kp:
eight_site_signal_pump = get_value(eighth_kp_kp.get("pump"))
eight_site_signal_sou = get_value(eighth_kp_kp.get("sou"))
eight_site_signal_gravity = get_value(eighth_kp_kp.get("gravity"))
with SoftAssertions() as soft_failures:
StepCheck(
"Проверка сигнала - режим МТ на участке Тихорецкая-Нововеличковская",
"Режим МТ",
soft_failures,
).actual(first_site_signal_pump).expected(str(cfg.exp_tixoreczkaya_novovelichkovskaya_reg_lu)).equal_to()
StepCheck(
"Проверка сигнала - режим СОУ на участке Тихорецкая-Нововеличковская",
"Режим СОУ",
soft_failures,
).actual(first_site_signal_sou).expected(str(cfg.exp_tixoreczkaya_novovelichkovskaya_reg_sou)).equal_to()
StepCheck(
f"Проверка {GravityPipe.expected_lds_status_gravity_false.description} \n"
f"на участке Тихорецкая-Нововеличковская",
"Количество самотеков",
soft_failures,
).actual(first_site_signal_gravity).expected(str(GravityPipe.expected_lds_status_gravity_false.id)).equal_to()
StepCheck(
"Проверка сигнала - режим МТ на участке Нововеличковская-Крымская",
"Режим МТ",
soft_failures,
).actual(second_site_signal_pump).expected(str(cfg.exp_novovelichkovskaya_krymskaya_reg_lu)).equal_to()
StepCheck(
f"Проверка {GravityPipe.expected_lds_status_gravity_false.description}\n"
f"на участке Нововеличковская-Крымская",
"Количество самотеков",
soft_failures,
).actual(second_site_signal_gravity).expected(str(GravityPipe.expected_lds_status_gravity_false.id)).equal_to()
StepCheck(
"Проверка сигнала - режим СОУ на участке Нововеличковская-Крымская",
"Режим СОУ",
soft_failures,
).actual(second_site_signal_sou).expected(str(cfg.exp_tixoreczkaya_novovelichkovskaya_reg_sou)).equal_to()
StepCheck(
"Проверка сигнала - режим МТ на участке Крымская-Грушовая",
"Режим МТ",
soft_failures,
).actual(
third_site_signal_pump
).expected(str(cfg.exp_krymskaya_grushovaya_reg_lu)).equal_to()
StepCheck(
f"Проверка {GravityPipe.expected_lds_status_gravity_true.description} на участке Крымская-Грушовая",
"Количество самотеков",
soft_failures,
).actual(third_site_signal_gravity).expected(str(GravityPipe.expected_lds_status_gravity_true.id)).equal_to()
StepCheck(
"Проверка сигнала - режим СОУ на участке Крымская-Грушовая",
"Режим СОУ",
soft_failures,
).actual(
third_site_signal_sou
).expected(str(cfg.exp_krymskaya_grushovaya_reg_sou)).equal_to()
StepCheck(
"Проверка сигнала - режим МТ на резервной нитке Бейсуг",
"Режим МТ",
soft_failures,
).actual(
fourth_site_signal_pump
).expected(str(cfg.exp_backup_route_bejsug_reg_lu)).equal_to()
StepCheck(
f"Проверка {GravityPipe.expected_lds_status_gravity_false.description} на резервной нитке Бейсуг",
"Количество самотеков",
soft_failures,
).actual(fourth_site_signal_gravity).expected(str(GravityPipe.expected_lds_status_gravity_false.id)).equal_to()
StepCheck(
"Проверка сигнала - режим СОУ на резервной нитке Бейсуг",
"Режим СОУ",
soft_failures,
).actual(
fourth_site_signal_sou
).expected(str(cfg.exp_backup_route_bejsug_reg_sou)).equal_to()
StepCheck(
"Проверка сигнала - режим МТ на резервной нитке Понура",
"Режим МТ",
soft_failures,
).actual(
fifth_site_signal_pump
).expected(str(cfg.exp_backup_route_ponura_reg_lu)).equal_to()
StepCheck(
"Проверка сигнала - режим СОУ на резервной нитке Понура",
"Режим СОУ",
soft_failures,
).actual(
fifth_site_signal_sou
).expected(str(cfg.exp_backup_route_ponura_reg_sou)).equal_to()
StepCheck(
f"Проверка {GravityPipe.expected_lds_status_gravity_false.description} на резервной нитке Понура",
"Количество самотеков",
soft_failures,
).actual(fifth_site_signal_gravity).expected(str(GravityPipe.expected_lds_status_gravity_false.id)).equal_to()
StepCheck(
"Проверка сигнала - режим МТ на резервной нитке Кубань",
"Режим МТ",
soft_failures,
).actual(
sixth_site_signal_pump
).expected(str(cfg.exp_backup_route_kuban_reg_lu)).equal_to()
StepCheck(
"Проверка сигнала - режим СОУ на резервной нитке Кубань",
"Режим СОУ",
soft_failures,
).actual(
sixth_site_signal_sou
).expected(str(cfg.exp_backup_route_kuban_reg_sou)).equal_to()
StepCheck(
f"Проверка {GravityPipe.expected_lds_status_gravity_false.description} на резервной нитке Кубань",
"Количество самотеков",
soft_failures,
).actual(sixth_site_signal_gravity).expected(str(GravityPipe.expected_lds_status_gravity_false.id)).equal_to()
StepCheck(
"Проверка сигнала - режим МТ на НПЗ Афипский",
"Режим МТ",
soft_failures,
).actual(
seventh_site_signal_pump
).expected(str(cfg.exp_npz_afipskij_reg_lu)).equal_to()
StepCheck(
"Проверка сигнала - режим СОУ на НПЗ Афипский",
"Режим СОУ",
soft_failures,
).actual(
seventh_site_signal_sou
).expected(str(cfg.exp_npz_afipskij_reg_sou)).equal_to()
StepCheck(
f"Проверка {GravityPipe.expected_lds_status_gravity_false.description} на НПЗ Афипский",
"Количество самотеков",
soft_failures,
).actual(seventh_site_signal_gravity).expected(str(GravityPipe.expected_lds_status_gravity_false.id)).equal_to()
StepCheck(
"Проверка сигнала - режим МТ на НПЗ Ильинский",
"Режим МТ",
soft_failures,
).actual(
eight_site_signal_pump
).expected(str(cfg.exp_npz_ilinskij_reg_lu)).equal_to()
StepCheck(
"Проверка сигнала - режим СОУ на НПЗ Ильинский",
"Режим СОУ",
soft_failures,
).actual(
eight_site_signal_sou
).expected(str(cfg.exp_npz_ilinskij_reg_sou)).equal_to()
StepCheck(
f"Проверка {GravityPipe.expected_lds_status_gravity_false.description} на НПЗ Ильинский",
"Количество самотеков",
soft_failures,
).actual(eight_site_signal_gravity).expected(str(GravityPipe.expected_lds_status_gravity_false.id)).equal_to()