Загрузка данных
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import statsmodels.api as sm
from statsmodels.stats.outliers_influence import variance_inflation_factor
from statsmodels.stats.diagnostic import het_breuschpagan, acorr_breusch_godfrey
from statsmodels.stats.stattools import durbin_watson
from statsmodels.tools import add_constant
from scipy import stats
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.linear_model import LinearRegression
# ---------------------------
# 1. Подготовка данных
# ---------------------------
# Предполагаем, что df уже загружен и содержит все необходимые колонки.
# Убедимся, что дата в правильном формате
df['end_month_plan'] = pd.to_datetime(df['end_month_plan'])
# Целевая переменная
target = 'renewed_fact_rate'
weight_col = 'sum_total_out'
# Признаки финальной модели
features = ['duration_months', 'pr_p_n', 'new_delta']
# Агрегированная функция (взвешенное среднее по месяцам)
def aggregate(df, group_col, value_col, weight_col):
return (df.groupby(group_col)
.apply(lambda g: pd.Series({
value_col: np.average(g[value_col], weights=g[weight_col]),
weight_col: g[weight_col].sum()
}))
.reset_index()
.sort_values(group_col))
# ---------------------------
# 2. График целевой переменной
# ---------------------------
target_agg = aggregate(df, 'end_month_plan', target, weight_col)
plt.figure(figsize=(12,5))
plt.plot(target_agg['end_month_plan'], target_agg[target], marker='o')
plt.title('Взвешенная по объёму доля пролонгаций по месяцам')
plt.xlabel('Дата')
plt.ylabel('Доля пролонгаций')
plt.grid(True)
plt.show()
# ---------------------------
# 3. Функция прогнозирования (с сигмоидом)
# ---------------------------
def prolong_rate_logit(duration_months, pr_p_n, new_delta):
"""Возвращает логит прогноза."""
return (-0.03865802) * duration_months + 0.55179107 * pr_p_n + 0.29300738 * new_delta - 2.256419058
def prolong_rate_prob(duration_months, pr_p_n, new_delta, eps=1e-6):
"""Возвращает вероятность (долю) с сигмоидом."""
logit = prolong_rate_logit(duration_months, pr_p_n, new_delta)
# ограничим, чтобы избежать переполнения
logit = np.clip(logit, -500, 500)
return 1 / (1 + np.exp(-logit))
# Прогнозы на всей выборке
df['pred_logit'] = prolong_rate_logit(df['duration_months'], df['pr_p_n'], df['new_delta'])
df['pred_prob'] = prolong_rate_prob(df['duration_months'], df['pr_p_n'], df['new_delta'])
# Агрегированные прогнозы (взвешенные средние)
pred_agg = aggregate(df, 'end_month_plan', 'pred_prob', weight_col)
# ---------------------------
# 4. Качество прогноза (на агрегированном уровне)
# ---------------------------
# Соединим факт и прогноз по месяцам
comparison = target_agg.merge(pred_agg, on='end_month_plan', suffixes=('_fact', '_pred'))
comparison['error'] = comparison[target] - comparison['pred_prob']
# Метрики
mae = mean_absolute_error(comparison[target], comparison['pred_prob'])
rmse = np.sqrt(mean_squared_error(comparison[target], comparison['pred_prob']))
mape = np.mean(np.abs((comparison[target] - comparison['pred_prob']) / np.maximum(comparison[target], 1e-6))) * 100
r2 = r2_score(comparison[target], comparison['pred_prob'])
# Взвешенные метрики (по sum_total_out)
weights = comparison['sum_total_out_fact'] # или pred, они одинаковы
wmae = np.average(np.abs(comparison['error']), weights=weights)
wrmse = np.sqrt(np.average(comparison['error']**2, weights=weights))
wmape = np.average(np.abs(comparison['error'] / np.maximum(comparison[target], 1e-6)), weights=weights) * 100
print("Метрики качества на агрегированных по месяцам данных:")
print(f"MAE: {mae:.5f}, RMSE: {rmse:.5f}, MAPE: {mape:.2f}%, R2: {r2:.5f}")
print(f"Взвешенные: wMAE: {wmae:.5f}, wRMSE: {wrmse:.5f}, wMAPE: {wmape:.2f}%")
# График фактической и прогнозной доли
plt.figure(figsize=(12,5))
plt.plot(comparison['end_month_plan'], comparison[target], label='Факт', marker='o')
plt.plot(comparison['end_month_plan'], comparison['pred_prob'], label='Прогноз (sigmoid)', marker='x')
plt.title('Сравнение фактической и прогнозной доли пролонгаций')
plt.legend()
plt.grid(True)
plt.show()
# ---------------------------
# 5. Статистическая значимость коэффициентов
# ---------------------------
# Переобучим линейную регрессию в логит-пространстве на всех исторических данных
# с весами (sum_total_out) и получим стандартные ошибки.
# Для этого используем statsmodels с WLS (взвешенные наименьшие квадраты)
# Преобразуем целевую переменную в логит (с ограничением, чтобы избежать 0 и 1)
def to_logit(p, eps=1e-4):
p = np.clip(p, eps, 1-eps)
return np.log(p / (1 - p))
df['target_logit'] = to_logit(df[target])
X = df[features]
X = sm.add_constant(X) # константа
y = df['target_logit']
weights = df[weight_col]
# Взвешенная регрессия
wls_model = sm.WLS(y, X, weights=weights)
results = wls_model.fit()
print("\nРезультаты переобученной линейной регрессии (логит-пространство):")
print(results.summary())
# Сравним с заданными коэффициентами
coef_given = np.array([-2.256419058, -0.03865802, 0.55179107, 0.29300738]) # константа, duration, pr_p_n, new_delta
coef_estimated = results.params.values
print("\nСравнение коэффициентов:")
print("Заданные: ", coef_given)
print("Оценённые: ", coef_estimated)
print("Разница: ", coef_estimated - coef_given)
# Статистическая значимость: p-values уже в summary.
# Доверительные интервалы:
conf_int = results.conf_int()
print("\n95% доверительные интервалы для коэффициентов:")
print(conf_int)
# ---------------------------
# 6. Существенность влияния факторов (бета-коэффициенты)
# ---------------------------
# Стандартизованные коэффициенты: b_j * (std_X_j / std_y)
X_std = X[features].std()
y_std = y.std()
beta = results.params[features] * X_std / y_std
print("\nСтандартизованные коэффициенты (бета):")
print(beta)
# ---------------------------
# 7. Мультиколлинеарность (VIF)
# ---------------------------
X_vif = X[features]
vif_data = pd.DataFrame()
vif_data["Feature"] = X_vif.columns
vif_data["VIF"] = [variance_inflation_factor(X_vif.values, i) for i in range(X_vif.shape[1])]
print("\nVIF для признаков:")
print(vif_data)
# Если VIF > 10, есть мультиколлинеарность.
# ---------------------------
# 8. Нелинейные зависимости
# ---------------------------
# Получим остатки на уровне отдельных записей (не агрегированных)
df['residual'] = df['target_logit'] - results.fittedvalues # остатки в логит-пространстве
# График остатков vs предсказанные
plt.figure(figsize=(12,4))
plt.subplot(1,2,1)
plt.scatter(results.fittedvalues, df['residual'], alpha=0.5)
plt.axhline(0, color='red', linestyle='--')
plt.xlabel('Предсказанные (логит)')
plt.ylabel('Остатки')
plt.title('Остатки vs предсказанные')
# График остатков vs каждый признак
for i, feat in enumerate(features):
plt.subplot(1,3,i+2) # для трёх признаков
plt.scatter(df[feat], df['residual'], alpha=0.5)
plt.axhline(0, color='red', linestyle='--')
plt.xlabel(feat)
plt.ylabel('Остатки')
plt.tight_layout()
plt.show()
# Тест Рамасея (RESET) на пропущенные нелинейности (можно использовать statsmodels)
# Для этого добавим квадраты и кубы предсказанных значений и проверим их значимость
from statsmodels.stats.diagnostic import linear_reset
reset_test = linear_reset(results, power=2, test_type='fitted')
print("\nТест Рамасея (RESET) на нелинейность:")
print(f"F-статистика: {reset_test.fvalue:.4f}, p-value: {reset_test.pvalue:.4f}")
if reset_test.pvalue < 0.05:
print("Есть свидетельства нелинейности (p < 0.05).")
else:
print("Нет явных свидетельств нелинейности (p >= 0.05).")
# ---------------------------
# 9. Дополнительные тесты
# ---------------------------
# 9.1 Автокорреляция остатков (тест Дарбина-Уотсона и Бройша-Годфри)
dw = durbin_watson(results.resid)
print(f"\nТест Дарбина-Уотсона: {dw:.4f} (значения близкие к 2 указывают на отсутствие автокорреляции)")
# Тест Бройша-Годфри на автокорреляцию порядка 1
bg_test = acorr_breusch_godfrey(results, nlags=1)
print(f"Тест Бройша-Годфри (lag=1): LM-статистика = {bg_test[0]:.4f}, p-value = {bg_test[1]:.4f}")
if bg_test[1] < 0.05:
print("Есть автокорреляция остатков (p < 0.05).")
# 9.2 Гетероскедастичность (тест Бройша-Пагана)
bp_test = het_breuschpagan(results.resid, results.model.exog)
print(f"\nТест Бройша-Пагана на гетероскедастичность: LM = {bp_test[0]:.4f}, p-value = {bp_test[1]:.4f}")
if bp_test[1] < 0.05:
print("Есть гетероскедастичность (p < 0.05).")
# 9.3 Стабильность коэффициентов во времени (rolling regression)
# Разобьём данные по годам и оценим модель на каждом окне
windows = df['end_month_plan'].dt.year.unique()
coefs_over_time = []
for year in windows:
mask = (df['end_month_plan'].dt.year == year)
if mask.sum() < 10:
continue
X_year = sm.add_constant(df.loc[mask, features])
y_year = df.loc[mask, 'target_logit']
w_year = df.loc[mask, weight_col]
try:
res_year = sm.WLS(y_year, X_year, weights=w_year).fit()
coefs_over_time.append([year] + res_year.params.values.tolist())
except:
continue
coefs_df = pd.DataFrame(coefs_over_time, columns=['year'] + ['const'] + features)
print("\nКоэффициенты по годам:")
print(coefs_df)
# График изменения коэффициентов
coefs_df.set_index('year').plot(marker='o')
plt.title('Изменение коэффициентов модели по годам')
plt.ylabel('Значение коэффициента')
plt.grid(True)
plt.show()
# 9.4 Сравнение с бейзлайнами (на агрегированных данных)
# Rolling mean (окно 3)
comparison['rolling_3'] = comparison[target].shift(1).rolling(3, min_periods=1).mean()
# Предыдущий месяц
comparison['prev_month'] = comparison[target].shift(1)
# Среднее за предыдущий год
comparison['year'] = comparison['end_month_plan'].dt.year
yearly_avg = comparison.groupby('year')[target].mean().shift(1).rename('prev_year_avg')
comparison = comparison.merge(yearly_avg, left_on='year', right_index=True, how='left')
# Метрики для бейзлайнов
baselines = ['rolling_3', 'prev_month', 'prev_year_avg']
for bl in baselines:
bl_mae = mean_absolute_error(comparison[target], comparison[bl].fillna(method='bfill'))
bl_rmse = np.sqrt(mean_squared_error(comparison[target], comparison[bl].fillna(method='bfill')))
print(f"Бейзлайн {bl}: MAE = {bl_mae:.5f}, RMSE = {bl_rmse:.5f}")
# 9.5 Калибровка прогнозов (если модель выдаёт вероятности)
# Для этого нужно иметь прогнозы на уровне отдельных записей и фактические значения.
# Построим калибровочную кривую (разбивка на бины)
from sklearn.calibration import calibration_curve
prob_true, prob_pred = calibration_curve(df[target], df['pred_prob'], n_bins=10, strategy='quantile')
plt.figure(figsize=(6,6))
plt.plot(prob_pred, prob_true, marker='o', label='Модель')
plt.plot([0,1], [0,1], linestyle='--', label='Идеальная калибровка')
plt.xlabel('Средний предсказанный')
plt.ylabel('Доля положительных (факт)')
plt.title('Калибровочная кривая')
plt.legend()
plt.grid(True)
plt.show()