# Loading data
"""CloudVault: a small Flask file-storage application.

Provides per-user upload folders, a per-folder ``.trash`` directory,
expiring public share links, tracked login sessions and optional TOTP
two-factor authentication.
"""
import base64
import html
import io
import os
import secrets
import shutil
from datetime import datetime, timedelta
from pathlib import Path

import pyotp
import qrcode
import requests
from flask import (
    Flask,
    abort,
    flash,
    jsonify,
    redirect,
    render_template,
    request,
    send_file,
    send_from_directory,
    session,
    url_for,
)
from flask_sqlalchemy import SQLAlchemy
from werkzeug.middleware.proxy_fix import ProxyFix
from werkzeug.security import check_password_hash, generate_password_hash
from werkzeug.utils import secure_filename

# Per-user storage quota; also used as the maximum request body size.
STORAGE_LIMIT_BYTES = 10 * 1024 * 1024 * 1024
MIN_PASSWORD_LENGTH = 12
TRASH_DIR_NAME = ".trash"

app = Flask(__name__)
# SECURITY: a secret committed to source control lets anyone forge session
# cookies. Set SECRET_KEY in the environment; the literal is kept only as a
# fallback so existing deployments keep working.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'cloud-vault-2026-final-secure')
BASE_DIR = os.path.abspath(os.path.dirname(__file__))
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + os.path.join(BASE_DIR, 'cloud_vault.db')
UPLOAD_BASE_PATH = os.path.join(BASE_DIR, 'uploads')
app.config['MAX_CONTENT_LENGTH'] = STORAGE_LIMIT_BYTES
# Trust exactly one reverse proxy hop; request.remote_addr then holds the
# client address taken from X-Forwarded-For.
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_port=1)
db = SQLAlchemy(app)


class User(db.Model):
    """Registered account; ``totp_secret`` is set only while 2FA is enabled."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(50), unique=True, nullable=False)
    password_hash = db.Column(db.String(256), nullable=False)
    totp_secret = db.Column(db.String(32), nullable=True)


class SharedLink(db.Model):
    """Public download token for one file inside a user's folder."""

    id = db.Column(db.Integer, primary_key=True)
    token = db.Column(db.String(64), unique=True, nullable=False)
    # Path relative to the owning user's upload directory.
    file_path = db.Column(db.String(512), nullable=False)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    expires_at = db.Column(db.DateTime, nullable=False)


class UserSession(db.Model):
    """Server-side record of a logged-in browser session."""

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    session_token = db.Column(db.String(100), unique=True, nullable=False)
    ip_address = db.Column(db.String(45))
    location = db.Column(db.String(100), default="Неизвестно")
    user_agent = db.Column(db.String(255))
    last_activity = db.Column(db.DateTime, default=datetime.utcnow)


def get_location_by_ip(ip):
    """Return a human-readable location for *ip* using ip-api.com.

    The lookup is best-effort: network or decoding failures are returned as
    a descriptive string instead of being raised, so login never fails
    because the geolocation service is unavailable.
    """
    if not ip:
        return "Неизвестно"
    if ip in ('127.0.0.1', '::1'):
        return "Локальный хост"
    try:
        response = requests.get(f'http://ip-api.com/json/{ip}?lang=ru', timeout=5)
        data = response.json()
        if data.get('status') == 'success':
            return f"{data.get('city')}, {data.get('country')}"
        return f"Ошибка API: {data.get('message')}"
    except (requests.RequestException, ValueError, AttributeError) as e:
        # ValueError: body is not JSON; AttributeError: JSON is not an object.
        return f"Ошибка запроса: {str(e)}"


def get_user_dir():
    """Return (creating it if needed) the upload directory of the session user."""
    user_id = str(session.get('user_id'))
    path = os.path.join(UPLOAD_BASE_PATH, user_id)
    os.makedirs(path, exist_ok=True)
    return path


def get_user_storage_usage(user_id):
    """Return the total size in bytes of all files under the user's directory."""
    user_dir = os.path.join(UPLOAD_BASE_PATH, str(user_id))
    total_size = 0
    for dirpath, _dirnames, filenames in os.walk(user_dir):
        for name in filenames:
            try:
                total_size += os.path.getsize(os.path.join(dirpath, name))
            except OSError:
                # File vanished or is a broken symlink: skip it.
                continue
    return total_size


