第44集:自定义工具:调用自定义API(如查询天气接口)

章节标题

自定义API工具开发详解

核心知识点讲解

自定义工具的重要性

自定义工具是智能体开发中的重要组成部分,它允许智能体:

  1. 与特定领域交互:访问领域特定的API和服务
  2. 扩展能力边界:突破内置工具的限制
  3. 集成现有系统:与企业内部系统和服务集成
  4. 提供个性化服务:根据特定需求定制功能
  5. 保持信息更新:访问实时数据和服务

自定义工具的开发流程

  1. 需求分析:确定工具的功能和用途
  2. API调研:了解目标API的接口、参数和返回格式
  3. 工具设计:设计工具的名称、描述和参数
  4. 实现代码:编写工具的实现代码
  5. 测试验证:测试工具的功能和可靠性
  6. 集成使用:将工具集成到智能体中使用

自定义工具的类型

  1. API调用工具:调用外部RESTful API
  2. 数据库工具:与数据库交互
  3. 内部系统工具:与企业内部系统交互
  4. 第三方服务工具:调用第三方服务
  5. 硬件控制工具:控制硬件设备

实用案例分析

案例:天气查询工具

场景:创建一个工具,通过调用天气API获取指定城市的天气信息。

挑战

  • 需要找到合适的天气API
  • 需要处理API密钥的管理
  • 需要处理API调用的错误情况
  • 需要格式化API返回的结果

解决方案

  1. 选择一个免费的天气API,如OpenWeatherMap
  2. 实现工具的参数验证和错误处理
  3. 格式化API返回的结果,使其更友好
  4. 集成到智能体中使用

预期效果

  • 智能体能够通过工具获取实时天气信息
  • 工具能够处理各种错误情况
  • 工具返回的结果清晰、友好

代码示例

示例1:天气查询工具

from langchain.tools import BaseTool
from langchain.pydantic_v1 import Field, BaseModel
from langchain.agents import AgentType, initialize_agent
from langchain.llms import OpenAI
import requests
import os

# 初始化模型
llm = OpenAI(temperature=0.7)

# 定义天气查询工具的输入参数
class WeatherInput(BaseModel):
    city: str = Field(..., description="城市名称,例如:北京、上海、广州")
    days: int = Field(1, description="查询未来几天的天气,默认1天")

