第7章:索引优化与管理

7.1 索引合并与优化策略

7.1.1 索引结构原理

Whoosh 索引采用分段存储(Segment-based)的设计:

  • Segment(段):索引的基本单元,每个段包含独立的倒排索引数据
  • Commit(提交):每次创建或更新索引时会生成新段
  • Merge(合并):将多个小段合并为一个大段,减少文件数量

为什么需要索引合并?

  • 减少搜索时的文件打开数量
  • 提高查询性能(读取的文件更少)
  • 释放已删除文档的存储空间
  • 优化磁盘 I/O 性能

7.1.2 索引优化方法

方法1:使用 optimize() 进行全量优化

from whoosh.index import open_dir

# Open an existing on-disk index.
ix = open_dir("my_index")

# Schedule a full optimize (merge every segment into one), then commit.
w = ix.writer()
w.optimize()
w.commit()

代码示例

from whoosh.index import create_in, open_dir
from whoosh.fields import Schema, TEXT
import os
import shutil
import time

# Build a throwaway index directory for the demo (start from a clean slate).
index_dir = "optimize_demo"
if os.path.exists(index_dir):
    shutil.rmtree(index_dir)
os.makedirs(index_dir)

schema = Schema(
    title=TEXT(stored=True),
    content=TEXT(stored=True)
)

ix = create_in(index_dir, schema)

print("=== 索引优化演示 ===\n")

# Commit several times so the index accumulates multiple segments.
print("【步骤1】添加文档并多次提交(模拟产生多个段)")
for i in range(1, 6):
    writer = ix.writer()
    for j in range(1, 11):
        doc = {
            "title": f"文档 {(i-1)*10 + j}",
            "content": f"这是第 {(i-1)*10 + j} 篇文档的内容"
        }
        writer.add_document(**doc)
    writer.commit()
    print(f"  提交 {i}:添加 10 篇文档")

# Inspect the segment count before optimizing.
print("\n【步骤2】优化前的索引状态")
ix = open_dir(index_dir)
# FIX: keep a reference to the reader and close it explicitly -- the original
# leaked an open reader (and its file handles) on every status check.
# NOTE(review): ``reader.segments()`` is not part of the documented
# IndexReader API -- verify against the installed Whoosh version.
reader = ix.reader()
print(f"  段数量: {len(list(reader.segments()))}")
reader.close()

# Merge every segment into one and time the operation.
print("\n【步骤3】优化索引")
start_time = time.time()
writer = ix.writer()
writer.optimize()
writer.commit()
optimize_time = time.time() - start_time
print(f"  优化完成,耗时: {optimize_time:.2f} 秒")

# Inspect the segment count after optimizing (expected: 1).
print("\n【步骤4】优化后的索引状态")
ix = open_dir(index_dir)
reader = ix.reader()
print(f"  段数量: {len(list(reader.segments()))}")
reader.close()

# Time a simple query against the optimized index.
print("\n【步骤5】查询性能对比")
from whoosh.qparser import QueryParser
parser = QueryParser("content", ix.schema)
query = parser.parse(u"文档")

start_time = time.time()
with ix.searcher() as searcher:
    results = searcher.search(query)
    hit_count = len(results)
search_time = time.time() - start_time
print(f"  查询耗时: {search_time:.4f} 秒,命中 {hit_count} 篇")

print("\n✅ 索引优化演示完成!")

方法2:部分优化(merge_factor)

# Create the writer with a custom merge factor.
# NOTE(review): ``merge_factor`` is not a documented argument of Whoosh's
# ``Index.writer()`` (merging is normally controlled by passing a merge
# policy such as ``whoosh.writing.MERGE_SMALL`` / ``CLEAR`` via
# ``commit(mergetype=...)``) -- confirm this keyword against the installed
# Whoosh version before relying on it.
writer = ix.writer(merge_factor=4)
writer.commit()

7.1.3 优化策略选择

