Multi-Agent AI Systems: Architecture, Communication, and Coordination

Beyond single agents. Learn supervisor patterns, swarm architectures, and hybrid designs for multi-agent systems. Covers state synchronization, memory engineering, and failure recovery.

Single agents hit walls. They run out of context. They lack specialized knowledge. They can't parallelize work.

Multi-agent systems solve this by distributing intelligence across specialized agents that collaborate on complex tasks. One agent researches, another analyzes, a third writes the report. Each stays focused. Each has the context it needs.

But coordination is hard. Agents need to communicate, share state, resolve conflicts, and recover from failures. The architecture you choose determines whether your system scales or collapses under its own complexity.

This guide covers the orchestration patterns that work: supervisor, hierarchical, swarm, and network architectures. We'll examine state management, conflict resolution, and the memory engineering that makes multi-agent systems reliable.

Why Multi-Agent?

A single agent with access to every tool becomes unwieldy. Its context window fills with irrelevant information. Its prompts grow bloated. It makes mistakes because it's trying to be everything at once.

Multi-agent systems distribute cognitive load.

Single Agent             Multi-Agent
------------             -----------
One massive prompt       Focused role prompts
All tools loaded         Specialized toolsets
Linear execution         Parallel work
Single point of failure  Graceful degradation
Context overflow         Scoped contexts

Research from Anthropic found that multi-agent systems with Claude Opus 4 leading and Claude Sonnet 4 subagents outperformed single-agent Claude Opus 4 by 90.2% on research tasks. The performance gain comes from specialization and parallel execution, not just throwing more compute at the problem.

The trade-off is coordination overhead. Multi-agent systems use roughly 15x more tokens than single-agent chat interactions. That overhead only pays off when the task genuinely benefits from distributed work.
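
To make the break-even concrete, here's a back-of-envelope helper using the 15x figure (the per-token price is a placeholder; substitute your model's actual rate):

```python
def multi_agent_premium(single_agent_tokens: int,
                        overhead_multiplier: float = 15.0,
                        price_per_million_tokens: float = 3.0) -> float:
    """Extra dollar cost of running a task multi-agent instead of single-agent.

    `price_per_million_tokens` is an illustrative placeholder, not a real rate.
    """
    single_cost = single_agent_tokens / 1_000_000 * price_per_million_tokens
    return single_cost * (overhead_multiplier - 1)
```

At $3 per million tokens, a 100k-token single-agent task costs $0.30; the multi-agent version adds roughly $4.20 on top. Pay that premium only when specialization or parallelism clearly improves the output.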

Core Orchestration Patterns

Pattern 1: Supervisor (Hub-and-Spoke)

One orchestrator agent delegates to specialist workers:

                    ┌─────────────┐
                    │  Supervisor │
                    └──────┬──────┘
           ┌───────────────┼───────────────┐
           ▼               ▼               ▼
    ┌──────────┐    ┌──────────┐    ┌──────────┐
    │ Research │    │ Analysis │    │  Writer  │
    └──────────┘    └──────────┘    └──────────┘

The supervisor:

  • Receives the task
  • Decomposes it into subtasks
  • Routes subtasks to appropriate specialists
  • Aggregates results
  • Decides when the task is complete

Implementation with LangGraph:

from typing import Literal
from pydantic import BaseModel
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.types import Command

# llm is assumed to be an initialized chat model (e.g. ChatAnthropic)

class Router(BaseModel):
    """Schema for the supervisor's structured routing decision."""
    next: Literal["researcher", "analyst", "writer", "FINISH"]

def supervisor(state: MessagesState) -> Command[Literal["researcher", "analyst", "writer", "__end__"]]:
    """Route to the appropriate specialist or finish"""
    messages = state["messages"]
    
    response = llm.with_structured_output(Router).invoke([
        {"role": "system", "content": """You coordinate a team:
        - researcher: finds information from the web
        - analyst: analyzes data and identifies patterns  
        - writer: produces final reports
        
        Based on the conversation, decide who should work next.
        Return FINISH when the task is complete."""},
        *messages
    ])
    
    if response.next == "FINISH":
        return Command(goto=END)
    return Command(goto=response.next)

# Build graph
graph = StateGraph(MessagesState)
graph.add_node("supervisor", supervisor)
graph.add_node("researcher", researcher_agent)
graph.add_node("analyst", analyst_agent)
graph.add_node("writer", writer_agent)

# All workers report back to supervisor
for worker in ["researcher", "analyst", "writer"]:
    graph.add_edge(worker, "supervisor")

graph.add_edge(START, "supervisor")
app = graph.compile()

When to use:

  • Clear task decomposition
  • Need for coordination between steps
  • Predictable workflow patterns
  • When audit trails matter

Failure mode: Supervisor becomes a bottleneck. If it fails, everything stops. Mitigate with supervisor health checks and fallback supervisors.
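
A minimal failover wrapper might look like this (a sketch; it assumes both supervisors expose the async `ainvoke` interface that compiled LangGraph graphs provide):

```python
import asyncio

async def call_supervisor_with_fallback(primary, fallback, state: dict,
                                        timeout: float = 30.0):
    """Try the primary supervisor; fail over on timeout or error.

    `primary` and `fallback` are assumed to expose async `ainvoke(state)`.
    """
    try:
        return await asyncio.wait_for(primary.ainvoke(state), timeout=timeout)
    except Exception:
        # Covers asyncio.TimeoutError and agent errors alike: degrade to the
        # fallback supervisor instead of halting the whole system.
        return await fallback.ainvoke(state)
```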

Pattern 2: Hierarchical (Multi-Level Supervision)

Supervisors manage other supervisors, creating a tree structure:

                         ┌─────────────────┐
                         │  Top Supervisor │
                         └────────┬────────┘
              ┌───────────────────┴───────────────────┐
              ▼                                       ▼
    ┌─────────────────┐                     ┌─────────────────┐
    │ Research Supvsr │                     │ Writing Supvsr  │
    └────────┬────────┘                     └────────┬────────┘
       ┌─────┴─────┐                           ┌─────┴─────┐
       ▼           ▼                           ▼           ▼
   ┌───────┐  ┌────────┐                  ┌────────┐  ┌────────┐
   │ Web   │  │ Docs   │                  │ Draft  │  │ Editor │
   │Search │  │Analysis│                  │ Writer │  │        │
   └───────┘  └────────┘                  └────────┘  └────────┘

Each level abstracts complexity for the level above. The top supervisor thinks in terms of "research" and "writing," not individual search queries or editing passes.

Implementation:

# Research team subgraph
research_graph = StateGraph(ResearchState)
research_graph.add_node("research_supervisor", research_supervisor)
research_graph.add_node("web_search", web_search_agent)
research_graph.add_node("doc_analysis", doc_analysis_agent)
# ... wire edges
research_team = research_graph.compile()

# Writing team subgraph
writing_graph = StateGraph(WritingState)
writing_graph.add_node("writing_supervisor", writing_supervisor)
writing_graph.add_node("drafter", draft_agent)
writing_graph.add_node("editor", editor_agent)
# ... wire edges
writing_team = writing_graph.compile()

# Top-level graph uses teams as nodes
def research_node(state):
    result = research_team.invoke({"query": state["task"]})
    return {"research_results": result["findings"]}

def writing_node(state):
    result = writing_team.invoke({
        "topic": state["task"],
        "research": state["research_results"]
    })
    return {"final_output": result["document"]}

main_graph = StateGraph(MainState)
main_graph.add_node("research", research_node)
main_graph.add_node("writing", writing_node)
main_graph.add_edge(START, "research")
main_graph.add_edge("research", "writing")
main_graph.add_edge("writing", END)
main_app = main_graph.compile()

When to use:

  • Complex tasks requiring multiple coordination levels
  • Large agent teams (10+ agents)
  • Domain-specific sub-teams with distinct expertise
  • When different parts of the system need different update cycles

Failure mode: Information loss between levels. Summaries discard nuance. Mitigate by passing both summaries and raw data where context budget allows.
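
One way to pass both is to return the summary alongside a reference to the raw findings, so a parent level can drill down on demand. A sketch with an in-memory dict standing in for a real artifact store (`RAW_STORE` and both helpers are illustrative):

```python
import uuid

RAW_STORE: dict[str, list] = {}  # stands in for a real artifact store

def summarize_with_pointer(findings: list[str], summary: str) -> dict:
    """Return the summary for the parent level plus a key to the raw findings."""
    key = str(uuid.uuid4())
    RAW_STORE[key] = findings
    return {"summary": summary, "raw_ref": key}

def drill_down(result: dict) -> list[str]:
    """Recover the full findings when the summary proves insufficient."""
    return RAW_STORE[result["raw_ref"]]
```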

Pattern 3: Agents as Tools

Wrap specialist agents as callable tools for an orchestrator:

from langchain_core.tools import tool
from langgraph.prebuilt import create_react_agent

@tool
def legal_review(document: str) -> str:
    """Have a legal specialist review a document for compliance issues"""
    legal_agent = create_legal_agent()
    result = legal_agent.invoke({"document": document})
    return result["review"]

@tool  
def technical_review(code: str) -> str:
    """Have a technical specialist review code for issues"""
    tech_agent = create_tech_agent()
    result = tech_agent.invoke({"code": code})
    return result["review"]

# Orchestrator has these agents as tools
orchestrator = create_react_agent(
    llm,
    tools=[legal_review, technical_review, web_search, file_write]
)

The orchestrator decides when to invoke specialists based on the task. Specialists do focused work and return results. The orchestrator maintains conversation continuity.

Key practice: Write detailed docstrings. The orchestrator's LLM uses these to decide when to call each specialist. Vague descriptions lead to wrong routing.
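
For example, compare a vague docstring with one the router can actually act on (hypothetical specialist, shown without the @tool decorator):

```python
def legal_review_vague(document: str) -> str:
    """Review a document."""
    # Too vague: the routing LLM can't tell this apart from other reviewers
    ...

def legal_review_clear(document: str) -> str:
    """Review a contract or policy document for legal compliance issues.

    Use for: regulatory exposure, missing clauses, liability language.
    Do not use for: code review, factual research, or style editing.
    """
    ...
```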

When to use:

  • Bounded subtasks that don't need conversation takeover
  • Specialists that produce discrete outputs (reviews, analyses, transforms)
  • When you want the orchestrator to maintain context across specialist calls

Pattern 4: Swarm (Decentralized)

No central coordinator. Agents interact peer-to-peer based on local rules:

    ┌───────────┐     ┌───────────┐
    │  Agent A  │◄───►│  Agent B  │
    └─────┬─────┘     └─────┬─────┘
          │                 │
          │   ┌─────────┐   │
          └──►│ Shared  │◄──┘
              │ Memory  │
          ┌──►│         │◄──┐
          │   └─────────┘   │
    ┌─────┴─────┐     ┌─────┴─────┐
    │  Agent C  │◄───►│  Agent D  │
    └───────────┘     └───────────┘

Each agent:

  • Operates on local information
  • Communicates with nearby agents
  • Follows simple behavioral rules
  • Contributes to emergent collective behavior

Implementation sketch:

import asyncio

class SwarmAgent:
    def __init__(self, name: str, specialty: str, shared_memory):
        self.name = name
        self.specialty = specialty
        self.memory = shared_memory
    
    async def step(self, task_context: dict):
        # Read what others have contributed
        others_work = await self.memory.read_recent(exclude=self.name)
        
        # Decide if I should contribute
        if self._should_contribute(task_context, others_work):
            my_contribution = await self._do_work(task_context, others_work)
            await self.memory.write(self.name, my_contribution)
            
        # Check if task is complete
        return self._evaluate_completion(task_context, others_work)
    
    def _should_contribute(self, task, others):
        """Local decision based on specialty and current state"""
        # Does my specialty apply?
        # Have others covered this already?
        # Is there a gap I can fill?
        pass

async def run_swarm(agents: list, task: dict, max_rounds: int = 5):
    # Agents must all hold the same memory instance they were constructed with
    shared_memory = agents[0].memory

    for _ in range(max_rounds):
        results = await asyncio.gather(*[
            agent.step(task) for agent in agents
        ])

        if all(r.get("complete") for r in results):
            break

    return shared_memory.synthesize()

Swarm characteristics:

  • No single point of failure
  • Scales horizontally
  • Emergent behavior from local interactions
  • Harder to debug and predict

When to use:

  • Brainstorming and ideation (multiple perspectives)
  • Problems benefiting from parallel exploration
  • Systems requiring fault tolerance
  • When you can't predict the optimal coordination structure

Failure mode: Coordination chaos. Without structure, agents may duplicate work, contradict each other, or never converge. Anthropic's early multi-agent experiments saw agents spawning 50 subagents for simple queries, searching endlessly for nonexistent sources, and distracting each other with excessive updates. Mitigate with round limits, contribution rules, and convergence criteria.
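
A cheap convergence criterion to pair with the round limit: stop once several consecutive rounds add nothing new (a sketch; the function name and threshold are illustrative):

```python
def swarm_converged(contributions_per_round: list[int],
                    quiet_rounds: int = 2) -> bool:
    """Convergence rule: stop once the last `quiet_rounds` rounds added nothing.

    Meant to pair with a hard max_rounds cap; either condition ends the run.
    """
    if len(contributions_per_round) < quiet_rounds:
        return False
    return all(c == 0 for c in contributions_per_round[-quiet_rounds:])
```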

Pattern 5: Network (Full Mesh)

Every agent can communicate with every other agent:

class NetworkOrchestrator:
    def __init__(self, agents: dict):
        self.agents = agents  # name -> agent
        self.message_bus = MessageBus()
    
    async def run(self, task: dict):
        # Any agent can send to any other. Bind `name` as a default argument:
        # a plain closure captures the loop variable late, so every handler
        # would otherwise route as the final agent in the dict.
        for name, agent in self.agents.items():
            agent.set_message_handler(
                lambda msg, sender, name=name: self.route_message(msg, sender, name)
            )
        
        # Kick off with the task
        await self.message_bus.broadcast(task)
        
        # Run until termination condition
        while not self._is_complete():
            messages = await self.message_bus.collect()
            await self._process_messages(messages)
    
    def route_message(self, message, sender, recipient):
        """Route message from sender to recipient"""
        self.message_bus.send(recipient, message, sender)

When to use:

  • Dynamic interaction patterns
  • When agents need to negotiate directly
  • Problems where the communication structure can't be predetermined

Failure mode: Communication explosion. N agents means N² possible connections. At scale, message volume can overwhelm the system. Mitigate with message prioritization, rate limiting, and topic-based filtering.
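
Topic-based filtering is the simplest of those mitigations: agents receive only the topics they subscribe to instead of every peer's traffic. A sketch (the `TopicBus` class is illustrative, not a library API):

```python
from collections import defaultdict

class TopicBus:
    """Topic-filtered delivery: agents only see topics they subscribed to,
    cutting the N^2 chatter of a full mesh."""

    def __init__(self):
        self.subscriptions = defaultdict(set)  # topic -> subscribed agent ids
        self.inboxes = defaultdict(list)       # agent id -> pending messages

    def subscribe(self, agent_id: str, topic: str):
        self.subscriptions[topic].add(agent_id)

    def publish(self, sender: str, topic: str, content: dict):
        # Deliver to subscribers only, and never echo back to the sender
        for agent_id in self.subscriptions[topic]:
            if agent_id != sender:
                self.inboxes[agent_id].append(
                    {"topic": topic, "from": sender, "content": content}
                )
```

Combine this with per-agent rate limits to cap worst-case message volume.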

State Management

Multi-agent systems need shared state. Without it, agents operate on different versions of reality and make conflicting decisions.

Think of it like a shared document. If every team member works on their own copy, you end up with chaos when trying to merge. Same problem with agents. They need a single source of truth for what's been decided, what's been done, and what the current goal is.

State Architecture Options

1. Centralized State Store

All agents read from and write to a single state object:

import asyncio

class OptimisticLockError(Exception):
    """Raised when a write's expected version is stale."""

class CentralState:
    def __init__(self):
        self._state = {}
        self._lock = asyncio.Lock()
        self._version = 0
    
    async def read(self, key: str):
        async with self._lock:
            return self._state.get(key), self._version
    
    async def write(self, key: str, value, expected_version: int):
        async with self._lock:
            if self._version != expected_version:
                raise OptimisticLockError("State changed")
            self._state[key] = value
            self._version += 1

Pros: Simple, consistent, easy to debug
Cons: Bottleneck, single point of failure
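
Callers of a store like this need a read-modify-write retry loop around the version check. A sketch (the error class is redefined so the snippet stands alone; `store` is assumed to expose the `read`/`write` interface above):

```python
import asyncio

class OptimisticLockError(Exception):
    """Redefined here so the sketch stands alone."""

async def update_with_retry(store, key: str, update_fn, max_retries: int = 5):
    """Read-modify-write loop against a versioned store.

    `store` is assumed to expose `read(key) -> (value, version)` and
    `write(key, value, expected_version)`.
    """
    for _ in range(max_retries):
        value, version = await store.read(key)
        try:
            await store.write(key, update_fn(value), expected_version=version)
            return
        except OptimisticLockError:
            # Another agent committed first; re-read the new version and retry
            await asyncio.sleep(0)
    raise OptimisticLockError(f"gave up on {key!r} after {max_retries} attempts")
```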

2. Event-Sourced State

State changes are recorded as immutable events:

import time

class EventStore:
    def __init__(self):
        self.events = []
    
    def append(self, event: dict):
        self.events.append({
            **event,
            "timestamp": time.time(),
            "sequence": len(self.events)
        })
    
    def replay(self, up_to: int = None) -> dict:
        """Reconstruct state by replaying events"""
        state = {}
        events = self.events[:up_to] if up_to else self.events
        for event in events:
            state = self._apply_event(state, event)
        return state
    
    def _apply_event(self, state, event):
        if event["type"] == "SET":
            state[event["key"]] = event["value"]
        elif event["type"] == "DELETE":
            state.pop(event["key"], None)
        return state

Pros: Full audit trail, time travel, easy replication
Cons: Storage grows indefinitely, replay can be slow
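
The standard fix for unbounded growth and slow replay is snapshotting: periodically materialize the state so replay only covers events since the last snapshot. A sketch reusing the SET/DELETE event shape above (the class name is illustrative):

```python
import time

class SnapshottingEventStore:
    """Event store with periodic snapshots so replay cost stays bounded."""

    def __init__(self, snapshot_every: int = 100):
        self.events = []
        self.snapshot = {}        # materialized state as of snapshot_index
        self.snapshot_index = 0
        self.snapshot_every = snapshot_every

    def append(self, event: dict):
        self.events.append({**event, "timestamp": time.time()})
        # Materialize a snapshot once enough events have accumulated
        if len(self.events) - self.snapshot_index >= self.snapshot_every:
            self.snapshot = self.current_state()
            self.snapshot_index = len(self.events)

    def current_state(self) -> dict:
        # Replay only the events newer than the snapshot
        state = dict(self.snapshot)
        for event in self.events[self.snapshot_index:]:
            if event["type"] == "SET":
                state[event["key"]] = event["value"]
            elif event["type"] == "DELETE":
                state.pop(event["key"], None)
        return state
```

Old events can then be archived or pruned once a snapshot covers them, if the full audit trail isn't required.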

3. Distributed State with Consensus

Each agent maintains local state, synchronized via consensus protocol:

import asyncio

class DistributedState:
    def __init__(self, agent_id: str, peers: list):
        self.agent_id = agent_id
        self.peers = peers
        self.local_state = {}
        self.vector_clock = {p: 0 for p in peers}
    
    async def propose(self, key: str, value):
        """Propose a state change to the cluster"""
        proposal = {
            "key": key,
            "value": value,
            "proposer": self.agent_id,
            "clock": self.vector_clock.copy()
        }
        
        # Collect votes from peers
        votes = await asyncio.gather(*[
            self._request_vote(peer, proposal) 
            for peer in self.peers
        ])
        
        if sum(votes) > len(self.peers) / 2:
            await self._commit(proposal)
            return True
        return False

Pros: No single point of failure, scales horizontally
Cons: Complex, eventual consistency, network partition handling

Choosing a State Architecture

Requirement             Recommended Architecture
-----------             ------------------------
Simple coordination     Centralized
Audit/compliance needs  Event-sourced
High availability       Distributed
< 5 agents              Centralized
5-20 agents             Event-sourced with central store
20+ agents              Distributed

Conflict Resolution

When multiple agents write conflicting values, the system needs a policy. This happens more often than you'd expect. Two agents analyze the same data and reach different conclusions. One agent updates a field while another is still reading it. The system needs rules for who wins.

Last-Writer-Wins (Usually Wrong)

def resolve_lww(conflicts: list):
    return max(conflicts, key=lambda c: c["timestamp"])["value"]

Simple but dangerous. A less competent agent can overwrite a better assessment. Only use when conflicts are genuinely equivalent.

Role-Based Priority

ROLE_PRIORITY = {
    "supervisor": 100,
    "specialist": 50,
    "worker": 10
}

def resolve_by_role(conflicts: list):
    return max(conflicts, key=lambda c: ROLE_PRIORITY.get(c["role"], 0))["value"]

Supervisors override workers. Specialists override generalists. Explicit hierarchy.

Confidence-Weighted Voting

from collections import defaultdict

def resolve_by_confidence(conflicts: list):
    # Group by value, summing each proposer's reported confidence
    votes = defaultdict(float)
    for conflict in conflicts:
        votes[conflict["value"]] += conflict["confidence"]
    
    # Return the value with the highest total confidence
    return max(votes.items(), key=lambda x: x[1])[0]

Agents report confidence with their writes. Higher confidence carries more weight.

Human Escalation

def resolve_with_escalation(conflicts: list, threshold: float = 0.3):
    # If confidence spread is too wide, escalate
    confidences = [c["confidence"] for c in conflicts]
    if max(confidences) - min(confidences) > threshold:
        return interrupt({
            "type": "conflict_resolution",
            "conflicts": conflicts,
            "message": "Agents disagree significantly. Please resolve."
        })
    
    # Otherwise use confidence voting
    return resolve_by_confidence(conflicts)

When agents strongly disagree, bring in a human. This is especially important for high-stakes decisions.

Provenance Tracking

Whatever resolution strategy you use, track metadata:

from dataclasses import dataclass
from typing import Any

@dataclass
class StateEntry:
    value: Any
    author_agent: str
    timestamp: float
    confidence: float
    evidence: list[str]
    supersedes: str | None  # ID of the entry this replaced

When conflicts arise, you can inspect provenance to understand why and make better decisions.

Memory Engineering

Memory is the foundation of multi-agent coordination. Without proper memory architecture, agents operate on stale or inconsistent information.

Anthropic's early multi-agent experiments hit this hard. The runaway behavior described earlier (subagent explosions, endless searches for nonexistent sources, noisy status updates) wasn't at root a communication problem. It was memory. Agents couldn't remember what had already been tried, what was already known, or what decisions had already been made.

Memory Types

Working Memory (Short-term)
Current task context, recent messages, immediate state. Stored in the graph state or conversation history. Cleared between sessions.

Episodic Memory (Session)
What happened during this task execution. Stored in checkpoints. Enables resume and time travel.

Semantic Memory (Long-term)
Facts, procedures, learned patterns. Persisted to database. Shared across sessions.
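
A minimal facade tying the three tiers together might look like this (a sketch; `AgentMemory` is a hypothetical name, and the semantic tier would be a real database in production rather than a dict):

```python
class AgentMemory:
    """Minimal three-tier memory facade matching the types above."""

    def __init__(self):
        self.working = []   # current-task messages, cleared per session
        self.episodic = []  # per-session checkpoints, enables resume
        self.semantic = {}  # long-lived facts shared across sessions

    def end_session(self):
        """Checkpoint the session, then clear short-term context."""
        self.episodic.append(list(self.working))
        self.working.clear()

    def learn(self, key: str, fact: str):
        """Promote a fact to long-term memory."""
        self.semantic[key] = fact
```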

Multi-Agent Memory Patterns

Blackboard Pattern
A shared workspace where agents read and write:

import time
from collections import defaultdict

class Blackboard:
    def __init__(self):
        self.entries = {}
        self.watchers = defaultdict(list)
    
    def write(self, agent: str, key: str, value):
        self.entries[key] = {
            "value": value,
            "author": agent,
            "timestamp": time.time()
        }
        # Notify watchers
        for callback in self.watchers[key]:
            callback(self.entries[key])
    
    def read(self, key: str):
        return self.entries.get(key, {}).get("value")
    
    def watch(self, key: str, callback):
        self.watchers[key].append(callback)

Agents subscribe to keys they care about. Updates trigger callbacks.

Hierarchical Summarization
Different abstraction levels for different consumers:

class HierarchicalMemory:
    def __init__(self):
        self.detailed = []      # Full entries
        self.summaries = []     # Compressed summaries
        self.global_summary = "" # One-liner
    
    def add(self, entry: dict):
        self.detailed.append(entry)
        
        # Summarize every N entries
        if len(self.detailed) % 10 == 0:
            summary = self._summarize(self.detailed[-10:])
            self.summaries.append(summary)
            self.global_summary = self._summarize_summaries(self.summaries)
    
    def get_for_role(self, role: str):
        if role == "supervisor":
            return self.global_summary
        elif role == "planner":
            return self.summaries
        else:
            return self.detailed[-20:]  # Recent detailed

Supervisors get high-level summaries. Workers get detailed context.

Role-Adaptive Retrieval
What you retrieve depends on who's asking:

def retrieve(query: str, agent_role: str, agent_specialty: str):
    # Base retrieval
    results = vector_store.search(query, k=20)
    
    # Filter by relevance to role
    if agent_role == "specialist":
        results = [r for r in results if r.metadata.get("domain") == agent_specialty]
    
    # Rank by agent-specific criteria
    if agent_role == "supervisor":
        results.sort(key=lambda r: r.metadata.get("importance", 0), reverse=True)
    
    return results[:10]

Token Economics

Multi-agent systems are expensive. Anthropic's data shows agents use 4x more tokens than chat, and multi-agent systems use 15x more. Manage this with:

Context Trimming

def trim_for_agent(messages: list, max_tokens: int, agent_role: str):
    # Always keep system message
    system_msg = messages[0]
    
    # Keep more recent context for workers, broader context for supervisors
    if agent_role == "worker":
        keep_recent = int(max_tokens * 0.8)
    else:
        keep_recent = int(max_tokens * 0.5)
    
    trimmed = trim_messages(messages[1:], keep_recent, strategy="last")
    return [system_msg] + trimmed

Selective Loading
Don't load everything. Load what the current agent needs:

def load_context_for_agent(agent: Agent, task: dict):
    context = []
    
    # Task-specific context
    context.extend(memory.get_task_context(task["id"]))
    
    # Agent-specialty context
    context.extend(memory.get_specialty_context(agent.specialty))
    
    # Recent inter-agent messages (last 5 only)
    context.extend(message_bus.get_recent(agent.id, limit=5))
    
    return context

Communication Protocols

Agents need to talk to each other. The protocol you choose affects reliability, debuggability, and performance.

Message Passing

Direct communication between agents:

import asyncio
import time
from collections import defaultdict
from dataclasses import dataclass
from uuid import uuid4

@dataclass
class AgentMessage:
    sender: str
    recipient: str
    type: str  # "request", "response", "broadcast"
    content: dict
    correlation_id: str  # Link request/response pairs
    timestamp: float

class MessageBus:
    def __init__(self):
        self.queues = defaultdict(asyncio.Queue)
    
    async def send(self, message: AgentMessage):
        await self.queues[message.recipient].put(message)
    
    async def receive(self, agent_id: str, timeout: float = None):
        try:
            return await asyncio.wait_for(
                self.queues[agent_id].get(),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            return None
    
    async def broadcast(self, sender: str, content: dict):
        for recipient in self.queues.keys():
            if recipient != sender:
                await self.send(AgentMessage(
                    sender=sender,
                    recipient=recipient,
                    type="broadcast",
                    content=content,
                    correlation_id=str(uuid4()),
                    timestamp=time.time()
                ))

Handoffs

Transfer conversation control between agents:

from langchain_core.tools import tool
from langgraph.prebuilt import create_react_agent
from langgraph.types import Command

def create_agent_with_handoffs(name: str, tools: list, handoff_targets: list):
    """Create an agent that can hand off to other agents"""
    
    @tool
    def transfer_to_agent(target: str, context: str):
        """Transfer the conversation to another specialist agent.
        
        Args:
            target: Name of the agent to transfer to
            context: Context to pass to the receiving agent
        """
        if target not in handoff_targets:
            return f"Unknown agent: {target}. Available: {handoff_targets}"
        
        return Command(
            goto=target,
            update={"handoff_context": context, "from_agent": name}
        )
    
    return create_react_agent(
        llm,
        tools=tools + [transfer_to_agent],
        name=name
    )

Handoffs are explicit transfers. The sending agent acknowledges it can't handle the request and routes to a better-suited agent.

Tool Calling vs. Handoffs

Aspect    Agent as Tool                Handoff
------    -------------                -------
Control   Orchestrator retains         Transfers to specialist
Context   Orchestrator manages         Specialist takes over
Use case  Bounded subtasks             Full delegation
Return    Result back to orchestrator  Specialist responds to user

Use tool calling when the orchestrator should synthesize results. Use handoffs when the specialist should own the interaction.

Failure Handling

Multi-agent systems have more failure modes than single agents. Plan for them.

Timeout and Retry

import asyncio

class AgentTimeoutError(Exception):
    """Raised when an agent exhausts its retry budget."""

async def call_agent_with_retry(
    agent,
    payload: dict,
    max_retries: int = 3,
    timeout: float = 30.0
):
    for attempt in range(max_retries):
        try:
            return await asyncio.wait_for(
                agent.ainvoke(payload),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            if attempt < max_retries - 1:
                # Exponential backoff before the next attempt
                await asyncio.sleep(2 ** attempt)
                continue
            raise AgentTimeoutError(f"Agent timed out after {max_retries} attempts")
        except Exception:
            if attempt < max_retries - 1:
                await asyncio.sleep(2 ** attempt)
                continue
            raise

Circuit Breaker

Prevent cascading failures:

import time

class CircuitOpenError(Exception):
    """Raised when calls are rejected while the breaker is open."""

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 60.0):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.state = "closed"  # closed, open, half-open
        self.last_failure_time = 0
    
    async def call(self, func, *args, **kwargs):
        if self.state == "open":
            if time.time() - self.last_failure_time > self.reset_timeout:
                self.state = "half-open"
            else:
                raise CircuitOpenError("Circuit breaker is open")
        
        try:
            result = await func(*args, **kwargs)
            if self.state == "half-open":
                self.state = "closed"
                self.failure_count = 0
            return result
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = "open"
            raise

Graceful Degradation

When a specialist fails, fall back to a generalist:

async def research_with_fallback(query: str):
    try:
        # Try specialist first
        return await research_specialist.ainvoke({"query": query})
    except AgentFailure:  # placeholder: your framework's agent-failure exception
        # Fall back to generalist
        return await general_agent.ainvoke({
            "query": query,
            "context": "Research task (specialist unavailable)"
        })

Health Monitoring

Track agent health metrics:

import time
from collections import defaultdict

class AgentHealthMonitor:
    def __init__(self):
        self.metrics = defaultdict(lambda: {
            "success_count": 0,
            "failure_count": 0,
            "total_latency": 0.0,
            "last_success": None,
            "last_failure": None
        })
    
    def record_success(self, agent_id: str, latency: float):
        m = self.metrics[agent_id]
        m["success_count"] += 1
        m["total_latency"] += latency
        m["last_success"] = time.time()
    
    def record_failure(self, agent_id: str, error: str):
        m = self.metrics[agent_id]
        m["failure_count"] += 1
        m["last_failure"] = time.time()
        m["last_error"] = error
    
    def get_health(self, agent_id: str) -> dict:
        m = self.metrics[agent_id]
        total = m["success_count"] + m["failure_count"]
        return {
            "success_rate": m["success_count"] / total if total > 0 else 0,
            "avg_latency": m["total_latency"] / m["success_count"] if m["success_count"] > 0 else 0,
            "healthy": m["success_count"] / total > 0.9 if total > 10 else True
        }

Choosing an Architecture

Scenario                                     Recommended Pattern
--------                                     -------------------
Clear task decomposition, need audit trail   Supervisor
Large teams (10+), complex domain structure  Hierarchical
Bounded specialist subtasks                  Agents as Tools
Brainstorming, need diverse perspectives     Swarm
Dynamic interaction needs                    Network
Fault tolerance critical                     Swarm or distributed supervisor
Debugging simplicity needed                  Supervisor

Most production systems use hybrid approaches: a supervisor orchestrating specialists (agents as tools), with hierarchical organization for large agent counts, and swarm-like parallel exploration for specific subtasks.

Real-World Architectures

Research Agent Team

A working research system with supervisor, web researcher, document analyst, and synthesizer:

from typing import TypedDict, Annotated, List
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

class ResearchState(TypedDict):
    messages: Annotated[list, add_messages]
    query: str
    web_results: List[dict]
    document_analysis: List[dict]
    synthesis: str
    current_agent: str
    iteration: int

def supervisor(state: ResearchState):
    """Coordinate the research team"""
    query = state["query"]
    web_results = state.get("web_results", [])
    doc_analysis = state.get("document_analysis", [])
    iteration = state.get("iteration", 0)
    
    # Decide next action
    if not web_results:
        return {"current_agent": "web_researcher", "iteration": iteration + 1}
    
    if not doc_analysis and web_results:
        return {"current_agent": "doc_analyst", "iteration": iteration + 1}
    
    if web_results and doc_analysis:
        return {"current_agent": "synthesizer", "iteration": iteration + 1}
    
    return {"current_agent": "done"}

def web_researcher(state: ResearchState):
    """Search the web for relevant information"""
    query = state["query"]
    
    # Use search tools
    results = tavily_search.invoke(query)
    
    return {
        "web_results": results,
        "messages": [{"role": "assistant", "content": f"Found {len(results)} web sources"}]
    }

def doc_analyst(state: ResearchState):
    """Analyze documents and extract key information"""
    web_results = state["web_results"]
    
    analysis = []
    for result in web_results[:5]:  # Analyze top 5
        response = llm.invoke([
            {"role": "system", "content": "Extract key facts, claims, and evidence from this content."},
            {"role": "user", "content": result["content"]}
        ])
        analysis.append({
            "source": result["url"],
            "analysis": response.content
        })
    
    return {"document_analysis": analysis}

def synthesizer(state: ResearchState):
    """Synthesize research into coherent answer"""
    query = state["query"]
    analysis = state["document_analysis"]
    
    analysis_text = "\n\n".join([
        f"Source: {a['source']}\n{a['analysis']}" 
        for a in analysis
    ])
    
    response = llm.invoke([
        {"role": "system", "content": "Synthesize these research findings into a comprehensive answer with citations."},
        {"role": "user", "content": f"Question: {query}\n\nResearch:\n{analysis_text}"}
    ])
    
    return {"synthesis": response.content}

def route(state: ResearchState):
    agent = state.get("current_agent", "")
    if agent == "web_researcher":
        return "web_researcher"
    elif agent == "doc_analyst":
        return "doc_analyst"
    elif agent == "synthesizer":
        return "synthesizer"
    return END

# Build the graph
graph = StateGraph(ResearchState)
graph.add_node("supervisor", supervisor)
graph.add_node("web_researcher", web_researcher)
graph.add_node("doc_analyst", doc_analyst)
graph.add_node("synthesizer", synthesizer)

graph.add_edge(START, "supervisor")
graph.add_conditional_edges("supervisor", route)
graph.add_edge("web_researcher", "supervisor")
graph.add_edge("doc_analyst", "supervisor")
graph.add_edge("synthesizer", END)

research_team = graph.compile()

Code Review Pipeline

Sequential agents for code quality:

class CodeReviewState(TypedDict):
    code: str
    language: str
    security_issues: List[dict]
    performance_issues: List[dict]
    style_issues: List[dict]
    summary: str

def security_reviewer(state: CodeReviewState):
    """Check for security vulnerabilities"""
    code = state["code"]
    
    response = llm.invoke([
        {"role": "system", "content": """You are a security expert. 
        Identify security vulnerabilities: injection, XSS, auth issues, secrets exposure.
        Return JSON: {"issues": [{"severity": "high/medium/low", "line": N, "description": "..."}]}"""},
        {"role": "user", "content": code}
    ])
    
    issues = json.loads(response.content)
    return {"security_issues": issues.get("issues", [])}

def performance_reviewer(state: CodeReviewState):
    """Identify performance problems"""
    code = state["code"]
    
    response = llm.invoke([
        {"role": "system", "content": """You are a performance engineer.
        Identify: N+1 queries, memory leaks, inefficient algorithms, blocking calls.
        Return JSON: {"issues": [{"impact": "high/medium/low", "line": N, "description": "..."}]}"""},
        {"role": "user", "content": code}
    ])
    
    issues = json.loads(response.content)
    return {"performance_issues": issues.get("issues", [])}

def style_reviewer(state: CodeReviewState):
    """Check coding standards"""
    code = state["code"]
    language = state["language"]
    
    response = llm.invoke([
        {"role": "system", "content": f"""Review this {language} code for style issues.
        Check: naming conventions, code organization, documentation, idioms.
        Return JSON: {{"issues": [{{"severity": "suggestion/warning", "line": N, "description": "..."}}]}}"""},
        {"role": "user", "content": code}
    ])
    
    issues = json.loads(response.content)
    return {"style_issues": issues.get("issues", [])}

def summarizer(state: CodeReviewState):
    """Create review summary"""
    sec = state.get("security_issues", [])
    perf = state.get("performance_issues", [])
    style = state.get("style_issues", [])
    
    high_severity = len([i for i in sec if i.get("severity") == "high"])
    
    summary = f"""## Code Review Summary
    
**Security**: {len(sec)} issues ({high_severity} high severity)
**Performance**: {len(perf)} issues
**Style**: {len(style)} suggestions

{"⚠️ HIGH SEVERITY SECURITY ISSUES - Do not merge" if high_severity > 0 else "✅ No blocking issues"}
"""
    return {"summary": summary}

# Sequential pipeline
graph = StateGraph(CodeReviewState)
graph.add_node("security", security_reviewer)
graph.add_node("performance", performance_reviewer)
graph.add_node("style", style_reviewer)
graph.add_node("summarize", summarizer)

graph.add_edge(START, "security")
graph.add_edge("security", "performance")
graph.add_edge("performance", "style")
graph.add_edge("style", "summarize")
graph.add_edge("summarize", END)

review_pipeline = graph.compile()
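The three reviewers above call `json.loads` directly on model output, which raises the moment a model wraps its JSON in prose or a markdown fence. A defensive parser is a common guard. This is a sketch only; `parse_issues` is a hypothetical helper, not part of the pipeline above:

```python
import json

def parse_issues(raw: str) -> list:
    """Defensively extract an "issues" list from an LLM's reply.
    Models sometimes wrap JSON in markdown fences or return malformed
    output; a bare json.loads would raise and crash the node."""
    text = raw.strip()
    if text.startswith("```"):
        # Strip markdown fences and an optional language tag like "json"
        text = text.strip("`")
        if "\n" in text:
            text = text.split("\n", 1)[1]
    try:
        parsed = json.loads(text)
        return parsed.get("issues", []) if isinstance(parsed, dict) else []
    except json.JSONDecodeError:
        return []
```

Each reviewer would then return `{"security_issues": parse_issues(response.content)}` instead of trusting the raw reply, so one malformed response degrades to an empty list rather than an exception.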

Customer Support Escalation

Agents with handoff capabilities:

def triage_agent(state):
    """Initial contact - classify and route"""
    message = state["messages"][-1].content
    
    # Classify intent and urgency in one structured call
    classification = llm.with_structured_output(TicketClass).invoke([
        {"role": "system", "content": "Classify this support request by category (billing, technical, account, general) and urgency (high, normal)."},
        {"role": "user", "content": message}
    ])
    
    # Check urgency first so high-priority requests escalate regardless of category
    if classification.urgency == "high":
        return Command(goto="escalation", update={"urgency": "high"})
    if classification.category == "billing":
        return Command(goto="billing_specialist", update={"category": "billing"})
    if classification.category == "technical":
        return Command(goto="tech_support", update={"category": "technical"})
    
    # Handle general queries directly
    response = llm.invoke(state["messages"])
    return {"messages": [response]}

def billing_specialist(state):
    """Handle billing inquiries"""
    # Has access to billing tools; in production, build the agent once at
    # module level rather than on every call
    return create_react_agent(
        llm,
        tools=[lookup_invoice, process_refund, update_payment],
        prompt="You are a billing specialist..."
    ).invoke(state)

def tech_support(state):
    """Handle technical issues"""
    # Has access to diagnostic tools
    return create_react_agent(
        llm,
        tools=[check_service_status, lookup_logs, create_ticket],
        prompt="You are technical support..."
    ).invoke(state)

def escalation(state):
    """Escalate to human"""
    ticket = create_support_ticket(state)
    
    return {
        "messages": [{
            "role": "assistant",
            "content": f"I've escalated your issue to a specialist. Ticket #{ticket.id}. You'll hear back within 2 hours."
        }],
        "escalated": True,
        "ticket_id": ticket.id
    }

Debugging Multi-Agent Systems

Trace Visualization

Track agent interactions:

class AgentTracer:
    def __init__(self):
        self.traces = []
    
    def trace(self, agent: str, action: str, input_data: dict, output_data: dict, latency: float):
        self.traces.append({
            "timestamp": time.time(),
            "agent": agent,
            "action": action,
            "input_summary": self._summarize(input_data),
            "output_summary": self._summarize(output_data),
            "latency_ms": latency * 1000
        })
    
    def _summarize(self, data: dict) -> str:
        """Truncate for readability"""
        s = str(data)
        return s[:200] + "..." if len(s) > 200 else s
    
    def print_trace(self):
        for t in self.traces:
            print(f"[{t['agent']}] {t['action']} ({t['latency_ms']:.0f}ms)")
            print(f"  In: {t['input_summary']}")
            print(f"  Out: {t['output_summary']}")
            print()
    
    def to_mermaid(self) -> str:
        """Generate sequence diagram"""
        lines = ["sequenceDiagram"]
        for i, t in enumerate(self.traces):
            if i > 0:
                prev = self.traces[i-1]["agent"]
                curr = t["agent"]
                lines.append(f"    {prev}->>{curr}: {t['action']}")
        return "\n".join(lines)

Replay Testing

Test with recorded traces:

class AgentReplayTest:
    def __init__(self, recorded_traces: list):
        self.traces = recorded_traces
        self.mock_responses = {}
    
    def mock_agent(self, agent_name: str, response: dict):
        self.mock_responses[agent_name] = response
    
    def replay(self, graph, initial_state: dict):
        """Replay with mocked agent responses.
        patch_agents is assumed to be a context manager that swaps each
        named agent's node for a stub returning the recorded response."""
        with patch_agents(self.mock_responses):
            result = graph.invoke(initial_state)
        return result
    
    def compare(self, expected: dict, actual: dict) -> list:
        """Diff expected vs actual"""
        diffs = []
        for key in expected:
            if key not in actual:
                diffs.append(f"Missing key: {key}")
            elif expected[key] != actual[key]:
                diffs.append(f"Diff in {key}: expected {expected[key]}, got {actual[key]}")
        return diffs

Production Checklist

Before deploying multi-agent systems:

Architecture

  • [ ] Defined agent roles and boundaries
  • [ ] Chosen appropriate orchestration pattern
  • [ ] Mapped communication flows
  • [ ] Identified single points of failure

State Management

  • [ ] State schema defined
  • [ ] Conflict resolution policy chosen
  • [ ] Persistence strategy selected
  • [ ] Provenance tracking implemented

Reliability

  • [ ] Timeout and retry logic
  • [ ] Circuit breakers for external calls
  • [ ] Graceful degradation paths
  • [ ] Health monitoring

Observability

  • [ ] Structured logging per agent
  • [ ] Trace correlation across agents
  • [ ] Token usage tracking
  • [ ] Latency monitoring

Governance

  • [ ] Human-in-the-loop checkpoints
  • [ ] Approval workflows for risky actions
  • [ ] Audit trail for decisions
  • [ ] Role-based access control

Frequently Asked Questions

When should I use multi-agent instead of a single agent?

Use multi-agent when your single agent hits one of these limits: context window overflow from too many tools or too much history, degraded quality because one agent is stretched across too many responsibilities, or tasks that would benefit from parallel execution. If your agent handles requests well with a focused prompt and a handful of tools, stick with single-agent. Multi-agent adds roughly 15x token overhead, so the coordination cost needs to pay off in better results.

What's the simplest multi-agent architecture to start with?

Supervisor pattern with two specialists. One orchestrator decides which specialist handles each subtask. Start here because it's easy to debug (you can trace every decision the supervisor makes), easy to extend (add more specialists as needed), and covers most use cases. Graduate to hierarchical only when you have 10+ agents and need multiple coordination levels.

How do I prevent agents from contradicting each other?

Three approaches work in practice. First, give each agent a distinct scope with no overlap. The researcher finds information, the analyst interprets it, the writer presents it. Clear boundaries prevent conflicts. Second, use a supervisor to sequence work so agents build on each other's output rather than working in parallel on the same thing. Third, if parallel work is necessary, implement explicit conflict resolution: confidence-weighted voting, role-based priority, or human escalation when agents disagree significantly.
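The third approach can be sketched in a few lines. This is a minimal illustration, not a production policy; `resolve_conflict` and its 0.2 escalation margin are assumptions:

```python
def resolve_conflict(proposals: list, margin: float = 0.2):
    """Confidence-weighted vote across agent proposals.
    Each proposal is {"agent": str, "answer": str, "confidence": float}.
    Returns the winning answer plus a flag to escalate to a human
    when the top two answers are too close to call."""
    scores = {}
    for p in proposals:
        scores[p["answer"]] = scores.get(p["answer"], 0.0) + p["confidence"]
    best = max(scores, key=scores.get)
    ranked = sorted(scores.values(), reverse=True)
    # Escalate when the winner's lead over the runner-up is within the margin
    needs_human = len(ranked) > 1 and ranked[0] - ranked[1] < margin
    return best, needs_human
```

Role-based priority would replace the confidence sum with a fixed weight per agent role; the escalation check stays the same.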

Should agents share memory or have separate contexts?

Both, with clear boundaries. Shared memory works for task state: what's been accomplished, what decisions were made, what the current goal is. Separate contexts work for agent-specific reasoning: the researcher's search strategy, the analyst's interpretation framework. The mistake is putting everything in shared memory (context overflow) or keeping everything separate (agents can't coordinate). Use hierarchical summarization: detailed context stays local, high-level summaries go to shared state.
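The promotion step can be sketched as a pure function. This is illustrative only: `promote_to_shared` is a hypothetical helper, and the "summary" here is a simple truncation where a real system would make an LLM summarization call:

```python
def promote_to_shared(local_history: list, shared: dict, max_chars: int = 300) -> dict:
    """Hierarchical summarization, sketched: full detail stays in the
    agent's local history; only a bounded summary is promoted to shared
    state so it can't blow up the shared context."""
    summary = " | ".join(local_history)[:max_chars]
    return {**shared, "summaries": shared.get("summaries", []) + [summary]}
```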

How do I handle agent failures without losing work?

Checkpoint aggressively. Save state after every significant step, not just at the end. When an agent fails, you can resume from the last checkpoint instead of starting over. For critical paths, run redundant agents and take the first successful result. For non-critical work, implement graceful degradation: if the specialist fails, fall back to a generalist that can produce acceptable (if not optimal) results.
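Checkpoint-and-resume can be sketched with plain JSON persistence. This is a sketch under simplifying assumptions: state must be JSON-serializable, failed attempts restart the current step, and a real system would use a framework checkpointer (e.g. LangGraph's) rather than a file:

```python
import json
import os

def run_with_checkpoints(steps, state: dict, path: str) -> dict:
    """Run agent steps sequentially, persisting state after each one.
    On restart, resume from the last checkpoint instead of step zero."""
    start = 0
    if os.path.exists(path):
        with open(path) as f:
            saved = json.load(f)
        state, start = saved["state"], saved["next_step"]
    for i in range(start, len(steps)):
        state = steps[i](state)  # one agent's unit of work
        # Persist after every step, not just at the end
        with open(path, "w") as f:
            json.dump({"state": state, "next_step": i + 1}, f)
    return state
```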

What's the difference between handoffs and tool calling?

Tool calling keeps the orchestrator in control. The orchestrator calls a specialist, gets a result, and decides what to do next. The specialist does bounded work and returns. Handoffs transfer control entirely. The specialist takes over the conversation and responds directly to the user. Use tool calling when you need to synthesize results from multiple specialists. Use handoffs when one specialist should own the entire interaction for their domain.

How many agents is too many?

Coordination overhead grows faster than agent count. With 3 agents, you have 3 potential communication paths. With 10 agents, you have 45. With 20, you have 190. Most production systems stay under 10 agents for a single task. If you need more, use hierarchical organization: teams of 3-5 agents each, coordinated by team supervisors, coordinated by a top-level orchestrator. This keeps any single coordination layer manageable.
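Those counts are just n choose 2:

```python
def comm_paths(n: int) -> int:
    """Potential pairwise communication paths among n agents: n * (n - 1) / 2."""
    return n * (n - 1) // 2

# comm_paths(3) → 3, comm_paths(10) → 45, comm_paths(20) → 190
```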

Can I use different LLMs for different agents?

Yes, and you probably should. Use capable models (GPT-4, Claude Opus) for orchestrators and complex reasoning. Use faster, cheaper models (GPT-4o-mini, Claude Haiku, Mistral) for specialists doing focused tasks. Anthropic's research showed their best results came from Opus orchestrating Sonnet subagents. The orchestrator needs judgment; specialists need speed and focus. Match model capability to task complexity.

How do I debug a multi-agent system when something goes wrong?

Trace everything. Log every agent invocation with input state, output state, and latency. Use correlation IDs to link related calls across agents. When something fails, you should be able to reconstruct the exact sequence: which agent was called, what it received, what it returned, and where the chain broke. Tools like LangSmith provide this out of the box. Without tracing, multi-agent debugging becomes guesswork.
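A minimal correlation-ID sketch using the standard library (real systems would attach this to a tracing backend like LangSmith; the helper names here are assumptions):

```python
import contextvars
import uuid

# Correlation ID for the current task, visible to every agent call on the
# same logical thread of execution
_correlation_id = contextvars.ContextVar("correlation_id", default="unset")

def start_task() -> str:
    """Mint a fresh correlation ID at the start of a task."""
    cid = uuid.uuid4().hex
    _correlation_id.set(cid)
    return cid

def log_event(agent: str, action: str) -> dict:
    """Every log record carries the task's ID so traces can be joined later."""
    return {"correlation_id": _correlation_id.get(), "agent": agent, "action": action}
```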

What frameworks support multi-agent orchestration?

LangGraph handles graph-based workflows with explicit state management and is good for complex, branching logic. CrewAI uses role-based agents with simpler configuration and is faster to prototype. AutoGen from Microsoft focuses on conversational multi-agent patterns. AWS Strands provides patterns for agents as tools and swarms. All of these support the core patterns (supervisor, hierarchical, swarm) with different trade-offs in flexibility versus ease of use.

How do I test multi-agent systems?

Test at three levels. Unit test individual agents in isolation: given this input state, does the agent produce the expected output? Integration test agent pairs: when the researcher hands off to the analyst, does the interface work correctly? End-to-end test the full system: given a realistic task, does the system produce acceptable results within time and cost budgets? Use recorded traces to create regression tests. When you fix a bug, add the failing case to your test suite.

What's the cost difference between single-agent and multi-agent?

Expect 10-20x higher token usage for multi-agent systems. Each agent call includes system prompts, context, and reasoning. Coordination messages add overhead. Parallel execution means paying for multiple simultaneous calls. This cost is justified when multi-agent produces significantly better results or enables tasks that single-agent can't handle. Track cost per task, not just cost per token. If multi-agent completes in one attempt what single-agent fails at in five, multi-agent might be cheaper despite higher token usage.
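The cost-per-task framing can be made concrete. Under the simplifying assumption that a failed attempt is retried from scratch, expected cost per success is tokens-per-attempt divided by success rate. The numbers below are illustrative, not benchmarks:

```python
def expected_cost_per_success(tokens_per_attempt: float, success_rate: float) -> float:
    """Expected tokens spent per successful task, assuming failed
    attempts are retried from scratch (geometric retries)."""
    return tokens_per_attempt / success_rate

# Hypothetical numbers: a single agent at 10k tokens/attempt with a 5%
# success rate vs multi-agent at 15x the tokens with a 90% success rate
single = expected_cost_per_success(10_000, 0.05)   # 200,000 tokens per success
multi = expected_cost_per_success(150_000, 0.90)   # ~166,667 tokens per success
```

With these assumptions, multi-agent is cheaper per success whenever the single agent's success rate falls below roughly (multi-agent success rate) / (token overhead multiplier).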

Summary

Multi-agent orchestration is coordination engineering. The patterns are straightforward: supervisor for centralized control, hierarchical for scale, swarm for distributed exploration. The hard parts are state management, conflict resolution, and failure handling.

Start simple. A supervisor with two or three specialists handles most use cases. Add hierarchy when you hit coordination limits. Consider swarms when you need fault tolerance or diverse exploration.

Memory architecture matters more than communication architecture. Agents that share state effectively outperform agents that communicate constantly but can't remember what was decided. Invest in memory engineering before optimizing message passing.

For teams building multi-agent systems that need enterprise reliability, Prem Studio provides fine-tuning and evaluation tools to test agent performance before deployment. When smaller specialized models outperform larger general models, fine-tuning becomes the competitive advantage in agentic workflows.