# 实现天气查询工具
class WeatherTool(BaseTool):
    name = "Weather"
    description = "查询指定城市的天气信息"
    args_schema = WeatherInput
    
    def __init__(self, api_key=None):
        super().__init__()
        # 从环境变量或参数获取API密钥
        self.api_key = api_key or os.getenv("OPENWEATHER_API_KEY")
        if not self.api_key:
            raise ValueError("需要提供OpenWeatherMap API密钥")
        self.base_url = "https://api.openweathermap.org/data/2.5"
    
    def _run(self, city: str, days: int = 1) -> str:
        """执行天气查询
        
        参数:
            city: 城市名称
            days: 查询天数
        
        返回:
            天气信息字符串
        """
        try:
            # 第一步:通过城市名称获取城市ID
            geo_url = f"{self.base_url}/geo/1.0/direct"
            geo_params = {
                "q": city,
                "limit": 1,
                "appid": self.api_key
            }
            
            geo_response = requests.get(geo_url, params=geo_params)
            geo_response.raise_for_status()
            geo_data = geo_response.json()
            
            if not geo_data:
                return f"错误:未找到城市 {city}"
            
            lat = geo_data[0]["lat"]
            lon = geo_data[0]["lon"]
            
            # 第二步:通过城市ID获取天气信息
            if days == 1:
                # 查询当前天气
                weather_url = f"{self.base_url}/weather"
                weather_params = {
                    "lat": lat,
                    "lon": lon,
                    "appid": self.api_key,
                    "units": "metric",  # 使用摄氏度
                    "lang": "zh_cn"  # 使用中文
                }
                
                weather_response = requests.get(weather_url, params=weather_params)
                weather_response.raise_for_status()
                weather_data = weather_response.json()
                
                # 格式化天气信息
                weather_main = weather_data["weather"][0]["main"]
                weather_desc = weather_data["weather"][0]["description"]
                temp = weather_data["main"]["temp"]
                temp_min = weather_data["main"]["temp_min"]
                temp_max = weather_data["main"]["temp_max"]
                humidity = weather_data["main"]["humidity"]
                wind_speed = weather_data["wind"]["speed"]
                
                result = f"{city}的天气:\n"
                result += f"天气状况:{weather_main}({weather_desc})\n"
                result += f"当前温度:{temp}°C\n"
                result += f"最低温度:{temp_min}°C\n"
                result += f"最高温度:{temp_max}°C\n"
                result += f"湿度:{humidity}%\n"
                result += f"风速:{wind_speed} m/s"
            else:
                # 查询未来天气
                forecast_url = f"{self.base_url}/forecast"
                forecast_params = {
                    "lat": lat,
                    "lon": lon,
                    "appid": self.api_key,
                    "units": "metric",
                    "lang": "zh_cn",
                    "cnt": days * 8  # 每3小时一次,一天8次
                }
                
                forecast_response = requests.get(forecast_url, params=forecast_params)
                forecast_response.raise_for_status()
                forecast_data = forecast_response.json()
                
                # 格式化天气信息
                result = f"{city}未来{days}天的天气:\n"
                
                # 按天分组
                daily_forecasts = {}
                for item in forecast_data["list"]:
                    date = item["dt_txt"].split()[0]
                    if date not in daily_forecasts:
                        daily_forecasts[date] = []
                    daily_forecasts[date].append(item)
                
                # 处理每天的天气
                for date, items in daily_forecasts.items():
                    # 获取当天的天气状况
                    weather_main = items[0]["weather"][0]["main"]
                    weather_desc = items[0]["weather"][0]["description"]
                    
                    # 计算当天的平均温度
                    temps = [item["main"]["temp"] for item in items]
                    temp_avg = sum(temps) / len(temps)
                    temp_min = min(item["main"]["temp_min"] for item in items)
                    temp_max = max(item["main"]["temp_max"] for item in items)
                    
                    # 计算当天的平均湿度
                    humidity = sum(item["main"]["humidity"] for item in items) / len(items)
                    
                    # 计算当天的平均风速
                    wind_speed = sum(item["wind"]["speed"] for item in items) / len(items)
                    
                    result += f"{date}:\n"
                    result += f"  天气状况:{weather_main}({weather_desc})\n"
                    result += f"  平均温度:{temp_avg:.1f}°C\n"
                    result += f"  最低温度:{temp_min}°C\n"
                    result += f"  最高温度:{temp_max}°C\n"
                    result += f"  平均湿度:{humidity:.1f}%\n"
                    result += f"  平均风速:{wind_speed:.1f} m/s\n"
            
            return result
            
        except requests.exceptions.RequestException as e:
            return f"错误:API调用失败 - {str(e)}"
        except Exception as e:
            return f"错误:处理天气数据时发生错误 - {str(e)}"
    
    async def _arun(self, city: str, days: int = 1) -> str:
        """异步执行天气查询
        
        参数:
            city: 城市名称
            days: 查询天数
        
        返回:
            天气信息字符串
        """
        # 简单实现,实际应用中应该使用异步HTTP客户端
        return self._run(city, days)

# 创建工具实例
# 注意:需要设置OPENWEATHER_API_KEY环境变量
weather_tool = WeatherTool()

# 创建工具列表
tools = [weather_tool]

# 初始化智能体
agent = initialize_agent(
    tools,
    llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True
)

# 测试智能体
print(agent.run("北京今天的天气怎么样?"))
print(agent.run("上海未来3天的天气怎么样?"))

示例2:股票查询工具

from langchain.tools import BaseTool
from langchain.pydantic_v1 import Field, BaseModel
from langchain.agents import AgentType, initialize_agent
from langchain.llms import OpenAI
import requests
import os

# 初始化模型
llm = OpenAI(temperature=0.7)

# 定义股票查询工具的输入参数
class StockInput(BaseModel):
    symbol: str = Field(..., description="股票代码,例如:AAPL、MSFT、GOOG")

