Загрузка данных


package org.example;

import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.json.JSONObject;
import org.springframework.stereotype.Service;

/**
 * Handles incoming business-function messages.
 *
 * <p>Messages whose {@code data.code} is {@code "FormingKOD"} produce two status events on
 * the status topic: {@code READY_FOR_TASK} immediately and {@code SUCCESS} after a fixed delay,
 * scheduled on the injected executor so the calling thread is never blocked. Every other
 * message is forwarded unchanged to {@link ProducerProgress} and then {@link ProducerEnd}.
 */
@Service
public class AccountService {

    private static final Logger LOG = Logger.getLogger(AccountService.class.getName());

    /** Topic that receives both status events of the FormingKOD flow. */
    private static final String STATUS_TOPIC = "sbof.business-function.fct.status.v1";

    /** Delay between the READY_FOR_TASK event and the follow-up SUCCESS event. */
    private static final long SUCCESS_DELAY_MS = 3500;

    private final ProducerProgress producerProgress;
    private final ProducerEnd producerEnd;
    private final KafkaMessagePublisher kafkaMessagePublisher;
    private final ScheduledExecutorService scheduledExecutor;

    public AccountService(ProducerProgress producerProgress,
                          ProducerEnd producerEnd,
                          KafkaMessagePublisher kafkaMessagePublisher,
                          ScheduledExecutorService scheduledExecutor) {
        this.producerProgress = producerProgress;
        this.producerEnd = producerEnd;
        this.kafkaMessagePublisher = kafkaMessagePublisher;
        this.scheduledExecutor = scheduledExecutor;
    }

    /**
     * Processes one incoming message.
     *
     * @param message JSON text containing a {@code data} object with a mandatory
     *                {@code requestId} string and an optional {@code code} string
     * @throws org.json.JSONException if {@code message} is not valid JSON or lacks
     *                                {@code data} / {@code data.requestId}
     */
    public void process(String message) {
        String messageId = UUID.randomUUID().toString();

        JSONObject jsonObject = new JSONObject(message);
        JSONObject dataObj = jsonObject.getJSONObject("data");
        String requestId = dataObj.getString("requestId");
        String code = dataObj.optString("code", "");

        if ("FormingKOD".equals(code)) {
            // Send READY_FOR_TASK right away; the calling thread is not blocked.
            String readyBody = createReadyBody(requestId);
            kafkaMessagePublisher.send(STATUS_TOPIC, requestId, readyBody, messageId);

            // Schedule SUCCESS for later instead of sleeping on this thread.
            scheduledExecutor.schedule(
                    () -> sendSuccess(requestId, messageId),
                    SUCCESS_DELAY_MS,
                    TimeUnit.MILLISECONDS);
        } else {
            // Regular business functions: forward the original message unchanged.
            producerProgress.publish(message, messageId);
            producerEnd.publish(message, messageId);
        }
    }

    /**
     * Sends the delayed SUCCESS event.
     *
     * <p>An exception thrown by a scheduled task is only stored in the returned future, which
     * {@link #process(String)} discards, so a failure here would otherwise go unnoticed. It is
     * therefore caught and logged.
     */
    private void sendSuccess(String requestId, String messageId) {
        try {
            String successBody = createSuccessBody(requestId);
            kafkaMessagePublisher.send(STATUS_TOPIC, requestId, successBody, messageId);
        } catch (RuntimeException e) {
            LOG.log(Level.SEVERE,
                    "Failed to send delayed SUCCESS status for requestId=" + requestId, e);
        }
    }

    /** Builds the READY_FOR_TASK / USER_DATA_WAITING status body. */
    private String createReadyBody(String requestId) {
        return createStatusBody(requestId, "READY_FOR_TASK", "USER_DATA_WAITING");
    }

    /** Builds the SUCCESS / SUCCESS status body. */
    private String createSuccessBody(String requestId) {
        return createStatusBody(requestId, "SUCCESS", "SUCCESS");
    }

