第7章:索引优化与管理
7.1 索引合并与优化策略
7.1.1 索引结构原理
Whoosh 索引采用分段存储(Segment-based)的设计:
- Segment(段):索引的基本单元,每个段包含独立的倒排索引数据
- Commit(提交):每次创建或更新索引时会生成新段
- Merge(合并):将多个小段合并为一个大段,减少文件数量
为什么需要索引合并?
- 减少搜索时的文件打开数量
- 提高查询性能(读取的文件更少)
- 释放已删除文档的存储空间
- 优化磁盘 I/O 性能
7.1.2 索引优化方法
方法1:使用 optimize() 进行全量优化
from whoosh.index import open_dir
ix = open_dir("my_index")
# 优化索引,将所有段合并为一个
writer = ix.writer()
writer.optimize()
writer.commit()代码示例:
from whoosh.index import create_in, open_dir
from whoosh.fields import Schema, TEXT
import os
import shutil
import time

# Build a throwaway index directory for the demo.
index_dir = "optimize_demo"
if os.path.exists(index_dir):
    shutil.rmtree(index_dir)
os.makedirs(index_dir)

schema = Schema(
    title=TEXT(stored=True),
    content=TEXT(stored=True)
)
ix = create_in(index_dir, schema)

print("=== 索引优化演示 ===\n")

# Commit several times so the index accumulates multiple segments.
print("【步骤1】添加文档并多次提交(模拟产生多个段)")
for i in range(1, 6):
    writer = ix.writer()
    for j in range(1, 11):
        doc = {
            "title": f"文档 {(i-1)*10 + j}",
            "content": f"这是第 {(i-1)*10 + j} 篇文档的内容"
        }
        writer.add_document(**doc)
    writer.commit()
    print(f" 提交 {i}:添加 10 篇文档")

# Inspect the segment count before optimizing.
print("\n【步骤2】优化前的索引状态")
ix = open_dir(index_dir)
# NOTE(review): reader.segments() is not a documented public Whoosh API —
# verify; leaf_readers() may be the supported way to enumerate segments.
print(f" 段数量: {len(list(ix.reader().segments()))}")

# Merge all segments into one.  commit(optimize=True) is the documented way
# to force a full merge (IndexWriter has no optimize() method).
print("\n【步骤3】优化索引")
start_time = time.time()
writer = ix.writer()
writer.commit(optimize=True)
optimize_time = time.time() - start_time
print(f" 优化完成,耗时: {optimize_time:.2f} 秒")

print("\n【步骤4】优化后的索引状态")
ix = open_dir(index_dir)
print(f" 段数量: {len(list(ix.reader().segments()))}")

# Rough query timing after the merge.
print("\n【步骤5】查询性能对比")
from whoosh.qparser import QueryParser
parser = QueryParser("content", ix.schema)
query = parser.parse(u"文档")
start_time = time.time()
with ix.searcher() as searcher:
    results = searcher.search(query)
    hit_count = len(results)
search_time = time.time() - start_time
print(f" 查询耗时: {search_time:.4f} 秒,命中 {hit_count} 篇")