场景 推荐策略 说明
小规模索引(< 1万文档) 定期全量优化 性能影响小,维护简单
中等规模(1万-100万) 增量优化 使用合理的 merge_factor
大规模索引(> 100万) 分区索引 + 延迟优化 避免全量优化阻塞
实时更新场景 避免频繁优化 使用 NRT(近实时)策略

7.1.4 查看索引状态

from whoosh.index import open_dir

# Inspect an index's segment and document statistics.
ix = open_dir("my_index")
reader = ix.reader()

# Segment-level statistics.
# NOTE(review): ``reader.segments()`` and ``seg.segment_id`` are not part of
# the documented IndexReader API -- verify against the installed Whoosh
# version.
print(f"段数量: {len(list(reader.segments()))}")
print(f"文档总数: {reader.doc_count()}")
# doc_count_all() includes deleted-but-not-yet-purged documents.
print(f"已删除文档: {reader.doc_count_all() - reader.doc_count()}")

# Per-segment document counts.
for seg in reader.segments():
    print(f"段名: {seg.segment_id}, 文档数: {seg.doc_count()}")

# FIX: close the reader to release its file handles (the original leaked it).
reader.close()

7.2 增量更新与删除文档

7.2.1 增量更新索引

添加新文档

from whoosh.index import open_dir

# Append a single new document to an existing on-disk index.
ix = open_dir("my_index")

w = ix.writer()
w.add_document(title="新文档标题", content="新文档内容")
w.commit()

代码示例

# Demo: incrementally adding documents to an existing index.
from whoosh.index import create_in, open_dir
from whoosh.fields import Schema, TEXT, ID
from whoosh.qparser import QueryParser
import os
import shutil

# Recreate the demo index directory from scratch.
index_dir = "update_demo"
if os.path.exists(index_dir):
    shutil.rmtree(index_dir)
os.makedirs(index_dir)

# ``unique=True`` on the id field is what enables update/delete by term.
schema = Schema(
    id=ID(stored=True, unique=True),
    title=TEXT(stored=True),
    content=TEXT(stored=True)
)

ix = create_in(index_dir, schema)

print("=== 增量更新演示 ===\n")

# Seed the index with ten documents in a single commit.
print("【步骤1】初始化数据")
writer = ix.writer()
for i in range(1, 11):
    writer.add_document(
        id=f"doc_{i}",
        title=f"文档 {i}",
        content=f"这是文档 {i} 的内容"
    )
writer.commit()
print("  添加 10 篇文档")

# Report the current document count.
with ix.searcher() as searcher:
    print(f"  当前文档数: {searcher.doc_count()}")

# Append three more documents without rebuilding the index.
print("\n【步骤2】增量添加新文档")
new_docs = [
    {"id": "doc_11", "title": "文档 11", "content": "新增文档 11"},
    {"id": "doc_12", "title": "文档 12", "content": "新增文档 12"},
    {"id": "doc_13", "title": "文档 13", "content": "新增文档 13"},
]

writer = ix.writer()
for doc in new_docs:
    writer.add_document(**doc)
    print(f"  添加: {doc['title']}")
writer.commit()

# Document count after the incremental commit.
with ix.searcher() as searcher:
    print(f"\n更新后文档数: {searcher.doc_count()}")

# Verify the new documents are searchable.
# NOTE(review): the TEXT analyzer tokenizes "文档 11" into separate terms,
# so this OR query matches on shared tokens rather than on exact titles.
with ix.searcher() as searcher:
    parser = QueryParser("title", ix.schema)
    query = parser.parse(u"文档 11 OR 文档 12 OR 文档 13")
    results = searcher.search(query)
    print(f"新文档查询结果: {len(results)} 篇")

print("\n✅ 增量更新演示完成!")

7.2.2 更新已有文档

方法1:使用 update_document

