上一篇文章我们初步搭建了开发环境,现在深入探讨三个小工具的具体实现。 本章,我们将聚焦于在上一章的基础上,继续深化对这三个小工具的编写过程,重点关注需求分析的细化以及架构设计的进阶。考虑到国内服务器环境的复杂性,我们需要确保工具的稳定性和兼容性。例如,服务器监控工具需要兼容不同的 Linux 发行版,同时考虑到国内常见的宝塔面板环境,进行适配。
1. 服务器资源监控工具:更全面的指标采集
1.1 需求细化
除了 CPU、内存、磁盘、网络 IO 这些基础指标外,我们需要增加以下监控项:
- 进程状态监控: 能够显示 CPU 占用率最高的几个进程,方便快速定位问题。
- 磁盘 IOPS 监控: 了解磁盘读写性能。
- 网络连接数监控: 特别是 TCP 连接状态,例如 ESTABLISHED, TIME_WAIT 等,可以辅助排查网络瓶颈,对于高并发应用,例如使用 Nginx 作为反向代理的服务器,尤其重要。考虑并发连接数达到瓶颈的情况。
- 端口监控: 能够检测指定端口的监听状态,如果端口未监听,则发出告警。 方便监控关键服务的可用性。
1.2 代码实现(Python)
import psutil
import time
def get_process_cpu_usage(top_n=5):
"""获取 CPU 占用率最高的几个进程"""
processes = []
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent']):
processes.append(proc.info)
sorted_processes = sorted(processes, key=lambda x: x['cpu_percent'], reverse=True)
return sorted_processes[:top_n]
def get_disk_iops():
"""获取磁盘 IOPS"""
disk_io = psutil.disk_io_counters()
read_ops = disk_io.read_count
write_ops = disk_io.write_count
time.sleep(1) # 间隔1秒,计算速率
disk_io_new = psutil.disk_io_counters()
read_ops_new = disk_io_new.read_count
write_ops_new = disk_io_new.write_count
return {
"read_iops": read_ops_new - read_ops,
"write_iops": write_ops_new - write_ops
}
def get_network_connections():
"""获取网络连接数,按状态分类"""
connections = psutil.net_connections(kind='tcp')
connection_stats = {}
for conn in connections:
status = conn.status
if status in connection_stats:
connection_stats[status] += 1
else:
connection_stats[status] = 1
return connection_stats
def check_port_status(port):
"""检查端口是否监听"""
for conn in psutil.net_connections(kind='inet'):
if conn.laddr.port == port and conn.status == 'LISTEN':
return True
return False
# 示例用法
top_processes = get_process_cpu_usage()
disk_iops = get_disk_iops()
network_stats = get_network_connections()
port_80_status = check_port_status(80)
print("Top CPU Processes:", top_processes)
print("Disk IOPS:", disk_iops)
print("Network Connections:", network_stats)
print("Port 80 Status:", port_80_status)
1.3 避坑经验
- 权限问题: 部分指标的获取可能需要 root 权限,需要注意使用
sudo运行脚本,或者修改文件权限。 - 性能开销: 频繁调用
psutil接口会带来一定的性能开销,需要控制监控频率。 - 异常处理: 增加异常处理机制,避免因为个别进程或资源无法访问导致整个监控程序崩溃。
2. 简单 Web 服务器:支持静态文件服务
2.1 需求细化
- 支持多线程/多进程: 提高并发处理能力,尤其是在高并发场景下,例如使用 Nginx 进行反向代理之后,后端服务器仍需具备一定的处理能力。
- MIME 类型支持: 根据文件后缀名,返回正确的 Content-Type。
- 错误处理: 返回 404 等错误页面。
2.2 代码实现(Python)
import socket
import threading
import os
import mimetypes
SERVER_ROOT = './static' # 静态文件目录
def handle_client(client_socket):
"""处理客户端请求"""
request = client_socket.recv(1024).decode()
if not request:
client_socket.close()
return
try:
filename = request.split(' ')[1].lstrip('/')
except IndexError:
filename = 'index.html'
filepath = os.path.join(SERVER_ROOT, filename)
if os.path.exists(filepath) and os.path.isfile(filepath):
try:
with open(filepath, 'rb') as f:
content = f.read()
mime_type = mimetypes.guess_type(filename)[0] or 'application/octet-stream'
response = f"HTTP/1.1 200 OK\r\nContent-Type: {mime_type}\r\n\r\n".encode() + content
client_socket.sendall(response)
except Exception as e:
print(f"Error reading file: {e}")
response = b"HTTP/1.1 500 Internal Server Error\r\n\r\n<h1>500 Internal Server Error</h1>"
client_socket.sendall(response)
else:
response = b"HTTP/1.1 404 Not Found\r\n\r\n<h1>404 Not Found</h1>"
client_socket.sendall(response)
client_socket.close()
def start_server(host='127.0.0.1', port=8080):
"""启动服务器"""
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind((host, port))
server_socket.listen(5)
print(f"Server listening on {host}:{port}")
while True:
client_socket, addr = server_socket.accept()
print(f"Accepted connection from {addr}")
threading.Thread(target=handle_client, args=(client_socket,)).start()
if __name__ == '__main__':
# 创建静态文件目录
if not os.path.exists(SERVER_ROOT):
os.makedirs(SERVER_ROOT)
# 创建一个简单的index.html
with open(os.path.join(SERVER_ROOT, 'index.html'), 'w') as f:
f.write('<h1>Hello, World!</h1>')
start_server()
2.3 避坑经验
- 线程安全: 在高并发环境下,需要注意线程安全问题,例如避免多个线程同时写入同一个文件。
- 资源释放: 确保及时关闭 socket 连接,避免资源泄露。
- 目录遍历漏洞: 需要对用户输入的
filename进行严格的校验,防止目录遍历漏洞,避免用户访问到不应该访问的文件。
3. 简易日志分析工具:支持按关键词过滤
3.1 需求细化
- 支持多文件分析: 能够同时分析多个日志文件。
- 支持正则表达式: 提供更灵活的关键词匹配方式。
- 输出格式控制: 例如按照时间排序,高亮显示关键词。
3.2 代码实现 (框架)
import re
import glob
import os
def analyze_logs(file_pattern, keywords):
"""分析日志文件"""
file_list = glob.glob(file_pattern) # 使用 glob 匹配多个文件
for filepath in file_list:
try:
with open(filepath, 'r', encoding='utf-8') as f:
for line_number, line in enumerate(f, 1):
for keyword in keywords:
if re.search(keyword, line, re.IGNORECASE):
print(f"{filepath}:{line_number}: {line.strip()}") # 输出文件名、行号和内容
break # 避免同一行重复匹配多个关键词
except FileNotFoundError:
print(f"File not found: {filepath}")
except Exception as e:
print(f"Error reading file {filepath}: {e}")
# 示例用法
log_pattern = "./logs/*.log" # 匹配logs目录下的所有.log文件
search_keywords = ["error", "exception", "warning"]
analyze_logs(log_pattern, search_keywords)
3.3 避坑经验
- 编码问题: 日志文件可能使用不同的编码方式,需要指定正确的编码方式打开文件。
- 性能问题: 对于大型日志文件,需要考虑性能优化,例如使用多线程/多进程加速分析。
- 正则表达式安全: 需要对用户输入的正则表达式进行安全检查,防止 ReDoS (Regular expression Denial of Service) 攻击,尤其是在 Web 应用中,需要特别注意。
通过上述的步骤,我们对三个小工具的需求进行了更细致的分析,并给出了相应的代码示例和避坑经验。在实际开发中,还需要根据具体的需求进行调整和优化。理解这些基础知识点,可以帮助我们更好地构建稳定可靠的后端系统。
冠军资讯
代码一只喵