Загрузка данных
from __future__ import annotations
import csv
import os
import re
import statistics
from pathlib import Path
import psycopg
from dotenv import load_dotenv
from psycopg.conninfo import make_conninfo
BASE_DIR = Path(__file__).resolve().parents[1]
load_dotenv(BASE_DIR / ".env")
SCHEMA = os.getenv("DB_SCHEMA", "okato")
# Для CREATE INDEX нужен владелец таблиц или superuser.
# Лучше запускать от postgres.
DB_USER = os.getenv("RESEARCH_DB_USER", os.getenv("DB_ADMIN_USER", "postgres"))
DB_PASSWORD = os.getenv("RESEARCH_DB_PASSWORD", os.getenv("DB_ADMIN_PASSWORD", ""))
CONNINFO = make_conninfo(
host=os.getenv("DB_HOST", "127.0.0.1"),
port=int(os.getenv("DB_PORT", "5432")),
dbname=os.getenv("DB_NAME", "okato_db"),
user=DB_USER,
password=DB_PASSWORD if DB_PASSWORD else None,
sslmode=os.getenv("DB_SSLMODE", "prefer"),
)
REPEAT_COUNT = 5
INDEXES = [
{
"name": "idx_research_territory_object_parent",
"create": f"""
CREATE INDEX idx_research_territory_object_parent
ON {SCHEMA}.territory_object(parent_object_id);
""",
},
{
"name": "idx_research_territory_object_filter",
"create": f"""
CREATE INDEX idx_research_territory_object_filter
ON {SCHEMA}.territory_object(kind_id, area_category_id, is_active, okato_code);
""",
},
{
"name": "idx_research_name_history_object_date",
"create": f"""
CREATE INDEX idx_research_name_history_object_date
ON {SCHEMA}.object_name_history(object_id, effective_date DESC, name_history_id DESC);
""",
},
{
"name": "idx_research_deactivation_history_object_date",
"create": f"""
CREATE INDEX idx_research_deactivation_history_object_date
ON {SCHEMA}.object_deactivation_history(object_id, effective_date DESC, deactivation_history_id DESC);
""",
},
{
"name": "idx_research_official_person_object_start",
"create": f"""
CREATE INDEX idx_research_official_person_object_start
ON {SCHEMA}.object_official_person(object_id, start_date DESC);
""",
},
]
QUERIES = [
{
"name": "Получение дочерних объектов",
"sql": f"""
SELECT
object_id,
okato_code,
name,
is_active,
area_category_id,
kind_id,
parent_object_id
FROM {SCHEMA}.territory_object
WHERE parent_object_id = 1
ORDER BY object_id;
""",
},
{
"name": "Фильтрация объектов классификатора",
"sql": f"""
SELECT
t.object_id,
t.okato_code,
t.name,
t.is_active,
t.parent_object_id,
k.name AS kind_name,
ac.name AS area_category_name
FROM {SCHEMA}.territory_object t
JOIN {SCHEMA}.kind_catalog k
ON k.kind_id = t.kind_id
JOIN {SCHEMA}.area_category_catalog ac
ON ac.area_category_id = t.area_category_id
WHERE t.kind_id = 1
AND t.area_category_id = 1
AND t.is_active = TRUE
ORDER BY t.okato_code, t.object_id
LIMIT 25;
""",
},
{
"name": "Получение истории переименований",
"sql": f"""
SELECT
h.name_history_id,
h.old_name,
h.new_name,
h.effective_date,
d.document_id,
d.name AS document_name,
d.authority
FROM {SCHEMA}.object_name_history h
JOIN {SCHEMA}.source_document d
ON d.document_id = h.document_id
WHERE h.object_id = 1
ORDER BY h.effective_date DESC, h.name_history_id DESC;
""",
},
{
"name": "Получение истории исключений",
"sql": f"""
SELECT
h.deactivation_history_id,
h.effective_date,
d.document_id,
d.name AS document_name,
d.authority
FROM {SCHEMA}.object_deactivation_history h
JOIN {SCHEMA}.source_document d
ON d.document_id = h.document_id
WHERE h.object_id = 45372
ORDER BY h.effective_date DESC, h.deactivation_history_id DESC;
""",
},
{
"name": "Получение должностных лиц объекта",
"sql": f"""
SELECT
oop.object_id,
oop.person_id,
oop.start_date,
oop.end_date,
p.last_name,
p.first_name,
p.middle_name,
p.birth_date
FROM {SCHEMA}.object_official_person oop
JOIN {SCHEMA}.official_person p
ON p.person_id = oop.person_id
WHERE oop.object_id = 1
ORDER BY oop.start_date DESC, p.last_name, p.first_name;
""",
},
]
EXECUTION_TIME_RE = re.compile(r"Execution Time: ([0-9.]+) ms")
def connect() -> psycopg.Connection:
return psycopg.connect(CONNINFO)
def execute_ddl(conn: psycopg.Connection, sql: str) -> None:
with conn.cursor() as cur:
cur.execute(sql)
conn.commit()
def drop_research_indexes(conn: psycopg.Connection) -> None:
for index in INDEXES:
execute_ddl(conn, f"DROP INDEX IF EXISTS {SCHEMA}.{index['name']};")
def create_research_indexes(conn: psycopg.Connection) -> None:
for index in INDEXES:
execute_ddl(conn, index["create"])
def analyze_tables(conn: psycopg.Connection) -> None:
tables = [
"territory_object",
"object_name_history",
"object_deactivation_history",
"object_official_person",
"official_person",
"source_document",
"kind_catalog",
"area_category_catalog",
]
for table in tables:
execute_ddl(conn, f"ANALYZE {SCHEMA}.{table};")
def explain_analyze(conn: psycopg.Connection, sql: str) -> tuple[float, str]:
explain_sql = "EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)\n" + sql
with conn.cursor() as cur:
cur.execute(explain_sql)
rows = cur.fetchall()
plan = "\n".join(row[0] for row in rows)
match = EXECUTION_TIME_RE.search(plan)
if match is None:
raise RuntimeError("Не удалось найти Execution Time в плане запроса.")
return float(match.group(1)), plan
def measure_query(conn: psycopg.Connection, sql: str) -> tuple[float, list[float], str]:
times = []
last_plan = ""
for _ in range(REPEAT_COUNT):
execution_time, plan = explain_analyze(conn, sql)
times.append(execution_time)
last_plan = plan
return statistics.mean(times), times, last_plan
def percent_change(before: float, after: float) -> float:
if before == 0:
return 0.0
return (before - after) / before * 100
def save_csv(rows: list[dict]) -> None:
output_path = BASE_DIR / "research_results.csv"
with output_path.open("w", newline="", encoding="utf-8") as file:
writer = csv.DictWriter(
file,
fieldnames=[
"Запрос",
"Без индексов, мс",
"С индексами, мс",
"Изменение, %",
],
delimiter=";",
)
writer.writeheader()
writer.writerows(rows)
def save_latex_table(rows: list[dict]) -> None:
output_path = BASE_DIR / "research_results.tex"
lines = [
r"\begin{table}[H]",
r" \centering",
r" \caption{Результаты исследования влияния индексов на время выполнения запросов}",
r" \label{tab:research_results}",
r" \begin{tabular}{|p{0.39\textwidth}|p{0.17\textwidth}|p{0.17\textwidth}|p{0.17\textwidth}|}",
r" \hline",
r" \textbf{Запрос} & \textbf{Без индексов, мс} & \textbf{С индексами, мс} & \textbf{Изменение, \%} \\",
r" \hline",
]
for row in rows:
lines.append(
" "
+ row["Запрос"]
+ " & "
+ row["Без индексов, мс"]
+ " & "
+ row["С индексами, мс"]
+ " & "
+ row["Изменение, %"]
+ r" \\"
)
lines.append(r" \hline")
lines.extend(
[
r" \end{tabular}",
r"\end{table}",
]
)
output_path.write_text("\n".join(lines), encoding="utf-8")
def save_plans(plans: dict[str, dict[str, str]]) -> None:
output_path = BASE_DIR / "research_plans.txt"
parts = []
for query_name, query_plans in plans.items():
parts.append("=" * 100)
parts.append(query_name)
parts.append("=" * 100)
parts.append("\n--- Без исследуемых индексов ---\n")
parts.append(query_plans["without_indexes"])
parts.append("\n--- С исследуемыми индексами ---\n")
parts.append(query_plans["with_indexes"])
output_path.write_text("\n".join(parts), encoding="utf-8")
def main() -> None:
rows = []
plans = {}
with connect() as conn:
print("1. Удаление исследуемых индексов...")
drop_research_indexes(conn)
analyze_tables(conn)
print("2. Замеры без исследуемых индексов...")
without_indexes: dict[str, tuple[float, list[float], str]] = {}
for query in QUERIES:
avg_time, times, plan = measure_query(conn, query["sql"])
without_indexes[query["name"]] = (avg_time, times, plan)
print(f" {query['name']}: {avg_time:.3f} мс")
print("3. Создание исследуемых индексов...")
create_research_indexes(conn)
analyze_tables(conn)
print("4. Замеры с исследуемыми индексами...")
with_indexes: dict[str, tuple[float, list[float], str]] = {}
for query in QUERIES:
avg_time, times, plan = measure_query(conn, query["sql"])
with_indexes[query["name"]] = (avg_time, times, plan)
print(f" {query['name']}: {avg_time:.3f} мс")
for query in QUERIES:
name = query["name"]
before = without_indexes[name][0]
after = with_indexes[name][0]
change = percent_change(before, after)
rows.append(
{
"Запрос": name,
"Без индексов, мс": f"{before:.3f}",
"С индексами, мс": f"{after:.3f}",
"Изменение, %": f"{change:.2f}",
}
)
plans[name] = {
"without_indexes": without_indexes[name][2],
"with_indexes": with_indexes[name][2],
}
save_csv(rows)
save_latex_table(rows)
save_plans(plans)
print("\nГотово.")
print(f"CSV: {BASE_DIR / 'research_results.csv'}")
print(f"LaTeX: {BASE_DIR / 'research_results.tex'}")
print(f"Планы запросов: {BASE_DIR / 'research_plans.txt'}")
if __name__ == "__main__":
main()
Окей, делаем исследование по индексам. Это подходит под твою работу, потому что у тебя уже описаны PostgreSQL, запросы к таблицам, интерфейс поиска/просмотра и Redis как кэш для части данных . Ниже даю **готовый скрипт**, который сам:
1. удаляет исследуемые индексы;
2. замеряет запросы без этих индексов;
3. создает индексы;
4. снова замеряет запросы;
5. сохраняет результаты в `research_results.csv`, `research_results.tex`, `research_plans.txt`.
Важно: запускать надо на **локальной копии базы**, не на единственной рабочей БД.
---
## 1. Создай файл
В корне проекта `code/new_version` создай папку:
```bash
mkdir research
```
Создай файл:
```bash
research/research_indexes.py
```
И вставь туда код:
```python
from __future__ import annotations
import csv
import os
import re
import statistics
from pathlib import Path
import psycopg
from dotenv import load_dotenv
from psycopg.conninfo import make_conninfo
BASE_DIR = Path(__file__).resolve().parents[1]
load_dotenv(BASE_DIR / ".env")
SCHEMA = os.getenv("DB_SCHEMA", "okato")
# Для CREATE INDEX нужен владелец таблиц или superuser.
# Лучше запускать от postgres.
DB_USER = os.getenv("RESEARCH_DB_USER", os.getenv("DB_ADMIN_USER", "postgres"))
DB_PASSWORD = os.getenv("RESEARCH_DB_PASSWORD", os.getenv("DB_ADMIN_PASSWORD", ""))
CONNINFO = make_conninfo(
host=os.getenv("DB_HOST", "127.0.0.1"),
port=int(os.getenv("DB_PORT", "5432")),
dbname=os.getenv("DB_NAME", "okato_db"),
user=DB_USER,
password=DB_PASSWORD if DB_PASSWORD else None,
sslmode=os.getenv("DB_SSLMODE", "prefer"),
)
REPEAT_COUNT = 5
INDEXES = [
{
"name": "idx_research_territory_object_parent",
"create": f"""
CREATE INDEX idx_research_territory_object_parent
ON {SCHEMA}.territory_object(parent_object_id);
""",
},
{
"name": "idx_research_territory_object_filter",
"create": f"""
CREATE INDEX idx_research_territory_object_filter
ON {SCHEMA}.territory_object(kind_id, area_category_id, is_active, okato_code);
""",
},
{
"name": "idx_research_name_history_object_date",
"create": f"""
CREATE INDEX idx_research_name_history_object_date
ON {SCHEMA}.object_name_history(object_id, effective_date DESC, name_history_id DESC);
""",
},
{
"name": "idx_research_deactivation_history_object_date",
"create": f"""
CREATE INDEX idx_research_deactivation_history_object_date
ON {SCHEMA}.object_deactivation_history(object_id, effective_date DESC, deactivation_history_id DESC);
""",
},
{
"name": "idx_research_official_person_object_start",
"create": f"""
CREATE INDEX idx_research_official_person_object_start
ON {SCHEMA}.object_official_person(object_id, start_date DESC);
""",
},
]
QUERIES = [
{
"name": "Получение дочерних объектов",
"sql": f"""
SELECT
object_id,
okato_code,
name,
is_active,
area_category_id,
kind_id,
parent_object_id
FROM {SCHEMA}.territory_object
WHERE parent_object_id = 1
ORDER BY object_id;
""",
},
{
"name": "Фильтрация объектов классификатора",
"sql": f"""
SELECT
t.object_id,
t.okato_code,
t.name,
t.is_active,
t.parent_object_id,
k.name AS kind_name,
ac.name AS area_category_name
FROM {SCHEMA}.territory_object t
JOIN {SCHEMA}.kind_catalog k
ON k.kind_id = t.kind_id
JOIN {SCHEMA}.area_category_catalog ac
ON ac.area_category_id = t.area_category_id
WHERE t.kind_id = 1
AND t.area_category_id = 1
AND t.is_active = TRUE
ORDER BY t.okato_code, t.object_id
LIMIT 25;
""",
},
{
"name": "Получение истории переименований",
"sql": f"""
SELECT
h.name_history_id,
h.old_name,
h.new_name,
h.effective_date,
d.document_id,
d.name AS document_name,
d.authority
FROM {SCHEMA}.object_name_history h
JOIN {SCHEMA}.source_document d
ON d.document_id = h.document_id
WHERE h.object_id = 1
ORDER BY h.effective_date DESC, h.name_history_id DESC;
""",
},
{
"name": "Получение истории исключений",
"sql": f"""
SELECT
h.deactivation_history_id,
h.effective_date,
d.document_id,
d.name AS document_name,
d.authority
FROM {SCHEMA}.object_deactivation_history h
JOIN {SCHEMA}.source_document d
ON d.document_id = h.document_id
WHERE h.object_id = 45372
ORDER BY h.effective_date DESC, h.deactivation_history_id DESC;
""",
},
{
"name": "Получение должностных лиц объекта",
"sql": f"""
SELECT
oop.object_id,
oop.person_id,
oop.start_date,
oop.end_date,
p.last_name,
p.first_name,
p.middle_name,
p.birth_date
FROM {SCHEMA}.object_official_person oop
JOIN {SCHEMA}.official_person p
ON p.person_id = oop.person_id
WHERE oop.object_id = 1
ORDER BY oop.start_date DESC, p.last_name, p.first_name;
""",
},
]
EXECUTION_TIME_RE = re.compile(r"Execution Time: ([0-9.]+) ms")
def connect() -> psycopg.Connection:
return psycopg.connect(CONNINFO)
def execute_ddl(conn: psycopg.Connection, sql: str) -> None:
with conn.cursor() as cur:
cur.execute(sql)
conn.commit()
def drop_research_indexes(conn: psycopg.Connection) -> None:
for index in INDEXES:
execute_ddl(conn, f"DROP INDEX IF EXISTS {SCHEMA}.{index['name']};")
def create_research_indexes(conn: psycopg.Connection) -> None:
for index in INDEXES:
execute_ddl(conn, index["create"])
def analyze_tables(conn: psycopg.Connection) -> None:
tables = [
"territory_object",
"object_name_history",
"object_deactivation_history",
"object_official_person",
"official_person",
"source_document",
"kind_catalog",
"area_category_catalog",
]
for table in tables:
execute_ddl(conn, f"ANALYZE {SCHEMA}.{table};")
def explain_analyze(conn: psycopg.Connection, sql: str) -> tuple[float, str]:
explain_sql = "EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)\n" + sql
with conn.cursor() as cur:
cur.execute(explain_sql)
rows = cur.fetchall()
plan = "\n".join(row[0] for row in rows)
match = EXECUTION_TIME_RE.search(plan)
if match is None:
raise RuntimeError("Не удалось найти Execution Time в плане запроса.")
return float(match.group(1)), plan
def measure_query(conn: psycopg.Connection, sql: str) -> tuple[float, list[float], str]:
times = []
last_plan = ""
for _ in range(REPEAT_COUNT):
execution_time, plan = explain_analyze(conn, sql)
times.append(execution_time)
last_plan = plan
return statistics.mean(times), times, last_plan
def percent_change(before: float, after: float) -> float:
if before == 0:
return 0.0
return (before - after) / before * 100
def save_csv(rows: list[dict]) -> None:
output_path = BASE_DIR / "research_results.csv"
with output_path.open("w", newline="", encoding="utf-8") as file:
writer = csv.DictWriter(
file,
fieldnames=[
"Запрос",
"Без индексов, мс",
"С индексами, мс",
"Изменение, %",
],
delimiter=";",
)
writer.writeheader()
writer.writerows(rows)
def save_latex_table(rows: list[dict]) -> None:
output_path = BASE_DIR / "research_results.tex"
lines = [
r"\begin{table}[H]",
r" \centering",
r" \caption{Результаты исследования влияния индексов на время выполнения запросов}",
r" \label{tab:research_results}",
r" \begin{tabular}{|p{0.39\textwidth}|p{0.17\textwidth}|p{0.17\textwidth}|p{0.17\textwidth}|}",
r" \hline",
r" \textbf{Запрос} & \textbf{Без индексов, мс} & \textbf{С индексами, мс} & \textbf{Изменение, \%} \\",
r" \hline",
]
for row in rows:
lines.append(
" "
+ row["Запрос"]
+ " & "
+ row["Без индексов, мс"]
+ " & "
+ row["С индексами, мс"]
+ " & "
+ row["Изменение, %"]
+ r" \\"
)
lines.append(r" \hline")
lines.extend(
[
r" \end{tabular}",
r"\end{table}",
]
)
output_path.write_text("\n".join(lines), encoding="utf-8")
def save_plans(plans: dict[str, dict[str, str]]) -> None:
output_path = BASE_DIR / "research_plans.txt"
parts = []
for query_name, query_plans in plans.items():
parts.append("=" * 100)
parts.append(query_name)
parts.append("=" * 100)
parts.append("\n--- Без исследуемых индексов ---\n")
parts.append(query_plans["without_indexes"])
parts.append("\n--- С исследуемыми индексами ---\n")
parts.append(query_plans["with_indexes"])
output_path.write_text("\n".join(parts), encoding="utf-8")
def main() -> None:
rows = []
plans = {}
with connect() as conn:
print("1. Удаление исследуемых индексов...")
drop_research_indexes(conn)
analyze_tables(conn)
print("2. Замеры без исследуемых индексов...")
without_indexes: dict[str, tuple[float, list[float], str]] = {}
for query in QUERIES:
avg_time, times, plan = measure_query(conn, query["sql"])
without_indexes[query["name"]] = (avg_time, times, plan)
print(f" {query['name']}: {avg_time:.3f} мс")
print("3. Создание исследуемых индексов...")
create_research_indexes(conn)
analyze_tables(conn)
print("4. Замеры с исследуемыми индексами...")
with_indexes: dict[str, tuple[float, list[float], str]] = {}
for query in QUERIES:
avg_time, times, plan = measure_query(conn, query["sql"])
with_indexes[query["name"]] = (avg_time, times, plan)
print(f" {query['name']}: {avg_time:.3f} мс")
for query in QUERIES:
name = query["name"]
before = without_indexes[name][0]
after = with_indexes[name][0]
change = percent_change(before, after)
rows.append(
{
"Запрос": name,
"Без индексов, мс": f"{before:.3f}",
"С индексами, мс": f"{after:.3f}",
"Изменение, %": f"{change:.2f}",
}
)
plans[name] = {
"without_indexes": without_indexes[name][2],
"with_indexes": with_indexes[name][2],
}
save_csv(rows)
save_latex_table(rows)
save_plans(plans)
print("\nГотово.")
print(f"CSV: {BASE_DIR / 'research_results.csv'}")
print(f"LaTeX: {BASE_DIR / 'research_results.tex'}")
print(f"Планы запросов: {BASE_DIR / 'research_plans.txt'}")
if __name__ == "__main__":
main()
```
---
## 2. Что добавить в `.env`
В `.env` добавь пользователя, от которого можно создавать индексы. Обычно это `postgres`.
```env
RESEARCH_DB_USER=postgres
RESEARCH_DB_PASSWORD=твой_пароль_от_postgres
```
Если у тебя локально PostgreSQL пускает `postgres` без пароля, можно оставить:
```env
RESEARCH_DB_USER=postgres
RESEARCH_DB_PASSWORD=
```
Если будет ошибка прав на `CREATE INDEX`, значит запускаешь не от владельца таблиц. Тогда нужно запускать от `postgres` или владельца схемы `okato`.
---
## 3. Как запускать
Перейди в папку проекта:
```bash
cd code/new_version
```
Установи зависимости, если еще не установлены:
```bash
pip install -r requirements.txt
```
Запусти скрипт:
```bash
python research/research_indexes.py
```
После выполнения появятся файлы:
```text
research_results.csv
research_results.tex
research_plans.txt
```
---
## 4. Что вставлять в курсовую
В исследовательскую часть вставляешь таблицу из файла:
```latex
\input{research_results.tex}
```
Или просто открываешь `research_results.tex`, копируешь оттуда таблицу и вставляешь вручную.
`research_plans.txt` нужен как доказательство, что замеры реально получены через `EXPLAIN ANALYZE`. Оттуда можно взять 1–2 фрагмента плана, если преподаватель попросит.
---
## 5. Какие индексы исследуются
В скрипте исследуются вот эти индексы:
```sql
CREATE INDEX idx_research_territory_object_parent
ON okato.territory_object(parent_object_id);
CREATE INDEX idx_research_territory_object_filter
ON okato.territory_object(kind_id, area_category_id, is_active, okato_code);
CREATE INDEX idx_research_name_history_object_date
ON okato.object_name_history(object_id, effective_date DESC, name_history_id DESC);
CREATE INDEX idx_research_deactivation_history_object_date
ON okato.object_deactivation_history(object_id, effective_date DESC, deactivation_history_id DESC);
CREATE INDEX idx_research_official_person_object_start
ON okato.object_official_person(object_id, start_date DESC);
```
Не советую исследовать индекс по `okato_code`, потому что у тебя на `territory_object.okato_code` уже есть `UNIQUE`, а PostgreSQL для него и так создает индекс автоматически. Поэтому честнее исследовать именно дополнительные индексы для внешних ключей, истории и фильтрации.
---
## 6. Если надо быстро проверить вручную
Например, один запрос без скрипта:
```sql
DROP INDEX IF EXISTS okato.idx_research_territory_object_parent;
ANALYZE okato.territory_object;
EXPLAIN ANALYZE
SELECT
object_id,
okato_code,
name,
is_active,
area_category_id,
kind_id,
parent_object_id
FROM okato.territory_object
WHERE parent_object_id = 1
ORDER BY object_id;
CREATE INDEX idx_research_territory_object_parent
ON okato.territory_object(parent_object_id);
ANALYZE okato.territory_object;
EXPLAIN ANALYZE
SELECT
object_id,
okato_code,
name,
is_active,
area_category_id,
kind_id,
parent_object_id
FROM okato.territory_object
WHERE parent_object_id = 1
ORDER BY object_id;
```
В `EXPLAIN ANALYZE` смотри строку:
```text
Execution Time: ... ms
```
Эти числа и вставляешь в таблицу исследования.