package org.example;
import org.json.JSONObject;
import org.springframework.stereotype.Service;
import java.time.Instant;
import java.util.UUID;
@Service
public class AccountService {
private final ProducerProgress producerProgress;
private final ProducerEnd producerEnd;
private final KafkaMessagePublisher kafkaMessagePublisher; // ← добавили
public AccountService(ProducerProgress producerProgress,
ProducerEnd producerEnd,
KafkaMessagePublisher kafkaMessagePublisher) { // ← добавили параметр
this.producerProgress = producerProgress;
this.producerEnd = producerEnd;
this.kafkaMessagePublisher = kafkaMessagePublisher;
}
public void process(String message) {
String messageId = UUID.randomUUID().toString();
// Парсим входящее сообщение START_BUSINESS_FUNCTION
JSONObject jsonObject = new JSONObject(message);
JSONObject dataObj = jsonObject.getJSONObject("data");
String requestId = dataObj.getString("requestId");
String code = dataObj.optString("code", ""); // если кода нет — пустая строка
if ("FormingKOD".equals(code)) {
// ==================== СПЕЦИАЛЬНАЯ ОБРАБОТКА FormingKOD ====================
// 1.6.1 — отправляем READY_FOR_TASK
String readyBody = "{\n" +
" \"createdAt\": \"" + Instant.now().toString() + "\",\n" +
" \"type\": \"BUSINESS_FUNCTION_STATUS\",\n" +
" \"data\": {\n" +
" \"functionExecutionStatus\": \"READY_FOR_TASK\",\n" +
" \"functionBusinessStatus\": \"USER_DATA_WAITING\",\n" +
" \"requestId\": \"" + requestId + "\",\n" +
" \"slaSeconds\": 60,\n" +
" \"checkPoint\": \"PENDING\"\n" +
" }\n" +
"}";
kafkaMessagePublisher.send("sbof.business-function.fct.status.v1", requestId, readyBody, messageId);
// Пауза — даём process-engine время сделать запрос в assignment-сервис (2,5 сек + запас)
try {
Thread.sleep(3500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 1.6.3 — отправляем SUCCESS
String successBody = "{\n" +
" \"createdAt\": \"" + Instant.now().toString() + "\",\n" +
" \"type\": \"BUSINESS_FUNCTION_STATUS\",\n" +
" \"data\": {\n" +
" \"functionExecutionStatus\": \"SUCCESS\",\n" +
" \"functionBusinessStatus\": \"SUCCESS\",\n" +
" \"requestId\": \"" + requestId + "\",\n" +
" \"slaSeconds\": 60,\n" +
" \"checkPoint\": \"PENDING\"\n" +
" }\n" +
"}";
kafkaMessagePublisher.send("sbof.business-function.fct.status.v1", requestId, successBody, messageId);
} else {
// ==================== СТАРОЕ ПОВЕДЕНИЕ для всех остальных БФ ====================
producerProgress.publish(message, messageId);
producerEnd.publish(message, messageId);
}
}
}