ABCABC Tech Catalog
データ

Cloud Composer (Airflow) のタスクグループ内でタスクを分岐させる

タスクグループとタスクの分岐の組み合わせでつまずいた話

こちらのブログでたまに登場しております、Cloud Composer (Airflow) ですが、新しい機能や記法が頻繁に更新されており、我々も日々新たな気づきがあります。

今回は、その中でもタスクグループタスクの分岐を組み合わせて使ってみた話になります!

その前に、まずはタスクグループとタスクの分岐がそれぞれ具体的にどのようなものなのか、簡単に確認してみたいと思います。

タスクグループ

こちらは公式ドキュメントにもありますように、各タスクをまとまりごとにグループ化することができます。

特に、繰り返しになってしまうタスク群を管理するときに、グラフ化したときのビジュアルが分かりやすくなり、コードの可読性も向上します。

例えば、以下のように DAG を定義してみます。

from airflow import DAG
from airflow.decorators import task_group
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.dates import days_ago


@task_group(group_id="group1")
def group1():
    """タスクグループ1"""
    task1 = EmptyOperator(task_id="task1")
    task2 = BashOperator(task_id="task2", bash_command="echo Hello World!", retries=2)
    task1 >> task2


@task_group(group_id="group2")
def group2():
    """タスクグループ2"""
    task3 = EmptyOperator(task_id="task3")
    task4 = BashOperator(task_id="task4", bash_command="echo Hello World!", retries=2)
    task3 >> task4


with DAG(
    dag_id="test1",
    start_date=days_ago(1, hour=0, minute=0, second=0),
    schedule=None,
    default_args={"retries": 1},
):
    group1() >> group2()

すると、このように group1group2 にそれぞれのタスクをまとめることができました。

file1

グループごとにまとまっていることで、複雑な DAG を分かりやすく整理できたり、ワークフローの意図が伝わりやすくなります。

タスクの分岐

次に、タスクの分岐になります。

こちらも、公式ドキュメントに詳細が載っており、英語表記では Branching と書かれています。

こちらは文字通り、条件によってタスクを分岐させることができる機能になっており、日によって処理を変えたり、前のタスク結果に応じて処理を分岐させる等、様々な用途が考えられます。

こちらも、例えば以下のような DAG を定義してみます。

from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.dates import days_ago


@task(task_id="task1")
def value_func():
    """分岐先を決定する値を渡すためのタスク"""
    return 5


@task.branch(task_id="branch_task")
def branch_func(value):
    """分岐処理を行うタスク"""
    if value >= 5:
        return "task2"
    elif value >= 3:
        return "task3"
    else:
        return None


with DAG(
    dag_id="test2",
    start_date=days_ago(1, hour=0, minute=0, second=0),
    schedule=None,
    default_args={"retries": 1},
):
    value = value_func()

    branch_task = branch_func(value)

    # 分岐先のタスク
    task2 = EmptyOperator(task_id="task2")
    task3 = BashOperator(task_id="task3", bash_command="echo Hello World!", retries=2)

    branch_task >> [task2, task3]

すると、こちらのようにタスクを分岐させることができました。

file2

実行してみると、branch_task のあとに task2 が実行され、task3 はスキップされました。

file3

処理の流れとしては、task1 から branch_task5 という出力を受け取り、条件分岐で task2 という文字列が返され、task2 は実行されたが、task3 はスキップされたということになります。

ちなみに、branch_task で返す値は対象のタスクの task_id の値であることに注意が必要です。

タスクグループ内でタスクの分岐を行う

では、これらを応用して、今度はタスクグループ内でタスクの分岐を行ってみたいと思います。

早速、先程のサンプルコードを組み合わせて、以下のようなDAGを作成してみます。

from airflow import DAG
from airflow.decorators import task, task_group
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.dates import days_ago


@task(task_id="task1")
def value_func():
    """分岐先を決定する値を渡すためのタスク"""
    return 5


@task.branch(task_id="branch_task")
def branch_func(value):
    """分岐処理を行うタスク"""
    if value >= 5:
        return "task2"
    elif value >= 3:
        return "task3"
    else:
        return None


@task_group(group_id="group1")
def group1():
    """タスクグループ1"""
    value = value_func()

    branch_task = branch_func(value)

    # 分岐先のタスク
    task2 = EmptyOperator(task_id="task2")
    task3 = BashOperator(task_id="task3", bash_command="echo Hello World!", retries=2)

    branch_task >> [task2, task3]


@task_group(group_id="group2")
def group2():
    """タスクグループ2"""
    task4 = EmptyOperator(task_id="task4")
    task5 = BashOperator(task_id="task5", bash_command="echo Hello World!", retries=2)
    task4 >> task5


with DAG(
    dag_id="test3",
    start_date=days_ago(1, hour=0, minute=0, second=0),
    schedule=None,
    default_args={"retries": 1},
):
    group1() >> group2()

シンプルに先程の group1 内に分岐処理を入れ込んだ形になります。

グラフ表示してみても、上手くいけてそうです。

