Загрузка данных
use std::{sync::Arc, time::Duration};
use crate::rest::state::BrmState;
use axum::{
Json,
extract::{Path, State},
http::StatusCode,
};
use ractor::rpc::CallResult;
use serde::{Deserialize, Serialize};
use serde_json::{Value, from_value, json};
use uuid::Uuid;
use zen_engine::{
debug_runner::{
actor::DEBUG_RUNNER_ACTOR_NAME,
api::{DebugRpcMessage, DebugRunReceipt, DebugRunRequest, DebugSessionId},
},
model::{DecisionContent, DecisionGraphInfo},
};
use zen_types::{
graph::{graph_types::DecisionGraphResponse, tracer::DecisionGraphTrace},
messages::network::GraphTemplateId,
rpc::flow::DecisionTemplateInfo,
};
const DEBUG_RPC_TIMEOUT: Duration = Duration::from_secs(15);
type HttpResult<T> = Result<T, StatusCode>;
type BrmEngine = State<Arc<BrmState»;
/// Request body of `deploy_and_evaluate_graph`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecRequest {
/// Graph definition as raw JSON; the handler deserializes it into a `DecisionContent`.
graph: Value,
/// Raw JSON value handed to the engine's `execute` call as the execution input.
context: Value,
}
/// Pair of identifiers naming one execution of one deployed graph.
///
/// Returned by `deploy_and_evaluate_graph` and accepted as the request body of
/// `execute_status`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecReceipt {
/// Identifier of the deployed graph.
graph_id: Uuid,
/// Identifier of the execution started on that graph.
execute_id: Uuid,
}
/// Response body carrying a graph identifier; returned by `deploy` and `terminate`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraphReceipt {
/// Identifier reported by the engine for the affected graph.
graph_id: Uuid,
}
/// Response body carrying a template identifier; returned by `save_template` and
/// `remove_template`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TemplateReceipt {
/// Identifier reported by the engine for the affected template.
template_id: GraphTemplateId,
}
pub async fn deploy_and_evaluate_graph(State(state): BrmEngine, Json(payload): Json<ExecRequest>) -> HttpResult<Json<ExecReceipt» {
tracing::trace!("deploy_and_evaluate_graph api call {:?}", payload);
let decision_content: DecisionContent = from_value(payload.graph).unwrap();
let value = payload.context.clone();
let result = state.get_engine().deploy(Arc::new(decision_content)).await;
if let Ok(graph_id) = result {
if let Ok(execute_id) = state.get_engine().execute(graph_id, value).await {
return Ok(ExecReceipt { graph_id, execute_id }.into());
}
}
Err(StatusCode::BAD_REQUEST)
}
pub async fn deploy(State(state): BrmEngine, Json(payload): Json<Value>) -> HttpResult<Json<GraphReceipt» {
tracing::trace!("deploy api call {:?}", payload);
let decision_content: DecisionContent = from_value(payload).unwrap();
if decision_content.graph_template_uuid.is_none() {
tracing::warn!("deploy отклонён: отсутствует graphTemplateUuid");
return Err(StatusCode::BAD_REQUEST);
}
let result = state.get_engine().deploy(Arc::new(decision_content)).await;
if let Ok(graph_id) = result {
return Ok(Json(GraphReceipt { graph_id }));
}
Err(StatusCode::BAD_REQUEST)
}
pub async fn deploy_list(State(state): BrmEngine) -> HttpResult<Json<Vec<DecisionGraphInfo»> {
let result = state.get_engine().deployed_list().await;
if let Ok(list) = result {
return Ok(Json(list));
}
Err(StatusCode::BAD_REQUEST)
}
pub async fn get_graph(State(state): BrmEngine, Path(graph_id): Path<Uuid>) -> HttpResult<Json<Option<Arc<DecisionContent»» {
tracing::trace!("api call {:?}", graph_id);
let result = state.get_engine().get_graph(graph_id).await;
if let Ok(graph) = result {
return Ok(Json(graph));
}
Err(StatusCode::BAD_REQUEST)
}
pub async fn terminate(State(state): BrmEngine, Path(graph_id): Path<Uuid>) -> HttpResult<Json<GraphReceipt» {
tracing::trace!("api call {:?}", graph_id);
let result = state.get_engine().terminate(graph_id).await;
if let Ok(graph_id) = result {
return Ok(Json(GraphReceipt { graph_id }));
}
Err(StatusCode::BAD_REQUEST)
}
pub async fn execute(State(state): BrmEngine, Path(graph_id): Path<Uuid>, Json(payload): Json<Value>) -> HttpResult<Json<Value» {
tracing::trace!("api call {:?}", payload);
if let Ok(execute_id) = state.get_engine().execute(graph_id, payload).await {
return Ok(Json(json!({
"graph_id": graph_id,
"execute_id": execute_id,
})));
}
Err(StatusCode::BAD_REQUEST)
}
pub async fn execute_status(State(state): BrmEngine, Json(payload): Json<ExecReceipt>) -> HttpResult<
Json<DecisionGraphResponse» {
let status = state.get_engine().execute_status(payload.graph_id, payload.execute_id).await;
match status {
Ok(Some(response)) => Ok(response.into()),
Ok(None) => Err(StatusCode::NOT_FOUND),
Err(_e) => Err(StatusCode::BAD_REQUEST),
}
}
// =========================
// Template endpoints
// =========================
pub async fn save_template(State(state): BrmEngine, Json(payload): Json<Value>) -> HttpResult<Json<TemplateReceipt» {
tracing::trace!("save_template api call {:?}", payload);
let decision_content: DecisionContent = from_value(payload).unwrap();
// Новый контракт:
// - create template: templateUuid отсутствует
// - update template: templateUuid присутствует
// uuid версии шаблона генерирует сервис, а не UI
let result = state.get_engine().save_template(Arc::new(decision_content)).await;
if let Ok(template_id) = result {
return Ok(Json(TemplateReceipt { template_id }));
}
Err(StatusCode::BAD_REQUEST)
}
pub async fn template_list(State(state): BrmEngine) -> HttpResult<Json<Vec<DecisionTemplateInfo»> {
let result = state.get_engine().template_list().await;
if let Ok(list) = result {
return Ok(Json(list));
}
Err(StatusCode::BAD_REQUEST)
}
pub async fn get_template(State(state): BrmEngine, Path(template_id): Path<GraphTemplateId>) -> HttpResult<Json<Option<Arc<DecisionContent»» {
tracing::trace!("get_template api call {:?}", template_id);
let result = state.get_engine().get_template(template_id).await;
if let Ok(template) = result {
return Ok(Json(template));
}
Err(StatusCode::BAD_REQUEST)
}
pub async fn remove_template(State(state): BrmEngine, Path(template_id): Path<GraphTemplateId>) -> HttpResult<Json<TemplateReceipt» {
tracing::trace!("remove_template api call {:?}", template_id);
let result = state.get_engine().remove_template(template_id).await;
if let Ok(template_id) = result {
return Ok(Json(TemplateReceipt { template_id }));
}
Err(StatusCode::BAD_REQUEST)
}
/// Resolves the debug runner actor from the ractor registry by its well-known name.
///
/// # Errors
///
/// Logs an error and returns `503 Service Unavailable` when no actor is registered
/// under [`DEBUG_RUNNER_ACTOR_NAME`].
fn debug_runner_ref() -> Result<ractor::ActorRef<DebugRpcMessage>, StatusCode> {
    match ractor::registry::where_is(DEBUG_RUNNER_ACTOR_NAME.to_string()) {
        Some(cell) => Ok(ractor::ActorRef::<DebugRpcMessage>::from(cell)),
        None => {
            tracing::error!(
                actor = DEBUG_RUNNER_ACTOR_NAME,
                "debug runner не зарегистрирован — endpoint недоступен"
            );
            Err(StatusCode::SERVICE_UNAVAILABLE)
        }
    }
}
/// Sends one RPC message to the debug runner and maps the outcome to an HTTP status.
///
/// `build` receives the reply port and must return the message to send. The call waits
/// at most [`DEBUG_RPC_TIMEOUT`] for the reply.
///
/// # Errors
///
/// * `408 Request Timeout` when the actor does not reply in time.
/// * `500 Internal Server Error` when the reply port is dropped or the message cannot
///   be delivered.
async fn debug_call<T, F>(
    runner: &ractor::ActorRef<DebugRpcMessage>,
    build: F,
) -> Result<T, StatusCode>
where
    T: Send + 'static,
    F: FnOnce(ractor::RpcReplyPort<T>) -> DebugRpcMessage,
{
    // Delivery failures are handled first so the match below only sees reply outcomes.
    let reply = runner
        .call(build, Some(DEBUG_RPC_TIMEOUT))
        .await
        .map_err(|e| {
            tracing::error!(error = %e, "ошибка вызова DebugRunnerActor");
            StatusCode::INTERNAL_SERVER_ERROR
        })?;

    match reply {
        CallResult::Success(value) => Ok(value),
        CallResult::Timeout => {
            tracing::error!("таймаут вызова DebugRunnerActor");
            // NOTE(review): 408 conventionally signals a slow client; 504 may fit an
            // upstream actor timeout better — confirm with API consumers before changing.
            Err(StatusCode::REQUEST_TIMEOUT)
        }
        CallResult::SenderError => {
            tracing::error!("DebugRunnerActor закрыл reply_port");
            Err(StatusCode::INTERNAL_SERVER_ERROR)
        }
    }
}
/// Submits a debug run request to the debug runner actor.
///
/// On success responds with `202 Accepted` and the receipt returned by the runner.
///
/// # Errors
///
/// * `400 Bad Request` when the runner rejects the submission (the reason is logged).
/// * Any status produced by `debug_runner_ref` or `debug_call`.
pub async fn debug_run(
    _state: BrmEngine,
    Json(request): Json<DebugRunRequest>,
) -> HttpResult<(StatusCode, Json<DebugRunReceipt>)> {
    let runner = debug_runner_ref()?;
    debug_call(&runner, |reply| DebugRpcMessage::Submit { request, reply })
        .await?
        .map(|receipt| (StatusCode::ACCEPTED, Json(receipt)))
        .map_err(|message| {
            tracing::warn!(error = %message, "Submit отклонён debug runner'ом");
            StatusCode::BAD_REQUEST
        })
}
pub async fn debug_session(_state: BrmEngine, Path(session_uuid): Path<DebugSessionId>) -> HttpResult<Json<DecisionGraphTrace» {
let runner = debug_runner_ref()?;
let snapshot = debug_call(&runner, |reply| DebugRpcMessage::Fetch {
execute_id: session_uuid,
reply,
})
.await?;
snapshot.map(Json).ok_or(StatusCode::NOT_FOUND)
}
/// Asks the debug runner to cancel the session named by the path segment.
///
/// Responds with `204 No Content` when the runner reports the session as cancelled.
///
/// # Errors
///
/// * `404 Not Found` when the runner reports that nothing was cancelled.
/// * Any status produced by `debug_runner_ref` or `debug_call`.
pub async fn debug_cancel(
    _state: BrmEngine,
    Path(session_uuid): Path<DebugSessionId>,
) -> HttpResult<StatusCode> {
    let runner = debug_runner_ref()?;
    let was_cancelled = debug_call(&runner, |reply| DebugRpcMessage::Cancel {
        execute_id: session_uuid,
        reply,
    })
    .await?;
    was_cancelled
        .then_some(StatusCode::NO_CONTENT)
        .ok_or(StatusCode::NOT_FOUND)
}