"""AyuGram VideoCam: streams a video file into the OBS virtual camera."""


import ctypes
import logging
import math
import os
import sys
import threading
import time
import traceback
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional, Tuple

import cv2
import numpy as np
import pyautogui
import pyvirtualcam
from pyvirtualcam import PixelFormat

import tkinter as tk
from tkinter import filedialog, messagebox

import psutil
import win32con
import win32gui
import win32process


# Application name; also the log-folder name under %LOCALAPPDATA%.
APP_NAME = "AyuGramVideoCam"
# pyvirtualcam backend: requires OBS Studio's "OBS Virtual Camera" device.
VIRTUAL_CAMERA_BACKEND = "obs"

# "none" was verified to work, so it stays this way.
# If rotation is ever needed again, the options are:
# "none", "clockwise", "counterclockwise", "180"
ROTATE_MODE = "none"

# Play the video's audio track.
# For the sound to reach the call, VB-CABLE is required.
AUDIO_ENABLED = True

# Glob patterns offered in the file-open dialog.
VIDEO_EXTENSIONS = (
    "*.mp4",
    "*.avi",
    "*.mkv",
    "*.mov",
    "*.wmv",
    "*.webm",
    "*.m4v",
)

# Short pause after each pyautogui action; FAILSAFE aborts automation
# when the mouse is slammed into a screen corner.
pyautogui.PAUSE = 0.05
pyautogui.FAILSAFE = True


def setup_logging() -> None:
    """Configure file logging in %LOCALAPPDATA%/AyuGramVideoCam/app.log."""
    base_dir = os.getenv("LOCALAPPDATA", str(Path.home()))
    log_dir = Path(base_dir) / APP_NAME
    log_dir.mkdir(parents=True, exist_ok=True)

    log_file = log_dir / "app.log"
    logging.basicConfig(
        filename=str(log_file),
        encoding="utf-8",
        format="%(asctime)s [%(levelname)s] %(message)s",
        level=logging.INFO,
    )


def set_dpi_awareness() -> None:
    """Mark the process DPI-aware so screen coordinates match real pixels.

    Best-effort: silently ignored where the user32 call is unavailable
    (non-Windows platforms, restricted sessions).
    """
    try:
        user32 = ctypes.windll.user32
        user32.SetProcessDPIAware()
    except Exception:
        # Without DPI awareness coordinates may be scaled; still usable.
        pass


def show_error(title: str, text: str) -> None:
    """Show a blocking, always-on-top error dialog using a throwaway root."""
    hidden_root = tk.Tk()
    hidden_root.withdraw()
    hidden_root.attributes("-topmost", True)
    messagebox.showerror(title, text)
    hidden_root.destroy()


def show_warning(title: str, text: str) -> None:
    """Show a blocking, always-on-top warning dialog using a throwaway root."""
    hidden_root = tk.Tk()
    hidden_root.withdraw()
    hidden_root.attributes("-topmost", True)
    messagebox.showwarning(title, text)
    hidden_root.destroy()


def select_video_file() -> Optional[str]:
    """Open a topmost file dialog; return the chosen path or None on cancel."""
    picker_root = tk.Tk()
    picker_root.withdraw()
    picker_root.attributes("-topmost", True)

    chosen = filedialog.askopenfilename(
        title="Выберите видеофайл",
        filetypes=[
            ("Видео файлы", " ".join(VIDEO_EXTENSIONS)),
            ("Все файлы", "*.*"),
        ],
    )

    picker_root.destroy()

    # askopenfilename returns an empty value when the dialog is cancelled.
    return chosen or None


