Linux Python脚本开发完整指南
针对 Ubuntu 环境的全面 Python 自动化脚本开发指南,涵盖从环境搭建到生产级部署的全流程实践。
前言
Python 作为 "胶水语言",在 Linux 环境下拥有无与伦比的生态优势。本指南将带你系统掌握如何在 Ubuntu 系统中编写高效、稳定、可维护的 Python 自动化脚本,覆盖系统监控、进程管理、远程运维等核心场景。
读者定位:具备基础编程知识,熟悉 Linux 常用命令,希望深入掌握 Linux 下 Python 自动化开发的开发者。


第一章:Ubuntu 下 Python 开发环境搭建
1.1 系统 Python 环境管理
Ubuntu 系统通常预装了 Python,但我们需要正确管理环境以避免破坏系统工具。

检查系统Python版本

python3 --version

安装基础开发工具

sudo apt update
sudo apt install -y python3-full python3-pip python3-dev
⚠️ 警告:永远不要卸载系统自带的python3包,否则会导致apt等系统工具崩溃。
1.2 多版本管理与虚拟环境
使用 pyenv 管理多 Python 版本
对于需要切换不同 Python 版本的场景,pyenv是最佳选择:

安装pyenv

sudo apt install pyenv
配置 Shell 环境,你可以用 Vim 编辑 .bashrc:
vim ~/.bashrc
然后在文件末尾添加:
export PYENV_ROOT="$HOME/.pyenv"
command -v pyenv >/dev/null || export PATH="$PYENV_ROOT/bin:$PATH"
eval "$(pyenv init -)"
或者直接运行以下命令一次性写入:
cat >> ~/.bashrc << 'EOF'
export PYENV_ROOT="$HOME/.pyenv"
command -v pyenv >/dev/null || export PATH="$PYENV_ROOT/bin:$PATH"
eval "$(pyenv init -)"
EOF

重启Shell后生效

exec "$SHELL"

安装指定版本Python

pyenv install 3.12.0
pyenv local 3.12.0 # 为当前目录设置版本
项目级虚拟环境(venv)
这是最推荐的项目隔离方案,无需额外工具:

创建虚拟环境

python3 -m venv .venv

激活虚拟环境

source .venv/bin/activate

退出虚拟环境

deactivate
虚拟环境目录结构如下,它会创建一个独立的 Python 运行环境:
.venv/
├── bin/ # 激活脚本和Python解释器
├── lib/ # 独立的依赖包
├── include/ # 头文件
└── pyvenv.cfg # 配置文件
1.3 开发工具链配置
为了提升代码质量,建议安装以下工具:

代码格式化与检查

pip install black flake8 pytest

或者通过apt安装

sudo apt install black flake8 python3-pytest


第二章:Linux Python 脚本基础规范
2.1 Shebang 与可执行脚本
一个标准的 Linux Python 脚本开头必须包含 Shebang 行:
#!/usr/bin/env python3

-- coding: utf-8 --

为什么用/usr/bin/env python3而不是/usr/bin/python3?

  • 前者会自动在 PATH 中查找 Python 解释器,兼容虚拟环境
  • 后者写死了路径,在不同机器或虚拟环境中可能失效
    添加执行权限后,你可以直接运行脚本:
    chmod +x my_script.py
    ./my_script.py # 而不需要 python3 my_script.py
    2.2 路径处理最佳实践
    Linux 下的路径处理必须小心,避免硬编码:
    import os
    import sys
    from pathlib import Path

获取脚本所在目录(推荐使用Path)

SCRIPT_DIR = Path(file).parent

项目根目录

ROOT_DIR = SCRIPT_DIR.parent

构建绝对路径,避免相对路径陷阱

LOG_FILE = ROOT_DIR / "logs" / "app.log"

确保目录存在

LOG_FILE.parent.mkdir(exist_ok=True)
2.3 环境变量与配置管理
敏感配置永远不要硬编码在脚本中,使用环境变量:
import os
from dotenv import load_dotenv

