Concurrency ใน Go ตอนที่ 2: Channel

ตอนที่ 2 ของบทความ “Concurrency ใน Go” เราจะมาพูดถึงวิธีการที่แนะนำให้ใช้สื่อสารกันระหว่างหลายๆ goroutine ซึ่งก็คือ Channel

Channel

แนวทางการสื่อสารสำหรับ concurrency ตาม best practice ก็คือ

“Do not communicate by sharing memory; instead, share memory by communicating.”

ดังนั้น ในการสื่อสารระหว่างหลาย routine ปกติเราจะไม่แก้ค่าในตัวแปรกลาง แต่จะใช้การรับส่งค่าผ่าน channel แทน

โดย channel จะเป็นตัวแปรแบบ typed ต้องกำหนดตั้งแต่เริ่มว่าตัวแปรที่จะส่งค่าเป็นประเภทไหน

  • การส่งค่าเข้าไปยัง channel เขียนแบบนี้ c <- v
  • การรับค่าจาก channel เขียนแบบนี้ <- c

โดยที่ c คือ channel, v คือค่าตัวแปร

ตัวอย่าง

func multiply(v1 int, v2 int, c chan int) {
  c <- v1 * v2
}
func main() {
  c := make(chan int)
  go multiply(1, 2, c)
  go multiply(3, 4, c)
  a := <- c
  b := <- c
  fmt.Println(a * b)
}

จะเห็นว่าไม่มีการใช้ WaitGroup แต่ก็ main ก็ไม่ได้จบไปก่อน นั่นก็เพราะว่า channel มีกลไก blocking อยู่แล้ว มันจะหยุดรออัตโนมัติเมื่อพยายามรับหรือส่งข้อมูล

  • บล็อกการส่งค่าเข้าใน channel จนกว่าค่าที่ค้างอยู่จะถูกรับออกไป
  • บล็อกการรับค่าจาก channel รอจนกว่าจะมีค่าส่งเข้ามา

หมายความว่า เราสามารถใช้ channel เพื่อการรอในลักษณะคล้ายๆ กับ WaitGroup ก็ได้ เช่น ใช้ channel bool เพื่อรอสัญญาณว่างานจบแล้ว

ready := make(chan bool)
go foo(ready)
<-ready

มันจะรอจนกว่า foo() จะส่งค่า true เข้ามาใน ready (ซึ่งจริงๆ จะส่ง false ก็ทำงานได้ แต่ตามหลักควรส่ง true)

สังเกตว่าเราไม่ได้รับค่าออกไปใช้เลย เราแค่รอให้มีการส่งค่าเข้ามาใน channel เฉยๆ

ในการใช้งาน channel ในการรับค่าเป็น stream ปกติแล้วเรามักจะวนรับค่าโดยใช้ for range

func producer(c chan int) {
  for i := 1; i <= 5; i++ {
    fmt.Println("Sending", i)
    c <- i
    time.Sleep(500 * time.Millisecond)
  }
  close(c)
}
func main() {
  c := make(chan int)
  go producer(c)

  for i := range c {
    fmt.Print("Receiving", i)
  }

  fmt.Println("Done")
}

close(c) คือการสั่งปิด channel เพื่อไม่ให้ลูปของเราค้างรอค่าไปเรื่อยๆ เนื่องจากในฝั่งผู้รับเราใช้ for range ซึ่งจะไม่จบลูปจนกว่า channel จะปิด (กรณีที่ลูปเป็น 1:n ก็ไม่จำเป็นต้องสั่ง close() เพราะมีจำนวนลูปที่แน่นอนอยู่แล้ว)

หมายเหตุ: ให้สั่งปิด channel ที่ฝั่งผู้ส่งเมื่อฝั่งผู้รับจำเป็นต้องรู้ว่าข้อมูลหมดแล้ว การสั่งที่ฝั่งผู้รับจะเสี่ยงเกิด Panic ถ้าฝั่งผู้ส่งยังพยายามส่งข้อมูล

Buffered Channel

โดย default channel จะไม่มีความจุ (capacity = 0) หมายความว่ารับส่งค่าได้ทีละค่า และมันจะบล็อกการทำงานอย่างที่บอกไป แต่เราสามารถกำหนดค่า capacity หรือ buffer ให้กับ channel ได้ เพื่อให้มันทำงานต่อโดยไม่บล็อกจนกว่า buffer จะเต็ม เราจะเรียก channel แบบนี้ว่า buffered channel

