快捷方式

DataPipe 教程

使用 DataPipes

假设我们要从 CSV 文件加载数据,执行以下步骤

  • 列出目录中的所有 CSV 文件

  • 加载 CSV 文件

  • 解析 CSV 文件并生成行

  • 将数据集拆分为训练集和验证集

有一些 内置 DataPipes 可以帮助我们完成上述操作。

例如,CSVParser 的源代码如下所示

@functional_datapipe("parse_csv")
class CSVParserIterDataPipe(IterDataPipe):
    def __init__(self, dp, **fmtparams) -> None:
        self.dp = dp
        self.fmtparams = fmtparams

    def __iter__(self) -> Iterator[Union[Str_Or_Bytes, Tuple[str, Str_Or_Bytes]]]:
        for path, file in self.source_datapipe:
            stream = self._helper.skip_lines(file)
            stream = self._helper.strip_newline(stream)
            stream = self._helper.decode(stream)
            yield from self._helper.return_path(stream, path=path)  # Returns 1 line at a time as List[str or bytes]

如另一节所述,DataPipes 可以使用其函数形式(推荐)或其类构造函数进行调用。管道可以按如下方式组装

import torchdata.datapipes as dp

FOLDER = 'path/2/csv/folder'
datapipe = dp.iter.FileLister([FOLDER]).filter(filter_fn=lambda filename: filename.endswith('.csv'))
datapipe = dp.iter.FileOpener(datapipe, mode='rt')
datapipe = datapipe.parse_csv(delimiter=',')
N_ROWS = 10000  # total number of rows of data
train, valid = datapipe.random_split(total_length=N_ROWS, weights={"train": 0.5, "valid": 0.5}, seed=0)

for x in train:  # Iterating through the training dataset
    pass

for y in valid:  # Iterating through the validation dataset
    pass

您可以在此处找到内置 IterDataPipes 的完整列表,以及 MapDataPipes 的完整列表

使用 DataLoader

在本节中,我们将演示如何将 DataPipeDataLoader 结合使用。在大多数情况下,您可以通过将 dataset=datapipe 作为输入参数传递给 DataLoader 来使用它。有关 DataLoader 的详细文档,请访问 此 PyTorch 核心页面

请参阅 此页面,了解有关将 DataPipeDataLoader2 结合使用的信息。

对于此示例,我们首先将有一个辅助函数,用于生成一些包含随机标签和数据的 CSV 文件。

import csv
import random

def generate_csv(file_label, num_rows: int = 5000, num_features: int = 20) -> None:
    fieldnames = ['label'] + [f'c{i}' for i in range(num_features)]
    writer = csv.DictWriter(open(f"sample_data{file_label}.csv", "w", newline=''), fieldnames=fieldnames)
    writer.writeheader()
    for i in range(num_rows):
        row_data = {col: random.random() for col in fieldnames}
        row_data['label'] = random.randint(0, 9)
        writer.writerow(row_data)

接下来,我们将构建 DataPipes,以便读取并解析生成的 CSV 文件。请注意,我们更倾向于将定义的函数传递给 DataPipes,而不是 lambda 函数,因为前者是可与 pickle 序列化的。

import numpy as np
import torchdata.datapipes as dp

def filter_for_data(filename):
    return "sample_data" in filename and filename.endswith(".csv")

def row_processor(row):
    return {"label": np.array(row[0], np.int32), "data": np.array(row[1:], dtype=np.float64)}

def build_datapipes(root_dir="."):
    datapipe = dp.iter.FileLister(root_dir)
    datapipe = datapipe.filter(filter_fn=filter_for_data)
    datapipe = datapipe.open_files(mode='rt')
    datapipe = datapipe.parse_csv(delimiter=",", skip_lines=1)
    # Shuffle will happen as long as you do NOT set `shuffle=False` later in the DataLoader
    datapipe = datapipe.shuffle()
    datapipe = datapipe.map(row_processor)
    return datapipe

