[アドカレ2025]Xの自動投稿は使いにくい!?

ごあいさつ

まず、あんただれ??
ということで、私のかる~い自己紹介から!
どうも!おかぴーです!

そう!これを読んでる皆さんお察しのとおり、最近入社した
ほやほやの新入社員のおかぴーです。
エンジニア経験はまだ数ヶ月のエンジニアのたまご中のたまごです

最近は、ゲームは全然しない派ですが、
デジモンストーリー タイムストレンジャーの実況ばっかり見てます。
最近のデジモンはもう大人向けですごく神ゲーってやつですね( ´∀` )

いろいろ不慣れな点がありますが、温かく見守っていただければ幸いですmm

今回の記事

自己紹介も終わったところで、
さっそく、本題にふれていきたいと思います~

ふと、X(旧Twitter)民の友人と話していた時でした...(急に回想はいりまーす)

Xとかただつぶやいているだけなのによく見られる時間とかがアカウントの雰囲気によって偏りがかなりあるらしい
(ネタツイとかでも結構あるんですって)
だから、自分のツイートはよく20時前後に見られるとかがあるんだとか。。。

でもXって自動投稿(予約投稿)の機能がないらしいんですよ!
意外ですよね!
自動投稿をしたい!となってもサードパーティー系のアプリを使わないといけないとか
(回想はもういいや)

他には、Google Apps ScriptとかPython + APIで自分で構築とかしないといけないらしい。
どっちもあんまりよく知らないけど、
最近CLFを取った私は
自動投稿してくれそうな奴なんかあったな、、、
Lambdaじゃね?
Lambda使えばなんかできんじゃね?
Lambdaって奴はそのためにあるんじゃね?

ということで、今回は!
LambdaでXの自動投稿~!!!!!!!!(ぱちぱち)

先に感想を言うと、コード以外は割と簡単なので、AI大先生を使ってみると意外とかんたんではありました!

詳しくは、次の章で~~~

つくっていく~

今回使っていくのは、こいつら~

X Developer Portal
S3
Lambda
EventBridge

構成は超簡単

X Developer Portal(API)を通して、
S3のツイートファイルを読み込んで、Lambdaに自動実行してもらいます。
EventBridge SchedulerでさっきのLambdaの実行タイミングを任意の時間に指定する

大まかにはこんな感じです

STEP1 X Developer portalとS3

まず、
X Developer Portalでぐぐって
アカウントを作成しましょ~
今回はテストなので、無料枠でいいかな(ここは必要に応じて課金してくれよな!)
アカウント作成したら、APIキーを取得しますが、その前にAPIキーの権限を変えましょう!

以下の画面から設定変更できます。

デフォルトだとReadのみで書き込めないので、
開発者ポータルの下記箇所で権限を変更します
Read and writeにしましょう!

権限変更できれば、
API Key and Secret
Access Token and Secret
この子らを取得して、メモに保存していく
全部で計4つかな?

取得できれば、S3のバケットとオブジェクトの作成します

とりあえず、バケットは1つ、オブジェクトも1つか2つで大丈夫です

STEP2 IAMロールとLambda

Lambdaへアタッチする用のIAMロールの作成をします。
このあたりのコードは全くもってわからんですので、AI大先生にたよります。。。


    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Statement1",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:DeleteObject"
            ],
            "Resource": "arn:aws:s3:::バケット名/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "S3:ListBucket"
            ],
            "Resource": "arn:aws:s3:::バケット名/"
        }
    ]
}

AI大先生 はいよ
さっすが~

じゃ、これをLambdaへアタッチ
Lambdaへ行き、関数の作成を押します
デフォルトの実行ロールの変更のタブをクリックすると以下のようなのが出てきます
ここで既存のロールを使って、さっき作ったIAMロールをアタッチします

これでアタッチは完了~

つぎは、環境変数なるものを編集します。
さっきの出来上がった関数を開いて、
設定欄の環境変数を以下のように書き込みます。ここはSTEP1で取得したものとかをいれてくださいね。

STEP3 Tweepy Layer

着々とできてきましたよ~

このSTEPでは、Tweepyなるものをインストールする必要があります。

Tweepyとは?
PythonでTwitter APIを利用するためのオープンソースライブラリです
簡単にいうと、PythonからXの投稿やらその他Xの機能を操作するためのツールらしいです。
(しかしTweepyってTwitterの要素残りすぎじゃない?)

これを扱うために
Tweepy Layerの作成をします。
コンソールでやると時間かかりそうなので、今回はCLIでいきました。
Cloud Shellを開いて、以下のコマンドを順番に叩いてみてください。

# 1. 作業フォルダ作成
mkdir tweepy-layer && cd tweepy-layer
# 2. Python用ディレクトリ構造作成
mkdir -p python/lib/python3.9/site-packages
# 3. Tweepyと依存ライブラリをダウンロード
pip install tweepy -t python/lib/python3.9/site-packages/
# 4. ZIPファイル作成
zip -r tweepy-layer.zip python/
# 5. Lambda Layerとして公開
aws lambda publish-layer-version \
  --layer-name tweepy-layer \
  --zip-file fileb://tweepy-layer.zip \
  --compatible-runtimes python3.9

これで、Tweepy Layerが完成しました。
これもまた、LambdaのLayerに入れます。
Lambdaの関数画面のLayerをクリックすると以下の画面になります。
その赤枠のところに先ほど作成したLayerを追加してあげます。
ちなみに追加するときは、「カスタムレイヤー」を選んだ気がします。

STEP4 Lambda関数

いよいよ終盤に差し掛かってきました!

関数の設定です。

そうは言いますが、私、たまごもたまごなので、関数とかは全部AI大先生に任せっきりです。
ということで、こんな感じ↓↓

import json
import os
import boto3
import tweepy
from datetime import datetime, timezone, timedelta
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    """
    メイン処理:イベントソース別に正確に分岐
    """
    logger.info(f"受信イベント: {json.dumps(event, default=str)}")

    try:
        twitter_client = init_twitter_client()

        # イベントソースを詳細に判別
        event_source = event.get('source', '')
        detail_type = event.get('detail-type', '')

        logger.info(f"イベントソース: {event_source}, 詳細タイプ: {detail_type}")

        # EventBridge Scheduler からの呼び出し
        if event_source == 'aws.scheduler':
            logger.info("EventBridge Schedulerからの実行")
            return handle_daily_posts(twitter_client, 'evening')

        # EventBridge Rules からの呼び出し  
        elif event_source == 'aws.events':
            logger.info("EventBridge Rulesからの実行")
            return handle_scheduled_check(twitter_client)

        # S3イベントから呼ばれた場合
        elif 'Records' in event and event['Records']:
            logger.info("S3イベントからの実行")
            return handle_s3_event(event, twitter_client)

        # テスト実行の場合
        else:
            logger.info("テスト実行")
            return handle_test_execution(twitter_client)

    except Exception as e:
        logger.error(f"エラー発生: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)}, ensure_ascii=False)
        }

    except Exception as e:
        logger.error(f"エラー発生: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps({
                'error': str(e),
                'message': 'Lambda実行でエラーが発生しました'
            }, ensure_ascii=False)
        }

def init_twitter_client():
    api_key = os.environ.get('X_API_Key')
    api_secret = os.environ.get('X_API_SecretKey')
    access_token = os.environ.get('X_Access_Token')
    access_token_secret = os.environ.get('X_Access_Token_Secret')

    missing_vars = []
    if not api_key: missing_vars.append('X_API_Key')
    if not api_secret: missing_vars.append('X_API_SecretKey')
    if not access_token: missing_vars.append('X_Access_Token')
    if not access_token_secret: missing_vars.append('X_Access_Token_Secret')

    if missing_vars:
        raise Exception(f"環境変数が設定されていません: {', '.join(missing_vars)}")

    client = tweepy.Client(
        consumer_key=api_key,
        consumer_secret=api_secret,
        access_token=access_token,
        access_token_secret=access_token_secret
    )

    logger.info("Twitter クライアント初期化完了")
    return client

def handle_test_execution(client):
    logger.info("テスト実行モード - 実際にTwitterに投稿します")

    now = datetime.now(timezone(timedelta(hours=9)))
    test_message = f"🚀 AWS Lambda + Tweepy テスト投稿\n\n実行時刻: {now.strftime('%Y-%m-%d %H:%M:%S')} JST\n\n#AWS #Lambda #Tweepy #自動投稿"

    try:
        response = client.create_tweet(text=test_message)
        logger.info(f"テスト投稿成功: Tweet ID = {response.data['id']}")

        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': 'Twitterテスト投稿が成功しました!',
                'tweet_id': response.data['id'],
                'content': test_message,
                'execution_time': now.isoformat()
            }, ensure_ascii=False)
        }
    except Exception as e:
        logger.error(f"Twitter投稿失敗: {str(e)}")
        raise Exception(f"Twitter投稿エラー: {str(e)}")

def handle_daily_posts(client, time_category):
    """
    毎日定時投稿処理
    daily/フォルダからファイルを読み取って投稿
    """
    logger.info(f"毎日定時投稿処理開始: {time_category}")

    try:
        bucket_name = os.environ.get('S3_BUCKET_NAME')
        s3_client = boto3.client('s3')

        # daily/{category}/フォルダ内のファイルを取得
        prefix = f'daily/{time_category}/'

        logger.info(f"フォルダチェック: {prefix}")

        response = s3_client.list_objects_v2(
            Bucket=bucket_name,
            Prefix=prefix
        )

        if 'Contents' not in response:
            logger.warning(f"daily/{time_category}/フォルダにファイルがありません")
            return {
                'statusCode': 200,
                'body': json.dumps({
                    'message': f'{time_category}の投稿ファイルが見つかりませんでした',
                    'category': time_category,
                    'checked_folder': prefix
                }, ensure_ascii=False)
            }

        # アクティブなファイルを探す
        for obj in response['Contents']:
            key = obj['Key']

            logger.info(f"ファイルチェック: {key}")

            if key.endswith('.json'):
                try:
                    # ファイル内容取得
                    post_data = get_json_from_s3(bucket_name, key)

                    # アクティブかチェック
                    if post_data.get('metadata', {}).get('active', True):
                        # 投稿実行
                        content = post_data.get('content', {}).get('text', '')
                        if content:
                            response = client.create_tweet(text=content)

                            logger.info(f"定時投稿成功: {response.data['id']} (ファイル: {key})")

                            return {
                                'statusCode': 200,
                                'body': json.dumps({
                                    'message': f'{time_category}の定時投稿が完了しました',
                                    'tweet_id': response.data['id'],
                                    'file_used': key,
                                    'content': content[:50] + '...' if len(content) > 50 else content
                                }, ensure_ascii=False)
                            }

                except Exception as e:
                    logger.error(f"ファイル処理エラー ({key}): {str(e)}")
                    continue

        # 投稿可能なファイルが見つからなかった
        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': f'{time_category}の投稿可能なファイルが見つかりませんでした',
                'files_found': len(response.get('Contents', [])),
                'category': time_category
            }, ensure_ascii=False)
        }

    except Exception as e:
        logger.error(f"定時投稿エラー: {str(e)}")
        raise e

def handle_s3_event(event, client):
    logger.info("S3イベント処理モード")

    results = []

    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']

        logger.info(f"処理ファイル: s3://{bucket}/{key}")

        try:
            if key.startswith('immediate/'):
                result = process_immediate_post(bucket, key, client)
            elif key.startswith('scheduled/'):
                result = process_scheduled_post(bucket, key, client)
            else:
                result = {
                    'file': key,
                    'status': 'skipped',
                    'reason': '対象外のフォルダです'
                }

            results.append(result)

        except Exception as e:
            logger.error(f"ファイル処理エラー ({key}): {str(e)}")
            results.append({
                'file': key,
                'status': 'error',
                'error': str(e)
            })

    return {
        'statusCode': 200,
        'body': json.dumps({
            'message': 'S3イベント処理完了',
            'results': results
        }, ensure_ascii=False)
    }

def handle_scheduled_check(client):
    logger.info("定期チェックモード: scheduled/フォルダ内の投稿時間をチェック")

    try:
        s3_client = boto3.client('s3')
        bucket_name = os.environ.get('S3_BUCKET_NAME')

        response = s3_client.list_objects_v2(
            Bucket=bucket_name,
            Prefix='scheduled/'
        )

        processed_count = 0
        results = []

        if 'Contents' in response:
            for obj in response['Contents']:
                key = obj['Key']

                if key.endswith('.json'):
                    try:
                        result = process_scheduled_post(bucket_name, key, client)
                        results.append(result)

                        if result['status'] == 'posted':
                            processed_count += 1

                    except Exception as e:
                        logger.error(f"ファイル処理エラー ({key}): {str(e)}")
                        results.append({
                            'file': key,
                            'status': 'error',
                            'error': str(e)
                        })

        logger.info(f"定期チェック完了: {processed_count}件の投稿を実行")

        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': f'定期チェック完了: {processed_count}件の投稿を実行しました',
                'processed_count': processed_count,
                'results': results
            }, ensure_ascii=False)
        }

    except Exception as e:
        logger.error(f"定期チェックエラー: {str(e)}")
        raise e

def process_immediate_post(bucket, key, client):
    logger.info(f"即座投稿処理: {key}")

    post_data = get_json_from_s3(bucket, key)

    content = post_data.get('content', {}).get('text', '')
    if not content:
        raise Exception('投稿内容が見つかりません')

    response = client.create_tweet(text=content)
    logger.info(f"即座投稿成功: Tweet ID = {response.data['id']}")

    return {
        'file': key,
        'status': 'posted',
        'tweet_id': response.data['id'],
        'content': content
    }

def process_scheduled_post(bucket, key, client):
    logger.info(f"予約投稿処理: {key}")

    post_data = get_json_from_s3(bucket, key)

    scheduled_for = post_data.get('metadata', {}).get('scheduled_for')
    if not scheduled_for:
        raise Exception('scheduled_for が設定されていません')

    try:
        scheduled_time = datetime.fromisoformat(scheduled_for.replace('Z', '+00:00'))
        current_time = datetime.now(timezone.utc)

        logger.info(f"現在時刻: {current_time}")
        logger.info(f"投稿予定時刻: {scheduled_time}")

        if current_time < scheduled_time:
            logger.info("まだ投稿時間ではありません")
            return {
                'file': key,
                'status': 'waiting',
                'scheduled_for': scheduled_for,
                'current_time': current_time.isoformat()
            }

        logger.info("投稿時間になりました!投稿を実行します")

    except ValueError as e:
        raise Exception(f'日時形式が正しくありません: {scheduled_for}')

    content = post_data.get('content', {}).get('text', '')
    if not content:
        raise Exception('投稿内容が見つかりません')

    response = client.create_tweet(text=content)
    logger.info(f"予約投稿成功: Tweet ID = {response.data['id']}")

    move_to_processed(bucket, key)

    return {
        'file': key,
        'status': 'posted',
        'tweet_id': response.data['id'],
        'content': content,
        'posted_at': current_time.isoformat()
    }

def move_to_processed(bucket, key):
    try:
        s3_client = boto3.client('s3')

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = key.split('/')[-1].replace('.json', f'_{timestamp}.json')
        new_key = f"processed/{filename}"

        s3_client.copy_object(
            Bucket=bucket,
            CopySource={'Bucket': bucket, 'Key': key},
            Key=new_key
        )

        s3_client.delete_object(Bucket=bucket, Key=key)
        logger.info(f"ファイル移動完了: {key} → {new_key}")

    except Exception as e:
        logger.error(f"ファイル移動エラー: {str(e)}")

def get_json_from_s3(bucket, key):
    try:
        s3_client = boto3.client('s3')
        response = s3_client.get_object(Bucket=bucket, Key=key)
        content = response['Body'].read().decode('utf-8')
        return json.loads(content)
    except Exception as e:
        logger.error(f"S3ファイル取得エラー: {str(e)}")
        raise e

(こんなの手で書けるわけがねえだろうが!)

コードを書いたら、DeployとTestを実行します。
ちなみに、私はここでなぜかTestが失敗し、
初めてSTEP1のAPIキーの権限がRead onlyになってたことに気づきました。
あと、X DeveloperのAPIKeyは権限とかを変更すると取得しないといけないっぽいので皆さん気をつけて

※実際はS3にアップしたのを自動で投稿するコードとかでちょっとずつやっていったんですが、
コードが多すぎるので、今回はEventBridgeで使ったものだけを掲載してます。
実際は何回か失敗してます。。。

STEP5 EventBridge

いよいよですね。
今回はEventBridgeにある
EventBridge schedulerという機能を使います。EventBridgeの中でも特にスケジュール関係を扱いやすい機能らしいです。
最近導入されたっぽいんですかね?

EventBridgeから
このschedulerを開き、設定していきます。
以下ではとりあえず、毎日20時に設定してます。
右下の「次へ」へ

この画面ではさっき作成したLambdaの関数を選択します。
「次へ」

次の画面では、
一番下のアクセス許可をSTEP2で作成したIAMロールを選択します
そのほかは特にいじってなかったと思います。

「次へ」を押すと
最終確認になり、そのまま作成します

最後は好きな文をJSONファイルとして作成し、
S3にアップロードすれば、自動投稿がされます!

まとめ

いや~文量とかはかなりのものになりましたが、
ここまでするのにわからないなりにやりながらでも、数時間といったところでした。
慣れてる人からすると1時間もかからないと思います。
(Lambdaのコードとかがすんなりいくとほんとにすぐ)

今回はS3にアップロードでしたが、
WEBの管理画面を作ったり、そもそもAI連携とかできたりするとめっちゃ使い勝手がよくなりそう
X民の皆さんも自動投稿は結構簡単にできるものなので、
ぜひチャレンジしてみてください!

返信を残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

CAPTCHA