Amazon Simple Storage Service

S3のイベント通知を使用してLambdaで文字コードを変換する

こんにちは、大阪リージョンにも自宅からVPN接続したいディーネットの山田です。

S3ネタ続きですが、今回はS3バケットへファイルをアップロードするだけで文字コードを変換する仕組みを作ってみたいと思います。

仕組み

  • S3バケットのイベントでLambda関数を発火
  • Lambda関数内で文字コードを変換しS3へアップロード

以下で紹介する例では、文字コードの変換ですが、Lambdaのコードを書き換えるだけで他の用途にも使えます。
例えば、Amazon Elastic Transcoderでメディア変換とか、Amazon Rekognitionで画像分析など。

さて、少しS3バケットのイベント通知についてAWSさんのサイトから引用します。

S3バケットのイベント通知とは

Amazon S3 通知機能により、バケット内で特定のイベントが発生したときに、通知を受けることができます。
通知を有効にするには、まず通知設定を追加します。
この設定で、Amazon S3 から発行するイベントと、Amazon S3 から通知を送信する宛先を指定します。

重要事項

Amazon S3 イベント通知は、少なくとも 1 回配信されるように設計されています。
通常、イベント通知は数秒で配信されますが、1 分以上かかる場合もあります。
バージョン管理されていない単一のオブジェクトに同時に 2 つの書き込みを行うと、イベント通知が 1 つしか送信されない場合があります。
バケットのバージョニングを有効にすると、正常な書き込みごとにイベント通知を受信できます。
バージョニングでは、正常な書き込みごとにオブジェクトの新しいバージョンが作成され、イベント通知が送信されます。

S3のイベント通知先として利用可能なもの

  • Lambda関数
  • SNSトピック
  • SQSキュー

今回は、Lambda関数へ通知したいので"Lambda関数"を使用します。

Amazon S3 イベント通知の設定

説明は、このあたりまでにして実際に環境を作ってテストしたいと思います。

実際に環境を作ってみる

事前準備として以下をお願いいたします

  • IAMロールとポリシーの割り当て
    • 本ブログ執筆の際には、横着して"AmazonS3FullAccess"を割り当てました。
    • 正しく運用される際は、最低限の権限に絞ってください。

Lambda関数を登録する

関数の作成で、ランタイムをPython3.8にして作成する

以下のソースコードをデプロイする

Pythonのコード内容
import json
import boto3
import codecs
import os
import urllib.parse

def lambda_handler(event, context):
    print(event)
    try:
        s3 = boto3.resource('s3')
    except:
        print("S3の初期化に失敗しました")
        exit()

    for eventdata in event['Records']:
        try:
            bucketname = eventdata['s3']['bucket']['name']
            bucketobject = eventdata['s3']['object']['key']
            print(bucketname)
            print(bucketobject)
        except:
            print("バケット名とオブジェクト名の受取に失敗しました")
            break

        try:
            bucket = s3.Bucket(bucketname)
        except:
            print("バケットの定義に失敗しました")
            break

        try:
            os.remove('/tmp/download_object_data_sjis')
            os.remove('/tmp/download_object_data_utf8')
        except:
            pass

        try:
            bucketobject = urllib.parse.unquote(bucketobject)
            print(bucketobject)
            bucket.download_file(bucketobject, '/tmp/download_object_data_sjis')
        except:
            print("オブジェクトのダウンロードに失敗しました")
            break

        try:
            sf = codecs.open('/tmp/download_object_data_sjis', 'r', encoding='shift-jis')
        except:
            print("S3からダウンロードしたSJISファイルのオープンに失敗しました")
            break

        try:
            uf = codecs.open('/tmp/download_object_data_utf8', 'w', encoding='utf-8')
        except:
            print("変換先のUTF8保存用ファイルのオープンに失敗しました")
            break

        try:
            for line in sf:
                uf.write(line)
        except:
            print("ファイルへの書き出しに失敗しました")
            break
        try:
            sf.close()
        except:
            print("S3からダウンロードしたSJISファイルのクローズに失敗しました")
            break

        try:
            uf.close()
        except:
            print("変換先のUTF8保存用ファイルのクローズに失敗しました")
            break

        try:
            bucket.upload_file('/tmp/download_object_data_utf8', 'convert/' + os.path.basename(bucketobject))
        except:
            print("S3へのアップロードに失敗しました")
            break

        try:
            os.remove('/tmp/download_object_data_sjis')
            os.remove('/tmp/download_object_data_utf8')
        except:
            pass

    print("変換処理が完了しました")
    return True

流れとしては、以下のような動きをします。

  1. S3バケットのイベントでLambda関数が発火
  2. Lambda関数に対してイベントが通知される
  3. "/tmp"配下にファイルをS3からダウンロードする
  4. codecsを使ってSJISからUTF8に変換したものを再度"/tmp"配下に出力する
  5. "/tmp"配下に出力したものをS3へアップロードする

Lambda関数のタイムアウト値を気持ち伸ばしておく

標準では、3秒のタイムアウトだが30秒にしておく

S3バケットを作成する

S3イベント通知設定を行う

動作テストを行ってみる

SJISファイルを用意する

S3バケットにアップロードする

Lambdaが動作して変換保管用フォルダにファイルが生成されたことを確認する

実際に文字コードが変換されたかファイルをダウンロードしてエディタで開いてみる

詰まったポイント

S3からLambda関数に渡されるeventでは、マルチバイト(日本語など)がURLエンコードされた状態です。
その為、そのまま変数に格納してS3からダウンロードしても"そのようなファイルはない"と怒られます。

マルチバイトは、一度URLデコードしてから扱うようにしましょう。

返信を残す

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です

CAPTCHA