最后,我们将在 '__main__' 中将所有内容整合在一起,并将 DataPipe 传递给 DataLoader。请注意,如果您选择在设置 batch_size > 1 时使用 Batcher,您的样本将被多次批处理。您应该选择其中之一。

from torch.utils.data import DataLoader

if __name__ == '__main__':
    num_files_to_generate = 3
    for i in range(num_files_to_generate):
        generate_csv(file_label=i, num_rows=10, num_features=3)
    datapipe = build_datapipes()
    dl = DataLoader(dataset=datapipe, batch_size=5, num_workers=2)
    first = next(iter(dl))
    labels, features = first['label'], first['data']
    print(f"Labels batch shape: {labels.size()}")
    print(f"Feature batch shape: {features.size()}")
    print(f"{labels = }\n{features = }")
    n_sample = 0
    for row in iter(dl):
        n_sample += 1
    print(f"{n_sample = }")

将打印以下语句以显示单个批量的标签和特征的形状。

Labels batch shape: torch.Size([5])
Feature batch shape: torch.Size([5, 3])
labels = tensor([8, 9, 5, 9, 7], dtype=torch.int32)
features = tensor([[0.2867, 0.5973, 0.0730],
        [0.7890, 0.9279, 0.7392],
        [0.8930, 0.7434, 0.0780],
        [0.8225, 0.4047, 0.0800],
        [0.1655, 0.0323, 0.5561]], dtype=torch.float64)
n_sample = 12

之所以出现 n_sample = 12,是因为未使用 ShardingFilter (datapipe.sharding_filter()),因此每个工作程序都将独立返回所有样本。在这种情况下,每个文件有 10 行,有 3 个文件,批次大小为 5,这意味着每个工作程序有 6 个批次。使用 2 个工作程序,我们从 DataLoader 中获得了 12 个总批次。

为了使 DataPipe 分片能够与 DataLoader 协同工作,我们需要添加以下内容。

def build_datapipes(root_dir="."):
    datapipe = ...
    # Add the following line to `build_datapipes`
    # Note that it is somewhere after `Shuffler` in the DataPipe line, but before expensive operations
    datapipe = datapipe.sharding_filter()
    return datapipe

重新运行时,我们将获得

...
n_sample = 6

注意

  • 尽早将 ShardingFilter (datapipe.sharding_filter) 添加到管道中,尤其是在执行解码等昂贵操作之前,以避免在工作程序/分布式进程中重复执行这些昂贵的操作。

  • 对于需要进行分片的源数据,务必在 ShardingFilter 之前添加 Shuffler,以确保在数据被分成分片之前进行全局洗牌。否则,每个工作程序进程都会在所有时期处理相同的分片数据。这意味着每个批次只包含来自同一分片的数据,这会导致训练期间的准确性降低。但是,这并不适用于已经为每个多进程/分布式进程进行分片的数据源,因为在这种情况下,ShardingFilter 不再需要出现在管道中。

  • 在某些情况下,将 Shuffler 放置在管道中更靠前会导致性能下降,因为某些操作(例如解压缩)在顺序读取时速度更快。在这种情况下,建议在洗牌之前(可能是在任何数据加载之前)对文件进行解压缩。

您可以在 此页面 上找到适用于不同研究领域的更多 DataPipe 实现示例。

实现自定义 DataPipe

目前,我们已经有了大量的内置 DataPipes,并且我们预计它们可以覆盖大多数必要的数据处理操作。如果没有任何 DataPipes 支持您的需求,您可以创建自己的自定义 DataPipe。

作为指导示例,让我们实现一个 IterDataPipe,它对输入迭代器应用一个可调用对象。对于 MapDataPipe,请查看 map 文件夹中的示例,并按照以下步骤执行 __getitem__ 方法(而不是 __iter__ 方法)。

命名

