Linux Python脚本开发完整指南
Linux Python脚本开发完整指南
针对 Ubuntu 环境的全面 Python 自动化脚本开发指南,涵盖从环境搭建到生产级部署的全流程实践。
前言
Python 作为 "胶水语言",在 Linux 环境下拥有无与伦比的生态优势。本指南将带你系统掌握如何在 Ubuntu 系统中编写高效、稳定、可维护的 Python 自动化脚本,覆盖系统监控、进程管理、远程运维等核心场景。
读者定位:具备基础编程知识,熟悉 Linux 常用命令,希望深入掌握 Linux 下 Python 自动化开发的开发者。
第一章:Ubuntu 下 Python 开发环境搭建
1.1 系统 Python 环境管理
Ubuntu 系统通常预装了 Python,但我们需要正确管理环境以避免破坏系统工具。
# 检查系统Python版本
python3 --version
# 安装基础开发工具
sudo apt update
sudo apt install -y python3-full python3-pip python3-dev
⚠️ 警告:永远不要卸载系统自带的
python3包,否则会导致apt等系统工具崩溃。
1.2 多版本管理与虚拟环境
使用 pyenv 管理多 Python 版本
对于需要切换不同 Python 版本的场景,pyenv是最佳选择:
# 安装pyenv
sudo apt install pyenv
配置 Shell 环境,你可以用 Vim 编辑 \.bashrc:
vim ~/.bashrc
然后在文件末尾添加:
export PYENV_ROOT="$HOME/.pyenv"
command -v pyenv >/dev/null || export PATH="$PYENV_ROOT/bin:$PATH"
eval "$(pyenv init -)"
或者直接运行以下命令一次性写入:
cat >> ~/.bashrc << 'EOF'
export PYENV_ROOT="$HOME/.pyenv"
command -v pyenv >/dev/null || export PATH="$PYENV_ROOT/bin:$PATH"
eval "$(pyenv init -)"
EOF
# 重启Shell后生效
exec "$SHELL"
# 安装指定版本Python
pyenv install 3.12.0
pyenv local 3.12.0 # 为当前目录设置版本
项目级虚拟环境(venv)
这是最推荐的项目隔离方案,无需额外工具:
# 创建虚拟环境
python3 -m venv .venv
# 激活虚拟环境
source .venv/bin/activate
# 退出虚拟环境
deactivate
虚拟环境目录结构如下,它会创建一个独立的 Python 运行环境:
.venv/
├── bin/ # 激活脚本和Python解释器
├── lib/ # 独立的依赖包
├── include/ # 头文件
└── pyvenv.cfg # 配置文件
1.3 开发工具链配置
为了提升代码质量,建议安装以下工具:
# 代码格式化与检查
pip install black flake8 pytest
# 或者通过apt安装
sudo apt install black flake8 python3-pytest
第二章:Linux Python 脚本基础规范
2.1 Shebang 与可执行脚本
一个标准的 Linux Python 脚本开头必须包含 Shebang 行:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
为什么用
/usr/bin/env python3而不是/usr/bin/python3?
前者会自动在 PATH 中查找 Python 解释器,兼容虚拟环境
后者写死了路径,在不同机器或虚拟环境中可能失效
添加执行权限后,你可以直接运行脚本:
chmod +x my_script.py
./my_script.py # 而不需要 python3 my_script.py
2.2 路径处理最佳实践
Linux 下的路径处理必须小心,避免硬编码:
import os
import sys
from pathlib import Path
# 获取脚本所在目录(推荐使用Path)
SCRIPT_DIR = Path(__file__).parent
# 项目根目录
ROOT_DIR = SCRIPT_DIR.parent
# 构建绝对路径,避免相对路径陷阱
LOG_FILE = ROOT_DIR / "logs" / "app.log"
# 确保目录存在
LOG_FILE.parent.mkdir(exist_ok=True)
2.3 环境变量与配置管理
敏感配置永远不要硬编码在脚本中,使用环境变量:
import os
from dotenv import load_dotenv
# 加载.env文件(开发环境)
load_dotenv()
# 读取环境变量
DB_HOST = os.getenv("DB_HOST", "localhost")
DB_PASSWORD = os.getenv("DB_PASSWORD") # 生产环境通过系统环境变量注入
2.4 防止重复执行:文件锁
Crontab 定时任务最常见的坑:上一次任务还没跑完,下一次就启动了,导致重复执行。使用 Linux 文件锁可以完美解决这个问题:
import fcntl
import os
class FileLock:
"""Linux文件锁,保证同一时间只有一个脚本实例运行"""
def __init__(self, lock_file):
self.lock_file = lock_file
self.fp = None
def __enter__(self):
self.fp = open(self.lock_file, 'w')
try:
# 非阻塞加锁,如果已经被锁了直接抛异常
fcntl.flock(self.fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
except BlockingIOError:
raise RuntimeError("脚本已经在运行中了,跳过本次执行")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.fp:
fcntl.flock(self.fp, fcntl.LOCK_UN)
self.fp.close()
try:
os.unlink(self.lock_file)
except:
pass
# 使用示例:在脚本开头加上
LOCK_FILE = "/tmp/my_script.lock"
try:
with FileLock(LOCK_FILE):
# 这里放你的业务逻辑
main()
except RuntimeError as e:
print(e)
sys.exit(0)
第三章:系统监控与进程管理
3.1 psutil:跨平台系统监控利器
psutil是 Linux 系统监控的瑞士军刀,它封装了底层的系统调用,让你用 Python 轻松获取系统信息。
pip install psutil
系统资源监控示例
import psutil
import time
def get_system_status():
"""获取系统状态摘要"""
# CPU信息
cpu_percent = psutil.cpu_percent(interval=1)
cpu_count = psutil.cpu_count()
load_avg = psutil.getloadavg() # 1/5/15分钟负载
# 内存信息
mem = psutil.virtual_memory()
mem_used = mem.used / (1024**3) # GB
mem_total = mem.total / (1024**3)
mem_percent = mem.percent
# Swap信息
swap = psutil.swap_memory()
# 磁盘信息
disk = psutil.disk_usage('/')
disk_percent = disk.percent
# 磁盘IO
disk_io = psutil.disk_io_counters()
# 网络信息
net = psutil.net_io_counters()
net_sent = net.bytes_sent / (1024**2) # MB
net_recv = net.bytes_recv / (1024**2)
# TCP连接数
connections = len(psutil.net_connections())
return {
"cpu": f"{cpu_percent}% ({cpu_count}核)",
"load": f"{load_avg[0]:.2f}, {load_avg[1]:.2f}, {load_avg[2]:.2f}",
"memory": f"{mem_used:.1f}G/{mem_total:.1f}G ({mem_percent}%)",
"swap": f"{swap.percent}%",
"disk": f"{disk_percent}%",
"network": f"↑{net_sent:.1f}M ↓{net_recv:.1f}M",
"connections": connections
}
# 实时监控
while True:
status = get_system_status()
print(f"\rCPU: {status['cpu']} | 负载: {status['load']} | MEM: {status['memory']} | DISK: {status['disk']}", end="")
time.sleep(2)
进程管理实战
def find_process_by_name(name):
"""根据名称查找进程"""
processes = []
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']):
try:
if name.lower() in proc.info['name'].lower():
processes.append(proc.info)
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass
return processes
def kill_process_gracefully(pid, timeout=5):
"""优雅终止进程"""
try:
proc = psutil.Process(pid)
# 先发送SIGTERM,给进程清理时间
proc.terminate()
# 等待进程退出
proc.wait(timeout=timeout)
print(f"进程 {pid} 已正常终止")
except psutil.NoSuchProcess:
print(f"进程 {pid} 不存在")
except psutil.TimeoutExpired:
# 如果超时,强制杀死
proc.kill()
print(f"进程 {pid} 无响应,已强制杀死")
3.2 信号处理与优雅退出
Linux 通过信号与进程通信,你的脚本需要正确处理这些信号:
import signal
import sys
class GracefulExit:
def __init__(self):
self.shutdown_flag = False
# 注册信号处理函数
signal.signal(signal.SIGINT, self._handler) # Ctrl+C
signal.signal(signal.SIGTERM, self._handler) # systemd stop
def _handler(self, signum, frame):
print(f"收到信号 {signum},开始优雅退出...")
self.shutdown_flag = True
def is_shutting_down(self):
return self.shutdown_flag
# 使用示例
graceful = GracefulExit()
while not graceful.is_shutting_down():
# 主循环
do_work()
time.sleep(1)
# 退出前的清理工作
cleanup_resources()
print("退出完成")
sys.exit(0)
热重载配置:自定义信号
生产环境中,我们经常需要不重启服务就重新加载配置。可以使用 Linux 自定义信号 SIGUSR1/SIGUSR2 来实现:
config = None
def load_config():
"""加载配置文件"""
global config
with open('/etc/myapp/config.json') as f:
config = json.load(f)
logger.info("配置已重新加载")
def handle_reload(signum, frame):
"""处理重载信号"""
logger.info("收到SIGUSR1信号,开始重新加载配置...")
load_config()
# 注册信号处理
signal.signal(signal.SIGUSR1, handle_reload)
# 初始加载
load_config()
使用时,只需向进程发送信号即可:
# 不用重启服务,直接重载配置
kill -USR1 <pid>
3.3 权限管理
Linux 的权限模型是安全的基础,脚本中需要注意:
import os
import pwd
def drop_privileges(uid_name='nobody', gid_name='nogroup'):
"""降权:从root切换到普通用户"""
if os.getuid() != 0:
return # 已经不是root,无需处理
# 获取用户ID
uid = pwd.getpwnam(uid_name).pw_uid
gid = pwd.getgrnam(gid_name).gr_gid
# 先降组,再降用户
os.setgid(gid)
os.setuid(uid)
print(f"已降权到用户 {uid_name} (UID={uid})")
第四章:调用 Shell 命令与管道
4.1 subprocess 最佳实践
subprocess是 Python 调用外部命令的标准库,永远不要再用os\.system\(\)了!
基础用法
import subprocess
import shlex
# 推荐:参数列表形式,安全无注入
result = subprocess.run(
['ls', '-l', '/home'],
capture_output=True, # 捕获输出
text=True, # 自动解码为字符串
check=True, # 失败时抛出异常
timeout=30 # 超时时间,防止卡住
)
print(result.stdout)
print(result.returncode)
⚠️ 超时参数很重要:很多外部命令可能因为网络或其他原因卡住,没有超时的话你的脚本会永远卡死。
安全警告:shell=True 的风险
除非绝对必要,否则不要使用shell=True,它会带来命令注入风险:
# ❌ 危险!如果user_input包含 "; rm -rf /" 会被执行
user_input = "; rm -rf /"
subprocess.run(f"echo {user_input}", shell=True)
# ✅ 安全!参数会被正确转义
subprocess.run(['echo', user_input], shell=False)
4.2 管道与流式处理
实现类似 Shell 的管道功能:
# 实现 ps aux | grep python
p1 = subprocess.Popen(['ps', 'aux'], stdout=subprocess.PIPE, text=True)
p2 = subprocess.Popen(
['grep', 'python'],
stdin=p1.stdout,
stdout=subprocess.PIPE,
text=True
)
p1.stdout.close() # 关闭p1的stdout,让p1接收SIGPIPE
output, _ = p2.communicate()
print(output)
实时输出处理
对于长时间运行的命令,实时输出很重要:
def run_command_with_live_output(cmd):
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
universal_newlines=True
)
for line in iter(process.stdout.readline, ''):
print(f"[OUT] {line.strip()}")
# 可以在这里实时处理日志
process.wait()
return process.returncode
第五章:后台运行与自动化调度
5.1 临时后台运行:nohup & &
对于临时的后台任务:
# 后台运行,忽略挂起信号,输出重定向
nohup python3 my_script.py > app.log 2>&1 &
# 查看后台任务
jobs
# 带回前台
fg %1
⚠️ 注意:
nohup只是临时方案,它无法保证脚本崩溃后自动重启,也无法保证开机自启。
5.2 定时任务:Crontab
Crontab 适合定时执行的脚本,比如每天备份、每小时监控。
# 编辑定时任务
crontab -e
Crontab 配置示例
# 分 时 日 月 周 命令
# 每天凌晨2点执行备份
0 2 * * * /home/user/scripts/backup.py >> /var/log/backup.log 2>&1
# 每10分钟执行一次监控
*/10 * * * * /home/user/.venv/bin/python /home/user/scripts/monitor.py
# 开机启动执行
@reboot /home/user/scripts/startup.sh
# 执行结果自动发邮件
MAILTO="your@email.com"
0 2 * * * /home/user/scripts/backup.py
Crontab 避坑指南
-
绝对路径:Crontab 的 PATH 非常短,所有命令必须用绝对路径
# ❌ 错误:可能找不到python * * * * * python /path/to/script.py # ✅ 正确:使用虚拟环境的绝对路径 * * * * * /home/user/.venv/bin/python /path/to/script.py -
工作目录:Crontab 的工作目录是用户家目录,脚本中尽量用绝对路径
-
环境变量:Crontab 不会加载.bashrc,需要的环境变量要在 crontab 里定义
SHELL=/bin/bash PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin * * * * * /path/to/script.py
5.3 常驻服务:Systemd
对于需要长期运行的后台服务,Systemd 是 Ubuntu 的标准方案。它能提供:
-
开机自启
-
崩溃自动重启
-
状态监控
-
统一的日志管理
1. 创建服务文件
sudo nano /etc/systemd/system/my-monitor.service
服务文件内容:
[Unit]
# 服务描述
Description=My System Monitor Service
# 网络就绪后再启动
After=network.target
[Service]
# 执行命令(绝对路径!)
ExecStart=/home/user/.venv/bin/python /home/user/scripts/monitor.py
# 工作目录
WorkingDirectory=/home/user/scripts
# 崩溃自动重启
Restart=always
RestartSec=5
# 运行用户
User=user
Group=user
# 输出重定向到journal
StandardOutput=journal
StandardError=journal
# 解决"too many open files"问题
LimitNOFILE=65536
# 从文件加载环境变量(敏感配置放这里)
EnvironmentFile=/etc/myapp/env.conf
[Install]
# 多用户模式下启用
WantedBy=multi-user.target
💡
EnvironmentFile是生产环境最佳实践:把敏感的环境变量(如密码、API Key)放在单独的文件里,权限设为600,只有运行用户能读,而不是写在公开的服务文件里。
2. 启用服务
# 重新加载配置
sudo systemctl daemon-reload
# 启动服务
sudo systemctl start my-monitor
# 设置开机自启
sudo systemctl enable my-monitor
3. 管理与查看
# 查看服务状态
sudo systemctl status my-monitor
# 查看实时日志
sudo journalctl -u my-monitor -f
# 重启服务
sudo systemctl restart my-monitor
# 停止服务
sudo systemctl stop my-monitor
第六章:日志管理与问题排查
6.1 Python logging 配置
标准的日志配置:
import logging
import logging.handlers
def setup_logging():
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# 格式
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# 文件处理器
file_handler = logging.handlers.RotatingFileHandler(
'/var/log/myapp/app.log',
maxBytes=100*1024*1024, # 100MB
backupCount=5,
encoding='utf-8'
)
file_handler.setFormatter(formatter)
# 控制台处理器
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.addHandler(console_handler)
return logger
logger = setup_logging()
结构化日志(JSON)
生产环境中,JSON 格式的日志更容易被 ELK、Grafana Loki 等日志系统解析:
pip install python-json-logger
from pythonjsonlogger import jsonlogger
def setup_json_logging():
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# JSON格式
formatter = jsonlogger.JsonFormatter(
'%(asctime)s %(levelname)s %(name)s %(message)s'
)
handler = logging.FileHandler('/var/log/myapp/app.json.log')
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
Systemd Journald 日志
如果你用 Systemd 部署服务,直接把日志输出到 Journald 是最佳实践,不需要自己管理日志文件:
pip install systemd-python
from systemd import journal
# 直接发送日志到journald
journal.send("Hello from Python", PRIORITY=6)
# 或者用logging集成
from systemd.journal import JournalHandler
logger = logging.getLogger(__name__)
logger.addHandler(JournalHandler())
logger.info("This will go to journald")
6.2 Logrotate 日志轮转
即使你用了 RotatingFileHandler,Linux 的 logrotate 依然是生产环境的首选。
创建配置文件 /etc/logrotate\.d/myapp:
/var/log/myapp/*.log {
daily # 每天轮转
rotate 30 # 保留30天
compress # 自动压缩
delaycompress # 延迟压缩
missingok # 文件不存在不报错
notifempty # 空文件不轮转
dateext # 用日期做后缀
dateformat -%Y%m%d
# 对于Python脚本,使用copytruncate模式
copytruncate # 复制并清空,无需重启应用
su user user # 切换用户处理权限
}
调试 Logrotate
# 测试配置(不实际执行)
logrotate -d /etc/logrotate.d/myapp
# 强制执行一次
logrotate -f /etc/logrotate.d/myapp
6.3 常见问题排查
问题 1:删除了日志文件,磁盘空间没释放?
这是因为进程还持有已删除文件的句柄。解决:
# 重启进程,或者发送USR1信号让它重开日志
kill -USR1 <pid>
问题 2:脚本在手动运行正常,Crontab/Systemd 里运行失败?
99% 是路径或环境变量问题:
-
检查所有路径是否都是绝对路径
-
检查 Python 解释器路径
-
检查日志输出看具体错误
第七章:远程服务器批量管理
7.1 Paramiko:SSH 远程执行
Paramiko 是 Python 的 SSH 协议实现,让你可以用代码远程管理服务器。
pip install paramiko
基础 SSH 连接
import paramiko
def ssh_exec(host, username, password, command):
# 创建客户端
client = paramiko.SSHClient()
# 自动接受主机密钥(测试用,生产建议关闭)
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
# 连接
client.connect(
hostname=host,
username=username,
password=password,
timeout=10
)
# 执行命令
stdin, stdout, stderr = client.exec_command(command, timeout=30)
# 获取结果
out = stdout.read().decode('utf-8')
err = stderr.read().decode('utf-8')
code = stdout.channel.recv_exit_status()
return out, err, code
finally:
client.close()
# 使用
out, err, code = ssh_exec('192.168.1.100', 'ubuntu', 'password', 'ls -la')
print(out)
密钥认证(推荐)
生产环境永远用密钥而不是密码:
client.connect(
hostname=host,
username=username,
key_filename='/home/user/.ssh/id_rsa', # 私钥路径
timeout=10
)
7.2 SFTP 文件传输
def sftp_upload(host, username, key_file, local_path, remote_path):
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(host, username=username, key_filename=key_file)
sftp = client.open_sftp()
try:
# 上传文件
sftp.put(local_path, remote_path)
print(f"上传完成: {local_path} -> {remote_path}")
finally:
sftp.close()
client.close()
7.3 封装 SSHExecutor
封装一个可复用的执行器:
class SSHExecutor:
def __init__(self, host, username, password=None, key_file=None):
self.host = host
self.username = username
self.password = password
self.key_file = key_file
self._client = None
def connect(self):
if self._client and self._client.get_transport().is_active():
return
self._client = paramiko.SSHClient()
self._client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self._client.connect(
self.host, username=self.username,
password=self.password, key_filename=self.key_file
)
def run(self, command):
self.connect()
stdin, stdout, stderr = self._client.exec_command(command)
out = stdout.read().decode()
err = stderr.read().decode()
code = stdout.channel.recv_exit_status()
return out, err, code
def close(self):
if self._client:
self._client.close()
def __enter__(self):
self.connect()
return self
def __exit__(self, *args):
self.close()
# 使用
with SSHExecutor('192.168.1.100', 'ubuntu', key_file='~/.ssh/id_rsa') as ssh:
out, err, code = ssh.run('apt update')
print(out)
7.4 并发批量执行
管理几十台服务器时,串行执行太慢了。用多线程并发执行:
from concurrent.futures import ThreadPoolExecutor, as_completed
def batch_run(hosts, command, username, key_file):
"""
批量在多台服务器上执行命令
"""
results = {}
def run_single(host):
try:
with SSHExecutor(host, username, key_file=key_file) as ssh:
out, err, code = ssh.run(command)
return host, out, err, code, None
except Exception as e:
return host, None, None, None, str(e)
# 最多10个并发
with ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(run_single, host) for host in hosts]
for future in as_completed(futures):
host, out, err, code, error = future.result()
results[host] = {
"success": error is None,
"output": out,
"error": error or err,
"code": code
}
return results
# 使用
hosts = ['192.168.1.100', '192.168.1.101', '192.168.1.102']
results = batch_run(hosts, 'uptime', 'ubuntu', '~/.ssh/id_rsa')
for host, res in results.items():
if res['success']:
print(f"{host}: {res['output'].strip()}")
else:
print(f"{host}: 失败 - {res['error']}")
第八章:高级自动化场景
8.1 文件系统监控:Inotify
Linux 的 inotify 可以监控文件系统的变化,比如新文件上传、文件修改等,非常适合自动化处理场景。
pip install watchdog
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
class FileChangeHandler(FileSystemEventHandler):
def on_created(self, event):
"""新文件创建时触发"""
if not event.is_directory:
logger.info(f"检测到新文件: {event.src_path}")
# 自动处理新文件,比如解压、解析、上传
process_new_file(event.src_path)
def on_modified(self, event):
"""文件修改时触发"""
if not event.is_directory:
logger.info(f"文件被修改: {event.src_path}")
def on_deleted(self, event):
"""文件删除时触发"""
logger.info(f"文件被删除: {event.src_path}")
# 启动监控
def start_watcher(path):
event_handler = FileChangeHandler()
observer = Observer()
observer.schedule(event_handler, path, recursive=True)
observer.start()
logger.info(f"开始监控目录: {path}")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
常见应用场景:
-
自动处理上传的日志文件
-
监控配置文件变化自动重载
-
自动同步文件到备份服务器
8.2 失败重试机制
网络请求、API 调用经常会因为临时的网络波动失败,自动重试可以大大提高稳定性:
pip install tenacity
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import requests
@retry(
stop=stop_after_attempt(3), # 最多重试3次
wait=wait_exponential(multiplier=1, min=2, max=10), # 指数退避:2s, 4s, 8s
retry=retry_if_exception_type((requests.exceptions.ConnectionError,
requests.exceptions.Timeout)),
reraise=True
)
def send_alert(message):
"""发送告警,失败自动重试"""
response = requests.post(
ALERT_WEBHOOK,
json={"text": message},
timeout=10
)
response.raise_for_status()
return response
第九章:脚本调试与性能分析
9.1 性能分析:cProfile
想知道你的脚本哪里慢?用 Python 内置的 cProfile:
# 运行脚本并生成性能报告
python -m cProfile -s cumulative my_script.py
输出示例:
ncalls tottime percall cumtime percall filename:lineno(function)
100 0.123 0.001 2.456 0.024 my_script.py:123(process_data)
200 0.045 0.000 1.234 0.006 my_script.py:456(parse_file)
-
tottime:函数本身的耗时(不包含子函数) -
cumtime:累计耗时(包含所有子函数)
9.2 行级性能分析:line_profiler
想知道具体哪一行代码慢?用 line_profiler:
pip install line_profiler
给函数加上装饰器:
@profile
def slow_function():
# 你的代码
pass
然后运行:
kernprof -l -v my_script.py
它会输出每一行代码的耗时,帮你精准定位瓶颈。
9.3 内存分析:memory_profiler
pip install memory_profiler
@profile
def my_function():
a = [1] * 1000000
b = [2] * 1000000
del b
return a
运行:
python -m memory_profiler my_script.py
第十章:安全最佳实践
10.1 敏感信息处理
-
永远不要把密码、API Key 硬编码在代码里
-
配置文件权限设为
600,只有所有者能读写 -
使用环境变量注入敏感配置,而不是写在代码里
-
生产环境使用
keyring管理密码:
pip install keyring
import keyring
# 存储密码
keyring.set_password("myapp", "db_user", "my_password")
# 读取密码
password = keyring.get_password("myapp", "db_user")
10.2 文件权限
-
脚本文件权限不要给其他用户写权限,避免被篡改:
chmod 755 script\.py -
敏感配置文件:
chmod 600 config\.env -
日志文件:
chmod 640 app\.log,避免其他用户读取日志里的敏感信息
10.3 输入验证
永远不要信任外部输入,无论是用户输入、环境变量还是文件内容:
-
对所有外部输入做验证
-
调用外部命令时永远用参数列表,不要用 shell=True
-
路径处理时检查是否有路径穿越攻击(比如
\.\./)
第十一章:综合实战:系统监控告警脚本
现在我们把所有知识点整合起来,写一个完整的系统监控脚本:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import time
import signal
import json
import logging
import psutil
import requests
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import fcntl
# -------------------------- 配置 --------------------------
CHECK_INTERVAL = 60 # 检查间隔,秒
ALERT_WEBHOOK = os.getenv("ALERT_WEBHOOK") # 告警Webhook
CPU_THRESHOLD = 80
MEM_THRESHOLD = 85
DISK_THRESHOLD = 90
LOAD_THRESHOLD = psutil.cpu_count() * 0.8 # 负载阈值:CPU核数*0.8
# -------------------------- 日志 --------------------------
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('/var/log/monitor/monitor.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# -------------------------- 信号处理 --------------------------
class GracefulExit:
def __init__(self):
self.shutdown_flag = False
signal.signal(signal.SIGINT, self.handler)
signal.signal(signal.SIGTERM, self.handler)
signal.signal(signal.SIGUSR1, self.reload_handler)
def handler(self, signum, frame):
logger.info(f"收到退出信号 {signum},准备退出")
self.shutdown_flag = True
def reload_handler(self, signum, frame):
logger.info("收到重载信号,重新加载配置")
# 这里可以重新加载配置
def is_shutting_down(self):
return self.shutdown_flag
# -------------------------- 重试告警 --------------------------
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10),
retry=retry_if_exception_type((requests.exceptions.ConnectionError,
requests.exceptions.Timeout)),
reraise=False
)
def send_alert(alerts):
if not ALERT_WEBHOOK:
logger.warning("未配置告警Webhook,跳过告警")
return
message = "\n".join(alerts)
logger.info(f"发送告警: {message}")
requests.post(ALERT_WEBHOOK, json={
"text": f"服务器告警: {message}"
}, timeout=10)
# -------------------------- 监控逻辑 --------------------------
def check_system():
alerts = []
# CPU检查
cpu = psutil.cpu_percent(interval=1)
if cpu > CPU_THRESHOLD:
alerts.append(f"CPU使用率过高: {cpu}%")
# 负载检查
load1, load5, load15 = psutil.getloadavg()
if load1 > LOAD_THRESHOLD:
alerts.append(f"系统负载过高: {load1:.2f}")
# 内存检查
mem = psutil.virtual_memory()
if mem.percent > MEM_THRESHOLD:
alerts.append(f"内存使用率过高: {mem.percent}%")
# Swap检查
swap = psutil.swap_memory()
if swap.percent > 50:
alerts.append(f"Swap使用率过高: {swap.percent}%")
# 磁盘检查
disk = psutil.disk_usage('/')
if disk.percent > DISK_THRESHOLD:
alerts.append(f"磁盘使用率过高: {disk.percent}%")
return alerts
# -------------------------- 主循环 --------------------------
def main():
logger.info("监控服务启动")
graceful = GracefulExit()
while not graceful.is_shutting_down():
try:
alerts = check_system()
if alerts:
logger.warning(f"发现告警: {alerts}")
send_alert(alerts)
else:
logger.info("系统状态正常")
except Exception as e:
logger.error(f"检查出错: {e}", exc_info=True)
# 等待,可中断
for _ in range(CHECK_INTERVAL):
if graceful.is_shutting_down():
break
time.sleep(1)
logger.info("监控服务退出")
# -------------------------- 文件锁 --------------------------
class FileLock:
def __init__(self, lock_file):
self.lock_file = lock_file
self.fp = None
def __enter__(self):
self.fp = open(self.lock_file, 'w')
try:
fcntl.flock(self.fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
except BlockingIOError:
raise RuntimeError("监控已经在运行中")
return self
def __exit__(self, *args):
if self.fp:
fcntl.flock(self.fp, fcntl.LOCK_UN)
self.fp.close()
if __name__ == "__main__":
# 确保目录存在
os.makedirs('/var/log/monitor', exist_ok=True)
LOCK_FILE = "/tmp/monitor.lock"
try:
with FileLock(LOCK_FILE):
main()
except RuntimeError as e:
logger.error(e)
sys.exit(1)
部署这个脚本
- 安装依赖
pip install psutil requests tenacity
- 创建 Systemd 服务
[Unit]
Description=System Monitor Service
After=network.target
[Service]
ExecStart=/home/user/.venv/bin/python /home/user/scripts/monitor.py
WorkingDirectory=/home/user/scripts
Environment="ALERT_WEBHOOK=https://your-webhook-url"
Restart=always
RestartSec=5
User=user
LimitNOFILE=65536
[Install]
WantedBy=multi-user.target
- 配置 Logrotate
/var/log/monitor/*.log {
daily
rotate 30
compress
missingok
notifempty
copytruncate
su user user
}
第十二章:生产环境避坑指南
12.1 常见陷阱
-
相对路径陷阱:永远用绝对路径,尤其是在 Crontab/Systemd 中
-
环境变量陷阱:不要假设环境变量和你手动登录的一样
-
权限陷阱:脚本运行用户必须有足够的权限访问文件
-
僵尸进程:子进程退出后要 wait,否则会变成僵尸进程
-
文件描述符泄漏:长期运行的服务要记得关闭文件句柄
-
Too many open files:记得在 Systemd 里配置
LimitNOFILE -
重复执行:定时任务记得加文件锁,防止重复跑
12.2 性能优化
-
IO 密集型任务:使用多线程或 asyncio
-
CPU 密集型任务:使用多进程(绕过 GIL)
-
批量操作:尽量减少循环中的 IO 操作
(注:文档部分内容可能由 AI 生成)
- 感谢你赐予我前进的力量