Загрузка данных


import imaplib
import socket
import re
from imap_tools import MailBox, A   # ← ИСПРАВЛЕННЫЙ ИМПОРТ

socket.setdefaulttimeout(20)

# Таймауты
TIMEOUT = 20

COMMON = {
    "gmail.com": "imap.gmail.com",
    "googlemail.com": "imap.gmail.com",
    "outlook.com": "outlook.office365.com",
    "hotmail.com": "outlook.office365.com",
    "live.com": "outlook.office365.com",
    "msn.com": "outlook.office365.com",
    "mail.ru": "imap.mail.ru",
    "bk.ru": "imap.mail.ru",
    "inbox.ru": "imap.mail.ru",
    "list.ru": "imap.mail.ru",
    "yandex.ru": "imap.yandex.ru",
    "ya.ru": "imap.yandex.ru",
    "yandex.com": "imap.yandex.ru",
    "yandex.kz": "imap.yandex.ru",
    "yandex.by": "imap.yandex.ru",
    "rambler.ru": "imap.rambler.ru",
    "yahoo.com": "imap.mail.yahoo.com",
    "yahoo.co.uk": "imap.mail.yahoo.com",
    "yahoo.fr": "imap.mail.yahoo.com",
    "yahoo.de": "imap.mail.yahoo.com",
    "yahoo.es": "imap.mail.yahoo.com",
    "yahoo.it": "imap.mail.yahoo.com",
    "yahoo.co.jp": "imap.mail.yahoo.com",
    "icloud.com": "imap.mail.me.com",
    "me.com": "imap.mail.me.com",
    "mac.com": "imap.mail.me.com",
    "protonmail.com": "imap.protonmail.com",
    "proton.me": "imap.protonmail.com",
    "zoho.com": "imap.zoho.com",
    "gmx.com": "imap.gmx.com",
    "gmx.net": "imap.gmx.net",
    "gmx.de": "imap.gmx.net",
    "web.de": "imap.web.de",
    "t-online.de": "secureimap.t-online.de",
    "aol.com": "imap.aol.com",
    "firstmail.ltd": "mail.firstmail.ltd",
    "notletters.com": "mail.notletters.com",
}

IMAP_FALLBACKS = [
    "imap.{d}",
    "mail.{d}",
    "imap.mail.{d}",
    "{d}",
]


def get_imap_server(email: str) -> str | None:
    """Определяет IMAP сервер по email"""
    try:
        domain = email.split("@")[1].lower().strip()
    except (IndexError, AttributeError):
        return None

    if domain in COMMON:
        return COMMON[domain]

    for pattern in IMAP_FALLBACKS:
        host = pattern.format(d=domain)
        try:
            print(f"[TRY] {host}")
            imap = imaplib.IMAP4_SSL(host, timeout=10)
            imap.logout()
            print(f"[FOUND IMAP] {host}")
            return host
        except Exception:
            continue

    print(f"[NO IMAP] {domain}")
    return None


def check(email: str, password: str) -> tuple[bool, str | int, str]:
    """Проверка аккаунта"""
    server = get_imap_server(email)
    if not server:
        domain = email.split("@")[-1].lower()
        return False, f"IMAP сервер не найден для домена: {domain}", ""

    try:
        with MailBox(server, timeout=TIMEOUT).login(email, password) as m:
            status = m.folder.status("INBOX")
            count = int(status.get("MESSAGES", 0))
        return True, count, server
    except Exception as e:
        return False, str(e), server


def get_last_messages(email: str, password: str, server: str, limit: int = 20) -> list[dict]:
    """Получить последние письма"""
    data = []
    try:
        with MailBox(server, timeout=TIMEOUT).login(email, password) as m:
            for msg in m.fetch(limit=limit, reverse=True):
                data.append({
                    "from": msg.from_ or "",
                    "subject": msg.subject or "(без темы)",
                    "date": str(msg.date)[:19] if msg.date else "",
                    "uid": msg.uid,
                })
    except Exception as e:
        raise RuntimeError(f"Ошибка получения писем: {e}") from e
    return data


def extract_code(text: str) -> str:
    """Извлекает код подтверждения"""
    if not text:
        return ""
    patterns = [r"\b\d{4,8}\b", r"\b[A-Z0-9]{4,10}\b"]
    for pattern in patterns:
        match = re.search(pattern, text)
        if match:
            return match.group(0)
    return ""


def search_messages(
    email: str, password: str, server: str,
    search_by: str = "last", query: str = "", limit: int = 20
) -> list[dict]:
    """Поиск писем"""
    data = []
    try:
        with MailBox(server, timeout=TIMEOUT).login(email, password, initial_folder='INBOX') as m:
            fetched = list(m.fetch(limit=500, reverse=True, mark_seen=False))

            for msg in fetched:
                sender = (msg.from_ or "").lower()
                subject = msg.subject or "(без темы)"
                body = (msg.text or '') + '\n' + (msg.html or '')

                matched = False
                q = query.lower().strip()

                if search_by == "from":
                    matched = q.replace('@', '') in sender
                elif search_by == "subject":
                    matched = q in subject.lower()
                elif search_by == "body":
                    matched = q in body.lower()
                else:
                    matched = True

                if matched:
                    data.append({
                        "from": msg.from_ or "",
                        "subject": subject,
                        "date": str(msg.date)[:19] if msg.date else "",
                        "uid": msg.uid,
                        "body": body[:500],
                        "code": extract_code(body),
                    })

                if len(data) >= limit:
                    break
    except Exception as e:
        raise RuntimeError(f"Ошибка поиска писем: {e}") from e

    return data


def delete_messages(
    email: str, password: str, server: str,
    delete_by: str = "all", query: str = ""
) -> int:
    """Удаление писем"""
    deleted = 0
    try:
        with MailBox(server, timeout=TIMEOUT).login(email, password) as m:
            if delete_by == "all":
                criteria = A("ALL")
            elif delete_by == "from":
                criteria = A(from_=query)
            elif delete_by == "subject":
                criteria = A(subject=query)
            else:
                criteria = A("ALL")

            uids = [msg.uid for msg in m.fetch(criteria)]
            if uids:
                m.delete(uids)
                deleted = len(uids)
    except Exception as e:
        raise RuntimeError(f"Ошибка удаления писем: {e}") from e

    return deleted