DataPipe 的命名约定是“操作”-er,后面跟着 IterDataPipeMapDataPipe,因为每个 DataPipe 本质上都是一个容器,用于对来自源 DataPipe 的数据应用操作。为了简洁起见,我们在 init 文件中将其简称为“操作-er”。对于我们的 IterDataPipe 示例,我们将模块命名为 MapperIterDataPipe,并在 torchdata.datapipes 下将其别名为 iter.Mapper

对于函数式方法名,命名约定为 datapipe.<operation>。例如,Mapper 的函数式方法名为 map,因此可以通过 datapipe.map(...) 调用它。

构造函数

DataSets 现在通常被构建为 DataPipes 的堆栈,因此每个 DataPipe 通常将源 DataPipe 作为其第一个参数。以下是一个简化的 Mapper 版本作为示例

from torchdata.datapipes.iter import IterDataPipe

class MapperIterDataPipe(IterDataPipe):
    def __init__(self, source_dp: IterDataPipe, fn) -> None:
        super().__init__()
        self.source_dp = source_dp
        self.fn = fn

注意

  • 避免在 __init__ 函数中从源 DataPipe 加载数据,以便支持延迟数据加载并节省内存。

  • 如果 IterDataPipe 实例将数据保存在内存中,请注意数据就地修改。当从实例创建第二个迭代器时,数据可能已经发生了变化。请以 IterableWrapper 为参考,为每个迭代器 deepcopy 数据。

  • 避免使用现有 DataPipes 的函数名称占用的变量名。例如,.filter 是可以用来调用 FilterIterDataPipe 的函数名。在另一个 IterDataPipe 中使用名为 filter 的变量会导致混淆。

迭代器

对于 IterDataPipes,需要一个 __iter__ 函数从源 IterDataPipe 中获取数据,然后对数据应用操作,最后 yield

class MapperIterDataPipe(IterDataPipe):
    # ... See __init__() defined above

    def __iter__(self):
        for d in self.dp:
            yield self.fn(d)

长度

在许多情况下,就像我们的 MapperIterDataPipe 示例一样,DataPipe 的 __len__ 方法返回源 DataPipe 的长度。

class MapperIterDataPipe(IterDataPipe):
    # ... See __iter__() defined above

    def __len__(self):
        return len(self.dp)

但是,请注意,__len__ 对于 IterDataPipe 是可选的,而且通常不建议使用。对于下面使用 DataPipes 部分中的 CSVParserIterDataPipe__len__ 没有实现,因为每个文件的行数在加载之前是未知的。在某些特殊情况下,__len__ 可以返回一个整数或根据输入引发错误。在这种情况下,错误必须是 TypeError,以便支持 Python 的内置函数,例如 list(dp)

使用函数式 API 注册 DataPipes

每个 DataPipe 可以使用装饰器 functional_datapipe 注册以支持函数式调用。

@functional_datapipe("map")
class MapperIterDataPipe(IterDataPipe):
   # ...

然后可以使用它们的函数式形式(推荐)或类构造函数构建 DataPipes 堆栈

import torchdata.datapipes as dp

# Using functional form (recommended)
datapipes1 = dp.iter.FileOpener(['a.file', 'b.file']).map(fn=decoder).shuffle().batch(2)
# Using class constructors
datapipes2 = dp.iter.FileOpener(['a.file', 'b.file'])
datapipes2 = dp.iter.Mapper(datapipes2, fn=decoder)
datapipes2 = dp.iter.Shuffler(datapipes2)
datapipes2 = dp.iter.Batcher(datapipes2, 2)

在上面的示例中,datapipes1datapipes2 代表完全相同的 IterDataPipe 堆栈。我们建议使用 DataPipes 的函数式形式。

使用云存储提供商

在本节中,我们将展示使用内置 fsspec DataPipes 访问 AWS S3、Google Cloud Storage 和 Azure Cloud Storage 的示例。虽然这里只讨论了这两个提供商,但使用额外的库,fsspec DataPipes 应该允许您连接到其他存储系统(已知实现列表)。

