Skip to content

Communication Protocols and Message Passing

Introduction

In multi-agent systems, communication between agents is the foundation of collaboration. From the classical FIPA-ACL structured messages to natural language dialogue between LLM agents, the choice of communication mechanism directly impacts system efficiency and capability.

Communication Paradigms

1. Shared Memory / Blackboard Model

All agents exchange information through a shared "blackboard":

class Blackboard:
    """Shared blackboard communication model"""

    def __init__(self):
        self.data = {}
        self.history = []

    def write(self, agent_id, key, value):
        """Agent writes information to the blackboard"""
        self.data[key] = {
            "value": value,
            "author": agent_id,
            "timestamp": datetime.now(),
        }
        self.history.append({
            "action": "write",
            "agent": agent_id,
            "key": key,
            "value": value,
        })

    def read(self, key=None):
        """Read information from the blackboard"""
        if key:
            return self.data.get(key)
        return self.data

    def subscribe(self, key_pattern, callback):
        """Subscribe to changes in specific information on the blackboard"""
        # Trigger callback when a matching key is updated
        pass

# Usage example
board = Blackboard()
board.write("researcher", "search_results", results)
board.write("analyst", "analysis", analysis)
summary = board.read("analysis")

Pros: Decoupled, flexible, supports asynchronous communication Cons: Lacks structured dialogue flow

2. Message Passing

Agents send messages directly to each other:

class Message:
    def __init__(self, sender, receiver, content, msg_type="inform"):
        self.sender = sender
        self.receiver = receiver
        self.content = content
        self.msg_type = msg_type  # inform, request, propose, accept, reject
        self.timestamp = datetime.now()
        self.id = generate_id()

class MessageBus:
    """Message bus"""

    def __init__(self):
        self.queues = {}  # agent_id -> message queue

    def register(self, agent_id):
        self.queues[agent_id] = asyncio.Queue()

    async def send(self, message: Message):
        """Send a message"""
        if message.receiver in self.queues:
            await self.queues[message.receiver].put(message)

    async def receive(self, agent_id, timeout=None):
        """Receive a message"""
        try:
            return await asyncio.wait_for(
                self.queues[agent_id].get(), timeout=timeout
            )
        except asyncio.TimeoutError:
            return None

    async def broadcast(self, sender, content, msg_type="inform"):
        """Broadcast a message"""
        for agent_id in self.queues:
            if agent_id != sender:
                msg = Message(sender, agent_id, content, msg_type)
                await self.send(msg)

3. Natural Language Dialogue

The most natural communication method for LLM agents---direct natural language conversation:

class NaturalLanguageCommunication:
    def __init__(self, agents):
        self.agents = agents
        self.conversation_history = []

    async def dialogue(self, speaker_id, message):
        """Agent speaks"""
        self.conversation_history.append({
            "speaker": speaker_id,
            "content": message,
            "timestamp": datetime.now(),
        })

    async def get_context_for_agent(self, agent_id, window=20):
        """Get the dialogue context visible to an agent"""
        recent = self.conversation_history[-window:]
        return [
            {"role": "assistant" if msg["speaker"] == agent_id else "user",
             "content": f"[{msg['speaker']}]: {msg['content']}"}
            for msg in recent
        ]

Classical Protocol: FIPA-ACL

FIPA (Foundation for Intelligent Physical Agents) defined a standard Agent Communication Language:

Core Communicative Acts

Act Description Example
inform Inform of information "Search results show..."
request Request an action "Please analyze this data"
query Query for information "Do you know the price of X?"
propose Propose a plan "I suggest using method A"
accept-proposal Accept a proposal "Agreed on plan A"
reject-proposal Reject a proposal "Disagree, because..."
cfp Call for proposals "Who can complete task X?"

Application in LLM Agents

