Linux Python脚本开发完整指南

针对 Ubuntu 环境的全面 Python 自动化脚本开发指南,涵盖从环境搭建到生产级部署的全流程实践。

前言

Python 作为 "胶水语言",在 Linux 环境下拥有无与伦比的生态优势。本指南将带你系统掌握如何在 Ubuntu 系统中编写高效、稳定、可维护的 Python 自动化脚本,覆盖系统监控、进程管理、远程运维等核心场景。

读者定位:具备基础编程知识,熟悉 Linux 常用命令,希望深入掌握 Linux 下 Python 自动化开发的开发者。


第一章:Ubuntu 下 Python 开发环境搭建

1.1 系统 Python 环境管理

Ubuntu 系统通常预装了 Python,但我们需要正确管理环境以避免破坏系统工具。

# 检查系统Python版本
python3 --version

# 安装基础开发工具
sudo apt update
sudo apt install -y python3-full python3-pip python3-dev

⚠️ 警告:永远不要卸载系统自带的python3包,否则会导致apt等系统工具崩溃。

1.2 多版本管理与虚拟环境

使用 pyenv 管理多 Python 版本

对于需要切换不同 Python 版本的场景,pyenv是最佳选择:

# 安装pyenv
sudo apt install pyenv

配置 Shell 环境,你可以用 Vim 编辑 \.bashrc

vim ~/.bashrc

然后在文件末尾添加:

export PYENV_ROOT="$HOME/.pyenv"
command -v pyenv >/dev/null || export PATH="$PYENV_ROOT/bin:$PATH"
eval "$(pyenv init -)"

或者直接运行以下命令一次性写入:

cat >> ~/.bashrc << 'EOF'
export PYENV_ROOT="$HOME/.pyenv"
command -v pyenv >/dev/null || export PATH="$PYENV_ROOT/bin:$PATH"
eval "$(pyenv init -)"
EOF

# 重启Shell后生效
exec "$SHELL"

# 安装指定版本Python
pyenv install 3.12.0
pyenv local 3.12.0  # 为当前目录设置版本

项目级虚拟环境(venv)

这是最推荐的项目隔离方案,无需额外工具:

# 创建虚拟环境
python3 -m venv .venv

# 激活虚拟环境
source .venv/bin/activate

# 退出虚拟环境
deactivate

虚拟环境目录结构如下,它会创建一个独立的 Python 运行环境:

.venv/
├── bin/          # 激活脚本和Python解释器
├── lib/          # 独立的依赖包
├── include/      # 头文件
└── pyvenv.cfg    # 配置文件

1.3 开发工具链配置

为了提升代码质量,建议安装以下工具:

# 代码格式化与检查
pip install black flake8 pytest

# 或者通过apt安装
sudo apt install black flake8 python3-pytest

第二章:Linux Python 脚本基础规范

2.1 Shebang 与可执行脚本

一个标准的 Linux Python 脚本开头必须包含 Shebang 行:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

为什么用/usr/bin/env python3而不是/usr/bin/python3

  • 前者会自动在 PATH 中查找 Python 解释器,兼容虚拟环境

  • 后者写死了路径,在不同机器或虚拟环境中可能失效

添加执行权限后,你可以直接运行脚本:

chmod +x my_script.py
./my_script.py  # 而不需要 python3 my_script.py

2.2 路径处理最佳实践

Linux 下的路径处理必须小心,避免硬编码:

import os
import sys
from pathlib import Path

# 获取脚本所在目录(推荐使用Path)
SCRIPT_DIR = Path(__file__).parent
# 项目根目录
ROOT_DIR = SCRIPT_DIR.parent

# 构建绝对路径,避免相对路径陷阱
LOG_FILE = ROOT_DIR / "logs" / "app.log"
# 确保目录存在
LOG_FILE.parent.mkdir(exist_ok=True)

2.3 环境变量与配置管理

敏感配置永远不要硬编码在脚本中,使用环境变量:

import os
from dotenv import load_dotenv

# 加载.env文件(开发环境)
load_dotenv()

