跳转至

协作模式

引言

多智能体系统的核心是 Agent 之间如何协作。不同的协作模式适用于不同类型的任务。本节详细介绍六种常见的协作模式,并给出每种模式的实现方案和适用场景。

模式 1:顺序链(Sequential / Chain)

Agent 按顺序依次处理任务,每个 Agent 接收上一个的输出作为输入。

graph LR
    INPUT[输入] --> A1[Agent A<br/>研究]
    A1 -->|研究结果| A2[Agent B<br/>分析]
    A2 -->|分析报告| A3[Agent C<br/>写作]
    A3 -->|初稿| A4[Agent D<br/>审核]
    A4 --> OUTPUT[最终输出]

实现

class SequentialPipeline:
    def __init__(self, agents):
        self.agents = agents  # 有序的 Agent 列表

    async def run(self, initial_input):
        current = initial_input
        for agent in self.agents:
            current = await agent.process(current)
        return current

# 使用
pipeline = SequentialPipeline([
    ResearchAgent(),
    AnalysisAgent(),
    WritingAgent(),
    ReviewAgent(),
])
result = await pipeline.run("分析 2024 年 AI Agent 市场趋势")

适用场景:文档生成流水线、数据处理管道、多步骤审批流程

优点:简单清晰、易于调试

缺点:延迟累积、错误传播、无法并行

模式 2:并行(Fan-out / Fan-in)

多个 Agent 同时处理不同的子任务,然后汇总结果。

graph TB
    INPUT[输入任务] --> SPLIT[任务分解]
    SPLIT --> A1[Agent A<br/>搜索学术论文]
    SPLIT --> A2[Agent B<br/>搜索行业报告]
    SPLIT --> A3[Agent C<br/>搜索新闻]
    A1 --> MERGE[结果汇总]
    A2 --> MERGE
    A3 --> MERGE
    MERGE --> OUTPUT[综合报告]

实现

class ParallelFanOut:
    def __init__(self, agents, aggregator):
        self.agents = agents
        self.aggregator = aggregator

    async def run(self, task):
        # Fan-out: 并行执行
        tasks = [agent.process(task) for agent in self.agents]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # 过滤失败的结果
        valid_results = [r for r in results if not isinstance(r, Exception)]

        # Fan-in: 汇总
        return await self.aggregator.aggregate(valid_results)

# 使用
research = ParallelFanOut(
    agents=[AcademicSearchAgent(), IndustryReportAgent(), NewsSearchAgent()],
    aggregator=SynthesisAgent(),
)
report = await research.run("AI Agent 技术现状")

适用场景:多源信息收集、分布式搜索、并行测试

优点:低延迟(取决于最慢的 Agent)、高吞吐

缺点:需要汇总逻辑、资源消耗大

模式 3:层级式(Hierarchical / Manager-Worker)

管理者 Agent 负责任务分解和分配,工作者 Agent 负责执行。

graph TB
    PM[项目经理 Agent] --> ARCH[架构师 Agent]
    PM --> DEV1[开发者 Agent 1]
    PM --> DEV2[开发者 Agent 2]
    PM --> QA[测试 Agent]

    ARCH -->|设计方案| PM
    DEV1 -->|代码| PM
    DEV2 -->|代码| PM
    QA -->|测试报告| PM

    PM --> RESULT[最终交付]

实现

class HierarchicalSystem:
    def __init__(self, manager, workers):
        self.manager = manager
        self.workers = {w.name: w for w in workers}

    async def run(self, task):
        # 管理者分解任务
        subtasks = await self.manager.decompose(task)

        results = {}
        for subtask in subtasks:
            # 管理者分配任务
            worker_name = subtask["assigned_to"]
            worker = self.workers[worker_name]

            # 工作者执行
            result = await worker.process(subtask["description"])
            results[worker_name] = result

            # 管理者审核
            feedback = await self.manager.review(subtask, result)
            if feedback["needs_revision"]:
                result = await worker.revise(result, feedback["comments"])
                results[worker_name] = result

        # 管理者整合
        return await self.manager.synthesize(results)

适用场景:软件开发、项目管理、复杂任务协调

优点:清晰的职责分工、可扩展

缺点:管理者是瓶颈、通信开销

模式 4:辩论式(Debate / Adversarial)

Agent 从不同角度论证,通过辩论提升结论质量。

graph TB
    TOPIC[辩论主题] --> PRO[正方 Agent]
    TOPIC --> CON[反方 Agent]
    PRO -->|论点| JUDGE[裁判 Agent]
    CON -->|论点| JUDGE
    JUDGE -->|需要更多论证| PRO
    JUDGE -->|需要更多论证| CON
    JUDGE --> VERDICT[最终结论]

