• 教程 >
  • 结合分布式数据并行和分布式 RPC 框架
快捷方式

结合分布式数据并行和分布式RPC框架

创建于: 2020年7月28日 | 最后更新: 2023年6月6日 | 最后验证: 未验证

作者: Pritam DamaniaYi Wang

注意

editgithub 上查看和编辑本教程。

本教程使用一个简单的示例来演示如何结合 DistributedDataParallel (DDP) 和 分布式RPC框架,以结合分布式数据并行和分布式模型并行来训练一个简单模型。示例源代码可以在 这里 找到。

之前的教程,分布式数据并行入门分布式RPC框架入门,分别描述了如何执行分布式数据并行和分布式模型并行训练。尽管如此,在某些训练范例中,您可能希望结合这两种技术。例如

  1. 如果我们的模型包含稀疏部分(大型嵌入表)和稠密部分(全连接层),我们可能希望将嵌入表放在参数服务器上,并使用 DistributedDataParallel 在多个训练器上复制全连接层。分布式RPC框架 可用于在参数服务器上执行嵌入查找。

  2. PipeDream 论文所述,启用混合并行。我们可以使用 分布式RPC框架 将模型的阶段在多个工作器之间进行流水线化,并使用 DistributedDataParallel 复制每个阶段(如果需要)。


在本教程中,我们将介绍上面提到的情况1。我们的设置中共有4个工作器,如下所示:

  1. 1个主节点,负责在参数服务器上创建一个嵌入表 (nn.EmbeddingBag)。主节点还负责驱动两个训练器上的训练循环。

  2. 1个参数服务器,它主要在内存中保存嵌入表,并响应来自主节点和训练器的RPC请求。

  3. 2个训练器,它们存储一个全连接层 (nn.Linear),该层使用 DistributedDataParallel 在它们之间复制和同步。训练器还负责执行前向传播、反向传播和优化器步进。


整个训练过程执行如下:

  1. 主节点创建一个 RemoteModule,该模块在参数服务器上持有一个嵌入表。

  2. 然后,主节点启动训练器上的训练循环,并将远程模块传递给训练器。

  3. 训练器创建一个 HybridModel,该模型首先使用主节点提供的远程模块执行嵌入查找,然后执行包裹在 DDP 中的全连接层。

  4. 训练器执行模型的前向传播,并使用损失通过 分布式自动微分 执行反向传播。

  5. 作为反向传播的一部分,首先计算全连接层的梯度,并通过 DDP 中的 allreduce 同步到所有训练器。

  6. 接下来,分布式自动微分将梯度传播到参数服务器,并在那里更新嵌入表的梯度。

  7. 最后,使用 分布式优化器 更新所有参数。

注意

如果您结合使用 DDP 和 RPC,应始终使用 分布式自动微分 进行反向传播。

现在,让我们详细介绍每个部分。首先,我们需要设置好所有工作器才能进行训练。我们创建4个进程,其中 rank 0 和 rank 1 是我们的训练器,rank 2 是主节点,rank 3 是参数服务器。

我们在所有4个工作器上使用 TCP init_method 初始化 RPC 框架。RPC 初始化完成后,主节点使用 RemoteModule 在参数服务器上创建一个持有 EmbeddingBag 层的远程模块。然后,主节点遍历每个训练器,并使用 rpc_async 在每个训练器上调用 _run_trainer 启动训练循环。最后,主节点等待所有训练完成后再退出。

训练器首先使用 init_process_group 初始化一个 DDP 的 ProcessGroup,世界大小 world_size=2(对应两个训练器)。接下来,它们使用 TCP init_method 初始化 RPC 框架。请注意,RPC 初始化和 ProcessGroup 初始化中的端口是不同的。这是为了避免两个框架初始化之间的端口冲突。初始化完成后,训练器只需等待来自主节点的 _run_trainer RPC。

参数服务器只初始化 RPC 框架,并等待来自训练器和主节点的 RPC 请求。

def run_worker(rank, world_size):
    r"""
    A wrapper function that initializes RPC, calls the function, and shuts down
    RPC.
    """

    # We need to use different port numbers in TCP init_method for init_rpc and
    # init_process_group to avoid port conflicts.
    rpc_backend_options = TensorPipeRpcBackendOptions()
    rpc_backend_options.init_method = "tcp://localhost:29501"

    # Rank 2 is master, 3 is ps and 0 and 1 are trainers.
    if rank == 2:
        rpc.init_rpc(
            "master",
            rank=rank,
            world_size=world_size,
            rpc_backend_options=rpc_backend_options,
        )

        remote_emb_module = RemoteModule(
            "ps",
            torch.nn.EmbeddingBag,
            args=(NUM_EMBEDDINGS, EMBEDDING_DIM),
            kwargs={"mode": "sum"},
        )

        # Run the training loop on trainers.
        futs = []
        for trainer_rank in [0, 1]:
            trainer_name = "trainer{}".format(trainer_rank)
            fut = rpc.rpc_async(
                trainer_name, _run_trainer, args=(remote_emb_module, trainer_rank)
            )
            futs.append(fut)

        # Wait for all training to finish.
        for fut in futs:
            fut.wait()
    elif rank <= 1:
        # Initialize process group for Distributed DataParallel on trainers.
        dist.init_process_group(
            backend="gloo", rank=rank, world_size=2, init_method="tcp://localhost:29500"
        )

        # Initialize RPC.
        trainer_name = "trainer{}".format(rank)
        rpc.init_rpc(
            trainer_name,
            rank=rank,
            world_size=world_size,
            rpc_backend_options=rpc_backend_options,
        )

        # Trainer just waits for RPCs from master.
    else:
        rpc.init_rpc(
            "ps",
            rank=rank,
            world_size=world_size,
            rpc_backend_options=rpc_backend_options,
        )
        # parameter server do nothing
        pass

    # block until all rpcs finish
    rpc.shutdown()


