• 文档 >
  • 使用 Torchserve 服务大型模型
快捷方式

使用 Torchserve 服务大型模型

本文档解释了 Torchserve 如何支持大型模型服务。这里的大型模型指的是无法放入单个 GPU 的模型,因此需要将它们拆分到多个 GPU 上的多个分区中。本页分为以下几个部分

工作原理?

对于较小模型的 GPU 推理,TorchServe 为每个工作进程执行单个进程,该进程被分配一个 GPU。对于大型模型推理,模型需要拆分到多个 GPU 上。实现这种拆分有不同的模式,通常包括流水线并行 (PP)、张量并行或它们的组合。选择哪种模式以及如何实现拆分取决于所用框架中的实现。TorchServe 允许用户为其模型部署使用任何框架,并尝试通过灵活的配置来满足框架的需求。某些框架需要为每个 GPU 执行一个单独的进程(PiPPy、Deep Speed),而另一些框架则需要分配所有 GPU 的单个进程(vLLM)。如果需要多个进程,TorchServe 会使用 torchrun 来为工作进程设置分布式环境。在设置期间,torchrun 将为分配给工作进程的每个 GPU 启动一个新进程。是否使用 torchrun 取决于参数 parallelType,该参数可以在 model-config.yaml 中设置为以下选项之一

  • pp - 用于流水线并行

  • tp - 用于张量并行

  • pptp - 用于流水线 + 张量并行

  • 自定义

前三个选项使用 torchrun 设置环境,而“自定义”选项将并行化的方式留给用户,并将分配给工作进程的 GPU 分配给单个进程。分配的 GPU 数量由 torchrun 启动的进程数(即通过 nproc-per-node 配置)或参数 parallelLevel 确定。这意味着如果设置了 nproc-per-node,则不应设置参数 parallelLevel,反之亦然。

默认情况下,TorchServe 使用轮询算法将 GPU 分配给主机上的工作进程。对于大型模型推理,分配给每个工作进程的 GPU 会根据 model_config.yaml 中指定的 GPU 数量自动计算。CUDA_VISIBLE_DEVICES 基于此数量设置。

例如,假设一个节点上有八个 GPU,一个工作进程在一个节点上需要 4 个 GPU(即 nproc-per-node=4 或 parallelLevel=4)。在这种情况下,TorchServe 会将 CUDA_VISIBLE_DEVICES=”0,1,2,3” 分配给 worker1,并将 CUDA_VISIBLE_DEVICES=”4,5,6,7” 分配给 worker2。

除了这种默认行为外,TorchServe 还为用户提供了为工作进程指定 GPU 的灵活性。例如,如果用户在 模型配置 YAML 文件 中设置了 “deviceIds: [2,3,4,5]”,并且 nproc-per-node(或 parallelLevel)设置为 2,则 TorchServe 会将 CUDA_VISIBLE_DEVICES=”2,3” 分配给 worker1,并将 CUDA_VISIBLE_DEVICES=”4,5” 分配给 worker2。

以 Pippy 集成为例,下图说明了 TorchServe 大型模型推理的内部结构。有关使用 vLLM 的示例,请参阅 此示例

ts-lmi-internal

PiPPy(用于大型模型推理的 PyTorch 原生解决方案)

PiPPy 为服务于无法放入单个 GPU 的大型模型提供流水线并行性。它将您的模型拆分为相等大小(阶段),并分区到您指定的设备数量上。然后使用微批处理来运行您的批处理输入以进行推理(对于批处理大小 >1,它更优)。

如何在 Torchserve 中使用 PiPPy

要在 Torchserve 中使用 Pippy,我们需要使用继承自 base_pippy_handler 的自定义处理程序,并将我们的设置放在 model-config.yaml 中。

Torchserve 中的客户处理程序只是一个 python 脚本,用于定义特定于您的工作流程的模型加载、预处理、推理和后处理逻辑。

它看起来像下面这样

创建 custom_handler.py 或任何其他描述性名称。

