使用 Torchserve 提供大型模型服务¶
本文档介绍了 Torchserve 如何支持大型模型服务,这里的大型模型是指无法容纳在一个 GPU 中的模型,因此需要将其拆分为多个分区并分布到多个 GPU 上。此页面分为以下几个部分
工作原理?¶
对于较小模型的 GPU 推理,TorchServe 为每个工作器执行一个单独的进程,该进程分配给单个 GPU。对于大型模型推理,模型需要拆分到多个 GPU 上。有多种模式可以实现这种拆分,通常包括流水线并行 (PP)、张量并行或两者的组合。选择哪种模式以及如何实现拆分取决于所用框架中的实现。TorchServe 允许用户利用任何框架进行模型部署,并尝试通过灵活的配置来满足框架的需求。某些框架需要为每个 GPU 执行一个单独的进程 (PiPPy、Deep Speed),而其他框架则需要一个分配所有 GPU 的单个进程 (vLLM)。如果需要多个进程,TorchServe 将利用 torchrun 为工作器设置分布式环境。在设置过程中,torchrun
将为分配给工作器的每个 GPU 启动一个新进程。是否使用 torchrun 取决于参数 parallelType,它可以在 model-config.yaml
中设置为以下选项之一
pp
- 用于流水线并行tp
- 用于张量并行pptp
- 用于流水线 + 张量并行自定义
前三个选项使用 torchrun 设置环境,而“自定义”选项将并行化方式留给用户,并将分配给工作器的 GPU 分配给单个进程。分配的 GPU 数量由 torchrun 启动的进程数(即通过 nproc-per-node 配置)或参数 parallelLevel 决定。这意味着如果设置了 nproc-per-node,则不应设置参数 parallelLevel,反之亦然。
默认情况下,TorchServe 使用循环算法将 GPU 分配给主机上的工作器。在大型模型推理的情况下,分配给每个工作器的 GPU 将根据 model_config.yaml 中指定的 GPU 数量自动计算。CUDA_VISIBLE_DEVICES 根据此数量设置。
例如,假设节点上有 8 个 GPU,并且一个工作器需要节点上的 4 个 GPU(即,nproc-per-node=4 或 parallelLevel=4)。在这种情况下,TorchServe 将为 worker1 分配 CUDA_VISIBLE_DEVICES="0,1,2,3",为 worker2 分配 CUDA_VISIBLE_DEVICES="4,5,6,7"。
除了这种默认行为之外,TorchServe 还允许用户为工作器指定 GPU。例如,如果用户在 模型配置文件 YAML 文件 中设置“deviceIds: [2,3,4,5]”,并且 nproc-per-node(或 parallelLevel)设置为 2,则 TorchServe 将为 worker1 分配 CUDA_VISIBLE_DEVICES="2,3",为 worker2 分配 CUDA_VISIBLE_DEVICES="4,5"。
以使用 Pippy 集成为例,下图说明了 TorchServe 大型模型推理的内部机制。有关使用 vLLM 的示例,请参阅 此示例。
PiPPy(PyTorch 原生大型模型推理解决方案)¶
PiPPy 为提供大型模型服务提供流水线并行,这些大型模型无法容纳在一个 GPU 中。它将您的模型拆分为大小相等的部分(阶段),并将其分配到您指定的设备数量上。然后使用微批处理运行您的批处理输入以进行推理(对于批次大小 >1 来说,它更有效)。
如何在 Torchserve 中使用 PiPPy¶
要在 Torchserve 中使用 Pippy,我们需要使用从 base_pippy_handler 继承的自定义处理器,并将我们的设置放在 model-config.yaml 中。
Torchserve 中的客户机处理器只是一个 Python 脚本,它定义了特定于您的工作流的模型加载、预处理、推理和后处理逻辑。
它看起来如下所示
创建 custom_handler.py
或任何其他描述性名称。
#DO import the necessary packages along with following
from ts.torch_handler.distributed.base_pippy_handler import BasePippyHandler
from ts.handler_utils.distributed.pt_pippy import initialize_rpc_workers, get_pipline_driver
class ModelHandler(BasePippyHandler, ABC):
def __init__(self):
super(ModelHandler, self).__init__()
self.initialized = False
def initialize(self, ctx):
model = # load your model from model_dir
self.device = self.local_rank % torch.cuda.device_count()# being used to move model inputs to (self.device)
self.model = get_pipline_driver(model,self.world_size, ctx)
以下是您 model-config.yaml
需要的内容,此配置文件非常灵活,您可以添加与前端、后端和处理器相关的设置。
#frontend settings
minWorkers: 1
maxWorkers: 1
maxBatchDelay: 100
responseTimeout: 120
deviceType: "gpu"
parallelType: "pp" # options depending on the solution, pp(pipeline parallelism), tp(tensor parallelism), pptp ( pipeline and tensor parallelism)
# This will be used to route input to either rank0 or all ranks from fontend based on the solution (e.g. DeepSpeed support tp, PiPPy support pp)
torchrun:
nproc-per-node: 4 # specifies the number of processes torchrun starts to serve your model, set to world_size or number of
# gpus you wish to split your model
#backend settings
pippy:
chunks: 1 # This sets the microbatch sizes, microbatch = batch size/ chunks
input_names: ['input_ids'] # input arg names to the model, this is required for FX tracing
model_type: "HF" # set the model type to HF if you are using Huggingface model other wise leave it blank or any other model you use.
rpc_timeout: 1800
num_worker_threads: 512 #set number of threads for rpc worker init.
handler:
max_length: 80 # max length of tokens for tokenizer in the handler
**如何在处理器中访问它?**以下是一个示例
def initialize(self, ctx):
model_type = ctx.model_yaml_config["pippy"]["model_type"]
其余操作与 Torchserve 中的操作相同,基本上是打包您的模型并启动服务器。
打包模型的命令示例,确保传递 model-config.yaml
torch-model-archiver --model-name bloom --version 1.0 --handler pippy_handler.py --extra-files $MODEL_CHECKPOINTS_PATH -r requirements.txt --config-file model-config.yaml --archive-format tgz
张量并行支持正在进行中,将在准备就绪后添加。
DeepSpeed¶
DeepSpeed-Inference 是微软的一个开源项目。它为提供无法容纳在一个 GPU 内存中的大型基于 Transformer 的 PyTorch 模型提供模型并行。
如何在 TorchServe 中使用 DeepSpeed¶
要在 TorchServe 中使用 DeepSpeed,我们需要使用从 base_deepspeed_handler 继承的自定义处理器,并将我们的设置放在 model-config.yaml 中。
它看起来如下所示
创建 custom_handler.py
或任何其他描述性名称。
#DO import the necessary packages along with following
from ts.handler_utils.distributed.deepspeed import get_ds_engine
from ts.torch_handler.distributed.base_deepspeed_handler import BaseDeepSpeedHandler
class ModelHandler(BaseDeepSpeedHandler, ABC):
def __init__(self):
super(ModelHandler, self).__init__()
self.initialized = False
def initialize(self, ctx):
model = # load your model from model_dir
ds_engine = get_ds_engine(self.model, ctx)
self.model = ds_engine.module
self.initialized = True
以下是您 model-config.yaml
需要的内容,此配置文件非常灵活,您可以添加与前端、后端和处理器相关的设置。
#frontend settings
minWorkers: 1
maxWorkers: 1
maxBatchDelay: 100
responseTimeout: 120
deviceType: "gpu"
parallelType: "tp" # options depending on the solution, pp(pipeline parallelism), tp(tensor parallelism), pptp ( pipeline and tensor parallelism)
# This will be used to route input to either rank0 or all ranks from fontend based on the solution (e.g. DeepSpeed support tp, PiPPy support pp)
torchrun:
nproc-per-node: 4 # specifies the number of processes torchrun starts to serve your model, set to world_size or number of
# gpus you wish to split your model
#backend settings
deepspeed:
config: ds-config.json # DeepSpeed config json filename.
# Details:https://www.deepspeed.org.cn/docs/config-json/
handler:
max_length: 80 # max length of tokens for tokenizer in the handler
以下是一个 ds-config.json
示例
{
"dtype": "torch.float16",
"replace_with_kernel_inject": true,
"tensor_parallel": {
"tp_size": 2
}
}
安装 DeepSpeed
方法 1:requirements.txt
方法 2:通过命令预安装(推荐,以加快模型加载速度)
# See https://www.deepspeed.org.cn/tutorials/advanced-install/
DS_BUILD_OPS=1 pip install deepspeed
其余操作与 Torchserve 中的操作相同,基本上是打包您的模型并启动服务器。
打包模型的命令示例,确保传递 model-config.yaml
# option 1: Using model_dir
torch-model-archiver --model-name bloom --version 1.0 --handler deepspeed_handler.py --extra-files $MODEL_CHECKPOINTS_PATH,ds-config.json -r requirements.txt --config-file model-config.yaml --archive-format tgz
# option 2: Using HF model_name
torch-model-archiver --model-name bloom --version 1.0 --handler deepspeed_handler.py --extra-files ds-config.json -r requirements.txt --config-file model-config.yaml --archive-format
Deep Speed MII¶
如果您正在使用此处显示的支持模型之一 此处,您可以利用 Deep Speed MII。Deep Speed MII 使用 Deep Speed 推理以及深度学习的进一步改进,以最大限度地减少延迟并最大限度地提高吞吐量。它对特定的模型类型、模型大小、批次大小和可用的硬件资源执行此操作。
有关如何在支持的模型上利用 Deep Speed MII 的更多信息,请参阅此处的信息 此处。您还可以找到如何在 TorchServe 上应用此示例 此处。
使用 Accelerate 提供大型 Hugging Face 模型服务¶
如果您使用大型 Hugging Face 模型但资源有限,可以使用 accelerate 来服务这些模型。为此,您需要设置 low_cpu_mem_usage=True
并在 setup_config.json 文件中设置 `device_map=”auto”。
有关使用 accelerate 与大型 Hugging Face 模型的更多信息,请参阅 此示例。
大型模型推理提示¶
减少模型加载延迟¶
为了减少模型延迟,我们建议
预安装模型并行库,例如容器或主机上的 Deepspeed。
预下载模型检查点。例如,如果使用 HuggingFace,可以通过 Download_model.py 预下载预训练模型。
设置环境变量 HUGGINGFACE_HUB_CACHE 和 TRANSFORMERS_CACHE
通过工具 Download_model.py 将模型下载到 HuggingFace 缓存目录。
调整 模型配置 YAML 文件¶
您可以通过以下方式调整模型配置 YAML 文件以获得更好的性能
如果高模型推理延迟导致响应超时,请更新 responseTimeout。
如果高模型加载延迟导致启动超时,请更新 startupTimeout。
调整 torchrun 参数。支持的参数定义在 此处。例如,默认情况下,
OMP_NUMBER_THREADS
为 1。这可以在 YAML 文件中修改。
#frontend settings
torchrun:
nproc-per-node: 4 # specifies the number of processes torchrun starts to serve your model, set to world_size or number of
# gpus you wish to split your model
OMP_NUMBER_THREADS: 2
延迟敏感型应用程序¶
作业票据¶
对于延迟敏感型推理的使用场景,建议使用作业票据功能。启用作业票据后,TorchServe 会验证模型活动工作程序是否可用于处理客户端的请求。如果活动工作程序可用,则立即接受并处理请求,而不会产生作业队列或动态批处理带来的等待时间;否则,会向客户端发送 503 响应。
此功能有助于解决推理延迟可能很高的情况,例如生成模型、类似 chatGPT 的自回归解码器模型。此功能帮助此类应用程序采取有效措施,例如根据业务需求将被拒绝的请求路由到不同的服务器或扩展模型服务器容量。以下是如何启用作业票据的示例。
minWorkers: 2
maxWorkers: 2
jobQueueSize: 2
useJobTicket: true
在此示例中,模型具有 2 个工作程序,作业队列大小为 2。推理请求将由 TorchServe 立即处理,或以响应代码 503 拒绝。
通过 HTTP 1.1 分块编码进行流式响应¶
TorchServe 的推理 API 支持流式响应,以允许通过 HTTP 1.1 分块编码发送一系列推理响应。仅当完整响应的推理延迟很高且推理中间结果发送到客户端时,才建议使用此功能。一个例子可能是用于生成应用程序的 LLM,其中生成“n”个标记可能具有较高的延迟。在这种情况下,用户可以收到每个生成的标记,直到完整响应完成。为了实现流式响应,后端处理程序调用“send_intermediate_predict_response”将一个中间结果发送到前端,并返回最后一个结果作为现有样式。例如,
from ts.handler_utils.utils import send_intermediate_predict_response
''' Note: TorchServe v1.0.0 will deprecate
"from ts.protocol.otf_message_handler import send_intermediate_predict_response".
Please replace it with "from ts.handler_utils.utils import send_intermediate_predict_response".
'''
def handle(data, context):
if type(data) is list:
for i in range (3):
send_intermediate_predict_response(["intermediate_response"], context.request_ids, "Intermediate Prediction success", 200, context)
return ["hello world "]
客户端接收分块数据。
import test_utils
def test_echo_stream_inference():
test_utils.start_torchserve(no_config_snapshots=True, gen_mar=False)
test_utils.register_model('echo_stream',
'https://torchserve.pytorch.org/mar_files/echo_stream.mar')
response = requests.post(TF_INFERENCE_API + '/predictions/echo_stream', data="foo", stream=True)
assert response.headers['Transfer-Encoding'] == 'chunked'
prediction = []
for chunk in (response.iter_content(chunk_size=None)):
if chunk:
prediction.append(chunk.decode("utf-8"))
assert str(" ".join(prediction)) == "hello hello hello hello world "
test_utils.unregister_model('echo_stream')
GRPC 服务器端流式传输¶
TorchServe GRPC API 添加了推理 API“StreamPredictions”的服务器端流式传输,以允许通过相同的 GRPC 流发送一系列推理响应。仅当完整响应的推理延迟很高且推理中间结果发送到客户端时,才建议使用此 API。一个例子可能是用于生成应用程序的 LLM,其中生成“n”个标记可能具有较高的延迟。与 HTTP 1.1 分块编码类似,使用此功能,用户可以收到每个生成的标记,直到完整响应完成。此 API 自动将 batchSize 强制设置为 1。
service InferenceAPIsService {
// Check health status of the TorchServe server.
rpc Ping(google.protobuf.Empty) returns (TorchServeHealthResponse) {}
// Predictions entry point to get inference using default model version.
rpc Predictions(PredictionsRequest) returns (PredictionResponse) {}
// Streaming response for an inference request.
rpc StreamPredictions(PredictionsRequest) returns (stream PredictionResponse) {}
}
后端处理程序调用“send_intermediate_predict_response”将一个中间结果发送到前端,并返回最后一个结果作为现有样式。例如
from ts.handler_utils.utils import send_intermediate_predict_response
''' Note: TorchServe v1.0.0 will deprecate
"from ts.protocol.otf_message_handler import send_intermediate_predict_response".
Please replace it with "from ts.handler_utils.utils import send_intermediate_predict_response".
'''
def handle(data, context):
if type(data) is list:
for i in range (3):
send_intermediate_predict_response(["intermediate_response"], context.request_ids, "Intermediate Prediction success", 200, context)
return ["hello world "]