print("\n✅ 索引优化演示完成!")
方法2:部分优化(merge_factor)
# 创建 writer 时指定合并因子
writer = ix.writer(merge_factor=4)
writer.commit()7.1.3 优化策略选择
| 场景 | 推荐策略 | 说明 |
|---|---|---|
| 小规模索引(< 1万文档) | 定期全量优化 | 性能影响小,维护简单 |
| 中等规模(1万-100万) | 增量优化 | 使用合理的 merge_factor |
| 大规模索引(> 100万) | 分区索引 + 延迟优化 | 避免全量优化阻塞 |
| 实时更新场景 | 避免频繁优化 | 使用 NRT(近实时)策略 |
7.1.4 查看索引状态
from whoosh.index import open_dir
ix = open_dir("my_index")
reader = ix.reader()
# 查看段信息
print(f"段数量: {len(list(reader.segments()))}")
print(f"文档总数: {reader.doc_count()}")
print(f"已删除文档: {reader.doc_count_all() - reader.doc_count()}")
# 查看每个段的大小
for seg in reader.segments():
print(f"段名: {seg.segment_id}, 文档数: {seg.doc_count()}")7.2 增量更新与删除文档
7.2.1 增量更新索引
添加新文档:
from whoosh.index import open_dir
ix = open_dir("my_index")
# 创建 writer
writer = ix.writer()
# 添加新文档
writer.add_document(
title="新文档标题",
content="新文档内容"
)
# 提交更改
writer.commit()代码示例:
from whoosh.index import create_in, open_dir
from whoosh.fields import Schema, TEXT, ID
from whoosh.qparser import QueryParser
import os
import shutil
# 创建索引
index_dir = "update_demo"
if os.path.exists(index_dir):
shutil.rmtree(index_dir)
os.makedirs(index_dir)
schema = Schema(
id=ID(stored=True, unique=True),
title=TEXT(stored=True),
content=TEXT(stored=True)
)
ix = create_in(index_dir, schema)
print("=== 增量更新演示 ===\n")
# 初始化数据
print("【步骤1】初始化数据")
writer = ix.writer()
for i in range(1, 11):
writer.add_document(
id=f"doc_{i}",
title=f"文档 {i}",
content=f"这是文档 {i} 的内容"
)
writer.commit()
print(" 添加 10 篇文档")
# 查询当前文档数
with ix.searcher() as searcher:
print(f" 当前文档数: {searcher.doc_count()}")
# 增量添加新文档
print("\n【步骤2】增量添加新文档")
new_docs = [
{"id": "doc_11", "title": "文档 11", "content": "新增文档 11"},
{"id": "doc_12", "title": "文档 12", "content": "新增文档 12"},
{"id": "doc_13", "title": "文档 13", "content": "新增文档 13"},
]
writer = ix.writer()
for doc in new_docs:
writer.add_document(**doc)
print(f" 添加: {doc['title']}")
writer.commit()
# 查询更新后的文档数
with ix.searcher() as searcher:
print(f"\n更新后文档数: {searcher.doc_count()}")
# 验证新文档
with ix.searcher() as searcher:
parser = QueryParser("title", ix.schema)
query = parser.parse(u"文档 11 OR 文档 12 OR 文档 13")
results = searcher.search(query)
print(f"新文档查询结果: {len(results)} 篇")
print("\n✅ 增量更新演示完成!")7.2.2 更新已有文档
方法1:使用 update_document
# update_document() replaces the document whose unique field ("id") matches;
# if no match exists, the document is simply added.
writer = ix.writer()
writer.update_document(
    id="doc_1",
    title="更新后的标题",
    content="更新后的内容"
)
writer.commit()
方法2:先删除再添加
from whoosh.query import Term
# 删除旧文档
writer = ix.writer()
writer.delete_by_term("id", "doc_1")
# 添加新文档
writer.add_document(
id="doc_1",
title="新标题",
content="新内容"
)
writer.commit()代码示例:
from whoosh.index import create_in, open_dir
from whoosh.fields import Schema, TEXT, ID
from whoosh.qparser import QueryParser
import os
import shutil
# 创建索引
index_dir = "update_document_demo"
if os.path.exists(index_dir):
shutil.rmtree(index_dir)
os.makedirs(index_dir)
schema = Schema(
id=ID(stored=True, unique=True),
title=TEXT(stored=True),
content=TEXT(stored=True)
)
ix = create_in(index_dir, schema)
# 初始化数据
print("=== 文档更新演示 ===\n")
print("【步骤1】初始化数据")
writer = ix.writer()
writer.add_document(
id="doc_1",
title="原始标题",
content="原始内容"
)
writer.commit()
# 查看原始文档
with ix.searcher() as searcher:
parser = QueryParser("id", ix.schema)
query = parser.parse(u"doc_1")
results = searcher.search(query)
for hit in results:
print(f"原始文档: {hit['title']} - {hit['content']}")
# 方法1:使用 update_document 更新
print("\n【步骤2】使用 update_document 更新")
writer = ix.writer()
writer.update_document(
id="doc_1",
title="更新标题",
content="更新内容"
)
writer.commit()
# 查看更新后的文档
with ix.searcher() as searcher:
query = parser.parse(u"doc_1")
results = searcher.search(query)
for hit in results:
print(f"更新后文档: {hit['title']} - {hit['content']}")
# 再次更新
print("\n【步骤3】再次更新文档")
writer = ix.writer()
writer.update_document(
id="doc_1",
title="最终标题",
content="最终内容"
)
writer.commit()
# 查看最终文档
with ix.searcher() as searcher:
query = parser.parse(u"doc_1")
results = searcher.search(query)
for hit in results:
print(f"最终文档: {hit['title']} - {hit['content']}")
print("\n✅ 文档更新演示完成!")7.2.3 删除文档
方法1:根据文档 ID 删除
# Delete every document whose unique "id" field equals "doc_1".
writer = ix.writer()
writer.delete_by_term("id", "doc_1")
writer.commit()
方法2:根据查询删除
from whoosh.query import Term

