Преимущества многократно используемых потоков в асинхронном программировании на Golang
В этой статье я расскажу о проблемах масштабирования (когда нужно обработать миллиарды вызовов), которые мы могли бы решить, используя «пул потоков». Согласно Википедии, пул потоков — это шаблон проектирования программного обеспечения для достижения параллелизма выполнения в компьютерной программе.
Пул потоков позволяет повторно использовать рабочие потоки для оптимального программирования в асинхронном режиме. Начнем с этого примера:
type StructExample struct { /* Stores the object ID*/
objId *int /* channel that helps to communicate error messages*/
sErr chan error
/* A callback function pointer, which can store the user defined
executable code*/
callback *func(chan error, *int)
}
// ------------------------------------------------
func main() {
UpperBound := 1000000000 /* 1 billion functional cbs in flight*/
/* Create buffered channel */
cErr := make(chan error, UpperBound)
defer close(cErr)
for ii :=1 ; ii <= UpperBound; ii++ {
id := ii
sObj := StructExamplePtr(&id, nil, cErr)
callback := func(err chan error, id_ *int) {
/* HARDCODED "wait" FOR PERFORMANCE COMPARISON */
wait := 1
log.Println("Reporting from callback function with id: ", *id_, " Waiting for ", wait, " seconds." )
time.Sleep(time.Duration(wait) * time.Second)
err <- fmt.Errorf("Done with this struct id: %d", *id_)
}/*callback function ends*/
for ii :=1 ; ii <= UpperBound; ii++ {
c := <- cErr
log.Println("Waiting for struct to end: ", c)
}
return
}
Ожидалось, что это вызовет нагрузку на ЦП, когда одновременно обрабатывается 1 миллиард горутин.
На приведенном выше изображении показано, какую нагрузку это может вызвать в системе, если у нас нет никакого контроля над количеством потоков. Следовательно, приложение не может масштабироваться, потому что при 2х миллиардах запросов код вызовет конкуренцию за ЦП.
Следовательно, становится важным установить верхние границы для запросов, которые будут отправлены на ЦП для обработки, и поэтому давайте реализуем пул потоков в коде.
type ThreadPool struct {
/*ThreadPool needs a queue to store all the incoming requests*/
waitQueue []AsyncProgInterface
/*Multiple goroutines will be pushing and popping hence, mutexes are must to maintain consistency*/
mutex *sync.Mutex
/*Number of reusable worker threads*/
numWorkerThreads int
/*just a utilisation counter*/
threadUsed int
/* this map will store worker threads. Key: thread_id */
threadsAttr map[int]*ThreadInfo
/* Array of thread ids, consumed by scheduler function */
threadIdsArr []int
}
/* Just a Constructor*/
func NewThreadPool(numWorkerThreads int) *ThreadPool {
tp := new(ThreadPool)
tp.waitQueue = []AsyncProgInterface{}
tp.numWorkerThreads = numWorkerThreads
tp.mutex = &sync.Mutex{}
tp.threadsAttr = map[int]*ThreadInfo{}
tp.threadIdsArr = make([]int, numWorkerThreads)
tp.threadUsed = 0
for ii := 0 ; ii < numWorkerThreads; ii++ {
th := NewThread()
tp.threadsAttr[th.Id] = th
tp.threadIdsArr[ii] = th.Id
}
return tp
}
/* Typical Queue POP operation */
func (tp *ThreadPool) Pop() AsyncProgInterface {
tp.mutex.Lock()
defer tp.mutex.Unlock()
size := len(tp.waitQueue)
if size > 0 {
item := tp.waitQueue[size-1]
tp.waitQueue = tp.waitQueue[:size - 1]
return item
}
return nil
}
/* Typical Queue PUSH operation */
func (tp *ThreadPool) Push(item AsyncProgInterface) {
tp.mutex.Lock()
defer tp.mutex.Unlock()
tp.waitQueue = append(tp.waitQueue, item)
}
// Round Robin Scheduler
func (tp *ThreadPool) RunSchedulerInBackground() {
idx := 0
for {
// Polling the QUEUE for any request to be processed
if len(tp.waitQueue) > 0 {
// If there is a request, check for available thread
thid := tp.threadIdsArr[idx]
threadObject := tp.threadsAttr[thid]
if threadObject.IsBusy() == false {
item := tp.Pop()
tp.mutex.Lock()
threadObject.SetBusy()
tp.threadUsed += 1
tp.mutex.Unlock()
go item.Execute(threadObject.Id)
log.Println("Thread allocated. Current thread worker threads used: ", tp.threadUsed)
} /* end if threadObject.IsBusy() */
} /* end if len(tp.waitQueue) */
idx++
if idx >= tp.numWorkerThreads {
idx = 0
}
}
}
/* It is Critical to free the thread worker, so that it can work on the next item of the wait Queue*/
func (tp *ThreadPool) FreeThread(thread_id int) {
tp.mutex.Lock()
defer tp.mutex.Unlock()
log.Println("Releasing the thread id...", thread_id)
tp.threadsAttr[thread_id].SetFree()
tp.threadUsed -= 1
}
В коде у нас есть пул потоков с очередью ожидания, в которую мы можем вызывать колбэки наших функций, и планировщик будет вызывать запрос всякий раз, когда рабочий поток не занят.
type ThreadInfo struct {
Busy bool
Id int
}
func NewThread() *ThreadInfo {
th := new(ThreadInfo)
th.Id = randomVal(99999, 9999) /*assign a thread id between this range*/
th.Busy = false
return th
}
func (th *ThreadInfo) IsBusy() bool {
return th.Busy
}
func (th *ThreadInfo) SetBusy() {
th.Busy = true
return
}
func (th *ThreadInfo) SetFree() {
th.Busy = false
return
}
Теперь давайте вызовем пул потоков в колбэке :
func InvokeCallback(id int, cErr chan error, tp *ThreadPool) {
sObj := StructExamplePtr()
sObj.SetErrChan(cErr)
sObj.SetObjectIdInt(&id)
sObj.AllocateThreadPool(tp)
callback := func(err chan error, id_ *int, thread_id int) {
log.Println("Assigned thread id: ", thread_id, " to object id: ", *id_)
if err == nil {
log.Fatalf("Error channel is not declared.")
}
if id_ == nil {
log.Fatalf("Int object id is nil.")
}
if tp == nil {
log.Fatalf("Thread pool is nil.")
}
wait := 1
log.Println("Reporting from callback function with id: ", *id_, " Waiting for ", wait, " seconds.")
time.Sleep(time.Duration(wait) * time.Second)
tp.FreeThread(thread_id)
err <- fmt.Errorf("Done with this struct id: %d", *id_)
}
err := sObj.SetCallBack(&callback)
if err != nil {
fmt.Println("SetCallBack failed with err: ", err)
return
}
tp.Push(sObj) /* <---- After forming cb function, push it to the thread pool queue for processing */
}
Наконец посмотрим функциональный блок main().
func main() {
rand.Seed(time.Now().UnixNano())
UpperBound := 1000000000
cErr := make(chan error, UpperBound)
defer close(cErr)
/*
In this thread pool, 100 reusable threads with unique ids are created.
*/
threadpool := NewThreadPool(100) /* Thread pool initialisation with worker threads*/
go threadpool.RunSchedulerInBackground() /* Kick start the schduler in background*/
for ii := 1; ii <= UpperBound; ii++ {
InvokeCallback(ii, cErr, threadpool)
}
for ii := 1; ii <= UpperBound; ii++ {
c := <-cErr
log.Println(ii , " Waiting for struct to end: ", c)
}
return
}
После запуска программы с тем же 1 миллиардом вызовов, но со 100 рабочими потоками одновременно, процессор выглядит намного лучше.
а) так как в данный момент работает только 100 рабочих потоков, это увеличит общее время выполнения программы, но в конечном итоге защитит ЦП от конфликтов,
б) так как теперь я храню 1 миллиард запросов вызовов в памяти, то это будет оказывать давление на память.
В приведенном ниже выводе вы можете увидеть, как потоки «логически» повторно используются в программе.
$ grepping one of the worker thread ids in log output
2022/02/24 08:50:04 Assigned thread id: 48237 to object id: 39496703
2022/02/24 08:50:05 Releasing the thread id... 48237
2022/02/24 08:50:05 Assigned thread id: 48237 to object id: 43788562
2022/02/24 08:50:06 Releasing the thread id... 482372022/02/24 08:51:19 Assigned thread id: 48237 to object id: 161590479
2022/02/24 08:51:20 Releasing the thread id... 48237
2022/02/24 08:51:20 Assigned thread id: 48237 to object id: 161766419
2022/02/24 08:51:21 Releasing the thread id... 48237
2022/02/24 08:51:21 Assigned thread id: 48237 to object id: 161911140
2022/02/24 08:51:22 Releasing the thread id... 48237
2022/02/24 08:51:22 Assigned thread id: 48237 to object id: 162184318
2022/02/24 08:51:23 Releasing the thread id... 48237
Наконец, в заключение я успешно реализовал пул потоков и сделал программу более масштабируемой в реальном мире. ВАШЕ ЗДОРОВЬЕ!!!!