如何为本地模型构建高并发模块?

AI Infra Layer

Posted by LuochuanAD on April 11, 2026 本文总阅读量

背景

在私有化大模型系统中,我已经将Embedding模型,Reranker模型,LLM部署本地, 还要实现并发支持.这里我单独构建了一个高并发的模块.

选择

队列 + 批处理

架构

请求1 \
请求2  \
请求3   →  队列 → 批处理 → 一次 model.encode()
请求4  /
请求5 /

项目结构 (高并发批量处理请求模块)

├── inference_engine/          高并发批量处理请求模块
│   ├── queue_manager.py       # 队列管理
│   ├── batch_scheduler.py     # 批处理调度
│   ├── worker.py              # worker执行器
│   ├── task.py                # 请求封装
│   └── config.py              # 配置(batch size等)

详解

task.py

统一抽象:Task

class InferenceTask:
    def __init__(self, data, future):
        self.data = data
        self.future = future "future": ...
}

queue_manager.py

队列管理

import asyncio

class QueueManager:
    def __init__(self):
        self.queue = asyncio.Queue()

    async def put(self, task):
        await self.queue.put(task)

    async def get_batch(self, max_batch_size, timeout=0.01):
        .....
        return batch

batch_scheduler.py

批处理调度

import asyncio

class BatchScheduler:
    def __init__(self, queue_manager, process_fn, batch_size=32):
        self.queue_manager = queue_manager
        self.process_fn = process_fn
        self.batch_size = batch_size

    async def start(self):
        while True:
            batch = await self.queue_manager.get_batch(self.batch_size)

            if not batch:
                continue

            inputs = [task.data for task in batch]

            # 👉 核心:统一处理
            results = self.process_fn(inputs)

            for task, result in zip(batch, results):
                task.future.set_result(result)

worker.py

worker执行器

import asyncio

class InferenceWorker:
    def __init__(self, scheduler):
        self.scheduler = scheduler

    def start(self):
        asyncio.create_task(self.scheduler.start())

config.py

配置

BATCH_SIZE = 32
TIMEOUT = 0.01

总结

在本地化部署模型中, 不管是Embedding模型,reranker模型,本地LLM模型. 项目的结构都是一样的.所以这里高并发批量处理请求模块单独抽离出来, 这样每个本地化微服务都可以统一接入inference_engine模块.

未来

下一步是资源调度策略, 我会在后续的文章中讲解