def sanitize_fps(raw_fps: Optional[float]) -> float:
    """Clamp a raw FPS reading to a usable frame rate.

    cv2.CAP_PROP_FPS may report 0, a negative value, NaN, or an absurdly
    large number for broken containers.  The original check let NaN slip
    through (NaN compares False to both `<= 0` and `> 120`); that is now
    handled explicitly.

    Args:
        raw_fps: FPS as reported by the capture backend; may be None or NaN.

    Returns:
        A frame rate in (0, 120], falling back to 30.0 for invalid input.
    """
    if raw_fps is None or math.isnan(raw_fps) or raw_fps <= 0 or raw_fps > 120:
        return 30.0
    return float(raw_fps)


def sanitize_size(width: int, height: int) -> Tuple[int, int]:
    """Normalize a frame size for the virtual camera.

    Dimensions are rounded down to even numbers (encoder requirement) and
    clamped to a 320x240 minimum.  Non-positive input falls back to
    1280x720.
    """
    if width <= 0 or height <= 0:
        return 1280, 720

    # Clearing the lowest bit rounds a positive integer down to even.
    even_width = int(width) & ~1
    even_height = int(height) & ~1

    return max(even_width, 320), max(even_height, 240)


def rotate_frame_bgr(frame_bgr):
    """Apply the configured ROTATE_MODE to a BGR frame (no-op for "none")."""
    rotation_codes = {
        "clockwise": cv2.ROTATE_90_CLOCKWISE,
        "counterclockwise": cv2.ROTATE_90_COUNTERCLOCKWISE,
        "180": cv2.ROTATE_180,
    }

    code = rotation_codes.get(ROTATE_MODE)
    if code is None:
        return frame_bgr

    return cv2.rotate(frame_bgr, code)


def prepare_frame_for_camera(frame_bgr, width: int, height: int):
    """Rotate, resize and convert a BGR frame to contiguous RGB for the cam."""
    processed = rotate_frame_bgr(frame_bgr)

    needs_resize = processed.shape[0] != height or processed.shape[1] != width
    if needs_resize:
        processed = cv2.resize(processed, (width, height), interpolation=cv2.INTER_AREA)

    # pyvirtualcam expects a C-contiguous RGB buffer.
    return np.ascontiguousarray(cv2.cvtColor(processed, cv2.COLOR_BGR2RGB))


def audio_worker(video_path: str, stop_event: threading.Event) -> None:
    """Play the audio track of *video_path* until EOF or *stop_event* is set.

    Note: the sound goes to the Windows default output device.  To route it
    into AyuGram, select "CABLE Input (VB-Audio Virtual Cable)" as the
    system output device (requires VB-CABLE).
    """
    player = None

    try:
        # Imported lazily so the app still starts without ffpyplayer.
        from ffpyplayer.player import MediaPlayer

        logging.info("Запуск аудио из видео: %r", video_path)
        player = MediaPlayer(video_path)

        while not stop_event.is_set():
            _frame, val = player.get_frame()

            if val == "eof":
                logging.info("Аудио закончилось.")
                break

            # val is the player's suggested delay before the next frame;
            # clamp it so stop_event is still polled frequently.
            if isinstance(val, (int, float)):
                delay = max(0.001, min(float(val), 0.05))
            else:
                delay = 0.01
            time.sleep(delay)

    except Exception:
        logging.exception("Ошибка аудио-проигрывателя")

    finally:
        if player is not None:
            try:
                player.close_player()
            except Exception:
                pass

        logging.info("Аудио-проигрыватель остановлен.")


def start_audio_thread(video_path: str) -> Tuple[Optional[threading.Thread], Optional[threading.Event]]:
    """Spawn the daemon audio thread; returns (None, None) when audio is off."""
    if not AUDIO_ENABLED:
        return None, None

    stop_event = threading.Event()
    worker = threading.Thread(
        target=audio_worker,
        args=(video_path, stop_event),
        daemon=True,
    )
    worker.start()

    return worker, stop_event


def stop_audio_thread(thread: Optional[threading.Thread], stop_event: Optional[threading.Event]) -> None:
    """Signal the audio worker to stop and wait briefly for it to exit."""
    if stop_event is not None:
        stop_event.set()

    if thread is None:
        return

    # Bounded join: never hang shutdown on a stuck player.
    thread.join(timeout=2.0)


