Загрузка данных
from fastapi import APIRouter, Request#, Depends
#from auth import verify_api_key_api
from fastapi.responses import JSONResponse
import requests
import json
import re
import datetime
import pytz
router = APIRouter()
ACCESS_KEY = "^5m2wW4^NE4YRaSEusK@JaPoQ"
BITX_URL = "https://stolitsa-spisaniya.bitrix24.ru/rest/11/u1rs8ycuqdz1e4fg"
STATUS_URL = f"{BITX_URL}/crm.status.get"
CONTACT_URL = f"{BITX_URL}/crm.contact.list"
LEAD_URL = f"{BITX_URL}/crm.lead.list"
LEAD_GET_URL = f"{BITX_URL}/crm.lead.get"
DEAL_URL = f"{BITX_URL}/crm.deal.list"
CLIENT_URL = f"https://vk-ads.maksimnazarov.dev/api/v1/bitrix/15Z973Upt1Sd7s5uhzDTudPDHU72Dd17/status"
STATUS_LIST_URL = f"{BITX_URL}/crm.status.list"
DEAL_GET_URL = f"{BITX_URL}/crm.deal.get"
def lg(message, log_file='application.log'):
tz_msk = pytz.timezone('Europe/Moscow')
current_time = datetime.datetime.now(tz_msk)
timestamp = current_time.strftime('%Y-%m-%d %H:%M:%S')
log_entry = f"{timestamp} - {message}\n"
with open(log_file, 'a', encoding='utf-8') as file:
file.write(log_entry)
def modify_phone(phone):
if phone and phone[0] == '8':
return '7' + phone[1:]
return phone
def check_btx(phone, url):
if url == CONTACT_URL:
lg("Parsing contacts")
elif url == LEAD_URL:
lg("Parsing leads")
payload = {
"filter[PHONE]": phone,
"select[]": ["ID"]
}
response = requests.get(url, params=payload)
data = response.json()
if "ID" in str(json.loads(json.dumps(data))):
return True
return False
def checker_btx(stock_phone, stock_clear_phone):
try:
urls = [CONTACT_URL, LEAD_URL]
phone_types = ["7", "+", "8", "+"]
lg(f"STOCK CLEAR: {stock_clear_phone}")
n = 0
for url in urls:
lg(f"Parsing {stock_phone}")
result = check_btx(stock_phone, url)
if result == True:
return True
for url in urls:
lg(f"Parsing {stock_clear_phone}")
result = check_btx(stock_clear_phone, url)
if result == True:
return True
for url in urls:
for ptype in phone_types:
if n == 1:
phone = ptype + phone
n = 0
else:
phone = ptype + stock_clear_phone[1:]
n = 1
lg(f"Parsing {phone}")
result = check_btx(phone, url)
if result == True:
return True
lg("poshlo")
for url in urls:
positions = [1, 4, 7, 9]
s1 = stock_clear_phone
lg(f"first {s1}")
for pos in sorted(positions, reverse=True):
s1 = s1[:pos] + " " + s1[pos:]
lg(f"Parsing {s1}")
result = check_btx(s1, url)
if result == True:
return True
for url in urls:
positions = [1, 4] # где вставить пробел
positions2 = [9, 11]
s2 = stock_clear_phone
lg(f"first {s2}")
for pos in sorted(positions, reverse=True):
s2 = s2[:pos] + " " + s2[pos:]
for pos in sorted(positions2, reverse=True):
s2 = s2[:pos] + "-" + s2[pos:]
lg(f"Parsing {s2}")
result = check_btx(s2, url)
if result == True:
return True
for url in urls:
if stock_clear_phone.startswith("7"):
positions = [1, 4, 7, 9]
s3 = stock_clear_phone
for pos in sorted(positions, reverse=True):
s3 = s3[:pos] + " " + s3[pos:]
s3 = "+" + s3
lg(f"Parsing {s3}")
result = check_btx(s3, url)
if result == True:
return True
else:
s4 = stock_clear_phone
s4 = s4[1:]
s4 = "7" + s4
positions = [1, 4, 7, 9]
lg(f"first {s4}")
for pos in sorted(positions, reverse=True):
s4 = s4[:pos] + " " + s4[pos:]
s4 = "+" + s4
lg(f"Parsing {s4}")
result = check_btx(s4, url)
if result == True:
return True
for url in urls:
if stock_clear_phone.startswith("7"):
positions = [1, 4]
positions2 = [9, 11]
s5 = stock_clear_phone
for pos in sorted(positions, reverse=True):
s5 = s5[:pos] + " " + s5[pos:]
for pos in sorted(positions2, reverse=True):
s5 = s5[:pos] + "-" + s5[pos:]
s5 = "+" + s5
lg(f"Parsing {s5}")
result = check_btx(s5, url)
if result == True:
return True
else:
s6 = stock_clear_phone
s6 = s6[1:]
s6 = "7" + s6
positions = [1, 4]
positions2 = [9, 11]
for pos in sorted(positions, reverse=True):
s6 = s6[:pos] + " " + s6[pos:]
for pos in sorted(positions2, reverse=True):
s6 = s6[:pos] + "-" + s6[pos:]
s6 = "+" + s6
lg(f"Parsing {s6}")
result = check_btx(s6, url)
if result == True:
return True
for url in urls:
s7 = stock_clear_phone
s7 = s7[1:]
s7 = "8"+s7
positions = [1, 4]
positions2 = [9, 11]
for pos in sorted(positions, reverse=True):
s7 = s7[:pos] + " " + s7[pos:]
for pos in sorted(positions2, reverse=True):
s7 = s7[:pos] + "-" + s7[pos:]
lg(f"Parsing {s7}")
result = check_btx(s7, url)
if result == True:
return True
for url in urls:
s8 = stock_clear_phone
s8 = s8[1:]
s8 = "8"+s8
positions = [1, 4, 7, 9]
for pos in sorted(positions, reverse=True):
s8 = s8[:pos] + " " + s8[pos:]
lg(f"Parsing {s8}")
result = check_btx(s8, url)
if result == True:
return True
return False
except Exception as e:
return {
"error": "request_failed",
"error_description": str(e)
}
@router.api_route("/{new_ACCESS_KEY}/doubleCheck", methods=["GET", "POST"])
async def double_check(new_ACCESS_KEY: str, request: Request):
if new_ACCESS_KEY != ACCESS_KEY:
return JSONResponse({
"error": "bad api key",
"error_description": "invalid api key, get new with your admin."
}, status_code=401)
try:
phone = None
# Аналог request.form.get("phone")
if request.method == "POST":
form = await request.form()
phone = form.get("phone")
# Аналог request.is_json + request.json.get(...)
if request.method == "POST":
try:
json_data = await request.json()
if isinstance(json_data, dict):
phone = json_data.get("phone", phone)
except:
pass
# Валидация
if not phone:
return JSONResponse({
"error": "bad_request",
"error_description": "Parameter 'phone' is required"
}, status_code=400)
if not phone.startswith(("7", "8", "+7", "+8")):
return JSONResponse({
"error": "bad_phone_format",
"error_description": "Phone must match format 71234567890 / +71234567890"
}, status_code=400)
# Очистка номера (без изменений)
phone_clear = phone.replace("+", "")
phone_clear = phone_clear.replace("-", "")
phone_clear = phone_clear.replace("(", "")
phone_clear = phone_clear.replace(")", "")
phone_clear = phone_clear.replace(" ", "")
lg(f"new request: \nPhone: {phone}")
duplicates_found = checker_btx(phone, phone_clear)
lg(f'"result": {duplicates_found}')
return JSONResponse({"result": duplicates_found})
except Exception as e:
return JSONResponse({
"error": "server_error",
"error_description": str(e)
}, status_code=500)
@router.api_route("/{new_ACCESS_KEY}/changed_status", methods=["POST"])
async def changed_status(new_ACCESS_KEY: str, request: Request):
if new_ACCESS_KEY != ACCESS_KEY:
return JSONResponse({
"error": "bad api key",
"error_description": "invalid api key, get new with your admin."
}, status_code=401)
lg(f"request args: {request.query_params}")
try:
data = request.query_params
lead_id = int(data["lead_id"])
deal_id = int(data["deal_id"])
lg(f"leadid: {lead_id}")
lg(f"dealid: {deal_id}")
if deal_id == 0:
lg(f"lead_id: {lead_id}")
params = {
"ID": lead_id,
"SELECT": ["UF_CRM_1768824257", "ID", "STATUS_ID"]
}
lead = requests.post(LEAD_GET_URL, json=params).json()["result"]
system313_id = lead.get("UF_CRM_1768824257")
lg(f"system313 id: {system313_id}")
params = {
"FILTER": {
"LEAD_ID": int(lead_id)
},
"SELECT": ["ID", "STAGE_ID", "UF_CRM_1765974363"]
}
lg("trying get a deals")
try:
deals = requests.post(DEAL_URL, json=params).json()
if "id" in str(json.loads(json.dumps(deals))).lower():
lg("request from lead BUT deal already create. Skip.")
return JSONResponse({
"status": "request from lead BUT deal already create. Skip."
})
else:
lg(f"dont find a deal.")
params = {
"filter": {"STATUS_ID": lead.get("STATUS_ID")},
"select": ["NAME"]
}
lg(f"status id: {lead.get('STATUS_ID')}")
status_response = requests.post(
STATUS_LIST_URL, json=params
).json()["result"][0]
lg(f"status: {status_response.get('NAME')}")
params = {
"id": int(system313_id),
"status": status_response.get("NAME")
}
response = requests.post(CLIENT_URL, json=params)
lg(f"request to client status code: {response.status_code}")
except Exception as e:
lg(e)
print(e)
else:
lg(f"find a deal: {deal_id}")
params = {
"ID": deal_id,
"SELECT": ["UF_CRM_696E1FAA1CDB2", "ID", "UF_CRM_1765974363", "STAGE_ID"]
}
deals = requests.post(DEAL_GET_URL, json=params)
deals = deals.json()["result"]
system313_id = deals.get("UF_CRM_696E1FAA1CDB2")
lg("system313: " + system313_id)
params = {
"filter": {"STATUS_ID": deals.get("STAGE_ID")},
"select": ["NAME"]
}
status_response = requests.post(
STATUS_LIST_URL, json=params
).json()["result"][0]
status = status_response.get("NAME")
lg(f"status: {status}")
if deals.get("UF_CRM_1765974363"):
lg(f"date of deal: {deals.get('UF_CRM_1765974363')[:10]}")
params = {
"id": int(system313_id),
"status": status_response.get("NAME"),
"contractDate": deals.get("UF_CRM_1765974363")[:10]
}
response = requests.post(CLIENT_URL, json=params)
lg(response.status_code)
else:
lg(f"date of deal is none.")
params = {
"id": int(system313_id),
"status": status_response.get("NAME")
}
response = requests.post(CLIENT_URL, json=params)
lg(response.status_code)
return JSONResponse({"status": "Test_True"})
except Exception as e:
print(e)
return JSONResponse({
"error": "server_error",
"error_description": str(e)
}, status_code=500)