Amazon-Kinesis-Data-Firehose

【Lambda】Amazon Kinesis Data FirehoseのApacheログ出力を時間順にソートしてみた

はじめに

こんにちは、omkです。
前回はApacheのログを「Kinesis Data Firehose」を用いてS3にアップロードしてみました。
やってみて1点気になることがあったのですが、ファイルに出力される順番がタイムスタンプの順じゃないんですよね。
一度ログをバッファしてからまとめて書き出しているので当然といえば当然かもしれません。

出力されるログは以下のようになっています。

{"host":"***.***.**.**","ident":null,"authuser":null,"datetime":"09/Feb/2021:10:52:13 +0900","request":"GET /favicon.ico HTTP/1.1","response":"404","bytes":"209","referer":"http://**.***.**.***/","agent":"Mozilla/5.0 (Linux; Android 10; SOG02) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.141 Mobile Safari/537.36"}
{"host":"***.***.**.**","ident":null,"authuser":null,"datetime":"09/Feb/2021:10:52:13 +0900","request":"GET / HTTP/1.1","response":"200","bytes":"36","referer":null,"agent":"Mozilla/5.0 (Linux; Android 10; SOG02) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.141 Mobile Safari/537.36"}
{"host":"***.***.**.**","ident":null,"authuser":null,"datetime":"09/Feb/2021:10:52:14 +0900","request":"GET / HTTP/1.1","response":"304","bytes":null,"referer":null,"agent":"Mozilla/5.0 (Linux; Android 10; SOG02) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.141 Mobile Safari/537.36"}
{"host":"***.***.***.***","ident":null,"authuser":null,"datetime":"09/Feb/2021:10:51:42 +0900","request":"GET / HTTP/1.1","response":"200","bytes":"36","referer":null,"agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36"}
{"host":"***.***.***.***","ident":null,"authuser":null,"datetime":"09/Feb/2021:10:51:42 +0900","request":"GET / HTTP/1.1","response":"200","bytes":"36","referer":null,"agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36"}
{"host":"***.***.***.***","ident":null,"authuser":null,"datetime":"09/Feb/2021:10:52:43 +0900","request":null,"response":"408","bytes":null,"referer":null,"agent":null}
{"host":"***.***.**.**","ident":null,"authuser":null,"datetime":"09/Feb/2021:10:53:05 +0900","request":null,"response":"408","bytes":null,"referer":null,"agent":null}

何の順でしょうか。もしかして文字数の多い順?
ということで上から読んでいったときに通常のApacheのログと順番が違っていて見づらいのでLambdaで時間の早い順に並べ替えて出力するようにします。

なお、前回にKinesisエージェントでCombined形式のログをJSONに変換しているので、その前提で進めます。
KinesisエージェントでApacheのログをJSONに変換する方法はこちらをご覧ください。

全体の流れ

今回は内容がちょっとややこしくなるので全体の流れを先にご説明します。
現状ではWebサーバ上でKinesisエージェントがApacheのログを集め、「Kinesis Data Firehose」の配信ストリームに流しています。
ストリームでは集められたログを一時的にバッファしてS3にログのレコードをまとめたファイルをアップロードしています。
そして、Kinesis Data FirehoseではS3にアップロードする前にLambdaを介すことでデータを整形することが可能です。

つまり、ファイルを出力する前にLambdaでログをタイムスタンプ順に並び変えた上でS3に吐かせるという寸法です。
レコードの並び替え関数を作成し、配信ストリームに設定することでこれを実現します。

Lambda関数作成

Lambdaを用いての事前処理についての詳細は公式ドキュメントをご覧ください。

関数の作成

Lambdaコンソールで関数を作成します。
今回はPython3.8で記述します。
時間がかかると思われるのでタイムアウトをとりあえず5分に設定しておきます。

Lambdaイベント

まずはLambdaに与えられるイベントを確認します。
イベントの内容は以下のようになっています。