#DO import the necessary packages along with following
from ts.torch_handler.distributed.base_pippy_handler import BasePippyHandler
from ts.handler_utils.distributed.pt_pippy import initialize_rpc_workers, get_pipline_driver
class ModelHandler(BasePippyHandler, ABC):
    def __init__(self):
        super(ModelHandler, self).__init__()
        self.initialized = False

    def initialize(self, ctx):
        model = # load your model from model_dir
        self.device = self.local_rank %  torch.cuda.device_count()# being used to move model inputs to (self.device)
        self.model = get_pipline_driver(model,self.world_size, ctx)

这是您的 model-config.yaml 需要的内容,此配置文件非常灵活,您可以添加与前端、后端和处理程序相关的设置。

#frontend settings
minWorkers: 1
maxWorkers: 1
maxBatchDelay: 100
responseTimeout: 120
deviceType: "gpu"
parallelType: "pp" # options depending on the solution, pp(pipeline parallelism), tp(tensor parallelism), pptp ( pipeline and tensor parallelism)
                   # This will be used to route input to either rank0 or all ranks from fontend based on the solution (e.g. DeepSpeed support tp, PiPPy support pp)
torchrun:
    nproc-per-node: 4 # specifies the number of processes torchrun starts to serve your model, set to world_size or number of
                      # gpus you wish to split your model
#backend settings
pippy:
    chunks: 1 # This sets the microbatch sizes, microbatch = batch size/ chunks
    input_names: ['input_ids'] # input arg names to the model, this is required for FX tracing
    model_type: "HF" # set the model type to HF if you are using Huggingface model other wise leave it blank or any other model you use.
    rpc_timeout: 1800
    num_worker_threads: 512 #set number of threads for rpc worker init.

handler:
    max_length: 80 # max length of tokens for tokenizer in the handler

如何在处理程序中访问它? 这是一个示例

def initialize(self, ctx):
    model_type = ctx.model_yaml_config["pippy"]["model_type"]

其余部分与 Torchserve 中通常一样,基本上是打包您的模型并启动服务器。

用于打包模型的命令示例,请确保传递 model-config.yaml

torch-model-archiver --model-name bloom --version 1.0 --handler pippy_handler.py --extra-files $MODEL_CHECKPOINTS_PATH -r requirements.txt --config-file model-config.yaml --archive-format tgz

张量并行支持正在进行中,一旦准备就绪就会添加。

DeepSpeed

DeepSpeed-Inference 是 MicroSoft 的一个开源项目。它为服务于无法放入单个 GPU 内存的大型基于 Transformer 的 PyTorch 模型提供模型并行性。

如何在 TorchServe 中使用 DeepSpeed

要在 TorchServe 中使用 DeepSpeed,我们需要使用继承自 base_deepspeed_handler 的自定义处理程序,并将我们的设置放在 model-config.yaml 中。

它看起来像下面这样

创建 custom_handler.py 或任何其他描述性名称。

#DO import the necessary packages along with following
from ts.handler_utils.distributed.deepspeed import get_ds_engine
from ts.torch_handler.distributed.base_deepspeed_handler import BaseDeepSpeedHandler
class ModelHandler(BaseDeepSpeedHandler, ABC):
    def __init__(self):
        super(ModelHandler, self).__init__()
        self.initialized = False

    def initialize(self, ctx):
        model = # load your model from model_dir
        ds_engine = get_ds_engine(self.model, ctx)
        self.model = ds_engine.module
        self.initialized = True

这是您的 model-config.yaml 需要的内容,此配置文件非常灵活,您可以添加与前端、后端和处理程序相关的设置。

#frontend settings
minWorkers: 1
maxWorkers: 1
maxBatchDelay: 100
responseTimeout: 120
deviceType: "gpu"
parallelType: "tp" # options depending on the solution, pp(pipeline parallelism), tp(tensor parallelism), pptp ( pipeline and tensor parallelism)
                   # This will be used to route input to either rank0 or all ranks from fontend based on the solution (e.g. DeepSpeed support tp, PiPPy support pp)
