Kubeflow Trainer Logo

我们非常激动地宣布,Kubeflow Trainer 项目已正式集成到 PyTorch 生态系统中!此次集成确保了 Kubeflow Trainer 与 PyTorch 的标准和实践保持一致,为开发人员提供了一种可靠、可扩展且由社区支持的解决方案,以便在 Kubernetes 上运行 PyTorch。

要查看 PyTorch 生态系统,请访问 PyTorch Landscape。了解更多关于项目如何加入 PyTorch 生态系统的信息。 

关于 Kubeflow Trainer

Kubeflow Trainer 是一个原生的 Kubernetes 项目,能够实现 AI 模型的可扩展分布式训练,并专为大型语言模型 (LLM) 的微调而构建。它简化了跨多节点的训练负载扩展,能够高效管理大规模数据集并确保容错能力。

Kubeflow Trainer

其核心功能包括:

  • 简化 Kubernetes 的复杂性:Kubeflow Trainer API 专为两类主要用户设计——AI 从业者(使用 Kubeflow Python SDK 和 TrainJob API 开发 AI 模型的机器学习工程师和数据科学家)以及平台管理员(负责管理 Kubernetes 集群和 Kubeflow Trainer 运行时 API 的管理员和 DevOps 工程师)。AI 从业者可以专注于 PyTorch 的应用代码,无需担心基础设施细节。同时,平台管理员可以灵活地调度工作负载资源,以实现最大化的集群利用率和成本效率。为支持这些角色,Kubeflow Trainer 指定了专用的 Kubernetes 自定义资源定义 (CRD),从而简化了模型训练和基础设施管理。

Kubeflow Trainer user personas

  • Python SDK:专为 AI 从业者设计的 Python 接口,抽象了直接与 Kubernetes API 交互的复杂细节。它使用户能够专注于开发 PyTorch 模型,而无需担心 Kubernetes YAML 配置。
  • LLM 在 Kubernetes 上进行微调的蓝图:凭借内置的训练器,Kubeflow Trainer 使 AI 从业者能够通过所需的配置(如数据集、LoRA 参数、学习率等)无缝微调他们喜爱的 LLM。在首个版本中,它实现了支持多种微调策略的方案,包括监督微调 (SFT)、知识蒸馏、DPO、PPO、GRPO 和量化感知训练。社区正致力于利用 LLaMA-FactoryUnslothHuggingFace TRL 添加更多内置训练器,以实现高效的 LLM 微调。
  • 优化的 GPU 利用率:Kubeflow Trainer 通过利用由 Apache Arrow 和 Apache DataFusion 提供支持的内存分布式数据缓存,将大规模数据直接流式传输到分布式 GPU,从而最大限度地提高 GPU 效率。
  • 高级调度能力:Kubeflow Trainer 通过 PodGroupPolicy API 支持组调度 (gang scheduling),实现跨节点的协调 pod 调度。它还与 KueueCoschedulingVolcanoKAI Scheduler 等 Kubernetes 调度器集成,确保在训练作业开始前分配好所有必需的资源。
  • 加速 Kubernetes 上的 MPI 工作负载: Kubeflow Trainer 支持基于 MPI 的运行时,例如 DeepSpeedMLX。它通过基于 SSH 的优化处理 MPI 工作负载的所有必要编排,从而提升 MPI 性能。
  • 改进的韧性和容错性: 通过利用 JobsJobSets 等原生 Kubernetes API,Kubeflow Trainer 提高了 AI 工作负载的可靠性和效率。通过支持 PodFailurePolicy API,用户可以通过避免不必要的重启来降低成本。此外,SuccessPolicy API 允许训练作业在达到目标指标后提前结束。

背景与演进

该项目最初是作为 TensorFlow 的分布式训练运算符(例如 TFJob)启动的,后来我们合并了其他 Kubeflow 训练运算符(例如 PyTorchJob、MPIJob)的工作,旨在为用户和开发人员提供统一且简化的体验。我们非常感谢所有提交问题、帮助解决问题、提出并回答问题以及参与启发性讨论的人员。我们还要感谢所有为原始运算符做出贡献和维护的人员

通过加入 PyTorch 生态系统,我们致力于应用在 Kubernetes 上部署分布式 PyTorch 应用的最佳实践,并为 Kubeflow Trainer 带来一流的 PyTorch 支持。

与 PyTorch 生态系统的集成

Kubeflow Trainer 与 PyTorch 生态系统深度集成,支持广泛的工具和库,包括 torch、DeepSpeed、HuggingFace、Horovod 等。

它使 PyTorch 用户能够实现先进的分布式训练策略,例如分布式数据并行 (DDP)、完全分片数据并行 (FSDP & FSDP2) 和张量并行,从而在 Kubernetes 上实现高效的大规模模型训练。

此外,Kubeflow Trainer 支持使用 PyTorch IterableDatasets 进行数据并行,直接从分布式内存数据缓存节点流式传输数据。即使数据集超出了本地内存容量,这也允许进行可扩展的训练。

快速入门

请按照以下步骤快速部署 Kubeflow Trainer 并运行您的第一个训练作业。

先决条件

安装 Kubeflow Trainer

在您的本地 kind 集群上部署 Kubeflow Trainer 控制平面:

$ kind create cluster

$ kubectl apply --server-side -k "https://github.com/kubeflow/trainer.git/manifests/overlays/manager?ref=v2.0.0"


