第107集 requests库高级用法

学习目标

  • 掌握requests库的高级认证方式
  • 学会设置和使用代理
  • 了解requests库的性能优化方法
  • 掌握流式请求和响应处理
  • 学会自定义请求和响应处理
  • 了解异步请求的实现
  • 掌握复杂场景下的错误处理
  • 学习实际项目中的最佳实践

一、高级认证

1. HTTP Basic认证

HTTP Basic认证是一种简单的认证方式,将用户名和密码以Base64编码的形式发送到服务器。

import requests
from requests.auth import HTTPBasicAuth

# 方式1:使用auth参数
response = requests.get(
    'https://httpbin.org/basic-auth/user/passwd',
    auth=('user', 'passwd')
)

# 方式2:使用HTTPBasicAuth对象
response = requests.get(
    'https://httpbin.org/basic-auth/user/passwd',
    auth=HTTPBasicAuth('user', 'passwd')
)

print(response.json())

2. HTTP Digest认证

HTTP Digest认证比Basic认证更安全,不会在网络上明文传输密码。

import requests
from requests.auth import HTTPDigestAuth

response = requests.get(
    'https://httpbin.org/digest-auth/auth/user/passwd',
    auth=HTTPDigestAuth('user', 'passwd')
)

print(response.json())

3. OAuth认证

OAuth是一种常用的第三方认证方式,requests库支持OAuth 1.0和OAuth 2.0。

import requests
from requests_oauthlib import OAuth1, OAuth2Session

# OAuth 1.0
auth = OAuth1('client_key', 'client_secret', 'resource_owner_key', 'resource_owner_secret')
response = requests.get('https://api.twitter.com/1.1/account/verify_credentials.json', auth=auth)

# OAuth 2.0
access_token = 'your_access_token'
os = OAuth2Session('client_id', token={'access_token': access_token})
response = os.get('https://api.github.com/user')

注意:使用OAuth 2.0需要安装requests-oauthlib库:pip install requests-oauthlib

二、代理设置

1. 基本代理设置

可以通过proxies参数设置HTTP和HTTPS代理。

import requests

# 定义代理
proxies = {
    'http': 'http://10.10.1.10:3128',
    'https': 'https://10.10.1.10:1080',
}

# 发送带代理的请求
response = requests.get('https://httpbin.org/ip', proxies=proxies)

print(response.json())

2. 带认证的代理

如果代理需要认证,可以在代理URL中包含用户名和密码。

import requests

# 带认证的代理
proxies = {
    'http': 'http://user:password@10.10.1.10:3128',
    'https': 'https://user:password@10.10.1.10:1080',
}

response = requests.get('https://httpbin.org/ip', proxies=proxies)

print(response.json())

3. SOCKS代理

requests库默认不支持SOCKS代理,需要安装第三方库。

import requests

# 安装依赖:pip install requests[socks]

# SOCKS5代理
proxies = {
    'http': 'socks5://user:password@10.10.1.10:1080',
    'https': 'socks5://user:password@10.10.1.10:1080',
}

response = requests.get('https://httpbin.org/ip', proxies=proxies)

print(response.json())

三、性能优化

1. 连接池

requests库自动使用连接池管理TCP连接,默认的连接池大小是10。可以通过Session对象的adapter来调整连接池的大小。

import requests
from requests.adapters import HTTPAdapter

# 创建会话对象
session = requests.Session()

# 创建HTTP适配器,设置连接池大小为50
adapter = HTTPAdapter(pool_connections=50, pool_maxsize=50)

# 为http和https协议添加适配器
session.mount('http://', adapter)
session.mount('https://', adapter)

# 发送请求
response = session.get('https://httpbin.org/get')

print(response.json())

2. 超时设置

合理设置超时时间可以提高程序的响应速度和稳定性。

import requests

# 设置连接超时和读取超时
try:
    # 连接超时1秒,读取超时3秒
    response = requests.get('https://httpbin.org/delay/2', timeout=(1, 3))
    print(response.json())
except requests.exceptions.Timeout as e:
    print(f"请求超时: {e}")
except requests.exceptions.RequestException as e:
    print(f"请求异常: {e}")

3. 响应缓存

可以使用第三方库来实现响应缓存,减少重复请求。

import requests
from requests_cache import CachedSession

# 创建缓存会话对象,缓存有效期1小时
session = CachedSession('demo_cache', expire_after=3600)

# 第一次请求会从网络获取
response1 = session.get('https://httpbin.org/get')
print(f"第一次请求耗时: {response1.elapsed.total_seconds()}秒")

