[アドカレ2025]VPCフローログ + CloudWatch Insights で作る攻撃IP検知・遮断システム

はじめに

こんにちは ヤコです。
あとちょうど一週間でクリスマスですね。
皆さんはどう過ごす予定ですか??

私は姉と今話題の『ズートピア2』を観た後、クリスマスぱーりぃーをやる予定です。
ローストチキンを注文したので楽しみです♪

さて、本日はアドカレ2025ということで、VPCフローログ + CloudWatch Insights で作る攻撃IP検知・遮断システムというテーマで本ブログを書いていこうと思います。

今回使用するもの

・EC2(テスト用サーバー)
・VPC(フローログ設定)
・CloudWatch(ログ収集・分析)
・Lambda(遮断処理)
・SNS(メール通知)
・IAM(権限設定)

作成手順

事前準備

事前に必要なもの
・EC2(テスト用サーバー、セキュリティグループID)
・VPC
・メールアドレス(SNS通知受信用)

①VPCフローログ有効化

「CloudWatch」→「ログ」→「ロググループの作成」
保持期間は後から変更できるのでとりあえず1週間で設定します。

「VPC」 → 該当VPC選択 → 「フローログ」タブ → 「フローログを作成」
設定内容

  • フィルター: すべて(Accept, Reject両方)
  • 送信先: CloudWatch Logs に送信
  • ログ宛先: 先程作成したロググループを選
  • IAMロール: 新しいロール作成(自動)
  • ログ形式: デフォルト形式でOK

作成できたら、フローログが作成できているかチェックしに行きましょう。
「CloudWatch」→ログの「Log management」
対象のロググループをクリックし、ログストリームが作成されていたらOK

※事前に作成したサーバーにSSHしたりして、ログストリームの中にログが出てくることを確認

②CloudWatch Insightsでの攻撃IP検知

「CloudWatch」→ログ「ログのインサイト」
対象のロググループを選択し、クエリ欄に以下を入力

| filter action = "REJECT" 
| stats count() by srcaddr
| filter count >= 2
| sort count desc

〇| filter action = "REJECT"
「拒否された通信のみ」を抽出
Security Groupで拒否された = 不正アクセス試行

〇 | stats count() by srcaddr
送信元IP別に「何回アクセス試行したか」をカウント
同じIPから複数回 = 攻撃の可能性大

〇 | filter count >= 2
「2回以上拒否されたIP」のみを表示
偶然のアクセスではなく、意図的な攻撃を検知

〇 | sort count desc
試行回数の多い順(降順)で並び替え
最も危険なIPが上位に表示

「何回も不正アクセスしてくる悪質なIP」を自動で見つけるクエリです。

③SNSのトピック作成

タイプ:スタンダードでトピックを作成し、arnをコピーしておく

「サブスクリプションを作成」

プロトコル: Eメール
エンドポイント: メールアドレス

作成するとメールが届くのでリンクからサブスクリプションを確認しておく

④Lambda関数の作成

「関数の作成」→デフォルト実行ロールの変更で
「VPN 接続のモニタリングのアクセス権限」を選択し
関数を作成

作成した関数の設定からIAMロール→ロール名のリンクをクリック
IAMの画面に遷移するので、「許可を追加」→「ポリシーをアタッチ」
以下のAWSマネージドポリシーを検索して追加
〇 AmazonEC2ReadOnlyAccess
〇 AmazonSNSFullAccess

カスタムポリシーの作成
「許可を追加」→「インラインポリシーを作成」

JSONタブで以下をコピペ:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "ec2:AuthorizeSecurityGroupIngress",
                "ec2:DescribeSecurityGroups"
            ],
            "Resource": "*"
        }
    ]
}

ポリシーを作成する。

作成したLambda関数に戻り、「設定」→「環境変数」→「編集」で以下の変数を追加していく


上にはセキュリティグループのID
下には先ほどメモしたSNSトピックのARN
を入力

Lambda関数のコードに以下を入力

import boto3
import json
import os
from datetime import datetime

def lambda_handler(event, context):
    print(f"Lambda実行開始: {datetime.now()}")

    ec2 = boto3.client('ec2')
    sns = boto3.client('sns')

    # 環境変数取得
    security_group_id = os.environ.get('SECURITY_GROUP_ID')
    sns_topic_arn = os.environ.get('SNS_TOPIC_ARN')

    if not security_group_id or not sns_topic_arn:
        print("環境変数が設定されていません")
        return {'statusCode': 400, 'body': '環境変数エラー'}

    # テスト用の攻撃IP(実際の運用では動的に取得)
    suspicious_ip = "1.2.3.4"  # 後でCloudWatch Alarmから取得に変更

    try:
        # Security Group拒否ルール追加
        response = ec2.authorize_security_group_ingress(
            GroupId=security_group_id,
            IpPermissions=[{
                'IpProtocol': '-1',  # 全プロトコル遮断
                'IpRanges': [{
                    'CidrIp': f'{suspicious_ip}/32',
                    'Description': f'Auto-blocked at {datetime.now()}'
                }]
            }]
        )

        print(f"IP {suspicious_ip} を遮断しました")

        # 成功通知
        message = f"""🚫 攻撃IPを自動遮断しました

遮断IP: {suspicious_ip}
時刻: {datetime.now()}
Security Group: {security_group_id}
実行ID: {context.aws_request_id}
"""

        sns.publish(
            TopicArn=sns_topic_arn,
            Message=message,
            Subject="【セキュリティ】不審IP自動遮断完了"
        )

        return {
            'statusCode': 200,
            'body': json.dumps(f'Successfully blocked IP: {suspicious_ip}')
        }

    except Exception as e:
        error_msg = str(e)
        print(f"エラー: {error_msg}")

        # エラー通知
        sns.publish(
            TopicArn=sns_topic_arn,
            Message=f"❌ IP遮断エラー\nIP: {suspicious_ip}\nエラー: {error_msg}",
            Subject="【エラー】IP遮断失敗"
        )

        return {
            'statusCode': 500,
            'body': json.dumps(f'Error: {error_msg}')
        }

このコードの流れ
①攻撃IP検出(今はテスト用固定IP)
②Security Groupに拒否ルール追加
③成功/失敗をメールで通知
④ログに記録
という内容です(今回もAIにて作成)

デプロイできたら、「テスト」のタブにてテストを実行する

※失敗する場合は、Lambda関数の設定にてタイムアウト: 3秒 → 30秒 に変更してみるとよいかもです。

「セキュリティグループ」→「インバウンドルール」を確認

ブロックされていますね。

メールにも通知が来る設定にしているので、フォルダを確認します。

無事届いてますね。

まとめ

今回はVPCフローログ + CloudWatch Insights + Lambdaを組み合わせて、
攻撃IP検知・遮断システムを構築しました。

今回は工数の問題でここまでですが、次はCloudWatch Alarmと連携をして完全自動化しみるのもいいかもしれません。

皆様もよいクリスマス&年末をお過ごしください。

返信を残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

CAPTCHA