实现

class DebateSystem:
    def __init__(self, proponent, opponent, judge, max_rounds=3):
        self.proponent = proponent
        self.opponent = opponent
        self.judge = judge
        self.max_rounds = max_rounds

    async def debate(self, topic):
        history = []

        for round_num in range(self.max_rounds):
            # 正方论证
            pro_arg = await self.proponent.argue(topic, history)
            history.append({"side": "pro", "argument": pro_arg})

            # 反方反驳
            con_arg = await self.opponent.argue(topic, history)
            history.append({"side": "con", "argument": con_arg})

            # 裁判评估
            evaluation = await self.judge.evaluate(topic, history)
            if evaluation["sufficient"]:
                break

        # 裁判做出最终判断
        verdict = await self.judge.verdict(topic, history)
        return verdict

# 使用
debate = DebateSystem(
    proponent=Agent(system="你要论证为什么应该采用微服务架构"),
    opponent=Agent(system="你要论证为什么应该采用单体架构"),
    judge=Agent(system="你是技术架构评审专家,基于双方论点做出客观判断"),
)
result = await debate.debate("公司新项目应该采用微服务还是单体架构?")

适用场景:决策分析、观点验证、红蓝对抗

优点:多角度思考、减少偏见

缺点:通信轮次多、可能陷入僵局

模式 5:市场机制(Market-based / Auction)

Agent 通过竞标获取任务,类似市场经济:

class TaskAuction:
    """基于拍卖的任务分配"""

    async def auction(self, task, agents):
        # 1. 发布任务(Call for Proposals)
        bids = []
        for agent in agents:
            bid = await agent.bid(task)
            if bid:
                bids.append({
                    "agent": agent,
                    "price": bid["estimated_cost"],
                    "time": bid["estimated_time"],
                    "confidence": bid["confidence"],
                })

        # 2. 评估出价
        best_bid = self.evaluate_bids(bids)

        # 3. 分配任务
        result = await best_bid["agent"].execute(task)
        return result

    def evaluate_bids(self, bids):
        """综合评估出价"""
        for bid in bids:
            bid["score"] = (
                bid["confidence"] * 0.5 +
                (1 / bid["time"]) * 0.3 +
                (1 / bid["price"]) * 0.2
            )
        return max(bids, key=lambda b: b["score"])

适用场景:资源分配、任务调度、负载均衡

模式 6:投票机制(Voting)

多个 Agent 独立决策,通过投票汇总:

class VotingSystem:
    async def majority_vote(self, question, agents):
        """多数投票"""
        votes = []
        for agent in agents:
            answer = await agent.answer(question)
            votes.append(answer)

        # 统计票数
        from collections import Counter
        vote_counts = Counter(votes)
        winner = vote_counts.most_common(1)[0]

        return {
            "answer": winner[0],
            "votes": winner[1],
            "total": len(votes),
            "confidence": winner[1] / len(votes),
        }

    async def weighted_vote(self, question, agents):
        """加权投票(基于 Agent 的专业度)"""
        weighted_answers = {}
        for agent in agents:
            answer = await agent.answer(question)
            weight = agent.expertise_score  # 专业度权重

            if answer not in weighted_answers:
                weighted_answers[answer] = 0
            weighted_answers[answer] += weight

        winner = max(weighted_answers.items(), key=lambda x: x[1])
        return {"answer": winner[0], "weighted_score": winner[1]}

适用场景:质量保证(多模型投票)、决策制定

模式对比

模式 延迟 通信量 复杂度 质量提升 适用任务
顺序链 流水线任务
并行 可并行的任务
层级式 复杂项目
辩论式 决策分析
市场机制 资源分配
投票 质量保证

混合模式

实际系统通常组合多种模式:

class HybridSystem:
    """层级式 + 并行 + 投票的混合系统"""

    async def run(self, task):
        # 1. 管理者分解任务
        subtasks = await self.manager.decompose(task)

        # 2. 并行执行子任务(每个子任务由多个 Agent 投票)
        results = {}
        for subtask in subtasks:
            # 3 个 Agent 并行执行同一子任务
            answers = await asyncio.gather(*[
                agent.process(subtask) for agent in self.get_workers(subtask, n=3)
            ])
            # 投票选出最佳结果
            results[subtask["id"]] = majority_vote(answers)

        # 3. 管理者整合
        return await self.manager.synthesize(results)

延伸阅读

  • 多智能体框架 - 实现这些协作模式的框架
  • Wu, Q., et al. (2023). "AutoGen: Enabling Next-Gen LLM Applications via Multi-Agent Conversation"
  • Hong, S., et al. (2023). "MetaGPT: Meta Programming for Multi-Agent Collaborative Framework"

评论 #