Загрузка данных


package kafka;

import com.fasterxml.jackson.databind.JsonNode;
import org.json.JSONException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST controller exposing two POST endpoints that hand a JSON request body,
 * together with the value of the {@code Topic} request header, to the injected
 * {@link Publisher}.
 */
@RestController
public class SendMessages {

    /** Body returned by both endpoints once the publisher call has returned normally. */
    private static final String RESPONSE_OK = "OK";

    @Autowired
    Publisher publisher;

    /**
     * Passes the request body to {@code Publisher.sendMessageToConstructor}.
     *
     * @param topic           value of the {@code Topic} request header
     * @param listenedMessage parsed JSON request body
     * @return the literal string {@code "OK"}
     * @throws JSONException declared for callers; propagated from the publisher if raised
     */
    @PostMapping(path = "/sendToKafkaConstructor", consumes = "application/json")
    public String sendToConstructor(
            @RequestHeader("Topic") String topic,
            @RequestBody JsonNode listenedMessage) throws JSONException {
        publisher.sendMessageToConstructor(listenedMessage, topic);
        return RESPONSE_OK;
    }

    /**
     * Passes the request body to {@code Publisher.sendMessageToGateway}.
     *
     * @param topic           value of the {@code Topic} request header
     * @param listenedMessage parsed JSON request body
     * @return the literal string {@code "OK"}
     * @throws JSONException declared for callers; propagated from the publisher if raised
     */
    @PostMapping(path = "/sendToKafkaGateway", consumes = "application/json")
    public String sendToGateway(
            @RequestHeader("Topic") String topic,
            @RequestBody JsonNode listenedMessage) throws JSONException {
        publisher.sendMessageToGateway(listenedMessage, topic);
        return RESPONSE_OK;
    }
}


package kafka;

import com.fasterxml.jackson.databind.JsonNode;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.json.JSONException;

import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;

/**
 * Publishes JSON messages to one of two Kafka producers ("constructor" and
 * "gateway"), each configured with its own hard-coded bootstrap server list.
 *
 * <p>An incoming message is expected to carry a {@code "body"} field, whose JSON
 * text becomes the record value, and optionally a {@code "headers"} field whose
 * entries are copied into the Kafka record headers. A freshly generated
 * {@code messageId} header is added to every record.
 */
public class Publisher {

        private static final String STRING_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";

        private static final Properties constructorProperties = producerProperties(
                        "tvldq-pega00052.delta.sbrf.ru:9092,tvldq-pega00053.delta.sbrf.ru:9092,tvldq-pega00054.delta.sbrf.ru:9092,tvldq-pega00055.delta.sbrf.ru:9092");

        private static final Producer<String, String> constructorProducer = new KafkaProducer<>(
                        constructorProperties);

        private static final Properties gatewayProperties = producerProperties(
                        "tvldq-pega00056.delta.sbrf.ru:9092,tvldq-pega00057.delta.sbrf.ru:9092,tvldq-pega00059.delta.sbrf.ru:9092,tvldq-pega00060.delta.sbrf.ru:9092");

        private static final Producer<String, String> gatewayProducer = new KafkaProducer<>(
                        gatewayProperties);

        public Publisher() {
        }

        /**
         * Builds a record from {@code message} and sends it through the "constructor" producer.
         *
         * @param message JSON object with a required {@code "body"} field and an optional
         *                {@code "headers"} field
         * @param topic   Kafka topic the record is sent to
         * @throws NullPointerException if {@code message} has no {@code "body"} field
         * @throws JSONException        declared for backward compatibility with existing callers
         */
        public void sendMessageToConstructor(JsonNode message, String topic) throws JSONException {
                constructorProducer.send(buildRecord(message, topic));
                constructorProducer.flush();
        }

        /**
         * Builds a record from {@code message} and sends it through the "gateway" producer.
         *
         * @param message JSON object with a required {@code "body"} field and an optional
         *                {@code "headers"} field
         * @param topic   Kafka topic the record is sent to
         * @throws NullPointerException if {@code message} has no {@code "body"} field
         * @throws JSONException        declared for backward compatibility with existing callers
         */
        public void sendMessageToGateway(JsonNode message, String topic) throws JSONException {
                gatewayProducer.send(buildRecord(message, topic));
                gatewayProducer.flush();
        }

