ここ最近仕事で作成していた推論パイプラインが完成したので、Cloud Composerを使った場合にどのように構築していくのかサンプルプログラムを書いていこうかと思います。
設計
今回は以下のようなフローを想定して構築していきます。
イメージとしては各種アプリケーションでの購買データを用いた販売予測などが当てはまるでしょうか。
日毎に溜まる差分データがCSVにアップロードされたことによりCloud Composerが発火し、最終的に販売予測を行ったデータをBigQueryに格納するという感じになります。
Cloud Composerについて
前提知識として軽くCloud Composerの説明をします。
Cloud ComposerとはGCPで提供されているフルマネージドのワークフローシステムで、Apache Airflowが構築されたリソースをGKE等々で展開され手間暇をかけずに使用することが出来ます。
他にもCloud Data Fusionが似てような利用方法としてはありますが、料金がお高いので(Basic:月11万、Enterprise:月30万)小規模なワークフローや叩き台としてはCloud Composerが向いていると考えています。
以下がGoogleの提供しているリソースイメージです。
Apache Airflowは2020/04/25 現在、ver1.10.6までサポートされています。
AutoML Tablesについて
こちらも前提知識として軽い説明を。
AutoML Tablesは機械学習モデルのビルド及び、デプロイを簡単に行うことの出来るサービスです。
CSVやBigQueryのデータセットを読み取り、どのような予測結果が欲しいか?を選ぶだけで最適なモデルを作成してくれます。
今回、お仕事で使ってみましたがデータセットの数さえ揃えれば、かなり高い予測モデルを構築してくれるのでめちゃくちゃ助かりました。ただし、トレーニング時間によってかかる料金が結構するので(1h: 約2000円)トレーニング時間を設定する際はお気を付けください。また、別の注意点として予測を行う際にはモデルのデプロイが行われますが、必要な予測が終わり次第必ずデプロイされたリソースを削除しましょう。モデルをデプロイしている間は常に料金が加算されていきますので、削除せず放置した場合とんでもない請求が来ることがあります。今回の推論パイプラインではその辺りも加味した作りにしていきます。
構築
では、実際にどのように構築するのかを解説していきます。
サンプルコードはGitHubにアップロードしていますので、参照しながらお読み下さい。
CSV Sensor - アップロードされたCSVをトリガーに実行する
初めに、Cloud StorageにCSVがアップロードされたらCloud Composerが動き出すように実装していきます。
サンプルコードを見てみましょう。
基本骨子としては
- DAGの設定
- GoogleCloudStoragePrefixSensorの利用
- タスク間依存関係の設定
の3つになります。
DAGの設定
DAGとはDirected Acyclic Graphの略で有向非巡回グラフという意になります。
Airflowでは一つのワークフローの単位と考えて大丈夫です。
大体の設定はドキュメントを読むだけで理解は出来ると思うのですが、ひとつだけ大きな罠があります。それが dagrun_timeout
です。
これは設定名だけ見ると「DAGのタイムアウト設定」と思うのですが大きな間違いで、「DAGの同時実行数が設定値以上になった場合、実行済みのDAGの実行時間がdagrun_timeoutより大きい時に失敗とする」といった設定になります。ややこしいのですが、以下の記事を読むとどういったことを指すか分かると思います。
今回はこの条件に引っかかることはないので設定する必要性はありませんが、一応説明のため書いておきました。
また、個人的には catchup
の設定をFalseにしておくことをオススメしておきます。
これは start_date
から現在までにスケジューリングされるはずだったジョブを実行するかどうかのフラグで、例えばstart_dateが days_ago(1)
で schedule_interval
を @hourly
の時にcatchupをTrueでDAGを登録すると24個分のジョブが走ることになります。変に過去のジョブを走らせたくないという人はcatchupをFalseで設定するか、schedule_intervalをNoneとしそもそもスケジューリング自体を無効化すると良いでしょう。
GoogleCloudStoragePrefixSensorの利用
AirflowにはOperatorというタスクで何を実行するかを決める要素があります。こちらを見て頂くと様々なOperatorが存在することが分かります。特に複雑な事をしない場合は、これらプリセットのOperatorを組み合わせてワークフローを構成していくことになるでしょう。
このDAGではGoogleCloudStoragePrefixSensorというOperatorを利用します。Sensorはざっくり言うと「何かが起きた時に実行されるOperator」で、他にもHTTPのレスポンスを待って実行されるSensorやPythonプログラムの実行完了を待ってから実行されるSensorなどがあります。今回はGCSを監視し、指定したPrefixに合致するファイルが生成された場合に実行されるSensorを使って、CSVをアップロードした時に実行されることを確認します。
基本的な使い方はソースコード内にコメントで書いた通りなのですが、軽く説明を入れると差分データが入るようなアプリケーションを想定しているため20200426-120000.csvのようなYYMMDD-HHMMSS.csvの形式を監視するようにPrefixを指定しています。また日次でアップロードされるのでSensorのタイムアウト時間も1日待つようにしています。
タスク間依存関係の設定
最後にタスクについての依存関係を記載していきます。例えば以下のような依存関係だと
- runme_0, runme_1, runme_2, also_run_thisが同時に実行される
- runme_0, runme_1, runme_2が実行完了になるとrun_after_loopが実行される
- run_after_loop, also_run_thisが実行完了になるとrun_this_lastが実行される
といったタスクの関係性になります。 今回は特に複雑な依存関係はありませんが、もし長大なワークフローになる場合などはSubDagOperatorといった別のDAGを実行させる仕組みがありますので、ワークフローの中から別途切り出してみるのも一つでしょう。そして基本的にはDAGでありますので、上流のタスクには戻るような依存関係にはさせないよう注意してください。
また、trigger_ruleという依存関係における親タスクが成功/失敗したかでタスクを実行させるかどうかを設定できますので、そういった要素を頭に入れてワークフローを組んでいくとよいでしょう。Predictの項では実際にtrigger_ruleを使って、ある状況に対する対処を実装していきます。
今はまだCSV Sensorしかありませんので、一旦一つだけのタスクを書いております。
Preprocessing - 前処理
データセットの前処理はCloud Functionsを通して行います。これには理由があり、データセットの前処理を行う場合にはpandasがほぼ必須かと思いますがpandasが依存しているパッケージにgcsfsがあり、それをComposerではインストールすることが出来なかったのです。この辺りの原因はイマイチわからなかったのですが、この記事が同じ問題にぶつかっていたので参考に貼っておきます。
Cloud Functionsへリクエスト送信する方法として、OAuth ID Tokenを取得してAuthorization Headerにくっつけて送信しています。この辺りの実装方法はコメントにもあるGoogleのドキュメントに書いてありますので一度読んでみて下さい。またOperatorはSimpleHttpOperatorを一部改変したものを使っています。先述のOAuth ID Tokenの件があったのとレスポンスのチェックがしたかったという2点で改変しています。このように既存のOperatorで解決できないことがあった場合はCustom Operatorとして実装することが出来ます。
あとはSimpleHttpOperatorにはhttp_conn_idというパラメータが設定できますが、これはリクエストのドメイン部分を設定しているものになります。AirflowのGUI画面から設定内容は確認でき、上部タブのAdmin→Connectionsを開くとコードでも指定しているhttp_defaultの項目があるはずです。このコードではhttp_defaultを上書きしてCloud FunctionsのURLである'https://asia-northeast1-inference-pipeline.cloudfunctions.net'を設定していますが、cloud_functions_httpなど別途connectionsを作りそれを指定するのが良いと思います。
また今回はCloud Functionsで実行するコードの説明や記載はしませんが「GCSからCSVを読み取り、データクレンジング等した結果をGCSに再度アップロード」といった感じのことをやっていると考えてください。
Import BQ - BigQueryへのインポート
Cloud Functionsを通してデータ加工したものをBigQueryへインポートします。Cloud Functions上で加工してBigQueryへインポートするのも良いのですが、せっかくなのでComposerで別タスクとしてやっていきます。丁度良くCloud StorageからBigQueryへインポートするOperatorが存在しているのでそちらを利用しています。
やっていることとしては、source_objectsにて指定した条件に合致するCSVを全てまとめてdataテーブルにTRUNCATEした上でインポートしています。schema_fieldsを設定していますが、他にも自動でスキーマを読み取ることの出来るautodetectという設定もありますので特別な理由がない限りはそちらを使うのが良いでしょう。
Predict - AutoML Tablesを用いた予測
さて、ここからはAutoML Tablesの要素が入ってきます。ここでは以下のフローを実現しています。
- AutoML Tablesのモデルをデプロイ
- デプロイされたモデルを使って予測を行う
- モデルのアンデプロイ
どれも共通する項目としてはAutoML Clientのモデルパスの取得処理ですかね。ここで指定しているパラメータですが、プロジェクトID・AutoML Tablesのリージョン・モデルIDの3つ構成されます。モデルIDが一番わかりにくいですが、これはAutoML Tablesのモデル画面に表示されているIDになります。
このモデルパスを使って、デプロイしたり予測したりアンデプロイしています。
注意点として、データセット画面に記載されているTBLxxxxxのIDでは無いのでお気を付けください。
加えてAutoML Tables由来の注意点が幾つかあり、一つはリージョンに関するものです。AutoML Tablesは現在指定できるリージョンが「グローバル」と「欧州連合」の2つで、予測する際に使用する入力元と出力先のサービスもそのリージョンに合わせる必要があります。また、予測結果は「prediction-(モデル名)_(日付)」の名称で出力されます。今回だと出力先をBigQueryにすると先述の名称のテーブルが毎日作成されてしまうので、Cloud Storageを出力先として指定しています。
最後に新しくXComという要素が出てきました。これはタスク間での値の受け渡しに関する仕組みで、例えばタスクAでの出力結果をタスクBで使いたい場合にはAでXComに対してプッシュし、Bでプルするという流れになります。ここでは先程述べた予測結果のディレクトリ名を後処理を行うタスクに受け渡すために使用しています。使用する前にAirflowのGUIにあるAdmin→XComsから作成しておきましょう。PythonOperatorにはprovide_context=Trueを設定し、Xcomをプッシュする時に必要となる**kwargsを引数として渡せるようにしておきましょう。
また、delete_modelではtrigger_ruleをall_doneにしています。これは「モデルをデプロイしたが予測が失敗した場合」に対処するもので、trigger_ruleのデフォルトはall_success(親タスクが全て成功した時に実行)になっており、デフォルトのままだと予測が失敗した場合にはデプロイしている状態のままDAGが終了し、最悪の場合放置していて料金が大変なことになるということを防ぐためにも設定しています。
Postprocessing - 後処理
ワークフローの作成もラストになりました。予測した結果を加工する後処理を作成しましょう。
後処理では前処理の時と違う点が一つあります。それは予測の時にXComでプッシュしたディレクトリ名をCloud Functionsに引き渡すという点です。その点を考慮し、作成したRunCloudFunctionsOperatorを少し改造してディレクトリ名を引き渡せるようにしてみました。
XComからプルした値をCloud FunctionsにURLパラメータとして引き渡しています。これで、Cloud Functions側で予測結果が格納されたディレクトリを参照して、ディレクトリ内のファイルを加工できるようになりました。
もう少し便利に
最後の最後にこのワークフローをもう少し便利にするため、タスクの失敗/成功時にSlackへ通知するように実装しましょう。失敗時の通知は全タスクで共通のため、DAGのon_failure_callbackで設定。成功の通知は最後のタスクが成功した時に送るようにしたいのでpostprocessingのタスクに対してon_success_callbackを設定しています。詳しくは以下の電通さんのテックブログに詳しく書いてありますので参考にしてください。