跳转到主要内容

Kubeflow Trainer Logo

我们很高兴地宣布,Kubeflow Trainer 项目已集成到 PyTorch 生态系统中!此次集成确保了 Kubeflow Trainer 符合 PyTorch 的标准和实践,为开发者提供了在 Kubernetes 上运行 PyTorch 的可靠、可扩展且由社区支持的解决方案。

要查看 PyTorch 生态系统,请参阅 PyTorch Landscape。了解有关项目如何加入 PyTorch 生态系统的更多信息。

关于 Kubeflow Trainer

Kubeflow Trainer 是一个 Kubernetes 原生项目,支持 AI 模型的可扩展分布式训练,专为微调大型语言模型 (LLM) 而设计。它简化了训练工作负载在多个节点上的横向扩展,高效管理大型数据集并确保容错性。

Kubeflow Trainer

其核心功能包括:

  • 简化 Kubernetes 复杂性Kubeflow Trainer API 专为两种主要用户角色设计——AI 从业者(使用 Kubeflow Python SDK 和 TrainJob API 开发 AI 模型的机器学习工程师和数据科学家)和平台管理员(负责管理 Kubernetes 集群和 Kubeflow Trainer 运行时 API 的管理员和 DevOps 工程师)。AI 从业者可以专注于 PyTorch 中的应用程序代码,而无需担心基础设施细节。同时,平台管理员可以灵活调度工作负载资源,以实现最大的集群利用率和成本效益。为了支持这些角色,Kubeflow Trainer 指定了专用的 Kubernetes 自定义资源定义 (CRD),以简化模型训练和基础设施管理。

Kubeflow Trainer user personas

  • Python SDK:一个为 AI 从业者设计的 Pythonic 接口,抽象了直接与 Kubernetes API 交互的细节。它使用户能够专注于开发 PyTorch 模型,而无需担心 Kubernetes YAML 配置。
  • Kubernetes 上 LLM 微调的蓝图:通过内置训练器,Kubeflow Trainer 使 AI 从业者能够使用所需的数据集、LoRA 参数、学习率等配置无缝微调他们最喜欢的 LLM。在第一个版本中,它实现了配方以支持各种微调策略,包括监督微调 (SFT)、知识蒸馏、DPO、PPO、GRPO 和量化感知训练。社区正在努力添加更多由 LLaMA-FactoryUnslothHuggingFace TRL 支持的内置训练器,以实现高效的 LLM 微调。
  • 优化 GPU 利用率:Kubeflow Trainer 通过使用由 Apache Arrow 和 Apache DataFusion 提供支持的内存中分布式数据缓存将大规模数据直接传输到分布式 GPU,从而最大限度地提高 GPU 效率。
  • 高级调度功能:Kubeflow Trainer 通过 PodGroupPolicy API 支持组调度,从而实现 Pod 在节点间的协调调度。它还与 Kubernetes 调度器集成,例如 KueueCoschedulingVolcano KAI Scheduler,以确保在训练作业开始前分配所有必需的资源。
  • 加速 Kubernetes 上的 MPI 工作负载: Kubeflow Trainer 支持基于 MPI 的运行时,例如 DeepSpeedMLX。它处理所有必要的 MPI 工作负载编排,并通过 基于 SSH 的优化来提升 MPI 性能。
  • 提高韧性和容错性: 通过利用 Kubernetes 原生 API,如 JobsJobSets,Kubeflow Trainer 提高了 AI 工作负载的可靠性和效率。通过支持 PodFailurePolicy API,用户可以通过避免不必要的重启来降低成本。此外,SuccessPolicy API 允许训练作业在达到目标后提前完成。

背景与演进

该项目最初是作为 TensorFlow 的分布式训练运算符(例如 TFJob),后来我们合并了来自其他 Kubeflow 训练运算符(例如 PyTorchJob、MPIJob)的工作,为用户和开发者提供统一和简化的体验。我们非常感谢所有提交问题或帮助解决问题、提问和回答问题以及参与启发性讨论的人。我们还要感谢所有为原始运算符做出贡献和维护的人

通过加入 PyTorch 生态系统,我们致力于在 Kubernetes 上部署分布式 PyTorch 应用程序时应用最佳实践,并在 Kubeflow Trainer 中提供一流的 PyTorch 支持。

与 PyTorch 生态系统的集成

Kubeflow Trainer 与 PyTorch 生态系统深度集成,支持广泛的工具和库,包括 torch、DeepSpeed、HuggingFace、Horovod 等。

它使 PyTorch 用户能够实现高级分布式训练策略,例如分布式数据并行 (DDP)、完全分片数据并行 (FSDP & FSDP2) 和张量并行,从而实现在 Kubernetes 上高效的大规模模型训练。

此外,Kubeflow Trainer 支持使用 PyTorch IterableDatasets 进行数据并行,直接从分布式内存数据缓存节点流式传输数据。这使得即使数据集超过本地内存容量,也能实现可扩展的训练。

快速入门

请按照以下步骤快速部署 Kubeflow Trainer 并运行您的第一个训练作业。

先决条件

安装 Kubeflow Trainer

在您的本地 kind 集群上部署 Kubeflow Trainer 控制平面:

$ kind create cluster

$ kubectl apply --server-side -k "https://github.com/kubeflow/trainer.git/manifests/overlays/manager?ref=v2.0.0"


