生涯未熟

プログラミングをちょこちょこと。

Sparkにおける不正なCSV読み込みへの立ち向かい方

Apache Sparkを使い、あるデータをHDFSCSVとして保存し、保存したCSVから読み込んだデータをDBに格納するということを想定して、もし不正なCSVファイルが紛れ込んでいたらどうする?ということを考えていく。

状況

この疑問が生じた発端となった不正なCSVを見ていきたい。

"商品A","この商品は
素晴らしい商品
です","使ってみたけど
確かに素晴らしかった
です!"
"商品B","この商品は
\"駄目\"
です","使ってみたけど
確かに\"駄目\"
でした!"

このような形の商品名・説明・レビューで構成されているMultiLine CSVである。
これはどのように不正かというと、2つ目のレコードの \" が災いして以下のように崩れてしまうのである。

f:id:syossan:20180811020435p:plain

このような不正なCSVをSparkのDataFrameに乗せて処理をしてしまうと、愚直に崩れた状態で読み込んでしまうので様々な問題を引き起こしてしまう。

どう立ち向かうか?

FASTFAIL

SparkでのCSV読み込み時にはoptionを設定することが出来るが、 FAILFAST Mode で動くようにoption設定するのが一つの方法だ。
Modeには3種類あり、

  • PERMISSIVE(default): 全ての行を走査する。欠落したセルにはnullを入れ、余分なセルは無視という挙動を行う
  • DROPMALFORMED: 想定より少ないセル数の行の削除、またスキーマと一致しないセル内容の削除を行う
  • FAILFAST: 不正行が見つかった場合にRuntimeExceptionを返す

という内容になっている。

FAILFAST Modeで動かしておき、もしRuntimeExceptionが発生した場合はCSVの確認を行う、という方法もひとつ。

escape + quote options

しかし、上記の方法では読み込み時に不正なCSVを検知するだけなのであまり意味がない。
そこで、 escape optionと quote optionを使って正しいCSVファイルに直すという方法がある。

この問題のCSVが何故不正なのか?といった点を掘り下げていくと、ダブルクオーテーションをエスケープするにはダブルクオーテーションを使わなくてはいけないという仕様がRFCに記載されており、 \" を使ったCSVは本来 "" というCSVになるのが正しい形である。

RFC 4180 - Common Format and MIME Type for Comma-Separated Values (CSV) Files

If double-quotes are used to enclose fields, then a double-quote appearing inside a field must be escaped by preceding it with another double quote

そこで、CSVの書き込み時と読み込み時に escapequote のoptionを仕様に合わせた形に設定していきたい。

一例として以下のようにやってみる。

  • CSV書き込み
df.write
    .options(
        Map(
            "escape" -> "\"",
            "quote" -> "\""
        )
    )
    .mode(SaveMode.Append)
    .csv("hdfs://hoge:8082")
  • CSV読み込み
spark.read
    .format("org.apache.spark.csv")
    .options(Map(
        "escape" -> "\"",
        "quote" -> "\"",
        "multiLine" -> "true"
    ))
    .schema(schema)
    .csv(
        "hdfs://hoge:8082/*.csv"
    )

このような感じで書き込み時と読み込み時に適切な形になるようオプションで明示的に指定してあげると上手く動きます。
SparkでCSVを扱う際にはくれぐれも気を付けましょう。

ランサーズ開発ランチにお邪魔してきた!

f:id:syossan:20180711115427j:plain

クラウドソーシングで超有名なランサーズさんが、開発ランチという面白いイベントをやっていたのでカツ丼食べたくてブログ枠として参加してまいりました!!

lancers-engineer.connpass.com

ランサーズ開発ランチ(Lunchers)について

このイベントはランサーズさんが開催している勉強会で、一ヶ月に1~2回というスパンで開かれております。
はてなのa_knowさんやオミカレのそーだいさんなど、著名な方々がゲストで招かれている素晴らしい勉強会です!

engineer.blog.lancers.jp

ブログ枠だと最高に美味しいトンカツが食べれるので皆さん参加しましょう!!!!

f:id:syossan:20180711120000j:plain

kakakakakkuさんによるお話

今回のランサーズ開発ランチのゲストは、

kakakakakku.hatenablog.com

の記事で1000ブクマを超える反響を呼んだkakakakakkuさんによる「プロジェクトの成功を支える ZenHub と モブプログラミング」でした!

僭越ながら、お話を聞いて個人的に「オッ」と思ったところを書き出していきます。

f:id:syossan:20180711120853j:plain

Backlogsにあるタスクにメンバーを割り当てない

手の空いた人が優先順位の高いタスクからサッと着手出来るように、Backlogsのタスクにメンバーは割り当てないようにする。
スキルがミスマッチの場合でも、時間がかかってもいいので着手するようにする。

