Collaboration Patterns
Introduction
The core of multi-agent systems is how agents collaborate. Different collaboration patterns suit different types of tasks. This section details six common collaboration patterns, providing implementation approaches and applicable scenarios for each.
Pattern 1: Sequential Chain
Agents process tasks in sequence, with each agent receiving the previous agent's output as input.
graph LR
INPUT[Input] --> A1[Agent A<br/>Research]
A1 -->|Research Results| A2[Agent B<br/>Analysis]
A2 -->|Analysis Report| A3[Agent C<br/>Writing]
A3 -->|Draft| A4[Agent D<br/>Review]
A4 --> OUTPUT[Final Output]
Implementation
class SequentialPipeline:
    def __init__(self, agents):
        self.agents = agents  # Ordered list of agents

    async def run(self, initial_input):
        current = initial_input
        for agent in self.agents:
            # Each agent consumes the previous agent's output
            current = await agent.process(current)
        return current

# Usage (call from inside an async function)
pipeline = SequentialPipeline([
    ResearchAgent(),
    AnalysisAgent(),
    WritingAgent(),
    ReviewAgent(),
])
result = await pipeline.run("Analyze 2024 AI Agent market trends")
Use cases: Document generation pipelines, data processing pipelines, multi-step approval workflows
Pros: Simple and clear, easy to debug
Cons: Cumulative latency, error propagation, no parallelism
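Error propagation is easier to handle when the pipeline reports which stage failed and what the earlier stages produced. The following is a minimal sketch under stated assumptions, not part of the implementation above: `TracedPipeline` and `StageError` are hypothetical names, and the three stub agents stand in for real LLM-backed agents that expose the same `process` method.

```python
import asyncio


class StageError(Exception):
    """Raised when one stage of the pipeline fails (hypothetical helper)."""

    def __init__(self, stage_index, agent_name, cause):
        super().__init__(f"stage {stage_index} ({agent_name}) failed: {cause!r}")
        self.stage_index = stage_index
        self.agent_name = agent_name
        self.cause = cause


class TracedPipeline:
    """Sequential chain that records each intermediate output."""

    def __init__(self, agents):
        self.agents = agents
        self.trace = []  # (agent name, output) for every completed stage

    async def run(self, initial_input):
        self.trace = []
        current = initial_input
        for index, agent in enumerate(self.agents):
            name = type(agent).__name__
            try:
                current = await agent.process(current)
            except Exception as exc:
                # Stop at the first failure so bad data is not passed downstream.
                raise StageError(index, name, exc) from exc
            self.trace.append((name, current))
        return current


# Stub agents (assumptions for illustration only).
class UpperAgent:
    async def process(self, text):
        return text.upper()


class ExclaimAgent:
    async def process(self, text):
        return text + "!"


class FailingAgent:
    async def process(self, text):
        raise ValueError("no data")


pipeline = TracedPipeline([UpperAgent(), ExclaimAgent()])
result = asyncio.run(pipeline.run("draft"))
print(result)  # DRAFT!
```

After a failure, `pipeline.trace` still holds the outputs of the stages that completed, which helps locate where a bad intermediate result entered the chain.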
Pattern 2: Parallel (Fan-out / Fan-in)
Multiple agents process different subtasks simultaneously, then aggregate results.
graph TB
INPUT[Input Task] --> SPLIT[Task Decomposition]
SPLIT --> A1[Agent A<br/>Search Academic Papers]
SPLIT --> A2[Agent B<br/>Search Industry Reports]
SPLIT --> A3[Agent C<br/>Search News]
A1 --> MERGE[Result Aggregation]
A2 --> MERGE
A3 --> MERGE
MERGE --> OUTPUT[Comprehensive Report]
Implementation
import asyncio

class ParallelFanOut:
    def __init__(self, agents, aggregator):
        self.agents = agents
        self.aggregator = aggregator

    async def run(self, task):
        # Fan-out: parallel execution
        tasks = [agent.process(task) for agent in self.agents]
        # return_exceptions=True returns failures as values instead of raising
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Filter failed results (failures are dropped silently here)
        valid_results = [r for r in results if not isinstance(r, Exception)]
        # Fan-in: aggregation
        return await self.aggregator.aggregate(valid_results)

# Usage (call from inside an async function)
research = ParallelFanOut(
    agents=[AcademicSearchAgent(), IndustryReportAgent(), NewsSearchAgent()],
    aggregator=SynthesisAgent(),
)
report = await research.run("Current state of AI Agent technology")
Use cases: Multi-source information gathering, distributed search, parallel testing
Pros: Low latency (bounded by the slowest agent rather than the sum of all agents), high throughput
Cons: Requires aggregation logic, high resource consumption
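Because total latency is set by the slowest agent, a per-agent timeout keeps one stalled agent from holding up the fan-in step. The sketch below is one possible variant, not the implementation above: the `fan_out` function, the `timeout` parameter, and the stub agents are assumptions for illustration. It also returns failures alongside successes so the aggregator can see which sources are missing.

```python
import asyncio


async def fan_out(agents, task, timeout=1.0):
    """Run every agent on the same task; return (successes, failures)."""

    async def run_one(agent):
        # wait_for cancels the agent's coroutine if it exceeds the timeout.
        return await asyncio.wait_for(agent.process(task), timeout)

    outcomes = await asyncio.gather(
        *(run_one(agent) for agent in agents), return_exceptions=True
    )
    successes, failures = {}, {}
    for agent, outcome in zip(agents, outcomes):
        name = type(agent).__name__
        if isinstance(outcome, Exception):
            failures[name] = outcome
        else:
            successes[name] = outcome
    return successes, failures


# Stub agents (assumptions for illustration only).
class FastAgent:
    async def process(self, task):
        return f"fast: {task}"


class SlowAgent:
    async def process(self, task):
        await asyncio.sleep(0.5)  # longer than the timeout used below
        return f"slow: {task}"


class BrokenAgent:
    async def process(self, task):
        raise RuntimeError("source unavailable")


successes, failures = asyncio.run(
    fan_out([FastAgent(), SlowAgent(), BrokenAgent()], "q", timeout=0.05)
)
print(successes)         # {'FastAgent': 'fast: q'}
print(sorted(failures))  # ['BrokenAgent', 'SlowAgent']
```

Keying results by class name is only suitable when each agent type appears once; a real system would more likely use an explicit agent identifier.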
Pattern 3: Hierarchical (Manager-Worker)
A manager agent handles task decomposition and assignment; worker agents handle execution.
graph TB
PM[Project Manager Agent] --> ARCH[Architect Agent]
PM --> DEV1[Developer Agent 1]
PM --> DEV2[Developer Agent 2]
PM --> QA[Test Agent]
ARCH -->|Design| PM
DEV1 -->|Code| PM
DEV2 -->|Code| PM
QA -->|Test Report| PM
PM --> RESULT[Final Deliverable]
Implementation
class HierarchicalSystem:
    def __init__(self, manager, workers):
        self.manager = manager
        self.workers = {w.name: w for w in workers}

    async def run(self, task):
        # Manager decomposes the task
        subtasks = await self.manager.decompose(task)
        results = {}
        for subtask in subtasks:
            # Manager assigns the task
            worker_name = subtask["assigned_to"]
            worker = self.workers[worker_name]
            # Worker executes
            result = await worker.process(subtask["description"])
            # Manager reviews; the worker revises once if asked
            feedback = await self.manager.review(subtask, result)
            if feedback["needs_revision"]:
                result = await worker.revise(result, feedback["comments"])
            # Keyed by worker name: a later subtask for the same worker
            # overwrites the earlier result
            results[worker_name] = result
        # Manager integrates
        return await self.manager.synthesize(results)
Use cases: Software development, project management, complex task coordination
Pros: Clear division of responsibilities, scalable
Cons: Manager is a bottleneck, communication overhead
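One way to ease the manager bottleneck is to dispatch independent subtasks concurrently and to cap the number of review rounds. The sketch below assumes that subtasks do not depend on each other and that each subtask carries an `id` field; `run_subtask`, `run_hierarchy`, `max_revisions`, and the stub classes are hypothetical names introduced for illustration.

```python
import asyncio


async def run_subtask(manager, worker, subtask, max_revisions=2):
    """Execute one subtask with a bounded number of manager-requested revisions."""
    result = await worker.process(subtask["description"])
    for _ in range(max_revisions):
        feedback = await manager.review(subtask, result)
        if not feedback["needs_revision"]:
            break
        result = await worker.revise(result, feedback["comments"])
    return subtask["id"], result


async def run_hierarchy(manager, workers, task):
    subtasks = await manager.decompose(task)
    # Independent subtasks run concurrently instead of one after another.
    pairs = await asyncio.gather(*(
        run_subtask(manager, workers[s["assigned_to"]], s) for s in subtasks
    ))
    # Keyed by subtask id so two subtasks for the same worker do not collide.
    return await manager.synthesize(dict(pairs))


# Stubs (assumptions for illustration only).
class StubManager:
    async def decompose(self, task):
        return [
            {"id": "t1", "assigned_to": "dev", "description": "parse input"},
            {"id": "t2", "assigned_to": "dev", "description": "write output"},
            {"id": "t3", "assigned_to": "qa", "description": "test"},
        ]

    async def review(self, subtask, result):
        # Request a revision until the result has been marked as revised.
        return {"needs_revision": "(revised)" not in result,
                "comments": "add detail"}

    async def synthesize(self, results):
        return results


class StubWorker:
    def __init__(self, name):
        self.name = name

    async def process(self, description):
        return f"{self.name}: {description}"

    async def revise(self, result, comments):
        return f"{result} (revised)"


workers = {"dev": StubWorker("dev"), "qa": StubWorker("qa")}
deliverable = asyncio.run(run_hierarchy(StubManager(), workers, "build tool"))
print(deliverable["t1"])  # dev: parse input (revised)
```

When subtasks do depend on each other (for example, development waiting on the architect's design), they would need to be grouped into ordered stages before being gathered.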
Pattern 4: Debate (Adversarial)
Agents argue from different perspectives, improving conclusion quality through debate.
graph TB
TOPIC[Debate Topic] --> PRO[Proponent Agent]
TOPIC --> CON[Opponent Agent]
PRO -->|Arguments| JUDGE[Judge Agent]
CON -->|Arguments| JUDGE
JUDGE -->|More Arguments Needed| PRO
JUDGE -->|More Arguments Needed| CON
JUDGE --> VERDICT[Final Conclusion]
Implementation
class DebateSystem:
    def __init__(self, proponent, opponent, judge, max_rounds=3):
        self.proponent = proponent
        self.opponent = opponent
        self.judge = judge
        self.max_rounds = max_rounds

    async def debate(self, topic):
        history = []
        for round_num in range(self.max_rounds):
            # Proponent argues
            pro_arg = await self.proponent.argue(topic, history)
            history.append({"side": "pro", "argument": pro_arg})
            # Opponent rebuts
            con_arg = await self.opponent.argue(topic, history)
            history.append({"side": "con", "argument": con_arg})
            # Judge evaluates; stop early once the arguments are sufficient
            evaluation = await self.judge.evaluate(topic, history)
            if evaluation["sufficient"]:
                break
        # Judge makes final ruling
        verdict = await self.judge.verdict(topic, history)
        return verdict

# Usage (call from inside an async function)
debate = DebateSystem(
    proponent=Agent(system="Argue why microservices architecture should be adopted"),
    opponent=Agent(system="Argue why monolithic architecture should be adopted"),
    judge=Agent(system="You are a technical architecture review expert. Make an objective judgment based on both sides' arguments"),
)
result = await debate.debate("Should the new project use microservices or monolithic architecture?")
Use cases: Decision analysis, viewpoint validation, red team/blue team exercises
Pros: Multi-perspective thinking, reduces bias
Cons: Many communication rounds, may reach a deadlock
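A round limit bounds the cost of a debate, but a stalled debate can also be detected earlier. The sketch below assumes, beyond what the implementation above defines, that the judge's evaluation includes a numeric `leaning` field between -1.0 (opponent) and 1.0 (proponent); the debate stops once that value changes by less than `min_shift` between rounds. The function name, the `leaning` field, and the scripted stubs are all hypothetical.

```python
import asyncio


async def debate_until_stable(proponent, opponent, judge, topic,
                              max_rounds=5, min_shift=0.05):
    """Stop when the judge is satisfied or its leaning stops moving."""
    history, previous = [], None
    for round_num in range(1, max_rounds + 1):
        pro_arg = await proponent.argue(topic, history)
        history.append({"side": "pro", "argument": pro_arg})
        con_arg = await opponent.argue(topic, history)
        history.append({"side": "con", "argument": con_arg})

        evaluation = await judge.evaluate(topic, history)
        leaning = evaluation["leaning"]  # assumed field, see lead-in
        if evaluation["sufficient"]:
            return {"rounds": round_num, "stopped": "sufficient", "leaning": leaning}
        if previous is not None and abs(leaning - previous) < min_shift:
            # Another round is unlikely to change the outcome.
            return {"rounds": round_num, "stopped": "stalled", "leaning": leaning}
        previous = leaning
    return {"rounds": max_rounds, "stopped": "max_rounds", "leaning": previous}


# Scripted stubs (assumptions for illustration only).
class ScriptedDebater:
    def __init__(self, side):
        self.side = side

    async def argue(self, topic, history):
        return f"{self.side} argument #{len(history) // 2 + 1}"


class ScriptedJudge:
    def __init__(self, leanings):
        self.leanings = leanings

    async def evaluate(self, topic, history):
        round_index = len(history) // 2 - 1
        return {"sufficient": False, "leaning": self.leanings[round_index]}


outcome = asyncio.run(debate_until_stable(
    ScriptedDebater("pro"), ScriptedDebater("con"),
    ScriptedJudge([0.4, 0.1, 0.08, 0.07, 0.07]),
    "microservices vs monolith",
))
print(outcome["stopped"], outcome["rounds"])  # stalled 3
```

The final verdict step is omitted here; in the pattern above it would still be requested from the judge after the loop ends.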
Pattern 5: Market-Based (Auction)
Agents bid for tasks, similar to a market economy:
class TaskAuction:
    """Auction-based task assignment"""

    async def auction(self, task, agents):
        # 1. Publish task (Call for Proposals)
        bids = []
        for agent in agents:
            bid = await agent.bid(task)
            if bid:  # A falsy bid means the agent declined
                bids.append({
                    "agent": agent,
                    "price": bid["estimated_cost"],
                    "time": bid["estimated_time"],
                    "confidence": bid["confidence"],
                })
        if not bids:
            raise ValueError("No agent bid on the task")
        # 2. Evaluate bids
        best_bid = self.evaluate_bids(bids)
        # 3. Assign task
        result = await best_bid["agent"].execute(task)
        return result

    def evaluate_bids(self, bids):
        """Comprehensive bid evaluation (assumes time and price are positive)"""
        for bid in bids:
            # Higher confidence, shorter time, and lower price all raise the score
            bid["score"] = (
                bid["confidence"] * 0.5 +
                (1 / bid["time"]) * 0.3 +
                (1 / bid["price"]) * 0.2
            )
        return max(bids, key=lambda b: b["score"])
Use cases: Resource allocation, task scheduling, load balancing
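The score above adds raw `1 / time` and `1 / price` terms, so the result depends on the units chosen for time and price. One possible alternative, shown as a sketch rather than as the method used above, scales each term against the best bid so that all three components fall between 0 and 1. The `score_bids` function, its `weights` parameter, and the sample bids are assumptions for illustration.

```python
def score_bids(bids, weights=(0.5, 0.3, 0.2)):
    """Return valid bids sorted best-first, each with a 'score' field added."""
    # Bids with non-positive time or price cannot be scored and are skipped.
    valid = [b for b in bids if b["time"] > 0 and b["price"] > 0]
    if not valid:
        return []
    fastest = min(b["time"] for b in valid)
    cheapest = min(b["price"] for b in valid)
    w_conf, w_time, w_price = weights
    scored = [
        {
            **b,
            "score": w_conf * b["confidence"]
            + w_time * fastest / b["time"]      # 1.0 for the fastest bid
            + w_price * cheapest / b["price"],  # 1.0 for the cheapest bid
        }
        for b in valid
    ]
    return sorted(scored, key=lambda b: b["score"], reverse=True)


# Sample bids (made-up numbers for illustration).
bids = [
    {"agent": "A", "price": 10, "time": 5, "confidence": 0.9},
    {"agent": "B", "price": 4, "time": 2, "confidence": 0.6},
    {"agent": "C", "price": 8, "time": 4, "confidence": 0.8},
]
ranked = score_bids(bids)
print(ranked[0]["agent"])  # B
```

Working through agent B: it is both the fastest and the cheapest, so its score is 0.5 × 0.6 + 0.3 × 1 + 0.2 × 1 = 0.8, ahead of A and C at 0.65 each despite their higher confidence.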
Pattern 6: Voting
Multiple agents make independent decisions, aggregated through voting:
from collections import Counter

class VotingSystem:
    async def majority_vote(self, question, agents):
        """Majority voting"""
        votes = []
        for agent in agents:
            answer = await agent.answer(question)
            votes.append(answer)
        # Count votes (answers must be hashable and match exactly)
        vote_counts = Counter(votes)
        winner = vote_counts.most_common(1)[0]
        return {
            "answer": winner[0],
            "votes": winner[1],
            "total": len(votes),
            "confidence": winner[1] / len(votes),
        }

    async def weighted_vote(self, question, agents):
        """Weighted voting (based on agent expertise)"""
        weighted_answers = {}
        for agent in agents:
            answer = await agent.answer(question)
            weight = agent.expertise_score  # Expertise weight
            weighted_answers[answer] = weighted_answers.get(answer, 0) + weight
        winner = max(weighted_answers.items(), key=lambda x: x[1])
        return {"answer": winner[0], "weighted_score": winner[1]}
Use cases: Quality assurance (multi-model voting), decision making
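Since the votes are independent, the agents can be queried concurrently, and free-text answers usually need normalizing before they are counted. The sketch below is an illustrative variant under those assumptions: `concurrent_majority_vote`, the `tied` field, and `FixedAgent` are hypothetical names, and the normalization (strip and lowercase) is only suitable for short categorical answers.

```python
import asyncio
from collections import Counter


async def concurrent_majority_vote(question, agents):
    """Query all agents at once and report ties instead of picking arbitrarily."""
    if not agents:
        raise ValueError("at least one agent is required")
    answers = await asyncio.gather(*(agent.answer(question) for agent in agents))
    # Normalize so "Yes" and " yes " count as the same vote.
    counts = Counter(a.strip().lower() for a in answers)
    ranked = counts.most_common()
    top_answer, top_votes = ranked[0]
    tied = len(ranked) > 1 and ranked[1][1] == top_votes
    return {
        "answer": None if tied else top_answer,
        "votes": top_votes,
        "total": len(answers),
        "tied": tied,
    }


# Stub agent that always returns the same answer (assumption for illustration).
class FixedAgent:
    def __init__(self, reply):
        self.reply = reply

    async def answer(self, question):
        return self.reply


panel = [FixedAgent("Yes"), FixedAgent("yes "), FixedAgent("no")]
decision = asyncio.run(concurrent_majority_vote("Ship it?", panel))
print(decision["answer"], decision["votes"])  # yes 2
```

Returning `None` on a tie leaves the tie-breaking policy to the caller, for example by adding another voter or falling back to weighted voting.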
Pattern Comparison
| Pattern | Latency | Communication | Complexity | Quality Improvement | Suitable Tasks |
|---|---|---|---|---|---|
| Sequential | High | Low | Low | Medium | Pipeline tasks |
| Parallel | Low | Medium | Medium | Medium | Parallelizable tasks |
| Hierarchical | Medium | High | High | High | Complex projects |
| Debate | High | High | Medium | High | Decision analysis |
| Market-based | Medium | Medium | Medium | Medium | Resource allocation |
| Voting | Low | Low | Low | Medium | Quality assurance |
Hybrid Patterns
Real systems typically combine multiple patterns:
import asyncio
from collections import Counter

class HybridSystem:
    """Hybrid system combining hierarchical + parallel + voting"""

    def __init__(self, manager, get_workers):
        # Assumed constructor: get_workers(subtask, n) returns n suitable agents
        self.manager = manager
        self.get_workers = get_workers

    async def run(self, task):
        # 1. Manager decomposes task
        subtasks = await self.manager.decompose(task)
        # 2. Process subtasks one at a time (each subtask uses multi-agent voting)
        results = {}
        for subtask in subtasks:
            # 3 agents execute the same subtask in parallel
            answers = await asyncio.gather(*[
                agent.process(subtask) for agent in self.get_workers(subtask, n=3)
            ])
            # Vote for the most common result (answers must be hashable)
            results[subtask["id"]] = Counter(answers).most_common(1)[0][0]
        # 3. Manager integrates
        return await self.manager.synthesize(results)
Further Reading
- Multi-Agent Frameworks - Frameworks implementing these collaboration patterns
- Wu, Q., et al. (2023). "AutoGen: Enabling Next-Gen LLM Applications via Multi-Agent Conversation"
- Hong, S., et al. (2023). "MetaGPT: Meta Programming for Multi-Agent Collaborative Framework"