生涯未熟

生涯未熟

プログラミングをちょこちょこと。

Cassandraが墓石まみれになり死

TombStoneが出てこなくて、ついTotemと言ってしまった私です。
今回はTombStoneで死んだ話をします。

状況

Apache SparkでCassandraに向けて、1000万件程のデータ を流し込む処理を実行していたのですが、途中で処理が完全に止まってしまい死にました。

何故に

何ぞ?と思い、 cqlsh でQuery投げてみましたが、何を投げても以下のようなエラーが。

cassandra.ReadFailure: code=1300 [Replica(s) failed to execute read] message="Operation failed - received 0 responses and 1 failures" info={'required_responses': 1, 'received_responses': 0, 'failures': 1, 'consistency': 'ONE'}

ほげ〜〜〜〜となって調べてみると、こんな記事が。

stackoverflow.com

You are exceeding the tombstone_failure_threshold. It defaults to 100'000.

いやいや、そんなTombStoneが10万もおっ立てるほど削除処理回してへんで、となっておもむろに nodetool cfstats hoge.fuga を実行したところ完全にTombStoneが10万超えてました・・・

なんでやねん

opencredo.com

この記事読んで知ったんですが、columnのnullでもTombStoneって立つんですね・・・
たしかにテーブル内のデータを見ると、nullのデータがちらほらあって色々と納得。

どうすんねん

とりあえずTombStoneを掃除するために、Compactionの設定を変えることにしました。
Tableの WITH COMPACTION をこんな感じに。

'tombstone_compaction_interval': '60' :TombStoneのCompaction間隔秒数(デフォルトは1日)
'tombstone_threshold': '0.01' :Compaction実行のしきい値となる、すべての列とTombStoneが含まれた列の比率

docs.datastax.com

上限値も変えようかなとは思ったんですが、それやっちゃうと上限値の最適値を探る作業が入っちゃうので面倒だなと思いCompaction設定をいじりました。

まとめ

TombStoneで死ぬこともあるので気を付けよう。

参考資料

Cassandra導入事例と現場視点での苦労したポイント cassandra summit2014jpn

Null bindings on prepared statements and undesired tombstone creation

Common Problems with Cassandra Tombstones - OpenCredo

How is data deleted?

cassandra.yaml構成ファイル

Compaction subproperties

saramaでAsyncProducerを使う時に気を付けなければいけないたったひとつのこと

AsyncCloseを使うな


どういうこと?

AsyncProducerには

  • AsyncClose
  • Close

の2種類がある。

なんの違いが?

それぞれのコメント読みましょ。

AsyncClose

AsyncClose triggers a shutdown of the producer. The shutdown has completed when both the Errors and Successes channels have been closed. When calling AsyncClose, you must continue to read from those channels in order to drain the results of any messages in flight.


AsyncCloseがプロデューサのシャットダウンをトリガします。 エラーと成功の両方のチャネルが閉じられたときにシャットダウンが完了しました。 AsyncCloseを呼び出すときは、飛行中のメッセージの結果を排除するために、それらのチャネルからの読み込みを続ける必要があります。

Close

Close shuts down the producer and waits for any buffered messages to be flushed. You must call this function before a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close on the underlying client.


Closeはプロデューサをシャットダウンし、バッファされたメッセージがフラッシュされるのを待ちます。 プロデューサオブジェクトがスコープの外に出る前に、この関数を呼び出す必要があります。 基になるクライアントでCloseを呼び出す前にこれを呼び出す必要があります。

よくわからん🤔

コードを読もう

AsyncClose

func (p *asyncProducer) AsyncClose() {
    go withRecover(p.shutdown)
}

withRecover()shutdown というメソッドを引数にとって実行しています。

func withRecover(fn func()) {
    defer func() {
        handler := PanicHandler
        if handler != nil {
            if err := recover(); err != nil {
                handler(err)
            }
        }
    }()

    fn()
}

単純にpanicしたときにrecoverしてerrorを吐き出してる感じのdeferをつけて、引数のメソッドを実行しているだけですね。

func (p *asyncProducer) shutdown() {
    Logger.Println("Producer shutting down.")
    p.inFlight.Add(1)
    p.input <- &ProducerMessage{flags: shutdown}

    p.inFlight.Wait()

    if p.ownClient {
        err := p.client.Close()
        if err != nil {
            Logger.Println("producer/shutdown failed to close the embedded client:", err)
        }
    }

    close(p.input)
    close(p.retries)
    close(p.errors)
    close(p.successes)
}

で、肝心のshutdown処理なのですが、一度シャットダウンのための値を p.input に渡して、Producer上で実行中のメッセージの受け渡しが終わり次第、Producerをcloseしようとしています。

Close

