生涯未熟

生涯未熟

プログラミングをちょこちょこと。

TIME_WAITを引き起こさないhttp.Getを実現する

大量にhttp.Getするという書き捨てコードを書いていたのですが、途中までは順調に進むのですがあるところから急に通信を行わなくなる、という事態にハマりました。

一体何が?

なんじゃこりゃ、というわけでとりあえずコネクションの状況を確認してみたのですが、途中で止まるのも納得なTIME_WAITだらけの状況でした。
そう、TIME_WAITにポートが使い潰されて止まっていたのです。

その時のコードがこちら。

package main

import (
    "fmt"
    "net/http"
    "sync"
)

func main() {
    var wg = sync.WaitGroup{}
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            url := "http://www.google.co.jp"
            resp, err := http.Get(url)
            if err != nil {
                fmt.Println("Error: " + err.Error())
                return
            }
            defer resp.Body.Close()
            fmt.Println("Success")
        }()
    }
    wg.Wait()
}

これなんですが、実行するとTIME_WAITが大量に発生します。
Macで確認するときは netstat ではなく、 sysctl net.inet.tcp.tw_pcbcount で確認しましょう。
Macだと netstat ではTIME_WAITが表示されないのです・・・

原因

何が原因かというと、コネクションプールがほとんど利用できてないんですよね。これ。
MaxIdleConnsPerHost のデフォルトが2なので、2コネクションしか再利用できてない状態で新規のコネクションをポンポン開けちゃう状態でした。
なので、こうしてみました。

package main

import (
    "fmt"
    "sync"
    "net/http"
    "golang.org/x/sync/semaphore"
    "context"
)

var client *http.Client

const (
    Limit = 100
    Weight = 1
)

func main() {
    var wg = sync.WaitGroup{}

    s := semaphore.NewWeighted(Limit)

    defaultRoundTripper := http.DefaultTransport
    defaultTransportPointer, ok := defaultRoundTripper.(*http.Transport)
    if !ok {
        panic(fmt.Sprintf("defaultRoundTripper not an *http.Transport"))
    }
    defaultTransport := *defaultTransportPointer
    defaultTransport.MaxIdleConns = 0
    defaultTransport.MaxIdleConnsPerHost = 100
    client = &http.Client{Transport: &defaultTransport}

    for i := 0; i < 5000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            s.Acquire(context.Background(), Weight)
            defer s.Release(Weight)
            url := "http://www.google.co.jp"
            resp, err := client.Get(url)
            if err != nil {
                fmt.Println("Error: " + err.Error())
                return
            }
            defer resp.Body.Close()
            fmt.Println("Success")
        }()
    }
    wg.Wait()
}

ホスト毎に100件分のコネクションを再利用する感じのコードです。
間髪入れずにブン回すので、semaphore処理も入れてます。
これでTIME_WAITが発生しないと思うでしょ?ダメなんだなこれが。

$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 0
$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 27
$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 77
$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 146
$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 201
$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 288
$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 334

...

こんな感じで無限にTIME_WAITが増えていって、全然再利用されていません。
めちゃくちゃ悩んだんですが、どうやら resp を読み込まないとコネクションが再利用されないようです。

package main

import (
    "fmt"
    "sync"
    "net/http"
    "golang.org/x/sync/semaphore"
    "context"
    "io/ioutil"
)

var client *http.Client

const (
    Limit = 100
    Weight = 1
)

