Загрузка данных
import socket
import select
import threading
import requests
import time
# Публичный список SOCKS5 прокси (проверка каждые 10 минут)
PROXY_LIST_URL = "https://raw.githubusercontent.com/proxygenerator1/ProxyGenerator/main/telegramProxys.txt"
class Socks5Proxy:
def __init__(self, listen_host='127.0.0.1', listen_port=1080):
self.listen_host = listen_host
self.listen_port = listen_port
self.upstream_proxy = None
def get_proxy(self):
try:
response = requests.get(PROXY_LIST_URL, timeout=10)
lines = response.text.strip().split('\n')
socks5_proxies = [l for l in lines if 'socks5://' in l]
if socks5_proxies:
proxy = socks5_proxies[0].replace('socks5://', '')
host, port = proxy.split(':')
print(f"Используется прокси: {host}:{port}")
return host, int(port)
except Exception as e:
print(f"Ошибка получения прокси: {e}")
return None, None
def handle_client(self, client_socket):
try:
# SOCKS5 handshake
data = client_socket.recv(262)
if data[0] != 5:
return
client_socket.sendall(b'\x05\x00')
data = client_socket.recv(4)
cmd = data[1]
if cmd != 1:
return
# Получаем адрес назначения
addr_type = data[3]
if addr_type == 1: # IPv4
addr = socket.inet_ntoa(client_socket.recv(4))
port = int.from_bytes(client_socket.recv(2), 'big')
elif addr_type == 3: # Domain name
domain_len = client_socket.recv(1)[0]
addr = client_socket.recv(domain_len).decode()
port = int.from_bytes(client_socket.recv(2), 'big')
else:
return
# Подключаемся через upstream прокси
remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
remote.connect((self.upstream_host, self.upstream_port))
remote.sendall(b'\x05\x01\x00\x01' + socket.inet_aton(addr) + port.to_bytes(2, 'big'))
remote.recv(1024)
client_socket.sendall(b'\x05\x00\x00\x01' + socket.inet_aton('0.0.0.0') + b'\x00\x00')
# Пересылка трафика
sockets = [client_socket, remote]
while True:
rlist, _, _ = select.select(sockets, [], [])
for sock in rlist:
data = sock.recv(4096)
if not data:
return
if sock is client_socket:
remote.sendall(data)
else:
client_socket.sendall(data)
except Exception as e:
print(f"Ошибка: {e}")
finally:
client_socket.close()
def run(self):
while True:
self.upstream_host, self.upstream_port = self.get_proxy()
if not self.upstream_host:
print("Не удалось получить прокси, ждём 30 секунд...")
time.sleep(30)
continue
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((self.listen_host, self.listen_port))
server.listen(5)
print(f"Локальный SOCKS5 прокси запущен на {self.listen_host}:{self.listen_port}")
print(f"Используется удалённый прокси: {self.upstream_host}:{self.upstream_port}")
while True:
client, addr = server.accept()
print(f"Подключение от {addr}")
threading.Thread(target=self.handle_client, args=(client,)).start()
if __name__ == "__main__":
proxy = Socks5Proxy()
proxy.run()