タスクグループとタスクの分岐の組み合わせでつまずいた話
こちらのブログでたまに登場しております、Cloud Composer (Airflow) ですが、新しい機能や記法が頻繁に更新されており、我々も日々新たな気づきがあります。
今回は、その中でもタスクグループとタスクの分岐を組み合わせて使ってみた話になります!
その前に、まずはタスクグループとタスクの分岐がそれぞれ具体的にどのようなものなのか、簡単に確認してみたいと思います。
タスクグループ
こちらは公式ドキュメントにもありますように、各タスクをまとまりごとにグループ化することができます。
特に、繰り返しになってしまうタスク群を管理するときに、グラフ化したときのビジュアルが分かりやすくなり、コードの可読性も向上します。
例えば、以下のように DAG を定義してみます。
from airflow import DAG from airflow.decorators import task_group from airflow.operators.bash import BashOperator from airflow.operators.empty import EmptyOperator from airflow.utils.dates import days_ago @task_group(group_id="group1") def group1(): """タスクグループ1""" task1 = EmptyOperator(task_id="task1") task2 = BashOperator(task_id="task2", bash_command="echo Hello World!", retries=2) task1 >> task2 @task_group(group_id="group2") def group2(): """タスクグループ2""" task3 = EmptyOperator(task_id="task3") task4 = BashOperator(task_id="task4", bash_command="echo Hello World!", retries=2) task3 >> task4 with DAG( dag_id="test1", start_date=days_ago(1, hour=0, minute=0, second=0), schedule=None, default_args={"retries": 1}, ): group1() >> group2()
すると、このように group1
と group2
にそれぞれのタスクをまとめることができました。
グループごとにまとまっていることで、複雑な DAG を分かりやすく整理できたり、ワークフローの意図が伝わりやすくなります。
タスクの分岐
次に、タスクの分岐になります。
こちらも、公式ドキュメントに詳細が載っており、英語表記では Branching と書かれています。
こちらは文字通り、条件によってタスクを分岐させることができる機能になっており、日によって処理を変えたり、前のタスク結果に応じて処理を分岐させる等、様々な用途が考えられます。
こちらも、例えば以下のような DAG を定義してみます。
from airflow import DAG from airflow.decorators import task from airflow.operators.bash import BashOperator from airflow.operators.empty import EmptyOperator from airflow.utils.dates import days_ago @task(task_id="task1") def value_func(): """分岐先を決定する値を渡すためのタスク""" return 5 @task.branch(task_id="branch_task") def branch_func(value): """分岐処理を行うタスク""" if value >= 5: return "task2" elif value >= 3: return "task3" else: return None with DAG( dag_id="test2", start_date=days_ago(1, hour=0, minute=0, second=0), schedule=None, default_args={"retries": 1}, ): value = value_func() branch_task = branch_func(value) # 分岐先のタスク task2 = EmptyOperator(task_id="task2") task3 = BashOperator(task_id="task3", bash_command="echo Hello World!", retries=2) branch_task >> [task2, task3]
すると、こちらのようにタスクを分岐させることができました。
実行してみると、branch_task
のあとに task2
が実行され、task3
はスキップされました。
処理の流れとしては、task1
から branch_task
が 5
という出力を受け取り、条件分岐で task2
という文字列が返され、task2
は実行されたが、task3
はスキップされたということになります。
ちなみに、branch_task
で返す値は対象のタスクの task_id
の値であることに注意が必要です。
タスクグループ内でタスクの分岐を行う
では、これらを応用して、今度はタスクグループ内でタスクの分岐を行ってみたいと思います。
早速、先程のサンプルコードを組み合わせて、以下のようなDAGを作成してみます。
from airflow import DAG from airflow.decorators import task, task_group from airflow.operators.bash import BashOperator from airflow.operators.empty import EmptyOperator from airflow.utils.dates import days_ago @task(task_id="task1") def value_func(): """分岐先を決定する値を渡すためのタスク""" return 5 @task.branch(task_id="branch_task") def branch_func(value): """分岐処理を行うタスク""" if value >= 5: return "task2" elif value >= 3: return "task3" else: return None @task_group(group_id="group1") def group1(): """タスクグループ1""" value = value_func() branch_task = branch_func(value) # 分岐先のタスク task2 = EmptyOperator(task_id="task2") task3 = BashOperator(task_id="task3", bash_command="echo Hello World!", retries=2) branch_task >> [task2, task3] @task_group(group_id="group2") def group2(): """タスクグループ2""" task4 = EmptyOperator(task_id="task4") task5 = BashOperator(task_id="task5", bash_command="echo Hello World!", retries=2) task4 >> task5 with DAG( dag_id="test3", start_date=days_ago(1, hour=0, minute=0, second=0), schedule=None, default_args={"retries": 1}, ): group1() >> group2()
シンプルに先程の group1
内に分岐処理を入れ込んだ形になります。
グラフ表示してみても、上手くいけてそうです。
しかし、実際に実行してみると、、、
branch_task
時点で失敗してしまいます。。
ログを確認すると、どうやら branch_task
で指定している task_id
の指定が間違ってそうです🤔
調査したところ、こちらに今回の内容にピッタリの回答がありました。
どうやら、 group_id.task_id
の形で指定する必要があるみたいですので、改めて以下のように設定してみます。
from airflow import DAG from airflow.decorators import task, task_group from airflow.operators.bash import BashOperator from airflow.operators.empty import EmptyOperator from airflow.utils.dates import days_ago @task(task_id="task1") def value_func(): """分岐先を決定する値を渡すためのタスク""" return 5 @task.branch(task_id="branch_task") def branch_func(value): """分岐処理を行うタスク""" if value >= 5: return "group1.task2" elif value >= 3: return "group1.task3" else: return None @task_group(group_id="group1") def group1(): """タスクグループ1""" value = value_func() branch_task = branch_func(value) # 分岐先のタスク task2 = EmptyOperator(task_id="task2") task3 = BashOperator(task_id="task3", bash_command="echo Hello World!", retries=2) branch_task >> [task2, task3] @task_group(group_id="group2") def group2(): """タスクグループ2""" task4 = EmptyOperator(task_id="task4") task5 = BashOperator(task_id="task5", bash_command="echo Hello World!", retries=2) task4 >> task5 with DAG( dag_id="test3", start_date=days_ago(1, hour=0, minute=0, second=0), schedule=None, default_args={"retries": 1}, ): group1() >> group2()
branch_func
で返す値を group_id.task_id
の形にしております。
実行してみると、、、
branch_task
は上手く実行できました!!が、group2
内のタスクが全てスキップされてしまいました😭
こちらもハマりがちな良くあるパターンなのですが、こういったときに気にするのは、タスクごとに設定できる trigger_rule
です。
詳しくは、こちらの公式ドキュメントに記載されているのですが、各タスクをトリガーするための条件を設定することができます。
そして、デフォルトの trigger_rule
は all_success
になっており、前段の全タスクが成功状態にならないとトリガーされないということになります。
つまり、今回の場合は、task3
がスキップされたので、task4
のトリガールールに合わずスキップされて、芋づる式に task5
もスキップされてしまったということになります。
では、下記のように、task4
にトリガールールを設定してみます。
今回は、失敗以外の場合にタスクをトリガーする none_failed
にしてみます。
from airflow import DAG from airflow.decorators import task, task_group from airflow.operators.bash import BashOperator from airflow.operators.empty import EmptyOperator from airflow.utils.dates import days_ago from airflow.utils.trigger_rule import TriggerRule @task(task_id="task1") def value_func(): """分岐先を決定する値を渡すためのタスク""" return 5 @task.branch(task_id="branch_task") def branch_func(value): """分岐処理を行うタスク""" if value >= 5: return "group1.task2" elif value >= 3: return "group1.task3" else: return None @task_group(group_id="group1") def group1(): """タスクグループ1""" value = value_func() branch_task = branch_func(value) # 分岐先のタスク task2 = EmptyOperator(task_id="task2") task3 = BashOperator(task_id="task3", bash_command="echo Hello World!", retries=2) branch_task >> [task2, task3] @task_group(group_id="group2") def group2(): """タスクグループ2""" task4 = EmptyOperator(task_id="task4", trigger_rule=TriggerRule.NONE_FAILED) task5 = BashOperator(task_id="task5", bash_command="echo Hello World!", retries=2) task4 >> task5 with DAG( dag_id="test3", start_date=days_ago(1, hour=0, minute=0, second=0), schedule=None, default_args={"retries": 1}, ): group1() >> group2()
実行すると、、、
無事に最後まで成功しました!
少し長い道のりでしたが、このようにして、連続したタスクグループ内でタスクの分岐をさせることができました。
まとめ
今回は、Cloud Composer (Airflow) で連続したタスクグループを作成して、そのグループ内でタスクを分岐させる方法とハマりどころについて紹介させていただきました。
これらを組み合わせることにより、タスク全体の整理ができるとともに、柔軟なワークフローを実現することができるようになります。
このように、Cloud Composer (Airflow) に限らずですが、一見簡単そうに見えても思わぬところに落とし穴があるため、こういった記事が少しでも役に立てば幸いです。
これからも、こういったちょっとしたお役立ち情報は積極的に発信していければと思います!