// Data loading
package kafka;
import com.fasterxml.jackson.databind.JsonNode;
import org.json.JSONException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoints that hand an incoming JSON payload over to {@link Publisher}.
 *
 * <p>Both endpoints consume {@code application/json}, take the destination topic from the
 * {@code Topic} request header and reply with the literal string {@code "OK"} once the
 * publisher call has returned.
 */
@RestController
public class SendMessages {

    /** Response body shared by every endpoint of this controller. */
    private static final String OK_RESPONSE = "OK";

    @Autowired
    Publisher publisher;

    /**
     * Forwards the request body to {@link Publisher#sendMessageToConstructor}.
     *
     * @param topic           value of the {@code Topic} request header
     * @param listenedMessage parsed JSON request body
     * @return the string {@code "OK"}
     * @throws JSONException declared for callers; propagated from the publisher if raised
     */
    @PostMapping(path = "/sendToKafkaConstructor", consumes = "application/json")
    public String sendToConstructor(
            @RequestHeader("Topic") String topic,
            @RequestBody JsonNode listenedMessage) throws JSONException {
        publisher.sendMessageToConstructor(listenedMessage, topic);
        return OK_RESPONSE;
    }

    /**
     * Forwards the request body to {@link Publisher#sendMessageToGateway}.
     *
     * @param topic           value of the {@code Topic} request header
     * @param listenedMessage parsed JSON request body
     * @return the string {@code "OK"}
     * @throws JSONException declared for callers; propagated from the publisher if raised
     */
    @PostMapping(path = "/sendToKafkaGateway", consumes = "application/json")
    public String sendToGateway(
            @RequestHeader("Topic") String topic,
            @RequestBody JsonNode listenedMessage) throws JSONException {
        publisher.sendMessageToGateway(listenedMessage, topic);
        return OK_RESPONSE;
    }
}
package kafka;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.json.JSONException;
import java.util.Properties;
import java.util.UUID;
/**
 * Publishes JSON messages through one of two statically created Kafka producers
 * (the "constructor" producer and the "gateway" producer).
 *
 * <p>For every message the {@code body} field is serialised to its JSON text and used as the
 * record value. If a {@code headers} field is present, each of its fields is copied to a Kafka
 * record header (value taken via {@code asText()}, encoded as UTF-8). A freshly generated
 * {@code messageId} header is appended to every record.
 */
public class Publisher {

    private static final java.util.logging.Logger LOG =
            java.util.logging.Logger.getLogger(Publisher.class.getName());

    /** Explicit charset for header bytes so the result does not depend on the platform default. */
    private static final java.nio.charset.Charset HEADER_CHARSET =
            java.nio.charset.StandardCharsets.UTF_8;

    private static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";

    private static final String CONSTRUCTOR_BOOTSTRAP_SERVERS =
            "tvldq-pega00052.delta.sbrf.ru:9092,tvldq-pega00053.delta.sbrf.ru:9092,tvldq-pega00054.delta.sbrf.ru:9092,tvldq-pega00055.delta.sbrf.ru:9092";

    private static final String GATEWAY_BOOTSTRAP_SERVERS =
            "tvldq-pega00056.delta.sbrf.ru:9092,tvldq-pega00057.delta.sbrf.ru:9092,tvldq-pega00059.delta.sbrf.ru:9092,tvldq-pega00060.delta.sbrf.ru:9092";

    private static final Producer<String, String> constructorProducer =
            new KafkaProducer<>(producerProperties(CONSTRUCTOR_BOOTSTRAP_SERVERS));

    private static final Producer<String, String> gatewayProducer =
            new KafkaProducer<>(producerProperties(GATEWAY_BOOTSTRAP_SERVERS));

    public Publisher() {
    }

    /**
     * Sends {@code message} to {@code topic} using the "constructor" producer.
     *
     * @param message JSON node with a mandatory {@code body} field and an optional
     *                {@code headers} field
     * @param topic   destination Kafka topic
     * @throws IllegalArgumentException if {@code message} is {@code null} or has no {@code body}
     * @throws JSONException declared for backward compatibility with existing callers
     */
    public void sendMessageToConstructor(JsonNode message, String topic) throws JSONException {
        publish(constructorProducer, message, topic);
    }

    /**
     * Sends {@code message} to {@code topic} using the "gateway" producer.
     *
     * @param message JSON node with a mandatory {@code body} field and an optional
     *                {@code headers} field
     * @param topic   destination Kafka topic
     * @throws IllegalArgumentException if {@code message} is {@code null} or has no {@code body}
     * @throws JSONException declared for backward compatibility with existing callers
     */
    public void sendMessageToGateway(JsonNode message, String topic) throws JSONException {
        publish(gatewayProducer, message, topic);
    }

