import asyncio
import aiohttp
import time
import sys
# Настройки теста
URL = "https://bandit.cloudpub.ru/PROECT_ZDOH_GET_TORNADO"
TOTAL_REQUESTSTS = 100000 # Общее количество запросов (вернул как было)
CONCURRENT_REQUESTSTS = 50 # Количество одновременных соединений
async def send_request(session, request_id):
"""Отправляет один запрос и логирует результат."""
try:
start_time = time.time()
async with session.get(URL) as response:
status = response.status
await response.text() # Читаем тело ответа
end_time = time.time()
print(f"Запрос {request_id}: Статус {status}, Время: {end_time - start_time:.4f}")
except Exception as e:
print(f"Запрос {request_id} завершился с ошибкой: {e}")
async def bounded_request(semaphore, session, request_id):
"""Обёртка для контроля количества одновременных запросов."""
async with semaphore:
await send_request(session, request_id)
async def main():
"""Главная функция управления нагрузочным тестированием."""
print(f"СТАРТ ТЕСТА: Отправляю {TOTAL_REQUESTSTS} запросов...")
semaphore = asyncio.Semaphore(CONCURRENT_REQUESTSTS)
async with aiohttp.ClientSession() as session:
tasks = []
print("Создание задач...")
for i in range(TOTAL_REQUESTSTS):
task = asyncio.create_task(bounded_request(semaphore, session, i))
tasks.append(task)
print(f"Создано задач: {len(tasks)}. Ожидание выполнения запросов...")
# return_exceptions=True гарантирует, что сборка не упадет при первой же сетевой ошибке
await asyncio.gather(*tasks, return_exceptions=True)
print("ВСЕ ЗАДАЧИ ЗАВЕРШЕНЫ.")
if __name__ == "__main__":
print("Скрипт инициализирован.")
start_total = time.time()
try:
asyncio.run(main())
except KeyboardInterrupt:
print("nТест прерван пользователем.")
except Exception as e:
print(f"КРИТИЧЕСКАЯ ОШИБКА В MAIN: {e}")
import traceback
traceback.print_exc()
print(f"Общее время работы скрипта: {time.time() - start_total:.2f} сек.")
sys.exit(0)