torchrun:
    nproc-per-node: 4 # specifies the number of processes torchrun starts to serve your model, set to world_size or number of
                      # gpus you wish to split your model
#backend settings
deepspeed:
    config: ds-config.json # DeepSpeed config json filename.
                           # Details:https://deepspeed.org.cn/docs/config-json/
handler:
    max_length: 80 # max length of tokens for tokenizer in the handler

这是一个 ds-config.json 的示例

{
  "dtype": "torch.float16",
  "replace_with_kernel_inject": true,
  "tensor_parallel": {
    "tp_size": 2
  }
}

安装 DeepSpeed

方法 1:requirements.txt

方法 2:通过命令预安装(建议加快模型加载速度)

# See https://deepspeed.org.cn/tutorials/advanced-install/
DS_BUILD_OPS=1 pip install deepspeed

其余部分与 Torchserve 中通常一样,基本上是打包您的模型并启动服务器。

用于打包模型的命令示例,请确保传递 model-config.yaml

# option 1: Using model_dir
torch-model-archiver --model-name bloom --version 1.0 --handler deepspeed_handler.py --extra-files $MODEL_CHECKPOINTS_PATH,ds-config.json -r requirements.txt --config-file model-config.yaml --archive-format tgz

# option 2: Using HF model_name
torch-model-archiver --model-name bloom --version 1.0 --handler deepspeed_handler.py --extra-files ds-config.json -r requirements.txt --config-file model-config.yaml --archive-format

DeepSpeed MII

如果使用 此处 显示的受支持模型之一,您可以利用 Deep Speed MII。Deep Speed MII 使用 Deep Speed Inference 以及深度学习的进一步发展来最大限度地减少延迟并最大化吞吐量。它针对特定模型类型、模型大小、批量大小和可用硬件资源执行此操作。

有关如何在受支持模型上利用 Deep Speed MII 的更多信息,请参阅 此处 的信息。您还可以找到有关如何将其应用于 TorchServe 的示例,请参阅 此处

使用 Accelerate 服务大型 Hugging Face 模型

如果使用大型 Hugging Face 模型但资源有限,您可以使用 accelerate 来服务这些模型。为了实现这一点,您需要在 setup_config.json 文件中设置 low_cpu_mem_usage=True 并设置 `device_map=”auto”`。

有关将 accelerate 与大型 Hugging Face 模型结合使用的更多信息,请参阅 此示例

大型模型推理技巧

减少模型加载延迟

为了减少模型延迟,我们建议

调整 模型配置 YAML 文件

您可以通过以下方式调整模型配置 YAML 文件以获得更好的性能

  • 如果高模型推理延迟导致响应超时,请更新 responseTimeout

  • 如果高模型加载延迟导致启动超时,请更新 startupTimeout

  • 调整 torchrun 参数。支持的参数在 此处 定义。例如,默认情况下,OMP_NUMBER_THREADS 为 1。这可以在 YAML 文件中修改。

#frontend settings
torchrun:
    nproc-per-node: 4 # specifies the number of processes torchrun starts to serve your model, set to world_size or number of
                      # gpus you wish to split your model
    OMP_NUMBER_THREADS: 2

延迟敏感型应用

作业票证

对于延迟敏感型推理用例,建议使用作业票证功能。启用作业票证后,TorchServe 会验证模型的活动工作进程是否可用于处理客户端的请求。如果活动工作进程可用,则请求将被接受并立即处理,而不会产生来自作业队列或动态批处理的等待时间;否则,将向客户端发送 503 响应。

此功能有助于推理延迟可能较高的用例,例如生成模型、自动回归解码器模型(如 chatGPT)。此功能可帮助此类应用程序采取有效措施,例如,根据业务需求,将拒绝的请求路由到不同的服务器,或扩展模型服务器容量。以下是启用作业票证的示例。

