feat: add Spug auto-deployment scripts for fullstack project

This commit is contained in:
2026-02-28 15:26:09 +08:00
parent f37cdcb5c0
commit 13badac2dc
5 changed files with 1260 additions and 0 deletions

389
spug/deploy.py Normal file
View File

@@ -0,0 +1,389 @@
#!/usr/bin/env python3
"""
Spug 自动发布脚本 - Python 版本
用途医院绩效考核系统自动化部署Python 实现)
使用场景:在 Spug 的"执行任务"中通过 Python 调用
"""
import os
import sys
import subprocess
import shutil
from datetime import datetime
from pathlib import Path
from typing import Optional, List
class SpugDeploy:
"""Spug 部署类"""
def __init__(self):
# 配置参数(从环境变量获取,可在 Spug 中设置)
self.project_name = os.getenv('SPUG_APP_NAME', 'hospital-performance')
self.project_dir = Path(os.getenv('SPUG_DEPLOY_DIR', '/var/www/hospital-performance'))
self.backup_dir = self.project_dir / 'backups'
self.frontend_dir = self.project_dir / 'frontend'
self.backend_dir = self.project_dir / 'backend'
# Git 配置
self.git_repo = os.getenv('SPUG_GIT_REPO',
'https://gitea.gentronhealth.com/chenqi/hospital_performance.git')
self.git_branch = os.getenv('SPUG_GIT_BRANCH', 'main')
# Python 配置
self.python_version = os.getenv('PYTHON_VERSION', 'python3.10')
self.venv_dir = self.project_dir / 'venv'
# Node.js 配置
self.node_version = os.getenv('NODE_VERSION', '18')
# 服务配置
self.backend_service = os.getenv('BACKEND_SERVICE', 'hospital-backend')
self.backend_port = int(os.getenv('BACKEND_PORT', '8000'))
self.frontend_service = os.getenv('FRONTEND_SERVICE', 'nginx')
# 日志配置
log_file = os.getenv('LOG_FILE', '/var/log/spug/deploy.log')
self.log_file = Path(log_file)
self.deploy_time = datetime.now().strftime('%Y%m%d_%H%M%S')
# 确保日志目录存在
self.log_file.parent.mkdir(parents=True, exist_ok=True)
def log(self, level: str, message: str):
"""记录日志"""
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
log_msg = f"[{timestamp}] [{level}] {message}"
print(log_msg)
try:
with open(self.log_file, 'a', encoding='utf-8') as f:
f.write(log_msg + '\n')
except Exception as e:
print(f"写入日志失败:{e}")
def info(self, message: str):
self.log("INFO", message)
def error(self, message: str):
self.log("ERROR", message)
def success(self, message: str):
self.log("SUCCESS", message)
def run_command(self, cmd: List[str], cwd: Optional[Path] = None,
check: bool = True, shell: bool = False) -> subprocess.CompletedProcess:
"""运行命令"""
try:
result = subprocess.run(
cmd,
cwd=cwd,
capture_output=True,
text=True,
shell=shell,
check=check
)
if result.stdout:
self.info(result.stdout.strip())
if result.stderr:
self.info(result.stderr.strip())
return result
except subprocess.CalledProcessError as e:
self.error(f"命令执行失败:{' '.join(cmd)}")
self.error(f"错误信息:{e.stderr}")
raise
def check_command(self, cmd: str) -> bool:
"""检查命令是否存在"""
return shutil.which(cmd) is not None
def pre_check(self):
"""前置检查"""
self.info("========== 开始前置检查 ==========")
# 检查必要命令
required_commands = ['git', 'python3', 'node', 'npm']
for cmd in required_commands:
if not self.check_command(cmd):
self.error(f"命令 {cmd} 未安装,请先安装")
sys.exit(1)
# 检查磁盘空间
try:
stat = shutil.disk_usage(self.project_dir)
available_gb = stat.free / (1024 ** 3)
if available_gb < 1:
self.error(f"磁盘空间不足 1GB当前可用{available_gb:.2f}GB")
sys.exit(1)
except Exception as e:
self.error(f"检查磁盘空间失败:{e}")
self.success("✓ 前置检查通过")
def update_code(self):
"""更新代码"""
self.info("========== 更新代码 ==========")
os.chdir(self.project_dir)
git_dir = self.project_dir / '.git'
if not git_dir.exists():
self.info("首次部署,克隆仓库...")
self.run_command(['git', 'clone', self.git_repo, '.'])
self.run_command(['git', 'checkout', self.git_branch])
else:
self.info("更新现有代码...")
self.run_command(['git', 'fetch', 'origin', self.git_branch])
self.run_command(['git', 'reset', '--hard', f'origin/{self.git_branch}'])
self.run_command(['git', 'clean', '-fd'])
# 获取当前版本信息
result = self.run_command(['git', 'rev-parse', '--short', 'HEAD'],
capture_output=True, text=True)
commit_hash = result.stdout.strip()
result = self.run_command(['git', 'log', '-1', '--pretty=format:%s'],
capture_output=True, text=True)
commit_msg = result.stdout.strip()
self.success(f"✓ 代码更新完成,当前版本:{commit_hash} - {commit_msg}")
def backup_old_version(self):
"""备份旧版本"""
self.info("========== 备份当前版本 ==========")
self.backup_dir.mkdir(parents=True, exist_ok=True)
backup_name = f"backup_{self.deploy_time}"
backup_path = self.backup_dir / backup_name
# 备份后端
if self.backend_dir.exists():
backup_backend = backup_path.with_name(f"{backup_name}_backend")
shutil.copytree(self.backend_dir, backup_backend, dirs_exist_ok=True)
self.info(f"✓ 后端代码已备份到:{backup_backend}")
# 备份前端构建
frontend_dist = self.frontend_dir / 'dist'
if frontend_dist.exists():
backup_frontend = backup_path.with_name(f"{backup_name}_frontend_dist")
shutil.copytree(frontend_dist, backup_frontend, dirs_exist_ok=True)
self.info(f"✓ 前端构建已备份到:{backup_frontend}")
# 清理 30 天前的备份
try:
for item in self.backup_dir.iterdir():
if item.is_dir() and item.name.startswith('backup_'):
mtime = datetime.fromtimestamp(item.stat().st_mtime)
age = (datetime.now() - mtime).days
if age > 30:
shutil.rmtree(item)
self.info(f"清理过期备份:{item.name}")
except Exception as e:
self.error(f"清理备份失败:{e}")
self.success("✓ 备份完成(保留最近 30 天)")
def deploy_backend(self):
"""部署后端"""
self.info("========== 部署后端服务 ==========")
os.chdir(self.backend_dir)
# 创建虚拟环境
if not self.venv_dir.exists():
self.info("创建 Python 虚拟环境...")
self.run_command([self.python_version, '-m', 'venv', str(self.venv_dir)])
# 激活虚拟环境(通过设置 PATH
venv_bin = self.venv_dir / 'bin'
env = os.environ.copy()
env['PATH'] = str(venv_bin) + os.pathsep + env['PATH']
env['VIRTUAL_ENV'] = str(self.venv_dir)
# 升级 pip
self.run_command(['pip', 'install', '--upgrade', 'pip'], env=env)
# 安装依赖
self.info("安装 Python 依赖...")
self.run_command(['pip', 'install', '-r', 'requirements.txt'], env=env)
# 数据库迁移
alembic_ini = self.backend_dir / 'alembic.ini'
if alembic_ini.exists():
self.info("执行数据库迁移...")
self.run_command(['alembic', 'upgrade', 'head'], env=env)
# 初始化数据
init_db_py = self.backend_dir / 'init_db.py'
if init_db_py.exists():
self.info("初始化数据库...")
try:
self.run_command([str(self.venv_dir / 'bin/python'), 'init_db.py'],
env=env, check=False)
except Exception as e:
self.info(f"初始化数据跳过或失败:{e}")
# 重启后端服务
try:
result = subprocess.run(
['systemctl', 'list-units', '--type=service', '--all'],
capture_output=True,
text=True
)
if self.backend_service in result.stdout:
self.info(f"重启后端服务...")
self.run_command(['systemctl', 'restart', self.backend_service])
import time
time.sleep(2)
# 检查服务状态
result = subprocess.run(
['systemctl', 'is-active', self.backend_service],
capture_output=True,
text=True
)
if result.stdout.strip() == 'active':
self.success("✓ 后端服务重启成功")
else:
self.error("✗ 后端服务启动失败")
sys.exit(1)
else:
self.info("未找到 systemd 服务,跳过重启")
except Exception as e:
self.error(f"服务管理操作失败:{e}")
self.success("✓ 后端部署完成")
def deploy_frontend(self):
"""部署前端"""
self.info("========== 部署前端服务 ==========")
os.chdir(self.frontend_dir)
# 安装依赖
self.info("安装 Node.js 依赖...")
self.run_command(['npm', 'install', '--production'])
# 构建前端
self.info("构建前端项目...")
self.run_command(['npm', 'run', 'build'])
# 检查构建结果
dist_dir = self.frontend_dir / 'dist'
if dist_dir.exists() and any(dist_dir.iterdir()):
self.success("✓ 前端构建成功")
else:
self.error("✗ 前端构建失败dist 目录为空")
sys.exit(1)
# 重新加载 Nginx
try:
result = subprocess.run(
['systemctl', 'list-units', '--type=service', '--all'],
capture_output=True,
text=True
)
if self.frontend_service in result.stdout:
self.info("重新加载 Nginx...")
self.run_command(['systemctl', 'reload', self.frontend_service])
except Exception as e:
self.error(f"Nginx 操作失败:{e}")
self.success("✓ 前端部署完成")
def health_check(self):
"""健康检查"""
self.info("========== 执行健康检查 ==========")
import time
time.sleep(5)
# 检查后端 API
import requests
backend_url = f"http://localhost:{self.backend_port}/api/v1/health"
try:
response = requests.get(backend_url, timeout=10)
if response.status_code == 200:
self.success("✓ 后端 API 健康检查通过")
else:
self.error(f"✗ 后端 API 返回异常状态码:{response.status_code}")
sys.exit(1)
except requests.exceptions.RequestException as e:
self.error(f"✗ 后端 API 连接失败:{e}")
sys.exit(1)
# 检查前端文件
index_html = self.frontend_dir / 'dist' / 'index.html'
if index_html.exists():
self.success("✓ 前端文件存在")
else:
self.error("✗ 前端文件缺失")
sys.exit(1)
self.success("✓ 所有健康检查通过")
def cleanup(self):
"""清理临时文件"""
self.info("========== 清理临时文件 ==========")
# 清理 npm 缓存
try:
self.run_command(['npm', 'cache', 'clean', '--force'], check=False)
except:
pass
# 清理 Python 缓存
import glob
for pycache in glob.glob(str(self.project_dir / '**/__pycache__'), recursive=True):
try:
shutil.rmtree(pycache)
except:
pass
for pyc in glob.glob(str(self.project_dir / '**/*.pyc'), recursive=True):
try:
os.remove(pyc)
except:
pass
self.success("✓ 清理完成")
def run(self):
"""主流程"""
self.info("========================================")
self.info("Spug 自动发布开始")
self.info(f"项目名称:{self.project_name}")
self.info(f"部署目录:{self.project_dir}")
self.info(f"Git 分支:{self.git_branch}")
self.info("========================================")
try:
self.pre_check()
self.update_code()
self.backup_old_version()
self.deploy_backend()
self.deploy_frontend()
self.health_check()
self.cleanup()
self.info("========================================")
self.success("🎉 部署成功完成!")
self.info("========================================")
except Exception as e:
self.error(f"部署失败:{e}")
import traceback
self.error(traceback.format_exc())
sys.exit(1)
if __name__ == '__main__':
deployer = SpugDeploy()
deployer.run()