@dataclass
class WindowInfo:
    """A visible top-level window plus the metadata used for matching."""

    hwnd: int
    title: str
    class_name: str
    process_name: str
    # (left, top, right, bottom) in screen coordinates.
    rect: Tuple[int, int, int, int]

    @property
    def width(self) -> int:
        left, _, right, _ = self.rect
        return max(0, right - left)

    @property
    def height(self) -> int:
        _, top, _, bottom = self.rect
        return max(0, bottom - top)

    @property
    def area(self) -> int:
        return self.width * self.height


def get_process_name_by_hwnd(hwnd: int) -> str:
    """Resolve a window handle to its executable name; "" on any failure."""
    try:
        _thread_id, pid = win32process.GetWindowThreadProcessId(hwnd)
        process = psutil.Process(pid)
        return process.name()
    except Exception:
        # Window/process may have vanished, or access may be denied.
        return ""


def enum_visible_windows() -> List[WindowInfo]:
    """Enumerate visible top-level windows large enough to host a call UI."""
    found: List[WindowInfo] = []

    def collect(hwnd, _unused):
        try:
            if not win32gui.IsWindowVisible(hwnd):
                return

            rect = win32gui.GetWindowRect(hwnd)
            left, top, right, bottom = rect

            # Skip slivers and tool windows (< 200x120 px).
            if right - left < 200 or bottom - top < 120:
                return

            found.append(
                WindowInfo(
                    hwnd=hwnd,
                    title=win32gui.GetWindowText(hwnd) or "",
                    class_name=win32gui.GetClassName(hwnd) or "",
                    process_name=get_process_name_by_hwnd(hwnd),
                    rect=rect,
                )
            )
        except Exception:
            # Windows can be destroyed mid-enumeration; ignore them.
            return

    win32gui.EnumWindows(collect, None)
    return found


def find_telegram_windows() -> List[WindowInfo]:
    """Return Telegram/AyuGram windows sorted by area, largest first."""
    known_processes = {
        "telegram.exe",
        "telegramdesktop.exe",
        "ayugram.exe",
        "ayugramdesktop.exe",
    }

    matches: List[WindowInfo] = []

    for window in enum_visible_windows():
        searchable = f"{window.title} {window.class_name} {window.process_name}".lower()
        is_telegram = (
            "telegram" in searchable
            or "ayugram" in searchable
            or window.process_name.lower() in known_processes
        )
        if is_telegram:
            matches.append(window)

    # Biggest window first: most likely to be the active call window.
    return sorted(matches, key=lambda w: w.area, reverse=True)


def activate_window(hwnd: int) -> None:
    """Restore (if minimized) and bring a window to the foreground.

    Best-effort: failures are logged, never raised, because a window can
    close between enumeration and activation.
    """
    try:
        if win32gui.IsIconic(hwnd):
            win32gui.ShowWindow(hwnd, win32con.SW_RESTORE)

        win32gui.ShowWindow(hwnd, win32con.SW_SHOW)
        win32gui.BringWindowToTop(hwnd)

        # SetForegroundWindow can be rejected by Windows focus-stealing
        # rules; treat that as non-fatal.
        try:
            win32gui.SetForegroundWindow(hwnd)
        except Exception:
            pass

        # Give the window manager a moment to finish the restore/raise.
        time.sleep(0.35)

    except Exception as exc:
        logging.warning("Не удалось активировать окно %s: %s", hwnd, exc)


def clip_rect_to_screen(rect: Tuple[int, int, int, int]) -> Optional[Tuple[int, int, int, int]]:
    """Intersect *rect* with the primary screen; None if nothing is visible."""
    screen_w, screen_h = pyautogui.size()
    left, top, right, bottom = rect

    clipped_left = max(0, left)
    clipped_top = max(0, top)
    clipped_right = min(screen_w, right)
    clipped_bottom = min(screen_h, bottom)

    # Degenerate or fully off-screen rectangle.
    if clipped_right <= clipped_left or clipped_bottom <= clipped_top:
        return None

    return clipped_left, clipped_top, clipped_right, clipped_bottom


