Создаем пул воркеров в GO

Но подождите, что такое пул воркеров?

Это шаблон проектирования, используемый в параллельном программировании для управления группой рабочих потоков/горутин (объяснение разницы между потоками и горутинами выходит за рамки данного поста), которые могут быть использованы для выполнения задач или обработки рабочих нагрузок.
В пуле воркеров заранее создается и поддерживается фиксированное количество рабочих горутин, готовых к обработке поступающих задач. Вместо того чтобы создавать новую горутину для каждой задачи, задачи назначаются доступным рабочим горутинам из пула. После выполнения задания рабочая горутина возвращается в пул и может быть назначена на новое задание.

https://t.me/Golang_google – обучающий телеграм канал для Golang разработчиков

Теперь, когда у нас есть представление о том, что мы хотим решить с помощью этого, давайте создадим его:

1 – Предположим, что у вас уже установлен GO, и вы уже создали новый проект golang.
Давайте создадим тест:
cd workers_pool
touch workers_pool_test.go

2 – Добавьте следующий код
package main

import (
 "testing"
)

type TestJob struct {
 Id     int
 Result chan int
}

func (j *TestJob) Process() error {
 j.Result <- 1
 return nil
}

func (j *TestJob) ID() int {
 return j.Id
}

func TestNewWorkerPool(t *testing.T) {
 wp := NewWorkerPool(3, 5)
 expected := 10

 wp.Start()

 c := make(chan int, expected)

 for i := 1; i <= expected; i++ {
  task := &TestJob{Id: i, Result: c}
  wp.AddJob(task)
 }

 result := 0
 for v := range c {
  result += v
  if result == expected {
   close(c)
  }
 }

 if result != expected {
  t.Fatalf("expected 10, got %d", result)
 }
}

Давайте посмотрим, что мы сделали:

wp := NewWorkerPool(3, 5)

Мы создаем пул, в котором будет 3 рабочих, а также очередь из 5 заданий, затем запускаем его и добавляем задания в пул.
Все остальное в тесте просто для тестирования, мы определяем TestJob, который будет содержать буферизованный канал, чтобы убедиться, что все задания были выполнены, и мы можем проверить тест.

Если вы запустите этот код:

go test -v -race ./...

Выдаст ожидаемую ошибку:

./workers_pool_test.go:22:8: undefined: NewWorkerPool
FAIL github.com/<your-ghuser>/workers_pool [build failed]

3 – Затем создадим NewWorkerPool:
package main

import (
   "fmt"
   "sync"
)

type Job interface {
 ID() int
 Process() error
}

type WorkersPool struct {
   numWorkers int
   jobQueue   chan Job
   wg         *sync.WaitGroup
   quitCh     chan struct{}
}

func NewWorkerPool(numWorkers, queueSize int) *WorkersPool {
   return &WorkersPool{
      numWorkers: numWorkers,
      jobQueue:   make(chan Job, queueSize),
      quitCh:     make(chan struct{}),
      wg:         &sync.WaitGroup{},
   }
}

func (wp *WorkersPool) AddJob(task Job) {
   wp.jobQueue <- task
}

func (wp *WorkersPool) Start() {
   // Start the worker goroutines
   for i := 1; i <= wp.numWorkers; i++ {
      wp.wg.Add(1)
      go wp.worker(i)
   }

   go func() {
      wp.wg.Wait()
   }()

   fmt.Println("Worker pool started")
}

func (wp *WorkersPool) Stop() {
   fmt.Println("Worker pool stopped")
   close(wp.jobQueue)
   close(wp.quitCh)
}

func (wp *WorkersPool) worker(id int) {
   defer wp.wg.Done()
   for {
      select {
      case task, ok := <-wp.jobQueue:
         if !ok {
            fmt.Printf("Worker %d shutting down\n", id)
            return
         }
         err := task.Process()
         if err != nil {
            fmt.Printf("Worker %d, ERROR processing task: %d\n", id, task.ID())
         }
         fmt.Printf("Worker %d processing task: %d\n", id, task.ID())
      case <-wp.quitCh:
         fmt.Printf("Worker %d stopped\n", id)
         wp.Stop()
         return
      }
   }
}

В приведенном выше коде мы видим публичные вызовы Start и Stop, которые довольно очевидно, что они делают, но давайте объясним их, Start – это создание грорутин для каждого воркера и добавление ожидания для каждого из них, а после создания рабочего мы открываем еще одну горутину для ожидания выполнения, а Stop – это просто закрытие открытых каналов.

Важной частью логики является случай переключателя, в котором мы слушаем выполнение заданий, что будет происходить каждый раз, когда новое задание будет добавлено в AddJob.

4 – Проведите тест еще раз:
go test -v -race ./...

На этот раз нас ждет успех:

=== RUN   TestNewWorkerPool
Worker pool started
Worker 3 processing task: 1
Worker 2 processing task: 2
Worker 3 processing task: 5
Worker 3 processing task: 6
Worker 3 processing task: 7
Worker 1 processing task: 3
Worker 2 processing task: 4
Worker 1 processing task: 9
Worker 2 processing task: 10
Worker 3 processing task: 8
--- PASS: TestNewWorkerPool (0.00s)
PASS
ok      github.com/<your-ghuser>/workers_pool        0.685s

Вот и все :), у нас есть пул работников в GO.

Заключительные замечания

Обратите внимание, что мы запускаем тесты с -race, это потому, что мы хотим быть уверены, что наш код безопасен с точки зрения мульти-гороутина, и у нас нет условий гонки.

Это простой пул воркеров, использующий память во время выполнения без какого-либо поведения персистентности, в будущем я создам новый пост с использованием Redis, чтобы сделать что-то подобное тому, что делает sidekiq для ruby.

+1
0
+1
0
+1
0
+1
0
+1
1

Ответить

Ваш адрес email не будет опубликован. Обязательные поля помечены *