
我们非常激动地宣布,Kubeflow Trainer 项目已正式集成到 PyTorch 生态系统中!此次集成确保了 Kubeflow Trainer 与 PyTorch 的标准和实践保持一致,为开发人员提供了一种可靠、可扩展且由社区支持的解决方案,以便在 Kubernetes 上运行 PyTorch。
要查看 PyTorch 生态系统,请访问 PyTorch Landscape。了解更多关于项目如何加入 PyTorch 生态系统的信息。
关于 Kubeflow Trainer
Kubeflow Trainer 是一个原生的 Kubernetes 项目,能够实现 AI 模型的可扩展分布式训练,并专为大型语言模型 (LLM) 的微调而构建。它简化了跨多节点的训练负载扩展,能够高效管理大规模数据集并确保容错能力。

其核心功能包括:
- 简化 Kubernetes 的复杂性:Kubeflow Trainer API 专为两类主要用户设计——AI 从业者(使用 Kubeflow Python SDK 和 TrainJob API 开发 AI 模型的机器学习工程师和数据科学家)以及平台管理员(负责管理 Kubernetes 集群和 Kubeflow Trainer 运行时 API 的管理员和 DevOps 工程师)。AI 从业者可以专注于 PyTorch 的应用代码,无需担心基础设施细节。同时,平台管理员可以灵活地调度工作负载资源,以实现最大化的集群利用率和成本效率。为支持这些角色,Kubeflow Trainer 指定了专用的 Kubernetes 自定义资源定义 (CRD),从而简化了模型训练和基础设施管理。

