简介:赋能新一代人工智能构建者

我们非常激动地宣布,Meta 的 PyTorch 团队(Monarch 项目的主导方)与 Lightning AI 达成合作。此次合作将大规模训练的强大性能与本地开发的熟悉感和便捷性完美结合,并直接集成在 Lightning Studio 笔记本中。这一合作为集群级开发树立了新标杆,赋能新一代 AI 构建者能够在一个熟悉的工具中,实现快速、大规模的迭代。

机遇:重新构想无缝分布式训练

想象一下,在舒适熟悉的交互式笔记本中,就能发挥庞大 GPU 集群的强大威力,同时又无需在快速迭代周期上做出妥协。Monarch 让这一愿景成为现实,开启了全新的工作流和创意空间。

从历史上看,在大规模集群上的开发涉及漫长的迭代过程。每一行代码的变更都需要用户重新分配、重新构建并从头部署工作流。Monarch 通过一种通用的分布式计算语言,直接解决了这些迭代瓶颈。这一新框架不仅使用户能够保持连接并直接在集群上进行开发,还能让他们从单个本地训练脚本的角度来思考其工作流。

在与 Lightning AI 的这次合作中,我们通过与 Lightning AI 平台及其多机训练 (MMT) 应用的直接集成,展示了此类脚本的强大威力。Lightning MMT 允许用户异步调度大规模作业,并负责从 AWS、Google Cloud Platform、Voltage Park、Nebius 和 Lambda 等多家云服务商处预置和配置经过训练优化的计算环境。利用 Monarch 的 API,我们将 Lightning Studios 的交互式开发体验引入到了 Lightning MMT 的大规模计算能力中。

在这种新工作流中,资源通过 Lightning MMT 预置一次。此后,用户利用 Monarch API 直接控制作业的生命周期。由于用户现在可以直接控制集群的执行,代码变更和未预见的异常不再需要用户重新分配新集群。用户可以添加遗漏的 print 语句、更改模型配置,甚至修改执行流程,所有这些都在交互式笔记本的熟悉环境中,并且是在单次分配的上下文中完成。Monarch 的 API 提供了一个围绕具有可扩展消息传递功能的远程 Actor 构建的接口。Actor 被分组为称为“网格”(meshes)的集合,消息可以广播给所有成员。

这种简单而强大的 API 允许用户以指令式方式描述如何创建进程和 Actor,从而使分布式编程变得直观且易于上手。

from monarch.actor import Actor, endpoint, this_host

# spawn 8 trainer processes one for each gpu
training_procs = this_host().spawn_procs({"gpus": 8})

# define the actor to run on each process
class Trainer(Actor):
    @endpoint
    def train(self, step: int): ...

# create the trainers
trainers = training_procs.spawn("trainers", Trainer)

# tell all the trainers to to take a step
fut = trainers.train.call(step=0)

# wait for all trainers to complete
fut.get()

Monarch 与 Lightning:三大改变游戏规则的功能

1. 持久化计算与轻松迭代
有了 Monarch,您的计算资源将保持持久化——即使在您迭代、实验或离开时也是如此。您可以随时从中断的地方继续工作。

当使用 Monarch 部署时,计算节点通过 Monarch 的进程分配器运行。该进程分配器有两个简单的任务。首先,路由在 Monarch 生态系统中传递的任何消息;其次,生成任意形状的进程网格。这种集群职责的简化使我们能够封装节点的职责,从而在运行计算的硬件和用户的运行时环境之间建立隔离。与传统的启动集群规模工作流的方法不同,进程分配器(以及您的分配)可以避开用户代码中的任何异常或断开连接,这意味着它可以在多次迭代中重复使用。即使客户端程序结束或崩溃,计算环境仍可保持活动状态,从而减少了人工干预,并提高了交互式和长运行工作负载的可靠性。

Monarch 的核心抽象始于分配进程网格的能力。一旦分配了进程,用户就可以利用 Monarch 的 Actor 模型来部署封装好的 Python 代码单元。这些 Actor 本身就像服务器一样运行,公开端点作为通信原语。这些端点可以从其他 Actor 或任意 Python 进程异步调用。此接口可以轻松编排以创建多机训练程序。

import asyncio
from monarch.actor import Actor, current_rank, endpoint, proc_mesh

NUM_ACTORS = 4

class ToyActor(Actor):
    def __init__(self):
        self.rank = current_rank().rank

    @endpoint
    async def hello_world(self, msg):
        print(f"Identity: {self.rank}, {msg=}")

# Note: Meshes can be also be created on different nodes, but we're ignoring that in this example
async def create_toy_actors():
    local_proc_mesh = proc_mesh(gpus=NUM_ACTORS)
    # This spawns 4 instances of 'ToyActor'
    toy_actor = await local_proc_mesh.spawn("toy_actor", ToyActor)
    return toy_actor, local_proc_mesh