# 读取环境变量
DB_HOST = os.getenv("DB_HOST", "localhost")
DB_PASSWORD = os.getenv("DB_PASSWORD")  # 生产环境通过系统环境变量注入

2.4 防止重复执行:文件锁

Crontab 定时任务最常见的坑:上一次任务还没跑完,下一次就启动了,导致重复执行。使用 Linux 文件锁可以完美解决这个问题:

import fcntl
import os

class FileLock:
    """Linux文件锁,保证同一时间只有一个脚本实例运行"""
    def __init__(self, lock_file):
        self.lock_file = lock_file
        self.fp = None
    
    def __enter__(self):
        self.fp = open(self.lock_file, 'w')
        try:
            # 非阻塞加锁,如果已经被锁了直接抛异常
            fcntl.flock(self.fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except BlockingIOError:
            raise RuntimeError("脚本已经在运行中了,跳过本次执行")
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.fp:
            fcntl.flock(self.fp, fcntl.LOCK_UN)
            self.fp.close()
            try:
                os.unlink(self.lock_file)
            except:
                pass

# 使用示例:在脚本开头加上
LOCK_FILE = "/tmp/my_script.lock"
try:
    with FileLock(LOCK_FILE):
        # 这里放你的业务逻辑
        main()
except RuntimeError as e:
    print(e)
    sys.exit(0)

第三章:系统监控与进程管理

3.1 psutil:跨平台系统监控利器

psutil是 Linux 系统监控的瑞士军刀,它封装了底层的系统调用,让你用 Python 轻松获取系统信息。

pip install psutil

系统资源监控示例

import psutil
import time

def get_system_status():
    """获取系统状态摘要"""
    # CPU信息
    cpu_percent = psutil.cpu_percent(interval=1)
    cpu_count = psutil.cpu_count()
    load_avg = psutil.getloadavg()  # 1/5/15分钟负载
    
    # 内存信息
    mem = psutil.virtual_memory()
    mem_used = mem.used / (1024**3)  # GB
    mem_total = mem.total / (1024**3)
    mem_percent = mem.percent
    
    # Swap信息
    swap = psutil.swap_memory()
    
    # 磁盘信息
    disk = psutil.disk_usage('/')
    disk_percent = disk.percent
    
    # 磁盘IO
    disk_io = psutil.disk_io_counters()
    
    # 网络信息
    net = psutil.net_io_counters()
    net_sent = net.bytes_sent / (1024**2)  # MB
    net_recv = net.bytes_recv / (1024**2)
    
    # TCP连接数
    connections = len(psutil.net_connections())
    
    return {
        "cpu": f"{cpu_percent}% ({cpu_count}核)",
        "load": f"{load_avg[0]:.2f}, {load_avg[1]:.2f}, {load_avg[2]:.2f}",
        "memory": f"{mem_used:.1f}G/{mem_total:.1f}G ({mem_percent}%)",
        "swap": f"{swap.percent}%",
        "disk": f"{disk_percent}%",
        "network": f"↑{net_sent:.1f}M ↓{net_recv:.1f}M",
        "connections": connections
    }

# 实时监控
while True:
    status = get_system_status()
    print(f"\rCPU: {status['cpu']} | 负载: {status['load']} | MEM: {status['memory']} | DISK: {status['disk']}", end="")
    time.sleep(2)

进程管理实战

def find_process_by_name(name):
    """根据名称查找进程"""
    processes = []
    for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']):
        try:
            if name.lower() in proc.info['name'].lower():
                processes.append(proc.info)
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            pass
    return processes

def kill_process_gracefully(pid, timeout=5):
    """优雅终止进程"""
    try:
        proc = psutil.Process(pid)
        # 先发送SIGTERM,给进程清理时间
        proc.terminate()
        # 等待进程退出
        proc.wait(timeout=timeout)
        print(f"进程 {pid} 已正常终止")
    except psutil.NoSuchProcess:
        print(f"进程 {pid} 不存在")
    except psutil.TimeoutExpired:
        # 如果超时,强制杀死
        proc.kill()
        print(f"进程 {pid} 无响应,已强制杀死")

3.2 信号处理与优雅退出

Linux 通过信号与进程通信,你的脚本需要正确处理这些信号:

import signal
import sys

class GracefulExit:
    def __init__(self):
        self.shutdown_flag = False
        # 注册信号处理函数
        signal.signal(signal.SIGINT, self._handler)   # Ctrl+C
        signal.signal(signal.SIGTERM, self._handler)  # systemd stop
    
    def _handler(self, signum, frame):
        print(f"收到信号 {signum},开始优雅退出...")
        self.shutdown_flag = True
    
    def is_shutting_down(self):
        return self.shutdown_flag

# 使用示例
graceful = GracefulExit()

while not graceful.is_shutting_down():
    # 主循环
    do_work()
    time.sleep(1)

# 退出前的清理工作
cleanup_resources()
print("退出完成")
sys.exit(0)

热重载配置:自定义信号

生产环境中,我们经常需要不重启服务就重新加载配置。可以使用 Linux 自定义信号 SIGUSR1/SIGUSR2 来实现:

config = None

def load_config():
    """加载配置文件"""
    global config
    with open('/etc/myapp/config.json') as f:
        config = json.load(f)
    logger.info("配置已重新加载")

def handle_reload(signum, frame):
    """处理重载信号"""
    logger.info("收到SIGUSR1信号,开始重新加载配置...")
    load_config()

# 注册信号处理
signal.signal(signal.SIGUSR1, handle_reload)

# 初始加载
load_config()

使用时,只需向进程发送信号即可:

# 不用重启服务,直接重载配置
kill -USR1 <pid>

3.3 权限管理

Linux 的权限模型是安全的基础,脚本中需要注意:

import os
import pwd

def drop_privileges(uid_name='nobody', gid_name='nogroup'):
    """降权:从root切换到普通用户"""
    if os.getuid() != 0:
        return  # 已经不是root,无需处理
    
    # 获取用户ID
    uid = pwd.getpwnam(uid_name).pw_uid
    gid = pwd.getgrnam(gid_name).gr_gid
    
    # 先降组,再降用户
    os.setgid(gid)
    os.setuid(uid)
    
    print(f"已降权到用户 {uid_name} (UID={uid})")

第四章:调用 Shell 命令与管道

4.1 subprocess 最佳实践

subprocess是 Python 调用外部命令的标准库,永远不要再用os\.system\(\)

基础用法

import subprocess
import shlex

# 推荐:参数列表形式,安全无注入
result = subprocess.run(
    ['ls', '-l', '/home'],
    capture_output=True,  # 捕获输出
    text=True,           # 自动解码为字符串
    check=True,          # 失败时抛出异常
    timeout=30           # 超时时间,防止卡住
)

print(result.stdout)
print(result.returncode)

⚠️ 超时参数很重要:很多外部命令可能因为网络或其他原因卡住,没有超时的话你的脚本会永远卡死。

安全警告:shell=True 的风险

除非绝对必要,否则不要使用shell=True,它会带来命令注入风险:

# ❌ 危险!如果user_input包含 "; rm -rf /" 会被执行
user_input = "; rm -rf /"
subprocess.run(f"echo {user_input}", shell=True)

# ✅ 安全!参数会被正确转义
subprocess.run(['echo', user_input], shell=False)

4.2 管道与流式处理

实现类似 Shell 的管道功能:

# 实现 ps aux | grep python
p1 = subprocess.Popen(['ps', 'aux'], stdout=subprocess.PIPE, text=True)
p2 = subprocess.Popen(
    ['grep', 'python'],
    stdin=p1.stdout,
    stdout=subprocess.PIPE,
    text=True
)
p1.stdout.close()  # 关闭p1的stdout,让p1接收SIGPIPE

output, _ = p2.communicate()
print(output)

实时输出处理

对于长时间运行的命令,实时输出很重要:

def run_command_with_live_output(cmd):
    process = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=1,
        universal_newlines=True
    )
    
    for line in iter(process.stdout.readline, ''):
        print(f"[OUT] {line.strip()}")
        # 可以在这里实时处理日志
    
    process.wait()
    return process.returncode

