第44集:自定义工具:调用自定义API(如查询天气接口)
章节标题
自定义API工具开发详解
核心知识点讲解
自定义工具的重要性
自定义工具是智能体开发中的重要组成部分,它允许智能体:
- 与特定领域交互:访问领域特定的API和服务
- 扩展能力边界:突破内置工具的限制
- 集成现有系统:与企业内部系统和服务集成
- 提供个性化服务:根据特定需求定制功能
- 保持信息更新:访问实时数据和服务
自定义工具的开发流程
- 需求分析:确定工具的功能和用途
- API调研:了解目标API的接口、参数和返回格式
- 工具设计:设计工具的名称、描述和参数
- 实现代码:编写工具的实现代码
- 测试验证:测试工具的功能和可靠性
- 集成使用:将工具集成到智能体中使用
自定义工具的类型
- API调用工具:调用外部RESTful API
- 数据库工具:与数据库交互
- 内部系统工具:与企业内部系统交互
- 第三方服务工具:调用第三方服务
- 硬件控制工具:控制硬件设备
实用案例分析
案例:天气查询工具
场景:创建一个工具,通过调用天气API获取指定城市的天气信息。
挑战:
- 需要找到合适的天气API
- 需要处理API密钥的管理
- 需要处理API调用的错误情况
- 需要格式化API返回的结果
解决方案:
- 选择一个免费的天气API,如OpenWeatherMap
- 实现工具的参数验证和错误处理
- 格式化API返回的结果,使其更友好
- 集成到智能体中使用
预期效果:
- 智能体能够通过工具获取实时天气信息
- 工具能够处理各种错误情况
- 工具返回的结果清晰、友好
代码示例
示例1:天气查询工具
from langchain.tools import BaseTool
from langchain.pydantic_v1 import Field, BaseModel
from langchain.agents import AgentType, initialize_agent
from langchain.llms import OpenAI
import requests
import os
# 初始化模型
llm = OpenAI(temperature=0.7)
# 定义天气查询工具的输入参数
class WeatherInput(BaseModel):
city: str = Field(..., description="城市名称,例如:北京、上海、广州")
days: int = Field(1, description="查询未来几天的天气,默认1天")
# 实现天气查询工具
class WeatherTool(BaseTool):
name = "Weather"
description = "查询指定城市的天气信息"
args_schema = WeatherInput
def __init__(self, api_key=None):
super().__init__()
# 从环境变量或参数获取API密钥
self.api_key = api_key or os.getenv("OPENWEATHER_API_KEY")
if not self.api_key:
raise ValueError("需要提供OpenWeatherMap API密钥")
self.base_url = "https://api.openweathermap.org/data/2.5"
def _run(self, city: str, days: int = 1) -> str:
"""执行天气查询
参数:
city: 城市名称
days: 查询天数
返回:
天气信息字符串
"""
try:
# 第一步:通过城市名称获取城市ID
geo_url = f"{self.base_url}/geo/1.0/direct"
geo_params = {
"q": city,
"limit": 1,
"appid": self.api_key
}
geo_response = requests.get(geo_url, params=geo_params)
geo_response.raise_for_status()
geo_data = geo_response.json()
if not geo_data:
return f"错误:未找到城市 {city}"
lat = geo_data[0]["lat"]
lon = geo_data[0]["lon"]
# 第二步:通过城市ID获取天气信息
if days == 1:
# 查询当前天气
weather_url = f"{self.base_url}/weather"
weather_params = {
"lat": lat,
"lon": lon,
"appid": self.api_key,
"units": "metric", # 使用摄氏度
"lang": "zh_cn" # 使用中文
}
weather_response = requests.get(weather_url, params=weather_params)
weather_response.raise_for_status()
weather_data = weather_response.json()
# 格式化天气信息
weather_main = weather_data["weather"][0]["main"]
weather_desc = weather_data["weather"][0]["description"]
temp = weather_data["main"]["temp"]
temp_min = weather_data["main"]["temp_min"]
temp_max = weather_data["main"]["temp_max"]
humidity = weather_data["main"]["humidity"]
wind_speed = weather_data["wind"]["speed"]
result = f"{city}的天气:\n"
result += f"天气状况:{weather_main}({weather_desc})\n"
result += f"当前温度:{temp}°C\n"
result += f"最低温度:{temp_min}°C\n"
result += f"最高温度:{temp_max}°C\n"
result += f"湿度:{humidity}%\n"
result += f"风速:{wind_speed} m/s"
else:
# 查询未来天气
forecast_url = f"{self.base_url}/forecast"
forecast_params = {
"lat": lat,
"lon": lon,
"appid": self.api_key,
"units": "metric",
"lang": "zh_cn",
"cnt": days * 8 # 每3小时一次,一天8次
}
forecast_response = requests.get(forecast_url, params=forecast_params)
forecast_response.raise_for_status()
forecast_data = forecast_response.json()
# 格式化天气信息
result = f"{city}未来{days}天的天气:\n"
# 按天分组
daily_forecasts = {}
for item in forecast_data["list"]:
date = item["dt_txt"].split()[0]
if date not in daily_forecasts:
daily_forecasts[date] = []
daily_forecasts[date].append(item)
# 处理每天的天气
for date, items in daily_forecasts.items():
# 获取当天的天气状况
weather_main = items[0]["weather"][0]["main"]
weather_desc = items[0]["weather"][0]["description"]
# 计算当天的平均温度
temps = [item["main"]["temp"] for item in items]
temp_avg = sum(temps) / len(temps)
temp_min = min(item["main"]["temp_min"] for item in items)
temp_max = max(item["main"]["temp_max"] for item in items)
# 计算当天的平均湿度
humidity = sum(item["main"]["humidity"] for item in items) / len(items)
# 计算当天的平均风速
wind_speed = sum(item["wind"]["speed"] for item in items) / len(items)
result += f"{date}:\n"
result += f" 天气状况:{weather_main}({weather_desc})\n"
result += f" 平均温度:{temp_avg:.1f}°C\n"
result += f" 最低温度:{temp_min}°C\n"
result += f" 最高温度:{temp_max}°C\n"
result += f" 平均湿度:{humidity:.1f}%\n"
result += f" 平均风速:{wind_speed:.1f} m/s\n"
return result
except requests.exceptions.RequestException as e:
return f"错误:API调用失败 - {str(e)}"
except Exception as e:
return f"错误:处理天气数据时发生错误 - {str(e)}"
async def _arun(self, city: str, days: int = 1) -> str:
"""异步执行天气查询
参数:
city: 城市名称
days: 查询天数
返回:
天气信息字符串
"""
# 简单实现,实际应用中应该使用异步HTTP客户端
return self._run(city, days)
# 创建工具实例
# 注意:需要设置OPENWEATHER_API_KEY环境变量
weather_tool = WeatherTool()
# 创建工具列表
tools = [weather_tool]
# 初始化智能体
agent = initialize_agent(
tools,
llm,
agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
verbose=True
)
# 测试智能体
print(agent.run("北京今天的天气怎么样?"))
print(agent.run("上海未来3天的天气怎么样?"))示例2:股票查询工具
from langchain.tools import BaseTool
from langchain.pydantic_v1 import Field, BaseModel
from langchain.agents import AgentType, initialize_agent
from langchain.llms import OpenAI
import requests
import os
# 初始化模型
llm = OpenAI(temperature=0.7)
# 定义股票查询工具的输入参数
class StockInput(BaseModel):
symbol: str = Field(..., description="股票代码,例如:AAPL、MSFT、GOOG")
# 实现股票查询工具
class StockTool(BaseTool):
name = "Stock"
description = "查询指定股票的实时价格"
args_schema = StockInput
def __init__(self, api_key=None):
super().__init__()
# 从环境变量或参数获取API密钥
self.api_key = api_key or os.getenv("ALPHA_VANTAGE_API_KEY")
if not self.api_key:
raise ValueError("需要提供Alpha Vantage API密钥")
self.base_url = "https://www.alphavantage.co/query"
def _run(self, symbol: str) -> str:
"""执行股票查询
参数:
symbol: 股票代码
返回:
股票价格信息字符串
"""
try:
# 构建API参数
params = {
"function": "GLOBAL_QUOTE",
"symbol": symbol,
"apikey": self.api_key
}
# 调用API
response = requests.get(self.base_url, params=params)
response.raise_for_status()
data = response.json()
# 处理API返回的数据
if "Global Quote" in data:
quote = data["Global Quote"]
symbol = quote["01. symbol"]
price = quote["05. price"]
change = quote["09. change"]
change_percent = quote["10. change percent"]
high = quote["03. high"]
low = quote["04. low"]
volume = quote["06. volume"]
# 格式化结果
result = f"{symbol}的股票信息:\n"
result += f"当前价格:${price}\n"
result += f"涨跌:${change}({change_percent})\n"
result += f"今日最高:${high}\n"
result += f"今日最低:${low}\n"
result += f"成交量:{volume}"
return result
else:
return f"错误:未找到股票代码 {symbol} 的信息"
except requests.exceptions.RequestException as e:
return f"错误:API调用失败 - {str(e)}"
except Exception as e:
return f"错误:处理股票数据时发生错误 - {str(e)}"
async def _arun(self, symbol: str) -> str:
"""异步执行股票查询
参数:
symbol: 股票代码
返回:
股票价格信息字符串
"""
return self._run(symbol)
# 创建工具实例
# 注意:需要设置ALPHA_VANTAGE_API_KEY环境变量
stock_tool = StockTool()
# 创建工具列表
tools = [stock_tool]
# 初始化智能体
agent = initialize_agent(
tools,
llm,
agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
verbose=True
)
# 测试智能体
print(agent.run("苹果股票(AAPL)的价格是多少?"))
print(agent.run("微软股票(MSFT)的价格是多少?"))示例3:自定义工具的通用框架
from langchain.tools import BaseTool
from langchain.pydantic_v1 import Field, BaseModel
from langchain.agents import AgentType, initialize_agent
from langchain.llms import OpenAI
import requests
# 初始化模型
llm = OpenAI(temperature=0.7)
# 通用API工具基类
class GenericAPITool(BaseTool):
def __init__(self, name, description, base_url, default_params=None):
super().__init__()
self.name = name
self.description = description
self.base_url = base_url
self.default_params = default_params or {}
def _run(self, **kwargs):
"""执行API调用
参数:
**kwargs: API参数
返回:
API响应结果
"""
try:
# 合并默认参数和传入参数
params = {**self.default_params, **kwargs}
# 调用API
response = requests.get(self.base_url, params=params)
response.raise_for_status()
# 处理响应
return self._process_response(response.json())
except requests.exceptions.RequestException as e:
return f"错误:API调用失败 - {str(e)}"
except Exception as e:
return f"错误:处理API响应时发生错误 - {str(e)}"
def _process_response(self, data):
"""处理API响应
参数:
data: API响应数据
返回:
处理后的响应结果
"""
# 子类应该重写此方法
return str(data)
async def _arun(self, **kwargs):
"""异步执行API调用
参数:
**kwargs: API参数
返回:
API响应结果
"""
return self._run(**kwargs)
# 示例:使用通用框架创建天气工具
class WeatherInput(BaseModel):
city: str = Field(..., description="城市名称")
class CustomWeatherTool(GenericAPITool):
args_schema = WeatherInput
def __init__(self, api_key):
super().__init__(
name="CustomWeather",
description="查询指定城市的天气信息",
base_url="https://api.openweathermap.org/data/2.5/weather",
default_params={"appid": api_key, "units": "metric", "lang": "zh_cn"}
)
def _run(self, city: str) -> str:
# 第一步:获取城市坐标
geo_url = "https://api.openweathermap.org/data/2.5/geo/1.0/direct"
geo_params = {"q": city, "limit": 1, "appid": self.default_params["appid"]}
geo_response = requests.get(geo_url, params=geo_params)
geo_response.raise_for_status()
geo_data = geo_response.json()
if not geo_data:
return f"错误:未找到城市 {city}"
lat = geo_data[0]["lat"]
lon = geo_data[0]["lon"]
# 第二步:查询天气
return super()._run(lat=lat, lon=lon)
def _process_response(self, data):
# 处理天气API的响应
weather_main = data["weather"][0]["main"]
weather_desc = data["weather"][0]["description"]
temp = data["main"]["temp"]
temp_min = data["main"]["temp_min"]
temp_max = data["main"]["temp_max"]
humidity = data["main"]["humidity"]
result = f"天气状况:{weather_main}({weather_desc})\n"
result += f"当前温度:{temp}°C\n"
result += f"最低温度:{temp_min}°C\n"
result += f"最高温度:{temp_max}°C\n"
result += f"湿度:{humidity}%"
return result
# 创建工具实例
# 注意:需要替换为实际的API密钥
weather_tool = CustomWeatherTool(api_key="YOUR_API_KEY")
# 创建工具列表
tools = [weather_tool]
# 初始化智能体
agent = initialize_agent(
tools,
llm,
agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
verbose=True
)
# 测试智能体
print(agent.run("北京的天气怎么样?"))高级技巧
1. API密钥管理
环境变量
- 优势:安全,不易泄露
- 使用方法:通过
os.environ.get()获取 - 适用场景:生产环境
配置文件
- 优势:集中管理,易于修改
- 使用方法:通过配置文件读取
- 适用场景:开发环境
密钥轮换
- 优势:提高安全性
- 使用方法:定期更新API密钥
- 适用场景:所有环境
2. 错误处理和重试机制
错误处理
- 分类处理:根据错误类型进行不同处理
- 友好提示:向用户提供清晰的错误信息
- 日志记录:记录错误信息,便于调试
重试机制
- 指数退避:失败后逐渐增加重试间隔
- 最大重试次数:限制重试次数,避免无限重试
- 重试条件:只对特定错误进行重试
示例:带重试机制的API调用
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_session():
"""创建带重试机制的会话"""
session = requests.Session()
# 配置重试策略
retry = Retry(
total=3, # 总重试次数
backoff_factor=1, # 重试间隔因子
status_forcelist=[429, 500, 502, 503, 504], # 需要重试的状态码
allowed_methods=["HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE"]
)
# 配置适配器
adapter = HTTPAdapter(max_retries=retry)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
# 使用示例
session = create_session()
try:
response = session.get("https://api.example.com/data")
response.raise_for_status()
except requests.exceptions.RequestException as e:
print(f"错误:{e}")3. 缓存机制
内存缓存
- 优势:速度快,实现简单
- 使用方法:使用字典存储缓存
- 适用场景:短期缓存,数据量小
Redis缓存
- 优势:持久化,支持分布式
- 使用方法:通过Redis客户端存储缓存
- 适用场景:长期缓存,数据量大
缓存策略
- 过期时间:为缓存设置合理的过期时间
- 缓存键:使用唯一的缓存键
- 缓存失效:当数据更新时,主动失效缓存
示例:带缓存的工具
from langchain.tools import BaseTool
import time
class CachedTool(BaseTool):
def __init__(self, name, description):
super().__init__()
self.name = name
self.description = description
self.cache = {}
self.cache_expiry = 3600 # 缓存过期时间(秒)
def _run(self, query):
# 检查缓存
current_time = time.time()
if query in self.cache:
cached_result, timestamp = self.cache[query]
if current_time - timestamp < self.cache_expiry:
return f"缓存结果:{cached_result}"
# 执行实际操作
result = self._actual_run(query)
# 更新缓存
self.cache[query] = (result, current_time)
return result
def _actual_run(self, query):
# 子类应该重写此方法
pass4. 异步API调用
异步优势
- 提高并发:支持同时执行多个API调用
- 减少阻塞:避免阻塞主线程
- 提高性能:特别是在I/O密集型操作中
实现方法
- 使用aiohttp:异步HTTP客户端
- 使用async/await:Python的异步语法
- 事件循环:管理异步任务
示例:异步API调用
import aiohttp
import asyncio
async def fetch_data(url, params):
"""异步获取数据"""
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params) as response:
if response.status == 200:
return await response.json()
else:
raise Exception(f"API调用失败,状态码:{response.status}")
async def main():
"""主函数"""
url = "https://api.example.com/data"
params = {"key": "value"}
try:
data = await fetch_data(url, params)
print(data)
except Exception as e:
print(f"错误:{e}")
# 运行异步函数
asyncio.run(main())最佳实践
1. 工具设计的最佳实践
工具命名
- 清晰明了:使用简洁、明确的名称
- 使用动词:工具名称应该描述其执行的操作
- 一致性:保持命名风格的一致性
工具描述
- 详细准确:准确描述工具的功能和用途
- 包含参数:说明工具需要什么参数
- 包含返回值:说明工具返回什么
- 使用中文:对于中文智能体,使用中文描述
参数设计
- 简洁明确:参数名称应该简洁明了
- 类型提示:使用类型提示,便于智能体理解
- 默认值:为可选参数提供合理的默认值
- 验证逻辑:添加参数验证逻辑
2. 工具实现的最佳实践
代码组织
- 模块化:将工具代码组织成模块
- 可读性:使用清晰的代码结构和命名
- 注释:添加必要的注释,说明代码的功能
性能优化
- 缓存:对于频繁调用的API,实现缓存机制
- 异步:对于I/O密集型操作,使用异步执行
- 批处理:支持批量操作,减少API调用次数
安全性
- API密钥保护:安全存储API密钥
- 输入验证:验证输入参数,防止注入攻击
- 输出限制:限制输出的长度和内容
3. 工具测试的最佳实践
单元测试
- 测试正常情况:测试工具在正常输入下的行为
- 测试异常情况:测试工具在异常输入下的行为
- 测试边界情况:测试工具在边界输入下的行为
集成测试
- 测试与智能体的集成:确保智能体能够正确调用工具
- 测试真实场景:在真实场景中测试工具的性能和可靠性
- 测试错误处理:测试工具的错误处理能力
负载测试
- 测试并发性能:测试工具在并发调用下的性能
- 测试响应时间:测试工具的响应时间
- 测试稳定性:测试工具在长时间运行下的稳定性
4. 工具文档的最佳实践
文档内容
- 工具说明:说明工具的功能和用途
- 参数说明:详细说明每个参数的含义和用法
- 返回值说明:说明工具的返回值格式和含义
- 使用示例:提供工具使用的示例
- 常见问题:说明使用工具时可能遇到的问题和解决方案
文档格式
- 清晰结构:使用清晰的文档结构
- 代码示例:包含代码示例,便于理解
- 版本信息:包含工具的版本信息
- 更新日志:记录工具的更新历史
故障排除
1. API调用失败
症状:工具调用API时失败
原因:
- API密钥无效或过期
- 网络连接问题
- API服务不可用
- 参数格式错误
解决方案:
- 检查API密钥是否有效
- 检查网络连接是否正常
- 检查API服务是否可用
- 检查参数格式是否正确
2. 工具不被智能体调用
症状:智能体知道有工具,但不调用它
原因:
- 工具描述不清晰,智能体不理解工具的功能
- 工具参数描述不准确,智能体不知道如何提供参数
- 智能体的提示词模板不鼓励使用工具
解决方案:
- 优化工具描述,使其更清晰、更具体
- 改进参数描述,明确说明参数的格式和示例
- 调整智能体的提示词模板,鼓励使用工具
3. 工具执行结果不符合预期
症状:工具执行成功,但返回结果不符合预期
原因:
- API返回的数据格式发生变化
- 参数传递错误
- 结果处理逻辑错误
- API返回的数据不完整
解决方案:
- 检查API返回的数据格式
- 检查参数传递是否正确
- 检查结果处理逻辑是否正确
- 检查API返回的数据是否完整
4. 工具执行速度慢
症状:工具执行速度慢,影响智能体的响应速度
原因:
- API响应速度慢
- 网络延迟高
- 结果处理逻辑复杂
- 缺少缓存机制
解决方案:
- 优化API调用,减少不必要的API请求
- 实现缓存机制,避免重复计算
- 优化结果处理逻辑
- 考虑使用异步执行
总结与展望
自定义工具是智能体开发中的重要组成部分,它允许智能体与外部世界进行更丰富的交互。通过本集的学习,你已经掌握了:
- 自定义工具的开发流程:从需求分析到集成使用的完整流程
- API调用工具的实现:如何调用外部API,如天气查询API
- 高级技巧:如错误处理、重试机制、缓存和异步调用
- 最佳实践:工具设计、实现、测试和文档的最佳实践
- 故障排除:常见问题的解决方法
未来,自定义工具的发展趋势包括:
- 更智能的工具:工具本身具有一定的智能,能够自动调整参数和处理复杂情况
- 更丰富的集成:与更多的第三方服务和系统集成
- 更标准化的接口:工具接口更加标准化,便于跨平台使用
- 更安全的实现:内置更多的安全措施,如API密钥管理和输入验证
通过掌握自定义工具的开发,你可以为智能体赋予更强大的能力,使其能够处理更复杂的任务,与更多的外部系统和服务集成。