def locate_red_hangup_button(image_rgb, offset_x: int, offset_y: int) -> Optional[Tuple[int, int]]:
    """Find the red hang-up button inside a window screenshot.

    Heuristic: threshold strongly-red pixels in the lower 65% of the image,
    clean the mask morphologically, then score connected components by
    size, how low they sit, and how close they are to the horizontal center.

    Args:
        image_rgb: PIL image of the window (pyautogui.screenshot result).
        offset_x: screen X of the screenshot's top-left corner.
        offset_y: screen Y of the screenshot's top-left corner.

    Returns:
        Absolute screen (x, y) of the best candidate, or None if nothing
        plausible was found.
    """
    arr = np.array(image_rgb.convert("RGB"))
    h, w = arr.shape[:2]

    # Window too small to contain call controls.
    if h < 120 or w < 200:
        return None

    # The hang-up button sits in the lower part of the call window; skip
    # the top 35% to avoid unrelated red UI elements.
    y_start = int(h * 0.35)
    crop = arr[y_start:h, :, :]

    # int16 avoids uint8 overflow in the ratio comparisons below.
    r = crop[:, :, 0].astype(np.int16)
    g = crop[:, :, 1].astype(np.int16)
    b = crop[:, :, 2].astype(np.int16)

    # "Strong red": red channel absolutely high and clearly dominant.
    mask = (
        (r > 135)
        & (g < 135)
        & (b < 135)
        & (r > g * 1.25)
        & (r > b * 1.25)
    ).astype(np.uint8) * 255

    # Close small holes, then drop isolated speckles.
    kernel = np.ones((5, 5), np.uint8)
    mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
    mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)

    num_labels, labels, stats, centroids = cv2.connectedComponentsWithStats(mask, 8)

    best_score = 0.0
    best_point: Optional[Tuple[int, int]] = None

    # Label 0 is the background; examine real components only.
    for i in range(1, num_labels):
        x, y, bw, bh, area = stats[i]
        cx, cy = centroids[i]

        # Reject components too small or too large to be the button.
        if area < 100:
            continue
        if area > 40000:
            continue
        if bw < 18 or bh < 18:
            continue
        if bw > 220 or bh > 220:
            continue

        # The button is roughly round; discard extreme aspect ratios.
        ratio = bw / max(1, bh)
        if ratio < 0.35 or ratio > 2.85:
            continue

        # Centroid back in full-image coordinates (crop removed the top).
        absolute_cx = float(cx)
        absolute_cy = float(cy + y_start)

        # Prefer components near the horizontal center and low in the window.
        center_penalty = abs(absolute_cx - (w / 2)) / max(1, w / 2)
        lower_bonus = absolute_cy / max(1, h)

        score = float(area) * (1.0 + lower_bonus) * max(0.25, 1.4 - center_penalty)

        if score > best_score:
            best_score = score
            best_point = (
                int(offset_x + absolute_cx),
                int(offset_y + absolute_cy),
            )

    return best_point


def screenshot_window(window: WindowInfo):
    """Screenshot the visible part of *window*.

    Returns (image, (left, top)) or (None, None) if the window is fully
    off-screen.
    """
    visible = clip_rect_to_screen(window.rect)
    if visible is None:
        return None, None

    left, top, right, bottom = visible
    region = (left, top, right - left, bottom - top)

    return pyautogui.screenshot(region=region), (left, top)