c := make(chan int, 3)

เลข 3 ที่เรากำหนดคือ capacity (buffer) ของ channel

ในกรณีนี้

  • เราจะส่งค่าให้ channel ได้ 3 ค่าโดยไม่มีการบล็อกแม้จะไม่มีการดึงค่าออกจาก channel เลย แต่จะบล็อกหลังจากการส่งค่าครั้งที่ 4
  • ไม่บล็อกการรับค่าจาก channel จนกว่าไม่มีค่าค้่างอยู่ใน buffer

สำหรับ unbuffered channel ก็เหมือนการับส่งของกันด้วยมือ ส่วน buffered channel จะเหมือนการมีช่องรับของเพิ่มมาให้ฝากของไว้ได้ มีประโยชน์ในเคสที่ผู้รับกับผู้ส่งทำงานด้วยความเร็วต่างกันชั่วคราว เช่น มี producer กับ consumer ที่มีโอกาสที่บางช่วงจะทำงานช้ากว่าหรือเร็วกว่ากัน ถ้าไม่มี buffer เลยก็จะต้องรอกันไปมา การใช้ buffer จะช่วยให้รันได้ต่อเนื่องลื่นไหลขึ้น

Channel Direction

ตอนที่ส่ง channel เข้าไปเป็น parameter ในฟังก์ชั่น เราสามารถกำกับทิศทางของ channel ได้ด้วย

  • ส่งได้อย่างเดียว: func producer(c chan<- int)
  • รับได้อย่างเดียว: func consumer(c <-chan int)

Select Statement

การใช้ select จะคล้ายๆ กับ switch คือควบคุมการทำงานโดยเช็คค่าเงื่อนไข แต่สำหรับ select จะใช้จัดการกับ channel

โดยที่ select จะบล็อกการทำงานไว้จนกว่าจะมี case ใดเคสหนึ่งเป็นจริง

สามารถใช้ในกรณีอย่างเช่น มี 2 channel ที่เราไม่รู้ว่าค่าจะมาลงที่ channel ไหน แต่เราสนใจแค่ channel ที่ได้รับค่าก่อน

func main() {
  c1 := make(chan string)
  c2 := make(chan string)

  go func() {
    time.Sleep(1 * time.Second)
    c1 <- "A"
  }()

  go func() {
    time.Sleep(2 * time.Second)
    c2 <- "B"
  }()

  select {
  case res := <-c1:
    fmt.Println("Received from c1", res)
  case res := <-c2:
    fmt.Println("Received from c2", res)
  }
}

select จะรอจนกว่าจะมีข้อมูลจาก c1 หรือ c2 (ซึ่งจากโค้ดก็คือ c1) และจะจบการทำงาน เพราะฉะนั้นเราจะเห็นข้อความแค่ “Received from c1”

หรือเราอาจจะใช้เลือกเคสที่มีการรับหรือส่งค่าก่อน เช่น

select {
case a := <- inChan:
  fmt.Println("Received a", a)
case outChan <- b:
  fmt.Println("Sent b", b)
}

นอกจากนั้นก็ยังมีเทคนิคที่ใช้กันบ่อย เช่น

1) การทำ timeout
สำหรับกรณีที่ไม่มีข้อมูลส่งมาในเวลาที่กำหนด เราจะใช้รูปแบบนี้

select {
case res := <-c1:
  fmt.Println("Received from c1", res)
case <- time.After(5 * time.Second)
  fmt.Println("Timeout")
}

2) การใช้ default
สำหรับกรณีที่จะไม่หยุดรอเลย เช่นในกรณีที่ channel ถูกบล็อกอยู่ เพื่อป้องกันไม่ให้โปรแกรมค้าง

select {
case res := <-c1:
  fmt.Println("Received from c1", res)
default:
  fmt.Println("No message")
}

3) ใช้เพื่อหยุดการทำงาน

select {
case res := <-c1:
  fmt.Println("Received from c1", res)
case <-abortChan
  fmt.Println("Abort")
  return
}

Leave a Reply