生涯未熟

生涯未熟

プログラミングをちょこちょこと。

入社から一年経ちました

株式会社アイスタイルに去年の6/1に入社してから一年経ちました。
もう30手前になると一年があっという間だなぁ、としみじみ感じております。

何をやったか?

・最初の2,3ヶ月は久々にWindows使った
・雑にScalaやった
・ガッツリGoやってる
hadoop, hbase, kafka, cassandraとかもやった
・microservices的なこともやってる
・今までやってきたPHPは何だったのか?ってくらいちゃんとしたPHPに触れた
・ビジュアルリグレッションテストの環境作った

とか色々やりました。
今までやったことないようなことにチャレンジしてるので、刺激的な毎日です。

どんな感じか?

今のところある程度自由に伸び伸びと開発をしています。
最近は弊社で各種カンファレンスのスポンサーやったり、勉強会開いたりとコミュニティに寄与することも徐々にですが始まりました。

あとテックブログとかもやってたりします。

https://techblog.istyle.co.jp

開発に関することなら理解のある職場なので、ありがたい限りです。

おわりに

という感じで元気に生きてます。
弊社気になる方は是非ご連絡下さい。

TIME_WAITを引き起こさないhttp.Getを実現する

大量にhttp.Getするという書き捨てコードを書いていたのですが、途中までは順調に進むのですがあるところから急に通信を行わなくなる、という事態にハマりました。

一体何が?

なんじゃこりゃ、というわけでとりあえずコネクションの状況を確認してみたのですが、途中で止まるのも納得なTIME_WAITだらけの状況でした。
そう、TIME_WAITにポートが使い潰されて止まっていたのです。

その時のコードがこちら。

package main

import (
    "fmt"
    "net/http"
    "sync"
)

func main() {
    var wg = sync.WaitGroup{}
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            url := "http://www.google.co.jp"
            resp, err := http.Get(url)
            if err != nil {
                fmt.Println("Error: " + err.Error())
                return
            }
            defer resp.Body.Close()
            fmt.Println("Success")
        }()
    }
    wg.Wait()
}

これなんですが、実行するとTIME_WAITが大量に発生します。
Macで確認するときは netstat ではなく、 sysctl net.inet.tcp.tw_pcbcount で確認しましょう。
Macだと netstat ではTIME_WAITが表示されないのです・・・

原因

何が原因かというと、コネクションプールがほとんど利用できてないんですよね。これ。
MaxIdleConnsPerHost のデフォルトが2なので、2コネクションしか再利用できてない状態で新規のコネクションをポンポン開けちゃう状態でした。
なので、こうしてみました。

package main

import (
    "fmt"
    "sync"
    "net/http"
    "golang.org/x/sync/semaphore"
    "context"
)

var client *http.Client

const (
    Limit = 100
    Weight = 1
)

func main() {
    var wg = sync.WaitGroup{}

    s := semaphore.NewWeighted(Limit)

    defaultRoundTripper := http.DefaultTransport
    defaultTransportPointer, ok := defaultRoundTripper.(*http.Transport)
    if !ok {
        panic(fmt.Sprintf("defaultRoundTripper not an *http.Transport"))
    }
    defaultTransport := *defaultTransportPointer
    defaultTransport.MaxIdleConns = 0
    defaultTransport.MaxIdleConnsPerHost = 100
    client = &http.Client{Transport: &defaultTransport}

    for i := 0; i < 5000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            s.Acquire(context.Background(), Weight)
            defer s.Release(Weight)
            url := "http://www.google.co.jp"
            resp, err := client.Get(url)
            if err != nil {
                fmt.Println("Error: " + err.Error())
                return
            }
            defer resp.Body.Close()
            fmt.Println("Success")
        }()
    }
    wg.Wait()
}

ホスト毎に100件分のコネクションを再利用する感じのコードです。
間髪入れずにブン回すので、semaphore処理も入れてます。
これでTIME_WAITが発生しないと思うでしょ?ダメなんだなこれが。

$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 0
$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 27
$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 77
$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 146
$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 201
$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 288
$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 334

...

こんな感じで無限にTIME_WAITが増えていって、全然再利用されていません。
めちゃくちゃ悩んだんですが、どうやら resp を読み込まないとコネクションが再利用されないようです。

package main