def _storage_stats(user_id):
    """Return ``(usage_gb, percent)`` of the quota used by *user_id*."""
    usage = get_user_storage_usage(user_id)
    usage_gb = round(usage / (1024 ** 3), 2)
    percent = min((usage / STORAGE_LIMIT_BYTES) * 100, 100) if STORAGE_LIMIT_BYTES > 0 else 0
    return usage_gb, percent


def _json_body():
    """Return the request's JSON object, or ``{}`` if it is missing or not an object."""
    data = request.get_json(silent=True)
    return data if isinstance(data, dict) else {}


def _resolve_inside(base_dir, user_path):
    """Resolve *user_path* against *base_dir*, refusing anything outside it.

    *user_path* may be relative to *base_dir* or absolute. Returns the
    symlink-resolved absolute path, or ``None`` when the input is not a
    string or the result would escape *base_dir* (``..``, absolute paths
    elsewhere, symlinks pointing out).
    """
    if not isinstance(user_path, str):
        return None
    try:
        base = os.path.realpath(base_dir)
        candidate = os.path.realpath(os.path.join(base, user_path))
        if os.path.commonpath([base, candidate]) != base:
            return None
    except ValueError:
        # Embedded NUL byte, or paths on different drives on Windows.
        return None
    return candidate


def _locate_user_path(user_path):
    """Return the existing path *user_path* names inside the session user's folder.

    The path is first taken relative to the user's own directory. As a
    fallback it is taken relative to the uploads root (``<user_id>/...``),
    the form the older preview/delete routes resolved against, but it is
    accepted only if it still lands inside the user's directory.
    Returns ``None`` if nothing matching exists.
    """
    user_root = get_user_dir()
    direct = _resolve_inside(user_root, user_path)
    if direct and os.path.exists(direct):
        return direct
    legacy = _resolve_inside(UPLOAD_BASE_PATH, user_path)
    if legacy and _resolve_inside(user_root, legacy) and os.path.exists(legacy):
        return legacy
    return None


def _start_user_session(user):
    """Record a new ``UserSession`` for *user* and log the browser in.

    The client address comes from ``request.remote_addr``, which ProxyFix has
    already derived from the trusted proxy hop; the raw ``X-Forwarded-For``
    header is client-controlled and must not be used directly.
    """
    session.pop('pre_auth_user_id', None)
    token = secrets.token_hex(24)
    ip_addr = request.remote_addr
    location = get_location_by_ip(ip_addr)
    user_agent = request.headers.get('User-Agent')
    new_session = UserSession(
        user_id=user.id,
        session_token=token,
        ip_address=ip_addr,
        # Keep values within the declared column widths.
        location=location[:100] if location else location,
        user_agent=user_agent[:255] if user_agent else user_agent,
    )
    db.session.add(new_session)
    db.session.commit()
    session.update({
        'user_id': user.id,
        'username': user.username,
        'session_token': token,
    })


with app.app_context():
    db.create_all()


def move_to_trash(item_path):
    """Move *item_path* into a ``.trash`` folder next to it.

    If an entry with the same name is already in the trash, the new one is
    prefixed with a timestamp so nothing is overwritten. Returns the
    destination path.
    """
    trash_dir = os.path.join(os.path.dirname(item_path), TRASH_DIR_NAME)
    os.makedirs(trash_dir, exist_ok=True)
    item_name = os.path.basename(item_path)
    dest_path = os.path.join(trash_dir, item_name)
    if os.path.exists(dest_path):
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        dest_path = os.path.join(trash_dir, f"{timestamp}_{item_name}")
    shutil.move(item_path, dest_path)
    return dest_path