加载.env文件(开发环境)

load_dotenv()

读取环境变量

DB_HOST = os.getenv("DB_HOST", "localhost")
DB_PASSWORD = os.getenv("DB_PASSWORD") # 生产环境通过系统环境变量注入
2.4 防止重复执行:文件锁
Crontab 定时任务最常见的坑:上一次任务还没跑完,下一次就启动了,导致重复执行。使用 Linux 文件锁可以完美解决这个问题:
import fcntl
import os

class FileLock:
"""Linux文件锁,保证同一时间只有一个脚本实例运行"""
def init(self, lock_file):
self.lock_file = lock_file
self.fp = None

def __enter__(self):
    self.fp = open(self.lock_file, 'w')
    try:
        # 非阻塞加锁,如果已经被锁了直接抛异常
        fcntl.flock(self.fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError:
        raise RuntimeError("脚本已经在运行中了,跳过本次执行")
    return self

def __exit__(self, exc_type, exc_val, exc_tb):
    if self.fp:
        fcntl.flock(self.fp, fcntl.LOCK_UN)
        self.fp.close()
        try:
            os.unlink(self.lock_file)
        except:
            pass

使用示例:在脚本开头加上

LOCK_FILE = "/tmp/my_script.lock"
try:
with FileLock(LOCK_FILE):
# 这里放你的业务逻辑
main()
except RuntimeError as e:
print(e)
sys.exit(0)


第三章:系统监控与进程管理
3.1 psutil:跨平台系统监控利器
psutil是 Linux 系统监控的瑞士军刀,它封装了底层的系统调用,让你用 Python 轻松获取系统信息。
pip install psutil
系统资源监控示例
import psutil
import time

def get_system_status():
"""获取系统状态摘要"""
# CPU信息
cpu_percent = psutil.cpu_percent(interval=1)
cpu_count = psutil.cpu_count()
load_avg = psutil.getloadavg() # 1/5/15分钟负载

# 内存信息
mem = psutil.virtual_memory()
mem_used = mem.used / (1024**3)  # GB
mem_total = mem.total / (1024**3)
mem_percent = mem.percent

# Swap信息
swap = psutil.swap_memory()

# 磁盘信息
disk = psutil.disk_usage('/')
disk_percent = disk.percent

# 磁盘IO
disk_io = psutil.disk_io_counters()

# 网络信息
net = psutil.net_io_counters()
net_sent = net.bytes_sent / (1024**2)  # MB
net_recv = net.bytes_recv / (1024**2)

# TCP连接数
connections = len(psutil.net_connections())

return {
    "cpu": f"{cpu_percent}% ({cpu_count}核)",
    "load": f"{load_avg[0]:.2f}, {load_avg[1]:.2f}, {load_avg[2]:.2f}",
    "memory": f"{mem_used:.1f}G/{mem_total:.1f}G ({mem_percent}%)",
    "swap": f"{swap.percent}%",
    "disk": f"{disk_percent}%",
    "network": f"↑{net_sent:.1f}M ↓{net_recv:.1f}M",
    "connections": connections
}

实时监控

while True:
status = get_system_status()
print(f"\rCPU: {status['cpu']} | 负载: {status['load']} | MEM: {status['memory']} | DISK: {status['disk']}", end="")
time.sleep(2)
进程管理实战
def find_process_by_name(name):
"""根据名称查找进程"""
processes = []
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']):
try:
if name.lower() in proc.info['name'].lower():
processes.append(proc.info)
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass
return processes

def kill_process_gracefully(pid, timeout=5):
"""优雅终止进程"""
try:
proc = psutil.Process(pid)
# 先发送SIGTERM,给进程清理时间
proc.terminate()
# 等待进程退出
proc.wait(timeout=timeout)
print(f"进程 {pid} 已正常终止")
except psutil.NoSuchProcess:
print(f"进程 {pid} 不存在")
except psutil.TimeoutExpired:
# 如果超时,强制杀死
proc.kill()
print(f"进程 {pid} 无响应,已强制杀死")
3.2 信号处理与优雅退出
Linux 通过信号与进程通信,你的脚本需要正确处理这些信号:
import signal
import sys

class GracefulExit:
def init(self):
self.shutdown_flag = False
# 注册信号处理函数
signal.signal(signal.SIGINT, self._handler) # Ctrl+C
signal.signal(signal.SIGTERM, self._handler) # systemd stop

def _handler(self, signum, frame):
    print(f"收到信号 {signum},开始优雅退出...")
    self.shutdown_flag = True

def is_shutting_down(self):
    return self.shutdown_flag

使用示例

graceful = GracefulExit()

while not graceful.is_shutting_down():
# 主循环
do_work()
time.sleep(1)

退出前的清理工作

cleanup_resources()
print("退出完成")
sys.exit(0)
热重载配置:自定义信号
生产环境中,我们经常需要不重启服务就重新加载配置。可以使用 Linux 自定义信号 SIGUSR1/SIGUSR2 来实现:
config = None

def load_config():
"""加载配置文件"""
global config
with open('/etc/myapp/config.json') as f:
config = json.load(f)
logger.info("配置已重新加载")

def handle_reload(signum, frame):
"""处理重载信号"""
logger.info("收到SIGUSR1信号,开始重新加载配置...")
load_config()

注册信号处理

signal.signal(signal.SIGUSR1, handle_reload)

初始加载

load_config()
使用时,只需向进程发送信号即可:

不用重启服务,直接重载配置

kill -USR1
3.3 权限管理
Linux 的权限模型是安全的基础,脚本中需要注意:
import os
import pwd

def drop_privileges(uid_name='nobody', gid_name='nogroup'):
"""降权:从root切换到普通用户"""
if os.getuid() != 0:
return # 已经不是root,无需处理

# 获取用户ID
uid = pwd.getpwnam(uid_name).pw_uid
gid = pwd.getgrnam(gid_name).gr_gid

# 先降组,再降用户
os.setgid(gid)
os.setuid(uid)

print(f"已降权到用户 {uid_name} (UID={uid})")

第四章:调用 Shell 命令与管道
4.1 subprocess 最佳实践
subprocess是 Python 调用外部命令的标准库,永远不要再用os.system()了!
基础用法
import subprocess
import shlex

推荐:参数列表形式,安全无注入

result = subprocess.run(
['ls', '-l', '/home'],
capture_output=True, # 捕获输出
text=True, # 自动解码为字符串
check=True, # 失败时抛出异常
timeout=30 # 超时时间,防止卡住
)

print(result.stdout)
print(result.returncode)
⚠️ 超时参数很重要:很多外部命令可能因为网络或其他原因卡住,没有超时的话你的脚本会永远卡死。
安全警告:shell=True 的风险
除非绝对必要,否则不要使用shell=True,它会带来命令注入风险:

❌ 危险!如果user_input包含 "; rm -rf /" 会被执行

user_input = "; rm -rf /"
subprocess.run(f"echo {user_input}", shell=True)

✅ 安全!参数会被正确转义

subprocess.run(['echo', user_input], shell=False)
4.2 管道与流式处理
实现类似 Shell 的管道功能:

实现 ps aux | grep python

p1 = subprocess.Popen(['ps', 'aux'], stdout=subprocess.PIPE, text=True)
p2 = subprocess.Popen(
['grep', 'python'],
stdin=p1.stdout,
stdout=subprocess.PIPE,
text=True
)
p1.stdout.close() # 关闭p1的stdout,让p1接收SIGPIPE

output, _ = p2.communicate()
print(output)
实时输出处理
对于长时间运行的命令,实时输出很重要:
def run_command_with_live_output(cmd):
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
universal_newlines=True
)