第五章:后台运行与自动化调度

5.1 临时后台运行:nohup &amp; &amp;

对于临时的后台任务:

# 后台运行,忽略挂起信号,输出重定向
nohup python3 my_script.py > app.log 2>&1 &

# 查看后台任务
jobs
# 带回前台
fg %1

⚠️ 注意:nohup只是临时方案,它无法保证脚本崩溃后自动重启,也无法保证开机自启。

5.2 定时任务:Crontab

Crontab 适合定时执行的脚本,比如每天备份、每小时监控。

# 编辑定时任务
crontab -e

Crontab 配置示例

# 分 时 日 月 周 命令
# 每天凌晨2点执行备份
0 2 * * * /home/user/scripts/backup.py >> /var/log/backup.log 2>&1

# 每10分钟执行一次监控
*/10 * * * * /home/user/.venv/bin/python /home/user/scripts/monitor.py

# 开机启动执行
@reboot /home/user/scripts/startup.sh

# 执行结果自动发邮件
MAILTO="your@email.com"
0 2 * * * /home/user/scripts/backup.py

Crontab 避坑指南

  1. 绝对路径:Crontab 的 PATH 非常短,所有命令必须用绝对路径

    # ❌ 错误:可能找不到python
    * * * * * python /path/to/script.py
    
    # ✅ 正确:使用虚拟环境的绝对路径
    * * * * * /home/user/.venv/bin/python /path/to/script.py
    
  2. 工作目录:Crontab 的工作目录是用户家目录,脚本中尽量用绝对路径

  3. 环境变量:Crontab 不会加载.bashrc,需要的环境变量要在 crontab 里定义

    SHELL=/bin/bash
    PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin
    
    * * * * * /path/to/script.py
    

