Episode 160: Hands-On Web Crawler Project
I. Project Requirements Analysis
1. Project Background
In the internet era, collecting and analyzing web data has become a core need in many industries. In this project we build a complete crawler system that collects product data from an e-commerce site, showing how to construct a fully featured, stable, and reliable crawler application.
2. Project Goals
- Crawl product information from an e-commerce site (name, price, rating, sales volume, etc.)
- Store the data and remove duplicates
- Add anti-blocking measures to cope with site restrictions
- Support scheduled crawling
- Generate data visualization reports
3. Technology Stack
- Crawler framework: Scrapy
- Data storage: MongoDB + CSV
- Anti-blocking strategies: User-Agent rotation, IP proxies, random delays
- Data visualization: Matplotlib + Pandas
- Scheduled tasks: APScheduler
II. Project Architecture Design
1. System Architecture Diagram
┌────────────────────┐    ┌────────────────────┐    ┌────────────────────┐
│  Data Collection   │    │  Data Processing   │    │  Data Application  │
│  (Scrapy spider)   │───>│ (cleaning, dedup)  │───>│ (storage, reports) │
└────────────────────┘    └────────────────────┘    └────────────────────┘
          ▲                         ▲                         ▲
          │                         │                         │
┌────────────────────┐    ┌────────────────────┐    ┌────────────────────┐
│   Anti-blocking    │    │ Dedup & validation │    │   Job scheduling   │
│   (UA/IP/delay)    │    │   (MD5/database)   │    │   (APScheduler)    │
└────────────────────┘    └────────────────────┘    └────────────────────┘
2. Project Directory Layout
ecommerce_crawler/
├── ecommerce_crawler/
│   ├── __init__.py
│   ├── items.py              # Item definitions (data structures)
│   ├── middlewares.py        # Middlewares (anti-blocking)
│   ├── pipelines.py          # Item pipelines (storage)
│   ├── settings.py           # Project settings
│   └── spiders/
│       ├── __init__.py
│       └── product_spider.py # Product spider
├── data/                     # Data output directory
├── logs/                     # Log directory
├── reports/                  # Report directory
├── requirements.txt          # Dependency list
└── run_crawler.py            # Run script
III. Implementation Steps
1. Create the Scrapy Project
# Create the project
scrapy startproject ecommerce_crawler
# Enter the project directory
cd ecommerce_crawler
# Generate the spider
scrapy genspider product_spider example.com
2. Define the Data Structure (items.py)
import scrapy


class ProductItem(scrapy.Item):
    """Product item definition"""
    product_id = scrapy.Field()       # Product ID
    name = scrapy.Field()             # Product name
    price = scrapy.Field()            # Current price
    original_price = scrapy.Field()   # Original price
    discount = scrapy.Field()         # Discount
    rating = scrapy.Field()           # Rating
    review_count = scrapy.Field()     # Number of reviews
    sales_count = scrapy.Field()      # Sales volume
    category = scrapy.Field()         # Category
    subcategory = scrapy.Field()      # Subcategory
    brand = scrapy.Field()            # Brand
    image_url = scrapy.Field()        # Product image URL
    product_url = scrapy.Field()      # Product detail page URL
    description = scrapy.Field()      # Product description
    specifications = scrapy.Field()   # Product specifications
    crawl_time = scrapy.Field()       # Crawl timestamp
    md5_hash = scrapy.Field()         # Deduplication fingerprint
3. Configure the Anti-Blocking Middlewares (middlewares.py)
import random
import time

from scrapy.downloadermiddlewares.useragent import UserAgentMiddleware
from scrapy.downloadermiddlewares.retry import RetryMiddleware
from scrapy.utils.response import response_status_message


class RandomUserAgentMiddleware(UserAgentMiddleware):
    """Rotate the User-Agent header randomly for every request."""

    def __init__(self, user_agent_list):
        self.user_agent_list = user_agent_list

    @classmethod
    def from_crawler(cls, crawler):
        agent_list = crawler.settings.getlist('USER_AGENT_LIST')
        return cls(agent_list)

    def process_request(self, request, spider):
        request.headers['User-Agent'] = random.choice(self.user_agent_list)


class RandomDelayMiddleware:
    """Sleep for a random interval before each request.

    Note: time.sleep() blocks Scrapy's event loop; for larger crawls prefer the
    built-in DOWNLOAD_DELAY together with RANDOMIZE_DOWNLOAD_DELAY.
    """

    def __init__(self, delay_range):
        self.delay_range = delay_range

    @classmethod
    def from_crawler(cls, crawler):
        delay_range = crawler.settings.get('RANDOM_DELAY_RANGE', (1, 3))
        return cls(delay_range)

    def process_request(self, request, spider):
        time.sleep(random.uniform(*self.delay_range))


class ProxyMiddleware:
    """Attach a randomly chosen proxy to each request."""

    def __init__(self, proxy_list):
        self.proxy_list = proxy_list

    @classmethod
    def from_crawler(cls, crawler):
        proxy_list = crawler.settings.getlist('PROXY_LIST')
        return cls(proxy_list)

    def process_request(self, request, spider):
        if self.proxy_list:
            proxy = random.choice(self.proxy_list)
            request.meta['proxy'] = proxy


class CustomRetryMiddleware(RetryMiddleware):
    """Retry middleware that logs every retried response."""

    def process_response(self, request, response, spider):
        if request.meta.get('dont_retry', False):
            return response
        if response.status in self.retry_http_codes:
            spider.logger.warning(f"Retrying {request.url} (status: {response.status})")
            # _retry() returns None once the retry limit is exceeded
            return self._retry(request, response_status_message(response.status), spider) or response
        return response
4. Implement the Item Pipelines (pipelines.py)
import csv
import hashlib
import os
from datetime import datetime

import pymongo
from scrapy.exceptions import DropItem


class DuplicateItemPipeline:
    """Drop items that have already been seen in the current crawl."""

    def __init__(self):
        self.item_hashes = set()

    def process_item(self, item, spider):
        # Hash only the stable fields; including crawl_time would make every
        # item look unique and defeat deduplication.
        data = {k: v for k, v in dict(item).items() if k not in ('crawl_time', 'md5_hash')}
        item_hash = hashlib.md5(str(sorted(data.items())).encode('utf-8')).hexdigest()
        item['md5_hash'] = item_hash
        if item_hash in self.item_hashes:
            raise DropItem(f"Duplicate item: {item.get('name')}")
        self.item_hashes.add(item_hash)
        return item


class MongoDBPipeline:
    """Store items in MongoDB."""

    collection_name = 'products'

    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            mongo_uri=crawler.settings.get('MONGO_URI'),
            mongo_db=crawler.settings.get('MONGO_DATABASE')
        )

    def open_spider(self, spider):
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def close_spider(self, spider):
        self.client.close()

    def process_item(self, item, spider):
        # Upsert on product_id: update the document if it exists, insert otherwise
        self.db[self.collection_name].update_one(
            {'product_id': item.get('product_id')},
            {'$set': dict(item)},
            upsert=True
        )
        return item


class CSVPipeline:
    """Write a subset of key fields to a timestamped CSV file."""

    def __init__(self):
        self.csv_file = None
        self.csv_writer = None

    def open_spider(self, spider):
        # Paths are relative to the working directory; run the crawl from the
        # project root so the data/ directory shown in the layout above is used.
        data_dir = 'data'
        os.makedirs(data_dir, exist_ok=True)
        # Create the CSV file
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f'products_{timestamp}.csv'
        file_path = os.path.join(data_dir, filename)
        self.csv_file = open(file_path, 'w', newline='', encoding='utf-8-sig')
        # CSV columns
        fields = ['product_id', 'name', 'price', 'original_price', 'rating',
                  'review_count', 'sales_count', 'category', 'brand', 'product_url']
        self.csv_writer = csv.DictWriter(self.csv_file, fieldnames=fields)
        self.csv_writer.writeheader()

    def close_spider(self, spider):
        self.csv_file.close()

    def process_item(self, item, spider):
        # Write only the key fields defined above
        self.csv_writer.writerow({field: item.get(field, '')
                                  for field in self.csv_writer.fieldnames})
        return item
5. Configure the Crawler Settings (settings.py)
# Scrapy settings for the ecommerce_crawler project
BOT_NAME = 'ecommerce_crawler'
SPIDER_MODULES = ['ecommerce_crawler.spiders']
NEWSPIDER_MODULE = 'ecommerce_crawler.spiders'

# Concurrency settings
CONCURRENT_REQUESTS = 4
DOWNLOAD_DELAY = 2
CONCURRENT_REQUESTS_PER_DOMAIN = 4
CONCURRENT_REQUESTS_PER_IP = 4

# Anti-blocking settings
ROBOTSTXT_OBEY = False
DOWNLOAD_TIMEOUT = 10
RETRY_TIMES = 3
RETRY_HTTP_CODES = [500, 502, 503, 504, 408, 429]

# User-Agent pool
USER_AGENT_LIST = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15'
]

# Proxy pool (examples)
PROXY_LIST = [
    # 'http://username:password@proxy:port',
    # 'https://username:password@proxy:port'
]

# Random delay range in seconds
RANDOM_DELAY_RANGE = (1, 3)

# Downloader middlewares
DOWNLOADER_MIDDLEWARES = {
    'ecommerce_crawler.middlewares.RandomUserAgentMiddleware': 543,
    'ecommerce_crawler.middlewares.RandomDelayMiddleware': 544,
    'ecommerce_crawler.middlewares.ProxyMiddleware': 545,
    'ecommerce_crawler.middlewares.CustomRetryMiddleware': 546,
    'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None,
    'scrapy.downloadermiddlewares.retry.RetryMiddleware': None,
}

# Item pipelines
ITEM_PIPELINES = {
    'ecommerce_crawler.pipelines.DuplicateItemPipeline': 300,
    'ecommerce_crawler.pipelines.MongoDBPipeline': 400,
    'ecommerce_crawler.pipelines.CSVPipeline': 500,
}

# MongoDB settings
MONGO_URI = 'mongodb://localhost:27017'
MONGO_DATABASE = 'ecommerce_db'

# Logging (the path is relative to the working directory; run from the project root)
LOG_ENABLED = True
LOG_LEVEL = 'INFO'
LOG_FILE = 'logs/crawler.log'

# Image download settings (optional)
# IMAGES_STORE = 'images'
# ITEM_PIPELINES.update({
#     'scrapy.pipelines.images.ImagesPipeline': 1,
# })
6. Implement the Product Spider (product_spider.py)
import scrapy
from datetime import datetime

from ecommerce_crawler.items import ProductItem


class ProductSpider(scrapy.Spider):
    name = 'product_spider'
    allowed_domains = ['example.com']                        # Replace with the real domain
    start_urls = ['https://example.com/category/products']   # Replace with the real start URL

    def parse(self, response):
        """Parse a product listing page."""
        self.logger.info(f"Crawling listing page: {response.url}")
        # Extract product links (adjust the XPath to the actual page structure)
        product_links = response.xpath('//div[@class="product-item"]//a[@class="product-link"]/@href').getall()
        for link in product_links:
            # Build the absolute URL
            product_url = response.urljoin(link)
            yield scrapy.Request(
                url=product_url,
                callback=self.parse_product_detail,
                meta={'product_url': product_url}
            )
        # Extract the next-page link (adjust the XPath to the actual page structure)
        next_page = response.xpath('//a[@class="next-page"]/@href').get()
        if next_page:
            next_page_url = response.urljoin(next_page)
            self.logger.info(f"Crawling next page: {next_page_url}")
            yield scrapy.Request(
                url=next_page_url,
                callback=self.parse
            )

    def parse_product_detail(self, response):
        """Parse a product detail page."""
        self.logger.info(f"Crawling detail page: {response.url}")
        # Create the product item
        item = ProductItem()
        # Extract the product fields (adjust the XPaths to the actual page structure).
        # get(default='') keeps missing nodes from raising AttributeError on strip().
        item['product_id'] = response.xpath('//input[@name="productId"]/@value').get()
        item['name'] = response.xpath('//h1[@class="product-name"]/text()').get(default='').strip()
        item['price'] = response.xpath('//span[@class="current-price"]/text()').get(default='').strip()
        item['original_price'] = response.xpath('//span[@class="original-price"]/text()').get(default='').strip()
        item['rating'] = response.xpath('//div[@class="product-rating"]/span[@class="rating-score"]/text()').get()
        item['review_count'] = response.xpath('//span[@class="review-count"]/text()').get()
        item['sales_count'] = response.xpath('//span[@class="sales-count"]/text()').get()
        item['category'] = response.xpath('//div[@class="breadcrumb"]/a[2]/text()').get(default='').strip()
        item['subcategory'] = response.xpath('//div[@class="breadcrumb"]/a[3]/text()').get(default='').strip()
        item['brand'] = response.xpath('//div[@class="product-brand"]/a/text()').get(default='').strip()
        item['image_url'] = response.xpath('//div[@class="product-main-image"]/img/@src').get()
        item['product_url'] = response.meta['product_url']
        item['description'] = ' '.join(response.xpath('//div[@class="product-description"]//text()').getall()).strip()
        # Extract the specification table
        specifications = {}
        spec_rows = response.xpath('//table[@class="product-specs"]/tr')
        for row in spec_rows:
            key = row.xpath('./th/text()').get(default='').strip()
            value = row.xpath('./td/text()').get(default='').strip()
            if key:
                specifications[key] = value
        item['specifications'] = specifications
        # Record the crawl time
        item['crawl_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        yield item
7. Create the Run Script (run_crawler.py)
import os
import sys
import logging

from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

# Configure logging (paths are relative to the project root)
os.makedirs('logs', exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('logs/scheduler.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


def run_crawler():
    """Run the spider."""
    logger.info("Starting the crawl job...")
    # Load the project settings
    settings = get_project_settings()
    # Create the crawler process
    process = CrawlerProcess(settings)
    # Start the spider
    process.crawl('product_spider')
    process.start()  # Blocks until the crawl finishes
    logger.info("Crawl job finished.")


def run_visualization():
    """Generate the data visualization report."""
    logger.info("Generating the visualization report...")
    # Import the analysis libraries
    import pandas as pd
    import matplotlib.pyplot as plt
    from pymongo import MongoClient

    # Connect to MongoDB
    client = MongoClient('mongodb://localhost:27017')
    db = client['ecommerce_db']
    collection = db['products']
    # Load the data
    products = list(collection.find())
    if not products:
        logger.warning("No data available for visualization")
        return
    # Convert to a DataFrame
    df = pd.DataFrame(products)
    # Preprocessing: extract numeric values from the raw strings
    df['price'] = df['price'].str.extract(r'([\d.]+)', expand=False).astype(float)
    df['rating'] = pd.to_numeric(df['rating'], errors='coerce')
    df['review_count'] = (df['review_count'].str.extract(r'([\d,]+)', expand=False)
                          .str.replace(',', '', regex=False).astype(float))
    # Create the reports directory
    reports_dir = 'reports'
    os.makedirs(reports_dir, exist_ok=True)
    # 1. Price distribution histogram
    plt.figure(figsize=(10, 6))
    df['price'].hist(bins=20)
    plt.title('Price Distribution')
    plt.xlabel('Price')
    plt.ylabel('Count')
    plt.savefig(os.path.join(reports_dir, 'price_distribution.png'))
    plt.close()
    # 2. Rating vs. review count scatter plot
    plt.figure(figsize=(10, 6))
    plt.scatter(df['rating'], df['review_count'], alpha=0.5)
    plt.title('Rating vs. Review Count')
    plt.xlabel('Rating')
    plt.ylabel('Review count')
    plt.savefig(os.path.join(reports_dir, 'rating_vs_reviews.png'))
    plt.close()
    # 3. Product count per category bar chart
    plt.figure(figsize=(12, 6))
    category_counts = df['category'].value_counts()
    category_counts.plot(kind='bar')
    plt.title('Products per Category')
    plt.xlabel('Category')
    plt.ylabel('Count')
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.savefig(os.path.join(reports_dir, 'category_counts.png'))
    plt.close()
    logger.info("Visualization report generated.")


if __name__ == '__main__':
    # Simple command-line handling
    if len(sys.argv) > 1:
        if sys.argv[1] == 'run':
            # Run the crawler only
            run_crawler()
        elif sys.argv[1] == 'visualize':
            # Generate the visualization report only
            run_visualization()
        elif sys.argv[1] == 'all':
            # Run the crawler, then generate the report
            run_crawler()
            run_visualization()
        else:
            logger.error(f"Unknown command: {sys.argv[1]}")
            sys.exit(1)
    else:
        # Default: run the crawler
        run_crawler()
8. Create the Scheduling Script (schedule_crawler.py)
import logging
import subprocess
import sys

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger

import run_crawler

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('logs/scheduler.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


def crawl_job():
    """Run the spider in a child process.

    Scrapy's Twisted reactor cannot be restarted inside a long-lived process,
    so each scheduled crawl is launched as a fresh subprocess.
    """
    subprocess.run([sys.executable, 'run_crawler.py', 'run'], check=False)


def main():
    """Entry point."""
    logger.info("Starting the crawler scheduler...")
    # Create the scheduler
    scheduler = BlockingScheduler()
    # Run the crawler every day at 09:00
    scheduler.add_job(
        func=crawl_job,
        trigger=CronTrigger(hour=9, minute=0),
        id='daily_crawl',
        name='Daily product crawl',
        replace_existing=True
    )
    # Generate the visualization report every Sunday at 15:00
    scheduler.add_job(
        func=run_crawler.run_visualization,
        trigger=CronTrigger(day_of_week=6, hour=15, minute=0),
        id='weekly_report',
        name='Weekly visualization report',
        replace_existing=True
    )
    # Start the scheduler
    try:
        scheduler.start()
    except KeyboardInterrupt:
        logger.info("Stopping the crawler scheduler...")
        scheduler.shutdown()


if __name__ == '__main__':
    main()
IV. Dependencies and Deployment
1. Install the Dependencies (requirements.txt)
scrapy>=2.6.0
pymongo>=4.1.0
pandas>=1.3.0
matplotlib>=3.4.0
apscheduler>=3.9.0
Install command:
pip install -r requirements.txt
2. Deployment Steps
Environment preparation:
- Install Python 3.7+
- Install MongoDB (optional, used for data storage)
- Install the dependency packages
Configuration changes:
- Adjust the settings in settings.py to your needs
- Adjust the XPath selectors in the spider code to match the target site's structure (a quick way to verify them is sketched below)
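Before running a full crawl, it helps to confirm that the selectors actually match the target pages, either interactively with scrapy shell <url> or with a small throwaway script. Below is a minimal sketch of the script approach; the URL and the class names are the same placeholders used throughout this project and must be replaced with real values, and the script itself is a hypothetical helper, not part of the project tree.
# check_selectors.py -- throwaway helper for verifying XPath selectors before a crawl
from urllib.request import Request, urlopen

from scrapy import Selector

# Placeholder listing URL; replace with a real page from the target site
url = 'https://example.com/category/products'
req = Request(url, headers={'User-Agent': 'Mozilla/5.0'})
html = urlopen(req, timeout=10).read().decode('utf-8', errors='ignore')

# Run the same XPath expressions the spider uses and inspect the output
sel = Selector(text=html)
print(sel.xpath('//div[@class="product-item"]//a[@class="product-link"]/@href').getall()[:5])
print(sel.xpath('//a[@class="next-page"]/@href').get())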
Run the project:
# Run the crawler directly
python run_crawler.py
# Run the crawler and then generate the report
python run_crawler.py all
# Generate the visualization report only
python run_crawler.py visualize
# Start the scheduled tasks
python schedule_crawler.py
3. Project Maintenance
- Regular updates: update the XPath selectors whenever the target site's structure changes
- Log monitoring: review the log files regularly to diagnose problems that occur during crawls
- Performance tuning: adjust concurrency, delays, and related settings based on how the crawler behaves in practice
- Data backups: back up the MongoDB data and CSV files regularly (see the sketch after this list)
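For the backup item above, a minimal sketch is shown here. It assumes MongoDB's standard mongodump tool is on the PATH, the database name configured in settings.py, and that the script runs from the project root; the backups/ directory and the script name are arbitrary choices for this example.
# backup_data.py -- a minimal, hypothetical backup helper
import shutil
import subprocess
from datetime import datetime
from pathlib import Path

# One timestamped directory per backup run
backup_dir = Path('backups') / datetime.now().strftime('%Y%m%d_%H%M%S')
backup_dir.mkdir(parents=True, exist_ok=True)

# Dump the MongoDB database used by MongoDBPipeline
subprocess.run(['mongodump', '--db', 'ecommerce_db', '--out', str(backup_dir)], check=False)

# Copy the CSV exports produced by CSVPipeline
for csv_file in Path('data').glob('products_*.csv'):
    shutil.copy(csv_file, backup_dir)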
V. Project Extensions
- Distributed crawling: use Scrapy-Redis to crawl in a distributed fashion and increase throughput
- Data cleaning: add more sophisticated cleaning and normalization steps
- Sentiment analysis: analyze the sentiment of product reviews to understand user feedback
- Price monitoring: track product price changes and send a notification when a price drops (a minimal sketch follows this list)
- Page screenshots: capture product pages with Selenium or Pyppeteer
- Data export: support exporting data to Excel, JSON, and other formats
- Web UI: build a simple web interface for managing the crawler and browsing the data
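To illustrate the price-monitoring extension, here is a minimal sketch built on the product data the pipelines already store in MongoDB. The price_history collection, the price-string parsing, and the log-based "notification" are assumptions made for this example; a real version might send e-mail or instant-message alerts, and the script could be scheduled with APScheduler alongside the crawl.
# price_monitor.py -- a minimal, hypothetical price-drop monitor sketch
import logging
from datetime import datetime

from pymongo import MongoClient

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

client = MongoClient('mongodb://localhost:27017')
db = client['ecommerce_db']

for product in db['products'].find():
    product_id = product.get('product_id')
    try:
        # Strip currency symbols and thousands separators before converting
        price = float(str(product.get('price', '')).lstrip('¥$').replace(',', ''))
    except ValueError:
        continue
    # Compare against the most recently recorded price for this product
    last = db['price_history'].find_one({'product_id': product_id}, sort=[('recorded_at', -1)])
    if last and price < last['price']:
        logging.info("Price drop for %s: %.2f -> %.2f", product.get('name'), last['price'], price)
    # Record the current price for the next comparison
    db['price_history'].insert_one({
        'product_id': product_id,
        'price': price,
        'recorded_at': datetime.now(),
    })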
VI. Summary
This project implements a fully functional e-commerce crawler system covering the whole workflow from data collection and processing to storage and visualization. Working through it, you will learn:
- How to use the Scrapy framework end to end
- How to implement anti-blocking strategies (User-Agent rotation, IP proxies, random delays, and more)
- Techniques for deduplicating and cleaning data
- Storing data in MongoDB and CSV
- The basics of data visualization
- How to schedule recurring crawl jobs
The project can serve as a base framework to extend and customize for your own requirements, and the same approach applies to crawling data from many other kinds of websites.