Episode 160: A Hands-On Web Crawler Project

I. Requirements Analysis

1. Project Background

Collecting and analyzing web data has become an important need in many industries. In this project we build a complete crawler system that gathers product data from an e-commerce site, showing how to put together a fully featured, reliable crawler application.

2. Project Goals

  • Crawl product information from an e-commerce site (name, price, rating, sales volume, and so on)
  • Store the data and remove duplicates
  • Add anti-scraping countermeasures to cope with site restrictions
  • Support scheduled, recurring crawls
  • Generate data visualization reports

3. Technology Stack

  • Crawling framework: Scrapy
  • Data storage: MongoDB + CSV
  • Anti-scraping measures: User-Agent rotation, IP proxies, random delays
  • Data visualization: Matplotlib + Pandas
  • Scheduling: APScheduler

II. Architecture

1. System Architecture

┌─────────────────────┐    ┌─────────────────────┐    ┌─────────────────────┐
│   Data collection   │    │   Data processing   │    │  Data application   │
│  (Scrapy spiders)   │───>│ (cleaning & dedup)  │───>│ (storage & charts)  │
└─────────────────────┘    └─────────────────────┘    └─────────────────────┘
           ▲                          ▲                          ▲
           │                          │                          │
┌─────────────────────┐    ┌─────────────────────┐    ┌─────────────────────┐
│ Anti-scraping layer │    │ Dedup & validation  │    │   Job scheduling    │
│  (UA/proxy/delay)   │    │   (MD5/database)    │    │    (APScheduler)    │
└─────────────────────┘    └─────────────────────┘    └─────────────────────┘

2. Project Layout

ecommerce_crawler/
├── scrapy.cfg                 # Scrapy project config (created by startproject)
├── ecommerce_crawler/
│   ├── __init__.py
│   ├── items.py               # Item definitions (data structures)
│   ├── middlewares.py         # Middlewares (anti-scraping)
│   ├── pipelines.py           # Item pipelines (storage)
│   ├── settings.py            # Project settings
│   └── spiders/
│       ├── __init__.py
│       └── product_spider.py  # Product spider
├── data/                      # Scraped data (CSV files)
├── logs/                      # Log files
├── reports/                   # Visualization reports
├── requirements.txt           # Dependency list
├── run_crawler.py             # Run script
└── schedule_crawler.py        # Scheduler script

III. Implementation

1. Create the Scrapy Project

# Create the project
scrapy startproject ecommerce_crawler

# Enter the project directory
cd ecommerce_crawler

# Generate a spider skeleton
scrapy genspider product_spider example.com

2. Define the Item (items.py)

import scrapy


class ProductItem(scrapy.Item):
    """Product data structure"""
    product_id = scrapy.Field()      # Product ID
    name = scrapy.Field()            # Product name
    price = scrapy.Field()           # Current price
    original_price = scrapy.Field()  # Original (list) price
    discount = scrapy.Field()        # Discount
    rating = scrapy.Field()          # Rating
    review_count = scrapy.Field()    # Number of reviews
    sales_count = scrapy.Field()     # Sales volume
    category = scrapy.Field()        # Category
    subcategory = scrapy.Field()     # Subcategory
    brand = scrapy.Field()           # Brand
    image_url = scrapy.Field()       # Product image URL
    product_url = scrapy.Field()     # Product detail page URL
    description = scrapy.Field()     # Product description
    specifications = scrapy.Field()  # Product specifications
    crawl_time = scrapy.Field()      # Crawl timestamp
    md5_hash = scrapy.Field()        # Deduplication fingerprint

3. Configure the Anti-Scraping Middlewares (middlewares.py)

import random
import time

from scrapy.downloadermiddlewares.retry import RetryMiddleware
from scrapy.downloadermiddlewares.useragent import UserAgentMiddleware
from scrapy.utils.response import response_status_message


class RandomUserAgentMiddleware(UserAgentMiddleware):
    """Rotate the User-Agent header on every request."""

    def __init__(self, user_agent_list):
        self.user_agent_list = user_agent_list

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler.settings.getlist('USER_AGENT_LIST'))

    def process_request(self, request, spider):
        if self.user_agent_list:
            request.headers['User-Agent'] = random.choice(self.user_agent_list)


class RandomDelayMiddleware:
    """Sleep for a random interval before each request (see the note after this code)."""

    def __init__(self, delay_range):
        self.delay_range = delay_range

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler.settings.get('RANDOM_DELAY_RANGE', (1, 3)))

    def process_request(self, request, spider):
        time.sleep(random.uniform(*self.delay_range))


class ProxyMiddleware:
    """Route requests through a randomly chosen proxy, if any are configured."""

    def __init__(self, proxy_list):
        self.proxy_list = proxy_list

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler.settings.getlist('PROXY_LIST'))

    def process_request(self, request, spider):
        if self.proxy_list:
            request.meta['proxy'] = random.choice(self.proxy_list)


class CustomRetryMiddleware(RetryMiddleware):
    """Retry responses whose status is in RETRY_HTTP_CODES, logging each retry."""

    def process_response(self, request, response, spider):
        if request.meta.get('dont_retry', False):
            return response
        if response.status in self.retry_http_codes:
            spider.logger.warning(f"Retrying {request.url} (status: {response.status})")
            reason = response_status_message(response.status)
            # _retry() returns None once the retry limit is reached,
            # so fall back to the original response in that case.
            return self._retry(request, reason, spider) or response
        return response
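
One caveat about RandomDelayMiddleware: downloader middlewares run inside Scrapy's single-threaded event loop, so the time.sleep() call pauses every in-flight request, not just the current one. If that becomes a bottleneck, Scrapy's built-in settings provide a similar randomized delay without blocking; a minimal sketch (the values are only examples):

# settings.py -- non-blocking alternative to RandomDelayMiddleware
DOWNLOAD_DELAY = 2               # base delay between requests to the same domain
RANDOMIZE_DOWNLOAD_DELAY = True  # actual wait is a random 0.5x-1.5x of DOWNLOAD_DELAY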

4. Implement the Item Pipelines (pipelines.py)

import csv
import hashlib
import os
from datetime import datetime

import pymongo
from scrapy.exceptions import DropItem


class DuplicateItemPipeline:
    """Drop duplicate items within a single crawl.

    The hash set lives in memory, so it only deduplicates the current run;
    duplicates across runs are handled by the MongoDB upsert below.
    """

    def __init__(self):
        self.item_hashes = set()

    def process_item(self, item, spider):
        # Hash only stable fields; including crawl_time would give every
        # item a different fingerprint and defeat the deduplication.
        key = f"{item.get('product_id')}|{item.get('name')}|{item.get('price')}"
        item_hash = hashlib.md5(key.encode('utf-8')).hexdigest()
        item['md5_hash'] = item_hash

        if item_hash in self.item_hashes:
            raise DropItem(f"Duplicate item: {item.get('name')}")

        self.item_hashes.add(item_hash)
        return item


class MongoDBPipeline:
    """Store items in MongoDB."""
    collection_name = 'products'

    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            mongo_uri=crawler.settings.get('MONGO_URI'),
            mongo_db=crawler.settings.get('MONGO_DATABASE')
        )

    def open_spider(self, spider):
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def close_spider(self, spider):
        self.client.close()

    def process_item(self, item, spider):
        # Upsert keyed on product_id: update the document if it exists,
        # insert it otherwise.
        self.db[self.collection_name].update_one(
            {'product_id': item.get('product_id')},
            {'$set': dict(item)},
            upsert=True
        )
        return item


class CSVPipeline:
    """Write a subset of the fields to a timestamped CSV file."""

    def __init__(self):
        self.csv_file = None
        self.csv_writer = None

    def open_spider(self, spider):
        # Paths are relative to the directory the crawler is started from
        # (the project root, next to scrapy.cfg).
        data_dir = 'data'
        os.makedirs(data_dir, exist_ok=True)

        # One CSV file per run, named by timestamp
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        file_path = os.path.join(data_dir, f'products_{timestamp}.csv')

        self.csv_file = open(file_path, 'w', newline='', encoding='utf-8-sig')

        # Key fields only; the full record goes to MongoDB
        fields = ['product_id', 'name', 'price', 'original_price', 'rating',
                  'review_count', 'sales_count', 'category', 'brand', 'product_url']
        self.csv_writer = csv.DictWriter(self.csv_file, fieldnames=fields)
        self.csv_writer.writeheader()

    def close_spider(self, spider):
        if self.csv_file:
            self.csv_file.close()

    def process_item(self, item, spider):
        # Write only the key fields declared in open_spider()
        self.csv_writer.writerow({f: item.get(f, '') for f in self.csv_writer.fieldnames})
        return item

5. Configure the Crawler (settings.py)

# Scrapy settings for the ecommerce_crawler project
BOT_NAME = 'ecommerce_crawler'
SPIDER_MODULES = ['ecommerce_crawler.spiders']
NEWSPIDER_MODULE = 'ecommerce_crawler.spiders'

# Concurrency and pacing
CONCURRENT_REQUESTS = 4
DOWNLOAD_DELAY = 2
CONCURRENT_REQUESTS_PER_DOMAIN = 4
CONCURRENT_REQUESTS_PER_IP = 4

# Anti-scraping related settings
ROBOTSTXT_OBEY = False
DOWNLOAD_TIMEOUT = 10
RETRY_TIMES = 3
RETRY_HTTP_CODES = [500, 502, 503, 504, 408, 429]

# User-Agent pool
USER_AGENT_LIST = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15'
]

# Proxy pool (examples; leave empty to skip proxying)
PROXY_LIST = [
    # 'http://username:password@proxy:port',
    # 'https://username:password@proxy:port'
]

# Random delay range in seconds
RANDOM_DELAY_RANGE = (1, 3)

# Downloader middlewares
DOWNLOADER_MIDDLEWARES = {
    'ecommerce_crawler.middlewares.RandomUserAgentMiddleware': 543,
    'ecommerce_crawler.middlewares.RandomDelayMiddleware': 544,
    'ecommerce_crawler.middlewares.ProxyMiddleware': 545,
    'ecommerce_crawler.middlewares.CustomRetryMiddleware': 546,
    # Disable the built-in middlewares replaced above
    'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None,
    'scrapy.downloadermiddlewares.retry.RetryMiddleware': None,
}

# Item pipelines
ITEM_PIPELINES = {
    'ecommerce_crawler.pipelines.DuplicateItemPipeline': 300,
    'ecommerce_crawler.pipelines.MongoDBPipeline': 400,
    'ecommerce_crawler.pipelines.CSVPipeline': 500,
}

# MongoDB connection
MONGO_URI = 'mongodb://localhost:27017'
MONGO_DATABASE = 'ecommerce_db'

# Logging (the path is relative to the directory the crawler is started from)
LOG_ENABLED = True
LOG_LEVEL = 'INFO'
LOG_FILE = 'logs/crawler.log'

# Image downloads (optional)
# IMAGES_STORE = 'images'
# ITEM_PIPELINES.update({
#     'scrapy.pipelines.images.ImagesPipeline': 1,
# })

6. Implement the Product Spider (product_spider.py)

from datetime import datetime

import scrapy

from ecommerce_crawler.items import ProductItem


class ProductSpider(scrapy.Spider):
    name = 'product_spider'
    allowed_domains = ['example.com']  # replace with the real domain
    start_urls = ['https://example.com/category/products']  # replace with the real start URL

    def parse(self, response):
        """Parse a product listing page."""
        self.logger.info(f"Parsing listing page: {response.url}")

        # Extract product links (adjust the XPath to the target site's markup)
        product_links = response.xpath('//div[@class="product-item"]//a[@class="product-link"]/@href').getall()

        for link in product_links:
            # Build an absolute URL
            product_url = response.urljoin(link)
            yield scrapy.Request(
                url=product_url,
                callback=self.parse_product_detail,
                meta={'product_url': product_url}
            )

        # Follow the next-page link (adjust the XPath to the target site's markup)
        next_page = response.xpath('//a[@class="next-page"]/@href').get()
        if next_page:
            next_page_url = response.urljoin(next_page)
            self.logger.info(f"Following next page: {next_page_url}")
            yield scrapy.Request(url=next_page_url, callback=self.parse)

    def parse_product_detail(self, response):
        """Parse a product detail page."""
        self.logger.info(f"Parsing product page: {response.url}")

        item = ProductItem()

        # Extract the product fields (adjust the XPaths to the target site's markup).
        # Use get(default='') so a missing node yields an empty string instead of
        # None, which would make .strip() raise AttributeError.
        item['product_id'] = response.xpath('//input[@name="productId"]/@value').get()
        item['name'] = response.xpath('//h1[@class="product-name"]/text()').get(default='').strip()
        item['price'] = response.xpath('//span[@class="current-price"]/text()').get(default='').strip()
        item['original_price'] = response.xpath('//span[@class="original-price"]/text()').get(default='').strip()
        item['rating'] = response.xpath('//div[@class="product-rating"]/span[@class="rating-score"]/text()').get()
        item['review_count'] = response.xpath('//span[@class="review-count"]/text()').get()
        item['sales_count'] = response.xpath('//span[@class="sales-count"]/text()').get()
        item['category'] = response.xpath('//div[@class="breadcrumb"]/a[2]/text()').get(default='').strip()
        item['subcategory'] = response.xpath('//div[@class="breadcrumb"]/a[3]/text()').get(default='').strip()
        item['brand'] = response.xpath('//div[@class="product-brand"]/a/text()').get(default='').strip()
        item['image_url'] = response.xpath('//div[@class="product-main-image"]/img/@src').get()
        item['product_url'] = response.meta['product_url']
        item['description'] = ' '.join(response.xpath('//div[@class="product-description"]//text()').getall()).strip()

        # Extract the specifications table
        specifications = {}
        for row in response.xpath('//table[@class="product-specs"]/tr'):
            key = row.xpath('./th/text()').get(default='').strip()
            value = row.xpath('./td/text()').get(default='').strip()
            if key:
                specifications[key] = value
        item['specifications'] = specifications

        # Record when the item was crawled
        item['crawl_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        yield item
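
Before wiring in the pipelines and the scheduler, it is worth testing the selectors and the spider on their own. The commands below are standard Scrapy CLI usage; the URL and output path are placeholders for this project:

# Try XPath expressions interactively against a live page
scrapy shell "https://example.com/category/products"

# Run only this spider and export the items to a JSON file for inspection
scrapy crawl product_spider -o data/test_products.json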

7. Create the Run Script (run_crawler.py)

import logging
import os
import sys

from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

# Logging for this script (Scrapy's own log goes to LOG_FILE from settings.py)
os.makedirs('logs', exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('logs/run_crawler.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


def run_crawler():
    """Run the spider once."""
    logger.info("Starting crawl...")

    # Load the project settings (requires scrapy.cfg in the working directory)
    settings = get_project_settings()

    # Create the crawler process and start the spider
    process = CrawlerProcess(settings)
    process.crawl('product_spider')
    process.start()  # blocks until the crawl finishes

    logger.info("Crawl finished.")


def run_visualization():
    """Generate the data visualization report."""
    logger.info("Generating visualization report...")

    import matplotlib
    matplotlib.use('Agg')  # non-interactive backend, works without a display
    import matplotlib.pyplot as plt
    import pandas as pd
    from pymongo import MongoClient

    # Connect to MongoDB and load all products
    client = MongoClient('mongodb://localhost:27017')
    collection = client['ecommerce_db']['products']
    products = list(collection.find())

    if not products:
        logger.warning("No data to visualize")
        return

    df = pd.DataFrame(products)

    # Data cleaning: pull numbers out of the raw text fields
    df['price'] = df['price'].astype(str).str.extract(r'([\d.]+)', expand=False).astype(float)
    df['rating'] = pd.to_numeric(df['rating'], errors='coerce')
    df['review_count'] = (df['review_count'].astype(str)
                          .str.extract(r'([\d,]+)', expand=False)
                          .str.replace(',', '', regex=False)
                          .fillna('0').astype(int))

    # Create the reports directory
    reports_dir = 'reports'
    os.makedirs(reports_dir, exist_ok=True)

    # 1. Price distribution histogram
    plt.figure(figsize=(10, 6))
    df['price'].hist(bins=20)
    plt.title('Price Distribution')
    plt.xlabel('Price')
    plt.ylabel('Count')
    plt.savefig(os.path.join(reports_dir, 'price_distribution.png'))
    plt.close()

    # 2. Rating vs. review count scatter plot
    plt.figure(figsize=(10, 6))
    plt.scatter(df['rating'], df['review_count'], alpha=0.5)
    plt.title('Rating vs. Review Count')
    plt.xlabel('Rating')
    plt.ylabel('Review count')
    plt.savefig(os.path.join(reports_dir, 'rating_vs_reviews.png'))
    plt.close()

    # 3. Products per category bar chart
    plt.figure(figsize=(12, 6))
    df['category'].value_counts().plot(kind='bar')
    plt.title('Products per Category')
    plt.xlabel('Category')
    plt.ylabel('Count')
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.savefig(os.path.join(reports_dir, 'category_counts.png'))
    plt.close()

    logger.info("Visualization report generated.")


if __name__ == '__main__':
    # Simple command-line dispatch
    if len(sys.argv) > 1:
        if sys.argv[1] == 'run':
            # Run the crawler only
            run_crawler()
        elif sys.argv[1] == 'visualize':
            # Generate the visualization report only
            run_visualization()
        elif sys.argv[1] == 'all':
            # Run the crawler, then generate the report
            run_crawler()
            run_visualization()
        else:
            logger.error(f"Unknown command: {sys.argv[1]}")
            sys.exit(1)
    else:
        # Default: run the crawler
        run_crawler()

8. Create the Scheduler Script (schedule_crawler.py)

import logging

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger

import run_crawler

# Logging for the scheduler
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('logs/scheduler.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


def main():
    """Entry point: register the jobs and start the scheduler."""
    logger.info("Starting the crawler scheduler...")

    scheduler = BlockingScheduler()

    # Crawl every day at 09:00
    scheduler.add_job(
        func=run_crawler.run_crawler,
        trigger=CronTrigger(hour=9, minute=0),
        id='daily_crawl',
        name='Daily product crawl',
        replace_existing=True
    )

    # Generate the visualization report every Sunday at 15:00
    # (day_of_week=6 is Sunday in APScheduler's cron trigger)
    scheduler.add_job(
        func=run_crawler.run_visualization,
        trigger=CronTrigger(day_of_week=6, hour=15, minute=0),
        id='weekly_report',
        name='Weekly visualization report',
        replace_existing=True
    )

    try:
        scheduler.start()
    except KeyboardInterrupt:
        logger.info("Stopping the crawler scheduler...")
        scheduler.shutdown()


if __name__ == '__main__':
    main()
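
One caveat with this setup: CrawlerProcess.start() runs the crawl on the Twisted reactor, which cannot be restarted within the same Python process, so the second scheduled invocation of run_crawler() would fail. A simple workaround is to let the scheduled job launch each crawl in a fresh process; a minimal sketch using the standard library (adapt the command to your environment):

import subprocess


def run_crawler_subprocess():
    """Run one crawl in a separate process, avoiding the reactor-restart limit."""
    subprocess.run(['scrapy', 'crawl', 'product_spider'], check=True)

# Register this function with the scheduler instead of run_crawler.run_crawler:
# scheduler.add_job(func=run_crawler_subprocess, trigger=CronTrigger(hour=9, minute=0), ...)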

IV. Dependencies and Deployment

1. Install the Dependencies (requirements.txt)

scrapy>=2.6.0
pymongo>=4.1.0
pandas>=1.3.0
matplotlib>=3.4.0
apscheduler>=3.9.0

Install with:

pip install -r requirements.txt

2. Deployment Steps

  1. Prepare the environment

    • Install Python 3.7+
    • Install MongoDB (optional, used for data storage)
    • Install the dependency packages
  2. Adjust the configuration

    • Update the settings in settings.py to match your needs
    • Adjust the XPath selectors in the spider code to match the target site's markup
  3. Run the project (a quick sanity check follows this list)

    # Run the crawler once
    python run_crawler.py
    
    # Run the crawler and generate the report
    python run_crawler.py all
    
    # Generate the visualization report only
    python run_crawler.py visualize
    
    # Start the scheduled tasks
    python schedule_crawler.py
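
After a run, a quick way to confirm that data actually reached MongoDB is to count the documents in the products collection. A minimal check using the connection values from settings.py:

from pymongo import MongoClient

# Same URI and database name as configured in settings.py
client = MongoClient('mongodb://localhost:27017')
print(client['ecommerce_db']['products'].count_documents({}))  # number of stored products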

3. Maintenance

  • Keep selectors current: update the XPath selectors whenever the target site's markup changes
  • Monitor the logs: review the log files regularly to catch crawl problems early
  • Tune performance: adjust concurrency, delays, and related settings based on how the crawler actually runs (see the sketch below)
  • Back up the data: back up the MongoDB database and the CSV files regularly
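
For the performance tuning item, an alternative to hand-tuning CONCURRENT_REQUESTS and DOWNLOAD_DELAY is Scrapy's built-in AutoThrottle extension, which adapts the delay to the server's response times. A minimal settings.py sketch (the values are only examples):

# settings.py -- optional adaptive throttling
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_START_DELAY = 2           # initial delay in seconds
AUTOTHROTTLE_MAX_DELAY = 30            # upper bound when the server responds slowly
AUTOTHROTTLE_TARGET_CONCURRENCY = 2.0  # average concurrent requests per remote server
# AUTOTHROTTLE_DEBUG = True            # log every throttling decision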

V. Possible Extensions

  1. Distributed crawling: use Scrapy-Redis to spread the crawl across machines and raise throughput (a minimal configuration sketch follows this list)
  2. Data cleaning: add richer cleaning and normalization steps
  3. Sentiment analysis: analyze product reviews to understand user feedback
  4. Price monitoring: track price changes and send a notification when a price drops
  5. Page screenshots: capture product pages with Selenium or Pyppeteer
  6. Data export: export the data to Excel, JSON, and other formats
  7. Web UI: build a simple web interface for managing the crawler and browsing the data
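
For the first item, scrapy-redis swaps Scrapy's scheduler and duplicate filter for Redis-backed versions so that several crawler instances can share one request queue. A minimal sketch of the extra settings, assuming the scrapy-redis package is installed and a Redis server is reachable at the given URL:

# settings.py -- minimal scrapy-redis configuration (sketch)
SCHEDULER = "scrapy_redis.scheduler.Scheduler"              # Redis-backed request queue
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"  # shared duplicate filter
SCHEDULER_PERSIST = True                                    # keep the queue between runs
REDIS_URL = "redis://localhost:6379"                        # assumed local Redis instance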

VI. Summary

This project builds a complete e-commerce crawler system covering the whole workflow from data collection and processing to storage and visualization. Working through it, you will have practiced:

  1. Using the Scrapy framework end to end
  2. Implementing anti-scraping measures (User-Agent rotation, IP proxies, random delays, and so on)
  3. Techniques for data deduplication and cleaning
  4. Storing data in MongoDB and CSV
  5. The basics of data visualization
  6. Scheduling recurring crawl jobs

The project can serve as a base framework to extend and customize for your own needs, and the same approach applies to crawling many other kinds of websites.
