目次
はじめに
こんにちは ヤコです。
あとちょうど一週間でクリスマスですね。
皆さんはどう過ごす予定ですか??
私は姉と今話題の『ズートピア2』を観た後、クリスマスぱーりぃーをやる予定です。
ローストチキンを注文したので楽しみです♪
さて、本日はアドカレ2025ということで、VPCフローログ + CloudWatch Insights で作る攻撃IP検知・遮断システムというテーマで本ブログを書いていこうと思います。
今回使用するもの
・EC2(テスト用サーバー)
・VPC(フローログ設定)
・CloudWatch(ログ収集・分析)
・Lambda(遮断処理)
・SNS(メール通知)
・IAM(権限設定)
作成手順
事前準備
事前に必要なもの
・EC2(テスト用サーバー、セキュリティグループID)
・VPC
・メールアドレス(SNS通知受信用)
①VPCフローログ有効化
「CloudWatch」→「ログ」→「ロググループの作成」
保持期間は後から変更できるのでとりあえず1週間で設定します。

「VPC」 → 該当VPC選択 → 「フローログ」タブ → 「フローログを作成」
設定内容
- フィルター: すべて(Accept, Reject両方)
- 送信先: CloudWatch Logs に送信
- ログ宛先: 先程作成したロググループを選
- IAMロール: 新しいロール作成(自動)
- ログ形式: デフォルト形式でOK
作成できたら、フローログが作成できているかチェックしに行きましょう。
「CloudWatch」→ログの「Log management」
対象のロググループをクリックし、ログストリームが作成されていたらOK
※事前に作成したサーバーにSSHしたりして、ログストリームの中にログが出てくることを確認
②CloudWatch Insightsでの攻撃IP検知
「CloudWatch」→ログ「ログのインサイト」
対象のロググループを選択し、クエリ欄に以下を入力

| filter action = "REJECT"
| stats count() by srcaddr
| filter count >= 2
| sort count desc
〇| filter action = "REJECT"
「拒否された通信のみ」を抽出
Security Groupで拒否された = 不正アクセス試行
〇 | stats count() by srcaddr
送信元IP別に「何回アクセス試行したか」をカウント
同じIPから複数回 = 攻撃の可能性大
〇 | filter count >= 2
「2回以上拒否されたIP」のみを表示
偶然のアクセスではなく、意図的な攻撃を検知
〇 | sort count desc
試行回数の多い順(降順)で並び替え
最も危険なIPが上位に表示
「何回も不正アクセスしてくる悪質なIP」を自動で見つけるクエリです。
③SNSのトピック作成
タイプ:スタンダードでトピックを作成し、arnをコピーしておく

「サブスクリプションを作成」
プロトコル: Eメール
エンドポイント: メールアドレス
作成するとメールが届くのでリンクからサブスクリプションを確認しておく
④Lambda関数の作成
「関数の作成」→デフォルト実行ロールの変更で
「VPN 接続のモニタリングのアクセス権限」を選択し
関数を作成
作成した関数の設定からIAMロール→ロール名のリンクをクリック
IAMの画面に遷移するので、「許可を追加」→「ポリシーをアタッチ」
以下のAWSマネージドポリシーを検索して追加
〇 AmazonEC2ReadOnlyAccess
〇 AmazonSNSFullAccess
カスタムポリシーの作成
「許可を追加」→「インラインポリシーを作成」
JSONタブで以下をコピペ:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"ec2:AuthorizeSecurityGroupIngress",
"ec2:DescribeSecurityGroups"
],
"Resource": "*"
}
]
}
ポリシーを作成する。
作成したLambda関数に戻り、「設定」→「環境変数」→「編集」で以下の変数を追加していく

上にはセキュリティグループのID
下には先ほどメモしたSNSトピックのARN
を入力
Lambda関数のコードに以下を入力
import boto3
import json
import os
from datetime import datetime
def lambda_handler(event, context):
print(f"Lambda実行開始: {datetime.now()}")
ec2 = boto3.client('ec2')
sns = boto3.client('sns')
# 環境変数取得
security_group_id = os.environ.get('SECURITY_GROUP_ID')
sns_topic_arn = os.environ.get('SNS_TOPIC_ARN')
if not security_group_id or not sns_topic_arn:
print("環境変数が設定されていません")
return {'statusCode': 400, 'body': '環境変数エラー'}
# テスト用の攻撃IP(実際の運用では動的に取得)
suspicious_ip = "1.2.3.4" # 後でCloudWatch Alarmから取得に変更
try:
# Security Group拒否ルール追加
response = ec2.authorize_security_group_ingress(
GroupId=security_group_id,
IpPermissions=[{
'IpProtocol': '-1', # 全プロトコル遮断
'IpRanges': [{
'CidrIp': f'{suspicious_ip}/32',
'Description': f'Auto-blocked at {datetime.now()}'
}]
}]
)
print(f"IP {suspicious_ip} を遮断しました")
# 成功通知
message = f"""🚫 攻撃IPを自動遮断しました
遮断IP: {suspicious_ip}
時刻: {datetime.now()}
Security Group: {security_group_id}
実行ID: {context.aws_request_id}
"""
sns.publish(
TopicArn=sns_topic_arn,
Message=message,
Subject="【セキュリティ】不審IP自動遮断完了"
)
return {
'statusCode': 200,
'body': json.dumps(f'Successfully blocked IP: {suspicious_ip}')
}
except Exception as e:
error_msg = str(e)
print(f"エラー: {error_msg}")
# エラー通知
sns.publish(
TopicArn=sns_topic_arn,
Message=f"❌ IP遮断エラー\nIP: {suspicious_ip}\nエラー: {error_msg}",
Subject="【エラー】IP遮断失敗"
)
return {
'statusCode': 500,
'body': json.dumps(f'Error: {error_msg}')
}
このコードの流れ
①攻撃IP検出(今はテスト用固定IP)
②Security Groupに拒否ルール追加
③成功/失敗をメールで通知
④ログに記録
という内容です(今回もAIにて作成)
デプロイできたら、「テスト」のタブにてテストを実行する

※失敗する場合は、Lambda関数の設定にてタイムアウト: 3秒 → 30秒 に変更してみるとよいかもです。
「セキュリティグループ」→「インバウンドルール」を確認

ブロックされていますね。
メールにも通知が来る設定にしているので、フォルダを確認します。

無事届いてますね。
まとめ
今回はVPCフローログ + CloudWatch Insights + Lambdaを組み合わせて、
攻撃IP検知・遮断システムを構築しました。
今回は工数の問題でここまでですが、次はCloudWatch Alarmと連携をして完全自動化しみるのもいいかもしれません。
皆様もよいクリスマス&年末をお過ごしください。
はじめましてこんにちは。
ヤコです。
ピッカピカのエンジニア1年生、猫を愛でるために生きています。