import (
    "fmt"
    "sync"
    "net/http"
    "golang.org/x/sync/semaphore"
    "context"
    "io/ioutil"
)

var client *http.Client

const (
    Limit = 100
    Weight = 1
)

func main() {
    var wg = sync.WaitGroup{}

    s := semaphore.NewWeighted(Limit)

    defaultRoundTripper := http.DefaultTransport
    defaultTransportPointer, ok := defaultRoundTripper.(*http.Transport)
    if !ok {
        panic(fmt.Sprintf("defaultRoundTripper not an *http.Transport"))
    }
    defaultTransport := *defaultTransportPointer
    defaultTransport.MaxIdleConns = 0
    defaultTransport.MaxIdleConnsPerHost = 100
    client = &http.Client{Transport: &defaultTransport}

    for i := 0; i < 5000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            s.Acquire(context.Background(), Weight)
            defer s.Release(Weight)
            url := "http://www.google.co.jp"
            resp, err := client.Get(url)
            if err != nil {
                fmt.Println("Error: " + err.Error())
                return
            }
            defer resp.Body.Close()
            _, err = ioutil.ReadAll(resp.Body)
            fmt.Println("Success")
        }()
    }
    wg.Wait()
}

これでやってみると・・・

$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 0
$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 0
$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 0
$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 0
$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 0
$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 0
$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 0

...

やったーーーーー!!TIME_WAITが0にできたよーーーーーー!!!
という感じで、もし同じ現象で悩んでる方がいたら試してみてください。

High-performance concurrencyという話をしました

主催をしている勉強会でGoのConcurrencyにまつわる話をしました。

gounconference.connpass.com

最近、Concurrency in Goという本を読んだことに触発され、「並行処理はサラッとこんな感じでやるといい感じになるのでは?」というTipsを本書よりチョイスしてスライドに落とし込みました。

gitpitch.com

10分のLT枠なのにリハーサルをしたところ30分かかることが判明したりしましたが、なんとか無事30分かけてLTできました。

何を伝えたかったのか?

Goの肝であるConcurrencyを調べると面白いよってことを聞いていただいた方に伝えたかったのです。
例えば、goroutineのメモリ確保に関する挙動とか調べてて「おもしれー!」ってなりましたし、今回では話しませんでしたがatomicパッケージとかも試してみてCASとかそっちのことを調べてLock-freeの概念とかを知ることができましたし、色々と学ぶ起点になると思います。

補遺

それではLT内ではサッと解説するに留めていたので、当記事で改めて説明をします。

ConcurrencyとParallelismについて

GoでのConccurencyを語る起点として、この記事をまずは読んだほうがいいでしょう。

Concurrency is not parallelism - The Go Blog

これはHerokuのWazaというConference内でRob Pike氏が語られた内容の一部が記載されています。
スライドもあるのでこっちも読むと良い。

Go Concurrency Patterns

LT内でうまく皆さんに伝わったかどうか不安だけども、言いたかったことは

  • Concurrencyは並行処理可能な構成、のこと
  • Parallelismは並行処理という行動、のこと

Concurrencyにおける危険性

未然に防がなければいけない危険性として簡単な例としてDeadLockを挙げましたが、正直な所そこまで気にしなくてもテストがっつり書いてると気付けるレベルなので危険度としては低い。
スライド内に書いたLiveLockやらResourceStarvationなど、「特定の条件」が揃った時のみ発生する系の方がよっぽど危険ではある。

あとはそういったことにどれだけ気をつけていても、やはり問題が起こってしまうことがあるので、そのような場合の対処方法も最初に紹介した書籍内で説明はされている。

Basic Concurrency Pattern

書籍内で幾つかパターンを紹介しているのですが、今回は簡単な3つについて説明しました。

  • Confinement
  • Preventing Goroutine Leaks
  • Heartbeats

Confinement

簡単な話で、レキシカルスコープを有効に使いましょうという話です。

package main

import "fmt"

func main() {
    loopData := func(handleData chan<- int) {
        defer close(handleData)
        data := []int{1, 2, 3, 4}
        for i := range data {
            handleData <- data[i]
        }
    }

    handleData := make(chan int)
    go loopData(handleData)
    for num := range handleData {
        fmt.Println(num)
    }
}

data をmainに置くのではなく、loopDataに内包することでクリティカルセクションを無くします。

