ankuro.dev
← ブログ一覧に戻る
AgentCore RuntimeにMCPサーバーをデプロイしてElicitation・Sampling・Progress notificationsを試す
2026-03-22#AWS#Bedrock#AgentCore#MCP#Python#生成AI

AgentCore RuntimeにMCPサーバーをデプロイしてElicitation・Sampling・Progress notificationsを試す

AgentCore RuntimeのStateful MCPサポートを実際に動かした記録。

概念の説明は別記事にまとめているので、ここでは「どうやって動かすか」に絞る。やってみてわかったはまりどころも含めて書く。


構成の全体像

MCPサーバー(FastMCP)
  └── ZIPでパッケージング(ARM64向けバイナリ込み)
        └── S3にアップロード
              └── AgentCore Runtime にデプロイ
                    └── エンドポイントを作成
                          └── FastMCP クライアントから呼び出す

前提条件

  • AWS CLI設定済み(東京リージョン ap-northeast-1 を使用)
  • Python 3.12
  • boto3インストール済み

MCPサーバーを書く

3機能(Progress notifications / Elicitation / Sampling)をすべて試せるサーバーを1ファイルに書く。

# server.py
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), 'lib'))

from fastmcp import FastMCP, Context
from pydantic import BaseModel
import asyncio

mcp = FastMCP("stateful-mcp-demo")


@mcp.tool()
async def long_task(ctx: Context) -> str:
    """Progress notificationsのデモ:4ステップの処理を進捗通知する"""
    steps = ["データ取得中", "解析中", "集計中", "完了"]
    for i, step in enumerate(steps, 1):
        await ctx.report_progress(progress=i, total=len(steps), message=step)
        await asyncio.sleep(1)
    return "すべてのステップが完了しました"


class TravelInfo(BaseModel):
    departure: str
    destination: str


@mcp.tool()
async def book_flight(ctx: Context) -> str:
    """Elicitationのデモ:サーバーからユーザーに情報を聞き返す"""
    result = await ctx.elicit(
        message="旅行の情報を入力してください",
        response_type=TravelInfo
    )
    if result.action == "accept":
        return f"予約受付: {result.data.departure} → {result.data.destination}"
    else:
        return "キャンセルされました"


@mcp.tool()
async def generate_summary(ctx: Context, topic: str) -> str:
    """Samplingのデモ:サーバーからクライアントにLLM生成を依頼する"""
    result = await ctx.sample(f"{topic}を一言で説明して", max_tokens=100)
    return result.text


if __name__ == "__main__":
    mcp.run(
        transport="streamable-http",
        host="0.0.0.0",
        stateless_http=False,
    )

重要な設定:

  • host="0.0.0.0" — デフォルトの127.0.0.1だとAgentCore Runtimeのプロキシから到達できずエラーになる
  • stateless_http=FalseTrueにするとSSEストリームが機能しなくなりProgress notifications等が動かない

パッケージングしてS3にアップロード

AgentCore RuntimeはARM64(aarch64)環境で動作する。依存パッケージはARM64向けにビルドしてZIPに含める必要がある。

mkdir -p /tmp/agentcore-demo
cd /tmp/agentcore-demo
# server.py を配置
cp /path/to/server.py .

# ARM64向けパッケージをlib/にインストール
mkdir -p lib
pip install fastmcp \
    --target ./lib \
    --platform manylinux2014_aarch64 \
    --python-version 3.12 \
    --only-binary=:all: \
    --upgrade

# ZIPを作成(lib/ごと含める)
zip -r deployment.zip server.py lib/

はまりどころ: --platform を指定しないとx86_64バイナリが入り、デプロイ後にUPDATE_FAILEDになる。.soファイルのアーキテクチャが違うのでエラーメッセージは「incompatible with Linux ARM64」。

# S3にアップロード(バケット名は適宜変更)
BUCKET="your-bucket-name"
aws s3 cp deployment.zip s3://$BUCKET/agentcore/deployment.zip

IAMロールを作成する

AgentCore Runtimeが使うIAMロールを作成する。bedrock-agentcore.amazonaws.com を信頼ポリシーに含めるのが必須。bedrock.amazonaws.com だけでは足りない。

