Загрузка данных


#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
HTTP Methods Security Tester (WSTG-CONF-06) + RBAC (WSTG-IDNT-01)
Адаптировано для Flask-приложения с CSRF-защитой.

Изменения:
- Исключены эндпоинты /register и /logout (чтобы не ломать сессию)
- В /ban_user используется ID=3 (вместо 1) – предполагается, что это обычный пользователь
- Добавлена задержка между запросами (опционально)
- Улучшена проверка успешного входа
"""

import argparse
import re
import time
from urllib.parse import urljoin, urlparse, parse_qs

import requests
from requests.exceptions import RequestException, Timeout, ConnectionError


DEFAULT_URL = "http://10.0.20.5:5000"
REQUEST_DELAY = 0.1  # секунд между запросами (можно выставить 0, если не нужно)

# --- ID для динамических эндпоинтов (чтобы не банить админа) ---
TEST_USER_ID = 3          # ID обычного пользователя (для ban_user, edit_user)
TEST_GAME_ID = 1          # ID существующей игры (для delete_game, editor_game)
TEST_NEWS_ID = 1          # ID существующей новости (для admin/news/edit, delete)

# Список эндпоинтов для тестирования (без /register и /logout)
ENDPOINTS = [
    "/",
    "/home",
    "/home_admin",
    "/add_card",
    "/recharge_balance",
    "/delete_card",
    "/cart",
    "/checkout",
    "/like",
    "/users",
    "/news",
    "/admin/news",
    "/admin/news/add",
    f"/admin/news/edit/{TEST_NEWS_ID}",
    f"/admin/news/delete/{TEST_NEWS_ID}",
    f"/edit_user/{TEST_USER_ID}",
    f"/delete_game/{TEST_GAME_ID}",
    "/search_suggestions",
    "/search_suggestions1",
    "/add_game",
    f"/editor_game/{TEST_GAME_ID}",
    f"/ban_user/{TEST_USER_ID}",
    f"/add_to_cart/{TEST_GAME_ID}",
    f"/remove_from_cart/{TEST_GAME_ID}",
    f"/add_to_likes/{TEST_GAME_ID}",
    f"/remove_from_likes/{TEST_GAME_ID}",
]

TEST_METHODS = [
    "GET",
    "POST",
    "PUT",
    "DELETE",
    "PATCH",
    "OPTIONS",
    "HEAD",
    "TRACE",
    "CONNECT",
    "FOO",
]

METHOD_ALLOWED_CODES = {200, 201, 202, 204}
REDIRECT_CODES = {301, 302, 303, 307, 308}
DANGEROUS_METHODS = {"PUT", "DELETE", "TRACE", "CONNECT", "PATCH"}


def make_session() -> requests.Session:
    session = requests.Session()
    session.headers.update({
        "User-Agent": "Mozilla/5.0 (HTTP-Methods-Tester)",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    })
    return session


def extract_csrf_token(html: str) -> str | None:
    """Извлекает CSRF-токен из HTML формы Flask-WTF."""
    patterns = [
        r'<input[^>]*name=["\']csrf_token["\'][^>]*value=["\']([^"\']+)["\']',
        r'<input[^>]*value=["\']([^"\']+)["\'][^>]*name=["\']csrf_token["\']',
    ]
    for pattern in patterns:
        m = re.search(pattern, html, re.IGNORECASE | re.DOTALL)
        if m:
            return m.group(1)
    return None


def login(session: requests.Session, base_url: str, email: str, password: str, timeout: int = 5) -> bool:
    """Выполняет вход в приложение (с CSRF) и возвращает True при успехе."""
    login_url = urljoin(base_url, "/")

    try:
        # Получаем страницу логина и выдираем CSRF-токен
        resp = session.get(login_url, timeout=timeout)
        resp.raise_for_status()

        csrf_token = extract_csrf_token(resp.text)
        if not csrf_token:
            print(f"[-] Не найден csrf_token на странице логина для {email}")
            return False

        # Отправляем POST с данными формы
        payload = {
            "email": email,
            "password": password,
            "remember_me": "y",
            "submit": "Login",
            "csrf_token": csrf_token,
        }
        resp = session.post(login_url, data=payload, timeout=timeout, allow_redirects=True)

        # Проверяем финальный URL после всех редиректов
        final_path = urlparse(resp.url).path
        if final_path in ("/home", "/home_admin"):
            print(f"[+] Успешный вход как {email} -> {resp.url}")
            return True

        print(f"[-] Не удалось войти как {email} (финальный URL: {resp.url}, статус: {resp.status_code})")
        return False

    except RequestException as e:
        print(f"[-] Ошибка логина для {email}: {e}")
        return False


def is_login_redirect(location: str | None) -> bool:
    """Определяет, является ли редирект перенаправлением на страницу логина."""
    if not location:
        return False
    parsed = urlparse(location)
    path = parsed.path or ""
    qs = parse_qs(parsed.query)
    return path == "/" or "next" in qs or "login" in path.lower()


def classify_response(status: int | None, location: str | None) -> str:
    """Классифицирует ответ сервера."""
    if status is None:
        return "error"
    if status == 403:
        return "forbidden"
    if status == 405:
        return "method_not_allowed"
    if status in METHOD_ALLOWED_CODES:
        return "allowed"
    if status in REDIRECT_CODES:
        if is_login_redirect(location):
            return "auth_required"
        return "redirect"
    return "other"


def test_method(session: requests.Session, base_url: str, endpoint: str, method: str, timeout: int = 5) -> dict:
    """Отправляет запрос с заданным методом и возвращает информацию о ответе."""
    url = urljoin(base_url, endpoint)

    try:
        if method == "HEAD":
            resp = session.head(url, timeout=timeout, allow_redirects=False)
        elif method == "OPTIONS":
            resp = session.options(url, timeout=timeout, allow_redirects=False)
        else:
            resp = session.request(method, url, timeout=timeout, allow_redirects=False)

        return {
            "status": resp.status_code,
            "location": resp.headers.get("Location"),
            "content_type": resp.headers.get("Content-Type", ""),
        }
    except (ConnectionError, Timeout) as e:
        return {"status": None, "location": None, "content_type": "", "error": str(e)}
    except RequestException as e:
        return {"status": None, "location": None, "content_type": "", "error": str(e)}


def print_result(method: str, info: dict):
    """Выводит результат тестирования одного метода."""
    if info.get("error"):
        print(f"  {method:<8} -> ERROR: {info['error']}")
        return

    status = info["status"]
    location = info.get("location")
    kind = classify_response(status, location)

    extra = ""
    if location:
        extra = f" -> {location}"

    if kind == "allowed":
        if method in DANGEROUS_METHODS:
            extra += " !!! ОПАСНЫЙ МЕТОД РАЗРЕШЁН !!!"
        print(f"  {method:<8} -> {status} [+]{extra}")
    elif kind == "forbidden":
        print(f"  {method:<8} -> {status} [! FORBIDDEN]{extra}")
    elif kind == "method_not_allowed":
        print(f"  {method:<8} -> {status} [-] (method not allowed){extra}")
    elif kind == "auth_required":
        print(f"  {method:<8} -> {status} [-] (auth required){extra}")
    elif kind == "redirect":
        print(f"  {method:<8} -> {status} [-] (redirect){extra}")
    else:
        print(f"  {method:<8} -> {status} [?]{extra}")


def run_tests(session: requests.Session, base_url: str, label: str, timeout: int = 5, delay: float = 0) -> dict:
    """Запускает тесты для всех эндпоинтов и возвращает результаты."""
    results_by_endpoint = {}
    for endpoint in ENDPOINTS:
        endpoint_results = {}
        for method in TEST_METHODS:
            endpoint_results[method] = test_method(session, base_url, endpoint, method, timeout=timeout)
            if delay > 0:
                time.sleep(delay)
        results_by_endpoint[endpoint] = endpoint_results
        # Вывод результатов для этого эндпоинта
        print(f"\n--- Эндпоинт: {endpoint} (Сессия: {label}) ---")
        for method, info in endpoint_results.items():
            print_result(method, info)
    return results_by_endpoint


def generate_report(all_results: dict):
    """Формирует итоговый отчёт о найденных уязвимостях."""
    print("\n" + "=" * 80)
    print("ИТОГОВЫЙ ОТЧЁТ")
    print("=" * 80)

    dangerous_found = False
    forbidden_found = False

    for session_name, endpoints in all_results.items():
        for endpoint, methods in endpoints.items():
            for method, info in methods.items():
                status = info.get("status")
                location = info.get("location")
                kind = classify_response(status, location)

                if kind == "forbidden":
                    forbidden_found = True
                if method in DANGEROUS_METHODS and kind == "allowed":
                    print(f"[!] {endpoint} [{session_name}]: метод {method} разрешён (код {status})")
                    dangerous_found = True

    if not dangerous_found:
        print("[+] Опасные методы (PUT, DELETE, TRACE, CONNECT, PATCH) нигде не разрешены.")
    else:
        print("[!] Найдены разрешённые опасные методы.")

    if forbidden_found:
        print("[+] На части эндпоинтов корректно возвращается 403 Forbidden для недостаточных прав.")
    print("=" * 80)


def main():
    parser = argparse.ArgumentParser(description="Тестирование HTTP-методов и RBAC (WSTG-CONF-06, WSTG-IDNT-01)")
    parser.add_argument("--url", default=DEFAULT_URL, help=f"Базовый URL (по умолчанию {DEFAULT_URL})")
    parser.add_argument("--timeout", type=int, default=5, help="Таймаут запроса в секундах")
    parser.add_argument("--delay", type=float, default=REQUEST_DELAY, help="Задержка между запросами (сек)")

    parser.add_argument("--user-login", default=None, help="Email обычного пользователя")
    parser.add_argument("--user-password", default=None, help="Пароль обычного пользователя")

    parser.add_argument("--admin-login", default=None, help="Email администратора")
    parser.add_argument("--admin-password", default=None, help="Пароль администратора")

    args = parser.parse_args()
    base_url = args.url.rstrip("/")

    print(f"[*] Начинаем тестирование HTTP-методов для {base_url}")
    print(f"[*] Эндпоинтов для проверки: {len(ENDPOINTS)}")

    all_results = {}

    # 1. Анонимная сессия
    print("\n[1] ТЕСТИРОВАНИЕ БЕЗ АУТЕНТИФИКАЦИИ")
    anon_session = make_session()
    all_results["anon"] = run_tests(anon_session, base_url, "anon", timeout=args.timeout, delay=args.delay)

    # 2. Сессия обычного пользователя
    if args.user_login and args.user_password:
        print("\n[2] ТЕСТИРОВАНИЕ ОТ ИМЕНИ ОБЫЧНОГО ПОЛЬЗОВАТЕЛЯ")
        user_session = make_session()
        if login(user_session, base_url, args.user_login, args.user_password, timeout=args.timeout):
            all_results["user"] = run_tests(user_session, base_url, "user", timeout=args.timeout, delay=args.delay)
        else:
            print("[-] Тесты от имени обычного пользователя пропущены из-за неудачного входа.")
    else:
        print("\n[2] Пропуск тестов от обычного пользователя (укажите --user-login и --user-password)")

    # 3. Сессия администратора
    if args.admin_login and args.admin_password:
        print("\n[3] ТЕСТИРОВАНИЕ ОТ ИМЕНИ АДМИНИСТРАТОРА")
        admin_session = make_session()
        if login(admin_session, base_url, args.admin_login, args.admin_password, timeout=args.timeout):
            all_results["admin"] = run_tests(admin_session, base_url, "admin", timeout=args.timeout, delay=args.delay)
        else:
            print("[-] Тесты от имени администратора пропущены из-за неудачного входа.")
    else:
        print("\n[3] Пропуск тестов от администратора (укажите --admin-login и --admin-password)")

    generate_report(all_results)


if __name__ == "__main__":
    main()