class StructuredMessage:
    """Structured message combining FIPA concepts"""

    def __init__(self, sender, receiver, performative, content, 
                 reply_to=None, conversation_id=None):
        self.sender = sender
        self.receiver = receiver
        self.performative = performative  # FIPA communicative act
        self.content = content            # Natural language content
        self.reply_to = reply_to
        self.conversation_id = conversation_id or generate_id()

    def to_prompt(self):
        """Convert to an LLM-understandable prompt"""
        return (
            f"[{self.performative.upper()}] From {self.sender}:\n"
            f"{self.content}"
        )

Google A2A Protocol

Task Lifecycle

stateDiagram-v2
    [*] --> submitted: Create Task
    submitted --> working: Agent Starts Processing
    working --> working: Progress Update
    working --> input_required: More Information Needed
    input_required --> working: Information Provided
    working --> completed: Task Completed
    working --> failed: Task Failed
    completed --> [*]
    failed --> [*]

Agent Card

{
    "name": "Data Analyst Agent",
    "description": "Professional data analysis agent skilled in statistical analysis and visualization",
    "capabilities": ["data_analysis", "visualization", "statistics"],
    "input_formats": ["csv", "json", "sql"],
    "output_formats": ["text", "image", "json"],
    "url": "https://agent.example.com/analyst",
    "authentication": {
        "type": "oauth2",
        "scopes": ["read", "analyze"]
    }
}

Choosing Communication Patterns

Synchronous vs Asynchronous

Mode Use Case Pros Cons
Synchronous Simple request-response Simple, deterministic Blocking, inefficient
Asynchronous Long-running tasks, parallel processing Efficient, flexible Complex, requires callbacks

Point-to-Point vs Broadcast

# Point-to-point: communication between specific agents
await bus.send(Message(
    sender="manager",
    receiver="developer_1",
    content="Please implement the login feature",
    msg_type="request"
))

# Broadcast: notify all agents
await bus.broadcast(
    sender="manager",
    content="Project requirements have been updated, please check the latest version",
    msg_type="inform"
)

# Multicast: notify a specific group
await bus.multicast(
    sender="manager",
    group="developers",
    content="Code review meeting starts at 3 PM",
    msg_type="inform"
)

Communication Topologies

graph TB
    subgraph "Star (Hub-Spoke)"
        H[Manager] --> S1[Agent 1]
        H --> S2[Agent 2]
        H --> S3[Agent 3]
    end
graph TB
    subgraph "Full Mesh"
        F1[Agent 1] --- F2[Agent 2]
        F1 --- F3[Agent 3]
        F2 --- F3
    end
graph TB
    subgraph "Chain"
        C1[Agent 1] --> C2[Agent 2] --> C3[Agent 3]
    end

Communication Efficiency Optimization

Reducing Communication Rounds

class EfficientCommunication:
    def batch_messages(self, messages):
        """Batch messages to reduce communication overhead"""
        by_receiver = {}
        for msg in messages:
            if msg.receiver not in by_receiver:
                by_receiver[msg.receiver] = []
            by_receiver[msg.receiver].append(msg)

        for receiver, msgs in by_receiver.items():
            combined = "\n---\n".join([m.content for m in msgs])
            yield Message(
                sender="system",
                receiver=receiver,
                content=combined,
                msg_type="batch"
            )

    def compress_context(self, conversation, max_tokens=2000):
        """Compress conversation context"""
        if count_tokens(conversation) <= max_tokens:
            return conversation

        # Retain only key information
        summary = summarize(conversation)
        return [{"role": "system", "content": f"Conversation summary: {summary}"}]

Communication Protocol Selection Guide

Scenario Recommended Protocol Reason
Simple task delegation Message passing Direct and efficient
Complex collaborative tasks Natural language + blackboard Flexible with information sharing
Cross-system agent communication A2A Standardized interoperability
Agent-tool communication MCP Designed for tools
High-concurrency scenarios Message queues Scalable, decoupled

Further Reading

  • MCP and Tool Protocols - Communication protocols between agents and tools
  • FIPA. "Agent Communication Language Specifications"
  • Google. "Agent-to-Agent Protocol" Specification
  • Guo, T., et al. (2024). "Large Language Model based Multi-Agents: A Survey"

评论 #