目次
はじめに
こんにちは、omkです。
EventBridgeとStep FunctionsでGlueジョブを実行している環境で、
ジョブの実行中に再度ステートマシンがキックされて、ジョブの同時実行が出来ないためにStartJobRunに失敗することがありました。
このケースにおいて自動でデータは取り込みたいのですが、これ以外のエラーでは再試行してほしくないのでエラーの内容に応じて再試行するしないを判定するようにしてみました。
やってみた
完成したもの
出来たステートマシン定義が以下です。
{
"Comment": "A description of my state machine",
"StartAt": "Pass1",
"States": {
"Pass1": {
"Type": "Pass",
"Next": "Glue StartJobRun",
"Result": {
"WaitTime": 60,
"NowWait": 0
}
},
"Glue StartJobRun": {
"Type": "Task",
"Resource": "arn:aws:states:::glue:startJobRun.sync",
"Parameters": {
"JobName": "omk-blog-job"
},
"Catch": [
{
"ErrorEquals": [
"States.TaskFailed"
],
"Next": "Choice",
"ResultPath": "$.JobError"
}
],
"Next": "Success"
},
"Success": {
"Type": "Succeed"
},
"Choice": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.JobError.Error",
"StringMatches": "Glue.ConcurrentRunsExceededException",
"Next": "Pass2"
}
],
"Default": "Fail"
},
"Pass2": {
"Type": "Pass",
"Next": "Wait",
"Parameters": {
"WaitTime": 60,
"NowWait.$": "States.MathAdd($.WaitTime, $.NowWait)"
}
},
"Wait": {
"Type": "Wait",
"Next": "Glue StartJobRun",
"SecondsPath": "$.NowWait"
},
"Fail": {
"Type": "Fail"
}
}
}
図はこちら
説明
今回のポイントは以下の2点です。
- エラーの内容に基づいて再試行を行う
- 再試行の際にバックオフを入れる
1. エラーの内容に基づいて再試行を行う
まず1について、
ジョブの同時実行でコケるケースのエラーレスポンスは以下のような形式で返ってきます。
{
"Error": "Glue.ConcurrentRunsExceededException",
"Cause": "Concurrent runs exceeded for omk-blog-job (Service: AWSGlue; Status Code: 400; Error Code: ConcurrentRunsExceededException; Request ID: f51d4562-642f-459f-9fcd-ad78006ad831; Proxy: null)"
}
このときの以下のエラーを条件にします。
"Error": "Glue.ConcurrentRunsExceededException"
公式ドキュメントより対象の例外の内容を確認できます。
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-api-exceptions.html#aws-glue-api-exceptions-ConcurrentRunsExceededException
Choiceを用いることでこのエラー内容を判定して再試行させることが出来ます。
「startJobRun.sync」でジョブを起動し、「State.TaskFailed」をキャッチするよう設定してChoiceに渡します。
StartJobRunでは再試行しません。TaskFailedで再試行するよう設定するとエラーの内容を問わずJobに失敗したら再試行してしまうためです。
Choiceでエラー内容を受け取ってエラーの文字列が「Glue.ConcurrentRunsExceededException」とマッチするかを判定します。
2. 再試行の際にバックオフを入れる
つぎに2について、Jobの実行に必要な時間を考慮してWaitを入れておきます。
今回用意したジョブは実行に最低60秒かかるものになっていますのでこの60秒を起点にし、失敗するごとに等差的にWait時間が増えていくものにしました。初回失敗時には60秒待って、次は120、180と増えていきます。
22年9月の最近のアップデートで追加された組み込み関数の「States.MathAdd()」をPassで利用することでこの等差を実現しています。
これは引数に2つの数値もしくは数値のパスを渡してそれらの数値を合算した数値を返します。
エクスポネンシャルバックオフにすることも考えましたが面倒なのでやめました。
再試行回数の上限を設定してStates.MathAdd()で失敗ごとに1足していって……とかも出来そうですね。(もっといいやり方があるかも)
実行してみた
2回だいたい同時にステートマシンを起動してみます。
2回目のステートマシンはジョブの同時実行が出来ないためエラーを返して最終的に以下のような流れになって無事完了しました。
図ではわかりませんが、実際には2回ジョブの起動に失敗して60秒、120秒とWaitが発生しています。
おわりに
コードの誤りなどのジョブの内容でコケた場合を除いて再試行したかったのでいい感じになりました。
先日のアップデートでできることが色々増えたようなのでさらにStep Functionsの活用チャンスが増えましたね。
ただ無限ループには気をつけたいなと思います。
以上、最後までお付き合いありがとうございました。
アーキテクト課のomkです。
AWSについて雑多に取り組んだ内容を発信しています!!