if __name__ == "__main__":
    # 2 trainers, 1 parameter server, 1 master.
    world_size = 4
    mp.spawn(run_worker, args=(world_size,), nprocs=world_size, join=True)

在我们讨论训练器的细节之前,先介绍训练器使用的 HybridModel。如下所述,HybridModel 使用参数服务器上持有嵌入表的远程模块 (remote_emb_module) 和用于 DDP 的 device 进行初始化。模型的初始化将一个 nn.Linear 层包裹在 DDP 中,以便在所有训练器之间复制和同步该层。

模型的前向方法非常直观。它使用 RemoteModule 的 forward 在参数服务器上执行嵌入查找,并将其输出传递给全连接层。

class HybridModel(torch.nn.Module):
    r"""
    The model consists of a sparse part and a dense part.
    1) The dense part is an nn.Linear module that is replicated across all trainers using DistributedDataParallel.
    2) The sparse part is a Remote Module that holds an nn.EmbeddingBag on the parameter server.
    This remote model can get a Remote Reference to the embedding table on the parameter server.
    """

    def __init__(self, remote_emb_module, device):
        super(HybridModel, self).__init__()
        self.remote_emb_module = remote_emb_module
        self.fc = DDP(torch.nn.Linear(16, 8).cuda(device), device_ids=[device])
        self.device = device

    def forward(self, indices, offsets):
        emb_lookup = self.remote_emb_module.forward(indices, offsets)
        return self.fc(emb_lookup.cuda(self.device))

接下来,让我们看看训练器上的设置。训练器首先使用参数服务器上持有嵌入表的远程模块及其自身的 rank 创建上述的 HybridModel

现在,我们需要获取一个 RRefs 列表,指向我们希望使用 DistributedOptimizer 进行优化的所有参数。要从参数服务器检索嵌入表的参数,我们可以调用 RemoteModule 的 remote_parameters,它会遍历嵌入表的所有参数并返回一个 RRefs 列表。训练器通过 RPC 在参数服务器上调用此方法,以接收所需参数的 RRefs 列表。由于 DistributedOptimizer 总是接受一个需要优化的参数的 RRefs 列表,因此即使对于全连接层的本地参数,我们也需要创建 RRefs。这是通过遍历 model.fc.parameters(),为每个参数创建一个 RRef,并将其附加到 remote_parameters() 返回的列表中来完成的。注意,我们不能使用 model.parameters(),因为它会递归调用 model.remote_emb_module.parameters(),而 RemoteModule 不支持此操作。

最后,我们使用所有 RRefs 创建 DistributedOptimizer,并定义一个 CrossEntropyLoss 函数。

def _run_trainer(remote_emb_module, rank):
    r"""
    Each trainer runs a forward pass which involves an embedding lookup on the
    parameter server and running nn.Linear locally. During the backward pass,
    DDP is responsible for aggregating the gradients for the dense part
    (nn.Linear) and distributed autograd ensures gradients updates are
    propagated to the parameter server.
    """

    # Setup the model.
    model = HybridModel(remote_emb_module, rank)

    # Retrieve all model parameters as rrefs for DistributedOptimizer.

    # Retrieve parameters for embedding table.
    model_parameter_rrefs = model.remote_emb_module.remote_parameters()

    # model.fc.parameters() only includes local parameters.
    # NOTE: Cannot call model.parameters() here,
    # because this will call remote_emb_module.parameters(),
    # which supports remote_parameters() but not parameters().
    for param in model.fc.parameters():
        model_parameter_rrefs.append(RRef(param))

    # Setup distributed optimizer
    opt = DistributedOptimizer(
        optim.SGD,
        model_parameter_rrefs,
        lr=0.05,
    )

    criterion = torch.nn.CrossEntropyLoss()

现在我们准备介绍在每个训练器上运行的主训练循环。get_next_batch 只是一个辅助函数,用于生成训练所需的随机输入和目标。我们在多个 epoch 中运行训练循环,对于每个 batch:

  1. 为分布式自动微分设置一个 分布式自动微分上下文 (Distributed Autograd Context)

  2. 运行模型的前向传播并获取其输出。

  3. 根据我们的输出和目标,使用损失函数计算损失。

  4. 使用分布式自动微分根据损失执行分布式反向传播。

  5. 最后,运行一个分布式优化器步进以优化所有参数。

    def get_next_batch(rank):
        for _ in range(10):
            num_indices = random.randint(20, 50)
            indices = torch.LongTensor(num_indices).random_(0, NUM_EMBEDDINGS)

            # Generate offsets.
            offsets = []
            start = 0
            batch_size = 0
            while start < num_indices:
                offsets.append(start)
                start += random.randint(1, 10)
                batch_size += 1

            offsets_tensor = torch.LongTensor(offsets)
            target = torch.LongTensor(batch_size).random_(8).cuda(rank)
            yield indices, offsets_tensor, target

    # Train for 100 epochs
    for epoch in range(100):
        # create distributed autograd context
        for indices, offsets, target in get_next_batch(rank):
            with dist_autograd.context() as context_id:
                output = model(indices, offsets)
                loss = criterion(output, target)

                # Run distributed backward pass
                dist_autograd.backward(context_id, [loss])

                # Tun distributed optimizer
                opt.step(context_id)

                # Not necessary to zero grads as each iteration creates a different
                # distributed autograd context which hosts different grads
        print("Training done for epoch {}".format(epoch))

完整示例的源代码可以在 这里 找到。

文档

访问 PyTorch 的全面开发者文档

查看文档

教程

获取针对初学者和高级开发者的深入教程

查看教程

资源

查找开发资源并获得问题解答

查看资源