【数据】数据库自然语言查询智能体(NL2SQL)

1. 需求分析与技术架构

1.1 核心需求

数据库自然语言查询智能体需要具备以下核心能力:

  • 自然语言理解:理解用户用自然语言表达的查询需求
  • SQL生成:将自然语言查询转换为正确的SQL语句
  • 数据库连接:连接到各种类型的数据库(MySQL、PostgreSQL等)
  • 查询执行:执行生成的SQL语句并获取结果
  • 结果处理:将查询结果以友好的方式呈现给用户
  • 错误处理:处理SQL语法错误和执行错误
  • 模式识别:识别数据库表结构和字段含义
  • 上下文理解:理解多轮对话中的上下文信息

1.2 技术架构

我们将采用以下技术栈构建数据库自然语言查询智能体:

┌─────────────────────┐
│     用户界面层      │
│  Gradio Web界面     │
└──────────┬──────────┘
           │
┌──────────▼──────────┐
│     智能体核心层     │
│  LangChain + LLM    │
└──────────┬──────────┘
           │
┌──────────▼──────────┐
│     工具与数据层     │
│ 1. 自然语言解析工具  │
│ 2. SQL生成器        │
│ 3. 数据库连接器      │
│ 4. 结果处理器        │
└─────────────────────┘

2. 环境搭建与依赖配置

首先,我们需要创建项目并安装必要的依赖:

# 创建项目目录
mkdir nl2sql-agent
cd nl2sql-agent

# 初始化Python环境
python -m venv venv
venv\Scripts\activate  # Windows
# 或 source venv/bin/activate  # macOS/Linux

# 安装依赖
pip install langchain langchain-openai gradio pandas sqlalchemy pymysql psycopg2-binary

3. 核心功能实现

3.1 配置文件管理

创建 config.py 文件管理配置信息:

# config.py
import os
from dotenv import load_dotenv

load_dotenv()

# API密钥配置
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

# 系统配置
SYSTEM_PROMPT = """你是一个专业的数据库查询助手,负责将用户的自然语言问题转换为SQL查询语句。

你的职责包括:
1. 理解用户用自然语言表达的查询需求
2. 根据数据库表结构,生成正确的SQL查询语句
3. 确保SQL语句语法正确,逻辑合理
4. 处理用户的后续问题和上下文
5. 以友好的方式呈现查询结果

请保持专业、准确的态度,确保生成的SQL语句能够正确执行并返回用户需要的结果。"""

# 数据库配置
DATABASE_CONFIG = {
    "mysql": {
        "driver": "mysql+pymysql",
        "host": "localhost",
        "port": 3306,
        "user": "root",
        "password": "password",
        "database": "example"
    },
    "postgresql": {
        "driver": "postgresql+psycopg2",
        "host": "localhost",
        "port": 5432,
        "user": "postgres",
        "password": "password",
        "database": "example"
    },
    "sqlite": {
        "driver": "sqlite",
        "database": "example.db"
    }
}

# SQL模板配置
SQL_TEMPLATES = {
    "select": "SELECT {columns} FROM {table} WHERE {conditions}",
    "group_by": "SELECT {columns} FROM {table} GROUP BY {group_by} HAVING {conditions}",
    "join": "SELECT {columns} FROM {table1} JOIN {table2} ON {join_condition} WHERE {conditions}"
}

# 示例表结构
EXAMPLE_SCHEMA = {
    "employees": {
        "columns": [
            {"name": "id", "type": "INT", "description": "员工ID"},
            {"name": "name", "type": "VARCHAR(100)", "description": "员工姓名"},
            {"name": "department", "type": "VARCHAR(100)", "description": "部门"},
            {"name": "salary", "type": "DECIMAL(10,2)", "description": "工资"},
            {"name": "hire_date", "type": "DATE", "description": "入职日期"}
        ]
    },
    "departments": {
        "columns": [
            {"name": "id", "type": "INT", "description": "部门ID"},
            {"name": "name", "type": "VARCHAR(100)", "description": "部门名称"},
            {"name": "manager_id", "type": "INT", "description": "经理ID"},
            {"name": "location", "type": "VARCHAR(100)", "description": "部门位置"}
        ]
    }
}

3.2 SQL生成工具

创建 sql_generator.py 文件生成SQL语句:

# sql_generator.py
from langchain_openai import ChatOpenAI
from config import OPENAI_API_KEY