def click_red_hangup_button(window: WindowInfo) -> bool:
    """Hover near the call controls, locate the red button, and click it.

    Returns True if a candidate button was found and clicked.
    """
    visible = clip_rect_to_screen(window.rect)
    if visible is None:
        return False

    left, top, right, bottom = visible
    width = right - left
    height = bottom - top

    # Hovering makes auto-hidden call controls reappear before screenshot.
    try:
        pyautogui.moveTo(
            left + width // 2,
            top + int(height * 0.84),
            duration=0.15,
        )
    except Exception:
        pass

    time.sleep(0.45)

    image, offset = screenshot_window(window)
    if image is None or offset is None:
        return False

    pos = locate_red_hangup_button(image, offset[0], offset[1])
    if pos is None:
        return False

    logging.info("Найдена красная кнопка завершения звонка: %s", pos)

    target_x, target_y = pos
    pyautogui.moveTo(target_x, target_y, duration=0.1)
    pyautogui.click()
    time.sleep(0.6)

    return True


def fallback_click_bottom_center(window: WindowInfo) -> bool:
    """Blind-click where the hang-up button usually sits (bottom center)."""
    visible = clip_rect_to_screen(window.rect)
    if visible is None:
        return False

    left, top, right, bottom = visible
    width = right - left
    height = bottom - top

    target_x = left + width // 2
    # 13% of the height above the bottom edge, clamped to 65..140 px.
    target_y = bottom - max(65, min(140, int(height * 0.13)))

    try:
        pyautogui.moveTo(target_x, target_y, duration=0.15)
        time.sleep(0.15)
        pyautogui.click()
        time.sleep(0.5)
    except Exception as exc:
        logging.warning("Fallback-клик не удался: %s", exc)
        return False

    logging.info("Fallback-клик по нижнему центру окна: %s, %s", target_x, target_y)
    return True


def fallback_hotkeys(window: WindowInfo) -> bool:
    """Last resort: send Esc (closes the call overlay) then Alt+F4."""
    try:
        activate_window(window.hwnd)

        pyautogui.press("esc")
        time.sleep(0.25)

        pyautogui.hotkey("alt", "f4")
        time.sleep(0.5)
    except Exception as exc:
        logging.warning("Fallback-горячие клавиши не сработали: %s", exc)
        return False

    logging.info("Отправлены fallback-горячие клавиши Escape и Alt+F4.")
    return True


def end_telegram_call() -> Tuple[bool, str]:
    """Try to hang up the active Telegram/AyuGram call.

    Strategy, in order: CV click on the detected red button (3 attempts
    per window), then a blind bottom-center click, then Esc/Alt+F4.

    Returns:
        (success, human-readable message).
    """
    windows = find_telegram_windows()
    if not windows:
        return False, "Окно Telegram/AyuGram не найдено."

    logging.info("Найдено окон Telegram/AyuGram: %s", len(windows))

    # Pass 1: computer-vision click on the red hang-up button.
    for window in windows:
        logging.info(
            "Пробую окно: title=%r class=%r process=%r rect=%r",
            window.title,
            window.class_name,
            window.process_name,
            window.rect,
        )
        activate_window(window.hwnd)

        for _attempt in range(3):
            if click_red_hangup_button(window):
                return True, "Звонок завершён нажатием красной кнопки."

    # Pass 2: blind click at the usual button position.
    for window in windows:
        activate_window(window.hwnd)
        if fallback_click_bottom_center(window):
            return True, "Сделана fallback-попытка завершить звонок кликом по нижнему центру окна."

    # Pass 3: keyboard shortcuts.
    for window in windows:
        if fallback_hotkeys(window):
            return True, "Сделана fallback-попытка завершить звонок горячими клавишами."

    return False, "Не удалось автоматически завершить звонок Telegram/AyuGram."


def open_video_capture(video_path: str) -> cv2.VideoCapture:
    """Open *video_path* with OpenCV; raise RuntimeError if it fails."""
    capture = cv2.VideoCapture(video_path)

    if not capture.isOpened():
        raise RuntimeError(f"Не удалось открыть видеофайл:\n{video_path}")

    return capture