# Delete every document matching the query object.
writer = ix.writer()
query = Term("category", "过期")
writer.delete_by_query(query)
writer.commit()
方法3:根据文档编号删除
writer = ix.writer()
writer.delete_document(5) # 删除文档编号为5的文档
writer.commit()代码示例:
from whoosh.index import create_in, open_dir
from whoosh.fields import Schema, TEXT, ID, KEYWORD
from whoosh.query import Term
from whoosh.qparser import QueryParser
import os
import shutil
# 创建索引
index_dir = "delete_demo"
if os.path.exists(index_dir):
shutil.rmtree(index_dir)
os.makedirs(index_dir)
schema = Schema(
id=ID(stored=True, unique=True),
title=TEXT(stored=True),
content=TEXT(stored=True),
category=KEYWORD(stored=True)
)
ix = create_in(index_dir, schema)
# 初始化数据
print("=== 文档删除演示 ===\n")
print("【步骤1】初始化数据")
writer = ix.writer()
docs = [
{"id": "doc_1", "title": "文档1", "content": "内容1", "category": "有效"},
{"id": "doc_2", "title": "文档2", "content": "内容2", "category": "有效"},
{"id": "doc_3", "title": "文档3", "content": "内容3", "category": "过期"},
{"id": "doc_4", "title": "文档4", "content": "内容4", "category": "有效"},
{"id": "doc_5", "title": "文档5", "content": "内容5", "category": "过期"},
]
for doc in docs:
writer.add_document(**doc)
writer.commit()
# 查看原始文档数
with ix.searcher() as searcher:
print(f"初始文档数: {searcher.doc_count()}")
# 方法1:根据 ID 删除
print("\n【步骤2】根据 ID 删除文档")
writer = ix.writer()
writer.delete_by_term("id", "doc_1")
writer.commit()
with ix.searcher() as searcher:
print(f"删除后文档数: {searcher.doc_count()}")
parser = QueryParser("id", ix.schema)
query = parser.parse(u"doc_1")
results = searcher.search(query)
print(f"doc_1 查询结果: {len(results)} 篇")
# 方法2:根据查询删除
print("\n【步骤3】根据分类删除过期文档")
writer = ix.writer()
writer.delete_by_query(Term("category", "过期"))
writer.commit()
with ix.searcher() as searcher:
print(f"删除后文档数: {searcher.doc_count()}")
parser = QueryParser("category", ix.schema)
query = parser.parse(u"过期")
results = searcher.search(query)
print(f"过期文档查询结果: {len(results)} 篇")
# 查看剩余文档
print("\n【步骤4】查看剩余文档")
with ix.searcher() as searcher:
parser = QueryParser("title", ix.schema)
query = parser.parse(u"*")
results = searcher.search(query)
print("剩余文档:")
for hit in results:
print(f" - {hit['title']} ({hit['category']})")
print("\n✅ 文档删除演示完成!")7.2.4 批量操作与事务
批量添加:
# 批量添加文档
writer = ix.writer()
for i in range(1000):
writer.add_document(
title=f"文档 {i}",
content=f"内容 {i}"
)
writer.commit() # 一次性提交批量更新:
# 批量更新
writer = ix.writer()
for doc_id in ["doc_1", "doc_2", "doc_3"]:
writer.update_document(
id=doc_id,
title=f"更新 {doc_id}",
content="新内容"
)
writer.commit()取消操作:
# 取消未提交的更改
writer = ix.writer()
writer.add_document(title="临时文档")
writer.cancel() # 取消所有更改7.3 内存索引与磁盘索引
7.3.1 内存索引
特点:
- 数据存储在内存中
- 查询速度极快
- 重启后数据丢失
- 适合临时索引和缓存
创建内存索引:
from whoosh.index import RamStorage, FileIndex, create_in
from whoosh.fields import Schema, TEXT
# 方法1:使用 RamStorage
storage = RamStorage()
ix = FileIndex.create(storage, Schema(title=TEXT()))
# 方法2:使用 create_in(内存版本)
storage = RamStorage()
ix = create_in(storage, Schema(title=TEXT()))代码示例:
from whoosh.index import RamStorage, FileIndex, create_in
from whoosh.fields import Schema, TEXT
from whoosh.qparser import QueryParser
print("=== 内存索引演示 ===\n")
# 创建内存索引
print("【步骤1】创建内存索引")
storage = RamStorage()
schema = Schema(title=TEXT(stored=True), content=TEXT(stored=True))
ix = create_in(storage, schema)
print(" 内存索引创建成功")
# 添加文档
print("\n【步骤2】添加文档")
writer = ix.writer()
for i in range(1, 6):
writer.add_document(
title=f"内存文档 {i}",
content=f"这是内存中的文档 {i}"
)
writer.commit()
print(" 添加 5 篇文档")
# 查询
print("\n【步骤3】查询内存索引")
parser = QueryParser("content", ix.schema)
query = parser.parse(u"文档")
with ix.searcher() as searcher:
results = searcher.search(query)
print(f" 命中 {len(results)} 篇")
for hit in results:
print(f" - {hit['title']}")
print("\n✅ 内存索引演示完成!")7.3.2 磁盘索引
特点:
- 数据存储在磁盘上
- 查询速度略慢于内存索引,但数据可持久保存
- 重启后数据保留
- 适合长期存储
创建磁盘索引:
from whoosh.index import create_in, open_dir
from whoosh.fields import Schema, TEXT
# 创建磁盘索引
ix = create_in("my_index", Schema(title=TEXT()))
# 打开已存在的磁盘索引
ix = open_dir("my_index")7.3.3 内存索引与磁盘索引对比
| 特性 | 内存索引 | 磁盘索引 |
|---|---|---|
| 存储位置 | 内存 | 磁盘 |
| 查询速度 | 极快 | 较快 |
| 写入速度 | 极快 | 较慢 |
| 数据持久性 | 否 | 是 |
| 容量限制 | 受内存限制 | 受磁盘限制 |
| 适用场景 | 临时查询、缓存 | 长期存储 |
7.3.4 混合使用场景
场景1:主从索引
from whoosh.index import open_dir
from whoosh.filedb.filestore import RamStorage  # RamStorage lives here, not in whoosh.index
from whoosh.fields import Schema, TEXT

