Apache Sparkを使い、あるデータをHDFSにCSVとして保存し、保存したCSVから読み込んだデータをDBに格納するということを想定して、もし不正なCSVファイルが紛れ込んでいたらどうする?ということを考えていく。
状況
この疑問が生じた発端となった不正なCSVを見ていきたい。
"商品A","この商品は 素晴らしい商品 です","使ってみたけど 確かに素晴らしかった です!" "商品B","この商品は \"駄目\" です","使ってみたけど 確かに\"駄目\" でした!"
このような形の商品名・説明・レビューで構成されているMultiLine CSVである。
これはどのように不正かというと、2つ目のレコードの \"
が災いして以下のように崩れてしまうのである。
このような不正なCSVをSparkのDataFrameに乗せて処理をしてしまうと、愚直に崩れた状態で読み込んでしまうので様々な問題を引き起こしてしまう。
どう立ち向かうか?
FASTFAIL
SparkでのCSV読み込み時にはoptionを設定することが出来るが、 FAILFAST Mode
で動くようにoption設定するのが一つの方法だ。
Modeには3種類あり、
- PERMISSIVE(default): 全ての行を走査する。欠落したセルにはnullを入れ、余分なセルは無視という挙動を行う
- DROPMALFORMED: 想定より少ないセル数の行の削除、またスキーマと一致しないセル内容の削除を行う
- FAILFAST: 不正行が見つかった場合にRuntimeExceptionを返す
という内容になっている。
FAILFAST Modeで動かしておき、もしRuntimeExceptionが発生した場合はCSVの確認を行う、という方法もひとつ。
escape + quote options
しかし、上記の方法では読み込み時に不正なCSVを検知するだけなのであまり意味がない。
そこで、 escape
optionと quote
optionを使って正しいCSVファイルに直すという方法がある。
この問題のCSVが何故不正なのか?といった点を掘り下げていくと、ダブルクオーテーションをエスケープするにはダブルクオーテーションを使わなくてはいけないという仕様がRFCに記載されており、 \"
を使ったCSVは本来 ""
というCSVになるのが正しい形である。
RFC 4180 - Common Format and MIME Type for Comma-Separated Values (CSV) Files
If double-quotes are used to enclose fields, then a double-quote appearing inside a field must be escaped by preceding it with another double quote
そこで、CSVの書き込み時と読み込み時に escape
と quote
のoptionを仕様に合わせた形に設定していきたい。
一例として以下のようにやってみる。
- CSV書き込み
df.write .options( Map( "escape" -> "\"", "quote" -> "\"" ) ) .mode(SaveMode.Append) .csv("hdfs://hoge:8082")
- CSV読み込み
spark.read .format("org.apache.spark.csv") .options(Map( "escape" -> "\"", "quote" -> "\"", "multiLine" -> "true" )) .schema(schema) .csv( "hdfs://hoge:8082/*.csv" )
このような感じで書き込み時と読み込み時に適切な形になるようオプションで明示的に指定してあげると上手く動きます。
SparkでCSVを扱う際にはくれぐれも気を付けましょう。