通信协议与消息传递
引言
多智能体系统中,Agent 之间的通信是协作的基础。从经典的 FIPA-ACL 结构化消息到 LLM Agent 之间的自然语言对话,通信机制的选择直接影响系统的效率和能力。
通信范式
1. 共享内存 / 黑板模型(Blackboard)
所有 Agent 通过共享的"黑板"交换信息:
class Blackboard:
"""共享黑板通信模型"""
def __init__(self):
self.data = {}
self.history = []
def write(self, agent_id, key, value):
"""Agent 向黑板写入信息"""
self.data[key] = {
"value": value,
"author": agent_id,
"timestamp": datetime.now(),
}
self.history.append({
"action": "write",
"agent": agent_id,
"key": key,
"value": value,
})
def read(self, key=None):
"""读取黑板上的信息"""
if key:
return self.data.get(key)
return self.data
def subscribe(self, key_pattern, callback):
"""订阅黑板上特定信息的变化"""
# 当匹配的 key 被更新时触发回调
pass
# 使用示例
board = Blackboard()
board.write("researcher", "search_results", results)
board.write("analyst", "analysis", analysis)
summary = board.read("analysis")
优点:解耦、灵活、支持异步 缺点:缺乏结构化的对话流程
2. 消息传递(Message Passing)
Agent 之间直接发送消息:
class Message:
def __init__(self, sender, receiver, content, msg_type="inform"):
self.sender = sender
self.receiver = receiver
self.content = content
self.msg_type = msg_type # inform, request, propose, accept, reject
self.timestamp = datetime.now()
self.id = generate_id()
class MessageBus:
"""消息总线"""
def __init__(self):
self.queues = {} # agent_id -> message queue
def register(self, agent_id):
self.queues[agent_id] = asyncio.Queue()
async def send(self, message: Message):
"""发送消息"""
if message.receiver in self.queues:
await self.queues[message.receiver].put(message)
async def receive(self, agent_id, timeout=None):
"""接收消息"""
try:
return await asyncio.wait_for(
self.queues[agent_id].get(), timeout=timeout
)
except asyncio.TimeoutError:
return None
async def broadcast(self, sender, content, msg_type="inform"):
"""广播消息"""
for agent_id in self.queues:
if agent_id != sender:
msg = Message(sender, agent_id, content, msg_type)
await self.send(msg)
3. 自然语言对话
LLM Agent 最自然的通信方式——直接用自然语言对话:
class NaturalLanguageCommunication:
def __init__(self, agents):
self.agents = agents
self.conversation_history = []
async def dialogue(self, speaker_id, message):
"""Agent 发言"""
self.conversation_history.append({
"speaker": speaker_id,
"content": message,
"timestamp": datetime.now(),
})
async def get_context_for_agent(self, agent_id, window=20):
"""获取 Agent 可见的对话上下文"""
recent = self.conversation_history[-window:]
return [
{"role": "assistant" if msg["speaker"] == agent_id else "user",
"content": f"[{msg['speaker']}]: {msg['content']}"}
for msg in recent
]
经典协议:FIPA-ACL
FIPA(Foundation for Intelligent Physical Agents)定义了标准的 Agent 通信语言:
核心通信动作(Communicative Acts)
| 动作 | 说明 | 示例 |
|---|---|---|
inform |
告知信息 | "搜索结果显示..." |
request |
请求执行动作 | "请分析这份数据" |
query |
查询信息 | "你知道X的价格吗?" |
propose |
提出方案 | "建议使用方法A" |
accept-proposal |
接受方案 | "同意方案A" |
reject-proposal |
拒绝方案 | "不同意,因为..." |
cfp |
招标 | "谁能完成任务X?" |
在 LLM Agent 中的应用
class StructuredMessage:
"""结合 FIPA 概念的结构化消息"""
def __init__(self, sender, receiver, performative, content,
reply_to=None, conversation_id=None):
self.sender = sender
self.receiver = receiver
self.performative = performative # FIPA 通信动作
self.content = content # 自然语言内容
self.reply_to = reply_to
self.conversation_id = conversation_id or generate_id()
def to_prompt(self):
"""转换为 LLM 可理解的提示"""
return (
f"[{self.performative.upper()}] 来自 {self.sender}:\n"
f"{self.content}"
)
Google A2A 协议
Task 生命周期
stateDiagram-v2
[*] --> submitted: 创建任务
submitted --> working: Agent 开始处理
working --> working: 进度更新
working --> input_required: 需要更多信息
input_required --> working: 提供信息
working --> completed: 任务完成
working --> failed: 任务失败
completed --> [*]
failed --> [*]
Agent Card
{
"name": "Data Analyst Agent",
"description": "专业的数据分析Agent,擅长统计分析和可视化",
"capabilities": ["data_analysis", "visualization", "statistics"],
"input_formats": ["csv", "json", "sql"],
"output_formats": ["text", "image", "json"],
"url": "https://agent.example.com/analyst",
"authentication": {
"type": "oauth2",
"scopes": ["read", "analyze"]
}
}
通信模式的选择
同步 vs 异步
| 模式 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
| 同步 | 简单的请求-响应 | 简单、确定性 | 阻塞、低效 |
| 异步 | 长时间任务、并行处理 | 高效、灵活 | 复杂、需要回调 |
点对点 vs 广播
# 点对点:特定 Agent 之间通信
await bus.send(Message(
sender="manager",
receiver="developer_1",
content="请实现登录功能",
msg_type="request"
))
# 广播:通知所有 Agent
await bus.broadcast(
sender="manager",
content="项目需求已更新,请查看最新版本",
msg_type="inform"
)
# 组播:通知特定组
await bus.multicast(
sender="manager",
group="developers",
content="代码审查会议在 3 点开始",
msg_type="inform"
)
通信拓扑
graph TB
subgraph "星形(Hub-Spoke)"
H[管理者] --> S1[Agent 1]
H --> S2[Agent 2]
H --> S3[Agent 3]
end
graph TB
subgraph "全连接(Full Mesh)"
F1[Agent 1] --- F2[Agent 2]
F1 --- F3[Agent 3]
F2 --- F3
end
graph TB
subgraph "链式(Chain)"
C1[Agent 1] --> C2[Agent 2] --> C3[Agent 3]
end
通信效率优化
减少通信轮次
class EfficientCommunication:
def batch_messages(self, messages):
"""批量发送消息减少通信开销"""
by_receiver = {}
for msg in messages:
if msg.receiver not in by_receiver:
by_receiver[msg.receiver] = []
by_receiver[msg.receiver].append(msg)
for receiver, msgs in by_receiver.items():
combined = "\n---\n".join([m.content for m in msgs])
yield Message(
sender="system",
receiver=receiver,
content=combined,
msg_type="batch"
)
def compress_context(self, conversation, max_tokens=2000):
"""压缩对话上下文"""
if count_tokens(conversation) <= max_tokens:
return conversation
# 只保留关键信息
summary = summarize(conversation)
return [{"role": "system", "content": f"对话摘要: {summary}"}]
通信协议选择指南
| 场景 | 推荐协议 | 原因 |
|---|---|---|
| 简单任务委托 | 消息传递 | 直接高效 |
| 复杂协作任务 | 自然语言 + 黑板 | 灵活且信息共享 |
| 跨系统 Agent 通信 | A2A | 标准化互操作 |
| Agent-工具通信 | MCP | 专为工具设计 |
| 高并发场景 | 消息队列 | 可扩展、解耦 |
延伸阅读
- MCP 与工具协议 - Agent 与工具的通信协议
- FIPA. "Agent Communication Language Specifications"
- Google. "Agent-to-Agent Protocol" Specification
- Guo, T., et al. (2024). "Large Language Model based Multi-Agents: A Survey"