func (p *asyncProducer) Close() error {
    p.AsyncClose()

    if p.conf.Producer.Return.Successes {
        go withRecover(func() {
            for range p.successes {
            }
        })
    }

    var errors ProducerErrors
    if p.conf.Producer.Return.Errors {
        for event := range p.errors {
            errors = append(errors, event)
        }
    } else {
        <-p.errors
    }

    if len(errors) > 0 {
        return errors
    }
    return nil
}

1行目から AsyncClose 呼んでますね・・・
その後は、brokerへのメッセージ送信の成否を示す SuccessesErrors をClose側でhandlingしています。
p.successesp.errors で値を受け取り続けているのは、最初の説明で書いてあった "AsyncCloseを呼び出すときは、飛行中のメッセージの結果を排除するために、それらのチャネルからの読み込みを続ける必要があります。" に従っているわけですね。
この飛行中ってのはイコール、メッセージの送信中って意味で、それを表しているのが inFlight というわけです。

inFlight 自体は sync.WaitGroup で、メッセージを送信する際に Add(1) , 送信し終わったら Done() します。
今回で言うと、 shutdown 処理の際に、安全にシャットダウンするために inFlight を使ってますね。

まとめ

というわけで、 AsyncClose のみを使用するときには結局 Close でやっているような処理を実装しなくてはならないので、それなら Close 使った方がいいよ。という結論に至りました。
なら何故 AsyncClose は外部からも読めるようになってるんだ?という感じもするかもしれませんが、 p.successes とかは処理内容がNoopなので、自分で色々やりたい人はどうぞ、っていうキャパシティを残してるんですかね?🤔

OS Xのnetstatに立ち向かう

ある時、手持ちのMacからTIME_WAITが出てるか調べようと netstat を叩いた所、想定していた挙動と全く違ったので、メモ代わりに書いておく。

何が違ったのか?

  • -p オプションの挙動
  • TIME_WAITの扱い

-p オプションの挙動

これは完全にLinuxBSD系であるOSXの違いなんですが、 -p の扱いが両者で全く違います。
完全にPIDと実行プログラム名が出ると思って -p 付けて叩いたら

ʕ ◔ϖ◔ʔ % netstat -anp                                                                                                                                                 (net-tools)-[master]
netstat: option requires an argument -- p
Usage:  netstat [-AaLlnW] [-f address_family | -p protocol]
    netstat [-gilns] [-f address_family]
    netstat -i | -I interface [-w wait] [-abdgRtS]
    netstat -s [-s] [-f address_family | -p protocol] [-w wait]
    netstat -i | -I interface -s [-f address_family | -p protocol]
    netstat -m [-m]
    netstat -r [-Aaln] [-f address_family]
    netstat -rs [-s]

なんて出たもので。BSD系では -pプロトコル指定のオプションなんですね。
なので、実行プログラム名とか見たいって場合は netstat よりも lsof 使った方がいいのかもしれません。

TIME_WAITの扱い

OS Xの場合、netstat ではTIME_WAITが表示されません。
なので、確認したい場合は

ʕ ◔ϖ◔ʔ % sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 0

を実行しましょう。

おわりに

OS XがTIME_WAITを頑なに隠してるのが謎・・・
どっかのファイルにTCPコネクションを吐いていてくれたら、なんとかツール組んで出来るんじゃないかなーと思ったり。

あと、調査する中で面白かったのはnetstatアマチュア無線パケット通信プロトコルであるAX25とかをサポートしてたりとか、謎な知識が知れたことですね。
今回、netstatのコードも読んだんですが一度読んでみると様々な発見があるのでお薦めです。

github.com

参考資料

blog.kamipo.net

OSX: Where are my TIME_WAIT ?!

入社から一年経ちました

株式会社アイスタイルに去年の6/1に入社してから一年経ちました。
もう30手前になると一年があっという間だなぁ、としみじみ感じております。

何をやったか?

・最初の2,3ヶ月は久々にWindows使った
・雑にScalaやった
・ガッツリGoやってる
hadoop, hbase, kafka, cassandraとかもやった
・microservices的なこともやってる
・今までやってきたPHPは何だったのか?ってくらいちゃんとしたPHPに触れた
・ビジュアルリグレッションテストの環境作った

とか色々やりました。
今までやったことないようなことにチャレンジしてるので、刺激的な毎日です。

どんな感じか?

今のところある程度自由に伸び伸びと開発をしています。
最近は弊社で各種カンファレンスのスポンサーやったり、勉強会開いたりとコミュニティに寄与することも徐々にですが始まりました。

あとテックブログとかもやってたりします。

https://techblog.istyle.co.jp

開発に関することなら理解のある職場なので、ありがたい限りです。

おわりに

という感じで元気に生きてます。
弊社気になる方は是非ご連絡下さい。

