DigdagをECSでスケールアウトして性能改善をしました

はじめに

初めまして。エンジニアの有山と申します。普段は主にデータ分析基盤の開発、運用、およびウェルスナビのサービス基盤全般に関わる業務を担当しています。
今回の記事では、主にデータ分析基盤の設計や主要な技術に興味関心のある方を対象に、ウェルスナビで構築しているデータ分析基盤のご紹介と、Digdagを使用して構築したデータパイプラインの性能改善について解説していきます。

ウェルスナビのデータ分析基盤

ウェルスナビでは、データドリブンなサービス改善を行うため、社内にデータ分析基盤を構築しています。この基盤はマーケティング施策などに利用され、社員は必要なデータにアクセスしてPDCAを効率的に回せるような環境*1が整っています。技術スタックとしては以下のような技術を使用しています。

役割 サービス
インフラ Amazon EC2, ECR, ECS
DWH Bigquery
ワークフロー Digdag
ETL Embulk, Python, Ruby

当初は以下の図のような構成で運用していたのですが、運用していくにつれ一部のワークフローに課題が出てきたため、当該ワークフローをECSを使用して改善していくことにしました。

運用して見えてきた課題

この基盤の大きな課題は、プロセスの実行環境が一台のEC2に集中してしまうということでした。Digdagのワークフローが競合する、もしくはワークフロー内で多数のプロセスが並列実行されることでサーバーが高負荷な状態となり、他のプロセスに影響を与えてしまうことで、全体の処理速度が劣化してしまっていました。
解決策として、以下の図のようにDigdagサーバーの実行環境と起動するプロセスの実行環境を分離して、プロセスの数だけサーバーをスケールアウトするような構成を考えました。Digdagサーバーは今まで通りEC2で動作させることにして、Digdagで実行するプロセスには、プロセスごとにFargateを割り当て、他のシステムに影響を与えないようにしました。
コンテナ化をするに当たって、今回はDigdagのバージョン0.10.4から使用可能となったECSCommandExecutorというツールを使用しました。 docs.digdag.io

設定方法はドキュメントに記載の通り、Digdagサーバーの設定ファイルにECSクラスター名などの設定を記述し、_export:にECSタスク定義のARNを記載することでプロセス実行時にECSを起動できます。使用するIAMユーザーにはECSとCloudwatchの権限をつける必要があるため注意しましょう。ECSへの通信に失敗した場合は、dockerコンテナの起動を試行し、コンテナの起動に失敗した場合は、ローカルでプロセスを実行するように遷移していきます。また、デフォルトのログレベルではECSへの通信に関するログは一切出ないため、ECSタスクをうまく起動できない場合は下記のようにローカルモードでデバッグレベルのログを確認すると良いと思います。
digdag run Workflow.dig -l debug

次章ではECSCommandExecutorを使用する上での、実装上のポイントを解説していきます。

実装上のポイント

共有ファイルシステムを使用する

あるワークフローにおいて、プロセスが動的に作成したファイルを別のプロセスが参照するという処理を行なっていました。EC2上で実行する場合はホストのディレクトリを共有しているため問題なかったのですが、実行環境をプロセスごとに分散させる場合には、ファイルの共有ができないという事象が発生します。
このため、AWSの共有ファイルシステムサービスであるEFS(ElasticFileSystem)をECSタスクにマウントすることによって、プロセス間でのファイルの共有を可能にしました。

DigdagサーバーのJavascriptエンジンを変更する

前提として、今回のECSCommandExecutorを使用するにあたり、Digdagのバージョンを0.10以上にバージョンアップする必要があります。バージョン0.10がリリースされたタイミングで、Java11が実験的にサポートされました。

docs.digdag.io

元々DigdagはJSのエンジンとしてNashornを使用していましたが、Java11からは非推奨となり、このリリースで新しくGraalJSというエンジンが追加されました。バージョンアップの検証により、Nashornを使った場合、プロセスを大量に並列実行するワークフローでは、性能が著しく劣化することがわかったため、GraalJSを使用することでパフォーマンスの維持を行いました。

一時ストレージであるS3をプライベートサブネット内に配置する

ECSCommandExecutorを使用する際、Digdagサーバーは以下のような流れでプロセスを実行していきます。

  1. ソースコードを一時ストレージであるS3にアップロードする。
  2. ECSタスクを起動する。
  3. ECSタスク内のプロセスがS3からソースコードをダウンロードし、実行する
  4. 実行結果が出力されたファイルをS3にアップロードする。

手順2のタイミングでS3の署名付きURLを発行し、ECSとS3の通信を行います。こちらは有効期限内であれば、発行したURLに対してHTTPリクエストを送れるというものです。そのため、有効期限内に万が一第三者にURLを知られてしまった場合、S3のソースコードにアクセスできてしまうという懸念がありました。
解決策として、S3バケットにECSと同じVPCからのアクセスのみ許容するバケットポリシーを設定し、仮にURLを取得されたとしても仮想ネットワークの外からアクセスできないようにしてリスクを減らしました。