スキルがミスマッチの場合でも〜、というのが個人的には刺さりました。
マネージャーが機能している場合、プロジェクトを円滑に終わらせるためにも事前にメンバーのスキルセットに合ったタスクの割り振りをやってしまいがちです。
しかし、それではメンバーのスキルセットの質や幅がいつまで経っても向上しないままなので、タスクにかかる時間を多少目を瞑ってもやらせる、ということなのでしょう。

途中、kakakakakkuさんが「サービス自体には興味がない。プロジェクトの達成とメンバーの成長に興味がある。」と仰っていたことを含んで考えると、感慨深いものがあります。

優先度警察

kakakakakkuさんが受け持つプロジェクトでは「優先度」という言葉は禁句らしいです。
というのも、「優先度」ではなく「優先順位」という言葉を使って欲しい、という気持ちから。

優先度だとありきたりな「高・中・低」で分けられてしまいます。
しかし、優先順位だと明確に「1, 2 3」とやるべきことの順番を決めることができます。
例えば、Aさんがタスクを取ろうとBacklogsを見た時に、優先度・高と優先度・高のタスクが同時に存在した場合にどちらを先にやるか悩んでしまいますよね。
そういったことが無いように、優先順位の一番高いものから着手できるよう「優先順位」という考え方を大事にしているとのことです。

質問タイム:前半

  • ZenHubはどこがオススメポイント? → 画面の見やすさ、Githubに紐付いた機能、Epic機能
  • Epicに紐付いた仕様はどう管理しているの? → 仕様書をGoogleドライブで管理してEpicにリンクを貼る
  • 優先順位がビジネスに左右される場合は? → 基本左右されないようにする。Doingに入ったものは変えない。Backlogs内だとバンバン変える。
  • Reviewingからなかなか進まない場合は? → 口頭・対面でレビュー進めるように言う(Doingより優先してやって!など)。WIP制限がReviewingを進めるという効果もある。1~3日の粒度のタスクなのでPRレビューもそんなにかからないはず。
  • ラベルの付け方は? → 全体、仕様、フロント、バック(Ruby etc)、デザイン etc...。スキルマップと照らし合わせて、ラベルを分ける。
  • 今後必要なんだけど今できないみたいなタスクは(技術検証とか)? → Backlogsに積んでおくといい。やるんだったら3日で終わるように調整する。
  • Reviewingの数は制限あるの? → ない。レビューが詰まるようならやる価値はある。

スウォーミングの考え方

タスクとメンバーの関係性を1:1から1:nと考える。
あるタスクに異様に時間がかかっている状況が発生した場合、手が空いたメンバーがそのタスクに関わるようにする。
その考え方の延長線上にあるものが「モブプログラミング」である。
スキルセット関係なく関わるようにすることで、個々のメンバーの強みが他のメンバーの強みとなっていく→強みの伝搬。

モブリリース

今回のお話の中で、この考え方が凄い良いなぁ〜と聞いておりました。
どういうものかというと、属人化しやすいリリース作業をモブプログラミングのように皆の前で行うことで、リリース作業の理解を広めるというもの。
手順書を作り、それを見ながらリリースするという方法もあるのですが、それよりもリリースのプロセスを可視化された状態で見て学んだ方がいいとのこと。
実際に人がやっている手順を見ることで、自分がやる時に安心して出来ますし、その場で「何故それをするのか?」といった質問も出来ますし、良い取り組みですよね。

質問タイム:後半

  • あるタスクが困窮していて、皆で助けにいった際にタスク全体のスケジュールが遅れる場合はどうする? → スケジュールを伸ばす。ビジネス側に文句を言われても無視する。
  • モブプログラミング開催の周期は? → 毎日やってるところもあるが、kakakakakkuさんのところは週1回。教育・息抜きのイベントのような扱い。モブリリースは頻繁にやっている。スキルマップと照らし合わせて、スキル的に難がある人が多い場合は、慣れてきたプロジェクト後半にモブプロを開始する場合もある。

まとめ

プロジェクトマネージャーという職を経験したことのない未熟な私にとっては、非常にタメになるお話の連続でめちゃくちゃ勉強になりました!
今回のお話の中で得た知識を、少しでも現場で活かせることが出来るよう頑張ります・・・!

ランサーズ開発ランチでは今後もドンドン魅力的なゲストを呼んで開催されるとのことなので、界隈の最先端にいる方のお話を聞きたい方は是非チェックしましょう!!

runtime.Goexitを改めて理解する

序文

runtime packageには Goexit() というfunctionがあります。

// Goexit terminates the goroutine that calls it. No other goroutine is affected.
// Goexit runs all deferred calls before terminating the goroutine. Because Goexit
// is not a panic, any recover calls in those deferred functions will return nil.
//
// Calling Goexit from the main goroutine terminates that goroutine
// without func main returning. Since func main has not returned,
// the program continues execution of other goroutines.
// If all other goroutines exit, the program crashes.

