Загрузка данных
# modules/Aggregate_Excel.py (ПОЛНОСТЬЮ ИСПРАВЛЕННЫЙ)
import os
import re
import tkinter as tk
import numpy as np
from pathlib import Path
from statistics import mode as stat_mode
from tkinter import filedialog, messagebox, ttk
from modules.database_module import Group, Indicator, MatrixEntry, Session, calculate_indicator_weights
import pandas as pd
from openpyxl import Workbook, load_workbook
from openpyxl.styles import PatternFill
# --- Импорт из модуля базы данных ---
from modules.database_module import Group, Indicator, MatrixEntry, Session
# --- Вспомогательные функции ---
def normalize_value_to_strict(value):
if value is None or str(value).strip() == "":
return "1"
val_str = str(value).replace(",", ".").strip()
try:
num = float(val_str)
if abs(num - 1.0) < 0.1:
return "1"
elif abs(num - 3.0) < 0.1:
return "3"
elif num < 1.0:
return "0.333"
else:
return "3"
except ValueError:
return "1"
def is_transitive_matrix_upper_triangular(matrix):
violations = 0
valid_comparisons = 0
n = len(matrix)
for i in range(n):
for j in range(i + 1, n):
aij = matrix[i][j]
if abs(aij) < 1e-6:
continue
valid_comparisons += 1
for k in range(j + 1, n):
ajk = matrix[j][k]
aik = matrix[i][k]
if abs(ajk) < 1e-6 or abs(aik) < 1e-6:
continue
if (aij >= ajk and aik < ajk) or (aij <= ajk and aik > ajk):
violations += 1
if valid_comparisons == 0:
return 0.0
return max(0.0, ((valid_comparisons - violations) / valid_comparisons) * 100)
def fix_transitivity_in_matrix(matrix):
n = len(matrix)
fixed_matrix = [row.copy() for row in matrix]
for i in range(n):
for j in range(i + 1, n):
for k in range(j + 1, n):
aij = fixed_matrix[i][j]
ajk = fixed_matrix[j][k]
aik = fixed_matrix[i][k]
if (aij >= ajk and aik < ajk) or (aij <= ajk and aik > ajk):
if aij >= ajk:
fixed_matrix[i][k] = ajk
else:
fixed_matrix[i][k] = 1 / ajk if ajk != 0 else 1.0
return fixed_matrix
def aggregate_matrices_by_mode(matrices):
result = []
max_rows = max(len(mat) for mat in matrices)
max_cols = max(len(row) for mat in matrices for row in mat)
for row_idx in range(max_rows):
row = []
for col_idx in range(max_cols):
cell_values = []
for matrix in matrices:
if row_idx < len(matrix) and col_idx < len(matrix[row_idx]):
cell_values.append(matrix[row_idx][col_idx])
if cell_values:
try:
most_common_val = stat_mode(cell_values)
row.append(most_common_val)
except Exception:
row.append(1.0)
else:
row.append(None)
result.append(row)
return result
def create_empty_excel_template(filename):
workbook = Workbook()
workbook.save(filename)
# --- Класс окна ---
class AggregateFilesWindow(tk.Toplevel):
def __init__(self, parent):
super().__init__(parent)
self.title("Агрегирование файлов")
self.geometry("470x250")
main_frame = ttk.Frame(self)
main_frame.pack(padx=10, pady=5, fill="both", expand=True)
main_frame.columnconfigure(0, weight=1)
main_frame.columnconfigure(1, weight=2)
main_frame.columnconfigure(2, weight=1)
tk.Label(main_frame, text="Название создаваемого файла:").grid(
row=0, column=0, sticky="w"
)
self.new_file_name_entry = tk.Entry(main_frame, width=20)
self.new_file_name_entry.grid(row=0, column=1, sticky="we", pady=5)
# ИЗМЕНЕНИЕ: Кнопка теперь вызывает метод с валидацией
create_file_button = ttk.Button(
main_frame, text="Создать файл", command=self.create_file_with_validation
)
create_file_button.grid(row=0, column=2, sticky="e", pady=5)
aggregate_button = ttk.Button(
main_frame, text="Объединить информацию", command=self.aggregate_data
)
aggregate_button.grid(row=1, column=0, columnspan=4, sticky="we", pady=5)
tk.Label(main_frame, text="Выберите файл Excel:").grid(
row=2, column=0, sticky="w"
)
self.file_path_entry = tk.Entry(main_frame, width=30)
self.file_path_entry.grid(row=2, column=1, sticky="we", pady=5)
browse_button = ttk.Button(
main_frame, text="Обзор...", command=self.select_file_for_aggregation
)
browse_button.grid(row=2, column=2, sticky="e", pady=5)
tk.Label(main_frame, text="Выберите лист (группу):").grid(
row=3, column=0, sticky="w"
)
self.sheet_combobox = ttk.Combobox(main_frame, state="readonly", width=20)
self.sheet_combobox.grid(row=3, column=1, sticky="we", pady=5)
self.sheet_combobox.bind(
"<<ComboboxSelected>>", lambda _: self.reset_transitivity_field()
)
tk.Label(main_frame, text="Проверить на транзитивность:").grid(
row=4, column=0, sticky="w"
)
self.transitivity_percentage_label = tk.Label(
main_frame, text="", relief="groove", width=8
)
self.transitivity_percentage_label.grid(row=4, column=1, sticky="w", pady=5)
check_button = ttk.Button(
main_frame, text="Проверить", command=self.check_transitivity
)
check_button.grid(row=4, column=2, sticky="e", pady=5)
bottom_buttons_frame = ttk.Frame(main_frame)
bottom_buttons_frame.grid(row=5, column=0, columnspan=4, sticky="we", pady=5)
fix_transitivity_button = ttk.Button(
bottom_buttons_frame,
text="Исправить транзитивность",
command=self.apply_transitivity_corrections,
)
fix_transitivity_button.grid(row=0, column=0, sticky="we", padx=5)
send_to_db_button = ttk.Button(
bottom_buttons_frame,
text="Отправить в базу данных",
command=self.send_to_database,
)
send_to_db_button.grid(row=0, column=1, sticky="we", padx=5)
# ИЗМЕНЕНИЕ: Метод с валидацией имени файла
def create_file_with_validation(self):
filename = self.new_file_name_entry.get().strip()
# Проверка на пустое имя
if not filename:
messagebox.showwarning("Предупреждение", "Имя файла не может быть пустым.")
return
# Проверка на запрещенные символы в имени файла Windows и Excel
invalid_chars_pattern = r'[\\/:*?"<>|]'
if re.search(invalid_chars_pattern, filename):
messagebox.showerror(
"Ошибка", f'Имя файла содержит недопустимые символы: \\ / : * ? " < > |'
)
return
# Проверка на зарезервированные имена Windows (CON , PRN , AUX , NUL и
# т.д.)
reserved_names = [
"CON",
"PRN",
"AUX",
"NUL",
"COM1",
"COM2",
"COM3",
"COM4",
"COM5",
"COM6",
"COM7",
"COM8",
"COM9",
"LPT1",
"LPT2",
"LPT3",
"LPT4",
"LPT5",
"LPT6",
"LPT7",
"LPT8",
"LPT9",
]
# Получаем имя без расширения и приводим к верхнему регистру для
# сравнения
base_name = Path(filename).stem.upper()
if base_name in reserved_names:
messagebox.showerror(
"Ошибка",
f"'{base_name}' — зарезервированное имя. Пожалуйста выберите другое.",
)
return
# Если имя прошло все проверки — создаем файл.
try:
project_root = Path(__file__).resolve().parents[1]
experts_folder = project_root / "experts"
os.makedirs(experts_folder, exist_ok=True)
full_path = experts_folder / filename
# Проверяем наличие расширения .xlsx или .xlsm и добавляем .xlsx
# если его нет.
if not filename.endswith((".xlsx", ".xlsm")):
full_path = full_path.with_name(full_path.name + ".xlsx")
if not full_path.exists():
create_empty_excel_template(full_path)
messagebox.showinfo("Готово", f"Файл '{
full_path.name}' создан.")
else:
messagebox.showinfo(
"Готово", f"Файл '{full_path.name}' уже существует."
)
except Exception as e:
messagebox.showerror("Ошибка", f"Не удалось создать файл: {e}")
def select_file_for_aggregation(self):
file_path = filedialog.askopenfilename(
filetypes=[
("Excel Files", "*.xlsx"),
("Excel Macro Enabled Workbooks", "*.xlsm"),
]
)
if file_path:
self.file_path_entry.delete(0, tk.END)
self.file_path_entry.insert(0, file_path)
self.load_sheets_from_file(file_path)
messagebox.showinfo("Выбор файла", f"Файл выбран: {file_path}")
def load_sheets_from_file(self, file_path):
try:
workbook = load_workbook(file_path)
sheets = workbook.sheetnames
self.sheet_combobox["values"] = sheets
if sheets:
self.sheet_combobox.current(0)
except Exception as e:
messagebox.showerror("Ошибка", f"Не удалось загрузить листы: {e}")
def reset_transitivity_field(self):
self.transitivity_percentage_label.config(text="")
def check_transitivity(self):
file_path = self.file_path_entry.get()
selected_sheet = self.sheet_combobox.get()
if not file_path or not selected_sheet:
messagebox.showwarning("Внимание", "Пожалуйста, укажите файл и лист!")
return
try:
wb = load_workbook(file_path)
ws = wb[selected_sheet]
matrix_numbers = []
for row in ws.iter_rows(min_row=2):
current_row = []
for cell in row:
val = cell.value
if val is None or val == "" or str(val).lower() == "none":
current_row.append(1.0)
else:
try:
num = float(str(val).replace(",", "."))
current_row.append(num)
except BaseException:
current_row.append(1.0)
matrix_numbers.append(current_row)
matrix_for_check = [row[3:] for row in matrix_numbers]
percent = is_transitive_matrix_upper_triangular(matrix_for_check)
self.transitivity_percentage_label.config(text=f"{percent:.1f}%")
except Exception as e:
print(f"Ошибка при проверке транзитивности: {e}")
self.transitivity_percentage_label.config(text="Ошибка")
def apply_transitivity_corrections(self):
# --- ОСТАВЛЯЕМ КОД БЕЗ ИЗМЕНЕНИЙ ---
file_path = self.file_path_entry.get()
selected_sheet = self.sheet_combobox.get()
if not file_path or not selected_sheet:
messagebox.showwarning("Внимание", "Пожалуйста, укажите файл и лист!")
return
try:
wb = load_workbook(file_path)
ws = wb[selected_sheet]
matrix_numbers = []
for row in ws.iter_rows(min_row=2):
current_row = []
for cell in row:
val = cell.value
if val is None or val == "" or str(val).lower() == "none":
current_row.append(1.0)
else:
try:
num = float(str(val).replace(",", "."))
current_row.append(num)
except BaseException:
current_row.append(1.0)
matrix_numbers.append(current_row)
matrix_for_fix = [row[3:] for row in matrix_numbers]
upper_fixed_matrix = fix_transitivity_in_matrix(matrix_for_fix.copy())
n = len(upper_fixed_matrix)
final_matrix_numbers = [[0.0 for _ in range(n)] for _ in range(n)]
for i in range(n):
for j in range(n):
if i == j:
final_matrix_numbers[i][j] = 1.0
elif i < j:
final_matrix_numbers[i][j] = upper_fixed_matrix[i][j]
else:
val = upper_fixed_matrix[j][i]
final_matrix_numbers[i][j] = 1.0 / val if val != 0 else 1.0
data_to_write = []
for row in final_matrix_numbers:
new_row = []
for num in row:
if abs(num - 1) < 0.1:
new_row.append("1")
elif num < 1.0:
new_row.append("0.333")
else:
new_row.append("3")
data_to_write.append(new_row)
start_row = 2
start_col = 4
for i, row in enumerate(data_to_write):
for j, value in enumerate(row):
ws.cell(row=start_row + i, column=start_col + j, value=value)
cell = ws.cell(row=start_row + i, column=start_col + j)
cell.number_format = "@"
if i == j:
cell.fill = PatternFill(
start_color="FFFF00", end_color="FFFF00", fill_type="solid"
)
wb.save(file_path)
messagebox.showinfo("Готово", "Транзитивность исправлена и файл сохранён.")
except Exception as e:
messagebox.showerror("Ошибка", f"Не удалось исправить: {e}")
def aggregate_data(self):
# --- ОСТАВЛЯЕМ КОД БЕЗ ИЗМЕНЕНИЙ ---
output_file = self.new_file_name_entry.get()
if not output_file:
messagebox.showwarning("Предупреждение", "Сначала создайте имя файла.")
return
if not output_file.endswith(".xlsx"):
output_file += ".xlsx"
project_root = Path(__file__).resolve().parents[1]
experts_folder = project_root / "experts"
os.makedirs(experts_folder, exist_ok=True)
full_output_path = experts_folder / output_file
file_paths = filedialog.askopenfilenames(
title="Выберите файлы экспертов", filetypes=[("Excel Files", "*.xlsx")]
)
if not file_paths:
messagebox.showwarning("Предупреждение", "Файлы не выбраны.")
return
sheets_results = {}
for path in file_paths:
try:
wb = load_workbook(path)
for sheet_name in wb.sheetnames:
ws = wb[sheet_name]
data = [[cell.value for cell in row] for row in ws.rows]
sheets_results.setdefault(sheet_name, []).append(data)
except Exception as e:
messagebox.showerror("Ошибка чтения", f"Ошибка в файле {path}: {e}")
return # Останавливаем процесс
if not sheets_results:
raise ValueError("Нет данных для объединения.")
new_wb = Workbook()
new_wb.remove(new_wb.active) # Удаляем лист по умолчанию
for sheet_name, matrices_list in sheets_results.items():
agg_matrix = aggregate_matrices_by_mode(matrices_list)
new_ws = new_wb.create_sheet(title=sheet_name)
for r_idx, row in enumerate(agg_matrix):
for c_idx, val in enumerate(row):
new_ws.cell(row=r_idx + 1, column=c_idx + 1, value=str(val))
new_wb.save(full_output_path)
new_wb.close()
messagebox.showinfo("Готово", f"Данные объединены в {output_file}.")
# --- ОТПРАВКА В БАЗУ ДАННЫХ ---
def send_to_database(self):
file_path = self.file_path_entry.get()
selected_sheet = self.sheet_combobox.get()
if not file_path:
messagebox.showwarning("Внимание", "Пожалуйста, выберите файл Excel!")
return
if not selected_sheet:
messagebox.showwarning("Внимание", "Пожалуйста, выберите лист (группу)!")
return
try:
# Читаем данные из выбранного листа Excel
df = pd.read_excel(file_path, sheet_name=selected_sheet, header=None)
session = Session()
# Ищем группу по названию листа
group = session.query(Group).filter_by(name=selected_sheet).first()
if not group:
# Если группы нет, создаем новую
group_code = "".join(filter(str.isalnum, selected_sheet.lower()))
group = Group(name=selected_sheet, code=group_code)
session.add(group)
# --- ВАЖНО: Сбрасываем сессию, чтобы у ГРУППЫ появился ID ---
session.flush()
# Очищаем старые данные этой группы перед импортом новых
session.query(MatrixEntry).filter_by(group_id=group.id).delete()
session.query(Indicator).filter_by(group_code=group.code).delete()
# --- ЧТЕНИЕ ИНДИКАТОРОВ (Названий) ---
indicators_to_add = []
for row_idx, row in df.iloc[1:].iterrows():
indicator_name = str(row[1]).strip()
# Если ячейка пустая, пропускаем строку
if not indicator_name:
continue
indicator_code = f"{group.code}-{row_idx}"
# Создаем объект индикатора и добавляем в список
indicators_to_add.append(
Indicator(
name=indicator_name,
code=indicator_code,
group_code=group.code, # Привязываем к группе по коду
)
)
# Сохраняем все индикаторы в базу одним запросом
if indicators_to_add:
session.bulk_save_objects(indicators_to_add)
# --- ЧТЕНИЕ МАТРИЦЫ (Значений) ---
matrix_entries = []
for r_idx, row in df.iloc[1:].iterrows():
for c_offset, cell_value in enumerate(row[3:], start=0):
try:
if pd.isna(cell_value):
continue
num = float(str(cell_value).replace(",", "."))
# Нормализация значений к 1 или 3
if abs(num - 1.0) < 0.1:
norm_value = 1.0
elif abs(num - 3.0) < 0.1:
norm_value = 3.0
elif num < 1.0:
norm_value = 1 / 3
else:
norm_value = 3.0
matrix_entries.append(
MatrixEntry(
group_id=group.id,
row=r_idx - 1,
col=c_offset,
value=norm_value,
)
)
except (ValueError, TypeError):
continue
# Сохраняем все записи матрицы одним запросом
if matrix_entries:
session.bulk_save_objects(matrix_entries)
# Финальное подтверждение всех изменений в базе данных
session.commit()
except Exception as e:
# Если что-то пошло не так, откатываем изменения (это важно!)
session.rollback()
messagebox.showerror("Ошибка", f"Не удалось отправить данные: {e}")
finally:
# Обязательно закрываем сессию в конце
session.close()
messagebox.showinfo("Готово", "Данные успешно отправлены в базу данных.")
def send_to_database(self):
file_path = self.file_path_entry.get()
# Проверка выбора файла
if not file_path:
messagebox.showwarning("Внимание", "Пожалуйста, выберите файл Excel!")
return
try:
# Получаем список всех листов в файле
all_sheets = pd.ExcelFile(file_path).sheet_names
if not all_sheets:
messagebox.showwarning("Внимание", "В файле нет листов для обработки!")
return
session = Session()
success = True # Флаг для отслеживания общего успеха
# Функция для обработки ОДНОГО листа (группы)
def process_sheet(sheet_name, session_obj):
print(f"\n--- Начало обработки листа: {sheet_name} ---")
df = pd.read_excel(file_path, sheet_name=sheet_name, header=None)
# Поиск или создание группы
group = session_obj.query(Group).filter_by(name=sheet_name).first()
if not group:
group_code = "".join(filter(str.isalnum, sheet_name.lower()))
group = Group(name=sheet_name, code=group_code)
session_obj.add(group)
session_obj.flush() # Чтобы у группы появился ID
# Очистка старых данных группы
session_obj.query(MatrixEntry).filter_by(group_id=group.id).delete()
session_obj.query(Indicator).filter_by(group_code=group.code).delete()
# --- ЧТЕНИЕ ИНДИКАТОРОВ (Названий) ---
indicators_to_add = []
for row_idx, row in df.iloc[1:].iterrows():
indicator_name = str(row[1]).strip()
if not indicator_name:
continue
indicator_code = f"{group.code}-{row_idx}"
indicators_to_add.append(
Indicator(
name=indicator_name,
code=indicator_code,
group_code=group.code,
)
)
# Сохраняем индикаторы в БД
if indicators_to_add:
session_obj.bulk_save_objects(indicators_to_add)
session_obj.flush() # Сохраняем, чтобы получить ID индикаторов
# --- ЧТЕНИЕ МАТРИЦЫ (Значений) ---
matrix_entries = []
for r_idx, row in df.iloc[1:].iterrows():
for c_offset, cell_value in enumerate(row[3:], start=0):
try:
if pd.isna(cell_value):
continue
num = float(str(cell_value).replace(",", "."))
# Нормализация значений
if abs(num - 1.0) < 0.1:
norm_value = 1.0
elif abs(num - 3.0) < 0.1:
norm_value = 3.0
elif num < 1.0:
norm_value = 1 / 3
else:
norm_value = 3.0
matrix_entries.append(
MatrixEntry(
group_id=group.id,
row=r_idx - 1,
col=c_offset,
value=norm_value,
)
)
except (ValueError, TypeError):
continue
# Сохраняем матрицу в БД
if matrix_entries:
session_obj.bulk_save_objects(matrix_entries)
# --- РАСЧЕТ ВЕСОВ (КОВ) ---
# Получаем только что сохраненные записи матрицы для этой группы
matrix_entries_for_calc = session_obj.query(MatrixEntry).filter_by(group_id=group.id).all()
indicator_weights = calculate_indicator_weights(matrix_entries_for_calc)
# --- СОХРАНЕНИЕ ВЕСОВ В БД ---
if indicator_weights is not None and len(indicator_weights) > 0:
# Получаем индикаторы этой группы в порядке их создания (по ID)
indicators_in_db = (
session_obj.query(Indicator)
.filter_by(group_code=group.code)
.order_by(Indicator.id.asc())
.all()
)
# Проверка на совпадение количества (защита от ошибок)
if len(indicators_in_db) == len(indicator_weights):
for idx, indicator in enumerate(indicators_in_db):
indicator.kov = indicator_weights[idx]
print(f"--- ВЕСА УСПЕШНО СОХРАНЕНЫ ДЛЯ ГРУППЫ: {sheet_name} ---")
success_local = True
else:
print(f"--- ОШИБКА: Несовпадение количества! Индикаторов: {len(indicators_in_db)}, Весов: {len(indicator_weights)} ---")
success_local = False
else:
print(f"--- ОШИБКА: Расчёт весов для группы {sheet_name} не удался. ---")
success_local = False
return success_local
# --- ГЛАВНЫЙ ЦИКЛ ПО ВСЕМ ЛИСТАМ ---
for sheet in all_sheets:
try:
sheet_success = process_sheet(sheet, session)
success = success and sheet_success # Если хоть одна группа не обработалась - общий успех False
except Exception as e:
print(f"--- Критическая ошибка при обработке листа '{sheet}': {e} ---")
success = False
# Финальное сохранение всех изменений (одной транзакцией)
if success:
session.commit()
except Exception as e:
session.rollback()
messagebox.showerror("Ошибка", f"Не удалось отправить данные: {e}")
finally:
session.close()
# Сообщение пользователю после завершения цикла
if success:
messagebox.showinfo("Готово", "Данные успешно отправлены в базу данных. Расчёт весов выполнен.")
else:
messagebox.showerror("Ошибка", "Обработка завершена с ошибками. Проверьте лог в консоли.")