Загрузка данных
# -*- coding: cp1251 -*-
"""
BOX-BOT VISION (v5 - PERFORMANCE UPDATE)
==================================================================================
Многопоточный, оптический ROI-трекинг, предсказание по времени (t_hit), SendInput
"""
import sys
import time
import threading
import ctypes
import numpy as np
import cv2
import mss
try:
import win32gui
import win32con
HAS_WIN32 = True
except ImportError:
HAS_WIN32 = False
# ========================== НАСТРОЙКИ ==========================
WINDOW_NAME = "BOX-BOT VISION v5"
BOX_TEMPLATE_PATH = r"D:\codecode\box.png"
# Отключаемая визуализация для максимального FPS
SHOW_DEBUG = True
MATCH_THRESHOLD = 0.50
CLICK_COOLDOWN_SEC = 2.0
ALIGN_TOLERANCE_PX = 3
# Задержка реакции. Так как клик теперь моментальный через SendInput,
# возможно, это значение придется УМЕНЬШИТЬ по сравнению с v4.
REACTION_LAG_SEC = 0.05
# Мертвая зона скорости (пикселей в секунду) для стоящей коробки
VELOCITY_DEADBAND_PX_S = 25.0
# Коэффициент сглаживания скорости (Exponential Moving Average).
# 0.0 - верить только новому кадру, 1.0 - скорость вообще не меняется.
EMA_ALPHA = 0.6
# ========================== WINAPI SENDINPUT ==========================
# Нативные структуры C для эмуляции аппаратного клика
PUL = ctypes.POINTER(ctypes.c_ulong)
class KeyBdInput(ctypes.Structure):
_fields_ = [("wVk", ctypes.c_ushort), ("wScan", ctypes.c_ushort), ("dwFlags", ctypes.c_ulong),
("time", ctypes.c_ulong), ("dwExtraInfo", PUL)]
class HardwareInput(ctypes.Structure):
_fields_ = [("uMsg", ctypes.c_ulong), ("wParamL", ctypes.c_short), ("wParamH", ctypes.c_ushort)]
class MouseInput(ctypes.Structure):
_fields_ = [("dx", ctypes.c_long), ("dy", ctypes.c_long), ("mouseData", ctypes.c_ulong),
("dwFlags", ctypes.c_ulong), ("time", ctypes.c_ulong), ("dwExtraInfo", PUL)]
class Input_I(ctypes.Union):
_fields_ = [("ki", KeyBdInput), ("mi", MouseInput), ("hi", HardwareInput)]
class Input(ctypes.Structure):
_fields_ = [("type", ctypes.c_ulong), ("ii", Input_I)]
def fast_click(x, y):
"""Сверхбыстрый клик мышью в обход GIL и медленного pyautogui"""
ctypes.windll.user32.SetCursorPos(int(x), int(y))
extra = ctypes.c_ulong(0)
# Left Mouse Down (0x0002)
ii_down = Input_I()
ii_down.mi = MouseInput(0, 0, 0, 0x0002, 0, ctypes.pointer(extra))
x_down = Input(ctypes.c_ulong(0), ii_down)
# Left Mouse Up (0x0004)
ii_up = Input_I()
ii_up.mi = MouseInput(0, 0, 0, 0x0004, 0, ctypes.pointer(extra))
x_up = Input(ctypes.c_ulong(0), ii_up)
# Send both inputs sequentially
ctypes.windll.user32.SendInput(1, ctypes.pointer(x_down), ctypes.sizeof(x_down))
ctypes.windll.user32.SendInput(1, ctypes.pointer(x_up), ctypes.sizeof(x_up))
# ========================== СОСТОЯНИЕ (ДЛЯ ПОТОКОВ) ==========================
class SharedState:
def __init__(self):
self.lock = threading.Lock()
self.is_running = True
self.paused = False
# Данные от зрения
self.box_lost = True
self.box_x = None
self.box_y = None
self.tower_x = None
self.tower_y = None
self.velocity = 0.0
# Телеметрия
self.frame_time = time.time()
self.vision_fps = 0.0
self.last_frame = None
# ========================== УТИЛИТЫ ==========================
def set_always_on_top(window_name):
if not HAS_WIN32:
return
hwnd = win32gui.FindWindow(None, window_name)
if hwnd:
win32gui.SetWindowPos(
hwnd, win32con.HWND_TOPMOST, 0, 0, 0, 0,
win32con.SWP_NOMOVE | win32con.SWP_NOSIZE
)
def select_rect(prompt, monitor, sct):
print(prompt)
raw = sct.grab(monitor)
img = np.array(raw)[:, :, :3]
cv2.namedWindow(WINDOW_NAME, cv2.WINDOW_NORMAL)
r = cv2.selectROI(WINDOW_NAME, img, showCrosshair=True, fromCenter=False)
cv2.destroyWindow(WINDOW_NAME)
return r
def match_all_boxes(gray_frame, template_gray, threshold, min_distance):
"""Оптимизированный поиск ВСЕХ коробок (используется только при потере!)"""
res = cv2.matchTemplate(gray_frame, template_gray, cv2.TM_CCOEFF_NORMED)
kernel_size = max(3, int(min_distance) | 1)
kernel = np.ones((kernel_size, kernel_size), dtype=np.uint8)
dilated = cv2.dilate(res, kernel)
peaks_mask = (res >= threshold) & (res == dilated)
ys, xs = np.where(peaks_mask)
h, w = template_gray.shape
raw_points = [(x + w / 2.0, y + h / 2.0, float(res[y, x])) for x, y in zip(xs, ys)]
raw_points.sort(key=lambda c: c[2], reverse=True)
selected = []
for cx, cy, score in raw_points:
if not any(abs(cx - sx) < min_distance and abs(cy - sy) < min_distance for sx, sy, _ in selected):
selected.append((cx, cy, score))
selected.sort(key=lambda c: c[1]) # Сверху вниз
return selected
# ========================== ПОТОК ЗРЕНИЯ ==========================
def vision_thread(state, cfg, sct, template_gray):
region = cfg["region"]
th, tw = template_gray.shape
min_dist = max(8, int(0.55 * min(th, tw)))
# Предвыделение параметров для ROI трекинга
pad_x, pad_y = tw, th
H, W = region["height"], region["width"]
loop_times = []
while state.is_running:
t_start = time.time()
# Zero-copy взятие зеленого канала
raw = sct.grab(region)
# np.frombuffer работает без создания лишней копии памяти
frame_bgra = np.frombuffer(raw.bgra, dtype=np.uint8).reshape((H, W, 4))
gray = frame_bgra[:, :, 1] # Только зеленый канал
t_now = time.time()
with state.lock:
# 1. Если потеряли коробку — ищем заново всё (падающую и башню)
if state.box_lost or state.tower_x is None:
boxes = match_all_boxes(gray, template_gray, MATCH_THRESHOLD, min_dist)
if len(boxes) >= 1:
# Нашли хотя бы одну (падающую)
state.box_x = boxes[0][0]
state.box_y = boxes[0][1]
state.velocity = 0.0
state.frame_time = t_now
# Есть ли башня под ней?
if len(boxes) >= 2:
gap = boxes[1][1] - boxes[0][1]
# Проверка зазора (защита от ложного срабатывания, если игра кончилась)
if gap < th * 0.8:
state.box_lost = True # Зазора нет, коробка стоит в башне
else:
state.tower_x = boxes[1][0]
state.tower_y = boxes[1][1]
state.box_lost = False
else:
# Башни нет (самая первая коробка) - целимся в центр экрана
state.tower_x = W / 2.0
state.box_lost = False
# 2. Если коробка зафиксирована — делаем сверхбыстрый ROI-трекинг
else:
last_x, last_y = state.box_x, state.box_y
# Ограничиваем область поиска только вокруг последней точки (ROI)
x1 = max(0, int(last_x - pad_x))
y1 = max(0, int(last_y - pad_y))
x2 = min(W, int(last_x + pad_x))
y2 = min(H, int(last_y + pad_y))
roi = gray[y1:y2, x1:x2]
# Поиск в маленьком окне - dilate не нужен, используем minMaxLoc
res = cv2.matchTemplate(roi, template_gray, cv2.TM_CCOEFF_NORMED)
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(res)
if max_val >= MATCH_THRESHOLD:
new_x = x1 + max_loc[0] + tw / 2.0
new_y = y1 + max_loc[1] + th / 2.0
dt = t_now - state.frame_time
if dt > 0:
v_raw = (new_x - state.box_x) / dt
# Экспоненциальное сглаживание скорости
state.velocity = state.velocity * EMA_ALPHA + v_raw * (1.0 - EMA_ALPHA)
state.box_x = new_x
state.box_y = new_y
state.frame_time = t_now
else:
state.box_lost = True # Потеряли в ROI, на следующем кадре будет полный поиск
if SHOW_DEBUG:
state.last_frame = frame_bgra.copy()
# Считаем FPS потока зрения
loop_times.append(time.time() - t_start)
if len(loop_times) > 30:
loop_times.pop(0)
state.vision_fps = 1.0 / (sum(loop_times) / len(loop_times))
time.sleep(0.001) # Уступаем процессор главному потоку
# ========================== ГЛАВНЫЙ ПОТОК (ЛОГИКА + GUI) ==========================
def main():
tpl_bgr = cv2.imread(BOX_TEMPLATE_PATH)
if tpl_bgr is None:
print("ОШИБКА: Шаблон не найден!")
sys.exit(1)
template_gray = cv2.cvtColor(tpl_bgr, cv2.COLOR_BGR2GRAY)
with mss.mss() as sct:
monitor = sct.monitors[0]
rx, ry, rw, rh = select_rect("Выделите рамку ИГРЫ и нажмите ENTER.", monitor, sct)
if rw == 0: return
cfg = {
"region": {"left": monitor["left"] + rx, "top": monitor["top"] + ry, "width": rw, "height": rh},
"click_abs": (monitor["left"] + rx + rw // 2, monitor["top"] + ry + rh // 2)
}
state = SharedState()
v_thread = threading.Thread(target=vision_thread, args=(state, cfg, sct, template_gray), daemon=True)
v_thread.start()
if SHOW_DEBUG:
cv2.namedWindow(WINDOW_NAME, cv2.WINDOW_NORMAL)
set_always_on_top(WINDOW_NAME)
last_click_time = 0.0
try:
while state.is_running:
t_now = time.time()
with state.lock:
box_x = state.box_x
box_y = state.box_y
tower_x = state.tower_x
vel = state.velocity
t_frame = state.frame_time
lost = state.box_lost
paused = state.paused
disp_frame = state.last_frame
# --- ЛОГИКА ПРЕДСКАЗАНИЯ ---
clicked = False
if not paused and not lost and tower_x is not None and box_x is not None:
if (t_now - last_click_time) > CLICK_COOLDOWN_SEC:
dt = t_now - t_frame
# Вычисляем субкадровую позицию (где коробка ПРЯМО СЕЙЧАС между кадрами)
virtual_x = box_x + vel * dt
if abs(vel) > VELOCITY_DEADBAND_PX_S:
# Физика: прогнозирование по времени.
# t_hit = расстояние / скорость
t_hit = (tower_x - virtual_x) / vel
# Если время до пересечения меньше или равно пингу системы
if 0 <= t_hit <= REACTION_LAG_SEC:
fast_click(*cfg["click_abs"])
clicked = True
else:
# Коробка почти не двигается, используем допуск по пикселям
if abs(virtual_x - tower_x) <= ALIGN_TOLERANCE_PX:
fast_click(*cfg["click_abs"])
clicked = True
if clicked:
last_click_time = time.time()
with state.lock:
state.box_lost = True # Принудительный сброс, ждём новую коробку
# --- ОТРИСОВКА (Если включена) ---
if SHOW_DEBUG and disp_frame is not None:
disp = disp_frame.copy()
h, w = disp.shape[:2]
status_color = (0, 0, 255) if lost else (0, 255, 0)
if not lost and box_x is not None:
# Рисуем текущую экстраполированную позицию
virt_x = int(box_x + vel * (t_now - t_frame))
cv2.line(disp, (virt_x, 0), (virt_x, h), (0, 255, 255), 2)
if tower_x is not None:
cv2.line(disp, (int(tower_x), 0), (int(tower_x), h), status_color, 2)
cv2.putText(disp, f"Vision FPS: {state.vision_fps:.0f}", (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255,255,255), 2)
cv2.putText(disp, f"Speed: {vel:.1f} px/s", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255,255,255), 2)
if paused:
cv2.putText(disp, "PAUSED", (10, h-20), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0,150,255), 2)
cv2.imshow(WINDOW_NAME, disp)
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
break
elif key == ord('p'):
with state.lock:
state.paused = not state.paused
else:
# Если дебаг отключен, главный поток просто крутится на частоте ~1000 Гц
time.sleep(0.001)
except KeyboardInterrupt:
pass
finally:
state.is_running = False
if SHOW_DEBUG:
cv2.destroyAllWindows()
if __name__ == "__main__":
main()