class SQLGenerator:
    """SQL生成类"""
    
    def __init__(self):
        self.llm = ChatOpenAI(
            api_key=OPENAI_API_KEY,
            model="gpt-3.5-turbo",
            temperature=0.3
        )
    
    def generate_sql(self, natural_language, schema, db_type="mysql"):
        """生成SQL语句"""
        # 构建表结构描述
        schema_description = self._format_schema(schema)
        
        prompt = f"""
请将以下自然语言查询转换为{db_type}的SQL语句:

自然语言查询:{natural_language}

数据库表结构:
{schema_description}

请遵循以下要求:
1. 生成正确的{db_type} SQL语法
2. 确保SQL语句能够正确执行
3. 只返回SQL语句,不要包含其他无关内容
4. 如果需要使用JOIN,请确保表之间有合理的关联条件

请直接生成SQL语句,不要包含任何解释或说明。
        """
        
        response = self.llm.invoke(prompt)
        return response.content
    
    def _format_schema(self, schema):
        """格式化表结构描述"""
        formatted = []
        for table_name, table_info in schema.items():
            formatted.append(f"表名:{table_name}")
            formatted.append("字段:")
            for column in table_info.get("columns", []):
                formatted.append(f"  - {column['name']} ({column['type']}): {column['description']}")
            formatted.append("")
        return "\n".join(formatted)
    
    def optimize_sql(self, sql, db_type="mysql"):
        """优化SQL语句"""
        prompt = f"""
请优化以下{db_type} SQL语句:

{sql}

优化要求:
1. 提高查询性能
2. 保持查询结果不变
3. 遵循{db_type}最佳实践
4. 只返回优化后的SQL语句

请直接返回优化后的SQL语句,不要包含其他无关内容。
        """
        
        response = self.llm.invoke(prompt)
        return response.content
    
    def explain_sql(self, sql):
        """解释SQL语句"""
        prompt = f"""
请解释以下SQL语句的含义:

{sql}

解释要求:
1. 说明查询的目的
2. 解释SQL语句的各个部分
3. 说明查询结果会包含什么信息
4. 使用通俗易懂的语言
        """
        
        response = self.llm.invoke(prompt)
        return response.content

3.3 数据库连接工具

创建 database_connector.py 文件连接数据库:

# database_connector.py
from sqlalchemy import create_engine, inspect
import pandas as pd
from config import DATABASE_CONFIG

class DatabaseConnector:
    """数据库连接类"""
    
    def __init__(self):
        self.engines = {}
    
    def connect(self, db_type="sqlite", config=None):
        """连接到数据库"""
        if config is None:
            config = DATABASE_CONFIG[db_type]
        
        # 构建连接字符串
        if db_type == "sqlite":
            connection_string = f"{config['driver']}:///{config['database']}"
        else:
            connection_string = f"{config['driver']}://{config['user']}:{config['password']}@{config['host']}:{config['port']}/{config['database']}"
        
        # 创建引擎
        engine = create_engine(connection_string)
        self.engines[db_type] = engine
        
        return engine
    
    def get_schema(self, db_type="sqlite"):
        """获取数据库表结构"""
        engine = self.engines.get(db_type)
        if not engine:
            engine = self.connect(db_type)
        
        inspector = inspect(engine)
        schema = {}
        
        # 获取所有表
        tables = inspector.get_table_names()
        
        for table in tables:
            columns = []
            # 获取表的所有列
            for column in inspector.get_columns(table):
                columns.append({
                    "name": column["name"],
                    "type": str(column["type"]),
                    "description": ""
                })
            schema[table] = {"columns": columns}
        
        return schema
    
    def execute_query(self, sql, db_type="sqlite"):
        """执行SQL查询"""
        engine = self.engines.get(db_type)
        if not engine:
            engine = self.connect(db_type)
        
        try:
            # 执行查询并返回结果
            with engine.connect() as connection:
                result = connection.execute(sql)
                # 获取列名
                columns = result.keys()
                # 获取数据
                data = result.fetchall()
                # 转换为DataFrame
                df = pd.DataFrame(data, columns=columns)
                return df
        except Exception as e:
            raise Exception(f"执行查询时出错:{str(e)}")
    
    def create_example_tables(self, db_type="sqlite"):
        """创建示例表结构"""
        engine = self.engines.get(db_type)
        if not engine:
            engine = self.connect(db_type)
        
        # 创建员工表
        create_employees = """
        CREATE TABLE IF NOT EXISTS employees (
            id INT PRIMARY KEY,
            name VARCHAR(100),
            department VARCHAR(100),
            salary DECIMAL(10,2),
            hire_date DATE
        )
        """
        
        # 创建部门表
        create_departments = """
        CREATE TABLE IF NOT EXISTS departments (
            id INT PRIMARY KEY,
            name VARCHAR(100),
            manager_id INT,
            location VARCHAR(100)
        )
        """
        
        # 插入示例数据
        insert_employees = """
        INSERT INTO employees (id, name, department, salary, hire_date)
        VALUES 
            (1, '张三', '技术部', 10000, '2020-01-01'),
            (2, '李四', '销售部', 8000, '2020-02-01'),
            (3, '王五', '技术部', 12000, '2019-12-01'),
            (4, '赵六', '人力资源部', 7000, '2020-03-01'),
            (5, '钱七', '销售部', 9000, '2020-01-15')
        """
        
        insert_departments = """
        INSERT INTO departments (id, name, manager_id, location)
        VALUES 
            (1, '技术部', 1, '北京'),
            (2, '销售部', 2, '上海'),
            (3, '人力资源部', 4, '广州')
        """
        
        try:
            with engine.connect() as connection:
                # 创建表
                connection.execute(create_employees)
                connection.execute(create_departments)
                # 清空表数据
                connection.execute("DELETE FROM employees")
                connection.execute("DELETE FROM departments")
                # 插入数据
                connection.execute(insert_employees)
                connection.execute(insert_departments)
                # 提交事务
                connection.commit()
        except Exception as e:
            raise Exception(f"创建示例表时出错:{str(e)}")

