• 教程 >
  • 使用 Cpp 扩展自定义进程组后端
快捷方式

使用 Cpp 扩展自定义进程组后端

作者: Howard HuangFeng TianShen LiMin Si

注意

editgithub 上查看和编辑本教程。

先决条件

本教程演示了如何实现自定义 Backend 并使用 cpp 扩展 将其插入到 PyTorch 分布式包 中。当您需要为您的硬件使用专门的软件堆栈,或者您想尝试新的集体通信算法时,这将非常有用。

基础知识

PyTorch 集体通信为多种广泛采用的分布式训练功能提供支持,包括DistributedDataParallelZeroRedundancyOptimizerFullyShardedDataParallel。为了使相同的集体通信 API 能够与不同的通信后端协同工作,分布式包将集体通信操作抽象成一个Backend 类。然后,可以使用首选的第三方库将不同的后端实现为Backend 的子类。PyTorch 分布式附带三个默认后端,ProcessGroupNCCLProcessGroupGlooProcessGroupMPI。但是,除了这三个后端之外,还有其他通信库(例如,UCCOneCCL),不同类型的硬件(例如,TPUTrainum)以及新兴的通信算法(例如,HerringReduction Server)。因此,分布式包公开了扩展 API,以允许自定义集体通信后端。

以下 4 个步骤展示了如何实现一个虚拟的 Backend 后端并在 Python 应用程序代码中使用它。请注意,本教程重点演示扩展 API,而不是开发功能完善的通信后端。因此,dummy 后端仅涵盖了 API 的一部分(all_reduceall_gather),并且只是将张量的值设置为 0。

步骤 1:实现 Backend 的子类

第一步是实现一个 Backend 子类,该子类覆盖目标集体通信 API 并运行自定义通信算法。扩展还需要实现一个 Work 子类,它充当通信结果的未来,并允许在应用程序代码中异步执行。如果扩展使用第三方库,它可以包含头文件并从 BackendDummy 子类调用库 API。以下两个代码片段展示了 dummy.hdummy.cpp 的实现。有关完整实现,请参阅 dummy collectives 存储库。

// file name: dummy.hpp
#include <torch/python.h>

#include <torch/csrc/distributed/c10d/Backend.hpp>
#include <torch/csrc/distributed/c10d/Work.hpp>
#include <torch/csrc/distributed/c10d/Store.hpp>
#include <torch/csrc/distributed/c10d/Types.hpp>
#include <torch/csrc/distributed/c10d/Utils.hpp>

#include <pybind11/chrono.h>

namespace c10d {

class BackendDummy : public Backend {
  public:
    BackendDummy(int rank, int size);

    c10::intrusive_ptr<Work> allgather(
        std::vector<std::vector<at::Tensor>>& outputTensors,
        std::vector<at::Tensor>& inputTensors,
        const AllgatherOptions& opts = AllgatherOptions()) override;

    c10::intrusive_ptr<Work> allreduce(
        std::vector<at::Tensor>& tensors,
        const AllreduceOptions& opts = AllreduceOptions()) override;

    // The collective communication APIs without a custom implementation
    // will error out if invoked by application code.
};

class WorkDummy : public Work {
  public:
    WorkDummy(
      OpType opType,
      c10::intrusive_ptr<c10::ivalue::Future> future) // future of the output
      : Work(
          -1, // rank, only used by recvAnySource, irrelevant in this demo
          opType),
      future_(std::move(future)) {}
    bool isCompleted() override;
    bool isSuccess() const override;
    bool wait(std::chrono::milliseconds timeout = kUnsetTimeout) override;
    virtual c10::intrusive_ptr<c10::ivalue::Future> getFuture() override;

  private:
    c10::intrusive_ptr<c10::ivalue::Future> future_;
};
} // namespace c10d
// file name: dummy.cpp
#include "dummy.hpp"

namespace c10d {

// This is a dummy allgather that sets all output tensors to zero
// Modify the implementation to conduct real communication asynchronously
c10::intrusive_ptr<Work> BackendDummy::allgather(
        std::vector<std::vector<at::Tensor>>& outputTensors,
        std::vector<at::Tensor>& inputTensors,
        const AllgatherOptions& /* unused */) {
    for (auto& outputTensorVec : outputTensors) {
        for (auto& outputTensor : outputTensorVec) {
            outputTensor.zero_();
        }
    }

    auto future = c10::make_intrusive<c10::ivalue::Future>(
        c10::ListType::create(c10::ListType::create(c10::TensorType::get())));
    future->markCompleted(c10::IValue(outputTensors));
    return c10::make_intrusive<WorkDummy>(OpType::ALLGATHER, std::move(future));
}

// This is a dummy allreduce that sets all output tensors to zero
// Modify the implementation to conduct real communication asynchronously
c10::intrusive_ptr<Work> BackendDummy::allreduce(
        std::vector<at::Tensor>& tensors,
        const AllreduceOptions& opts) {
    for (auto& tensor : tensors) {
        tensor.zero_();
    }

    auto future = c10::make_intrusive<c10::ivalue::Future>(
        c10::ListType::create(c10::TensorType::get()));
    future->markCompleted(c10::IValue(tensors));
    return c10::make_intrusive<WorkDummy>(OpType::ALLGATHER, std::move(future));
}
} // namespace c10d