@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form and create an account on POST."""
    if request.method == 'POST':
        username = (request.form.get('username') or '').strip()
        password = request.form.get('password')
        # The username column is NOT NULL and limited to 50 characters.
        if not username or len(username) > 50:
            flash('Введите логин (до 50 символов)', 'danger')
            return redirect(url_for('register'))
        if not password or len(password) < MIN_PASSWORD_LENGTH:
            flash('Пароль слишком короткий (минимум 12 символов)', 'danger')
            return redirect(url_for('register'))
        if User.query.filter_by(username=username).first():
            flash('Логин уже занят', 'danger')
            return redirect(url_for('register'))
        db.session.add(User(username=username, password_hash=generate_password_hash(password)))
        db.session.commit()
        flash('Аккаунт создан! Войдите.', 'success')
        return redirect(url_for('login'))
    return render_template('register.html')


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Check username/password; hand off to the 2FA step when it is enabled."""
    if request.method == 'POST':
        user = User.query.filter_by(username=request.form.get('username')).first()
        # A missing field must count as a wrong password, not crash hashing.
        password = request.form.get('password') or ''
        if user and check_password_hash(user.password_hash, password):
            if user.totp_secret:
                session['pre_auth_user_id'] = user.id
                return redirect(url_for('login_2fa'))
            _start_user_session(user)
            return redirect(url_for('dashboard'))
        flash('Ошибка входа', 'danger')
    return render_template('login.html')


@app.route('/login-2fa', methods=['GET', 'POST'])
def login_2fa():
    """Second login step: verify the TOTP code for the pre-authenticated user."""
    user_id = session.get('pre_auth_user_id')
    if not user_id:
        return redirect(url_for('login'))
    user = User.query.get(user_id)
    if user is None or not user.totp_secret:
        # Account was removed or 2FA disabled in between: start over.
        session.pop('pre_auth_user_id', None)
        return redirect(url_for('login'))
    if request.method == 'POST':
        otp = request.form.get('otp')
        if otp and pyotp.totp.TOTP(user.totp_secret).verify(otp):
            _start_user_session(user)
            return redirect(url_for('dashboard'))
        flash('Неверный код 2FA', 'danger')
    return render_template('login_2fa.html')


@app.before_request
def check_session_validty():
    """Drop cookie sessions whose server-side ``UserSession`` row is gone."""
    if request.endpoint in ['login', 'login_2fa', 'static', 'register', 'public_download']:
        return None
    u_id = session.get('user_id')
    if not u_id:
        return None
    token = session.get('session_token')
    exists = token and UserSession.query.filter_by(user_id=u_id, session_token=token).first()
    if not exists:
        session.clear()
        return redirect(url_for('login'))
    return None


@app.errorhandler(404)
def page_not_found(e):
    """Render the custom 404 page."""
    return render_template('404.html'), 404


@app.route('/trash')
def show_trash():
    """List the entries in every ``.trash`` folder of the logged-in user."""
    if 'user_id' not in session:
        return redirect(url_for('login'))
    trash_items = []
    for root, dirs, _files in os.walk(get_user_dir()):
        if TRASH_DIR_NAME not in dirs:
            continue
        # Do not descend into the trash itself; only its direct entries
        # are restorable items.
        dirs.remove(TRASH_DIR_NAME)
        trash_path = os.path.join(root, TRASH_DIR_NAME)
        for item in sorted(os.listdir(trash_path)):
            trash_items.append({
                'name': item,
                'full_path': os.path.join(trash_path, item),
                'original_path': root,
            })
    return render_template('trash.html', items=trash_items)


@app.route('/restore', methods=['POST'])
def restore_file():
    """Move a trashed entry back into the folder that contains its ``.trash``.

    Expects JSON ``{"path": ...}`` naming a direct child of a ``.trash``
    folder inside the caller's own directory.
    """
    if 'user_id' not in session:
        return {"status": "error", "message": "Auth required"}, 401
    trash_path = _resolve_inside(get_user_dir(), _json_body().get('path'))
    if not trash_path or not os.path.exists(trash_path):
        return {"status": "error"}, 400
    trash_dir = os.path.dirname(trash_path)
    if os.path.basename(trash_dir) != TRASH_DIR_NAME:
        return {"status": "error"}, 400
    original_path = os.path.join(os.path.dirname(trash_dir), os.path.basename(trash_path))
    if os.path.exists(original_path):
        # Restoring would silently replace a live file of the same name.
        return {"status": "error", "message": "Destination already exists"}, 400
    shutil.move(trash_path, original_path)
    return {"status": "ok"}