        /** Creates producer settings with String key/value serializers for the given server list. */
        private static Properties producerProperties(String bootstrapServers) {
                Properties properties = new Properties();
                properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
                properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER);
                properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER);
                return properties;
        }

        /** Turns the incoming JSON message into a Kafka record: body as value, headers copied, messageId added. */
        private static ProducerRecord<String, String> buildRecord(JsonNode message, String topic) {
                // Fail with a descriptive message instead of an anonymous NPE when "body" is absent.
                JsonNode body = Objects.requireNonNull(message.get("body"),
                                "message must contain a \"body\" field");
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, body.toString());

                JsonNode headers = message.get("headers");
                if (headers != null) {
                        headers.fieldNames().forEachRemaining(header -> {
                                try {
                                        // Explicit UTF-8 so header bytes do not depend on the platform default charset.
                                        record.headers().add(header,
                                                        headers.get(header).asText().getBytes(StandardCharsets.UTF_8));
                                } catch (IllegalStateException e) {
                                        // Best effort, as before: a header that cannot be added is reported and skipped.
                                        e.printStackTrace();
                                }
                        });
                }

                String messageId = UUID.randomUUID().toString();
                record.headers().add("messageId", messageId.getBytes(StandardCharsets.UTF_8));
                return record;
        }

}


//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package kafka;

import org.json.JSONException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST controller whose POST endpoints trigger the static senders on
 * {@link Publisher}, either once or in bulk.
 */
@RestController
public class SendMessages {

    /** Body returned by every endpoint after the sender call(s) have returned normally. */
    private static final String RESPONSE_OK = "OK";

    // NOTE(review): the bulk endpoints are named "...-100" but send 100000 messages — confirm which is intended.
    private static final int BULK_SEND_COUNT = 100000;

    /** Sends a single message via {@code Publisher.sendMessage()}. */
    @PostMapping(path = "/sbbol-notify-info", consumes = "application/json")
    public String sendMessage() throws JSONException {
        Publisher.sendMessage();
        return RESPONSE_OK;
    }

    /** Calls {@code Publisher.sendMessage()} {@code BULK_SEND_COUNT} times in a row. */
    @PostMapping("/sbbol-notify-info-100")
    public String sendMessage100() throws JSONException {
        repeat(Publisher::sendMessage);
        return RESPONSE_OK;
    }

    /** Starts a process by templateId via {@code Publisher.sendMessageByTemplateId()}. */
    @PostMapping("/sbbol-notify-info-templateId")
    public String sendMessageByTemplateId() throws JSONException {
        Publisher.sendMessageByTemplateId();
        return RESPONSE_OK;
    }

    /** Starts a process by templateId {@code BULK_SEND_COUNT} times in a row. */
    @PostMapping("/sbbol-notify-info-100-templateId")
    public String sendMessageByTemplateId100() throws JSONException {
        repeat(Publisher::sendMessageByTemplateId);
        return RESPONSE_OK;
    }

    /** Starts a process by templateId via {@code Publisher.sendMessageByTemplateIdAFK()}. */
    @PostMapping("/sbbol-notify-info-templateIdAFK")
    public String sendMessageByTemplateIdAFK() throws JSONException {
        Publisher.sendMessageByTemplateIdAFK();
        return RESPONSE_OK;
    }

    /** Runs {@code sendOne} sequentially, {@code BULK_SEND_COUNT} times, on the calling thread. */
    private static void repeat(Runnable sendOne) {
        int remaining = BULK_SEND_COUNT;
        while (remaining > 0) {
            sendOne.run();
            remaining--;
        }
    }

}


package kafka;

import org.json.JSONObject;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Random;
import java.util.UUID;

