|

2025-02-25

Tips

Cloud Composer (Airflow) のタスクグループ内でタスクを分岐させる

Cloud ComposerAirflowGoogle Cloud

タスクグループとタスクの分岐の組み合わせでつまずいた話

こちらのブログでたまに登場しております、Cloud Composer (Airflow) ですが、新しい機能や記法が頻繁に更新されており、我々も日々新たな気づきがあります。

 

今回は、その中でもタスクグループタスクの分岐を組み合わせて使ってみた話になります!

 

その前に、まずはタスクグループとタスクの分岐がそれぞれ具体的にどのようなものなのか、簡単に確認してみたいと思います。

タスクグループ

こちらは公式ドキュメントにもありますように、各タスクをまとまりごとにグループ化することができます。

特に、繰り返しになってしまうタスク群を管理するときに、グラフ化したときのビジュアルが分かりやすくなり、コードの可読性も向上します。

 

例えば、以下のように DAG を定義してみます。

from airflow import DAG from airflow.decorators import task_group from airflow.operators.bash import BashOperator from airflow.operators.empty import EmptyOperator from airflow.utils.dates import days_ago @task_group(group_id="group1") def group1(): """タスクグループ1""" task1 = EmptyOperator(task_id="task1") task2 = BashOperator(task_id="task2", bash_command="echo Hello World!", retries=2) task1 >> task2 @task_group(group_id="group2") def group2(): """タスクグループ2""" task3 = EmptyOperator(task_id="task3") task4 = BashOperator(task_id="task4", bash_command="echo Hello World!", retries=2) task3 >> task4 with DAG( dag_id="test1", start_date=days_ago(1, hour=0, minute=0, second=0), schedule=None, default_args={"retries": 1}, ): group1() >> group2()

 

すると、このように group1group2 にそれぞれのタスクをまとめることができました。 image

 

グループごとにまとまっていることで、複雑な DAG を分かりやすく整理できたり、ワークフローの意図が伝わりやすくなります。

タスクの分岐

次に、タスクの分岐になります。

こちらも、公式ドキュメントに詳細が載っており、英語表記では Branching と書かれています。

 

こちらは文字通り、条件によってタスクを分岐させることができる機能になっており、日によって処理を変えたり、前のタスク結果に応じて処理を分岐させる等、様々な用途が考えられます。

 

こちらも、例えば以下のような DAG を定義してみます。

from airflow import DAG from airflow.decorators import task from airflow.operators.bash import BashOperator from airflow.operators.empty import EmptyOperator from airflow.utils.dates import days_ago @task(task_id="task1") def value_func(): """分岐先を決定する値を渡すためのタスク""" return 5 @task.branch(task_id="branch_task") def branch_func(value): """分岐処理を行うタスク""" if value >= 5: return "task2" elif value >= 3: return "task3" else: return None with DAG( dag_id="test2", start_date=days_ago(1, hour=0, minute=0, second=0), schedule=None, default_args={"retries": 1}, ): value = value_func() branch_task = branch_func(value) # 分岐先のタスク task2 = EmptyOperator(task_id="task2") task3 = BashOperator(task_id="task3", bash_command="echo Hello World!", retries=2) branch_task >> [task2, task3]

 

すると、こちらのようにタスクを分岐させることができました。 image

 

実行してみると、branch_task のあとに task2 が実行され、task3 はスキップされました。 image

 

処理の流れとしては、task1 から branch_task5 という出力を受け取り、条件分岐で task2 という文字列が返され、task2 は実行されたが、task3 はスキップされたということになります。

ちなみに、branch_task で返す値は対象のタスクの task_id の値であることに注意が必要です。

タスクグループ内でタスクの分岐を行う

では、これらを応用して、今度はタスクグループ内でタスクの分岐を行ってみたいと思います。

 

早速、先程のサンプルコードを組み合わせて、以下のようなDAGを作成してみます。

from airflow import DAG from airflow.decorators import task, task_group from airflow.operators.bash import BashOperator from airflow.operators.empty import EmptyOperator from airflow.utils.dates import days_ago @task(task_id="task1") def value_func(): """分岐先を決定する値を渡すためのタスク""" return 5 @task.branch(task_id="branch_task") def branch_func(value): """分岐処理を行うタスク""" if value >= 5: return "task2" elif value >= 3: return "task3" else: return None @task_group(group_id="group1") def group1(): """タスクグループ1""" value = value_func() branch_task = branch_func(value) # 分岐先のタスク task2 = EmptyOperator(task_id="task2") task3 = BashOperator(task_id="task3", bash_command="echo Hello World!", retries=2) branch_task >> [task2, task3] @task_group(group_id="group2") def group2(): """タスクグループ2""" task4 = EmptyOperator(task_id="task4") task5 = BashOperator(task_id="task5", bash_command="echo Hello World!", retries=2) task4 >> task5 with DAG( dag_id="test3", start_date=days_ago(1, hour=0, minute=0, second=0), schedule=None, default_args={"retries": 1}, ): group1() >> group2()

 