file4

しかし、実際に実行してみると、、、

file5

branch_task 時点で失敗してしまいます。。

ログを確認すると、どうやら branch_task で指定している task_id の指定が間違ってそうです🤔

file6

調査したところ、こちらに今回の内容にピッタリの回答がありました。

どうやら、 group_id.task_id の形で指定する必要があるみたいですので、改めて以下のように設定してみます。

from airflow import DAG
from airflow.decorators import task, task_group
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.dates import days_ago


@task(task_id="task1")
def value_func():
    """分岐先を決定する値を渡すためのタスク"""
    return 5


@task.branch(task_id="branch_task")
def branch_func(value):
    """分岐処理を行うタスク"""
    if value >= 5:
        return "group1.task2"
    elif value >= 3:
        return "group1.task3"
    else:
        return None


@task_group(group_id="group1")
def group1():
    """タスクグループ1"""
    value = value_func()

    branch_task = branch_func(value)

    # 分岐先のタスク
    task2 = EmptyOperator(task_id="task2")
    task3 = BashOperator(task_id="task3", bash_command="echo Hello World!", retries=2)

    branch_task >> [task2, task3]


@task_group(group_id="group2")
def group2():
    """タスクグループ2"""
    task4 = EmptyOperator(task_id="task4")
    task5 = BashOperator(task_id="task5", bash_command="echo Hello World!", retries=2)
    task4 >> task5


with DAG(
    dag_id="test3",
    start_date=days_ago(1, hour=0, minute=0, second=0),
    schedule=None,
    default_args={"retries": 1},
):
    group1() >> group2()

branch_func で返す値を group_id.task_id の形にしております。

実行してみると、、、

file7

branch_task は上手く実行できました!!が、group2 内のタスクが全てスキップされてしまいました😭

こちらもハマりがちな良くあるパターンなのですが、こういったときに気にするのは、タスクごとに設定できる trigger_rule です。

詳しくは、こちらの公式ドキュメントに記載されているのですが、各タスクをトリガーするための条件を設定することができます。

そして、デフォルトの trigger_ruleall_success になっており、前段の全タスクが成功状態にならないとトリガーされないということになります。

つまり、今回の場合は、task3 がスキップされたので、task4 のトリガールールに合わずスキップされて、芋づる式に task5 もスキップされてしまったということになります。

では、下記のように、task4 にトリガールールを設定してみます。

今回は、失敗以外の場合にタスクをトリガーする none_failed にしてみます。

from airflow import DAG
from airflow.decorators import task, task_group
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.dates import days_ago
from airflow.utils.trigger_rule import TriggerRule


@task(task_id="task1")
def value_func():
    """分岐先を決定する値を渡すためのタスク"""
    return 5


@task.branch(task_id="branch_task")
def branch_func(value):
    """分岐処理を行うタスク"""
    if value >= 5:
        return "group1.task2"
    elif value >= 3:
        return "group1.task3"
    else:
        return None


@task_group(group_id="group1")
def group1():
    """タスクグループ1"""
    value = value_func()

    branch_task = branch_func(value)

    # 分岐先のタスク
    task2 = EmptyOperator(task_id="task2")
    task3 = BashOperator(task_id="task3", bash_command="echo Hello World!", retries=2)

    branch_task >> [task2, task3]


@task_group(group_id="group2")
def group2():
    """タスクグループ2"""
    task4 = EmptyOperator(task_id="task4", trigger_rule=TriggerRule.NONE_FAILED)
    task5 = BashOperator(task_id="task5", bash_command="echo Hello World!", retries=2)
    task4 >> task5


with DAG(
    dag_id="test3",
    start_date=days_ago(1, hour=0, minute=0, second=0),
    schedule=None,
    default_args={"retries": 1},
):
    group1() >> group2()

実行すると、、、

file8

無事に最後まで成功しました!

少し長い道のりでしたが、このようにして、連続したタスクグループ内でタスクの分岐をさせることができました。

まとめ

今回は、Cloud Composer (Airflow) で連続したタスクグループを作成して、そのグループ内でタスクを分岐させる方法とハマりどころについて紹介させていただきました。

これらを組み合わせることにより、タスク全体の整理ができるとともに、柔軟なワークフローを実現することができるようになります

このように、Cloud Composer (Airflow) に限らずですが、一見簡単そうに見えても思わぬところに落とし穴があるため、こういった記事が少しでも役に立てば幸いです。

これからも、こういったちょっとしたお役立ち情報は積極的に発信していければと思います!

AUTHOR

中村 卓矢

朝日放送グループホールディングス株式会社 デジタル・アーキテック局 データ戦略チーム

グループ全体の統合的なデータ基盤の構築・データ分析の支援に従事している。 動画配信・テレビの視聴データ分析等で身につけた幅広い知識を活かして日々奮闘中!

WORK@ABC

技術力を培うための
環境と文化

ABCに昔から根付く「自分たちで開発する」文化を支える環境や取り組みをご紹介します
ABCについてもっと知る