背景
プライベートモデルシステムでEmbeddingモデル、Rerankerモデル、LLMをローカルにデプロイ済みで、さらに並行処理対応を実現したい。ここでは高並行モジュールを単独で構築した。
選択
キュー + バッチ処理
アーキテクチャ
リクエスト1 \
リクエスト2 \
リクエスト3 → キュー → バッチ処理 → 一度に model.encode()
リクエスト4 /
リクエスト5 /
プロジェクト構成(高並行バッチリクエスト処理モジュール)
├── inference_engine/ 高並行バッチリクエスト処理モジュール
│ ├── queue_manager.py # キュー管理
│ ├── batch_scheduler.py # バッチスケジューラ
│ ├── worker.py # ワーカー実行者
│ ├── task.py # リクエストラッパー
│ └── config.py # 設定(バッチサイズ等)
詳細
task.py
統一抽象:Task
class InferenceTask:
def __init__(self, data, future):
self.data = data
self.future = future # "future": ...
}
queue_manager.py
キュー管理
import asyncio
class QueueManager:
def __init__(self):
self.queue = asyncio.Queue()
async def put(self, task):
await self.queue.put(task)
async def get_batch(self, max_batch_size, timeout=0.01):
.....
return batch
batch_scheduler.py
バッチスケジューラ
import asyncio
class BatchScheduler:
def __init__(self, queue_manager, process_fn, batch_size=32):
self.queue_manager = queue_manager
self.process_fn = process_fn
self.batch_size = batch_size
async def start(self):
while True:
batch = await self.queue_manager.get_batch(self.batch_size)
if not batch:
continue
inputs = [task.data for task in batch]
# 👉 コア:一括処理
results = self.process_fn(inputs)
for task, result in zip(batch, results):
task.future.set_result(result)
worker.py
ワーカー実行者
import asyncio
class InferenceWorker:
def __init__(self, scheduler):
self.scheduler = scheduler
def start(self):
asyncio.create_task(self.scheduler.start())
config.py
設定
BATCH_SIZE = 32
TIMEOUT = 0.01
まとめ
ローカルデプロイされたモデル(Embeddingモデル、Rerankerモデル、ローカルLLMモデル)に関わらずプロジェクト構成は共通している。
そのため高並行バッチ処理モジュールは単独で抽出し、各ローカルマイクロサービスが統一的にinference_engineモジュールを利用可能にした。
今後
次のステップはリソーススケジューリング戦略で、こちらは後続の記事で解説予定。