    /**
     * Builds a BUSINESS_FUNCTION_STATUS JSON body stamped with the current instant.
     *
     * @param requestId       request identifier; JSON-escaped before being embedded
     * @param executionStatus value for {@code functionExecutionStatus}
     * @param businessStatus  value for {@code functionBusinessStatus}
     */
    private String createStatusBody(String requestId, String executionStatus, String businessStatus) {
        // requestId comes from the incoming message, so it must be escaped: a quote, backslash
        // or control character would otherwise break (or inject into) the JSON document.
        // JSONObject.quote returns the escaped value including the surrounding double quotes.
        return "{\n" +
                " \"createdAt\": \"" + Instant.now() + "\",\n" +
                " \"type\": \"BUSINESS_FUNCTION_STATUS\",\n" +
                " \"data\": {\n" +
                " \"functionExecutionStatus\": \"" + executionStatus + "\",\n" +
                " \"functionBusinessStatus\": \"" + businessStatus + "\",\n" +
                " \"requestId\": " + JSONObject.quote(requestId) + ",\n" +
                " \"slaSeconds\": 60,\n" +
                " \"checkPoint\": \"PENDING\"\n" +
                " }\n" +
                "}";
    }
}


/**
 * Creates the shared scheduler bean; {@code shutdown} is registered as its destroy method.
 *
 * <p>The core pool size is read from the {@code SCHEDULED_POOL_SIZE} environment variable and
 * defaults to 100 when the variable is not set.
 *
 * @return a scheduled thread pool with the configured core size
 * @throws IllegalArgumentException if {@code SCHEDULED_POOL_SIZE} is not an integer or is negative
 */
@Bean(destroyMethod = "shutdown")
public ScheduledExecutorService scheduledExecutorService() {
    // Trim so that stray whitespace in the environment value does not break parsing.
    String configured = System.getenv().getOrDefault("SCHEDULED_POOL_SIZE", "100").trim();

    int poolSize;
    try {
        poolSize = Integer.parseInt(configured);
    } catch (NumberFormatException e) {
        // Name the variable and the offending value; a bare NumberFormatException does not
        // tell the operator which setting is wrong. The cause is preserved.
        throw new IllegalArgumentException(
                "SCHEDULED_POOL_SIZE must be an integer, got '" + configured + "'", e);
    }
    if (poolSize < 0) {
        // Executors.newScheduledThreadPool rejects negative sizes anyway; fail with context.
        throw new IllegalArgumentException(
                "SCHEDULED_POOL_SIZE must not be negative, got " + poolSize);
    }
    return Executors.newScheduledThreadPool(poolSize);
}

/**
 * Spring configuration exposing a Kafka producer with String keys and String values.
 */
@Configuration
public class KafkaProducerConfig {

    /**
     * Creates the producer bean from the fixed settings assembled by {@link #producerSettings()}.
     *
     * @return a new {@link KafkaProducer} for String keys and values
     */
    @Bean
    public Producer<String, String> kafkaProducer() {
        return new KafkaProducer<>(producerSettings());
    }

    /** Assembles the connection, serializer and throughput settings for the producer. */
    private Properties producerSettings() {
        final String stringSerializer = StringSerializer.class.getName();
        final Properties settings = new Properties();

        // Connection and (de)serialization.
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "1Servers");
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, stringSerializer);
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, stringSerializer);

        // === Tuning for high throughput ===
        // 256 KB batches.
        settings.put(ProducerConfig.BATCH_SIZE_CONFIG, "262144");
        // 10 ms linger gives a batch time to fill.
        settings.put(ProducerConfig.LINGER_MS_CONFIG, "10");
        // Alternative codec considered by the author: "lz4".
        settings.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        // acks=1 is considered sufficient under load.
        settings.put(ProducerConfig.ACKS_CONFIG, "1");
        settings.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "10");
        // 64 MB send buffer.
        settings.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "67108864");

        return settings;
    }
}