#!/usr/bin/env python3 """ Spug 自动发布脚本 - Python 版本 用途:医院绩效考核系统自动化部署(Python 实现) 使用场景:在 Spug 的"执行任务"中通过 Python 调用 """ import os import sys import subprocess import shutil from datetime import datetime from pathlib import Path from typing import Optional, List class SpugDeploy: """Spug 部署类""" def __init__(self): # 配置参数(从环境变量获取,可在 Spug 中设置) # 处理 SPUG_DEPLOY_DIR 为空的情况 deploy_dir = os.getenv('SPUG_DEPLOY_DIR', '').strip() if not deploy_dir: deploy_dir = '/var/www/hospital-performance' self.project_name = os.getenv('SPUG_APP_NAME', 'hospital-performance') self.project_dir = Path(deploy_dir) self.backup_dir = self.project_dir / 'backups' self.frontend_dir = self.project_dir / 'frontend' self.backend_dir = self.project_dir / 'backend' # Git 配置(优先使用 Spug 的环境变量) git_repo = os.getenv('SPUG_GIT_URL', '').strip() if not git_repo: git_repo = os.getenv('SPUG_GIT_REPO', '').strip() if not git_repo: git_repo = 'https://gitea.gentronhealth.com/chenqi/hospital_performance.git' self.git_repo = git_repo self.git_branch = os.getenv('SPUG_GIT_BRANCH', 'main') # Python 配置 self.python_version = os.getenv('PYTHON_VERSION', 'python3.10') self.venv_dir = self.project_dir / 'venv' # Node.js 配置 self.node_version = os.getenv('NODE_VERSION', '18') # 服务配置 self.backend_service = os.getenv('BACKEND_SERVICE', 'hospital-backend') self.backend_port = int(os.getenv('BACKEND_PORT', '8000')) self.frontend_service = os.getenv('FRONTEND_SERVICE', 'nginx') # 日志配置 log_file = os.getenv('LOG_FILE', '/var/log/spug/deploy.log') self.log_file = Path(log_file) self.deploy_time = datetime.now().strftime('%Y%m%d_%H%M%S') # 确保日志目录存在 self.log_file.parent.mkdir(parents=True, exist_ok=True) def log(self, level: str, message: str): """记录日志""" timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') log_msg = f"[{timestamp}] [{level}] {message}" print(log_msg) try: with open(self.log_file, 'a', encoding='utf-8') as f: f.write(log_msg + '\n') except Exception as e: print(f"写入日志失败:{e}") def info(self, message: str): self.log("INFO", message) def error(self, message: str): self.log("ERROR", message) def success(self, message: str): self.log("SUCCESS", message) def run_command(self, cmd: List[str], cwd: Optional[Path] = None, check: bool = True, shell: bool = False) -> subprocess.CompletedProcess: """运行命令""" try: result = subprocess.run( cmd, cwd=cwd, capture_output=True, text=True, shell=shell, check=check ) if result.stdout: self.info(result.stdout.strip()) if result.stderr: self.info(result.stderr.strip()) return result except subprocess.CalledProcessError as e: self.error(f"命令执行失败:{' '.join(cmd)}") self.error(f"错误信息:{e.stderr}") raise def check_command(self, cmd: str) -> bool: """检查命令是否存在""" return shutil.which(cmd) is not None def pre_check(self): """前置检查""" self.info("========== 开始前置检查 ==========") # 检查必要命令 required_commands = ['git', 'python3', 'node', 'npm'] for cmd in required_commands: if not self.check_command(cmd): self.error(f"命令 {cmd} 未安装,请先安装") sys.exit(1) # 创建部署目录(如果不存在) if not self.project_dir.exists(): self.info(f"创建部署目录:{self.project_dir}") self.project_dir.mkdir(parents=True, exist_ok=True) # 检查磁盘空间 try: stat = shutil.disk_usage(self.project_dir) available_gb = stat.free / (1024 ** 3) if available_gb < 1: self.error(f"磁盘空间不足 1GB,当前可用:{available_gb:.2f}GB") sys.exit(1) except Exception as e: self.error(f"检查磁盘空间失败:{e}") self.success("✓ 前置检查通过") def update_code(self): """更新代码""" self.info("========== 更新代码 ==========") os.chdir(self.project_dir) git_dir = self.project_dir / '.git' if not git_dir.exists(): self.info("首次部署,克隆仓库...") # 检查目录是否为空 if any(self.project_dir.iterdir()): self.info("目录非空,先备份现有文件...") backup_non_git = self.project_dir / f"backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}_non_git" backup_non_git.mkdir(exist_ok=True) # 移动非 .git 相关文件 for item in self.project_dir.iterdir(): if item.name not in ['.git', 'backups', 'venv'] and not item.name.startswith('backup_'): try: shutil.move(str(item), str(backup_non_git / item.name)) except Exception as e: self.info(f"跳过文件 {item.name}: {e}") self.run_command(['git', 'clone', self.git_repo, '.']) self.run_command(['git', 'checkout', self.git_branch]) else: self.info("更新现有代码...") self.run_command(['git', 'fetch', 'origin', self.git_branch]) self.run_command(['git', 'reset', '--hard', f'origin/{self.git_branch}']) self.run_command(['git', 'clean', '-fd']) # 获取当前版本信息 result = self.run_command(['git', 'rev-parse', '--short', 'HEAD'], capture_output=True, text=True) commit_hash = result.stdout.strip() result = self.run_command(['git', 'log', '-1', '--pretty=format:%s'], capture_output=True, text=True) commit_msg = result.stdout.strip() self.success(f"✓ 代码更新完成,当前版本:{commit_hash} - {commit_msg}") def backup_old_version(self): """备份旧版本""" self.info("========== 备份当前版本 ==========") self.backup_dir.mkdir(parents=True, exist_ok=True) backup_name = f"backup_{self.deploy_time}" backup_path = self.backup_dir / backup_name # 备份后端 if self.backend_dir.exists(): backup_backend = backup_path.with_name(f"{backup_name}_backend") shutil.copytree(self.backend_dir, backup_backend, dirs_exist_ok=True) self.info(f"✓ 后端代码已备份到:{backup_backend}") # 备份前端构建 frontend_dist = self.frontend_dir / 'dist' if frontend_dist.exists(): backup_frontend = backup_path.with_name(f"{backup_name}_frontend_dist") shutil.copytree(frontend_dist, backup_frontend, dirs_exist_ok=True) self.info(f"✓ 前端构建已备份到:{backup_frontend}") # 清理 30 天前的备份 try: for item in self.backup_dir.iterdir(): if item.is_dir() and item.name.startswith('backup_'): mtime = datetime.fromtimestamp(item.stat().st_mtime) age = (datetime.now() - mtime).days if age > 30: shutil.rmtree(item) self.info(f"清理过期备份:{item.name}") except Exception as e: self.error(f"清理备份失败:{e}") self.success("✓ 备份完成(保留最近 30 天)") def deploy_backend(self): """部署后端""" self.info("========== 部署后端服务 ==========") os.chdir(self.backend_dir) # 创建虚拟环境 if not self.venv_dir.exists(): self.info("创建 Python 虚拟环境...") self.run_command([self.python_version, '-m', 'venv', str(self.venv_dir)]) # 激活虚拟环境(通过设置 PATH) venv_bin = self.venv_dir / 'bin' env = os.environ.copy() env['PATH'] = str(venv_bin) + os.pathsep + env['PATH'] env['VIRTUAL_ENV'] = str(self.venv_dir) # 升级 pip self.run_command(['pip', 'install', '--upgrade', 'pip'], env=env) # 安装依赖 self.info("安装 Python 依赖...") self.run_command(['pip', 'install', '-r', 'requirements.txt'], env=env) # 数据库迁移 alembic_ini = self.backend_dir / 'alembic.ini' if alembic_ini.exists(): self.info("执行数据库迁移...") self.run_command(['alembic', 'upgrade', 'head'], env=env) # 初始化数据 init_db_py = self.backend_dir / 'init_db.py' if init_db_py.exists(): self.info("初始化数据库...") try: self.run_command([str(self.venv_dir / 'bin/python'), 'init_db.py'], env=env, check=False) except Exception as e: self.info(f"初始化数据跳过或失败:{e}") # 重启后端服务 try: result = subprocess.run( ['systemctl', 'list-units', '--type=service', '--all'], capture_output=True, text=True ) if self.backend_service in result.stdout: self.info(f"重启后端服务...") self.run_command(['systemctl', 'restart', self.backend_service]) import time time.sleep(2) # 检查服务状态 result = subprocess.run( ['systemctl', 'is-active', self.backend_service], capture_output=True, text=True ) if result.stdout.strip() == 'active': self.success("✓ 后端服务重启成功") else: self.error("✗ 后端服务启动失败") sys.exit(1) else: self.info("未找到 systemd 服务,跳过重启") except Exception as e: self.error(f"服务管理操作失败:{e}") self.success("✓ 后端部署完成") def deploy_frontend(self): """部署前端""" self.info("========== 部署前端服务 ==========") os.chdir(self.frontend_dir) # 安装依赖 self.info("安装 Node.js 依赖...") self.run_command(['npm', 'install', '--production']) # 构建前端 self.info("构建前端项目...") self.run_command(['npm', 'run', 'build']) # 检查构建结果 dist_dir = self.frontend_dir / 'dist' if dist_dir.exists() and any(dist_dir.iterdir()): self.success("✓ 前端构建成功") else: self.error("✗ 前端构建失败,dist 目录为空") sys.exit(1) # 重新加载 Nginx try: result = subprocess.run( ['systemctl', 'list-units', '--type=service', '--all'], capture_output=True, text=True ) if self.frontend_service in result.stdout: self.info("重新加载 Nginx...") self.run_command(['systemctl', 'reload', self.frontend_service]) except Exception as e: self.error(f"Nginx 操作失败:{e}") self.success("✓ 前端部署完成") def health_check(self): """健康检查""" self.info("========== 执行健康检查 ==========") import time time.sleep(5) # 检查后端 API import requests backend_url = f"http://localhost:{self.backend_port}/api/v1/health" try: response = requests.get(backend_url, timeout=10) if response.status_code == 200: self.success("✓ 后端 API 健康检查通过") else: self.error(f"✗ 后端 API 返回异常状态码:{response.status_code}") sys.exit(1) except requests.exceptions.RequestException as e: self.error(f"✗ 后端 API 连接失败:{e}") sys.exit(1) # 检查前端文件 index_html = self.frontend_dir / 'dist' / 'index.html' if index_html.exists(): self.success("✓ 前端文件存在") else: self.error("✗ 前端文件缺失") sys.exit(1) self.success("✓ 所有健康检查通过") def cleanup(self): """清理临时文件""" self.info("========== 清理临时文件 ==========") # 清理 npm 缓存 try: self.run_command(['npm', 'cache', 'clean', '--force'], check=False) except: pass # 清理 Python 缓存 import glob for pycache in glob.glob(str(self.project_dir / '**/__pycache__'), recursive=True): try: shutil.rmtree(pycache) except: pass for pyc in glob.glob(str(self.project_dir / '**/*.pyc'), recursive=True): try: os.remove(pyc) except: pass self.success("✓ 清理完成") def run(self): """主流程""" self.info("========================================") self.info("Spug 自动发布开始") self.info(f"项目名称:{self.project_name}") self.info(f"部署目录:{self.project_dir}") self.info(f"Git 分支:{self.git_branch}") self.info("========================================") try: self.pre_check() self.update_code() self.backup_old_version() self.deploy_backend() self.deploy_frontend() self.health_check() self.cleanup() self.info("========================================") self.success("🎉 部署成功完成!") self.info("========================================") except Exception as e: self.error(f"部署失败:{e}") import traceback self.error(traceback.format_exc()) sys.exit(1) if __name__ == '__main__': deployer = SpugDeploy() deployer.run()