Episode 110: Comprehensive Network Programming Exercise

Learning Objectives

  1. Apply the fundamentals of network programming in a single project
  2. Understand how a complete network application is architected
  3. Learn how several modules are developed to work together
  4. Learn techniques for testing and debugging network applications
  5. Practice planning a project as a whole

1. Project Overview

In this episode we will build a complete network application that brings together the network programming topics covered in the previous episodes, including:

  • socket programming basics
  • How the HTTP protocol works
  • Using the requests library
  • Web page parsing techniques
  • Building a simple web crawler

We will develop a simple news aggregator. The application will be able to:

  1. Crawl news content from multiple news sites
  2. Parse the pages and extract the key information
  3. Serve the aggregated results through a local HTTP service
  4. Support simple search and filtering

2. Project Architecture Design

2.1 Overall Architecture

┌─────────────────────┐     ┌─────────────────┐     ┌─────────────────────┐
│  News site sources  │────▶│  Spider module  │────▶│   Storage module    │
└─────────────────────┘     └─────────────────┘     └──────────┬──────────┘
                                                               │
                                                               ▼
┌─────────────────────┐     ┌─────────────────┐     ┌─────────────────────┐
│    User browser     │◀────│   HTTP server   │◀────│  Processing module  │
└─────────────────────┘     └─────────────────┘     └─────────────────────┘

2.2 Module Breakdown

Module            Main responsibility              Key techniques
Spider module     Crawl news site content          requests library, exception handling, threads/processes
Parser module     Extract key news information     BeautifulSoup, lxml, CSS selectors / XPath
Storage module    Save and manage news data        File storage (JSON/CSV), SQLite database
HTTP server       Serve the web access interface   socket programming, HTTP protocol, request routing
Display module    Render the user interface        HTML templates, CSS styles, a little JavaScript
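
All of these modules exchange data as plain Python dictionaries with a fixed set of keys. A sketch of one such record, matching the fields produced by the spider in section 3.1 (the values are made-up examples):

# One news record, as passed from the spider to storage and on to the HTTP service.
# Field names follow the spider implementation below; values are illustrative only.
news_item = {
    'title': 'Example headline',
    'link': 'http://example.com/news/12345',
    'summary': 'A short excerpt of the article body',
    'publish_time': '2024-01-01 08:00',
    'source': 'Example News Site 1',
    'crawl_time': '2024-01-01 09:00:00'
}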

3. Core Feature Implementation

3.1 Spider Module

import requests
from bs4 import BeautifulSoup
import time
import threading
from queue import Queue

class NewsSpider:
    def __init__(self, news_sources, max_threads=5):
        self.news_sources = news_sources  # news source configurations
        self.news_queue = Queue()  # holds crawled news items
        self.max_threads = max_threads  # maximum number of worker threads
        
    def fetch_page(self, url, source_config):
        """获取网页内容"""
        try:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            return response.text
        except Exception as e:
            print(f"获取{url}失败: {e}")
            return None
    
    def parse_page(self, html, source_config):
        """解析网页内容,提取新闻"""
        if not html:
            return []
        
        soup = BeautifulSoup(html, 'lxml')
        news_list = []
        
        # Parse according to the per-source selector configuration
        articles = soup.select(source_config['article_selector'])
        for article in articles:
            try:
                title = article.select_one(source_config['title_selector']).text.strip()
                link = article.select_one(source_config['link_selector'])['href']
                # Make sure the link is an absolute URL
                if not link.startswith('http'):
                    link = source_config['base_url'] + link
                
                # Try to extract a summary
                summary = ''
                if 'summary_selector' in source_config:
                    summary_elem = article.select_one(source_config['summary_selector'])
                    if summary_elem:
                        summary = summary_elem.text.strip()
                
                # Try to extract the publish time
                publish_time = ''
                if 'time_selector' in source_config:
                    time_elem = article.select_one(source_config['time_selector'])
                    if time_elem:
                        publish_time = time_elem.text.strip()
                
                news_item = {
                    'title': title,
                    'link': link,
                    'summary': summary,
                    'publish_time': publish_time,
                    'source': source_config['name'],
                    'crawl_time': time.strftime('%Y-%m-%d %H:%M:%S')
                }
                news_list.append(news_item)
            except Exception as e:
                print(f"解析新闻失败: {e}")
                continue
        
        return news_list
    
    def crawl_source(self, source_config):
        """爬取单个新闻源"""
        html = self.fetch_page(source_config['url'], source_config)
        if html:
            news_list = self.parse_page(html, source_config)
            for news in news_list:
                self.news_queue.put(news)
    
    def run(self):
        """启动爬虫"""
        threads = []
        
        # Create and start one thread per source
        for source in self.news_sources:
            thread = threading.Thread(target=self.crawl_source, args=(source,))
            threads.append(thread)
            thread.start()
            # Limit the number of concurrent threads
            if len(threads) >= self.max_threads:
                for t in threads:
                    t.join()
                threads = []
        
        # Wait for any remaining threads to finish
        for t in threads:
            t.join()
        
        # Drain the queue into a list of news items
        news_list = []
        while not self.news_queue.empty():
            news_list.append(self.news_queue.get())
        
        return news_list
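
Before integrating the modules, the spider can be exercised on its own. A minimal sketch, assuming a hypothetical source configuration (the URL and selectors below are placeholders rather than a real site):

if __name__ == '__main__':
    # Hypothetical source configuration for a quick standalone check
    test_source = {
        'name': 'Test source',
        'url': 'http://example.com/news',
        'base_url': 'http://example.com',
        'article_selector': '.news-item',
        'title_selector': 'h3 a',
        'link_selector': 'h3 a'
    }
    spider = NewsSpider([test_source], max_threads=1)
    for item in spider.run():
        print(item['title'], '->', item['link'])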

3.2 Data Storage Module

import json
import csv
import sqlite3
import os

class NewsStorage:
    def __init__(self, storage_type='json', db_path='news.db'):
        self.storage_type = storage_type
        self.db_path = db_path
        
        if storage_type == 'sqlite':
            self._init_sqlite_db()
    
    def _init_sqlite_db(self):
        """初始化SQLite数据库"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # Create the news table
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS news (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT NOT NULL,
            link TEXT UNIQUE NOT NULL,
            summary TEXT,
            publish_time TEXT,
            source TEXT,
            crawl_time TEXT
        )
        ''')
        
        conn.commit()
        conn.close()
    
    def save(self, news_list, filename='news'):
        """保存新闻数据"""
        if self.storage_type == 'json':
            self._save_to_json(news_list, filename + '.json')
        elif self.storage_type == 'csv':
            self._save_to_csv(news_list, filename + '.csv')
        elif self.storage_type == 'sqlite':
            self._save_to_sqlite(news_list)
        else:
            raise ValueError(f"不支持的存储类型: {self.storage_type}")
    
    def _save_to_json(self, news_list, filename):
        """保存为JSON文件"""
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(news_list, f, ensure_ascii=False, indent=2)
        print(f"新闻已保存到 {filename}")
    
    def _save_to_csv(self, news_list, filename):
        """保存为CSV文件"""
        if not news_list:
            return
        
        # Use the keys of the first item as the CSV header
        fieldnames = list(news_list[0].keys())
        
        with open(filename, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(news_list)
        print(f"新闻已保存到 {filename}")
    
    def _save_to_sqlite(self, news_list):
        """保存到SQLite数据库"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        for news in news_list:
            try:
                cursor.execute('''
                INSERT OR IGNORE INTO news (title, link, summary, publish_time, source, crawl_time)
                VALUES (?, ?, ?, ?, ?, ?)
                ''', (news['title'], news['link'], news['summary'], 
                      news['publish_time'], news['source'], news['crawl_time']))
            except Exception as e:
                print(f"保存新闻到数据库失败: {e}")
        
        conn.commit()
        conn.close()
        print(f"新闻已保存到SQLite数据库: {self.db_path}")
    
    def load(self, condition=None):
        """加载新闻数据"""
        if self.storage_type == 'json':
            return self._load_from_json()
        elif self.storage_type == 'sqlite':
            return self._load_from_sqlite(condition)
        else:
            raise ValueError(f"不支持的存储类型: {self.storage_type}")
    
    def _load_from_json(self, filename='news.json'):
        """从JSON文件加载"""
        if not os.path.exists(filename):
            return []
        
        with open(filename, 'r', encoding='utf-8') as f:
            return json.load(f)
    
    def _load_from_sqlite(self, condition=None):
        """从SQLite数据库加载"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        if condition:
            cursor.execute('SELECT * FROM news WHERE ' + condition)
        else:
            cursor.execute('SELECT * FROM news')
        
        rows = cursor.fetchall()
        conn.close()
        
        # Convert rows to a list of dictionaries
        news_list = []
        for row in rows:
            news_list.append({
                'id': row[0],
                'title': row[1],
                'link': row[2],
                'summary': row[3],
                'publish_time': row[4],
                'source': row[5],
                'crawl_time': row[6]
            })
        
        return news_list
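
The storage module can be checked in isolation with a small save/load round trip. A sketch assuming JSON storage and a single made-up record:

if __name__ == '__main__':
    storage = NewsStorage(storage_type='json')
    sample = [{
        'title': 'Test headline',
        'link': 'http://example.com/news/1',
        'summary': '',
        'publish_time': '',
        'source': 'Test source',
        'crawl_time': '2024-01-01 00:00:00'
    }]
    storage.save(sample)      # writes news.json
    print(storage.load())     # reads the same records back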

3.3 HTTP Server Module

import socket
import threading
import os
import json

class NewsHTTPServer:
    def __init__(self, host='127.0.0.1', port=8080, storage=None):
        self.host = host
        self.port = port
        self.storage = storage
        self.server_socket = None
    
    def start(self):
        """启动HTTP服务器"""
        try:
            self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server_socket.bind((self.host, self.port))
            self.server_socket.listen(128)
            
            print(f"新闻聚合器HTTP服务已启动,访问地址: http://{self.host}:{self.port}")
            
            while True:
                client_socket, client_addr = self.server_socket.accept()
                thread = threading.Thread(target=self.handle_client, args=(client_socket,))
                thread.start()
                
        except Exception as e:
            print(f"启动服务器失败: {e}")
        finally:
            if self.server_socket:
                self.server_socket.close()
    
    def handle_client(self, client_socket):
        """处理客户端请求"""
        try:
            # Receive the request data
            request_data = client_socket.recv(1024).decode('utf-8')
            if not request_data:
                return
            
            # Split the request into lines
            request_lines = request_data.split('\r\n')
            if not request_lines:
                return
            
            # The request line contains the method, path and HTTP version
            request_line = request_lines[0]
            method, path, version = request_line.split()
            
            # Dispatch by request path
            if path == '/':
                self._handle_index(client_socket)
            elif path == '/news':
                self._handle_news_api(client_socket)
            elif path.startswith('/search'):
                self._handle_search(client_socket, path)
            else:
                self._handle_not_found(client_socket)
                
        except Exception as e:
            print(f"处理客户端请求失败: {e}")
        finally:
            client_socket.close()
    
    def _handle_index(self, client_socket):
        """处理首页请求"""
        # 读取HTML模板
        with open('index.html', 'r', encoding='utf-8') as f:
            html_content = f.read()
        
        # Build and send the HTTP response
        response = "HTTP/1.1 200 OK\r\n"
        response += "Content-Type: text/html; charset=utf-8\r\n"
        # Content-Length must be the byte length of the UTF-8 body, not the character count
        response += f"Content-Length: {len(html_content.encode('utf-8'))}\r\n"
        response += "\r\n"
        response += html_content
        
        client_socket.sendall(response.encode('utf-8'))
    
    def _handle_news_api(self, client_socket):
        """处理新闻API请求"""
        # 从存储中加载新闻
        news_list = self.storage.load()
        
        # Serialize to JSON
        news_json = json.dumps(news_list, ensure_ascii=False)
        
        # Build and send the HTTP response
        response = "HTTP/1.1 200 OK\r\n"
        response += "Content-Type: application/json; charset=utf-8\r\n"
        # Use the UTF-8 byte length for Content-Length
        response += f"Content-Length: {len(news_json.encode('utf-8'))}\r\n"
        response += "\r\n"
        response += news_json
        
        client_socket.sendall(response.encode('utf-8'))
    
    def _handle_search(self, client_socket, path):
        """处理搜索请求"""
        # 解析查询参数
        import urllib.parse
        query_params = path.split('?')[1] if '?' in path else ''
        params = urllib.parse.parse_qs(query_params)
        keyword = params.get('q', [''])[0]
        
        if not keyword:
            # 如果没有关键词,返回所有新闻
            news_list = self.storage.load()
        else:
            # 根据关键词搜索
            condition = f"title LIKE '%{keyword}%' OR summary LIKE '%{keyword}%'"
            news_list = self.storage.load(condition)
        
        # Serialize to JSON
        news_json = json.dumps(news_list, ensure_ascii=False)
        
        # Build and send the HTTP response
        response = "HTTP/1.1 200 OK\r\n"
        response += "Content-Type: application/json; charset=utf-8\r\n"
        # Use the UTF-8 byte length for Content-Length
        response += f"Content-Length: {len(news_json.encode('utf-8'))}\r\n"
        response += "\r\n"
        response += news_json
        
        client_socket.sendall(response.encode('utf-8'))
    
    def _handle_not_found(self, client_socket):
        """处理404请求"""
        html_content = """
        <!DOCTYPE html>
        <html lang="zh-CN">
        <head>
            <meta charset="UTF-8">
            <title>404 Not Found</title>
        </head>
        <body>
            <h1>404 Not Found</h1>
            <p>The page you requested does not exist.</p>
        </body>
        </html>
        """
        
        response = "HTTP/1.1 404 Not Found\r\n"
        response += "Content-Type: text/html; charset=utf-8\r\n"
        response += f"Content-Length: {len(html_content)}\r\n"
        response += "\r\n"
        response += html_content
        
        client_socket.sendall(response.encode('utf-8'))

3.4 HTML Display Page

<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Simple News Aggregator</title>
    <style>
        * {
            margin: 0;
            padding: 0;
            box-sizing: border-box;
        }
        
        body {
            font-family: Arial, sans-serif;
            line-height: 1.6;
            color: #333;
            background-color: #f4f4f4;
        }
        
        .container {
            max-width: 1200px;
            margin: 0 auto;
            padding: 20px;
        }
        
        header {
            background-color: #333;
            color: #fff;
            padding: 1rem;
            margin-bottom: 20px;
            text-align: center;
        }
        
        h1 {
            margin-bottom: 10px;
        }
        
        .search-box {
            margin-bottom: 20px;
            text-align: center;
        }
        
        .search-box input {
            padding: 10px;
            width: 300px;
            font-size: 16px;
            border: 1px solid #ddd;
            border-radius: 4px;
        }
        
        .search-box button {
            padding: 10px 20px;
            font-size: 16px;
            background-color: #4CAF50;
            color: white;
            border: none;
            border-radius: 4px;
            cursor: pointer;
        }
        
        .search-box button:hover {
            background-color: #45a049;
        }
        
        .news-list {
            display: grid;
            grid-template-columns: repeat(auto-fill, minmax(350px, 1fr));
            gap: 20px;
        }
        
        .news-item {
            background-color: white;
            padding: 20px;
            border-radius: 8px;
            box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1);
            transition: transform 0.3s ease;
        }
        
        .news-item:hover {
            transform: translateY(-5px);
            box-shadow: 0 5px 15px rgba(0, 0, 0, 0.2);
        }
        
        .news-title {
            font-size: 18px;
            font-weight: bold;
            margin-bottom: 10px;
            color: #333;
        }
        
        .news-title a {
            text-decoration: none;
            color: #333;
        }
        
        .news-title a:hover {
            color: #4CAF50;
        }
        
        .news-summary {
            color: #666;
            margin-bottom: 10px;
            line-height: 1.5;
        }
        
        .news-meta {
            font-size: 12px;
            color: #999;
            display: flex;
            justify-content: space-between;
        }
        
        .news-source {
            background-color: #f0f0f0;
            padding: 2px 6px;
            border-radius: 4px;
        }
        
        #loading {
            text-align: center;
            padding: 20px;
            font-size: 18px;
            color: #666;
        }
    </style>
</head>
<body>
    <div class="container">
        <header>
            <h1>Simple News Aggregator</h1>
            <p>News aggregated from multiple sources</p>
        </header>
        
        <div class="search-box">
            <input type="text" id="searchInput" placeholder="搜索新闻...">
            <button onclick="searchNews()">搜索</button>
        </div>
        
        <div id="loading">加载新闻中...</div>
        <div class="news-list" id="newsList"></div>
    </div>
    
    <script>
        // Load all news when the page loads
        window.onload = function() {
            loadNews();
        };
        
        // Load news, optionally filtered by a keyword
        function loadNews(keyword = '') {
            const newsList = document.getElementById('newsList');
            const loading = document.getElementById('loading');
            
            newsList.innerHTML = '';
            loading.style.display = 'block';
            
            let url = '/news';
            if (keyword) {
                url = `/search?q=${encodeURIComponent(keyword)}`;
            }
            
            fetch(url)
                .then(response => response.json())
                .then(news => {
                    loading.style.display = 'none';
                    
                    if (news.length === 0) {
                        newsList.innerHTML = '<p style="text-align: center; grid-column: 1 / -1;">No news found</p>';
                        return;
                    }
                    
                    news.forEach(item => {
                        const newsItem = document.createElement('div');
                        newsItem.className = 'news-item';
                        
                        newsItem.innerHTML = `
                            <div class="news-title">
                                <a href="${item.link}" target="_blank">${item.title}</a>
                            </div>
                            <div class="news-summary">${item.summary || '暂无摘要'}</div>
                            <div class="news-meta">
                                <span class="news-source">${item.source}</span>
                                <span>${item.publish_time || item.crawl_time}</span>
                            </div>
                        `;
                        
                        newsList.appendChild(newsItem);
                    });
                })
                .catch(error => {
                    loading.style.display = 'none';
                    newsList.innerHTML = `<p style="text-align: center; grid-column: 1 / -1; color: red;">Failed to load: ${error.message}</p>`;
                });
        }
        
        // Search news by the entered keyword
        function searchNews() {
            const keyword = document.getElementById('searchInput').value;
            loadNews(keyword);
        }
        
        // Search when Enter is pressed in the search box
        document.getElementById('searchInput').addEventListener('keypress', function(e) {
            if (e.key === 'Enter') {
                searchNews();
            }
        });
    </script>
</body>
</html>

3.5 Main Program Integration

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Simple news aggregator main program
A sample project that ties together the network programming topics from this course
"""

import time
from news_spider import NewsSpider
from news_storage import NewsStorage
from news_server import NewsHTTPServer

# News source configurations
NEWS_SOURCES = [
    {
        'name': 'Example News Site 1',
        'url': 'http://example.com/news',
        'base_url': 'http://example.com',
        'article_selector': '.news-item',
        'title_selector': 'h3 a',
        'link_selector': 'h3 a',
        'summary_selector': '.summary',
        'time_selector': '.time'
    },
    {
        'name': 'Example News Site 2',
        'url': 'http://example.org/latest',
        'base_url': 'http://example.org',
        'article_selector': '.article',
        'title_selector': '.title a',
        'link_selector': '.title a',
        'summary_selector': '.excerpt',
        'time_selector': '.publish-time'
    }
]

def main():
    print("=== 简易新闻聚合器 ===")
    print("1. 开始爬取新闻...")
    
    # 1. Initialize the spider
    spider = NewsSpider(NEWS_SOURCES, max_threads=3)
    
    # 2. Run the crawl
    news_list = spider.run()
    print(f"爬取完成,共获取 {len(news_list)} 条新闻")
    
    # 3. Save the data
    storage = NewsStorage(storage_type='sqlite')
    storage.save(news_list)
    
    # 4. Start the HTTP server
    print("\n2. Starting the HTTP server...")
    server = NewsHTTPServer(storage=storage)
    server.start()

if __name__ == "__main__":
    main()

4. Running and Testing the Project

4.1 Preparation

  1. Make sure the required dependencies are installed:

    pip install requests beautifulsoup4 lxml
  2. Create the project directory structure:

    news_aggregator/
    ├── main.py                 # Main program
    ├── news_spider.py          # Spider module
    ├── news_storage.py         # Storage module
    ├── news_server.py          # HTTP server module
    └── index.html              # Display page

4.2 Running the Project

  1. Run the main program:

    python main.py
  2. Open the news aggregator (a quick check script follows below):
    Visit http://127.0.0.1:8080 in your browser
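
With the server running, the two JSON endpoints can also be checked from a second terminal. A short sketch using the requests library (host and port match the defaults used above):

import requests

# Fetch all aggregated news
resp = requests.get('http://127.0.0.1:8080/news', timeout=5)
print(resp.status_code, len(resp.json()), 'news items')

# Search by keyword through the /search route
resp = requests.get('http://127.0.0.1:8080/search', params={'q': 'python'}, timeout=5)
print(resp.status_code, len(resp.json()), 'matching items')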

4.3 Testing the Project

  1. Functional tests (a minimal unit-test sketch follows this list)

    • Check that news can be crawled successfully
    • Verify that the news is displayed correctly
    • Confirm that the search feature works as expected
  2. Performance tests

    • Measure the efficiency of multi-threaded crawling
    • Check the response time of the HTTP service
  3. Exception-handling tests

    • Check how the program behaves when the network fails
    • Verify how invalid input is handled
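
Parts of the system can also be tested without touching the network at all. Below is a minimal unit-test sketch that feeds parse_page a fixed HTML snippet; the snippet and selector configuration are invented purely for the test:

import unittest
from news_spider import NewsSpider

SAMPLE_HTML = """
<div class="news-item">
  <h3><a href="/news/1">Test headline</a></h3>
  <p class="summary">Test summary</p>
</div>
"""

SOURCE = {
    'name': 'Test source',
    'base_url': 'http://example.com',
    'article_selector': '.news-item',
    'title_selector': 'h3 a',
    'link_selector': 'h3 a',
    'summary_selector': '.summary'
}

class TestParsePage(unittest.TestCase):
    def test_extracts_one_item(self):
        spider = NewsSpider([SOURCE])
        items = spider.parse_page(SAMPLE_HTML, SOURCE)
        self.assertEqual(len(items), 1)
        self.assertEqual(items[0]['title'], 'Test headline')
        self.assertEqual(items[0]['link'], 'http://example.com/news/1')
        self.assertEqual(items[0]['summary'], 'Test summary')

    def test_empty_input_returns_empty_list(self):
        spider = NewsSpider([SOURCE])
        self.assertEqual(spider.parse_page(None, SOURCE), [])

if __name__ == '__main__':
    unittest.main()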

5. Extension Exercises

  1. Add more news sources

    • Configure additional news websites
    • Support managing news sources dynamically
  2. Enhance the crawler (a scheduling sketch follows this list)

    • Implement deep crawling (fetch and parse article detail pages)
    • Add scheduled, periodic crawling
    • Implement incremental updates
  3. Improve the user interface

    • Add filtering by category
    • Add news sorting
    • Make the layout responsive for mobile devices
  4. Improve system performance

    • Add a data caching layer
    • Optimize database queries
    • Add asynchronous processing
  5. Round out the feature set

    • Add a favorites/bookmarking feature
    • Implement a news recommendation algorithm
    • Add commenting
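
As a starting point for the scheduled-crawling exercise in item 2, one simple approach is to re-run the spider on a background thread at a fixed interval. A rough sketch using the classes from this project (the 30-minute interval is an arbitrary choice):

import threading
import time

def crawl_periodically(spider, storage, interval=1800):
    """Re-crawl all sources every `interval` seconds and save the results."""
    def worker():
        while True:
            news_list = spider.run()
            # With SQLite storage, INSERT OR IGNORE on the unique link column
            # already makes repeated saves effectively incremental
            storage.save(news_list)
            time.sleep(interval)

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    return thread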

6. Summary

Through this comprehensive exercise we have covered:

  1. Overall project planning: moving from requirements analysis to architecture design
  2. Modular development: breaking a complex system into independent modules
  3. Combining technologies: applying several network programming techniques in one application
  4. Testing and debugging: how to test and debug a non-trivial network application
  5. Continuous improvement: how to improve system performance and user experience

This project walks through the complete workflow of a real network application, from fetching data to serving it, and touches every aspect of network programming covered so far. Building it yourself will give you a much deeper, more practical understanding of network programming.

7. Homework

  1. Finish implementing the basic features of the news aggregator
  2. Add at least one new news source to the project
  3. Implement category-based display of the news
  4. Write a project report that covers:
    • The architecture design
    • The reasoning behind your technology choices
    • Problems you ran into and how you solved them
    • Directions for future improvement

Coming up next: Episode 111 begins our study of multithreaded programming and explores how concurrency can be used to improve program performance.