また、署名付きURLのデフォルト有効時間はアップロード、ダウンロードともに10分間に設定されています。仮に応答時間が10分を超えるようなプロセスをECS上で起動した場合、処理自体は成功しますが、手順4の結果のアップロード時に、URLの有効期限切れによってDigdagの処理が失敗してしまうため、アップロードの有効期限のみ10分以上にする設定が必要です。具体的には、Digdagサーバーの設定にて、以下のパラメータの値を600以上に設定する必要があります。 agent.command_executor.ecs.temporal_storage.s3.direct_upload_expiration

Embulkの実行をsh>:オペレータではなく、pythonのsubprocessで行う。

DigdagからEmbulkを呼び出す場合、一般的にsh>:オペレータを使用してembulk runコマンドを実行することになります。sh>:オペレータを使用するにあたり、ECSのある制約によりタスクの起動に失敗してしまうため、暫定的な回避策としてpythonのsubprocessからEmbulkを実行することにしました。
ECSを起動する際、指定したタスク定義にContainerOverrideという機能を使用して環境変数や実行するコマンドなどを動的に上書きして起動します。ContainerOverideには、オブジェクトサイズを8KB以上設定できないという仕様があり、ECSタスク起動に失敗してしまう原因として、この8KBという仕様を超えてしまっていたということが原因でした。
当社では、Digdagのプロジェクトを以下のようなディレクトリ構成で運用しており、環境変数を集約したdigファイルをスケジューラを設定しているワークフローでインポートしています。

project
├── config # プロジェクト全体で使用する環境変数
│   ├── prod.dig
│   └── dev.dig
├── dig # 実際の処理
│   └── Sample.dig
├── workflow.dig # スケジューラー
├── query # SQL
│       └── query.sql
├── task # python、ruby等のスクリプト
│   └── task.py
├── tests # ユニットテスト
│   └── test_task.py
└── embulk # embulkのymlファイル
        └── EmbulkTask.yml

workflow.digの中身は以下のようになります。

_export:
  !include : "config/${env}.dig"

schedule:
  daily>: 00:00:00

!include : "dig/Sample.dig"

Digdagでは、環境変数の取り扱い方法がオペレータごとに異なっており、sh>:オペレータを使用する場合、_exportに設定した変数はOSの環境変数にセットされます。したがって、!includeした環境変数全てがContainerOverridesに渡され、Fargateの環境変数にセットされることで、8KBの制限に引っかかり、ECSタスクの起動に失敗していました。
このディレクトリ構成を変えるのは難しく、解決策として、pythonのsubprocessからシェルのプロセスを起動し、そのプロセスからEmbulkを実行するという方法を取りました。ただし、正攻法な対策ではないため、他の解決策を継続的に模索中です。
併せて、環境変数ファイルを使用して変数を渡す方法もあるため、これの使用をDigdag上で設定できるようなPullRequestを作成しました。

github.com

移行した結果

以上のポイントを踏まえ幾つかのワークフローを移行しました。移行したもののうち、プロセスの並列度が高いことに起因してサーバーに高負荷を与えていたワークフローについては、移行前と比べて約3倍ほど処理速度が改善しました。 他方で、並列数が高いものの、サーバーへの負荷が高くない処理*2については高速化させることはできませんでした。今回改善できなかった点については、現在別の改善アプローチを検討しているところです。

最後に

以上がウェルスナビの分析基盤と一部プロジェクトのご紹介でした。この記事が分析基盤の開発に関心のある方や、Digdagに興味のある方の助けになれば幸いです。またこのプロジェクトについて、色々サポート下さった分析基盤、システム基盤のメンバーにはこの場をお借りして感謝申し上げます。

ウェルスナビでは、一緒に働く仲間を募集しています。働く世代に豊かさを、というミッションを私たちと一緒に目指していきませんか? 興味のある方は、こちらからご応募お待ちしております。

hrmos.co

まずはカジュアルに話を聞いてみたいという方は、エンジニア採用担当の仲田まで気軽にご連絡下さい。 Twitter:@yukorin89274809


筆者プロフィール: 有山 涼(ありやま りょう)
2021年6月ウェルスナビにデータエンジニアとして入社。 2022年7月よりシステム基盤チームに異動し、インフラエンジニアとしてウェルスナビのサービス基盤に関する業務に従事しています。

GitHub : https://github.com/rariyama

*1:顧客情報などのセンシティブなデータについては、個人を識別できないよう匿名加工等を行い、統計目的で使用しています。

*2:例えばDB間のデータ連携などでDBサーバーがボトルネックになる場合や、ネットワークI/Oに負荷がかかっている場合などです。