目次
はじめに
こんにちは、omkです。
データレイク等でS3にデータを保管する際に頻繁にアクセスしないデータは安く保管したいですよね。うんうん。
Glue DataCatalogのテーブルのデータについて、Glueジョブの中で特定の条件をもとにストレージクラスを移行してみました。
やってみた
今回は日付を条件にしてGlueでの取り込みから30日経ったデータのストレージクラスを標準から1 ゾーン -IAに変更してみました。
ソースコード
import sys
import boto3
import datetime
from dateutil.relativedelta import relativedelta
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql import functions as F
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
# 初期化
args = getResolvedOptions(sys.argv, ['JOB_NAME','AccountId'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# 変数定義
database = args['DB']
readtable = args['ReadTable']
writetable = args['WriteTable']
AccountId = args['AccountId']
client = boto3.client('glue')
etljob = client.get_job(JobName=args['JOB_NAME'])
jobrole = etljob['Job']['Role']
# この辺まで特に気にしなくてOK
# ジョブ実行日の1ヶ月前以前のデータがストレージクラス移行の対象
jst = datetime.timezone(datetime.timedelta(hours=9))
trans_day = (datetime.datetime.now(jst) + relativedelta(months=-1)).strftime('%Y%m%d')
# 元データ取得
sdf_source = glueContext.create_dynamic_frame.from_catalog(database = database, table_name = readtable).toDF()
# 日付カラム追加
sdf_addtime = glueContext.add_ingestion_time_columns(sdf_source, "day")
sdf_addtime = sdf_addtime.withColumn('extract_day',F.format_string('%s%s%s',F.col('ingest_year'), F.col('ingest_month'), F.col('ingest_day')))
dyf_addtime = DynamicFrame.fromDF(sdf_addtime, glueContext, "dyf_addtime")
# 既存データのストレージクラスの変更
predicate = "extract_day < '" + trans_day + "'"
glueContext.transition_table(
database = database,
table_name = writetable,
transition_to = "ONEZONE_IA",
options = {
"retentionPeriod": 0,
"partitionPredicate": predicate,
"excludeStorageClasses": ["ONEZONE_IA"],
"accountId": AccountId,
"roleArn": jobrole
}
)
# データ書き込み
glueContext.write_dynamic_frame_from_catalog(
frame = dyf_addtime,
database = database,
table_name = writetable,
additional_options={
"partitionKeys": ['extract_day']
}
)
job.commit()
簡単にコードの解説をします。
まず条件に利用する日付カラムを作成します(条件となる日付カラムがすでにある場合や別の条件を利用する場合はこれは全く不要です)。
glueContext.add_ingestion_time_columns()でデータフレームにデータの抽出日のカラムを追加することが出来ます。
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-add-ingestion-time-columns
これにより、データフレームに「ingest_year」「ingest_month」「ingest_day」というカラムが追加されます。
パーティションインデックスなども使えると思いますが、取り扱いが面倒なのでこれらのカラムをくっつけて「yyyyMMdd」のフォーマットにした「extract_day」というカラムを作ってこれをパーティションキーとします。
この「extract_day」のパーティションがストレージクラスを移行する際の条件(partitionPredicate)となります。
次に作成したカラムをもとにストレージクラスを移行させます。
glueContext.transition_table()でテーブルのデータのストレージクラスを移行出来ます。
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-transition_table
ここにストレージクラスや移行条件の指定、除外設定などを行います。
ここでは抽出日のカラム(extract_day)の値がジョブの実行日の30日前以前であるパーティションが対象となります。
また、ここで指定するIAMロールにストレージクラスを変更する権限を付与しておきます。
ざっくりコードの解説はこんな感じです。
次にやってみたスクショを添付しておきます。
実行結果
実行前はこのような状態です。今日が22年9月30日なので1ヶ月間放置しました。
実行前のストレージクラスは標準になっています。
それでは実行してストレージクラスが変わることを確認します。
変わりましたね。
終わりに
めったにアクセスしないデータにおいてはストレージクラスを変更しておくことで安く利用できることがあります。
特に日付を条件にAthenaでクエリするときなどはクエリの条件に合わせてパーティションを設定しておくことでWHERE句と合致しないパーティションのデータは読み込まれないので非常に有効だと思います。
逆にクエリの条件が様々である環境やクローラーを回す環境ではやらない方が良いと思います。
データは安く貯めていきたいですね。
以上、最後までお付き合いありがとうございました。
アーキテクト課のomkです。
AWSについて雑多に取り組んだ内容を発信しています!!