# 第二次请求会从缓存获取
response2 = session.get('https://httpbin.org/get')
print(f"第二次请求耗时: {response2.elapsed.total_seconds()}秒")
print(f"是否使用缓存: {response2.from_cache}")

注意:需要安装requests-cache库:pip install requests-cache

四、流式处理

1. 流式请求

当需要发送大文件或大量数据时,可以使用流式请求。

import requests

# 定义生成器函数,逐块读取文件
def file_chunk_generator(file_path, chunk_size=1024):
    with open(file_path, 'rb') as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk

# 发送流式请求
response = requests.post(
    'https://httpbin.org/post',
    data=file_chunk_generator('large_file.txt'),
    headers={'Content-Type': 'application/octet-stream'}
)

print(response.json())

2. 流式响应

当需要处理大文件或持续的数据流时,可以使用流式响应。

import requests

# 发送GET请求,启用流式响应
response = requests.get(
    'https://httpbin.org/stream/20',
    stream=True  # 启用流式响应
)

# 逐行读取响应内容
print("开始读取流式响应:")
try:
    for line in response.iter_lines():
        if line:
            print(f"接收到数据: {line.decode('utf-8')}")
finally:
    # 确保关闭响应
    response.close()

3. 下载大文件

使用流式响应下载大文件可以节省内存。

import requests

# 下载大文件
def download_large_file(url, file_path):
    response = requests.get(url, stream=True)
    
    # 获取文件大小
    file_size = int(response.headers.get('Content-Length', 0))
    downloaded_size = 0
    
    with open(file_path, 'wb') as f:
        for chunk in response.iter_content(chunk_size=8192):
            if chunk:
                f.write(chunk)
                downloaded_size += len(chunk)
                
                # 计算并打印下载进度
                progress = downloaded_size / file_size * 100 if file_size > 0 else 0
                print(f"下载进度: {progress:.2f}% ({downloaded_size}/{file_size} 字节)", end='\r')
    
    print(f"\n文件下载完成: {file_path}")

# 调用下载函数
download_large_file('https://www.example.com/large_file.zip', 'large_file.zip')

五、自定义请求和响应处理

1. 自定义请求头

可以通过headers参数自定义请求头。

import requests

# 自定义请求头
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
    'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
    'Accept-Encoding': 'gzip, deflate, br',
    'Connection': 'keep-alive',
    'Upgrade-Insecure-Requests': '1',
    'Cache-Control': 'max-age=0',
}

response = requests.get('https://httpbin.org/headers', headers=headers)

print(response.json())

2. 自定义Cookie

可以通过cookies参数自定义Cookie。

import requests

# 自定义Cookie
cookies = {
    'user_id': '12345',
    'session_id': 'abcdef123456'
}

response = requests.get('https://httpbin.org/cookies', cookies=cookies)

print(response.json())

3. 自定义认证

可以通过继承AuthBase类来实现自定义认证。

import requests
from requests.auth import AuthBase

class CustomAuth(AuthBase):
    def __init__(self, token):
        self.token = token
    
    def __call__(self, r):
        # 在请求头中添加自定义认证信息
        r.headers['X-Auth-Token'] = self.token
        return r

# 使用自定义认证
response = requests.get('https://httpbin.org/headers', auth=CustomAuth('my_token_123'))

print(response.json())

4. 响应钩子

可以使用hooks参数设置响应钩子,在获取响应后自动执行一些操作。

import requests

# 定义响应钩子函数
def print_status(response, *args, **kwargs):
    print(f"响应状态码: {response.status_code}")
    print(f"响应URL: {response.url}")
    
# 定义错误处理钩子函数
def handle_error(response, *args, **kwargs):
    if response.status_code >= 400:
        print(f"请求错误: {response.status_code}")
        response.raise_for_status()

# 使用响应钩子
response = requests.get(
    'https://httpbin.org/status/200',
    hooks={'response': [print_status, handle_error]}
)

print(response.text)

六、异步请求

虽然requests库本身不支持异步请求,但可以结合异步库如asyncio和aiohttp来实现。

import asyncio
import aiohttp

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        'https://httpbin.org/get?i=1',
        'https://httpbin.org/get?i=2',
        'https://httpbin.org/get?i=3',
        'https://httpbin.org/get?i=4',
        'https://httpbin.org/get?i=5'
    ]
    
    async with aiohttp.ClientSession() as session:
        # 并发执行多个请求
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        
        # 处理结果
        for i, result in enumerate(results):
            print(f"请求 {i+1} 结果长度: {len(result)} 字符")

# 运行异步主函数
if __name__ == '__main__':
    asyncio.run(main())

注意:需要安装aiohttp库:pip install aiohttp

七、复杂场景的错误处理

1. 重试机制