@app.route('/final-delete', methods=['POST'])
def final_delete():
    """Permanently delete a file or folder inside the caller's directory."""
    if 'user_id' not in session:
        return {"status": "error", "message": "Auth required"}, 401
    user_root = os.path.realpath(get_user_dir())
    path = _resolve_inside(user_root, _json_body().get('path'))
    # Never allow wiping the user's root folder itself.
    if not path or path == user_root or not os.path.exists(path):
        return {"status": "error"}, 400
    if os.path.isdir(path):
        shutil.rmtree(path)
    else:
        os.remove(path)
    return {"status": "ok"}


@app.route('/settings')
def settings():
    """Render the account page: quota usage, 2FA state and active sessions."""
    if 'user_id' not in session:
        return redirect(url_for('login'))
    user_id = session['user_id']
    user = User.query.get(user_id)
    if user is None:
        session.clear()
        return redirect(url_for('login'))
    usage_gb, percent = _storage_stats(user_id)
    active_sessions = UserSession.query.filter_by(user_id=user_id).all()
    return render_template(
        'settings.html',
        user_id=user_id,
        usage_gb=usage_gb,
        percent=percent,
        has_2fa=bool(user.totp_secret),
        sessions=active_sessions,
    )


@app.route('/logout_session/<int:sid>')
def logout_specific_session(sid):
    """Revoke one of the caller's own sessions by its database id."""
    # NOTE(review): this changes state on GET; switching to POST would need
    # a matching template change, so the method is left as it was.
    if 'user_id' not in session:
        return redirect(url_for('login'))
    sess_to_delete = UserSession.query.get(sid)
    if sess_to_delete and sess_to_delete.user_id == session['user_id']:
        db.session.delete(sess_to_delete)
        db.session.commit()
        flash('Устройство успешно отключено', 'success')
    return redirect(url_for('settings'))


@app.route('/change-password', methods=['POST'])
def change_password():
    """Change the caller's password after verifying the old one (JSON body)."""
    if 'user_id' not in session:
        return jsonify({"error": "Auth"}), 401
    user = User.query.get(session['user_id'])
    if user is None:
        return jsonify({"error": "Auth"}), 401
    data = _json_body()
    if not check_password_hash(user.password_hash, data.get('old_password') or ''):
        return jsonify({"error": "Старый пароль неверен"}), 400
    new_pw = data.get('new_password')
    if not isinstance(new_pw, str) or len(new_pw) < MIN_PASSWORD_LENGTH:
        return jsonify({"error": "Минимум 12 символов"}), 400
    user.password_hash = generate_password_hash(new_pw)
    db.session.commit()
    return jsonify({"success": True})


@app.route('/upload', methods=['POST'])
def upload_file():
    """Store an uploaded file in the given sub-folder of the caller's directory."""
    if 'user_id' not in session:
        return redirect(url_for('login'))
    file = request.files.get('file')
    path = request.form.get('path', '')
    if file:
        user_id = session['user_id']
        current_usage = get_user_storage_usage(user_id)
        # Measure the upload by seeking to its end, then rewind for save().
        file.seek(0, os.SEEK_END)
        new_file_size = file.tell()
        file.seek(0)
        if current_usage + new_file_size > STORAGE_LIMIT_BYTES:
            return "Ошибка: Недостаточно места", 400
        # The target folder is client-supplied: refuse anything that would
        # leave the user's own directory (e.g. "../other_user").
        user_dir = _resolve_inside(get_user_dir(), path.strip('/'))
        if user_dir is None:
            return "Ошибка: недопустимый путь", 400
        os.makedirs(user_dir, exist_ok=True)
        # basename() rather than secure_filename(): the latter strips
        # non-ASCII characters and would empty out Cyrillic file names.
        filename = os.path.basename((file.filename or '').replace('\\', '/'))
        if filename and filename not in ('.', '..'):
            file.save(os.path.join(user_dir, filename))
    return "OK", 200


