Загрузка данных
import os
import sys
import time
import ctypes
import logging
import traceback
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional, Tuple
import cv2
import numpy as np
import pyautogui
import pyvirtualcam
from pyvirtualcam import PixelFormat
import tkinter as tk
from tkinter import filedialog, messagebox
import psutil
import win32con
import win32gui
import win32process
APP_NAME = "AyuGramVideoCam"
VIRTUAL_CAMERA_BACKEND = "obs"
VIDEO_EXTENSIONS = (
"*.mp4",
"*.avi",
"*.mkv",
"*.mov",
"*.wmv",
"*.webm",
"*.m4v",
)
pyautogui.PAUSE = 0.05
pyautogui.FAILSAFE = True
def setup_logging() -> None:
log_dir = Path(os.getenv("LOCALAPPDATA", str(Path.home()))) / APP_NAME
log_dir.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
filename=str(log_dir / "app.log"),
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
encoding="utf-8",
)
def set_dpi_awareness() -> None:
try:
ctypes.windll.user32.SetProcessDPIAware()
except Exception:
pass
def show_error(title: str, text: str) -> None:
root = tk.Tk()
root.withdraw()
root.attributes("-topmost", True)
messagebox.showerror(title, text)
root.destroy()
def show_info(title: str, text: str) -> None:
root = tk.Tk()
root.withdraw()
root.attributes("-topmost", True)
messagebox.showinfo(title, text)
root.destroy()
def show_warning(title: str, text: str) -> None:
root = tk.Tk()
root.withdraw()
root.attributes("-topmost", True)
messagebox.showwarning(title, text)
root.destroy()
def select_video_file() -> Optional[str]:
root = tk.Tk()
root.withdraw()
root.attributes("-topmost", True)
filetypes = [
("Видео файлы", " ".join(VIDEO_EXTENSIONS)),
("Все файлы", "*.*"),
]
video_path = filedialog.askopenfilename(
title="Выберите видеофайл",
filetypes=filetypes,
)
root.destroy()
if not video_path:
return None
return video_path
def sanitize_fps(raw_fps: float) -> float:
if raw_fps is None or raw_fps <= 0 or raw_fps > 120:
return 30.0
return float(raw_fps)
def sanitize_size(width: int, height: int) -> Tuple[int, int]:
if width <= 0 or height <= 0:
return 1280, 720
width = int(width)
height = int(height)
if width % 2 != 0:
width -= 1
if height % 2 != 0:
height -= 1
width = max(width, 320)
height = max(height, 240)
return width, height
@dataclass
class WindowInfo:
hwnd: int
title: str
class_name: str
process_name: str
rect: Tuple[int, int, int, int]
@property
def width(self) -> int:
return max(0, self.rect[2] - self.rect[0])
@property
def height(self) -> int:
return max(0, self.rect[3] - self.rect[1])
@property
def area(self) -> int:
return self.width * self.height
def get_process_name_by_hwnd(hwnd: int) -> str:
try:
_, pid = win32process.GetWindowThreadProcessId(hwnd)
return psutil.Process(pid).name()
except Exception:
return ""
def enum_visible_windows() -> List[WindowInfo]:
windows: List[WindowInfo] = []
def callback(hwnd, _):
try:
if not win32gui.IsWindowVisible(hwnd):
return
title = win32gui.GetWindowText(hwnd) or ""
class_name = win32gui.GetClassName(hwnd) or ""
process_name = get_process_name_by_hwnd(hwnd)
rect = win32gui.GetWindowRect(hwnd)
left, top, right, bottom = rect
width = right - left
height = bottom - top
if width < 200 or height < 120:
return
windows.append(
WindowInfo(
hwnd=hwnd,
title=title,
class_name=class_name,
process_name=process_name,
rect=rect,
)
)
except Exception:
return
win32gui.EnumWindows(callback, None)
return windows
def find_telegram_windows() -> List[WindowInfo]:
result: List[WindowInfo] = []
for window in enum_visible_windows():
haystack = f"{window.title} {window.class_name} {window.process_name}".lower()
process_name = window.process_name.lower()
if (
"telegram" in haystack
or "ayugram" in haystack
or process_name in {
"telegram.exe",
"telegramdesktop.exe",
"ayugram.exe",
"ayugramdesktop.exe",
}
):
result.append(window)
result.sort(key=lambda w: w.area, reverse=True)
return result
def activate_window(hwnd: int) -> None:
try:
if win32gui.IsIconic(hwnd):
win32gui.ShowWindow(hwnd, win32con.SW_RESTORE)
win32gui.ShowWindow(hwnd, win32con.SW_SHOW)
win32gui.BringWindowToTop(hwnd)
try:
win32gui.SetForegroundWindow(hwnd)
except Exception:
pass
time.sleep(0.35)
except Exception as exc:
logging.warning("Не удалось активировать окно %s: %s", hwnd, exc)
def clip_rect_to_screen(rect: Tuple[int, int, int, int]) -> Optional[Tuple[int, int, int, int]]:
left, top, right, bottom = rect
screen_w, screen_h = pyautogui.size()
left = max(0, left)
top = max(0, top)
right = min(screen_w, right)
bottom = min(screen_h, bottom)
if right <= left or bottom <= top:
return None
return left, top, right, bottom
def locate_red_hangup_button(image_rgb, offset_x: int, offset_y: int) -> Optional[Tuple[int, int]]:
arr = np.array(image_rgb.convert("RGB"))
h, w = arr.shape[:2]
if h < 120 or w < 200:
return None
y_start = int(h * 0.35)
crop = arr[y_start:h, :, :]
r = crop[:, :, 0].astype(np.int16)
g = crop[:, :, 1].astype(np.int16)
b = crop[:, :, 2].astype(np.int16)
mask = (
(r > 135)
& (g < 135)
& (b < 135)
& (r > g * 1.25)
& (r > b * 1.25)
).astype(np.uint8) * 255
kernel = np.ones((5, 5), np.uint8)
mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
num_labels, labels, stats, centroids = cv2.connectedComponentsWithStats(mask, 8)
best_score = 0.0
best_point: Optional[Tuple[int, int]] = None
for i in range(1, num_labels):
x, y, bw, bh, area = stats[i]
cx, cy = centroids[i]
if area < 100:
continue
if area > 40000:
continue
if bw < 18 or bh < 18:
continue
if bw > 220 or bh > 220:
continue
ratio = bw / max(1, bh)
if ratio < 0.35 or ratio > 2.85:
continue
absolute_cx = float(cx)
absolute_cy = float(cy + y_start)
center_penalty = abs(absolute_cx - (w / 2)) / max(1, w / 2)
lower_bonus = absolute_cy / max(1, h)
score = float(area) * (1.0 + lower_bonus) * max(0.25, 1.4 - center_penalty)
if score > best_score:
best_score = score
best_point = (
int(offset_x + absolute_cx),
int(offset_y + absolute_cy),
)
return best_point
def screenshot_window(window: WindowInfo):
clipped = clip_rect_to_screen(window.rect)
if clipped is None:
return None, None
left, top, right, bottom = clipped
width = right - left
height = bottom - top
image = pyautogui.screenshot(region=(left, top, width, height))
return image, (left, top)
def click_red_hangup_button(window: WindowInfo) -> bool:
clipped = clip_rect_to_screen(window.rect)
if clipped is None:
return False
left, top, right, bottom = clipped
width = right - left
height = bottom - top
try:
pyautogui.moveTo(
left + width // 2,
top + int(height * 0.84),
duration=0.15,
)
except Exception:
pass
time.sleep(0.45)
image, offset = screenshot_window(window)
if image is None or offset is None:
return False
pos = locate_red_hangup_button(image, offset[0], offset[1])
if pos is None:
return False
logging.info("Найдена красная кнопка завершения звонка: %s", pos)
pyautogui.moveTo(pos[0], pos[1], duration=0.1)
pyautogui.click()
time.sleep(0.6)
return True
def fallback_click_bottom_center(window: WindowInfo) -> bool:
clipped = clip_rect_to_screen(window.rect)
if clipped is None:
return False
left, top, right, bottom = clipped
width = right - left
height = bottom - top
x = left + width // 2
y = bottom - max(65, min(140, int(height * 0.13)))
try:
pyautogui.moveTo(x, y, duration=0.15)
time.sleep(0.15)
pyautogui.click()
time.sleep(0.5)
logging.info("Fallback-клик по нижнему центру окна: %s, %s", x, y)
return True
except Exception as exc:
logging.warning("Fallback-клик не удался: %s", exc)
return False
def fallback_hotkeys(window: WindowInfo) -> bool:
try:
activate_window(window.hwnd)
pyautogui.press("esc")
time.sleep(0.25)
pyautogui.hotkey("alt", "f4")
time.sleep(0.5)
logging.info("Отправлены fallback-горячие клавиши Escape и Alt+F4.")
return True
except Exception as exc:
logging.warning("Fallback-горячие клавиши не сработали: %s", exc)
return False
def end_telegram_call() -> Tuple[bool, str]:
windows = find_telegram_windows()
if not windows:
return False, "Окно Telegram/AyuGram не найдено."
logging.info("Найдено окон Telegram/AyuGram: %s", len(windows))
for window in windows:
logging.info(
"Пробую окно: title=%r class=%r process=%r rect=%r",
window.title,
window.class_name,
window.process_name,
window.rect,
)
activate_window(window.hwnd)
for _ in range(3):
if click_red_hangup_button(window):
return True, "Звонок завершён нажатием красной кнопки."
for window in windows:
activate_window(window.hwnd)
if fallback_click_bottom_center(window):
return True, "Сделана fallback-попытка завершить звонок кликом по нижнему центру окна."
for window in windows:
if fallback_hotkeys(window):
return True, "Сделана fallback-попытка завершить звонок горячими клавишами."
return False, "Не удалось автоматически завершить звонок Telegram/AyuGram."
def open_video_capture(video_path: str) -> cv2.VideoCapture:
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
raise RuntimeError(f"Не удалось открыть видеофайл:\n{video_path}")
return cap
def run_video_to_virtual_camera(video_path: str) -> Tuple[bool, str]:
cap = open_video_capture(video_path)
try:
raw_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
raw_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
raw_fps = float(cap.get(cv2.CAP_PROP_FPS))
width, height = sanitize_size(raw_width, raw_height)
fps = sanitize_fps(raw_fps)
logging.info(
"Видео: path=%r width=%s height=%s fps=%s",
video_path,
width,
height,
fps,
)
try:
cam = pyvirtualcam.Camera(
width=width,
height=height,
fps=fps,
fmt=PixelFormat.RGB,
backend=VIRTUAL_CAMERA_BACKEND,
)
except Exception as exc:
raise RuntimeError(
"Не удалось открыть виртуальную камеру OBS.\n\n"
"Проверь, что OBS Studio установлен и что в системе есть "
"устройство 'OBS Virtual Camera'."
) from exc
with cam:
logging.info("Виртуальная камера открыта: %s", cam.device)
show_info(
"Виртуальная камера готова",
"Виртуальная камера запущена.\n\n"
f"Устройство: {cam.device}\n\n"
"Теперь в AyuGram выбери эту камеру как источник видео.\n\n"
"После нажатия OK видео начнёт проигрываться один раз от начала до конца.",
)
frame_count = 0
while True:
ok, frame_bgr = cap.read()
if not ok:
break
if frame_bgr is None:
break
if frame_bgr.shape[1] != width or frame_bgr.shape[0] != height:
frame_bgr = cv2.resize(frame_bgr, (width, height), interpolation=cv2.INTER_AREA)
frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
cam.send(frame_rgb)
cam.sleep_until_next_frame()
frame_count += 1
logging.info("Видео закончилось. Отправлено кадров: %s", frame_count)
time.sleep(0.25)
success, message = end_telegram_call()
logging.info("Результат завершения звонка: success=%s message=%r", success, message)
time.sleep(0.4)
return success, message
finally:
cap.release()
def main() -> int:
setup_logging()
set_dpi_awareness()
if sys.platform != "win32":
show_error("Ошибка", "Эта программа рассчитана на Windows.")
return 1
try:
video_path = select_video_file()
if not video_path:
return 0
if not os.path.isfile(video_path):
show_error("Ошибка", "Выбранный файл не найден.")
return 1
success, message = run_video_to_virtual_camera(video_path)
if not success:
show_warning(
"Видео закончилось",
message
+ "\n\n"
"Если звонок не завершился, заверши его вручную. "
"Лог программы находится в AppData\\Local\\AyuGramVideoCam.",
)
return 0
except pyautogui.FailSafeException:
logging.exception("PyAutoGUI FailSafeException")
show_error(
"Остановлено",
"Сработала защита PyAutoGUI: мышь была перемещена в угол экрана.\n\n"
"Программа остановлена.",
)
return 1
except Exception:
error_text = traceback.format_exc()
logging.error(error_text)
show_error(
"Ошибка",
"Произошла ошибка:\n\n"
+ error_text[-3500:]
+ "\n\nЛог программы находится в AppData\\Local\\AyuGramVideoCam\\app.log",
)
return 1
if __name__ == "__main__":
raise SystemExit(main())