Data Loading
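
The handler below consumes request messages from Kafka, parses each JSON payload, and batch-inserts rows into two tables (sb_amelia.request_new and sb_amelia.request_state_new) with psycopg2's execute_values. Kafka offsets are committed only after the database transaction commits, which gives at-least-once delivery. The GP_DSN name suggests the target is Greenplum (PostgreSQL-compatible), and the handler(context, event) signature looks like a Nuclio-style serverless entry point.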


import os
import json
from datetime import datetime
from kafka import KafkaConsumer
import psycopg2
from psycopg2.extras import execute_values

SQL_REQ = """
INSERT INTO sb_amelia.request_new (
  code, name, declarer_personal_number, declarer_fio_name,
  estimator_code, estimator_fio_name,
  issue_id, issue_sap_id, issue_state, issue_total_sum, issue_description,
  manager_code, manager_fio_name, rate_value,
  service_name, service_id,
  message_date, message_id,
  usnz_code, usnz_fio_name,
  work_category_name, work_category_id,
  building_address, building_name, priority_id
) VALUES %s
"""
SQL_STATE = """
INSERT INTO sb_amelia.request_state_new (
  issue_sap_id, issue_description, issue_id,
  message_date, message_id,
  status_datetime, status_code, status_name
) VALUES %s
"""
# Safely walk a JSON dict along `path`; return `default` instead of raising when a node is missing
def _safe_get(dct, *path, default=""):
    cur = dct
    for p in path:
        if not isinstance(cur, dict) or p not in cur:
            return default
        cur = cur[p]
    return cur if cur is not None else default
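
# Usage example (hypothetical payload):
#   _safe_get({"status": {"external_id": "NEW"}}, "status", "external_id")  -> "NEW"
#   _safe_get({"status": {}}, "status", "external_id")                      -> ""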

# Parse a string into a datetime (returns None for empty input)
def _parse_dt(s, fmt):
    if not s:
        return None
    return datetime.strptime(s, fmt)
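
# Example: _parse_dt("2024-01-15T10:30:00.000Z", "%Y-%m-%dT%H:%M:%S.%fZ")
# -> datetime(2024, 1, 15, 10, 30); a malformed string raises ValueError,
# which the callers below catch and count as an error.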

# Normalize the text description: strip <p>/</p> tags and truncate to 2000 characters
def _clean_desc(s):
    if not s:
        return ""
    return s.replace("<p>", "").replace("</p>", "")[:2000]
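
# Example: _clean_desc("<p>Leak in room 101</p>") -> "Leak in room 101"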

# Extract the dynamic parameters we care about
def _extract_dynamic(parsed):
    # Expected structure: dynamic_params = [{external_id, value: {external_id/full_name/...}}, ...]
    dyn = {"Estimator": None, "Manager": None, "Price": None, "USNZ": None}
    for item in parsed.get("dynamic_params", []) or []:
        k = item.get("external_id")
        if k in dyn:
            dyn[k] = item.get("value")
    return dyn
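
# A hypothetical payload fragment in the shape this function expects:
#   "dynamic_params": [
#     {"external_id": "Manager", "value": {"external_id": "U123", "full_name": "..."}},
#     {"external_id": "Price",   "value": 1500.0}
#   ]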

# Flush the accumulated batch to the database and commit the transaction
def _flush(conn, cur, req_rows, state_rows):
    if not req_rows and not state_rows:
        return
    # One transaction per batch
    if req_rows:
        execute_values(cur, SQL_REQ, req_rows, page_size=min(len(req_rows), 1000))
    if state_rows:
        execute_values(cur, SQL_STATE, state_rows, page_size=min(len(state_rows), 1000))
    conn.commit()
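    # Kafka offsets are committed by the caller only after this returns, so a
    # failure mid-flush leaves offsets uncommitted and the batch is simply
    # re-consumed on restart (at-least-once delivery).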

def handler(context, event):
    topic = os.getenv("KAFKA_TOPIC")
    brokers = os.getenv("KAFKA_BROKERS", "").split(",")
    group_id = os.getenv("KAFKA_GROUP_ID")
    auto_offset_reset = os.getenv("KAFKA_AUTO_OFFSET_RESET", "earliest")
    consumer_timeout_ms = int(os.getenv("KAFKA_CONSUMER_TIMEOUT_MS", "5000"))
    batch_size = int(os.getenv("BATCH_SIZE", "500"))

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=brokers,
        security_protocol=os.getenv("KAFKA_SECURITY_PROTOCOL", "SASL_SSL"),
        sasl_mechanism=os.getenv("KAFKA_SASL_MECHANISM", "SCRAM-SHA-512"),
        sasl_plain_username=os.getenv("KAFKA_USER"),
        sasl_plain_password=os.getenv("KAFKA_PASSWORD"),
        # ssl_cafile=os.getenv("KAFKA_SSL_CAFILE") or None,
        enable_auto_commit=False,
        group_id=group_id,
        auto_offset_reset=auto_offset_reset,
        consumer_timeout_ms=consumer_timeout_ms,
        value_deserializer=lambda m: m,
    )
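
    # With consumer_timeout_ms set, iterating the consumer stops once no new
    # message arrives within the timeout, so the handler drains the topic and
    # returns instead of blocking forever.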

    gp_dsn = os.getenv("GP_DSN")
    conn = psycopg2.connect(gp_dsn)
    cur = conn.cursor()

    req_rows = []
    state_rows = []
    processed = 0
    errors = 0
    log_rows = False                     # log every processed message (IDs only)
    last_offsets = {}                    # (topic, partition) -> last seen offset

    try:
        for msg in consumer:
            # Kafka coordinates ("where it broke") for logging
            topic = getattr(msg, "topic", None)
            partition = getattr(msg, "partition", None)
            offset = getattr(msg, "offset", None)
            last_offsets[(topic, partition)] = offset

            raw = msg.value
            if not raw:
                if log_rows:
                    context.logger.warning(
                        "SKIP empty: kafka=%s/%s/%s",
                        topic, partition, offset
                    )
                errors += 1
                continue

            # Parse the JSON payload
            try:
                parsed = json.loads(raw.decode("utf-8"))
            except Exception as ex:
                context.logger.exception(
                    "SKIP bad_json: kafka=%s/%s/%s err=%s",
                    topic, partition, offset, ex
                )
                errors += 1
                continue

            if "external_id" not in parsed:
                if log_rows:
                    context.logger.warning(
                        "SKIP no_external_id: kafka=%s/%s/%s",
                        topic, partition, offset
                    )
                    errors += 1
                continue

            # dynamic_params
            try:
                dyn = _extract_dynamic(parsed)
            except Exception as ex:
                context.logger.exception(
                    "SKIP dyn_parse_error: kafka=%s/%s/%s err=%s",
                    topic, partition, offset, ex
                )
                errors += 1
                continue

            estimator = dyn.get("Estimator") or {}
            manager   = dyn.get("Manager") or {}
            usnz      = dyn.get("USNZ") or {}

            # Dates: message_date = status_transition_date of the most recent
            # status_history entry whose status matches the current status
            current_status_external_id = _safe_get(parsed, "status", "external_id", default="")
            status_history = parsed.get("status_history") or []

            try:
                matched_status_item = next(
                    (
                        item
                        for item in reversed(status_history)
                        if _safe_get(item, "status", "external_id", default="") == current_status_external_id
                    ),
                    None
                )

                status_transition_raw = _safe_get(
                    matched_status_item or {},
                    "status_transition_date",
                    default=""
                )

                # _parse_dt already returns None for an empty string
                message_date = _parse_dt(status_transition_raw, "%Y-%m-%dT%H:%M:%S.%fZ")

            except Exception as ex:
                context.logger.exception(
                    "SKIP bad_message_date: kafka=%s/%s/%s status_external_id=%s value=%s err=%s",
                    topic, partition, offset, current_status_external_id,
                    status_transition_raw if 'status_transition_raw' in locals() else None,
                    ex
                )
                errors += 1
                continue

            status_created = _safe_get(parsed, "status", "created_at", default="")
            try:
                status_dt = _parse_dt(status_created, "%Y-%m-%dT%H:%M:%S.%fZ") if status_created else None
            except Exception as ex:
                context.logger.exception(
                    "SKIP bad_status_dt: kafka=%s/%s/%s value=%s err=%s",
                    topic, partition, offset, status_created, ex
                )
                errors += 1
                continue

            desc = _clean_desc(parsed.get("description", ""))

            # --- Log every message (IDs only) ---
            issue_id       = str(parsed.get("id", ""))
            issue_sap_id   = str(parsed.get("external_id", ""))
            message_id     = _safe_get(parsed, "identifier", "id")
            status_code    = _safe_get(parsed, "status", "external_id")
            executor_code  = _safe_get(parsed, "executor", "external_id")
            declarer_code  = _safe_get(parsed, "declarer", "external_id")
            estimator_code = estimator.get("external_id", "") if isinstance(estimator, dict) else ""
            manager_code   = manager.get("external_id", "") if isinstance(manager, dict) else ""
            usnz_code      = usnz.get("external_id", "") if isinstance(usnz, dict) else ""

            if log_rows:
                context.logger.info(
                    "ROW ids: kafka=%s/%s/%s issue_id=%s issue_sap_id=%s message_id=%s status=%s "
                    "exec=%s decl=%s est=%s mgr=%s usnz=%s",
                    topic, partition, offset,
                    issue_id, issue_sap_id, message_id, status_code,
                    executor_code, declarer_code, estimator_code, manager_code, usnz_code
                )

            # --- Prepare rows for insertion into the two tables ---
            req_rows.append((
                _safe_get(parsed, "executor", "external_id"),
                _safe_get(parsed, "executor", "full_name"),
                _safe_get(parsed, "declarer", "external_id"),
                _safe_get(parsed, "declarer", "full_name"),
                estimator.get("external_id", "") if isinstance(estimator, dict) else "",
                estimator.get("full_name", "") if isinstance(estimator, dict) else "",
                issue_id,
                issue_sap_id,
                status_code,
                dyn.get("Price", "") if not isinstance(dyn.get("Price"), dict) else json.dumps(dyn.get("Price")),
                desc,
                manager_code,
                manager.get("full_name", "") if isinstance(manager, dict) else "",
                _safe_get(parsed, "rate", "value"),
                _safe_get(parsed, "service", "title"),
                _safe_get(parsed, "service", "id"),
                message_date,
                message_id,
                usnz_code,
                usnz.get("full_name", "") if isinstance(usnz, dict) else "",
                _safe_get(parsed, "work_category", "title"),
                _safe_get(parsed, "work_category", "id"),
                _safe_get(parsed, "building", "address"),
                _safe_get(parsed, "building", "title"),
                _safe_get(parsed, "priority", "id"),
            ))

            state_rows.append((
                issue_sap_id,
                desc,
                issue_id,
                message_date,
                message_id,
                status_dt,
                status_code,
                _safe_get(parsed, "status", "title"),
            ))

            processed += 1

            # --- When the batch is full, flush it to the DB and commit Kafka offsets ---
            if processed % batch_size == 0:
                context.logger.info(
                    "FLUSH batch: size=%s processed=%s last_offsets=%s",
                    batch_size, processed, last_offsets
                )
                _flush(conn, cur, req_rows, state_rows)  # commits DB txn
                consumer.commit()                        # commits Kafka offsets after DB commit
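                # The DB commit lands before the offset commit; a crash in
                # between replays the batch, so delivery is at-least-once and
                # the target tables should tolerate duplicates (e.g., dedupe
                # downstream by message_id).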
                context.logger.info(
                    "COMMIT ok: processed=%s committed_offsets=%s",
                    processed, last_offsets
                )
                req_rows.clear()
                state_rows.clear()

        # --- Flush the remainder ---
        if req_rows or state_rows:
            context.logger.info(
                "FINAL flush: req=%s state=%s processed=%s last_offsets=%s",
                len(req_rows), len(state_rows), processed, last_offsets
            )
            _flush(conn, cur, req_rows, state_rows)
            consumer.commit()
            context.logger.info(
                "FINAL commit ok: processed=%s committed_offsets=%s errors=%s",
                processed, last_offsets, errors
            )

        return f"OK. processed={processed} ERRORS={errors}"

    except Exception as e:
        context.logger.exception(
            "FATAL: processed=%s last_offsets=%s err=%s",
            processed, last_offsets, e
        )
        conn.rollback()
        raise

    finally:
        try:
            cur.close()
        except Exception:
            pass
        try:
            conn.close()
        except Exception:
            pass
        try:
            consumer.close()
        except Exception:
            pass
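
# Required environment variables: KAFKA_TOPIC, KAFKA_BROKERS, KAFKA_GROUP_ID,
# KAFKA_USER, KAFKA_PASSWORD, GP_DSN.
# Optional (defaults): KAFKA_AUTO_OFFSET_RESET=earliest,
# KAFKA_CONSUMER_TIMEOUT_MS=5000, BATCH_SIZE=500,
# KAFKA_SECURITY_PROTOCOL=SASL_SSL, KAFKA_SASL_MECHANISM=SCRAM-SHA-512.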