def wait_before_start_and_feed_first_frame(cam, first_frame_rgb) -> bool:
    """Show a "start video" dialog while feeding the first frame to the cam.

    The Tk event loop is pumped manually (update_idletasks/update) so that
    between UI events we can keep sending *first_frame_rgb* to the virtual
    camera — otherwise OBS would show its placeholder logo.

    Args:
        cam: an open pyvirtualcam.Camera.
        first_frame_rgb: RGB frame matching the camera's resolution.

    Returns:
        True if the user pressed "СТАРТ ВИДЕО", False on cancel/close.
    """
    # Mutable state shared with the button callbacks below.
    state = {
        "done": False,
        "started": False,
    }

    root = tk.Tk()
    root.title("AyuGram VideoCam")
    root.geometry("560x320")
    root.resizable(False, False)
    root.attributes("-topmost", True)

    label = tk.Label(
        root,
        text=(
            "Виртуальная камера уже работает.\n\n"
            "В AyuGram выбери:\n"
            "Камера: OBS Virtual Camera\n"
            "Микрофон: CABLE Output, если используешь VB-CABLE\n\n"
            "Сейчас в камеру подаётся первый кадр видео,\n"
            "поэтому логотип OBS показываться не должен.\n\n"
            "Когда звонок подключился — нажми СТАРТ ВИДЕО."
        ),
        font=("Segoe UI", 10),
        justify="center",
    )
    label.pack(pady=16)

    def start_video():
        # User confirmed: leave the wait loop and report "started".
        state["started"] = True
        state["done"] = True
        try:
            root.destroy()
        except Exception:
            pass

    def cancel():
        # User declined (button or window close): leave without starting.
        state["started"] = False
        state["done"] = True
        try:
            root.destroy()
        except Exception:
            pass

    start_button = tk.Button(
        root,
        text="СТАРТ ВИДЕО",
        font=("Segoe UI", 13, "bold"),
        width=18,
        height=2,
        command=start_video,
    )
    start_button.pack(pady=4)

    cancel_button = tk.Button(
        root,
        text="Отмена",
        font=("Segoe UI", 10),
        width=12,
        command=cancel,
    )
    cancel_button.pack(pady=4)

    # Closing the window with [X] counts as cancel.
    root.protocol("WM_DELETE_WINDOW", cancel)

    try:
        root.lift()
        root.focus_force()
    except Exception:
        pass

    # Manual event pump: keeps the dialog responsive while continuously
    # feeding the first frame so the camera never goes blank.
    while not state["done"]:
        try:
            root.update_idletasks()
            root.update()
        except tk.TclError:
            # Window was destroyed from inside a callback.
            break

        cam.send(first_frame_rgb)
        cam.sleep_until_next_frame()

    return state["started"]


