生涯未熟

プログラミングをちょこちょこと。

saramaでAsyncProducerを使う時に気を付けなければいけないたったひとつのこと

AsyncCloseを使うな


どういうこと?

AsyncProducerには

  • AsyncClose
  • Close

の2種類がある。

なんの違いが?

それぞれのコメント読みましょ。

AsyncClose

AsyncClose triggers a shutdown of the producer. The shutdown has completed when both the Errors and Successes channels have been closed. When calling AsyncClose, you must continue to read from those channels in order to drain the results of any messages in flight.


AsyncCloseがプロデューサのシャットダウンをトリガします。 エラーと成功の両方のチャネルが閉じられたときにシャットダウンが完了しました。 AsyncCloseを呼び出すときは、飛行中のメッセージの結果を排除するために、それらのチャネルからの読み込みを続ける必要があります。

Close

Close shuts down the producer and waits for any buffered messages to be flushed. You must call this function before a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close on the underlying client.


Closeはプロデューサをシャットダウンし、バッファされたメッセージがフラッシュされるのを待ちます。 プロデューサオブジェクトがスコープの外に出る前に、この関数を呼び出す必要があります。 基になるクライアントでCloseを呼び出す前にこれを呼び出す必要があります。

よくわからん🤔

コードを読もう

AsyncClose

func (p *asyncProducer) AsyncClose() {
    go withRecover(p.shutdown)
}

withRecover()shutdown というメソッドを引数にとって実行しています。

func withRecover(fn func()) {
    defer func() {
        handler := PanicHandler
        if handler != nil {
            if err := recover(); err != nil {
                handler(err)
            }
        }
    }()

    fn()
}

単純にpanicしたときにrecoverしてerrorを吐き出してる感じのdeferをつけて、引数のメソッドを実行しているだけですね。

func (p *asyncProducer) shutdown() {
    Logger.Println("Producer shutting down.")
    p.inFlight.Add(1)
    p.input <- &ProducerMessage{flags: shutdown}

    p.inFlight.Wait()

    if p.ownClient {
        err := p.client.Close()
        if err != nil {
            Logger.Println("producer/shutdown failed to close the embedded client:", err)
        }
    }

    close(p.input)
    close(p.retries)
    close(p.errors)
    close(p.successes)
}

で、肝心のshutdown処理なのですが、一度シャットダウンのための値を p.input に渡して、Producer上で実行中のメッセージの受け渡しが終わり次第、Producerをcloseしようとしています。

Close

func (p *asyncProducer) Close() error {
    p.AsyncClose()

    if p.conf.Producer.Return.Successes {
        go withRecover(func() {
            for range p.successes {
            }
        })
    }

    var errors ProducerErrors
    if p.conf.Producer.Return.Errors {
        for event := range p.errors {
            errors = append(errors, event)
        }
    } else {
        <-p.errors
    }

    if len(errors) > 0 {
        return errors
    }
    return nil
}

1行目から AsyncClose 呼んでますね・・・
その後は、brokerへのメッセージ送信の成否を示す SuccessesErrors をClose側でhandlingしています。
p.successesp.errors で値を受け取り続けているのは、最初の説明で書いてあった "AsyncCloseを呼び出すときは、飛行中のメッセージの結果を排除するために、それらのチャネルからの読み込みを続ける必要があります。" に従っているわけですね。
この飛行中ってのはイコール、メッセージの送信中って意味で、それを表しているのが inFlight というわけです。

inFlight 自体は sync.WaitGroup で、メッセージを送信する際に Add(1) , 送信し終わったら Done() します。
今回で言うと、 shutdown 処理の際に、安全にシャットダウンするために inFlight を使ってますね。

まとめ

というわけで、 AsyncClose のみを使用するときには結局 Close でやっているような処理を実装しなくてはならないので、それなら Close 使った方がいいよ。という結論に至りました。
なら何故 AsyncClose は外部からも読めるようになってるんだ?という感じもするかもしれませんが、 p.successes とかは処理内容がNoopなので、自分で色々やりたい人はどうぞ、っていうキャパシティを残してるんですかね?🤔

OS Xのnetstatに立ち向かう

ある時、手持ちのMacからTIME_WAITが出てるか調べようと netstat を叩いた所、想定していた挙動と全く違ったので、メモ代わりに書いておく。