可以使用第三方库来实现自动重试功能。

import requests
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter

# 配置重试策略
try_strategy = Retry(
    total=3,  # 总重试次数
    status_forcelist=[429, 500, 502, 503, 504],  # 需要重试的状态码
    allowed_methods=["HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE"],  # 允许重试的方法
    backoff_factor=1,  # 重试间隔因子
)

# 创建适配器
adapter = HTTPAdapter(max_retries=try_strategy)

# 创建会话并挂载适配器
session = requests.Session()
session.mount("https://", adapter)
session.mount("http://", adapter)

# 发送请求
response = session.get("https://httpbin.org/status/500")

print(response.status_code)

2. 异常层次结构

requests库的异常层次结构如下:

RequestException
├── HTTPError
├── ConnectionError
│   ├── ProxyError
│   ├── SSLError
│   └── ConnectTimeout
├── Timeout
│   └── ReadTimeout
├── TooManyRedirects
└── RequestException

可以根据异常类型进行不同的处理:

import requests
from requests.exceptions import (
    RequestException, HTTPError, ConnectionError, 
    Timeout, ReadTimeout, ConnectTimeout,
    TooManyRedirects, SSLError
)

try:
    response = requests.get('https://httpbin.org/status/500', timeout=(1, 3))
    response.raise_for_status()
except HTTPError as e:
    print(f"HTTP错误: {e.response.status_code} - {e.response.reason}")
except ConnectTimeout as e:
    print(f"连接超时: 无法在指定时间内连接到服务器")
except ReadTimeout as e:
    print(f"读取超时: 服务器在指定时间内未返回响应")
except SSLError as e:
    print(f"SSL错误: SSL证书验证失败")
except TooManyRedirects as e:
    print(f"重定向过多: 请求超过了最大重定向次数")
except ConnectionError as e:
    print(f"连接错误: 无法建立连接")
except Timeout as e:
    print(f"超时错误: 请求超时")
except RequestException as e:
    print(f"请求异常: {e}")

八、最佳实践

1. 使用会话对象

对于多个相关请求,使用会话对象可以提高性能并保持状态:

import requests

# 创建会话对象
session = requests.Session()

# 设置默认请求头
session.headers.update({
    'User-Agent': 'MyApp/1.0',
    'Accept': 'application/json'
})

# 登录
session.post('https://example.com/login', data={'username': 'user', 'password': 'pass'})

# 访问需要登录的页面
response = session.get('https://example.com/dashboard')

# 退出登录
session.post('https://example.com/logout')

# 关闭会话
session.close()

2. 资源清理

使用上下文管理器确保资源正确清理:

import requests

# 使用上下文管理器
with requests.Session() as session:
    response = session.get('https://httpbin.org/get')
    print(response.json())

# 使用上下文管理器处理流式响应
with requests.get('https://httpbin.org/stream/5', stream=True) as response:
    for line in response.iter_lines():
        if line:
            print(line.decode('utf-8'))

3. 日志记录

使用日志记录请求和响应信息,便于调试和监控:

import requests
import logging

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def log_request_response(func):
    """记录请求和响应的装饰器"""
    def wrapper(*args, **kwargs):
        # 记录请求信息
        method = args[0].upper() if len(args) > 0 and isinstance(args[0], str) else 'GET'
        url = args[1] if len(args) > 1 else kwargs.get('url', '')
        logger.info(f"发送请求: {method} {url}")
        
        try:
            # 发送请求
            response = func(*args, **kwargs)
            
            # 记录响应信息
            logger.info(f"收到响应: {response.status_code} {response.url}")
            
            return response
        except Exception as e:
            logger.error(f"请求失败: {e}")
            raise
    
    return wrapper

# 使用装饰器装饰requests方法
requests.get = log_request_response(requests.get)
requests.post = log_request_response(requests.post)

# 发送请求
response = requests.get('https://httpbin.org/get')
print(response.json())

九、实际项目案例

1. 批量API调用

import requests
import time
from concurrent.futures import ThreadPoolExecutor

# API端点
API_ENDPOINT = 'https://api.example.com/users/{id}'

# 用户ID列表
user_ids = range(1, 100)

# 定义请求函数
def fetch_user(user_id):
    url = API_ENDPOINT.format(id=user_id)
    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status()
        return response.json()
    except Exception as e:
        print(f"获取用户 {user_id} 失败: {e}")
        return None

