# Загрузка данных (data loading)
import os
import sys
import time
import ctypes
import logging
import traceback
import threading
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional, Tuple
import cv2
import numpy as np
import pyautogui
import pyvirtualcam
from pyvirtualcam import PixelFormat
import tkinter as tk
from tkinter import filedialog, messagebox
import psutil
import win32con
import win32gui
import win32process
# Application name; also the log folder name under %LOCALAPPDATA%.
APP_NAME = "AyuGramVideoCam"
# pyvirtualcam backend; requires OBS Studio's virtual camera device.
VIRTUAL_CAMERA_BACKEND = "obs"
# "none" was confirmed to work, so it stays that way.
# If rotation is ever needed again, valid values are:
# "none", "clockwise", "counterclockwise", "180"
ROTATE_MODE = "none"
# Enable sound from the video.
# VB-CABLE is required for the sound to reach the call.
AUDIO_ENABLED = True
# Glob patterns offered by the file-open dialog.
VIDEO_EXTENSIONS = (
    "*.mp4",
    "*.avi",
    "*.mkv",
    "*.mov",
    "*.wmv",
    "*.webm",
    "*.m4v",
)
# Short pause after every pyautogui action; FAILSAFE aborts automation
# when the mouse is moved into a screen corner.
pyautogui.PAUSE = 0.05
pyautogui.FAILSAFE = True
def setup_logging() -> None:
    """Configure file logging under %LOCALAPPDATA%/AyuGramVideoCam/app.log."""
    base_dir = os.getenv("LOCALAPPDATA", str(Path.home()))
    log_dir = Path(base_dir) / APP_NAME
    log_dir.mkdir(parents=True, exist_ok=True)
    logging.basicConfig(
        filename=str(log_dir / "app.log"),
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
        encoding="utf-8",
    )
def set_dpi_awareness() -> None:
    """Best-effort: mark the process DPI-aware so pixel coordinates are exact."""
    try:
        user32 = ctypes.windll.user32
        user32.SetProcessDPIAware()
    except Exception:
        pass  # non-Windows platform or unsupported API: safe to ignore
def show_error(title: str, text: str) -> None:
    """Show a topmost modal error dialog without a visible main window.

    Args:
        title: Dialog window title.
        text: Message body.

    The hidden Tk root is destroyed in a ``finally`` block so it is not
    leaked if the message box itself raises (the original skipped destroy
    in that case).
    """
    root = tk.Tk()
    try:
        root.withdraw()
        root.attributes("-topmost", True)
        messagebox.showerror(title, text)
    finally:
        root.destroy()
def show_warning(title: str, text: str) -> None:
    """Show a topmost modal warning dialog without a visible main window.

    Args:
        title: Dialog window title.
        text: Message body.

    The hidden Tk root is destroyed in a ``finally`` block so it is not
    leaked if the message box itself raises (the original skipped destroy
    in that case).
    """
    root = tk.Tk()
    try:
        root.withdraw()
        root.attributes("-topmost", True)
        messagebox.showwarning(title, text)
    finally:
        root.destroy()
def select_video_file() -> Optional[str]:
    """Ask the user to pick a video file; return its path or None if cancelled."""
    root = tk.Tk()
    root.withdraw()
    root.attributes("-topmost", True)
    chosen = filedialog.askopenfilename(
        title="Выберите видеофайл",
        filetypes=[
            ("Видео файлы", " ".join(VIDEO_EXTENSIONS)),
            ("Все файлы", "*.*"),
        ],
    )
    root.destroy()
    # askopenfilename returns "" (or an empty tuple) on cancel.
    return chosen or None
def sanitize_fps(raw_fps: float) -> float:
    """Validate an FPS value reported by the video decoder.

    OpenCV's ``CAP_PROP_FPS`` may report 0, a negative number, an absurdly
    large value, or NaN for broken containers.  Any such value falls back
    to a safe default of 30 fps.

    Args:
        raw_fps: FPS as read from the capture (may be None).

    Returns:
        A sane FPS in (0, 120], or 30.0 as the fallback.
    """
    if raw_fps is None:
        return 30.0
    fps = float(raw_fps)
    # ``fps != fps`` is the NaN test (NaN compares unequal to itself); the
    # original range checks both evaluate False for NaN and let it through.
    if fps != fps or fps <= 0 or fps > 120:
        return 30.0
    return fps
