主催をしている勉強会でGoのConcurrencyにまつわる話をしました。
最近、Concurrency in Goという本を読んだことに触発され、「並行処理はサラッとこんな感じでやるといい感じになるのでは?」というTipsを本書よりチョイスしてスライドに落とし込みました。
10分のLT枠なのにリハーサルをしたところ30分かかることが判明したりしましたが、なんとか無事30分かけてLTできました。
何を伝えたかったのか?
Goの肝であるConcurrencyを調べると面白いよってことを聞いていただいた方に伝えたかったのです。
例えば、goroutineのメモリ確保に関する挙動とか調べてて「おもしれー!」ってなりましたし、今回では話しませんでしたがatomicパッケージとかも試してみてCASとかそっちのことを調べてLock-freeの概念とかを知ることができましたし、色々と学ぶ起点になると思います。
補遺
それではLT内ではサッと解説するに留めていたので、当記事で改めて説明をします。
ConcurrencyとParallelismについて
GoでのConccurencyを語る起点として、この記事をまずは読んだほうがいいでしょう。
Concurrency is not parallelism - The Go Blog
これはHerokuのWazaというConference内でRob Pike氏が語られた内容の一部が記載されています。
スライドもあるのでこっちも読むと良い。
LT内でうまく皆さんに伝わったかどうか不安だけども、言いたかったことは
- Concurrencyは並行処理可能な構成、のこと
- Parallelismは並行処理という行動、のこと
Concurrencyにおける危険性
未然に防がなければいけない危険性として簡単な例としてDeadLockを挙げましたが、正直な所そこまで気にしなくてもテストがっつり書いてると気付けるレベルなので危険度としては低い。
スライド内に書いたLiveLockやらResourceStarvationなど、「特定の条件」が揃った時のみ発生する系の方がよっぽど危険ではある。
あとはそういったことにどれだけ気をつけていても、やはり問題が起こってしまうことがあるので、そのような場合の対処方法も最初に紹介した書籍内で説明はされている。
Basic Concurrency Pattern
書籍内で幾つかパターンを紹介しているのですが、今回は簡単な3つについて説明しました。
- Confinement
- Preventing Goroutine Leaks
- Heartbeats
Confinement
簡単な話で、レキシカルスコープを有効に使いましょうという話です。
package main import "fmt" func main() { loopData := func(handleData chan<- int) { defer close(handleData) data := []int{1, 2, 3, 4} for i := range data { handleData <- data[i] } } handleData := make(chan int) go loopData(handleData) for num := range handleData { fmt.Println(num) } }
data
をmainに置くのではなく、loopDataに内包することでクリティカルセクションを無くします。
Preventing Goroutine Leaks
goroutineのメモリリークの話です。
うっかりnil channelを渡してしまい、ブロッキングによりリークするパターンが題材です。
今回のスライドで使った例は様々なものを省いたので、ある程度処理っぽいものを改めて書き直しました。
package main import ( "fmt" "time" ) func main() { strChan := make(chan string) interval := time.Tick(1 * time.Second) completed1 := doWork(nil, 3) // Leak goroutine completed2 := doWork(strChan, 5)// Not leak goroutine for { select { case <-interval: strChan<- "foo" case _, ok := <-completed1: if ok == false { fmt.Println("Completed work: 1") return } case _, ok := <-completed2: if ok == false { fmt.Println("Completed work: 2") return } } } } func doWork(strings <-chan string, limit int) <-chan interface{} { completed := make(chan interface{}) go func() { defer fmt.Println("doWork exited.") defer close(completed) var count int for { select { case s := <-strings: if count == limit { return } fmt.Println(s) count++ } } }() return completed }
limit
まで string channel
を受け取り、出力するプログラムです。
LeakするgoroutineとLeakしないgoroutineの2種類を用意して実行しています。
nil channel
を引数として渡してしまっているため、本来なら3回の出力でプログラムが終了するところが、5回も出力してしまっています。
このことから、 nil channel
が for-select
内でブロッキングしていることが分かりますね。
なのでこんな風に親から子に終了を告げるchannelを伝搬させれば良い。
package main import ( "fmt" "time" ) func main() { doneChan := make(chan interface{}) strChan := make(chan string) interval := time.Tick(1 * time.Second) timeout := time.After(5 * time.Second) completed := doWork(doneChan, nil, 3) // Leak goroutine for { select { case <-timeout: doneChan <- struct{}{} case <-interval: strChan <- "foo" case _, ok := <-completed: if ok == false { fmt.Println("Completed work: 1") return } } } } func doWork(done <-chan interface{}, strings <-chan string, limit int) <-chan interface{} { completed := make(chan interface{}) go func() { defer fmt.Println("doWork exited.") defer close(completed) var count int for { select { case <-done: return case s := <-strings: if count == limit { return } fmt.Println(s) count++ } } }() return completed }
Leakしているgoroutineを5s後には終了させるという感じの処理ですね。非常に簡単。
こんな風に「作成したgoroutineの管理権限をきちんと親が持っているのかどうか」というところは常に気をつけたいところです。
Heartbeats
goroutineの生死確認に使うやつです。
書籍内ではheartbeatsを使って、goroutineが死んでいたら復活させたりする手法を書いていたりします。
LTではこの辺から駆け足になってしまったので、皆さんに伝わったかどうかが非常に不安でした・・・
実装としては非常にシンプルで、goroutineが発するpulseが伝わらなくなったら死んでる、という判定をしているだけです。
package main import ( "fmt" "time" ) func main() { done := make(chan interface{}) // 処理の終了を知らせるchannel time.AfterFunc(10*time.Second, func() { close(done) }) // 10s後には終了 const timeout = 2 * time.Second // タイムアウトするまでの時間 heartbeat, results := doWork(done, timeout/2) for { select { case _, ok := <-heartbeat: if ok == false { fmt.Println("心臓の鼓動が停止しました・・・") return } fmt.Println("pulse") case r, ok := <-results: if ok == false { return } fmt.Printf("results %v\n", r.Second()) case <-time.After(timeout): fmt.Println("タイムアウトしました!") return } } } // workを並列で動かし、heartbeat channelとresult channelを返す func doWork( done <-chan interface{}, pulseInterval time.Duration, // heartbeatの確認パルスを送る時間間隔 ) (<-chan interface{}, <-chan time.Time) { heartbeat := make(chan interface{}) results := make(chan time.Time) go work(heartbeat, results, pulseInterval, done) return heartbeat, results } func work( heartbeat chan interface{}, results chan time.Time, pulseInterval time.Duration, done <-chan interface{}, ) { defer close(heartbeat) defer close(results) pulse := time.Tick(pulseInterval) // 1sごとに発火するchannel workGen := time.Tick(2 * pulseInterval) // 2sごとに発火するchannel //for { for i := 0; i < 2; i++ { select { case <-done: return case <-pulse: // 1s毎にsendPulseを実行 sendPulse(heartbeat) case r := <-workGen: // 2s毎にtime.Timeを返す sendResult(r, done, pulse, heartbeat, results) } } } // heartbeat channelに何らかの値を入れる(空structがオススメ) func sendPulse(heartbeat chan interface{}) { select { case heartbeat <- struct{}{}: default: // bufferが満杯の場合、このdefaultがないとblockingされてしまう } } func sendResult( r time.Time, done <-chan interface{}, pulse <-chan time.Time, heartbeat chan interface{}, results chan time.Time, ) { for { select { case <-done: return case <-pulse: sendPulse(heartbeat) case results <- r: return } } }
空structがおすすめなのはメモリ消費が無いからですね。
空structについてはdave cheneyさんのこのエントリが非常に分かりやすいです。
The empty struct | Dave Cheney
さいごに
いろいろ話したいことを詰め込んだら30分くらいになったので、どこかで30分くらいの枠のセッションがあったら流用していいな、って感じのスライドが爆誕しました。
参加者の皆さん、LT長くてすみませんでした・・・
とりあえずまだまだConcurrency絡みの話はあるので、気になった方はConcurrency in Goとか読んでみるといいと思います。
英語onlyなので読むの辛いですけど・・・
また次回も何かネタを探して発表しよう。