|

2025-11-28

Tips

Airflowでdatetime.now()はNG?コンテキストから正しい実行時刻を取得する方法

AirflowCloud ComposerGoogle Cloud

Airflow で datetime.now() はなぜ NG?再実行時も正しい時刻を扱うテクニック

私たちのチームでは、データパイプラインの構築に、 Google Cloud のマネージドサービスである Cloud ComposerApache Airflow ) を活用して運用を行っています。

(実体は Airflow ですので、以下「 Airflow 」と表記します。)

 

Airflow は非常に強力なツールですが、独特の「クセ」があり、慣れるまでは少し戸惑うこともありますよね。

 

今回は、 Airflow を触り始めたばかりの方が(そして実は慣れている人でもうっかり)やってしまいがちな「実行時刻の取得に関するアンチパターンと、その正しい解決方法についてご紹介します。

Airflow における「時刻」のアンチパターン

DAG の中で、「現在の時刻」をファイル名や、処理の中の分岐条件に利用する場面はよくあります。

 

Python に慣れていると、ついつい datetime.now() を使いたくなりますが、 Airflow においてはコードを書く場所が「 DAG の定義(トップレベル)であっても実際の処理(タスクの中)であっても、安易に datetime.now() を使うことはアンチパターンになり得ます。

 

それぞれのケースで何が問題なのか見ていきましょう。

 

ケース1: DAG のトップレベル(タスクの外側)での使用

まずは、 DAG ファイル直下に datetime.now() を書いてしまう以下のようなケースです。

import datetime from airflow import DAG current_time = datetime.datetime.now() with DAG( dag_id="anti_pattern_example", start_date=datetime.datetime(2025, 1, 1), schedule="0 1 * * *", catchup=False, ) as dag: # ここでcurrent_timeを使ったタスク定義などを行う... pass

 

これは公式ドキュメントの Best Practices でも明確に禁止されています。

Top level Python Code (...) You should avoid writing the top level code which is not necessary to create Operators and build Dag relations between them.

 

問題の理由スケジューラへの負荷と整合性

Airflow のスケジューラは、新しいタスクがないか確認するために DAG ファイルを頻繁にパース(読み込み・解析)します。

トップレベルに datetime.now() があると、パースするたびに変数の値が変わってしまいます。

これにより不要な負荷がかかったり、 DAG の定義自体が不安定になるリスクがあります。

 

ケース2:タスク(関数)内での使用

では、 @task デコレータの中や Operator の中ならどうでしょうか?

 

トップレベルではないのでスケジューラの負荷にはなりませんが、「いつ何度実行しても同じ結果が得られる性質」、つまり「冪等性」 の観点で注意が必要です。

 

以下のようなケースです。

import datetime from airflow import DAG from airflow.decorators import task with DAG( dag_id="anti_pattern_example", start_date=datetime.datetime(2025, 1, 1), schedule="0 1 * * *", catchup=False, ) as dag: @task def export_data(): filename = f"data_{datetime.datetime.now().strftime('%Y%m%d')}.csv" print(f"{filename} にデータを出力します") export_data()

 

問題の理由冪等性の欠如リカバリができない

Airflow の強みの一つは、過去の日付を指定して再実行できることです。

 

例えば、「昨日( 11/27 )分のバッチ処理が失敗していたので、今日( 11/28 )再実行する」という場面を想像してください。

タスク内で datetime.now() を使ってファイル名を生成していると、本来「昨日( 11/27 )」の日付で作成されるべきファイルが、「再実行したタイミング( 11/28 )」の日付で作成されてしまいます。

これでは過去分のデータを正しく上書きしたり、参照したりすることができなくなります

 

いつ実行しても、そのタスクが本来処理すべき対象期間論理的な実行日時は変わらないように作るのが、 Airflow におけるベストプラクティスです。

 

※もちろん、処理にかかった実時間を計測したい場合や、 API リクエスト時のタイムスタンプなど、意図的に実行したその瞬間の時刻を使いたい場合はこの限りではありません。

用途に応じて使い分けることが大切です。

解決策: Airflow のコンテキスト情報から正しい時刻を取得する

では、これらのようなアンチパターンに対して、データの対象期間を正しく扱うためにはどうすれば良いのでしょうか?

 

それは、Airflow が持っているコンテキスト情報から時刻を取得することです。

 

Airflow のタスク実行時には、そのタスクに関するメタデータ(データ期間の開始・終了日時、日付文字列、実行 ID など)が辞書形式の可変長引数**kwargsとして渡されます。

具体的にどのような値が取得できるかは、公式リファレンス で確認できます。

 

ここから情報を引き出すことで、いつ再実行しても「正しい対象期間の日時を取得できます。

 

※なお、汎用性の高い処理を事前定義した Operator では Jinja テンプレート (例:{{ data_interval_end }} )を使ってコンテキストを参照することも一般的です。

 

コンテキストはあらゆる場面で利用可能ですが、今回は @task デコレータを用いたタスクの実装にフォーカスして解説します。

 

以下が実装コードの例になります。

 

実装コード例

具体的には、 kwargs から data_interval_end を取得し、それを日本時間( JST )に変換しています。

また、比較のためにdatetime.now() で取得した時刻も表示しています。