def sanitize_size(width: int, height: int) -> Tuple[int, int]:
    """Clamp a frame size to an even-valued resolution of at least 320x240."""
    if width <= 0 or height <= 0:
        # Decoder reported no usable size: fall back to 720p.
        return 1280, 720
    # Clearing the lowest bit rounds each positive dimension down to even
    # (codecs/virtual cameras generally require even dimensions), then the
    # minimum supported resolution is enforced.
    even_width = int(width) & ~1
    even_height = int(height) & ~1
    return max(even_width, 320), max(even_height, 240)
def rotate_frame_bgr(frame_bgr):
    """Apply the configured ROTATE_MODE to a BGR frame ("none" is a no-op)."""
    if ROTATE_MODE == "none":
        return frame_bgr
    rotation_codes = {
        "clockwise": cv2.ROTATE_90_CLOCKWISE,
        "counterclockwise": cv2.ROTATE_90_COUNTERCLOCKWISE,
        "180": cv2.ROTATE_180,
    }
    code = rotation_codes.get(ROTATE_MODE)
    # Unknown mode strings behave like "none", matching the original chain.
    if code is None:
        return frame_bgr
    return cv2.rotate(frame_bgr, code)
def prepare_frame_for_camera(frame_bgr, width: int, height: int):
    """Rotate, resize and convert a decoded BGR frame to contiguous RGB.

    The result matches the (width, height) the virtual camera was opened
    with; pyvirtualcam expects RGB data in a C-contiguous buffer.
    """
    frame = rotate_frame_bgr(frame_bgr)
    if (frame.shape[1], frame.shape[0]) != (width, height):
        frame = cv2.resize(frame, (width, height), interpolation=cv2.INTER_AREA)
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    return np.ascontiguousarray(rgb)
def audio_worker(video_path: str, stop_event: threading.Event) -> None:
    """Play the audio track of the selected video until EOF or stop_event.

    The sound goes to the output device selected in Windows.  To route it
    into the AyuGram call, select "CABLE Input (VB-Audio Virtual Cable)"
    as the Windows output device (requires VB-CABLE).
    """
    player = None
    try:
        # Imported lazily so the program still runs without ffpyplayer.
        from ffpyplayer.player import MediaPlayer

        logging.info("Запуск аудио из видео: %r", video_path)
        player = MediaPlayer(video_path)
        while not stop_event.is_set():
            _frame, val = player.get_frame()
            if val == "eof":
                logging.info("Аудио закончилось.")
                break
            # ``val`` is the suggested wait before the next frame; clamp it
            # to [1 ms, 50 ms] so stop_event is checked frequently.
            if isinstance(val, (int, float)):
                time.sleep(min(max(float(val), 0.001), 0.05))
            else:
                time.sleep(0.01)
    except Exception:
        logging.exception("Ошибка аудио-проигрывателя")
    finally:
        if player is not None:
            try:
                player.close_player()
            except Exception:
                pass
        logging.info("Аудио-проигрыватель остановлен.")
def start_audio_thread(video_path: str) -> Tuple[Optional[threading.Thread], Optional[threading.Event]]:
    """Spawn the daemon audio thread; returns (None, None) when audio is off."""
    if not AUDIO_ENABLED:
        return None, None
    stop_event = threading.Event()
    worker = threading.Thread(
        target=audio_worker,
        args=(video_path, stop_event),
        daemon=True,  # never block interpreter shutdown
    )
    worker.start()
    return worker, stop_event
