第107集 requests库高级用法
学习目标
- 掌握requests库的高级认证方式
- 学会设置和使用代理
- 了解requests库的性能优化方法
- 掌握流式请求和响应处理
- 学会自定义请求和响应处理
- 了解异步请求的实现
- 掌握复杂场景下的错误处理
- 学习实际项目中的最佳实践
一、高级认证
1. HTTP Basic认证
HTTP Basic认证是一种简单的认证方式,将用户名和密码以Base64编码的形式发送到服务器。
import requests
from requests.auth import HTTPBasicAuth
# 方式1:使用auth参数
response = requests.get(
'https://httpbin.org/basic-auth/user/passwd',
auth=('user', 'passwd')
)
# 方式2:使用HTTPBasicAuth对象
response = requests.get(
'https://httpbin.org/basic-auth/user/passwd',
auth=HTTPBasicAuth('user', 'passwd')
)
print(response.json())2. HTTP Digest认证
HTTP Digest认证比Basic认证更安全,不会在网络上明文传输密码。
import requests
from requests.auth import HTTPDigestAuth
response = requests.get(
'https://httpbin.org/digest-auth/auth/user/passwd',
auth=HTTPDigestAuth('user', 'passwd')
)
print(response.json())3. OAuth认证
OAuth是一种常用的第三方认证方式,requests库支持OAuth 1.0和OAuth 2.0。
import requests
from requests_oauthlib import OAuth1, OAuth2Session
# OAuth 1.0
auth = OAuth1('client_key', 'client_secret', 'resource_owner_key', 'resource_owner_secret')
response = requests.get('https://api.twitter.com/1.1/account/verify_credentials.json', auth=auth)
# OAuth 2.0
access_token = 'your_access_token'
os = OAuth2Session('client_id', token={'access_token': access_token})
response = os.get('https://api.github.com/user')注意:使用OAuth 2.0需要安装requests-oauthlib库:pip install requests-oauthlib
二、代理设置
1. 基本代理设置
可以通过proxies参数设置HTTP和HTTPS代理。
import requests
# 定义代理
proxies = {
'http': 'http://10.10.1.10:3128',
'https': 'https://10.10.1.10:1080',
}
# 发送带代理的请求
response = requests.get('https://httpbin.org/ip', proxies=proxies)
print(response.json())2. 带认证的代理
如果代理需要认证,可以在代理URL中包含用户名和密码。
import requests
# 带认证的代理
proxies = {
'http': 'http://user:password@10.10.1.10:3128',
'https': 'https://user:password@10.10.1.10:1080',
}
response = requests.get('https://httpbin.org/ip', proxies=proxies)
print(response.json())3. SOCKS代理
requests库默认不支持SOCKS代理,需要安装第三方库。
import requests
# 安装依赖:pip install requests[socks]
# SOCKS5代理
proxies = {
'http': 'socks5://user:password@10.10.1.10:1080',
'https': 'socks5://user:password@10.10.1.10:1080',
}
response = requests.get('https://httpbin.org/ip', proxies=proxies)
print(response.json())三、性能优化
1. 连接池
requests库自动使用连接池管理TCP连接,默认的连接池大小是10。可以通过Session对象的adapter来调整连接池的大小。
import requests
from requests.adapters import HTTPAdapter
# 创建会话对象
session = requests.Session()
# 创建HTTP适配器,设置连接池大小为50
adapter = HTTPAdapter(pool_connections=50, pool_maxsize=50)
# 为http和https协议添加适配器
session.mount('http://', adapter)
session.mount('https://', adapter)
# 发送请求
response = session.get('https://httpbin.org/get')
print(response.json())2. 超时设置
合理设置超时时间可以提高程序的响应速度和稳定性。
import requests
# 设置连接超时和读取超时
try:
# 连接超时1秒,读取超时3秒
response = requests.get('https://httpbin.org/delay/2', timeout=(1, 3))
print(response.json())
except requests.exceptions.Timeout as e:
print(f"请求超时: {e}")
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}")3. 响应缓存
可以使用第三方库来实现响应缓存,减少重复请求。
import requests
from requests_cache import CachedSession
# 创建缓存会话对象,缓存有效期1小时
session = CachedSession('demo_cache', expire_after=3600)
# 第一次请求会从网络获取
response1 = session.get('https://httpbin.org/get')
print(f"第一次请求耗时: {response1.elapsed.total_seconds()}秒")
# 第二次请求会从缓存获取
response2 = session.get('https://httpbin.org/get')
print(f"第二次请求耗时: {response2.elapsed.total_seconds()}秒")
print(f"是否使用缓存: {response2.from_cache}")注意:需要安装requests-cache库:pip install requests-cache
四、流式处理
1. 流式请求
当需要发送大文件或大量数据时,可以使用流式请求。
import requests
# 定义生成器函数,逐块读取文件
def file_chunk_generator(file_path, chunk_size=1024):
with open(file_path, 'rb') as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
yield chunk
# 发送流式请求
response = requests.post(
'https://httpbin.org/post',
data=file_chunk_generator('large_file.txt'),
headers={'Content-Type': 'application/octet-stream'}
)
print(response.json())2. 流式响应
当需要处理大文件或持续的数据流时,可以使用流式响应。
import requests
# 发送GET请求,启用流式响应
response = requests.get(
'https://httpbin.org/stream/20',
stream=True # 启用流式响应
)
# 逐行读取响应内容
print("开始读取流式响应:")
try:
for line in response.iter_lines():
if line:
print(f"接收到数据: {line.decode('utf-8')}")
finally:
# 确保关闭响应
response.close()3. 下载大文件
使用流式响应下载大文件可以节省内存。
import requests
# 下载大文件
def download_large_file(url, file_path):
response = requests.get(url, stream=True)
# 获取文件大小
file_size = int(response.headers.get('Content-Length', 0))
downloaded_size = 0
with open(file_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
downloaded_size += len(chunk)
# 计算并打印下载进度
progress = downloaded_size / file_size * 100 if file_size > 0 else 0
print(f"下载进度: {progress:.2f}% ({downloaded_size}/{file_size} 字节)", end='\r')
print(f"\n文件下载完成: {file_path}")
# 调用下载函数
download_large_file('https://www.example.com/large_file.zip', 'large_file.zip')五、自定义请求和响应处理
1. 自定义请求头
可以通过headers参数自定义请求头。
import requests
# 自定义请求头
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Cache-Control': 'max-age=0',
}
response = requests.get('https://httpbin.org/headers', headers=headers)
print(response.json())2. 自定义Cookie
可以通过cookies参数自定义Cookie。
import requests
# 自定义Cookie
cookies = {
'user_id': '12345',
'session_id': 'abcdef123456'
}
response = requests.get('https://httpbin.org/cookies', cookies=cookies)
print(response.json())3. 自定义认证
可以通过继承AuthBase类来实现自定义认证。
import requests
from requests.auth import AuthBase
class CustomAuth(AuthBase):
def __init__(self, token):
self.token = token
def __call__(self, r):
# 在请求头中添加自定义认证信息
r.headers['X-Auth-Token'] = self.token
return r
# 使用自定义认证
response = requests.get('https://httpbin.org/headers', auth=CustomAuth('my_token_123'))
print(response.json())4. 响应钩子
可以使用hooks参数设置响应钩子,在获取响应后自动执行一些操作。
import requests
# 定义响应钩子函数
def print_status(response, *args, **kwargs):
print(f"响应状态码: {response.status_code}")
print(f"响应URL: {response.url}")
# 定义错误处理钩子函数
def handle_error(response, *args, **kwargs):
if response.status_code >= 400:
print(f"请求错误: {response.status_code}")
response.raise_for_status()
# 使用响应钩子
response = requests.get(
'https://httpbin.org/status/200',
hooks={'response': [print_status, handle_error]}
)
print(response.text)六、异步请求
虽然requests库本身不支持异步请求,但可以结合异步库如asyncio和aiohttp来实现。
import asyncio
import aiohttp
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
urls = [
'https://httpbin.org/get?i=1',
'https://httpbin.org/get?i=2',
'https://httpbin.org/get?i=3',
'https://httpbin.org/get?i=4',
'https://httpbin.org/get?i=5'
]
async with aiohttp.ClientSession() as session:
# 并发执行多个请求
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
# 处理结果
for i, result in enumerate(results):
print(f"请求 {i+1} 结果长度: {len(result)} 字符")
# 运行异步主函数
if __name__ == '__main__':
asyncio.run(main())注意:需要安装aiohttp库:pip install aiohttp
七、复杂场景的错误处理
1. 重试机制
可以使用第三方库来实现自动重试功能。
import requests
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
# 配置重试策略
try_strategy = Retry(
total=3, # 总重试次数
status_forcelist=[429, 500, 502, 503, 504], # 需要重试的状态码
allowed_methods=["HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE"], # 允许重试的方法
backoff_factor=1, # 重试间隔因子
)
# 创建适配器
adapter = HTTPAdapter(max_retries=try_strategy)
# 创建会话并挂载适配器
session = requests.Session()
session.mount("https://", adapter)
session.mount("http://", adapter)
# 发送请求
response = session.get("https://httpbin.org/status/500")
print(response.status_code)2. 异常层次结构
requests库的异常层次结构如下:
RequestException
├── HTTPError
├── ConnectionError
│ ├── ProxyError
│ ├── SSLError
│ └── ConnectTimeout
├── Timeout
│ └── ReadTimeout
├── TooManyRedirects
└── RequestException可以根据异常类型进行不同的处理:
import requests
from requests.exceptions import (
RequestException, HTTPError, ConnectionError,
Timeout, ReadTimeout, ConnectTimeout,
TooManyRedirects, SSLError
)
try:
response = requests.get('https://httpbin.org/status/500', timeout=(1, 3))
response.raise_for_status()
except HTTPError as e:
print(f"HTTP错误: {e.response.status_code} - {e.response.reason}")
except ConnectTimeout as e:
print(f"连接超时: 无法在指定时间内连接到服务器")
except ReadTimeout as e:
print(f"读取超时: 服务器在指定时间内未返回响应")
except SSLError as e:
print(f"SSL错误: SSL证书验证失败")
except TooManyRedirects as e:
print(f"重定向过多: 请求超过了最大重定向次数")
except ConnectionError as e:
print(f"连接错误: 无法建立连接")
except Timeout as e:
print(f"超时错误: 请求超时")
except RequestException as e:
print(f"请求异常: {e}")八、最佳实践
1. 使用会话对象
对于多个相关请求,使用会话对象可以提高性能并保持状态:
import requests
# 创建会话对象
session = requests.Session()
# 设置默认请求头
session.headers.update({
'User-Agent': 'MyApp/1.0',
'Accept': 'application/json'
})
# 登录
session.post('https://example.com/login', data={'username': 'user', 'password': 'pass'})
# 访问需要登录的页面
response = session.get('https://example.com/dashboard')
# 退出登录
session.post('https://example.com/logout')
# 关闭会话
session.close()2. 资源清理
使用上下文管理器确保资源正确清理:
import requests
# 使用上下文管理器
with requests.Session() as session:
response = session.get('https://httpbin.org/get')
print(response.json())
# 使用上下文管理器处理流式响应
with requests.get('https://httpbin.org/stream/5', stream=True) as response:
for line in response.iter_lines():
if line:
print(line.decode('utf-8'))3. 日志记录
使用日志记录请求和响应信息,便于调试和监控:
import requests
import logging
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def log_request_response(func):
"""记录请求和响应的装饰器"""
def wrapper(*args, **kwargs):
# 记录请求信息
method = args[0].upper() if len(args) > 0 and isinstance(args[0], str) else 'GET'
url = args[1] if len(args) > 1 else kwargs.get('url', '')
logger.info(f"发送请求: {method} {url}")
try:
# 发送请求
response = func(*args, **kwargs)
# 记录响应信息
logger.info(f"收到响应: {response.status_code} {response.url}")
return response
except Exception as e:
logger.error(f"请求失败: {e}")
raise
return wrapper
# 使用装饰器装饰requests方法
requests.get = log_request_response(requests.get)
requests.post = log_request_response(requests.post)
# 发送请求
response = requests.get('https://httpbin.org/get')
print(response.json())九、实际项目案例
1. 批量API调用
import requests
import time
from concurrent.futures import ThreadPoolExecutor
# API端点
API_ENDPOINT = 'https://api.example.com/users/{id}'
# 用户ID列表
user_ids = range(1, 100)
# 定义请求函数
def fetch_user(user_id):
url = API_ENDPOINT.format(id=user_id)
try:
response = requests.get(url, timeout=5)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"获取用户 {user_id} 失败: {e}")
return None
# 主函数
def main():
start_time = time.time()
# 使用线程池并发请求
with ThreadPoolExecutor(max_workers=10) as executor:
results = list(executor.map(fetch_user, user_ids))
# 处理结果
successful_results = [r for r in results if r is not None]
end_time = time.time()
print(f"总共请求: {len(user_ids)}")
print(f"成功: {len(successful_results)}")
print(f"失败: {len(user_ids) - len(successful_results)}")
print(f"总耗时: {end_time - start_time:.2f}秒")
if __name__ == '__main__':
main()2. 网站监控工具
import requests
import time
import smtplib
from email.mime.text import MIMEText
# 配置
WEBSITES = [
'https://www.example.com',
'https://www.google.com',
'https://www.python.org'
]
CHECK_INTERVAL = 60 # 检查间隔(秒)
# 邮件配置(示例)
EMAIL_CONFIG = {
'smtp_server': 'smtp.example.com',
'smtp_port': 587,
'username': 'your_email@example.com',
'password': 'your_password',
'from_email': 'your_email@example.com',
'to_email': 'alert@example.com'
}
# 发送告警邮件
def send_alert_email(website, status_code, error_message):
subject = f"网站监控告警: {website} 不可访问"
body = f"""
网站监控告警
网站: {website}
状态码: {status_code}
错误信息: {error_message}
时间: {time.strftime('%Y-%m-%d %H:%M:%S')}
"""
msg = MIMEText(body)
msg['Subject'] = subject
msg['From'] = EMAIL_CONFIG['from_email']
msg['To'] = EMAIL_CONFIG['to_email']
try:
with smtplib.SMTP(EMAIL_CONFIG['smtp_server'], EMAIL_CONFIG['smtp_port']) as server:
server.starttls()
server.login(EMAIL_CONFIG['username'], EMAIL_CONFIG['password'])
server.send_message(msg)
print(f"告警邮件已发送: {website}")
except Exception as e:
print(f"发送邮件失败: {e}")
# 检查网站可用性
def check_website(website):
try:
response = requests.get(website, timeout=10)
response.raise_for_status()
print(f"{website} - 正常 ({response.status_code})")
return True
except Exception as e:
status_code = getattr(e.response, 'status_code', 'N/A') if hasattr(e, 'response') else 'N/A'
print(f"{website} - 异常 ({status_code}): {e}")
# 发送告警邮件
send_alert_email(website, status_code, str(e))
return False
# 主函数
def main():
print("网站监控工具启动")
print(f"监控网站: {WEBSITES}")
print(f"检查间隔: {CHECK_INTERVAL}秒")
print("按Ctrl+C停止监控")
try:
while True:
print(f"\n--- 检查开始 ({time.strftime('%Y-%m-%d %H:%M:%S')}) ---")
# 并发检查所有网站
with ThreadPoolExecutor(max_workers=len(WEBSITES)) as executor:
executor.map(check_website, WEBSITES)
print(f"--- 检查结束 ---\n")
time.sleep(CHECK_INTERVAL)
except KeyboardInterrupt:
print("\n网站监控工具已停止")
if __name__ == '__main__':
main()十、总结
本集我们学习了requests库的高级用法,包括:
- 高级认证方式(Basic、Digest、OAuth)
- 代理设置(HTTP、HTTPS、SOCKS)
- 性能优化(连接池、超时设置、响应缓存)
- 流式处理(流式请求和响应、大文件下载)
- 自定义请求和响应处理(请求头、Cookie、认证、钩子)
- 异步请求(结合asyncio和aiohttp)
- 复杂场景的错误处理(重试机制、异常层次结构)
- 最佳实践(会话对象、资源清理、日志记录)
- 实际项目案例(批量API调用、网站监控工具)
requests库是Python中最强大的HTTP客户端库之一,掌握其高级用法可以帮助我们更高效地开发网络应用和API客户端。在实际项目中,我们应该根据具体需求选择合适的技术和策略,以确保代码的可靠性、性能和可维护性。
练习题
- 实现一个支持Basic认证和Digest认证的HTTP客户端。
- 编写一个程序,使用代理服务器发送请求并获取代理的IP地址。
- 实现一个大文件下载器,支持断点续传和进度显示。
- 使用requests库结合多线程或异步IO并发调用API。
- 编写一个网站监控工具,定期检查网站可用性并发送告警。
- 实现一个自定义的认证类,在请求头中添加API密钥。
- 使用响应缓存减少重复请求,提高程序性能。
- 编写一个错误处理机制,对不同类型的异常进行不同的处理。