/*
Есть некий сервис, который принимает задачи на обработку. Клиент передаёт на вход объект с описанием задачи.
Получив задачу, мы кладём её в очередь. Задача запускается в обработку (имитируем полезную работу через time.Sleep(5*time.Second)), если есть свободный обработчик. Как только обработчик освободился — берёт следующую задачу из очереди. Если в очереди пусто, ждём новых задач от клиентов.
Сервис одновременно может обрабатывать не более N задач. Остальные помещаются в очередь.
— клиенту нужен результат выполнения. Submit должен возвращать канал, в который придёт результат задачи, после чего канал закроется.
*/
type Task struct {
ID int
}
type TaskResult struct {
ID int
Value int
Err erro
}
type PoolConfig struct {
WorkerCount int
}
func (p PoolConfig) Validate() error {
if p.WorkerCount < 1 {
return error.New("Ivalid pool config")
}
return nil
}
func process(ctx context.Context, task Task) TaskResult {
select {
}
}
type WorkerPool struct {
Config PoolConfig
TasksCh chan Tasks
ResultsCh chan Result
Wg sync.WaitGroup
Ctx context.Context
Cancel func
}
func NewWorkerPool(pCtx context.Context, config PoolConfig) (*WorkerPool, error) {
if err := config.Validate(); err != nil {
return nil, fmt.Errorf("can't run worerk pool: %w", err)
}
wp := WorkerPool{
Confing: config,
TasksCh: make(chan Task),
ResultsCh: make(chan Result)
Wg: sync.WaitGroup{}
}
ctx, cancel := context.WithCancel(pCtx)
wp.Ctx = ctx
wp.Cancel = cancel
worker := func(ctx context.Context) {
defer wp.Wg.Done()
for {
select {
case <- ctx.Done():
return
case task, ok <- tasksCh:
if !ok {
return
}
result := process(ctx, task)
select {
case resultCh <- task:
case <-ctx.Done()
}
}
}
}
for i := 0; i < config.CountWorkers; i++ {
wp.Wg.Add(1)
go worker()
}
return &wp, nil
}
func (wp *WorkerPool) AppendTask(task Task) {
taskResCh := make(chan TaskResult)
defer close(taskResCh)
sel
select
}
func main () {
wp := NewWorkerPool(context.Background(), PoolConfig{WorkerCount: 5})
wp.ResultsCh
}