# 实现股票查询工具
class StockTool(BaseTool):
    name = "Stock"
    description = "查询指定股票的实时价格"
    args_schema = StockInput
    
    def __init__(self, api_key=None):
        super().__init__()
        # 从环境变量或参数获取API密钥
        self.api_key = api_key or os.getenv("ALPHA_VANTAGE_API_KEY")
        if not self.api_key:
            raise ValueError("需要提供Alpha Vantage API密钥")
        self.base_url = "https://www.alphavantage.co/query"
    
    def _run(self, symbol: str) -> str:
        """执行股票查询
        
        参数:
            symbol: 股票代码
        
        返回:
            股票价格信息字符串
        """
        try:
            # 构建API参数
            params = {
                "function": "GLOBAL_QUOTE",
                "symbol": symbol,
                "apikey": self.api_key
            }
            
            # 调用API
            response = requests.get(self.base_url, params=params)
            response.raise_for_status()
            data = response.json()
            
            # 处理API返回的数据
            if "Global Quote" in data:
                quote = data["Global Quote"]
                symbol = quote["01. symbol"]
                price = quote["05. price"]
                change = quote["09. change"]
                change_percent = quote["10. change percent"]
                high = quote["03. high"]
                low = quote["04. low"]
                volume = quote["06. volume"]
                
                # 格式化结果
                result = f"{symbol}的股票信息:\n"
                result += f"当前价格:${price}\n"
                result += f"涨跌:${change}({change_percent})\n"
                result += f"今日最高:${high}\n"
                result += f"今日最低:${low}\n"
                result += f"成交量:{volume}"
                
                return result
            else:
                return f"错误:未找到股票代码 {symbol} 的信息"
                
        except requests.exceptions.RequestException as e:
            return f"错误:API调用失败 - {str(e)}"
        except Exception as e:
            return f"错误:处理股票数据时发生错误 - {str(e)}"
    
    async def _arun(self, symbol: str) -> str:
        """异步执行股票查询
        
        参数:
            symbol: 股票代码
        
        返回:
            股票价格信息字符串
        """
        return self._run(symbol)

# 创建工具实例
# 注意:需要设置ALPHA_VANTAGE_API_KEY环境变量
stock_tool = StockTool()

# 创建工具列表
tools = [stock_tool]

# 初始化智能体
agent = initialize_agent(
    tools,
    llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True
)

# 测试智能体
print(agent.run("苹果股票(AAPL)的价格是多少?"))
print(agent.run("微软股票(MSFT)的价格是多少?"))

示例3:自定义工具的通用框架

from langchain.tools import BaseTool
from langchain.pydantic_v1 import Field, BaseModel
from langchain.agents import AgentType, initialize_agent
from langchain.llms import OpenAI
import requests

# 初始化模型
llm = OpenAI(temperature=0.7)

# 通用API工具基类
class GenericAPITool(BaseTool):
    def __init__(self, name, description, base_url, default_params=None):
        super().__init__()
        self.name = name
        self.description = description
        self.base_url = base_url
        self.default_params = default_params or {}
    
    def _run(self, **kwargs):
        """执行API调用
        
        参数:
            **kwargs: API参数
        
        返回:
            API响应结果
        """
        try:
            # 合并默认参数和传入参数
            params = {**self.default_params, **kwargs}
            
            # 调用API
            response = requests.get(self.base_url, params=params)
            response.raise_for_status()
            
            # 处理响应
            return self._process_response(response.json())
            
        except requests.exceptions.RequestException as e:
            return f"错误:API调用失败 - {str(e)}"
        except Exception as e:
            return f"错误:处理API响应时发生错误 - {str(e)}"
    
    def _process_response(self, data):
        """处理API响应
        
        参数:
            data: API响应数据
        
        返回:
            处理后的响应结果
        """
        # 子类应该重写此方法
        return str(data)
    
    async def _arun(self, **kwargs):
        """异步执行API调用
        
        参数:
            **kwargs: API参数
        
        返回:
            API响应结果
        """
        return self._run(**kwargs)

# 示例:使用通用框架创建天气工具
class WeatherInput(BaseModel):
    city: str = Field(..., description="城市名称")

class CustomWeatherTool(GenericAPITool):
    args_schema = WeatherInput
    
    def __init__(self, api_key):
        super().__init__(
            name="CustomWeather",
            description="查询指定城市的天气信息",
            base_url="https://api.openweathermap.org/data/2.5/weather",
            default_params={"appid": api_key, "units": "metric", "lang": "zh_cn"}
        )
    
    def _run(self, city: str) -> str:
        # 第一步:获取城市坐标
        geo_url = "https://api.openweathermap.org/data/2.5/geo/1.0/direct"
        geo_params = {"q": city, "limit": 1, "appid": self.default_params["appid"]}
        
        geo_response = requests.get(geo_url, params=geo_params)
        geo_response.raise_for_status()
        geo_data = geo_response.json()
        
        if not geo_data:
            return f"错误:未找到城市 {city}"
        
        lat = geo_data[0]["lat"]
        lon = geo_data[0]["lon"]
        
        # 第二步:查询天气
        return super()._run(lat=lat, lon=lon)
    
    def _process_response(self, data):
        # 处理天气API的响应
        weather_main = data["weather"][0]["main"]
        weather_desc = data["weather"][0]["description"]
        temp = data["main"]["temp"]
        temp_min = data["main"]["temp_min"]
        temp_max = data["main"]["temp_max"]
        humidity = data["main"]["humidity"]
        
        result = f"天气状况:{weather_main}({weather_desc})\n"
        result += f"当前温度:{temp}°C\n"
        result += f"最低温度:{temp_min}°C\n"
        result += f"最高温度:{temp_max}°C\n"
        result += f"湿度:{humidity}%"
        
        return result