def stop_audio_thread(thread: Optional[threading.Thread], stop_event: Optional[threading.Event]) -> None:
    """Signal the audio worker to stop, then wait up to 2 s for it to exit.

    Both arguments may be None (audio disabled or never started); that is
    a no-op.  The join is bounded so shutdown never hangs on a stuck player.
    """
    if stop_event is not None:
        stop_event.set()
    if thread is not None:
        thread.join(timeout=2.0)
@dataclass
class WindowInfo:
    """A visible top-level window with the metadata used to identify it."""

    hwnd: int
    title: str
    class_name: str
    process_name: str
    # (left, top, right, bottom) in screen coordinates.
    rect: Tuple[int, int, int, int]

    @property
    def width(self) -> int:
        """Window width in pixels; never negative."""
        left, _, right, _ = self.rect
        return max(0, right - left)

    @property
    def height(self) -> int:
        """Window height in pixels; never negative."""
        _, top, _, bottom = self.rect
        return max(0, bottom - top)

    @property
    def area(self) -> int:
        """Pixel area, used to rank candidate windows (bigger first)."""
        return self.width * self.height
def get_process_name_by_hwnd(hwnd: int) -> str:
    """Resolve a window handle to its owning executable name.

    Returns "" when the process is gone or access is denied.
    """
    try:
        _thread_id, pid = win32process.GetWindowThreadProcessId(hwnd)
        return psutil.Process(pid).name()
    except Exception:
        return ""
def enum_visible_windows() -> List[WindowInfo]:
    """Collect visible top-level windows at least 200x120 pixels in size."""
    collected: List[WindowInfo] = []

    def on_window(hwnd, _param):
        try:
            if not win32gui.IsWindowVisible(hwnd):
                return
            rect = win32gui.GetWindowRect(hwnd)
            left, top, right, bottom = rect
            # Skip tiny windows (tooltips, tray popups and the like).
            if right - left < 200 or bottom - top < 120:
                return
            collected.append(
                WindowInfo(
                    hwnd=hwnd,
                    title=win32gui.GetWindowText(hwnd) or "",
                    class_name=win32gui.GetClassName(hwnd) or "",
                    process_name=get_process_name_by_hwnd(hwnd),
                    rect=rect,
                )
            )
        except Exception:
            # A window may vanish mid-enumeration; just skip it.
            return

    win32gui.EnumWindows(on_window, None)
    return collected
def find_telegram_windows() -> List[WindowInfo]:
    """Return Telegram/AyuGram windows, ordered largest first."""
    known_processes = {
        "telegram.exe",
        "telegramdesktop.exe",
        "ayugram.exe",
        "ayugramdesktop.exe",
    }
    matches: List[WindowInfo] = []
    for candidate in enum_visible_windows():
        # Match on any of title / class / process name, case-insensitively.
        searchable = f"{candidate.title} {candidate.class_name} {candidate.process_name}".lower()
        if (
            "telegram" in searchable
            or "ayugram" in searchable
            or candidate.process_name.lower() in known_processes
        ):
            matches.append(candidate)
    return sorted(matches, key=lambda w: w.area, reverse=True)
def activate_window(hwnd: int) -> None:
    """Restore, raise and focus a window; failures are logged, not raised."""
    try:
        if win32gui.IsIconic(hwnd):
            # Un-minimize before trying to bring it forward.
            win32gui.ShowWindow(hwnd, win32con.SW_RESTORE)
        win32gui.ShowWindow(hwnd, win32con.SW_SHOW)
        win32gui.BringWindowToTop(hwnd)
        try:
            # May fail when another process holds foreground rights.
            win32gui.SetForegroundWindow(hwnd)
        except Exception:
            pass
        # Give the window manager time to repaint before screenshots/clicks.
        time.sleep(0.35)
    except Exception as exc:
        logging.warning("Не удалось активировать окно %s: %s", hwnd, exc)
def clip_rect_to_screen(rect: Tuple[int, int, int, int]) -> Optional[Tuple[int, int, int, int]]:
    """Intersect a window rect with the primary screen.

    Returns the clipped (left, top, right, bottom), or None when the
    intersection is empty (window fully off-screen).
    """
    screen_w, screen_h = pyautogui.size()
    left, top, right, bottom = rect
    clipped = (
        max(0, left),
        max(0, top),
        min(screen_w, right),
        min(screen_h, bottom),
    )
    if clipped[2] <= clipped[0] or clipped[3] <= clipped[1]:
        return None
    return clipped
