from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel
from datetime import datetime
import pytz
import json
from telethon import TelegramClient, errors, connection
import asyncio
# Secret path segment; interpolated into the `prefix` argument passed to
# FastAPI below.
# NOTE(review): this key and the Telegram credentials are hard-coded in the
# source file -- consider loading them from the environment or a secrets store.
ACCESS_KEY = "GX5h3ySf7niLY~6SVUew"
# Telegram API credentials handed to TelegramClient in get_id().
api_id = 21625627
api_hash = "3bd3846bd356186e961d83e10d322649"
# NOTE(review): `prefix` does not appear to be a routing parameter of the
# FastAPI constructor (unknown keyword arguments are presumably kept as extra
# metadata only), so routes are likely NOT mounted under /<ACCESS_KEY>/ by
# this call -- confirm against the FastAPI reference.
app = FastAPI(prefix=f"/{ACCESS_KEY}/")
async def log(message, log_file='application.log'):
    """Append one timestamped line to a log file.

    The timestamp is the current time in the Europe/Moscow zone formatted as
    ``YYYY-MM-DD HH:MM:SS``; the written line is ``<timestamp> - <message>``
    followed by a newline.

    Args:
        message: Value to record. It is interpolated with an f-string, so any
            object (for example an exception instance) is accepted.
        log_file: Path of the file to append to; opened as UTF-8.
    """
    tz_msk = pytz.timezone('Europe/Moscow')
    # The file imports the class itself (`from datetime import datetime`), so
    # `now` must be called on it directly; `datetime.datetime` would raise
    # AttributeError.
    current_time = datetime.now(tz_msk)
    timestamp = current_time.strftime('%Y-%m-%d %H:%M:%S')
    log_entry = f"{timestamp} - {message}\n"
    # The write itself is synchronous; the function is a coroutine only so
    # that callers can await it.
    with open(log_file, 'a', encoding='utf-8') as file:
        file.write(log_entry)
# Internal result object returned by get_id(): an HTTP-like status code plus
# the resolved Telegram id.
class OnGetting_ID(BaseModel):
    # Defaults to 200; get_id() sets 404 or 400 when the lookup fails.
    status_code: int | None = 200
    # Numeric id of the resolved entity; stays None when get_id() reports a
    # failure status.
    user_id: int | None = None
# Payload holding a resolved Telegram id; used as the `result` field of
# OnGet_ID.
class TG_ID(BaseModel):
    # Required numeric id.
    user_id: int
# Response body of the get_id endpoint (the return annotation of
# start_get_id).
class OnGet_ID(BaseModel):
    # Wrapper around the resolved id.
    result: TG_ID
async def get_id(username: str) -> OnGetting_ID:
    """Resolve a Telegram username to its numeric id.

    Opens a Telethon client on the ``"session"`` session, logs whether that
    session is already authorised (starting the login flow when it is not),
    looks the username up with ``get_entity`` and always disconnects the
    client before returning.

    Args:
        username: Telegram username to resolve.

    Returns:
        An ``OnGetting_ID`` with ``user_id`` set on success (``status_code``
        keeps its default of 200), ``status_code=404`` when the username
        cannot be resolved, or ``status_code=400`` for any other failure
        during the lookup.
    """
    client = TelegramClient(
        "session",
        api_id,
        api_hash,
    )
    await client.connect()
    try:
        # This coroutine already runs inside an event loop, so `log` has to be
        # awaited; asyncio.run() would raise RuntimeError here.
        if not await client.is_user_authorized():
            await log("❌ login required")
            await client.start()
        else:
            await log("✅ session ok")

        try:
            user = await client.get_entity(username)
            return OnGetting_ID(user_id=int(user.id))
        except (errors.UsernameNotOccupiedError, ValueError):
            # NOTE(review): Telethon 1.x appears to surface an unknown
            # username from get_entity() as ValueError rather than
            # UsernameNotOccupiedError -- confirm against the pinned version.
            await log("! not found user id")
            return OnGetting_ID(status_code=404)
        except Exception as e:
            # Boundary handler: record the failure and report a generic
            # error status to the caller instead of propagating.
            await log(e)
            return OnGetting_ID(status_code=400)
    finally:
        # Runs on every exit path, including the early returns above, so the
        # connection is not leaked.
        await client.disconnect()
# The access key is embedded in the route itself and the path starts with
# "/": a path without a leading slash cannot match an incoming request.
@app.post(f"/{ACCESS_KEY}/get_id/{{tg_username}}")
async def start_get_id(tg_username: str) -> OnGet_ID:
    """HTTP endpoint: look up the Telegram id for a username.

    Args:
        tg_username: Username taken from the request path.

    Returns:
        ``OnGet_ID`` wrapping the resolved id when the lookup reports
        status 200.

    Raises:
        HTTPException: 400 when the lookup reports 400, 404 when it reports
            404, and 502 for any other non-200 status.
    """
    # get_id is a coroutine function returning a model, so it must be awaited
    # and its fields read by attribute rather than tuple-unpacked.
    lookup = await get_id(tg_username)

    if lookup.status_code == 200:
        return OnGet_ID(result=TG_ID(user_id=lookup.user_id))

    # HTTPException has to be raised: a returned instance would be treated
    # as the response body instead of producing the error response.
    if lookup.status_code == 400:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="unknown error",
        )
    if lookup.status_code == 404:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"id not found with username {tg_username}",
        )
    raise HTTPException(
        status_code=status.HTTP_502_BAD_GATEWAY,
        detail="unknown error",
    )