Загрузка данных


import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import signal
from scipy.optimize import curve_fit


# ============================================================
# НАСТРОЙКИ
# ============================================================

CSV_FILE = "can_log_20260506_140611.csv"

# "az" — азимут
# "el" — угол места
AXIS = "az"

# сколько секунд показывать до найденной ступени
WINDOW_BEFORE = 0.4

# сколько секунд показывать после найденной ступени
WINDOW_AFTER = 5.0

ID_COMMAND = "ID5"
ID_FEEDBACK = "ID6"


# ============================================================
# ЧТЕНИЕ CSV
# ============================================================

def read_log(filename):
    df = pd.read_csv(filename)

    required_columns = [
        "iso_time",
        "t_ms",
        "id6_az_raw",
        "id6_el_raw",
        "id6_az_deg",
        "id6_el_deg",
        "id5_az_raw",
        "id5_el_raw",
        "id5_az_deg",
        "id5_el_deg",
        "src"
    ]

    for col in required_columns:
        if col not in df.columns:
            raise RuntimeError(f"В CSV не найден столбец: {col}")

    df["iso_time"] = pd.to_datetime(df["iso_time"], errors="coerce")
    df = df.dropna(subset=["iso_time"]).copy()

    df["src"] = df["src"].astype(str).str.strip().str.upper()

    numeric_columns = [
        "t_ms",
        "id6_az_raw",
        "id6_el_raw",
        "id6_az_deg",
        "id6_el_deg",
        "id5_az_raw",
        "id5_el_raw",
        "id5_az_deg",
        "id5_el_deg"
    ]

    for col in numeric_columns:
        df[col] = pd.to_numeric(df[col], errors="coerce")

    df = df.sort_values("iso_time").reset_index(drop=True)

    # время в секундах от начала записи
    df["t"] = (df["t_ms"] - df["t_ms"].iloc[0]) / 1000.0

    return df


# ============================================================
# ПОИСК СТУПЕНИ
# ============================================================

def get_axis_columns(axis):
    if axis == "az":
        command_col = "id5_az_deg"
        feedback_col = "id6_az_deg"
        axis_title = "азимута"
        y_label = "Азимут, градусы"
    elif axis == "el":
        command_col = "id5_el_deg"
        feedback_col = "id6_el_deg"
        axis_title = "угла места"
        y_label = "Угол места, градусы"
    else:
        raise RuntimeError("AXIS должен быть 'az' или 'el'")

    return command_col, feedback_col, axis_title, y_label


def find_step(df, command_col):
    cmd = df[df["src"] == ID_COMMAND][["t", command_col]].dropna().copy()

    if len(cmd) < 2:
        raise RuntimeError(
            "Не найдено достаточно строк ID5 с командой. "
            "Проверь, есть ли в файле строки src=ID5 и заполнены ли id5_az_deg/id5_el_deg."
        )

    cmd_t = cmd["t"].to_numpy()
    cmd_y = cmd[command_col].to_numpy()

    diff = np.diff(cmd_y)

    if len(diff) == 0:
        raise RuntimeError("Недостаточно командных точек для поиска ступени.")

    step_index = int(np.argmax(np.abs(diff)))
    step_size = diff[step_index]

    if abs(step_size) < 0.001:
        raise RuntimeError(
            "Ступень не найдена: команда ID5 почти не меняется. "
            "Нужен лог, где была отправлена новая команда положения."
        )

    step_time = cmd_t[step_index + 1]
    value_before = cmd_y[step_index]
    value_after = cmd_y[step_index + 1]

    return step_time, value_before, value_after, step_size


def prepare_step_data(df, command_col, feedback_col):
    step_time, cmd_before, cmd_after, step_size = find_step(df, command_col)

    t_start = step_time - WINDOW_BEFORE
    t_end = step_time + WINDOW_AFTER

    cmd = df[df["src"] == ID_COMMAND][["t", command_col]].dropna().copy()
    fb = df[df["src"] == ID_FEEDBACK][["t", feedback_col]].dropna().copy()

    cmd = cmd[(cmd["t"] >= t_start) & (cmd["t"] <= t_end)]
    fb = fb[(fb["t"] >= t_start) & (fb["t"] <= t_end)]

    if len(fb) < 5:
        raise RuntimeError("После найденной ступени слишком мало ответов ID6.")

    fb_t = fb["t"].to_numpy() - step_time
    fb_y = fb[feedback_col].to_numpy()

    # красивая идеальная ступень для отображения
    cmd_plot_t = np.array([
        -WINDOW_BEFORE,
        0.0,
        0.0,
        WINDOW_AFTER
    ])

    cmd_plot_y = np.array([
        cmd_before,
        cmd_before,
        cmd_after,
        cmd_after
    ])

    # для модели берём только реакцию после подачи ступени
    after_mask = fb_t >= 0.0

    t_response = fb_t[after_mask]
    y_response = fb_y[after_mask]

    if len(t_response) < 5:
        raise RuntimeError("Недостаточно точек ID6 после ступени.")

    t_response = t_response - t_response[0]

    return {
        "step_time": step_time,
        "cmd_before": cmd_before,
        "cmd_after": cmd_after,
        "step_size": step_size,
        "fb_t": fb_t,
        "fb_y": fb_y,
        "cmd_plot_t": cmd_plot_t,
        "cmd_plot_y": cmd_plot_y,
        "t_response": t_response,
        "y_response": y_response
    }