    /** Builds a plain producer configuration with String key/value serializers. */
    private static Properties producerProperties(String bootstrapServers) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER);
        return properties;
    }

    /** Builds the record from {@code message}, sends it with {@code producer} and flushes. */
    private static void publish(Producer<String, String> producer, JsonNode message, String topic) {
        JsonNode body = (message == null) ? null : message.get("body");
        if (body == null) {
            // Previously this surfaced as a bare NullPointerException with no context.
            throw new IllegalArgumentException("Message must contain a \"body\" field");
        }

        ProducerRecord<String, String> record = new ProducerRecord<>(topic, body.toString());

        JsonNode headers = message.get("headers");
        if (headers != null) {
            headers.fieldNames().forEachRemaining(name -> {
                try {
                    record.headers().add(name, headers.get(name).asText().getBytes(HEADER_CHARSET));
                } catch (IllegalStateException e) {
                    // Best effort, as before: a header that cannot be added is skipped.
                    LOG.log(java.util.logging.Level.WARNING,
                            "Could not add header '" + name + "' to record for topic " + topic, e);
                }
            });
        }

        String messageId = UUID.randomUUID().toString();
        record.headers().add("messageId", messageId.getBytes(HEADER_CHARSET));

        // The send is asynchronous; without a callback a delivery failure would go unnoticed.
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                LOG.log(java.util.logging.Level.SEVERE,
                        "Failed to deliver message " + messageId + " to topic " + topic, exception);
            }
        });
        producer.flush();
    }
}
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package kafka;
import org.json.JSONException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoints that trigger the static send methods of {@link Publisher}.
 *
 * <p>Every endpoint replies with the literal string {@code "OK"} after the publisher call(s)
 * have returned.
 */
@RestController
public class SendMessages {

    /** Response body shared by every endpoint of this controller. */
    private static final String OK_RESPONSE = "OK";

    /**
     * Number of sends performed by the bulk endpoints.
     * NOTE(review): the endpoint paths end in "-100" but the loop count is 100000 - confirm
     * which one is intended.
     */
    private static final int BULK_SEND_COUNT = 100000;

    /** Calls {@link Publisher#sendMessage()} once. */
    @PostMapping(value = "/sbbol-notify-info", consumes = "application/json")
    public String sendMessage() throws JSONException {
        Publisher.sendMessage();
        return OK_RESPONSE;
    }

    /** Calls {@link Publisher#sendMessage()} {@link #BULK_SEND_COUNT} times in sequence. */
    @PostMapping("/sbbol-notify-info-100")
    public String sendMessage100() throws JSONException {
        sendRepeatedly(Publisher::sendMessage);
        return OK_RESPONSE;
    }

    /** Starts a process by templateId: calls {@link Publisher#sendMessageByTemplateId()} once. */
    @PostMapping("/sbbol-notify-info-templateId")
    public String sendMessageByTemplateId() throws JSONException {
        Publisher.sendMessageByTemplateId();
        return OK_RESPONSE;
    }

    /**
     * Starts a process by templateId: calls {@link Publisher#sendMessageByTemplateId()}
     * {@link #BULK_SEND_COUNT} times in sequence.
     */
    @PostMapping("/sbbol-notify-info-100-templateId")
    public String sendMessageByTemplateId100() throws JSONException {
        sendRepeatedly(Publisher::sendMessageByTemplateId);
        return OK_RESPONSE;
    }

    /** Starts a process by templateId: calls {@link Publisher#sendMessageByTemplateIdAFK()} once. */
    @PostMapping("/sbbol-notify-info-templateIdAFK")
    public String sendMessageByTemplateIdAFK() throws JSONException {
        Publisher.sendMessageByTemplateIdAFK();
        return OK_RESPONSE;
    }

    /** Runs {@code send} {@link #BULK_SEND_COUNT} times on the calling thread. */
    private static void sendRepeatedly(Runnable send) {
        int remaining = BULK_SEND_COUNT;
        while (remaining > 0) {
            send.run();
            remaining--;
        }
    }
}
package kafka;
import org.json.JSONObject;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Random;
import java.util.UUID;
/**
 * Static facade that builds {@code START_PROCESS} JSON payloads and passes them to the
 * {@link KafkaMessagePublisher} captured when this bean is constructed.
 *
 * <p>The collaborator is stored in a static field so that the static {@code send*} methods can
 * reach it; they therefore only work after an instance of this class has been created.
 */
