
9 MCP Production Patterns That Actually Scale Multi-Agent Systems (2026)

Dev.to AI · by dohko · April 1, 2026 · 16 min read

Model Context Protocol went from "interesting spec" to industry standard in under a year. 97 million monthly SDK downloads. Every major AI provider on board — Anthropic, OpenAI, Google, Microsoft, Amazon.

But most tutorials still show toy examples. A weather tool. A calculator. Cool for demos, useless for production.

Here are 9 patterns we've battle-tested in real multi-agent systems — with code you can ship today.

1. The Tool Registry Pattern

Don't hardcode tools. Register them dynamically so agents discover capabilities at runtime.

```typescript
// mcp-registry/src/registry.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";

interface ToolDefinition {
  name: string;
  description: string;
  inputSchema: Record<string, unknown>;
  handler: (args: Record<string, unknown>) => Promise<unknown>;
  version: string;
  healthCheck?: () => Promise<boolean>;
}

class ToolRegistry {
  private tools = new Map<string, ToolDefinition>();
  private healthStatus = new Map<string, boolean>();

  register(tool: ToolDefinition): void {
    this.tools.set(tool.name, tool);
    this.healthStatus.set(tool.name, true);
    console.error(`[registry] Registered tool: ${tool.name} v${tool.version}`);
  }

  unregister(name: string): void {
    this.tools.delete(name);
    this.healthStatus.delete(name);
    console.error(`[registry] Unregistered tool: ${name}`);
  }

  getHealthy(): ToolDefinition[] {
    return Array.from(this.tools.values()).filter(
      (t) => this.healthStatus.get(t.name) === true
    );
  }

  async runHealthChecks(): Promise<void> {
    for (const [name, tool] of this.tools) {
      if (tool.healthCheck) {
        try {
          const healthy = await tool.healthCheck();
          this.healthStatus.set(name, healthy);
        } catch {
          this.healthStatus.set(name, false);
          console.error(`[registry] Health check failed: ${name}`);
        }
      }
    }
  }
}

const registry = new ToolRegistry();

// Example: register a database query tool
// (executeQuery is your own DB pool helper, defined elsewhere)
registry.register({
  name: "query_database",
  description: "Execute read-only SQL queries against the analytics database",
  version: "2.1.0",
  inputSchema: {
    type: "object",
    properties: {
      query: { type: "string", description: "SQL SELECT query" },
      timeout_ms: { type: "number", description: "Query timeout", default: 5000 },
    },
    required: ["query"],
  },
  handler: async (args) => {
    const query = args.query as string;
    if (!query.trim().toUpperCase().startsWith("SELECT")) {
      throw new Error("Only SELECT queries allowed");
    }
    // Execute against your DB pool
    return await executeQuery(query, args.timeout_ms as number);
  },
  healthCheck: async () => {
    try {
      await executeQuery("SELECT 1", 1000);
      return true;
    } catch {
      return false;
    }
  },
});

// Wire up the MCP server
const server = new Server(
  { name: "tool-registry", version: "1.0.0" },
  { capabilities: { tools: {} } }
);

server.setRequestHandler(ListToolsRequestSchema, async () => ({
  tools: registry.getHealthy().map((t) => ({
    name: t.name,
    description: t.description,
    inputSchema: t.inputSchema,
  })),
}));

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const tool = registry.getHealthy().find((t) => t.name === request.params.name);
  if (!tool) {
    return {
      content: [{ type: "text", text: `Tool not found or unhealthy: ${request.params.name}` }],
      isError: true,
    };
  }
  try {
    const result = await tool.handler(request.params.arguments ?? {});
    return { content: [{ type: "text", text: JSON.stringify(result, null, 2) }] };
  } catch (err) {
    return {
      content: [{ type: "text", text: `Error: ${(err as Error).message}` }],
      isError: true,
    };
  }
});

// Health check loop
setInterval(() => registry.runHealthChecks(), 30_000);

const transport = new StdioServerTransport();
await server.connect(transport);
```

Why it matters: In production you have dozens of tools. Some go down. The registry pattern means agents only see what's actually working.
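The filtering step itself is worth verifying in isolation. A minimal sketch of the same idea in Python — tool names and health flags here are made up for illustration:

```python
# Minimal sketch of the registry's health filter: agents only ever see
# tools whose last health check passed. Names are hypothetical.
tools = {
    "query_database": {"version": "2.1.0"},
    "send_email": {"version": "1.0.0"},
    "search_code": {"version": "3.2.1"},
}
health = {"query_database": True, "send_email": False, "search_code": True}

def get_healthy(tools: dict, health: dict) -> list[str]:
    """Return only the tools whose health flag is True."""
    return [name for name in tools if health.get(name) is True]

print(get_healthy(tools, health))  # ['query_database', 'search_code']
```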

2. Context Window Budget Manager

MCP servers can return massive payloads. Without budget management, you blow your context window on one tool call.

```python
# budget_manager.py
import tiktoken
from dataclasses import dataclass
from typing import Any


@dataclass
class ContextBudget:
    total_tokens: int
    reserved_for_response: int = 4096
    reserved_for_system: int = 2000
    used_tokens: int = 0

    @property
    def available(self) -> int:
        return (
            self.total_tokens
            - self.reserved_for_response
            - self.reserved_for_system
            - self.used_tokens
        )

    def consume(self, tokens: int) -> None:
        self.used_tokens += tokens

    def can_afford(self, tokens: int) -> bool:
        return tokens <= self.available


class MCPBudgetProxy:
    """Wraps MCP tool results to enforce context budgets."""

    def __init__(self, budget: ContextBudget, model: str = "gpt-4"):
        self.budget = budget
        self.encoder = tiktoken.encoding_for_model(model)

    def count_tokens(self, text: str) -> int:
        return len(self.encoder.encode(text))

    def truncate_to_budget(self, text: str, max_fraction: float = 0.3) -> str:
        """Truncate result to fit within budget fraction."""
        max_tokens = int(self.budget.available * max_fraction)
        tokens = self.encoder.encode(text)

        if len(tokens) <= max_tokens:
            self.budget.consume(len(tokens))
            return text

        # Truncate and add indicator
        truncated = self.encoder.decode(tokens[: max_tokens - 20])
        suffix = (
            f"\n\n[TRUNCATED: {len(tokens)} tokens → {max_tokens} tokens. "
            "Request specific sections for full data.]"
        )
        self.budget.consume(max_tokens)
        return truncated + suffix

    async def call_tool_with_budget(
        self,
        mcp_client,
        tool_name: str,
        arguments: dict[str, Any],
        max_fraction: float = 0.3,
    ) -> str:
        """Call MCP tool and enforce budget on response."""
        if self.budget.available < 500:
            return "[BUDGET EXHAUSTED: Cannot make more tool calls. Respond with available context.]"

        raw_result = await mcp_client.call_tool(tool_name, arguments)
        text = raw_result.content[0].text if raw_result.content else ""

        return self.truncate_to_budget(text, max_fraction)


# Usage in an agent loop
budget = ContextBudget(total_tokens=128_000)
proxy = MCPBudgetProxy(budget)


async def agent_step(mcp_client, tool_name: str, args: dict) -> str:
    result = await proxy.call_tool_with_budget(mcp_client, tool_name, args)
    print(f"Budget remaining: {budget.available} tokens")
    return result
```

The lesson: Production agents need resource management. Treat context tokens like memory — allocate, track, and refuse when depleted.
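The budget arithmetic above is easy to sanity-check on its own. A standalone sketch using the same defaults (the 128k total is illustrative, not tied to any specific model):

```python
# The budget arithmetic from the pattern above, checked in isolation.
from dataclasses import dataclass

@dataclass
class ContextBudget:
    total_tokens: int
    reserved_for_response: int = 4096
    reserved_for_system: int = 2000
    used_tokens: int = 0

    @property
    def available(self) -> int:
        return (self.total_tokens - self.reserved_for_response
                - self.reserved_for_system - self.used_tokens)

budget = ContextBudget(total_tokens=128_000)
print(budget.available)  # 121904  (128000 - 4096 - 2000)
budget.used_tokens += 30_000
print(budget.available)  # 91904
```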

3. MCP Server Composition (The Gateway Pattern)

One agent shouldn't connect to 15 MCP servers. Build a gateway that composes multiple servers behind a single interface.

```python
# mcp_gateway.py
import asyncio
import json
from dataclasses import dataclass, field
from typing import Any


@dataclass
class MCPServerConfig:
    name: str
    command: str
    args: list[str] = field(default_factory=list)
    env: dict[str, str] = field(default_factory=dict)
    priority: int = 0  # Higher = preferred when tools overlap


@dataclass
class GatewayTool:
    name: str
    server: str
    original_name: str
    description: str
    input_schema: dict


class MCPGateway:
    """Composes multiple MCP servers into a single tool namespace."""

    def __init__(self, configs: list[MCPServerConfig]):
        self.configs = {c.name: c for c in configs}
        self.connections: dict[str, Any] = {}
        self.tool_map: dict[str, GatewayTool] = {}

    async def connect_all(self) -> None:
        for name, config in self.configs.items():
            try:
                conn = await self._connect_server(config)
                self.connections[name] = conn
                tools = await conn.list_tools()
                for tool in tools:
                    namespaced = f"{name}__{tool.name}"
                    self.tool_map[namespaced] = GatewayTool(
                        name=namespaced,
                        server=name,
                        original_name=tool.name,
                        description=f"[{name}] {tool.description}",
                        input_schema=tool.inputSchema,
                    )
                print(f"[gateway] Connected: {name} ({len(tools)} tools)")
            except Exception as e:
                print(f"[gateway] Failed to connect {name}: {e}")

    async def call_tool(self, namespaced_name: str, arguments: dict) -> Any:
        gateway_tool = self.tool_map.get(namespaced_name)
        if not gateway_tool:
            raise ValueError(f"Unknown tool: {namespaced_name}")

        conn = self.connections.get(gateway_tool.server)
        if not conn:
            raise ConnectionError(f"Server disconnected: {gateway_tool.server}")

        return await conn.call_tool(gateway_tool.original_name, arguments)

    def list_tools(self) -> list[dict]:
        return [
            {
                "name": t.name,
                "description": t.description,
                "inputSchema": t.input_schema,
            }
            for t in self.tool_map.values()
        ]

    async def _connect_server(self, config: MCPServerConfig):
        # Uses MCP SDK client to connect via stdio
        from mcp import ClientSession, StdioServerParameters
        from mcp.client.stdio import stdio_client

        params = StdioServerParameters(
            command=config.command,
            args=config.args,
            env=config.env,
        )
        # stdio_client yields a (read, write) stream pair
        transport = await stdio_client(params).__aenter__()
        session = ClientSession(*transport)
        await session.initialize()
        return session


# Configuration
gateway = MCPGateway([
    MCPServerConfig(name="db", command="node", args=["./mcp-db-server/dist/index.js"]),
    MCPServerConfig(name="github", command="npx", args=["-y", "@modelcontextprotocol/server-github"]),
    MCPServerConfig(name="search", command="python", args=["./mcp-search-server.py"]),
])
```

Why: A gateway gives you one connection, namespaced tools, and centralized error handling. Your agent doesn't need to know the topology.
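The namespacing convention is the load-bearing detail here. A small sketch of the `{server}__{tool}` scheme and how a routed call splits it back apart (names hypothetical):

```python
# The gateway's "{server}__{tool}" namespacing, round-tripped.
def namespace(server: str, tool: str) -> str:
    return f"{server}__{tool}"

def route(namespaced: str) -> tuple[str, str]:
    """Split a namespaced name back into (server, original_tool).
    Split on the FIRST double underscore so tool names may contain '_'."""
    server, tool = namespaced.split("__", 1)
    return server, tool

print(namespace("db", "query_database"))   # db__query_database
print(route("db__query_database"))         # ('db', 'query_database')
```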

4. Authentication Proxy for MCP

MCP has no built-in auth. Wrap your servers with an auth layer before exposing them.

```typescript
// mcp-auth-proxy.ts
import { randomUUID } from "crypto";
import jwt from "jsonwebtoken";

interface AuthConfig {
  jwtSecret: string;
  allowedScopes: Map<string, string[]>; // tool_name → required scopes
}

interface AuthenticatedRequest {
  userId: string;
  scopes: string[];
  requestId: string;
}

class MCPAuthProxy {
  private config: AuthConfig;
  private auditLog: Array<{
    timestamp: string;
    requestId: string;
    userId: string;
    tool: string;
    allowed: boolean;
  }> = [];

  constructor(config: AuthConfig) {
    this.config = config;
  }

  authenticate(token: string): AuthenticatedRequest | null {
    try {
      const decoded = jwt.verify(token, this.config.jwtSecret) as {
        sub: string;
        scopes: string[];
      };
      return {
        userId: decoded.sub,
        scopes: decoded.scopes,
        requestId: randomUUID(),
      };
    } catch {
      return null;
    }
  }

  authorize(auth: AuthenticatedRequest, toolName: string): boolean {
    const requiredScopes = this.config.allowedScopes.get(toolName) ?? ["admin"];
    const allowed = requiredScopes.some((s) => auth.scopes.includes(s));

    this.auditLog.push({
      timestamp: new Date().toISOString(),
      requestId: auth.requestId,
      userId: auth.userId,
      tool: toolName,
      allowed,
    });

    if (!allowed) {
      console.error(
        `[auth] DENIED: user=${auth.userId} tool=${toolName} ` +
          `has=[${auth.scopes}] needs=[${requiredScopes}]`
      );
    }

    return allowed;
  }

  getAuditLog(limit = 100) {
    return this.auditLog.slice(-limit);
  }
}

// Setup
const proxy = new MCPAuthProxy({
  jwtSecret: process.env.MCP_JWT_SECRET!,
  allowedScopes: new Map([
    ["query_database", ["db:read", "admin"]],
    ["write_database", ["db:write", "admin"]],
    ["deploy", ["deploy:prod", "admin"]],
    ["search_code", ["code:read", "admin"]],
  ]),
});

// In your MCP server's tool handler
// (executeToolHandler is your own dispatch function, defined elsewhere):
async function handleToolCall(
  token: string,
  toolName: string,
  args: Record<string, unknown>
) {
  const auth = proxy.authenticate(token);
  if (!auth) {
    return { error: "Authentication failed", isError: true };
  }

  if (!proxy.authorize(auth, toolName)) {
    return { error: "Insufficient permissions", isError: true };
  }

  // Proceed with actual tool execution
  return await executeToolHandler(toolName, args);
}
```

Critical for production. Without auth, any agent can call any tool. That's fine in dev. In production, it's a security hole.
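The authorization rule reduces to one check: the caller is allowed if they hold any of the tool's required scopes. A language-agnostic sketch of that rule (scope names are hypothetical):

```python
# The scope check at the heart of the auth proxy: a tool is allowed if
# the caller holds ANY of its required scopes.
def authorize(user_scopes: list[str], required: list[str]) -> bool:
    return any(s in user_scopes for s in required)

print(authorize(["db:read"], ["db:read", "admin"]))    # True
print(authorize(["db:read"], ["db:write", "admin"]))   # False
print(authorize(["admin"], ["deploy:prod", "admin"]))  # True
```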

5. Streaming Results with Progress Reporting

Long-running MCP tools should stream progress, not block for 30 seconds and then dump results.

```python
# streaming_mcp_tool.py
import asyncio
import json
from datetime import datetime


class StreamingToolHandler:
    """MCP tool handler that reports progress via notifications."""

    def __init__(self, server):
        self.server = server

    async def handle_long_analysis(self, arguments: dict) -> dict:
        """Analyze a large dataset with progress updates."""
        dataset_url = arguments["dataset_url"]
        analysis_type = arguments.get("type", "summary")

        steps = [
            ("Fetching dataset", self._fetch_data, dataset_url),
            ("Validating schema", self._validate, None),
            ("Running analysis", self._analyze, analysis_type),
            ("Generating report", self._report, None),
        ]

        results = {}
        for i, (description, func, arg) in enumerate(steps):
            # Send progress notification
            await self.server.send_notification(
                "notifications/progress",
                {
                    "progressToken": arguments.get("progressToken"),
                    "progress": i,
                    "total": len(steps),
                    "message": description,
                },
            )

            start = datetime.now()
            result = await func(arg) if arg else await func()
            elapsed = (datetime.now() - start).total_seconds()

            results[description] = {
                "status": "complete",
                "elapsed_seconds": elapsed,
                "output": result,
            }

        return {
            "content": [
                {
                    "type": "text",
                    "text": json.dumps(results, indent=2, default=str),
                }
            ]
        }

    async def _fetch_data(self, url: str) -> dict:
        import httpx
        async with httpx.AsyncClient() as client:
            resp = await client.get(url)
            data = resp.json()
            return {"rows": len(data), "size_kb": len(resp.content) / 1024}

    async def _validate(self) -> dict:
        await asyncio.sleep(0.5)  # Schema validation
        return {"valid": True, "warnings": 0}

    async def _analyze(self, analysis_type: str) -> dict:
        await asyncio.sleep(2)  # Heavy computation
        return {"type": analysis_type, "insights": 42}

    async def _report(self) -> dict:
        await asyncio.sleep(0.3)
        return {"format": "markdown", "sections": 5}
```

User experience matters. Even for AI agents, knowing "step 2 of 4" is better than a hanging request.

6. Error Recovery with Retry Policies

MCP tool calls fail. Network issues, rate limits, timeouts. Build retry logic into your client.

```python
# mcp_retry.py
import asyncio
import random
import time
from dataclasses import dataclass
from enum import Enum
from typing import Any


class RetryStrategy(Enum):
    EXPONENTIAL = "exponential"
    LINEAR = "linear"
    IMMEDIATE = "immediate"


@dataclass
class RetryPolicy:
    max_retries: int = 3
    strategy: RetryStrategy = RetryStrategy.EXPONENTIAL
    base_delay_ms: int = 1000
    max_delay_ms: int = 30000
    jitter: bool = True
    retryable_errors: tuple = ("ConnectionError", "TimeoutError", "RateLimitError")

    def get_delay(self, attempt: int) -> float:
        if self.strategy == RetryStrategy.EXPONENTIAL:
            delay = self.base_delay_ms * (2 ** attempt)
        elif self.strategy == RetryStrategy.LINEAR:
            delay = self.base_delay_ms * (attempt + 1)
        else:
            delay = 0

        delay = min(delay, self.max_delay_ms)

        if self.jitter:
            delay = delay * (0.5 + random.random())

        return delay / 1000  # Convert to seconds


class ResilientMCPClient:
    """MCP client wrapper with automatic retry and circuit breaking."""

    def __init__(self, client, default_policy: RetryPolicy | None = None):
        self.client = client
        self.default_policy = default_policy or RetryPolicy()
        self.failure_counts: dict[str, int] = {}
        self.circuit_open: dict[str, float] = {}

    async def call_tool(
        self,
        name: str,
        arguments: dict,
        policy: RetryPolicy | None = None,
    ) -> Any:
        policy = policy or self.default_policy

        # Circuit breaker check
        if name in self.circuit_open:
            if time.time() - self.circuit_open[name] < 60:
                raise Exception(f"Circuit open for {name}. Try again later.")
            del self.circuit_open[name]
            self.failure_counts[name] = 0

        last_error = None
        for attempt in range(policy.max_retries + 1):
            try:
                result = await self.client.call_tool(name, arguments)
                self.failure_counts[name] = 0  # Reset on success
                return result
            except Exception as e:
                last_error = e
                error_type = type(e).__name__

                if error_type not in policy.retryable_errors:
                    raise  # Non-retryable, fail immediately

                self.failure_counts[name] = self.failure_counts.get(name, 0) + 1

                # Trip circuit breaker after 5 consecutive failures
                if self.failure_counts[name] >= 5:
                    self.circuit_open[name] = time.time()
                    raise Exception(f"Circuit breaker tripped for {name}") from e

                if attempt < policy.max_retries:
                    delay = policy.get_delay(attempt)
                    print(
                        f"[retry] {name} attempt {attempt + 1} failed: {e}. "
                        f"Retrying in {delay:.1f}s"
                    )
                    await asyncio.sleep(delay)

        raise last_error
```

Production reality: Things fail. The question is whether your system recovers automatically or pages you at 3 AM.
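To see what the exponential policy actually does, here is the delay schedule with jitter disabled so the sequence is deterministic (same base and cap as the defaults above):

```python
# Backoff delays produced by the exponential strategy for successive
# retryable failures, capped at max_delay_ms. Jitter off for clarity.
def get_delay_s(attempt: int, base_ms: int = 1000, max_ms: int = 30000) -> float:
    delay = min(base_ms * (2 ** attempt), max_ms)
    return delay / 1000

print([get_delay_s(a) for a in range(6)])
# [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]  — the last attempt hits the cap
```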

7. Tool Result Caching

Identical tool calls within the same conversation shouldn't hit the backend twice.

```python
# mcp_cache.py
import hashlib
import json
import time
from typing import Any


class ToolResultCache:
    """LRU cache for MCP tool results with TTL support."""

    def __init__(self, max_size: int = 1000, default_ttl: int = 300):
        self.max_size = max_size
        self.default_ttl = default_ttl
        self.cache: dict[str, dict] = {}
        self.access_order: list[str] = []
        self.tool_ttls: dict[str, int] = {}
        self.stats = {"hits": 0, "misses": 0}

    def set_ttl(self, tool_name: str, ttl_seconds: int) -> None:
        """Set custom TTL for a specific tool."""
        self.tool_ttls[tool_name] = ttl_seconds

    def _cache_key(self, tool_name: str, arguments: dict) -> str:
        arg_str = json.dumps(arguments, sort_keys=True, default=str)
        return hashlib.sha256(f"{tool_name}:{arg_str}".encode()).hexdigest()

    def get(self, tool_name: str, arguments: dict) -> Any | None:
        key = self._cache_key(tool_name, arguments)
        entry = self.cache.get(key)

        if entry is None:
            self.stats["misses"] += 1
            return None

        ttl = self.tool_ttls.get(tool_name, self.default_ttl)
        if time.time() - entry["timestamp"] > ttl:
            del self.cache[key]
            self.stats["misses"] += 1
            return None

        self.stats["hits"] += 1

        # Move to end (most recently used)
        if key in self.access_order:
            self.access_order.remove(key)
        self.access_order.append(key)

        return entry["result"]

    def put(self, tool_name: str, arguments: dict, result: Any) -> None:
        key = self._cache_key(tool_name, arguments)

        # Evict LRU if at capacity
        while len(self.cache) >= self.max_size and self.access_order:
            evict_key = self.access_order.pop(0)
            self.cache.pop(evict_key, None)

        self.cache[key] = {"result": result, "timestamp": time.time()}
        self.access_order.append(key)


class CachedMCPClient:
    """MCP client with transparent caching."""

    # Tools that should never be cached (side effects)
    NEVER_CACHE = {"write_file", "send_email", "deploy", "delete"}

    def __init__(self, client, cache: ToolResultCache | None = None):
        self.client = client
        self.cache = cache or ToolResultCache()

    async def call_tool(self, name: str, arguments: dict) -> Any:
        if name in self.NEVER_CACHE:
            return await self.client.call_tool(name, arguments)

        cached = self.cache.get(name, arguments)
        if cached is not None:
            return cached

        result = await self.client.call_tool(name, arguments)
        self.cache.put(name, arguments, result)
        return result
```

Real impact: In a multi-step agent workflow, the same search or database query can fire 3-4 times. Caching saves tokens, time, and API costs.
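One subtle detail makes this work: the cache key hashes a `sort_keys=True` JSON dump, so argument order cannot change the key. A quick check (tool name and arguments are made up):

```python
# Why the cache key sorts the argument dict before hashing: the same
# logical call must produce the same key regardless of key order.
import hashlib
import json

def cache_key(tool: str, args: dict) -> str:
    arg_str = json.dumps(args, sort_keys=True, default=str)
    return hashlib.sha256(f"{tool}:{arg_str}".encode()).hexdigest()

a = cache_key("search", {"q": "mcp", "limit": 10})
b = cache_key("search", {"limit": 10, "q": "mcp"})
print(a == b)  # True — different key order, same cache entry
```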

8. Observability: Structured Logging for MCP

You can't debug what you can't see. Instrument every MCP call.

```python
# mcp_observability.py
import json
import time
import logging
from contextvars import ContextVar
from dataclasses import dataclass, field, asdict
from typing import Any

request_id_var: ContextVar[str] = ContextVar("request_id", default="unknown")


@dataclass
class ToolCallMetric:
    tool_name: str
    arguments_hash: str
    start_time: float
    end_time: float = 0
    duration_ms: float = 0
    success: bool = True
    error: str | None = None
    result_tokens: int = 0
    cached: bool = False
    request_id: str = ""

    def finalize(self) -> "ToolCallMetric":
        self.end_time = time.time()
        self.duration_ms = (self.end_time - self.start_time) * 1000
        self.request_id = request_id_var.get()
        return self


class MCPObserver:
    """Structured observability for MCP tool calls."""

    def __init__(self):
        self.logger = logging.getLogger("mcp.observer")
        self.metrics: list[ToolCallMetric] = []

    def record(self, metric: ToolCallMetric) -> None:
        self.metrics.append(metric)
        log_data = asdict(metric)

        if metric.success:
            self.logger.info("mcp.tool.call", extra={"data": log_data})
        else:
            self.logger.error("mcp.tool.error", extra={"data": log_data})

    def get_summary(self) -> dict:
        if not self.metrics:
            return {"total_calls": 0}

        durations = [m.duration_ms for m in self.metrics]
        errors = [m for m in self.metrics if not m.success]
        cached = [m for m in self.metrics if m.cached]

        return {
            "total_calls": len(self.metrics),
            "error_count": len(errors),
            "cache_hit_rate": len(cached) / len(self.metrics),
            "avg_duration_ms": sum(durations) / len(durations),
            "p95_duration_ms": sorted(durations)[int(len(durations) * 0.95)],
            "by_tool": self._by_tool(),
        }

    def _by_tool(self) -> dict:
        tools: dict[str, list[float]] = {}
        for m in self.metrics:
            tools.setdefault(m.tool_name, []).append(m.duration_ms)
        return {
            name: {"calls": len(ds), "avg_ms": sum(ds) / len(ds)}
            for name, ds in tools.items()
        }
```

Ship this on day one. When your agent makes 47 tool calls in a conversation and something goes wrong, you need to know which call, when, and why.
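The p95 figure in `get_summary()` is a sort-and-index approximation, and it is exactly the number that surfaces slow outliers. A dry run with made-up durations:

```python
# The p95 computation used in get_summary(): sort the durations and
# index at 95% of the list length. Durations (ms) are illustrative.
durations = sorted([12.0, 15.0, 18.0, 22.0, 30.0, 35.0, 40.0, 55.0, 80.0, 400.0])
p95 = durations[int(len(durations) * 0.95)]
avg = sum(durations) / len(durations)
print(p95)  # 400.0 — a single slow outlier dominates the tail
print(avg)  # 70.7  — the average hides it
```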

9. Multi-Agent Task Delegation via MCP

The real power: agents that delegate tasks to other agents through MCP.

```python
# agent_delegation.py
import asyncio
import json
from dataclasses import dataclass
from enum import Enum


class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETE = "complete"
    FAILED = "failed"


@dataclass
class AgentTask:
    task_id: str
    description: str
    assigned_to: str
    status: TaskStatus = TaskStatus.PENDING
    result: str | None = None
    dependencies: list[str] | None = None


class AgentOrchestrator:
    """Coordinates multiple AI agents via MCP tool delegation."""

    def __init__(self):
        self.agents: dict[str, dict] = {}
        self.tasks: dict[str, AgentTask] = {}

    def register_agent(self, name: str, capabilities: list[str], mcp_client) -> None:
        self.agents[name] = {
            "capabilities": capabilities,
            "client": mcp_client,
            "active_tasks": 0,
            "max_concurrent": 3,
        }

    def find_agent(self, required_capability: str) -> str | None:
        """Find the least-loaded agent with the required capability."""
        candidates = [
            (name, info)
            for name, info in self.agents.items()
            if required_capability in info["capabilities"]
            and info["active_tasks"] < info["max_concurrent"]
        ]
        if not candidates:
            return None
        return min(candidates, key=lambda x: x[1]["active_tasks"])[0]

    async def delegate(self, task: AgentTask) -> str:
        """Delegate a task to an appropriate agent."""
        self.tasks[task.task_id] = task

        # Wait for dependencies
        if task.dependencies:
            await self._wait_for_dependencies(task.dependencies)

        agent_name = task.assigned_to or self.find_agent("general")
        if not agent_name:
            task.status = TaskStatus.FAILED
            task.result = "No available agent"
            return task.result

        agent = self.agents[agent_name]
        agent["active_tasks"] += 1
        task.status = TaskStatus.RUNNING

        try:
            result = await agent["client"].call_tool(
                "execute_task",
                {
                    "task_id": task.task_id,
                    "description": task.description,
                    "context": self._get_dependency_results(task.dependencies or []),
                },
            )
            task.status = TaskStatus.COMPLETE
            task.result = result.content[0].text
        except Exception as e:
            task.status = TaskStatus.FAILED
            task.result = str(e)
        finally:
            agent["active_tasks"] -= 1

        return task.result

    async def run_parallel(self, tasks: list[AgentTask]) -> dict[str, str]:
        """Run independent tasks in parallel."""
        results = await asyncio.gather(
            *[self.delegate(t) for t in tasks],
            return_exceptions=True,
        )
        return {t.task_id: str(r) for t, r in zip(tasks, results)}

    async def _wait_for_dependencies(self, dep_ids: list[str]) -> None:
        while True:
            all_done = all(
                self.tasks.get(d, AgentTask("", "", "")).status
                in (TaskStatus.COMPLETE, TaskStatus.FAILED)
                for d in dep_ids
            )
            if all_done:
                return
            await asyncio.sleep(0.5)

    def _get_dependency_results(self, dep_ids: list[str]) -> dict:
        return {
            d: self.tasks[d].result
            for d in dep_ids
            if d in self.tasks and self.tasks[d].result
        }
```

This is where MCP shines. Not as a tool protocol for one agent, but as the communication layer for agent swarms.
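The dependency gate in the orchestrator reduces to one predicate: a task may start only once every dependency is terminal. A dry run of that rule (task ids and statuses are made up):

```python
# A task only starts once all of its dependencies are COMPLETE or
# FAILED — the same check _wait_for_dependencies polls on.
status = {"fetch": "complete", "clean": "complete", "train": "pending"}

def deps_done(dep_ids: list[str]) -> bool:
    return all(status.get(d) in ("complete", "failed") for d in dep_ids)

print(deps_done(["fetch", "clean"]))  # True  — a downstream task could start
print(deps_done(["train"]))           # False — still pending, keep waiting
```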

Putting It All Together

These 9 patterns form a production MCP stack:

| Layer | Pattern | Purpose |
| --- | --- | --- |
| Discovery | Tool Registry | Dynamic tool availability |
| Resource | Budget Manager | Context window protection |
| Topology | Gateway | Server composition |
| Security | Auth Proxy | Access control + audit |
| UX | Streaming | Progress feedback |
| Resilience | Retry + Circuit Breaker | Failure recovery |
| Performance | Caching | Reduce redundant calls |
| Ops | Observability | Debugging + metrics |
| Scale | Delegation | Multi-agent orchestration |

If you're building agents that need to interact with real systems — databases, APIs, codebases — these patterns are the difference between a demo and a product.

Resources

Building production MCP systems means managing tools, context budgets, auth, and observability all at once. If you want pre-built utilities for common patterns like token budgeting, multi-model routing, and agent orchestration, check out the AI Dev Toolkit — it includes production-ready components for exactly these kinds of workflows.

The MCP ecosystem moves fast. What patterns are you using in production? Drop them in the comments.

This is part of the "AI Engineering in Practice" series — real patterns from real systems, not toy demos.