Preventing Goroutine Leaks

goroutineのメモリリークの話です。
うっかりnil channelを渡してしまい、ブロッキングによりリークするパターンが題材です。

今回のスライドで使った例は様々なものを省いたので、ある程度処理っぽいものを改めて書き直しました。

package main

import (
    "fmt"
    "time"
)

func main() {
    strChan := make(chan string)
    interval := time.Tick(1 * time.Second)
    completed1 := doWork(nil, 3) // Leak goroutine
    completed2 := doWork(strChan, 5)// Not leak goroutine

    for {
        select {
        case <-interval:
            strChan<- "foo"
        case _, ok := <-completed1:
            if ok == false {
                fmt.Println("Completed work: 1")
                return
            }
        case _, ok := <-completed2:
            if ok == false {
                fmt.Println("Completed work: 2")
                return
            }
        }
    }
}

func doWork(strings <-chan string, limit int) <-chan interface{} {
    completed := make(chan interface{})
    go func() {
        defer fmt.Println("doWork exited.")
        defer close(completed)

        var count int
        for {
            select {
            case s := <-strings:
                if count == limit { return }
                fmt.Println(s)
                count++
            }
        }
    }()
    return completed
}

limit まで string channel を受け取り、出力するプログラムです。
LeakするgoroutineとLeakしないgoroutineの2種類を用意して実行しています。

nil channel を引数として渡してしまっているため、本来なら3回の出力でプログラムが終了するところが、5回も出力してしまっています。
このことから、 nil channelfor-select 内でブロッキングしていることが分かりますね。

なのでこんな風に親から子に終了を告げるchannelを伝搬させれば良い。

package main

import (
    "fmt"
    "time"
)

func main() {
    doneChan := make(chan interface{})
    strChan := make(chan string)
    interval := time.Tick(1 * time.Second)
    timeout := time.After(5 * time.Second)
    completed := doWork(doneChan, nil, 3)  // Leak goroutine

    for {
        select {
        case <-timeout:
            doneChan <- struct{}{}
        case <-interval:
            strChan <- "foo"
        case _, ok := <-completed:
            if ok == false {
                fmt.Println("Completed work: 1")
                return
            }
        }
    }
}

func doWork(done <-chan interface{}, strings <-chan string, limit int) <-chan interface{} {
    completed := make(chan interface{})
    go func() {
        defer fmt.Println("doWork exited.")
        defer close(completed)

        var count int
        for {
            select {
            case <-done: return
            case s := <-strings:
                if count == limit { return }
                fmt.Println(s)
                count++
            }
        }
    }()
    return completed
}

Leakしているgoroutineを5s後には終了させるという感じの処理ですね。非常に簡単。
こんな風に「作成したgoroutineの管理権限をきちんと親が持っているのかどうか」というところは常に気をつけたいところです。

Heartbeats

goroutineの生死確認に使うやつです。
書籍内ではheartbeatsを使って、goroutineが死んでいたら復活させたりする手法を書いていたりします。
LTではこの辺から駆け足になってしまったので、皆さんに伝わったかどうかが非常に不安でした・・・

実装としては非常にシンプルで、goroutineが発するpulseが伝わらなくなったら死んでる、という判定をしているだけです。

package main

import (
    "fmt"
    "time"
)

func main() {
    done := make(chan interface{}) // 処理の終了を知らせるchannel
    time.AfterFunc(10*time.Second, func() { close(done) }) // 10s後には終了
    const timeout = 2 * time.Second // タイムアウトするまでの時間

    heartbeat, results := doWork(done, timeout/2)
    for {
        select {
        case _, ok := <-heartbeat:
            if ok == false {
                fmt.Println("心臓の鼓動が停止しました・・・")
                return
            }
            fmt.Println("pulse")
        case r, ok := <-results:
            if ok == false {
                return
            }
            fmt.Printf("results %v\n", r.Second())
        case <-time.After(timeout):
            fmt.Println("タイムアウトしました!")
            return
        }
    }
}

// workを並列で動かし、heartbeat channelとresult channelを返す
func doWork(
    done <-chan interface{},
    pulseInterval time.Duration, // heartbeatの確認パルスを送る時間間隔
) (<-chan interface{}, <-chan time.Time) {
    heartbeat := make(chan interface{})
    results := make(chan time.Time)
    go work(heartbeat, results, pulseInterval, done)
    return heartbeat, results
}

