背景
RAGシステムの構築過程で、Query Rewrite、Rerank、Structural Chunk、Structural promptなどを設計することで、RAGによって検索される知識の精度を大幅に向上させました。しかし、このRAGを統合したflaskプログラムを使用しているときに大量の待ち時間が発生することに気づいたため、本記事では可能な限り待ち時間の最適化を行います。
ケーススタディ
プライベートAPIを呼び出して大量のPDFを取得し、PDFファイルのOCRによる文字抽出、Chunk分割、Embedding化を行い、最終的にベクトルデータベースに書き込み、webプログラムからRAG知識ベースにアクセスする。
時間消費の分析
| ステップ | 時間割合 |
|---|---|
| プライベートAPI呼び出し | 10% |
| PDFダウンロード | 15% |
| PDF解析 | 35% |
| Chunk分割 | 5% |
| Embedding化 | 20% |
| ベクトルDBへの書き込み | 15% |
徹底的な最適化
全体アーキテクチャ
システムを2つの部分に分割し、ユーザーはwebプログラムでクエリを行い、バックエンドが「プライベートAPI呼び出し => … => ベクトルDB書き込み」の全プロセスを処理します。
┌───────────────┐
│ Web Server │
│ (Flask) │
└───────┬───────┘
│
│クエリ
▼
┌───────────────┐
│ Vector DB │
└───────────────┘
ドキュメント処理パイプライン(バックグラウンド)
プライベート API
│
▼
Document Queue
│
▼
Parser Workers
│
▼
Embedding Workers
│
▼
Vector DB
1. 非同期AsyncioでのプライベートAPIによるファイルダウンロード
注意:Gmail APIなどのケースでは、APIを呼び出すたびに最新メールのみを取得し、リアルタイムでメールをロードします。
import aiohttp
import asyncio
async def download_attachment(session, mail):
url = mail["url"]
filename = f"pdfs/{mail['id']}.pdf"
async with session.get(url) as resp:
data = await resp.read()
with open(filename, "wb") as f:
f.write(data)
return filename
async def download_all(mails):
async with aiohttp.ClientSession() as session:
tasks = [
download_attachment(session, mail)
for mail in mails
]
results = await asyncio.gather(*tasks)
return results
asyncio.run(download_all(mails))
2. APIダウンロードのキャッシュ化
プライベートAPI
↓
ダウンロード
↓
Amazon S3
↓
解析
3. 並列PDF解析
| 解析ライブラリ | 処理速度 |
|---|---|
| PyMuPDF | 高速 |
| pdfminer | 中速 |
| OCR | 遅い |
注意: 多くのPDFファイルはテキストが埋め込まれているためOCRによる解析は不要です。
- PyMuPDFを利用してテキスト量が60文字を超える場合はOCR解析をスキップ
擬似コード:
def has_text(page):
text = page.get_text()
return len(text.strip()) > 60
- CPUコア数×2のスレッドを使ってファイルを解析
def parse_page(page):
return page.get_text()
def parse_pdf(file_path):
with fitz.open(file_path) as doc:
with ThreadPoolExecutor(max_workers=8) as executor:
executor.map(pipeline, pdf_files)
...
for r in executor.map(...):
process(r)
4. GPU OCR (onnxruntimeによる高速化)
例:
- PaddleOCR
- EasyOCR
モデル変換:
PyTorch/Paddle
↓
ONNX
↓
ONNX Runtime
GPU推論:
CUDAExecutionProvider
速度向上:CPU → GPUで約10倍
5. Chunk分割
chunkサイズは400〜800トークン程度に維持する
ただしここではStructural Chunkを使用しているため、固定サイズのChunk分割は不要
記事:「RAGにおけるStructural Chunks設計」
https://strictfrog.com/ja/2026-01-31-rag%E3%81%AE%E6%A7%8B%E9%80%A0%E3%83%81%E3%83%A3%E3%83%B3%E3%82%AF%E8%A8%AD%E8%A8%88/
一般的な3つの方法:
ルールベースchunk(最速)
セマンティックchunk(embedding)
レイアウトchunk(複雑ドキュメント対応)
もしChunk分割過程でモデル推論を使用している場合は、onnxruntimeによる加速が可能
例えばレイアウト検出モデルで文書構造を認識している場合、onnxruntimeで高速化可能
6. embedding最適化(onnxruntimeによる加速)
-
- embeddingをバッチ処理し、batch_sizeを設定
-
- embedding用のスレッドプールを設定、数はCPUコア数の半分程度
-
- embeddingモデルをONNX形式に変換し、onnxruntimeで推論高速化
局所的なアーキテクチャ図
chunk generator
│
▼
chunk queue
│
▼
Worker Pool (2~4)
│
▼
Batch Builder (64 chunks)
│
▼
ONNX Runtime 推論
│
▼
Vector DB Batch Insert
擬似コード:
import onnxruntime as ort
from concurrent.futures import ProcessPoolExecutor
session = ort.InferenceSession(
"embedding.onnx",
providers=["CPUExecutionProvider"]
)
def embed_batch(texts):
inputs = tokenizer(
texts,
padding=True,
truncation=True,
return_tensors="np"
)
outputs = session.run(None, dict(inputs))
return outputs[0]
def process_chunks(chunk_batch):
embeddings = embed_batch(chunk_batch)
vector_db.insert(embeddings)
def run_pipeline(chunks):
batches = list(batch(chunks,64))
with ProcessPoolExecutor(max_workers=4) as executor:
executor.map(process_chunks, batches)
仮にAWS 8 vCPU,1 GPU構成ならば、worker数を2、batch_sizeを128、onnxruntime-gpuを利用する形が適切。
7. Vector DBのバッチ書き込み
例: 1000ベクトルを1回のインサートでまとめて書き込み
8. キャッシュ機構
PDF、ページ、Chunkファイルにハッシュを付けて重複Embeddingを防止
詳細は以下の記事を参照:「Embeddingにおけるインクリメンタルベクトル更新戦略」
9. モデルのウォームアップ
Flask起動時に以下を実行:
load_embedding_model()
load_ocr_model()