for line in iter(process.stdout.readline, ''):
    print(f"[OUT] {line.strip()}")
    # 可以在这里实时处理日志

process.wait()
return process.returncode

第五章:后台运行与自动化调度
5.1 临时后台运行:nohup & &
对于临时的后台任务:

后台运行,忽略挂起信号,输出重定向

nohup python3 my_script.py > app.log 2>&1 &

查看后台任务

jobs

带回前台

fg %1
⚠️ 注意:nohup只是临时方案,它无法保证脚本崩溃后自动重启,也无法保证开机自启。
5.2 定时任务:Crontab
Crontab 适合定时执行的脚本,比如每天备份、每小时监控。

编辑定时任务

crontab -e
Crontab 配置示例

分 时 日 月 周 命令

每天凌晨2点执行备份

0 2 * * * /home/user/scripts/backup.py >> /var/log/backup.log 2>&1

每10分钟执行一次监控

*/10 * * * * /home/user/.venv/bin/python /home/user/scripts/monitor.py

开机启动执行

@reboot /home/user/scripts/startup.sh

执行结果自动发邮件

MAILTO="your@email.com"
0 2 * * * /home/user/scripts/backup.py
Crontab 避坑指南

  1. 绝对路径:Crontab 的 PATH 非常短,所有命令必须用绝对路径