TIME_WAITを引き起こさないhttp.Getを実現する

大量にhttp.Getするという書き捨てコードを書いていたのですが、途中までは順調に進むのですがあるところから急に通信を行わなくなる、という事態にハマりました。

一体何が?

なんじゃこりゃ、というわけでとりあえずコネクションの状況を確認してみたのですが、途中で止まるのも納得なTIME_WAITだらけの状況でした。
そう、TIME_WAITにポートが使い潰されて止まっていたのです。

その時のコードがこちら。

package main

import (
    "fmt"
    "net/http"
    "sync"
)

func main() {
    var wg = sync.WaitGroup{}
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            url := "http://www.google.co.jp"
            resp, err := http.Get(url)
            if err != nil {
                fmt.Println("Error: " + err.Error())
                return
            }
            defer resp.Body.Close()
            fmt.Println("Success")
        }()
    }
    wg.Wait()
}

これなんですが、実行するとTIME_WAITが大量に発生します。
Macで確認するときは netstat ではなく、 sysctl net.inet.tcp.tw_pcbcount で確認しましょう。
Macだと netstat ではTIME_WAITが表示されないのです・・・

原因

何が原因かというと、コネクションプールがほとんど利用できてないんですよね。これ。
MaxIdleConnsPerHost のデフォルトが2なので、2コネクションしか再利用できてない状態で新規のコネクションをポンポン開けちゃう状態でした。
なので、こうしてみました。

package main

import (
    "fmt"
    "sync"
    "net/http"
    "golang.org/x/sync/semaphore"
    "context"
)

var client *http.Client

const (
    Limit = 100
    Weight = 1
)

func main() {
    var wg = sync.WaitGroup{}

    s := semaphore.NewWeighted(Limit)

    defaultRoundTripper := http.DefaultTransport
    defaultTransportPointer, ok := defaultRoundTripper.(*http.Transport)
    if !ok {
        panic(fmt.Sprintf("defaultRoundTripper not an *http.Transport"))
    }
    defaultTransport := *defaultTransportPointer
    defaultTransport.MaxIdleConns = 0
    defaultTransport.MaxIdleConnsPerHost = 100
    client = &http.Client{Transport: &defaultTransport}

    for i := 0; i < 5000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            s.Acquire(context.Background(), Weight)
            defer s.Release(Weight)
            url := "http://www.google.co.jp"
            resp, err := client.Get(url)
            if err != nil {
                fmt.Println("Error: " + err.Error())
                return
            }
            defer resp.Body.Close()
            fmt.Println("Success")
        }()
    }
    wg.Wait()
}

ホスト毎に100件分のコネクションを再利用する感じのコードです。
間髪入れずにブン回すので、semaphore処理も入れてます。
これでTIME_WAITが発生しないと思うでしょ?ダメなんだなこれが。

$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 0
$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 27
$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 77
$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 146
$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 201
$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 288
$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 334

...

こんな感じで無限にTIME_WAITが増えていって、全然再利用されていません。
めちゃくちゃ悩んだんですが、どうやら resp を読み込まないとコネクションが再利用されないようです。

package main

import (
    "fmt"
    "sync"
    "net/http"
    "golang.org/x/sync/semaphore"
    "context"
    "io/ioutil"
)

var client *http.Client

const (
    Limit = 100
    Weight = 1
)

func main() {
    var wg = sync.WaitGroup{}

    s := semaphore.NewWeighted(Limit)

    defaultRoundTripper := http.DefaultTransport
    defaultTransportPointer, ok := defaultRoundTripper.(*http.Transport)
    if !ok {
        panic(fmt.Sprintf("defaultRoundTripper not an *http.Transport"))
    }
    defaultTransport := *defaultTransportPointer
    defaultTransport.MaxIdleConns = 0
    defaultTransport.MaxIdleConnsPerHost = 100
    client = &http.Client{Transport: &defaultTransport}

    for i := 0; i < 5000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            s.Acquire(context.Background(), Weight)
            defer s.Release(Weight)
            url := "http://www.google.co.jp"
            resp, err := client.Get(url)
            if err != nil {
                fmt.Println("Error: " + err.Error())
                return
            }
            defer resp.Body.Close()
            _, err = ioutil.ReadAll(resp.Body)
            fmt.Println("Success")
        }()
    }
    wg.Wait()
}

これでやってみると・・・

$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 0
$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 0
$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 0
$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 0
$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 0
$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 0
$ sysctl net.inet.tcp.tw_pcbcount
net.inet.tcp.tw_pcbcount: 0

...

やったーーーーー!!TIME_WAITが0にできたよーーーーーー!!!
という感じで、もし同じ現象で悩んでる方がいたら試してみてください。