RAG

私有RAG之极限优化等待时间

私有RAG

Posted by LuochuanAD on March 14, 2026 本文总阅读量

背景

在构建RAG系统过程中,我通过设计Query Rewrite, Rerank,Structural Chunk,Structural prompt等,极大的增加了RAG检索出来的知识的准确性. 但在使用集成了此RAG的flask程序软件中发现有大量的等待时间, 所以写下此篇文章尽可能的优化等待时间.

案例:

通过调用私有API, 获取大量的PDF,将PDF文件OCR提取文字,Chunk切分,Embedding化, 最后写入向量数据库, 通过web程序访问RAG知识库.

耗时分析:

步骤 耗时比例
私有API调用 10%
PDF下载 15%
PDF解析 35%
Chunk切分 5%
Embedding化 20%
写入向量数据库 15%

极限优化

完整架构

将架构拆分为2个部分, 用户在web程序中进行query, 后台处理所有的 “私有API调用=》…=》写入向量数据库” 流程.

                ┌───────────────┐
                │   Web Server  │
                │   (Flask)     │
                └───────┬───────┘
                        │
                        │查询
                        ▼
                ┌───────────────┐
                │  Vector DB    │
                └───────────────┘


                文档处理流水线(后台)
私有 API
      │
      ▼
Document Queue
      │
      ▼
Parser Workers
      │
      ▼
Embedding Workers
      │
      ▼
Vector DB

1, 异步Asyncio 请求私有API下载文件

注意:如Gmail邮箱API,每次调用API时,只会获取API中最新的邮件.实时加载邮件.

import aiohttp
import asyncio

async def download_attachment(session, mail):

    url = mail["url"]
    filename = f"pdfs/{mail['id']}.pdf"

    async with session.get(url) as resp:
        data = await resp.read()

    with open(filename, "wb") as f:
        f.write(data)

    return filename


async def download_all(mails):

    async with aiohttp.ClientSession() as session:

        tasks = [
            download_attachment(session, mail)
            for mail in mails
        ]

        results = await asyncio.gather(*tasks)

    return results


asyncio.run(download_all(mails))

2, 缓存API下载

私有API
↓
下载
↓
Amazon S3
↓
解析

3, 并行PDF解析

解析库 速度
PyMuPDF
pdfminer
OCR

注意: 很多PDF文件 有文字, 是不需要OCR解析的.

1, 使用PyMuPDF判断文本是否大于60, 如果大于则跳过OCR解析

伪代码:

def has_text(page):
    text = page.get_text()
    return len(text.strip()) > 60

2, 开启 CPU*2 个线程来解析文件

def parse_page(page):
    return page.get_text()

def parse_pdf(file_path):
    with fitz.open(file_path) as doc:
        with ThreadPoolExecutor(max_workers=8) as executor:
    		executor.map(pipeline, pdf_files)

...
for r in executor.map(...):
    process(r)   		

4, GPU OCR (使用onnxruntime加速)

例如:

  • PaddleOCR
  • EasyOCR

模型导出:

PyTorch/Paddle
    ↓
ONNX
    ↓
ONNX Runtime

GPU推理:

CUDAExecutionProvider

速度提升:CPU → GPU 约10倍

5, Chunk切分

保持chunk的size在一定的程度,如400~800 tokens

但是我这里的进行了Structural Chunk, 就不用固定大小来Chunk切分了

文章:“RAG之Structural Chunks设计” https://strictfrog.com/2026/01/31/RAG%E4%B9%8BStructural-Chunks%E8%AE%BE%E8%AE%A1%E4%B8%8E%E6%80%9D%E8%80%83/

常见的3种方式:

规则 chunk(最快)
semantic chunk(embedding)
layout chunk(复杂文档)

如果这里的Chunk切分过程, 使用了模型推理,那么都可以使用onnxruntime加速

例如:使用了Layout detecttion模型来识别文档结构,可以使用onnxruntime加速

6, embedding优化(使用onnxruntime加速)

  • 1, embedding批处理,设置batch_size
  • 2, 设置embedding的线程池,数量 cup/2 worker pool.
  • 3, 将embedding模型转化.onnx, 使用onnxruntime加速

架构图(局部)

chunk generator
      │
      ▼
chunk queue
      │
      ▼
Worker Pool (2~4)
      │
      ▼
Batch Builder (64 chunks)
      │
      ▼
ONNX Runtime 推理
      │
      ▼
Vector DB Batch Insert

伪代码:

import onnxruntime as ort
from concurrent.futures import ProcessPoolExecutor

session = ort.InferenceSession(
    "embedding.onnx",
    providers=["CPUExecutionProvider"]
)

def embed_batch(texts):

    inputs = tokenizer(
        texts,
        padding=True,
        truncation=True,
        return_tensors="np"
    )

    outputs = session.run(None, dict(inputs))

    return outputs[0]

def process_chunks(chunk_batch):

    embeddings = embed_batch(chunk_batch)

    vector_db.insert(embeddings)

def run_pipeline(chunks):

    batches = list(batch(chunks,64))

    with ProcessPoolExecutor(max_workers=4) as executor:
        executor.map(process_chunks, batches)


假设AWS: 8 vCPU,1 GPU 则配置为: worker = 2 ,batch_size = 128, onnxruntime-gpu

7, Vertoc DB批量写入

如: 1000 vectors ,1 insert (1000个向量,一次导入)

8, 缓存机制

对 PDF,page,chunk文件做hash, 避免重复embedding

请参考下面文章:“Embedding之增量向量更新策略”

https://strictfrog.com/2026/03/14/Embedding%E4%B9%8B%E5%A2%9E%E9%87%8F%E5%90%91%E9%87%8F%E6%9B%B4%E6%96%B0%E7%AD%96%E7%95%A5/

9, 预热模型

Flask启东时:

load_embedding_model()
load_ocr_model()