❌ 错误:可能找不到python

          • python /path/to/script.py

✅ 正确:使用虚拟环境的绝对路径

          • /home/user/.venv/bin/python /path/to/script.py
  1. 工作目录:Crontab 的工作目录是用户家目录,脚本中尽量用绝对路径
  2. 环境变量:Crontab 不会加载.bashrc,需要的环境变量要在 crontab 里定义
    SHELL=/bin/bash
    PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin
          • /path/to/script.py
            5.3 常驻服务:Systemd
            对于需要长期运行的后台服务,Systemd 是 Ubuntu 的标准方案。它能提供:
  • 开机自启
  • 崩溃自动重启
  • 状态监控
  • 统一的日志管理
  1. 创建服务文件
    sudo nano /etc/systemd/system/my-monitor.service
    服务文件内容:
    [Unit]

服务描述

Description=My System Monitor Service

网络就绪后再启动

After=network.target

[Service]

执行命令(绝对路径!)

ExecStart=/home/user/.venv/bin/python /home/user/scripts/monitor.py

工作目录

WorkingDirectory=/home/user/scripts

崩溃自动重启

Restart=always
RestartSec=5

运行用户

User=user
Group=user

输出重定向到journal

StandardOutput=journal
StandardError=journal

解决"too many open files"问题

LimitNOFILE=65536

从文件加载环境变量(敏感配置放这里)

EnvironmentFile=/etc/myapp/env.conf

[Install]

多用户模式下启用

WantedBy=multi-user.target
💡 EnvironmentFile 是生产环境最佳实践:把敏感的环境变量(如密码、API Key)放在单独的文件里,权限设为 600,只有运行用户能读,而不是写在公开的服务文件里。
2. 启用服务

重新加载配置

sudo systemctl daemon-reload

启动服务

sudo systemctl start my-monitor

设置开机自启

sudo systemctl enable my-monitor
3. 管理与查看

查看服务状态

sudo systemctl status my-monitor
[图片]

查看实时日志

sudo journalctl -u my-monitor -f

重启服务

sudo systemctl restart my-monitor

停止服务

sudo systemctl stop my-monitor


第六章:日志管理与问题排查
6.1 Python logging 配置
标准的日志配置:
import logging
import logging.handlers

def setup_logging():
logger = logging.getLogger(name)
logger.setLevel(logging.INFO)

# 格式
formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# 文件处理器
file_handler = logging.handlers.RotatingFileHandler(
    '/var/log/myapp/app.log',
    maxBytes=100*1024*1024,  # 100MB
    backupCount=5,
    encoding='utf-8'
)
file_handler.setFormatter(formatter)

# 控制台处理器
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)

logger.addHandler(file_handler)
logger.addHandler(console_handler)

return logger