def run_video_to_virtual_camera(video_path: str) -> Tuple[bool, str]:
    """Stream *video_path* into the OBS virtual camera, then end the call.

    Flow: probe the first frame to fix the camera resolution, hold that
    frame on the camera while the user waits for the call to connect,
    play the whole file (optionally with audio), then try to hang up the
    Telegram/AyuGram call automatically.

    Returns:
        (success, message) from the hang-up attempt, or (True, ...) if the
        user cancelled before starting.

    Raises:
        RuntimeError: if the video or the virtual camera cannot be opened.
    """
    cap = open_video_capture(video_path)

    audio_thread = None
    audio_stop_event = None

    try:
        raw_fps = float(cap.get(cv2.CAP_PROP_FPS))
        fps = sanitize_fps(raw_fps)

        ok, first_frame_bgr = cap.read()
        if not ok or first_frame_bgr is None:
            raise RuntimeError("Не удалось прочитать первый кадр видео.")

        # Camera resolution is fixed at open time, so derive it from the
        # frame *after* rotation.
        rotated_first_frame = rotate_frame_bgr(first_frame_bgr)
        first_height, first_width = rotated_first_frame.shape[:2]

        width, height = sanitize_size(first_width, first_height)

        first_frame_rgb = prepare_frame_for_camera(first_frame_bgr, width, height)

        logging.info(
            "Видео: path=%r width=%s height=%s fps=%s rotate=%s audio=%s",
            video_path,
            width,
            height,
            fps,
            ROTATE_MODE,
            AUDIO_ENABLED,
        )

        try:
            cam = pyvirtualcam.Camera(
                width=width,
                height=height,
                fps=fps,
                fmt=PixelFormat.RGB,
                backend=VIRTUAL_CAMERA_BACKEND,
            )
        except Exception as exc:
            raise RuntimeError(
                "Не удалось открыть виртуальную камеру OBS.\n\n"
                "Проверь, что OBS Studio установлен и что в системе есть "
                "устройство 'OBS Virtual Camera'."
            ) from exc

        with cam:
            logging.info("Виртуальная камера открыта: %s", cam.device)

            # Hold the first frame on the camera until the user presses start.
            started = wait_before_start_and_feed_first_frame(cam, first_frame_rgb)

            if not started:
                logging.info("Пользователь отменил запуск видео.")
                return True, "Запуск видео отменён."

            # Rewind by reopening: the first frame was already consumed.
            cap.release()
            cap = open_video_capture(video_path)

            audio_thread, audio_stop_event = start_audio_thread(video_path)

            frame_count = 0

            # Main playback loop: one frame per camera tick.
            while True:
                ok, frame_bgr = cap.read()

                if not ok:
                    break

                if frame_bgr is None:
                    break

                frame_rgb = prepare_frame_for_camera(frame_bgr, width, height)

                cam.send(frame_rgb)
                cam.sleep_until_next_frame()

                frame_count += 1

            logging.info("Видео закончилось. Отправлено кадров: %s", frame_count)

            # Stop audio before hanging up; clear the refs so the finally
            # block does not try to stop it a second time.
            stop_audio_thread(audio_thread, audio_stop_event)
            audio_thread = None
            audio_stop_event = None

            time.sleep(0.25)

            success, message = end_telegram_call()
            logging.info("Результат завершения звонка: success=%s message=%r", success, message)

            time.sleep(0.4)
            return success, message

    finally:
        # Safety net for early exits/exceptions; releasing twice is harmless.
        stop_audio_thread(audio_thread, audio_stop_event)
        cap.release()


def main() -> int:
    """Program entry point; returns the process exit code."""
    setup_logging()
    set_dpi_awareness()

    if sys.platform != "win32":
        show_error("Ошибка", "Эта программа рассчитана на Windows.")
        return 1

    try:
        video_path = select_video_file()
        if not video_path:
            # User cancelled the file dialog: a clean, silent exit.
            return 0

        if not os.path.isfile(video_path):
            show_error("Ошибка", "Выбранный файл не найден.")
            return 1

        success, message = run_video_to_virtual_camera(video_path)

        if not success:
            warning_text = (
                message
                + "\n\n"
                "Если звонок не завершился, заверши его вручную. "
                "Лог программы находится в AppData\\Local\\AyuGramVideoCam."
            )
            show_warning("Видео закончилось", warning_text)

        return 0

    except pyautogui.FailSafeException:
        # Mouse hit a screen corner: the user deliberately aborted automation.
        logging.exception("PyAutoGUI FailSafeException")
        show_error(
            "Остановлено",
            "Сработала защита PyAutoGUI: мышь была перемещена в угол экрана.\n\n"
            "Программа остановлена.",
        )
        return 1

    except Exception:
        # Top-level boundary: log the full traceback and show its tail.
        error_text = traceback.format_exc()
        logging.error(error_text)

        show_error(
            "Ошибка",
            "Произошла ошибка:\n\n"
            + error_text[-3500:]
            + "\n\nЛог программы находится в AppData\\Local\\AyuGramVideoCam\\app.log",
        )
        return 1


if __name__ == "__main__":
    # SystemExit propagates main()'s return value as the process exit code.
    raise SystemExit(main())