# Replace the document whose unique ``id`` field equals "doc_1" in one call.
w = ix.writer()
new_fields = {
    "id": "doc_1",
    "title": "更新后的标题",
    "content": "更新后的内容",
}
w.update_document(**new_fields)
w.commit()

方法2:先删除再添加

from whoosh.query import Term

# Manual replace: remove the old version, then index the new one,
# all under a single writer/commit.
w = ix.writer()
w.delete_by_term("id", "doc_1")
w.add_document(id="doc_1", title="新标题", content="新内容")
w.commit()

代码示例

# Demo: updating an existing document in place via update_document().
from whoosh.index import create_in, open_dir
from whoosh.fields import Schema, TEXT, ID
from whoosh.qparser import QueryParser
import os
import shutil

# Recreate the demo index directory from scratch.
index_dir = "update_document_demo"
if os.path.exists(index_dir):
    shutil.rmtree(index_dir)
os.makedirs(index_dir)

# ``unique=True`` on id is what lets update_document() replace documents.
schema = Schema(
    id=ID(stored=True, unique=True),
    title=TEXT(stored=True),
    content=TEXT(stored=True)
)

ix = create_in(index_dir, schema)

# Seed a single document.
print("=== 文档更新演示 ===\n")

print("【步骤1】初始化数据")
writer = ix.writer()
writer.add_document(
    id="doc_1",
    title="原始标题",
    content="原始内容"
)
writer.commit()

# Show the original document.
# NOTE: ``parser`` defined inside this with-block is deliberately reused by
# the later searcher blocks below (a with-statement does not scope names).
with ix.searcher() as searcher:
    parser = QueryParser("id", ix.schema)
    query = parser.parse(u"doc_1")
    results = searcher.search(query)
    for hit in results:
        print(f"原始文档: {hit['title']} - {hit['content']}")

# Method 1: replace the document in one call via update_document().
print("\n【步骤2】使用 update_document 更新")
writer = ix.writer()
writer.update_document(
    id="doc_1",
    title="更新标题",
    content="更新内容"
)
writer.commit()

# Show the document after the first update.
with ix.searcher() as searcher:
    query = parser.parse(u"doc_1")
    results = searcher.search(query)
    for hit in results:
        print(f"更新后文档: {hit['title']} - {hit['content']}")

# Update the same document a second time.
print("\n【步骤3】再次更新文档")
writer = ix.writer()
writer.update_document(
    id="doc_1",
    title="最终标题",
    content="最终内容"
)
writer.commit()

# Show the final state of the document.
with ix.searcher() as searcher:
    query = parser.parse(u"doc_1")
    results = searcher.search(query)
    for hit in results:
        print(f"最终文档: {hit['title']} - {hit['content']}")

print("\n✅ 文档更新演示完成!")

7.2.3 删除文档

方法1:根据文档 ID 删除

# Delete every document whose unique ``id`` term equals "doc_1".
w = ix.writer()
w.delete_by_term("id", "doc_1")
w.commit()

方法2:根据查询删除

from whoosh.query import Term

# Delete every document matched by an arbitrary query object.
w = ix.writer()
w.delete_by_query(Term("category", "过期"))
w.commit()

方法3:根据文档编号删除

# Delete by internal document number.
# NOTE(review): document numbers are internal to the index and can change
# when segments are merged or the index is optimized -- prefer
# delete_by_term() on a unique field for stable deletes.
writer = ix.writer()
writer.delete_document(5)  # delete the document whose internal number is 5
writer.commit()

代码示例

# Demo: deleting documents by unique term and by query.
from whoosh.index import create_in, open_dir
from whoosh.fields import Schema, TEXT, ID, KEYWORD
from whoosh.query import Term
from whoosh.qparser import QueryParser
import os
import shutil

# Recreate the demo index directory from scratch.
index_dir = "delete_demo"
if os.path.exists(index_dir):
    shutil.rmtree(index_dir)
os.makedirs(index_dir)