logger = setup_logging()
结构化日志(JSON)
生产环境中,JSON 格式的日志更容易被 ELK、Grafana Loki 等日志系统解析:
pip install python-json-logger
from pythonjsonlogger import jsonlogger

def setup_json_logging():
logger = logging.getLogger(name)
logger.setLevel(logging.INFO)

# JSON格式
formatter = jsonlogger.JsonFormatter(
    '%(asctime)s %(levelname)s %(name)s %(message)s'
)

handler = logging.FileHandler('/var/log/myapp/app.json.log')
handler.setFormatter(formatter)
logger.addHandler(handler)

return logger

Systemd Journald 日志
如果你用 Systemd 部署服务,直接把日志输出到 Journald 是最佳实践,不需要自己管理日志文件:
pip install systemd-python
from systemd import journal

直接发送日志到journald

journal.send("Hello from Python", PRIORITY=6)

或者用logging集成

from systemd.journal import JournalHandler

logger = logging.getLogger(name)
logger.addHandler(JournalHandler())
logger.info("This will go to journald")
6.2 Logrotate 日志轮转
即使你用了 RotatingFileHandler,Linux 的 logrotate 依然是生产环境的首选。
创建配置文件 /etc/logrotate.d/myapp:
/var/log/myapp/*.log {
daily # 每天轮转
rotate 30 # 保留30天
compress # 自动压缩
delaycompress # 延迟压缩
missingok # 文件不存在不报错
notifempty # 空文件不轮转
dateext # 用日期做后缀
dateformat -%Y%m%d

# 对于Python脚本,使用copytruncate模式
copytruncate        # 复制并清空,无需重启应用
su user user        # 切换用户处理权限

}
调试 Logrotate

测试配置(不实际执行)

logrotate -d /etc/logrotate.d/myapp

强制执行一次

logrotate -f /etc/logrotate.d/myapp
6.3 常见问题排查
问题 1:删除了日志文件,磁盘空间没释放?
这是因为进程还持有已删除文件的句柄。解决:

重启进程,或者发送USR1信号让它重开日志

kill -USR1
问题 2:脚本在手动运行正常,Crontab/Systemd 里运行失败?
99% 是路径或环境变量问题:

  1. 检查所有路径是否都是绝对路径
  2. 检查 Python 解释器路径
  3. 检查日志输出看具体错误

第七章:远程服务器批量管理
7.1 Paramiko:SSH 远程执行
Paramiko 是 Python 的 SSH 协议实现,让你可以用代码远程管理服务器。
pip install paramiko
基础 SSH 连接
import paramiko

def ssh_exec(host, username, password, command):
# 创建客户端
client = paramiko.SSHClient()
# 自动接受主机密钥(测试用,生产建议关闭)
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

try:
    # 连接
    client.connect(
        hostname=host,
        username=username,
        password=password,
        timeout=10
    )
    
    # 执行命令
    stdin, stdout, stderr = client.exec_command(command, timeout=30)
    
    # 获取结果
    out = stdout.read().decode('utf-8')
    err = stderr.read().decode('utf-8')
    code = stdout.channel.recv_exit_status()
    
    return out, err, code
finally:
    client.close()

使用

out, err, code = ssh_exec('192.168.1.100', 'ubuntu', 'password', 'ls -la')
print(out)
密钥认证(推荐)
生产环境永远用密钥而不是密码:
client.connect(
hostname=host,
username=username,
key_filename='/home/user/.ssh/id_rsa', # 私钥路径
timeout=10
)
7.2 SFTP 文件传输
def sftp_upload(host, username, key_file, local_path, remote_path):
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(host, username=username, key_filename=key_file)

sftp = client.open_sftp()
try:
    # 上传文件
    sftp.put(local_path, remote_path)
    print(f"上传完成: {local_path} -> {remote_path}")
finally:
    sftp.close()
    client.close()

7.3 封装 SSHExecutor
封装一个可复用的执行器:
class SSHExecutor:
def init(self, host, username, password=None, key_file=None):
self.host = host
self.username = username
self.password = password
self.key_file = key_file
self._client = None

def connect(self):
    if self._client and self._client.get_transport().is_active():
        return
    self._client = paramiko.SSHClient()
    self._client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    self._client.connect(
        self.host, username=self.username,
        password=self.password, key_filename=self.key_file
    )

def run(self, command):
    self.connect()
    stdin, stdout, stderr = self._client.exec_command(command)
    out = stdout.read().decode()
    err = stderr.read().decode()
    code = stdout.channel.recv_exit_status()
    return out, err, code

def close(self):
    if self._client:
        self._client.close()

def __enter__(self):
    self.connect()
    return self

def __exit__(self, *args):
    self.close()

使用

with SSHExecutor('192.168.1.100', 'ubuntu', key_file='~/.ssh/id_rsa') as ssh:
out, err, code = ssh.run('apt update')
print(out)
7.4 并发批量执行
管理几十台服务器时,串行执行太慢了。用多线程并发执行:
from concurrent.futures import ThreadPoolExecutor, as_completed

def batch_run(hosts, command, username, key_file):
"""
批量在多台服务器上执行命令
"""
results = {}

def run_single(host):
    try:
        with SSHExecutor(host, username, key_file=key_file) as ssh:
            out, err, code = ssh.run(command)
            return host, out, err, code, None
    except Exception as e:
        return host, None, None, None, str(e)

# 最多10个并发
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(run_single, host) for host in hosts]
    
    for future in as_completed(futures):
        host, out, err, code, error = future.result()
        results[host] = {
            "success": error is None,
            "output": out,
            "error": error or err,
            "code": code
        }

return results

使用

hosts = ['192.168.1.100', '192.168.1.101', '192.168.1.102']
results = batch_run(hosts, 'uptime', 'ubuntu', '~/.ssh/id_rsa')

for host, res in results.items():
if res['success']:
print(f"{host}: {res['output'].strip()}")
else:
print(f"{host}: 失败 - {res['error']}")


第八章:高级自动化场景
8.1 文件系统监控:Inotify
Linux 的 inotify 可以监控文件系统的变化,比如新文件上传、文件修改等,非常适合自动化处理场景。
pip install watchdog
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class FileChangeHandler(FileSystemEventHandler):
def on_created(self, event):
"""新文件创建时触发"""
if not event.is_directory:
logger.info(f"检测到新文件: {event.src_path}")
# 自动处理新文件,比如解压、解析、上传
process_new_file(event.src_path)

def on_modified(self, event):
    """文件修改时触发"""
    if not event.is_directory:
        logger.info(f"文件被修改: {event.src_path}")

def on_deleted(self, event):
    """文件删除时触发"""
    logger.info(f"文件被删除: {event.src_path}")

启动监控

def start_watcher(path):
event_handler = FileChangeHandler()
observer = Observer()
observer.schedule(event_handler, path, recursive=True)
observer.start()
logger.info(f"开始监控目录: {path}")

try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    observer.stop()
observer.join()

常见应用场景:

  • 自动处理上传的日志文件
  • 监控配置文件变化自动重载
  • 自动同步文件到备份服务器
    8.2 失败重试机制
    网络请求、API 调用经常会因为临时的网络波动失败,自动重试可以大大提高稳定性:
    pip install tenacity
    from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
    import requests

@retry(
stop=stop_after_attempt(3), # 最多重试3次
wait=wait_exponential(multiplier=1, min=2, max=10), # 指数退避:2s, 4s, 8s
retry=retry_if_exception_type((requests.exceptions.ConnectionError,
requests.exceptions.Timeout)),
reraise=True
)
def send_alert(message):
"""发送告警,失败自动重试"""
response = requests.post(
ALERT_WEBHOOK,
json={"text": message},
timeout=10
)
response.raise_for_status()
return response


第九章:脚本调试与性能分析
9.1 性能分析:cProfile
想知道你的脚本哪里慢?用 Python 内置的 cProfile:

运行脚本并生成性能报告

python -m cProfile -s cumulative my_script.py
输出示例:
ncalls tottime percall cumtime percall filename:lineno(function)
100 0.123 0.001 2.456 0.024 my_script.py:123(process_data)
200 0.045 0.000 1.234 0.006 my_script.py:456(parse_file)

  • tottime:函数本身的耗时(不包含子函数)
  • cumtime:累计耗时(包含所有子函数)
    9.2 行级性能分析:line_profiler
    想知道具体哪一行代码慢?用 line_profiler:
    pip install line_profiler
    给函数加上装饰器:
    @profile
    def slow_function():

    你的代码

    pass
    然后运行:
    kernprof -l -v my_script.py
    它会输出每一行代码的耗时,帮你精准定位瓶颈。
    9.3 内存分析:memory_profiler
    pip install memory_profiler
    @profile
    def my_function():
    a = [1] * 1000000
    b = [2] * 1000000
    del b
    return a
    运行:
    python -m memory_profiler my_script.py

第十章:安全最佳实践
10.1 敏感信息处理

  • 永远不要把密码、API Key 硬编码在代码里
  • 配置文件权限设为 600,只有所有者能读写
  • 使用环境变量注入敏感配置,而不是写在代码里
  • 生产环境使用 keyring 管理密码:
    pip install keyring
    import keyring

存储密码

keyring.set_password("myapp", "db_user", "my_password")

读取密码

password = keyring.get_password("myapp", "db_user")
10.2 文件权限

  • 脚本文件权限不要给其他用户写权限,避免被篡改:chmod 755 script.py
  • 敏感配置文件:chmod 600 config.env
  • 日志文件:chmod 640 app.log,避免其他用户读取日志里的敏感信息
    10.3 输入验证
    永远不要信任外部输入,无论是用户输入、环境变量还是文件内容:
  • 对所有外部输入做验证
  • 调用外部命令时永远用参数列表,不要用 shell=True
  • 路径处理时检查是否有路径穿越攻击(比如 ../)

第十一章:综合实战:系统监控告警脚本
现在我们把所有知识点整合起来,写一个完整的系统监控脚本:
#!/usr/bin/env python3

-- coding: utf-8 --

import os
import sys
import time
import signal
import json
import logging
import psutil
import requests
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import fcntl

-------------------------- 配置 --------------------------

CHECK_INTERVAL = 60 # 检查间隔,秒
ALERT_WEBHOOK = os.getenv("ALERT_WEBHOOK") # 告警Webhook
CPU_THRESHOLD = 80
MEM_THRESHOLD = 85
DISK_THRESHOLD = 90
LOAD_THRESHOLD = psutil.cpu_count() * 0.8 # 负载阈值:CPU核数*0.8

-------------------------- 日志 --------------------------

logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('/var/log/monitor/monitor.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(name)

-------------------------- 信号处理 --------------------------

class GracefulExit:
def init(self):
self.shutdown_flag = False
signal.signal(signal.SIGINT, self.handler)
signal.signal(signal.SIGTERM, self.handler)
signal.signal(signal.SIGUSR1, self.reload_handler)

def handler(self, signum, frame):
    logger.info(f"收到退出信号 {signum},准备退出")
    self.shutdown_flag = True

def reload_handler(self, signum, frame):
    logger.info("收到重载信号,重新加载配置")
    # 这里可以重新加载配置

def is_shutting_down(self):
    return self.shutdown_flag

-------------------------- 重试告警 --------------------------

@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10),
retry=retry_if_exception_type((requests.exceptions.ConnectionError,
requests.exceptions.Timeout)),
reraise=False
)
def send_alert(alerts):
if not ALERT_WEBHOOK:
logger.warning("未配置告警Webhook,跳过告警")
return

message = "\n".join(alerts)
logger.info(f"发送告警: {message}")
requests.post(ALERT_WEBHOOK, json={
    "text": f"服务器告警: {message}"
}, timeout=10)

-------------------------- 监控逻辑 --------------------------

def check_system():
alerts = []

# CPU检查
cpu = psutil.cpu_percent(interval=1)
if cpu > CPU_THRESHOLD:
    alerts.append(f"CPU使用率过高: {cpu}%")

# 负载检查
load1, load5, load15 = psutil.getloadavg()
if load1 > LOAD_THRESHOLD:
    alerts.append(f"系统负载过高: {load1:.2f}")

# 内存检查
mem = psutil.virtual_memory()
if mem.percent > MEM_THRESHOLD:
    alerts.append(f"内存使用率过高: {mem.percent}%")

# Swap检查
swap = psutil.swap_memory()
if swap.percent > 50:
    alerts.append(f"Swap使用率过高: {swap.percent}%")

# 磁盘检查
disk = psutil.disk_usage('/')
if disk.percent > DISK_THRESHOLD:
    alerts.append(f"磁盘使用率过高: {disk.percent}%")

return alerts

-------------------------- 主循环 --------------------------

def main():
logger.info("监控服务启动")
graceful = GracefulExit()

while not graceful.is_shutting_down():
    try:
        alerts = check_system()
        if alerts:
            logger.warning(f"发现告警: {alerts}")
            send_alert(alerts)
        else:
            logger.info("系统状态正常")
    except Exception as e:
        logger.error(f"检查出错: {e}", exc_info=True)
    
    # 等待,可中断
    for _ in range(CHECK_INTERVAL):
        if graceful.is_shutting_down():
            break
        time.sleep(1)

logger.info("监控服务退出")

-------------------------- 文件锁 --------------------------

class FileLock:
def init(self, lock_file):
self.lock_file = lock_file
self.fp = None

def __enter__(self):
    self.fp = open(self.lock_file, 'w')
    try:
        fcntl.flock(self.fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError:
        raise RuntimeError("监控已经在运行中")
    return self

def __exit__(self, *args):
    if self.fp:
        fcntl.flock(self.fp, fcntl.LOCK_UN)
        self.fp.close()

if name == "main":
# 确保目录存在
os.makedirs('/var/log/monitor', exist_ok=True)

LOCK_FILE = "/tmp/monitor.lock"
try:
    with FileLock(LOCK_FILE):
        main()
except RuntimeError as e:
    logger.error(e)
    sys.exit(1)

部署这个脚本

  1. 安装依赖
    pip install psutil requests tenacity
  2. 创建 Systemd 服务
    [Unit]
    Description=System Monitor Service
    After=network.target

[Service]
ExecStart=/home/user/.venv/bin/python /home/user/scripts/monitor.py
WorkingDirectory=/home/user/scripts
Environment="ALERT_WEBHOOK=https://your-webhook-url"
Restart=always
RestartSec=5
User=user
LimitNOFILE=65536

[Install]
WantedBy=multi-user.target
3. 配置 Logrotate
/var/log/monitor/*.log {
daily
rotate 30
compress
missingok
notifempty
copytruncate
su user user
}


第十二章:生产环境避坑指南
12.1 常见陷阱

  1. 相对路径陷阱:永远用绝对路径,尤其是在 Crontab/Systemd 中
  2. 环境变量陷阱:不要假设环境变量和你手动登录的一样
  3. 权限陷阱:脚本运行用户必须有足够的权限访问文件
  4. 僵尸进程:子进程退出后要 wait,否则会变成僵尸进程
  5. 文件描述符泄漏:长期运行的服务要记得关闭文件句柄
  6. Too many open files:记得在 Systemd 里配置 LimitNOFILE
  7. 重复执行:定时任务记得加文件锁,防止重复跑
    12.2 性能优化
  • IO 密集型任务:使用多线程或 asyncio
  • CPU 密集型任务:使用多进程(绕过 GIL)
  • 批量操作:尽量减少循环中的 IO 操作