func main() {
    var wg = sync.WaitGroup{}

    s := semaphore.NewWeighted(Limit)

    defaultRoundTripper := http.DefaultTransport
    defaultTransportPointer, ok := defaultRoundTripper.(*http.Transport)
    if !ok {
        panic(fmt.Sprintf("defaultRoundTripper not an *http.Transport"))
    }
    defaultTransport := *defaultTransportPointer
    defaultTransport.MaxIdleConns = 0
    defaultTransport.MaxIdleConnsPerHost = 100
    client = &http.Client{Transport: &defaultTransport}

    for i := 0; i < 5000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            s.Acquire(context.Background(), Weight)
            defer s.Release(Weight)
            url := "http://www.google.co.jp"
            resp, err := client.Get(url)
            if err != nil {
                fmt.Println("Error: " + err.Error())
                return
            }
            defer resp.Body.Close()
            _, err = ioutil.ReadAll(resp.Body)
            fmt.Println("Success")
        }()
    }
    wg.Wait()
}

これでやってみると・・・

$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 0
$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 0
$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 0
$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 0
$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 0
$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 0
$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 0

...

やったーーーーー!!TIME_WAITが0にできたよーーーーーー!!!
という感じで、もし同じ現象で悩んでる方がいたら試してみてください。

High-performance concurrencyという話をしました

主催をしている勉強会でGoのConcurrencyにまつわる話をしました。

gounconference.connpass.com

最近、Concurrency in Goという本を読んだことに触発され、「並行処理はサラッとこんな感じでやるといい感じになるのでは?」というTipsを本書よりチョイスしてスライドに落とし込みました。

gitpitch.com

10分のLT枠なのにリハーサルをしたところ30分かかることが判明したりしましたが、なんとか無事30分かけてLTできました。

何を伝えたかったのか?

Goの肝であるConcurrencyを調べると面白いよってことを聞いていただいた方に伝えたかったのです。
例えば、goroutineのメモリ確保に関する挙動とか調べてて「おもしれー!」ってなりましたし、今回では話しませんでしたがatomicパッケージとかも試してみてCASとかそっちのことを調べてLock-freeの概念とかを知ることができましたし、色々と学ぶ起点になると思います。

補遺

それではLT内ではサッと解説するに留めていたので、当記事で改めて説明をします。

ConcurrencyとParallelismについて

GoでのConccurencyを語る起点として、この記事をまずは読んだほうがいいでしょう。

Concurrency is not parallelism - The Go Blog

これはHerokuのWazaというConference内でRob Pike氏が語られた内容の一部が記載されています。
スライドもあるのでこっちも読むと良い。

Go Concurrency Patterns

LT内でうまく皆さんに伝わったかどうか不安だけども、言いたかったことは

  • Concurrencyは並行処理可能な構成、のこと
  • Parallelismは並行処理という行動、のこと

Concurrencyにおける危険性

未然に防がなければいけない危険性として簡単な例としてDeadLockを挙げましたが、正直な所そこまで気にしなくてもテストがっつり書いてると気付けるレベルなので危険度としては低い。
スライド内に書いたLiveLockやらResourceStarvationなど、「特定の条件」が揃った時のみ発生する系の方がよっぽど危険ではある。

あとはそういったことにどれだけ気をつけていても、やはり問題が起こってしまうことがあるので、そのような場合の対処方法も最初に紹介した書籍内で説明はされている。

Basic Concurrency Pattern

書籍内で幾つかパターンを紹介しているのですが、今回は簡単な3つについて説明しました。

  • Confinement
  • Preventing Goroutine Leaks
  • Heartbeats

Confinement

簡単な話で、レキシカルスコープを有効に使いましょうという話です。

package main

import "fmt"

func main() {
    loopData := func(handleData chan<- int) {
        defer close(handleData)
        data := []int{1, 2, 3, 4}
        for i := range data {
            handleData <- data[i]
        }
    }

    handleData := make(chan int)
    go loopData(handleData)
    for num := range handleData {
        fmt.Println(num)
    }
}

data をmainに置くのではなく、loopDataに内包することでクリティカルセクションを無くします。

Preventing Goroutine Leaks

goroutineのメモリリークの話です。
うっかりnil channelを渡してしまい、ブロッキングによりリークするパターンが題材です。

今回のスライドで使った例は様々なものを省いたので、ある程度処理っぽいものを改めて書き直しました。

