Загрузка данных


#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
HTTP Methods Security Tester (Flask / Flask-WTF)

Что делает:
- логинится как user и admin через CSRF
- тестирует методы на списке эндпоинтов
- не путает 302 редирект с успешным доступом
- отдельно отмечает:
  * OK (2xx)
  * REDIRECT_LOGIN (302 на страницу логина)
  * FORBIDDEN (403)
  * METHOD_NOT_ALLOWED (405)

Пример:
python test_http_methods.py \
  --url http://10.0.20.5:5000 \
  --user-login test@test.ru --user-password 123456 \
  --admin-login admin@mail.ru --admin-password 123456
"""

import argparse
import re
from urllib.parse import urljoin, urlparse, parse_qs

import requests
from requests.exceptions import RequestException, Timeout, ConnectionError


DEFAULT_URL = "http://10.0.20.5:5000"

ENDPOINTS = [
    "/",
    "/register",
    "/home",
    "/home_admin",
    "/add_card",
    "/recharge_balance",
    "/delete_card",
    "/cart",
    "/checkout",
    "/like",
    "/users",
    "/logout",
    "/news",
    "/admin/news",
    "/admin/news/add",
    "/admin/news/edit/1",
    "/admin/news/delete/1",
    "/edit_user/1",
    "/delete_game/1",
    "/search_suggestions",
    "/search_suggestions1",
    "/add_game",
    "/editor_game/1",
    "/ban_user/1",
    "/add_to_cart/1",
    "/remove_from_cart/1",
    "/add_to_likes/1",
    "/remove_from_likes/1",
]

TEST_METHODS = [
    "GET",
    "POST",
    "PUT",
    "DELETE",
    "PATCH",
    "OPTIONS",
    "HEAD",
    "TRACE",
    "CONNECT",
    "FOO",
]

DANGEROUS_METHODS = {"PUT", "DELETE", "TRACE", "CONNECT", "PATCH"}
METHOD_ALLOWED_CODES = {200, 201, 202, 204}
REDIRECT_CODES = {301, 302, 303, 307, 308}


def make_session() -> requests.Session:
    s = requests.Session()
    s.headers.update({
        "User-Agent": "Mozilla/5.0 (HTTP-Methods-Tester)",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    })
    return s


def extract_csrf_token(html: str) -> str | None:
    patterns = [
        r'name="csrf_token"\s+type="hidden"\s+value="([^"]+)"',
        r'name="csrf_token"[^>]*value="([^"]+)"',
        r'value="([^"]+)"[^>]*name="csrf_token"',
    ]
    for p in patterns:
        m = re.search(p, html, re.IGNORECASE | re.DOTALL)
        if m:
            return m.group(1)
    return None


def login(session: requests.Session, base_url: str, email: str, password: str, timeout: int = 5) -> bool:
    """
    Логин через форму Flask-WTF.
    """
    login_url = urljoin(base_url, "/")

    try:
        r = session.get(login_url, timeout=timeout)
        r.raise_for_status()

        csrf = extract_csrf_token(r.text)
        if not csrf:
            print(f"[-] CSRF token не найден для {email}")
            return False

        payload = {
            "email": email,
            "password": password,
            "remember_me": "y",
            "submit": "Login",
            "csrf_token": csrf,
        }

        resp = session.post(login_url, data=payload, timeout=timeout, allow_redirects=True)

        final_path = urlparse(resp.url).path
        if final_path in ("/home", "/home_admin"):
            print(f"[+] Успешный вход: {email} -> {resp.url}")
            return True

        print(f"[-] Вход не удался: {email} (final_url={resp.url}, status={resp.status_code})")
        return False

    except RequestException as e:
        print(f"[-] Ошибка логина для {email}: {e}")
        return False


def classify_response(status: int, location: str | None) -> str:
    if status in METHOD_ALLOWED_CODES:
        return "OK"
    if status == 403:
        return "FORBIDDEN"
    if status == 405:
        return "METHOD_NOT_ALLOWED"
    if status in REDIRECT_CODES:
        if location:
            loc = location.lower()
            if "next=" in loc or loc.endswith("/login") or loc == "/" or loc.startswith("/?next="):
                return "REDIRECT_LOGIN"
            return "REDIRECT"
        return "REDIRECT"
    return "OTHER"


def test_one(session: requests.Session, base_url: str, endpoint: str, method: str, timeout: int = 5) -> dict:
    url = urljoin(base_url, endpoint)

    try:
        if method == "HEAD":
            resp = session.head(url, timeout=timeout, allow_redirects=False)
        elif method == "OPTIONS":
            resp = session.options(url, timeout=timeout, allow_redirects=False)
        else:
            resp = session.request(method, url, timeout=timeout, allow_redirects=False)

        return {
            "status": resp.status_code,
            "location": resp.headers.get("Location"),
            "class": classify_response(resp.status_code, resp.headers.get("Location")),
        }

    except (ConnectionError, Timeout) as e:
        return {
            "status": None,
            "location": None,
            "class": "ERROR",
            "error": str(e),
        }
    except RequestException as e:
        return {
            "status": None,
            "location": None,
            "class": "ERROR",
            "error": str(e),
        }


def print_results(endpoint: str, results: dict, label: str):
    print(f"\n--- Эндпоинт: {endpoint} ({label}) ---")
    for method in TEST_METHODS:
        info = results[method]
        if info["class"] == "ERROR":
            print(f"  {method:<8} -> ERROR: {info['error']}")
            continue

        status = info["status"]
        loc = info["location"]
        cls = info["class"]

        extra = ""
        if loc:
            extra = f" -> {loc}"

        if cls == "OK":
            mark = "[+]"
        elif cls == "FORBIDDEN":
            mark = "[-]"
            extra += " (403 forbidden)"
        elif cls == "METHOD_NOT_ALLOWED":
            mark = "[-]"
            extra += " (405 not allowed)"
        elif cls == "REDIRECT_LOGIN":
            mark = "[!]"
            extra += " (redirect to login)"
        elif cls == "REDIRECT":
            mark = "[!]"
            extra += " (redirect)"
        else:
            mark = "[?]"

        if method in DANGEROUS_METHODS and cls == "OK":
            extra += " !!! DANGEROUS METHOD ACCEPTED !!!"

        print(f"  {method:<8} -> {status} {mark}{extra}")


def run_suite(session: requests.Session, base_url: str, label: str, timeout: int = 5) -> dict:
    all_results = {}
    for endpoint in ENDPOINTS:
        endpoint_results = {}
        for method in TEST_METHODS:
            endpoint_results[method] = test_one(session, base_url, endpoint, method, timeout=timeout)
        all_results[endpoint] = endpoint_results
        print_results(endpoint, endpoint_results, label)
    return all_results


def summarize(all_results: dict):
    print("\n" + "=" * 80)
    print("ИТОГОВЫЙ ОТЧЁТ")
    print("=" * 80)

    dangerous = []
    auth_issues = []

    for endpoint, by_label in all_results.items():
        for label, results in by_label.items():
            for method, info in results.items():
                cls = info.get("class")
                status = info.get("status")

                if method in DANGEROUS_METHODS and cls == "OK":
                    dangerous.append((endpoint, label, method, status))

                if cls == "REDIRECT_LOGIN":
                    auth_issues.append((endpoint, label, method, info.get("location")))

    if dangerous:
        print("[!] Опасные методы разрешены:")
        for endpoint, label, method, status in dangerous:
            print(f"    - {endpoint} [{label}] {method} -> {status}")
    else:
        print("[+] Опасные методы нигде не разрешены.")

    if auth_issues:
        print("\n[!] Редиректы на логин обнаружены на защищённых страницах:")
        for endpoint, label, method, loc in auth_issues[:20]:
            print(f"    - {endpoint} [{label}] {method} -> {loc}")

    print("=" * 80)


def main():
    parser = argparse.ArgumentParser(description="HTTP Methods Security Tester")
    parser.add_argument("--url", default=DEFAULT_URL)
    parser.add_argument("--timeout", type=int, default=5)

    parser.add_argument("--user-login", default=None)
    parser.add_argument("--user-password", default=None)

    parser.add_argument("--admin-login", default=None)
    parser.add_argument("--admin-password", default=None)

    args = parser.parse_args()
    base_url = args.url.rstrip("/")

    print(f"[*] Начинаем тестирование HTTP-методов для {base_url}")
    print(f"[*] Эндпоинтов для проверки: {len(ENDPOINTS)}")

    all_results = {}

    print("\n[1] ТЕСТИРОВАНИЕ БЕЗ АУТЕНТИФИКАЦИИ")
    anon = make_session()
    all_results["anon"] = run_suite(anon, base_url, "anon", timeout=args.timeout)

    if args.user_login and args.user_password:
        print("\n[2] ТЕСТИРОВАНИЕ ОТ ИМЕНИ ОБЫЧНОГО ПОЛЬЗОВАТЕЛЯ")
        user_s = make_session()
        if login(user_s, base_url, args.user_login, args.user_password, timeout=args.timeout):
            all_results["user"] = run_suite(user_s, base_url, "user", timeout=args.timeout)
        else:
            print("[-] Пропуск тестов user: вход не удался")
    else:
        print("\n[2] Пропуск тестов user (нет --user-login / --user-password)")

    if args.admin_login and args.admin_password:
        print("\n[3] ТЕСТИРОВАНИЕ ОТ ИМЕНИ АДМИНИСТРАТОРА")
        admin_s = make_session()
        if login(admin_s, base_url, args.admin_login, args.admin_password, timeout=args.timeout):
            all_results["admin"] = run_suite(admin_s, base_url, "admin", timeout=args.timeout)
        else:
            print("[-] Пропуск тестов admin: вход не удался")
    else:
        print("\n[3] Пропуск тестов admin (нет --admin-login / --admin-password)")

    summarize(all_results)


if __name__ == "__main__":
    main()