如果您需要其他云存储提供商的支持,或者您有要与社区共享的代码示例,请在 GitHub 上告知我们。

使用 fsspec DataPipes 访问 AWS S3

这需要安装库 fsspec (文档) 和 s3fs (s3fs GitHub 仓库)。

您可以通过将以 "s3://BUCKET_NAME" 开头的路径传递给 FSSpecFileLister (.list_files_by_fsspec(...)) 来列出 S3 存储桶目录中的文件。

from torchdata.datapipes.iter import IterableWrapper

dp = IterableWrapper(["s3://BUCKET_NAME"]).list_files_by_fsspec()

您还可以使用 FSSpecFileOpener (.open_files_by_fsspec(...)) 打开文件,并对它们进行流式传输(如果文件格式支持)。

请注意,您还可以通过参数 kwargs_for_open 提供其他参数。这对于访问特定存储桶版本等目的很有用,您可以通过传入 {version_id: 'SOMEVERSIONID'} 来实现(有关 s3fs 的 S3 存储桶版本感知的更多 详细信息)。支持的参数会因您访问的(云)文件系统而异。

在下面的示例中,我们使用 TarArchiveLoader (.load_from_tar(mode="r|")) 对存档进行流式传输,与通常的 mode="r:" 相比。这允许我们开始处理存档中的数据,而无需首先将整个存档下载到内存中。

from torchdata.datapipes.iter import IterableWrapper
dp = IterableWrapper(["s3://BUCKET_NAME/DIRECTORY/1.tar"])
dp = dp.open_files_by_fsspec(mode="rb", anon=True).load_from_tar(mode="r|") # Streaming version
# The rest of data processing logic goes here

最后,FSSpecFileSaver 也可用于将数据写入云。

使用 fsspec DataPipes 访问 Google Cloud Storage (GCS)

这需要安装库 fsspec (文档) 和 gcsfs (gcsfs GitHub 仓库)。

您可以通过指定以 "gcs://BUCKET_NAME" 开头的路径来列出 GCS 存储桶目录中的文件。下面示例中的存储桶名为 uspto-pair

from torchdata.datapipes.iter import IterableWrapper

dp = IterableWrapper(["gcs://uspto-pair/"]).list_files_by_fsspec()
print(list(dp))
# ['gcs://uspto-pair/applications', 'gcs://uspto-pair/docs', 'gcs://uspto-pair/prosecution-history-docs']

以下是如何从名为 uspto-pair 的存储桶中的 applications 目录加载 zip 文件 05900035.zip 的示例。

from torchdata.datapipes.iter import IterableWrapper

dp = IterableWrapper(["gcs://uspto-pair/applications/05900035.zip"]) \
        .open_files_by_fsspec(mode="rb") \
        .load_from_zip()
# Logic to process those archive files comes after
for path, filestream in dp:
    print(path, filestream)
# gcs:/uspto-pair/applications/05900035.zip/05900035/README.txt, StreamWrapper<...>
# gcs:/uspto-pair/applications/05900035.zip/05900035/05900035-address_and_attorney_agent.tsv, StreamWrapper<...>
# gcs:/uspto-pair/applications/05900035.zip/05900035/05900035-application_data.tsv, StreamWrapper<...>
# gcs:/uspto-pair/applications/05900035.zip/05900035/05900035-continuity_data.tsv, StreamWrapper<...>
# gcs:/uspto-pair/applications/05900035.zip/05900035/05900035-transaction_history.tsv, StreamWrapper<...>

使用 fsspec DataPipes 访问 Azure Blob 存储

这需要安装库 fsspec (文档) 和 adlfs (adlfs GitHub 仓库)。您可以通过提供以 abfs:// 开头的 URI 来访问 Azure 数据湖存储 Gen2 中的数据。例如,FSSpecFileLister (.list_files_by_fsspec(...)) 可用于列出容器中目录中的文件

from torchdata.datapipes.iter import IterableWrapper

