
AgentCore RuntimeにMCPサーバーをデプロイしてElicitation・Sampling・Progress notificationsを試す
AgentCore RuntimeのStateful MCPサポートを実際に動かした記録。
概念の説明は別記事にまとめているので、ここでは「どうやって動かすか」に絞る。やってみてわかったはまりどころも含めて書く。
構成の全体像
MCPサーバー(FastMCP)
└── ZIPでパッケージング(ARM64向けバイナリ込み)
└── S3にアップロード
└── AgentCore Runtime にデプロイ
└── エンドポイントを作成
└── FastMCP クライアントから呼び出す
前提条件
- AWS CLI設定済み(東京リージョン
ap-northeast-1を使用) - Python 3.12
- boto3インストール済み
MCPサーバーを書く
3機能(Progress notifications / Elicitation / Sampling)をすべて試せるサーバーを1ファイルに書く。
# server.py
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), 'lib'))
from fastmcp import FastMCP, Context
from pydantic import BaseModel
import asyncio
mcp = FastMCP("stateful-mcp-demo")
@mcp.tool()
async def long_task(ctx: Context) -> str:
"""Progress notificationsのデモ:4ステップの処理を進捗通知する"""
steps = ["データ取得中", "解析中", "集計中", "完了"]
for i, step in enumerate(steps, 1):
await ctx.report_progress(progress=i, total=len(steps), message=step)
await asyncio.sleep(1)
return "すべてのステップが完了しました"
class TravelInfo(BaseModel):
departure: str
destination: str
@mcp.tool()
async def book_flight(ctx: Context) -> str:
"""Elicitationのデモ:サーバーからユーザーに情報を聞き返す"""
result = await ctx.elicit(
message="旅行の情報を入力してください",
response_type=TravelInfo
)
if result.action == "accept":
return f"予約受付: {result.data.departure} → {result.data.destination}"
else:
return "キャンセルされました"
@mcp.tool()
async def generate_summary(ctx: Context, topic: str) -> str:
"""Samplingのデモ:サーバーからクライアントにLLM生成を依頼する"""
result = await ctx.sample(f"{topic}を一言で説明して", max_tokens=100)
return result.text
if __name__ == "__main__":
mcp.run(
transport="streamable-http",
host="0.0.0.0",
stateless_http=False,
)
重要な設定:
host="0.0.0.0"— デフォルトの127.0.0.1だとAgentCore Runtimeのプロキシから到達できずエラーになるstateless_http=False—TrueにするとSSEストリームが機能しなくなりProgress notifications等が動かない
パッケージングしてS3にアップロード
AgentCore RuntimeはARM64(aarch64)環境で動作する。依存パッケージはARM64向けにビルドしてZIPに含める必要がある。
mkdir -p /tmp/agentcore-demo
cd /tmp/agentcore-demo
# server.py を配置
cp /path/to/server.py .
# ARM64向けパッケージをlib/にインストール
mkdir -p lib
pip install fastmcp \
--target ./lib \
--platform manylinux2014_aarch64 \
--python-version 3.12 \
--only-binary=:all: \
--upgrade
# ZIPを作成(lib/ごと含める)
zip -r deployment.zip server.py lib/
はまりどころ: --platform を指定しないとx86_64バイナリが入り、デプロイ後にUPDATE_FAILEDになる。.soファイルのアーキテクチャが違うのでエラーメッセージは「incompatible with Linux ARM64」。
# S3にアップロード(バケット名は適宜変更)
BUCKET="your-bucket-name"
aws s3 cp deployment.zip s3://$BUCKET/agentcore/deployment.zip
IAMロールを作成する
AgentCore Runtimeが使うIAMロールを作成する。bedrock-agentcore.amazonaws.com を信頼ポリシーに含めるのが必須。bedrock.amazonaws.com だけでは足りない。
import boto3, json
iam = boto3.client('iam')
trust = {
'Version': '2012-10-17',
'Statement': [
{
'Effect': 'Allow',
'Principal': {'Service': 'bedrock.amazonaws.com'},
'Action': 'sts:AssumeRole'
},
{
'Effect': 'Allow',
'Principal': {'Service': 'bedrock-agentcore.amazonaws.com'},
'Action': 'sts:AssumeRole'
}
]
}
role = iam.create_role(
RoleName='agentcore-demo-role',
AssumeRolePolicyDocument=json.dumps(trust)
)
iam.attach_role_policy(
RoleName='agentcore-demo-role',
PolicyArn='arn:aws:iam::aws:policy/AmazonBedrockFullAccess'
)
iam.attach_role_policy(
RoleName='agentcore-demo-role',
PolicyArn='arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess'
)
role_arn = role['Role']['Arn']
print(f"Role ARN: {role_arn}")
AgentCore Runtimeを作成する
import boto3
control = boto3.client('bedrock-agentcore-control', region_name='ap-northeast-1')
sts = boto3.client('sts')
account_id = sts.get_caller_identity()['Account']
bucket = "your-bucket-name"
role_arn = f"arn:aws:iam::{account_id}:role/agentcore-demo-role"
runtime = control.create_agent_runtime(
agentRuntimeName='stateful_mcp_demo', # ハイフン不可。[a-zA-Z][a-zA-Z0-9_]{0,47}
description='Stateful MCP demo server',
agentRuntimeArtifact={
'codeConfiguration': {
'code': {
's3': {
'bucket': bucket,
'prefix': 'agentcore/deployment.zip',
}
},
'runtime': 'PYTHON_3_12',
'entryPoint': ['server.py'], # .pyファイルを直接指定。.shは不可
}
},
networkConfiguration={'networkMode': 'PUBLIC'},
roleArn=role_arn,
protocolConfiguration={
'serverProtocol': 'MCP'
},
)
runtime_id = runtime['agentRuntimeId']
print(f"Runtime ID: {runtime_id}")
はまりどころ:
agentRuntimeNameはハイフン不可。[a-zA-Z][a-zA-Z0-9_]{0,47}の形式のみ受け付けるentryPointは.pyファイルを直接指定。.shスクリプトや空白を含む文字列はValidationExceptionになる
作成後、ステータスが READY になるまで待つ(数分かかる)。
import time
while True:
resp = control.get_agent_runtime(agentRuntimeId=runtime_id)
status = resp['agentRuntime']['status']
print(f"Status: {status}")
if status == 'READY':
break
elif status in ['FAILED', 'ERROR']:
raise Exception(f"Failed: {resp}")
time.sleep(15)
エンドポイントを作成する
endpoint = control.create_agent_runtime_endpoint(
agentRuntimeId=runtime_id,
name='demo_endpoint',
)
endpoint_id = endpoint['endpointArn']
print(f"Endpoint: {endpoint_id}")
# READY待ち
while True:
resp = control.get_agent_runtime_endpoint(
agentRuntimeId=runtime_id,
endpointName='demo_endpoint'
)
status = resp['status']
print(f"Endpoint status: {status}")
if status == 'READY':
break
time.sleep(15)
クライアントから呼び出す
AgentCore Runtimeのエンドポイントへの接続にはSigV4署名が必要。invoke_agent_runtime(boto3)はリクエスト/レスポンス型のAPIのためElicitation・Sampling・Progress notificationsには使えない。SigV4署名付きでMCPプロトコルを直接実装する必要がある。
エンドポイントURLのフォーマット:
https://bedrock-agentcore.{region}.amazonaws.com/runtimes/{URLエンコードしたARN}/invocations?qualifier={endpoint-name}
AgentCore RuntimeはGETを受け付けない(405)。すべてPOSTで送り、レスポンスのSSEストリームで中間通知を受け取る。
import boto3, botocore.auth, botocore.credentials, botocore.awsrequest
import requests, urllib.parse
runtime_arn = 'arn:aws:bedrock-agentcore:ap-northeast-1:ACCOUNT_ID:runtime/RUNTIME_ID'
encoded_arn = urllib.parse.quote(runtime_arn, safe='')
base_url = f'https://bedrock-agentcore.ap-northeast-1.amazonaws.com/runtimes/{encoded_arn}/invocations?qualifier=demo_endpoint'
session = boto3.Session()
creds = session.get_credentials().get_frozen_credentials()
def sign_request(method, url, body_bytes=None, extra_headers=None):
headers = extra_headers or {}
req = botocore.awsrequest.AWSRequest(method=method, url=url, data=body_bytes, headers=headers)
auth = botocore.auth.SigV4Auth(
botocore.credentials.Credentials(creds.access_key, creds.secret_key, creds.token),
'bedrock-agentcore', 'ap-northeast-1'
)
auth.add_auth(req)
return dict(req.headers)
def mcp_post(body_dict, session_id=None, stream=False):
body = json.dumps(body_dict).encode()
extra = {
'Content-Type': 'application/json',
'Accept': 'application/json, text/event-stream',
'mcp-protocol-version': '2025-03-26',
}
if session_id:
extra['Mcp-Session-Id'] = session_id
headers = sign_request('POST', base_url, body, extra)
return requests.post(base_url, data=body, headers=headers, timeout=60, stream=stream)
MCPセッションの開始:
import json
# 1. initialize
resp = mcp_post({'jsonrpc':'2.0','id':1,'method':'initialize','params':{
'protocolVersion':'2025-03-26',
'capabilities':{'sampling':{},'elicitation':{}},
'clientInfo':{'name':'my-client','version':'1.0'}
}})
session_id = resp.headers.get('Mcp-Session-Id')
# 2. initialized 通知
mcp_post({'jsonrpc':'2.0','method':'notifications/initialized'}, session_id)
Progress notificationsを受け取る
_meta.progressToken を付けて呼び出し、SSEをストリーミングで受信する。
resp = mcp_post({
'jsonrpc': '2.0',
'id': 3,
'method': 'tools/call',
'params': {
'name': 'long_task',
'arguments': {},
'_meta': {'progressToken': 'progress-1'} # ← これを付けないと通知が来ない
}
}, session_id, stream=True)
for chunk in resp.iter_lines():
if chunk:
line = chunk.decode('utf-8')
if line.startswith('data:'):
data = json.loads(line[5:].strip())
if data.get('method') == 'notifications/progress':
params = data['params']
print(f'[{params["progress"]}/{params["total"]}] {params.get("message","")}')
elif 'result' in data:
print(f'完了: {data["result"]["content"][0]["text"]}')
Elicitationを処理する
サーバーから elicitation/create が来たら、新たなPOSTで応答する。元のストリームは保持したまま並行処理が必要。
import threading, queue
events_q = queue.Queue()
def call_book_flight():
resp = mcp_post({
'jsonrpc': '2.0', 'id': 4,
'method': 'tools/call',
'params': {'name': 'book_flight', 'arguments': {}},
}, session_id, stream=True)
for chunk in resp.iter_lines():
if chunk:
line = chunk.decode('utf-8')
if line.startswith('data:'):
events_q.put(json.loads(line[5:].strip()))
# 別スレッドでツールを呼び出す
t = threading.Thread(target=call_book_flight, daemon=True)
t.start()
# elicitation/create を待つ
for _ in range(30):
event = events_q.get(timeout=2)
if event.get('method') == 'elicitation/create':
elicitation_id = event['id']
print(f'サーバーからの質問: {event["params"]["message"]}')
print(f'スキーマ: {event["params"]["requestedSchema"]}')
break
# ユーザー入力を受け取ってサーバーに応答
user_input = {'departure': '東京', 'destination': 'ニューヨーク'}
mcp_post({
'jsonrpc': '2.0',
'id': elicitation_id,
'result': {'action': 'accept', 'content': user_input}
}, session_id)
# 最終結果を元のストリームから受け取る
final = events_q.get(timeout=10)
print(f'結果: {final["result"]["content"][0]["text"]}')
Samplingを処理する
sampling/createMessage が来たらLLMを呼び出して応答する。
def call_generate_summary():
resp = mcp_post({
'jsonrpc': '2.0', 'id': 5,
'method': 'tools/call',
'params': {'name': 'generate_summary', 'arguments': {'topic': 'Machine Learning'}},
}, session_id, stream=True)
for chunk in resp.iter_lines():
if chunk:
line = chunk.decode('utf-8')
if line.startswith('data:'):
events_q.put(json.loads(line[5:].strip()))
t = threading.Thread(target=call_generate_summary, daemon=True)
t.start()
# sampling/createMessage を受け取る
event = events_q.get(timeout=10)
sampling_id = event['id']
messages = event['params']['messages']
print(f'サーバーからのLLM生成依頼: {messages}')
# クライアント側でLLMを呼び出す(Bedrockなど)
bedrock = boto3.client('bedrock-runtime', region_name='ap-northeast-1')
br_resp = bedrock.converse(
modelId='jp.anthropic.claude-haiku-4-5-20251001-v1:0',
messages=[{'role': 'user', 'content': [{'text': messages[0]['content']['text']}]}]
)
generated_text = br_resp['output']['message']['content'][0]['text']
# 生成結果をサーバーに返す
mcp_post({
'jsonrpc': '2.0',
'id': sampling_id,
'result': {
'role': 'assistant',
'content': {'type': 'text', 'text': generated_text},
'model': 'claude-haiku-4-5',
'stopReason': 'endTurn'
}
}, session_id)
# 最終結果を受け取る
final = events_q.get(timeout=10)
print(f'結果: {final["result"]["content"][0]["text"]}')
各機能の確認結果
東京リージョン(ap-northeast-1)で実際にデプロイ・動作確認した結果。
Progress notifications ✅ — 4ステップの処理が1秒ごとにSSEストリームで届いた。progressToken は必須で、ないと通知がクライアントに届かない。
Elicitation ✅ — elicitation/create がSSEストリームで届き、フォームスキーマも含まれる。応答用のPOSTを送ると処理が再開して最終結果が返ってきた(東京→ニューヨークの予約)。
Sampling ✅ — sampling/createMessage がSSEストリームで届き、クライアントがBedrockを呼んで応答すると最終結果が返ってきた。
クリーンアップ
import boto3
control = boto3.client('bedrock-agentcore-control', region_name='ap-northeast-1')
iam = boto3.client('iam')
runtime_id = "your-runtime-id"
# エンドポイント削除
control.delete_agent_runtime_endpoint(
agentRuntimeId=runtime_id,
endpointName='demo_endpoint'
)
# Runtime削除
control.delete_agent_runtime(agentRuntimeId=runtime_id)
# IAMロール削除
for policy_arn in [
'arn:aws:iam::aws:policy/AmazonBedrockFullAccess',
'arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess'
]:
iam.detach_role_policy(RoleName='agentcore-demo-role', PolicyArn=policy_arn)
iam.delete_role(RoleName='agentcore-demo-role')
まとめ
- AgentCore RuntimeはARM64環境。
--platform manylinux2014_aarch64 --python-version 3.12でビルドしたパッケージをZIPに含める - FastMCP 3.1.1は
mcp.run()にstateless_http=Falseとhost="0.0.0.0"を指定する - IAMトラストポリシーに
bedrock-agentcore.amazonaws.comを含めないとランタイムが機能しない agentRuntimeNameはハイフン不可。[a-zA-Z][a-zA-Z0-9_]{0,47}に合わせるinvoke_agent_runtimeはstateful機能に対応できない。SigV4署名付きでMCPプロトコルを直接実装する- AgentCore RuntimeはGETを受け付けない(405)。すべてPOST経由でSSEストリームを受信する
3機能(Progress notifications・Elicitation・Sampling)はいずれも東京リージョン(ap-northeast-1)、FastMCP 3.1.1で動作確認済み。