def locate_red_hangup_button(image_rgb, offset_x: int, offset_y: int) -> Optional[Tuple[int, int]]:
    """Locate the red "hang up" button in a window screenshot.

    Heuristic: threshold strongly-red pixels in the lower 65% of the image,
    clean the mask morphologically, then score connected components by area,
    horizontal centeredness and vertical position, keeping the best.

    Args:
        image_rgb: PIL image (a pyautogui screenshot of the window).
        offset_x, offset_y: the screenshot's top-left in screen coordinates.

    Returns:
        Absolute screen coordinates of the best candidate, or None.
    """
    arr = np.array(image_rgb.convert("RGB"))
    h, w = arr.shape[:2]
    if h < 120 or w < 200:
        # Too small to contain call controls.
        return None
    # The hang-up button sits in the lower portion of a call window.
    y_start = int(h * 0.35)
    crop = arr[y_start:h, :, :]
    # int16 avoids uint8 overflow in the dominance comparisons below.
    r = crop[:, :, 0].astype(np.int16)
    g = crop[:, :, 1].astype(np.int16)
    b = crop[:, :, 2].astype(np.int16)
    # "Strong red": bright red channel that clearly dominates green and blue.
    mask = (
        (r > 135)
        & (g < 135)
        & (b < 135)
        & (r > g * 1.25)
        & (r > b * 1.25)
    ).astype(np.uint8) * 255
    # Close small holes inside blobs, then drop speckle noise.
    kernel = np.ones((5, 5), np.uint8)
    mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
    mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
    num_labels, labels, stats, centroids = cv2.connectedComponentsWithStats(mask, 8)
    best_score = 0.0
    best_point: Optional[Tuple[int, int]] = None
    for i in range(1, num_labels):  # label 0 is the background
        x, y, bw, bh, area = stats[i]
        cx, cy = centroids[i]
        # Reject blobs too small/large or too elongated to be a round button.
        if area < 100:
            continue
        if area > 40000:
            continue
        if bw < 18 or bh < 18:
            continue
        if bw > 220 or bh > 220:
            continue
        ratio = bw / max(1, bh)
        if ratio < 0.35 or ratio > 2.85:
            continue
        # Centroid in full-image coordinates (the crop started at y_start).
        absolute_cx = float(cx)
        absolute_cy = float(cy + y_start)
        # Favour blobs near the horizontal center and low in the window.
        center_penalty = abs(absolute_cx - (w / 2)) / max(1, w / 2)
        lower_bonus = absolute_cy / max(1, h)
        score = float(area) * (1.0 + lower_bonus) * max(0.25, 1.4 - center_penalty)
        if score > best_score:
            best_score = score
            # Translate back to absolute screen coordinates.
            best_point = (
                int(offset_x + absolute_cx),
                int(offset_y + absolute_cy),
            )
    return best_point
def screenshot_window(window: WindowInfo):
    """Screenshot the on-screen portion of a window.

    Returns (PIL image, (left, top) screen offset), or (None, None) when
    the window is entirely off-screen.
    """
    visible = clip_rect_to_screen(window.rect)
    if visible is None:
        return None, None
    left, top, right, bottom = visible
    shot = pyautogui.screenshot(region=(left, top, right - left, bottom - top))
    return shot, (left, top)
