RAG

私有RAGの極限最適化待機時間

私有RAG

Posted by LuochuanAD on March 14, 2026 本文总阅读量

背景

RAGシステムの構築過程で、Query Rewrite、Rerank、Structural Chunk、Structural promptなどを設計することで、RAGによって検索される知識の精度を大幅に向上させました。しかし、このRAGを統合したflaskプログラムを使用しているときに大量の待ち時間が発生することに気づいたため、本記事では可能な限り待ち時間の最適化を行います。

ケーススタディ

プライベートAPIを呼び出して大量のPDFを取得し、PDFファイルのOCRによる文字抽出、Chunk分割、Embedding化を行い、最終的にベクトルデータベースに書き込み、webプログラムからRAG知識ベースにアクセスする。

時間消費の分析

ステップ 時間割合
プライベートAPI呼び出し 10%
PDFダウンロード 15%
PDF解析 35%
Chunk分割 5%
Embedding化 20%
ベクトルDBへの書き込み 15%

徹底的な最適化

全体アーキテクチャ

システムを2つの部分に分割し、ユーザーはwebプログラムでクエリを行い、バックエンドが「プライベートAPI呼び出し => … => ベクトルDB書き込み」の全プロセスを処理します。

                ┌───────────────┐
                │   Web Server  │
                │   (Flask)     │
                └───────┬───────┘
                        │
                        │クエリ
                        ▼
                ┌───────────────┐
                │  Vector DB    │
                └───────────────┘


                ドキュメント処理パイプライン(バックグラウンド)
プライベート API
      │
      ▼
Document Queue
      │
      ▼
Parser Workers
      │
      ▼
Embedding Workers
      │
      ▼
Vector DB

1. 非同期AsyncioでのプライベートAPIによるファイルダウンロード

注意:Gmail APIなどのケースでは、APIを呼び出すたびに最新メールのみを取得し、リアルタイムでメールをロードします。

import aiohttp
import asyncio

async def download_attachment(session, mail):

    url = mail["url"]
    filename = f"pdfs/{mail['id']}.pdf"

    async with session.get(url) as resp:
        data = await resp.read()

    with open(filename, "wb") as f:
        f.write(data)

    return filename


async def download_all(mails):

    async with aiohttp.ClientSession() as session:

        tasks = [
            download_attachment(session, mail)
            for mail in mails
        ]

        results = await asyncio.gather(*tasks)

    return results


asyncio.run(download_all(mails))

2. APIダウンロードのキャッシュ化

プライベートAPI
↓
ダウンロード
↓
Amazon S3
↓
解析

3. 並列PDF解析

解析ライブラリ 処理速度
PyMuPDF 高速
pdfminer 中速
OCR 遅い

注意: 多くのPDFファイルはテキストが埋め込まれているためOCRによる解析は不要です。

  1. PyMuPDFを利用してテキスト量が60文字を超える場合はOCR解析をスキップ

擬似コード:

def has_text(page):
    text = page.get_text()
    return len(text.strip()) > 60

  1. CPUコア数×2のスレッドを使ってファイルを解析
def parse_page(page):
    return page.get_text()

def parse_pdf(file_path):
    with fitz.open(file_path) as doc:
        with ThreadPoolExecutor(max_workers=8) as executor:
    		executor.map(pipeline, pdf_files)

...
for r in executor.map(...):
    process(r)   		

4. GPU OCR (onnxruntimeによる高速化)

例:

  • PaddleOCR
  • EasyOCR

モデル変換:

PyTorch/Paddle
    ↓
ONNX
    ↓
ONNX Runtime

GPU推論:

CUDAExecutionProvider

速度向上:CPU → GPUで約10倍

5. Chunk分割

chunkサイズは400〜800トークン程度に維持する

ただしここではStructural Chunkを使用しているため、固定サイズのChunk分割は不要

記事:「RAGにおけるStructural Chunks設計」
https://strictfrog.com/ja/2026-01-31-rag%E3%81%AE%E6%A7%8B%E9%80%A0%E3%83%81%E3%83%A3%E3%83%B3%E3%82%AF%E8%A8%AD%E8%A8%88/

一般的な3つの方法:

ルールベースchunk(最速)
セマンティックchunk(embedding)
レイアウトchunk(複雑ドキュメント対応)

もしChunk分割過程でモデル推論を使用している場合は、onnxruntimeによる加速が可能

例えばレイアウト検出モデルで文書構造を認識している場合、onnxruntimeで高速化可能

6. embedding最適化(onnxruntimeによる加速)

    1. embeddingをバッチ処理し、batch_sizeを設定
    1. embedding用のスレッドプールを設定、数はCPUコア数の半分程度
    1. embeddingモデルをONNX形式に変換し、onnxruntimeで推論高速化

局所的なアーキテクチャ図

chunk generator
      │
      ▼
chunk queue
      │
      ▼
Worker Pool (2~4)
      │
      ▼
Batch Builder (64 chunks)
      │
      ▼
ONNX Runtime 推論
      │
      ▼
Vector DB Batch Insert

擬似コード:

import onnxruntime as ort
from concurrent.futures import ProcessPoolExecutor

session = ort.InferenceSession(
    "embedding.onnx",
    providers=["CPUExecutionProvider"]
)

def embed_batch(texts):

    inputs = tokenizer(
        texts,
        padding=True,
        truncation=True,
        return_tensors="np"
    )

    outputs = session.run(None, dict(inputs))

    return outputs[0]

def process_chunks(chunk_batch):

    embeddings = embed_batch(chunk_batch)

    vector_db.insert(embeddings)

def run_pipeline(chunks):

    batches = list(batch(chunks,64))

    with ProcessPoolExecutor(max_workers=4) as executor:
        executor.map(process_chunks, batches)


仮にAWS 8 vCPU,1 GPU構成ならば、worker数を2、batch_sizeを128、onnxruntime-gpuを利用する形が適切。

7. Vector DBのバッチ書き込み

例: 1000ベクトルを1回のインサートでまとめて書き込み

8. キャッシュ機構

PDF、ページ、Chunkファイルにハッシュを付けて重複Embeddingを防止

詳細は以下の記事を参照:「Embeddingにおけるインクリメンタルベクトル更新戦略」

https://strictfrog.com/ja/2026-03-14-%E5%9F%8B%E3%82%81%E8%BE%BC%E3%81%BF%E3%81%AE%E5%A2%97%E5%88%86%E3%83%99%E3%82%AF%E3%83%88%E3%83%AB%E6%9B%B4%E6%96%B0%E6%88%A6%E7%95%A5/

9. モデルのウォームアップ

Flask起動時に以下を実行:

load_embedding_model()
load_ocr_model()