ローカルモデル向けの高並行性モジュールを構築

AIインフラ層

Posted by LuochuanAD on April 11, 2026 本文总阅读量

背景

プライベートモデルシステムでEmbeddingモデル、Rerankerモデル、LLMをローカルにデプロイ済みで、さらに並行処理対応を実現したい。ここでは高並行モジュールを単独で構築した。

選択

キュー + バッチ処理

アーキテクチャ

リクエスト1 \
リクエスト2  \
リクエスト3   →  キュー → バッチ処理 → 一度に model.encode()
リクエスト4  /
リクエスト5 /

プロジェクト構成(高並行バッチリクエスト処理モジュール)

├── inference_engine/          高並行バッチリクエスト処理モジュール
│   ├── queue_manager.py       # キュー管理
│   ├── batch_scheduler.py     # バッチスケジューラ
│   ├── worker.py              # ワーカー実行者
│   ├── task.py                # リクエストラッパー
│   └── config.py              # 設定(バッチサイズ等)

詳細

task.py

統一抽象:Task

class InferenceTask:
    def __init__(self, data, future):
        self.data = data
        self.future = future  # "future": ...
}

queue_manager.py

キュー管理

import asyncio

class QueueManager:
    def __init__(self):
        self.queue = asyncio.Queue()

    async def put(self, task):
        await self.queue.put(task)

    async def get_batch(self, max_batch_size, timeout=0.01):
        .....
        return batch

batch_scheduler.py

バッチスケジューラ

import asyncio

class BatchScheduler:
    def __init__(self, queue_manager, process_fn, batch_size=32):
        self.queue_manager = queue_manager
        self.process_fn = process_fn
        self.batch_size = batch_size

    async def start(self):
        while True:
            batch = await self.queue_manager.get_batch(self.batch_size)

            if not batch:
                continue

            inputs = [task.data for task in batch]

            # 👉 コア:一括処理
            results = self.process_fn(inputs)

            for task, result in zip(batch, results):
                task.future.set_result(result)

worker.py

ワーカー実行者

import asyncio

class InferenceWorker:
    def __init__(self, scheduler):
        self.scheduler = scheduler

    def start(self):
        asyncio.create_task(self.scheduler.start())

config.py

設定

BATCH_SIZE = 32
TIMEOUT = 0.01

まとめ

ローカルデプロイされたモデル(Embeddingモデル、Rerankerモデル、ローカルLLMモデル)に関わらずプロジェクト構成は共通している。
そのため高並行バッチ処理モジュールは単独で抽出し、各ローカルマイクロサービスが統一的にinference_engineモジュールを利用可能にした。

今後

次のステップはリソーススケジューリング戦略で、こちらは後続の記事で解説予定。