注意
单击 此处 下载完整的示例代码
高级 KubeFlow Pipelines 示例¶
这是一个使用仅用 TorchX 组件构建的 KubeFlow Pipelines 的示例管道。
KFP 适配器可用于将 TorchX 组件直接转换为可在 KFP 中使用的组件。
输入参数¶
首先让我们为管道定义一些参数。
import argparse
import os.path
import sys
from typing import Dict
import kfp
import torchx
from torchx import specs
from torchx.components.dist import ddp as dist_ddp
from torchx.components.serve import torchserve
from torchx.components.utils import copy as utils_copy, python as utils_python
from torchx.pipelines.kfp.adapter import container_from_app
parser = argparse.ArgumentParser(description="example kfp pipeline")
TorchX 组件围绕镜像构建。根据您使用的调度程序,这可能会有所不同,但对于 KFP,这些镜像被指定为 Docker 容器。我们为示例应用程序提供一个容器,为标准内置应用程序提供一个容器。如果您修改了 torchx 示例代码,则需要在 KFP 上启动它之前重新构建容器。
parser.add_argument(
"--image",
type=str,
help="docker image to use for the examples apps",
default=torchx.IMAGE,
)
大多数 TorchX 组件使用 fsspec 来抽象处理远程文件系统。这使组件能够使用诸如 s3://
之类的路径,从而简化使用云存储提供商的操作。
parser.add_argument(
"--output_path",
type=str,
help="path to place the data",
required=True,
)
parser.add_argument("--load_path", type=str, help="checkpoint path to load from")
此示例使用 torchserve 进行推理,因此我们需要指定一些选项。假设您在与服务名称 torchserve
(位于默认命名空间中)相同的 Kubernetes 集群中运行 TorchServe 实例。
有关如何设置 TorchServe 的信息,请参阅 https://github.com/pytorch/serve/blob/master/kubernetes/README.md。
parser.add_argument(
"--management_api",
type=str,
help="path to the torchserve management API",
default="http://torchserve.default.svc.cluster.local:8081",
)
parser.add_argument(
"--model_name",
type=str,
help="the name of the inference model",
default="tiny_image_net",
)
笔记本。
if "NOTEBOOK" in globals():
argv = [
"--output_path",
"/tmp/output",
]
else:
argv = sys.argv[1:]
args: argparse.Namespace = parser.parse_args(argv)
创建组件¶
第一步是将数据下载到我们可以处理的位置。为此,我们可以使用内置的 copy 组件。此组件接受两个有效的 fsspec 路径,并将它们从一个路径复制到另一个路径。在本例中,我们使用 http 作为源,并使用输出路径下的文件作为输出。
data_path: str = os.path.join(args.output_path, "tiny-imagenet-200.zip")
copy_app: specs.AppDef = utils_copy(
"http://cs231n.stanford.edu/tiny-imagenet-200.zip",
data_path,
image=args.image,
)
下一个组件用于数据预处理。它接收来自先前操作符的原始数据,并对其进行一些转换,以便与训练器一起使用。
datapreproc 将数据输出到指定的 fsspec 路径。这些路径都是提前指定的,因此我们有一个完全静态的管道。
processed_data_path: str = os.path.join(args.output_path, "processed")
datapreproc_app: specs.AppDef = utils_python(
"--output_path",
processed_data_path,
"--input_path",
data_path,
"--limit",
"100",
image=args.image,
m="torchx.examples.apps.datapreproc.datapreproc",
cpu=1,
memMB=1024,
)
接下来,我们将创建训练器组件,它接收来自先前 datapreproc 组件的训练数据。我们将其定义在单独的组件文件中,就像您通常会做的那样。
拥有一个单独的组件文件使您能够通过 torchx run
从 TorchX CLI 启动您的训练器,以进行快速迭代,以及以自动方式从管道中运行它。
# make sure examples is on the path
if "__file__" in globals():
sys.path.append(os.path.join(os.path.dirname(__file__), "..", "..", ".."))
logs_path: str = os.path.join(args.output_path, "logs")
models_path: str = os.path.join(args.output_path, "models")
trainer_app: specs.AppDef = dist_ddp(
*(
"--output_path",
models_path,
"--load_path",
args.load_path or "",
"--log_path",
logs_path,
"--data_path",
processed_data_path,
"--epochs",
str(1),
),
image=args.image,
m="torchx.examples.apps.lightning.train",
j="1x1",
# per node resource settings
cpu=1,
memMB=3000,
)
为了使 TensorBoard 路径显示在 KFP 的 UI 中,我们需要添加一些元数据,以便 KFP 知道从哪里获取指标。
这将在我们创建 KFP 容器时使用。
ui_metadata: Dict[str, object] = {
"outputs": [
{
"type": "tensorboard",
"source": os.path.join(logs_path, "lightning_logs"),
}
]
}
对于推理,我们利用了内置的 TorchX 组件之一。此组件接收模型并将模型上传到 TorchServe 管理 API 端点。
serve_app: specs.AppDef = torchserve(
model_path=os.path.join(models_path, "model.mar"),
management_api=args.management_api,
image=args.image,
params={
"model_name": args.model_name,
# set this to allocate a worker
# "initial_workers": 1,
},
)
对于模型的可解释性,我们利用了存储在自己组件文件中的自定义组件。此组件接收来自 datapreproc 和 train 组件的输出,并生成包含集成梯度结果的图像。
interpret_path: str = os.path.join(args.output_path, "interpret")
interpret_app: specs.AppDef = utils_python(
*(
"--load_path",
os.path.join(models_path, "last.ckpt"),
"--data_path",
processed_data_path,
"--output_path",
interpret_path,
),
image=args.image,
m="torchx.examples.apps.lightning.interpret",
)
管道定义¶
最后一步是使用 KFP 适配器通过 torchx 组件定义实际的管道,并导出可以上传到 KFP 集群的管道包。
KFP 适配器目前不跟踪输入和输出,因此容器需要通过 .after() 指定其依赖项。
我们调用 .set_tty() 以使组件的日志更具响应性,以供示例使用。
def pipeline() -> None:
# container_from_app creates a KFP container from the TorchX app
# definition.
copy = container_from_app(copy_app)
copy.container.set_tty()
datapreproc = container_from_app(datapreproc_app)
datapreproc.container.set_tty()
datapreproc.after(copy)
# For the trainer we want to log that UI metadata so you can access
# tensorboard from the UI.
trainer = container_from_app(trainer_app, ui_metadata=ui_metadata)
trainer.container.set_tty()
trainer.after(datapreproc)
if False:
serve = container_from_app(serve_app)
serve.container.set_tty()
serve.after(trainer)
if False:
# Serve and interpret only require the trained model so we can run them
# in parallel to each other.
interpret = container_from_app(interpret_app)
interpret.container.set_tty()
interpret.after(trainer)
kfp.compiler.Compiler().compile(
pipeline_func=pipeline,
package_path="pipeline.yaml",
)
with open("pipeline.yaml", "rt") as f:
print(f.read())
运行完所有这些操作后,您应该会得到一个管道文件(通常是 pipeline.yaml),您可以通过 UI 或 kfp.Client 将其上传到您的 KFP 集群。
# sphinx_gallery_thumbnail_path = '_static/img/gallery-kfp.png'
脚本总运行时间:(0 分 0.000 秒)