// Goexitはそれを呼び出すgoroutineを終了します。 その他のゴルーチンは影響を受けません。
// Goexitは、goroutineを終了する前にすべてのdefer呼び出しを実行します。 Goexitはpanicではないので、これらのdefer関数の中のすべてのrecover呼び出しはnilを返します。
// メインのgoroutineからGoexitを呼び出すと、メインgoroutineに戻ることなくそのgoroutineが終了します。 func mainは返されていないので、プログラムは他のgoroutineの実行を続けます。
// その他のゴルーチンがすべて終了すると、プログラムがクラッシュします。
func Goexit() {
    // Run all deferred functions for the current goroutine.
    // This code is similar to gopanic, see that implementation
    // for detailed comments.

        // 現在のゴルーチンのすべてのdefer関数を実行します。
        // このコードはgopanicに似ています。詳細なコメントの実装を参照してください。
    gp := getg()
    for {
        d := gp._defer
        if d == nil {
            break
        }
        if d.started {
            if d._panic != nil {
                d._panic.aborted = true
                d._panic = nil
            }
            d.fn = nil
            gp._defer = d.link
            freedefer(d)
            continue
        }
        d.started = true
        reflectcall(nil, unsafe.Pointer(d.fn), deferArgs(d), uint32(d.siz), uint32(d.siz))
        if gp._defer != d {
            throw("bad defer entry in Goexit")
        }
        d._panic = nil
        d.fn = nil
        gp._defer = d.link
        freedefer(d)
        // Note: we ignore recovers here because Goexit isn't a panic
    }
    goexit1()
}

コメントにあるように、以下の特性を持ちます。

  • 呼び出し元のgoroutineを終了する
  • すべてのgoroutineが終了すると、 deadlock を起こしクラッシュする
  • panic処理と違い、defer function内のrecoverはnilを返す

なかなか使い所の難しい要素ではありますが、Goでは FailNow() などで使われたりしています。
さて、このGoexitですがそこまで使われていないっぽくて、自分自身使ってみたことがないので試してみました。

試行

それでは軽く試してみましょう。

package main

import (
    "fmt"
    "runtime"
    "time"
)

func main() {
    go func() {
        defer fmt.Println("Done")
        time.Sleep(1 * time.Second)
        runtime.Goexit()
    }()

    fmt.Printf("goroutines: %v\n", runtime.NumGoroutine())
    time.Sleep(2 * time.Second)
    fmt.Printf("goroutines: %v\n", runtime.NumGoroutine())
}

// goroutines: 2
// Done
// goroutines: 1

正常に終了し、goroutineの数が減っていることがわかりますね。
ただ、これだけでは単純に return するのと変わりません。次は少し捻ってみましょう。

package main

import (
    "fmt"
    "runtime"
    "time"
)

func main() {
    go func() {
        defer fmt.Println("Done")
        HelloAndExit()
    }()

    time.Sleep(1 * time.Second)
}

func HelloAndExit() {
    fmt.Println("Hello")
    runtime.Goexit()
}

// Hello
// Done

Hello と出力してからgoroutineを終了させるパターンです。
ただし、これだけだと

func HelloAndExit() {
    fmt.Println("Hello")
}

としても同じ結果になります。
それでは、 return との違いを試してみましょう。

package main

import (
    "fmt"
    "runtime"
    "time"
)

func main() {
    go func() {
        defer fmt.Println("Done HelloAndExit goroutine")
        fmt.Println("Start HelloAndExit goroutine")
        HelloAndExit()
        HelloAndExit()
    }()

    go func() {
        defer fmt.Println("Done HelloAndReturn goroutine")
        fmt.Println("Start HelloAndReturn goroutine")
        HelloAndReturn()
        HelloAndReturn()
    }()

    time.Sleep(1 * time.Second)
}

func HelloAndExit() {
    fmt.Println("HelloAndExit")
    runtime.Goexit()
}

func HelloAndReturn() {
    fmt.Println("HelloAndReturn")
    return
}

// Start HelloAndReturn goroutine
// HelloAndReturn
// HelloAndReturn
// Done HelloAndReturn goroutine

// Start HelloAndExit goroutine
// HelloAndExit
// Done HelloAndExit goroutine

このように、 return の場合では後続処理が走り、 runtime.Goexit では実行された時点で呼び出し元goroutineが終了していることが分かりますね。
こんな風にネストを深くしても、

package main

import (
    "fmt"
    "runtime"
    "time"
)

func main() {
    go func() {
        defer fmt.Println("Done")
        Wrapper()
    }()

    time.Sleep(2 * time.Second)
}