# Once actors are spawned, we can call all of them simultaneously with 
Actor.endpoint.call
async def call_all_actors(toy_actor):
    await toy_actor.hello_world.call("hey there, from script!!")

2. 笔记本原生集群管理
扩展至数百个 GPU 并编排复杂的分布式作业——所有操作都在您的 Studio 笔记本中完成。Monarch 以您在 Lightning Studios 中所期望的简单性,将集群的威力带到您的指尖。

只需定义所需的 GPU、节点数量以及每个节点的 GPU 数量,即可通过 Lightning MMT 预留集群。之后,您可以通过定义的进程网格调用训练 Actor。

from lightning_sdk import Machine, MMT, Studio
studio = Studio()

NUM_NODES = 16
NUM_GPUS = 8

# Install the MMT plugin befor running the actual job
studio.install_plugin("multi-machine-training")

# Machine with T4 GPUs
# machine_type = getattr(Machine, f"T4_X_{NUM_GPUS}")

# Machine with L40S GPUs
# machine_type = getattr(Machine, f"L40S_X_{NUM_GPUS}")

# Machine with H100 GPUs
machine_type = getattr(Machine, f"H100_X_{NUM_GPUS}")

job = MMT.run(
command=process_allocator,
       name="Multi-Nodes-Monarch-Titan",
       machine=machine_type,
       studio=studio,
       num_machines=NUM_NODES,
       env={
          "CUDA_VISIBLE_DEVICES": "0,1,2,3,4,5,6,7",
        },
    )

当您的训练任务完成后,无需担心资源丢失。Monarch 实现了笔记本原生的集群管理体验。您可以根据需要更改配置和文件;Monarch 处理笔记本和工作节点之间的代码和文件共享。无需重新启动 MMT 作业——只需使用新配置调用您的 Actor 即可。Monarch 将多机训练体验从每次迭代耗时数分钟缩短至连续启动几乎无需等待。

3. 实时交互式调试
实时交互式调试分布式作业,无需中断工作流。Monarch 开启了更深层次的洞察力和控制力,加速发现与创新。

Monarch 支持用于 Python Actor 网格的 pdb 调试。使用 Python 内置断点 (breakpoint()) 设置您的 Actor 进行调试。当您运行 Monarch 程序时,您将看到一个表格,列出当前停在断点处的所有 Actor,以及 Actor 名称、Rank、坐标、主机名、函数和行号等详细信息。在 monarch_dbg> 提示符下,您可以使用 attach 命令并指定 Actor 的名称和 Rank,深入查看特定的 Actor/断点。

from monarch.actor import Actor, current_rank, endpoint, this_host

def _bad_rank():
    raise ValueError("bad rank")

def _debugee_actor_internal(rank):
    if rank % 4 == 0:
        breakpoint()  # noqa
        rank += 1
        return rank
    elif rank % 4 == 1:
        breakpoint()  # noqa
        rank += 2
        return rank
    elif rank % 4 == 2:
        breakpoint()  # noqa
        rank += 3
        _bad_rank()
    elif rank % 4 == 3:
        breakpoint()  # noqa
        rank += 4
        return rank

class DebugeeActor(Actor):
    @endpoint
    async def to_debug(self):
       rank = current_rank().rank
        return _debugee_actor_internal(rank)

if __name__ == "__main__":
    # Create a mesh with 4 "hosts" and 4 gpus per "host"
    process_mesh = this_host().spawn_procs(per_host={"host": 4, "gpu": 4})

    # Spawn the actor you want to debug on the mesh
    debugee_mesh = process_mesh.spawn("debugee", DebugeeActor)

    # Call the endpoint you want to debug
    print(debugee_mesh.to_debug.call().get())

~ $ monarch debug

************************ MONARCH DEBUGGER ************************

Enter 'help' for a list of commands.
Enter 'list' to show all active breakpoints.

monarch_dbg> list

monarch_dbg> attach debugee 13
Attached to debug session for rank 13 (your.host.com)
> /path/to/debugging.py(16)to_debug()
-> rank = _debugee_actor_internal(rank)
(Pdb)

开启全新的工作流与协作

Monarch 与 Lightning AI 携手开启新机遇

  • 自信地运行长期的探索性实验。
  • 实现从原型设计到大规模训练的无缝过渡。
  • 在笔记本环境中直接进行实时协作与调试。

演示示例:128 个 GPU,一个笔记本

观看 Monarch 的实际操作,我们在单个 Studio 笔记本中启动了由 Torchtitan 驱动的 128 GPU 训练作业。体验无缝扩展、持久化资源和交互式调试——一切尽在一处。

