Загрузка данных
class MatrixProfileDetector():
def __init__(
self,
path_to_data: str,
n_random_traces: int = 5, # сколько рандомных трасс из датафрейма обрабатывать если не будет указан список трасс
traces_df: pd.DataFrame = None
):
self.n_random_traces = n_random_traces
self.path_to_data = path_to_data
if not traces_df:
self.read_from_file()
else:
self.traces_df = traces_df
def read_from_file(
self,
time_col='Time',
trace_col='Trace Number',
):
self.time_col = time_col
self.trace_col = trace_col
traces_df = pd.read_csv(self.path_to_data)
if traces_df.shape[0] <= 0:
raise ValueError("Empty data")
if self.time_col not in traces_df.columns or trace_col not in traces_df.columns:
raise ValueError(f"Wrong structure of dataset {path_to_data}")
self.traces_df = traces_df
self.split_trajectories_by_time_gap()
def split_trajectories_by_time_gap(self,
gap_threshold=20, start_segment=0,
remove_first_point=True):
"""
Разделяет "слипшиеся" траектории по временным разрывам.
Новый идентификатор Trace_ID формируется как f"{original_trace}_{segment_index}".
Параметры:
df : DataFrame - исходные данные
time_col : str - столбец с временем (числовой формат)
trace_col : str - столбец с номером трассы из РЛС (строка)
gap_threshold : float - порог разрыва в секундах для разделения
start_segment : int - начальный номер сегмента (0 или 1)
remove_first_point : bool - удалить первую точку каждой траектории после разделения
Возвращает:
df с добавленным столбцом 'Trace_ID' (и, возможно, очищенный от первых точек)
"""
df = self.traces_df.copy()
df = df.sort_values(by=[self.trace_col, self.time_col]).reset_index(drop=True)
new_trace_ids = []
for original_trace, group in df.groupby(self.trace_col, sort=False):
segment_idx = start_segment
prev_time = None
for idx, row in group.iterrows():
cur_time = row[self.time_col]
if prev_time is None or (cur_time - prev_time) > gap_threshold:
if prev_time is not None:
segment_idx += 1
trace_id = f"{original_trace}_{segment_idx}"
new_trace_ids.append(trace_id)
prev_time = cur_time
df['Trace_ID'] = new_trace_ids
if remove_first_point:
df = df.sort_values(by=['Trace_ID', self.time_col])
df = df.groupby('Trace_ID', group_keys=False).apply(
lambda x: x.iloc[1:] if len(x) > 1 else pd.DataFrame(columns=x.columns)
).reset_index(drop=True)
self.traces_df = df
def visualize_one_trace(self, trace_id, axes="XZ"):
df = self.traces_df.copy()
trace = df[df["Trace_ID"]==trace_id]
if "Tick" in trace.columns:
trace_sorted = trace.sort_values("Tick")
elif "Time" in trace.columns:
trace_sorted = trace.sort_values("Time")
else:
trace_sorted = trace
axes_sorted = ''.join(sorted(axes))
if axes_sorted not in ["XY", "XZ", "YZ"]:
raise ValueError("Нужно задать плоскость в которой ожидается визуализация. Доступные опции: XY, XZ, YZ")
if axes_sorted == "XY":
x_col, y_col = "X", "Y"
elif axes_sorted == "XZ":
x_col, y_col = "X", "Z"
else: # "YZ"
x_col, y_col = "Y", "Z"
x = trace_sorted[x_col].values
y = trace_sorted[y_col].values
vx = trace_sorted["VX"].values
vy = trace_sorted["VY"].values
vz = trace_sorted["VZ"].values
speed = np.sqrt(vx**2 + vy**2 + vz**2)
x_start, y_start = x[0], y[0]
x_end, y_end = x[-1], y[-1]
plt.figure(figsize=(8, 6))
plt.plot(x, y, color='gray', linewidth=1, alpha=0.6, label='Траектория')
sc = plt.scatter(x, y, c=speed, cmap='viridis', s=20, alpha=0.7, label='Точки траектории')
plt.scatter(x_start, y_start, marker='x', color='green', s=100, label='Начало')
plt.scatter(x_end, y_end, marker='x', color='red', s=100, label='Конец')
plt.colorbar(sc, label='Скорость (м/с)')
plt.legend()
plt.xlabel(x_col)
plt.ylabel(y_col)
plt.title(f'Траектория (плоскость {axes_sorted})')
plt.grid(True, linestyle='--', alpha=0.6)
plt.axis('equal')
plt.show()
def plot_traces(
self,
trace_id: str = None,
plot_matrix_profile: bool = False
):
if not self.traces_df:
raise ValueError("Empty data")
if not trace_id:
if n_random_traces <= 0:
raise ValueError("There is nothing to plot")
else:
random_trace_idx = self.traces_df["Trace_ID"].sample(n=n_random_traces).unique()
for trace_idx in random_trace_idx:
self.visualize_one_trace(trace_id)
def drop_short_traces(self, treshold: int = 50):
value_counts = self.traces_df["Trace_ID"].value_counts()
long_traces = value_counts[value_counts > treshold].index
long_traces = list(long_traces.values)
self.traces_df = self.traces_df[self.traces_df["Trace_ID"].isin(long_traces)]
def _merge_intervals(self, starts, window_len):
intervals = []
for s in starts:
if intervals and s <= intervals[-1][1]:
intervals[-1] = (intervals[-1][0], max(intervals[-1][1], s + window_len - 1))
else:
intervals.append((s, s + window_len - 1))
return intervals
def _prepare_trace_data(self, trace_id, features, normalize=False):
"""
Извлекает траекторию и подготавливает матрицу T для stumpy.
Возвращает словарь с trace, T, N, speed, d.
Если траектория слишком короткая или недостаточно размерностей, возвращает None.
"""
# Берём колонки, гарантируя наличие X, Y, Z, Time
cols = list(set(features + ["X", "Y", "Z", "Time"]))
trace = self.traces_df[self.traces_df['Trace_ID'] == trace_id].sort_values('Time')[cols].dropna()
N = len(trace)
if N < 10:
print(f"Траектория {trace_id} слишком короткая ({N} точек), пропускаем")
return None
# Скорость для цветовой шкалы
if 'Time' in trace.columns:
time_diff = np.diff(trace['Time'].values)
if len(time_diff) > 0 and np.all(time_diff > 0):
speed = np.linalg.norm(np.diff(trace[['X', 'Y', 'Z']].values, axis=0), axis=1) / time_diff
speed = np.append(speed, speed[-1])
else:
speed = np.ones(N) * np.nan
else:
speed = np.ones(N)
# Матрица признаков (d, N)
T = trace[features].values.astype(np.float64).T
d = T.shape[0]
if d < 3:
print(f"Недостаточно размерностей ({d}) для многомерного stumpy")
return None
return {
'trace': trace,
'T': T,
'N': N,
'speed': speed,
'd': d
}
def _compute_single_mstump_profile(self, T, m, normalize):
"""Обёртка вокруг stumpy.mstump, возвращает профиль (максимум по размерностям)."""
P, I = stumpy.mstump(T, m, normalize=normalize)
return np.max(P, axis=0)
def _refine_anomalies(self, trace, speed, final_anomalies, max_m):
"""
Расширяет границы аномальных интервалов, если в окрестности (max_m//2 точек)
обнаруживаются резкие скачки расстояния, скорости или направления.
Возвращает обновлённый список непересекающихся интервалов.
"""
if not final_anomalies or max_m < 2:
return final_anomalies
# Подготовим массив координат для быстрого вычисления расстояний
coords = trace[['X', 'Y', 'Z']].values
N = len(coords)
half_window = max_m // 2
new_intervals = []
for s, e in final_anomalies:
# Вычислим средние характеристики исходного интервала
if e >= s:
seg_dists = np.linalg.norm(np.diff(coords[s:e+1], axis=0), axis=1)
avg_dist = np.median(seg_dists) if len(seg_dists) > 0 else 0.0
avg_speed = np.median(speed[s:e+1])
else:
avg_dist = 0.0
avg_speed = 0.0
# --- Расширение левой границы (s) ---
new_s = s
# Идём влево от s-1 до max(s - half_window, 0)
for i in range(s - 1, max(s - half_window, 0) - 1, -1):
# Проверка 1: аномальное расстояние между i и i+1
d = np.linalg.norm(coords[i] - coords[i+1])
dist_cond = (avg_dist > 1e-8) and (d > 3.0 * avg_dist)
# Проверка 2: скачок скорости (берём точку i)
speed_cond = (avg_speed > 1e-8) and (speed[i] > 3.0 * avg_speed)
# Проверка 3: излом траектории (угол в точке i+1, т.к. i+1 ближе к центру)
dir_cond = False
if i > 0 and i+2 < N: # чтобы были соседи для точки i+1
v1 = coords[i] - coords[i-1] # вектор до i
v2 = coords[i+2] - coords[i+1] # вектор после i+1
# угол в точке i+1 — это угол между (coords[i+1]-coords[i]) и (coords[i+2]-coords[i+1])
# Но мы хотим поймать излом именно в окрестности. Возьмём точку i+1:
if i+1 < N-1:
vec_in = coords[i+1] - coords[i]
vec_out = coords[i+2] - coords[i+1]
norm_in = np.linalg.norm(vec_in)
norm_out = np.linalg.norm(vec_out)
if norm_in > 1e-8 and norm_out > 1e-8:
cosang = np.dot(vec_in, vec_out) / (norm_in * norm_out)
angle = np.arccos(np.clip(cosang, -1, 1))
dir_cond = angle > np.radians(90) # порог 120°
if (dist_cond and speed_cond) or dir_cond:
new_s = i # сдвигаем левую границу на эту точку
# --- Расширение правой границы (e) ---
new_e = e
for j in range(e + 1, min(e + half_window + 1, N)):
# Проверка 1: аномальное расстояние между j-1 и j
d = np.linalg.norm(coords[j] - coords[j-1])
dist_cond = (avg_dist > 1e-8) and (d > 3.0 * avg_dist)
# Проверка 2: скачок скорости в точке j
speed_cond = (avg_speed > 1e-8) and (speed[j] > 3.0 * avg_speed)
# Проверка 3: излом в точке j-1 (ближе к интервалу)
dir_cond = False
if j-2 >= 0 and j < N: # соседи для j-1
vec_in = coords[j-1] - coords[j-2]
vec_out = coords[j] - coords[j-1]
norm_in = np.linalg.norm(vec_in)
norm_out = np.linalg.norm(vec_out)
if norm_in > 1e-8 and norm_out > 1e-8:
cosang = np.dot(vec_in, vec_out) / (norm_in * norm_out)
angle = np.arccos(np.clip(cosang, -1, 1))
dir_cond = angle > np.radians(90)
if (dist_cond and speed_cond) or dir_cond:
new_e = j
new_intervals.append([new_s, new_e])
# Слияние пересекающихся/соприкасающихся интервалов
merged = []
for start, end in sorted(new_intervals):
if merged and start <= merged[-1][1] + 1:
merged[-1][1] = max(merged[-1][1], end)
else:
merged.append([start, end])
return merged
def detect_anomalies_matrix_profile(
self,
trace_id,
m_values=[0.05, 0.1, 0.15],
percentile_values=[90, 95, 99],
features=["X", "Y", "Z"],
min_interval_length=3,
iqr_multiplier=1.5,
normalize=False,
refine_anomalies=True
):
"""
Выполняет обнаружение аномалий с помощью многомерного матричного профиля.
Сохраняет результаты в self.last_anomaly_result и возвращает тот же словарь.
Возвращаемый словарь:
- final_anomalies: список интервалов [start, end]
- profile_to_plot: профиль для визуализации (от медианного m)
- speed: массив скоростей
- trace: DataFrame траектории
- T: матрица признаков
- N: число точек
- params: словарь с параметрами для воспроизведения порогов
"""
prep = self._prepare_trace_data(trace_id, features, normalize)
if prep is None:
return None
trace, T, N, speed, d = prep['trace'], prep['T'], prep['N'], prep['speed'], prep['d']
# Собираем профили для каждого m (один раз, чтобы не дублировать вычисления)
profiles = {} # m -> profile
for m_ratio in m_values:
m = max(3, int(m_ratio * N))
if m >= N:
m = N - 1
if m < 3:
continue
try:
profiles[m] = self._compute_single_mstump_profile(T, m, normalize)
except Exception as e:
print(f"Ошибка при вычислении mstump для m={m}: {e}")
# IQR‑аномалии
all_iqr_intervals = []
for m, profile in profiles.items():
q1, q3 = np.percentile(profile, [25, 75])
iqr = q3 - q1
thr_iqr = q3 + iqr_multiplier * iqr
anomaly_starts = np.where(profile > thr_iqr)[0]
intervals = self._merge_intervals(anomaly_starts, m)
intervals = [iv for iv in intervals if (iv[1] - iv[0] + 1) >= min_interval_length]
all_iqr_intervals.extend(intervals)
# Перцентильные аномалии
all_percentile_intervals = []
for m, profile in profiles.items():
for p in percentile_values:
thr_percentile = np.percentile(profile, p)
anomaly_starts = np.where(profile > thr_percentile)[0]
intervals = self._merge_intervals(anomaly_starts, m)
intervals = [iv for iv in intervals if (iv[1] - iv[0] + 1) >= min_interval_length]
all_percentile_intervals.extend(intervals)
# Голосование
total_combinations = len(m_values) * len(percentile_values)
required_votes = total_combinations * 2 // 3
percentile_vote_count = {}
for start, end in all_percentile_intervals:
for i in range(start, end + 1):
percentile_vote_count[i] = percentile_vote_count.get(i, 0) + 1
iqr_points = set(
point for start, end in all_iqr_intervals for point in range(start, end + 1)
)
final_anomalies = []
for start in range(N):
if start in iqr_points:
if percentile_vote_count.get(start, 0) >= required_votes:
if not final_anomalies or start > final_anomalies[-1][1] + 1:
final_anomalies.append([start, start])
else:
final_anomalies[-1][1] = start
# Доуточнение
if refine_anomalies:
if final_anomalies and profiles:
max_m = max(profiles.keys())
print(max_m)
final_anomalies = self._refine_anomalies(trace, speed, final_anomalies, max_m)
# Профиль для визуализации – берём медианный m (или последний использованный)
if profiles:
median_m = np.median(list(profiles.keys()))
# округляем до ближайшего присутствующего ключа
m_sample = min(profiles.keys(), key=lambda x: abs(x - median_m))
profile_to_plot = profiles[m_sample]
else:
profile_to_plot = None
result = {
'final_anomalies': final_anomalies,
'profile_to_plot': profile_to_plot,
'speed': speed,
'trace': trace,
'T': T,
'N': N,
'params': {
'm_values': m_values,
'percentile_values': percentile_values,
'features': features,
'min_interval_length': min_interval_length,
'iqr_multiplier': iqr_multiplier,
'normalize': normalize,
}
}
self.last_anomaly_result = result
return result
# --- Визуализации ---
def plot_anomaly_projections(self, result_dict=None):
"""
Три проекции (XZ, XY, ZY) с выделением аномалий.
Принимает результат detect_anomalies_matrix_profile или использует self.last_anomaly_result.
"""
if result_dict is None:
result_dict = getattr(self, 'last_anomaly_result', None)
if result_dict is None:
raise ValueError("Нет данных для визуализации. Сначала вызовите detect_anomalies_matrix_profile()")
trace = result_dict['trace']
speed = result_dict['speed']
final_anomalies = result_dict['final_anomalies']
fig, axes = plt.subplots(2, 2, figsize=(18, 12))
ax1, ax2, ax3, ax_hist = axes.flatten()
colors = cycle(['red', 'orange', 'green', 'blue', 'purple', 'brown', 'cyan', 'magenta'])
plot_pairs = [('X', 'Z'), ('X', 'Y'), ('Z', 'Y')]
titles = ['XZ проекция', 'XY проекция', 'ZY проекция']
for ax, (x_col, y_col), title in zip([ax1, ax2, ax3], plot_pairs, titles):
ax.plot(trace[x_col], trace[y_col], 'k-', alpha=0.5, label='Траектория')
ax.scatter(trace[x_col], trace[y_col], c=speed, cmap='viridis', s=20, alpha=0.7)
if final_anomalies:
color = next(colors)
for start, end in final_anomalies:
ax.plot(
trace[x_col].iloc[start:end+1],
trace[y_col].iloc[start:end+1],
color=color, linewidth=3,
label='Аномалии' if start == final_anomalies[0][0] else ""
)
ax.set_xlabel(f"{x_col}, метры", fontsize=15)
ax.set_ylabel(f"{y_col}, метры", fontsize=15)
ax.set_title(title)
ax.legend(loc='upper left', fontsize=15)
ax.axis('equal')
# Гистограмма матричного профиля
profile_to_plot = result_dict['profile_to_plot']
if profile_to_plot is not None:
n_bins = min(50, len(profile_to_plot) // 10)
counts, bins, patches = ax_hist.hist(
profile_to_plot, bins=n_bins, alpha=0.7,
color='lightblue', edgecolor='black', label='Матричный профиль'
)
# Выделяем аномальные значения
anomaly_mask = np.zeros(len(profile_to_plot), dtype=bool)
for start, end in final_anomalies:
anomaly_mask[start:end+1] = True
anomaly_values = profile_to_plot[anomaly_mask]
if len(anomaly_values) > 0:
ax_hist.hist(anomaly_values, bins=bins, alpha=0.8, color='red',
edgecolor='darkred', linewidth=1, label='Аномальные значения')
# Пороговые линии
params = result_dict['params']
q1, q3 = np.percentile(profile_to_plot, [25, 75])
iqr = q3 - q1
thr_iqr_plot = q3 + params['iqr_multiplier'] * iqr
ax_hist.axvline(x=thr_iqr_plot, color='orange', linestyle='--',
linewidth=2, label=f"IQR порог ({params['iqr_multiplier']}×IQR)")
for p in params['percentile_values']:
thr = np.percentile(profile_to_plot, p)
ax_hist.axvline(x=thr, color='purple', linestyle=':', linewidth=1.5, alpha=0.8,
label=f'{p}-й перцентиль')
# Чистая легенда без дубликатов
handles, labels = ax_hist.get_legend_handles_labels()
by_label = dict(zip(labels, handles))
ax_hist.legend(by_label.values(), by_label.keys())
ax_hist.set_xlabel('Значение матричного профиля')
ax_hist.set_ylabel('Частота')
ax_hist.set_title('Гистограмма матричного профиля\n(аномалии выделены красным)')
plt.tight_layout()
plt.show()
def plot_matrix_profile_line(self, result_dict=None):
"""
Линейный график матричного профиля с подсветкой аномалий.
"""
if result_dict is None:
result_dict = getattr(self, 'last_anomaly_result', None)
if result_dict is None or result_dict['profile_to_plot'] is None:
raise ValueError("Нет профиля для отображения")
profile = result_dict['profile_to_plot']
final_anomalies = result_dict['final_anomalies']
fig, ax = plt.subplots(figsize=(12, 4))
ax.plot(profile, 'b-', alpha=0.7, label='Матричный профиль')
if final_anomalies:
for i, (start, end) in enumerate(final_anomalies):
ax.axvspan(start, end, color='red', alpha=0.3,
label='Аномалия' if i == 0 else "")
ax.set_xlabel('Индекс подпоследовательности')
ax.set_ylabel('Расстояние до ближайшего соседа')
ax.set_title('Матричный профиль (максимум по размерностям)')
ax.legend()
ax.grid(True)
plt.tight_layout()
plt.show()
def run_full_anomaly_pipeline(
self,
trace_id,
features=["VX", "VY", "VZ"], # или другие признаки, например ["VX", "VY", "VZ"]
m_values=[0.05, 0.08, 0.1],
percentile_values=[93, 96, 99],
iqr_multiplier=1.35,
min_interval_length=3,
normalize=False,
refine_anomalies=False
):
"""
Управляющий метод: детекция + полная визуализация (проекции + гистограмма + линейный профиль).
Возвращает результат detect_anomalies_matrix_profile.
"""
result = self.detect_anomalies_matrix_profile(
trace_id,
m_values=m_values,
percentile_values=percentile_values,
features=features,
min_interval_length=min_interval_length,
iqr_multiplier=iqr_multiplier,
normalize=normalize,
refine_anomalies=refine_anomalies
)
if result is None:
return None
self.plot_anomaly_projections(result)
self.plot_matrix_profile_line(result)
return result
detector = MatrixProfileDetector("augmented.csv")
for trace_id in interesting_traces[:10]:
detector.run_full_anomaly_pipeline(trace_id, refine_anomalies=False)
detector.run_full_anomaly_pipeline(trace_id, refine_anomalies=True)