シンプルに先程の group1 内に分岐処理を入れ込んだ形になります。

グラフ表示してみても、上手くいけてそうです。 image

 

しかし、実際に実行してみると、、、 image

branch_task 時点で失敗してしまいます。。

 

ログを確認すると、どうやら branch_task で指定している task_id の指定が間違ってそうです🤔 image

 

調査したところ、こちらに今回の内容にピッタリの回答がありました。

 

どうやら、 group_id.task_id の形で指定する必要があるみたいですので、改めて以下のように設定してみます。

from airflow import DAG from airflow.decorators import task, task_group from airflow.operators.bash import BashOperator from airflow.operators.empty import EmptyOperator from airflow.utils.dates import days_ago @task(task_id="task1") def value_func(): """分岐先を決定する値を渡すためのタスク""" return 5 @task.branch(task_id="branch_task") def branch_func(value): """分岐処理を行うタスク""" if value >= 5: return "group1.task2" elif value >= 3: return "group1.task3" else: return None @task_group(group_id="group1") def group1(): """タスクグループ1""" value = value_func() branch_task = branch_func(value) # 分岐先のタスク task2 = EmptyOperator(task_id="task2") task3 = BashOperator(task_id="task3", bash_command="echo Hello World!", retries=2) branch_task >> [task2, task3] @task_group(group_id="group2") def group2(): """タスクグループ2""" task4 = EmptyOperator(task_id="task4") task5 = BashOperator(task_id="task5", bash_command="echo Hello World!", retries=2) task4 >> task5 with DAG( dag_id="test3", start_date=days_ago(1, hour=0, minute=0, second=0), schedule=None, default_args={"retries": 1}, ): group1() >> group2()

branch_func で返す値を group_id.task_id の形にしております。

 

実行してみると、、、 image

branch_task は上手く実行できました!!が、group2 内のタスクが全てスキップされてしまいました😭

 

こちらもハマりがちな良くあるパターンなのですが、こういったときに気にするのは、タスクごとに設定できる trigger_rule です。

 

詳しくは、こちらの公式ドキュメントに記載されているのですが、各タスクをトリガーするための条件を設定することができます。

そして、デフォルトの trigger_ruleall_success になっており、前段の全タスクが成功状態にならないとトリガーされないということになります。

 

つまり、今回の場合は、task3 がスキップされたので、task4 のトリガールールに合わずスキップされて、芋づる式に task5 もスキップされてしまったということになります。

 

では、下記のように、task4 にトリガールールを設定してみます。

今回は、失敗以外の場合にタスクをトリガーする none_failed にしてみます。

from airflow import DAG from airflow.decorators import task, task_group from airflow.operators.bash import BashOperator from airflow.operators.empty import EmptyOperator from airflow.utils.dates import days_ago from airflow.utils.trigger_rule import TriggerRule @task(task_id="task1") def value_func(): """分岐先を決定する値を渡すためのタスク""" return 5 @task.branch(task_id="branch_task") def branch_func(value): """分岐処理を行うタスク""" if value >= 5: return "group1.task2" elif value >= 3: return "group1.task3" else: return None @task_group(group_id="group1") def group1(): """タスクグループ1""" value = value_func() branch_task = branch_func(value) # 分岐先のタスク task2 = EmptyOperator(task_id="task2") task3 = BashOperator(task_id="task3", bash_command="echo Hello World!", retries=2) branch_task >> [task2, task3] @task_group(group_id="group2") def group2(): """タスクグループ2""" task4 = EmptyOperator(task_id="task4", trigger_rule=TriggerRule.NONE_FAILED) task5 = BashOperator(task_id="task5", bash_command="echo Hello World!", retries=2) task4 >> task5 with DAG( dag_id="test3", start_date=days_ago(1, hour=0, minute=0, second=0), schedule=None, default_args={"retries": 1}, ): group1() >> group2()

 

実行すると、、、 image

無事に最後まで成功しました!

 

少し長い道のりでしたが、このようにして、連続したタスクグループ内でタスクの分岐をさせることができました。

まとめ

今回は、Cloud Composer (Airflow) で連続したタスクグループを作成して、そのグループ内でタスクを分岐させる方法とハマりどころについて紹介させていただきました。

 

これらを組み合わせることにより、タスク全体の整理ができるとともに、柔軟なワークフローを実現することができるようになります

 

このように、Cloud Composer (Airflow) に限らずですが、一見簡単そうに見えても思わぬところに落とし穴があるため、こういった記事が少しでも役に立てば幸いです。

これからも、こういったちょっとしたお役立ち情報は積極的に発信していければと思います!


この記事の著者

プロフィール画像

中村 卓矢

朝日放送グループホールディングス株式会社 DX・メディアデザイン局 デジタル・メディアチーム

グループ全体の統合的なデータ基盤の構築・データ分析の支援に従事している。 動画配信・テレビの視聴データ分析等で身につけた幅広い知識を活かして日々奮闘中!