Files
hospital_performance/spug/deploy.py

420 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Spug 自动发布脚本 - Python 版本
用途:医院绩效考核系统自动化部署(Python 实现)
使用场景:在 Spug 的"执行任务"中通过 Python 调用
"""
import os
import sys
import subprocess
import shutil
from datetime import datetime
from pathlib import Path
from typing import Optional, List
class SpugDeploy:
    """Automated deployment driver intended to be invoked from a Spug task.

    The pipeline implemented by :meth:`run` is: pre-flight checks, backup of
    the currently deployed version, code update from Git, backend deployment
    (virtualenv, dependencies, migrations, service restart), frontend build,
    health check and cleanup. All configuration is read from environment
    variables in :meth:`__init__`; an empty or whitespace-only variable is
    treated the same as an unset one.
    """

    # Entries inside the deploy directory that belong to the deployer itself
    # and must survive both the first-deploy stash and ``git clean``.
    _RESERVED_NAMES = ('.git', 'backups', 'venv')
    # Every backup directory name starts with this prefix, followed by a
    # timestamp in _BACKUP_STAMP_FORMAT (see ``deploy_time``).
    _BACKUP_PREFIX = 'backup_'
    _BACKUP_STAMP_FORMAT = '%Y%m%d_%H%M%S'
    _BACKUP_STAMP_LENGTH = 15  # len('YYYYmmdd_HHMMSS')
    _BACKUP_RETENTION_DAYS = 30

    def __init__(self):
        """Read the deployment configuration from the environment."""
        # Deployment layout.
        deploy_dir = self._env('SPUG_DEPLOY_DIR', '/var/www/hospital-performance')
        self.project_name = self._env('SPUG_APP_NAME', 'hospital-performance')
        self.project_dir = Path(deploy_dir)
        self.backup_dir = self.project_dir / 'backups'
        self.frontend_dir = self.project_dir / 'frontend'
        self.backend_dir = self.project_dir / 'backend'

        # Git settings: SPUG_GIT_URL wins, then SPUG_GIT_REPO, then the default.
        self.git_repo = self._env('SPUG_GIT_URL', '') or self._env(
            'SPUG_GIT_REPO',
            'https://gitea.gentronhealth.com/chenqi/hospital_performance.git',
        )
        self.git_branch = self._env('SPUG_GIT_BRANCH', 'main')

        # Python settings.
        self.python_version = self._env('PYTHON_VERSION', 'python3.10')
        self.venv_dir = self.project_dir / 'venv'

        # Node.js settings.
        self.node_version = self._env('NODE_VERSION', '18')

        # Service settings. A non-numeric BACKEND_PORT still raises ValueError.
        self.backend_service = self._env('BACKEND_SERVICE', 'hospital-backend')
        self.backend_port = int(self._env('BACKEND_PORT', '8000'))
        self.frontend_service = self._env('FRONTEND_SERVICE', 'nginx')

        # Logging settings.
        self.log_file = Path(self._env('LOG_FILE', '/var/log/spug/deploy.log'))
        self.deploy_time = datetime.now().strftime(self._BACKUP_STAMP_FORMAT)

        # File logging is best-effort (log() tolerates write failures), so an
        # uncreatable log directory must not abort the deployment either.
        try:
            self.log_file.parent.mkdir(parents=True, exist_ok=True)
        except OSError as e:
            print(f"写入日志失败:{e}")

    @staticmethod
    def _env(name: str, default: str) -> str:
        """Return environment variable *name* stripped, or *default* if unset/blank."""
        value = os.getenv(name, '').strip()
        return value or default

    @classmethod
    def _backup_timestamp(cls, name: str) -> Optional[datetime]:
        """Parse the creation time embedded in a backup directory name.

        Returns ``None`` when *name* does not look like
        ``backup_YYYYmmdd_HHMMSS[...]``.
        """
        if not name.startswith(cls._BACKUP_PREFIX):
            return None
        start = len(cls._BACKUP_PREFIX)
        stamp = name[start:start + cls._BACKUP_STAMP_LENGTH]
        try:
            return datetime.strptime(stamp, cls._BACKUP_STAMP_FORMAT)
        except ValueError:
            return None

    def log(self, level: str, message: str):
        """Print a timestamped log line and append it to the log file.

        A failure to write the file is reported on stdout and otherwise ignored.
        """
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        log_msg = f"[{timestamp}] [{level}] {message}"
        print(log_msg)
        try:
            with open(self.log_file, 'a', encoding='utf-8') as f:
                f.write(log_msg + '\n')
        except Exception as e:
            print(f"写入日志失败:{e}")

    def info(self, message: str):
        """Log *message* at INFO level."""
        self.log("INFO", message)

    def error(self, message: str):
        """Log *message* at ERROR level."""
        self.log("ERROR", message)

    def success(self, message: str):
        """Log *message* at SUCCESS level."""
        self.log("SUCCESS", message)

    def run_command(self, cmd: List[str], cwd: Optional[Path] = None,
                    check: bool = True, shell: bool = False,
                    env: Optional[dict] = None) -> subprocess.CompletedProcess:
        """Run an external command, logging its stdout and stderr.

        Args:
            cmd: Argument list (or a single string when ``shell=True``).
            cwd: Working directory for the child process.
            check: Raise ``subprocess.CalledProcessError`` on non-zero exit.
            shell: Passed through to ``subprocess.run``.
            env: Environment for the child process; ``None`` inherits the
                current environment.

        Returns:
            The ``subprocess.CompletedProcess`` with captured text output.

        Raises:
            subprocess.CalledProcessError: When ``check`` is true and the
                command exits with a non-zero status (logged before re-raise).
        """
        try:
            result = subprocess.run(
                cmd,
                cwd=cwd,
                env=env,
                capture_output=True,
                text=True,
                shell=shell,
                check=check,
            )
        except subprocess.CalledProcessError as e:
            # cmd is a plain string in shell mode; joining it would split it
            # into single characters.
            cmd_text = cmd if isinstance(cmd, str) else ' '.join(map(str, cmd))
            self.error(f"命令执行失败:{cmd_text}")
            self.error(f"错误信息:{e.stderr}")
            raise
        if result.stdout:
            self.info(result.stdout.strip())
        if result.stderr:
            self.info(result.stderr.strip())
        return result

    def check_command(self, cmd: str) -> bool:
        """Return True when *cmd* resolves to an executable on PATH."""
        return shutil.which(cmd) is not None

    def pre_check(self):
        """Verify required tools, the deploy directory and free disk space.

        Exits the process with status 1 when a required command is missing or
        less than 1 GiB is free. A failure of the disk-space probe itself is
        only logged.
        """
        self.info("========== 开始前置检查 ==========")

        # Required commands.
        for cmd in ('git', 'python3', 'node', 'npm'):
            if not self.check_command(cmd):
                self.error(f"命令 {cmd} 未安装,请先安装")
                sys.exit(1)

        # Create the deploy directory if it does not exist yet.
        if not self.project_dir.exists():
            self.info(f"创建部署目录:{self.project_dir}")
            self.project_dir.mkdir(parents=True, exist_ok=True)

        # Free disk space.
        try:
            available_gb = shutil.disk_usage(self.project_dir).free / (1024 ** 3)
        except OSError as e:
            self.error(f"检查磁盘空间失败:{e}")
        else:
            if available_gb < 1:
                self.error(f"磁盘空间不足 1GB当前可用{available_gb:.2f}GB")
                sys.exit(1)

        self.success("✓ 前置检查通过")

    def update_code(self):
        """Bring the deploy directory to the tip of the configured branch.

        On first deployment (no ``.git`` directory) pre-existing files are
        moved into a ``backup_<timestamp>_non_git`` directory and the
        repository is checked out; afterwards the working tree is hard-reset
        to ``origin/<branch>``.
        """
        self.info("========== 更新代码 ==========")
        repo_dir = self.project_dir

        if not (repo_dir / '.git').exists():
            self.info("首次部署,克隆仓库...")
            # Snapshot the listing first: entries are moved while we loop.
            leftovers = [
                entry for entry in repo_dir.iterdir()
                if entry.name not in self._RESERVED_NAMES
                and not entry.name.startswith(self._BACKUP_PREFIX)
            ]
            if leftovers:
                self.info("目录非空,先备份现有文件...")
                stash_dir = repo_dir / f"backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}_non_git"
                stash_dir.mkdir(exist_ok=True)
                for entry in leftovers:
                    try:
                        shutil.move(str(entry), str(stash_dir / entry.name))
                    except OSError as e:
                        self.info(f"跳过文件 {entry.name}: {e}")

            if any(repo_dir.iterdir()):
                # ``git clone`` refuses a non-empty target directory, and the
                # stash/backups/venv directories live inside it, so build the
                # repository in place instead.
                self.run_command(['git', 'init'], cwd=repo_dir)
                self.run_command(['git', 'remote', 'add', 'origin', self.git_repo], cwd=repo_dir)
                self.run_command(['git', 'fetch', 'origin', self.git_branch], cwd=repo_dir)
                self.run_command(
                    ['git', 'checkout', '-f', '-B', self.git_branch, f'origin/{self.git_branch}'],
                    cwd=repo_dir,
                )
            else:
                self.run_command(['git', 'clone', self.git_repo, '.'], cwd=repo_dir)
                self.run_command(['git', 'checkout', self.git_branch], cwd=repo_dir)
        else:
            self.info("更新现有代码...")
            self.run_command(['git', 'fetch', 'origin', self.git_branch], cwd=repo_dir)
            self.run_command(['git', 'reset', '--hard', f'origin/{self.git_branch}'], cwd=repo_dir)
            # Keep deployer-owned untracked directories: without the excludes
            # ``git clean -fd`` removes backups and the virtualenv unless the
            # repository happens to ignore them.
            self.run_command(
                ['git', 'clean', '-fd', '-e', 'backups', '-e', 'venv', '-e', 'backup_*'],
                cwd=repo_dir,
            )

        # Report the version that is now checked out.
        commit_hash = self.run_command(
            ['git', 'rev-parse', '--short', 'HEAD'], cwd=repo_dir
        ).stdout.strip()
        commit_msg = self.run_command(
            ['git', 'log', '-1', '--pretty=format:%s'], cwd=repo_dir
        ).stdout.strip()
        self.success(f"✓ 代码更新完成,当前版本:{commit_hash} - {commit_msg}")

    def backup_old_version(self):
        """Copy the deployed backend and frontend build into the backup directory.

        Also removes backup directories older than the retention period.
        Failures during that pruning are logged and do not abort the deploy.
        """
        self.info("========== 备份当前版本 ==========")
        self.backup_dir.mkdir(parents=True, exist_ok=True)
        backup_name = f"backup_{self.deploy_time}"

        # Backend sources.
        if self.backend_dir.exists():
            backup_backend = self.backup_dir / f"{backup_name}_backend"
            shutil.copytree(self.backend_dir, backup_backend, dirs_exist_ok=True)
            self.info(f"✓ 后端代码已备份到:{backup_backend}")

        # Frontend build output.
        frontend_dist = self.frontend_dir / 'dist'
        if frontend_dist.exists():
            backup_frontend = self.backup_dir / f"{backup_name}_frontend_dist"
            shutil.copytree(frontend_dist, backup_frontend, dirs_exist_ok=True)
            self.info(f"✓ 前端构建已备份到:{backup_frontend}")

        # Prune expired backups. copytree() copies the source directory's
        # mtime onto the backup, so the age is taken from the timestamp in
        # the directory name and mtime is only a fallback.
        try:
            now = datetime.now()
            for item in self.backup_dir.iterdir():
                if not (item.is_dir() and item.name.startswith(self._BACKUP_PREFIX)):
                    continue
                created = self._backup_timestamp(item.name)
                if created is None:
                    created = datetime.fromtimestamp(item.stat().st_mtime)
                if (now - created).days > self._BACKUP_RETENTION_DAYS:
                    shutil.rmtree(item)
                    self.info(f"清理过期备份:{item.name}")
        except OSError as e:
            self.error(f"清理备份失败:{e}")

        self.success("✓ 备份完成(保留最近 30 天)")

    def _service_listed(self, service: str) -> bool:
        """Return True when *service* appears in the ``systemctl list-units`` output."""
        listing = subprocess.run(
            ['systemctl', 'list-units', '--type=service', '--all'],
            capture_output=True,
            text=True,
        )
        return service in listing.stdout

    def deploy_backend(self):
        """Install backend dependencies, migrate the database and restart the service.

        Exits the process with status 1 when the service is not ``active``
        after the restart. Other service-management errors are only logged.
        """
        self.info("========== 部署后端服务 ==========")
        backend = self.backend_dir

        # Create the virtual environment on first use.
        if not self.venv_dir.exists():
            self.info("创建 Python 虚拟环境...")
            self.run_command([self.python_version, '-m', 'venv', str(self.venv_dir)], cwd=backend)

        # "Activate" the virtualenv for child processes by putting its bin
        # directory first on PATH. PATH may be unset, and an empty component
        # would mean "current directory", so empty parts are dropped.
        venv_bin = self.venv_dir / 'bin'
        env = os.environ.copy()
        env['PATH'] = os.pathsep.join(
            part for part in (str(venv_bin), env.get('PATH', '')) if part
        )
        env['VIRTUAL_ENV'] = str(self.venv_dir)

        # Upgrade pip, then install dependencies.
        self.run_command(['pip', 'install', '--upgrade', 'pip'], cwd=backend, env=env)
        self.info("安装 Python 依赖...")
        self.run_command(['pip', 'install', '-r', 'requirements.txt'], cwd=backend, env=env)

        # Database migration, only when the project ships an Alembic config.
        if (backend / 'alembic.ini').exists():
            self.info("执行数据库迁移...")
            self.run_command(['alembic', 'upgrade', 'head'], cwd=backend, env=env)

        # Data initialisation is best-effort: a failing script must not stop
        # the deployment.
        if (backend / 'init_db.py').exists():
            self.info("初始化数据库...")
            try:
                self.run_command([str(venv_bin / 'python'), 'init_db.py'],
                                 cwd=backend, env=env, check=False)
            except OSError as e:
                self.info(f"初始化数据跳过或失败:{e}")

        # Restart the backend service when systemd knows about it.
        try:
            if self._service_listed(self.backend_service):
                self.info(f"重启后端服务...")
                self.run_command(['systemctl', 'restart', self.backend_service])
                import time
                time.sleep(2)  # give the service a moment before probing it

                state = subprocess.run(
                    ['systemctl', 'is-active', self.backend_service],
                    capture_output=True,
                    text=True,
                )
                if state.stdout.strip() == 'active':
                    self.success("✓ 后端服务重启成功")
                else:
                    self.error("✗ 后端服务启动失败")
                    sys.exit(1)
            else:
                self.info("未找到 systemd 服务,跳过重启")
        except (OSError, subprocess.SubprocessError) as e:
            self.error(f"服务管理操作失败:{e}")

        self.success("✓ 后端部署完成")

    def deploy_frontend(self):
        """Install frontend dependencies, build the bundle and reload the web server.

        Exits the process with status 1 when the build leaves ``dist`` missing
        or empty. A failure to reload the web server is only logged.
        """
        self.info("========== 部署前端服务 ==========")
        frontend = self.frontend_dir

        # NOTE(review): ``--production`` skips devDependencies; confirm that
        # the build tooling is not declared there.
        self.info("安装 Node.js 依赖...")
        self.run_command(['npm', 'install', '--production'], cwd=frontend)

        self.info("构建前端项目...")
        self.run_command(['npm', 'run', 'build'], cwd=frontend)

        # Verify the build output.
        dist_dir = frontend / 'dist'
        if dist_dir.exists() and any(dist_dir.iterdir()):
            self.success("✓ 前端构建成功")
        else:
            self.error("✗ 前端构建失败dist 目录为空")
            sys.exit(1)

        # Reload the web server when systemd knows about it.
        try:
            if self._service_listed(self.frontend_service):
                self.info("重新加载 Nginx...")
                self.run_command(['systemctl', 'reload', self.frontend_service])
        except (OSError, subprocess.SubprocessError) as e:
            self.error(f"Nginx 操作失败:{e}")

        self.success("✓ 前端部署完成")

    def health_check(self):
        """Probe the backend health endpoint and check the frontend entry file.

        Exits the process with status 1 when the endpoint does not answer
        with HTTP 200 or ``dist/index.html`` is missing.
        """
        self.info("========== 执行健康检查 ==========")
        import time
        time.sleep(5)  # let the freshly restarted service start listening

        # Backend API.
        import requests
        backend_url = f"http://localhost:{self.backend_port}/api/v1/health"
        try:
            response = requests.get(backend_url, timeout=10)
        except requests.exceptions.RequestException as e:
            self.error(f"✗ 后端 API 连接失败:{e}")
            sys.exit(1)
        if response.status_code == 200:
            self.success("✓ 后端 API 健康检查通过")
        else:
            self.error(f"✗ 后端 API 返回异常状态码:{response.status_code}")
            sys.exit(1)

        # Frontend files.
        if (self.frontend_dir / 'dist' / 'index.html').exists():
            self.success("✓ 前端文件存在")
        else:
            self.error("✗ 前端文件缺失")
            sys.exit(1)

        self.success("✓ 所有健康检查通过")

    def cleanup(self):
        """Remove the npm cache and Python bytecode caches (best-effort)."""
        self.info("========== 清理临时文件 ==========")

        # npm cache; a missing npm binary is not worth failing the deploy.
        try:
            self.run_command(['npm', 'cache', 'clean', '--force'], check=False)
        except OSError:
            pass

        # Python bytecode caches under the deploy directory.
        import glob
        for pycache in glob.glob(str(self.project_dir / '**/__pycache__'), recursive=True):
            try:
                shutil.rmtree(pycache)
            except OSError:
                pass
        for pyc in glob.glob(str(self.project_dir / '**/*.pyc'), recursive=True):
            try:
                os.remove(pyc)
            except OSError:
                pass

        self.success("✓ 清理完成")

    def run(self):
        """Execute the full deployment pipeline.

        Any unexpected exception is logged with its traceback and the process
        exits with status 1.
        """
        self.info("========================================")
        self.info("Spug 自动发布开始")
        self.info(f"项目名称:{self.project_name}")
        self.info(f"部署目录:{self.project_dir}")
        self.info(f"Git 分支:{self.git_branch}")
        self.info("========================================")
        try:
            self.pre_check()
            # Back up BEFORE updating: once the working tree is reset to the
            # new commit the previously deployed code is gone.
            self.backup_old_version()
            self.update_code()
            self.deploy_backend()
            self.deploy_frontend()
            self.health_check()
            self.cleanup()
            self.info("========================================")
            self.success("🎉 部署成功完成!")
            self.info("========================================")
        except Exception as e:
            self.error(f"部署失败:{e}")
            import traceback
            self.error(traceback.format_exc())
            sys.exit(1)
if __name__ == '__main__':
    # Script entry point: build the deployer from the environment and run
    # the whole pipeline.
    SpugDeploy().run()