# Ensure that JobSet and Trainer controller manager are running.
$ kubectl get pods -n kubeflow-system

NAME                                                  READY   STATUS    RESTARTS   AGE
jobset-controller-manager-54968bd57b-88dk4            2/2     Running   0          65s
kubeflow-trainer-controller-manager-cc6468559-dblnw   1/1     Running   0          65s


# Deploy the Kubeflow Trainer runtimes.
$ kubectl apply --server-side -k "https://github.com/kubeflow/trainer.git/manifests/overlays/runtimes?ref=v2.0.0"

# Install Kubeflow SDK
$ pip install git+https://github.com/kubeflow/sdk.git@64d74db2b6c9a0854e39450d8d1c0201e1e9b3f7#subdirectory=python

定义 PyTorch 训练函数

安装 Kubeflow Trainer 后,定义包含端到端训练脚本的 PyTorch 训练函数。

def train_pytorch():
    import os
    import torch
    import torch.distributed as dist
    from torch.utils.data import DataLoader, DistributedSampler
    from torchvision import datasets, transforms, models

    # [1] Configure CPU/GPU device and distributed backend.
    device, backend = ("cuda", "nccl") if torch.cuda.is_available() else ("cpu", "gloo")
    dist.init_process_group(backend=backend)
    local_rank = int(os.getenv("LOCAL_RANK", 0))
    device = torch.device(f"{device}:{local_rank}")
    
    # [2] Get the pre-defined model.
    model = models.shufflenet_v2_x0_5(num_classes=10)
    model.conv1 = torch.nn.Conv2d(1, 24, kernel_size=3, stride=2, padding=1, bias=False)
    model = torch.nn.parallel.DistributedDataParallel(model.to(device))
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1, momentum=0.9)
   
    # [3] Get the FashionMNIST dataset and distribute it across all available devices.
    if local_rank == 0: # Download dataset only on local_rank=0 process.
        dataset = datasets.FashionMNIST("./data", train=True, download=True, transform=transforms.Compose([transforms.ToTensor()]))
    dist.barrier()
    dataset = datasets.FashionMNIST("./data", train=True, download=False, transform=transforms.Compose([transforms.ToTensor()]))
    train_loader = DataLoader(dataset, batch_size=100, sampler=DistributedSampler(dataset))

    # [4] Define the PyTorch training loop.
    for epoch in range(3):
        for batch_idx, (inputs, labels) in enumerate(train_loader):
            inputs, labels = inputs.to(device), labels.to(device)
            # Forward and Backward pass
            outputs = model(inputs)
            loss = torch.nn.functional.cross_entropy(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            if batch_idx % 10 == 0 and dist.get_rank() == 0:
                print(f"Epoch {epoch} [{batch_idx * len(inputs)}/{len(train_loader.dataset)}] "
                    f"Loss: {loss.item():.4f}"
                )

使用 TrainJob 在 Kubernetes 上运行 PyTorch

定义好训练函数后,使用 Kubeflow SDK 创建 TrainJob。

from kubeflow.trainer import TrainerClient, CustomTrainer

job_id = TrainerClient().train(
    trainer=CustomTrainer(
        func=train_pytorch,
        num_nodes=2,
        resources_per_node={
            "cpu": 3,
            "memory": "3Gi",
            # "gpu": 2, # Uncomment this line if you have GPUs.
        },
    ),
    runtime=TrainerClient().get_runtime("torch-distributed"),
)

获取 TrainJob 结果

创建 TrainJob 后,您应该能够列出它:

for job in TrainerClient().list_jobs():
    print(f"TrainJob: {job.name}, Status: {job.status}")

TrainJob: q33a18f65635, Status: Created 

首次拉取 PyTorch 镜像时,TrainJob 可能需要几分钟时间。镜像拉取完成后,TrainJob 的步骤应变为 Running 状态。每个步骤代表一个训练节点,每个步骤的设备数量对应于该节点上的设备数量。

for s in TrainerClient().get_job(name=job_id).steps:
    print(f"Step: {s.name}, Status: {s.status}, Devices: {s.device} x {s.device_count}")

Step: node-0, Status: Running, Devices: cpu x 3
Step: node-1, Status: Running, Devices: cpu x 3 

步骤运行后,您可以查看 TrainJob 日志。60,000 个样本的数据集已平均分配到 6 个 CPU 上,每个设备处理 10,000 个样本:60,000 / 6 = 10,000

print(TrainerClient().get_job_logs(name=job_id)["node-0"])

...
Epoch 0 [8000/60000] Loss: 0.4476
Epoch 0 [9000/60000] Loss: 0.4784
Epoch 1 [0/60000] Loss: 0.3909
Epoch 1 [1000/60000] Loss: 0.4888
Epoch 1 [2000/60000] Loss: 0.4100
... 

恭喜,您已使用 PyTorch 和 Kubeflow Trainer 创建了第一个分布式训练作业!

下一步计划

Kubeflow Trainer 有一个令人兴奋的路线图,包括以下项目:

行动呼吁

我们很高兴欢迎 Kubeflow Trainer 加入 PyTorch 生态系统!Kubeflow Trainer 实现了 Kubernetes 上 AI 模型训练的普及,并显著改善了 AI 从业者的开发体验。我们邀请您探索以下资源,以了解更多关于该项目的信息:

我们迫不及待地想看到您使用 Kubeflow Trainer 构建的作品!