Загрузка данных


package kafka;

import com.fasterxml.jackson.databind.JsonNode;
import org.json.JSONException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class SendMessages {

    @Autowired
    Publisher publisher;

    // ←←← ИЗМЕНЁН ТОЛЬКО ЭТОТ ЭНДПОИНТ (как в примере)
    @PostMapping(value = "/sbbol-notify-info", consumes = "application/json")
    public String sendMessage(@RequestHeader(value = "Topic") String topic,
                              @RequestBody JsonNode listenedMessage)
            throws JSONException {
        publisher.sendMessageToConstructor(listenedMessage, topic);
        return "OK";
    }

    // Всё остальное оставлено без изменений
    @PostMapping({"/sbbol-notify-info-100"})
    public String sendMessage100() throws JSONException {
        for (int i = 0; i < 100000; i++) {
            Publisher.sendMessage();
        }
        return "OK";
    }

    @PostMapping({"/sbbol-notify-info-templateId"}) // Запуск процесса по templateId
    public String sendMessageByTemplateId() throws JSONException {
        Publisher.sendMessageByTemplateId();
        return "OK";
    }

    @PostMapping({"/sbbol-notify-info-100-templateId"}) // Запуск процесса по templateId
    public String sendMessageByTemplateId100() throws JSONException {
        for (int i = 0; i < 100000; i++) {
            Publisher.sendMessageByTemplateId();
        }
        return "OK";
    }

    @PostMapping({"/sbbol-notify-info-templateIdAFK"}) // Запуск процесса по templateId
    public String sendMessageByTemplateIdAFK() throws JSONException {
        Publisher.sendMessageByTemplateIdAFK();
        return "OK";
    }
}

package kafka;

import com.fasterxml.jackson.databind.JsonNode;
import org.json.JSONException;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Random;
import java.util.UUID;

@Service
public class Publisher {

    static KafkaMessagePublisher kafkaMessagePublisher;

    public Publisher(KafkaMessagePublisher kafkaMessagePublisher) {
        Publisher.kafkaMessagePublisher = kafkaMessagePublisher;
    }

    // ←←← ИЗМЕНЁННЫЙ/ДОБАВЛЕННЫЙ МЕТОД (только для нового эндпоинта sbbol-notify-info)
    // Полностью соответствует стилю примера: принимает JsonNode + topic,
    // берёт тело из "body", генерирует messageId.
    // Заголовки из "headers" пока не прокидываем (у вас KafkaMessagePublisher
    // их не поддерживает в текущем сигнатуре send), если нужно — скажи, добавим.
    public void sendMessageToConstructor(JsonNode message, String topic) throws JSONException {
        String bodyMessage = message.get("body").toString();
        String messageId = UUID.randomUUID().toString();

        kafkaMessagePublisher.send(topic, bodyMessage, messageId);
    }

    // Всё остальное оставлено без единого изменения
    public static void sendMessage() {
        LocalDateTime now = LocalDateTime.now();
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS");
        String formattedDateTime = now.format(formatter);
        String uuid = UUID.randomUUID().toString();
        String messageId = UUID.randomUUID().toString();
        String bodyMessage = "{\n" +
                "    \"createdAt\": \"" + formattedDateTime + "\",\n" +
                "    \"type\": \"START_PROCESS\",\n" +
                "    \"data\": {\n" +
                "        \"startEvent\": \"SBBOL_CREATE_APP_FOR_SME_LOAN_CHECK_WHITE_ZONE\",\n" +
                "        \"businessObjectId\": \"" + uuid + "\",\n" +
                "        \"businessObjectType\": \"CREDIT\"\n" +
                "    }\n" +
                "}";
        kafkaMessagePublisher.send("sbof.constructor.cmd.process.v1", bodyMessage, messageId);
    }

    public static void sendMessageByTemplateId() {
        LocalDateTime now = LocalDateTime.now();
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS");
        String formattedDateTime = now.format(formatter);
        String uuid = UUID.randomUUID().toString();
        String messageId = UUID.randomUUID().toString();
        String bodyMessage = "{\n" +
                "    \"createdAt\": \"2025-09-23T13:29:13.35242658\",\n" +
                "    \"type\": \"START_PROCESS\",\n" +
                "    \"data\": {\n" +
                "        \"businessObjectId\": \"b19d4ed-259e-4800-9b15-3237c964936e\",\n" +
                "        \"templateId\": \"019976a6-b6aa-7450-92c7-5146a85b546d\",\n" +
                "        \"businessObjectType\": \"CREDIT\"\n" +
                "    }\n" +
                "}";
        kafkaMessagePublisher.send("sbof.constructor.cmd.process.v1", bodyMessage, messageId);
    }

    public static void sendMessageByTemplateIdAFK() {
        LocalDateTime now = LocalDateTime.now();
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS");
        String formattedDateTime = now.format(formatter);
        String uuid = UUID.randomUUID().toString();
        String messageId = UUID.randomUUID().toString();
        String bodyMessage = "{\n" +
                "    \"createdAt\": \"2025-10-08T13:29:13.35242658\",\n" +
                "    \"type\": \"START_PROCESS\",\n" +
                "    \"data\": {\n" +
                "        \"businessObjectId\": \"" + uuid + "\",\n" +
                "        \"templateId\": \"0199a9e0-1cf0-7a87-a18d-67f7111f22ec\",\n" +
                "        \"businessObjectType\": \"CREDIT\"\n" +
                "    }\n" +
                "}";
        kafkaMessagePublisher.send("sbof.constructor.cmd.process.v1", bodyMessage, messageId);
    }
}