背景
在私有化大模型系统中,我已经将Embedding模型,Reranker模型,LLM部署本地, 还要实现并发支持.这里我单独构建了一个高并发的模块.
选择
队列 + 批处理
架构
请求1 \
请求2 \
请求3 → 队列 → 批处理 → 一次 model.encode()
请求4 /
请求5 /
项目结构 (高并发批量处理请求模块)
├── inference_engine/ 高并发批量处理请求模块
│ ├── queue_manager.py # 队列管理
│ ├── batch_scheduler.py # 批处理调度
│ ├── worker.py # worker执行器
│ ├── task.py # 请求封装
│ └── config.py # 配置(batch size等)
详解
task.py
统一抽象:Task
class InferenceTask:
def __init__(self, data, future):
self.data = data
self.future = future "future": ...
}
queue_manager.py
队列管理
import asyncio
class QueueManager:
def __init__(self):
self.queue = asyncio.Queue()
async def put(self, task):
await self.queue.put(task)
async def get_batch(self, max_batch_size, timeout=0.01):
.....
return batch
batch_scheduler.py
批处理调度
import asyncio
class BatchScheduler:
def __init__(self, queue_manager, process_fn, batch_size=32):
self.queue_manager = queue_manager
self.process_fn = process_fn
self.batch_size = batch_size
async def start(self):
while True:
batch = await self.queue_manager.get_batch(self.batch_size)
if not batch:
continue
inputs = [task.data for task in batch]
# 👉 核心:统一处理
results = self.process_fn(inputs)
for task, result in zip(batch, results):
task.future.set_result(result)
worker.py
worker执行器
import asyncio
class InferenceWorker:
def __init__(self, scheduler):
self.scheduler = scheduler
def start(self):
asyncio.create_task(self.scheduler.start())
config.py
配置
BATCH_SIZE = 32
TIMEOUT = 0.01
总结
在本地化部署模型中, 不管是Embedding模型,reranker模型,本地LLM模型. 项目的结构都是一样的.所以这里高并发批量处理请求模块单独抽离出来, 这样每个本地化微服务都可以统一接入inference_engine模块.
未来
下一步是资源调度策略, 我会在后续的文章中讲解