CrewAI使用"船员"概念:每个Agent是船员,有明确的角色(Role)、目标(Goal)和背景(Backstory)。 多个船员组成船队(Crew),协作完成任务(Task)。
| 特性 | CrewAI | AutoGen |
|---|---|---|
| 核心理念 | 角色和任务驱动 | 对话驱动 |
| 学习难度 | 简单(概念清晰) | 中等 |
| 任务编排 | Sequential/Hierarchical | GroupChat |
| 代码执行 | 支持工具调用 | 原生支持,更强大 |
| 适用场景 | 任务明确的协作 | 探索性问题解决 |
💡 选择建议:任务清晰、角色明确选CrewAI;需要代码执行和探索性对话选AutoGen
# 基础安装 pip install crewai # 包含工具支持 pip install 'crewai[tools]' # 配置OpenAI API Key export OPENAI_API_KEY='your-api-key'
from crewai import Agent, Task, Crew, Process
# 1. 定义Agent(船员)
researcher = Agent(
role='研究员',
goal='收集关于AI的最新信息',
backstory='你是一位经验丰富的AI研究员,擅长找到最新最准确的信息',
verbose=True,
allow_delegation=False
)
writer = Agent(
role='技术作家',
goal='将研究结果撰写成易懂的文章',
backstory='你是一位优秀的技术作家,善于用简单语言解释复杂概念',
verbose=True,
allow_delegation=False
)
# 2. 定义Task(任务)
research_task = Task(
description='研究2024年AI发展的最新趋势,重点关注大语言模型',
expected_output='一份详细的研究报告,包含关键趋势和数据',
agent=researcher
)
write_task = Task(
description='基于研究结果,撰写一篇科普文章',
expected_output='一篇1000字的科普文章,通俗易懂',
agent=writer
)
# 3. 组建Crew(船队)
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task],
process=Process.sequential, # 顺序执行
verbose=2
)
# 4. 启动!
result = crew.kickoff()
print(result)
from crewai_tools import (
SerperDevTool, # 网络搜索
ScrapeWebsiteTool, # 网页抓取
FileReadTool, # 读取文件
DirectoryReadTool # 读取目录
)
# 配置工具
search_tool = SerperDevTool()
scrape_tool = ScrapeWebsiteTool()
# Agent配备工具
researcher = Agent(
role='研究员',
goal='收集信息',
tools=[search_tool, scrape_tool], # 可以搜索和抓取
verbose=True
)
# 自定义工具
from crewai_tools import tool
@tool("计算器")
def calculator(expression: str) -> str:
"""计算数学表达式"""
try:
result = eval(expression)
return f"计算结果:{result}"
except:
return "计算错误"
analyst = Agent(
role='数据分析师',
goal='分析数据',
tools=[calculator],
verbose=True
)
任务可以依赖前序任务的输出
task1 = Task(
description='分析用户需求',
expected_output='需求分析文档',
agent=analyst
)
task2 = Task(
description='基于需求分析,设计系统架构',
expected_output='架构设计文档',
agent=architect,
context=[task1] # 依赖task1的输出
)
task3 = Task(
description='根据架构设计,编写代码',
expected_output='完整代码',
agent=developer,
context=[task2] # 依赖task2的输出
)
crew = Crew(
agents=[analyst, architect, developer],
tasks=[task1, task2, task3],
process=Process.sequential
)
Manager Agent负责分配任务和协调
crew = Crew(
agents=[researcher, analyst, writer],
tasks=[task1, task2, task3],
process=Process.hierarchical, # 层级模式
manager_llm="gpt-4" # Manager使用的模型
)
# Manager自动:
# 1. 分析任务优先级
# 2. 分配给合适的Agent
# 3. 协调Agent之间的沟通
# 4. 确保任务按时完成
从选题到发布的完整内容创作流程
from crewai import Agent, Task, Crew
from crewai_tools import SerperDevTool
search_tool = SerperDevTool()
# 定义团队成员
topic_researcher = Agent(
role='选题策划',
goal='找到热门且有价值的话题',
backstory='你有敏锐的市场洞察力,知道读者喜欢什么',
tools=[search_tool],
verbose=True
)
content_writer = Agent(
role='内容撰写',
goal='撰写高质量的原创文章',
backstory='你是一位经验丰富的内容创作者,文笔优美',
verbose=True
)
seo_expert = Agent(
role='SEO专家',
goal='优化文章的搜索引擎排名',
backstory='你精通SEO,知道如何让文章获得更多流量',
verbose=True
)
# 定义工作流
task1 = Task(
description='研究当前AI领域的热点话题,选择3个最有潜力的',
expected_output='3个话题及其潜力分析',
agent=topic_researcher
)
task2 = Task(
description='选择第一个话题,撰写一篇1500字的深度文章',
expected_output='完整文章,包含引言、正文、结论',
agent=content_writer
)
task3 = Task(
description='优化文章的SEO,添加关键词、meta描述、标题优化建议',
expected_output='SEO优化建议和优化后的文章',
agent=seo_expert
)
# 组建团队
content_crew = Crew(
agents=[topic_researcher, content_writer, seo_expert],
tasks=[task1, task2, task3],
process=Process.sequential,
verbose=2
)
# 开始工作
result = content_crew.kickoff()
print(result)
需求分析 → 设计 → 开发 → 测试 完整流程
# 产品经理
pm = Agent(
role='产品经理',
goal='分析用户需求,编写产品需求文档',
backstory='你有丰富的产品经验,善于把握用户痛点'
)
# 架构师
architect = Agent(
role='系统架构师',
goal='设计系统架构和技术方案',
backstory='你是资深架构师,精通各种设计模式'
)
# 开发工程师
developer = Agent(
role='全栈工程师',
goal='实现系统功能',
backstory='你精通Python和JavaScript,代码质量高'
)
# 测试工程师
tester = Agent(
role='测试工程师',
goal='编写测试用例,确保质量',
backstory='你有强迫症,不放过任何bug'
)
# 定义开发流程
tasks = [
Task(
description='分析用户需求:构建一个TODO应用',
expected_output='PRD文档',
agent=pm
),
Task(
description='设计系统架构和数据库结构',
expected_output='架构设计文档和ER图',
agent=architect
),
Task(
description='实现后端API和前端界面',
expected_output='完整代码',
agent=developer
),
Task(
description='编写单元测试和集成测试',
expected_output='测试代码',
agent=tester
)
]
dev_crew = Crew(
agents=[pm, architect, developer, tester],
tasks=tasks,
process=Process.sequential
)
result = dev_crew.kickoff()
Agent可以将子任务委托给其他Agent
manager = Agent(
role='项目经理',
goal='协调团队完成项目',
backstory='你善于分配任务和协调资源',
allow_delegation=True # 允许委托
)
# Manager可以说:
# "我需要研究员帮我调研一下市场"
# "程序员,请实现这个功能"
# 并行执行多个任务
import asyncio
async def run_crew():
result = await crew.kickoff_async()
return result
# 运行
result = asyncio.run(run_crew())
agent = Agent(
role='助手',
goal='帮助用户',
memory=True, # 启用记忆
verbose=True
)
# Agent会记住之前的对话
from pydantic import BaseModel
class ArticleOutput(BaseModel):
title: str
content: str
tags: list[str]
task = Task(
description='写文章',
expected_output='结构化文章',
output_pydantic=ArticleOutput,
agent=writer
)
特别适合: