Загрузка данных


package org.example;

import org.json.JSONObject;
import org.springframework.stereotype.Service;
import java.time.Instant;
import java.util.UUID;

@Service
public class AccountService {

    private final ProducerProgress producerProgress;
    private final ProducerEnd producerEnd;
    private final KafkaMessagePublisher kafkaMessagePublisher;   // ← добавили

    public AccountService(ProducerProgress producerProgress,
                          ProducerEnd producerEnd,
                          KafkaMessagePublisher kafkaMessagePublisher) {   // ← добавили параметр
        this.producerProgress = producerProgress;
        this.producerEnd = producerEnd;
        this.kafkaMessagePublisher = kafkaMessagePublisher;
    }

    public void process(String message) {
        String messageId = UUID.randomUUID().toString();

        // Парсим входящее сообщение START_BUSINESS_FUNCTION
        JSONObject jsonObject = new JSONObject(message);
        JSONObject dataObj = jsonObject.getJSONObject("data");
        String requestId = dataObj.getString("requestId");
        String code = dataObj.optString("code", "");   // если кода нет — пустая строка

        if ("FormingKOD".equals(code)) {
            // ==================== СПЕЦИАЛЬНАЯ ОБРАБОТКА FormingKOD ====================

            // 1.6.1 — отправляем READY_FOR_TASK
            String readyBody = "{\n" +
                    " \"createdAt\": \"" + Instant.now().toString() + "\",\n" +
                    " \"type\": \"BUSINESS_FUNCTION_STATUS\",\n" +
                    " \"data\": {\n" +
                    " \"functionExecutionStatus\": \"READY_FOR_TASK\",\n" +
                    " \"functionBusinessStatus\": \"USER_DATA_WAITING\",\n" +
                    " \"requestId\": \"" + requestId + "\",\n" +
                    " \"slaSeconds\": 60,\n" +
                    " \"checkPoint\": \"PENDING\"\n" +
                    " }\n" +
                    "}";
            kafkaMessagePublisher.send("sbof.business-function.fct.status.v1", requestId, readyBody, messageId);

            // Пауза — даём process-engine время сделать запрос в assignment-сервис (2,5 сек + запас)
            try {
                Thread.sleep(3500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }

            // 1.6.3 — отправляем SUCCESS
            String successBody = "{\n" +
                    " \"createdAt\": \"" + Instant.now().toString() + "\",\n" +
                    " \"type\": \"BUSINESS_FUNCTION_STATUS\",\n" +
                    " \"data\": {\n" +
                    " \"functionExecutionStatus\": \"SUCCESS\",\n" +
                    " \"functionBusinessStatus\": \"SUCCESS\",\n" +
                    " \"requestId\": \"" + requestId + "\",\n" +
                    " \"slaSeconds\": 60,\n" +
                    " \"checkPoint\": \"PENDING\"\n" +
                    " }\n" +
                    "}";
            kafkaMessagePublisher.send("sbof.business-function.fct.status.v1", requestId, successBody, messageId);

        } else {
            // ==================== СТАРОЕ ПОВЕДЕНИЕ для всех остальных БФ ====================
            producerProgress.publish(message, messageId);
            producerEnd.publish(message, messageId);
        }
    }
}