func work(
    heartbeat chan interface{},
    results chan time.Time,
    pulseInterval time.Duration,
    done <-chan interface{},
) {
    defer close(heartbeat)
    defer close(results)

    pulse := time.Tick(pulseInterval)         // 1sごとに発火するchannel
    workGen := time.Tick(2 * pulseInterval) // 2sごとに発火するchannel

    //for {
    for i := 0; i < 2; i++ {
        select {
        case <-done:
            return
        case <-pulse:
            // 1s毎にsendPulseを実行
            sendPulse(heartbeat)
        case r := <-workGen: // 2s毎にtime.Timeを返す
            sendResult(r, done, pulse, heartbeat, results)
        }
    }
}

// heartbeat channelに何らかの値を入れる(空structがオススメ)
func sendPulse(heartbeat chan interface{}) {
    select {
    case heartbeat <- struct{}{}:
    default: // bufferが満杯の場合、このdefaultがないとblockingされてしまう
    }
}

func sendResult(
    r time.Time,
    done <-chan interface{},
    pulse <-chan time.Time,
    heartbeat chan interface{},
    results chan time.Time,
) {
    for {
        select {
        case <-done:
            return
        case <-pulse:
            sendPulse(heartbeat)
        case results <- r:
            return
        }
    }
}

空structがおすすめなのはメモリ消費が無いからですね。
空structについてはdave cheneyさんのこのエントリが非常に分かりやすいです。

The empty struct | Dave Cheney

さいごに

いろいろ話したいことを詰め込んだら30分くらいになったので、どこかで30分くらいの枠のセッションがあったら流用していいな、って感じのスライドが爆誕しました。
参加者の皆さん、LT長くてすみませんでした・・・

とりあえずまだまだConcurrency絡みの話はあるので、気になった方はConcurrency in Goとか読んでみるといいと思います。
英語onlyなので読むの辛いですけど・・・

また次回も何かネタを探して発表しよう。

swaggoが思いの外、素晴らしかった

swaggoがペロッとswagger-uiをアプリケーション内に組み込みたかった願望をサクッと叶えてくれた。

swaggoの概要については@pei0804さんが以下の記事で詳しく書かれているので、細かいところは割愛。

qiita.com

何が良かったか

煩わしいこと抜きで、アノテーション書いてバチッとコマンド打ったらswagger-uiでAPIドキュメントが見れる、そんなところですね。
これ抜きでやろうとするとgoaに寄せた方がいいんじゃないか?ってくらい面倒だったので、非常に助かった。

あと、gin・echo・net/http用にUIを呼び出す関数が用意されてるので、echoで作ってた自分にはうってつけでした。
地味に revel-swaggerリポジトリとしてあるみたいなので、将来的には使えるようになる・・・のか?🤔

微妙なところ

ドキュメントがちょっと分かりづらいかなと思ったりしました。

アノテーション自体の書き方はこの辺に書いてあるんですが、

DeclarativeCommentsFormat · GitBook

これだけだと完全に分からんので、exampleを探す羽目になるかと思います。

あと、Empty Responseを表すのにどうやって書けばいいのかさっぱり分からず、ISSUEを探してようやく見つかったレベルなのでこの辺なんとかならんかなと。

Support empty responses · Issue #11 · swaggo/swag · GitHub

おわり

ただそんなことは些細なレベルでドチャクソ便利マンなので、作った方ありがとうございますというお気持ちに溢れました🙏

オライリーのsafariに関するちょっとアレなところ

とある本が読みたくてオライリーsafariをFree Trialで使っている。

Web上で読むタイプなので、Chromeの翻訳機能使えば洋書でもすいすい読めて使いやすいな、と思ってたのだが一つ重大な欠点を発見した。
それは、Submit Errataが出来ないこと。

Submit Errataは読んで字のごとく、書籍内の誤りを報告する機能なのだが報告の際に該当のページ数の入力が必要になる。
しかし、safariだと肝心のページ数が分からないのだ。

safari独自の報告ページでもあるのかと思いきや特にあるわけでもないので、完全にお手上げである。
暇があったらオライリーのサポートにでもメールを投げるか。

以上、小さな愚痴を書き残しておく。