何が違ったのか?

  • -p オプションの挙動
  • TIME_WAITの扱い

-p オプションの挙動

これは完全にLinuxBSD系であるOSXの違いなんですが、 -p の扱いが両者で全く違います。
完全にPIDと実行プログラム名が出ると思って -p 付けて叩いたら

ʕ ◔ϖ◔ʔ % netstat -anp                                                                                                                                                 (net-tools)-[master]
netstat: option requires an argument -- p
Usage:  netstat [-AaLlnW] [-f address_family | -p protocol]
    netstat [-gilns] [-f address_family]
    netstat -i | -I interface [-w wait] [-abdgRtS]
    netstat -s [-s] [-f address_family | -p protocol] [-w wait]
    netstat -i | -I interface -s [-f address_family | -p protocol]
    netstat -m [-m]
    netstat -r [-Aaln] [-f address_family]
    netstat -rs [-s]

なんて出たもので。BSD系では -pプロトコル指定のオプションなんですね。
なので、実行プログラム名とか見たいって場合は netstat よりも lsof 使った方がいいのかもしれません。

TIME_WAITの扱い

OS Xの場合、netstat ではTIME_WAITが表示されません。
なので、確認したい場合は

ʕ ◔ϖ◔ʔ % sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 0

を実行しましょう。

おわりに

OS XがTIME_WAITを頑なに隠してるのが謎・・・
どっかのファイルにTCPコネクションを吐いていてくれたら、なんとかツール組んで出来るんじゃないかなーと思ったり。

あと、調査する中で面白かったのはnetstatアマチュア無線パケット通信プロトコルであるAX25とかをサポートしてたりとか、謎な知識が知れたことですね。
今回、netstatのコードも読んだんですが一度読んでみると様々な発見があるのでお薦めです。

github.com

参考資料

blog.kamipo.net

OSX: Where are my TIME_WAIT ?!

入社から一年経ちました

株式会社アイスタイルに去年の6/1に入社してから一年経ちました。
もう30手前になると一年があっという間だなぁ、としみじみ感じております。

何をやったか?

・最初の2,3ヶ月は久々にWindows使った
・雑にScalaやった
・ガッツリGoやってる
hadoop, hbase, kafka, cassandraとかもやった
・microservices的なこともやってる
・今までやってきたPHPは何だったのか?ってくらいちゃんとしたPHPに触れた
・ビジュアルリグレッションテストの環境作った

とか色々やりました。
今までやったことないようなことにチャレンジしてるので、刺激的な毎日です。

どんな感じか?

今のところある程度自由に伸び伸びと開発をしています。
最近は弊社で各種カンファレンスのスポンサーやったり、勉強会開いたりとコミュニティに寄与することも徐々にですが始まりました。

あとテックブログとかもやってたりします。

https://techblog.istyle.co.jp

開発に関することなら理解のある職場なので、ありがたい限りです。

おわりに

という感じで元気に生きてます。
弊社気になる方は是非ご連絡下さい。

TIME_WAITを引き起こさないhttp.Getを実現する

大量にhttp.Getするという書き捨てコードを書いていたのですが、途中までは順調に進むのですがあるところから急に通信を行わなくなる、という事態にハマりました。

一体何が?

なんじゃこりゃ、というわけでとりあえずコネクションの状況を確認してみたのですが、途中で止まるのも納得なTIME_WAITだらけの状況でした。
そう、TIME_WAITにポートが使い潰されて止まっていたのです。

その時のコードがこちら。

package main

import (
    "fmt"
    "net/http"
    "sync"
)

func main() {
    var wg = sync.WaitGroup{}
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            url := "http://www.google.co.jp"
            resp, err := http.Get(url)
            if err != nil {
                fmt.Println("Error: " + err.Error())
                return
            }
            defer resp.Body.Close()
            fmt.Println("Success")
        }()
    }
    wg.Wait()
}

これなんですが、実行するとTIME_WAITが大量に発生します。
Macで確認するときは netstat ではなく、 sysctl net.inet.tcp.tw_pcbcount で確認しましょう。
Macだと netstat ではTIME_WAITが表示されないのです・・・

原因

何が原因かというと、コネクションプールがほとんど利用できてないんですよね。これ。
MaxIdleConnsPerHost のデフォルトが2なので、2コネクションしか再利用できてない状態で新規のコネクションをポンポン開けちゃう状態でした。
なので、こうしてみました。

