Amazon-CloudFront

cloudfrontのアクセスログからアクセスユーザを判別する

大久保です。

やりたいこと

CloudFrontの機能であるLambda@Edgeを使用し、S3の静的WEBページへのアクセスに対して、BASIC認証を設定しました。認証を通過したユーザごとのアクセス履歴をcloudfrontのアクセスログから確認しようと思います。

apacheであればBASIC認証を行った際のユーザをアクセスログに出力できますが、cloudfrontのアクセスログから確認には、それに該当するフィールドはないので、Cookieのフィールドで代用することにします。

【apacheのアクセスログフィールド】(抜粋)
%h 
%l 
%u ←当該フィールド
【cloudfrontのアクセスログフィールド】(抜粋)
date 
time 
x-edge-location 
sc-bytes 
c-ip 
cs-method 
cs(Host) 
cs(Cookie) ←Cookieフィールド

その他条件

  • BASIC認証用のリストとして、S3のcsvファイルを参照します。
  • ランタイムはPythonを使用します。

やったこと

完成したコード

  • 今回作成したコードは以下の通りです。
import io
import csv
import boto3
import json
import base64

userid = []

def lambda_handler(event, context):
    request = event.get("Records")[0].get("cf").get("request")
    headers = request.get("headers")
    cookie_header = headers.get("cookie")
    REDIRECT_HOST = "denet.cloudfront.net"
    authorization_header = headers.get("authorization")

    if check_cookie_header(cookie_header) and cookie_header[0].get("value") == userid:
        return request
    elif check_authorization_header(authorization_header):
             return {
                    'status': '301',
                    'statusDescription': 'Moved Permanently',
                    'headers': {
                         'location': [{
                             'key': 'Location',
                             'value': f'https://{REDIRECT_HOST}'
                        }],
                         'Set-Cookie': [{
                             'value': userid 
                        }]
                    }
                }           

    return{
            'headers': {
                'www-authenticate': [
                    {
                        'key': 'WWW-Authenticate',
                        'value':'Basic'
                    }
                ]
            },
            'status': 401,
            'body': 'Unauthorized'
        }    

def check_cookie_header(cookie_header: list) -> bool:
    if not cookie_header:
        return False
    return True

def get_s3file(bucket_name, key):
    s3 = boto3.resource('s3')
    s3obj = s3.Object(bucket_name, key).get()

    return io.TextIOWrapper(io.BytesIO(s3obj['Body'].read()))

def check_authorization_header(authorization_header: list) -> bool:
    if not authorization_header:
        return False

    accounts = []

    for rec in csv.DictReader(get_s3file('Authlist', 'Authlist.csv')):
        users = rec["user"]
        passwprd = rec["pass"]
        accounts.append({"user":users,"pass":passwprd})

    for account in accounts:     
        encoded_value = base64.b64encode("{}:{}".format(account.get("user"), account.get("pass")).encode('utf-8'))
        check_value = "Basic {}".format(encoded_value.decode(encoding='utf-8'))
        global authorization_value
        authorization_value = authorization_header[0].get("value")
        global userid
        userid = account.get("user")

        if authorization_value == check_value:
            return True

    return False

アクセスログの確認

以下、認証用リスト(Authlist.csv)の内容です。

user,pass,
test1,1000ljhkhK
test2,2000vxbxvL
test3,3000zxbzxz

cloudfrontのドメインにアクセスし、BASIC認証通過後、S3に保存されたログを確認してみます。

【cloudfrontのアクセスログ】(抜粋)
date                2021-06-01
time                06:03:56
x-edge-location     NRT51-C4
sc-bytes            281
c-ip                221.243.14.xxxx
cs-method           GET
cs(Host)            denet.cloudfront.net
cs(Cookie)          test1 ←ユーザ名を確認

おわりに

今回は、参考にした記事と完成したコードを簡単に紹介しました。
各フェーズでの設定の詳細については別記事で紹介したいと思います。
ではまた。

返信を残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

CAPTCHA