• 文档 >
  • 可迭代式 DataPipes
快捷方式

可迭代式 DataPipes

可迭代式数据集是 IterableDataset 子类的实例,它实现了 __iter__() 协议,并表示对数据样本的可迭代对象。这种类型的数据集特别适合于随机读取代价高昂甚至不可能的情况,以及批次大小取决于获取的数据的情况。

例如,当调用 iter(iterdatapipe) 时,这种数据集可以返回从数据库、远程服务器甚至实时生成的日志中读取的数据流。

这是 torchIterableDataset 的更新版本。

class torchdata.datapipes.iter.IterDataPipe(*args, **kwds)

可迭代式 DataPipe。

所有表示数据样本可迭代对象的数据管道都应该继承自它。这种类型的数据管道在数据来自流或样本数量过大而无法全部放入内存时特别有用。IterDataPipe 是延迟初始化的,它的元素只有在 next()IterDataPipe 的迭代器上被调用时才会计算。

所有子类都应该覆盖 __iter__(),它将返回此 DataPipe 中样本的迭代器。调用 __iter__IterDataPipe 会自动调用它的方法 reset(),该方法默认情况下不会执行任何操作。在编写自定义 IterDataPipe 时,用户应根据需要覆盖 reset()。常见的用途包括重置自定义 IterDataPipe 中的缓冲区、指针和各种状态变量。

注意

每个 IterDataPipe 一次只能有一个迭代器有效,创建第二个迭代器将使第一个迭代器失效。此约束是必要的,因为某些 IterDataPipe 具有内部缓冲区,如果存在多个迭代器,其状态可能会变得无效。下面的代码示例介绍了此约束在实践中的表现形式。如果您对这个约束有任何反馈,请查看 GitHub IterDataPipe 单个迭代器问题

这些 DataPipes 可以通过两种方式调用,使用类构造函数或将它们的函数形式应用到现有 IterDataPipe 上(推荐,大多数 DataPipes 都支持,但并非所有 DataPipes 都支持)。您可以将多个 IterDataPipe 连接在一起,形成一个管道,该管道将依次执行多个操作。

注意

当子类与 DataLoader 一起使用时,DataPipe 中的每个项都将从 DataLoader 迭代器中生成。当 num_workers > 0 时,每个工作进程将拥有 DataPipe 对象的不同副本,因此通常需要独立配置每个副本,以避免从工作进程返回重复数据。get_worker_info() 在工作进程中被调用时,返回有关工作进程的信息。它可以用于数据集的 __iter__() 方法或 DataLoaderworker_init_fn 选项,以修改每个副本的行为。

示例

一般用法
>>> # xdoctest: +SKIP
>>> from torchdata.datapipes.iter import IterableWrapper, Mapper
>>> dp = IterableWrapper(range(10))
>>> map_dp_1 = Mapper(dp, lambda x: x + 1)  # Using class constructor
>>> map_dp_2 = dp.map(lambda x: x + 1)  # Using functional form (recommended)
>>> list(map_dp_1)
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
>>> list(map_dp_2)
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
>>> filter_dp = map_dp_1.filter(lambda x: x % 2 == 0)
>>> list(filter_dp)
[2, 4, 6, 8, 10]
单个迭代器约束示例
>>> from torchdata.datapipes.iter import IterableWrapper, Mapper
>>> source_dp = IterableWrapper(range(10))
>>> it1 = iter(source_dp)
>>> list(it1)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> it1 = iter(source_dp)
>>> it2 = iter(source_dp)  # The creation of a new iterator invalidates `it1`
>>> next(it2)
0
>>> next(it1)  # Further usage of `it1` will raise a `RunTimeError`

我们有不同类型的可迭代数据管道

  1. 归档 - 打开和解压缩不同格式的归档文件。

  2. 增强 - 增强您的样本(例如,添加索引,或无限循环)。

  3. 组合 - 执行组合操作(例如,采样,洗牌)。

  4. 组合/拆分 - 通过组合多个 DataPipes 或将一个 DataPipe 拆分为多个 DataPipes 来与它们交互。

  5. 分组 - 在 DataPipe 中对样本进行分组

  6. IO - 与文件系统或远程服务器交互(例如,下载、打开、保存文件,以及列出目录中的文件)。

  7. 映射 - 将给定函数应用于 DataPipe 中的每个元素。

  8. 其他 - 执行各种各样的操作。

  9. 选择 - 在 DataPipe 中选择特定的样本。

  10. 文本 - 解析、读取和转换文本文件和数据