storage_options={'account_name': ACCOUNT_NAME, 'account_key': ACCOUNT_KEY}
dp = IterableWrapper(['abfs://CONTAINER/DIRECTORY']).list_files_by_fsspec(**storage_options)
print(list(dp))
# ['abfs://container/directory/file1.txt', 'abfs://container/directory/file2.txt', ...]

您还可以使用 FSSpecFileOpener (.open_files_by_fsspec(...)) 打开文件,并对它们进行流式传输(如果文件格式支持)。

以下是如何从公共容器中的 curated/covid-19/ecdc_cases/latest 目录加载 CSV 文件 ecdc_cases.csv 的示例,该容器属于帐户 pandemicdatalake

from torchdata.datapipes.iter import IterableWrapper
dp = IterableWrapper(['abfs://public/curated/covid-19/ecdc_cases/latest/ecdc_cases.csv']) \
        .open_files_by_fsspec(account_name='pandemicdatalake') \
        .parse_csv()
print(list(dp)[:3])
# [['date_rep', 'day', ..., 'iso_country', 'daterep'],
# ['2020-12-14', '14', ..., 'AF', '2020-12-14'],
# ['2020-12-13', '13', ..., 'AF', '2020-12-13']]

如有必要,您还可以使用以 adl://abfs:// 开头的 URI 访问 Azure 数据湖存储 Gen1 中的数据,如 adlfs 仓库的 README 中所述

使用 fsspec DataPipes 访问 Azure ML 数据存储

Azure ML 数据存储是对 Azure 上现有存储帐户的引用。创建和使用 Azure ML 数据存储的主要好处是

  • 一个通用的、易于使用的 API,用于与 Azure 中的不同存储类型进行交互(Blob/Files/<datastore>)。

  • 在团队合作时,更容易发现有用的数据存储。

  • 身份验证会自动处理 - 支持基于凭据的访问(服务主体/SAS/密钥)和基于身份的访问(Azure Active Directory/托管身份)。使用基于凭据的身份验证时,您无需在代码中公开秘密。

这需要安装库 azureml-fsspec (文档)。

您可以通过提供以 azureml:// 开头的 URI 来访问 Azure ML 数据存储中的数据。例如,FSSpecFileLister (.list_files_by_fsspec(...)) 可用于列出容器中目录中的文件

from torchdata.datapipes.iter import IterableWrapper

# set the subscription_id, resource_group, and AzureML workspace_name
subscription_id = "<subscription_id>"
resource_group = "<resource_group>"
workspace_name = "<workspace_name>"

# set the datastore name and path on the datastore
datastore_name = "<datastore_name>"
path_on_datastore = "<path_on_datastore>"

uri = f"azureml://subscriptions/{subscription_id}/resourcegroups/{resource_group}/workspaces/{workspace_name}/datastores/{datastore_name}/paths/{path_on_datastore}"

dp = IterableWrapper([uri]).list_files_by_fsspec()
print(list(dp))
# ['azureml:///<sub_id>/resourcegroups/<rg_name>/workspaces/<ws_name>/datastores/<datastore>/paths/<folder>/file1.txt',
# 'azureml:///<sub_id>/resourcegroups/<rg_name>/workspaces/<ws_name>/datastores/<datastore>/paths/<folder>/file2.txt', ...]

您还可以使用 FSSpecFileOpener (.open_files_by_fsspec(...)) 打开文件,并对它们进行流式传输(如果文件格式支持)。

以下是如何从默认的 Azure ML 数据存储 workspaceblobstore 中加载 tar 文件的示例,其中路径为 /cifar-10-python.tar.gz(顶层文件夹)。

from torchdata.datapipes.iter import IterableWrapper

# set the subscription_id, resource_group, and AzureML workspace_name
subscription_id = "<subscription_id>"
resource_group = "<resource_group>"
workspace_name = "<workspace_name>"

# set the datastore name and path on the datastore
datastore_name = "workspaceblobstore"
path_on_datastore = "cifar-10-python.tar.gz"

