ABCABC Tech Catalog
データ

Snowpipeでデータ分析基盤Snowflakeへデータロードする方法

Snowpipeでデータ分析基盤Snowflakeへデータロードする方法

ABCではテレビ視聴データ(視聴者非特定視聴データ)・動画配信ログ・ウェブアクセスログなどのデータ分析基盤としてSnowflakeを活用しています。

Snowflakeには、データロードの手法として「Snowpipe」という機能が提供されています。

このSnowpipeを利用することで、クラウド(AWS・GCP・Azure)へのデータ保存がトリガーとなり、Snowlfakeへ自動的にデータロードが可能になります。

そこでデータ分析基盤において、データレイクをAWS S3とし、Snowpipeの設定を行いました。

AWS S3にログが保存されると、Snowpipeを経由してSnowflakeへデータが自動的に取り込まれるという構成ですね。

自動でデータロードを行ってくれるSnowpipeですが、その設定はAWSとSnowflake間で行う必要があり、やや煩雑な作業に。

そのため設定方法について整理し、以下にまとめてみました。

Snowpipeの設定方法

IAMロールにアタッチするポリシーを作成

Snowpipeの設定にIAMロールが必要となるため、まずはデータレイクとなるS3バケットへの権限を付与したポリシーを作成します。

file1

IAMロールを作成しポリシーをアタッチ

次にIAMロールの作成。作成後は先ほどのポリシーをアタッチしましょう。

file2

なお、AWSリソースをサードパーティー(Snowflake)に許可するには外部IDが必要となるのですが、一旦ダミー「0000」とします。

IAMロールを指定したストレージ統合を作成

ここからはSnowflakeでの作業。IAMロールを指定してストレージ統合を作成します。

-- ACCOUNTADMINにて実行
USE ROLE ACCOUNTADMIN;

CREATE STORAGE INTEGRATION hoge
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = S3
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = '先ほど作成したIAMロールのARN'
  STORAGE_ALLOWED_LOCATIONS = ('s3://<データレイクとなるS3バケット>');

ストレージ統合とは、S3などのクラウドストレージにアクセスするオブジェクトのこと。

このオブジェクトにIAMロールを設定することで、S3からのデータ取得を可能にしているということですね。

ストレージ統合からSnowflakeアカウントのIAMユーザーと外部IDを取得

DESC INTEGRATION hoge;

ストレージ統合の情報を表示させます。

file3

必要な情報はSTORAGE_AWS_IAM_USER_ARNとSTORAGE_AWS_EXTERNAL_IDの2つ。

コピーしておきましょう。

AWS IAMロールの信頼ポリシーを変更

それではAWSへ戻ってIAMロールの信頼ポリシーを変更します。

file4

先ほどコピーしたSTORAGE_AWS_IAM_USER_ARNを”AWS”、STORAGE_AWS_EXTERNAL_IDを”sts:ExternalId”に設定。

ステージとテーブルの作成

もう一度Snowflakeに戻り、データを取り込むためのステージとテーブルを作成します。

ステージはストレージ統合を参照しており、取り込むデータのS3パスを設定します。

ストレージ統合ではS3へのアクセス権限が付与されており、ステージにはS3のパスが設定されているということですね。

-- ステージの作成
CREATE STAGE hoge_stage
  STORAGE_INTEGRATION = hoge
  URL = 's3://<取り込むデータがあるパス>/';

-- テーブルの作成
CREATE TABLE hoge_table(
aaa varchar,
bbb varchar, 
~~~
zzz varchar);

Snowpipeの作成

ここでようやくSnowpipeの作成。

hoge_stageからhoge_tableへデータをロードしましょう。

CREATE PIPE XXX.PUBLIC.hoge_pipe AUTO_INGEST=TRUE AS
  COPY INTO XXX.PUBLIC.hoge_table
  FROM @XXX.PUBLIC.hoge_stage
  FILE_FORMAT = (TYPE = 'CSV' compression = 'GZIP' field_delimiter = ',' skip_header = 1 FIELD_OPTIONALLY_ENCLOSED_BY='\042');

S3バケットのイベント通知の作成

最後にS3バケットへのファイル作成をトリガーにSnowpipeへのデータロードを行う設定です。

必要となるARN(notification_channel)の取得をしましょう。

SHOW PIPES LIKE 'hoge_pipe';

file5

S3バケットのイベント通知の作成をします。

file6

「オブジェクトの作成」を選択し、SQSキューのARNとしてnotification_channelを設定します。

これでSnowpipeの設定は完了です。

すでにS3にファイルが保存されている場合は、COPY INTOコマンドにより手動でデータロードしておきましょう。

COPY INTO XXX.PUBLIC.hoge_table
  FROM @XXX.PUBLIC.hoge_stage
  FILE_FORMAT = (TYPE = 'CSV' compression = 'GZIP' field_delimiter = ',' skip_header = 1 FIELD_OPTIONALLY_ENCLOSED_BY='\042');

まとめ

Snowpipeを利用したSnowflakeへのデータロード方法についてまとめました。

ストレージ統合やステージなどSnowlfake特有の設定が最初は複雑に感じるかもしれません。

しかし、Snowpipeは一度設定すれば継続してデータを取り込んでくれるので非常に便利です。

また、データロード時のエラーを検知し、Slack等へ通知することも可能。

取りこぼしに気付くためにも運用時には設定しておきたいですね。

AUTHOR

髙木 衛

朝日放送テレビ株式会社 技術局 技術開発部

デジタル技術の開発・支援を担当。主にバーチャル高校野球、視聴データ・動画配信データ等のデータ利活用に取り組む。 平日はエンジニア、休日は少年野球の監督を務める。

WORK@ABC

技術力を培うための
環境と文化

ABCに昔から根付く「自分たちで開発する」文化を支える環境や取り組みをご紹介します
ABCについてもっと知る