步骤 2:公开扩展 Python API

后端构造函数是从Python 端调用的,因此扩展还需要将构造函数 API 公开给 Python。这可以通过添加以下方法来完成。在此示例中,storetimeoutBackendDummy 实例化方法忽略,因为这些方法在此虚拟实现中未使用。但是,实际的扩展应该考虑使用 store 执行会合并支持 timeout 参数。

// file name: dummy.hpp
class BackendDummy : public Backend {
    ...
    <Step 1 code>
    ...

    static c10::intrusive_ptr<Backend> createBackendDummy(
        const c10::intrusive_ptr<::c10d::Store>& store,
        int rank,
        int size,
        const std::chrono::duration<float>& timeout);

    static void BackendDummyConstructor() __attribute__((constructor)) {
        py::object module = py::module::import("torch.distributed");
        py::object register_backend =
            module.attr("Backend").attr("register_backend");
        // torch.distributed.Backend.register_backend will add `dummy` as a
        // new valid backend.
        register_backend("dummy", py::cpp_function(createBackendDummy));
    }
}
// file name: dummy.cpp
c10::intrusive_ptr<Backend> BackendDummy::createBackendDummy(
        const c10::intrusive_ptr<::c10d::Store>& /* unused */,
        int rank,
        int size,
        const std::chrono::duration<float>& /* unused */) {
    return c10::make_intrusive<BackendDummy>(rank, size);
}

PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) {
    m.def("createBackendDummy", &BackendDummy::createBackendDummy);
}

步骤 3:构建自定义扩展

现在,扩展源代码文件已准备就绪。然后,我们可以使用cpp 扩展 来构建它。为此,创建一个 setup.py 文件来准备路径和命令。然后调用 python setup.py develop 来安装扩展。

如果扩展依赖于第三方库,您还可以将 libraries_dirslibraries 指定给 cpp 扩展 API。请参阅 torch ucc 项目作为实际示例。

# file name: setup.py
import os
import sys
import torch
from setuptools import setup
from torch.utils import cpp_extension

sources = ["src/dummy.cpp"]
include_dirs = [f"{os.path.dirname(os.path.abspath(__file__))}/include/"]

if torch.cuda.is_available():
    module = cpp_extension.CUDAExtension(
        name = "dummy_collectives",
        sources = sources,
        include_dirs = include_dirs,
    )
else:
    module = cpp_extension.CppExtension(
        name = "dummy_collectives",
        sources = sources,
        include_dirs = include_dirs,
    )

setup(
    name = "Dummy-Collectives",
    version = "0.0.1",
    ext_modules = [module],
    cmdclass={'build_ext': cpp_extension.BuildExtension}
)

步骤 4:在应用程序中使用扩展

安装后,您可以在调用 init_process_group 时方便地使用 dummy 后端,就像它是一个内置后端一样。

我们可以通过更改 init_process_groupbackend 参数来指定基于后端的调度。我们可以将使用 CPU 张量的集体通信调度到 gloo 后端,并将使用 CUDA 张量的集体通信调度到 dummy 后端,方法是将 cpu:gloo,cuda:dummy 指定为后端参数。

要将所有张量发送到 dummy 后端,我们可以简单地将 dummy 指定为后端参数。

import os

import torch
# importing dummy_collectives makes torch.distributed recognize `dummy`
# as a valid backend.
import dummy_collectives

import torch.distributed as dist

os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '29500'

# Alternatively:
# dist.init_process_group("dummy", rank=0, world_size=1)
dist.init_process_group("cpu:gloo,cuda:dummy", rank=0, world_size=1)

# this goes through gloo
x = torch.ones(6)
dist.all_reduce(x)
print(f"cpu allreduce: {x}")

# this goes through dummy
if torch.cuda.is_available():
    y = x.cuda()
    dist.all_reduce(y)
    print(f"cuda allreduce: {y}")

    try:
        dist.broadcast(y, 0)
    except RuntimeError:
        print("got RuntimeError when calling broadcast")

文档

访问 PyTorch 的全面开发者文档

查看文档

教程

获取初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并获得问题的解答

查看资源