目次
はじめに
こんにちは、サーバレス強化期間中のomkです。
ここしばらくはずっとGlueでETL処理を書いていますが、まぁデータが重複します。
ジョブのブックマーク機能を用いていい感じにデータの重複を無くしていきたいのですが、ブックマーク機能を使っても重複を排除できていないケースがあったのでどういった条件で利用する必要があるのか検証していきます。
TL;DR
読み込みは重複排除できましたが書き込みはできませんでした。
AWSのサポートに問い合わせたところ書き込みはブックマークで重複排除できないみたいです(ご対応ありがとうございました。)。
検証
検証にあたって次のブログを参考にしております。
[AWS Glue]ジョブブックマークの動作を確認してみた
大変勉強になりました。
検証環境
さて、次の簡単なテーブルをCSVを作成しました。
NAME | AGE | FAVORITE |
---|---|---|
Taro | 20 | Apple |
Jiro | 18 | Orange |
Hanako | 15 | Grape |
こちらをS3バケットに置いて読み込みを行い、別のS3バケットに出力します。Glue Data Catalogも使います。
ジョブはSparkタイプのPython(Glue Version 3.0)で、ジョブのブックマークを有効にして実行します。
また、 ジョブブックマークが有効になっている場合でも、AWS Glue ETL ジョブがデータを再処理するのはなぜですか? にあるように以下の4つの条件を満たすコードを書いていきます。
ジョブブックマークを持つ複数の同時ジョブがあり、最大同時実行数は 1 に設定されていない。
job.init() オブジェクトが見つからない。
job.commit() オブジェクトが見つからない。
transformation_ctx パラメータが見つからない。
ジョブのIAMロールには不足が無いように「AmazonS3FullAccess」と「AWSGlueServiceRole」を付与しました。
読み込みのブックマークについて
読み込み時にブックマークが効くかどうか以下のパターンで検証します
- GlueContext.create_dynamic_frame_from_catalog
- GlueContext.create_dynamic_frame_from_options
- DynamicFrameReader.from_catalog
- DynamicFrameReader.from_options
- GlueContext.create_data_frame_from_catalog
- GlueContext.create_data_frame_from_options
ジョブを複数回実行して読み込んだ行数を比較してブックマークが機能しているか確認します。というか単一ファイルなのでブックマークが機能していたら2回目以降は0行になります。
ちなみにData CatalogのDB名に「-(ダッシュ)」を入れた場合うまく動きませんでした。
GlueContext.create_dynamic_frame_from_catalog
以下のコードで検証します。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame_from_catalog(database="omkblogdb", table_name="family", transformation_ctx="datasource0")
print(datasource0.count())
job.commit()
1回目は3行、2回目は0行だったのでブックマークが機能していることがわかります。
GlueContext.create_dynamic_frame_from_options
以下のコードで検証します。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
args = getResolvedOptions(sys.argv, ['JOB_NAME','S3BucketUri'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
bucket = args['S3BucketUri']
datasource0 = glueContext.create_dynamic_frame_from_options(connection_type="s3", connection_options={"paths": [ bucket ]}, format="csv", transformation_ctx="datasource0")
print(datasource0.count())
job.commit()
1回目は4行で2回目は0行だったのでブックマークが機能していることがわかります。
format="csv"で指定しているので1行目はヘッダーとして読んでくれるかもと思いましたがそんなことは無かったです。
↑format_optionsで「withHeader」を「True」にすると最初の行はヘッダーとして読んでくれるようです。
DynamicFrameReader.from_catalog
以下のコードで検証します。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame, DynamicFrameReader
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
dyfreader = DynamicFrameReader(glueContext)
datasource0 = dyfreader.from_catalog(name_space="omkblogdb", table_name="family", transformation_ctx="datasource0")
print(datasource0.count())
job.commit()
1回目は3行、2回目は0行だったのでブックマークが機能していることがわかります。
多分内部的にはcreate_dynamic_frame_from_catalogと一緒ですよね、これ。
DynamicFrameReader.from_options
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame, DynamicFrameReader
args = getResolvedOptions(sys.argv, ['JOB_NAME','S3BucketUri'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
bucket = args['S3BucketUri']
dyfreader = DynamicFrameReader(glueContext)
datasource0 = dyfreader.from_options(connection_type="s3",connection_options={"paths": [ bucket ]}, format="csv", transformation_ctx="datasource0")
print(datasource0.count())
job.commit()
これもブックマークが効きました。
GlueContext.create_data_frame_from_catalog
今度はDynamicFrameではなくSpark DataFrameとして読み込みます。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_data_frame_from_catalog(database="omkblogdb", table_name="family", transformation_ctx="datasource0")
print(datasource0.count())
job.commit()
効きました。
GlueContext.create_data_frame_from_options
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
args = getResolvedOptions(sys.argv, ['JOB_NAME','S3BucketUri'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
bucket = args['S3BucketUri']
datasource0 = glueContext.create_data_frame_from_options(connection_type="s3", connection_options={"paths": [ bucket ]}, format="csv", format_options={"withHeader": True}, transformation_ctx="datasource0")
print(datasource0.count())
job.commit()
こちらもブックマークは機能しました。
これで読み込みパターンで用意したものはすべてOKです。
書き込みのブックマークについて
書き込みもいろいろなやり方があるみたいですね。
- GlueContext.write_dynamic_frame_from_catalog
- GlueContext.write_dynamic_frame_from_options
- GlueContext.write_from_options
- DynamicFrameWriter.from_catalog
- DynamicFrameWriter.from_options
- DynamicFrame.write
GlueContext.write_dynamic_frame_from_catalog
以下のコードで検証。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
args = getResolvedOptions(sys.argv, ['JOB_NAME','S3ReadBucket','S3WriteBucket'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
readBucket = args['S3ReadBucket']
writeBucket = args['S3WriteBucket']
datasource0 = glueContext.create_dynamic_frame_from_catalog(database="omkblogdb", table_name="family", transformation_ctx = "datasource0")
print(datasource0.count())
datasink1 = glueContext.write_dynamic_frame.from_catalog(frame=datasource0, database="omkblogdb", table_name="family2", transformation_ctx = "datasink1")
job.commit()
ダメでした。
GlueContext.write_dynamic_frame_from_options
以下のコードで検証。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
args = getResolvedOptions(sys.argv, ['JOB_NAME','S3ReadBucket','S3WriteBucket'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
readBucket = args['S3ReadBucket']
writeBucket = args['S3WriteBucket']
datasource0 = glueContext.create_dynamic_frame_from_catalog(database="omkblogdb", table_name="family")
print(datasource0.count())
datasink1 = glueContext.write_dynamic_frame_from_options(frame=datasource0, connection_type="s3", connection_options = {"path": writeBucket}, format="csv", format_options={"writeHeader": True}, transformation_ctx = "datasink1")
job.commit()
こちらもだめでした。
ということで書き込みでは機能しないことがわかったので切り上げ。
結果まとめ
6パターンの読み込みを試してみましたが全部のパターンでブックマークが正常に機能しました。
上手く重複排除できなかった原因は読み込みでは無く書き込みでした。
上にも書きましたがそもそも書き込み時の重複排除はジョブのブックマークではできず、他の仕組みが必要なようです。
また、書き込み時の「transformation_ctx」は出力時のファイル名にかかってくるようです。
overwriteするしかないのでしょうかねぇ……
以上お付き合いありがとうございました。
アーキテクト課のomkです。
AWSについて雑多に取り組んだ内容を発信しています!!