Загрузка данных
import imaplib
import socket
import re
from imap_tools import MailBox, A # ← ИСПРАВЛЕННЫЙ ИМПОРТ
socket.setdefaulttimeout(20)
# Таймауты
TIMEOUT = 20
COMMON = {
"gmail.com": "imap.gmail.com",
"googlemail.com": "imap.gmail.com",
"outlook.com": "outlook.office365.com",
"hotmail.com": "outlook.office365.com",
"live.com": "outlook.office365.com",
"msn.com": "outlook.office365.com",
"mail.ru": "imap.mail.ru",
"bk.ru": "imap.mail.ru",
"inbox.ru": "imap.mail.ru",
"list.ru": "imap.mail.ru",
"yandex.ru": "imap.yandex.ru",
"ya.ru": "imap.yandex.ru",
"yandex.com": "imap.yandex.ru",
"yandex.kz": "imap.yandex.ru",
"yandex.by": "imap.yandex.ru",
"rambler.ru": "imap.rambler.ru",
"yahoo.com": "imap.mail.yahoo.com",
"yahoo.co.uk": "imap.mail.yahoo.com",
"yahoo.fr": "imap.mail.yahoo.com",
"yahoo.de": "imap.mail.yahoo.com",
"yahoo.es": "imap.mail.yahoo.com",
"yahoo.it": "imap.mail.yahoo.com",
"yahoo.co.jp": "imap.mail.yahoo.com",
"icloud.com": "imap.mail.me.com",
"me.com": "imap.mail.me.com",
"mac.com": "imap.mail.me.com",
"protonmail.com": "imap.protonmail.com",
"proton.me": "imap.protonmail.com",
"zoho.com": "imap.zoho.com",
"gmx.com": "imap.gmx.com",
"gmx.net": "imap.gmx.net",
"gmx.de": "imap.gmx.net",
"web.de": "imap.web.de",
"t-online.de": "secureimap.t-online.de",
"aol.com": "imap.aol.com",
"firstmail.ltd": "mail.firstmail.ltd",
"notletters.com": "mail.notletters.com",
}
IMAP_FALLBACKS = [
"imap.{d}",
"mail.{d}",
"imap.mail.{d}",
"{d}",
]
def get_imap_server(email: str) -> str | None:
"""Определяет IMAP сервер по email"""
try:
domain = email.split("@")[1].lower().strip()
except (IndexError, AttributeError):
return None
if domain in COMMON:
return COMMON[domain]
for pattern in IMAP_FALLBACKS:
host = pattern.format(d=domain)
try:
print(f"[TRY] {host}")
imap = imaplib.IMAP4_SSL(host, timeout=10)
imap.logout()
print(f"[FOUND IMAP] {host}")
return host
except Exception:
continue
print(f"[NO IMAP] {domain}")
return None
def check(email: str, password: str) -> tuple[bool, str | int, str]:
"""Проверка аккаунта"""
server = get_imap_server(email)
if not server:
domain = email.split("@")[-1].lower()
return False, f"IMAP сервер не найден для домена: {domain}", ""
try:
with MailBox(server, timeout=TIMEOUT).login(email, password) as m:
status = m.folder.status("INBOX")
count = int(status.get("MESSAGES", 0))
return True, count, server
except Exception as e:
return False, str(e), server
def get_last_messages(email: str, password: str, server: str, limit: int = 20) -> list[dict]:
"""Получить последние письма"""
data = []
try:
with MailBox(server, timeout=TIMEOUT).login(email, password) as m:
for msg in m.fetch(limit=limit, reverse=True):
data.append({
"from": msg.from_ or "",
"subject": msg.subject or "(без темы)",
"date": str(msg.date)[:19] if msg.date else "",
"uid": msg.uid,
})
except Exception as e:
raise RuntimeError(f"Ошибка получения писем: {e}") from e
return data
def extract_code(text: str) -> str:
"""Извлекает код подтверждения"""
if not text:
return ""
patterns = [r"\b\d{4,8}\b", r"\b[A-Z0-9]{4,10}\b"]
for pattern in patterns:
match = re.search(pattern, text)
if match:
return match.group(0)
return ""
def search_messages(
email: str, password: str, server: str,
search_by: str = "last", query: str = "", limit: int = 20
) -> list[dict]:
"""Поиск писем"""
data = []
try:
with MailBox(server, timeout=TIMEOUT).login(email, password, initial_folder='INBOX') as m:
fetched = list(m.fetch(limit=500, reverse=True, mark_seen=False))
for msg in fetched:
sender = (msg.from_ or "").lower()
subject = msg.subject or "(без темы)"
body = (msg.text or '') + '\n' + (msg.html or '')
matched = False
q = query.lower().strip()
if search_by == "from":
matched = q.replace('@', '') in sender
elif search_by == "subject":
matched = q in subject.lower()
elif search_by == "body":
matched = q in body.lower()
else:
matched = True
if matched:
data.append({
"from": msg.from_ or "",
"subject": subject,
"date": str(msg.date)[:19] if msg.date else "",
"uid": msg.uid,
"body": body[:500],
"code": extract_code(body),
})
if len(data) >= limit:
break
except Exception as e:
raise RuntimeError(f"Ошибка поиска писем: {e}") from e
return data
def delete_messages(
email: str, password: str, server: str,
delete_by: str = "all", query: str = ""
) -> int:
"""Удаление писем"""
deleted = 0
try:
with MailBox(server, timeout=TIMEOUT).login(email, password) as m:
if delete_by == "all":
criteria = A("ALL")
elif delete_by == "from":
criteria = A(from_=query)
elif delete_by == "subject":
criteria = A(subject=query)
else:
criteria = A("ALL")
uids = [msg.uid for msg in m.fetch(criteria)]
if uids:
m.delete(uids)
deleted = len(uids)
except Exception as e:
raise RuntimeError(f"Ошибка удаления писем: {e}") from e
return deleted