Загрузка данных


#!/bin/bash
set -euo pipefail

# ====== НАСТРОЙКИ ======
SRC_DB="postgresql://postgres:GwifiDk4rokd97HdpacqxtJH@localhost:5432/cp-advanced-sdr-processor"
DST_DB="postgresql://postgres:GwifiDk4rokd97HdpacqxtJH@192.168.15.211:5432/cp-advanced-sdr-processor"

SRC_TABLE="log_records_v2"
DST_TABLE="log_records_v2"
STAGING_TABLE="log_records_v2_staging1"

BATCH_SIZE=3000
TMP_FILE="/tmp/migrate_data_$$.csv"

log() { echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*"; }

# ====== СОЗДАНИЕ STAGING ======
log "Создание staging-таблицы (UNLOGGED, без индексов)..."
psql "$DST_DB" <<-EOF
    DROP TABLE IF EXISTS ${STAGING_TABLE};
    CREATE UNLOGGED TABLE ${STAGING_TABLE} (LIKE ${DST_TABLE} INCLUDING DEFAULTS);
    -- Удаляем столбец, которого нет в источнике
    ALTER TABLE ${STAGING_TABLE} DROP COLUMN IF EXISTS for_partner_cp_agreement_id;
    -- Убираем identity, чтобы можно было вставлять id
    ALTER TABLE ${STAGING_TABLE} ALTER COLUMN id DROP IDENTITY IF EXISTS;
EOF

packet_num=0



while true; do
    packet_num=$((packet_num + 1))
    log "Пакет #${packet_num} – выборка ${BATCH_SIZE} записей из temp_table..."

    # 1. Получаем пары (begin_time, id) в виде кортежей для IN
    pairs=$(psql "$SRC_DB" -t -A -c \
        "SELECT string_agg(format('(%L,%s)', begin_time, id), ',') FROM (SELECT begin_time, id FROM temp_table LIMIT $BATCH_SIZE) t")

    if [ -z "$pairs" ]; then
        log "Больше записей нет. Завершение."
        break
    fi

    log "Пакет #${packet_num} – загружено пар: $(echo "$pairs" | grep -o '(' | wc -l)"

    # 2. Выгружаем все столбцы из источника в CSV
    log "Пакет #${packet_num} – выгрузка данных..."
    psql "$SRC_DB" -c "\COPY (SELECT * FROM ${SRC_TABLE} WHERE (begin_time, id) IN (${pairs})) TO '$TMP_FILE' WITH CSV" || {
        log "ОШИБКА при выгрузке! Прерывание."
        exit 1
    }

    if [ ! -s "$TMP_FILE" ]; then
        log "Файл пуст! Прерывание."
        exit 1
    fi

    # 3. Загружаем в staging
    log "Пакет #${packet_num} – загрузка в staging..."
    psql "$DST_DB" -c "\COPY ${STAGING_TABLE} FROM '$TMP_FILE' WITH CSV" || {
        log "ОШИБКА при загрузке в staging! Прерывание."
        exit 1
    }

    # 4. Переносим в основную таблицу с OVERRIDING SYSTEM VALUE
    log "Пакет #${packet_num} – вставка в ${DST_TABLE}..."
    psql "$DST_DB" <<-EOF
        INSERT INTO ${DST_TABLE} OVERRIDING SYSTEM VALUE
        SELECT * FROM ${STAGING_TABLE}
        ON CONFLICT (begin_time, id) DO NOTHING;
        TRUNCATE ${STAGING_TABLE};
EOF

    # 5. Удаляем обработанные записи из temp_table
    deleted=$(psql "$SRC_DB" -t -A -c \
        "DELETE FROM temp_table WHERE (begin_time, id) IN (${pairs}) RETURNING id" | wc -l)
    log "Пакет #${packet_num} – удалено ${deleted} записей."

    rm -f "$TMP_FILE"
done

# ====== ОЧИСТКА ======
psql "$DST_DB" -c "DROP TABLE IF EXISTS ${STAGING_TABLE};"
rm -f "$TMP_FILE"
log "Миграция завершена. Обработано пакетов: ${packet_num}."