import boto3, json

iam = boto3.client('iam')

trust = {
    'Version': '2012-10-17',
    'Statement': [
        {
            'Effect': 'Allow',
            'Principal': {'Service': 'bedrock.amazonaws.com'},
            'Action': 'sts:AssumeRole'
        },
        {
            'Effect': 'Allow',
            'Principal': {'Service': 'bedrock-agentcore.amazonaws.com'},
            'Action': 'sts:AssumeRole'
        }
    ]
}

role = iam.create_role(
    RoleName='agentcore-demo-role',
    AssumeRolePolicyDocument=json.dumps(trust)
)
iam.attach_role_policy(
    RoleName='agentcore-demo-role',
    PolicyArn='arn:aws:iam::aws:policy/AmazonBedrockFullAccess'
)
iam.attach_role_policy(
    RoleName='agentcore-demo-role',
    PolicyArn='arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess'
)

role_arn = role['Role']['Arn']
print(f"Role ARN: {role_arn}")

AgentCore Runtimeを作成する

import boto3

control = boto3.client('bedrock-agentcore-control', region_name='ap-northeast-1')
sts = boto3.client('sts')

account_id = sts.get_caller_identity()['Account']
bucket = "your-bucket-name"
role_arn = f"arn:aws:iam::{account_id}:role/agentcore-demo-role"

runtime = control.create_agent_runtime(
    agentRuntimeName='stateful_mcp_demo',    # ハイフン不可。[a-zA-Z][a-zA-Z0-9_]{0,47}
    description='Stateful MCP demo server',
    agentRuntimeArtifact={
        'codeConfiguration': {
            'code': {
                's3': {
                    'bucket': bucket,
                    'prefix': 'agentcore/deployment.zip',
                }
            },
            'runtime': 'PYTHON_3_12',
            'entryPoint': ['server.py'],    # .pyファイルを直接指定。.shは不可
        }
    },
    networkConfiguration={'networkMode': 'PUBLIC'},
    roleArn=role_arn,
    protocolConfiguration={
        'serverProtocol': 'MCP'
    },
)

runtime_id = runtime['agentRuntimeId']
print(f"Runtime ID: {runtime_id}")

はまりどころ:

  • agentRuntimeName はハイフン不可。[a-zA-Z][a-zA-Z0-9_]{0,47} の形式のみ受け付ける
  • entryPoint.py ファイルを直接指定。.sh スクリプトや空白を含む文字列は ValidationException になる

作成後、ステータスが READY になるまで待つ(数分かかる)。

import time

while True:
    resp = control.get_agent_runtime(agentRuntimeId=runtime_id)
    status = resp['agentRuntime']['status']
    print(f"Status: {status}")
    if status == 'READY':
        break
    elif status in ['FAILED', 'ERROR']:
        raise Exception(f"Failed: {resp}")
    time.sleep(15)

エンドポイントを作成する

endpoint = control.create_agent_runtime_endpoint(
    agentRuntimeId=runtime_id,
    name='demo_endpoint',
)

endpoint_id = endpoint['endpointArn']
print(f"Endpoint: {endpoint_id}")

# READY待ち
while True:
    resp = control.get_agent_runtime_endpoint(
        agentRuntimeId=runtime_id,
        endpointName='demo_endpoint'
    )
    status = resp['status']
    print(f"Endpoint status: {status}")
    if status == 'READY':
        break
    time.sleep(15)

クライアントから呼び出す

AgentCore Runtimeのエンドポイントへの接続にはSigV4署名が必要。invoke_agent_runtime(boto3)はリクエスト/レスポンス型のAPIのためElicitation・Sampling・Progress notificationsには使えない。SigV4署名付きでMCPプロトコルを直接実装する必要がある。

エンドポイントURLのフォーマット:

https://bedrock-agentcore.{region}.amazonaws.com/runtimes/{URLエンコードしたARN}/invocations?qualifier={endpoint-name}

AgentCore RuntimeはGETを受け付けない(405)。すべてPOSTで送り、レスポンスのSSEストリームで中間通知を受け取る。

import boto3, botocore.auth, botocore.credentials, botocore.awsrequest
import requests, urllib.parse