schema = Schema(
    id=ID(stored=True, unique=True),
    title=TEXT(stored=True),
    content=TEXT(stored=True),
    category=KEYWORD(stored=True)
)

ix = create_in(index_dir, schema)

# Seed five documents, two of which carry the "过期" (expired) category.
print("=== 文档删除演示 ===\n")

print("【步骤1】初始化数据")
writer = ix.writer()
docs = [
    {"id": "doc_1", "title": "文档1", "content": "内容1", "category": "有效"},
    {"id": "doc_2", "title": "文档2", "content": "内容2", "category": "有效"},
    {"id": "doc_3", "title": "文档3", "content": "内容3", "category": "过期"},
    {"id": "doc_4", "title": "文档4", "content": "内容4", "category": "有效"},
    {"id": "doc_5", "title": "文档5", "content": "内容5", "category": "过期"},
]
for doc in docs:
    writer.add_document(**doc)
writer.commit()

# Baseline document count.
with ix.searcher() as searcher:
    print(f"初始文档数: {searcher.doc_count()}")

# Method 1: delete by the unique id term.
print("\n【步骤2】根据 ID 删除文档")
writer = ix.writer()
writer.delete_by_term("id", "doc_1")
writer.commit()

with ix.searcher() as searcher:
    print(f"删除后文档数: {searcher.doc_count()}")
    parser = QueryParser("id", ix.schema)
    query = parser.parse(u"doc_1")
    results = searcher.search(query)
    print(f"doc_1 查询结果: {len(results)} 篇")

# Method 2: delete every document matching a query.
print("\n【步骤3】根据分类删除过期文档")
writer = ix.writer()
writer.delete_by_query(Term("category", "过期"))
writer.commit()

with ix.searcher() as searcher:
    print(f"删除后文档数: {searcher.doc_count()}")
    parser = QueryParser("category", ix.schema)
    query = parser.parse(u"过期")
    results = searcher.search(query)
    print(f"过期文档查询结果: {len(results)} 篇")

# List whatever is left.
# NOTE(review): parsing a bare "*" relies on the parser's wildcard plugin
# turning it into a match-all; ``whoosh.query.Every()`` is the documented
# match-all query -- confirm the "*" form works on the installed version.
print("\n【步骤4】查看剩余文档")
with ix.searcher() as searcher:
    parser = QueryParser("title", ix.schema)
    query = parser.parse(u"*")
    results = searcher.search(query)
    print("剩余文档:")
    for hit in results:
        print(f"  - {hit['title']} ({hit['category']})")

print("\n✅ 文档删除演示完成!")

7.2.4 批量操作与事务

批量添加

# Index 1000 documents under a single writer so only one commit occurs;
# one commit per document would be dramatically slower.
w = ix.writer()
for n in range(1000):
    w.add_document(title=f"文档 {n}", content=f"内容 {n}")
w.commit()  # one commit for the whole batch

批量更新

# Update several documents under one writer / one commit.
w = ix.writer()
for identifier in ("doc_1", "doc_2", "doc_3"):
    w.update_document(
        id=identifier,
        title=f"更新 {identifier}",
        content="新内容",
    )
w.commit()

取消操作

# Discard pending (uncommitted) changes instead of committing them.
w = ix.writer()
w.add_document(title="临时文档")
w.cancel()  # releases the writer without writing anything to the index

7.3 内存索引与磁盘索引

7.3.1 内存索引

特点

  • 数据存储在内存中
  • 查询速度极快
  • 重启后数据丢失
  • 适合临时索引和缓存

创建内存索引

from whoosh.filedb.filestore import RamStorage
from whoosh.index import FileIndex
from whoosh.fields import Schema, TEXT

# Method 1: create a FileIndex directly on a RamStorage.
# FIX: RamStorage lives in whoosh.filedb.filestore, not whoosh.index.
storage = RamStorage()
ix = FileIndex.create(storage, Schema(title=TEXT()))