# ============================================================
# МОДЕЛЬ ОБЪЕКТА
# ============================================================

def first_order_model(t, k, tau, delay, y0):
    y = np.full_like(t, y0, dtype=float)

    active = t >= delay

    y[active] = y0 + k * (1.0 - np.exp(-(t[active] - delay) / tau))

    return y


def fit_first_order_model(t, y, cmd_before, cmd_after):
    y0 = np.median(y[:max(3, len(y) // 20)])
    y_final = np.median(y[-max(3, len(y) // 10):])

    k0 = y_final - y0
    tau0 = max(0.05, (t[-1] - t[0]) / 5.0)
    delay0 = 0.0

    try:
        params, _ = curve_fit(
            first_order_model,
            t,
            y,
            p0=[k0, tau0, delay0, y0],
            bounds=(
                [-10000.0, 0.001, 0.0, -10000.0],
                [10000.0, 100.0, max(t[-1], 0.001), 10000.0]
            ),
            maxfev=20000
        )

        k_response, tau, delay, y0_fit = params

    except Exception:
        k_response = k0
        tau = tau0
        delay = 0.0
        y0_fit = y0

    command_delta = cmd_after - cmd_before

    if abs(command_delta) < 1e-9:
        gain = 1.0
    else:
        gain = k_response / command_delta

    return {
        "k_response": k_response,
        "gain": gain,
        "tau": abs(tau),
        "delay": max(0.0, delay),
        "y0": y0_fit
    }


# ============================================================
# ПАРАМЕТРЫ ПЕРЕХОДНОЙ ХАРАКТЕРИСТИКИ
# ============================================================

def calculate_step_metrics(t, y):
    y0 = np.median(y[:max(3, len(y) // 20)])
    y_final = np.median(y[-max(3, len(y) // 10):])

    delta = y_final - y0

    if abs(delta) < 1e-9:
        return {
            "y0": y0,
            "y_final": y_final,
            "overshoot_percent": 0.0,
            "rise_time": np.nan,
            "settling_time": np.nan,
            "lower_5": y_final,
            "upper_5": y_final
        }

    if delta > 0:
        y_peak = np.max(y)
        overshoot = max(0.0, y_peak - y_final)
    else:
        y_peak = np.min(y)
        overshoot = max(0.0, y_final - y_peak)

    overshoot_percent = overshoot / abs(delta) * 100.0

    y10 = y0 + 0.1 * delta
    y90 = y0 + 0.9 * delta

    if delta > 0:
        idx10 = np.where(y >= y10)[0]
        idx90 = np.where(y >= y90)[0]
    else:
        idx10 = np.where(y <= y10)[0]
        idx90 = np.where(y <= y90)[0]

    rise_time = np.nan

    if len(idx10) > 0 and len(idx90) > 0:
        rise_time = t[idx90[0]] - t[idx10[0]]

    tolerance = 0.05 * abs(delta)

    lower_5 = y_final - tolerance
    upper_5 = y_final + tolerance

    settling_time = np.nan

    for i in range(len(y)):
        tail = y[i:]

        if np.all((tail >= lower_5) & (tail <= upper_5)):
            settling_time = t[i]
            break

    return {
        "y0": y0,
        "y_final": y_final,
        "overshoot_percent": overshoot_percent,
        "rise_time": rise_time,
        "settling_time": settling_time,
        "lower_5": lower_5,
        "upper_5": upper_5
    }


# ============================================================
# ГРАФИК 1 — ПЕРЕХОДНАЯ ХАРАКТЕРИСТИКА
# ============================================================

def plot_step_response(data, model, axis_title, y_label):
    t_response = data["t_response"]
    y_response = data["y_response"]

    metrics = calculate_step_metrics(t_response, y_response)

    t_model = np.linspace(0.0, max(t_response), 800)

    y_model = first_order_model(
        t_model,
        model["k_response"],
        model["tau"],
        model["delay"],
        model["y0"]
    )

    fig = plt.figure(figsize=(13, 7))
    fig.canvas.manager.set_window_title("График переходной характеристики")

    ax = fig.add_subplot(111)

    ax.plot(
        data["cmd_plot_t"],
        data["cmd_plot_y"],
        linewidth=2.5,
        drawstyle="steps-post",
        label="Входная ступень, команда ID5"
    )

    ax.plot(
        data["fb_t"],
        data["fb_y"],
        linewidth=2.2,
        marker="o",
        markersize=3,
        label="Выходной угол, ответ ID6"
    )

    ax.plot(
        t_model,
        y_model,
        linewidth=2.2,
        linestyle="--",
        label="Приближённая модель"
    )

    ax.axvline(
        0.0,
        linewidth=1.5,
        linestyle=":",
        label="Момент подачи ступени"
    )

    ax.axhline(
        metrics["y_final"],
        linewidth=1.5,
        linestyle="--",
        label="Установившееся значение"
    )

    ax.axhline(
        metrics["lower_5"],
        linewidth=1.0,
        linestyle=":",
        label="Зона установления ±5%"
    )

    ax.axhline(
        metrics["upper_5"],
        linewidth=1.0,
        linestyle=":"
    )

    if np.isfinite(metrics["settling_time"]):
        ax.axvline(
            metrics["settling_time"],
            linewidth=1.5,
            linestyle="-.",
            label=f"Время установления ≈ {metrics['settling_time']:.3f} с"
        )

    title = (
        f"Переходная характеристика канала {axis_title}\n"
        f"перерегулирование = {metrics['overshoot_percent']:.2f} %, "
        f"время нарастания = {metrics['rise_time']:.3f} с, "
        f"время установления = {metrics['settling_time']:.3f} с"
    )

    ax.set_title(title, fontsize=14)
    ax.set_xlabel("Время относительно ступени, с")
    ax.set_ylabel(y_label)

    ax.grid(True, which="both", linestyle=":")
    ax.legend(loc="best")

    fig.tight_layout()

    print()
    print("============================================================")
    print("ПЕРЕХОДНАЯ ХАРАКТЕРИСТИКА")
    print("============================================================")
    print(f"Команда до ступени:          {data['cmd_before']:.4f} град")
    print(f"Команда после ступени:       {data['cmd_after']:.4f} град")
    print(f"Размер ступени:              {data['step_size']:.4f} град")
    print(f"Начальное положение:         {metrics['y0']:.4f} град")
    print(f"Установившееся положение:    {metrics['y_final']:.4f} град")
    print(f"Перерегулирование:           {metrics['overshoot_percent']:.2f} %")
    print(f"Время нарастания 10-90%:     {metrics['rise_time']:.4f} с")
    print(f"Время установления ±5%:      {metrics['settling_time']:.4f} с")

    print()
    print("============================================================")
    print("ПРИБЛИЖЁННАЯ МОДЕЛЬ ОБЪЕКТА")
    print("============================================================")
    print(f"Коэффициент передачи K:      {model['gain']:.6f}")
    print(f"Постоянная времени tau:      {model['tau']:.6f} с")
    print(f"Запаздывание L:              {model['delay']:.6f} с")


# ============================================================
# ГРАФИК 2 — АЧХ И ФЧХ
# ============================================================

def plot_frequency_response(model):
    k = model["gain"]
    tau = model["tau"]
    delay = model["delay"]

    w = np.logspace(-1, 2, 800)

    system = signal.TransferFunction([k], [tau, 1.0])

    w, mag, phase = signal.bode(system, w=w)

    # Запаздывание не меняет амплитуду, но добавляет фазовое отставание
    phase_with_delay = phase - np.degrees(w * delay)

    fig = plt.figure(figsize=(13, 8))
    fig.canvas.manager.set_window_title("АЧХ и ФЧХ объекта управления")

    ax1 = fig.add_subplot(211)
    ax2 = fig.add_subplot(212)

    ax1.semilogx(w, mag, linewidth=2.4)
    ax1.set_title("Амплитудно-частотная характеристика объекта управления", fontsize=13)
    ax1.set_ylabel("Амплитуда, дБ")
    ax1.grid(True, which="both", linestyle=":")

    ax2.semilogx(w, phase_with_delay, linewidth=2.4)
    ax2.set_title("Фазочастотная характеристика объекта управления", fontsize=13)
    ax2.set_xlabel("Частота, рад/с")
    ax2.set_ylabel("Фаза, градусы")
    ax2.grid(True, which="both", linestyle=":")

    fig.suptitle(
        f"АЧХ и ФЧХ приближённой модели: "
        f"K = {k:.4f}, tau = {tau:.4f} с, L = {delay:.4f} с",
        fontsize=14
    )

    fig.tight_layout()


# ============================================================
# MAIN
# ============================================================

def main():
    df = read_log(CSV_FILE)

    command_col, feedback_col, axis_title, y_label = get_axis_columns(AXIS)

    print("Файл загружен:", CSV_FILE)
    print("Всего строк:", len(df))
    print("Строк ID5:", len(df[df["src"] == ID_COMMAND]))
    print("Строк ID6:", len(df[df["src"] == ID_FEEDBACK]))
    print("Анализируемый канал:", axis_title)

    data = prepare_step_data(df, command_col, feedback_col)

    model = fit_first_order_model(
        data["t_response"],
        data["y_response"],
        data["cmd_before"],
        data["cmd_after"]
    )

    plot_step_response(data, model, axis_title, y_label)
    plot_frequency_response(model)

    plt.show()


if __name__ == "__main__":
    main()