生涯未熟

生涯未熟

プログラミングをちょこちょこと。

Cloud Runにおける段階的ロールアウトのメモ

Cloud Runの段階的ロールアウトに関して知らなかったことがあったのでメモ。

段階的ロールアウトって?

ここを見よう。

cloud.google.com

要するにデプロイ後、最新リビジョンに追従するのではなくトラフィックの管理を行い、段階的にロールアウトしていこうというやつ。

知らなかったこと

「新しいリビジョンの編集とデプロイ」→「このリビジョンをすぐに利用する」のチェックを外すことで最新リビジョンへの追従を止めることが出来るが、その後に gcloud run deploy でデプロイを行うと段階的ロールアウト状態のままとなり、デプロイしたリビジョンは最新リビジョンとはならない。

ドキュメント見たところ gcloud run deploy のoptionsに「このリビジョンをすぐに利用する」に該当するoptionが無かったので、再度最新リビジョンへ追従したい場合には手動でチェックボックスをonにしてデプロイするしかない。(と今のところは認識している

Cloud Composerを用いた機械学習における推論パイプラインの構築

ここ最近仕事で作成していた推論パイプラインが完成したので、Cloud Composerを使った場合にどのように構築していくのかサンプルプログラムを書いていこうかと思います。

設計

今回は以下のようなフローを想定して構築していきます。

f:id:syossan:20200426165734p:plain

イメージとしては各種アプリケーションでの購買データを用いた販売予測などが当てはまるでしょうか。
日毎に溜まる差分データがCSVにアップロードされたことによりCloud Composerが発火し、最終的に販売予測を行ったデータをBigQueryに格納するという感じになります。

Cloud Composerについて

前提知識として軽くCloud Composerの説明をします。

Cloud ComposerとはGCPで提供されているフルマネージドのワークフローシステムで、Apache Airflowが構築されたリソースをGKE等々で展開され手間暇をかけずに使用することが出来ます。
他にもCloud Data Fusionが似てような利用方法としてはありますが、料金がお高いので(Basic:月11万、Enterprise:月30万)小規模なワークフローや叩き台としてはCloud Composerが向いていると考えています。

以下がGoogleの提供しているリソースイメージです。

f:id:syossan:20200425182726p:plain

Apache Airflowは2020/04/25 現在、ver1.10.6までサポートされています。

AutoML Tablesについて

こちらも前提知識として軽い説明を。

AutoML Tablesは機械学習モデルのビルド及び、デプロイを簡単に行うことの出来るサービスです。
CSVやBigQueryのデータセットを読み取り、どのような予測結果が欲しいか?を選ぶだけで最適なモデルを作成してくれます。

今回、お仕事で使ってみましたがデータセットの数さえ揃えれば、かなり高い予測モデルを構築してくれるのでめちゃくちゃ助かりました。ただし、トレーニング時間によってかかる料金が結構するので(1h: 約2000円)トレーニング時間を設定する際はお気を付けください。また、別の注意点として予測を行う際にはモデルのデプロイが行われますが、必要な予測が終わり次第必ずデプロイされたリソースを削除しましょう。モデルをデプロイしている間は常に料金が加算されていきますので、削除せず放置した場合とんでもない請求が来ることがあります。今回の推論パイプラインではその辺りも加味した作りにしていきます。

構築

では、実際にどのように構築するのかを解説していきます。
サンプルコードはGitHubにアップロードしていますので、参照しながらお読み下さい。

github.com

CSV Sensor - アップロードされたCSVをトリガーに実行する

初めに、Cloud StorageにCSVがアップロードされたらCloud Composerが動き出すように実装していきます。
サンプルコードを見てみましょう。

基本骨子としては

  • DAGの設定
  • GoogleCloudStoragePrefixSensorの利用
  • タスク間依存関係の設定

の3つになります。

DAGの設定

DAGとはDirected Acyclic Graphの略で有向非巡回グラフという意になります。
Airflowでは一つのワークフローの単位と考えて大丈夫です。

大体の設定はドキュメントを読むだけで理解は出来ると思うのですが、ひとつだけ大きな罠があります。それが dagrun_timeout です。
これは設定名だけ見ると「DAGのタイムアウト設定」と思うのですが大きな間違いで、「DAGの同時実行数が設定値以上になった場合、実行済みのDAGの実行時間がdagrun_timeoutより大きい時に失敗とする」といった設定になります。ややこしいのですが、以下の記事を読むとどういったことを指すか分かると思います。

lists.apache.org

今回はこの条件に引っかかることはないので設定する必要性はありませんが、一応説明のため書いておきました。

また、個人的には catchup の設定をFalseにしておくことをオススメしておきます。
これは start_date から現在までにスケジューリングされるはずだったジョブを実行するかどうかのフラグで、例えばstart_dateが days_ago(1)schedule_interval@hourly の時にcatchupをTrueでDAGを登録すると24個分のジョブが走ることになります。変に過去のジョブを走らせたくないという人はcatchupをFalseで設定するか、schedule_intervalをNoneとしそもそもスケジューリング自体を無効化すると良いでしょう。

GoogleCloudStoragePrefixSensorの利用

AirflowにはOperatorというタスクで何を実行するかを決める要素があります。こちらを見て頂くと様々なOperatorが存在することが分かります。特に複雑な事をしない場合は、これらプリセットのOperatorを組み合わせてワークフローを構成していくことになるでしょう。

このDAGではGoogleCloudStoragePrefixSensorというOperatorを利用します。Sensorはざっくり言うと「何かが起きた時に実行されるOperator」で、他にもHTTPのレスポンスを待って実行されるSensorやPythonプログラムの実行完了を待ってから実行されるSensorなどがあります。今回はGCSを監視し、指定したPrefixに合致するファイルが生成された場合に実行されるSensorを使って、CSVをアップロードした時に実行されることを確認します。

基本的な使い方はソースコード内にコメントで書いた通りなのですが、軽く説明を入れると差分データが入るようなアプリケーションを想定しているため20200426-120000.csvのようなYYMMDD-HHMMSS.csvの形式を監視するようにPrefixを指定しています。また日次でアップロードされるのでSensorのタイムアウト時間も1日待つようにしています。

タスク間依存関係の設定

最後にタスクについての依存関係を記載していきます。例えば以下のような依存関係だと

f:id:syossan:20200426150635p:plain

  • runme_0, runme_1, runme_2, also_run_thisが同時に実行される
  • runme_0, runme_1, runme_2が実行完了になるとrun_after_loopが実行される
  • run_after_loop, also_run_thisが実行完了になるとrun_this_lastが実行される

といったタスクの関係性になります。 今回は特に複雑な依存関係はありませんが、もし長大なワークフローになる場合などはSubDagOperatorといった別のDAGを実行させる仕組みがありますので、ワークフローの中から別途切り出してみるのも一つでしょう。そして基本的にはDAGでありますので、上流のタスクには戻るような依存関係にはさせないよう注意してください。

また、trigger_ruleという依存関係における親タスクが成功/失敗したかでタスクを実行させるかどうかを設定できますので、そういった要素を頭に入れてワークフローを組んでいくとよいでしょう。Predictの項では実際にtrigger_ruleを使って、ある状況に対する対処を実装していきます。

今はまだCSV Sensorしかありませんので、一旦一つだけのタスクを書いております。

Preprocessing - 前処理

データセットの前処理はCloud Functionsを通して行います。これには理由があり、データセットの前処理を行う場合にはpandasがほぼ必須かと思いますがpandasが依存しているパッケージにgcsfsがあり、それをComposerではインストールすることが出来なかったのです。この辺りの原因はイマイチわからなかったのですが、この記事が同じ問題にぶつかっていたので参考に貼っておきます。

stackoverflow.com

Cloud Functionsへリクエスト送信する方法として、OAuth ID Tokenを取得してAuthorization Headerにくっつけて送信しています。この辺りの実装方法はコメントにもあるGoogleのドキュメントに書いてありますので一度読んでみて下さい。またOperatorはSimpleHttpOperatorを一部改変したものを使っています。先述のOAuth ID Tokenの件があったのとレスポンスのチェックがしたかったという2点で改変しています。このように既存のOperatorで解決できないことがあった場合はCustom Operatorとして実装することが出来ます。

airflow.apache.org

あとはSimpleHttpOperatorにはhttp_conn_idというパラメータが設定できますが、これはリクエストのドメイン部分を設定しているものになります。AirflowのGUI画面から設定内容は確認でき、上部タブのAdmin→Connectionsを開くとコードでも指定しているhttp_defaultの項目があるはずです。このコードではhttp_defaultを上書きしてCloud FunctionsのURLである'https://asia-northeast1-inference-pipeline.cloudfunctions.net'を設定していますが、cloud_functions_httpなど別途connectionsを作りそれを指定するのが良いと思います。

また今回はCloud Functionsで実行するコードの説明や記載はしませんが「GCSからCSVを読み取り、データクレンジング等した結果をGCSに再度アップロード」といった感じのことをやっていると考えてください。

Import BQ - BigQueryへのインポート

Cloud Functionsを通してデータ加工したものをBigQueryへインポートします。Cloud Functions上で加工してBigQueryへインポートするのも良いのですが、せっかくなのでComposerで別タスクとしてやっていきます。丁度良くCloud StorageからBigQueryへインポートするOperatorが存在しているのでそちらを利用しています。

やっていることとしては、source_objectsにて指定した条件に合致するCSVを全てまとめてdataテーブルにTRUNCATEした上でインポートしています。schema_fieldsを設定していますが、他にも自動でスキーマを読み取ることの出来るautodetectという設定もありますので特別な理由がない限りはそちらを使うのが良いでしょう。

Predict - AutoML Tablesを用いた予測

さて、ここからはAutoML Tablesの要素が入ってきます。ここでは以下のフローを実現しています。

  • AutoML Tablesのモデルをデプロイ
  • デプロイされたモデルを使って予測を行う
  • モデルのアンデプロイ

どれも共通する項目としてはAutoML Clientのモデルパスの取得処理ですかね。ここで指定しているパラメータですが、プロジェクトID・AutoML Tablesのリージョン・モデルIDの3つ構成されます。モデルIDが一番わかりにくいですが、これはAutoML Tablesのモデル画面に表示されているIDになります。

f:id:syossan:20200426173522p:plain

このモデルパスを使って、デプロイしたり予測したりアンデプロイしています。
注意点として、データセット画面に記載されているTBLxxxxxのIDでは無いのでお気を付けください。

加えてAutoML Tables由来の注意点が幾つかあり、一つはリージョンに関するものです。AutoML Tablesは現在指定できるリージョンが「グローバル」と「欧州連合」の2つで、予測する際に使用する入力元と出力先のサービスもそのリージョンに合わせる必要があります。また、予測結果は「prediction-(モデル名)_(日付)」の名称で出力されます。今回だと出力先をBigQueryにすると先述の名称のテーブルが毎日作成されてしまうので、Cloud Storageを出力先として指定しています。

最後に新しくXComという要素が出てきました。これはタスク間での値の受け渡しに関する仕組みで、例えばタスクAでの出力結果をタスクBで使いたい場合にはAでXComに対してプッシュし、Bでプルするという流れになります。ここでは先程述べた予測結果のディレクトリ名を後処理を行うタスクに受け渡すために使用しています。使用する前にAirflowのGUIにあるAdmin→XComsから作成しておきましょう。PythonOperatorにはprovide_context=Trueを設定し、Xcomをプッシュする時に必要となる**kwargsを引数として渡せるようにしておきましょう。

また、delete_modelではtrigger_ruleをall_doneにしています。これは「モデルをデプロイしたが予測が失敗した場合」に対処するもので、trigger_ruleのデフォルトはall_success(親タスクが全て成功した時に実行)になっており、デフォルトのままだと予測が失敗した場合にはデプロイしている状態のままDAGが終了し、最悪の場合放置していて料金が大変なことになるということを防ぐためにも設定しています。

Postprocessing - 後処理

ワークフローの作成もラストになりました。予測した結果を加工する後処理を作成しましょう。

後処理では前処理の時と違う点が一つあります。それは予測の時にXComでプッシュしたディレクトリ名をCloud Functionsに引き渡すという点です。その点を考慮し、作成したRunCloudFunctionsOperatorを少し改造してディレクトリ名を引き渡せるようにしてみました。

XComからプルした値をCloud FunctionsにURLパラメータとして引き渡しています。これで、Cloud Functions側で予測結果が格納されたディレクトリを参照して、ディレクトリ内のファイルを加工できるようになりました。

もう少し便利に

最後の最後にこのワークフローをもう少し便利にするため、タスクの失敗/成功時にSlackへ通知するように実装しましょう。失敗時の通知は全タスクで共通のため、DAGのon_failure_callbackで設定。成功の通知は最後のタスクが成功した時に送るようにしたいのでpostprocessingのタスクに対してon_success_callbackを設定しています。詳しくは以下の電通さんのテックブログに詳しく書いてありますので参考にしてください。

note.com

ghでvimを使いたい

Githubからコマンドラインツールが登場しましたね。

www.publickey1.jp

READMEを読むと

For many years, hub was the unofficial GitHub CLI tool.

とあるように、既にあったhubは非公式ツールだったのね、など驚きがありつつ触ってみました。

今の所、ベータ版ではあるので出来ることはissue, prを作ったり眺めたり程度なのですが、ghのissueを覗くと「これ実装しなきゃね〜」みたいな話がチラホラあったりするので、気になる方はプロポーザルしてみたり覗いたりしてみるといいかも。

ということで、触ってみたところ gh pr create でprを作成出来るのですが本文を記述するエディタがデフォルトでnanoになっていました。

f:id:syossan:20200217085805p:plain

vimに慣れきった人間としてはvimで編集したい気持ちが一杯なので、エディタを変更してみることに。

github.com

↑ちょうど良くその手の話がされていたので、読んでみると以下のいずれかの環境変数を弄ることでデフォルトエディタを変更出来るという。

export GIT_EDITOR=vim
export VISUAL=vim
export EDITOR=vim

で、改めて実行してみると

f:id:syossan:20200217090152p:plain

おー、たしかに変わっとる。
というわけで困ってる方は参考にしてみてください〜。

勉強会の主催をやりません

Goに関連した勉強会をやっておりましたが、今後主催者として開催はしません。

理由として、勉強会でのいざこざにより心を病んでしまったことと、自分がやらなくてもGoに関する勉強会が増えたので必要無くなったかな?と思ったからです。

元々がもう少し多くの勉強会があった方が界隈が盛り上がるのかなという目論見からやり始めましたが、色々なところで勉強会開催されるようになりましたし、界隈としても十分盛り上がっているので存在意義が無くなりました。個人的にモチベーションも無くなりました。

精神が安定してきたので一応正式に今後の方針を述べさせて頂きました。 万が一開催を期待されている方がいたとしたら申し訳ありません。

Telegrafを用いたElastic Beanstalk環境のEC2インスタンスにあるDockerコンテナの監視

5ヶ月ぶりです。人生ですね。

Telegrafを使った監視をやってみたので、備忘録として残しておきます。

What is Telegraf

InfluxData社が作成したメトリクス集計、レポーティングツールになります。

github.com

構造として、

  1. インプット:メトリクスの集計先
  2. プロセッサー:集計したメトリクスの加工
  3. アグリゲーター: メトリクスから集計(最大、最小、平均 etc...)
  4. アウトプット:メトリクスの送信先

の4種類のプラグインで成り立っており、それぞれのプラグインの設定を記述して動かすことになります。
READMEに載っているプラグイン一覧を見ていただければ分かるのですが、凄い数のプラグインがあります。
無かったらGoで作ってPRを送りましょう。

概ね良いツールではあるのですが、如何せん設定周りの記述で分かりづらいところもあるので、以下のドキュメント集はさっくり見ておくことをオススメします。
(CONFIGRATION.mdだけでも読みましょう。筆者は読まなかったことでMetrics Filteringの存在に気付かず、痛く後悔をしました)

やりたい

こんな感じでメトリクスを集計・送信したいという願望図

f:id:syossan:20191219124249p:plain

やる

やっていきます。

まぁそんなにやることはなくて、手順としては

  • Telegrafのconfig fileを吟味する
  • .ebextensions でElastic Beanstalkに建てられるEC2インスタンス設定を弄る
    1. Telegrafをインストール
    2. Telegraf config fileを作成
    3. Telegrafを実行

てな流れになります。

Telegrafのconfig fileを吟味する

まずはTelegrafのデフォルトのconfig fileを見ていきましょう・・・ってのをやりたいのですが、6200行近くあるので必要なプラグインのみ記述されているconfig fileを出力しましょう。

$ telegraf --input-filter docker --output-filter cloudwatch config > ./telegraf.config

で、これでも600行近くあるので必要な部分をつまみ食いしたconfigを見てみましょう。

設定の意味合いとかを日本語で簡単に書きました。
特にこだわりなければデフォルトでもメトリクス吐いてくれますが、メトリクスを絞り込まなければかなりの量のカスタムメトリクスが生成されてしまうので注意しましょう。(カスタムメトリクスは即時削除できず、保有期間を過ぎるのを待たないといけない)
どのようなメトリクスが吐かれるのか知りたい場合は、 --test オプションを使って標準出力に吐いてみましょう。

また、各プラグインの詳細説明はREADMEにあるプラグイン一覧からリンクで飛べるため、使用する前に目を通しておくのをオススメします。
例えばDockerプラグインだと集計するメトリクスが書いてあったりするので、それを見ながらメトリクスフィルタリングすると良いでしょう。

メトリクスフィルタリング?

メトリクスを必要な分だけ絞り込む際に使うやつです。
基本的にどのプラグインでも使える設定値で、詳しくは以下参照。

https://github.com/influxdata/telegraf/blob/master/docs/CONFIGURATION.md#metric-filtering

例えば、Dockerからメモリ使用量・使用率しか集計したくない!って場合は、 [[inputs.docker]] に以下のような設定を記述することで実現できます。

[[inputs.docker]]
  namepass = ["docker_container_mem"]
  fieldpass = ["usage_percent", "usage"]

特にCloudWatchのようなクラウド環境に送る際に、不要なメトリクスを送ってしまうと無駄なコストが掛かってしまうのでメトリクスフィルタリングを是非活用したいですね。


また、今回のようにElastic Beanstalk環境でやる際には、 container_name_excludeecs-agent を追加しておくのが良いでしょう。特に見る必要のないコンテナなので・・・

というわけで、脳死でやりたいならデフォルトでも良いですが、変なことにならないよう諸々設定しておきましょうという話でした。

.ebextensions でElastic Beanstalkに建てられるEC2インスタンス設定を弄る

Elastic Beanstalkには .ebextensions という、EC2インスタンスが生成される際に走る設定があります。
今回はここにtelegrafのインストール・設定ファイルの生成・telegrafの実行、という3つの設定を書いていきましょう。

.ebextensionsの注意

地味に引っかかりそうなところなんですが、 .ebextensions はファイル名のアルファベット順に実行されますので注意してください。
例えば、

.ebextensions
  - install-telegraf.config
  - exec-telegraf.config

といったファイルがあった場合、予想に反して exec-telegraf.config から実行されてFailしてしまうので、 00.install-telegraf.config などファイル名の先頭に数字を付けておくと安全です。

telegrafのインストール

こちらが設定例になります。
yumのrepoを生成し、インストールしている簡単なものになります。

注意点として、インスタンスAmazon Linuxを使っている場合には baseurl を公式のインストールマニュアルにあるように https://repos.influxdata.com/rhel/$releasever/$basearch/stable とすると404エラーが発生するので、上記のようにしておきましょう。

Telegraf config fileを作成

config fileを /etc/telegraf/telegraf.conf に生成しています。

ここは content の部分が各々変わってくるところかなと。

Telegrafを実行

最後にTelegrafを実行しましょう。

これでデプロイ時に上手くtelegrafが動いて、CloudWatchにメトリクスが集計されるようになります。

おわり

という感じで簡単にTelegrafの使用例を書いてみました。
日本語でのTelegrafに関する記事が少なかったので、増やす意味で書きましたが参考にされる方が一人でもいれば幸いです。

このようにプラグインの設定をザッと書くだけで監視環境が整うのは、とても楽で良いですね。
他にもこんなツールは良いよ!というのがあれば是非教えて下さい🙇