Создаем пул воркеров в GO
Но подождите, что такое пул воркеров?
Это шаблон проектирования, используемый в параллельном программировании для управления группой рабочих потоков/горутин (объяснение разницы между потоками и горутинами выходит за рамки данного поста), которые могут быть использованы для выполнения задач или обработки рабочих нагрузок.
В пуле воркеров заранее создается и поддерживается фиксированное количество рабочих горутин, готовых к обработке поступающих задач. Вместо того чтобы создавать новую горутину для каждой задачи, задачи назначаются доступным рабочим горутинам из пула. После выполнения задания рабочая горутина возвращается в пул и может быть назначена на новое задание.
https://t.me/Golang_google – обучающий телеграм канал для Golang разработчиков
Теперь, когда у нас есть представление о том, что мы хотим решить с помощью этого, давайте создадим его:
1 – Предположим, что у вас уже установлен GO, и вы уже создали новый проект golang.
Давайте создадим тест:
cd workers_pool
touch workers_pool_test.go
2 – Добавьте следующий код
package main
import (
"testing"
)
type TestJob struct {
Id int
Result chan int
}
func (j *TestJob) Process() error {
j.Result <- 1
return nil
}
func (j *TestJob) ID() int {
return j.Id
}
func TestNewWorkerPool(t *testing.T) {
wp := NewWorkerPool(3, 5)
expected := 10
wp.Start()
c := make(chan int, expected)
for i := 1; i <= expected; i++ {
task := &TestJob{Id: i, Result: c}
wp.AddJob(task)
}
result := 0
for v := range c {
result += v
if result == expected {
close(c)
}
}
if result != expected {
t.Fatalf("expected 10, got %d", result)
}
}
Давайте посмотрим, что мы сделали:
wp := NewWorkerPool(3, 5)
Мы создаем пул, в котором будет 3 рабочих, а также очередь из 5 заданий, затем запускаем его и добавляем задания в пул.
Все остальное в тесте просто для тестирования, мы определяем TestJob, который будет содержать буферизованный канал, чтобы убедиться, что все задания были выполнены, и мы можем проверить тест.
Если вы запустите этот код:
go test -v -race ./...
Выдаст ожидаемую ошибку:
./workers_pool_test.go:22:8: undefined: NewWorkerPool
FAIL github.com/<your-ghuser>/workers_pool [build failed]
3 – Затем создадим NewWorkerPool:
package main
import (
"fmt"
"sync"
)
type Job interface {
ID() int
Process() error
}
type WorkersPool struct {
numWorkers int
jobQueue chan Job
wg *sync.WaitGroup
quitCh chan struct{}
}
func NewWorkerPool(numWorkers, queueSize int) *WorkersPool {
return &WorkersPool{
numWorkers: numWorkers,
jobQueue: make(chan Job, queueSize),
quitCh: make(chan struct{}),
wg: &sync.WaitGroup{},
}
}
func (wp *WorkersPool) AddJob(task Job) {
wp.jobQueue <- task
}
func (wp *WorkersPool) Start() {
// Start the worker goroutines
for i := 1; i <= wp.numWorkers; i++ {
wp.wg.Add(1)
go wp.worker(i)
}
go func() {
wp.wg.Wait()
}()
fmt.Println("Worker pool started")
}
func (wp *WorkersPool) Stop() {
fmt.Println("Worker pool stopped")
close(wp.jobQueue)
close(wp.quitCh)
}
func (wp *WorkersPool) worker(id int) {
defer wp.wg.Done()
for {
select {
case task, ok := <-wp.jobQueue:
if !ok {
fmt.Printf("Worker %d shutting down\n", id)
return
}
err := task.Process()
if err != nil {
fmt.Printf("Worker %d, ERROR processing task: %d\n", id, task.ID())
}
fmt.Printf("Worker %d processing task: %d\n", id, task.ID())
case <-wp.quitCh:
fmt.Printf("Worker %d stopped\n", id)
wp.Stop()
return
}
}
}
В приведенном выше коде мы видим публичные вызовы Start и Stop, которые довольно очевидно, что они делают, но давайте объясним их, Start – это создание грорутин для каждого воркера и добавление ожидания для каждого из них, а после создания рабочего мы открываем еще одну горутину для ожидания выполнения, а Stop – это просто закрытие открытых каналов.
Важной частью логики является случай переключателя, в котором мы слушаем выполнение заданий, что будет происходить каждый раз, когда новое задание будет добавлено в AddJob.
4 – Проведите тест еще раз:
go test -v -race ./...
На этот раз нас ждет успех:
=== RUN TestNewWorkerPool
Worker pool started
Worker 3 processing task: 1
Worker 2 processing task: 2
Worker 3 processing task: 5
Worker 3 processing task: 6
Worker 3 processing task: 7
Worker 1 processing task: 3
Worker 2 processing task: 4
Worker 1 processing task: 9
Worker 2 processing task: 10
Worker 3 processing task: 8
--- PASS: TestNewWorkerPool (0.00s)
PASS
ok github.com/<your-ghuser>/workers_pool 0.685s
Вот и все :), у нас есть пул работников в GO.
Заключительные замечания
Обратите внимание, что мы запускаем тесты с -race, это потому, что мы хотим быть уверены, что наш код безопасен с точки зрения мульти-гороутина, и у нас нет условий гонки.
Это простой пул воркеров, использующий память во время выполнения без какого-либо поведения персистентности, в будущем я создам новый пост с использованием Redis, чтобы сделать что-то подобное тому, что делает sidekiq для ruby.