Multi-Agent AI Systems: Architecture, Communication, and Coordination
Beyond single agents. Learn supervisor patterns, swarm architectures, and hybrid designs for multi-agent systems. Covers state synchronization, memory engineering, and failure recovery.
Single agents hit walls. They run out of context. They lack specialized knowledge. They can't parallelize work.
Multi-agent systems solve this by distributing intelligence across specialized agents that collaborate on complex tasks. One agent researches, another analyzes, a third writes the report. Each stays focused. Each has the context it needs.
But coordination is hard. Agents need to communicate, share state, resolve conflicts, and recover from failures. The architecture you choose determines whether your system scales or collapses under its own complexity.
This guide covers the orchestration patterns that work: supervisor, hierarchical, swarm, and network architectures. We'll examine state management, conflict resolution, and the memory engineering that makes multi-agent systems reliable.
Why Multi-Agent?
A single agent with access to every tool becomes unwieldy. Its context window fills with irrelevant information. Its prompts grow bloated. It makes mistakes because it's trying to be everything at once.
Multi-agent systems distribute cognitive load.
| Single Agent | Multi-Agent |
|---|---|
| One massive prompt | Focused role prompts |
| All tools loaded | Specialized toolsets |
| Linear execution | Parallel work |
| Single point of failure | Graceful degradation |
| Context overflow | Scoped contexts |
Research from Anthropic found that multi-agent systems with Claude Opus 4 leading and Claude Sonnet 4 subagents outperformed single-agent Claude Opus 4 by 90.2% on research tasks. The performance gain comes from specialization and parallel execution, not just throwing more compute at the problem.
The trade-off is coordination overhead. Multi-agent systems use roughly 15x more tokens than single-agent chat interactions. That overhead only pays off when the task genuinely benefits from distributed work.
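That multiplier is easy to put in dollar terms with back-of-envelope arithmetic. A sketch (the per-million-token price below is a placeholder, not a quoted rate):

```python
def estimate_cost_usd(tokens: int, price_per_mtok: float) -> float:
    """Dollar cost of a token count at a given per-million-token price."""
    return tokens / 1_000_000 * price_per_mtok

# A ~2,000-token chat turn vs. the same task at the ~15x multi-agent multiplier
chat_cost = estimate_cost_usd(2_000, price_per_mtok=3.0)          # placeholder price
multi_agent_cost = estimate_cost_usd(2_000 * 15, price_per_mtok=3.0)
```

The multiplier only pays for itself when the quality or latency gain on the task is worth roughly an order of magnitude more spend.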
Core Orchestration Patterns
Pattern 1: Supervisor (Hub-and-Spoke)
One orchestrator agent delegates to specialist workers:
              ┌─────────────┐
              │  Supervisor │
              └──────┬──────┘
       ┌─────────────┼─────────────┐
       ▼             ▼             ▼
 ┌──────────┐  ┌──────────┐  ┌──────────┐
 │ Research │  │ Analysis │  │  Writer  │
 └──────────┘  └──────────┘  └──────────┘
The supervisor:
- Receives the task
- Decomposes it into subtasks
- Routes subtasks to appropriate specialists
- Aggregates results
- Decides when the task is complete
Implementation with LangGraph:
from typing import Literal
from pydantic import BaseModel
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.types import Command

class Router(BaseModel):
    """Schema for the supervisor's structured routing decision"""
    next: Literal["researcher", "analyst", "writer", "FINISH"]

def supervisor(state: MessagesState) -> Command[Literal["researcher", "analyst", "writer", "__end__"]]:
    """Route to the appropriate specialist or finish"""
    messages = state["messages"]
    response = llm.with_structured_output(Router).invoke([
        {"role": "system", "content": """You coordinate a team:
- researcher: finds information from the web
- analyst: analyzes data and identifies patterns
- writer: produces final reports
Based on the conversation, decide who should work next.
Return FINISH when the task is complete."""},
        *messages
    ])
    if response.next == "FINISH":
        return Command(goto=END)
    return Command(goto=response.next)
# Build graph
graph = StateGraph(MessagesState)
graph.add_node("supervisor", supervisor)
graph.add_node("researcher", researcher_agent)
graph.add_node("analyst", analyst_agent)
graph.add_node("writer", writer_agent)
# All workers report back to supervisor
for worker in ["researcher", "analyst", "writer"]:
    graph.add_edge(worker, "supervisor")
graph.add_edge(START, "supervisor")
app = graph.compile()
When to use:
- Clear task decomposition
- Need for coordination between steps
- Predictable workflow patterns
- When audit trails matter
Failure mode: Supervisor becomes a bottleneck. If it fails, everything stops. Mitigate with supervisor health checks and fallback supervisors.
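The fallback mitigation can be sketched as a thin wrapper (the names here are hypothetical, not a LangGraph API): try the primary supervisor with a timeout, and hand coordination to a standby when it fails.

```python
import asyncio

async def supervise_with_fallback(primary, fallback, state: dict, timeout: float = 10.0):
    """Run the primary supervisor; on timeout or error, use the standby.

    `primary` and `fallback` are assumed to be async callables that take
    the shared state and return a routing decision.
    """
    try:
        return await asyncio.wait_for(primary(state), timeout=timeout)
    except Exception:
        # Primary is unhealthy; the standby keeps the system moving
        return await fallback(state)
```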
Pattern 2: Hierarchical (Multi-Level Supervision)
Supervisors manage other supervisors, creating a tree structure:
                  ┌─────────────────┐
                  │  Top Supervisor │
                  └────────┬────────┘
            ┌──────────────┴──────────────┐
            ▼                             ▼
   ┌─────────────────┐           ┌─────────────────┐
   │ Research Supvsr │           │  Writing Supvsr │
   └────────┬────────┘           └────────┬────────┘
       ┌────┴────┐                   ┌────┴────┐
       ▼         ▼                   ▼         ▼
  ┌────────┐ ┌────────┐         ┌────────┐ ┌────────┐
  │  Web   │ │  Docs  │         │ Draft  │ │ Editor │
  │ Search │ │Analysis│         │ Writer │ │        │
  └────────┘ └────────┘         └────────┘ └────────┘
Each level abstracts complexity for the level above. The top supervisor thinks in terms of "research" and "writing," not individual search queries or editing passes.
Implementation:
# Research team subgraph
research_graph = StateGraph(ResearchState)
research_graph.add_node("research_supervisor", research_supervisor)
research_graph.add_node("web_search", web_search_agent)
research_graph.add_node("doc_analysis", doc_analysis_agent)
# ... wire edges
research_team = research_graph.compile()
# Writing team subgraph
writing_graph = StateGraph(WritingState)
writing_graph.add_node("writing_supervisor", writing_supervisor)
writing_graph.add_node("drafter", draft_agent)
writing_graph.add_node("editor", editor_agent)
# ... wire edges
writing_team = writing_graph.compile()
# Top-level graph uses teams as nodes
def research_node(state):
    result = research_team.invoke({"query": state["task"]})
    return {"research_results": result["findings"]}

def writing_node(state):
    result = writing_team.invoke({
        "topic": state["task"],
        "research": state["research_results"]
    })
    return {"final_output": result["document"]}

main_graph = StateGraph(MainState)
main_graph.add_node("research", research_node)
main_graph.add_node("writing", writing_node)
main_graph.add_edge(START, "research")
main_graph.add_edge("research", "writing")
main_graph.add_edge("writing", END)
When to use:
- Complex tasks requiring multiple coordination levels
- Large agent teams (10+ agents)
- Domain-specific sub-teams with distinct expertise
- When different parts of the system need different update cycles
Failure mode: Information loss between levels. Summaries discard nuance. Mitigate by passing both summaries and raw data where context budget allows.
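One way to apply that mitigation: have each sub-team node return both a compressed summary (for the level above) and the raw findings (for consumers that need detail). A minimal sketch with stand-in names; `summarize` stubs out what would be an LLM call:

```python
def summarize(findings: list) -> str:
    """Stand-in for an LLM summarization call."""
    return f"{len(findings)} findings, e.g. {findings[0]}"

def research_node(state: dict) -> dict:
    """Sub-team wrapper that preserves nuance for downstream levels.

    The top supervisor reads `research_summary`; workers that need
    detail read `research_raw`. All key names are illustrative.
    """
    findings = state["findings"]  # would come from the research sub-graph
    return {
        "research_summary": summarize(findings),
        "research_raw": findings,  # keep the nuance the summary discards
    }
```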
Pattern 3: Agents as Tools
Wrap specialist agents as callable tools for an orchestrator:
from langchain_core.tools import tool
@tool
def legal_review(document: str) -> str:
    """Have a legal specialist review a document for compliance issues"""
    legal_agent = create_legal_agent()
    result = legal_agent.invoke({"document": document})
    return result["review"]

@tool
def technical_review(code: str) -> str:
    """Have a technical specialist review code for issues"""
    tech_agent = create_tech_agent()
    result = tech_agent.invoke({"code": code})
    return result["review"]

# Orchestrator has these agents as tools
orchestrator = create_react_agent(
    llm,
    tools=[legal_review, technical_review, web_search, file_write]
)
The orchestrator decides when to invoke specialists based on the task. Specialists do focused work and return results. The orchestrator maintains conversation continuity.
Key practice: Write detailed docstrings. The orchestrator's LLM uses these to decide when to call each specialist. Vague descriptions lead to wrong routing.
When to use:
- Bounded subtasks that don't need conversation takeover
- Specialists that produce discrete outputs (reviews, analyses, transforms)
- When you want the orchestrator to maintain context across specialist calls
Pattern 4: Swarm (Decentralized)
No central coordinator. Agents interact peer-to-peer based on local rules:
 ┌───────────┐       ┌───────────┐
 │  Agent A  │◄─────►│  Agent B  │
 └─────┬─────┘       └─────┬─────┘
       │                   │
       │    ┌─────────┐    │
       └───►│ Shared  │◄───┘
            │ Memory  │
       ┌───►│         │◄───┐
       │    └─────────┘    │
 ┌─────┴─────┐       ┌─────┴─────┐
 │  Agent C  │◄─────►│  Agent D  │
 └───────────┘       └───────────┘
Each agent:
- Operates on local information
- Communicates with nearby agents
- Follows simple behavioral rules
- Contributes to emergent collective behavior
Implementation sketch:
import asyncio

class SwarmAgent:
    def __init__(self, name: str, specialty: str, shared_memory):
        self.name = name
        self.specialty = specialty
        self.memory = shared_memory

    async def step(self, task_context: dict):
        # Read what others have contributed
        others_work = await self.memory.read_recent(exclude=self.name)
        # Decide if I should contribute
        if self._should_contribute(task_context, others_work):
            my_contribution = await self._do_work(task_context, others_work)
            await self.memory.write(self.name, my_contribution)
        # Check if task is complete
        return self._evaluate_completion(task_context, others_work)

    def _should_contribute(self, task, others):
        """Local decision based on specialty and current state"""
        # Does my specialty apply?
        # Have others covered this already?
        # Is there a gap I can fill?
        pass

async def run_swarm(agents: list, task: dict, max_rounds: int = 5):
    # Synthesize from the memory the agents were constructed with,
    # not a fresh (empty) store
    shared_memory = agents[0].memory
    for _ in range(max_rounds):
        results = await asyncio.gather(*[
            agent.step(task) for agent in agents
        ])
        if all(r.get("complete") for r in results):
            break
    return shared_memory.synthesize()
Swarm characteristics:
- No single point of failure
- Scales horizontally
- Emergent behavior from local interactions
- Harder to debug and predict
When to use:
- Brainstorming and ideation (multiple perspectives)
- Problems benefiting from parallel exploration
- Systems requiring fault tolerance
- When you can't predict the optimal coordination structure
Failure mode: Coordination chaos. Without structure, agents may duplicate work, contradict each other, or never converge. Anthropic's early multi-agent experiments saw agents spawning 50 subagents for simple queries, searching endlessly for nonexistent sources, and distracting each other with excessive updates. Mitigate with round limits, contribution rules, and convergence criteria.
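A convergence criterion can be as simple as a stopping rule over the shared memory: end the swarm once a few consecutive rounds produce nothing new. An illustrative sketch:

```python
def has_converged(contributions_per_round: list, patience: int = 2) -> bool:
    """True when the last `patience` rounds each added zero new contributions."""
    if len(contributions_per_round) < patience:
        return False
    return all(count == 0 for count in contributions_per_round[-patience:])
```

Combined with a hard round limit, this bounds both runaway exploration and silent stalls.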
Pattern 5: Network (Full Mesh)
Every agent can communicate with every other agent:
class NetworkOrchestrator:
    def __init__(self, agents: dict):
        self.agents = agents  # name -> agent
        self.message_bus = MessageBus()

    async def run(self, task: dict):
        # Any agent can send to any other
        for name, agent in self.agents.items():
            agent.set_message_handler(
                # Bind name at definition time; a bare closure would capture
                # the loop variable late and misroute every handler to the
                # last agent registered
                lambda msg, sender, name=name: self.route_message(msg, sender, name)
            )
        # Kick off with the task
        await self.message_bus.broadcast(task)
        # Run until termination condition
        while not self._is_complete():
            messages = await self.message_bus.collect()
            await self._process_messages(messages)

    def route_message(self, message, sender, recipient):
        """Route message from sender to recipient"""
        self.message_bus.send(recipient, message, sender)
When to use:
- Dynamic interaction patterns
- When agents need to negotiate directly
- Problems where the communication structure can't be predetermined
Failure mode: Communication explosion. N agents means N² possible connections. At scale, message volume can overwhelm the system. Mitigate with message prioritization, rate limiting, and topic-based filtering.
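Topic-based filtering replaces the full mesh with publish/subscribe: agents only receive topics they subscribed to, so message volume scales with subscriptions rather than N². A minimal synchronous sketch (illustrative, not a real messaging API):

```python
from collections import defaultdict

class TopicBus:
    """Topic-filtered message bus: only subscribers of a topic receive it."""

    def __init__(self):
        self.subscriptions = defaultdict(set)   # topic -> {agent_id}
        self.inboxes = defaultdict(list)        # agent_id -> [message]

    def subscribe(self, agent_id: str, topic: str):
        self.subscriptions[topic].add(agent_id)

    def publish(self, sender: str, topic: str, content: dict):
        # Deliver only to subscribers of this topic, excluding the sender
        for agent_id in self.subscriptions[topic]:
            if agent_id != sender:
                self.inboxes[agent_id].append(
                    {"topic": topic, "from": sender, **content}
                )
```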
State Management
Multi-agent systems need shared state. Without it, agents operate on different versions of reality and make conflicting decisions.
Think of it like a shared document. If every team member works on their own copy, you end up with chaos when trying to merge. Same problem with agents. They need a single source of truth for what's been decided, what's been done, and what the current goal is.
State Architecture Options
1. Centralized State Store
All agents read from and write to a single state object:
class CentralState:
    def __init__(self):
        self._state = {}
        self._lock = asyncio.Lock()
        self._version = 0

    async def read(self, key: str):
        async with self._lock:
            return self._state.get(key), self._version

    async def write(self, key: str, value, expected_version: int):
        async with self._lock:
            if self._version != expected_version:
                raise OptimisticLockError("State changed")
            self._state[key] = value
            self._version += 1
Pros: Simple, consistent, easy to debug
Cons: Bottleneck, single point of failure
2. Event-Sourced State
State changes are recorded as immutable events:
class EventStore:
    def __init__(self):
        self.events = []

    def append(self, event: dict):
        self.events.append({
            **event,
            "timestamp": time.time(),
            "sequence": len(self.events)
        })

    def replay(self, up_to: int = None) -> dict:
        """Reconstruct state by replaying events"""
        state = {}
        events = self.events[:up_to] if up_to else self.events
        for event in events:
            state = self._apply_event(state, event)
        return state

    def _apply_event(self, state, event):
        if event["type"] == "SET":
            state[event["key"]] = event["value"]
        elif event["type"] == "DELETE":
            state.pop(event["key"], None)
        return state
Pros: Full audit trail, time travel, easy replication
Cons: Storage grows indefinitely, replay can be slow
3. Distributed State with Consensus
Each agent maintains local state, synchronized via consensus protocol:
class DistributedState:
    def __init__(self, agent_id: str, peers: list):
        self.agent_id = agent_id
        self.peers = peers
        self.local_state = {}
        self.vector_clock = {p: 0 for p in peers}

    async def propose(self, key: str, value):
        """Propose a state change to the cluster"""
        proposal = {
            "key": key,
            "value": value,
            "proposer": self.agent_id,
            "clock": self.vector_clock.copy()
        }
        # Collect votes from peers
        votes = await asyncio.gather(*[
            self._request_vote(peer, proposal)
            for peer in self.peers
        ])
        if sum(votes) > len(self.peers) / 2:
            await self._commit(proposal)
            return True
        return False
Pros: No single point of failure, scales horizontally
Cons: Complex, eventual consistency, network partition handling
Choosing a State Architecture
| Requirement | Recommended Architecture |
|---|---|
| Simple coordination | Centralized |
| Audit/compliance needs | Event-sourced |
| High availability | Distributed |
| < 5 agents | Centralized |
| 5-20 agents | Event-sourced with central store |
| 20+ agents | Distributed |
Conflict Resolution
When multiple agents write conflicting values, the system needs a policy. This happens more often than you'd expect. Two agents analyze the same data and reach different conclusions. One agent updates a field while another is still reading it. The system needs rules for who wins.
Last-Writer-Wins (Usually Wrong)
def resolve_lww(conflicts: list) -> any:
    return max(conflicts, key=lambda c: c["timestamp"])["value"]
Simple but dangerous. A less competent agent can overwrite a better assessment. Only use when conflicts are genuinely equivalent.
Role-Based Priority
ROLE_PRIORITY = {
    "supervisor": 100,
    "specialist": 50,
    "worker": 10
}

def resolve_by_role(conflicts: list) -> any:
    return max(conflicts, key=lambda c: ROLE_PRIORITY.get(c["role"], 0))["value"]
Supervisors override workers. Specialists override generalists. Explicit hierarchy.
Confidence-Weighted Voting
def resolve_by_confidence(conflicts: list) -> any:
    # Group by value
    votes = defaultdict(float)
    for conflict in conflicts:
        votes[conflict["value"]] += conflict["confidence"]
    # Return highest confidence value
    return max(votes.items(), key=lambda x: x[1])[0]
Agents report confidence with their writes. Higher confidence carries more weight.
Human Escalation
def resolve_with_escalation(conflicts: list, threshold: float = 0.3) -> any:
    # If confidence spread is too wide, escalate
    confidences = [c["confidence"] for c in conflicts]
    if max(confidences) - min(confidences) > threshold:
        return interrupt({
            "type": "conflict_resolution",
            "conflicts": conflicts,
            "message": "Agents disagree significantly. Please resolve."
        })
    # Otherwise use confidence voting
    return resolve_by_confidence(conflicts)
When agents strongly disagree, bring in a human. This is especially important for high-stakes decisions.
Provenance Tracking
Whatever resolution strategy you use, track metadata:
@dataclass
class StateEntry:
    value: any
    author_agent: str
    timestamp: float
    confidence: float
    evidence: list[str]
    supersedes: str | None  # ID of entry this replaced
When conflicts arise, you can inspect provenance to understand why and make better decisions.
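For example, a resolution policy might prefer entries that cite evidence before falling back to confidence. A sketch over a trimmed-down version of the entry type above (fields reduced for brevity):

```python
from dataclasses import dataclass, field

@dataclass
class Entry:
    """Trimmed-down provenance record for illustration."""
    value: object
    author_agent: str
    confidence: float
    evidence: list = field(default_factory=list)

def resolve_with_provenance(conflicts: list) -> object:
    """Prefer evidence-backed entries; tie-break on confidence."""
    best = max(conflicts, key=lambda e: (bool(e.evidence), e.confidence))
    return best.value
```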
Memory Engineering
Memory is the foundation of multi-agent coordination. Without proper memory architecture, agents operate on stale or inconsistent information.
Anthropic's early multi-agent experiments hit this hard. The failure modes described earlier (runaway subagent spawning, endless searches for nonexistent sources, noisy status updates) weren't communication problems. They were memory problems: agents couldn't remember what had already been tried, what was already known, or what decisions had already been made.
Memory Types
Working Memory (Short-term)
Current task context, recent messages, immediate state. Stored in the graph state or conversation history. Cleared between sessions.
Episodic Memory (Session)
What happened during this task execution. Stored in checkpoints. Enables resume and time travel.
Semantic Memory (Long-term)
Facts, procedures, learned patterns. Persisted to database. Shared across sessions.
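The three tiers can be sketched as a single facade. Here a plain dict stands in for the persistent semantic store; a real system would back it with a database or vector store, and all names are illustrative:

```python
class AgentMemory:
    """Illustrative three-tier memory facade."""

    def __init__(self, persistent_store: dict):
        self.working = []                  # current task context, cleared per session
        self.episodic = {}                 # session_id -> list of events (checkpoints)
        self.semantic = persistent_store   # long-lived facts shared across sessions

    def remember(self, session_id: str, event: dict):
        self.working.append(event)
        self.episodic.setdefault(session_id, []).append(event)

    def learn(self, key: str, fact: str):
        self.semantic[key] = fact

    def end_session(self):
        self.working.clear()  # episodic and semantic tiers survive
```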
Multi-Agent Memory Patterns
Blackboard Pattern
A shared workspace where agents read and write:
class Blackboard:
    def __init__(self):
        self.entries = {}
        self.watchers = defaultdict(list)

    def write(self, agent: str, key: str, value: any):
        self.entries[key] = {
            "value": value,
            "author": agent,
            "timestamp": time.time()
        }
        # Notify watchers
        for callback in self.watchers[key]:
            callback(self.entries[key])

    def read(self, key: str) -> any:
        return self.entries.get(key, {}).get("value")

    def watch(self, key: str, callback):
        self.watchers[key].append(callback)
Agents subscribe to keys they care about. Updates trigger callbacks.
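Usage might look like the following (the class is restated minimally so the snippet stands alone): one agent watches a key, and another agent's write triggers its callback.

```python
import time
from collections import defaultdict

class Blackboard:
    # Minimal restatement of the blackboard above, for a standalone example
    def __init__(self):
        self.entries = {}
        self.watchers = defaultdict(list)

    def write(self, agent, key, value):
        self.entries[key] = {"value": value, "author": agent, "timestamp": time.time()}
        for callback in self.watchers[key]:
            callback(self.entries[key])

    def watch(self, key, callback):
        self.watchers[key].append(callback)

# The analyst watches "findings"; the researcher's write fires its callback
board = Blackboard()
seen = []
board.watch("findings", lambda entry: seen.append(entry["value"]))
board.write("researcher", "findings", ["source A", "source B"])
```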
Hierarchical Summarization
Different abstraction levels for different consumers:
class HierarchicalMemory:
    def __init__(self):
        self.detailed = []        # Full entries
        self.summaries = []       # Compressed summaries
        self.global_summary = ""  # One-liner

    def add(self, entry: dict):
        self.detailed.append(entry)
        # Summarize every N entries
        if len(self.detailed) % 10 == 0:
            summary = self._summarize(self.detailed[-10:])
            self.summaries.append(summary)
            self.global_summary = self._summarize_summaries(self.summaries)

    def get_for_role(self, role: str):
        if role == "supervisor":
            return self.global_summary
        elif role == "planner":
            return self.summaries
        else:
            return self.detailed[-20:]  # Recent detailed
Supervisors get high-level summaries. Workers get detailed context.
Role-Adaptive Retrieval
What you retrieve depends on who's asking:
def retrieve(query: str, agent_role: str, agent_specialty: str):
    # Base retrieval
    results = vector_store.search(query, k=20)
    # Filter by relevance to role
    if agent_role == "specialist":
        results = [r for r in results if r.metadata.get("domain") == agent_specialty]
    # Rank by agent-specific criteria
    if agent_role == "supervisor":
        results.sort(key=lambda r: r.metadata.get("importance", 0), reverse=True)
    return results[:10]
Token Economics
Multi-agent systems are expensive. Anthropic's data shows agents use 4x more tokens than chat, and multi-agent systems use 15x more. Manage this with:
Context Trimming
def trim_for_agent(messages: list, max_tokens: int, agent_role: str):
    # Always keep system message
    system_msg = messages[0]
    # Keep more recent context for workers, broader context for supervisors
    if agent_role == "worker":
        keep_recent = int(max_tokens * 0.8)
    else:
        keep_recent = int(max_tokens * 0.5)
    trimmed = trim_messages(messages[1:], keep_recent, strategy="last")
    return [system_msg] + trimmed
Selective Loading
Don't load everything. Load what the current agent needs:
def load_context_for_agent(agent: Agent, task: dict):
    context = []
    # Task-specific context
    context.extend(memory.get_task_context(task["id"]))
    # Agent-specialty context
    context.extend(memory.get_specialty_context(agent.specialty))
    # Recent inter-agent messages (last 5 only)
    context.extend(message_bus.get_recent(agent.id, limit=5))
    return context
Communication Protocols
Agents need to talk to each other. The protocol you choose affects reliability, debuggability, and performance.
Message Passing
Direct communication between agents:
@dataclass
class AgentMessage:
    sender: str
    recipient: str
    type: str  # "request", "response", "broadcast"
    content: dict
    correlation_id: str  # Link request/response pairs
    timestamp: float

class MessageBus:
    def __init__(self):
        self.queues = defaultdict(asyncio.Queue)

    async def send(self, message: AgentMessage):
        await self.queues[message.recipient].put(message)

    async def receive(self, agent_id: str, timeout: float = None):
        try:
            return await asyncio.wait_for(
                self.queues[agent_id].get(),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            return None

    async def broadcast(self, sender: str, content: dict):
        for recipient in self.queues.keys():
            if recipient != sender:
                await self.send(AgentMessage(
                    sender=sender,
                    recipient=recipient,
                    type="broadcast",
                    content=content,
                    correlation_id=str(uuid4()),
                    timestamp=time.time()
                ))
Handoffs
Transfer conversation control between agents:
from langgraph.prebuilt import create_react_agent
from langgraph.types import Command
def create_agent_with_handoffs(name: str, tools: list, handoff_targets: list):
    """Create an agent that can hand off to other agents"""

    @tool
    def transfer_to_agent(target: str, context: str):
        """Transfer the conversation to another specialist agent.

        Args:
            target: Name of the agent to transfer to
            context: Context to pass to the receiving agent
        """
        if target not in handoff_targets:
            return f"Unknown agent: {target}. Available: {handoff_targets}"
        return Command(
            goto=target,
            update={"handoff_context": context, "from_agent": name}
        )

    return create_react_agent(
        llm,
        tools=tools + [transfer_to_agent],
        name=name
    )
Handoffs are explicit transfers. The sending agent acknowledges it can't handle the request and routes to a better-suited agent.
Tool Calling vs. Handoffs
| Aspect | Agent as Tool | Handoff |
|---|---|---|
| Control | Orchestrator retains | Transfers to specialist |
| Context | Orchestrator manages | Specialist takes over |
| Use case | Bounded subtasks | Full delegation |
| Return | Result back to orchestrator | Specialist responds to user |
Use tool calling when the orchestrator should synthesize results. Use handoffs when the specialist should own the interaction.
Failure Handling
Multi-agent systems have more failure modes than single agents. Plan for them.
Timeout and Retry
async def call_agent_with_retry(
    agent,
    input: dict,
    max_retries: int = 3,
    timeout: float = 30.0
):
    for attempt in range(max_retries):
        try:
            result = await asyncio.wait_for(
                agent.ainvoke(input),
                timeout=timeout
            )
            return result
        except asyncio.TimeoutError:
            if attempt < max_retries - 1:
                # Exponential backoff
                await asyncio.sleep(2 ** attempt)
                continue
            raise AgentTimeoutError(f"Agent timed out after {max_retries} attempts")
        except Exception:
            if attempt < max_retries - 1:
                await asyncio.sleep(2 ** attempt)
                continue
            raise
Circuit Breaker
Prevent cascading failures:
class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 60.0):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.state = "closed"  # closed, open, half-open
        self.last_failure_time = 0

    async def call(self, func, *args, **kwargs):
        if self.state == "open":
            if time.time() - self.last_failure_time > self.reset_timeout:
                self.state = "half-open"
            else:
                raise CircuitOpenError("Circuit breaker is open")
        try:
            result = await func(*args, **kwargs)
            if self.state == "half-open":
                self.state = "closed"
                self.failure_count = 0
            return result
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = "open"
            raise
Graceful Degradation
When a specialist fails, fall back to a generalist:
async def research_with_fallback(query: str):
    try:
        # Try specialist first
        return await research_specialist.ainvoke({"query": query})
    except AgentFailure:
        # Fall back to generalist
        return await general_agent.ainvoke({
            "query": query,
            "context": "Research task (specialist unavailable)"
        })
Health Monitoring
Track agent health metrics:
class AgentHealthMonitor:
    def __init__(self):
        self.metrics = defaultdict(lambda: {
            "success_count": 0,
            "failure_count": 0,
            "total_latency": 0.0,
            "last_success": None,
            "last_failure": None
        })

    def record_success(self, agent_id: str, latency: float):
        m = self.metrics[agent_id]
        m["success_count"] += 1
        m["total_latency"] += latency
        m["last_success"] = time.time()

    def record_failure(self, agent_id: str, error: str):
        m = self.metrics[agent_id]
        m["failure_count"] += 1
        m["last_failure"] = time.time()
        m["last_error"] = error

    def get_health(self, agent_id: str) -> dict:
        m = self.metrics[agent_id]
        total = m["success_count"] + m["failure_count"]
        return {
            "success_rate": m["success_count"] / total if total > 0 else 0,
            "avg_latency": m["total_latency"] / m["success_count"] if m["success_count"] > 0 else 0,
            "healthy": m["success_count"] / total > 0.9 if total > 10 else True
        }
Choosing an Architecture
| Scenario | Recommended Pattern |
|---|---|
| Clear task decomposition, need audit trail | Supervisor |
| Large teams (10+), complex domain structure | Hierarchical |
| Bounded specialist subtasks | Agents as Tools |
| Brainstorming, need diverse perspectives | Swarm |
| Dynamic interaction needs | Network |
| Fault tolerance critical | Swarm or distributed supervisor |
| Debugging simplicity needed | Supervisor |
Most production systems use hybrid approaches: a supervisor orchestrating specialists (agents as tools), with hierarchical organization for large agent counts, and swarm-like parallel exploration for specific subtasks.
Real-World Architectures
Research Agent Team
A working research system with supervisor, web researcher, document analyst, and synthesizer:
from typing import TypedDict, Annotated, List
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
class ResearchState(TypedDict):
    messages: Annotated[list, add_messages]
    query: str
    web_results: List[dict]
    document_analysis: List[dict]
    synthesis: str
    current_agent: str
    iteration: int

def supervisor(state: ResearchState):
    """Coordinate the research team"""
    query = state["query"]
    web_results = state.get("web_results", [])
    doc_analysis = state.get("document_analysis", [])
    iteration = state.get("iteration", 0)
    # Decide next action
    if not web_results:
        return {"current_agent": "web_researcher", "iteration": iteration + 1}
    if not doc_analysis and web_results:
        return {"current_agent": "doc_analyst", "iteration": iteration + 1}
    if web_results and doc_analysis:
        return {"current_agent": "synthesizer", "iteration": iteration + 1}
    return {"current_agent": "done"}

def web_researcher(state: ResearchState):
    """Search the web for relevant information"""
    query = state["query"]
    # Use search tools
    results = tavily_search.invoke(query)
    return {
        "web_results": results,
        "messages": [{"role": "assistant", "content": f"Found {len(results)} web sources"}]
    }

def doc_analyst(state: ResearchState):
    """Analyze documents and extract key information"""
    web_results = state["web_results"]
    analysis = []
    for result in web_results[:5]:  # Analyze top 5
        response = llm.invoke([
            {"role": "system", "content": "Extract key facts, claims, and evidence from this content."},
            {"role": "user", "content": result["content"]}
        ])
        analysis.append({
            "source": result["url"],
            "analysis": response.content
        })
    return {"document_analysis": analysis}

def synthesizer(state: ResearchState):
    """Synthesize research into coherent answer"""
    query = state["query"]
    analysis = state["document_analysis"]
    analysis_text = "\n\n".join([
        f"Source: {a['source']}\n{a['analysis']}"
        for a in analysis
    ])
    response = llm.invoke([
        {"role": "system", "content": "Synthesize these research findings into a comprehensive answer with citations."},
        {"role": "user", "content": f"Question: {query}\n\nResearch:\n{analysis_text}"}
    ])
    return {"synthesis": response.content}

def route(state: ResearchState):
    agent = state.get("current_agent", "")
    if agent == "web_researcher":
        return "web_researcher"
    elif agent == "doc_analyst":
        return "doc_analyst"
    elif agent == "synthesizer":
        return "synthesizer"
    return END
# Build the graph
graph = StateGraph(ResearchState)
graph.add_node("supervisor", supervisor)
graph.add_node("web_researcher", web_researcher)
graph.add_node("doc_analyst", doc_analyst)
graph.add_node("synthesizer", synthesizer)
graph.add_edge(START, "supervisor")
graph.add_conditional_edges("supervisor", route)
graph.add_edge("web_researcher", "supervisor")
graph.add_edge("doc_analyst", "supervisor")
graph.add_edge("synthesizer", END)
research_team = graph.compile()
Code Review Pipeline
Sequential agents for code quality:
import json

class CodeReviewState(TypedDict):
    code: str
    language: str
    security_issues: List[dict]
    performance_issues: List[dict]
    style_issues: List[dict]
    summary: str

def security_reviewer(state: CodeReviewState):
    """Check for security vulnerabilities"""
    code = state["code"]
    response = llm.invoke([
        {"role": "system", "content": """You are a security expert.
Identify security vulnerabilities: injection, XSS, auth issues, secrets exposure.
Return JSON: {"issues": [{"severity": "high/medium/low", "line": N, "description": "..."}]}"""},
        {"role": "user", "content": code}
    ])
    issues = json.loads(response.content)
    return {"security_issues": issues.get("issues", [])}

def performance_reviewer(state: CodeReviewState):
    """Identify performance problems"""
    code = state["code"]
    response = llm.invoke([
        {"role": "system", "content": """You are a performance engineer.
Identify: N+1 queries, memory leaks, inefficient algorithms, blocking calls.
Return JSON: {"issues": [{"impact": "high/medium/low", "line": N, "description": "..."}]}"""},
        {"role": "user", "content": code}
    ])
    issues = json.loads(response.content)
    return {"performance_issues": issues.get("issues", [])}

def style_reviewer(state: CodeReviewState):
    """Check coding standards"""
    code = state["code"]
    language = state["language"]
    response = llm.invoke([
        {"role": "system", "content": f"""Review this {language} code for style issues.
Check: naming conventions, code organization, documentation, idioms.
Return JSON: {{"issues": [{{"severity": "suggestion/warning", "line": N, "description": "..."}}]}}"""},
        {"role": "user", "content": code}
    ])
    issues = json.loads(response.content)
    return {"style_issues": issues.get("issues", [])}

def summarizer(state: CodeReviewState):
    """Create review summary"""
    sec = state.get("security_issues", [])
    perf = state.get("performance_issues", [])
    style = state.get("style_issues", [])
    high_severity = len([i for i in sec if i.get("severity") == "high"])
    summary = f"""## Code Review Summary

**Security**: {len(sec)} issues ({high_severity} high severity)
**Performance**: {len(perf)} issues
**Style**: {len(style)} suggestions

{"⚠️ HIGH SEVERITY SECURITY ISSUES - Do not merge" if high_severity > 0 else "✅ No blocking issues"}
"""
    return {"summary": summary}
# Sequential pipeline
graph = StateGraph(CodeReviewState)
graph.add_node("security", security_reviewer)
graph.add_node("performance", performance_reviewer)
graph.add_node("style", style_reviewer)
graph.add_node("summarize", summarizer)
graph.add_edge(START, "security")
graph.add_edge("security", "performance")
graph.add_edge("performance", "style")
graph.add_edge("style", "summarize")
graph.add_edge("summarize", END)
review_pipeline = graph.compile()
Customer Support Escalation
Agents with handoff capabilities:
```python
def triage_agent(state):
    """Initial contact - classify and route"""
    message = state["messages"][-1].content

    # Classify intent
    classification = llm.with_structured_output(TicketClass).invoke([
        {"role": "system", "content": "Classify this support request: billing, technical, account, general"},
        {"role": "user", "content": message}
    ])

    if classification.category == "billing":
        return Command(goto="billing_specialist", update={"category": "billing"})
    elif classification.category == "technical":
        return Command(goto="tech_support", update={"category": "technical"})
    elif classification.urgency == "high":
        return Command(goto="escalation", update={"urgency": "high"})

    # Handle general queries directly
    response = llm.invoke(state["messages"])
    return {"messages": [response]}

def billing_specialist(state):
    """Handle billing inquiries"""
    # Has access to billing tools
    return create_react_agent(
        llm,
        tools=[lookup_invoice, process_refund, update_payment],
        prompt="You are a billing specialist..."
    ).invoke(state)

def tech_support(state):
    """Handle technical issues"""
    # Has access to diagnostic tools
    return create_react_agent(
        llm,
        tools=[check_service_status, lookup_logs, create_ticket],
        prompt="You are technical support..."
    ).invoke(state)

def escalation(state):
    """Escalate to human"""
    ticket = create_support_ticket(state)
    return {
        "messages": [{
            "role": "assistant",
            "content": f"I've escalated your issue to a specialist. Ticket #{ticket.id}. You'll hear back within 2 hours."
        }],
        "escalated": True,
        "ticket_id": ticket.id
    }
```
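Stripped of the framework, the triage-and-handoff flow is a routing table: classify, then dispatch to the owning specialist. A minimal sketch with the LLM classifier replaced by a keyword stub (all names hypothetical):

```python
def classify_stub(message: str) -> str:
    """Hypothetical stand-in for the LLM classifier."""
    msg = message.lower()
    if "refund" in msg or "invoice" in msg:
        return "billing"
    if "error" in msg or "down" in msg:
        return "technical"
    return "general"

def billing_stub(message: str) -> str:
    return "Routing to billing specialist."

def tech_stub(message: str) -> str:
    return "Routing to technical support."

def general_stub(message: str) -> str:
    return "Answering directly."

# The triage table mirrors the Command(goto=...) edges above.
ROUTES = {"billing": billing_stub, "technical": tech_stub, "general": general_stub}

def handle(message: str) -> str:
    category = classify_stub(message)
    return ROUTES[category](message)

print(handle("I need a refund"))  # Routing to billing specialist.
```

The graph version buys you the same dispatch plus shared state, checkpointing, and tracing; the table is only the control-flow skeleton.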
## Debugging Multi-Agent Systems

### Trace Visualization

Track agent interactions:
```python
import time

class AgentTracer:
    def __init__(self):
        self.traces = []

    def trace(self, agent: str, action: str, input: dict, output: dict, latency: float):
        self.traces.append({
            "timestamp": time.time(),
            "agent": agent,
            "action": action,
            "input_summary": self._summarize(input),
            "output_summary": self._summarize(output),
            "latency_ms": latency * 1000
        })

    def _summarize(self, data: dict) -> str:
        """Truncate for readability"""
        s = str(data)
        return s[:200] + "..." if len(s) > 200 else s

    def print_trace(self):
        for t in self.traces:
            print(f"[{t['agent']}] {t['action']} ({t['latency_ms']:.0f}ms)")
            print(f"  In:  {t['input_summary']}")
            print(f"  Out: {t['output_summary']}")
            print()

    def to_mermaid(self) -> str:
        """Generate sequence diagram"""
        lines = ["sequenceDiagram"]
        for i, t in enumerate(self.traces):
            if i > 0:
                prev = self.traces[i - 1]["agent"]
                curr = t["agent"]
                lines.append(f"    {prev}->>{curr}: {t['action']}")
        return "\n".join(lines)
```
### Replay Testing

Test with recorded traces:
```python
class AgentReplayTest:
    def __init__(self, recorded_traces: list):
        self.traces = recorded_traces
        self.mock_responses = {}

    def mock_agent(self, agent_name: str, response: dict):
        self.mock_responses[agent_name] = response

    def replay(self, graph, initial_state: dict):
        """Replay with mocked agent responses"""
        # patch_agents is a context manager you supply that swaps each
        # named agent node for its canned response
        with patch_agents(self.mock_responses):
            result = graph.invoke(initial_state)
        return result

    def compare(self, expected: dict, actual: dict) -> list:
        """Diff expected vs actual"""
        diffs = []
        for key in expected:
            if key not in actual:
                diffs.append(f"Missing key: {key}")
            elif expected[key] != actual[key]:
                diffs.append(f"Diff in {key}: expected {expected[key]}, got {actual[key]}")
        return diffs
```
## Production Checklist

Before deploying multi-agent systems:

### Architecture

- [ ] Defined agent roles and boundaries
- [ ] Chosen appropriate orchestration pattern
- [ ] Mapped communication flows
- [ ] Identified single points of failure

### State Management

- [ ] State schema defined
- [ ] Conflict resolution policy chosen
- [ ] Persistence strategy selected
- [ ] Provenance tracking implemented

### Reliability

- [ ] Timeout and retry logic
- [ ] Circuit breakers for external calls
- [ ] Graceful degradation paths
- [ ] Health monitoring

### Observability

- [ ] Structured logging per agent
- [ ] Trace correlation across agents
- [ ] Token usage tracking
- [ ] Latency monitoring

### Governance

- [ ] Human-in-the-loop checkpoints
- [ ] Approval workflows for risky actions
- [ ] Audit trail for decisions
- [ ] Role-based access control
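Two of the reliability items, retry logic and circuit breakers, reduce to a small wrapper around each external call. A minimal sketch, assuming the agent call is a plain function (thresholds and timings are illustrative):

```python
import time

class CircuitBreaker:
    """Stop calling a failing dependency after `threshold` consecutive errors."""
    def __init__(self, threshold: int = 3, cooldown: float = 30.0):
        self.threshold = threshold
        self.cooldown = cooldown
        self.failures = 0
        self.opened_at = None

    def call(self, fn, *args, retries: int = 2, backoff: float = 0.1):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.cooldown:
                raise RuntimeError("circuit open - failing fast")
            self.opened_at = None  # half-open: allow one trial call through
        for attempt in range(retries + 1):
            try:
                result = fn(*args)
                self.failures = 0  # success closes the circuit
                return result
            except Exception:
                self.failures += 1
                if self.failures >= self.threshold:
                    self.opened_at = time.monotonic()  # trip the breaker
                    raise
                if attempt == retries:
                    raise
                time.sleep(backoff * (2 ** attempt))  # exponential backoff
```

Wrap each cross-agent or external call in `breaker.call(...)` so one failing dependency degrades gracefully instead of stalling every agent that depends on it.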
## Frequently Asked Questions
### When should I use multi-agent instead of a single agent?

Use multi-agent when your single agent hits one of these limits: context window overflow from too many tools or too much history, poor performance because the agent is spread across too many responsibilities, or tasks that would benefit from parallel execution. If your agent handles requests well with a focused prompt and a handful of tools, stick with single-agent. Multi-agent adds roughly 15x token overhead, so the coordination cost needs to pay off in better results.
### What's the simplest multi-agent architecture to start with?
Supervisor pattern with two specialists. One orchestrator decides which specialist handles each subtask. Start here because it's easy to debug (you can trace every decision the supervisor makes), easy to extend (add more specialists as needed), and covers most use cases. Graduate to hierarchical only when you have 10+ agents and need multiple coordination levels.
### How do I prevent agents from contradicting each other?
Three approaches work in practice. First, give each agent a distinct scope with no overlap. The researcher finds information, the analyst interprets it, the writer presents it. Clear boundaries prevent conflicts. Second, use a supervisor to sequence work so agents build on each other's output rather than working in parallel on the same thing. Third, if parallel work is necessary, implement explicit conflict resolution: confidence-weighted voting, role-based priority, or human escalation when agents disagree significantly.
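Confidence-weighted voting with escalation on a thin margin fits in a few lines; a sketch where thresholds and field names are illustrative:

```python
from collections import defaultdict

def resolve_conflict(proposals: list, escalate_below: float = 0.2) -> dict:
    """Pick the answer with the most total confidence behind it.

    proposals: [{"agent": ..., "answer": ..., "confidence": 0-1}, ...]
    Escalates to a human when the winning margin is too thin.
    """
    votes = defaultdict(float)
    for p in proposals:
        votes[p["answer"]] += p["confidence"]
    ranked = sorted(votes.items(), key=lambda kv: kv[1], reverse=True)
    winner, top = ranked[0]
    runner_up = ranked[1][1] if len(ranked) > 1 else 0.0
    margin = (top - runner_up) / top if top else 0.0
    if margin < escalate_below:  # agents disagree significantly
        return {"decision": None, "escalate": True, "votes": dict(votes)}
    return {"decision": winner, "escalate": False, "votes": dict(votes)}
```

Role-based priority is the same shape with per-agent weights multiplied into the confidence before summing.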
### Should agents share memory or have separate contexts?
Both, with clear boundaries. Shared memory works for task state: what's been accomplished, what decisions were made, what the current goal is. Separate contexts work for agent-specific reasoning: the researcher's search strategy, the analyst's interpretation framework. The mistake is putting everything in shared memory (context overflow) or keeping everything separate (agents can't coordinate). Use hierarchical summarization: detailed context stays local, high-level summaries go to shared state.
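A minimal sketch of that split: an unbounded private log per agent, and a capped summary channel in shared state (names and the cap are illustrative):

```python
class AgentMemory:
    """Detailed context stays local; only a capped summary reaches shared state."""
    def __init__(self, shared: dict, name: str, summary_limit: int = 3):
        self.local_log = []              # full reasoning trail, never shared
        self.shared = shared             # task-level state all agents can read
        self.name = name
        self.summary_limit = summary_limit

    def note(self, detail: str):
        """Record agent-private reasoning."""
        self.local_log.append(detail)

    def publish(self, finding: str):
        """Promote a high-level conclusion to shared state, keeping it capped."""
        summaries = self.shared.setdefault("findings", [])
        summaries.append(f"{self.name}: {finding}")
        del summaries[:-self.summary_limit]  # keep only the most recent N

shared_state = {}
researcher = AgentMemory(shared_state, "researcher")
researcher.note("tried query 'X site:arxiv.org', 3 weak hits")   # stays local
researcher.note("query 'X survey 2024' found strong source")     # stays local
researcher.publish("Found authoritative 2024 survey on X")       # visible to all
```

The cap is what prevents shared state from becoming a second context-overflow problem: every agent pays the token cost of shared memory on every call.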
### How do I handle agent failures without losing work?
Checkpoint aggressively. Save state after every significant step, not just at the end. When an agent fails, you can resume from the last checkpoint instead of starting over. For critical paths, run redundant agents and take the first successful result. For non-critical work, implement graceful degradation: if the specialist fails, fall back to a generalist that can produce acceptable (if not optimal) results.
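A minimal file-based sketch of the checkpoint-and-resume loop (a real system would use its framework's checkpointer, e.g. LangGraph's, but the shape is the same):

```python
import json
import pathlib
import tempfile

class CheckpointStore:
    """Persist state after every step so a crashed run can resume."""
    def __init__(self, directory: str):
        self.dir = pathlib.Path(directory)
        self.dir.mkdir(parents=True, exist_ok=True)

    def save(self, step: int, state: dict):
        (self.dir / f"step_{step:04d}.json").write_text(json.dumps(state))

    def latest(self):
        files = sorted(self.dir.glob("step_*.json"))
        if not files:
            return 0, {}
        step = int(files[-1].stem.split("_")[1])
        return step, json.loads(files[-1].read_text())

def run_pipeline(steps, store, state=None):
    start, saved = store.latest()
    state = state if state is not None else saved
    for i, step_fn in enumerate(steps):
        if i < start:
            continue  # already completed in a previous run
        state = step_fn(state)
        store.save(i + 1, state)  # checkpoint after every step, not just at the end
    return state

store = CheckpointStore(tempfile.mkdtemp())
state = run_pipeline([lambda s: {**s, "step1": "done"}], store, {})
```

Calling `run_pipeline` again with the same store skips completed steps and resumes from the last checkpoint.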
### What's the difference between handoffs and tool calling?
Tool calling keeps the orchestrator in control. The orchestrator calls a specialist, gets a result, and decides what to do next. The specialist does bounded work and returns. Handoffs transfer control entirely. The specialist takes over the conversation and responds directly to the user. Use tool calling when you need to synthesize results from multiple specialists. Use handoffs when one specialist should own the entire interaction for their domain.
### How many agents is too many?
Coordination overhead grows faster than agent count. With 3 agents, you have 3 potential communication paths. With 10 agents, you have 45. With 20, you have 190. Most production systems stay under 10 agents for a single task. If you need more, use hierarchical organization: teams of 3-5 agents each, coordinated by team supervisors, coordinated by a top-level orchestrator. This keeps any single coordination layer manageable.
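Those figures are just the pairwise-connection count, n(n-1)/2:

```python
def communication_paths(n: int) -> int:
    """Potential pairwise links between n agents: n choose 2."""
    return n * (n - 1) // 2

print([(n, communication_paths(n)) for n in (3, 10, 20)])  # [(3, 3), (10, 45), (20, 190)]
```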
### Can I use different LLMs for different agents?
Yes, and you probably should. Use capable models (GPT-4, Claude Opus) for orchestrators and complex reasoning. Use faster, cheaper models (GPT-4o-mini, Claude Haiku, Mistral) for specialists doing focused tasks. Anthropic's research showed their best results came from Opus orchestrating Sonnet subagents. The orchestrator needs judgment; specialists need speed and focus. Match model capability to task complexity.
### How do I debug a multi-agent system when something goes wrong?
Trace everything. Log every agent invocation with input state, output state, and latency. Use correlation IDs to link related calls across agents. When something fails, you should be able to reconstruct the exact sequence: which agent was called, what it received, what it returned, and where the chain broke. Tools like LangSmith provide this out of the box. Without tracing, multi-agent debugging becomes guesswork.
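Correlation IDs cost a few lines if every agent invocation goes through one wrapper; a sketch where the log field names are illustrative:

```python
import json
import logging
import time
import uuid

log = logging.getLogger("agents")

def invoke_traced(agent_name: str, agent_fn, state: dict) -> dict:
    """Run an agent, logging the call under a task-wide correlation ID."""
    # Reuse the task's correlation ID, or mint one at the entry point.
    corr_id = state.setdefault("correlation_id", uuid.uuid4().hex)
    start = time.perf_counter()
    result = agent_fn(state)
    log.info(json.dumps({
        "correlation_id": corr_id,          # links every call in this task
        "agent": agent_name,
        "latency_ms": round((time.perf_counter() - start) * 1000, 1),
        "output_keys": sorted(result),
    }))
    result.setdefault("correlation_id", corr_id)  # propagate downstream
    return result
```

Filtering your log store by one `correlation_id` then reconstructs the full chain for that task across every agent.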
### What frameworks support multi-agent orchestration?
LangGraph handles graph-based workflows with explicit state management and is good for complex, branching logic. CrewAI uses role-based agents with simpler configuration and is faster to prototype. AutoGen from Microsoft focuses on conversational multi-agent patterns. AWS Strands provides patterns for agents as tools and swarms. All of these support the core patterns (supervisor, hierarchical, swarm) with different trade-offs in flexibility versus ease of use.
### How do I test multi-agent systems?
Test at three levels. Unit test individual agents in isolation: given this input state, does the agent produce the expected output? Integration test agent pairs: when the researcher hands off to the analyst, does the interface work correctly? End-to-end test the full system: given a realistic task, does the system produce acceptable results within time and cost budgets? Use recorded traces to create regression tests. When you fix a bug, add the failing case to your test suite.
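The unit-test level is the cheapest to set up: swap the model for a deterministic fake and assert on the node's output state. A sketch with a hypothetical summarizer node:

```python
class FakeLLM:
    """Deterministic stand-in for the real model in unit tests."""
    def __init__(self, canned: str):
        self.canned = canned
        self.calls = []

    def invoke(self, messages):
        self.calls.append(messages)
        return self.canned

def summarizer_node(state: dict, llm) -> dict:
    """Hypothetical agent node under test."""
    findings = state.get("findings", [])
    summary = llm.invoke([{"role": "user", "content": "\n".join(findings)}])
    return {"summary": summary}

# Unit test: given this input state, does the node produce the expected output?
fake = FakeLLM("Two findings, both minor.")
out = summarizer_node({"findings": ["a", "b"]}, fake)
assert out == {"summary": "Two findings, both minor."}
assert len(fake.calls) == 1  # node called the model exactly once
```

The same fake, loaded with responses from a recorded trace, becomes a regression test for the integration level.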
### What's the cost difference between single-agent and multi-agent?
Expect 10-20x higher token usage for multi-agent systems. Each agent call includes system prompts, context, and reasoning. Coordination messages add overhead. Parallel execution means paying for multiple simultaneous calls. This cost is justified when multi-agent produces significantly better results or enables tasks that single-agent can't handle. Track cost per task, not just cost per token. If multi-agent completes in one attempt what single-agent fails at in five, multi-agent might be cheaper despite higher token usage.
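The cost-per-task framing is easy to make concrete with hypothetical numbers (prices and success rates below are illustrative, not benchmarks):

```python
def cost_per_success(tokens_per_attempt, price_per_1k, success_rate):
    """Expected cost to get one successful task completion."""
    expected_attempts = 1 / success_rate
    return tokens_per_attempt * price_per_1k / 1000 * expected_attempts

# Hypothetical: the single agent is cheap per attempt but rarely succeeds at
# this task; the multi-agent system burns 15x the tokens but usually succeeds.
single = cost_per_success(tokens_per_attempt=4_000, price_per_1k=0.01, success_rate=0.05)
multi = cost_per_success(tokens_per_attempt=60_000, price_per_1k=0.01, success_rate=0.90)
print(f"single: ${single:.2f}/success, multi: ${multi:.2f}/success")
```

With these numbers the multi-agent system is cheaper per successful task despite 15x the tokens per attempt; flip the success rates and the conclusion flips too, which is why the comparison has to be per task.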
## Summary
Multi-agent orchestration is coordination engineering. The patterns are straightforward: supervisor for centralized control, hierarchical for scale, swarm for distributed exploration. The hard parts are state management, conflict resolution, and failure handling.
Start simple. A supervisor with two or three specialists handles most use cases. Add hierarchy when you hit coordination limits. Consider swarms when you need fault tolerance or diverse exploration.
Memory architecture matters more than communication architecture. Agents that share state effectively outperform agents that communicate constantly but can't remember what was decided. Invest in memory engineering before optimizing message passing.
For teams building multi-agent systems that need enterprise reliability, Prem Studio provides fine-tuning and evaluation tools to test agent performance before deployment. When smaller specialized models outperform larger general models, fine-tuning becomes the competitive advantage in agentic workflows.