- Python SDK:专为 AI 从业者设计的 Python 接口,抽象了直接与 Kubernetes API 交互的复杂细节。它使用户能够专注于开发 PyTorch 模型,而无需担心 Kubernetes YAML 配置。
- LLM 在 Kubernetes 上进行微调的蓝图:凭借内置的训练器,Kubeflow Trainer 使 AI 从业者能够通过所需的配置(如数据集、LoRA 参数、学习率等)无缝微调他们喜爱的 LLM。在首个版本中,它实现了支持多种微调策略的方案,包括监督微调 (SFT)、知识蒸馏、DPO、PPO、GRPO 和量化感知训练。社区正致力于利用 LLaMA-Factory、Unsloth 和 HuggingFace TRL 添加更多内置训练器,以实现高效的 LLM 微调。
- 优化的 GPU 利用率:Kubeflow Trainer 通过利用由 Apache Arrow 和 Apache DataFusion 提供支持的内存分布式数据缓存,将大规模数据直接流式传输到分布式 GPU,从而最大限度地提高 GPU 效率。
- 高级调度能力:Kubeflow Trainer 通过 PodGroupPolicy API 支持组调度 (gang scheduling),实现跨节点的协调 pod 调度。它还与 Kueue、Coscheduling、Volcano 和 KAI Scheduler 等 Kubernetes 调度器集成,确保在训练作业开始前分配好所有必需的资源。
- 加速 Kubernetes 上的 MPI 工作负载: Kubeflow Trainer 支持基于 MPI 的运行时,例如 DeepSpeed 和 MLX。它通过基于 SSH 的优化处理 MPI 工作负载的所有必要编排,从而提升 MPI 性能。
- 改进的韧性和容错性: 通过利用 Jobs 和 JobSets 等原生 Kubernetes API,Kubeflow Trainer 提高了 AI 工作负载的可靠性和效率。通过支持 PodFailurePolicy API,用户可以通过避免不必要的重启来降低成本。此外,SuccessPolicy API 允许训练作业在达到目标指标后提前结束。
背景与演进
该项目最初是作为 TensorFlow 的分布式训练运算符(例如 TFJob)启动的,后来我们合并了其他 Kubeflow 训练运算符(例如 PyTorchJob、MPIJob)的工作,旨在为用户和开发人员提供统一且简化的体验。我们非常感谢所有提交问题、帮助解决问题、提出并回答问题以及参与启发性讨论的人员。我们还要感谢所有为原始运算符做出贡献和维护的人员。
通过加入 PyTorch 生态系统,我们致力于应用在 Kubernetes 上部署分布式 PyTorch 应用的最佳实践,并为 Kubeflow Trainer 带来一流的 PyTorch 支持。
与 PyTorch 生态系统的集成
Kubeflow Trainer 与 PyTorch 生态系统深度集成,支持广泛的工具和库,包括 torch、DeepSpeed、HuggingFace、Horovod 等。
它使 PyTorch 用户能够实现先进的分布式训练策略,例如分布式数据并行 (DDP)、完全分片数据并行 (FSDP & FSDP2) 和张量并行,从而在 Kubernetes 上实现高效的大规模模型训练。
此外,Kubeflow Trainer 支持使用 PyTorch IterableDatasets 进行数据并行,直接从分布式内存数据缓存节点流式传输数据。即使数据集超出了本地内存容量,这也允许进行可扩展的训练。
快速入门
请按照以下步骤快速部署 Kubeflow Trainer 并运行您的第一个训练作业。
先决条件
安装 Kubeflow Trainer
在您的本地 kind 集群上部署 Kubeflow Trainer 控制平面:
$ kind create cluster
$ kubectl apply --server-side -k "https://github.com/kubeflow/trainer.git/manifests/overlays/manager?ref=v2.0.0"
# Ensure that JobSet and Trainer controller manager are running.
$ kubectl get pods -n kubeflow-system
NAME READY STATUS RESTARTS AGE
jobset-controller-manager-54968bd57b-88dk4 2/2 Running 0 65s
kubeflow-trainer-controller-manager-cc6468559-dblnw 1/1 Running 0 65s
# Deploy the Kubeflow Trainer runtimes.
$ kubectl apply --server-side -k "https://github.com/kubeflow/trainer.git/manifests/overlays/runtimes?ref=v2.0.0"
# Install Kubeflow SDK
$ pip install git+https://github.com/kubeflow/sdk.git@64d74db2b6c9a0854e39450d8d1c0201e1e9b3f7#subdirectory=python
定义 PyTorch 训练函数
安装 Kubeflow Trainer 后,定义包含端到端训练脚本的 PyTorch 训练函数。
def train_pytorch():
import os
import torch
import torch.distributed as dist
from torch.utils.data import DataLoader, DistributedSampler
from torchvision import datasets, transforms, models
# [1] Configure CPU/GPU device and distributed backend.
device, backend = ("cuda", "nccl") if torch.cuda.is_available() else ("cpu", "gloo")
dist.init_process_group(backend=backend)
local_rank = int(os.getenv("LOCAL_RANK", 0))
device = torch.device(f"{device}:{local_rank}")
# [2] Get the pre-defined model.
model = models.shufflenet_v2_x0_5(num_classes=10)
model.conv1 = torch.nn.Conv2d(1, 24, kernel_size=3, stride=2, padding=1, bias=False)
model = torch.nn.parallel.DistributedDataParallel(model.to(device))
optimizer = torch.optim.SGD(model.parameters(), lr=0.1, momentum=0.9)
# [3] Get the FashionMNIST dataset and distribute it across all available devices.
if local_rank == 0: # Download dataset only on local_rank=0 process.
dataset = datasets.FashionMNIST("./data", train=True, download=True, transform=transforms.Compose([transforms.ToTensor()]))
dist.barrier()
dataset = datasets.FashionMNIST("./data", train=True, download=False, transform=transforms.Compose([transforms.ToTensor()]))
train_loader = DataLoader(dataset, batch_size=100, sampler=DistributedSampler(dataset))
# [4] Define the PyTorch training loop.
for epoch in range(3):
for batch_idx, (inputs, labels) in enumerate(train_loader):
inputs, labels = inputs.to(device), labels.to(device)
# Forward and Backward pass
outputs = model(inputs)
loss = torch.nn.functional.cross_entropy(outputs, labels)
optimizer.zero_grad()
loss.backward()
optimizer.step()
if batch_idx % 10 == 0 and dist.get_rank() == 0:
print(f"Epoch {epoch} [{batch_idx * len(inputs)}/{len(train_loader.dataset)}] "
f"Loss: {loss.item():.4f}"
)
使用 TrainJob 在 Kubernetes 上运行 PyTorch
定义好训练函数后,使用 Kubeflow SDK 创建 TrainJob。
from kubeflow.trainer import TrainerClient, CustomTrainer
job_id = TrainerClient().train(
trainer=CustomTrainer(
func=train_pytorch,
num_nodes=2,
resources_per_node={
"cpu": 3,
"memory": "3Gi",
# "gpu": 2, # Uncomment this line if you have GPUs.
},
),
runtime=TrainerClient().get_runtime("torch-distributed"),
)
获取 TrainJob 结果
创建 TrainJob 后,您应该能够列出它:
for job in TrainerClient().list_jobs():
print(f"TrainJob: {job.name}, Status: {job.status}")
TrainJob: q33a18f65635, Status: Created
首次拉取 PyTorch 镜像时,TrainJob 可能需要几分钟时间。镜像拉取完成后,TrainJob 的步骤应变为 Running 状态。每个步骤代表一个训练节点,每个步骤的设备数量对应于该节点上的设备数量。
for s in TrainerClient().get_job(name=job_id).steps:
print(f"Step: {s.name}, Status: {s.status}, Devices: {s.device} x {s.device_count}")
Step: node-0, Status: Running, Devices: cpu x 3
Step: node-1, Status: Running, Devices: cpu x 3
步骤运行后,您可以查看 TrainJob 日志。60,000 个样本的数据集已平均分配到 6 个 CPU 上,每个设备处理 10,000 个样本:60,000 / 6 = 10,000
print(TrainerClient().get_job_logs(name=job_id)["node-0"])
...
Epoch 0 [8000/60000] Loss: 0.4476
Epoch 0 [9000/60000] Loss: 0.4784
Epoch 1 [0/60000] Loss: 0.3909
Epoch 1 [1000/60000] Loss: 0.4888
Epoch 1 [2000/60000] Loss: 0.4100
...
恭喜,您已使用 PyTorch 和 Kubeflow Trainer 创建了第一个分布式训练作业!
下一步计划
Kubeflow Trainer 有一个令人兴奋的路线图,包括以下项目:
- 本地 TrainJob 执行 – 在没有 Kubernetes 的情况下在本地运行 Kubeflow Trainer 作业。
- 分布式数据缓存 – 流式传输由 Apache Arrow 和 Apache DataFusion 提供支持的内存分布式数据。
- 高级调度能力 – 通过集成 Kueue、KAI Scheduler、Volcano 来改进资源管理和组调度能力。
- 支持 JAX 运行时。
- 为 GPU 加速工作负载自动执行检查点 (Checkpointing)。
行动呼吁
我们很高兴欢迎 Kubeflow Trainer 加入 PyTorch 生态系统!Kubeflow Trainer 实现了 Kubernetes 上 AI 模型训练的普及,并显著改善了 AI 从业者的开发体验。我们邀请您探索以下资源,以了解更多关于该项目的信息:
- 阅读 Kubeflow Trainer v2 发布博客文章 和 发行说明。
- 探索 官方 Kubeflow Trainer 文档。
- 加入 #kubeflow-trainer Slack 频道 的讨论。
- 参加我们每两周一次的 Kubeflow Trainer 社区会议(每周三)。
- 通过在 GitHub 存储库 上创建 issue 来分享您的用例或功能建议。
- 通过撰写 Kubeflow 博客文章 或在即将举行的 Kubeflow 活动 上演讲来分享您的故事。
- 探索面向 AI 从业者的 Kubeflow Python SDK。
我们迫不及待地想看到您使用 Kubeflow Trainer 构建的作品!