// Загрузка данных (data loading)
package org.example;
import java.time.Instant;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.json.JSONObject;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Service;
@Service
public class AccountService {

    /** Topic used for both READY_FOR_TASK and SUCCESS status events. */
    private static final String STATUS_TOPIC = "sbof.business-function.fct.status.v1";
    /** Delay before the scheduled SUCCESS event is published. */
    private static final long SUCCESS_DELAY_MS = 3500L;

    private final ProducerProgress producerProgress;
    private final ProducerEnd producerEnd;
    private final KafkaMessagePublisher kafkaMessagePublisher;
    private final ScheduledExecutorService scheduledExecutor;

    public AccountService(ProducerProgress producerProgress,
                          ProducerEnd producerEnd,
                          KafkaMessagePublisher kafkaMessagePublisher,
                          ScheduledExecutorService scheduledExecutor) {
        this.producerProgress = producerProgress;
        this.producerEnd = producerEnd;
        this.kafkaMessagePublisher = kafkaMessagePublisher;
        this.scheduledExecutor = scheduledExecutor;
    }

    /**
     * Processes one inbound business-function message.
     *
     * <p>For {@code code == "FormingKOD"} (step 1.6.1) a READY_FOR_TASK status is
     * published immediately and a SUCCESS status is scheduled ~3.5 s later on the
     * shared scheduler, so the consumer thread is never blocked. All other codes
     * go through the regular progress/end producers unchanged.
     *
     * @param message raw JSON message; must contain {@code data.requestId}
     * @throws org.json.JSONException if the message is not valid JSON or
     *         {@code data}/{@code requestId} is missing
     */
    public void process(String message) {
        String messageId = UUID.randomUUID().toString();
        JSONObject jsonObject = new JSONObject(message);
        JSONObject dataObj = jsonObject.getJSONObject("data");
        String requestId = dataObj.getString("requestId");
        String code = dataObj.optString("code", "");
        if ("FormingKOD".equals(code)) {
            // 1.6.1 — publish READY_FOR_TASK right away (non-blocking).
            String readyBody = createStatusBody(requestId, "READY_FOR_TASK", "USER_DATA_WAITING");
            kafkaMessagePublisher.send(STATUS_TOPIC, requestId, readyBody, messageId);
            // Schedule SUCCESS after the delay instead of sleeping on this thread.
            scheduledExecutor.schedule(() -> {
                String successBody = createStatusBody(requestId, "SUCCESS", "SUCCESS");
                kafkaMessagePublisher.send(STATUS_TOPIC, requestId, successBody, messageId);
            }, SUCCESS_DELAY_MS, TimeUnit.MILLISECONDS);
        } else {
            // Regular business functions — unchanged path.
            producerProgress.publish(message, messageId);
            producerEnd.publish(message, messageId);
        }
    }

    /**
     * Builds a BUSINESS_FUNCTION_STATUS event body.
     *
     * <p>Replaces the former hand-concatenated JSON (two near-identical methods):
     * {@link JSONObject} guarantees proper escaping of {@code requestId}, which
     * string concatenation did not.
     *
     * @param requestId       correlation id echoed back to the caller
     * @param executionStatus value for {@code functionExecutionStatus}
     * @param businessStatus  value for {@code functionBusinessStatus}
     * @return serialized JSON event body
     */
    private String createStatusBody(String requestId, String executionStatus, String businessStatus) {
        JSONObject data = new JSONObject()
                .put("functionExecutionStatus", executionStatus)
                .put("functionBusinessStatus", businessStatus)
                .put("requestId", requestId)
                .put("slaSeconds", 60)
                .put("checkPoint", "PENDING");
        return new JSONObject()
                .put("createdAt", Instant.now().toString())
                .put("type", "BUSINESS_FUNCTION_STATUS")
                .put("data", data)
                .toString();
    }
}
@Bean(destroyMethod = "shutdown")
public ScheduledExecutorService scheduledExecutorService() {
int poolSize = Integer.parseInt(
System.getenv().getOrDefault("SCHEDULED_POOL_SIZE", "100")
);
return Executors.newScheduledThreadPool(poolSize);
}
@Configuration
public class KafkaProducerConfig {

    /**
     * Throughput-tuned Kafka string producer.
     *
     * <p>Broker list is read from the {@code KAFKA_BOOTSTRAP_SERVERS} environment
     * variable; the previous hard-coded value {@code "1Servers"} is kept only as
     * the fallback for backward compatibility — it looks like a placeholder and
     * should be replaced with real broker addresses (TODO confirm).
     *
     * @return a configured {@link KafkaProducer} with String key/value serializers
     */
    @Bean
    public Producer<String, String> kafkaProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                System.getenv().getOrDefault("KAFKA_BOOTSTRAP_SERVERS", "1Servers"));
        // === High-throughput tuning ===
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, "262144");       // 256 KB batches
        props.put(ProducerConfig.LINGER_MS_CONFIG, "10");            // wait up to 10 ms so batches fill
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // or "lz4"
        // acks=1: leader-only ack — messages can be lost on leader failover.
        // Acceptable for load testing; use "all" for durability-critical data.
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        // NOTE(review): >5 in-flight requests combined with retries can reorder
        // messages unless enable.idempotence=true (which requires acks=all).
        // Confirm ordering requirements before keeping this at 10.
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "10");
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "67108864");  // 64 MB send buffer
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }
}