3.4 智能体构建

创建 agent.py 文件构建智能体:

# agent.py
from langchain_openai import ChatOpenAI
from langchain.agents import AgentType, initialize_agent
from langchain.memory import ConversationBufferMemory
from sql_generator import SQLGenerator
from database_connector import DatabaseConnector
from config import OPENAI_API_KEY, SYSTEM_PROMPT, EXAMPLE_SCHEMA

# 初始化LLM
llm = ChatOpenAI(
    api_key=OPENAI_API_KEY,
    model="gpt-3.5-turbo",
    temperature=0.7
)

# 初始化记忆
memory = ConversationBufferMemory(
    memory_key="chat_history",
    return_messages=True
)

# 初始化工具类
sql_generator = SQLGenerator()
db_connector = DatabaseConnector()

# 定义智能体工具
from langchain.tools import tool

@tool

def generate_sql(natural_language: str, schema: str = None, db_type: str = "sqlite") -> str:
    """将自然语言转换为SQL语句"""
    try:
        # 如果没有提供schema,使用示例schema
        if schema is None:
            schema_to_use = EXAMPLE_SCHEMA
        else:
            import json
            schema_to_use = json.loads(schema)
        
        # 生成SQL
        sql = sql_generator.generate_sql(natural_language, schema_to_use, db_type)
        
        # 解释SQL
        explanation = sql_generator.explain_sql(sql)
        
        return f"生成的SQL语句:\n{sql}\n\nSQL解释:\n{explanation}"
    except Exception as e:
        return f"生成SQL时出错:{str(e)}"

@tool

def execute_sql(sql: str, db_type: str = "sqlite") -> str:
    """执行SQL语句并返回结果"""
    try:
        # 执行查询
        result = db_connector.execute_query(sql, db_type)
        
        # 转换结果为字符串
        result_str = result.to_string(index=False)
        
        return f"查询结果:\n{result_str}"
    except Exception as e:
        return f"执行SQL时出错:{str(e)}"

@tool

def get_database_schema(db_type: str = "sqlite") -> str:
    """获取数据库表结构"""
    try:
        # 获取表结构
        schema = db_connector.get_schema(db_type)
        
        # 格式化表结构
        formatted = []
        for table_name, table_info in schema.items():
            formatted.append(f"表名:{table_name}")
            formatted.append("字段:")
            for column in table_info.get("columns", []):
                formatted.append(f"  - {column['name']} ({column['type']})")
            formatted.append("")
        
        return "\n".join(formatted)
    except Exception as e:
        return f"获取表结构时出错:{str(e)}"

@tool

def create_example_db(db_type: str = "sqlite") -> str:
    """创建示例数据库和表结构"""
    try:
        # 创建示例表
        db_connector.create_example_tables(db_type)
        
        # 获取表结构
        schema = db_connector.get_schema(db_type)
        
        return f"示例数据库创建成功!\n\n表结构:\n{get_database_schema(db_type)}"
    except Exception as e:
        return f"创建示例数据库时出错:{str(e)}"

# 定义工具列表
tools = [
    generate_sql,
    execute_sql,
    get_database_schema,
    create_example_db
]