package main

import (
    "fmt"
    "time"
)

func main() {
    strChan := make(chan string)
    interval := time.Tick(1 * time.Second)
    completed1 := doWork(nil, 3) // Leak goroutine
    completed2 := doWork(strChan, 5)// Not leak goroutine

    for {
        select {
        case <-interval:
            strChan<- "foo"
        case _, ok := <-completed1:
            if ok == false {
                fmt.Println("Completed work: 1")
                return
            }
        case _, ok := <-completed2:
            if ok == false {
                fmt.Println("Completed work: 2")
                return
            }
        }
    }
}

func doWork(strings <-chan string, limit int) <-chan interface{} {
    completed := make(chan interface{})
    go func() {
        defer fmt.Println("doWork exited.")
        defer close(completed)

        var count int
        for {
            select {
            case s := <-strings:
                if count == limit { return }
                fmt.Println(s)
                count++
            }
        }
    }()
    return completed
}

limit まで string channel を受け取り、出力するプログラムです。
LeakするgoroutineとLeakしないgoroutineの2種類を用意して実行しています。

nil channel を引数として渡してしまっているため、本来なら3回の出力でプログラムが終了するところが、5回も出力してしまっています。
このことから、 nil channelfor-select 内でブロッキングしていることが分かりますね。

なのでこんな風に親から子に終了を告げるchannelを伝搬させれば良い。

package main

import (
    "fmt"
    "time"
)

func main() {
    doneChan := make(chan interface{})
    strChan := make(chan string)
    interval := time.Tick(1 * time.Second)
    timeout := time.After(5 * time.Second)
    completed := doWork(doneChan, nil, 3)  // Leak goroutine

    for {
        select {
        case <-timeout:
            doneChan <- struct{}{}
        case <-interval:
            strChan <- "foo"
        case _, ok := <-completed:
            if ok == false {
                fmt.Println("Completed work: 1")
                return
            }
        }
    }
}

func doWork(done <-chan interface{}, strings <-chan string, limit int) <-chan interface{} {
    completed := make(chan interface{})
    go func() {
        defer fmt.Println("doWork exited.")
        defer close(completed)

        var count int
        for {
            select {
            case <-done: return
            case s := <-strings:
                if count == limit { return }
                fmt.Println(s)
                count++
            }
        }
    }()
    return completed
}

Leakしているgoroutineを5s後には終了させるという感じの処理ですね。非常に簡単。
こんな風に「作成したgoroutineの管理権限をきちんと親が持っているのかどうか」というところは常に気をつけたいところです。

Heartbeats

goroutineの生死確認に使うやつです。
書籍内ではheartbeatsを使って、goroutineが死んでいたら復活させたりする手法を書いていたりします。
LTではこの辺から駆け足になってしまったので、皆さんに伝わったかどうかが非常に不安でした・・・

実装としては非常にシンプルで、goroutineが発するpulseが伝わらなくなったら死んでる、という判定をしているだけです。

package main

import (
    "fmt"
    "time"
)

func main() {
    done := make(chan interface{}) // 処理の終了を知らせるchannel
    time.AfterFunc(10*time.Second, func() { close(done) }) // 10s後には終了
    const timeout = 2 * time.Second // タイムアウトするまでの時間

    heartbeat, results := doWork(done, timeout/2)
    for {
        select {
        case _, ok := <-heartbeat:
            if ok == false {
                fmt.Println("心臓の鼓動が停止しました・・・")
                return
            }
            fmt.Println("pulse")
        case r, ok := <-results:
            if ok == false {
                return
            }
            fmt.Printf("results %v\n", r.Second())
        case <-time.After(timeout):
            fmt.Println("タイムアウトしました!")
            return
        }
    }
}