# 创建工具实例
# 注意:需要替换为实际的API密钥
weather_tool = CustomWeatherTool(api_key="YOUR_API_KEY")

# 创建工具列表
tools = [weather_tool]

# 初始化智能体
agent = initialize_agent(
    tools,
    llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True
)

# 测试智能体
print(agent.run("北京的天气怎么样?"))

高级技巧

1. API密钥管理

环境变量

  • 优势:安全,不易泄露
  • 使用方法:通过os.environ.get()获取
  • 适用场景:生产环境

配置文件

  • 优势:集中管理,易于修改
  • 使用方法:通过配置文件读取
  • 适用场景:开发环境

密钥轮换

  • 优势:提高安全性
  • 使用方法:定期更新API密钥
  • 适用场景:所有环境

2. 错误处理和重试机制

错误处理

  • 分类处理:根据错误类型进行不同处理
  • 友好提示:向用户提供清晰的错误信息
  • 日志记录:记录错误信息,便于调试

重试机制

  • 指数退避:失败后逐渐增加重试间隔
  • 最大重试次数:限制重试次数,避免无限重试
  • 重试条件:只对特定错误进行重试

示例:带重试机制的API调用

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def create_session():
    """创建带重试机制的会话"""
    session = requests.Session()
    
    # 配置重试策略
    retry = Retry(
        total=3,  # 总重试次数
        backoff_factor=1,  # 重试间隔因子
        status_forcelist=[429, 500, 502, 503, 504],  # 需要重试的状态码
        allowed_methods=["HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE"]
    )
    
    # 配置适配器
    adapter = HTTPAdapter(max_retries=retry)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    
    return session

# 使用示例
session = create_session()
try:
    response = session.get("https://api.example.com/data")
    response.raise_for_status()
except requests.exceptions.RequestException as e:
    print(f"错误:{e}")

3. 缓存机制

内存缓存

  • 优势:速度快,实现简单
  • 使用方法:使用字典存储缓存
  • 适用场景:短期缓存,数据量小

Redis缓存

  • 优势:持久化,支持分布式
  • 使用方法:通过Redis客户端存储缓存
  • 适用场景:长期缓存,数据量大

缓存策略

  • 过期时间:为缓存设置合理的过期时间
  • 缓存键:使用唯一的缓存键
  • 缓存失效:当数据更新时,主动失效缓存

示例:带缓存的工具

from langchain.tools import BaseTool
import time

class CachedTool(BaseTool):
    def __init__(self, name, description):
        super().__init__()
        self.name = name
        self.description = description
        self.cache = {}
        self.cache_expiry = 3600  # 缓存过期时间(秒)
    
    def _run(self, query):
        # 检查缓存
        current_time = time.time()
        if query in self.cache:
            cached_result, timestamp = self.cache[query]
            if current_time - timestamp < self.cache_expiry:
                return f"缓存结果:{cached_result}"
        
        # 执行实际操作
        result = self._actual_run(query)
        
        # 更新缓存
        self.cache[query] = (result, current_time)
        
        return result
    
    def _actual_run(self, query):
        # 子类应该重写此方法
        pass

4. 异步API调用

异步优势

  • 提高并发:支持同时执行多个API调用
  • 减少阻塞:避免阻塞主线程
  • 提高性能:特别是在I/O密集型操作中

实现方法

  • 使用aiohttp:异步HTTP客户端
  • 使用async/await:Python的异步语法
  • 事件循环:管理异步任务

示例:异步API调用

import aiohttp
import asyncio

async def fetch_data(url, params):
    """异步获取数据"""
    async with aiohttp.ClientSession() as session:
        async with session.get(url, params=params) as response:
            if response.status == 200:
                return await response.json()
            else:
                raise Exception(f"API调用失败,状态码:{response.status}")

async def main():
    """主函数"""
    url = "https://api.example.com/data"
    params = {"key": "value"}
    
    try:
        data = await fetch_data(url, params)
        print(data)
    except Exception as e:
        print(f"错误:{e}")

# 运行异步函数
asyncio.run(main())

最佳实践

1. 工具设计的最佳实践

工具命名

  • 清晰明了:使用简洁、明确的名称
  • 使用动词:工具名称应该描述其执行的操作
  • 一致性:保持命名风格的一致性

工具描述

  • 详细准确:准确描述工具的功能和用途
  • 包含参数:说明工具需要什么参数
  • 包含返回值:说明工具返回什么
  • 使用中文:对于中文智能体,使用中文描述

