ABCABC Tech Catalog
データ

AirflowでGCSのオブジェクト検知にワイルドカードを利用する

Airflowで名前に規則性がないGCSのオブジェクトを検知したい

「Cloud ComposerでSecret変数を使う」でも紹介しておりますが、弊社ではワークフロー オーケストレーションツールとして、Airflowをベースにした、フルマネージドのCloud Composerを利用しております。

このAirflowですが、様々な処理をテンプレート化したオペレータを多数取り揃えております。

その中で、ワークフローの途中でGoogle Cloud Storage(GCS)バケット内のファイルの存在をチェックする処理を入れたいというのは、よくあるシチュエーションかと思います。

今回は、その中でも特に名前に規則性がないオブジェクトを検知する方法について紹介したいと思います。

AirflowでGCS内のオブジェクトを検知する方法

まず、どういったときにGCS内のオブジェクト検知が必要になるのか紹介したいと思います。

例えば、GCSからBigQueryにデータをロードするワークフローがあるとします。

しかし、いざデータロードを行うためにGCS内のオブジェクトを見に行ったときに、該当のオブジェクトが存在していない場合エラーになってしまい、ワークフローが中断されてしまいます。

そのため、GCS内のファイルを指定された期間監視して、ファイルの到着を検知したら後続のタスクを実行するようなタスクが必要になってきます。

ここで利用できるのが、センサと呼ばれるもので、Airflowで用意されている、何かアクションが起きるまで待機するオペレータの一種になります。

今回のGCS内のオブジェクトを監視するにはGCSObjectExistenceSensorというセンサが用意されているので、これが使えそうです。

GCSObjectExistenceSensorでワイルドカードを使う

GCSObjectExistenceSensor

GCSObjectExistenceSensor とは読んで字の如く、GCS内のファイルの存在をチェックするセンサになっています。

基本的なパラメータには以下のようなものがあります。

  • bucket: オブジェクトがあるGCSのバケット名
  • object: GCS内のオブジェクト名(プレフィックスも含む)
  • use_glob: Trueに設定するとobjectパラメータがglobとして解釈される
  • google_cloud_conn_id: GCSに接続するためのコネクションID
  • timeout: タスクがタイムアウトして失敗するまでの経過時間

実はこのuse_glob というパラメータですが、ライブラリのバージョン10.13.0から実装されているパラメータでリリースが2023年12月28日ということなので、比較的新しい機能になります。

では、実際にこのオプションを利用してオブジェクトの検知を行う方法を紹介したいと思います。

DAGの実装例

例えば、あるバケット内のオブジェクトを検知したいときに、オブジェクト名がlogs_[ランダム文字列].csv のようになっており、[ランダム文字列] の部分にはどのような文字列が入るか分からないとします。

こういったシチュエーションの際に、use_glob というパラメータにTrueを指定することで、object パラメータの値にワイルドカードを利用できるようになります。

以下が、use_glob を利用したDAGの実装例になります。(your_project_idは各自のGoogle CloudのプロジェクトIDを入力)

import os

from airflow.models import DAG
from airflow.operators.empty import EmptyOperator
from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor
from airflow.utils.dates import days_ago

PYTHON_FILENAME = os.path.basename(__file__)[:-3]
PROJECT_ID = "your_project_id"
GCS_BUCKET_NAME = "airflow_test_gcs_bucket"
GCS_OBJECT_NAME = "foo/logs_*.csv"


default_args = {
    "owner": "airflow",
    "project_id": PROJECT_ID,
    "retries": 3,
}

with DAG(
    dag_id=PYTHON_FILENAME,
    default_args=default_args,
    start_date=days_ago(1, hour=0, minute=0, second=0),
    schedule=None,
    catchup=False,
) as dag:
    # GCS内へのファイル到達を確認する
    exist_sensor = GCSObjectExistenceSensor(
        task_id="check_file_exists",
        bucket=GCS_BUCKET_NAME,
        object=GCS_OBJECT_NAME,
        use_glob=True,
        timeout=3600.0,
    )

    # 空のタスク
    dummy_task = EmptyOperator(task_id="dummy_task")

    exist_sensor >> dummy_task

今回は、オブジェクトの検知を行い、見つかったら後続の空のタスクを実行するようなワークフローとします。

実行結果

デプロイ完了後、Cloud Composerのコンソール上で手動実行すると以下のような待機状態になります。

file1

ここで、logs_[ランダム文字列].csvを満たすファイルをGCSにアップロードしてみます。

file2

無事にタスクが成功して、後続のタスクまで実行されました。

file3

今回は後続のタスクをダミーの空タスクにしましたが、実際はGCSToBigQueryOperator 等を利用して、オブジェクトの存在を確認してからBigQueryに取り込むなどのワークフローが考えられるかと思います。

また、

GCS_OBJECT_NAME = "*/logs_*.csv"

のようにプレフィックスにもワイルドカードの指定は可能です。

その他の方法

ちなみに、今回利用したGCSObjectExistenceSensor以外にもGCSObjectsWithPrefixExistenceSensorを利用することで、特定のプレフィックスを持ったオブジェクトを検知でき、検知したオブジェクトの情報をクロス・コミュニケーション(XComs)を用いて、後続タスクに渡すことができます。

一見、こちらの方が、汎用性は高そうなのですが、例えば「logs_qwertyuiop.csv」と「logs_qwertyuiop.json」のようにオブジェクトのサフィックスのみ異なっていると、片方のオブジェクトのみの検知ができないなど、GCSObjectExistenceSensoruse_globパラメータを利用した方がオブジェクト検知の観点だけでいうと柔軟性は高そうです。

このように、それぞれのセンサごとに特徴が異なっているため、ユースケースに応じた使い分けが重要になってきます。

まとめ

今回は、AirflowでGCSのオブジェクト検知を行うときに規則性がないオブジェクト名であっても検知ができる方法を紹介しました。

本文中にもあったように、今回の機能は昨年12月にリリースされたものであり、数か月前までできないと思っていたことが、できるようになっているアップデートの早さはOSSの強みかと思います。

今後もAirflow(Cloud Composer)に限らず、各種サービスのアップデートは定期的にチェックするようにしておきたいですね。

AUTHOR

中村 卓矢

朝日放送グループホールディングス株式会社 デジタル・アーキテック局 データ戦略チーム

グループ全体の統合的なデータ基盤の構築・データ分析の支援に従事している。 動画配信・テレビの視聴データ分析等で身につけた幅広い知識を活かして日々奮闘中!

WORK@ABC

技術力を培うための
環境と文化

ABCに昔から根付く「自分たちで開発する」文化を支える環境や取り組みをご紹介します
ABCについてもっと知る