归档 DataPipes

这些 DataPipes 有助于打开和解压缩不同格式的归档文件。

Bz2FileLoader

从包含路径名和 bz2 二进制流的元组的可迭代 DataPipe 中解压缩 bz2 二进制流,并生成路径名和解压缩二进制流的元组(函数名:load_from_bz2)。

Decompressor

获取路径和压缩数据流的元组,并返回路径和解压缩数据流的元组(函数名:decompress)。

RarArchiveLoader

从包含路径名和 rar 二进制流的元组的输入可迭代 DataPipes 中解压缩 rar 二进制流,并生成路径名和解压缩二进制流的元组(函数名:load_from_rar)。

TarArchiveLoader

从包含路径名和 tar 二进制流的元组的可迭代 DataPipe 中打开/解压缩 tar 二进制流,并生成路径名和解压缩二进制流的元组(函数名:load_from_tar)。

TFRecordLoader

从包含路径名和 tfrecord 二进制流的元组的可迭代 DataPipe 中打开/解压缩 tfrecord 二进制流,并生成存储的记录(函数名:load_from_tfrecord)。

WebDataset

可迭代 DataPipe,它接受(路径,数据)元组流,通常表示 tar 归档文件的路径名和文件(函数名:webdataset)。

XzFileLoader

从包含路径名和 xy 二进制流的元组的可迭代 DataPipe 中解压缩 xz(lzma)二进制流,并生成路径名和解压缩二进制流的元组(函数名:load_from_xz)。

ZipArchiveLoader

从包含路径名称和 zip 二进制流的 Iterable DataPipe 中打开/解压缩 zip 二进制流,并生成路径名称和解压缩的二进制流的元组(函数名称:load_from_zip)。

增强 DataPipes

这些 DataPipes 有助于增强您的样本。

循环器

默认情况下,无限期地循环指定的输入,或者循环指定的次数(函数名称:cycle)。

枚举器

通过枚举为现有 DataPipe 添加索引,默认情况下索引从 0 开始(函数名称:enumerate)。

索引添加器

为现有的 Iterable DataPipe 添加索引(函数名称:add_index)。

重复器

在移至下一个元素之前,为指定的次数重复生成源 DataPipe 的每个元素(函数名称:repeat)。

组合 DataPipes

这些 DataPipes 有助于执行组合操作。

批次内洗牌器

对来自先前 DataPipe 的每个小批量数据进行洗牌(函数名称:in_batch_shuffle)。

采样器

使用提供的 Sampler 生成样本元素(默认值为 SequentialSampler)。

洗牌器

使用缓冲区对输入 DataPipe 进行洗牌(函数名称:shuffle)。

组合/拆分 DataPipes

这些通常涉及多个 DataPipes,将它们组合起来或将一个 DataPipe 拆分为多个 DataPipe。

连接器

连接多个 Iterable DataPipes(函数名称:concat)。

解复用器

使用给定的分类函数将输入 DataPipe 拆分为多个子 DataPipe(函数名称:demux)。

分叉器

创建同一 Iterable DataPipe 的多个实例(函数名称:fork)。

迭代键压缩器

根据匹配的键将两个 IterDataPipes 压缩在一起(函数名称:zip_with_iter)。

映射键压缩器

将来自源 IterDataPipe 的项目与来自 MapDataPipe 的项目连接在一起(函数名称:zip_with_map)。

复用器

一次从每个输入 Iterable DataPipe 生成一个元素(函数名称:mux)。

最长复用器

一次从每个输入 Iterable DataPipe 生成一个元素(函数名称:mux_longest)。

循环轮询解复用器

以循环轮询顺序将输入 DataPipe 拆分为多个子 DataPipe(函数名称:round_robin_demux)。

样本复用器

接受 (IterDataPipe, Weight) 的 Dict,并根据其权重从这些 DataPipes 中采样,生成项目。

解压缩器

接受 Sequences 的 DataPipe,解压缩每个 Sequence,并根据它们在 Sequence 中的位置将元素返回到单独的 DataPipes 中(函数名称:unzip)。

压缩器

将来自每个输入 DataPipe 的元素聚合为元组(函数名称:zip)。

最长压缩器

将来自每个输入 DataPipe 的元素聚合为元组(函数名称:zip_longest)。

分组 DataPipes

这些 DataPipes 允许您对 DataPipe 中的样本进行分组。

批处理器

创建数据的小批量(函数名称:batch)。