# 初始化智能体
nl2sql_agent = initialize_agent(
    tools=tools,
    llm=llm,
    agent=AgentType.CHAT_CONVERSATIONAL_REACT_DESCRIPTION,
    memory=memory,
    system_message=SYSTEM_PROMPT,
    verbose=True
)

3.5 Web界面构建

创建 app.py 文件构建Gradio Web界面:

# app.py
import gradio as gr
import json
from agent import nl2sql_agent
from config import EXAMPLE_SCHEMA

# 处理自然语言查询
def handle_natural_language_query(query, db_type):
    # 调用智能体生成SQL
    sql_result = nl2sql_agent.invoke(f"将以下自然语言转换为{db_type} SQL语句:{query}")
    
    # 提取SQL语句
    import re
    sql_match = re.search(r'生成的SQL语句:\n(.*?)\n\nSQL解释', sql_result['output'], re.DOTALL)
    if sql_match:
        sql = sql_match.group(1).strip()
        # 执行SQL
        execute_result = nl2sql_agent.invoke(f"执行以下SQL语句:{sql}")
        return sql_result['output'], execute_result['output']
    else:
        return sql_result['output'], "无法生成SQL语句"

# 处理SQL直接执行
def handle_sql_execution(sql, db_type):
    # 调用智能体执行SQL
    result = nl2sql_agent.invoke(f"执行以下{db_type} SQL语句:{sql}")
    return result['output']

# 处理获取表结构
def handle_get_schema(db_type):
    # 调用智能体获取表结构
    result = nl2sql_agent.invoke(f"获取{db_type}数据库的表结构")
    return result['output']

# 处理创建示例数据库
def handle_create_example_db(db_type):
    # 调用智能体创建示例数据库
    result = nl2sql_agent.invoke(f"创建{db_type}示例数据库")
    return result['output']

# 示例查询
EXAMPLE_QUERIES = [
    "查询所有员工的姓名和工资",
    "查询技术部的员工数量",
    "查询工资高于10000的员工",
    "查询每个部门的平均工资",
    "查询员工数量最多的部门"
]

# 创建Gradio界面
with gr.Blocks(title="数据库自然语言查询智能助手") as demo:
    gr.Markdown("""
    # 数据库自然语言查询智能助手
    用自然语言提问,自动生成SQL并执行查询!
    """)
    
    # 标签页
    with gr.Tabs():
        # 自然语言查询标签页
        with gr.TabItem("自然语言查询"):
            db_type_input = gr.Dropdown(label="数据库类型", choices=["sqlite", "mysql", "postgresql"], value="sqlite")
            query_input = gr.Textbox(label="输入自然语言查询", lines=3, placeholder="例如:查询所有员工的姓名和工资")
            example_queries = gr.Dataset(
                components=[gr.Textbox()],
                samples=[[q] for q in EXAMPLE_QUERIES],
                type="index"
            )
            query_btn = gr.Button("生成并执行查询")
            sql_output = gr.Textbox(label="生成的SQL语句", lines=5)
            result_output = gr.Textbox(label="查询结果", lines=10)
            
            # 示例查询点击事件
            def load_example_query(index):
                return EXAMPLE_QUERIES[index]
            
            example_queries.click(
                fn=load_example_query,
                inputs=[example_queries],
                outputs=[query_input]
            )
            
            query_btn.click(
                fn=handle_natural_language_query,
                inputs=[query_input, db_type_input],
                outputs=[sql_output, result_output]
            )
        
        # SQL执行标签页
        with gr.TabItem("SQL执行"):
            db_type_input2 = gr.Dropdown(label="数据库类型", choices=["sqlite", "mysql", "postgresql"], value="sqlite")
            sql_input = gr.Textbox(label="输入SQL语句", lines=5, placeholder="例如:SELECT * FROM employees")
            execute_btn = gr.Button("执行SQL")
            execute_output = gr.Textbox(label="执行结果", lines=10)
            
            execute_btn.click(
                fn=handle_sql_execution,
                inputs=[sql_input, db_type_input2],
                outputs=[execute_output]
            )
        
        # 数据库管理标签页
        with gr.TabItem("数据库管理"):
            db_type_input3 = gr.Dropdown(label="数据库类型", choices=["sqlite", "mysql", "postgresql"], value="sqlite")
            get_schema_btn = gr.Button("获取表结构")
            create_example_btn = gr.Button("创建示例数据库")
            schema_output = gr.Textbox(label="操作结果", lines=15)
            
            get_schema_btn.click(
                fn=handle_get_schema,
                inputs=[db_type_input3],
                outputs=[schema_output]
            )
            
            create_example_btn.click(
                fn=handle_create_example_db,
                inputs=[db_type_input3],
                outputs=[schema_output]
            )

