|

2024-06-19

データ関連

Airflow REST APIでCloud ComposerのDAGを実行する

AirflowCloud Composer

Cloud Composer(Airflow)のREST APIを叩いてDAGを走らせる

Cloud ComposerのDAGの実行方法

Cloud Composer(Apache AirflowのGoogle Cloudによるマネージドサービス)では、DAG(Directed Acyclic Graph)をPythonによるファイルに定義して、依存関係をもとにデータに関するワークフローを実行できます。

基本的にはそのファイル内に書かれたスケジュール設定に基づいて自動で実行されるようにDAGを設定することが多いです。

 

例えば…

dag = DAG( dag_id="foo", default_args=default_args, start_date=days_ago(1), schedule_interval="0 0 * * *", catchup=False, )

のような指定の場合は、UTCの0時 = JSTの9時に毎日実行…とこんな感じです。

この schedule_interval が実行スケジュールを握っているわけですね。

 

これ自体は None を指定することができます。

dag = DAG( dag_id="bar", default_args=default_args, start_date=days_ago(1), schedule_interval=None, catchup=False, )

こうするとどうなるか?というと、スケジュール実行されなくなります。

つまり、 手動でDAGを実行する状況になります。

Dailyでなくまとめてのimport処理などにはこのようなDAGを準備することになるかと思います。

 

AirflowのWeb UIでも、スケジュール・Next RunともにNoneとなります。 amc_itomani_proc_raw_data_manual_-Grid-_main_2024-06-05_09-46-44

どうやって実行するかというと、この画像のとおり、再生ボタンから実行可能です。

が、 何らかの理由でローカルのPythonスクリプト等から実行したいときが出てくるかもしれません。 今回はその話です。

 

Airflow REST API

AirflowにはREST APIがありますので、今回はこれをDAG実行に使います。

Google Cloudの公式ドキュメントはこれです: Airflow REST APIにアクセスする

今回扱うDAG実行の他にも接続の管理などもできるのでぜひ知っておくべきものの一つかと思います。

Apache Airflowの Airflow REST API の仕様書 を見ていただけるとどんなことが出来るかがわかりますので必読です。

 

実際のAirflow REST APIの叩き方

では実際にどのようにAirflow REST APIを叩くかについてまとめます。

先ほどの公式ドキュメントにもコード付きでありますが、やり方は極めてシンプルです。

 

認証情報の設定

まずは認証情報を用意しなくてはなりません。

公式ドキュメントにあるような

import google.auth AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform" CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])

デフォルトの認証情報(ADC)をとってくる方法もありますし

 

サービスアカウントの認証情報のJSONファイルがあれば

from google.oauth2 import service_account CREDENTIALS = service_account.Credentials.from_service_account_file( "service-account.json", scopes=["https://www.googleapis.com/auth/cloud-platform"], )

のように google.oauth2 から認証情報を立てても良いです。

 

いずれの認証情報でも、それをもとにセッションを立ち上げます。

import google.auth.transport.requests authed_session = google.auth.transport.requests.AuthorizedSession(CREDENTIALS)

 

APIの実行

APIの実行は、作成したセッションをもとにメソッドとURLを指定して行うのみです

response = authed_session.request(method, url, json=data)

このような形です。

 

ここで指定する methodurlAirflow REST APIの仕様書ページ を参照することでわかります。 環境の詳細__Composer__CDP__Google_Cloud_コンソール__2024-06-05_10-20-21

ベースとなるAirflowのURLはCloud Composerの画面からも確認出来ます。

ここに /api/v1 を足したものをベースとして、あとは仕様書通りにPathを指定するだけですね。

 

たとえばDAGの実行であれば、「Trigger a new DAG run」の仕様通り、

https://[WEB UIのホスト名]/api/v1/dags/{dag_id}/dagRuns

にPOSTするだけです。簡単ですね。

 

まとめ

今回はCloud ComposerのDAGをREST API経由で実行する方法についてまとめました。

 

これを知っているとデータに関する処理をCloud Composerに集約していく発想にも繋がるかもしれません。

例えば、Dataformでターゲットを指定して実行したい場合に、Pythonから直接Dataformを叩こうとおもうとまだDataform用のライブラリ( google-cloud-dataform )がBetaだったりしますので、Cloud Composerで DataformCreateWorkflowInvocationOperator などを使うDAGを作ってそれを今回の方法で実行するほうが楽だったりします。

 

ちょっとニッチめな話ですが、Cloud Composerでデータ基盤管理をしている場合はREST APIについても知っておきたいところです。

 


この記事の著者

プロフィール画像

伴 拓也

朝日放送グループホールディングス株式会社 DX・メディアデザイン局 デジタル・メディアチーム

アプリケーションからインフラ、ネットワーク、データエンジニアリングまで幅広い守備範囲が売り。最近はデータ基盤の構築まわりに力を入れて取り組む。 主な実績として、M-1グランプリ敗者復活戦投票システムのマルチクラウド化等。