桶批处理器

从排序的桶中创建数据的小批量(函数名称:bucketbatch)。

整理器

通过自定义整理函数将 DataPipe 中的样本整理为 Tensor(函数名称:collate)。

分组器

根据来自 group_key_fn 的键对 IterDataPipe 中的数据进行分组,生成具有最大 group_size 的批次大小的 DataChunk

最大令牌桶化器

从具有有限大小的最小堆中创建数据的小批量,并且每个批次中由 len_fn 返回的样本的总长度将受 max_token_count 限制(函数名称:max_token_bucketize)。

解批处理器

撤消数据的批处理(函数名称:unbatch)。

IO DataPipes

这些 DataPipes 有助于与文件系统或远程服务器交互(例如,下载、打开、保存文件以及列出目录中的文件)。

AISFileLister

Iterable Datapipe,列出具有给定 URL 前缀的 AIStore 后端的文件(函数名称:list_files_by_ais)。

AISFileLoader

Iterable DataPipe,使用给定的 URL 从 AIStore 加载文件(函数名称:load_files_by_ais)。

FSSpecFileLister

列出提供给 root 路径名或 URL 的目录的内容,并生成目录内每个文件的完整路径名或 URL(函数名称:list_files_by_fsspec)。

FSSpecFileOpener

从包含 fsspec 路径的输入 datapipe 中打开文件,并生成路径名和打开的文件流的元组(函数名称:open_files_by_fsspec)。

FSSpecSaver

接受元数据和数据的 DataPipe,将数据保存到目标路径(由 filepath_fn 和元数据生成),并生成生成的 fsspec 路径(函数名称:save_by_fsspec)。

FileLister

给定根目录的路径,生成根目录内的文件路径名(路径 + 文件名)。

FileOpener

给定路径名,打开文件,并以元组形式生成路径名和文件流(函数名称:open_files)。

GDriveReader

接受指向 GDrive 文件的 URL,并生成文件名和 IO 流的元组(函数名称:read_from_gdrive)。

HttpReader

接受文件 URL(指向文件的 HTTP URL),并生成文件 URL 和 IO 流的元组(函数名称:read_from_http)。

HuggingFaceHubReader

接受数据集名称并返回一个 Iterable HuggingFace 数据集。

IoPathFileLister

列出提供给 root 路径名或 URL 的目录的内容,并生成目录内每个文件的完整路径名或 URL(函数名称:list_files_by_iopath)。

IoPathFileOpener

从包含路径名或 URL 的输入 datapipe 中打开文件,并生成路径名和打开的文件流的元组(函数名称:open_files_by_iopath)。

IoPathSaver

接受元数据和数据的 DataPipe,将数据保存到由 filepath_fn 和元数据生成的 target path,并生成 iopath 格式的路径(函数名称:save_by_iopath)。

在线读取器

接受文件 URL(可以是指向文件的 HTTP URL 或指向 GDrive 文件的 URL),并生成文件 URL 和 IO 流的元组(函数名称:read_from_remote)。

ParquetDataFrameLoader

接受指向 Parquet 文件的路径,并为 Parquet 文件中的每个行组返回一个 TorchArrow DataFrame(函数名称:load_parquet_as_df)。

S3FileLister

[已弃用] 改用 https://github.com/awslabs/s3-connector-for-pytorch

S3FileLoader

[已弃用] 改用 https://github.com/awslabs/s3-connector-for-pytorch

保存器

接受元数据和数据的 DataPipe,将数据保存到由 filepath_fn 和元数据生成的 target path,并生成本地文件系统的文件路径(函数名称:save_to_disk)。

映射 DataPipes

这些 DataPipes 将给定函数应用于 DataPipe 中的每个元素。

批次异步映射器

将来自源 DataPipe 的元素组合成批次,并对每个批次中的元素并发地应用协程函数,然后将输出展平为单个、非嵌套的 IterDataPipe(函数名称:async_map_batches)。

批次映射器

将来自源 DataPipe 的元素组合成批次,并对每个批次应用函数,然后将输出展平为单个、非嵌套的 IterDataPipe(函数名称:map_batches)。

扁平映射器

对来自源 DataPipe 的每个项目应用函数,然后将输出展平为单个、非嵌套的 IterDataPipe(函数名称:flatmap)。

映射器

对来自源 DataPipe 的每个项目应用函数(函数名称:map)。

洗牌扁平映射器