import datetime import pendulum from airflow import DAG from airflow.decorators import task with DAG( dag_id="context_best_practice_example", start_date=datetime.datetime(2025, 1, 1), schedule="0 1 * * *", catchup=False, ) as dag: @task def get_execution_time(**kwargs) -> datetime.datetime: """DAGの実行時間を取得する Args: **kwargs: Airflowのコンテキスト情報 Returns: datetime.datetime: スケジュール実行時の実行日時(日本時間) Raises: ValueError: kwargsにdata_interval_endが見つからない場合 """ # Airflowのコンテキストからdata_interval_end(実行日時)を取得 data_interval_end = kwargs.get("data_interval_end") if data_interval_end is None: raise ValueError("kwargsにdata_interval_endが見つかりません") # JSTに変換して返す jst_time = data_interval_end.astimezone(pendulum.timezone("Asia/Tokyo")) # 比較検証用に現在時刻も取得 now_time = datetime.datetime.now(pendulum.timezone("Asia/Tokyo")) # ログ確認用に出力 print(f"datetime.now() : {now_time}") print(f"コンテキスト取得 : {jst_time}") return jst_time # タスクを実行グラフに追加 get_execution_time()

 

ポイントは以下の通りです。

 

  • 引数に **kwargs を指定することで、 Airflow が自動的にコンテキスト情報を注入してくれます。

    • 補足:慣例的に kwargs という変数名がよく使われますが、 Python の仕組み上、重要なのは先頭の ** (アスタリスク2つ)です。これさえ付いていれば、例えば **context のような名前でも全ての情報を受け取ることができます。
  • data_interval_end を利用しています(なぜ logical_date ではないかは後述します)。

  • Airflow 内部は基本的に UTC で扱われるため、わかりやすく Asia/Tokyo に変換しています。

 

実際に取得できるか検証してみる

それでは、このDAGを実際に動かしてみたいと思います。 image

 

1. スケジュール通りの実行( 2025/11/28 10:00:00)

まずは普通にスケジュール実行された場合のログです。

[2025-11-28, 01:00:03 UTC] {logging_mixin.py:190} INFO - datetime.now() : 2025-11-28 10:00:03.985817+09:00 [2025-11-28, 01:00:03 UTC] {logging_mixin.py:190} INFO - コンテキスト取得 : 2025-11-28 10:00:00+09:00

当然ですが、どちらもほぼ同じ時刻を指しています。これだけ見ると違いはわかりません。

 

2. 2時間後に再実行した場合( Clear )

では、2時間ほど経って12:00に、このタスクを「 Clear (再実行)」してみました。ここが重要です。

[2025-11-28, 03:00:05 UTC] {logging_mixin.py:190} INFO - datetime.now() : 2025-11-28 12:00:05.377344+09:00 [2025-11-28, 03:00:05 UTC] {logging_mixin.py:190} INFO - コンテキスト取得 : 2025-11-28 10:00:00+09:00

 

ご覧の通り、 datetime.now() で取得した日時は再実行したタイミングの日時12:00に変わってしまっています。

もしファイル名や処理分岐にこれを使っていたら、再実行の結果が変わってしまいますね。

 

一方で、コンテキストから取得した時刻は 10:00 のまま です。

これなら、いつ何度再実行しても、対象となるデータ期間は変わらず、冪等性が担保されます。

間違いやすい「 data_interval_end 」と「 logical_date 」の違い

ここで一つ、 Airflow 初学者が必ずと言っていいほど悩むポイントに触れておきます。

 

どの変数が、我々が思う実行時刻なのか?」 です。

 

コンテキストには似たような変数がいくつかあります。

  • logical_date

  • data_interval_start

  • data_interval_end

 

例えば、「毎日 10:00 に実行される DAG 」 があるとします。

本日( 11月28日 )の 10:00 に動く処理の場合、それぞれの値はどうなるでしょうか?

変数名意味今回の値(例)
logical_dateデータ期間の開始11月27日 10:00:00
data_interval_startデータ期間の開始11月27日 10:00:00
data_interval_endデータ期間の終了11月28日 10:00:00

ここが直感に反する部分です。

Airflow のスケジュール実行は「期間が終わった後に実行される」という考え方が基本です( Daily 実行であれば、今日が終わった瞬間に昨日の分のデータを処理する、など)。

 

そのため、 logical_date は「スケジュールの基準日」であり、多くの場合「前回の実行時刻」を指します。

 

我々が直感的に「今、実行されているこのタイミングの時刻」が欲しい場合は、data_interval_end を使うのが正解となるケースが多いです。

 

今回のコードで data_interval_end を採用しているのは、そういった理由からです。

まとめ

今回は Airflow における時刻取得のアンチパターンと、コンテキストを利用した正しい実装方法について解説しました。

 

  • DAG トップレベルでの datetime.now() は、スケジューラ負荷の原因になるため NG 。

  • タスク内部での datetime.now() は、冪等性が求められる処理では避ける(実時間が必要な場合を除く)。

  • コンテキスト( **kwargs** から data_interval_end などを取得して使うのが正解。

 

これらを意識するだけで、 DAG の堅牢性がぐっと高まります。

Airflow は独特のクセもありますが、一つひとつの挙動を理解していくと、非常に柔軟性の高いワークフローを構築することができます。

 

これからも、細かい仕様を正確に理解して、信頼性の高いデータパイプラインを構築していきたいですね。


この記事の著者

プロフィール画像

中村 卓矢

朝日放送グループホールディングス株式会社 DX・メディアデザイン局 デジタル・メディアチーム

グループ全体の統合的なデータ基盤の構築・データ分析の支援に従事している。 動画配信・テレビの視聴データ分析等で身につけた幅広い知識を活かして日々奮闘中!