5.3 常驻服务:Systemd

对于需要长期运行的后台服务,Systemd 是 Ubuntu 的标准方案。它能提供:

  • 开机自启

  • 崩溃自动重启

  • 状态监控

  • 统一的日志管理

1. 创建服务文件

sudo nano /etc/systemd/system/my-monitor.service

服务文件内容:

[Unit]
# 服务描述
Description=My System Monitor Service
# 网络就绪后再启动
After=network.target

[Service]
# 执行命令(绝对路径!)
ExecStart=/home/user/.venv/bin/python /home/user/scripts/monitor.py
# 工作目录
WorkingDirectory=/home/user/scripts
# 崩溃自动重启
Restart=always
RestartSec=5
# 运行用户
User=user
Group=user
# 输出重定向到journal
StandardOutput=journal
StandardError=journal

# 解决"too many open files"问题
LimitNOFILE=65536

# 从文件加载环境变量(敏感配置放这里)
EnvironmentFile=/etc/myapp/env.conf

[Install]
# 多用户模式下启用
WantedBy=multi-user.target

💡 EnvironmentFile 是生产环境最佳实践:把敏感的环境变量(如密码、API Key)放在单独的文件里,权限设为 600,只有运行用户能读,而不是写在公开的服务文件里。

2. 启用服务

# 重新加载配置
sudo systemctl daemon-reload

# 启动服务
sudo systemctl start my-monitor

# 设置开机自启
sudo systemctl enable my-monitor

3. 管理与查看

# 查看服务状态
sudo systemctl status my-monitor

Image

# 查看实时日志
sudo journalctl -u my-monitor -f

# 重启服务
sudo systemctl restart my-monitor

# 停止服务
sudo systemctl stop my-monitor

第六章:日志管理与问题排查

6.1 Python logging 配置

标准的日志配置:

import logging
import logging.handlers

def setup_logging():
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)
    
    # 格式
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    
    # 文件处理器
    file_handler = logging.handlers.RotatingFileHandler(
        '/var/log/myapp/app.log',
        maxBytes=100*1024*1024,  # 100MB
        backupCount=5,
        encoding='utf-8'
    )
    file_handler.setFormatter(formatter)
    
    # 控制台处理器
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    
    return logger