@Service
public class Publisher {

    /** Topic every payload built by this class is sent to. */
    private static final String PROCESS_COMMAND_TOPIC = "sbof.constructor.cmd.process.v1";

    /** Format of the generated {@code createdAt} value; built once because the formatter is reusable. */
    private static final DateTimeFormatter CREATED_AT_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS");

    static KafkaMessagePublisher kafkaMessagePublisher;

    /**
     * Stores the collaborator in the static field used by the static send methods.
     *
     * @param kafkaMessagePublisher publisher that performs the actual send
     */
    public Publisher(KafkaMessagePublisher kafkaMessagePublisher) {
        Publisher.kafkaMessagePublisher = kafkaMessagePublisher;
    }

    /**
     * Sends a {@code START_PROCESS} payload carrying the
     * {@code SBBOL_CREATE_APP_FOR_SME_LOAN_CHECK_WHITE_ZONE} start event, the current local
     * date-time as {@code createdAt} and a random UUID as {@code businessObjectId}.
     *
     * @throws IllegalStateException if no {@link KafkaMessagePublisher} has been set yet
     */
    public static void sendMessage() {
        String formattedDateTime = LocalDateTime.now().format(CREATED_AT_FORMAT);
        String uuid = UUID.randomUUID().toString();
        String bodyMessage = "{\n" +
                " \"createdAt\": \""+formattedDateTime+"\",\n" +
                " \"type\": \"START_PROCESS\",\n" +
                " \"data\": {\n" +
                " \"startEvent\": \"SBBOL_CREATE_APP_FOR_SME_LOAN_CHECK_WHITE_ZONE\",\n" +
                " \"businessObjectId\": \""+ uuid +"\",\n" +
                " \"businessObjectType\": \"CREDIT\"\n" +
                " }\n" +
                "}";
        publish(bodyMessage);
    }

    /**
     * Sends a {@code START_PROCESS} payload that addresses the process by {@code templateId}.
     * All payload values, including {@code createdAt} and {@code businessObjectId}, are fixed
     * literals.
     *
     * <p>NOTE(review): the original code computed a current timestamp and a random UUID here
     * but never used them; confirm whether the fixed values are intentional.
     *
     * @throws IllegalStateException if no {@link KafkaMessagePublisher} has been set yet
     */
    public static void sendMessageByTemplateId() {
        String bodyMessage = "{\n" +
                " \"createdAt\": \"2025-09-23T13:29:13.35242658\",\n" +
                " \"type\": \"START_PROCESS\",\n" +
                " \"data\": {\n" +
                " \"businessObjectId\": \"b19d4ed-259e-4800-9b15-3237c964936e\",\n" +
                " \"templateId\": \"019976a6-b6aa-7450-92c7-5146a85b546d\",\n" +
                " \"businessObjectType\": \"CREDIT\"\n" +
                " }\n" +
                "}";
        publish(bodyMessage);
    }

    /**
     * Sends a {@code START_PROCESS} payload that addresses the process by a second fixed
     * {@code templateId}, with a random UUID as {@code businessObjectId} and a fixed
     * {@code createdAt} literal.
     *
     * <p>NOTE(review): the original code computed a current timestamp here but never used it;
     * confirm whether the fixed {@code createdAt} is intentional.
     *
     * @throws IllegalStateException if no {@link KafkaMessagePublisher} has been set yet
     */
    public static void sendMessageByTemplateIdAFK() {
        String uuid = UUID.randomUUID().toString();
        String bodyMessage = "{\n" +
                " \"createdAt\": \"2025-10-08T13:29:13.35242658\",\n" +
                " \"type\": \"START_PROCESS\",\n" +
                " \"data\": {\n" +
                " \"businessObjectId\": \""+ uuid +"\",\n" +
                " \"templateId\": \"0199a9e0-1cf0-7a87-a18d-67f7111f22ec\",\n" +
                " \"businessObjectType\": \"CREDIT\"\n" +
                " }\n" +
                "}";
        publish(bodyMessage);
    }

    /** Passes {@code bodyMessage} and a fresh random message id to the collaborator. */
    private static void publish(String bodyMessage) {
        KafkaMessagePublisher target = kafkaMessagePublisher;
        if (target == null) {
            // Previously this surfaced as a bare NullPointerException with no context.
            throw new IllegalStateException(
                    "KafkaMessagePublisher is not set; Publisher has not been constructed yet");
        }
        String messageId = UUID.randomUUID().toString();
        target.send(PROCESS_COMMAND_TOPIC, bodyMessage, messageId);
    }
}