用户可以轻松请求集群的 GPU 要求,并根据需要调整节点数量和每个节点的 GPU 数量。Monarch 通过 Lightning MMT 处理剩余的工作。

# Configuration for 128 GPUs
NUM_NODES = 16
NUM_GPUS = 8

在此示例中,我们将经典的 SPMD(单程序多数据)TorchTitan 工作负载包装为 Monarch 中的一个 Actor,从而实现在交互式笔记本内以 128 个 GPU 的规模预训练大语言模型(Llama3、Llama4 等)。资源预留后,Titan Trainer 被定义为继承自 Actor 的类。 

from monarch.actor import ProcMesh, Actor, endpoint, current_rank
from torchtitan.tools.logging import init_logger, logger
from torchtitan.train import Trainer

class TitanTrainerWrapper(Actor):
    def __init__(self, job_config: JobConfig):
        self.rank = current_rank().rank
        self.job_config = job_config

    @endpoint
    def init(self):
        logging.getLogger().addHandler(logging.StreamHandler(sys.stderr))
        print(f"Initializing actor: {self.rank} {current_rank()=} 
{socket.gethostname()=}")

    @endpoint
    def train(self):
        logger.info("Starting training")
        config = self.job_config
        trainer: Optional[Trainer] = None

        try:
            trainer = Trainer(config)
            trainer.train()

            if config.checkpoint.create_seed_checkpoint:
                trainer.checkpointer.save(curr_step=0, )
                logger.info("Created seed checkpoint")
            else:
                trainer.train()
        finally:
            if trainer:
                trainer.close()

            if torch.distributed.is_initialized():
                torch.distributed.destroy_process_group()
                logger.info("Process group destroyed.")
        print("Done training")

创建的 Actor 在进程网格上生成——这是一组可以访问预留资源的 Actor。进程网格使用 Monarch 的进程分配器与集群中的所有节点通信。

from monarch._rust_bindings.monarch_hyperactor.alloc import AllocConstraints, AllocSpec
from monarch.actor import ProcMesh
alloc = allocator.allocate(
        AllocSpec(AllocConstraints(), hosts=NUM_NODES, gpus=NUM_GPUS)
    )
proc_mesh = await ProcMesh.from_alloc(alloc)
async def async_main(job_config: JobConfig):
    torch.use_deterministic_algorithms(True)
    await setup_env_for_distributed(proc_mesh)
    await proc_mesh.logging_option(stream_to_client=True, aggregate_window_sec=3)
    trainer_actor = await proc_mesh.spawn("trainer_actor", TitanTrainerWrapper, job_config)
    await trainer_actor.init.call()
    await trainer_actor.train.call()

用户只需定义训练配置并启动训练器,即可触发进程网格上的 Trainer Actor。日志会在笔记本单元格的 Rank_0 上报告,也可以通过 MMT 日志、Lightning 的原生日志记录器 LitLogger 或 WandB(Titan 将日志推送到 WandB)进行访问。

from torchtitan.config import ConfigManager, JobConfig

config_manager = ConfigManager()
job_name = get_job_name(NUM_NODES, NUM_GPUS)
manual_args = [
        "--job.config_file",

os.path.expanduser("/torchtitan/torchtitan/models/llama3/train_configs/llama3_8b.toml"),
        "--model.tokenizer-path",
        "/teamspace/studios/this_studio/torchtitan/assets/hf/Llama-3.1-8B",
        "--training.steps",
        "1000000",
        "--training.dataset_path",
        "/teamspace/studios/this_studio/torchtitan/tests/assets/c4",
        "--job.dump_folder",
        "/teamspace/studios/this_studio/torchtitan/outputs/" + job_name
    ]
config = config_manager.parse_args(manual_args)
await async_main(config)


训练后,用户可以更改 Titan Trainer 配置或定义新的 Actor,并在相同资源上重新启动新进程,无需等待。此外,用户可以在由 @endpoint 装饰器定义的方法内放置 Python 断点 (breakpoint()),以便在开发过程中获得更好的交互性。例如,您可能希望在实际运行前检查 Titan Trainer 的配置参数,或在训练器内部放置断点。

class TitanTrainerWrapper(Actor):
    def __init__(self, job_config: JobConfig):
        self.rank = current_rank().rank
        self.job_config = job_config

    @endpoint
    def init(self):
        
logging.getLogger().addHandler(logging.StreamHandler(sys.stderr))
        breakpoint()  # noqa
        print(f"Initializing actor: {self.rank} 
{current_rank()=} {socket.gethostname()=}")

    @endpoint
    def train(self):
        logger.info("Starting training")
        config = self.job_config
        trainer: Optional[Trainer] = None

        try:
            trainer = Trainer(config)
     breakpoint()  # noqa
            trainer.train()

入门:在 Lightning Studio 中体验 Monarch