Worker Pool Pattern,就。很。Pool。
Worker Pool Pattern,就。很。Pool。

Worker Pool Pattern,就。很。Pool。

Tags
Golang
Hey! Go Design Patterns
ithome 2021 ironman
Date
Sep 9, 2021

什麼是 Worker Pool Pattern?

設定好 pool 的 goroutine 數量,預先啟動多個 goroutine,把 job 傳給這些 goroutine 執行
與 Thread-Per-Message Pattern 類似,都是將 message 或 job 傳給 goroutine 執行的 pattern,不同的是:
  • Thread-Per-Message Patternt 當接收到 message 的時候啟動 goroutine
  • Worker Pool Pattern 是預先啟動好 goroutine,稱之為 pool
預先啟動好的 goroutine 可以先做一些前置動作,例如 DB 連線、與其他 service 的 socket 連線等,可避免收到 job 時才開始執行這些動作導致速度緩慢,或者無法控制 goroutine 數量導致系統崩潰。

問題情境

設計一個推播新聞系統,會將新的新聞直接推播出去,我們希望推播系統效率要高,並且每次推播都會跟某 service 建立 socket 拿取資料,如圖:
notion image
圖中socket 連線屬於耗時的 IO 行為,每個 goroutine 都連線 socket 會導致初始化過慢,並且 goroutine 一多起來,socket 連線過多會導致連線損壞。
設計有問題的程式碼如下:
(相關的 code 在Github - go-design-patterns。)
package main import ( "fmt" "time" ) func PushNews(news string, startTime time.Time) <-chan time.Time { newsCh := make(chan time.Time) go func() { time.Sleep(time.Duration(10 * time.Second)) //模擬與某service建立socket的時間 time.Sleep(time.Duration(3 * time.Second)) //模擬推播運行的時間 fmt.Printf("%s cost %s\n", news, time.Since(startTime)) newsCh <- time.Now() }() return newsCh } func main() { start := time.Now() allNews := []string{ "中秋節來了", "記得", "不要戶外烤肉~", "也不要吃太撐", } newsChs := []<-chan time.Time{} for _, news := range allNews { newsChs = append(newsChs, PushNews(news, start)) } // do something for index, newsCh := range newsChs { fmt.Printf("news %d is sent at %s\n", index, <-newsCh) } }
time.Sleep(time.Duration(10 * time.Second))會導致每個 goroutine 都花了 10 秒來練線 socket,導致運行緩慢。

解決方式

如果可以:
  • 控制 goroutine 數量
  • 預先連線 socket
即可避免 goroutine 接到 job 初始化過慢的問題,程式碼如下:
package main import ( "fmt" "time" ) type SendInfo struct { NewsName string FinishTime time.Time } func Worker(id int, jobs <-chan string, results chan<- SendInfo, startTime time.Time) { time.Sleep(time.Duration(10 * time.Second)) //模擬與某service建立socket的時間 for job := range jobs { time.Sleep(time.Duration(3 * time.Second)) //模擬推播運行的時間 fmt.Printf("%s cost %s by worker %d\n", job, time.Since(startTime), id) results <- SendInfo{job, time.Now()} } } func main() { start := time.Now() allNews := []string{ "中秋節來了", "記得", "不要戶外烤肉~", "也不要吃太撐", } jobs := make(chan string, len(allNews)) results := make(chan SendInfo, len(allNews)) for w := 1; w <= 2; w++ { go Worker(w, jobs, results, start) } for _, news := range allNews { jobs <- news } // do something for r := 1; r <= len(allNews); r++ { result := <-results fmt.Printf("news %s is sent at %s\n", result.NewsName, result.FinishTime) } }
  • for w := 1; w <= 2; w++在 pool 創建了三個 goroutine,稱之為 worker,他們會先初始化 socket 連線,讓後續有 job 傳入時不需要再執行一次 socket 連線
  • for _, news := range allNews即開始送 job 給 worker,由於 worker 已經初始化,運行不會被 socket 初始化拖慢
  • 控制 worker 的數量為三個,使 socket 連線數是可控的
概念如圖:
notion image
運行結果如圖:
notion image
如此一來每次運行 goroutine 就不用做 socket 連線,節省了許多時間。
不過,由於 pool 只有運行三個 goroutine,而 news 有四個,所以第四個也不要吃太撐news 會在 pool 都處理完前三個 news 後再執行。Worker Pool Pattern 可以限制 pool 的 goroutine 數量,以避免系統負載過大,但也需要考慮 pool 是否過小,導致 news jobs 常常需等待 pool 的情形。