minWorkers: 2
maxWorkers: 2
jobQueueSize: 2
useJobTicket: true

在此示例中,模型有 2 个工作进程,作业队列大小为 2。推理请求将立即由 TorchServe 处理,或被拒绝并返回响应代码 503。

通过 HTTP 1.1 分块编码进行流式响应

TorchServe 的推理 API 支持流式响应,允许通过 HTTP 1.1 分块编码发送一系列推理响应。仅当完整响应的推理延迟较高且推理中间结果发送到客户端时,才建议使用此功能。一个示例可能是用于生成应用程序的 LLM,其中生成“n”个令牌可能具有高延迟。在这种情况下,用户可以接收每个生成的令牌,一旦准备就绪,直到完整响应完成。为了实现流式响应,后端处理程序调用 “send_intermediate_predict_response” 向前端发送一个中间结果,并将最后一个结果作为现有样式返回。例如,

from ts.handler_utils.utils import send_intermediate_predict_response
''' Note: TorchServe v1.0.0 will deprecate
"from ts.protocol.otf_message_handler import send_intermediate_predict_response".
Please replace it with "from ts.handler_utils.utils import send_intermediate_predict_response".
'''
def handle(data, context):
    if type(data) is list:
        for i in range (3):
            send_intermediate_predict_response(["intermediate_response"], context.request_ids, "Intermediate Prediction success", 200, context)
        return ["hello world "]

客户端接收分块数据。

import test_utils

def test_echo_stream_inference():
    test_utils.start_torchserve(no_config_snapshots=True, gen_mar=False)
    test_utils.register_model('echo_stream',
                              'https://torchserve.pytorch.org/mar_files/echo_stream.mar')

    response = requests.post(TF_INFERENCE_API + '/predictions/echo_stream', data="foo", stream=True)
    assert response.headers['Transfer-Encoding'] == 'chunked'

    prediction = []
    for chunk in (response.iter_content(chunk_size=None)):
        if chunk:
            prediction.append(chunk.decode("utf-8"))

    assert str(" ".join(prediction)) == "hello hello hello hello world "
    test_utils.unregister_model('echo_stream')

GRPC 服务器端流式处理

TorchServe GRPC API 添加了推理 API “StreamPredictions” 的服务器端流式处理,以允许通过同一 GRPC 流发送一系列推理响应。仅当完整响应的推理延迟较高且推理中间结果发送到客户端时,才建议使用此 API。一个示例可能是用于生成应用程序的 LLM,其中生成 “n” 个令牌可能具有高延迟。与 HTTP 1.1 分块编码类似,使用此功能,用户可以接收每个生成的令牌,一旦准备就绪,直到完整响应完成。此 API 自动强制 batchSize 为 1。

service InferenceAPIsService {
    // Check health status of the TorchServe server.
    rpc Ping(google.protobuf.Empty) returns (TorchServeHealthResponse) {}

    // Predictions entry point to get inference using default model version.
    rpc Predictions(PredictionsRequest) returns (PredictionResponse) {}

    // Streaming response for an inference request.
    rpc StreamPredictions(PredictionsRequest) returns (stream PredictionResponse) {}
}

后端处理程序调用 “send_intermediate_predict_response” 向前端发送一个中间结果,并将最后一个结果作为现有样式返回。例如

from ts.handler_utils.utils import send_intermediate_predict_response
''' Note: TorchServe v1.0.0 will deprecate
"from ts.protocol.otf_message_handler import send_intermediate_predict_response".
Please replace it with "from ts.handler_utils.utils import send_intermediate_predict_response".
'''


def handle(data, context):
    if type(data) is list:
        for i in range (3):
            send_intermediate_predict_response(["intermediate_response"], context.request_ids, "Intermediate Prediction success", 200, context)
        return ["hello world "]

文档

访问 PyTorch 的全面开发者文档

查看文档

教程

获取面向初学者和高级开发者的深入教程

查看教程

资源

查找开发资源并获得您的问题解答

查看资源