使用 Torchserve 服务大型模型¶
本文档解释了 Torchserve 如何支持大型模型服务。这里的大型模型指的是无法放入单个 GPU 的模型,因此需要将它们拆分到多个 GPU 上的多个分区中。本页分为以下几个部分
工作原理?¶
对于较小模型的 GPU 推理,TorchServe 为每个工作进程执行单个进程,该进程被分配一个 GPU。对于大型模型推理,模型需要拆分到多个 GPU 上。实现这种拆分有不同的模式,通常包括流水线并行 (PP)、张量并行或它们的组合。选择哪种模式以及如何实现拆分取决于所用框架中的实现。TorchServe 允许用户为其模型部署使用任何框架,并尝试通过灵活的配置来满足框架的需求。某些框架需要为每个 GPU 执行一个单独的进程(PiPPy、Deep Speed),而另一些框架则需要分配所有 GPU 的单个进程(vLLM)。如果需要多个进程,TorchServe 会使用 torchrun 来为工作进程设置分布式环境。在设置期间,torchrun
将为分配给工作进程的每个 GPU 启动一个新进程。是否使用 torchrun 取决于参数 parallelType,该参数可以在 model-config.yaml
中设置为以下选项之一
pp
- 用于流水线并行tp
- 用于张量并行pptp
- 用于流水线 + 张量并行自定义
前三个选项使用 torchrun 设置环境,而“自定义”选项将并行化的方式留给用户,并将分配给工作进程的 GPU 分配给单个进程。分配的 GPU 数量由 torchrun 启动的进程数(即通过 nproc-per-node 配置)或参数 parallelLevel 确定。这意味着如果设置了 nproc-per-node,则不应设置参数 parallelLevel,反之亦然。
默认情况下,TorchServe 使用轮询算法将 GPU 分配给主机上的工作进程。对于大型模型推理,分配给每个工作进程的 GPU 会根据 model_config.yaml 中指定的 GPU 数量自动计算。CUDA_VISIBLE_DEVICES 基于此数量设置。
例如,假设一个节点上有八个 GPU,一个工作进程在一个节点上需要 4 个 GPU(即 nproc-per-node=4 或 parallelLevel=4)。在这种情况下,TorchServe 会将 CUDA_VISIBLE_DEVICES=”0,1,2,3” 分配给 worker1,并将 CUDA_VISIBLE_DEVICES=”4,5,6,7” 分配给 worker2。
除了这种默认行为外,TorchServe 还为用户提供了为工作进程指定 GPU 的灵活性。例如,如果用户在 模型配置 YAML 文件 中设置了 “deviceIds: [2,3,4,5]”,并且 nproc-per-node(或 parallelLevel)设置为 2,则 TorchServe 会将 CUDA_VISIBLE_DEVICES=”2,3” 分配给 worker1,并将 CUDA_VISIBLE_DEVICES=”4,5” 分配给 worker2。
以 Pippy 集成为例,下图说明了 TorchServe 大型模型推理的内部结构。有关使用 vLLM 的示例,请参阅 此示例。
PiPPy(用于大型模型推理的 PyTorch 原生解决方案)¶
PiPPy 为服务于无法放入单个 GPU 的大型模型提供流水线并行性。它将您的模型拆分为相等大小(阶段),并分区到您指定的设备数量上。然后使用微批处理来运行您的批处理输入以进行推理(对于批处理大小 >1,它更优)。
如何在 Torchserve 中使用 PiPPy¶
要在 Torchserve 中使用 Pippy,我们需要使用继承自 base_pippy_handler 的自定义处理程序,并将我们的设置放在 model-config.yaml 中。
Torchserve 中的客户处理程序只是一个 python 脚本,用于定义特定于您的工作流程的模型加载、预处理、推理和后处理逻辑。
它看起来像下面这样
创建 custom_handler.py
或任何其他描述性名称。
#DO import the necessary packages along with following
from ts.torch_handler.distributed.base_pippy_handler import BasePippyHandler
from ts.handler_utils.distributed.pt_pippy import initialize_rpc_workers, get_pipline_driver
class ModelHandler(BasePippyHandler, ABC):
def __init__(self):
super(ModelHandler, self).__init__()
self.initialized = False
def initialize(self, ctx):
model = # load your model from model_dir
self.device = self.local_rank % torch.cuda.device_count()# being used to move model inputs to (self.device)
self.model = get_pipline_driver(model,self.world_size, ctx)
这是您的 model-config.yaml
需要的内容,此配置文件非常灵活,您可以添加与前端、后端和处理程序相关的设置。
#frontend settings
minWorkers: 1
maxWorkers: 1
maxBatchDelay: 100
responseTimeout: 120
deviceType: "gpu"
parallelType: "pp" # options depending on the solution, pp(pipeline parallelism), tp(tensor parallelism), pptp ( pipeline and tensor parallelism)
# This will be used to route input to either rank0 or all ranks from fontend based on the solution (e.g. DeepSpeed support tp, PiPPy support pp)
torchrun:
nproc-per-node: 4 # specifies the number of processes torchrun starts to serve your model, set to world_size or number of
# gpus you wish to split your model
#backend settings
pippy:
chunks: 1 # This sets the microbatch sizes, microbatch = batch size/ chunks
input_names: ['input_ids'] # input arg names to the model, this is required for FX tracing
model_type: "HF" # set the model type to HF if you are using Huggingface model other wise leave it blank or any other model you use.
rpc_timeout: 1800
num_worker_threads: 512 #set number of threads for rpc worker init.
handler:
max_length: 80 # max length of tokens for tokenizer in the handler
如何在处理程序中访问它? 这是一个示例
def initialize(self, ctx):
model_type = ctx.model_yaml_config["pippy"]["model_type"]
其余部分与 Torchserve 中通常一样,基本上是打包您的模型并启动服务器。
用于打包模型的命令示例,请确保传递 model-config.yaml
torch-model-archiver --model-name bloom --version 1.0 --handler pippy_handler.py --extra-files $MODEL_CHECKPOINTS_PATH -r requirements.txt --config-file model-config.yaml --archive-format tgz
张量并行支持正在进行中,一旦准备就绪就会添加。
DeepSpeed¶
DeepSpeed-Inference 是 MicroSoft 的一个开源项目。它为服务于无法放入单个 GPU 内存的大型基于 Transformer 的 PyTorch 模型提供模型并行性。
如何在 TorchServe 中使用 DeepSpeed¶
要在 TorchServe 中使用 DeepSpeed,我们需要使用继承自 base_deepspeed_handler 的自定义处理程序,并将我们的设置放在 model-config.yaml 中。
它看起来像下面这样
创建 custom_handler.py
或任何其他描述性名称。
#DO import the necessary packages along with following
from ts.handler_utils.distributed.deepspeed import get_ds_engine
from ts.torch_handler.distributed.base_deepspeed_handler import BaseDeepSpeedHandler
class ModelHandler(BaseDeepSpeedHandler, ABC):
def __init__(self):
super(ModelHandler, self).__init__()
self.initialized = False
def initialize(self, ctx):
model = # load your model from model_dir
ds_engine = get_ds_engine(self.model, ctx)
self.model = ds_engine.module
self.initialized = True
这是您的 model-config.yaml
需要的内容,此配置文件非常灵活,您可以添加与前端、后端和处理程序相关的设置。
#frontend settings
minWorkers: 1
maxWorkers: 1
maxBatchDelay: 100
responseTimeout: 120
deviceType: "gpu"
parallelType: "tp" # options depending on the solution, pp(pipeline parallelism), tp(tensor parallelism), pptp ( pipeline and tensor parallelism)
# This will be used to route input to either rank0 or all ranks from fontend based on the solution (e.g. DeepSpeed support tp, PiPPy support pp)
torchrun:
nproc-per-node: 4 # specifies the number of processes torchrun starts to serve your model, set to world_size or number of
# gpus you wish to split your model
#backend settings
deepspeed:
config: ds-config.json # DeepSpeed config json filename.
# Details:https://deepspeed.org.cn/docs/config-json/
handler:
max_length: 80 # max length of tokens for tokenizer in the handler
这是一个 ds-config.json
的示例
{
"dtype": "torch.float16",
"replace_with_kernel_inject": true,
"tensor_parallel": {
"tp_size": 2
}
}
安装 DeepSpeed
方法 1:requirements.txt
方法 2:通过命令预安装(建议加快模型加载速度)
# See https://deepspeed.org.cn/tutorials/advanced-install/
DS_BUILD_OPS=1 pip install deepspeed
其余部分与 Torchserve 中通常一样,基本上是打包您的模型并启动服务器。
用于打包模型的命令示例,请确保传递 model-config.yaml
# option 1: Using model_dir
torch-model-archiver --model-name bloom --version 1.0 --handler deepspeed_handler.py --extra-files $MODEL_CHECKPOINTS_PATH,ds-config.json -r requirements.txt --config-file model-config.yaml --archive-format tgz
# option 2: Using HF model_name
torch-model-archiver --model-name bloom --version 1.0 --handler deepspeed_handler.py --extra-files ds-config.json -r requirements.txt --config-file model-config.yaml --archive-format
DeepSpeed MII¶
如果使用 此处 显示的受支持模型之一,您可以利用 Deep Speed MII。Deep Speed MII 使用 Deep Speed Inference 以及深度学习的进一步发展来最大限度地减少延迟并最大化吞吐量。它针对特定模型类型、模型大小、批量大小和可用硬件资源执行此操作。
有关如何在受支持模型上利用 Deep Speed MII 的更多信息,请参阅 此处 的信息。您还可以找到有关如何将其应用于 TorchServe 的示例,请参阅 此处。
使用 Accelerate 服务大型 Hugging Face 模型¶
如果使用大型 Hugging Face 模型但资源有限,您可以使用 accelerate 来服务这些模型。为了实现这一点,您需要在 setup_config.json 文件中设置 low_cpu_mem_usage=True
并设置 `device_map=”auto”`。
有关将 accelerate 与大型 Hugging Face 模型结合使用的更多信息,请参阅 此示例。
大型模型推理技巧¶
减少模型加载延迟¶
为了减少模型延迟,我们建议
在容器或主机上预安装模型并行库,例如 Deepspeed。
预先下载模型检查点。例如,如果使用 HuggingFace,可以通过 Download_model.py 预先下载预训练模型
设置环境变量 HUGGINGFACE_HUB_CACHE 和 TRANSFORMERS_CACHE
通过工具 Download_model.py 将模型下载到 HuggingFace 缓存目录
调整 模型配置 YAML 文件¶
您可以通过以下方式调整模型配置 YAML 文件以获得更好的性能
如果高模型推理延迟导致响应超时,请更新 responseTimeout。
如果高模型加载延迟导致启动超时,请更新 startupTimeout。
调整 torchrun 参数。支持的参数在 此处 定义。例如,默认情况下,
OMP_NUMBER_THREADS
为 1。这可以在 YAML 文件中修改。
#frontend settings
torchrun:
nproc-per-node: 4 # specifies the number of processes torchrun starts to serve your model, set to world_size or number of
# gpus you wish to split your model
OMP_NUMBER_THREADS: 2
延迟敏感型应用¶
作业票证¶
对于延迟敏感型推理用例,建议使用作业票证功能。启用作业票证后,TorchServe 会验证模型的活动工作进程是否可用于处理客户端的请求。如果活动工作进程可用,则请求将被接受并立即处理,而不会产生来自作业队列或动态批处理的等待时间;否则,将向客户端发送 503 响应。
此功能有助于推理延迟可能较高的用例,例如生成模型、自动回归解码器模型(如 chatGPT)。此功能可帮助此类应用程序采取有效措施,例如,根据业务需求,将拒绝的请求路由到不同的服务器,或扩展模型服务器容量。以下是启用作业票证的示例。
minWorkers: 2
maxWorkers: 2
jobQueueSize: 2
useJobTicket: true
在此示例中,模型有 2 个工作进程,作业队列大小为 2。推理请求将立即由 TorchServe 处理,或被拒绝并返回响应代码 503。
通过 HTTP 1.1 分块编码进行流式响应¶
TorchServe 的推理 API 支持流式响应,允许通过 HTTP 1.1 分块编码发送一系列推理响应。仅当完整响应的推理延迟较高且推理中间结果发送到客户端时,才建议使用此功能。一个示例可能是用于生成应用程序的 LLM,其中生成“n”个令牌可能具有高延迟。在这种情况下,用户可以接收每个生成的令牌,一旦准备就绪,直到完整响应完成。为了实现流式响应,后端处理程序调用 “send_intermediate_predict_response” 向前端发送一个中间结果,并将最后一个结果作为现有样式返回。例如,
from ts.handler_utils.utils import send_intermediate_predict_response
''' Note: TorchServe v1.0.0 will deprecate
"from ts.protocol.otf_message_handler import send_intermediate_predict_response".
Please replace it with "from ts.handler_utils.utils import send_intermediate_predict_response".
'''
def handle(data, context):
if type(data) is list:
for i in range (3):
send_intermediate_predict_response(["intermediate_response"], context.request_ids, "Intermediate Prediction success", 200, context)
return ["hello world "]
客户端接收分块数据。
import test_utils
def test_echo_stream_inference():
test_utils.start_torchserve(no_config_snapshots=True, gen_mar=False)
test_utils.register_model('echo_stream',
'https://torchserve.pytorch.org/mar_files/echo_stream.mar')
response = requests.post(TF_INFERENCE_API + '/predictions/echo_stream', data="foo", stream=True)
assert response.headers['Transfer-Encoding'] == 'chunked'
prediction = []
for chunk in (response.iter_content(chunk_size=None)):
if chunk:
prediction.append(chunk.decode("utf-8"))
assert str(" ".join(prediction)) == "hello hello hello hello world "
test_utils.unregister_model('echo_stream')
GRPC 服务器端流式处理¶
TorchServe GRPC API 添加了推理 API “StreamPredictions” 的服务器端流式处理,以允许通过同一 GRPC 流发送一系列推理响应。仅当完整响应的推理延迟较高且推理中间结果发送到客户端时,才建议使用此 API。一个示例可能是用于生成应用程序的 LLM,其中生成 “n” 个令牌可能具有高延迟。与 HTTP 1.1 分块编码类似,使用此功能,用户可以接收每个生成的令牌,一旦准备就绪,直到完整响应完成。此 API 自动强制 batchSize 为 1。
service InferenceAPIsService {
// Check health status of the TorchServe server.
rpc Ping(google.protobuf.Empty) returns (TorchServeHealthResponse) {}
// Predictions entry point to get inference using default model version.
rpc Predictions(PredictionsRequest) returns (PredictionResponse) {}
// Streaming response for an inference request.
rpc StreamPredictions(PredictionsRequest) returns (stream PredictionResponse) {}
}
后端处理程序调用 “send_intermediate_predict_response” 向前端发送一个中间结果,并将最后一个结果作为现有样式返回。例如
from ts.handler_utils.utils import send_intermediate_predict_response
''' Note: TorchServe v1.0.0 will deprecate
"from ts.protocol.otf_message_handler import send_intermediate_predict_response".
Please replace it with "from ts.handler_utils.utils import send_intermediate_predict_response".
'''
def handle(data, context):
if type(data) is list:
for i in range (3):
send_intermediate_predict_response(["intermediate_response"], context.request_ids, "Intermediate Prediction success", 200, context)
return ["hello world "]