# Method 2: let the storage object build the index for you.
# FIX: create_in() expects a directory *path*; for in-memory indexes use
# Storage.create_index() instead.
storage = RamStorage()
ix = storage.create_index(Schema(title=TEXT()))

代码示例

# Demo: building and querying a purely in-memory index.
# FIX: RamStorage is imported from whoosh.filedb.filestore, and the index is
# created on it with storage.create_index() -- create_in() expects a
# directory path, not a storage object.
from whoosh.filedb.filestore import RamStorage
from whoosh.fields import Schema, TEXT
from whoosh.qparser import QueryParser

print("=== 内存索引演示 ===\n")

# Create the in-memory index.
print("【步骤1】创建内存索引")
storage = RamStorage()
schema = Schema(title=TEXT(stored=True), content=TEXT(stored=True))
ix = storage.create_index(schema)
print("  内存索引创建成功")

# Add a handful of documents in one commit.
print("\n【步骤2】添加文档")
writer = ix.writer()
for i in range(1, 6):
    writer.add_document(
        title=f"内存文档 {i}",
        content=f"这是内存中的文档 {i}"
    )
writer.commit()
print("  添加 5 篇文档")

# Query the in-memory index.
print("\n【步骤3】查询内存索引")
parser = QueryParser("content", ix.schema)
query = parser.parse(u"文档")

with ix.searcher() as searcher:
    results = searcher.search(query)
    print(f"  命中 {len(results)} 篇")
    for hit in results:
        print(f"    - {hit['title']}")

print("\n✅ 内存索引演示完成!")

7.3.2 磁盘索引

特点

  • 数据存储在磁盘上
  • 查询速度较慢但持久
  • 重启后数据保留
  • 适合长期存储

创建磁盘索引

from whoosh.index import create_in, open_dir
from whoosh.fields import Schema, TEXT

# Create an on-disk index.
# NOTE(review): create_in() requires the directory to already exist (it does
# not create it) -- callers normally os.makedirs() it first.
ix = create_in("my_index", Schema(title=TEXT()))

# Open an already-existing on-disk index.
ix = open_dir("my_index")

7.3.3 内存索引与磁盘索引对比

特性 内存索引 磁盘索引
存储位置 内存 磁盘
查询速度 极快 较慢(受磁盘 I/O 影响)
写入速度 极快 较慢
数据持久性 否(重启后丢失) 是(持久保存)
容量限制 受内存限制 受磁盘限制
适用场景 临时查询、缓存 长期存储

7.3.4 混合使用场景

场景1:主从索引

from whoosh.index import open_dir
from whoosh.filedb.filestore import RamStorage
from whoosh.fields import Schema, TEXT

# Primary index: persistent, on disk.
disk_index = open_dir("main_index")

# Secondary index: in memory, for hot data.
# FIX: RamStorage comes from whoosh.filedb.filestore, and in-memory indexes
# are created with storage.create_index() -- create_in() expects a
# directory path, not a storage object.
storage = RamStorage()
memory_index = storage.create_index(Schema(title=TEXT()))

场景2:内存加速缓存

# Warm an in-memory cache index with hot documents from the disk index.
from whoosh.filedb.filestore import RamStorage
from whoosh.index import open_dir
from whoosh.query import Every

disk_index = open_dir("main_index")

# Build the cache index on a RamStorage, reusing the disk index's schema.
# FIX: create_in() expects a directory path; use storage.create_index().
storage = RamStorage()
memory_index = storage.create_index(disk_index.schema)

# FIX: the original referenced an undefined ``query`` (NameError). Every()
# matches all documents; substitute a real "hot data" query in production.
query = Every()

# Copy the first page (100 docs) of matches into the memory index.
writer = memory_index.writer()
with disk_index.searcher() as searcher:
    for hit in searcher.search_page(query, 1, pagelen=100):
        writer.add_document(**hit.fields())