参数设计

  • 简洁明确:参数名称应该简洁明了
  • 类型提示:使用类型提示,便于智能体理解
  • 默认值:为可选参数提供合理的默认值
  • 验证逻辑:添加参数验证逻辑

2. 工具实现的最佳实践

代码组织

  • 模块化:将工具代码组织成模块
  • 可读性:使用清晰的代码结构和命名
  • 注释:添加必要的注释,说明代码的功能

性能优化

  • 缓存:对于频繁调用的API,实现缓存机制
  • 异步:对于I/O密集型操作,使用异步执行
  • 批处理:支持批量操作,减少API调用次数

安全性

  • API密钥保护:安全存储API密钥
  • 输入验证:验证输入参数,防止注入攻击
  • 输出限制:限制输出的长度和内容

3. 工具测试的最佳实践

单元测试

  • 测试正常情况:测试工具在正常输入下的行为
  • 测试异常情况:测试工具在异常输入下的行为
  • 测试边界情况:测试工具在边界输入下的行为

集成测试

  • 测试与智能体的集成:确保智能体能够正确调用工具
  • 测试真实场景:在真实场景中测试工具的性能和可靠性
  • 测试错误处理:测试工具的错误处理能力

负载测试

  • 测试并发性能:测试工具在并发调用下的性能
  • 测试响应时间:测试工具的响应时间
  • 测试稳定性:测试工具在长时间运行下的稳定性

4. 工具文档的最佳实践

文档内容

  • 工具说明:说明工具的功能和用途
  • 参数说明:详细说明每个参数的含义和用法
  • 返回值说明:说明工具的返回值格式和含义
  • 使用示例:提供工具使用的示例
  • 常见问题:说明使用工具时可能遇到的问题和解决方案

文档格式

  • 清晰结构:使用清晰的文档结构
  • 代码示例:包含代码示例,便于理解
  • 版本信息:包含工具的版本信息
  • 更新日志:记录工具的更新历史

故障排除

1. API调用失败

症状:工具调用API时失败

原因

  • API密钥无效或过期
  • 网络连接问题
  • API服务不可用
  • 参数格式错误

解决方案

  • 检查API密钥是否有效
  • 检查网络连接是否正常
  • 检查API服务是否可用
  • 检查参数格式是否正确

2. 工具不被智能体调用

症状:智能体知道有工具,但不调用它

原因

  • 工具描述不清晰,智能体不理解工具的功能
  • 工具参数描述不准确,智能体不知道如何提供参数
  • 智能体的提示词模板不鼓励使用工具

解决方案

  • 优化工具描述,使其更清晰、更具体
  • 改进参数描述,明确说明参数的格式和示例
  • 调整智能体的提示词模板,鼓励使用工具

3. 工具执行结果不符合预期

症状:工具执行成功,但返回结果不符合预期

原因

  • API返回的数据格式发生变化
  • 参数传递错误
  • 结果处理逻辑错误
  • API返回的数据不完整

解决方案

  • 检查API返回的数据格式
  • 检查参数传递是否正确
  • 检查结果处理逻辑是否正确
  • 检查API返回的数据是否完整

4. 工具执行速度慢

症状:工具执行速度慢,影响智能体的响应速度

原因

  • API响应速度慢
  • 网络延迟高
  • 结果处理逻辑复杂
  • 缺少缓存机制

解决方案

  • 优化API调用,减少不必要的API请求
  • 实现缓存机制,避免重复计算
  • 优化结果处理逻辑
  • 考虑使用异步执行

总结与展望

自定义工具是智能体开发中的重要组成部分,它允许智能体与外部世界进行更丰富的交互。通过本集的学习,你已经掌握了:

  1. 自定义工具的开发流程:从需求分析到集成使用的完整流程
  2. API调用工具的实现:如何调用外部API,如天气查询API
  3. 高级技巧:如错误处理、重试机制、缓存和异步调用
  4. 最佳实践:工具设计、实现、测试和文档的最佳实践
  5. 故障排除:常见问题的解决方法

未来,自定义工具的发展趋势包括:

  • 更智能的工具:工具本身具有一定的智能,能够自动调整参数和处理复杂情况
  • 更丰富的集成:与更多的第三方服务和系统集成
  • 更标准化的接口:工具接口更加标准化,便于跨平台使用
  • 更安全的实现:内置更多的安全措施,如API密钥管理和输入验证

通过掌握自定义工具的开发,你可以为智能体赋予更强大的能力,使其能够处理更复杂的任务,与更多的外部系统和服务集成。

« 上一篇 内置工具集:计算器、维基百科、Shell等 下一篇 » 代码解释器工具:让智能体写代码并执行