|

2024-06-26

データ関連

AWS Glue のローカル開発環境から BigQuery に接続する

AWSBigQuery

AWS Glue をローカル環境でも開発したい

容易に ETL 処理を実現してくれる AWS Glue

データ分析の前段で欠かすことのできない ETL ( Extract, Transform, Load ) 処理はマネージドサービスに頼らざるを得ません。弊社では BigQuery から S3 へのデータ転送に AWS Glue for Spark を利用しています。

AWS Glue ではビジュアルエディタで直感的に ETL ジョブの設定ができたり、AWS 以外のサービスとのコネクタが用意されていたり、難しいことを考えなくても利用できる優れたサービスです。その反面、ローカルでの開発環境を構築するのに少してこずりましたので、本記事では AWS Glue のローカル開発環境の構築、特に BigQuery との接続について記載していきます。

 

AWS Glue のローカル環境に必要なもの

弊社の運用に沿って BigQuery から S3 へのデータ転送を行う AWS Glue for Spark をローカル環境で動作させる場合を考えます。

ここで、BigQuery に対する処理はデータ読み込みだけなので、ローカル環境には構築せず本番環境に接続することとし、データ書き込み先である S3 は LocalStack でローカル環境に構築することとします。

残りの AWS Glue for Spark は大変ありがたいことに AWS から Docker イメージが提供されているため、 公式ドキュメントの通りに手順を踏むだけでローカル開発環境が構築できます。

LocalStack もまた Docker イメージが提供されているので、BigQuery 以外は Docker 上で動作させるのがよさそうです。

 

ローカル環境から BigQuery への接続

今回紹介したいのはこの部分です。

AWS 上では BigQuery とのコネクタが提供されているので、GoogleCloud 側のサービスアカウントを用意してキーを SecretManager へ登録してあげるだけで、AWS Glue の PySpark 拡張を用いて簡単に接続できます。

class WriteAndReadDyanamicFrame: ... def create_df_from_bq(self) -> DynamicFrame: dynamic_frame = self.glueContext.create_dynamic_frame.from_options( connection_type="bigquery", connection_options={ "parentProject": GOOGLE_CLOUD_PROJECT, "table": TABLE_NAME, "connectionName": CONNECTION_NAME, }, transformation_ctx="AWSGlueConnectorForBigQueryNode", ) return dynamic_frame ...

 

しかし、ローカル環境では BigQuery とのコネクタを使用できません。( AWS サポートへ問い合わせたところ使用できないとの回答でした)

ですので、GoogleCloud 側が用意してくれている Apache Spark SQL connector for Google BigQuery を使用します。適当なバージョンの jar ファイルを spark-submit 時に渡してあげるだけで Spark DataFrame API を介して BigQuery にアクセスできるようになります。

spark-submit --jars spark-3.3-bigquery-0.39.0.jar example_local.py

ちなみにこの時、下記のようにセッション確立時に jar ファイルを指定しても読み込んでくれずハマりました 😅

class WriteAndReadDyanamicFrameOnLocal(WriteAndReadDyanamicFrame): def __init__(self): super().__init__() self.spark = ( SparkSession.builder.appName("ConnectToBigQuery") .config("spark.jars", "spark-3.3-bigquery-0.39.0.jar") .getOrCreate() ) ...

 

あとは、ローカル環境では Spark DataFrame でデータを読み込むことになるので、

class WriteAndReadDyanamicFrameOnLocal(WriteAndReadDyanamicFrame): ... def create_df_from_bq(self) -> DynamicFrame: self.spark.conf.set("viewsEnabled", "true") spark_data_frame = ( self.spark.read.format("bigquery") .option("credentialsFile", GOOGLE_CLOUD_SEVICE_ACCOUNT_KEY_PATH) .option("parentProject", GOOGLE_CLOUD_PROJECT) .option("table", f"{DATASET_NAME}.{TABLE_NAME}") .load() ) dynamic_frame = DynamicFrame.fromDF(spark_data_frame, self.glueContext, name="bq_dynamic_frame") return dynamic_frame ...

のようにオーバーライドしてあげればコードもすっきりしますね。

 

LocalStack 上の S3 との接続

ここでも一点注意が必要でした。

LocalStack 上の S3 と Spark 間の認証を記載してあげなければいけませんので、この部分もオーバーライドしてあげましょう。

class WriteAndReadDyanamicFrameOnLocal(WriteAndReadDyanamicFrame): def __init__(self): super().__init__() self.spark = ( SparkSession.builder.appName("ConnectToBigQuery") .getOrCreate() ) self.spark._jsc.hadoopConfiguration().set("fs.s3a.awsAccessKeyId", AWS_ACCESS_KEY_ID) self.spark._jsc.hadoopConfiguration().set("fs.s3a.awsSecretAccessKey", AWS_SECRET_ACCESS_KEY) self.spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", LOCALSTACK_ENDPOINT_URL) self.spark._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true") # S3にパススタイルでアクセスする場合 ...

 

ローカル環境でのテスト

ここまでで、ローカル環境で BigQuery のデータを読み込み、LocalStack 上の S3 へデータを読み書きできるようになりました。せっかくローカル開発環境を構築できたので、さらに S3 に出力したファイルを再度 Spark で読み込んで、BigQuery から読み込んだデータと行数が一致することだけでも Unittest してあげましょう 🧑‍💻

import unittest class TestWriteAndReadDyanamicFrame(unittest.TestCase): def setUp(self): ... self.wrdf = WriteAndReadDyanamicFrameOnLocal() ... ... def test_diff_df_rows(self): """BigQueryから読み込んだDynamicFrameとS3に保存したDynamicFrameの行数が一致するか確認""" # BigQueryのデータを読み込んでDynamicFrameを作成 bq_dynamic_farme = self.wrdf.create_df_from_bq() bq_df_count = bq_dynamic_farme.count() # 作成したDynamicFrameをS3に書き込み self.wrdf.write_df_to_s3(bq_dynamic_farme) # 書き込んだS3のデータを読み込見直してDynamicFrameを作成 s3_dynamic_farme = self.wrdf.create_df_from_s3() s3_df_count = s3_dynamic_farme.count() print(f"DynamicFrame count(BigQuery): {bq_df_count}") print(f"DynamicFrame count(S3): {s3_df_count}") # 行数の一致を確認 self.assertEqual(bq_df_count, s3_df_count) ... if __name__ == "__main__": unittest.main()

 

ただ、当然っちゃ当然ですがデータ量が多いとローカルマシンではかなり時間がかかります 😇  794a9df8-eb28-45d1-aa1a-7dcb5805de5b

さいごに

今回は AWS Glue のローカル開発環境で BigQuery と接続する方法について記載しました。

やってみてわかったのですが、Terraform や CloudFormation といった IaC を用いて構築し、検証環境の AWS 上で動作確認する方が効率よいかもしれません。

しかし、ローカル環境であればクラウド利用料がかからなかったり、テストやデバッグを行いやすかったりといい面もありますので、AWS Glue 上で複雑な処理を行う場合にはローカル環境での開発でもよさそうです。

このあたりを早い段階で正しく見極めれるようになりたいものです 🤔

 


この記事の著者

プロフィール画像

山野 悠

朝日放送グループホールディングス株式会社 DX・メディアデザイン局 R&Dチーム

動画配信・災害情報・データ放送など社内の運用負荷軽減のためのCMS開発に従事。 プロジェクトの規模に応じて、ディレクション業務からアプリケーション開発、サーバ設計までを担当。 デスクワークによる運動不足解消のため、日々ウエイトトレーニングに励む。