跳转至

通信协议与消息传递

引言

多智能体系统中,Agent 之间的通信是协作的基础。从经典的 FIPA-ACL 结构化消息到 LLM Agent 之间的自然语言对话,通信机制的选择直接影响系统的效率和能力。

通信范式

1. 共享内存 / 黑板模型(Blackboard)

所有 Agent 通过共享的"黑板"交换信息:

class Blackboard:
    """共享黑板通信模型"""

    def __init__(self):
        self.data = {}
        self.history = []

    def write(self, agent_id, key, value):
        """Agent 向黑板写入信息"""
        self.data[key] = {
            "value": value,
            "author": agent_id,
            "timestamp": datetime.now(),
        }
        self.history.append({
            "action": "write",
            "agent": agent_id,
            "key": key,
            "value": value,
        })

    def read(self, key=None):
        """读取黑板上的信息"""
        if key:
            return self.data.get(key)
        return self.data

    def subscribe(self, key_pattern, callback):
        """订阅黑板上特定信息的变化"""
        # 当匹配的 key 被更新时触发回调
        pass

# 使用示例
board = Blackboard()
board.write("researcher", "search_results", results)
board.write("analyst", "analysis", analysis)
summary = board.read("analysis")

优点:解耦、灵活、支持异步 缺点:缺乏结构化的对话流程

2. 消息传递(Message Passing)

Agent 之间直接发送消息:

class Message:
    def __init__(self, sender, receiver, content, msg_type="inform"):
        self.sender = sender
        self.receiver = receiver
        self.content = content
        self.msg_type = msg_type  # inform, request, propose, accept, reject
        self.timestamp = datetime.now()
        self.id = generate_id()

class MessageBus:
    """消息总线"""

    def __init__(self):
        self.queues = {}  # agent_id -> message queue

    def register(self, agent_id):
        self.queues[agent_id] = asyncio.Queue()

    async def send(self, message: Message):
        """发送消息"""
        if message.receiver in self.queues:
            await self.queues[message.receiver].put(message)

    async def receive(self, agent_id, timeout=None):
        """接收消息"""
        try:
            return await asyncio.wait_for(
                self.queues[agent_id].get(), timeout=timeout
            )
        except asyncio.TimeoutError:
            return None

    async def broadcast(self, sender, content, msg_type="inform"):
        """广播消息"""
        for agent_id in self.queues:
            if agent_id != sender:
                msg = Message(sender, agent_id, content, msg_type)
                await self.send(msg)

3. 自然语言对话

LLM Agent 最自然的通信方式——直接用自然语言对话:

class NaturalLanguageCommunication:
    def __init__(self, agents):
        self.agents = agents
        self.conversation_history = []

    async def dialogue(self, speaker_id, message):
        """Agent 发言"""
        self.conversation_history.append({
            "speaker": speaker_id,
            "content": message,
            "timestamp": datetime.now(),
        })

    async def get_context_for_agent(self, agent_id, window=20):
        """获取 Agent 可见的对话上下文"""
        recent = self.conversation_history[-window:]
        return [
            {"role": "assistant" if msg["speaker"] == agent_id else "user",
             "content": f"[{msg['speaker']}]: {msg['content']}"}
            for msg in recent
        ]

经典协议:FIPA-ACL

FIPA(Foundation for Intelligent Physical Agents)定义了标准的 Agent 通信语言:

核心通信动作(Communicative Acts)

动作 说明 示例
inform 告知信息 "搜索结果显示..."
request 请求执行动作 "请分析这份数据"
query 查询信息 "你知道X的价格吗?"
propose 提出方案 "建议使用方法A"
accept-proposal 接受方案 "同意方案A"
reject-proposal 拒绝方案 "不同意,因为..."
cfp 招标 "谁能完成任务X?"

在 LLM Agent 中的应用