package main

import (
    "fmt"
    "sync"
    "net/http"
    "golang.org/x/sync/semaphore"
    "context"
)

var client *http.Client

const (
    Limit = 100
    Weight = 1
)

func main() {
    var wg = sync.WaitGroup{}

    s := semaphore.NewWeighted(Limit)

    defaultRoundTripper := http.DefaultTransport
    defaultTransportPointer, ok := defaultRoundTripper.(*http.Transport)
    if !ok {
        panic(fmt.Sprintf("defaultRoundTripper not an *http.Transport"))
    }
    defaultTransport := *defaultTransportPointer
    defaultTransport.MaxIdleConns = 0
    defaultTransport.MaxIdleConnsPerHost = 100
    client = &http.Client{Transport: &defaultTransport}

    for i := 0; i < 5000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            s.Acquire(context.Background(), Weight)
            defer s.Release(Weight)
            url := "http://www.google.co.jp"
            resp, err := client.Get(url)
            if err != nil {
                fmt.Println("Error: " + err.Error())
                return
            }
            defer resp.Body.Close()
            fmt.Println("Success")
        }()
    }
    wg.Wait()
}

ホスト毎に100件分のコネクションを再利用する感じのコードです。
間髪入れずにブン回すので、semaphore処理も入れてます。
これでTIME_WAITが発生しないと思うでしょ?ダメなんだなこれが。

$ ʕ ◔ϖ◔ʔ % sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 0
ʕ ◔ϖ◔ʔ % sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 27
ʕ ◔ϖ◔ʔ % sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 77
ʕ ◔ϖ◔ʔ % sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 146
ʕ ◔ϖ◔ʔ % sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 201
ʕ ◔ϖ◔ʔ % sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 288
ʕ ◔ϖ◔ʔ % sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 334

...

こんな感じで無限にTIME_WAITが増えていって、全然再利用されていません。
めちゃくちゃ悩んだんですが、どうやら resp を読み込まないとコネクションが再利用されないようです。

package main

import (
    "fmt"
    "sync"
    "net/http"
    "golang.org/x/sync/semaphore"
    "context"
    "io/ioutil"
)

var client *http.Client

const (
    Limit = 100
    Weight = 1
)

func main() {
    var wg = sync.WaitGroup{}

    s := semaphore.NewWeighted(Limit)

    defaultRoundTripper := http.DefaultTransport
    defaultTransportPointer, ok := defaultRoundTripper.(*http.Transport)
    if !ok {
        panic(fmt.Sprintf("defaultRoundTripper not an *http.Transport"))
    }
    defaultTransport := *defaultTransportPointer
    defaultTransport.MaxIdleConns = 0
    defaultTransport.MaxIdleConnsPerHost = 100
    client = &http.Client{Transport: &defaultTransport}

    for i := 0; i < 5000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            s.Acquire(context.Background(), Weight)
            defer s.Release(Weight)
            url := "http://www.google.co.jp"
            resp, err := client.Get(url)
            if err != nil {
                fmt.Println("Error: " + err.Error())
                return
            }
            defer resp.Body.Close()
            _, err = ioutil.ReadAll(resp.Body)
            fmt.Println("Success")
        }()
    }
    wg.Wait()
}

これでやってみると・・・

$ ʕ ◔ϖ◔ʔ % sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 0
ʕ ◔ϖ◔ʔ % sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 0
ʕ ◔ϖ◔ʔ % sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 0
ʕ ◔ϖ◔ʔ % sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 0
ʕ ◔ϖ◔ʔ % sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 0
ʕ ◔ϖ◔ʔ % sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 0
ʕ ◔ϖ◔ʔ % sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 0

...

やったーーーーー!!TIME_WAITが0にできたよーーーーーー!!!
という感じで、もし同じ現象で悩んでる方がいたら試してみてください。

High-performance concurrencyという話をしました

主催をしている勉強会でGoのConcurrencyにまつわる話をしました。

gounconference.connpass.com

最近、Concurrency in Goという本を読んだことに触発され、「並行処理はサラッとこんな感じでやるといい感じになるのでは?」というTipsを本書よりチョイスしてスライドに落とし込みました。

gitpitch.com

10分のLT枠なのにリハーサルをしたところ30分かかることが判明したりしましたが、なんとか無事30分かけてLTできました。