{
    "invocationId": "4f8f7226-6e4f-464f-8629-eed6635ae312",
    "deliveryStreamArn": "arn:aws:firehose:ap-northeast-1:************:deliverystream/***************",
    "region": "ap-northeast-1",
    "records": [
        {
            "recordId": "49615127070733847426369323753649459781349550173355769858000000",
            "approximateArrivalTimestamp": 1612830298243,
            "data": "5b+144Gu44Gf44KB44Os44Kz44O844OJ44Gu5YaF5a6544KS44G844GL44GZ55uu55qE44Gn44OV44Kn44Kk44Kv44Gu5YCk44KS5YWl44KM44Gm44GE44G+44GZ44CC"
        },
        {
            "recordId": "49615127070733847426369323753649459781349550173355769858000001",
            "approximateArrivalTimestamp": 1612830298243,
            "data": "5b+144Gu44Gf44KB44Os44Kz44O844OJ44Gu5YaF5a6544KS44G844GL44GZ55uu55qE44Gn44OV44Kn44Kk44Kv44Gu5YCk44KS5YWl44KM44Gm44GE44G+44GZ44CC"
        },
        {
            "recordId": "49615127070733847426369323753649459781349550173355769858000002",
            "approximateArrivalTimestamp": 1612830298243,
            "data": "5b+144Gu44Gf44KB44Os44Kz44O844OJ44Gu5YaF5a6544KS44G844GL44GZ55uu55qE44Gn44OV44Kn44Kk44Kv44Gu5YCk44KS5YWl44KM44Gm44GE44G+44GZ44CC"
        },
        {
            "recordId": "49615127070733847426369323753649459781349550173355769858000003",
            "approximateArrivalTimestamp": 1612830298243,
            "data": "5b+144Gu44Gf44KB44Os44Kz44O844OJ44Gu5YaF5a6544KS44G844GL44GZ55uu55qE44Gn44OV44Kn44Kk44Kv44Gu5YCk44KS5YWl44KM44Gm44GE44G+44GZ44CC"
        }
    ]
}

それぞれのレコードに「recordId」が割り振られ、レコードの内容がBase64にエンコードされて「data」に入れられています。

dataをデコードしてみると、Apacheのログのレコードであることがわかります。

{"host":"***.***.***.***","ident":null,"authuser":null,"datetime":"09/Feb/2021:09:23:57 +0900","request":"GET / HTTP/1.1","response":"200","bytes":"36","referer":null,"agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36"}

よって、dataをそれぞれのレコ―ドから取得してBase64からデコードします。
デコードしたレコ―ドデータをタイムスタンプを基準にソートします。

recordIdに関して、公式ドキュメントに記載がありますが、データの変換前と変換後でIDが異なる場合は変換失敗とみなされます。

recordId
レコード ID は、呼び出し時に Kinesis Data Firehose から Lambda に渡されます。変換されたレコードには、同じレコード ID が含まれる必要があります。元のレコードの ID と変換されたレコードの ID との不一致は、データ変換失敗として扱われます。

よってレコードIDについては連番で付与されてるので、「recordId」はそのままに、対応する「data」を変更してソートされた順に割り当てていきます。

Lambda関数

import json
import base64

def lambda_handler(event, context):

    # Records are encorded to base64, Create new list and append them with converting from base64 to dict
    records = []

    for record in event['records']:
        # Convert "data" base64 -> utf-8(json) -> dict
        data = json.loads(base64.b64decode(record['data']).decode('utf-8'))
        records.append(data)

    # Sort records based on datetime
    sorted_records = sorted(records, key=lambda x: x['datetime'])

    i=0
    return_records = []

    # Reset Records
    for i in range(len(event['records'])):
        # Get RecordID
        record_id = event['records'][i]['recordId']

        # Encode "data" dict -> json -> base64
        base64_data = base64.b64encode(json.dumps(sorted_records[i]).encode())
        # Add Line feed
        base64_linefeed = base64.b64encode(("\n").encode())
        # Decode base64 -> utf-8(str)
        record_data = (base64_data + base64_linefeed).decode('utf-8')

        # Set Record
        rec = {
                'recordId': record_id,
                'result': 'Ok',
                'data': record_data
        }
        return_records.append(rec)

    return {'records': return_records}

Lambda関数解説

上から順番に要所要所の関数の説明をしていきます。

    records = []

    for record in event['records']:
        # Convert "data" base64 -> utf-8(json) -> dict
        data = json.loads(base64.b64decode(record['data']).decode('utf-8'))
        records.append(data)

まずイベントからレコードを取り出します。
取り出したレコード1つ1つに対してレコードデータをbase64からJSONにデコードします。
JSONはテキスト形式なので扱いやすいようにdict形式に変換します。
これを配列に追加します。

次に配列の中でタイムスタンプに基づきデータをソートします。

    # Sort records based on datetime
    sorted_records = sorted(records, key=lambda x: x['datetime'])

dict型の特定のキーを基準にソートする際には以上のように記述します。
これでソートが完了しました。

今度はソートしたデータを既定のフォーマットにセットしていきます。
返却するデータのフォーマットは以下のようにする必要があります。

{
  records: [
  {
    'recordId': {レコードID},
    'result': {Ok|Dropped|ProcessingFailed},
    'data': {レコードデータ(Base64形式)}
  },
  .....
  ]
}

詳細は公式ドキュメントをご覧ください。

よってイベントで渡されたレコードIDと再びBase64にエンコードしたレコードデータをそれぞれ0番目から順に抱き合わせて返します。(resultは固定でOkで問題なし)

    i=0
    return_records = []

    # Reset Records
    for i in range(len(event['records'])):
        # Get RecordID
        record_id = event['records'][i]['recordId']

        # Encode "data" dict -> json -> base64
        base64_data = base64.b64encode(json.dumps(sorted_records[i]).encode())
        # Add Line feed
        base64_linefeed = base64.b64encode(("\n").encode())
        # Decode base64 -> utf-8(str)
        record_data = (base64_data + base64_linefeed).decode('utf-8')

        # Set Record
        rec = {
                'recordId': record_id,
                'result': 'Ok',
                'data': record_data
        }
        return_records.append(rec)

    return {'records': return_records}