# Primary index lives on disk.
disk_index = open_dir("main_index")

# Secondary index lives in RAM and serves hot data.
storage = RamStorage()
# create_in() needs a directory path, so use the storage factory for RAM.
memory_index = storage.create_index(Schema(title=TEXT()))
场景2:内存加速缓存
# 将常用数据加载到内存索引
disk_index = open_dir("main_index")
# 提取热点数据到内存
storage = RamStorage()
memory_index = create_in(storage, disk_index.schema)
writer = memory_index.writer()
with disk_index.searcher() as searcher:
for hit in searcher.search_page(query, 1, pagelen=100):
writer.add_document(**hit.fields())
writer.commit()7.4 索引备份与恢复
7.4.1 索引备份
方法1:文件系统备份
import shutil
from whoosh.index import open_dir
# 复制整个索引目录
source_dir = "my_index"
backup_dir = "my_index_backup"
shutil.copytree(source_dir, backup_dir)
print(f"索引已备份到: {backup_dir}")方法2:使用 shutil 进行压缩备份
import shutil
import os
from datetime import datetime
# 创建压缩备份
source_dir = "my_index"
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_file = f"index_backup_{timestamp}.zip"
shutil.make_archive(backup_file.replace('.zip', ''), 'zip', source_dir)
print(f"索引已压缩备份到: {backup_file}")代码示例:
import shutil
import os
from datetime import datetime
from whoosh.index import create_in, open_dir
from whoosh.fields import Schema, TEXT
print("=== 索引备份演示 ===\n")
# 创建测试索引
index_dir = "backup_demo_index"
if os.path.exists(index_dir):
shutil.rmtree(index_dir)
os.makedirs(index_dir)
schema = Schema(title=TEXT(stored=True), content=TEXT(stored=True))
ix = create_in(index_dir, schema)
# 添加文档
writer = ix.writer()
for i in range(1, 6):
writer.add_document(
title=f"文档 {i}",
content=f"内容 {i}"
)
writer.commit()
print("【步骤1】创建测试索引")
print(f" 索引目录: {index_dir}")
print(f" 包含文件: {len(os.listdir(index_dir))} 个")
# 备份索引
print("\n【步骤2】备份索引")
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_dir = f"backup_demo_backup_{timestamp}"
shutil.copytree(index_dir, backup_dir)
print(f" 备份目录: {backup_dir}")
# 验证备份
print("\n【步骤3】验证备份")
print(f" 原索引文件: {os.listdir(index_dir)}")
print(f" 备份文件: {os.listdir(backup_dir)}")
# 压缩备份
print("\n【步骤4】创建压缩备份")
zip_name = f"backup_demo_{timestamp}"
shutil.make_archive(zip_name, 'zip', backup_dir)
print(f" 压缩文件: {zip_name}.zip")
print("\n✅ 索引备份演示完成!")7.4.2 索引恢复
从备份恢复:
import shutil
# 从备份恢复
backup_dir = "my_index_backup"
target_dir = "my_index"
# 删除旧索引(如果存在)
if os.path.exists(target_dir):
shutil.rmtree(target_dir)
# 复制备份
shutil.copytree(backup_dir, target_dir)
print(f"索引已从备份恢复到: {target_dir}")从压缩文件恢复:
import shutil
import zipfile
# 解压备份
zip_file = "index_backup_20240101_120000.zip"
target_dir = "my_index_restored"
shutil.unpack_archive(zip_file, target_dir)
print(f"索引已从压缩包恢复到: {target_dir}")7.4.3 增量备份策略
基于时间戳的增量备份:
from datetime import datetime
import shutil
def incremental_backup(source_dir, backup_base_dir):
"""增量备份"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_dir = os.path.join(backup_base_dir, f"backup_{timestamp}")
shutil.copytree(source_dir, backup_dir)
# 保留最近 N 个备份
backups = sorted([
d for d in os.listdir(backup_base_dir)
if d.startswith("backup_")
])
# 只保留最近的 5 个备份
while len(backups) > 5:
old_backup = os.path.join(backup_base_dir, backups.pop(0))
shutil.rmtree(old_backup)
print(f"删除旧备份: {old_backup}")
return backup_dir7.4.4 索引迁移
迁移到新目录:
import shutil
from whoosh.index import open_dir
# 迁移前验证
old_index = open_dir("old_index")
print(f"迁移前文档数: {old_index.searcher().doc_count()}")
# 执行迁移
shutil.copytree("old_index", "new_index")
# 迁移后验证
new_index = open_dir("new_index")
print(f"迁移后文档数: {new_index.searcher().doc_count()}")7.5 综合示例
7.5.1 完整的索引管理系统
from whoosh.index import create_in, open_dir, FileIndex
from whoosh.filedb.filestore import RamStorage  # correct home of RamStorage (not whoosh.index)
from whoosh.fields import Schema, TEXT, ID, KEYWORD
from whoosh.qparser import QueryParser
from whoosh.query import Term
import shutil
import os
from datetime import datetime
class IndexManager:
    """Convenience wrapper bundling common Whoosh index-management tasks:
    document CRUD, searching, optimization, backup, and statistics."""

    def __init__(self, index_dir, schema=None):
        """Open the index at *index_dir*, creating directory and index if absent."""
        self.index_dir = index_dir
        self.schema = schema or self._default_schema()
        if os.path.exists(index_dir):
            self.ix = open_dir(index_dir)
        else:
            os.makedirs(index_dir)
            self.ix = create_in(index_dir, self.schema)

    def _default_schema(self):
        """Schema used when the caller does not supply one."""
        return Schema(
            id=ID(stored=True, unique=True),
            title=TEXT(stored=True),
            content=TEXT(stored=True),
            category=KEYWORD(stored=True)
        )

    def add_document(self, doc):
        """Add a document (dict of field values) and commit immediately."""
        writer = self.ix.writer()
        writer.add_document(**doc)
        writer.commit()

    def update_document(self, doc_id, doc):
        """Replace the document whose unique "id" field equals *doc_id*."""
        writer = self.ix.writer()
        # Merge into a fresh dict so the caller's dict is not mutated.
        writer.update_document(**{**doc, "id": doc_id})
        writer.commit()

    def delete_document(self, doc_id):
        """Delete the document whose unique "id" field equals *doc_id*."""
        writer = self.ix.writer()
        writer.delete_by_term("id", doc_id)
        writer.commit()

    def search(self, query_str, limit=10):
        """Search the "content" field; return a list of stored-field dicts.

        Fields are materialized inside the searcher context because Hit
        objects become unusable once the searcher is closed.
        """
        parser = QueryParser("content", self.ix.schema)
        query = parser.parse(query_str)
        with self.ix.searcher() as searcher:
            results = searcher.search(query, limit=limit)
            return [hit.fields() for hit in results]

    def optimize(self):
        """Merge all segments into one (full optimization)."""
        # commit(optimize=True) is the documented way to force a full merge;
        # IndexWriter has no optimize() method.
        writer = self.ix.writer()
        writer.commit(optimize=True)

    def backup(self, backup_dir=None):
        """Copy the index directory; default target is timestamp-suffixed."""
        if backup_dir is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            backup_dir = f"{self.index_dir}_backup_{timestamp}"
        shutil.copytree(self.index_dir, backup_dir)
        return backup_dir

    def get_stats(self):
        """Return document count, segment count and index size in MB."""
        with self.ix.searcher() as searcher:
            reader = searcher.reader()
            return {
                "doc_count": reader.doc_count(),
                # NOTE(review): reader.segments() is not a documented public
                # Whoosh API — verify before relying on it.
                "segment_count": len(list(reader.segments())),
                "index_size": self._get_dir_size(self.index_dir)
            }

    def _get_dir_size(self, path):
        """Total size of all regular files under *path*, in megabytes."""
        total = 0
        for root, dirs, files in os.walk(path):
            for file in files:
                total += os.path.getsize(os.path.join(root, file))
        return total / (1024 * 1024)  # bytes -> MB
# --- usage demo ---
print("=== 索引管理系统演示 ===\n")

manager = IndexManager("managed_index")

print("【步骤1】添加文档")
for i in range(1, 11):
    manager.add_document({
        "id": f"doc_{i}",
        "title": f"文档 {i}",
        "content": f"这是文档 {i} 的内容",
        "category": "测试"
    })
print(" 添加 10 篇文档")

stats = manager.get_stats()
print("\n【步骤2】索引统计")
print(f" 文档数: {stats['doc_count']}")
print(f" 段数: {stats['segment_count']}")
print(f" 大小: {stats['index_size']:.2f} MB")

print("\n【步骤3】搜索")
results = manager.search("文档")
print(f" 搜索 '文档': 命中 {len(results)} 篇")

print("\n【步骤4】更新文档")
manager.update_document("doc_1", {
    "title": "更新后的文档 1",
    "content": "更新后的内容",
    "category": "测试"
})

print("\n【步骤5】删除文档")
manager.delete_document("doc_2")

print("\n【步骤6】优化索引")
manager.optimize()

print("\n【步骤7】备份索引")
backup_dir = manager.backup()
print(f" 备份到: {backup_dir}")

stats = manager.get_stats()
print("\n【步骤8】最终统计")
print(f" 文档数: {stats['doc_count']}")
print(f" 段数: {stats['segment_count']}")

print("\n✅ 索引管理系统演示完成!")
本章小结
本章我们学习了 Whoosh 索引的优化与管理:
- 索引合并与优化:理解段的概念,使用 optimize() 方法优化索引
- 增量更新与删除:添加、更新、删除文档的方法
- 内存索引与磁盘索引:不同类型索引的特点和适用场景
- 索引备份与恢复:备份和恢复索引的方法
通过本章的学习,你应该能够:
- 理解 Whoosh 索引的内部结构
- 进行索引的优化和合并
- 实现文档的增量更新和删除
- 合理使用内存索引和磁盘索引
- 实现索引的备份和恢复
在下一章中,我们将学习结果处理与展示,包括高亮显示、摘要生成等。