@app.route('/raw_file/<int:file_id>')
def raw_file(file_id):
    """Serve a stored file inline as PDF, looked up by database id."""
    if 'user_id' not in session:
        return redirect(url_for('login'))
    # NOTE(review): no `File` model is defined in this module, so this route
    # raises NameError until one is added or imported; it should also check
    # that the record belongs to the caller. Only the `quary` typo and the
    # unset UPLOAD_FOLDER config key are corrected here.
    file_data = File.query.get_or_404(file_id)
    upload_root = app.config.get('UPLOAD_FOLDER', UPLOAD_BASE_PATH)
    file_path = os.path.join(upload_root, file_data.storage_path)
    return send_file(
        file_path,
        mimetype='application/pdf',
        as_attachment=False
    )


@app.route('/preview/<path:file_id>')
def preview_pdf(file_id):
    """Serve one of the caller's files for in-browser preview.

    *file_id* may be a path inside the caller's folder or a bare file name,
    in which case the caller's folder tree is searched for it.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))
    located = _locate_user_path(file_id)
    if located and os.path.isfile(located):
        return send_from_directory(os.path.dirname(located), os.path.basename(located))
    # Search only the caller's own tree, never other users' folders.
    for root, _dirs, files in os.walk(get_user_dir()):
        if file_id in files:
            return send_from_directory(root, file_id)
    # file_id is client-controlled and the response is rendered as HTML.
    return f"Файл {html.escape(file_id)} не найден в хранилище", 404


@app.route('/share/<path:filename>', methods=['POST'])
def share_file(filename):
    """Create an expiring public link for one of the caller's files."""
    if 'user_id' not in session:
        return jsonify({"error": "Auth required"}), 401
    data = _json_body()
    try:
        days = int(data.get('days', 1))
        if days < 1:
            raise ValueError("days must be positive")
        expiry = datetime.now() + timedelta(days=days)
    except (TypeError, ValueError, OverflowError):
        return jsonify({"error": "Invalid days"}), 400
    target = _resolve_inside(get_user_dir(), filename)
    if target is None or not os.path.isfile(target):
        return jsonify({"error": "File not found"}), 404
    token = secrets.token_urlsafe(32)
    new_share = SharedLink(
        token=token,
        file_path=filename,
        user_id=session['user_id'],
        expires_at=expiry
    )
    db.session.add(new_share)
    db.session.commit()
    share_url = f"{request.host_url}s/{token}"
    return jsonify({"share_url": share_url})


@app.route('/s/<token>')
def public_download(token):
    """Serve the file behind a share token, or 410 once the link has expired."""
    share = SharedLink.query.filter_by(token=token).first_or_404()
    if datetime.now() > share.expires_at:
        return "Срок действия ссылки истек", 410
    user_dir = os.path.join(UPLOAD_BASE_PATH, str(share.user_id))
    return send_from_directory(user_dir, share.file_path)


@app.route('/download/<path:filename>')
def download_file(filename):
    """Serve a file from the caller's own directory."""
    if 'user_id' not in session:
        return redirect(url_for('login'))
    return send_from_directory(get_user_dir(), filename)


@app.route('/delete/<path:filename>', methods=['DELETE'])
def delete_item(filename):
    """Move one of the caller's files or folders to the adjacent ``.trash``."""
    if 'user_id' not in session:
        return {"status": "error", "message": "Auth required"}, 401
    full_path = _locate_user_path(filename)
    # The user's root folder itself is not a deletable item.
    if full_path is None or full_path == os.path.realpath(get_user_dir()):
        return {"status": "error", "message": "File not found"}, 404
    move_to_trash(full_path)
    return {"status": "success", "message": "Moved to trash"}, 200


@app.route('/create-folder', methods=['POST'])
def create_folder():
    """Create a folder (JSON ``name``) under JSON ``path`` in the caller's directory."""
    if 'user_id' not in session:
        return jsonify({"error": "Auth"}), 401
    data = _json_body()
    folder_name = secure_filename(data.get('name') or '')
    current_subpath = data.get('path') or ''
    if not isinstance(current_subpath, str):
        return jsonify({"error": "Invalid path"}), 400
    parent_dir = _resolve_inside(get_user_dir(), current_subpath.strip('/'))
    if parent_dir is None:
        return jsonify({"error": "Invalid path"}), 400
    if folder_name:
        new_path = os.path.join(parent_dir, folder_name)
        try:
            os.makedirs(new_path, exist_ok=True)
            return jsonify({"success": True})
        except OSError as e:
            return jsonify({"error": str(e)}), 500
    return jsonify({"error": "Invalid name"}), 400