最終的にS3に出力されるファイルにはこのデータのうちの「data」のみが列挙されるかたちになるのですが、そのままのレコ―ドデータを入れてしまいますと改行してくれません。
よって改行コードを追加した上で値を入れます。
あとはレコードをまとめた配列を「records」のバリューとして返せばソートした状態でS3にデータを渡してくれます。

Kinesis Data FirehoseのLambda割り当て

作成したLambda関数をKinesis Data Firehoseで利用します。

編集する配信ストリームを選択し、「Details」を選択して編集します。
「Transform source records with AWS Lambda」を「Enable」にすることでLambdaを用いてレコードを変換することが可能になります。
関数に先ほど作成した関数を割り当てて保存します。
これで先にLambda関数が呼び出されて、変換された状態でS3に保存されます。

確認

WEBサーバにアクセスしてログをいくつか吐かせます。

しばらく待って、S3に新規ファイルが保存されるのを確認したら開いてみましょう。

{"host": "***.***.***.***", "ident": null, "authuser": null, "datetime": "09/Feb/2021:15:09:54 +0900", "request": "GET / HTTP/1.1", "response": "200", "bytes": "36", "referer": null, "agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36"}
{"host": "***.***.***.***", "ident": null, "authuser": null, "datetime": "09/Feb/2021:15:09:55 +0900", "request": "GET / HTTP/1.1", "response": "200", "bytes": "36", "referer": null, "agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36"}
{"host": "***.***.**.**", "ident": null, "authuser": null, "datetime": "09/Feb/2021:15:10:01 +0900", "request": "GET / HTTP/1.1", "response": "304", "bytes": null, "referer": null, "agent": "Mozilla/5.0 (Linux; Android 10; SOG02) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.141 Mobile Safari/537.36"}
{"host": "***.***.***.***", "ident": null, "authuser": null, "datetime": "09/Feb/2021:15:10:02 +0900", "request": "GET / HTTP/1.1", "response": "200", "bytes": "36", "referer": null, "agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36"}
{"host": "***.***.**.**", "ident": null, "authuser": null, "datetime": "09/Feb/2021:15:10:07 +0900", "request": "GET / HTTP/1.1", "response": "304", "bytes": null, "referer": null, "agent": "Mozilla/5.0 (Linux; Android 10; SOG02) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.141 Mobile Safari/537.36"}
{"host": "***.***.***.***", "ident": null, "authuser": null, "datetime": "09/Feb/2021:15:10:07 +0900", "request": "GET / HTTP/1.1", "response": "200", "bytes": "36", "referer": null, "agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36"}
{"host": "***.***.**.**", "ident": null, "authuser": null, "datetime": "09/Feb/2021:15:10:08 +0900", "request": "GET / HTTP/1.1", "response": "304", "bytes": null, "referer": null, "agent": "Mozilla/5.0 (Linux; Android 10; SOG02) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.141 Mobile Safari/537.36"}
{"host": "***.***.***.***", "ident": null, "authuser": null, "datetime": "09/Feb/2021:15:10:08 +0900", "request": "GET / HTTP/1.1", "response": "200", "bytes": "36", "referer": null, "agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36"}
{"host": "***.***.**.**", "ident": null, "authuser": null, "datetime": "09/Feb/2021:15:10:09 +0900", "request": "GET / HTTP/1.1", "response": "304", "bytes": null, "referer": null, "agent": "Mozilla/5.0 (Linux; Android 10; SOG02) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.141 Mobile Safari/537.36"}
{"host": "***.***.**.**", "ident": null, "authuser": null, "datetime": "09/Feb/2021:15:10:10 +0900", "request": "GET / HTTP/1.1", "response": "304", "bytes": null, "referer": null, "agent": "Mozilla/5.0 (Linux; Android 10; SOG02) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.141 Mobile Safari/537.36"}
{"host": "***.***.***.***", "ident": null, "authuser": null, "datetime": "09/Feb/2021:15:10:14 +0900", "request": "GET / HTTP/1.1", "response": "304", "bytes": null, "referer": null, "agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36"}

データがタイムスタンプで並び替えられていることがわかります。

まとめ

これであとからログを見たときに見やすくなりました。
改行が意外と大事で、Athenaに入れるときに改行が必要になるようなのですが、Kinesis Data firehoseは改行を入れてくれないので自分で入れる必要があったりします。(参考
今回は単純にログを逃がす目的でFirehoseを利用しましたが、分析に利用するのもやってみたいですね。

以上、お付き合いありがとうございました。

返信を残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

CAPTCHA