logger = setup_logging()

结构化日志(JSON)

生产环境中,JSON 格式的日志更容易被 ELK、Grafana Loki 等日志系统解析:

pip install python-json-logger
from pythonjsonlogger import jsonlogger

def setup_json_logging():
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)
    
    # JSON格式
    formatter = jsonlogger.JsonFormatter(
        '%(asctime)s %(levelname)s %(name)s %(message)s'
    )
    
    handler = logging.FileHandler('/var/log/myapp/app.json.log')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    
    return logger

Systemd Journald 日志

如果你用 Systemd 部署服务,直接把日志输出到 Journald 是最佳实践,不需要自己管理日志文件:

pip install systemd-python
from systemd import journal

# 直接发送日志到journald
journal.send("Hello from Python", PRIORITY=6)

# 或者用logging集成
from systemd.journal import JournalHandler

logger = logging.getLogger(__name__)
logger.addHandler(JournalHandler())
logger.info("This will go to journald")

6.2 Logrotate 日志轮转

即使你用了 RotatingFileHandler,Linux 的 logrotate 依然是生产环境的首选。

创建配置文件 /etc/logrotate\.d/myapp

/var/log/myapp/*.log {
    daily               # 每天轮转
    rotate 30           # 保留30天
    compress            # 自动压缩
    delaycompress       # 延迟压缩
    missingok           # 文件不存在不报错
    notifempty          # 空文件不轮转
    dateext             # 用日期做后缀
    dateformat -%Y%m%d
    
    # 对于Python脚本,使用copytruncate模式
    copytruncate        # 复制并清空,无需重启应用
    su user user        # 切换用户处理权限
}

调试 Logrotate

# 测试配置(不实际执行)
logrotate -d /etc/logrotate.d/myapp

# 强制执行一次
logrotate -f /etc/logrotate.d/myapp

6.3 常见问题排查

问题 1:删除了日志文件,磁盘空间没释放?

这是因为进程还持有已删除文件的句柄。解决:

# 重启进程,或者发送USR1信号让它重开日志
kill -USR1 <pid>

问题 2:脚本在手动运行正常,Crontab/Systemd 里运行失败?

99% 是路径或环境变量问题:

  1. 检查所有路径是否都是绝对路径

  2. 检查 Python 解释器路径

  3. 检查日志输出看具体错误


第七章:远程服务器批量管理

7.1 Paramiko:SSH 远程执行

Paramiko 是 Python 的 SSH 协议实现,让你可以用代码远程管理服务器。

pip install paramiko

基础 SSH 连接

import paramiko

def ssh_exec(host, username, password, command):
    # 创建客户端
    client = paramiko.SSHClient()
    # 自动接受主机密钥(测试用,生产建议关闭)
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    
    try:
        # 连接
        client.connect(
            hostname=host,
            username=username,
            password=password,
            timeout=10
        )
        
        # 执行命令
        stdin, stdout, stderr = client.exec_command(command, timeout=30)
        
        # 获取结果
        out = stdout.read().decode('utf-8')
        err = stderr.read().decode('utf-8')
        code = stdout.channel.recv_exit_status()
        
        return out, err, code
    finally:
        client.close()

# 使用
out, err, code = ssh_exec('192.168.1.100', 'ubuntu', 'password', 'ls -la')
print(out)

密钥认证(推荐)

生产环境永远用密钥而不是密码:

client.connect(
    hostname=host,
    username=username,
    key_filename='/home/user/.ssh/id_rsa',  # 私钥路径
    timeout=10
)

7.2 SFTP 文件传输

def sftp_upload(host, username, key_file, local_path, remote_path):
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    client.connect(host, username=username, key_filename=key_file)
    
    sftp = client.open_sftp()
    try:
        # 上传文件
        sftp.put(local_path, remote_path)
        print(f"上传完成: {local_path} -> {remote_path}")
    finally:
        sftp.close()
        client.close()

7.3 封装 SSHExecutor

封装一个可复用的执行器:

class SSHExecutor:
    def __init__(self, host, username, password=None, key_file=None):
        self.host = host
        self.username = username
        self.password = password
        self.key_file = key_file
        self._client = None
    
    def connect(self):
        if self._client and self._client.get_transport().is_active():
            return
        self._client = paramiko.SSHClient()
        self._client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self._client.connect(
            self.host, username=self.username,
            password=self.password, key_filename=self.key_file
        )
    
    def run(self, command):
        self.connect()
        stdin, stdout, stderr = self._client.exec_command(command)
        out = stdout.read().decode()
        err = stderr.read().decode()
        code = stdout.channel.recv_exit_status()
        return out, err, code
    
    def close(self):
        if self._client:
            self._client.close()
    
    def __enter__(self):
        self.connect()
        return self
    
    def __exit__(self, *args):
        self.close()

# 使用
with SSHExecutor('192.168.1.100', 'ubuntu', key_file='~/.ssh/id_rsa') as ssh:
    out, err, code = ssh.run('apt update')
    print(out)

7.4 并发批量执行

管理几十台服务器时,串行执行太慢了。用多线程并发执行:

from concurrent.futures import ThreadPoolExecutor, as_completed

def batch_run(hosts, command, username, key_file):
    """
    批量在多台服务器上执行命令
    """
    results = {}
    
    def run_single(host):
        try:
            with SSHExecutor(host, username, key_file=key_file) as ssh:
                out, err, code = ssh.run(command)
                return host, out, err, code, None
        except Exception as e:
            return host, None, None, None, str(e)
    
    # 最多10个并发
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(run_single, host) for host in hosts]
        
        for future in as_completed(futures):
            host, out, err, code, error = future.result()
            results[host] = {
                "success": error is None,
                "output": out,
                "error": error or err,
                "code": code
            }
    
    return results

# 使用
hosts = ['192.168.1.100', '192.168.1.101', '192.168.1.102']
results = batch_run(hosts, 'uptime', 'ubuntu', '~/.ssh/id_rsa')

for host, res in results.items():
    if res['success']:
        print(f"{host}: {res['output'].strip()}")
    else:
        print(f"{host}: 失败 - {res['error']}")

第八章:高级自动化场景

8.1 文件系统监控:Inotify

Linux 的 inotify 可以监控文件系统的变化,比如新文件上传、文件修改等,非常适合自动化处理场景。

pip install watchdog
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class FileChangeHandler(FileSystemEventHandler):
    def on_created(self, event):
        """新文件创建时触发"""
        if not event.is_directory:
            logger.info(f"检测到新文件: {event.src_path}")
            # 自动处理新文件,比如解压、解析、上传
            process_new_file(event.src_path)
    
    def on_modified(self, event):
        """文件修改时触发"""
        if not event.is_directory:
            logger.info(f"文件被修改: {event.src_path}")
    
    def on_deleted(self, event):
        """文件删除时触发"""
        logger.info(f"文件被删除: {event.src_path}")

# 启动监控
def start_watcher(path):
    event_handler = FileChangeHandler()
    observer = Observer()
    observer.schedule(event_handler, path, recursive=True)
    observer.start()
    logger.info(f"开始监控目录: {path}")
    
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

常见应用场景:

  • 自动处理上传的日志文件

  • 监控配置文件变化自动重载

  • 自动同步文件到备份服务器

8.2 失败重试机制

网络请求、API 调用经常会因为临时的网络波动失败,自动重试可以大大提高稳定性:

pip install tenacity
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import requests

@retry(
    stop=stop_after_attempt(3),           # 最多重试3次
    wait=wait_exponential(multiplier=1, min=2, max=10),  # 指数退避:2s, 4s, 8s
    retry=retry_if_exception_type((requests.exceptions.ConnectionError, 
                                   requests.exceptions.Timeout)),
    reraise=True
)
def send_alert(message):
    """发送告警,失败自动重试"""
    response = requests.post(
        ALERT_WEBHOOK, 
        json={"text": message}, 
        timeout=10
    )
    response.raise_for_status()
    return response

第九章:脚本调试与性能分析

9.1 性能分析:cProfile

想知道你的脚本哪里慢?用 Python 内置的 cProfile:

# 运行脚本并生成性能报告
python -m cProfile -s cumulative my_script.py

输出示例:

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      100    0.123    0.001    2.456    0.024 my_script.py:123(process_data)
      200    0.045    0.000    1.234    0.006 my_script.py:456(parse_file)
  • tottime:函数本身的耗时(不包含子函数)

  • cumtime:累计耗时(包含所有子函数)

9.2 行级性能分析:line_profiler

想知道具体哪一行代码慢?用 line_profiler:

pip install line_profiler

给函数加上装饰器:

@profile
def slow_function():
    # 你的代码
    pass

然后运行:

kernprof -l -v my_script.py

它会输出每一行代码的耗时,帮你精准定位瓶颈。

9.3 内存分析:memory_profiler

pip install memory_profiler
@profile
def my_function():
    a = [1] * 1000000
    b = [2] * 1000000
    del b
    return a

运行:

python -m memory_profiler my_script.py

第十章:安全最佳实践

10.1 敏感信息处理

  • 永远不要把密码、API Key 硬编码在代码里

  • 配置文件权限设为 600,只有所有者能读写

  • 使用环境变量注入敏感配置,而不是写在代码里

  • 生产环境使用 keyring 管理密码:

pip install keyring
import keyring

# 存储密码
keyring.set_password("myapp", "db_user", "my_password")

# 读取密码
password = keyring.get_password("myapp", "db_user")

10.2 文件权限

  • 脚本文件权限不要给其他用户写权限,避免被篡改:chmod 755 script\.py

  • 敏感配置文件:chmod 600 config\.env

  • 日志文件:chmod 640 app\.log,避免其他用户读取日志里的敏感信息

10.3 输入验证

永远不要信任外部输入,无论是用户输入、环境变量还是文件内容:

  • 对所有外部输入做验证

  • 调用外部命令时永远用参数列表,不要用 shell=True

  • 路径处理时检查是否有路径穿越攻击(比如 \.\./


第十一章:综合实战:系统监控告警脚本

现在我们把所有知识点整合起来,写一个完整的系统监控脚本:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import sys
import time
import signal
import json
import logging
import psutil
import requests
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import fcntl

# -------------------------- 配置 --------------------------
CHECK_INTERVAL = 60  # 检查间隔,秒
ALERT_WEBHOOK = os.getenv("ALERT_WEBHOOK")  # 告警Webhook
CPU_THRESHOLD = 80
MEM_THRESHOLD = 85
DISK_THRESHOLD = 90
LOAD_THRESHOLD = psutil.cpu_count() * 0.8  # 负载阈值:CPU核数*0.8

# -------------------------- 日志 --------------------------
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('/var/log/monitor/monitor.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# -------------------------- 信号处理 --------------------------
class GracefulExit:
    def __init__(self):
        self.shutdown_flag = False
        signal.signal(signal.SIGINT, self.handler)
        signal.signal(signal.SIGTERM, self.handler)
        signal.signal(signal.SIGUSR1, self.reload_handler)
    
    def handler(self, signum, frame):
        logger.info(f"收到退出信号 {signum},准备退出")
        self.shutdown_flag = True
    
    def reload_handler(self, signum, frame):
        logger.info("收到重载信号,重新加载配置")
        # 这里可以重新加载配置
    
    def is_shutting_down(self):
        return self.shutdown_flag

# -------------------------- 重试告警 --------------------------
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type((requests.exceptions.ConnectionError, 
                                   requests.exceptions.Timeout)),
    reraise=False
)
def send_alert(alerts):
    if not ALERT_WEBHOOK:
        logger.warning("未配置告警Webhook,跳过告警")
        return
    
    message = "\n".join(alerts)
    logger.info(f"发送告警: {message}")
    requests.post(ALERT_WEBHOOK, json={
        "text": f"服务器告警: {message}"
    }, timeout=10)

# -------------------------- 监控逻辑 --------------------------
def check_system():
    alerts = []
    
    # CPU检查
    cpu = psutil.cpu_percent(interval=1)
    if cpu > CPU_THRESHOLD:
        alerts.append(f"CPU使用率过高: {cpu}%")
    
    # 负载检查
    load1, load5, load15 = psutil.getloadavg()
    if load1 > LOAD_THRESHOLD:
        alerts.append(f"系统负载过高: {load1:.2f}")
    
    # 内存检查
    mem = psutil.virtual_memory()
    if mem.percent > MEM_THRESHOLD:
        alerts.append(f"内存使用率过高: {mem.percent}%")
    
    # Swap检查
    swap = psutil.swap_memory()
    if swap.percent > 50:
        alerts.append(f"Swap使用率过高: {swap.percent}%")
    
    # 磁盘检查
    disk = psutil.disk_usage('/')
    if disk.percent > DISK_THRESHOLD:
        alerts.append(f"磁盘使用率过高: {disk.percent}%")
    
    return alerts

# -------------------------- 主循环 --------------------------
def main():
    logger.info("监控服务启动")
    graceful = GracefulExit()
    
    while not graceful.is_shutting_down():
        try:
            alerts = check_system()
            if alerts:
                logger.warning(f"发现告警: {alerts}")
                send_alert(alerts)
            else:
                logger.info("系统状态正常")
        except Exception as e:
            logger.error(f"检查出错: {e}", exc_info=True)
        
        # 等待,可中断
        for _ in range(CHECK_INTERVAL):
            if graceful.is_shutting_down():
                break
            time.sleep(1)
    
    logger.info("监控服务退出")

# -------------------------- 文件锁 --------------------------
class FileLock:
    def __init__(self, lock_file):
        self.lock_file = lock_file
        self.fp = None
    
    def __enter__(self):
        self.fp = open(self.lock_file, 'w')
        try:
            fcntl.flock(self.fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except BlockingIOError:
            raise RuntimeError("监控已经在运行中")
        return self
    
    def __exit__(self, *args):
        if self.fp:
            fcntl.flock(self.fp, fcntl.LOCK_UN)
            self.fp.close()

if __name__ == "__main__":
    # 确保目录存在
    os.makedirs('/var/log/monitor', exist_ok=True)
    
    LOCK_FILE = "/tmp/monitor.lock"
    try:
        with FileLock(LOCK_FILE):
            main()
    except RuntimeError as e:
        logger.error(e)
        sys.exit(1)

部署这个脚本

  1. 安装依赖
pip install psutil requests tenacity
  1. 创建 Systemd 服务
[Unit]
Description=System Monitor Service
After=network.target

[Service]
ExecStart=/home/user/.venv/bin/python /home/user/scripts/monitor.py
WorkingDirectory=/home/user/scripts
Environment="ALERT_WEBHOOK=https://your-webhook-url"
Restart=always
RestartSec=5
User=user
LimitNOFILE=65536

[Install]
WantedBy=multi-user.target
  1. 配置 Logrotate
/var/log/monitor/*.log {
    daily
    rotate 30
    compress
    missingok
    notifempty
    copytruncate
    su user user
}

第十二章:生产环境避坑指南

12.1 常见陷阱

  1. 相对路径陷阱:永远用绝对路径,尤其是在 Crontab/Systemd 中

  2. 环境变量陷阱:不要假设环境变量和你手动登录的一样

  3. 权限陷阱:脚本运行用户必须有足够的权限访问文件

  4. 僵尸进程:子进程退出后要 wait,否则会变成僵尸进程

  5. 文件描述符泄漏:长期运行的服务要记得关闭文件句柄

  6. Too many open files:记得在 Systemd 里配置 LimitNOFILE

  7. 重复执行:定时任务记得加文件锁,防止重复跑

12.2 性能优化

  • IO 密集型任务:使用多线程或 asyncio

  • CPU 密集型任务:使用多进程(绕过 GIL)

  • 批量操作:尽量减少循环中的 IO 操作

(注:文档部分内容可能由 AI 生成)