# Ensure that JobSet and Trainer controller manager are running.
$ kubectl get pods -n kubeflow-system

NAME                                                  READY   STATUS    RESTARTS   AGE
jobset-controller-manager-54968bd57b-88dk4            2/2     Running   0          65s
kubeflow-trainer-controller-manager-cc6468559-dblnw   1/1     Running   0          65s


# Deploy the Kubeflow Trainer runtimes.
$ kubectl apply --server-side -k "https://github.com/kubeflow/trainer.git/manifests/overlays/runtimes?ref=v2.0.0"

# Install Kubeflow SDK
$ pip install git+https://github.com/kubeflow/sdk.git@64d74db2b6c9a0854e39450d8d1c0201e1e9b3f7#subdirectory=python

定义 PyTorch 训练函数

安装 Kubeflow Trainer 后,定义您的 PyTorch 训练函数,其中包含端到端训练脚本

def train_pytorch():
    import os
    import torch
    import torch.distributed as dist
    from torch.utils.data import DataLoader, DistributedSampler
    from torchvision import datasets, transforms, models

    # [1] Configure CPU/GPU device and distributed backend.
    device, backend = ("cuda", "nccl") if torch.cuda.is_available() else ("cpu", "gloo")
    dist.init_process_group(backend=backend)
    local_rank = int(os.getenv("LOCAL_RANK", 0))
    device = torch.device(f"{device}:{local_rank}")
    
    # [2] Get the pre-defined model.
    model = models.shufflenet_v2_x0_5(num_classes=10)
    model.conv1 = torch.nn.Conv2d(1, 24, kernel_size=3, stride=2, padding=1, bias=False)
    model = torch.nn.parallel.DistributedDataParallel(model.to(device))
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1, momentum=0.9)
   
    # [3] Get the FashionMNIST dataset and distribute it across all available devices.
    if local_rank == 0: # Download dataset only on local_rank=0 process.
        dataset = datasets.FashionMNIST("./data", train=True, download=True, transform=transforms.Compose([transforms.ToTensor()]))
    dist.barrier()
    dataset = datasets.FashionMNIST("./data", train=True, download=False, transform=transforms.Compose([transforms.ToTensor()]))
    train_loader = DataLoader(dataset, batch_size=100, sampler=DistributedSampler(dataset))

    # [4] Define the PyTorch training loop.
    for epoch in range(3):
        for batch_idx, (inputs, labels) in enumerate(train_loader):
            inputs, labels = inputs.to(device), labels.to(device)
            # Forward and Backward pass
            outputs = model(inputs)
            loss = torch.nn.functional.cross_entropy(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            if batch_idx % 10 == 0 and dist.get_rank() == 0:
                print(f"Epoch {epoch} [{batch_idx * len(inputs)}/{len(train_loader.dataset)}] "
                    f"Loss: {loss.item():.4f}"
                )

使用 TrainJob 在 Kubernetes 上运行 PyTorch

定义训练函数后,使用 Kubeflow SDK 创建 TrainJob

from kubeflow.trainer import TrainerClient, CustomTrainer

job_id = TrainerClient().train(
    trainer=CustomTrainer(
        func=train_pytorch,
        num_nodes=2,
        resources_per_node={
            "cpu": 3,
            "memory": "3Gi",
            # "gpu": 2, # Uncomment this line if you have GPUs.
        },
    ),
    runtime=TrainerClient().get_runtime("torch-distributed"),
)

获取 TrainJob 结果

创建 TrainJob 后,您应该能够列出它:

for job in TrainerClient().list_jobs():
    print(f"TrainJob: {job.name}, Status: {job.status}")

TrainJob: q33a18f65635, Status: Created 

TrainJob 首次拉取 PyTorch 镜像可能需要几分钟。一旦镜像拉取完成,TrainJob 的步骤应转换为运行中状态。每个步骤代表一个训练节点,每个步骤的设备数量对应于该节点上的设备数量。

for s in TrainerClient().get_job(name=job_id).steps:
    print(f"Step: {s.name}, Status: {s.status}, Devices: {s.device} x {s.device_count}")

Step: node-0, Status: Running, Devices: cpu x 3
Step: node-1, Status: Running, Devices: cpu x 3 

步骤运行后,您可以查看 TrainJob 日志。60,000 个样本的数据集已均匀分布到 6 个 CPU 上,每个设备处理 10,000 个样本:60,000 / 6 = 10,000

print(TrainerClient().get_job_logs(name=job_id)["node-0"])

...
Epoch 0 [8000/60000] Loss: 0.4476
Epoch 0 [9000/60000] Loss: 0.4784
Epoch 1 [0/60000] Loss: 0.3909
Epoch 1 [1000/60000] Loss: 0.4888
Epoch 1 [2000/60000] Loss: 0.4100
... 

恭喜,您使用 PyTorch 和 Kubeflow Trainer 创建了您的第一个分布式训练作业!

下一步是什么

Kubeflow Trainer 具有令人兴奋的路线图,包括以下项目

行动号召

我们很高兴地欢迎 Kubeflow Trainer 加入 PyTorch 生态系统!Kubeflow Trainer 使得 AI 模型训练在 Kubernetes 上普及,并显著改善了 AI 从业者的开发体验。我们邀请您探索以下资源,以了解更多关于该项目的信息

我们迫不及待地想看看您将使用 Kubeflow Trainer 构建什么!