Загрузка данных
# границы прогноза max. мин.
# начало прогноза (с начала проекта, с указанной даты)
# приклеить низ
#
#
#
# -*- coding: utf-8 -*-
# gui/forecast_window.py
import tkinter as tk
import sqlite3
from tkinter import ttk
from modules.forecasting import monte_carlo_forecast
from matplotlib.figure import Figure
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from numpy import random as np_random
class ForecastWindow:
"""
Окно для отображения прогноза.
Принимает на вход родительское окно (root), чтобы иметь доступ к его данным.
"""
def __init__(self, parent):
# Сохраняем ссылку на главное окно
self.root_window = parent
# Создаем новое окно (Toplevel)
self.window = tk.Toplevel(parent)
self.window.title("Прогнозирование")
self.window.geometry("1050x650")
self.window.resizable(True, True)
# --- ГЛАВНЫЙ КОНТЕЙНЕР ---
main_container = ttk.Frame(self.window)
main_container.pack(fill="both", expand=True)
# Делим главное окно на две части: Верх (График+Настройки) и Низ (Результаты)
main_container.grid_rowconfigure(0, weight=4) # Верхняя часть выше
main_container.grid_rowconfigure(1, weight=1) # Нижняя часть ниже
# --- ВЕРХНИЙ БЛОК (График и Настройки) ---
top_block = ttk.Frame(main_container)
top_block.grid(row=0, column=0, sticky="nsew")
# Внутри верхнего блока делим на два фрейма: График (слева) и Настройки (справа)
top_block.grid_columnconfigure(0, weight=3) # График шире
top_block.grid_columnconfigure(1, weight=2) # Настройки уже
# 1. Левая панель: График
graph_frame = ttk.LabelFrame(top_block, text="График прогноза", padding="10")
graph_frame.grid(row=0, column=0, sticky="nsew")
self.figure = Figure(figsize=(5, 4), dpi=100)
self.ax = self.figure.add_subplot(111)
self.ax.grid(True, linestyle='--', alpha=0.7)
self.ax.set_ylabel('Накопленное значение')
self.ax.set_xlabel('Дата')
self.canvas = FigureCanvasTkAgg(self.figure, master=graph_frame)
self.canvas.draw()
self.canvas.get_tk_widget().pack(fill="both", expand=True)
# --- ПРАВАЯ ПАНЕЛЬ ---
settings_frame = ttk.LabelFrame(top_block, text="Параметры модели", padding="10") # <-- Изменили текст заголовка
settings_frame.grid(row=0, column=1, sticky="ns")
settings_frame.config(width=500)
settings_frame.pack_propagate(False)
# -- Блок выбора проекта (с новой надписью) --
project_frame = ttk.Frame(settings_frame)
project_frame.pack(fill="x", pady=(0, 10))
ttk.Label(project_frame, text=" Выбрать проект:").pack(side="left", anchor="w", padx=(0, 5)) # <-- НОВАЯ НАДПИСЬ
self.project_combobox = ttk.Combobox(project_frame, state="readonly", width=28)
self.project_combobox.pack(side="left", anchor="w")
self.refresh_button = ttk.Button(project_frame, text="Обновить список", command=self.load_projects)
self.refresh_button.pack(side="left", anchor="w", padx=(10, 0))
self.load_projects()
# -- Поля ввода параметров --
input_frame = ttk.Frame(settings_frame)
input_frame.pack(fill="x", padx=10, pady=5)
ttk.Label(input_frame, text="Интенсивность событий (в день):").pack(side="left", anchor="w", padx=(0, 10))
self.lambda_entry = ttk.Entry(input_frame, width=10)
self.lambda_entry.insert(0, "5")
self.lambda_entry.pack(side="left", anchor="w")
ttk.Label(input_frame, text="Шаг (дни):").pack(side="left", anchor="w", padx=(20, 10))
self.step_entry = ttk.Entry(input_frame, width=10)
self.step_entry.insert(0, "1")
self.step_entry.pack(side="left", anchor="w")
# Поле "Прогноз на:"
period_frame = ttk.Frame(settings_frame)
period_frame.pack(fill="x", padx=10, pady=(5, 0))
ttk.Label(period_frame, text="Прогноз на:").pack(side="left", anchor="w", padx=(0, 10))
self.period_combobox = ttk.Combobox(period_frame, state="readonly",
values=["Месяц (30 дней)", "Квартал (90 дней)", "Полгода (180 дней)"],
width=18)
self.period_combobox.current(0)
self.period_combobox.pack(side="left", anchor="w")
# -- Настройки симуляции --
sim_settings_frame = ttk.Frame(settings_frame)
sim_settings_frame.pack(fill="x", padx=10, pady=5)
ttk.Label(sim_settings_frame, text="Кол-во итераций:\n(в симуляции)", justify="left").pack(
side="left", anchor="nw", padx=(0, 10))
self.iterations_entry = ttk.Entry(sim_settings_frame, width=15)
self.iterations_entry.insert(0, "10000")
self.iterations_entry.pack(side="left", anchor="w")
self.build_button = ttk.Button(settings_frame, text="Построить прогноз", command=self.run_forecast)
self.build_button.pack(pady=30)
# --- НИЖНИЙ БЛОК (Результаты расчета) ---
result_frame = ttk.LabelFrame(main_container, text="Результаты расчета", padding="5")
result_frame.grid(row=1, column=0, sticky="ew")
self.result_text = tk.Text(result_frame, height=2, wrap='word', state='disabled')
scrollbar = ttk.Scrollbar(result_frame, command=self.result_text.yview)
scrollbar.pack(side="right", fill="y")
self.result_text.configure(yscrollcommand=scrollbar.set)
self.result_text.pack(padx=10, pady=5, fill="x")
def toggle_simulation_widgets(self):
"""
Показывает или скрывает поля ввода для моделирования данных.
Этот вариант ищет нужные фреймы по их расположению в иерархии.
"""
# settings_frame - это главный фрейм справа, где лежит кнопка "Построить прогноз"
settings_frame = self.build_button.master
# Ищем все дочерние элементы внутри settings_frame
children = list(settings_frame.winfo_children())
# Первый фрейм (обычно index 1) содержит наши поля ввода
if len(children) > 2:
input_frame = children[2] # Это наш основной блок с Интенсивностью и Шагом
# Внутри этого блока ищем вложенный фрейм (index 0), который содержит Прогноз на:
period_frame = None
for grandchild in input_frame.winfo_children():
if isinstance(grandchild, ttk.Frame): # Ищем фрейм внутри фрейма
period_frame = grandchild
break
# Теперь показываем или прячем оба фрейма вместе
if self.simulation_var.get():
input_frame.pack(fill="x", padx=10, pady=(5, 0)) # Отступ снизу
if period_frame:
period_frame.pack(fill="x")
else:
input_frame.pack_forget()
if period_frame:
period_frame.pack_forget()
def _insert_into_result(self, text):
"""Вставляет текст в поле результатов."""
self.result_text.config(state='normal') # Включаем режим записи
self.result_text.insert(tk.END, text) # Вставляем текст в конец
self.result_text.config(state='disabled') # Выключаем режим записи
self.result_text.see(tk.END) # Автоматически прокручиваем вниз
def _clear_result_text(self):
"""Очищает поле результатов перед новым расчетом."""
self.result_text.config(state='normal')
self.result_text.delete(1.0, tk.END)
self.result_text.config(state='disabled')
def load_projects(self):
"""
Загружает список проектов из главного окна в выпадающий список.
"""
# Очищаем текущий список на случай, если он уже был заполнен
self.project_combobox.set('')
# Берем список проектов из главного окна
projects = self.root_window.project_manager.list_projects()
# Формируем списки для combobox: видимые названия и внутренние ID
project_names = [p.name for p in projects]
project_ids = [p.id for p in projects]
self.project_combobox['values'] = project_names
# Если проекты есть, выбираем первый по умолчанию
if project_names:
self.project_combobox.current(0)
def get_project_history(self):
"""
Получает историю проекта из БД или возвращает None, если данные не найдены.
Этот метод предназначен ТОЛЬКО для работы с реальными данными из базы.
:return: Словарь с 'base_level' и 'delta_history' ИЛИ None при ошибке/отсутствии данных.
"""
print("--- Режим работы с реальной базой данных ---")
selected_index = self.project_combobox.current()
if selected_index == -1:
print("Ошибка: Не выбран проект.")
return None
projects = self.root_window.project_manager.list_projects()
try:
selected_project_id = projects[selected_index].id
except IndexError:
print("Ошибка: Проект не найден в списке.")
return None
# По умолчанию считаем за весь период, если пользователь введет 0 или ерунду
try:
period_days = int(self.period_entry.get())
if period_days < 0:
period_days = 0
except ValueError:
period_days = 0
base_level = 0
delta_history = []
try:
conn = sqlite3.connect(f"project_dbs/{selected_project_id}.db")
cursor = conn.cursor()
# Считаем Базовый Уровень (сумма ДО периода)
if period_days > 0:
cursor.execute("""
SELECT COALESCE(SUM(event_sign), 0)
FROM project_data
WHERE project_code = ? AND event_data < date('now', ?)
""", (selected_project_id, f"-{period_days} days"))
result = cursor.fetchone()
base_level = result[0] if result else 0
# Считаем Дельту (события ВНУТРИ периода)
cursor.execute("""
SELECT event_sign, event_data
FROM project_data
WHERE project_code = ? AND event_sign IS NOT NULL
ORDER BY event_data DESC
""", (selected_project_id,))
rows = cursor.fetchall()
conn.close()
if period_days > 0 and rows:
from datetime import datetime, timedelta
cutoff_date = datetime.now() - timedelta(days=period_days)
for row in rows:
event_date = datetime.strptime(row[1], "%Y-%m-%d")
if event_date >= cutoff_date:
delta_history.append(row[0])
else:
delta_history = [row[0] for row in rows if row[0] is not None]
print(f"Базовый уровень (до периода): {base_level}")
print(f"Динамика за период ({period_days} дней): {delta_history}")
return {
"base_level": base_level,
"delta_history": delta_history
}
except Exception as e:
print(f"Ошибка при работе с базой данных: {e}")
return None
def run_forecast(self):
"""
Финальная рабочая версия.
Находит дату пересечения с нижней границей и выводит понятный текст.
"""
# Очищаем поле результатов перед новым расчетом
self._clear_result_text()
try:
iterations = int(self.iterations_entry.get())
lambda_val = float(self.lambda_entry.get())
selected_period = self.period_combobox.get()
days_forward_dict = {"Месяц": 30, "Квартал": 90, "Полгода": 180}
period_key = next((k for k in days_forward_dict if k in selected_period), None)
days_forward = days_forward_dict.get(period_key, 30)
step_days = int(self.step_entry.get())
except ValueError as e:
error_msg = f"Ошибка ввода: {e}. Проверьте все поля."
print(error_msg)
self._insert_into_result("ОШИБКА: " + str(e) + "\n")
return
from datetime import datetime, timedelta
import numpy as np
# --- ГЕНЕРАЦИЯ ТРАЕКТОРИИ ---
value_pool_positive = [1, 2, 3, 4, 5]
trajectory = []
current_value = 0
start_date = datetime.now().date()
# Генерируем траекторию по дням (без шага, чтобы найти ТОЧНУЮ дату)
daily_values = [] # Сохраняем значения КАЖДОГО дня для точного поиска даты
for day_offset in range(days_forward + 1):
daily_sum = 0
num_events_today = np.random.poisson(lam=lambda_val)
for _ in range(num_events_today):
event_value = np.random.choice(value_pool_positive)
daily_sum += event_value
current_value += daily_sum
daily_values.append(current_value)
# Добавляем точку на график согласно шагу отображения
if day_offset % step_days == 0 or day_offset == days_forward:
trajectory.append(current_value)
# Вывод хода симуляции в КОНСОЛЬ
current_date = start_date + timedelta(days=day_offset)
print(f"{current_date}: Событий: {num_events_today:2d}. Итог дня: {daily_sum:6.2f}. Накоплено: {current_value:6.2f}")
if not trajectory:
self._insert_into_result("Не удалось сгенерировать данные.\n")
return
# --- РАСЧЕТ ГРАНИЦ ПРОГНОЗА ---
sim_result = monte_carlo_forecast(
pool_values=trajectory,
num_simulations=iterations,
p1=0.05,
p2=0.95
)
final_lower = sim_result["lower_bound"]
final_upper = sim_result["upper_bound"]
final_median = sim_result["median"]
# --- ИЩЕМ ДАТУ ПЕРЕСЕЧЕНИЯ С НИЖНЕЙ ГРАНИЦЕЙ ---
date_of_intersection = None
for idx, val in enumerate(daily_values):
if val >= final_lower:
date_of_intersection = start_date + timedelta(days=idx)
break
# Формируем текст для вывода в виджет в нужном формате
output_text = (
f"ПРОГНОЗ ДОСТИЖЕНИЯ ЦЕЛИ:\n"
f" Цель (Нижняя граница): {final_lower:.2f}\n\n"
f" Прогнозируемый диапазон значений:\n"
f" Минимум: {final_lower:.2f}\n"
f" Максимум: {final_upper:.2f}\n\n"
)
if date_of_intersection:
output_text += f"Достижение минимума прогнозируется: {date_of_intersection.strftime('%d.%m.%Y')}"
else:
output_text += "Достижение минимума в заданный период не прогнозируется."
# --- ВЫВОДИМ РЕЗУЛЬТАТ В ИНТЕРФЕЙС ---
self._insert_into_result(output_text)
# Передаем данные для отрисовки графика
result_data = {
"lower_bound": final_lower,
"upper_bound": final_upper,
"median": final_median,
"base_level": 0,
"delta_history": trajectory,
"intersections": {
"date": date_of_intersection,
"value": final_lower
}
}
self.update_graph(result_data)
# Вспомогательные методы для работы с текстовым полем
def _insert_into_result(self, text):
"""Вставляет текст в поле результатов."""
self.result_text.config(state='normal')
self.result_text.insert(tk.END, text)
self.result_text.config(state='disabled')
self.result_text.see(tk.END) # Автоматически прокручивает вниз
def _clear_result_text(self):
"""Очищает поле результатов."""
self.result_text.config(state='normal')
self.result_text.delete(1.0, tk.END)
self.result_text.config(state='disabled')
def update_graph(self, result_data):
"""
Обновляет график, рисует линии границ, траекторию и выделяет точку пересечения.
"""
lower_bound = result_data['lower_bound']
median = result_data['median']
upper_bound = result_data['upper_bound']
base_level = result_data['base_level']
trajectory = result_data['delta_history']
# Очищаем график
self.ax.clear()
# 1. Рисуем горизонтальные линии границ
self.ax.axhline(y=lower_bound, color='red', linestyle='--', label=f'Нижняя граница (P1)')
self.ax.axhline(y=median, color='green', linestyle='-', label=f'Медиана')
self.ax.axhline(y=upper_bound, color='blue', linestyle='--', label=f'Верхняя граница (P2)')
# 2. Рисуем траекторию (линию графика)
if len(trajectory) > 1 and trajectory[-1] >= lower_bound:
from datetime import datetime, timedelta
start_date = datetime.now().date()
step_days = int(self.step_entry.get()) # Берем текущее значение из интерфейса
dates_for_plot = []
for i in range(len(trajectory)):
day_offset = i * step_days
plot_date = start_date + timedelta(days=day_offset)
dates_for_plot.append(plot_date)
# Сама линия траектории
self.ax.plot(dates_for_plot, trajectory, color='orange',
marker='o', linewidth=2, markersize=6, label='Смоделированная динамика')
# 3. НАХОДИМ ТОЧКУ ПЕРЕСЕЧЕНИЯ С НИЖНЕЙ ГРАНИЦЕЙ
intersection_index = None
for idx, val in enumerate(trajectory):
if val >= lower_bound:
intersection_index = idx
break
# ЕСЛИ ПЕРЕСЕЧЕНИЕ НАЙДЕНО - ОТМЕЧАЕМ ЕГО!
if intersection_index is not None:
x_coord = dates_for_plot[intersection_index]
y_coord = trajectory[intersection_index]
# Рисуем большой красный маркер поверх точки
self.ax.scatter(x_coord, y_coord, color='darkred', s=150,
zorder=5, edgecolor='black', linewidth=1, alpha=0.7)
# Подписываем дату рядом с точкой
self.ax.annotate(f'{x_coord.strftime("%d.%m")}',
xy=(x_coord, y_coord),
xytext=(x_coord, y_coord + (upper_bound/20)),
textcoords="data",
arrowprops=dict(arrowstyle="->", color='black'),
ha='center', fontsize=8, backgroundcolor='white')
# Настраиваем ось X с датами
self.ax.set_xticks(dates_for_plot)
self.ax.set_xticklabels([d.strftime('%d.%m') for d in dates_for_plot])
self.ax.xaxis.set_tick_params(rotation=45)
# Настраиваем внешний вид
self.ax.set_ylabel('Накопленное значение')
self.ax.set_xlabel('Дата')
self.ax.grid(True, linestyle='--', alpha=0.7)
self.ax.legend(loc='upper left')
self.ax.set_ylim(bottom=0)
# Перерисовываем canvas
self.canvas.draw()