对来自源 DataPipe 的每个项目应用函数,然后将返回的迭代器收集到一个缓冲区中,然后,在每次迭代时,随机选择缓冲区中的一个迭代器并生成该迭代器中的一个项目(函数名称:shuffled_flatmap)。

线程池映射器

使用 ThreadPoolExecutor 对来自源 DataPipe 的每个项目并发地应用函数(函数名称:threadpool_map)。

其他 DataPipes

一组具有不同功能的杂项 DataPipes。

DataFrameMaker

接受数据行,将一定数量的行批处理在一起,并创建 TorchArrow DataFrames(函数名称:dataframe)。

EndOnDiskCacheHolder

指示先前 DataPipe 的结果何时将保存到由 filepath_fn 指定的本地文件中(函数名称:end_caching)。

FullSync

同步分布式进程中的数据,以防止训练期间出现挂起,这是由不均匀的切片数据引起的(函数名称:fullsync)。

HashChecker

计算并检查每个文件的哈希值,来自包含文件名和数据/流的元组的输入 DataPipe(函数名称:check_hash)。

InMemoryCacheHolder

将来自源 DataPipe 的元素存储在内存中,如果指定了大小限制,则最多存储该大小限制(函数名称:in_memory_cache)。

IterableWrapper

包装一个可迭代对象以创建一个 IterDataPipe。

LengthSetter

设置 DataPipe 的 length 属性,该属性由 __len__ 返回(函数名称:set_length)。

MapToIterConverter

MapDataPipe 转换为 IterDataPipe(函数名:to_iter_datapipe)。

OnDiskCacheHolder

将多个 DataPipe 操作的输出缓存到本地文件,这些文件通常是性能瓶颈,例如下载、解压缩等(函数名:on_disk_cache)。

PinMemory

从源 DataPipe 预取一个元素并将其移动到固定内存(函数名:pin_memory)。

Prefetcher

从源 DataPipe 预取元素并将其放入缓冲区(函数名:prefetch)。

RandomSplitter

随机将源 DataPipe 中的样本分成组(函数名:random_split)。

ShardExpander

将传入的 shard 字符串扩展为 shard。

ShardingFilter

允许 DataPipe 进行分片的包装器(函数名:sharding_filter)。

ShardingRoundRobinDispatcher

包装器,指示 DataPipe 图的先前部分不可复制,并且将在单独的单一调度进程中进行迭代,以便在使用多进程时以循环方式将数据分发到工作进程。

选择 DataPipes

这些 DataPipes 可帮助您选择 DataPipe 中的特定样本。

Filter

根据输入 filter_fn 过滤掉源 datapipe 中的元素(函数名:filter)。

Header

从源 DataPipe 的开头生成元素,直到指定的限制(函数名:header)。

Dropper

通过其索引删除输入 DataPipe 中的列/元素(函数名:drop)。

Slicer

通过开始/停止/步长或索引返回输入 DataPipe 中的元素切片(函数名:slice)。

Flattener

根据提供的索引,在每个样本/元素级别返回输入 DataPipe 的扁平副本(函数名:flatten)。

文本 DataPipes

这些 DataPipes 可帮助您解析、读取和转换文本文件和数据。

CSVDictParser

接受由文件名和 CSV 数据流组成的元组组成的 DataPipe,读取并返回 CSV 文件中的内容,每次一行(函数名:parse_csv_as_dict)。

CSVParser

接受由文件名和 CSV 数据流组成的元组组成的 DataPipe,读取并返回 CSV 文件中的内容,每次一行(函数名:parse_csv)。

JsonParser

从 JSON 数据流中读取并生成文件名和 JSON 数据的元组(函数名:parse_json_files)。

LineReader

接受由文件名和字符串数据流组成的元组组成的 DataPipe,并为流中的每一行生成文件名和该行的元组(函数名:readlines)。

ParagraphAggregator

将来自同一文件的文本行聚合到一个段落中(函数名:lines_to_paragraphs)。

RoutedDecoder

解码来自输入 DataPipe 的二进制流,以元组形式生成路径名和解码后的数据。

Rows2Columnar

接受具有数据批次的输入 DataPipe,并一次处理一批数据,并为每批数据生成一个 Dict,column_names 作为键,来自每行的对应值列表作为值(函数名:rows2columnar)。

StreamReader

给出 IO 流及其标签名称,以元组形式生成带有标签名称的字节。

文档

访问 PyTorch 的全面开发者文档

查看文档

教程

获取初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并获得问题的解答

查看资源