runtime_arn = 'arn:aws:bedrock-agentcore:ap-northeast-1:ACCOUNT_ID:runtime/RUNTIME_ID'
encoded_arn = urllib.parse.quote(runtime_arn, safe='')
base_url = f'https://bedrock-agentcore.ap-northeast-1.amazonaws.com/runtimes/{encoded_arn}/invocations?qualifier=demo_endpoint'

session = boto3.Session()
creds = session.get_credentials().get_frozen_credentials()

def sign_request(method, url, body_bytes=None, extra_headers=None):
    headers = extra_headers or {}
    req = botocore.awsrequest.AWSRequest(method=method, url=url, data=body_bytes, headers=headers)
    auth = botocore.auth.SigV4Auth(
        botocore.credentials.Credentials(creds.access_key, creds.secret_key, creds.token),
        'bedrock-agentcore', 'ap-northeast-1'
    )
    auth.add_auth(req)
    return dict(req.headers)

def mcp_post(body_dict, session_id=None, stream=False):
    body = json.dumps(body_dict).encode()
    extra = {
        'Content-Type': 'application/json',
        'Accept': 'application/json, text/event-stream',
        'mcp-protocol-version': '2025-03-26',
    }
    if session_id:
        extra['Mcp-Session-Id'] = session_id
    headers = sign_request('POST', base_url, body, extra)
    return requests.post(base_url, data=body, headers=headers, timeout=60, stream=stream)

MCPセッションの開始:

import json

# 1. initialize
resp = mcp_post({'jsonrpc':'2.0','id':1,'method':'initialize','params':{
    'protocolVersion':'2025-03-26',
    'capabilities':{'sampling':{},'elicitation':{}},
    'clientInfo':{'name':'my-client','version':'1.0'}
}})
session_id = resp.headers.get('Mcp-Session-Id')

# 2. initialized 通知
mcp_post({'jsonrpc':'2.0','method':'notifications/initialized'}, session_id)

Progress notificationsを受け取る

_meta.progressToken を付けて呼び出し、SSEをストリーミングで受信する。

resp = mcp_post({
    'jsonrpc': '2.0',
    'id': 3,
    'method': 'tools/call',
    'params': {
        'name': 'long_task',
        'arguments': {},
        '_meta': {'progressToken': 'progress-1'}   # ← これを付けないと通知が来ない
    }
}, session_id, stream=True)

for chunk in resp.iter_lines():
    if chunk:
        line = chunk.decode('utf-8')
        if line.startswith('data:'):
            data = json.loads(line[5:].strip())
            if data.get('method') == 'notifications/progress':
                params = data['params']
                print(f'[{params["progress"]}/{params["total"]}] {params.get("message","")}')
            elif 'result' in data:
                print(f'完了: {data["result"]["content"][0]["text"]}')

Elicitationを処理する

サーバーから elicitation/create が来たら、新たなPOSTで応答する。元のストリームは保持したまま並行処理が必要。

import threading, queue

events_q = queue.Queue()

def call_book_flight():
    resp = mcp_post({
        'jsonrpc': '2.0', 'id': 4,
        'method': 'tools/call',
        'params': {'name': 'book_flight', 'arguments': {}},
    }, session_id, stream=True)
    for chunk in resp.iter_lines():
        if chunk:
            line = chunk.decode('utf-8')
            if line.startswith('data:'):
                events_q.put(json.loads(line[5:].strip()))

# 別スレッドでツールを呼び出す
t = threading.Thread(target=call_book_flight, daemon=True)
t.start()

# elicitation/create を待つ
for _ in range(30):
    event = events_q.get(timeout=2)
    if event.get('method') == 'elicitation/create':
        elicitation_id = event['id']
        print(f'サーバーからの質問: {event["params"]["message"]}')
        print(f'スキーマ: {event["params"]["requestedSchema"]}')
        break

# ユーザー入力を受け取ってサーバーに応答
user_input = {'departure': '東京', 'destination': 'ニューヨーク'}
mcp_post({
    'jsonrpc': '2.0',
    'id': elicitation_id,
    'result': {'action': 'accept', 'content': user_input}
}, session_id)