writer.commit()

7.4 索引备份与恢复

7.4.1 索引备份

方法1:文件系统备份

import shutil
from whoosh.index import open_dir

# Back up an index by copying its directory wholesale.
# NOTE(review): copy only while no writer is active, or the snapshot may be
# inconsistent -- confirm your write schedule before relying on this.
src = "my_index"
dst = "my_index_backup"

shutil.copytree(src, dst)
print(f"索引已备份到: {dst}")

方法2:使用 shutil 进行压缩备份

import shutil
import os
from datetime import datetime

# Compress the index directory into a timestamped zip archive.
src = "my_index"
stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_file = f"index_backup_{stamp}.zip"

# make_archive wants the base name *without* the .zip extension.
archive_base = backup_file.replace('.zip', '')
shutil.make_archive(archive_base, 'zip', src)
print(f"索引已压缩备份到: {backup_file}")

代码示例

# Demo: backing up an index directory, then producing a zip archive of it.
import shutil
import os
from datetime import datetime
from whoosh.index import create_in, open_dir
from whoosh.fields import Schema, TEXT

print("=== 索引备份演示 ===\n")

# Build a small throwaway index to back up.
index_dir = "backup_demo_index"
if os.path.exists(index_dir):
    shutil.rmtree(index_dir)
os.makedirs(index_dir)

schema = Schema(title=TEXT(stored=True), content=TEXT(stored=True))
ix = create_in(index_dir, schema)

# Add a handful of documents in one commit.
writer = ix.writer()
for i in range(1, 6):
    writer.add_document(
        title=f"文档 {i}",
        content=f"内容 {i}"
    )
writer.commit()

print("【步骤1】创建测试索引")
print(f"  索引目录: {index_dir}")
print(f"  包含文件: {len(os.listdir(index_dir))} 个")

# Copy the whole directory as the backup.
# NOTE(review): copying while a writer is active could capture an
# inconsistent snapshot -- back up only quiescent indexes.
print("\n【步骤2】备份索引")
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_dir = f"backup_demo_backup_{timestamp}"
shutil.copytree(index_dir, backup_dir)
print(f"  备份目录: {backup_dir}")

# Compare file listings to confirm the copy.
print("\n【步骤3】验证备份")
print(f"  原索引文件: {os.listdir(index_dir)}")
print(f"  备份文件: {os.listdir(backup_dir)}")

# Produce a zip archive of the backup directory.
print("\n【步骤4】创建压缩备份")
zip_name = f"backup_demo_{timestamp}"
shutil.make_archive(zip_name, 'zip', backup_dir)
print(f"  压缩文件: {zip_name}.zip")

print("\n✅ 索引备份演示完成!")

7.4.2 索引恢复

从备份恢复

import os
import shutil

# Restore an index from a directory backup.
# FIX: the original used os.path.exists without importing ``os``.
backup_dir = "my_index_backup"
target_dir = "my_index"

# Remove the stale index, if any, before restoring.
if os.path.exists(target_dir):
    shutil.rmtree(target_dir)

# Copy the backup into place.
shutil.copytree(backup_dir, target_dir)
print(f"索引已从备份恢复到: {target_dir}")

从压缩文件恢复

import shutil
import zipfile  # kept for parity; unpack_archive handles the zip itself

# Restore an index by extracting a zip backup into a fresh directory.
archive = "index_backup_20240101_120000.zip"
restore_dir = "my_index_restored"

shutil.unpack_archive(archive, restore_dir)
print(f"索引已从压缩包恢复到: {restore_dir}")

7.4.3 增量备份策略

基于时间戳的增量备份

import os
import shutil
from datetime import datetime