def click_red_hangup_button(window: WindowInfo) -> bool:
    """Find and click the red hang-up button inside ``window``.

    Hovers near the bottom of the window first so that auto-hidden call
    controls are revealed before the screenshot is taken.  Returns True on
    a successful click, False when the button was not found.
    """
    visible = clip_rect_to_screen(window.rect)
    if visible is None:
        return False
    left, top, right, bottom = visible
    try:
        # Hover ~84% of the way down the window to reveal the controls.
        pyautogui.moveTo(
            left + (right - left) // 2,
            top + int((bottom - top) * 0.84),
            duration=0.15,
        )
    except Exception:
        pass
    time.sleep(0.45)  # wait for the controls' fade-in animation
    shot, origin = screenshot_window(window)
    if shot is None or origin is None:
        return False
    target = locate_red_hangup_button(shot, origin[0], origin[1])
    if target is None:
        return False
    logging.info("Найдена красная кнопка завершения звонка: %s", target)
    pyautogui.moveTo(target[0], target[1], duration=0.1)
    pyautogui.click()
    time.sleep(0.6)  # let the click take effect before the caller re-checks
    return True
def fallback_click_bottom_center(window: WindowInfo) -> bool:
    """Blind-click where the hang-up button usually sits (bottom center).

    Used when image recognition failed.  Returns True when the click was
    performed, False when the window is off-screen or the click raised.
    """
    visible = clip_rect_to_screen(window.rect)
    if visible is None:
        return False
    left, top, right, bottom = visible
    width = right - left
    height = bottom - top
    x = left + width // 2
    # 13% of the window height above the bottom edge, clamped to 65..140 px.
    y = bottom - max(65, min(140, int(height * 0.13)))
    try:
        pyautogui.moveTo(x, y, duration=0.15)
        time.sleep(0.15)
        pyautogui.click()
        time.sleep(0.5)
        logging.info("Fallback-клик по нижнему центру окна: %s, %s", x, y)
        return True
    except Exception as exc:
        logging.warning("Fallback-клик не удался: %s", exc)
        return False
def fallback_hotkeys(window: WindowInfo) -> bool:
    """Last resort: focus the window, then send Escape followed by Alt+F4."""
    try:
        activate_window(window.hwnd)
        pyautogui.press("esc")
        time.sleep(0.25)
        pyautogui.hotkey("alt", "f4")
        time.sleep(0.5)
        logging.info("Отправлены fallback-горячие клавиши Escape и Alt+F4.")
    except Exception as exc:
        logging.warning("Fallback-горячие клавиши не сработали: %s", exc)
        return False
    return True
def end_telegram_call() -> Tuple[bool, str]:
    """Attempt to hang up the active Telegram/AyuGram call.

    Strategy, in order over all candidate windows:
      1. image-recognition click on the red button (3 tries per window);
      2. blind click on the bottom center of the window;
      3. Escape + Alt+F4 hotkeys.

    Returns (success, human-readable message).
    """
    windows = find_telegram_windows()
    if not windows:
        return False, "Окно Telegram/AyuGram не найдено."
    logging.info("Найдено окон Telegram/AyuGram: %s", len(windows))

    # Pass 1: detected red-button click.
    for window in windows:
        logging.info(
            "Пробую окно: title=%r class=%r process=%r rect=%r",
            window.title,
            window.class_name,
            window.process_name,
            window.rect,
        )
        activate_window(window.hwnd)
        for _attempt in range(3):
            if click_red_hangup_button(window):
                return True, "Звонок завершён нажатием красной кнопки."

    # Pass 2: blind click where the button usually is.
    for window in windows:
        activate_window(window.hwnd)
        if fallback_click_bottom_center(window):
            return True, "Сделана fallback-попытка завершить звонок кликом по нижнему центру окна."

    # Pass 3: keyboard shortcuts.
    for window in windows:
        if fallback_hotkeys(window):
            return True, "Сделана fallback-попытка завершить звонок горячими клавишами."

    return False, "Не удалось автоматически завершить звонок Telegram/AyuGram."
def open_video_capture(video_path: str) -> cv2.VideoCapture:
    """Open ``video_path`` with OpenCV.

    Raises:
        RuntimeError: if the file cannot be opened by any backend.
    """
    capture = cv2.VideoCapture(video_path)
    if capture.isOpened():
        return capture
    raise RuntimeError(f"Не удалось открыть видеофайл:\n{video_path}")