何を伝えたかったのか?

Goの肝であるConcurrencyを調べると面白いよってことを聞いていただいた方に伝えたかったのです。
例えば、goroutineのメモリ確保に関する挙動とか調べてて「おもしれー!」ってなりましたし、今回では話しませんでしたがatomicパッケージとかも試してみてCASとかそっちのことを調べてLock-freeの概念とかを知ることができましたし、色々と学ぶ起点になると思います。

補遺

それではLT内ではサッと解説するに留めていたので、当記事で改めて説明をします。

ConcurrencyとParallelismについて

GoでのConccurencyを語る起点として、この記事をまずは読んだほうがいいでしょう。

Concurrency is not parallelism - The Go Blog

これはHerokuのWazaというConference内でRob Pike氏が語られた内容の一部が記載されています。
スライドもあるのでこっちも読むと良い。

Go Concurrency Patterns

LT内でうまく皆さんに伝わったかどうか不安だけども、言いたかったことは

  • Concurrencyは並行処理可能な構成、のこと
  • Parallelismは並行処理という行動、のこと

Concurrencyにおける危険性

未然に防がなければいけない危険性として簡単な例としてDeadLockを挙げましたが、正直な所そこまで気にしなくてもテストがっつり書いてると気付けるレベルなので危険度としては低い。
スライド内に書いたLiveLockやらResourceStarvationなど、「特定の条件」が揃った時のみ発生する系の方がよっぽど危険ではある。

あとはそういったことにどれだけ気をつけていても、やはり問題が起こってしまうことがあるので、そのような場合の対処方法も最初に紹介した書籍内で説明はされている。

Basic Concurrency Pattern

書籍内で幾つかパターンを紹介しているのですが、今回は簡単な3つについて説明しました。

  • Confinement
  • Preventing Goroutine Leaks
  • Heartbeats

Confinement

簡単な話で、レキシカルスコープを有効に使いましょうという話です。

package main

import "fmt"

func main() {
    loopData := func(handleData chan<- int) {
        defer close(handleData)
        data := []int{1, 2, 3, 4}
        for i := range data {
            handleData <- data[i]
        }
    }

    handleData := make(chan int)
    go loopData(handleData)
    for num := range handleData {
        fmt.Println(num)
    }
}

data をmainに置くのではなく、loopDataに内包することでクリティカルセクションを無くします。

Preventing Goroutine Leaks

goroutineのメモリリークの話です。
うっかりnil channelを渡してしまい、ブロッキングによりリークするパターンが題材です。

今回のスライドで使った例は様々なものを省いたので、ある程度処理っぽいものを改めて書き直しました。

package main

import (
    "fmt"
    "time"
)

func main() {
    strChan := make(chan string)
    interval := time.Tick(1 * time.Second)
    completed1 := doWork(nil, 3) // Leak goroutine
    completed2 := doWork(strChan, 5)// Not leak goroutine

    for {
        select {
        case <-interval:
            strChan<- "foo"
        case _, ok := <-completed1:
            if ok == false {
                fmt.Println("Completed work: 1")
                return
            }
        case _, ok := <-completed2:
            if ok == false {
                fmt.Println("Completed work: 2")
                return
            }
        }
    }
}

func doWork(strings <-chan string, limit int) <-chan interface{} {
    completed := make(chan interface{})
    go func() {
        defer fmt.Println("doWork exited.")
        defer close(completed)

        var count int
        for {
            select {
            case s := <-strings:
                if count == limit { return }
                fmt.Println(s)
                count++
            }
        }
    }()
    return completed
}

limit まで string channel を受け取り、出力するプログラムです。
LeakするgoroutineとLeakしないgoroutineの2種類を用意して実行しています。

nil channel を引数として渡してしまっているため、本来なら3回の出力でプログラムが終了するところが、5回も出力してしまっています。
このことから、 nil channelfor-select 内でブロッキングしていることが分かりますね。

なのでこんな風に親から子に終了を告げるchannelを伝搬させれば良い。

package main

import (
    "fmt"
    "time"
)

func main() {
    doneChan := make(chan interface{})
    strChan := make(chan string)
    interval := time.Tick(1 * time.Second)
    timeout := time.After(5 * time.Second)
    completed := doWork(doneChan, nil, 3)  // Leak goroutine

    for {
        select {
        case <-timeout:
            doneChan <- struct{}{}
        case <-interval:
            strChan <- "foo"
        case _, ok := <-completed:
            if ok == false {
                fmt.Println("Completed work: 1")
                return
            }
        }
    }
}