def incremental_backup(source_dir, backup_base_dir):
    """Copy *source_dir* into a timestamped backup and prune old backups.

    A new directory named ``backup_<YYYYmmdd_HHMMSS>`` is created under
    *backup_base_dir*; only the five most recent ``backup_*`` directories
    are kept (older ones are deleted).  Returns the path of the new backup.

    FIX: the original snippet used ``os`` without importing it.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_dir = os.path.join(backup_base_dir, f"backup_{timestamp}")
    shutil.copytree(source_dir, backup_dir)

    # Timestamped names sort lexicographically == chronologically.
    backups = sorted(
        d for d in os.listdir(backup_base_dir)
        if d.startswith("backup_")
    )

    # Keep only the five most recent backups.
    while len(backups) > 5:
        old_backup = os.path.join(backup_base_dir, backups.pop(0))
        shutil.rmtree(old_backup)
        print(f"删除旧备份: {old_backup}")

    return backup_dir

7.4.4 索引迁移

迁移到新目录

import shutil
from whoosh.index import open_dir

# Verify the document count before migrating.
# FIX: use the searcher as a context manager so it is closed -- the original
# leaked an open searcher on both indexes.
old_index = open_dir("old_index")
with old_index.searcher() as searcher:
    print(f"迁移前文档数: {searcher.doc_count()}")

# Perform the migration by copying the index directory.
shutil.copytree("old_index", "new_index")

# Verify the document count after migrating.
new_index = open_dir("new_index")
with new_index.searcher() as searcher:
    print(f"迁移后文档数: {searcher.doc_count()}")

7.5 综合示例

7.5.1 完整的索引管理系统

from whoosh.index import create_in, open_dir, FileIndex, RamStorage
from whoosh.fields import Schema, TEXT, ID, KEYWORD
from whoosh.qparser import QueryParser
from whoosh.query import Term
import shutil
import os
from datetime import datetime

class IndexManager:
    """High-level wrapper around a Whoosh index.

    Bundles the common lifecycle operations (add/update/delete documents,
    search, optimize, backup, statistics) behind one object.  The index is
    opened from *index_dir* if that directory already exists, otherwise it
    is created there with *schema* (or a default id/title/content/category
    schema).
    """

    def __init__(self, index_dir, schema=None):
        self.index_dir = index_dir
        self.schema = schema or self._default_schema()

        # Open an existing index, or create a fresh one in a new directory.
        if os.path.exists(index_dir):
            self.ix = open_dir(index_dir)
        else:
            os.makedirs(index_dir)
            self.ix = create_in(index_dir, self.schema)

    def _default_schema(self):
        """Return the default schema: unique id plus stored text fields."""
        return Schema(
            id=ID(stored=True, unique=True),
            title=TEXT(stored=True),
            content=TEXT(stored=True),
            category=KEYWORD(stored=True)
        )

    def add_document(self, doc):
        """Add one document (a field-name -> value mapping) and commit."""
        writer = self.ix.writer()
        writer.add_document(**doc)
        writer.commit()

    def update_document(self, doc_id, doc):
        """Replace the document whose unique id is *doc_id* and commit.

        FIX: merge the id into a copy instead of mutating the caller's
        dict (the original wrote ``doc['id'] = doc_id`` into the argument).
        """
        writer = self.ix.writer()
        fields = dict(doc, id=doc_id)
        writer.update_document(**fields)
        writer.commit()

    def delete_document(self, doc_id):
        """Delete the document whose unique id is *doc_id* and commit."""
        writer = self.ix.writer()
        writer.delete_by_term("id", doc_id)
        writer.commit()

    def search(self, query_str, limit=10):
        """Parse *query_str* against the content field and return matches.

        FIX: return each hit's stored fields as a plain dict; Hit objects
        must not escape the searcher's ``with`` block because they read
        lazily from the (then closed) searcher.
        """
        parser = QueryParser("content", self.ix.schema)
        query = parser.parse(query_str)
        with self.ix.searcher() as searcher:
            results = searcher.search(query, limit=limit)
            return [hit.fields() for hit in results]

    def optimize(self):
        """Merge all index segments into one."""
        writer = self.ix.writer()
        writer.optimize()
        writer.commit()

    def backup(self, backup_dir=None):
        """Copy the index directory to *backup_dir* and return its path.

        When *backup_dir* is None a timestamped sibling directory is used.
        """
        if backup_dir is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            backup_dir = f"{self.index_dir}_backup_{timestamp}"
        shutil.copytree(self.index_dir, backup_dir)
        return backup_dir

    def get_stats(self):
        """Return doc count, segment count and on-disk size (MB).

        NOTE(review): ``reader.segments()`` is not part of the documented
        IndexReader API -- verify against the installed Whoosh version.
        """
        with self.ix.searcher() as searcher:
            reader = searcher.reader()
            return {
                "doc_count": reader.doc_count(),
                "segment_count": len(list(reader.segments())),
                "index_size": self._get_dir_size(self.index_dir)
            }

    def _get_dir_size(self, path):
        """Return the total size of *path* (recursive) in megabytes."""
        total = 0
        for root, dirs, files in os.walk(path):
            for file in files:
                total += os.path.getsize(os.path.join(root, file))
        return total / (1024 * 1024)  # bytes -> MB

# Usage walkthrough for IndexManager: add, stats, search, update, delete,
# optimize, backup.
print("=== 索引管理系统演示 ===\n")

# Create (or open) the managed index.
manager = IndexManager("managed_index")

# Add ten documents (one writer/commit per document -- simple but slow;
# batch under a single writer for real workloads).
print("【步骤1】添加文档")
for i in range(1, 11):
    manager.add_document({
        "id": f"doc_{i}",
        "title": f"文档 {i}",
        "content": f"这是文档 {i} 的内容",
        "category": "测试"
    })
print(f"  添加 10 篇文档")

# Snapshot of index statistics.
stats = manager.get_stats()
print(f"\n【步骤2】索引统计")
print(f"  文档数: {stats['doc_count']}")
print(f"  段数: {stats['segment_count']}")
print(f"  大小: {stats['index_size']:.2f} MB")

# Full-text search on the content field.
print(f"\n【步骤3】搜索")
results = manager.search("文档")
print(f"  搜索 '文档': 命中 {len(results)} 篇")

# Replace doc_1 via its unique id.
print(f"\n【步骤4】更新文档")
manager.update_document("doc_1", {
    "title": "更新后的文档 1",
    "content": "更新后的内容",
    "category": "测试"
})

# Remove doc_2.
print(f"\n【步骤5】删除文档")
manager.delete_document("doc_2")

# Merge all segments into one.
print(f"\n【步骤6】优化索引")
manager.optimize()

# Timestamped directory backup.
print(f"\n【步骤7】备份索引")
backup_dir = manager.backup()
print(f"  备份到: {backup_dir}")

# Statistics after the maintenance operations.
stats = manager.get_stats()
print(f"\n【步骤8】最终统计")
print(f"  文档数: {stats['doc_count']}")
print(f"  段数: {stats['segment_count']}")

print("\n✅ 索引管理系统演示完成!")

本章小结

本章我们学习了 Whoosh 索引的优化与管理:

  1. 索引合并与优化:理解段的概念,使用 optimize() 方法优化索引
  2. 增量更新与删除:添加、更新、删除文档的方法
  3. 内存索引与磁盘索引:不同类型索引的特点和适用场景
  4. 索引备份与恢复:备份和恢复索引的方法

通过本章的学习,你应该能够:

  • 理解 Whoosh 索引的内部结构
  • 进行索引的优化和合并
  • 实现文档的增量更新和删除
  • 合理使用内存索引和磁盘索引
  • 实现索引的备份和恢复

在下一章中,我们将学习结果处理与展示,包括高亮显示、摘要生成等。

« 上一篇 高级查询技巧 下一篇 » 结果处理与展示