Episode 110: Comprehensive Network Programming Exercise
Learning Objectives
- Apply the fundamentals of network programming in a single project
- Understand how a complete networked application is architected
- Practise developing multiple cooperating modules
- Learn techniques for testing and debugging networked applications
- Build overall project-planning skills
1. Project Overview
In this episode we build a complete networked application that brings together the network-programming topics covered in the previous episodes, including:
- socket programming basics
- how the HTTP protocol works
- using the requests library
- web-page parsing techniques
- implementing a simple crawler
We will develop a simple news aggregator. The application can:
- crawl news content from multiple news sites
- parse the pages and extract the key information
- serve the aggregated results through a local HTTP service
- support simple search and filtering
2. Project Architecture Design
2.1 Overall Architecture
┌────────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│  News site sources │────▶│  Crawler module │────▶│  Storage module │
└────────────────────┘     └─────────────────┘     └────────┬────────┘
                                                            │
                                                            ▼
┌────────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│   User's browser   │◀────│  HTTP service   │◀────│ Data processing │
└────────────────────┘     └─────────────────┘     └─────────────────┘
2.2 Module Breakdown
| Module | Main responsibility | Key techniques |
|---|---|---|
| Crawler module | Fetch content from news sites | requests library, exception handling, multithreading/multiprocessing |
| Parser module | Extract the key fields of each article | BeautifulSoup, lxml, CSS selectors / XPath |
| Storage module | Save and manage the news data | File storage (JSON/CSV), SQLite database |
| HTTP service module | Expose a web interface | socket programming, HTTP protocol, request routing |
| Display module | Generate the user interface | HTML templates, CSS styling, a little JavaScript |
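To make the data flow in the diagram concrete, here is a minimal sketch of how the modules are wired together. It assumes the module and class names used later in this episode (news_spider.NewsSpider, news_storage.NewsStorage, news_server.NewsHTTPServer) and simply previews the integration shown in section 3.5.

```python
# Minimal end-to-end wiring of the modules (full details in section 3).
from news_spider import NewsSpider        # crawler module
from news_storage import NewsStorage      # storage module
from news_server import NewsHTTPServer    # HTTP service module

sources = []                               # news-source configs; see NEWS_SOURCES in 3.5

spider = NewsSpider(sources)               # 1. crawl the configured sources
news_items = spider.run()

storage = NewsStorage(storage_type='sqlite')
storage.save(news_items)                   # 2. persist the crawled items

server = NewsHTTPServer(storage=storage)   # 3. serve them at http://127.0.0.1:8080
server.start()                             # blocks and handles browser requests
```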
3. Core Functionality Implementation
3.1 Crawler Module Design
import requests
from bs4 import BeautifulSoup
import time
import threading
from queue import Queue

class NewsSpider:
    def __init__(self, news_sources, max_threads=5):
        self.news_sources = news_sources  # news-source configurations
        self.news_queue = Queue()         # holds the crawled news items
        self.max_threads = max_threads    # maximum number of worker threads

    def fetch_page(self, url, source_config):
        """Fetch the raw HTML of a page."""
        try:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            return response.text
        except Exception as e:
            print(f"Failed to fetch {url}: {e}")
            return None

    def parse_page(self, html, source_config):
        """Parse a page and extract its news items."""
        if not html:
            return []
        soup = BeautifulSoup(html, 'lxml')
        news_list = []
        # Parse according to the configuration of this news source
        articles = soup.select(source_config['article_selector'])
        for article in articles:
            try:
                title = article.select_one(source_config['title_selector']).text.strip()
                link = article.select_one(source_config['link_selector'])['href']
                # Make sure the link is absolute
                if not link.startswith('http'):
                    link = source_config['base_url'] + link
                # Try to extract a summary
                summary = ''
                if 'summary_selector' in source_config:
                    summary_elem = article.select_one(source_config['summary_selector'])
                    if summary_elem:
                        summary = summary_elem.text.strip()
                # Try to extract the publication time
                publish_time = ''
                if 'time_selector' in source_config:
                    time_elem = article.select_one(source_config['time_selector'])
                    if time_elem:
                        publish_time = time_elem.text.strip()
                news_item = {
                    'title': title,
                    'link': link,
                    'summary': summary,
                    'publish_time': publish_time,
                    'source': source_config['name'],
                    'crawl_time': time.strftime('%Y-%m-%d %H:%M:%S')
                }
                news_list.append(news_item)
            except Exception as e:
                print(f"Failed to parse an article: {e}")
                continue
        return news_list

    def crawl_source(self, source_config):
        """Crawl a single news source."""
        html = self.fetch_page(source_config['url'], source_config)
        if html:
            news_list = self.parse_page(html, source_config)
            for news in news_list:
                self.news_queue.put(news)

    def run(self):
        """Start the crawler."""
        threads = []
        # Create and start one thread per news source
        for source in self.news_sources:
            thread = threading.Thread(target=self.crawl_source, args=(source,))
            threads.append(thread)
            thread.start()
            # Limit the number of concurrent threads
            if len(threads) >= self.max_threads:
                for t in threads:
                    t.join()
                threads = []
        # Wait for the remaining threads to finish
        for t in threads:
            t.join()
        # Drain the queue into a list
        news_list = []
        while not self.news_queue.empty():
            news_list.append(self.news_queue.get())
        return news_list
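As a quick sanity check of the crawler on its own, the sketch below runs NewsSpider against a single, hypothetical source configuration; the URL and selectors are placeholders, so substitute ones that match a real page you are allowed to crawl.

```python
from news_spider import NewsSpider  # assuming the class above lives in news_spider.py

# Hypothetical source configuration; the selectors must match the target page.
demo_source = {
    'name': 'Demo source',
    'url': 'http://example.com/news',
    'base_url': 'http://example.com',
    'article_selector': '.news-item',
    'title_selector': 'h3 a',
    'link_selector': 'h3 a',
}

spider = NewsSpider([demo_source], max_threads=1)
items = spider.run()
for item in items[:5]:                      # show the first few results
    print(item['title'], '->', item['link'])
```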
3.2 Data Storage Module
import json
import csv
import sqlite3
import os

class NewsStorage:
    def __init__(self, storage_type='json', db_path='news.db'):
        self.storage_type = storage_type
        self.db_path = db_path
        if storage_type == 'sqlite':
            self._init_sqlite_db()

    def _init_sqlite_db(self):
        """Initialise the SQLite database."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        # Create the news table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS news (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT NOT NULL,
                link TEXT UNIQUE NOT NULL,
                summary TEXT,
                publish_time TEXT,
                source TEXT,
                crawl_time TEXT
            )
        ''')
        conn.commit()
        conn.close()

    def save(self, news_list, filename='news'):
        """Persist the news items."""
        if self.storage_type == 'json':
            self._save_to_json(news_list, filename + '.json')
        elif self.storage_type == 'csv':
            self._save_to_csv(news_list, filename + '.csv')
        elif self.storage_type == 'sqlite':
            self._save_to_sqlite(news_list)
        else:
            raise ValueError(f"Unsupported storage type: {self.storage_type}")

    def _save_to_json(self, news_list, filename):
        """Save as a JSON file."""
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(news_list, f, ensure_ascii=False, indent=2)
        print(f"News saved to {filename}")

    def _save_to_csv(self, news_list, filename):
        """Save as a CSV file."""
        if not news_list:
            return
        # Use the keys of the first item as the column names
        fieldnames = list(news_list[0].keys())
        with open(filename, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(news_list)
        print(f"News saved to {filename}")

    def _save_to_sqlite(self, news_list):
        """Save into the SQLite database."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        for news in news_list:
            try:
                cursor.execute('''
                    INSERT OR IGNORE INTO news (title, link, summary, publish_time, source, crawl_time)
                    VALUES (?, ?, ?, ?, ?, ?)
                ''', (news['title'], news['link'], news['summary'],
                      news['publish_time'], news['source'], news['crawl_time']))
            except Exception as e:
                print(f"Failed to save a news item to the database: {e}")
        conn.commit()
        conn.close()
        print(f"News saved to SQLite database: {self.db_path}")

    def load(self, condition=None):
        """Load news items."""
        if self.storage_type == 'json':
            return self._load_from_json()
        elif self.storage_type == 'sqlite':
            return self._load_from_sqlite(condition)
        else:
            raise ValueError(f"Unsupported storage type: {self.storage_type}")

    def _load_from_json(self, filename='news.json'):
        """Load from a JSON file."""
        if not os.path.exists(filename):
            return []
        with open(filename, 'r', encoding='utf-8') as f:
            return json.load(f)

    def _load_from_sqlite(self, condition=None):
        """Load from the SQLite database."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        if condition:
            cursor.execute('SELECT * FROM news WHERE ' + condition)
        else:
            cursor.execute('SELECT * FROM news')
        rows = cursor.fetchall()
        conn.close()
        # Convert the rows into a list of dictionaries
        news_list = []
        for row in rows:
            news_list.append({
                'id': row[0],
                'title': row[1],
                'link': row[2],
                'summary': row[3],
                'publish_time': row[4],
                'source': row[5],
                'crawl_time': row[6]
            })
        return news_list
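A quick round trip through NewsStorage helps verify the module independently of the crawler. This sketch uses the JSON backend with a single hand-written item; the file name news.json is simply the default used by _load_from_json, and the field values are dummy test data.

```python
from news_storage import NewsStorage  # assuming the class above lives in news_storage.py

storage = NewsStorage(storage_type='json')

sample = [{
    'title': 'Hello aggregator',                 # dummy test item
    'link': 'http://example.com/hello',
    'summary': 'A hand-written test entry',
    'publish_time': '',
    'source': 'Demo source',
    'crawl_time': '2024-01-01 00:00:00',
}]

storage.save(sample)          # writes news.json
loaded = storage.load()       # reads news.json back
print(len(loaded), loaded[0]['title'])
```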
3.3 HTTP Service Module
import socket
import threading
import os
import json

class NewsHTTPServer:
    def __init__(self, host='127.0.0.1', port=8080, storage=None):
        self.host = host
        self.port = port
        self.storage = storage
        self.server_socket = None

    def start(self):
        """Start the HTTP server."""
        try:
            self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server_socket.bind((self.host, self.port))
            self.server_socket.listen(128)
            print(f"News aggregator HTTP service started, visit: http://{self.host}:{self.port}")
            while True:
                client_socket, client_addr = self.server_socket.accept()
                thread = threading.Thread(target=self.handle_client, args=(client_socket,))
                thread.start()
        except Exception as e:
            print(f"Failed to start the server: {e}")
        finally:
            if self.server_socket:
                self.server_socket.close()

    def handle_client(self, client_socket):
        """Handle a client request."""
        try:
            # Receive the request data (reading a single chunk is a simplification
            # that is good enough for the small GET requests this exercise needs)
            request_data = client_socket.recv(1024).decode('utf-8')
            if not request_data:
                return
            # Split the request into lines
            request_lines = request_data.split('\r\n')
            if not request_lines:
                return
            # The first line contains the method, path and HTTP version
            request_line = request_lines[0]
            method, path, version = request_line.split()
            # Route the request by path
            if path == '/':
                self._handle_index(client_socket)
            elif path == '/news':
                self._handle_news_api(client_socket)
            elif path.startswith('/search'):
                self._handle_search(client_socket, path)
            else:
                self._handle_not_found(client_socket)
        except Exception as e:
            print(f"Failed to handle client request: {e}")
        finally:
            client_socket.close()

    def _handle_index(self, client_socket):
        """Handle the home-page request."""
        # Read the HTML template
        with open('index.html', 'r', encoding='utf-8') as f:
            html_content = f.read()
        body = html_content.encode('utf-8')
        # Send the HTTP response (Content-Length is the byte length of the body)
        response = "HTTP/1.1 200 OK\r\n"
        response += "Content-Type: text/html; charset=utf-8\r\n"
        response += f"Content-Length: {len(body)}\r\n"
        response += "\r\n"
        client_socket.send(response.encode('utf-8') + body)

    def _handle_news_api(self, client_socket):
        """Handle the news API request."""
        # Load the news from storage
        news_list = self.storage.load()
        # Serialise to JSON
        news_json = json.dumps(news_list, ensure_ascii=False)
        body = news_json.encode('utf-8')
        # Send the HTTP response
        response = "HTTP/1.1 200 OK\r\n"
        response += "Content-Type: application/json; charset=utf-8\r\n"
        response += f"Content-Length: {len(body)}\r\n"
        response += "\r\n"
        client_socket.send(response.encode('utf-8') + body)

    def _handle_search(self, client_socket, path):
        """Handle a search request."""
        # Parse the query string
        import urllib.parse
        query_params = path.split('?')[1] if '?' in path else ''
        params = urllib.parse.parse_qs(query_params)
        keyword = params.get('q', [''])[0]
        if not keyword:
            # No keyword: return all news
            news_list = self.storage.load()
        else:
            # Search by keyword
            # (Interpolating user input into SQL like this is vulnerable to injection;
            # acceptable for a local exercise, but use parameterised queries in real code.)
            condition = f"title LIKE '%{keyword}%' OR summary LIKE '%{keyword}%'"
            news_list = self.storage.load(condition)
        # Serialise to JSON
        news_json = json.dumps(news_list, ensure_ascii=False)
        body = news_json.encode('utf-8')
        # Send the HTTP response
        response = "HTTP/1.1 200 OK\r\n"
        response += "Content-Type: application/json; charset=utf-8\r\n"
        response += f"Content-Length: {len(body)}\r\n"
        response += "\r\n"
        client_socket.send(response.encode('utf-8') + body)

    def _handle_not_found(self, client_socket):
        """Handle unknown paths with a 404 page."""
        html_content = """
        <!DOCTYPE html>
        <html lang="en">
        <head>
            <meta charset="UTF-8">
            <title>404 Not Found</title>
        </head>
        <body>
            <h1>404 Not Found</h1>
            <p>The page you requested does not exist.</p>
        </body>
        </html>
        """
        body = html_content.encode('utf-8')
        response = "HTTP/1.1 404 Not Found\r\n"
        response += "Content-Type: text/html; charset=utf-8\r\n"
        response += f"Content-Length: {len(body)}\r\n"
        response += "\r\n"
        client_socket.send(response.encode('utf-8') + body)
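One detail worth calling out in the handlers above: Content-Length must be the number of bytes in the encoded body, not the number of characters, which is why each handler encodes the body before measuring it. Since the same header-building code repeats in every handler, one possible refactor is a small helper like the hypothetical send_response below.

```python
def send_response(client_socket, status, content_type, body_text):
    """Encode the body, build the headers, and send the complete response.

    Hypothetical helper; in the code above each handler inlines this logic.
    """
    body = body_text.encode('utf-8')                    # count bytes, not characters
    headers = (
        f"HTTP/1.1 {status}\r\n"
        f"Content-Type: {content_type}; charset=utf-8\r\n"
        f"Content-Length: {len(body)}\r\n"
        "\r\n"
    )
    client_socket.send(headers.encode('utf-8') + body)

# Example use inside _handle_news_api:
#     send_response(client_socket, "200 OK", "application/json", news_json)
```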
3.4 HTML Display Page
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Simple News Aggregator</title>
<style>
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
font-family: Arial, sans-serif;
line-height: 1.6;
color: #333;
background-color: #f4f4f4;
}
.container {
max-width: 1200px;
margin: 0 auto;
padding: 20px;
}
header {
background-color: #333;
color: #fff;
padding: 1rem;
margin-bottom: 20px;
text-align: center;
}
h1 {
margin-bottom: 10px;
}
.search-box {
margin-bottom: 20px;
text-align: center;
}
.search-box input {
padding: 10px;
width: 300px;
font-size: 16px;
border: 1px solid #ddd;
border-radius: 4px;
}
.search-box button {
padding: 10px 20px;
font-size: 16px;
background-color: #4CAF50;
color: white;
border: none;
border-radius: 4px;
cursor: pointer;
}
.search-box button:hover {
background-color: #45a049;
}
.news-list {
display: grid;
grid-template-columns: repeat(auto-fill, minmax(350px, 1fr));
gap: 20px;
}
.news-item {
background-color: white;
padding: 20px;
border-radius: 8px;
box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1);
transition: transform 0.3s ease;
}
.news-item:hover {
transform: translateY(-5px);
box-shadow: 0 5px 15px rgba(0, 0, 0, 0.2);
}
.news-title {
font-size: 18px;
font-weight: bold;
margin-bottom: 10px;
color: #333;
}
.news-title a {
text-decoration: none;
color: #333;
}
.news-title a:hover {
color: #4CAF50;
}
.news-summary {
color: #666;
margin-bottom: 10px;
line-height: 1.5;
}
.news-meta {
font-size: 12px;
color: #999;
display: flex;
justify-content: space-between;
}
.news-source {
background-color: #f0f0f0;
padding: 2px 6px;
border-radius: 4px;
}
#loading {
text-align: center;
padding: 20px;
font-size: 18px;
color: #666;
}
</style>
</head>
<body>
<div class="container">
<header>
<h1>Simple News Aggregator</h1>
<p>News aggregated from multiple sources</p>
</header>
<div class="search-box">
<input type="text" id="searchInput" placeholder="Search news...">
<button onclick="searchNews()">Search</button>
</div>
<div id="loading">Loading news...</div>
<div class="news-list" id="newsList"></div>
</div>
<script>
// Load all news when the page loads
window.onload = function() {
loadNews();
};
// Load news, optionally filtered by a keyword
function loadNews(keyword = '') {
const newsList = document.getElementById('newsList');
const loading = document.getElementById('loading');
newsList.innerHTML = '';
loading.style.display = 'block';
let url = '/news';
if (keyword) {
url = `/search?q=${encodeURIComponent(keyword)}`;
}
fetch(url)
.then(response => response.json())
.then(news => {
loading.style.display = 'none';
if (news.length === 0) {
newsList.innerHTML = '<p style="text-align: center; grid-column: 1 / -1;">No news found</p>';
return;
}
news.forEach(item => {
const newsItem = document.createElement('div');
newsItem.className = 'news-item';
newsItem.innerHTML = `
<div class="news-title">
<a href="${item.link}" target="_blank">${item.title}</a>
</div>
<div class="news-summary">${item.summary || 'No summary available'}</div>
<div class="news-meta">
<span class="news-source">${item.source}</span>
<span>${item.publish_time || item.crawl_time}</span>
</div>
`;
newsList.appendChild(newsItem);
});
})
.catch(error => {
loading.style.display = 'none';
newsList.innerHTML = `<p style="text-align: center; grid-column: 1 / -1; color: red;">Failed to load news: ${error.message}</p>`;
});
}
// Search news
function searchNews() {
const keyword = document.getElementById('searchInput').value;
loadNews(keyword);
}
// Search when the Enter key is pressed
document.getElementById('searchInput').addEventListener('keypress', function(e) {
if (e.key === 'Enter') {
searchNews();
}
});
</script>
</body>
</html>
3.5 Main Program Integration
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Simple news aggregator: main program.
An example project that brings together the network-programming topics.
"""
import time
from news_spider import NewsSpider
from news_storage import NewsStorage
from news_server import NewsHTTPServer

# News-source configuration
NEWS_SOURCES = [
    {
        'name': 'Example News Site 1',
        'url': 'http://example.com/news',
        'base_url': 'http://example.com',
        'article_selector': '.news-item',
        'title_selector': 'h3 a',
        'link_selector': 'h3 a',
        'summary_selector': '.summary',
        'time_selector': '.time'
    },
    {
        'name': 'Example News Site 2',
        'url': 'http://example.org/latest',
        'base_url': 'http://example.org',
        'article_selector': '.article',
        'title_selector': '.title a',
        'link_selector': '.title a',
        'summary_selector': '.excerpt',
        'time_selector': '.publish-time'
    }
]

def main():
    print("=== Simple News Aggregator ===")
    print("1. Crawling news...")
    # 1. Initialise the crawler
    spider = NewsSpider(NEWS_SOURCES, max_threads=3)
    # 2. Run the crawl
    news_list = spider.run()
    print(f"Crawl finished, {len(news_list)} news items fetched")
    # 3. Save the data
    storage = NewsStorage(storage_type='sqlite')
    storage.save(news_list)
    # 4. Start the HTTP service
    print("\n2. Starting the HTTP service...")
    server = NewsHTTPServer(storage=storage)
    server.start()

if __name__ == "__main__":
    main()
4. Running and Testing the Project
4.1 Preparation
Make sure the required dependencies are installed:
pip install requests beautifulsoup4 lxml
Create the project directory structure:
news_aggregator/
├── main.py            # main program
├── news_spider.py     # crawler module
├── news_storage.py    # storage module
├── news_server.py     # HTTP service module
└── index.html         # display page
4.2 Running the Project
Run the main program:
python main.py
Open the news aggregator:
Enter http://127.0.0.1:8080 in your browser.
4.3 Testing the Project
Functional tests:
- Check that news can be crawled successfully
- Verify that the news is displayed correctly
- Check that the search feature works (a test-script sketch follows this list)
Performance tests:
- Measure how much the multithreaded crawl improves efficiency
- Check the response time of the HTTP service
Failure tests:
- Check how the application behaves when the network is unavailable
- Verify how invalid input is handled
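As a starting point for the functional and failure tests, here is a minimal test script. It assumes the aggregator is already running locally on its default address (http://127.0.0.1:8080); the keyword and expectations are examples, so adjust them to your own news sources.

```python
import requests

BASE = 'http://127.0.0.1:8080'  # default address of the aggregator

def test_news_endpoint():
    """The /news endpoint should return a JSON list."""
    resp = requests.get(f'{BASE}/news', timeout=5)
    assert resp.status_code == 200
    assert isinstance(resp.json(), list)

def test_search_endpoint():
    """/search should also return a JSON list (possibly empty)."""
    resp = requests.get(f'{BASE}/search', params={'q': 'python'}, timeout=5)
    assert resp.status_code == 200
    assert isinstance(resp.json(), list)

def test_unknown_path_returns_404():
    """Unknown paths should be answered with a 404 status."""
    resp = requests.get(f'{BASE}/does-not-exist', timeout=5)
    assert resp.status_code == 404

if __name__ == '__main__':
    test_news_endpoint()
    test_search_endpoint()
    test_unknown_path_returns_404()
    print("All basic checks passed")
```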
5. Extension Exercises
Add more news sources:
- Configure additional news sites
- Manage news sources dynamically
Strengthen the crawler:
- Implement deep crawling (fetch the article detail pages)
- Add scheduled crawling (see the sketch after this list)
- Implement incremental updates
Improve the user interface:
- Add filtering by category
- Add sorting of news items
- Make the layout responsive for mobile devices
Improve system performance:
- Add a data-caching layer
- Optimise the database queries
- Add asynchronous processing
Round out the feature set:
- Let users bookmark articles
- Implement a news-recommendation algorithm
- Add a comment feature
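For the scheduled-crawling exercise, one simple approach is a background thread that re-runs the crawl at a fixed interval. The sketch below assumes the NewsSpider and NewsStorage classes from section 3 and uses a hypothetical 10-minute interval; `sources` refers to a list like NEWS_SOURCES from the main program.

```python
import threading
import time

from news_spider import NewsSpider
from news_storage import NewsStorage

def crawl_periodically(sources, storage, interval_seconds=600):
    """Re-run the crawl every interval_seconds seconds (hypothetical interval)."""
    spider = NewsSpider(sources)
    while True:
        news_list = spider.run()
        storage.save(news_list)   # the SQLite backend skips duplicates via INSERT OR IGNORE on link
        time.sleep(interval_seconds)

def start_scheduler(sources, storage):
    """Run the periodic crawl in a daemon thread so the HTTP server keeps the main thread."""
    t = threading.Thread(target=crawl_periodically, args=(sources, storage), daemon=True)
    t.start()
    return t
```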
6. Summary
Through this comprehensive exercise we practised:
- Overall project planning: going from requirements analysis to architecture design
- Modular development: splitting a complex system into independent modules
- Combining techniques: applying several network-programming technologies in one application
- Testing and debugging: how to test and debug a non-trivial networked application
- Continuous improvement: how to improve performance and user experience
The project walks through the full life cycle of a networked application, from fetching data to serving it, and touches the main aspects of network programming covered so far. Building it yourself will give you a deeper, more practical grasp of the subject.
7. Homework
- Implement the basic functionality of the news aggregator
- Add at least one new news source to the project
- Add a category-based display of the news
- Write a project report covering:
  - The architecture design
  - The reasons behind your technology choices
  - Problems you ran into and how you solved them
  - Directions for future improvement
Preview: Episode 111 begins our study of multithreaded programming and explores how concurrency can improve program performance.