Fan-Out Fan-In Pattern,看吧世界!這就是多人解決的力量!
Fan-Out Fan-In Pattern,看吧世界!這就是多人解決的力量!

Fan-Out Fan-In Pattern,看吧世界!這就是多人解決的力量!

Tags
Golang
Hey! Go Design Patterns
ithome 2021 ironman
Date
Sep 7, 2021

什麼是 Fan-Out Fan-In Pattern?

將 input 由一個 producer 分發多個 goroutine 運行,再將多個 task goroutine 運行的結果由一個 consumer 收集資料合併為 output
如果程式的有著複雜的計算或者多個 IO 運行,可以將這些運行分發給 task goroutine 執行,使 task 執行更快速,在統一收集繼續下個流程。其中分發與收集的行為又被稱為 Fan Out、Fan In:
  • Fan Out: input 傳入 producer 後開啟多個 goroutine 運行,直到 producer 不再接收 input,就像是分發任務一般,所以稱為 Fan Out
  • Fan In: 由多個 input 傳入 consumer 後合併為 output 傳出,直到 consumer 不再接收 input,就像是收集資料一般,所以稱為 Fan In
notion image

問題情境

設計一個新聞資訊網頁系統,需要從 A、B、C server 拿取資料,這些資料都沒有順序性,純粹是要都顯示在網頁上而已,所以如果 A 資料拿完再拿 B,這樣就太浪費時間了。可以同時拿取 A、B、C server 的資料加快取資料的速度。
實作有問題的系統如下, A、B、C server 透過GetServerData()拿取資料,再透過ShowNews顯示新聞資料:
package main import ( "fmt" "math/rand" "time" ) func GetServerData(serverName string) string { time.Sleep(time.Duration(rand.Intn(3)) * time.Second) //模擬取得server data消耗的時間 return fmt.Sprintf("%s server data", serverName) } func ShowNews(news ...interface{}) { fmt.Println(news...) } func main() { start := time.Now() responseByServerA := GetServerData("A") responseByServerB := GetServerData("B") responseByServerC := GetServerData("C") ShowNews(responseByServerA, responseByServerB, responseByServerC) fmt.Printf("cost %s", time.Since(start)) }
會發現因為拿取資料無法並行,所以耗時較久
notion image

解決方式

實作Producer()Task()Consumer()來分別分發任務執行任務收集資料
package main import ( "fmt" "math/rand" "sync" "time" ) func Producer(serverNames ...string) <-chan string { producerCh := make(chan string, len(serverNames)) go func() { defer close(producerCh) for _, serverName := range serverNames { producerCh <- serverName } }() return producerCh } func Task(producerCh <-chan string) <-chan string { taskCh := make(chan string) go func() { defer close(taskCh) for serverName := range producerCh { taskCh <- GetServerData(serverName) } }() return taskCh } func Consumer(taskChs ...<-chan string) <-chan string { consumerCh := make(chan string) var wg sync.WaitGroup wg.Add(len(taskChs)) go func() { wg.Wait() close(consumerCh) }() for _, task := range taskChs { go func(task <-chan string) { defer wg.Done() for new := range task { consumerCh <- new } }(task) } return consumerCh } func GetServerData(serverName string) string { time.Sleep(time.Duration(rand.Intn(3)) * time.Second) //模擬取得server data消耗的時間 return fmt.Sprintf("%s server data", serverName) } func ShowNews(news ...interface{}) { fmt.Println(news...) } func main() { start := time.Now() producerCh := Producer("A", "B", "C") task1 := Task(producerCh) task2 := Task(producerCh) task3 := Task(producerCh) consumerCh := Consumer(task1, task2, task3) for new := range consumerCh { ShowNews(new) } fmt.Printf("cost %s", time.Since(start)) }
程式碼較長,重點如下:
  • 將需要拿取資料的 server 名稱傳遞給Producer()後,Producer()會創建一個 Channel 來分發任務,所以需再將此 Channel 傳給Task()使其 goroutine 獲得任務
  • Task()獲得任務開始執行後,也會產生各自的 Channel 用來傳遞 server 的資料,所以需再將此 Channel 送至Consumer()
  • Consumer()獲得所有Task()的 Channel 後,會在啟動相對數量的 gorotine 合併資料至consumerCh{},為了要確保資料取得完畢後關閉consumerCh{},需透過sync.WaitGroup{}來取得close(consumerCh)的時機,時機的邏輯如下:
    • wg.Add()會加入需等待的數目,這邊輸入 goroutine 的數量
    • wg.Done()會減去需等待的數目
    • wg.Wait()會使程式等待,等待至sync.WaitGroup{}等待數目被減去至 0 時才會繼續執行
    • 所以將wg.Done()都安排在讀取完Task()Channel 後,就可以確保讀完資料再close(consumerCh)
  • for new := range consumerCh會讀取 Channel 資料,直到close(consumerCh)後跳脫 for 迴圈
執行後由於取的資料可以同時執行因此加快了執行速度:
notion image