ABCABC Tech Catalog
データ

Cloud Composer(Airflow)のon_failure_callbackの位置には注意が必要

なんとなくでやっていると泣きを見る(?)Cloud Composerのcallbackたちの位置

Cloud Composer(Airflow)とコールバック

最近はもっぱらCloud Composer(Airflow)を使ってデータに関するパイプラインの構築をしていますが、やはりデータパイプラインに欠かせないものといえばエラーハンドリングです。

Cloud Composer(Airflow)にはエラーハンドリングのためのコールバックが用意されていますが、今回はなんとなくで設定していると思っていた挙動と違うことが起きたりするので適切に設定していきましょう、という内容です。

コールバックの設定の形

まずはコールバックの設定の基本的な形を見ておきます。

on_failure_callback で失敗時のコールバックを設定でき、 on_success_callback で成功時のコールバックを設定できること自体はいずれの形にも共通しています。

タスク単位での設定

まずはタスク単位で設定する方法です。

def on_failure_callback(context):
    print('on_failure_callback')
    print(context)

def on_success_callback(context):
    print('on_success_callback')
    print(context)

with DAG('sample_dag', schedule_interval='@daily', default_args=default_args, catchup=False) as dag:
    task1 = BashOperator(
        task_id='task1',
        bash_command='echo "task1"',
        on_failure_callback=on_failure_callback,
        on_success_callback=on_success_callback
    )

わかりやすく BashOperator の中で設定していますね。

こうすることで、この echo "task1" のタスクが失敗したときに on_failure_callback が呼ばれ、成功したときに on_success_callback が呼ばれます。

default_argsでの設定

次にDAGの default_args で設定する方法です。

def on_failure_callback(context):
    print('on_failure_callback')
    print(context)

def on_success_callback(context):
    print('on_success_callback')
    print(context)

default_args = {
    "on_failure_callback": on_failure_callback,
    "on_success_callback": on_success_callback
}

with DAG('sample_dag', schedule_interval='@daily', default_args=default_args, catchup=False) as dag:
    task1 = BashOperator(
        task_id='task1',
        bash_command='echo "task1"',
    )

default_args の設定は、DAG内の全てのタスクに引数として展開されることになりますので、 こうすることで、全てのタスクに対して on_failure_callbackon_success_callback が設定されます。

DAG自体への設定

そして、DAG自体に設定する方法です。

def on_failure_callback(context):
    print('on_failure_callback')
    print(context)

def on_success_callback(context):
    print('on_success_callback')
    print(context)

with DAG(
    'sample_dag',
    schedule_interval='@daily',
    default_args=default_args,
    catchup=False,
    on_failure_callback=on_failure_callback,
    on_success_callback=on_success_callback
) as dag:
    task1 = BashOperator(
        task_id='task1',
        bash_command='echo "task1"',
    )

このように DAG に引数として渡すことによって、

DAG全体に対して on_failure_callbackon_success_callback を設定することができます。

これはDAG全体の実行が failed の場合に呼ばれるコールバックとなります。つまり、 複数のtaskが失敗していたとしても呼ばれるのはDAG全体で失敗となったタイミングの1回だけです。

以上を踏まえてのコールバック活用方法

と、このように3通りの設定方法がありまして、それぞれの使い分けを意識しておく必要があります。

DAG自体の失敗を検知しておきたい場合

たとえば日次/週次など、定期実行されるパイプラインについては基本的にはDAG自体の失敗を検知しておきたいと思いますので、DAG自体にメールなりSlackなりに情報を飛ばすコールバックを設定しておくと良いと思います。

この設定はよっぽど頻度が高く通知が来てしまいうるものDAG以外すべてに入れておくと良いのではないでしょうか。

全タスクの失敗を個別に検知したい場合

DAG全体ではなく、タスク単位での失敗を把握しておきたいといった場合には default_args を使うことになると思います。 個人的には、結局DAG自体がコケていたらComposerの画面を開くなどして確認すると思うので、 default_args にコールバックを設定するケースは稀なのではないかと思います。