# 主函数
def main():
    start_time = time.time()
    
    # 使用线程池并发请求
    with ThreadPoolExecutor(max_workers=10) as executor:
        results = list(executor.map(fetch_user, user_ids))
    
    # 处理结果
    successful_results = [r for r in results if r is not None]
    
    end_time = time.time()
    
    print(f"总共请求: {len(user_ids)}")
    print(f"成功: {len(successful_results)}")
    print(f"失败: {len(user_ids) - len(successful_results)}")
    print(f"总耗时: {end_time - start_time:.2f}秒")

if __name__ == '__main__':
    main()

2. 网站监控工具

import requests
import time
import smtplib
from email.mime.text import MIMEText

# 配置
WEBSITES = [
    'https://www.example.com',
    'https://www.google.com',
    'https://www.python.org'
]

CHECK_INTERVAL = 60  # 检查间隔(秒)

# 邮件配置(示例)
EMAIL_CONFIG = {
    'smtp_server': 'smtp.example.com',
    'smtp_port': 587,
    'username': 'your_email@example.com',
    'password': 'your_password',
    'from_email': 'your_email@example.com',
    'to_email': 'alert@example.com'
}

# 发送告警邮件
def send_alert_email(website, status_code, error_message):
    subject = f"网站监控告警: {website} 不可访问"
    body = f"""
    网站监控告警
    
    网站: {website}
    状态码: {status_code}
    错误信息: {error_message}
    时间: {time.strftime('%Y-%m-%d %H:%M:%S')}
    """
    
    msg = MIMEText(body)
    msg['Subject'] = subject
    msg['From'] = EMAIL_CONFIG['from_email']
    msg['To'] = EMAIL_CONFIG['to_email']
    
    try:
        with smtplib.SMTP(EMAIL_CONFIG['smtp_server'], EMAIL_CONFIG['smtp_port']) as server:
            server.starttls()
            server.login(EMAIL_CONFIG['username'], EMAIL_CONFIG['password'])
            server.send_message(msg)
        print(f"告警邮件已发送: {website}")
    except Exception as e:
        print(f"发送邮件失败: {e}")

# 检查网站可用性
def check_website(website):
    try:
        response = requests.get(website, timeout=10)
        response.raise_for_status()
        print(f"{website} - 正常 ({response.status_code})")
        return True
    except Exception as e:
        status_code = getattr(e.response, 'status_code', 'N/A') if hasattr(e, 'response') else 'N/A'
        print(f"{website} - 异常 ({status_code}): {e}")
        # 发送告警邮件
        send_alert_email(website, status_code, str(e))
        return False

# 主函数
def main():
    print("网站监控工具启动")
    print(f"监控网站: {WEBSITES}")
    print(f"检查间隔: {CHECK_INTERVAL}秒")
    print("按Ctrl+C停止监控")
    
    try:
        while True:
            print(f"\n--- 检查开始 ({time.strftime('%Y-%m-%d %H:%M:%S')}) ---")
            
            # 并发检查所有网站
            with ThreadPoolExecutor(max_workers=len(WEBSITES)) as executor:
                executor.map(check_website, WEBSITES)
            
            print(f"--- 检查结束 ---\n")
            time.sleep(CHECK_INTERVAL)
    
    except KeyboardInterrupt:
        print("\n网站监控工具已停止")

if __name__ == '__main__':
    main()

十、总结

本集我们学习了requests库的高级用法,包括:

  • 高级认证方式(Basic、Digest、OAuth)
  • 代理设置(HTTP、HTTPS、SOCKS)
  • 性能优化(连接池、超时设置、响应缓存)
  • 流式处理(流式请求和响应、大文件下载)
  • 自定义请求和响应处理(请求头、Cookie、认证、钩子)
  • 异步请求(结合asyncio和aiohttp)
  • 复杂场景的错误处理(重试机制、异常层次结构)
  • 最佳实践(会话对象、资源清理、日志记录)
  • 实际项目案例(批量API调用、网站监控工具)

requests库是Python中最强大的HTTP客户端库之一,掌握其高级用法可以帮助我们更高效地开发网络应用和API客户端。在实际项目中,我们应该根据具体需求选择合适的技术和策略,以确保代码的可靠性、性能和可维护性。

练习题

  1. 实现一个支持Basic认证和Digest认证的HTTP客户端。
  2. 编写一个程序,使用代理服务器发送请求并获取代理的IP地址。
  3. 实现一个大文件下载器,支持断点续传和进度显示。
  4. 使用requests库结合多线程或异步IO并发调用API。
  5. 编写一个网站监控工具,定期检查网站可用性并发送告警。
  6. 实现一个自定义的认证类,在请求头中添加API密钥。
  7. 使用响应缓存减少重复请求,提高程序性能。
  8. 编写一个错误处理机制,对不同类型的异常进行不同的处理。
« 上一篇 requests库基础 下一篇 » 网页解析基础