# 最終結果を元のストリームから受け取る
final = events_q.get(timeout=10)
print(f'結果: {final["result"]["content"][0]["text"]}')

Samplingを処理する

sampling/createMessage が来たらLLMを呼び出して応答する。

def call_generate_summary():
    resp = mcp_post({
        'jsonrpc': '2.0', 'id': 5,
        'method': 'tools/call',
        'params': {'name': 'generate_summary', 'arguments': {'topic': 'Machine Learning'}},
    }, session_id, stream=True)
    for chunk in resp.iter_lines():
        if chunk:
            line = chunk.decode('utf-8')
            if line.startswith('data:'):
                events_q.put(json.loads(line[5:].strip()))

t = threading.Thread(target=call_generate_summary, daemon=True)
t.start()

# sampling/createMessage を受け取る
event = events_q.get(timeout=10)
sampling_id = event['id']
messages = event['params']['messages']
print(f'サーバーからのLLM生成依頼: {messages}')

# クライアント側でLLMを呼び出す(Bedrockなど)
bedrock = boto3.client('bedrock-runtime', region_name='ap-northeast-1')
br_resp = bedrock.converse(
    modelId='jp.anthropic.claude-haiku-4-5-20251001-v1:0',
    messages=[{'role': 'user', 'content': [{'text': messages[0]['content']['text']}]}]
)
generated_text = br_resp['output']['message']['content'][0]['text']

# 生成結果をサーバーに返す
mcp_post({
    'jsonrpc': '2.0',
    'id': sampling_id,
    'result': {
        'role': 'assistant',
        'content': {'type': 'text', 'text': generated_text},
        'model': 'claude-haiku-4-5',
        'stopReason': 'endTurn'
    }
}, session_id)

# 最終結果を受け取る
final = events_q.get(timeout=10)
print(f'結果: {final["result"]["content"][0]["text"]}')

各機能の確認結果

東京リージョン(ap-northeast-1)で実際にデプロイ・動作確認した結果。

Progress notifications ✅ — 4ステップの処理が1秒ごとにSSEストリームで届いた。progressToken は必須で、ないと通知がクライアントに届かない。

Elicitation ✅ — elicitation/create がSSEストリームで届き、フォームスキーマも含まれる。応答用のPOSTを送ると処理が再開して最終結果が返ってきた(東京→ニューヨークの予約)。

Sampling ✅ — sampling/createMessage がSSEストリームで届き、クライアントがBedrockを呼んで応答すると最終結果が返ってきた。


クリーンアップ

import boto3

control = boto3.client('bedrock-agentcore-control', region_name='ap-northeast-1')
iam = boto3.client('iam')

runtime_id = "your-runtime-id"

# エンドポイント削除
control.delete_agent_runtime_endpoint(
    agentRuntimeId=runtime_id,
    endpointName='demo_endpoint'
)

# Runtime削除
control.delete_agent_runtime(agentRuntimeId=runtime_id)

# IAMロール削除
for policy_arn in [
    'arn:aws:iam::aws:policy/AmazonBedrockFullAccess',
    'arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess'
]:
    iam.detach_role_policy(RoleName='agentcore-demo-role', PolicyArn=policy_arn)

iam.delete_role(RoleName='agentcore-demo-role')

まとめ

  • AgentCore RuntimeはARM64環境。--platform manylinux2014_aarch64 --python-version 3.12 でビルドしたパッケージをZIPに含める
  • FastMCP 3.1.1は mcp.run()stateless_http=Falsehost="0.0.0.0" を指定する
  • IAMトラストポリシーに bedrock-agentcore.amazonaws.com を含めないとランタイムが機能しない
  • agentRuntimeName はハイフン不可。[a-zA-Z][a-zA-Z0-9_]{0,47} に合わせる
  • invoke_agent_runtime はstateful機能に対応できない。SigV4署名付きでMCPプロトコルを直接実装する
  • AgentCore RuntimeはGETを受け付けない(405)。すべてPOST経由でSSEストリームを受信する

3機能(Progress notifications・Elicitation・Sampling)はいずれも東京リージョン(ap-northeast-1)、FastMCP 3.1.1で動作確認済み。


関連記事