第156集 数据存储策略
1 数据存储概述
在网络爬虫开发中,数据存储是非常重要的环节。根据不同的需求和场景,我们可以选择不同的数据存储方式。本集将介绍几种常用的数据存储策略及其在Scrapy中的实现方法。
2 文件存储
2.1 文本文件存储
2.1.1 存储为JSON
JSON是一种轻量级的数据交换格式,易于阅读和编写,也易于机器解析和生成。
使用Scrapy内置的JSON导出功能:
scrapy crawl myspider -o items.json
scrapy crawl myspider -o items.jl # JSON Lines格式自定义JSON存储:
# pipelines.py
import json
import os
class JsonWriterPipeline:
def __init__(self):
self.file = None
def open_spider(self, spider):
# 在爬虫启动时打开文件
self.file = open('items.json', 'w', encoding='utf-8')
self.file.write('[\n')
self.first_item = True
def process_item(self, item, spider):
# 将Item转换为字典
item_dict = dict(item)
# 写入文件
if not self.first_item:
self.file.write(',\n')
else:
self.first_item = False
# 写入JSON数据
line = json.dumps(item_dict, ensure_ascii=False, indent=2)
self.file.write(line)
return item
def close_spider(self, spider):
# 在爬虫关闭时关闭文件
self.file.write('\n]')
self.file.close()2.1.2 存储为CSV
CSV(逗号分隔值)是一种简单的文本格式,广泛用于电子表格和数据库导入导出。
使用Scrapy内置的CSV导出功能:
scrapy crawl myspider -o items.csv自定义CSV存储:
# pipelines.py
import csv
import os
class CsvWriterPipeline:
def __init__(self):
self.file = None
self.writer = None
def open_spider(self, spider):
# 在爬虫启动时创建CSV文件
self.file = open('items.csv', 'w', newline='', encoding='utf-8')
# 定义CSV字段名
fieldnames = ['title', 'content', 'author', 'publish_date', 'url']
# 创建CSV写入器
self.writer = csv.DictWriter(self.file, fieldnames=fieldnames)
self.writer.writeheader()
def process_item(self, item, spider):
# 将Item转换为字典并写入CSV
self.writer.writerow(dict(item))
return item
def close_spider(self, spider):
# 在爬虫关闭时关闭文件
self.file.close()2.1.3 存储为XML
XML是一种标记语言,用于存储和传输数据。
使用Scrapy内置的XML导出功能:
scrapy crawl myspider -o items.xml自定义XML存储:
# pipelines.py
import xml.etree.ElementTree as ET
import os
class XmlWriterPipeline:
def __init__(self):
self.root = None
self.tree = None
def open_spider(self, spider):
# 在爬虫启动时创建XML根元素
self.root = ET.Element('items')
self.tree = ET.ElementTree(self.root)
def process_item(self, item, spider):
# 创建item元素
item_elem = ET.SubElement(self.root, 'item')
# 添加子元素
for key, value in dict(item).items():
# 处理列表类型的值
if isinstance(value, list):
value = ' '.join(map(str, value))
# 处理None值
if value is None:
value = ''
elem = ET.SubElement(item_elem, key)
elem.text = str(value)
return item
def close_spider(self, spider):
# 在爬虫关闭时写入XML文件
self.tree.write('items.xml', encoding='utf-8', xml_declaration=True)3 关系型数据库存储
3.1 使用MySQL存储
3.1.1 安装依赖
pip install pymysql3.1.2 配置数据库
在Scrapy项目的settings.py中添加数据库配置:
# 数据库配置
MYSQL_HOST = 'localhost'
MYSQL_PORT = 3306
MYSQL_USER = 'root'
MYSQL_PASSWORD = 'password'
MYSQL_DB = 'mydatabase'
MYSQL_CHARSET = 'utf8mb4'3.1.3 编写MySQL管道
# pipelines.py
import pymysql
from pymysql.cursors import DictCursor
class MySQLPipeline:
def __init__(self, host, port, user, password, db, charset):
self.host = host
self.port = port
self.user = user
self.password = password
self.db = db
self.charset = charset
self.conn = None
self.cursor = None
@classmethod
def from_crawler(cls, crawler):
# 从settings中获取数据库配置
return cls(
host=crawler.settings.get('MYSQL_HOST'),
port=crawler.settings.get('MYSQL_PORT'),
user=crawler.settings.get('MYSQL_USER'),
password=crawler.settings.get('MYSQL_PASSWORD'),
db=crawler.settings.get('MYSQL_DB'),
charset=crawler.settings.get('MYSQL_CHARSET')
)
def open_spider(self, spider):
# 建立数据库连接
self.conn = pymysql.connect(
host=self.host,
port=self.port,
user=self.user,
password=self.password,
db=self.db,
charset=self.charset,
cursorclass=DictCursor
)
self.cursor = self.conn.cursor()
def process_item(self, item, spider):
# 定义SQL语句
sql = """
INSERT INTO articles (title, content, author, publish_date, url)
VALUES (%s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
content = VALUES(content),
author = VALUES(author),
publish_date = VALUES(publish_date)
"""
# 准备参数
params = (
item.get('title'),
item.get('content'),
item.get('author'),
item.get('publish_date'),
item.get('url')
)
try:
# 执行SQL
self.cursor.execute(sql, params)
self.conn.commit()
except Exception as e:
# 发生错误时回滚
self.conn.rollback()
spider.logger.error(f"MySQL error: {e}")
return item
def close_spider(self, spider):
# 关闭数据库连接
if self.cursor:
self.cursor.close()
if self.conn:
self.conn.close()3.2 使用PostgreSQL存储
3.2.1 安装依赖
pip install psycopg2-binary3.2.2 配置数据库
# settings.py
# PostgreSQL配置
POSTGRES_HOST = 'localhost'
POSTGRES_PORT = 5432
POSTGRES_USER = 'postgres'
POSTGRES_PASSWORD = 'password'
POSTGRES_DB = 'mydatabase'3.2.3 编写PostgreSQL管道
# pipelines.py
import psycopg2
class PostgreSQLPipeline:
def __init__(self, host, port, user, password, db):
self.host = host
self.port = port
self.user = user
self.password = password
self.db = db
self.conn = None
self.cursor = None
@classmethod
def from_crawler(cls, crawler):
return cls(
host=crawler.settings.get('POSTGRES_HOST'),
port=crawler.settings.get('POSTGRES_PORT'),
user=crawler.settings.get('POSTGRES_USER'),
password=crawler.settings.get('POSTGRES_PASSWORD'),
db=crawler.settings.get('POSTGRES_DB')
)
def open_spider(self, spider):
self.conn = psycopg2.connect(
host=self.host,
port=self.port,
user=self.user,
password=self.password,
dbname=self.db
)
self.cursor = self.conn.cursor()
def process_item(self, item, spider):
sql = """
INSERT INTO articles (title, content, author, publish_date, url)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (url) DO UPDATE SET
content = EXCLUDED.content,
author = EXCLUDED.author,
publish_date = EXCLUDED.publish_date
"""
params = (
item.get('title'),
item.get('content'),
item.get('author'),
item.get('publish_date'),
item.get('url')
)
try:
self.cursor.execute(sql, params)
self.conn.commit()
except Exception as e:
self.conn.rollback()
spider.logger.error(f"PostgreSQL error: {e}")
return item
def close_spider(self, spider):
if self.cursor:
self.cursor.close()
if self.conn:
self.conn.close()4 非关系型数据库存储
4.1 使用MongoDB存储
4.1.1 安装依赖
pip install pymongo4.1.2 配置MongoDB
# settings.py
# MongoDB配置
MONGO_URI = 'mongodb://localhost:27017'
MONGO_DATABASE = 'mydatabase'4.1.3 编写MongoDB管道
# pipelines.py
import pymongo
class MongoDBPipeline:
def __init__(self, mongo_uri, mongo_db):
self.mongo_uri = mongo_uri
self.mongo_db = mongo_db
self.client = None
self.db = None
@classmethod
def from_crawler(cls, crawler):
return cls(
mongo_uri=crawler.settings.get('MONGO_URI'),
mongo_db=crawler.settings.get('MONGO_DATABASE')
)
def open_spider(self, spider):
# 建立MongoDB连接
self.client = pymongo.MongoClient(self.mongo_uri)
self.db = self.client[self.mongo_db]
def process_item(self, item, spider):
# 将Item转换为字典并插入MongoDB
self.db.articles.insert_one(dict(item))
return item
def close_spider(self, spider):
# 关闭MongoDB连接
self.client.close()4.1.4 高级MongoDB用法
# pipelines.py
class AdvancedMongoDBPipeline:
def __init__(self, mongo_uri, mongo_db):
self.mongo_uri = mongo_uri
self.mongo_db = mongo_db
self.client = None
self.db = None
@classmethod
def from_crawler(cls, crawler):
return cls(
mongo_uri=crawler.settings.get('MONGO_URI'),
mongo_db=crawler.settings.get('MONGO_DATABASE')
)
def open_spider(self, spider):
self.client = pymongo.MongoClient(self.mongo_uri)
self.db = self.client[self.mongo_db]
# 创建索引
self.db.articles.create_index([('url', pymongo.ASCENDING)], unique=True)
def process_item(self, item, spider):
# 使用update_one方法实现upsert(存在则更新,不存在则插入)
self.db.articles.update_one(
{'url': item.get('url')}, # 查询条件
{'$set': dict(item)}, # 更新字段
upsert=True # 如果不存在则插入
)
return item
def close_spider(self, spider):
self.client.close()4.2 使用Redis存储
4.2.1 安装依赖
pip install redis4.2.2 配置Redis
# settings.py
# Redis配置
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_PASSWORD = ''
REDIS_DB = 04.2.3 编写Redis管道
# pipelines.py
import redis
import json
class RedisPipeline:
def __init__(self, host, port, password, db):
self.host = host
self.port = port
self.password = password
self.db = db
self.r = None
@classmethod
def from_crawler(cls, crawler):
return cls(
host=crawler.settings.get('REDIS_HOST'),
port=crawler.settings.get('REDIS_PORT'),
password=crawler.settings.get('REDIS_PASSWORD'),
db=crawler.settings.get('REDIS_DB')
)
def open_spider(self, spider):
# 建立Redis连接
self.r = redis.Redis(
host=self.host,
port=self.port,
password=self.password,
db=self.db
)
def process_item(self, item, spider):
# 将Item转换为JSON字符串
item_json = json.dumps(dict(item), ensure_ascii=False)
# 使用LPUSH命令将数据添加到列表中
self.r.lpush('articles', item_json)
# 使用HSET命令将数据存储到哈希表中
self.r.hset('article:' + item.get('url'), mapping=dict(item))
return item
def close_spider(self, spider):
# Redis连接会自动关闭,不需要显式关闭
pass5 数据存储策略选择
在选择数据存储策略时,需要考虑以下因素:
5.1 数据结构和类型
- 结构化数据:适合使用关系型数据库(MySQL、PostgreSQL)
- 半结构化数据:适合使用非关系型数据库(MongoDB)
- 键值对数据:适合使用Redis
- 简单文本数据:适合使用JSON、CSV等文件存储
5.2 查询需求
- 需要复杂查询(JOIN、聚合等):关系型数据库
- 简单查询:非关系型数据库或文件存储
- 高性能查询:Redis等内存数据库
5.3 数据量
- 少量数据:文件存储或轻量级数据库
- 大量数据:关系型数据库或MongoDB
- 超大量数据:分布式数据库或数据仓库
5.4 性能需求
- 高并发读写:Redis、MongoDB
- 事务支持:关系型数据库
- 数据一致性:关系型数据库
5.5 可扩展性
- 水平扩展:非关系型数据库(MongoDB、Redis)
- 垂直扩展:关系型数据库
6 数据存储最佳实践
6.1 数据验证
在存储数据之前,应该对数据进行验证,确保数据的完整性和一致性。
# pipelines.py
class DataValidationPipeline:
def process_item(self, item, spider):
# 验证必填字段
required_fields = ['title', 'url']
for field in required_fields:
if not item.get(field):
spider.logger.error(f"Missing required field: {field}")
return None
# 验证数据类型
if 'publish_date' in item and item['publish_date']:
try:
from datetime import datetime
datetime.strptime(item['publish_date'], '%Y-%m-%d')
except ValueError:
spider.logger.error(f"Invalid publish date format: {item['publish_date']}")
item['publish_date'] = None
return item6.2 数据去重
避免存储重复数据,提高数据质量和存储效率。
# pipelines.py
class DuplicateFilterPipeline:
def __init__(self):
self.urls_seen = set()
def process_item(self, item, spider):
url = item.get('url')
if url in self.urls_seen:
spider.logger.info(f"Duplicate item found: {url}")
return None
else:
self.urls_seen.add(url)
return item6.3 错误处理
在数据存储过程中,应该处理可能出现的错误,确保程序的稳定性。
# pipelines.py
class ErrorHandlingPipeline:
def process_item(self, item, spider):
try:
# 数据存储逻辑
# ...
return item
except Exception as e:
spider.logger.error(f"Error processing item: {e}")
# 可以选择返回None丢弃该Item,或者继续传递
return item6.4 批量处理
对于大量数据,可以使用批量处理来提高性能。
# pipelines.py
class BatchProcessingPipeline:
def __init__(self):
self.items = []
self.batch_size = 100
def process_item(self, item, spider):
self.items.append(dict(item))
# 当达到批量大小时,进行批量处理
if len(self.items) >= self.batch_size:
self._process_batch(spider)
self.items = []
return item
def _process_batch(self, spider):
try:
# 批量插入数据
# ...
spider.logger.info(f"Processed batch of {len(self.items)} items")
except Exception as e:
spider.logger.error(f"Error processing batch: {e}")
def close_spider(self, spider):
# 处理剩余的item
if self.items:
self._process_batch(spider)7 总结
通过本集的学习,我们掌握了多种数据存储策略及其在Scrapy中的实现方法,包括:
- 文件存储(JSON、CSV、XML)
- 关系型数据库存储(MySQL、PostgreSQL)
- 非关系型数据库存储(MongoDB、Redis)
- 数据存储策略选择的考虑因素
- 数据存储的最佳实践
选择合适的数据存储策略对于爬虫项目的成功至关重要,需要根据项目需求和数据特点进行综合考虑。