uri = f"azureml://subscriptions/{subscription_id}/resourcegroups/{resource_group}/workspaces/{workspace_name}/datastores/{datastore_name}/paths/{path_on_datastore}"

dp = IterableWrapper([uri]) \
        .open_files_by_fsspec(mode="rb") \
        .load_from_tar()

for path, filestream in dp:
    print(path)
# ['azureml:/subscriptions/<sub_id>/resourcegroups/<rg_name>/workspaces/<ws_name>/datastores/<datastore>/paths/cifar-10-python.tar.gz/cifar-10-batches-py/data_batch_4',
#   'azureml:/subscriptions/<sub_id>/resourcegroups/<rg_name>/workspaces/<ws_name>/datastores/<datastore>/paths/cifar-10-python.tar.gz/cifar-10-batches-py/readme.html',
#   'azureml:/subscriptions/<sub_id>/resourcegroups/<rg_name>/workspaces/<ws_name>/datastores/<datastore>/paths/cifar-10-python.tar.gz/cifar-10-batches-py/test_batch',
#   'azureml:/subscriptions/<sub_id>/resourcegroups/<rg_name>/workspaces/<ws_name>/datastores/<datastore>/paths/cifar-10-python.tar.gz/cifar-10-batches-py/data_batch_3',
#   'azureml:/subscriptions/<sub_id>/resourcegroups/<rg_name>/workspaces/<ws_name>/datastores/<datastore>/paths/cifar-10-python.tar.gz/cifar-10-batches-py/batches.meta',
#   'azureml:/subscriptions/<sub_id>/resourcegroups/<rg_name>/workspaces/<ws_name>/datastores/<datastore>/paths/cifar-10-python.tar.gz/cifar-10-batches-py/data_batch_2',
#   'azureml:/subscriptions/<sub_id>/resourcegroups/<rg_name>/workspaces/<ws_name>/datastores/<datastore>/paths/cifar-10-python.tar.gz/cifar-10-batches-py/data_batch_5',
#   'azureml:/subscriptions/<sub_id>/resourcegroups/<rg_name>/workspaces/<ws_name>/datastores/<datastore>/paths/cifar-10-python.tar.gz/cifar-10-batches-py/data_batch_1]

以下是如何从 Azure ML 数据存储 workspaceblobstore 中加载 CSV 文件(著名的泰坦尼克号数据集 (下载))的示例,其中路径为 /titanic.csv(顶层文件夹)。

from torchdata.datapipes.iter import IterableWrapper

# set the subscription_id, resource_group, and AzureML workspace_name
subscription_id = "<subscription_id>"
resource_group = "<resource_group>"
workspace_name = "<workspace_name>"

# set the datastore name and path on the datastore
datastore_name = "workspaceblobstore"
path_on_datastore = "titanic.csv"

uri = f"azureml://subscriptions/{subscription_id}/resourcegroups/{resource_group}/workspaces/{workspace_name}/datastores/{datastore_name}/paths/{path_on_datastore}"

def row_processer(row):
    # return the label and data (the class and age of the passenger)
    # if missing age, set to 50
    if row[5] == "":
        row[5] = 50.0
    return {"label": np.array(row[1], np.int32), "data": np.array([row[2],row[5]], dtype=np.float32)}

dp = IterableWrapper([uri]) \
        .open_files_by_fsspec() \
        .parse_csv(delimiter=",", skip_lines=1) \
        .map(row_processer)

print(list(dp)[:3])
# [{'label': array(0, dtype=int32), 'data': array([ 3., 22.], dtype=float32)},
#  {'label': array(1, dtype=int32), 'data': array([ 1., 38.], dtype=float32)},
#  {'label': array(1, dtype=int32), 'data': array([ 3., 26.], dtype=float32)}]

文档

访问 PyTorch 的全面开发者文档

查看文档

教程

获取针对初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并获得问题的解答

查看资源