@app.route('/')
@app.route('/dashboard')
@app.route('/dashboard/<path:subpath>')
def dashboard(subpath=""):
    """Render the file browser for *subpath* inside the caller's directory."""
    if 'user_id' not in session:
        return redirect(url_for('login'))
    user_id = session['user_id']
    root_user_dir = get_user_dir()  # The user's root (uploads/ID)
    # Folder currently shown; must stay inside the user's root.
    current_view_dir = _resolve_inside(root_user_dir, subpath)
    if current_view_dir is None:
        abort(404)
    usage_gb, percent = _storage_stats(user_id)
    folders = []
    files = []
    if os.path.exists(current_view_dir):
        for item in sorted(os.listdir(current_view_dir)):
            if os.path.isdir(os.path.join(current_view_dir, item)):
                folders.append(item)
            else:
                files.append(item)
    # Flat index of every entry in the user's tree, passed to the template.
    all_files_metadata = []
    for root, dirs, filenames in os.walk(root_user_dir):
        for name in dirs + filenames:
            entry_path = os.path.join(root, name)
            rel_path = str(Path(entry_path).relative_to(root_user_dir))
            all_files_metadata.append({
                "name": name,
                "path": rel_path.replace('\\', '/'),
                "is_dir": os.path.isdir(entry_path),
            })
    return render_template(
        'dashboard.html',
        folders=folders,
        files=files,
        current_path=subpath,
        usage_gb=usage_gb,
        percent=percent,
        all_files_metadata=all_files_metadata,
    )


@app.route('/enable-2fa', methods=['POST'])
def enable_2fa():
    """Generate a pending TOTP secret and return its QR code as base64."""
    if 'user_id' not in session:
        return jsonify({"error": "Auth"}), 401
    user = User.query.get(session['user_id'])
    if user is None:
        return jsonify({"error": "Auth"}), 401
    secret = pyotp.random_base32()
    # Kept in the session until /confirm-2fa proves the user can use it.
    session['temp_totp'] = secret
    uri = pyotp.totp.TOTP(secret).provisioning_uri(name=user.username, issuer_name="CloudVault")
    img = qrcode.make(uri)
    buf = io.BytesIO()
    img.save(buf)
    return jsonify({"qr_code": base64.b64encode(buf.getvalue()).decode()})


@app.route('/confirm-2fa', methods=['POST'])
def confirm_2fa():
    """Activate the pending TOTP secret once a valid code is supplied."""
    if 'user_id' not in session:
        return jsonify({"error": "Auth"}), 401
    otp = _json_body().get('otp')
    secret = session.get('temp_totp')
    if secret and otp and pyotp.totp.TOTP(secret).verify(str(otp)):
        user = User.query.get(session['user_id'])
        if user is None:
            return jsonify({"error": "Auth"}), 401
        user.totp_secret = secret
        db.session.commit()
        # The pending secret is now persisted; drop it from the cookie.
        session.pop('temp_totp', None)
        return jsonify({"success": True})
    return jsonify({"error": "Код неверен"}), 400


@app.route('/disable-2fa', methods=['POST'])
def disable_2fa():
    """Remove the caller's TOTP secret, turning 2FA off."""
    if 'user_id' not in session:
        return jsonify({"error": "Auth"}), 401
    user = User.query.get(session['user_id'])
    if user is None:
        return jsonify({"error": "Auth"}), 401
    user.totp_secret = None
    db.session.commit()
    return jsonify({"success": True})


@app.route('/logout')
def logout():
    """Log out: delete the server-side session record and clear the cookie."""
    token = session.get('session_token')
    if token:
        # Otherwise the row would stay listed as an active device forever.
        UserSession.query.filter_by(session_token=token).delete()
        db.session.commit()
    session.clear()
    return redirect(url_for('login'))


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5050)