#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
批量提取参考文档目录中所有 .doc 文件的内容
输出为 JSON 格式,方便后续处理
"""
|
||
import os
|
||
import sys
|
||
import json
|
||
import olefile
|
||
import re
|
||
from pathlib import Path
|
||
from datetime import datetime
|
||
|
||
|
||
def extract_text_from_doc(filepath):
    """Extract readable text from a legacy .doc (OLE compound) file.

    Text is pulled heuristically: the WordDocument stream, the companion
    table stream, and finally the raw file bytes are each decoded as
    UTF-16-LE and scanned for runs of plausible characters.

    Args:
        filepath: Path to the .doc file.

    Returns:
        Tuple ``(text, error)``: ``text`` is the newline-joined extracted
        text (None on failure); ``error`` is an error message string
        (None on success).
    """
    # Characters worth keeping: CJK ideographs, printable ASCII, common
    # CJK punctuation, brackets, and math symbols.  Compiled once instead
    # of being rebuilt for each of the three scans (was copy-pasted 3x).
    pattern = re.compile(r'[\u4e00-\u9fff\u0020-\u007e\n\r\t\.,;:!?()()。,;:!?、""''【】\[\]{}{}<>《》\-+×÷=±≡≈≠≤≥∞∩∪∈∉⊂⊃⊆⊇∠⊥∥→←↑↓↔\d]+')

    def _collect(blob, parts):
        """Decode *blob* as UTF-16-LE and append plausible text runs to *parts*."""
        # errors='ignore' makes decode effectively non-raising; keep a
        # narrow guard instead of the original bare except.
        try:
            decoded = blob.decode('utf-16-le', errors='ignore')
        except (UnicodeDecodeError, AttributeError):
            return
        for match in pattern.findall(decoded):
            cleaned = match.strip()
            if len(cleaned) > 1:
                parts.append(cleaned)

    ole = None
    try:
        ole = olefile.OleFileIO(filepath)

        if not ole.exists('WordDocument'):
            return None, "不是有效的Word文档"

        # Main document stream.
        data = ole.openstream('WordDocument').read()

        # Companion table stream ('1Table' preferred, else '0Table').
        table_name = '1Table' if ole.exists('1Table') else '0Table'
        table_data = b''
        if ole.exists(table_name):
            table_data = ole.openstream(table_name).read()

        text_parts = []
        _collect(data, text_parts)
        _collect(table_data, text_parts)

        # Fallback: brute-force scan of the whole file.
        with open(filepath, 'rb') as f:
            _collect(f.read(), text_parts)

        # De-duplicate while preserving first-seen order.
        seen = set()
        unique_parts = []
        for part in text_parts:
            if part not in seen and len(part) > 1:
                seen.add(part)
                unique_parts.append(part)

        return '\n'.join(unique_parts), None

    except Exception as e:
        return None, str(e)
    finally:
        # BUG FIX: the original leaked the OLE handle on the early-return
        # ("not a Word document") and exception paths.
        if ole is not None:
            ole.close()
|
||
|
||
|
||
def extract_tables_from_doc(filepath):
    """Heuristically extract table-like rows from a legacy .doc file.

    Proper .doc table extraction requires decoding the binary piece
    table; this simplified approach decodes the raw bytes as UTF-16-LE
    and keeps short, non-separator lines as candidate table rows.

    Args:
        filepath: Path to the .doc file.

    Returns:
        List of candidate table-row strings; empty list on any failure.
    """
    # Rows consisting solely of whitespace, dashes, or pipes are treated
    # as visual separators, not content.
    separator = re.compile(r'^[\s\-\|]+$')

    ole = None
    try:
        # Opening the OLE container validates that this is a compound
        # file; the row scan itself runs over the raw bytes below.
        ole = olefile.OleFileIO(filepath)

        with open(filepath, 'rb') as f:
            data = f.read()

        decoded = data.decode('utf-16-le', errors='ignore')
        lines = [l.strip() for l in decoded.split('\n') if l.strip()]

        table_rows = []
        for line in lines:
            # Table cells are typically short text; skip separators.
            if 2 < len(line) < 200 and not separator.match(line):
                table_rows.append(line)

        return table_rows

    except Exception:
        # Best-effort helper: callers treat an empty list as "no tables".
        return []
    finally:
        # BUG FIX: the original leaked the OLE handle when an exception
        # occurred before ole.close().  Also removed the unused `tables`
        # list the original allocated and never returned.
        if ole is not None:
            ole.close()
|
||
|
||
|
||
def process_all_doc_files(directory):
    """Extract text and table rows from every .doc file under *directory*.

    Walks the directory tree recursively, running both extractors on
    each legacy .doc file found.

    Args:
        directory: Root directory to scan.

    Returns:
        Dict mapping filename -> {'filepath', 'text', 'tables', 'error'}.
    """
    # Collect (filename, full path) pairs.  NOTE: a name ending in
    # '.docx' can never satisfy endswith('.doc'), so the original's
    # extra "and not .endswith('.docx')" check was redundant.
    doc_files = []
    for root, _dirs, files in os.walk(directory):
        for file in files:
            if file.lower().endswith('.doc'):
                doc_files.append((file, os.path.join(root, file)))

    print(f"发现 {len(doc_files)} 个 .doc 文件")

    results = {}
    for i, (filename, filepath) in enumerate(doc_files, start=1):
        # BUG FIX: the progress line printed a literal "(unknown)"
        # instead of the file currently being processed.
        print(f"处理 [{i}/{len(doc_files)}]: {filename}")

        text, error = extract_text_from_doc(filepath)
        tables = extract_tables_from_doc(filepath)

        results[filename] = {
            'filepath': filepath,
            'text': text if text else '',
            'tables': tables,
            'error': error,
        }

    return results
|
||
|
||
|
||
def main():
    """CLI entry point: batch-extract all .doc files and save JSON results.

    Usage: script.py [doc_dir] [output_file]
    Both arguments are optional and default to the project paths below.
    """
    # Input directory: overridable via the first CLI argument.
    doc_dir = r"D:\医院绩效系统\参考文档"
    if len(sys.argv) > 1:
        doc_dir = sys.argv[1]

    # Output path: generalized to accept an optional second CLI argument
    # (backward compatible — the old hard-coded path is still the default).
    output_file = r"D:\医院绩效系统\doc_extracted_content.json"
    if len(sys.argv) > 2:
        output_file = sys.argv[2]

    print(f"开始处理目录: {doc_dir}")
    print("=" * 60)

    # Extract every .doc file under the directory.
    results = process_all_doc_files(doc_dir)

    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(results, f, ensure_ascii=False, indent=2)

    print(f"\n结果已保存到: {output_file}")

    # Summary statistics.
    success_count = sum(1 for v in results.values() if v.get('text'))
    error_count = sum(1 for v in results.values() if v.get('error'))
    print(f"成功提取: {success_count} 个文件")
    print(f"提取失败: {error_count} 个文件")

    # Preview the first three results.
    print("\n" + "=" * 60)
    print("部分提取结果预览:")
    print("=" * 60)

    for filename, data in list(results.items())[:3]:
        # BUG FIX: the preview header printed a literal "(unknown)"
        # instead of the filename.
        print(f"\n【{filename}】")
        text = data.get('text', '')
        if text:
            print(text[:500] + "..." if len(text) > 500 else text)
        else:
            print(f" 错误: {data.get('error', '未知错误')}")
|
||
|
||
|
||
# Run the batch extraction only when executed as a script,
# not when this module is imported.
if __name__ == '__main__':
    main()
|