/**
 * Builds fixed-shape {@code START_PROCESS} JSON messages and sends them to the
 * {@code sbof.constructor.cmd.process.v1} topic through a
 * {@link KafkaMessagePublisher}.
 *
 * <p>The senders are static but rely on the static {@code kafkaMessagePublisher}
 * field, which is only assigned when an instance of this class is constructed;
 * calling a sender before that happens fails with a {@link NullPointerException}.
 */
@Service
public class Publisher {

    /** Topic every message built by this class is sent to. */
    private static final String PROCESS_COMMAND_TOPIC = "sbof.constructor.cmd.process.v1";

    /** Timestamp format of the {@code createdAt} field; built once instead of on every call. */
    private static final DateTimeFormatter CREATED_AT_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS");

    static KafkaMessagePublisher kafkaMessagePublisher;

    /**
     * Stores the given publisher in the static field used by all static senders.
     *
     * @param kafkaMessagePublisher publisher the static senders delegate to
     */
    public Publisher(KafkaMessagePublisher kafkaMessagePublisher) {
        Publisher.kafkaMessagePublisher = kafkaMessagePublisher;
    }

    /**
     * Sends a {@code START_PROCESS} message identified by a start event, with the
     * current local time as {@code createdAt} and a random {@code businessObjectId}.
     */
    public static void sendMessage() {
        String createdAt = LocalDateTime.now().format(CREATED_AT_FORMAT);
        String businessObjectId = UUID.randomUUID().toString();
        String bodyMessage = "{\n" +
                "    \"createdAt\": \"" + createdAt + "\",\n" +
                "    \"type\": \"START_PROCESS\",\n" +
                "    \"data\": {\n" +
                "        \"startEvent\": \"SBBOL_CREATE_APP_FOR_SME_LOAN_CHECK_WHITE_ZONE\",\n" +
                "        \"businessObjectId\": \"" + businessObjectId + "\",\n" +
                "        \"businessObjectType\": \"CREDIT\"\n" +
                "    }\n" +
                "}";
        publish(bodyMessage);
    }

    /**
     * Sends a {@code START_PROCESS} message identified by a template id. The whole
     * payload, including {@code createdAt} and {@code businessObjectId}, is fixed.
     */
    public static void sendMessageByTemplateId() {
        // NOTE(review): createdAt and businessObjectId are hard-coded here, and the
        // businessObjectId's first group has 7 hex digits rather than 8 — confirm this is intended.
        String bodyMessage = "{\n" +
                "    \"createdAt\": \"2025-09-23T13:29:13.35242658\",\n" +
                "    \"type\": \"START_PROCESS\",\n" +
                "    \"data\": {\n" +
                "        \"businessObjectId\": \"b19d4ed-259e-4800-9b15-3237c964936e\",\n" +
                "        \"templateId\": \"019976a6-b6aa-7450-92c7-5146a85b546d\",\n" +
                "        \"businessObjectType\": \"CREDIT\"\n" +
                "    }\n" +
                "}";
        publish(bodyMessage);
    }

    /**
     * Sends a {@code START_PROCESS} message for the "AFK" template id, with a fixed
     * {@code createdAt} and a random {@code businessObjectId}.
     */
    public static void sendMessageByTemplateIdAFK() {
        // NOTE(review): createdAt is hard-coded while businessObjectId is random — confirm this is intended.
        String businessObjectId = UUID.randomUUID().toString();
        String bodyMessage = "{\n" +
                "    \"createdAt\": \"2025-10-08T13:29:13.35242658\",\n" +
                "    \"type\": \"START_PROCESS\",\n" +
                "    \"data\": {\n" +
                "        \"businessObjectId\": \"" + businessObjectId + "\",\n" +
                "        \"templateId\": \"0199a9e0-1cf0-7a87-a18d-67f7111f22ec\",\n" +
                "        \"businessObjectType\": \"CREDIT\"\n" +
                "    }\n" +
                "}";
        publish(bodyMessage);
    }

    /** Sends {@code bodyMessage} to the process-command topic with a freshly generated message id. */
    private static void publish(String bodyMessage) {
        String messageId = UUID.randomUUID().toString();
        kafkaMessagePublisher.send(PROCESS_COMMAND_TOPIC, bodyMessage, messageId);
    }

}