def wait_before_start_and_feed_first_frame(cam, first_frame_rgb) -> bool:
    """Show a start/cancel dialog while continuously feeding the camera.

    While the dialog is open, the first video frame is re-sent to ``cam``
    on every loop iteration so the call preview shows a real picture
    instead of the OBS placeholder.  A manual ``update()`` pump is used
    instead of ``mainloop()`` so camera frames can be interleaved with GUI
    events.

    Args:
        cam: an open pyvirtualcam Camera.
        first_frame_rgb: RGB frame matching the camera's size/format.

    Returns:
        True when the user pressed START, False on cancel or window close.
    """
    # Flags mutated by the button callbacks below.
    state = {
        "done": False,
        "started": False,
    }
    root = tk.Tk()
    root.title("AyuGram VideoCam")
    root.geometry("560x320")
    root.resizable(False, False)
    root.attributes("-topmost", True)
    label = tk.Label(
        root,
        text=(
            "Виртуальная камера уже работает.\n\n"
            "В AyuGram выбери:\n"
            "Камера: OBS Virtual Camera\n"
            "Микрофон: CABLE Output, если используешь VB-CABLE\n\n"
            "Сейчас в камеру подаётся первый кадр видео,\n"
            "поэтому логотип OBS показываться не должен.\n\n"
            "Когда звонок подключился — нажми СТАРТ ВИДЕО."
        ),
        font=("Segoe UI", 10),
        justify="center",
    )
    label.pack(pady=16)

    def start_video():
        # User confirmed: leave the pump loop and begin streaming.
        state["started"] = True
        state["done"] = True
        try:
            root.destroy()
        except Exception:
            pass

    def cancel():
        # Cancel button or window close: leave the loop without starting.
        state["started"] = False
        state["done"] = True
        try:
            root.destroy()
        except Exception:
            pass

    start_button = tk.Button(
        root,
        text="СТАРТ ВИДЕО",
        font=("Segoe UI", 13, "bold"),
        width=18,
        height=2,
        command=start_video,
    )
    start_button.pack(pady=4)
    cancel_button = tk.Button(
        root,
        text="Отмена",
        font=("Segoe UI", 10),
        width=12,
        command=cancel,
    )
    cancel_button.pack(pady=4)
    # Closing the window counts as cancelling.
    root.protocol("WM_DELETE_WINDOW", cancel)
    try:
        root.lift()
        root.focus_force()
    except Exception:
        pass
    # Manual event pump: process GUI events, then push one camera frame,
    # paced by the camera's own frame clock.
    while not state["done"]:
        try:
            root.update_idletasks()
            root.update()
        except tk.TclError:
            # The window was already destroyed by a callback.
            break
        cam.send(first_frame_rgb)
        cam.sleep_until_next_frame()
    return state["started"]