func doWork(done <-chan interface{}, strings <-chan string, limit int) <-chan interface{} {
    completed := make(chan interface{})
    go func() {
        defer fmt.Println("doWork exited.")
        defer close(completed)

        var count int
        for {
            select {
            case <-done: return
            case s := <-strings:
                if count == limit { return }
                fmt.Println(s)
                count++
            }
        }
    }()
    return completed
}

Leakしているgoroutineを5s後には終了させるという感じの処理ですね。非常に簡単。
こんな風に「作成したgoroutineの管理権限をきちんと親が持っているのかどうか」というところは常に気をつけたいところです。

Heartbeats

goroutineの生死確認に使うやつです。
書籍内ではheartbeatsを使って、goroutineが死んでいたら復活させたりする手法を書いていたりします。
LTではこの辺から駆け足になってしまったので、皆さんに伝わったかどうかが非常に不安でした・・・

実装としては非常にシンプルで、goroutineが発するpulseが伝わらなくなったら死んでる、という判定をしているだけです。

package main

import (
    "fmt"
    "time"
)

func main() {
    done := make(chan interface{}) // 処理の終了を知らせるchannel
    time.AfterFunc(10*time.Second, func() { close(done) }) // 10s後には終了
    const timeout = 2 * time.Second // タイムアウトするまでの時間

    heartbeat, results := doWork(done, timeout/2)
    for {
        select {
        case _, ok := <-heartbeat:
            if ok == false {
                fmt.Println("心臓の鼓動が停止しました・・・")
                return
            }
            fmt.Println("pulse")
        case r, ok := <-results:
            if ok == false {
                return
            }
            fmt.Printf("results %v\n", r.Second())
        case <-time.After(timeout):
            fmt.Println("タイムアウトしました!")
            return
        }
    }
}

// workを並列で動かし、heartbeat channelとresult channelを返す
func doWork(
    done <-chan interface{},
    pulseInterval time.Duration, // heartbeatの確認パルスを送る時間間隔
) (<-chan interface{}, <-chan time.Time) {
    heartbeat := make(chan interface{})
    results := make(chan time.Time)
    go work(heartbeat, results, pulseInterval, done)
    return heartbeat, results
}

func work(
    heartbeat chan interface{},
    results chan time.Time,
    pulseInterval time.Duration,
    done <-chan interface{},
) {
    defer close(heartbeat)
    defer close(results)

    pulse := time.Tick(pulseInterval)         // 1sごとに発火するchannel
    workGen := time.Tick(2 * pulseInterval) // 2sごとに発火するchannel

    //for {
    for i := 0; i < 2; i++ {
        select {
        case <-done:
            return
        case <-pulse:
            // 1s毎にsendPulseを実行
            sendPulse(heartbeat)
        case r := <-workGen: // 2s毎にtime.Timeを返す
            sendResult(r, done, pulse, heartbeat, results)
        }
    }
}

// heartbeat channelに何らかの値を入れる(空structがオススメ)
func sendPulse(heartbeat chan interface{}) {
    select {
    case heartbeat <- struct{}{}:
    default: // bufferが満杯の場合、このdefaultがないとblockingされてしまう
    }
}

func sendResult(
    r time.Time,
    done <-chan interface{},
    pulse <-chan time.Time,
    heartbeat chan interface{},
    results chan time.Time,
) {
    for {
        select {
        case <-done:
            return
        case <-pulse:
            sendPulse(heartbeat)
        case results <- r:
            return
        }
    }
}

空structがおすすめなのはメモリ消費が無いからですね。
空structについてはdave cheneyさんのこのエントリが非常に分かりやすいです。

The empty struct | Dave Cheney

さいごに

いろいろ話したいことを詰め込んだら30分くらいになったので、どこかで30分くらいの枠のセッションがあったら流用していいな、って感じのスライドが爆誕しました。
参加者の皆さん、LT長くてすみませんでした・・・

とりあえずまだまだConcurrency絡みの話はあるので、気になった方はConcurrency in Goとか読んでみるといいと思います。
英語onlyなので読むの辛いですけど・・・

また次回も何かネタを探して発表しよう。