背景
在构建RAG系统过程中,我通过设计Query Rewrite, Rerank,Structural Chunk,Structural prompt等,极大的增加了RAG检索出来的知识的准确性. 但在使用集成了此RAG的flask程序软件中发现有大量的等待时间, 所以写下此篇文章尽可能的优化等待时间.
案例:
通过调用私有API, 获取大量的PDF,将PDF文件OCR提取文字,Chunk切分,Embedding化, 最后写入向量数据库, 通过web程序访问RAG知识库.
耗时分析:
| 步骤 | 耗时比例 |
|---|---|
| 私有API调用 | 10% |
| PDF下载 | 15% |
| PDF解析 | 35% |
| Chunk切分 | 5% |
| Embedding化 | 20% |
| 写入向量数据库 | 15% |
极限优化
完整架构
将架构拆分为2个部分, 用户在web程序中进行query, 后台处理所有的 “私有API调用=》…=》写入向量数据库” 流程.
┌───────────────┐
│ Web Server │
│ (Flask) │
└───────┬───────┘
│
│查询
▼
┌───────────────┐
│ Vector DB │
└───────────────┘
文档处理流水线(后台)
私有 API
│
▼
Document Queue
│
▼
Parser Workers
│
▼
Embedding Workers
│
▼
Vector DB
1, 异步Asyncio 请求私有API下载文件
注意:如Gmail邮箱API,每次调用API时,只会获取API中最新的邮件.实时加载邮件.
import aiohttp
import asyncio
async def download_attachment(session, mail):
url = mail["url"]
filename = f"pdfs/{mail['id']}.pdf"
async with session.get(url) as resp:
data = await resp.read()
with open(filename, "wb") as f:
f.write(data)
return filename
async def download_all(mails):
async with aiohttp.ClientSession() as session:
tasks = [
download_attachment(session, mail)
for mail in mails
]
results = await asyncio.gather(*tasks)
return results
asyncio.run(download_all(mails))
2, 缓存API下载
私有API
↓
下载
↓
Amazon S3
↓
解析
3, 并行PDF解析
| 解析库 | 速度 |
|---|---|
| PyMuPDF | 快 |
| pdfminer | 中 |
| OCR | 慢 |
注意: 很多PDF文件 有文字, 是不需要OCR解析的.
1, 使用PyMuPDF判断文本是否大于60, 如果大于则跳过OCR解析
伪代码:
def has_text(page):
text = page.get_text()
return len(text.strip()) > 60
2, 开启 CPU*2 个线程来解析文件
def parse_page(page):
return page.get_text()
def parse_pdf(file_path):
with fitz.open(file_path) as doc:
with ThreadPoolExecutor(max_workers=8) as executor:
executor.map(pipeline, pdf_files)
...
for r in executor.map(...):
process(r)
4, GPU OCR (使用onnxruntime加速)
例如:
- PaddleOCR
- EasyOCR
模型导出:
PyTorch/Paddle
↓
ONNX
↓
ONNX Runtime
GPU推理:
CUDAExecutionProvider
速度提升:CPU → GPU 约10倍
5, Chunk切分
保持chunk的size在一定的程度,如400~800 tokens
但是我这里的进行了Structural Chunk, 就不用固定大小来Chunk切分了
文章:“RAG之Structural Chunks设计” https://strictfrog.com/2026/01/31/RAG%E4%B9%8BStructural-Chunks%E8%AE%BE%E8%AE%A1%E4%B8%8E%E6%80%9D%E8%80%83/
常见的3种方式:
规则 chunk(最快)
semantic chunk(embedding)
layout chunk(复杂文档)
如果这里的Chunk切分过程, 使用了模型推理,那么都可以使用onnxruntime加速
例如:使用了Layout detecttion模型来识别文档结构,可以使用onnxruntime加速
6, embedding优化(使用onnxruntime加速)
- 1, embedding批处理,设置batch_size
- 2, 设置embedding的线程池,数量 cup/2 worker pool.
- 3, 将embedding模型转化.onnx, 使用onnxruntime加速
架构图(局部)
chunk generator
│
▼
chunk queue
│
▼
Worker Pool (2~4)
│
▼
Batch Builder (64 chunks)
│
▼
ONNX Runtime 推理
│
▼
Vector DB Batch Insert
伪代码:
import onnxruntime as ort
from concurrent.futures import ProcessPoolExecutor
session = ort.InferenceSession(
"embedding.onnx",
providers=["CPUExecutionProvider"]
)
def embed_batch(texts):
inputs = tokenizer(
texts,
padding=True,
truncation=True,
return_tensors="np"
)
outputs = session.run(None, dict(inputs))
return outputs[0]
def process_chunks(chunk_batch):
embeddings = embed_batch(chunk_batch)
vector_db.insert(embeddings)
def run_pipeline(chunks):
batches = list(batch(chunks,64))
with ProcessPoolExecutor(max_workers=4) as executor:
executor.map(process_chunks, batches)
假设AWS: 8 vCPU,1 GPU 则配置为: worker = 2 ,batch_size = 128, onnxruntime-gpu
7, Vertoc DB批量写入
如: 1000 vectors ,1 insert (1000个向量,一次导入)
8, 缓存机制
对 PDF,page,chunk文件做hash, 避免重复embedding
请参考下面文章:“Embedding之增量向量更新策略”
9, 预热模型
Flask启东时:
load_embedding_model()
load_ocr_model()