def run_video_to_virtual_camera(video_path: str) -> Tuple[bool, str]:
    """Stream ``video_path`` into the OBS virtual camera, then end the call.

    Flow: probe the video (fps, size, first frame) -> open the virtual
    camera -> hold the first frame on screen while the user waits for the
    call to connect -> play the whole video (optionally with audio) ->
    try to hang up the Telegram/AyuGram call automatically.

    Returns:
        (success, message).  Success is also True when the user cancelled
        before starting (so no warning dialog is shown).

    Raises:
        RuntimeError: if the video cannot be opened/read or the virtual
            camera is unavailable.
    """
    cap = open_video_capture(video_path)
    audio_thread = None
    audio_stop_event = None
    try:
        raw_fps = float(cap.get(cv2.CAP_PROP_FPS))
        fps = sanitize_fps(raw_fps)
        ok, first_frame_bgr = cap.read()
        if not ok or first_frame_bgr is None:
            raise RuntimeError("Не удалось прочитать первый кадр видео.")
        # Camera dimensions follow the *rotated* first frame.
        rotated_first_frame = rotate_frame_bgr(first_frame_bgr)
        first_height, first_width = rotated_first_frame.shape[:2]
        width, height = sanitize_size(first_width, first_height)
        first_frame_rgb = prepare_frame_for_camera(first_frame_bgr, width, height)
        logging.info(
            "Видео: path=%r width=%s height=%s fps=%s rotate=%s audio=%s",
            video_path,
            width,
            height,
            fps,
            ROTATE_MODE,
            AUDIO_ENABLED,
        )
        try:
            cam = pyvirtualcam.Camera(
                width=width,
                height=height,
                fps=fps,
                fmt=PixelFormat.RGB,
                backend=VIRTUAL_CAMERA_BACKEND,
            )
        except Exception as exc:
            raise RuntimeError(
                "Не удалось открыть виртуальную камеру OBS.\n\n"
                "Проверь, что OBS Studio установлен и что в системе есть "
                "устройство 'OBS Virtual Camera'."
            ) from exc
        with cam:
            logging.info("Виртуальная камера открыта: %s", cam.device)
            started = wait_before_start_and_feed_first_frame(cam, first_frame_rgb)
            if not started:
                logging.info("Пользователь отменил запуск видео.")
                # Cancellation is reported as success: no warning pops up.
                return True, "Запуск видео отменён."
            # Reopen the file so playback starts from frame 0 (the probe
            # above already consumed the first frame).
            cap.release()
            cap = open_video_capture(video_path)
            audio_thread, audio_stop_event = start_audio_thread(video_path)
            frame_count = 0
            while True:
                ok, frame_bgr = cap.read()
                if not ok:
                    break
                if frame_bgr is None:
                    break
                frame_rgb = prepare_frame_for_camera(frame_bgr, width, height)
                cam.send(frame_rgb)
                # Paces output to the camera's configured fps.
                cam.sleep_until_next_frame()
                frame_count += 1
            logging.info("Видео закончилось. Отправлено кадров: %s", frame_count)
            # Stop audio before the hang-up automation starts clicking.
            stop_audio_thread(audio_thread, audio_stop_event)
            audio_thread = None
            audio_stop_event = None
            time.sleep(0.25)
            success, message = end_telegram_call()
            logging.info("Результат завершения звонка: success=%s message=%r", success, message)
            time.sleep(0.4)
            return success, message
    finally:
        # Idempotent cleanup: audio vars are None-ed after a normal stop,
        # so this only acts on early exits/exceptions.
        stop_audio_thread(audio_thread, audio_stop_event)
        cap.release()
def main() -> int:
    """Program entry point.

    Returns the process exit code: 0 on success or user cancel, 1 on error.
    """
    setup_logging()
    set_dpi_awareness()
    if sys.platform != "win32":
        show_error("Ошибка", "Эта программа рассчитана на Windows.")
        return 1
    try:
        video_path = select_video_file()
        if not video_path:
            # The user closed the file dialog — not an error.
            return 0
        if not os.path.isfile(video_path):
            show_error("Ошибка", "Выбранный файл не найден.")
            return 1
        success, message = run_video_to_virtual_camera(video_path)
        if not success:
            hint = (
                "\n\n"
                "Если звонок не завершился, заверши его вручную. "
                "Лог программы находится в AppData\\Local\\AyuGramVideoCam."
            )
            show_warning("Видео закончилось", message + hint)
        return 0
    except pyautogui.FailSafeException:
        # Triggered when the mouse is moved into a screen corner mid-automation.
        logging.exception("PyAutoGUI FailSafeException")
        show_error(
            "Остановлено",
            "Сработала защита PyAutoGUI: мышь была перемещена в угол экрана.\n\n"
            "Программа остановлена.",
        )
        return 1
    except Exception:
        # Catch-all boundary: log the traceback and show its tail to the user.
        error_text = traceback.format_exc()
        logging.error(error_text)
        show_error(
            "Ошибка",
            "Произошла ошибка:\n\n"
            + error_text[-3500:]
            + "\n\nЛог программы находится в AppData\\Local\\AyuGramVideoCam\\app.log",
        )
        return 1
if __name__ == "__main__":
    # Run the app and propagate its exit code to the shell.
    raise SystemExit(main())