雑に作って雑に使えるが乗っかってるCPUが1コアなので、雑にハッシュ型をポイポイ〜と投げるとすぐにCPUが100%に張り付く。 実際に7万qps相当のHSETを投げたが、残念ながらCPUが天井を越えて使い物にならなくなってしまった。 こういうCPU使いまっせっていう時は大人しくGCEかGKEで建てた方がいいですなぁという雑感。
Spannerで書き込みの速が出なかったお話
Spanner、皆さんどう使ってるでしょう?めちゃくちゃ雑に使っても大概のシステムの要件を満たせるくらいに捌いてくれる良い子なので、利用シーンは色々あるかとおもいます。値段はお高いですが・・・👀
さて、そんなSpannerで唯一"速" が出なかったパターンを発見したので記述しておく。
どういう状況か?
Spannerはベストプラクティスに従ったスキーマ設計をした場合(hot-spotを作らないetc...)、1ノードあたり読み取りで10000QPS・書き込みで2000QPSのパフォーマンスが出ます。ノードを増やしていくと20000QPS、30000QPSてな感じで性能は上がっていくのですが(あくまでもカタログスペック通りなら)、あるテーブルに対して負荷試験をしている時にどれだけノードを増やしても書き込みで5000~7000QPS以上の速が出ないという状況に陥りました。
調査
今回、負荷試験としてはGKE + Locust + Go(Echo)といった感じでGCPの例やPixivさんの例を参考に環境構築しました。で、上記のような状況になった際に「もしかしてLocustのWorker力が足りないか?」とか「いやいやGoで作ったアプリがおかしいのか?」とか「きっとSpannerのClientライブラリにあるSessionPoolの設定値がおかしいんだ!」とか色々頭を悩ませましたが、Spannerのメトリクスをモニタリングしていたら異常にLatencyが増えていることに気付き、「Spanner側の問題では・・・?」と疑いを持つこととなった。
この時に頭にあったのは「カタログスペックは1レコード1kbを想定しているので、もしや1レコードが重すぎた?」というのと「Unique Indexを使っているテーブルなのでこれが原因か?」という2つでした。問題の切り分けとして、以下のようなテーブルを作成し、試してみることに。
- UUIDを主キーとして持つ1カラムのテーブル
- ↑のテーブルに1カラムとUnique Indexを追加したテーブル
- 今回の状況の際に対象としたテーブルと同じカラム構造だがUnique Indexを持たないテーブル
これらのテーブルに対して、Spannerのノード数を5にしLocustから10000RPSで負荷をかけたところ
- 10000QPS
- 6000QPS
- 10000QPS
という結果になりました。
で、Unique Indexがなんでこんな悪さをしとるのじゃろ?と調べていましたが、sinmetalさんが発表されていたスライドにこんな一文が。
引用:Google Cloud Spanner Deep Dive - Google スライド
うわ〜〜〜〜これか〜〜〜〜。実はUnique Indexは特に分散させなくとも良いだろーと負荷をかける際にInsertしていたデータがhotspotを気にしていないものだったのが原因でした・・・
感想
うーん、Unique Indexを使う際はhotspotに気を付けねばならんですね。(Spannerの公式ドキュメントに書いといてよというお気持ちが少しある) ちなみに気になったので、②のパターンでノード数を上げてみるとどうなるのかやってみたのですが20ノードに上げてみたら逆に5000QPSに下がってしまいました。まぁhotspotなところにどれだけノード数増やしても無駄ですもんねぇ。
結論:Unique Indexのhotspotに気を付ける
現場からは以上です。
grpc-gatewayにおけるHTMLの出力
Cloud Runにおける段階的ロールアウトのメモ
Cloud Runの段階的ロールアウトに関して知らなかったことがあったのでメモ。
段階的ロールアウトって?
ここを見よう。
要するにデプロイ後、最新リビジョンに追従するのではなくトラフィックの管理を行い、段階的にロールアウトしていこうというやつ。
知らなかったこと
「新しいリビジョンの編集とデプロイ」→「このリビジョンをすぐに利用する」のチェックを外すことで最新リビジョンへの追従を止めることが出来るが、その後に gcloud run deploy
でデプロイを行うと段階的ロールアウト状態のままとなり、デプロイしたリビジョンは最新リビジョンとはならない。
ドキュメント見たところ gcloud run deploy
のoptionsに「このリビジョンをすぐに利用する」に該当するoptionが無かったので、再度最新リビジョンへ追従したい場合には手動でチェックボックスをonにしてデプロイするしかない。(と今のところは認識している
Cloud Composerを用いた機械学習における推論パイプラインの構築
ここ最近仕事で作成していた推論パイプラインが完成したので、Cloud Composerを使った場合にどのように構築していくのかサンプルプログラムを書いていこうかと思います。
設計
今回は以下のようなフローを想定して構築していきます。
イメージとしては各種アプリケーションでの購買データを用いた販売予測などが当てはまるでしょうか。
日毎に溜まる差分データがCSVにアップロードされたことによりCloud Composerが発火し、最終的に販売予測を行ったデータをBigQueryに格納するという感じになります。
Cloud Composerについて
前提知識として軽くCloud Composerの説明をします。
Cloud ComposerとはGCPで提供されているフルマネージドのワークフローシステムで、Apache Airflowが構築されたリソースをGKE等々で展開され手間暇をかけずに使用することが出来ます。
他にもCloud Data Fusionが似てような利用方法としてはありますが、料金がお高いので(Basic:月11万、Enterprise:月30万)小規模なワークフローや叩き台としてはCloud Composerが向いていると考えています。
以下がGoogleの提供しているリソースイメージです。
Apache Airflowは2020/04/25 現在、ver1.10.6までサポートされています。
AutoML Tablesについて
こちらも前提知識として軽い説明を。
AutoML Tablesは機械学習モデルのビルド及び、デプロイを簡単に行うことの出来るサービスです。
CSVやBigQueryのデータセットを読み取り、どのような予測結果が欲しいか?を選ぶだけで最適なモデルを作成してくれます。
今回、お仕事で使ってみましたがデータセットの数さえ揃えれば、かなり高い予測モデルを構築してくれるのでめちゃくちゃ助かりました。ただし、トレーニング時間によってかかる料金が結構するので(1h: 約2000円)トレーニング時間を設定する際はお気を付けください。また、別の注意点として予測を行う際にはモデルのデプロイが行われますが、必要な予測が終わり次第必ずデプロイされたリソースを削除しましょう。モデルをデプロイしている間は常に料金が加算されていきますので、削除せず放置した場合とんでもない請求が来ることがあります。今回の推論パイプラインではその辺りも加味した作りにしていきます。
構築
では、実際にどのように構築するのかを解説していきます。
サンプルコードはGitHubにアップロードしていますので、参照しながらお読み下さい。
CSV Sensor - アップロードされたCSVをトリガーに実行する
初めに、Cloud StorageにCSVがアップロードされたらCloud Composerが動き出すように実装していきます。
サンプルコードを見てみましょう。
基本骨子としては
- DAGの設定
- GoogleCloudStoragePrefixSensorの利用
- タスク間依存関係の設定
の3つになります。
DAGの設定
DAGとはDirected Acyclic Graphの略で有向非巡回グラフという意になります。
Airflowでは一つのワークフローの単位と考えて大丈夫です。
大体の設定はドキュメントを読むだけで理解は出来ると思うのですが、ひとつだけ大きな罠があります。それが dagrun_timeout
です。
これは設定名だけ見ると「DAGのタイムアウト設定」と思うのですが大きな間違いで、「DAGの同時実行数が設定値以上になった場合、実行済みのDAGの実行時間がdagrun_timeoutより大きい時に失敗とする」といった設定になります。ややこしいのですが、以下の記事を読むとどういったことを指すか分かると思います。
今回はこの条件に引っかかることはないので設定する必要性はありませんが、一応説明のため書いておきました。
また、個人的には catchup
の設定をFalseにしておくことをオススメしておきます。
これは start_date
から現在までにスケジューリングされるはずだったジョブを実行するかどうかのフラグで、例えばstart_dateが days_ago(1)
で schedule_interval
を @hourly
の時にcatchupをTrueでDAGを登録すると24個分のジョブが走ることになります。変に過去のジョブを走らせたくないという人はcatchupをFalseで設定するか、schedule_intervalをNoneとしそもそもスケジューリング自体を無効化すると良いでしょう。
GoogleCloudStoragePrefixSensorの利用
AirflowにはOperatorというタスクで何を実行するかを決める要素があります。こちらを見て頂くと様々なOperatorが存在することが分かります。特に複雑な事をしない場合は、これらプリセットのOperatorを組み合わせてワークフローを構成していくことになるでしょう。
このDAGではGoogleCloudStoragePrefixSensorというOperatorを利用します。Sensorはざっくり言うと「何かが起きた時に実行されるOperator」で、他にもHTTPのレスポンスを待って実行されるSensorやPythonプログラムの実行完了を待ってから実行されるSensorなどがあります。今回はGCSを監視し、指定したPrefixに合致するファイルが生成された場合に実行されるSensorを使って、CSVをアップロードした時に実行されることを確認します。
基本的な使い方はソースコード内にコメントで書いた通りなのですが、軽く説明を入れると差分データが入るようなアプリケーションを想定しているため20200426-120000.csvのようなYYMMDD-HHMMSS.csvの形式を監視するようにPrefixを指定しています。また日次でアップロードされるのでSensorのタイムアウト時間も1日待つようにしています。
タスク間依存関係の設定
最後にタスクについての依存関係を記載していきます。例えば以下のような依存関係だと
- runme_0, runme_1, runme_2, also_run_thisが同時に実行される
- runme_0, runme_1, runme_2が実行完了になるとrun_after_loopが実行される
- run_after_loop, also_run_thisが実行完了になるとrun_this_lastが実行される
といったタスクの関係性になります。 今回は特に複雑な依存関係はありませんが、もし長大なワークフローになる場合などはSubDagOperatorといった別のDAGを実行させる仕組みがありますので、ワークフローの中から別途切り出してみるのも一つでしょう。そして基本的にはDAGでありますので、上流のタスクには戻るような依存関係にはさせないよう注意してください。
また、trigger_ruleという依存関係における親タスクが成功/失敗したかでタスクを実行させるかどうかを設定できますので、そういった要素を頭に入れてワークフローを組んでいくとよいでしょう。Predictの項では実際にtrigger_ruleを使って、ある状況に対する対処を実装していきます。
今はまだCSV Sensorしかありませんので、一旦一つだけのタスクを書いております。
Preprocessing - 前処理
データセットの前処理はCloud Functionsを通して行います。これには理由があり、データセットの前処理を行う場合にはpandasがほぼ必須かと思いますがpandasが依存しているパッケージにgcsfsがあり、それをComposerではインストールすることが出来なかったのです。この辺りの原因はイマイチわからなかったのですが、この記事が同じ問題にぶつかっていたので参考に貼っておきます。
Cloud Functionsへリクエスト送信する方法として、OAuth ID Tokenを取得してAuthorization Headerにくっつけて送信しています。この辺りの実装方法はコメントにもあるGoogleのドキュメントに書いてありますので一度読んでみて下さい。またOperatorはSimpleHttpOperatorを一部改変したものを使っています。先述のOAuth ID Tokenの件があったのとレスポンスのチェックがしたかったという2点で改変しています。このように既存のOperatorで解決できないことがあった場合はCustom Operatorとして実装することが出来ます。
あとはSimpleHttpOperatorにはhttp_conn_idというパラメータが設定できますが、これはリクエストのドメイン部分を設定しているものになります。AirflowのGUI画面から設定内容は確認でき、上部タブのAdmin→Connectionsを開くとコードでも指定しているhttp_defaultの項目があるはずです。このコードではhttp_defaultを上書きしてCloud FunctionsのURLである'https://asia-northeast1-inference-pipeline.cloudfunctions.net'を設定していますが、cloud_functions_httpなど別途connectionsを作りそれを指定するのが良いと思います。
また今回はCloud Functionsで実行するコードの説明や記載はしませんが「GCSからCSVを読み取り、データクレンジング等した結果をGCSに再度アップロード」といった感じのことをやっていると考えてください。
Import BQ - BigQueryへのインポート
Cloud Functionsを通してデータ加工したものをBigQueryへインポートします。Cloud Functions上で加工してBigQueryへインポートするのも良いのですが、せっかくなのでComposerで別タスクとしてやっていきます。丁度良くCloud StorageからBigQueryへインポートするOperatorが存在しているのでそちらを利用しています。
やっていることとしては、source_objectsにて指定した条件に合致するCSVを全てまとめてdataテーブルにTRUNCATEした上でインポートしています。schema_fieldsを設定していますが、他にも自動でスキーマを読み取ることの出来るautodetectという設定もありますので特別な理由がない限りはそちらを使うのが良いでしょう。
Predict - AutoML Tablesを用いた予測
さて、ここからはAutoML Tablesの要素が入ってきます。ここでは以下のフローを実現しています。
- AutoML Tablesのモデルをデプロイ
- デプロイされたモデルを使って予測を行う
- モデルのアンデプロイ
どれも共通する項目としてはAutoML Clientのモデルパスの取得処理ですかね。ここで指定しているパラメータですが、プロジェクトID・AutoML Tablesのリージョン・モデルIDの3つ構成されます。モデルIDが一番わかりにくいですが、これはAutoML Tablesのモデル画面に表示されているIDになります。
このモデルパスを使って、デプロイしたり予測したりアンデプロイしています。
注意点として、データセット画面に記載されているTBLxxxxxのIDでは無いのでお気を付けください。
加えてAutoML Tables由来の注意点が幾つかあり、一つはリージョンに関するものです。AutoML Tablesは現在指定できるリージョンが「グローバル」と「欧州連合」の2つで、予測する際に使用する入力元と出力先のサービスもそのリージョンに合わせる必要があります。また、予測結果は「prediction-(モデル名)_(日付)」の名称で出力されます。今回だと出力先をBigQueryにすると先述の名称のテーブルが毎日作成されてしまうので、Cloud Storageを出力先として指定しています。
最後に新しくXComという要素が出てきました。これはタスク間での値の受け渡しに関する仕組みで、例えばタスクAでの出力結果をタスクBで使いたい場合にはAでXComに対してプッシュし、Bでプルするという流れになります。ここでは先程述べた予測結果のディレクトリ名を後処理を行うタスクに受け渡すために使用しています。使用する前にAirflowのGUIにあるAdmin→XComsから作成しておきましょう。PythonOperatorにはprovide_context=Trueを設定し、Xcomをプッシュする時に必要となる**kwargsを引数として渡せるようにしておきましょう。
また、delete_modelではtrigger_ruleをall_doneにしています。これは「モデルをデプロイしたが予測が失敗した場合」に対処するもので、trigger_ruleのデフォルトはall_success(親タスクが全て成功した時に実行)になっており、デフォルトのままだと予測が失敗した場合にはデプロイしている状態のままDAGが終了し、最悪の場合放置していて料金が大変なことになるということを防ぐためにも設定しています。
Postprocessing - 後処理
ワークフローの作成もラストになりました。予測した結果を加工する後処理を作成しましょう。
後処理では前処理の時と違う点が一つあります。それは予測の時にXComでプッシュしたディレクトリ名をCloud Functionsに引き渡すという点です。その点を考慮し、作成したRunCloudFunctionsOperatorを少し改造してディレクトリ名を引き渡せるようにしてみました。
XComからプルした値をCloud FunctionsにURLパラメータとして引き渡しています。これで、Cloud Functions側で予測結果が格納されたディレクトリを参照して、ディレクトリ内のファイルを加工できるようになりました。
もう少し便利に
最後の最後にこのワークフローをもう少し便利にするため、タスクの失敗/成功時にSlackへ通知するように実装しましょう。失敗時の通知は全タスクで共通のため、DAGのon_failure_callbackで設定。成功の通知は最後のタスクが成功した時に送るようにしたいのでpostprocessingのタスクに対してon_success_callbackを設定しています。詳しくは以下の電通さんのテックブログに詳しく書いてありますので参考にしてください。