(DAG自体の失敗を検知するものをひとつ仕込んでおけば充分かと思います。)

特定のタスクの成功/失敗を検知しておきたい場合

特定のタスクに注目していて何らかの特殊なコールバックを呼びたい場合は、最初に紹介した「タスクにコールバックを設定する」パターンが適しています。

たとえば、何か外部のシステムへの送出成功/失敗など、こちら起因で迷惑をかけてしまう場合や特に注意して状態を把握しておきたいタスクには設定しておくのが良いかと思います。

以上のように 「なんとなく」ではなくきちんと意味あいを考えながら設定していくことが大事ですね。

DAG単位でのエラー検知のコールバックの例

DAG単位でエラー検知をする場合にSlackに通知するコールバックの例を示します。 (connectionを張っておいてから SlackAPIPostOperator を使うパターンです)

DAG単位でエラー検知をする場合、失敗したタスクは複数ある可能性があるので、失敗したタスクの中から1つを取り出して通知するのが良いと思います。

その方法については アラートを出す際にAirflowのContextから誤ったtask idが取得されてしまうバグの対処法 の記事に書いてある通りで、 以下のように context.get("dag_run") から state="failed" のタスクを全て取り出してその先頭を取る形で充分かと思います。

import textwrap
from airflow.providers.slack.operators.slack import SlackAPIPostOperator


SLACK_CHANNEL = "your-slack-channel"

def slack_notification(context):
    """Slackに通知を送信する"""
    # 実行日時(JST)を取得
    jst_exec_date = context.get("logical_date").in_timezone("Asia/Tokyo")
    # 失敗したタスクを1つ抜いてくる
    failed_task = context.get("dag_run").get_task_instances(state="failed")[0]

    slack_msg = f"""
    :red_circle: Task Failed.
    *Task*: {failed_task.task_id}
    *Dag*: {failed_task.dag_id}
    *Run ID*: {dag_run.run_id}
    *Execution Time*: {jst_exec_date: %Y-%m-%d %H:%M:%S}
    <{failed_task.log_url}| *Logを開く*>
    """
    failed_alert = SlackAPIPostOperator(
        task_id="slack_notification",
        slack_conn_id="slack_api_default",
        text=textwrap.dedent(slack_msg),
        channel=SLACK_CHANNEL,
    )
    return failed_alert.execute(context=context)

このようなコールバックを仕込むことで、こんな感じで通知されます。

file1

とても便利ですし、おそらく皆さんこういった形でのエラー検知を仕込まれているのではないでしょうか。

まとめ

今回の記事では、Cloud Composer(Airflow)のコールバックについての設定方法のパターンとその違い・活用方法について紹介しました。

というのも、実はかなり初期に default_args にコールバックを仕込んだあと、放置してしまっていまして…タスクごとにエラー通知が来てしまっていました。

(並行して複数のタスクが失敗するとエラー通知地獄、みたいな状況を生みかねない状況でした。。)

特に default_args などはなんとなくの設定値が残ってしまっていたりするので、数が増えてきても、あらためてきちんと意味を理解して設定していくことが大事ですね。

なんとなくではなく、ちゃんとドキュメントやソースコードを読んで意味を理解しましょう、という基本に立ち返る話でした。。

AUTHOR

伴 拓也

朝日放送グループホールディングス株式会社 デジタル・アーキテック局 データ戦略チーム

アプリケーションからインフラ、ネットワーク、データエンジニアリングまで幅広い守備範囲が売り。最近はデータ基盤の構築まわりに力を入れて取り組む。 主な実績として、M-1グランプリ敗者復活戦投票システムのマルチクラウド化等。

WORK@ABC

技術力を培うための
環境と文化

ABCに昔から根付く「自分たちで開発する」文化を支える環境や取り組みをご紹介します
ABCについてもっと知る