9 MCP Production Patterns That Actually Scale Multi-Agent Systems (2026)
<h1> 9 MCP Production Patterns That Actually Scale Multi-Agent Systems (2026) </h1> <p>Model Context Protocol went from "interesting spec" to industry standard in under a year. 97 million monthly SDK downloads. Every major AI provider on board — Anthropic, OpenAI, Google, Microsoft, Amazon.</p> <p>But most tutorials still show toy examples. A weather tool. A calculator. Cool for demos, useless for production.</p> <p>Here are 9 patterns we've battle-tested in real multi-agent systems — with code you can ship today.</p> <h2> 1. The Tool Registry Pattern </h2> <p>Don't hardcode tools. Register them dynamically so agents discover capabilities at runtime.<br> </p> <div class="highlight js-code-highlight"> <pre class="highlight typescript"><code><span class="c1">// mcp-registry/src/registry.ts</
9 MCP Production Patterns That Actually Scale Multi-Agent Systems (2026)
Model Context Protocol went from "interesting spec" to industry standard in under a year. 97 million monthly SDK downloads. Every major AI provider on board — Anthropic, OpenAI, Google, Microsoft, Amazon.
But most tutorials still show toy examples. A weather tool. A calculator. Cool for demos, useless for production.
Here are 9 patterns we've battle-tested in real multi-agent systems — with code you can ship today.
1. The Tool Registry Pattern
Don't hardcode tools. Register them dynamically so agents discover capabilities at runtime.
// mcp-registry/src/registry.ts import { Server } from "@modelcontextprotocol/sdk/server/index.js"; import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js"; import { CallToolRequestSchema, ListToolsRequestSchema, } from "@modelcontextprotocol/sdk/types.js";// mcp-registry/src/registry.ts import { Server } from "@modelcontextprotocol/sdk/server/index.js"; import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js"; import { CallToolRequestSchema, ListToolsRequestSchema, } from "@modelcontextprotocol/sdk/types.js";interface ToolDefinition { name: string; description: "string;" inputSchema: Record; handler: (args: Record) => Promise; version: string; healthCheck?: () => Promise; }
class ToolRegistry { private tools = new Map(); private healthStatus = new Map();
register(tool: ToolDefinition): void {
this.tools.set(tool.name, tool);
this.healthStatus.set(tool.name, true);
console.error([registry] Registered tool: ${tool.name} v${tool.version});
}
unregister(name: string): void {
this.tools.delete(name);
this.healthStatus.delete(name);
console.error([registry] Unregistered tool: ${name});
}
getHealthy(): ToolDefinition[] { return Array.from(this.tools.values()).filter( (t) => this.healthStatus.get(t.name) === true ); }
async runHealthChecks(): Promise {
for (const [name, tool] of this.tools) {
if (tool.healthCheck) {
try {
const healthy = await tool.healthCheck();
this.healthStatus.set(name, healthy);
} catch {
this.healthStatus.set(name, false);
console.error([registry] Health check failed: ${name});
}
}
}
}
}
const registry = new ToolRegistry();
// Example: register a database query tool registry.register({ name: "query_database", description: ""Execute read-only SQL queries against the analytics database"," version: "2.1.0", inputSchema: { type: "object", properties: { query: { type: "string", description: ""SQL SELECT query" }," timeout_ms: { type: "number", description: ""Query timeout", default: 5000 }," }, required: ["query"], }, handler: async (args) => { const query = args.query as string; if (!query.trim().toUpperCase().startsWith("SELECT")) { throw new Error("Only SELECT queries allowed"); } // Execute against your DB pool return await executeQuery(query, args.timeout_ms as number); }, healthCheck: async () => { try { await executeQuery("SELECT 1", 1000); return true; } catch { return false; } }, });
// Wire up the MCP server const server = new Server({ name: "tool-registry", version: "1.0.0" }, { capabilities: { tools: {} }, });
server.setRequestHandler(ListToolsRequestSchema, async () => ({ tools: registry.getHealthy().map((t) => ({ name: t.name, description: "t.description," inputSchema: t.inputSchema, })), }));
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const tool = registry.getHealthy().find((t) => t.name === request.params.name);
if (!tool) {
return { content: [{ type: "text", text: Tool not found or unhealthy: ${request.params.name} }], isError: true };
}
try {
const result = await tool.handler(request.params.arguments ?? {});
return { content: [{ type: "text", text: JSON.stringify(result, null, 2) }] };
} catch (err) {
return { content: [{ type: "text", text: Error: ${(err as Error).message} }], isError: true };
}
});
// Health check loop setInterval(() => registry.runHealthChecks(), 30_000);
const transport = new StdioServerTransport(); await server.connect(transport);`
Enter fullscreen mode
Exit fullscreen mode
Why it matters: In production you have dozens of tools. Some go down. The registry pattern means agents only see what's actually working.
2. Context Window Budget Manager
MCP servers can return massive payloads. Without budget management, you blow your context window on one tool call.
# budget_manager.py import tiktoken from dataclasses import dataclass from typing import Any# budget_manager.py import tiktoken from dataclasses import dataclass from typing import Any@dataclass class ContextBudget: total_tokens: int reserved_for_response: int = 4096 reserved_for_system: int = 2000 used_tokens: int = 0
@property def available(self) -> int: return self.total_tokens - self.reserved_for_response - self.reserved_for_system - self.used_tokens
def consume(self, tokens: int) -> None: self.used_tokens += tokens
def can_afford(self, tokens: int) -> bool: return tokens <= self.available
class MCPBudgetProxy: """Wraps MCP tool results to enforce context budgets."""
def init(self, budget: ContextBudget, model: str = "gpt-4"): self.budget = budget self.encoder = tiktoken.encoding_for_model(model)
def count_tokens(self, text: str) -> int: return len(self.encoder.encode(text))
def truncate_to_budget(self, text: str, max_fraction: float = 0.3) -> str: """Truncate result to fit within budget fraction.""" max_tokens = int(self.budget.available * max_fraction) tokens = self.encoder.encode(text)*
if len(tokens) <= max_tokens: self.budget.consume(len(tokens)) return text
Truncate and add indicator
truncated = self.encoder.decode(tokens[:max_tokens - 20]) suffix = f"\n\n[TRUNCATED: {len(tokens)} tokens → {max_tokens} tokens. Request specific sections for full data.]" result = truncated + suffix self.budget.consume(max_tokens) return result
async def call_tool_with_budget( self, mcp_client, tool_name: str, arguments: dict[str, Any], max_fraction: float = 0.3, ) -> str: """Call MCP tool and enforce budget on response.""" if self.budget.available < 500: return "[BUDGET EXHAUSTED: Cannot make more tool calls. Respond with available context.]"
raw_result = await mcp_client.call_tool(tool_name, arguments) text = raw_result.content[0].text if raw_result.content else ""
return self.truncate_to_budget(text, max_fraction)
Usage in an agent loop
budget = ContextBudget(total_tokens=128_000) proxy = MCPBudgetProxy(budget)
async def agent_step(mcp_client, tool_name: str, args: dict) -> str: result = await proxy.call_tool_with_budget(mcp_client, tool_name, args) print(f"Budget remaining: {budget.available} tokens") return result`
Enter fullscreen mode
Exit fullscreen mode
The lesson: Production agents need resource management. Treat context tokens like memory — allocate, track, and refuse when depleted.
3. MCP Server Composition (The Gateway Pattern)
One agent shouldn't connect to 15 MCP servers. Build a gateway that composes multiple servers behind a single interface.
# mcp_gateway.py import asyncio import json from dataclasses import dataclass, field# mcp_gateway.py import asyncio import json from dataclasses import dataclass, field@dataclass class MCPServerConfig: name: str command: str args: list[str] = field(default_factory=list) env: dict[str, str] = field(default_factory=dict) priority: int = 0 # Higher = preferred when tools overlap
@dataclass class GatewayTool: name: str server: str original_name: str description: str input_schema: dict
class MCPGateway: """Composes multiple MCP servers into a single tool namespace."""
def init(self, configs: list[MCPServerConfig]): self.configs = {c.name: c for c in configs} self.connections: dict[str, Any] = {} self.tool_map: dict[str, GatewayTool] = {}
async def connect_all(self) -> None: for name, config in self.configs.items(): try: conn = await self.connect_server(config) self.connections[name] = conn tools = await conn.list_tools() for tool in tools: namespaced = f"{name}__{tool.name}" self.tool_map[namespaced] = GatewayTool( name=namespaced, server=name, original_name=tool.name, description=f"[{name}] {tool.description}", input_schema=tool.inputSchema, ) print(f"[gateway] Connected: {name} ({len(tools)} tools)") except Exception as e: print(f"[gateway] Failed to connect {name}: {e}")
async def call_tool(self, namespaced_name: str, arguments: dict) -> Any: gateway_tool = self.tool_map.get(namespaced_name) if not gateway_tool: raise ValueError(f"Unknown tool: {namespaced_name}")
conn = self.connections.get(gateway_tool.server) if not conn: raise ConnectionError(f"Server disconnected: {gateway_tool.server}")
return await conn.call_tool(gateway_tool.original_name, arguments)
def list_tools(self) -> list[dict]: return [ { "name": t.name, "description": t.description, "inputSchema": t.input_schema, } for t in self.tool_map.values() ]
async def connect_server(self, config: MCPServerConfig):
Uses MCP SDK client to connect via stdio
from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client
params = StdioServerParameters( command=config.command, args=config.args, env=config.env, ) transport = await stdio_client(params).aenter() session = ClientSession(transport) await session.initialize() return session
Configuration
gateway = MCPGateway([ MCPServerConfig(name="db", command="node", args=["./mcp-db-server/dist/index.js"]), MCPServerConfig(name="github", command="npx", args=["-y", "@modelcontextprotocol/server-github"]), MCPServerConfig(name="search", command="python", args=["./mcp-search-server.py"]), ])`
Enter fullscreen mode
Exit fullscreen mode
Why: A gateway gives you one connection, namespaced tools, and centralized error handling. Your agent doesn't need to know the topology.
4. Authentication Proxy for MCP
MCP has no built-in auth. Wrap your servers with an auth layer before exposing them.
// mcp-auth-proxy.ts import { createServer } from "net"; import { randomUUID } from "crypto"; import jwt from "jsonwebtoken";// mcp-auth-proxy.ts import { createServer } from "net"; import { randomUUID } from "crypto"; import jwt from "jsonwebtoken";interface AuthConfig { jwtSecret: string; allowedScopes: Map; // tool_name → required scopes }
interface AuthenticatedRequest { userId: string; scopes: string[]; requestId: string; }
class MCPAuthProxy { private config: AuthConfig; private auditLog: Array<{ timestamp: string; requestId: string; userId: string; tool: string; allowed: boolean; }> = [];
constructor(config: AuthConfig) { this.config = config; }
authenticate(token: string): AuthenticatedRequest | null { try { const decoded = jwt.verify(token, this.config.jwtSecret) as { sub: string; scopes: string[]; }; return { userId: decoded.sub, scopes: decoded.scopes, requestId: randomUUID(), }; } catch { return null; } }
authorize(auth: AuthenticatedRequest, toolName: string): boolean { const requiredScopes = this.config.allowedScopes.get(toolName) ?? ["admin"]; const allowed = requiredScopes.some((s) => auth.scopes.includes(s));
this.auditLog.push({ timestamp: new Date().toISOString(), requestId: auth.requestId, userId: auth.userId, tool: toolName, allowed, });
if (!allowed) {
console.error(
[auth] DENIED: user=${auth.userId} tool=${toolName} +
has=[${auth.scopes}] needs=[${requiredScopes}]
);
}
return allowed; }
getAuditLog(limit = 100) { return this.auditLog.slice(-limit); } }
// Setup const proxy = new MCPAuthProxy({ jwtSecret: process.env.MCP_JWT_SECRET!, allowedScopes: new Map([ ["query_database", ["db:read", "admin"]], ["write_database", ["db:write", "admin"]], ["deploy", ["deploy:prod", "admin"]], ["search_code", ["code:read", "admin"]], ]), });
// In your MCP server's tool handler: async function handleToolCall( token: string, toolName: string, args: Record ) { const auth = proxy.authenticate(token); if (!auth) { return { error: "Authentication failed", isError: true }; }
if (!proxy.authorize(auth, toolName)) { return { error: "Insufficient permissions", isError: true }; }
// Proceed with actual tool execution return await executeToolHandler(toolName, args); }`
Enter fullscreen mode
Exit fullscreen mode
Critical for production. Without auth, any agent can call any tool. That's fine in dev. In production, it's a security hole.
5. Streaming Results with Progress Reporting
Long-running MCP tools should stream progress, not block for 30 seconds and then dump results.
# streaming_mcp_tool.py import asyncio import json from datetime import datetime# streaming_mcp_tool.py import asyncio import json from datetime import datetimeclass StreamingToolHandler: """MCP tool handler that reports progress via notifications."""
def init(self, server): self.server = server
async def handle_long_analysis(self, arguments: dict) -> dict: """Analyze a large dataset with progress updates.""" dataset_url = arguments["dataset_url"] analysis_type = arguments.get("type", "summary")
steps = [ ("Fetching dataset", self._fetch_data, dataset_url), ("Validating schema", self._validate, None), ("Running analysis", self._analyze, analysis_type), ("Generating report", self._report, None), ]
results = {} for i, (description, func, arg) in enumerate(steps):
Send progress notification
await self.server.send_notification( "notifications/progress", { "progressToken": arguments.get("progressToken"), "progress": i, "total": len(steps), "message": description, }, )
start = datetime.now() result = await func(arg) if arg else await func() elapsed = (datetime.now() - start).total_seconds()
results[description] = { "status": "complete", "elapsed_seconds": elapsed, "output": result, }
return { "content": [ { "type": "text", "text": json.dumps(results, indent=2, default=str), } ] }
async def fetch_data(self, url: str) -> dict: import httpx async with httpx.AsyncClient() as client: resp = await client.get(url) data = resp.json() return {"rows": len(data), "size_kb": len(resp.content) / 1024}
async def validate(self) -> dict: await asyncio.sleep(0.5) # Schema validation return {"valid": True, "warnings": 0}
async def analyze(self, analysis_type: str) -> dict: await asyncio.sleep(2) # Heavy computation return {"type": analysis_type, "insights": 42}
async def report(self) -> dict: await asyncio.sleep(0.3) return {"format": "markdown", "sections": 5}`
Enter fullscreen mode
Exit fullscreen mode
User experience matters. Even for AI agents, knowing "step 2 of 4" is better than a hanging request.
6. Error Recovery with Retry Policies
MCP tool calls fail. Network issues, rate limits, timeouts. Build retry logic into your client.
# mcp_retry.py import asyncio import random from dataclasses import dataclass from enum import Enum from typing import Callable, Any# mcp_retry.py import asyncio import random from dataclasses import dataclass from enum import Enum from typing import Callable, Anyclass RetryStrategy(Enum): EXPONENTIAL = "exponential" LINEAR = "linear" IMMEDIATE = "immediate"
@dataclass class RetryPolicy: max_retries: int = 3 strategy: RetryStrategy = RetryStrategy.EXPONENTIAL base_delay_ms: int = 1000 max_delay_ms: int = 30000 jitter: bool = True retryable_errors: tuple = ("ConnectionError", "TimeoutError", "RateLimitError")
def get_delay(self, attempt: int) -> float: if self.strategy == RetryStrategy.EXPONENTIAL: delay = self.base_delay_ms * (2 ** attempt) elif self.strategy == RetryStrategy.LINEAR: delay = self.base_delay_ms * (attempt + 1) else: delay = 0
delay = min(delay, self.max_delay_ms)
if self.jitter: delay = delay * (0.5 + random.random())*
return delay / 1000 # Convert to seconds
class ResilientMCPClient: """MCP client wrapper with automatic retry and circuit breaking."""
def init(self, client, default_policy: RetryPolicy | None = None): self.client = client self.default_policy = default_policy or RetryPolicy() self.failure_counts: dict[str, int] = {} self.circuit_open: dict[str, float] = {}
async def call_tool( self, name: str, arguments: dict, policy: RetryPolicy | None = None, ) -> Any: policy = policy or self.default_policy
Circuit breaker check
if name in self.circuit_open: import time if time.time() - self.circuit_open[name] < 60: raise Exception(f"Circuit open for {name}. Try again later.") del self.circuit_open[name] self.failure_counts[name] = 0
last_error = None for attempt in range(policy.max_retries + 1): try: result = await self.client.call_tool(name, arguments) self.failure_counts[name] = 0 # Reset on success return result except Exception as e: last_error = e error_type = type(e).name
if error_type not in policy.retryable_errors: raise # Non-retryable, fail immediately
self.failure_counts[name] = self.failure_counts.get(name, 0) + 1
Trip circuit breaker after 5 consecutive failures
if self.failure_counts[name] >= 5: import time self.circuit_open[name] = time.time() raise Exception(f"Circuit breaker tripped for {name}") from e
if attempt < policy.max_retries: delay = policy.get_delay(attempt) print(f"[retry] {name} attempt {attempt + 1} failed: {e}. " f"Retrying in {delay:.1f}s") await asyncio.sleep(delay)
raise last_error`
Enter fullscreen mode
Exit fullscreen mode
Production reality: Things fail. The question is whether your system recovers automatically or pages you at 3 AM.
7. Tool Result Caching
Identical tool calls within the same conversation shouldn't hit the backend twice.
# mcp_cache.py import hashlib import json import time from typing import Any# mcp_cache.py import hashlib import json import time from typing import Anyclass ToolResultCache: """LRU cache for MCP tool results with TTL support."""
def init(self, max_size: int = 1000, default_ttl: int = 300): self.max_size = max_size self.default_ttl = default_ttl self.cache: dict[str, dict] = {} self.access_order: list[str] = [] self.tool_ttls: dict[str, int] = {} self.stats = {"hits": 0, "misses": 0}
def set_ttl(self, tool_name: str, ttl_seconds: int) -> None: """Set custom TTL for a specific tool.""" self.tool_ttls[tool_name] = ttl_seconds
def cache_key(self, tool_name: str, arguments: dict) -> str: arg_str = json.dumps(arguments, sort_keys=True, default=str) return hashlib.sha256(f"{tool_name}:{arg_str}".encode()).hexdigest()
def get(self, tool_name: str, arguments: dict) -> Any | None: key = self.cache_key(tool_name, arguments) entry = self.cache.get(key)
if entry is None: self.stats["misses"] += 1 return None
ttl = self.tool_ttls.get(tool_name, self.default_ttl) if time.time() - entry["timestamp"] > ttl: del self.cache[key] self.stats["misses"] += 1 return None
self.stats["hits"] += 1
Move to end (most recently used)
if key in self.access_order: self.access_order.remove(key) self.access_order.append(key)
return entry["result"]
def put(self, tool_name: str, arguments: dict, result: Any) -> None: key = self.cache_key(tool_name, arguments)
Evict LRU if at capacity
while len(self.cache) >= self.max_size and self.access_order: evict_key = self.access_order.pop(0) self.cache.pop(evict_key, None)
self.cache[key] = {"result": result, "timestamp": time.time()} self.access_order.append(key)
class CachedMCPClient: """MCP client with transparent caching."""
Tools that should never be cached (side effects)
NEVER_CACHE = {"write_file", "send_email", "deploy", "delete"}
def init(self, client, cache: ToolResultCache | None = None): self.client = client self.cache = cache or ToolResultCache()
async def call_tool(self, name: str, arguments: dict) -> Any: if name in self.NEVER_CACHE: return await self.client.call_tool(name, arguments)
cached = self.cache.get(name, arguments) if cached is not None: return cached
result = await self.client.call_tool(name, arguments) self.cache.put(name, arguments, result) return result`
Enter fullscreen mode
Exit fullscreen mode
Real impact: In a multi-step agent workflow, the same search or database query can fire 3-4 times. Caching saves tokens, time, and API costs.
8. Observability: Structured Logging for MCP
You can't debug what you can't see. Instrument every MCP call.
# mcp_observability.py import json import time import logging from contextvars import ContextVar from dataclasses import dataclass, field, asdict from typing import Any# mcp_observability.py import json import time import logging from contextvars import ContextVar from dataclasses import dataclass, field, asdict from typing import Anyrequest_id_var: ContextVar[str] = ContextVar("request_id", default="unknown")
@dataclass class ToolCallMetric: tool_name: str arguments_hash: str start_time: float end_time: float = 0 duration_ms: float = 0 success: bool = True error: str | None = None result_tokens: int = 0 cached: bool = False request_id: str = ""
def finalize(self) -> "ToolCallMetric": self.end_time = time.time() self.duration_ms = (self.end_time - self.start_time) * 1000 self.request_id = request_id_var.get() return self*
class MCPObserver: """Structured observability for MCP tool calls."""
def init(self): self.logger = logging.getLogger("mcp.observer") self.metrics: list[ToolCallMetric] = []
def record(self, metric: ToolCallMetric) -> None: self.metrics.append(metric) log_data = asdict(metric)
if metric.success: self.logger.info("mcp.tool.call", extra={"data": log_data}) else: self.logger.error("mcp.tool.error", extra={"data": log_data})
def get_summary(self) -> dict: if not self.metrics: return {"total_calls": 0}
durations = [m.duration_ms for m in self.metrics] errors = [m for m in self.metrics if not m.success] cached = [m for m in self.metrics if m.cached]
return { "total_calls": len(self.metrics), "error_count": len(errors), "cache_hit_rate": len(cached) / len(self.metrics), "avg_duration_ms": sum(durations) / len(durations), "p95_duration_ms": sorted(durations)[int(len(durations) * 0.95)], "by_tool": self.by_tool(), }*
def by_tool(self) -> dict: tools: dict[str, list[float]] = {} for m in self.metrics: tools.setdefault(m.tool_name, []).append(m.duration_ms) return { name: {"calls": len(ds), "avg_ms": sum(ds) / len(ds)} for name, ds in tools.items() }`
Enter fullscreen mode
Exit fullscreen mode
Ship this on day one. When your agent makes 47 tool calls in a conversation and something goes wrong, you need to know which call, when, and why.
9. Multi-Agent Task Delegation via MCP
The real power: agents that delegate tasks to other agents through MCP.
# agent_delegation.py import asyncio import json from dataclasses import dataclass from enum import Enum# agent_delegation.py import asyncio import json from dataclasses import dataclass from enum import Enumclass TaskStatus(Enum): PENDING = "pending" RUNNING = "running" COMPLETE = "complete" FAILED = "failed"
@dataclass class AgentTask: task_id: str description: str assigned_to: str status: TaskStatus = TaskStatus.PENDING result: str | None = None dependencies: list[str] | None = None
class AgentOrchestrator: """Coordinates multiple AI agents via MCP tool delegation."""
def init(self): self.agents: dict[str, dict] = {} self.tasks: dict[str, AgentTask] = {}
def register_agent(self, name: str, capabilities: list[str], mcp_client) -> None: self.agents[name] = { "capabilities": capabilities, "client": mcp_client, "active_tasks": 0, "max_concurrent": 3, }
def find_agent(self, required_capability: str) -> str | None: """Find the least-loaded agent with the required capability.""" candidates = [ (name, info) for name, info in self.agents.items() if required_capability in info["capabilities"] and info["active_tasks"] < info["max_concurrent"] ] if not candidates: return None return min(candidates, key=lambda x: x[1]["active_tasks"])[0]
async def delegate(self, task: AgentTask) -> str: """Delegate a task to an appropriate agent.""" self.tasks[task.task_id] = task
Wait for dependencies
if task.dependencies: await self.wait_for_dependencies(task.dependencies)
agent_name = task.assigned_to or self.find_agent("general") if not agent_name: task.status = TaskStatus.FAILED task.result = "No available agent" return task.result
agent = self.agents[agent_name] agent["active_tasks"] += 1 task.status = TaskStatus.RUNNING
try: result = await agent["client"].call_tool( "execute_task", { "task_id": task.task_id, "description": task.description, "context": self.get_dependency_results(task.dependencies or []), }, ) task.status = TaskStatus.COMPLETE task.result = result.content[0].text except Exception as e: task.status = TaskStatus.FAILED task.result = str(e) finally: agent["active_tasks"] -= 1
return task.result
async def run_parallel(self, tasks: list[AgentTask]) -> dict[str, str]: """Run independent tasks in parallel.""" results = await asyncio.gather( [self.delegate(t) for t in tasks], return_exceptions=True, ) return { t.task_id: str(r) for t, r in zip(tasks, results) }
async def wait_for_dependencies(self, dep_ids: list[str]) -> None: while True: all_done = all( self.tasks.get(d, AgentTask("", "", "")).status in (TaskStatus.COMPLETE, TaskStatus.FAILED) for d in dep_ids ) if all_done: return await asyncio.sleep(0.5)
def get_dependency_results(self, dep_ids: list[str]) -> dict: return { d: self.tasks[d].result for d in dep_ids if d in self.tasks and self.tasks[d].result }`
Enter fullscreen mode
Exit fullscreen mode
This is where MCP shines. Not as a tool protocol for one agent, but as the communication layer for agent swarms.
Putting It All Together
These 9 patterns form a production MCP stack:
Layer Pattern Purpose
Discovery Tool Registry Dynamic tool availability
Resource Budget Manager Context window protection
Topology Gateway Server composition
Security Auth Proxy Access control + audit
UX Streaming Progress feedback
Resilience Retry + Circuit Breaker Failure recovery
Performance Caching Reduce redundant calls
Ops Observability Debugging + metrics
Scale Delegation Multi-agent orchestration
If you're building agents that need to interact with real systems — databases, APIs, codebases — these patterns are the difference between a demo and a product.
Resources
Building production MCP systems means managing tools, context budgets, auth, and observability all at once. If you want pre-built utilities for common patterns like token budgeting, multi-model routing, and agent orchestration, check out the AI Dev Toolkit — it includes production-ready components for exactly these kinds of workflows.
The MCP ecosystem moves fast. What patterns are you using in production? Drop them in the comments.
This is part of the "AI Engineering in Practice" series — real patterns from real systems, not toy demos.
Dev.to AI
https://dev.to/dohkoai/9-mcp-production-patterns-that-actually-scale-multi-agent-systems-2026-4ap3Sign in to highlight and annotate this article

Conversation starters
Daily AI Digest
Get the top 5 AI stories delivered to your inbox every morning.
More about
modelavailableversionWebhook Best Practices: Retry Logic, Idempotency, and Error Handling
<h1> Webhook Best Practices: Retry Logic, Idempotency, and Error Handling </h1> <p>Most webhook integrations fail silently. A handler returns 500, the provider retries a few times, then stops. Your system never processed the event and no one knows.</p> <p>Webhooks are not guaranteed delivery by default. How reliably your integration works depends almost entirely on how you write the receiver. This guide covers the patterns that make webhook handlers production-grade: proper retry handling, idempotency, error response codes, and queue-based processing.</p> <h2> Understand the Delivery Model </h2> <p>Before building handlers, understand what you are dealing with:</p> <ul> <li>Providers send webhook events as HTTP POST requests</li> <li>They expect a 2xx response within a timeout (typically 5
Building a scoring engine with pure TypeScript functions (no ML, no backend)
<p>We needed to score e-commerce products across multiple dimensions: quality, profitability, market conditions, and risk.</p> <p>The constraints:</p> <ul> <li>Scores must update in real time</li> <li>Must run entirely in the browser (Chrome extension)</li> <li>Must be explainable (not a black box)</li> </ul> <p>We almost built an ML pipeline — training data, model serving, APIs, everything.</p> <p>Then we asked a simple question:</p> <p><strong>Do we actually need machine learning for this?</strong></p> <p>The answer was no.</p> <p>We ended up building several scoring engines in pure TypeScript.<br> Each one is a single function, under 100 lines, zero dependencies, and runs in under a millisecond.</p> <h2> What "pure function" means here </h2> <p>Each scoring engine follows 3 rules:</p> <
Why AI Agents Need a Trust Layer (And How We Built One)
<p><em>What happens when AI agents need to prove they're reliable before anyone trusts them with real work?</em></p> <h2> The Problem No One's Talking About </h2> <p>Every week, a new AI agent framework drops. Autonomous agents that can write code, send emails, book flights, manage databases. The capabilities are incredible.</p> <p>But here's the question nobody's answering: <strong>how do you know which agent to trust?</strong></p> <p>Right now, hiring an AI agent feels like hiring a contractor with no references, no portfolio, and no track record. You're just... hoping it works. And when it doesn't, there's no accountability trail.</p> <p>We kept running into this building our own multi-agent systems:</p> <ul> <li>Agent A says it can handle email outreach. Can it? Who knows.</li> <li>Age
Knowledge Map
Connected Articles — Knowledge Graph
This article is connected to other articles through shared AI topics and tags.
More in Releases
How to Use the ES2026 Temporal API in Node.js REST APIs (2026 Guide)
<p>After 9 years in development and countless TC39 meetings, the JavaScript Temporal API officially reached <strong>Stage 4 on March 11, 2026</strong>, locking it into the ES2026 specification. That means it's no longer a proposal — it's the future of date and time handling in JavaScript, and you should start using it in your Node.js APIs today.</p> <p>If you've ever shipped a date-related bug in production — DST edge cases, wrong timezone conversions, silent mutation bugs from <code>Date.setDate()</code> — you're not alone. The <code>Date</code> object was designed in 1995, copied from Java, and has been causing developer pain ever since. Temporal is the fix.</p> <p>This guide covers <strong>how to use the ES2026 Temporal API in Node.js REST APIs</strong> with practical, real-world patter
缓存架构深度指南:如何设计高性能缓存系统
<h1> 缓存架构深度指南:如何设计高性能缓存系统 </h1> <blockquote> <p>在现代分布式系统中,缓存是提升系统性能的核心组件。本文将深入探讨缓存架构的设计原则、策略与实战技巧。</p> </blockquote> <h2> 为什么要使用缓存? </h2> <p>在软件系统中,缓存的本质是<strong>用空间换时间</strong>。通过将频繁访问的数据存储在高速存储介质中,减少对慢速数据源的访问次数,从而显著提升系统响应速度。</p> <p>典型场景:</p> <ul> <li>数据库查询结果缓存</li> <li>API响应缓存</li> <li>会话状态缓存</li> <li>计算结果缓存</li> </ul> <h2> 缓存架构设计原则 </h2> <h3> 1. 缓存层级策略 </h3> <p>现代系统通常采用多级缓存架构:<br> </p> <div class="highlight js-code-highlight"> <pre class="highlight plaintext"><code>┌─────────────────────────────────────────────┐ │ CDN (边缘缓存) │ ├─────────────────────────────────────────────┤ │ Redis/Memcached │ ├─────────────────────────────────────────────┤ │ 本地缓存 │ ├─────────────────────────────────────────────┤ │ 数据库 │ └─────────────────────────────────────────────┘ </code></pre> </div> <p><strong>原则<
Axios Hijack Post-Mortem: How to Audit, Pin, and Automate a Defense
<p>On March 31, 2026, the <code>axios</code> npm package was compromised via a hijacked maintainer account. Two versions, <code>1.14.1</code> and <code>0.30.4</code>, were weaponised with a malicious phantom dependency called <code>plain-crypto-js</code>. It functions as a Remote Access Trojan (RAT) that executes during the <code>postinstall</code> phase and silently exfiltrates environment variables: AWS keys, GitHub tokens, database credentials, and anything present in your <code>.env</code> at install time.</p> <p>The attack window was approximately 3 hours (00:21 to 03:29 UTC) before the packages were unpublished. A single CI run during that window is sufficient exposure.<br> This post documents the forensic audit and remediation steps performed on a Next.js production stack immediatel
Guilford Technical CC to Launch Degrees in AI, Digital Media - govtech.com
<a href="https://news.google.com/rss/articles/CBMipgFBVV95cUxQOXdfNFpXQjJyRlo4aTA1cjdwZk5IbTNTNi1BU25hQUNlSjVXcE5ZelJNbFRMYUZsVFNWZ3lxX21TQ3NocHdLbldydkR0Q1JURXR5eVhXd3ItNjlJcE1TdHFPMnA1c0FQWDBmbWtNRC04YWRIelU5LWU3Rl9ZWHctYU02d2M4WHJ5a2pwaW0xcTRyNkVqSThhNkNxbFlZSkF4Q2tIZHNn?oc=5" target="_blank">Guilford Technical CC to Launch Degrees in AI, Digital Media</a> <font color="#6f6f6f">govtech.com</font>

Discussion
Sign in to join the discussion
No comments yet — be the first to share your thoughts!