// workを並列で動かし、heartbeat channelとresult channelを返す
func doWork(
    done <-chan interface{},
    pulseInterval time.Duration, // heartbeatの確認パルスを送る時間間隔
) (<-chan interface{}, <-chan time.Time) {
    heartbeat := make(chan interface{})
    results := make(chan time.Time)
    go work(heartbeat, results, pulseInterval, done)
    return heartbeat, results
}

func work(
    heartbeat chan interface{},
    results chan time.Time,
    pulseInterval time.Duration,
    done <-chan interface{},
) {
    defer close(heartbeat)
    defer close(results)

    pulse := time.Tick(pulseInterval)         // 1sごとに発火するchannel
    workGen := time.Tick(2 * pulseInterval) // 2sごとに発火するchannel

    //for {
    for i := 0; i < 2; i++ {
        select {
        case <-done:
            return
        case <-pulse:
            // 1s毎にsendPulseを実行
            sendPulse(heartbeat)
        case r := <-workGen: // 2s毎にtime.Timeを返す
            sendResult(r, done, pulse, heartbeat, results)
        }
    }
}

// heartbeat channelに何らかの値を入れる(空structがオススメ)
func sendPulse(heartbeat chan interface{}) {
    select {
    case heartbeat <- struct{}{}:
    default: // bufferが満杯の場合、このdefaultがないとblockingされてしまう
    }
}

func sendResult(
    r time.Time,
    done <-chan interface{},
    pulse <-chan time.Time,
    heartbeat chan interface{},
    results chan time.Time,
) {
    for {
        select {
        case <-done:
            return
        case <-pulse:
            sendPulse(heartbeat)
        case results <- r:
            return
        }
    }
}

空structがおすすめなのはメモリ消費が無いからですね。
空structについてはdave cheneyさんのこのエントリが非常に分かりやすいです。

The empty struct | Dave Cheney

さいごに

いろいろ話したいことを詰め込んだら30分くらいになったので、どこかで30分くらいの枠のセッションがあったら流用していいな、って感じのスライドが爆誕しました。
参加者の皆さん、LT長くてすみませんでした・・・

とりあえずまだまだConcurrency絡みの話はあるので、気になった方はConcurrency in Goとか読んでみるといいと思います。
英語onlyなので読むの辛いですけど・・・

また次回も何かネタを探して発表しよう。

swaggoが思いの外、素晴らしかった

swaggoがペロッとswagger-uiをアプリケーション内に組み込みたかった願望をサクッと叶えてくれた。

swaggoの概要については@pei0804さんが以下の記事で詳しく書かれているので、細かいところは割愛。

qiita.com

何が良かったか

煩わしいこと抜きで、アノテーション書いてバチッとコマンド打ったらswagger-uiでAPIドキュメントが見れる、そんなところですね。
これ抜きでやろうとするとgoaに寄せた方がいいんじゃないか?ってくらい面倒だったので、非常に助かった。

あと、gin・echo・net/http用にUIを呼び出す関数が用意されてるので、echoで作ってた自分にはうってつけでした。
地味に revel-swaggerリポジトリとしてあるみたいなので、将来的には使えるようになる・・・のか?🤔

微妙なところ

ドキュメントがちょっと分かりづらいかなと思ったりしました。

アノテーション自体の書き方はこの辺に書いてあるんですが、

DeclarativeCommentsFormat · GitBook

これだけだと完全に分からんので、exampleを探す羽目になるかと思います。

あと、Empty Responseを表すのにどうやって書けばいいのかさっぱり分からず、ISSUEを探してようやく見つかったレベルなのでこの辺なんとかならんかなと。

Support empty responses · Issue #11 · swaggo/swag · GitHub

おわり

ただそんなことは些細なレベルでドチャクソ便利マンなので、作った方ありがとうございますというお気持ちに溢れました🙏

オライリーのsafariに関するちょっとアレなところ

とある本が読みたくてオライリーsafariをFree Trialで使っている。