# 启动应用
if __name__ == "__main__":
    demo.launch(share=True)

4. 部署与使用

4.1 本地部署

# 运行应用
python app.py

4.2 Docker容器化

创建 Dockerfile 文件:

FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 安装数据库客户端
RUN apt-get update && apt-get install -y default-mysql-client postgresql-client

COPY . .

EXPOSE 7860

CMD ["python", "app.py"]

创建 requirements.txt 文件:

langchain
langchain-openai
gradio
pandas
sqlalchemy
pymysql
psycopg2-binary
python-dotenv

4.3 使用流程

  1. 自然语言查询

    • 在"自然语言查询"标签页选择数据库类型
    • 输入自然语言查询(如"查询所有员工的姓名和工资")
    • 点击"生成并执行查询"按钮
    • 查看生成的SQL语句和查询结果
  2. SQL直接执行

    • 在"SQL执行"标签页选择数据库类型
    • 输入SQL语句
    • 点击"执行SQL"按钮
    • 查看执行结果
  3. 数据库管理

    • 在"数据库管理"标签页选择数据库类型
    • 点击"创建示例数据库"按钮创建示例表结构
    • 点击"获取表结构"按钮查看数据库表结构

5. 功能测试与优化

5.1 测试场景

  1. 基本查询测试

    • 测试简单的SELECT查询
    • 测试带WHERE条件的查询
    • 测试带GROUP BY的查询
  2. 复杂查询测试

    • 测试JOIN查询
    • 测试子查询
    • 测试带复杂条件的查询
  3. 多轮对话测试

    • 测试上下文理解能力
    • 测试指代消解(如"上一个查询的结果中工资最高的是谁")
  4. 错误处理测试

    • 测试处理语法错误的SQL
    • 测试处理执行错误的SQL
    • 测试处理语义错误的自然语言查询

5.2 优化建议

  1. SQL生成优化

    • 集成专门的NL2SQL模型,提高SQL生成的准确性
    • 增加SQL语法检查和验证
    • 实现更智能的表名和字段名匹配
  2. 数据库支持

    • 增加对更多数据库类型的支持(如Oracle、SQL Server等)
    • 实现更灵活的数据库连接配置
    • 添加数据库连接池管理
  3. 功能增强

    • 添加数据可视化功能,将查询结果转换为图表
    • 实现查询结果的导出功能(如CSV、Excel等)
    • 支持复杂的数据分析和报表生成
  4. 用户体验优化

    • 增加查询历史记录
    • 实现查询模板管理
    • 优化错误提示,提供更友好的错误信息
  5. 性能优化

    • 实现SQL查询缓存
    • 优化数据库连接管理
    • 提高查询执行速度

6. 总结与展望

6.1 项目总结

本实战案例成功构建了一个数据库自然语言查询智能体,具备以下核心功能:

  • ✅ 自然语言到SQL的自动转换
  • ✅ 支持多种数据库类型(SQLite、MySQL、PostgreSQL)
  • ✅ 自动执行SQL查询并获取结果
  • ✅ 详细的SQL语句解释
  • ✅ 数据库表结构自动识别
  • ✅ 示例数据库快速创建
  • ✅ Web界面交互
  • ✅ Docker容器化部署

6.2 未来展望

  1. 智能化升级

    • 引入更先进的NL2SQL模型,提高复杂查询的处理能力
    • 实现基于数据库内容的语义理解
    • 开发自适应学习能力,根据用户反馈不断改进
  2. 功能扩展

    • 支持更复杂的数据库操作(如插入、更新、删除)
    • 实现数据库设计和优化建议
    • 集成数据清洗和预处理功能
  3. 平台集成

    • 与BI工具集成,提供更强大的数据分析能力
    • 与企业数据仓库集成
    • 支持云数据库服务(如AWS RDS、阿里云数据库等)
  4. 技术创新

    • 探索使用本地大模型进行NL2SQL转换,保护数据隐私
    • 实现跨语言NL2SQL(支持中文、英文等多语言)
    • 开发基于对话的复杂数据分析系统

通过本项目的实践,我们不仅掌握了AI智能体在数据库操作领域的应用方法,也了解了从自然语言理解到SQL生成执行的完整技术流程。随着技术的不断进步,数据库自然语言查询智能体将在数据驱动决策中发挥越来越重要的作用,为非技术人员提供更便捷的数据库访问方式。

« 上一篇 【多媒体】视频脚本转图文短视频的智能体 下一篇 » 【游戏】NPC智能体:赋予游戏角色动态对话与任务能力