func Wrapper() {
    fmt.Println("Wrapper")
    HelloAndExit()
    time.Sleep(1 * time.Second)
    fmt.Println("Wake up")
}

func HelloAndExit() {
    fmt.Println("HelloAndExit")
    runtime.Goexit()
}

// Wrapper
// HelloAndExit
// Done

runtime.Goexit が呼び出された時点で、Wrapperの処理を打ち切ってgoroutineが終了されています。

まとめ

面白い機能なんですが、利用シーンを考えてみるとなかなか難しくて、「これだ!」と思うような実践的な実装が思いつきませんでした・・・
実装例を探そうとしても、そもそもGoexitを使っている人が居なかっt(ry

こんな感じで使うと良いよっていうのを、どなたかデキルGopherな方が提示してくれるのを待つしか無い・・・

「Go言語でつくるインタプリタ」を読んだ

読んだので思ったことを書く。

Go言語でつくるインタプリタ
Thorsten Ball
オライリージャパン
売り上げランキング: 9,186

どんな感じの本なのさ?

詳しくはこちら

Writing An Interpreter In Goを読んだ | SOTA

本書は、
字句解析器→構文解析器(トップダウン)→評価器→さらなるインタプリタの改良→【付録】マクロの実装
という感じの構成になってました。

昔、Rubyで小さなインタプリタを書いた自分にとってはPratt構文解析器の説明など学びがある感じの本でした。

めちゃくちゃ丁寧な説明、なのに300Pもない

この本、めちゃくちゃ解説が丁寧です。特にGo以外の事前知識が無くても大丈夫なくらいです。
そこまで丁寧に書かれてるのに総ページ数が300もいかないという薄さ。サックリ読めてしまう薄さです。
1000P超えるdragon bookと比べると、お手軽さが段違いですね。

丁寧は丁寧なんですが、バカ真面目に書かれた本でもなく、合間合間に小気味なジョークを挟んでくるので読んでいてあまり飽きもきません。

省くところは省く簡潔さ

本当にインタプリタを作ることだけにフォーカスしているため、余計なところは出来る限り省いています。
そして、それを何故省くのか?といったところも勿論説明してくれているため、余計な疑問を持たずに読み進められる。

TDD形式のコーディング

きちんと事前にテストを書いてから進める進行なので、どういった挙動をするプログラムを組めばいいのか?というのが事前に分かりやすい。

おわり

色々な実装を削っても、この規模のインタプリタを作るの大変なんだなぁ・・・という感想。
いつかインタプリタを作りたくなった時に再び本書を紐解こうと思う。

Cassandraが墓石まみれになり死

TombStoneが出てこなくて、ついTotemと言ってしまった私です。
今回はTombStoneで死んだ話をします。

状況

Apache SparkでCassandraに向けて、1000万件程のデータ を流し込む処理を実行していたのですが、途中で処理が完全に止まってしまい死にました。

何故に

何ぞ?と思い、 cqlsh でQuery投げてみましたが、何を投げても以下のようなエラーが。

cassandra.ReadFailure: code=1300 [Replica(s) failed to execute read] message="Operation failed - received 0 responses and 1 failures" info={'required_responses': 1, 'received_responses': 0, 'failures': 1, 'consistency': 'ONE'}

ほげ〜〜〜〜となって調べてみると、こんな記事が。

stackoverflow.com

You are exceeding the tombstone_failure_threshold. It defaults to 100'000.

いやいや、そんなTombStoneが10万もおっ立てるほど削除処理回してへんで、となっておもむろに nodetool cfstats hoge.fuga を実行したところ完全にTombStoneが10万超えてました・・・

なんでやねん

opencredo.com

この記事読んで知ったんですが、columnのnullでもTombStoneって立つんですね・・・
たしかにテーブル内のデータを見ると、nullのデータがちらほらあって色々と納得。

どうすんねん

とりあえずTombStoneを掃除するために、Compactionの設定を変えることにしました。
Tableの WITH COMPACTION をこんな感じに。

'tombstone_compaction_interval': '60' :TombStoneのCompaction間隔秒数(デフォルトは1日)
'tombstone_threshold': '0.01' :Compaction実行のしきい値となる、すべての列とTombStoneが含まれた列の比率

docs.datastax.com

上限値も変えようかなとは思ったんですが、それやっちゃうと上限値の最適値を探る作業が入っちゃうので面倒だなと思いCompaction設定をいじりました。

まとめ

TombStoneで死ぬこともあるので気を付けよう。

参考資料

Cassandra導入事例と現場視点での苦労したポイント cassandra summit2014jpn

Null bindings on prepared statements and undesired tombstone creation

Common Problems with Cassandra Tombstones - OpenCredo

How is data deleted?

cassandra.yaml構成ファイル

Compaction subproperties