class StructuredMessage:
    """结合 FIPA 概念的结构化消息"""

    def __init__(self, sender, receiver, performative, content, 
                 reply_to=None, conversation_id=None):
        self.sender = sender
        self.receiver = receiver
        self.performative = performative  # FIPA 通信动作
        self.content = content            # 自然语言内容
        self.reply_to = reply_to
        self.conversation_id = conversation_id or generate_id()

    def to_prompt(self):
        """转换为 LLM 可理解的提示"""
        return (
            f"[{self.performative.upper()}] 来自 {self.sender}:\n"
            f"{self.content}"
        )

Google A2A 协议

Task 生命周期

stateDiagram-v2
    [*] --> submitted: 创建任务
    submitted --> working: Agent 开始处理
    working --> working: 进度更新
    working --> input_required: 需要更多信息
    input_required --> working: 提供信息
    working --> completed: 任务完成
    working --> failed: 任务失败
    completed --> [*]
    failed --> [*]

Agent Card

{
    "name": "Data Analyst Agent",
    "description": "专业的数据分析Agent,擅长统计分析和可视化",
    "capabilities": ["data_analysis", "visualization", "statistics"],
    "input_formats": ["csv", "json", "sql"],
    "output_formats": ["text", "image", "json"],
    "url": "https://agent.example.com/analyst",
    "authentication": {
        "type": "oauth2",
        "scopes": ["read", "analyze"]
    }
}

通信模式的选择

同步 vs 异步

模式 适用场景 优点 缺点
同步 简单的请求-响应 简单、确定性 阻塞、低效
异步 长时间任务、并行处理 高效、灵活 复杂、需要回调

点对点 vs 广播

# 点对点:特定 Agent 之间通信
await bus.send(Message(
    sender="manager",
    receiver="developer_1",
    content="请实现登录功能",
    msg_type="request"
))

# 广播:通知所有 Agent
await bus.broadcast(
    sender="manager",
    content="项目需求已更新,请查看最新版本",
    msg_type="inform"
)

# 组播:通知特定组
await bus.multicast(
    sender="manager",
    group="developers",
    content="代码审查会议在 3 点开始",
    msg_type="inform"
)

通信拓扑

graph TB
    subgraph "星形(Hub-Spoke)"
        H[管理者] --> S1[Agent 1]
        H --> S2[Agent 2]
        H --> S3[Agent 3]
    end
graph TB
    subgraph "全连接(Full Mesh)"
        F1[Agent 1] --- F2[Agent 2]
        F1 --- F3[Agent 3]
        F2 --- F3
    end
graph TB
    subgraph "链式(Chain)"
        C1[Agent 1] --> C2[Agent 2] --> C3[Agent 3]
    end

通信效率优化

减少通信轮次

class EfficientCommunication:
    def batch_messages(self, messages):
        """批量发送消息减少通信开销"""
        by_receiver = {}
        for msg in messages:
            if msg.receiver not in by_receiver:
                by_receiver[msg.receiver] = []
            by_receiver[msg.receiver].append(msg)

        for receiver, msgs in by_receiver.items():
            combined = "\n---\n".join([m.content for m in msgs])
            yield Message(
                sender="system",
                receiver=receiver,
                content=combined,
                msg_type="batch"
            )

    def compress_context(self, conversation, max_tokens=2000):
        """压缩对话上下文"""
        if count_tokens(conversation) <= max_tokens:
            return conversation

        # 只保留关键信息
        summary = summarize(conversation)
        return [{"role": "system", "content": f"对话摘要: {summary}"}]

通信协议选择指南

场景 推荐协议 原因
简单任务委托 消息传递 直接高效
复杂协作任务 自然语言 + 黑板 灵活且信息共享
跨系统 Agent 通信 A2A 标准化互操作
Agent-工具通信 MCP 专为工具设计
高并发场景 消息队列 可扩展、解耦

延伸阅读

  • MCP 与工具协议 - Agent 与工具的通信协议
  • FIPA. "Agent Communication Language Specifications"
  • Google. "Agent-to-Agent Protocol" Specification
  • Guo, T., et al. (2024). "Large Language Model based Multi-Agents: A Survey"

评论 #