Загрузка данных
dmitriev-aal@VDI-Dmitriev-A:~/Desktop/rshbintech/integrations/cki/ip2-services/flow-adapters/dbo2-flow$ sed -n '1,80p' src/main/java/ru/rshbintech/integrations/cki/ip2_services/flow_adapters/dbo2_flow/kafka/consumers/OwnfinProviderToSystemsListener.java
package ru.rshbintech.integrations.cki.ip2_services.flow_adapters.dbo2_flow.kafka.consumers;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.MDC;
import org.springframework.http.HttpHeaders;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import ru.rshbintech.integrations.cki.ip2_services.flow_adapters.dbo2_flow.kafka.KafkaHeadersValidationException;
import ru.rshbintech.integrations.cki.ip2_services.flow_adapters.dbo2_flow.services.OwnfinService;
import ru.rshbintech.integrations.cki.ip2_services.integration_libraries.http_client_starter.data.HeadersUmqa;
import ru.rshbintech.integrations.cki.ip2_services.integration_libraries.http_client_starter.data.LogInfo;
import ru.rshbintech.integrations.cki.ip2_services.integration_libraries.http_client_starter.exceptions.types.ProcessingException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;
@Slf4j
@Component
@RequiredArgsConstructor
public class OwnfinProviderToSystemsListener {
/** Service that receives the headers and payload of each consumed record. */
private final OwnfinService ownfinService;

/**
 * Names of the headers every consumed response record must carry.
 * Declared {@code static final} and wrapped as unmodifiable because the list is a
 * class-wide constant: it never varies per instance and must not be altered at runtime.
 */
private static final List<String> REQUIRED_RESPONSE_HEADERS = Collections.unmodifiableList(Arrays.asList(
        HeadersUmqa.X_B3_TRACEID.getName(),
        HeadersUmqa.X_B3_SPANID.getName(),
        HeadersUmqa.X_SERVICE.getName(),
        HeadersUmqa.X_SOURCE_SERVICE.getName(),
        HeadersUmqa.X_SOURCE_SYSTEM.getName(),
        HeadersUmqa.X_TO_SYSTEM.getName(),
        HeadersUmqa.X_EXTENSION.getName()));
/**
 * Consumes a record from the topic configured under
 * {@code kafka.consumer.cluster-consumer.topic.plt_answer_send_transfer_dbo_resp},
 * checks that all required headers are present and hands the headers and payload
 * to {@link OwnfinService#ownfinServiceResponse}.
 *
 * <p>The Kafka record headers are copied into an {@link HttpHeaders} instance first;
 * without that copy the header validation would run against an empty container and
 * reject every record.
 *
 * <p>NOTE(review): the {@code groupId} literal spells "tranfer". It is left untouched
 * because the group id is a runtime identifier — confirm before renaming.
 *
 * @param consumerRecord the consumed Kafka record (string key and string payload)
 * @throws ProcessingException if required headers are missing or any step of the
 *                             processing fails; the original exception is kept as the cause
 */
@KafkaListener(
        topics = "${kafka.consumer.cluster-consumer.topic.plt_answer_send_transfer_dbo_resp}",
        groupId = "plt-answer-send-tranfer-dbo-resp-group",
        containerFactory = "kafkaListenerContainerFactoryOwnfin")
void listenPltAnswerSendTransferDboResp(ConsumerRecord<String, String> consumerRecord) {
    HttpHeaders httpHeaders = new HttpHeaders();
    try {
        // Transfer the Kafka headers so validation, logging and the service see real values.
        // Headers with a null value are skipped so they are reported as missing.
        consumerRecord.headers().forEach(header -> {
            if (header.value() != null) {
                httpHeaders.add(header.key(), new String(header.value(), StandardCharsets.UTF_8));
            }
        });

        // Set the trace id before the first log statement so every line of this
        // invocation (including the error path) carries it when it is available.
        String traceId = httpHeaders.getFirst(HeadersUmqa.X_B3_TRACEID.getName());
        if (traceId != null) {
            MDC.put("trace_id", traceId);
        }

        Set<String> missingHeaders = validateResponseHeaders(httpHeaders);
        if (!missingHeaders.isEmpty()) {
            throw new KafkaHeadersValidationException("Отсутствуют обязательные заголовки: " + missingHeaders, missingHeaders);
        }
        log.info(createLogInfo(httpHeaders, consumerRecord).toLogString());
        ownfinService.ownfinServiceResponse(httpHeaders, consumerRecord.value());
    } catch (Exception ex) {
        log.error(createLogError(httpHeaders, consumerRecord, ex).toLogString());
        throw new ProcessingException("Error process request from Kafka. Service: " + httpHeaders.get(HeadersUmqa.X_SERVICE.getName()), ex);
    } finally {
        // Always clear the MDC so the trace id does not leak into the next record
        // handled on the same thread.
        MDC.clear();
    }
}
/**
 * Determines which of the required response headers are absent.
 *
 * @param httpHeaders headers to inspect
 * @return the required header names not contained in {@code httpHeaders};
 *         empty when all of them are present
 */
private Set<String> validateResponseHeaders(HttpHeaders httpHeaders) {
    Set<String> absent = new HashSet<>();
    for (String requiredName : REQUIRED_RESPONSE_HEADERS) {
        if (httpHeaders.containsKey(requiredName)) {
            continue;
        }
        absent.add(requiredName);
    }
    return absent;
}
/**
 * Builds the log entry written when a record has been received.
 *
 * @param httpHeaders    headers to attach to the entry as request headers
 * @param consumerRecord record whose endpoint and payload are logged
 * @return a populated {@link LogInfo} with message, endpoint, headers and payload
 */
private LogInfo createLogInfo(HttpHeaders httpHeaders, ConsumerRecord<String, String> consumerRecord) {
    final String payload = consumerRecord.value();
    final LogInfo received = new LogInfo();
    received.setMessage("Received message from kafka");
    received.setEndpoint(getEndpoint(consumerRecord));
    received.setReqHeaders(httpHeaders);
    received.setReqData(payload);
    return received;
}
private LogInfo createLogError(HttpHeaders httpHeaders, ConsumerRecord<String, String> consumerRecord, Exception ex) {
LogInfo logInfo = new LogInfo();
logInfo.setMessage("Exception kafka listener");
logInfo.setException(ex);
logInfo.setEndpoint(getEndpoint(consumerRecord));
dmitriev-aal@VDI-Dmitriev-A:~/Desktop/rshbintech/integrations/cki/ip2-services/flow-adapters/dbo2-flow$ git diff origin/master..origin/feature/20260610_ECO_OFX-77 \
> -- src/main/resources/application.yaml | sed -n '560,620p'
endpoints:
@@ -1166,8 +971,6 @@ integration:
endpoint: "/api/sbp/internal/v1/callback/espp"
SendTransactionInfoResponse:
endpoint: "/api/sbp/internal/v1/callback/saf"
- validate:
- enabled: "false"
SendTransactionSettlementResponse:
endpoint: "/api/sbp/internal/v1/callback/saf"
SendTransactionStatusResponse:
@@ -1186,14 +989,6 @@ integration:
endpoints:
SimpleApproveAnswerResponse:
endpoint: "/api/sbp/internal/v1/callback/saf"
- owf-ms-prd-subscription-card:
- endpoints:
- GetHistoryPaidOptionResponse:
- endpoint: "/api/subscription-card/external/v1/pc/history-paid-options/reply"
- GetPaidOptionResponse:
- endpoint: "/api/subscription-card/external/v1/pc/paid-option/reply"
- SetPaidOptionResponse:
- endpoint: "/api/subscription-card/external/v1/pc/update-paid-option/reply"
owf-ms-prd-subscription-vsesvoe:
endpoints:
SendPaymentHubCommandResponse:
@@ -1212,11 +1007,6 @@ integration:
protocol: "KAFKA"
ip1-name: "OWNFIN"
services:
- owf-ms-common-answer-send-transfer-dbo-subscribers:
- endpoints:
- AnswerSendTransferDBORequest:
- kafka:
- producer-topic: "${APP_KAFKA_TOPIC_PLT_ANSWER_SEND_TRANSFER_DBO_REQ:plt_answer_send_transfer_dbo_req}"
owf-ms-plt-account-registry:
endpoints:
cft_GetAccountByAccountIDResponse:
@@ -1229,19 +1019,13 @@ integration:
producer-topic: "${APP_KAFKA_TOPIC_OWNFIN_CDI_GET_PERSON_RESP:ownfin.cdi_get_person_resp}"
owf-ms-prd-brokerage:
endpoints:
- CancelHoldOnCardResponse:
- kafka:
- producer-topic: "${APP_KAFKA_TOPIC_PRD_BROKERAGE_CONSUMER_EXTSYS_RESP:prd_brokerage_consumer_extsys_resp}"
- CancelTransfer_v4Response:
- kafka:
- producer-topic: "${APP_KAFKA_TOPIC_PRD_BROKERAGE_CONSUMER_EXTSYS_RESP:prd_brokerage_consumer_extsys_resp}"
ChangeBrokerContractResponse:
kafka:
producer-topic: "${APP_KAFKA_TOPIC_PRD_BROKERAGE_CONSUMER_EXTSYS_RESP:prd_brokerage_consumer_extsys_resp}"
CheckClientStockExchangeResponse:
kafka:
producer-topic: "${APP_KAFKA_TOPIC_PRD_BROKERAGE_CONSUMER_EXTSYS_RESP:prd_brokerage_consumer_extsys_resp}"
- CheckPassportResponse:
+ CheckPassportKafkaResponse:
validate:
comment: "Отключение валидации, так как backend-mvd-service отдает невалидные ответы."
enabled: "false"
@@ -1252,26 +1036,9 @@ integration:
ConfirmBrokerContractResponse:
kafka:
dmitriev-aal@VDI-Dmitriev-A:~/Desktop/rshbintech/integrations/cki/ip2-services/flow-adapters/dbo2-flow$ grep -R "class OwnfinProviderToSystemsListener" -n src/main/java
src/main/java/ru/rshbintech/integrations/cki/ip2_services/flow_adapters/dbo2_flow/kafka/consumers/OwnfinProviderToSystemsListener.java:23:public class OwnfinProviderToSystemsListener {
dmitriev-aal@VDI-Dmitriev-A:~/Desktop/rshbintech/integrations/cki/ip2-services/flow-adapters/dbo2-flow$ git diff origin/master..origin/feature/20260610_ECO_OFX-77 -- src/main/resources/application.yaml | head -200
diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml
index db7f95e8..f55a2fa0 100644
--- a/src/main/resources/application.yaml
+++ b/src/main/resources/application.yaml
@@ -14,8 +14,8 @@ ownfin-kafka-services:
- owf-ms-prd-account-management
- owf-ms-prd-credit-app
- owf-ms-prd-service-packages
- - owf-ms-prd-client-management
- owf-ms-prd-invoice-for-payment
+ - owf-ms-prd-client-management
integration:
this-system-id: "flow-adapters"
@@ -77,6 +77,7 @@ integration:
AuthClientWithCryptoCalcResponse:
CalcOrderResponse:
CancelCloseCurrentAccountResponse:
+ CancelClosePersonDeposit_V2Response:
CancelClosePersonDeposit_v2Response:
CancelHoldOnCardResponse:
CancelPersonDeposit_v3Response:
@@ -95,7 +96,6 @@ integration:
CheckAvailabilityShareResponse:
CheckClientStockExchangeResponse:
CheckNewAccountAndCardResponse:
- CheckNewCardForAccountResponse:
CheckOperationDayResponse:
CheckTransferBetweenCardsResponse:
CloseCurrentAccountResponse:
@@ -146,7 +146,6 @@ integration:
RemoveClientResponse:
ReserveBrokerContractResponse:
SBPMessageRequest:
- SendDealNTProResponse:
SendDevicePCResponse:
SendDevicePC_v2Response:
SendInformationCategoryResponse:
@@ -201,13 +200,6 @@ integration:
endpoints:
EprshbSendMessage:
endpoint: "/api/v1/send"
- validate:
- in:
- enabled: "true"
- schema: "EprshbSendMessage_req.json"
- answer-in:
- enabled: "true"
- schema: "EprshbSendMessage_resp.json"
esia-auth-service:
endpoints:
ESIARegisterRequest:
@@ -219,16 +211,17 @@ integration:
SendPFRStatementStatusResponse:
endpoint: "/api/v1/ownfin/sendPfrStatementStatus"
validate:
- enabled: "default"
answer-in:
schema: "SendPFRStatementStatus_resp.json"
+ espp:
+ protocol: "MQ"
+ ip1-name: "ESPP"
+ services:
+ espp:
+ endpoints:
+ SendPaymentHubCommandRequest:
flow-adapters:
protocol: "HTTP"
- copy-headers:
- request:
- strategy: "required-and-umqa"
- response:
- strategy: "required-and-umqa"
services:
affl-flow:
endpoints:
@@ -292,8 +285,6 @@ integration:
endpoint: "/api/CancelPersonDeposit_v3Request"
CancelTransfer_v4Request:
endpoint: "/api/CancelTransfer_v4Request"
- CancelTransfer_v5Request:
- endpoint: "/api/CancelTransfer_v5Request"
ChangeBrokerContractRequest:
endpoint: "/api/ChangeBrokerContractRequest"
ChangeClientDataSNILSRequest:
@@ -326,8 +317,6 @@ integration:
endpoint: "/api/CreatePaymentTransferRequest"
CreatePersonDepositAccountDBORequest:
endpoint: "/api/CreatePersonDepositAccountDBORequest"
- ExchangePointRubleRequest:
- endpoint: "/api/ExchangePointRubleRequest"
FindListAccountByClientID_v3Request:
endpoint: "/api/FindListAccountByClientID_v3Request"
FindListPersonDepositByClientID_v4Request:
@@ -370,10 +359,6 @@ integration:
endpoint: "/api/SendMessageStatusRequest"
SendNewAccountAndCardRequest:
endpoint: "/api/SendNewAccountAndCardRequest"
- SendStatusPhoneClientRequest:
- endpoint: "/api/SendStatusPhoneClientRequest"
- SendTransferRequest:
- endpoint: "/api/SendTransferRequest"
SendTransfer_v4Request:
endpoint: "/api/SendTransfer_v4Request"
SetStatusClientDBO_v2Request:
@@ -421,6 +406,8 @@ integration:
endpoints:
CheckAvailabilityShareRequest:
endpoint: "/api/CheckAvailabilityShareRequest"
+ ExchangePointRubleRequest:
+ endpoint: "/api/ExchangePointRubleRequest"
GetPersonCategoryRequest:
endpoint: "/api/GetPersonCategoryRequest"
GetPersonWages_v2Request:
@@ -447,26 +434,8 @@ integration:
schema: "AllocateCommonCustomerDocumentCsp_req.json"
answer-in:
schema: "AllocateCommonCustomerDocumentCsp_resp.json"
- AllocateCreditClaimDocumentCspJsonRequest:
- endpoint: "/api/AllocateCreditClaimDocumentCspJsonRequest"
- validate:
- in:
- schema: "AllocateCreditClaimDocumentCsp_req.json"
- answer-in:
- schema: "AllocateCreditClaimDocumentCsp_resp.json"
- request: true
- service-name: "AllocateCreditClaimDocumentCsp"
- AllocateCreditClaimDocumentCspRequest:
- endpoint: "/api/AllocateCreditClaimDocumentCspRequest"
AllocateCreditDealDocumentCspRequest:
endpoint: "/api/AllocateCreditDealDocumentCspRequest"
- AllocateCustomerNpDocumentCspRequest:
- endpoint: "/api/AllocateCustomerNpDocumentCspJsonRequest"
- validate:
- in:
- schema: "AllocateCustomerNpDocumentCsp_req.json"
- answer-in:
- schema: "AllocateCustomerNpDocumentCsp_resp.json"
GetObjectFileJsonRequest:
endpoint: "/api/GetObjectFileRequest"
validate:
@@ -477,21 +446,12 @@ integration:
schema: "GetObjectFile_resp.json"
GetObjectFileRequest:
endpoint: "/api/GetObjectFileRequest"
- SearchCustomerNpDocumentsPageCspRequest:
- endpoint: "/api/SearchCustomerNpDocumentsPageCspRequest"
- validate:
- in:
- schema: "SearchCustomerNpDocumentsPageCsp_req.json"
- answer-in:
- schema: "SearchCustomerNpDocumentsPageCsp_resp.json"
efr-flow:
endpoints:
AnswerApplicationStatusDBOResponse:
endpoint: "/api/AnswerApplicationStatusDBOResponse"
ConnectionRestrictionDBOResponse:
endpoint: "/api/ConnectionRestrictionDBOResponse"
- CreateApplicationShortEFRRequest:
- endpoint: "/api/CreateApplicationShortEFRRequest"
eprshb-flow:
copy-headers:
request:
@@ -530,10 +490,8 @@ integration:
validate:
enabled: "default"
in:
- enabled: "true"
schema: "EprshbSendMessage_req.json"
answer-in:
- enabled: "true"
schema: "EprshbSendMessage_resp.json"
GetLinkInsurancePolicyFOSRequest:
endpoint: "/api/GetLinkInsurancePolicyFOSRequest"
@@ -546,19 +504,13 @@ integration:
validate:
enabled: "default"
in:
- enabled: "true"
schema: "GetTemplateStatus_req.json"
answer-in:
- enabled: "true"
schema: "GetTemplateStatus_resp.json"
ManageSubscriptionRequest:
endpoint: "/api/ManageSubscriptionRequest"
RequestGetStatementPFRRequest:
endpoint: "/api/RequestGetStatementPFRRequest"
- validate:
- enabled: "false"
- SendDealNTProRequest:
- endpoint: "/api/SendDealNTProRequest"
SendNotificationTypeListResponse:
endpoint: "/api/SendNotificationTypeListResponse"
SendSubscriptionNotificationResponse:
@@ -573,24 +525,9 @@ integration:
validate:
enabled: "default"
in:
- enabled: "true"
schema: "SendTemplateNoticeEmailJson_req.json"