Web上で読むタイプなので、Chromeの翻訳機能使えば洋書でもすいすい読めて使いやすいな、と思ってたのだが一つ重大な欠点を発見した。
それは、Submit Errataが出来ないこと。

Submit Errataは読んで字のごとく、書籍内の誤りを報告する機能なのだが報告の際に該当のページ数の入力が必要になる。
しかし、safariだと肝心のページ数が分からないのだ。

safari独自の報告ページでもあるのかと思いきや特にあるわけでもないので、完全にお手上げである。
暇があったらオライリーのサポートにでもメールを投げるか。

以上、小さな愚痴を書き残しておく。

限界を知りたくてひたすら歩いてきた

GW真っ只中。急に自分の限界を知りたくてひたすら歩くことにした。

ただ歩くだけなのも味気ないので、良さげなスポットをある程度調べてから行くことに。

とりあえず一日歩くと仮定して、20キロちょいを目標とした。
正直この時はハーフマラソンやと思えば余裕やろwww」と高を括っていたが…

出発

モバイルバッテリー、財布という軽装で11時に出発。

最初は東京の大仏がある乗蓮寺を目指すことに。

乗蓮寺

数キロの行程をサクッと終えて乗蓮寺に。

予想の1.5倍はデカくてビビった。
お参りをササっと済ませて、次の目的地松月院へ。

松月院

数百メートル先にある松月院
閑静な佇まいで心が引き締まる場所でした。

お参りを済まし、クルッと振り返ると、

ネオアームストロングサイクロンジェットアームストロング砲じゃねーか、完成度高けーなオイ 。

道中

凄いパンチラインを叩き出す邸宅を見つけた。

赤塚公園

自然が豊かで森林浴が楽しめる公園でした。
都会で汚れた心を癒した。

船渡氷川神社

小さな神社だったのですが、この神社には「十度の宮」という面白いものが祀られていました。

何でも、洪水の度に流された宮がこの地に戻ってきたそうで、それがなんと過去10回もあったとのこと。
僕もこのくらい粘り強い人間になりたいものです。

荒川

暮れ〜なずむ〜街の〜

ひか〜りと〜影の〜なか〜

去り〜ゆく〜あなたへ〜

贈る〜言葉〜

旧岩淵水門

というわけで旧岩淵水門にやってきました。

この水門の横を渡ることが出来るんですが、渡った先が最高でした。

この辺りから「あれ…?足がやべぇぞ…」となってきて、一抹の不安感が。

鯵家

鯵専門店があるという情報を元に行ってきました。

16:30にして、やっとまともな食事にありつけたのと、美味さで泣きかけました。

retty.me

亀ヶ池弁財天

亀〜

道中

途中、人懐っこい猫様に遊んで頂けました。
ありがたし🙏

近藤勇墓所

十条を抜けて

俳句に想いを馳せつつ

到着

どうやらここには近藤勇の胴体が眠ってるらしいです。
板橋駅すぐそこにこんなのがあるとは。

さて、この辺りで時間は既に18:30になり、正直足はガクガク・足裏はキツキツで帰りたくなりましたが心を鬼にして池袋へ向かうことに。

道中

謎のエナジードリンクに助けられました。
死ぬほどしんどい時に甘いものってマジで効果あるんすね。

謎にマットレスが落ちてました。
マットレスと孤独感を共有しました。

旅の終わり

そんなこんなで19:30、ついに…

やったあああああああああああああああ!!!! 池袋だああああああああああああ!!!!

足の痛みと疲れもあって、到着した時マジで泣きかけました。
生きてる証拠だよ!!

結局、11:00〜19:30の約8時間半で28キロ走破することが出来ました。
これが今の僕の限界です、本当にありがとうございました。

祝い

走破記念に一人で焼肉をしました。

A5ランクの肉を食いまくって、疲れからか吐きそうになりました。足るを知れ。

retty.me

こんな馬鹿みたいなことも30代になると出来なくなるかもしれないので、備えよう。