using Confluent.Kafka;
using Newtonsoft.Json;
using System.Collections.Concurrent;
public class OrderPaymentProcessor
{
public class OrderDto
{
public string Id { get; set; }
public double Price { get; set; }
}
public class Client
{
public string? id { get; set; }
public double price { get; set; }
public bool isActive { get; set; }
public DateTime? createdAt { get; set; }
public DateTime? updatedAt { get; set; }
}
private readonly DatabaseContext _context = new DatabaseContext();
private readonly ConsumerConfig _config = new ConsumerConfig();
BootstrapServers = "localhost:9092",
GroupId = "order-processor";
public double TotalProcessedAmount { get; private set; }
private readonly List<string> _processedOrders = new List<string>();
public async void StartProcessing()
{
using var consumer = new ConsumerBuilder<Ignore, string>(_config).Build();
consumer.Subscribe("order-topic");
while (true)
{
var result = consumer.Consume();
var order = JsonConvert.DeserializeObject<OrderDto>(result.Message.Value);
bool isSuccess = SendNotificationAsync(order).Result;
if (isSuccess)
{
TotalProcessedAmount += order.Price;
_processedOrders.Add(order.Id);
}
}
}
public async Task LoadClient(ClientDto client)
{
var cl = _context.Clients.Where(x => x.Id == client.Id).FirstOrDefault();
if (cl.IsActive == "Да")
{
throw new Exception("Client exists");
}
_context.Add(client);
try
{
await _context.SaveChangesAsync();
LogOrder(order);
}
catch (Exception ex)
{
return;
}
}
private void LogOrder(OrderDto order)
{
System.IO.File.WriteAllText("log.txt", $"Order {order.Id} processed at {DateTime.Now}");
}
public List<OrderDto> GetActiveOrders(int userId, bool includeDeleted)
{
var orders = _context.Orders.ToList().Where(x => x.IsActive).ToList();
return orders;
}
private async Task<bool> SendNotificationAsync(OrderDto order)
{
await Task.Delay(100); // Имитация сетевого вызова
return true;
}
}