主催をしている勉強会でGoのConcurrencyにまつわる話をしました。
gounconference.connpass.com
最近、Concurrency in Goという本を読んだことに触発され、「並行処理はサラッとこんな感じでやるといい感じになるのでは?」というTipsを本書よりチョイスしてスライドに落とし込みました。
O'Reilly Media (2017-07-19)
gitpitch.com
10分のLT枠なのにリハーサルをしたところ30分かかることが判明したりしましたが、なんとか無事30分かけてLTできました。
何を伝えたかったのか?
Goの肝であるConcurrencyを調べると面白いよってことを聞いていただいた方に伝えたかったのです。
例えば、goroutineのメモリ確保に関する挙動とか調べてて「おもしれー!」ってなりましたし、今回では話しませんでしたがatomicパッケージとかも試してみてCASとかそっちのことを調べてLock-freeの概念とかを知ることができましたし、色々と学ぶ起点になると思います。
補遺
それではLT内ではサッと解説するに留めていたので、当記事で改めて説明をします。
ConcurrencyとParallelismについて
GoでのConccurencyを語る起点として、この記事をまずは読んだほうがいいでしょう。
Concurrency is not parallelism - The Go Blog
これはHerokuのWazaというConference内でRob Pike氏が語られた内容の一部が記載されています。
スライドもあるのでこっちも読むと良い。
Go Concurrency Patterns
LT内でうまく皆さんに伝わったかどうか不安だけども、言いたかったことは
- Concurrencyは並行処理可能な構成、のこと
- Parallelismは並行処理という行動、のこと
Concurrencyにおける危険性
未然に防がなければいけない危険性として簡単な例としてDeadLockを挙げましたが、正直な所そこまで気にしなくてもテストがっつり書いてると気付けるレベルなので危険度としては低い。
スライド内に書いたLiveLockやらResourceStarvationなど、「特定の条件」が揃った時のみ発生する系の方がよっぽど危険ではある。
あとはそういったことにどれだけ気をつけていても、やはり問題が起こってしまうことがあるので、そのような場合の対処方法も最初に紹介した書籍内で説明はされている。
Basic Concurrency Pattern
書籍内で幾つかパターンを紹介しているのですが、今回は簡単な3つについて説明しました。
- Confinement
- Preventing Goroutine Leaks
- Heartbeats
Confinement
簡単な話で、レキシカルスコープを有効に使いましょうという話です。
package main
import "fmt"
func main() {
loopData := func(handleData chan<- int) {
defer close(handleData)
data := []int{1, 2, 3, 4}
for i := range data {
handleData <- data[i]
}
}
handleData := make(chan int)
go loopData(handleData)
for num := range handleData {
fmt.Println(num)
}
}
data
をmainに置くのではなく、loopDataに内包することでクリティカルセクションを無くします。
Preventing Goroutine Leaks
goroutineのメモリリークの話です。
うっかりnil channelを渡してしまい、ブロッキングによりリークするパターンが題材です。
今回のスライドで使った例は様々なものを省いたので、ある程度処理っぽいものを改めて書き直しました。
package main
import (
"fmt"
"time"
)
func main() {
strChan := make(chan string)
interval := time.Tick(1 * time.Second)
completed1 := doWork(nil, 3)
completed2 := doWork(strChan, 5)
for {
select {
case <-interval:
strChan<- "foo"
case _, ok := <-completed1:
if ok == false {
fmt.Println("Completed work: 1")
return
}
case _, ok := <-completed2:
if ok == false {
fmt.Println("Completed work: 2")
return
}
}
}
}
func doWork(strings <-chan string, limit int) <-chan interface{} {
completed := make(chan interface{})
go func() {
defer fmt.Println("doWork exited.")
defer close(completed)
var count int
for {
select {
case s := <-strings:
if count == limit { return }
fmt.Println(s)
count++
}
}
}()
return completed
}
limit
まで string channel
を受け取り、出力するプログラムです。
LeakするgoroutineとLeakしないgoroutineの2種類を用意して実行しています。
nil channel
を引数として渡してしまっているため、本来なら3回の出力でプログラムが終了するところが、5回も出力してしまっています。
このことから、 nil channel
が for-select
内でブロッキングしていることが分かりますね。
なのでこんな風に親から子に終了を告げるchannelを伝搬させれば良い。
package main
import (
"fmt"
"time"
)
func main() {
doneChan := make(chan interface{})
strChan := make(chan string)
interval := time.Tick(1 * time.Second)
timeout := time.After(5 * time.Second)
completed := doWork(doneChan, nil, 3)
for {
select {
case <-timeout:
doneChan <- struct{}{}
case <-interval:
strChan <- "foo"
case _, ok := <-completed:
if ok == false {
fmt.Println("Completed work: 1")
return
}
}
}
}
func doWork(done <-chan interface{}, strings <-chan string, limit int) <-chan interface{} {
completed := make(chan interface{})
go func() {
defer fmt.Println("doWork exited.")
defer close(completed)
var count int
for {
select {
case <-done: return
case s := <-strings:
if count == limit { return }
fmt.Println(s)
count++
}
}
}()
return completed
}
Leakしているgoroutineを5s後には終了させるという感じの処理ですね。非常に簡単。
こんな風に「作成したgoroutineの管理権限をきちんと親が持っているのかどうか」というところは常に気をつけたいところです。
Heartbeats
goroutineの生死確認に使うやつです。
書籍内ではheartbeatsを使って、goroutineが死んでいたら復活させたりする手法を書いていたりします。
LTではこの辺から駆け足になってしまったので、皆さんに伝わったかどうかが非常に不安でした・・・
実装としては非常にシンプルで、goroutineが発するpulseが伝わらなくなったら死んでる、という判定をしているだけです。
package main
import (
"fmt"
"time"
)
func main() {
done := make(chan interface{})
time.AfterFunc(10*time.Second, func() { close(done) })
const timeout = 2 * time.Second
heartbeat, results := doWork(done, timeout/2)
for {
select {
case _, ok := <-heartbeat:
if ok == false {
fmt.Println("心臓の鼓動が停止しました・・・")
return
}
fmt.Println("pulse")
case r, ok := <-results:
if ok == false {
return
}
fmt.Printf("results %v\n", r.Second())
case <-time.After(timeout):
fmt.Println("タイムアウトしました!")
return
}
}
}
func doWork(
done <-chan interface{},
pulseInterval time.Duration,
) (<-chan interface{}, <-chan time.Time) {
heartbeat := make(chan interface{})
results := make(chan time.Time)
go work(heartbeat, results, pulseInterval, done)
return heartbeat, results
}
func work(
heartbeat chan interface{},
results chan time.Time,
pulseInterval time.Duration,
done <-chan interface{},
) {
defer close(heartbeat)
defer close(results)
pulse := time.Tick(pulseInterval)
workGen := time.Tick(2 * pulseInterval)
for i := 0; i < 2; i++ {
select {
case <-done:
return
case <-pulse:
sendPulse(heartbeat)
case r := <-workGen:
sendResult(r, done, pulse, heartbeat, results)
}
}
}
func sendPulse(heartbeat chan interface{}) {
select {
case heartbeat <- struct{}{}:
default:
}
}
func sendResult(
r time.Time,
done <-chan interface{},
pulse <-chan time.Time,
heartbeat chan interface{},
results chan time.Time,
) {
for {
select {
case <-done:
return
case <-pulse:
sendPulse(heartbeat)
case results <- r:
return
}
}
}
空structがおすすめなのはメモリ消費が無いからですね。
空structについてはdave cheneyさんのこのエントリが非常に分かりやすいです。
The empty struct | Dave Cheney
さいごに
いろいろ話したいことを詰め込んだら30分くらいになったので、どこかで30分くらいの枠のセッションがあったら流用していいな、って感じのスライドが爆誕しました。
参加者の皆さん、LT長くてすみませんでした・・・
とりあえずまだまだConcurrency絡みの話はあるので、気になった方はConcurrency in Goとか読んでみるといいと思います。
英語onlyなので読むの辛いですけど・・・
また次回も何かネタを探して発表しよう。