Files
hospital_performance/extract_all_doc.py
2026-02-28 15:16:15 +08:00

199 lines
6.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
批量提取参考文档目录中所有 .doc 文件的内容
输出为 JSON 格式,方便后续处理
"""
import os
import sys
import json
import olefile
import re
from pathlib import Path
from datetime import datetime
# Matches runs of "readable" characters: CJK ideographs, printable ASCII,
# whitespace, CJK/fullwidth punctuation, common math symbols and digits.
# Non-ASCII punctuation is written as \u escapes on purpose: quote characters
# typed literally inside the class terminate the string literal early, and
# fullwidth forms are easily normalized to ASCII look-alikes by editors.
# NOTE(review): the fullwidth punctuation and curly quotes are presumed to be
# what the duplicated ASCII punctuation in the old pattern stood for - confirm.
_TEXT_RUN_RE = re.compile(
    r'['
    r'\u4e00-\u9fff'                                # CJK unified ideographs
    r'\u0020-\u007e'                                # printable ASCII
    r'\n\r\t'
    r'\u3002\u3001'                                 # ideographic full stop / comma
    r'\uff08\uff09\uff0c\uff1b\uff1a\uff01\uff1f'   # fullwidth ( ) , ; : ! ?
    r'\u201c\u201d\u2018\u2019'                     # curly double / single quotes
    r'\u3010\u3011\u300a\u300b'                     # lenticular and angle brackets
    r'×÷±≡≈≠≤≥∞∩∪∈∉⊂⊃⊆⊇∠⊥∥→←↑↓↔'
    r'\d'
    r']+'
)


def _extract_text_runs(raw):
    """Decode bytes as UTF-16-LE and return stripped readable runs longer than one character."""
    # errors='ignore' means decoding never raises, even on arbitrary binary data.
    decoded = raw.decode('utf-16-le', errors='ignore')
    stripped = (run.strip() for run in _TEXT_RUN_RE.findall(decoded))
    return [run for run in stripped if len(run) > 1]


def extract_text_from_doc(filepath):
    """Heuristically extract readable text from a legacy .doc (OLE) file.

    This does not parse the Word binary format. It decodes three byte
    sources as UTF-16-LE (the 'WordDocument' stream, the '1Table'/'0Table'
    stream when present, and the raw file) and keeps the readable character
    runs found in each, de-duplicated in first-seen order.

    Args:
        filepath: Path of the .doc file to read.

    Returns:
        A ``(text, error)`` tuple. On success ``text`` is the newline-joined
        fragments (possibly empty) and ``error`` is None. On failure ``text``
        is None and ``error`` is a message string.
    """
    try:
        ole = olefile.OleFileIO(filepath)
        try:
            if not ole.exists('WordDocument'):
                return None, "不是有效的Word文档"

            word_data = ole.openstream('WordDocument').read()

            table_name = '1Table' if ole.exists('1Table') else '0Table'
            table_data = b''
            if ole.exists(table_name):
                table_data = ole.openstream(table_name).read()
        finally:
            # Release the file handle on every path, including the early
            # return above and any exception raised while reading streams.
            ole.close()

        with open(filepath, 'rb') as f:
            raw_data = f.read()

        text_parts = []
        for blob in (word_data, table_data, raw_data):
            text_parts.extend(_extract_text_runs(blob))

        # dict.fromkeys drops duplicates while keeping first-seen order.
        unique_parts = list(dict.fromkeys(text_parts))
        return '\n'.join(unique_parts), None
    except Exception as e:
        # Best-effort extractor: report the failure to the caller instead of raising.
        return None, str(e)
def extract_tables_from_doc(filepath):
    """Collect short text lines from a .doc file as candidate table cells.

    This is a heuristic, not a parser of Word's table structures: the whole
    file is decoded as UTF-16-LE, split on newlines, and every stripped line
    whose length is between 3 and 199 characters is kept unless it consists
    only of whitespace, '-' and '|' characters.

    Args:
        filepath: Path of the .doc file to read.

    Returns:
        A list of candidate row strings, or an empty list when the file
        cannot be opened as an OLE container or cannot be read.
    """
    try:
        # Opening the file as an OLE container only validates it; files that
        # are not OLE documents raise here and produce an empty result.
        ole = olefile.OleFileIO(filepath)
        ole.close()

        with open(filepath, 'rb') as f:
            data = f.read()

        decoded = data.decode('utf-16-le', errors='ignore')
        lines = [line.strip() for line in decoded.split('\n') if line.strip()]

        # Table cells are usually short; skip lines made only of separator characters.
        return [
            line
            for line in lines
            if 2 < len(line) < 200 and not re.match(r'^[\s\-\|]+$', line)
        ]
    except Exception:
        # Best-effort: any failure means "no tables found".
        return []
def process_all_doc_files(directory):
    """Extract text and candidate table rows from every .doc file under a directory.

    The directory is walked recursively in sorted order so results are
    reproducible. Files ending in '.docx' are not matched because the
    suffix test is for '.doc' exactly (case-insensitive).

    Args:
        directory: Root directory to search.

    Returns:
        A dict mapping each file to a dict with keys 'filepath', 'text',
        'tables' and 'error'. The key is the bare file name; when that name
        was already used by a file in another subdirectory, the path relative
        to ``directory`` is used instead so no result is overwritten.
    """
    doc_files = []
    for root, dirs, files in os.walk(directory):
        dirs.sort()  # in-place sort makes os.walk descend in a stable order
        for name in sorted(files):
            if name.lower().endswith('.doc'):
                doc_files.append((name, os.path.join(root, name)))

    total = len(doc_files)
    print(f"发现 {total} 个 .doc 文件")

    results = {}
    for index, (filename, filepath) in enumerate(doc_files, start=1):
        print(f"处理 [{index}/{total}]: {filename}")
        text, error = extract_text_from_doc(filepath)
        tables = extract_tables_from_doc(filepath)

        # Same-named files in different subdirectories must not clobber each other.
        key = filename
        if key in results:
            key = os.path.relpath(filepath, directory)

        results[key] = {
            'filepath': filepath,
            'text': text if text else '',
            'tables': tables,
            'error': error,
        }
    return results
def main():
    """Command-line entry point: extract all .doc files and save the result as JSON.

    Usage: ``extract_all_doc.py [DOC_DIR [OUTPUT_JSON]]``. Both arguments are
    optional and fall back to the built-in default paths. Exits with an error
    message when DOC_DIR is not an existing directory, so a mistyped path
    cannot overwrite a previous result file with an empty one.
    """
    doc_dir = r"D:\医院绩效系统\参考文档"
    output_file = r"D:\医院绩效系统\doc_extracted_content.json"
    if len(sys.argv) > 1:
        doc_dir = sys.argv[1]
    if len(sys.argv) > 2:
        output_file = sys.argv[2]

    if not os.path.isdir(doc_dir):
        sys.exit(f"目录不存在: {doc_dir}")

    print(f"开始处理目录: {doc_dir}")
    print("=" * 60)

    results = process_all_doc_files(doc_dir)

    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(results, f, ensure_ascii=False, indent=2)
    print(f"\n结果已保存到: {output_file}")

    # Summary counts: a file with empty text and no error is counted in neither.
    success_count = sum(1 for v in results.values() if v.get('text'))
    error_count = sum(1 for v in results.values() if v.get('error'))
    print(f"成功提取: {success_count} 个文件")
    print(f"提取失败: {error_count} 个文件")

    # Preview the first three results.
    print("\n" + "=" * 60)
    print("部分提取结果预览:")
    print("=" * 60)
    for filename, data in list(results.items())[:3]:
        print(f"\n{filename}")
        text = data.get('text', '')
        if text:
            print(text[:500] + "..." if len(text) > 500 else text)
        else:
            # The 'error' key is always present but may be None, so a .get()
            # default would never apply; fall back explicitly.
            error = data.get('error') or '未知错误'
            print(f" 错误: {error}")
# Run the extraction only when executed as a script, not when imported.
if __name__ == "__main__":
    main()