• 文档 >
  • 可迭代风格 DataPipe
快捷方式

可迭代风格 DataPipes

可迭代风格数据集是 IterableDataset 子类的一个实例,它实现了 __iter__() 协议,表示数据样本上的一个可迭代对象。这种类型的数据集特别适用于随机读取成本很高甚至不可能的情况,以及批大小取决于获取的数据的情况。

例如,当调用 iter(iterdatapipe) 时,这样的数据集可以返回从数据库、远程服务器甚至实时生成的日志读取的数据流。

这是 torchIterableDataset 的更新版本。

class torchdata.datapipes.iter.IterDataPipe(*args, **kwds)

可迭代风格 DataPipe。

所有表示数据样本可迭代对象的 DataPipe 都应继承此类。当数据来自流或样本数量太大而无法全部放入内存时,这种风格的 DataPipe 特别有用。IterDataPipe 是延迟初始化的,只有在 IterDataPipe 的迭代器上调用 next() 时才会计算其元素。

所有子类都应该覆盖 __iter__(),它将返回此 DataPipe 中样本的迭代器。调用 IterDataPipe__iter__ 会自动调用其方法 reset(),默认情况下该方法不执行任何操作。编写自定义 IterDataPipe 时,如果需要,用户应覆盖 reset()。常见的用法包括重置自定义 IterDataPipe 中的缓冲区、指针和各种状态变量。

注意

每个 IterDataPipe 一次只能有一个有效的迭代器,创建第二个迭代器将使第一个迭代器失效。此约束是必要的,因为某些 IterDataPipe 具有内部缓冲区,如果有多个迭代器,则其状态可能变得无效。下面的代码示例介绍了此约束在实践中的表现。如果您对此约束有任何反馈,请参阅 GitHub IterDataPipe 单迭代器问题

这些 DataPipe 可以通过两种方式调用:使用类构造函数或将其函数形式应用于现有的 IterDataPipe(推荐,适用于大多数但并非所有 DataPipe)。您可以将多个 IterDataPipe 链接在一起,形成一个管道,依次执行多个操作。

注意

当子类与 DataLoader 一起使用时,DataPipe 中的每个项目都将从 DataLoader 迭代器中产生。当 num_workers > 0 时,每个工作进程都将拥有 DataPipe 对象的不同副本,因此通常希望独立配置每个副本,以避免工作进程返回重复数据。在工作进程中调用 get_worker_info() 会返回有关该工作进程的信息。它可以在数据集的 __iter__() 方法或 DataLoaderworker_init_fn 选项中使用,以修改每个副本的行为。

示例

一般用法
>>> # xdoctest: +SKIP
>>> from torchdata.datapipes.iter import IterableWrapper, Mapper
>>> dp = IterableWrapper(range(10))
>>> map_dp_1 = Mapper(dp, lambda x: x + 1)  # Using class constructor
>>> map_dp_2 = dp.map(lambda x: x + 1)  # Using functional form (recommended)
>>> list(map_dp_1)
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
>>> list(map_dp_2)
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
>>> filter_dp = map_dp_1.filter(lambda x: x % 2 == 0)
>>> list(filter_dp)
[2, 4, 6, 8, 10]
单迭代器约束示例
>>> from torchdata.datapipes.iter import IterableWrapper, Mapper
>>> source_dp = IterableWrapper(range(10))
>>> it1 = iter(source_dp)
>>> list(it1)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> it1 = iter(source_dp)
>>> it2 = iter(source_dp)  # The creation of a new iterator invalidates `it1`
>>> next(it2)
0
>>> next(it1)  # Further usage of `it1` will raise a `RunTimeError`

我们有不同类型的可迭代 DataPipe

  1. 归档 - 打开和解压缩不同格式的归档文件。

  2. 增强 - 增强您的样本(例如,添加索引或无限循环)。

  3. 组合 - 执行组合操作(例如,采样、洗牌)。

  4. 组合/拆分 - 通过组合多个 DataPipe 或将一个 DataPipe 拆分为多个来与之交互。

  5. 分组 - 对 DataPipe 中的样本进行分组

  6. IO - 与文件系统或远程服务器交互(例如,下载、打开、保存文件以及列出目录中的文件)。

  7. 映射 - 将给定函数应用于 DataPipe 中的每个元素。

  8. 其他 - 执行各种操作集。

  9. 选择 - 选择 DataPipe 中的特定样本。

  10. 文本 - 解析、读取和转换文本文件和数据

归档 DataPipes

这些 DataPipe 帮助打开和解压缩不同格式的归档文件。

Bz2FileLoader

解压缩包含路径名和 bz2 二进制流元组的可迭代 DataPipe 中的 bz2 二进制流,并生成路径名和提取的二进制流元组(函数名称:load_from_bz2)。

Decompressor

获取路径和压缩数据流的元组,并返回路径和解压缩数据流的元组(函数名称:decompress)。

RarArchiveLoader

解压缩包含路径名和 rar 二进制流元组的输入可迭代 Datapipes 中的 rar 二进制流,并生成路径名和提取的二进制流元组(函数名称:load_from_rar)。

TarArchiveLoader

打开/解压缩包含路径名和 tar 二进制流元组的可迭代 DataPipe 中的 tar 二进制流,并生成路径名和提取的二进制流元组(函数名称:load_from_tar)。

TFRecordLoader

打开/解压缩包含路径名和 tfrecord 二进制流元组的可迭代 DataPipe 中的 tfrecord 二进制流,并生成存储的记录(函数名称:load_from_tfrecord)。

WebDataset

可迭代 DataPipe,接受(路径、数据)元组流,通常表示 tar 归档文件的路径名和文件(函数名称:webdataset)。

XzFileLoader

解压缩包含路径名和 xy 二进制流元组的可迭代 DataPipe 中的 xz (lzma) 二进制流,并生成路径名和提取的二进制流元组(函数名称:load_from_xz)。

ZipArchiveLoader

打开/解压缩包含路径名和 zip 二进制流元组的可迭代 DataPipe 中的 zip 二进制流,并生成路径名和提取的二进制流元组(函数名称:load_from_zip)。

增强 DataPipes

这些 DataPipe 帮助增强您的样本。

Cycler

默认情况下永久循环指定的输入,或循环指定的次数(函数名称:cycle)。

Enumerator

通过枚举向现有 DataPipe 添加索引,默认情况下索引从 0 开始(函数名称:enumerate)。

IndexAdder

向现有的可迭代 DataPipe 添加索引(函数名称:add_index)。

Repeater

在移至下一个元素之前,重复生成源 DataPipe 的每个元素指定的次数(函数名称:repeat)。

组合 DataPipes

这些 DataPipe 帮助执行组合操作。

InBatchShuffler

洗牌来自先前 DataPipe 的每个小批量(函数名称:in_batch_shuffle)。

Sampler

使用提供的 Sampler(默认为 SequentialSampler)生成样本元素。

Shuffler

使用缓冲区对输入 DataPipe 进行随机排序(函数名:shuffle)。

组合/拆分 DataPipes

这些操作通常涉及多个 DataPipes,将它们组合在一起或将一个拆分为多个。

Concater(串联器)

串联多个可迭代的 DataPipes(函数名:concat)。

Demultiplexer(解复用器)

使用给定的分类函数,将输入 DataPipe 拆分为多个子 DataPipes(函数名:demux)。

Forker(分叉器)

创建同一个可迭代 DataPipe 的多个实例(函数名:fork)。

IterKeyZipper(迭代键值对压缩器)

根据匹配的键值对压缩两个 IterDataPipes(函数名:zip_with_iter)。

MapKeyZipper(映射键值对压缩器)

将来自源 IterDataPipe 的项目与来自 MapDataPipe 的项目连接起来(函数名:zip_with_map)。

Multiplexer(多路复用器)

从每个输入的 Iterable DataPipes 中一次生成一个元素(函数名:mux)。

MultiplexerLongest(最长多路复用器)

从每个输入的 Iterable DataPipes 中一次生成一个元素(函数名:mux_longest)。

RoundRobinDemultiplexer(循环调度解复用器)

以循环调度的方式将输入 DataPipe 拆分为多个子 DataPipes(函数名:round_robin_demux)。

SampleMultiplexer(采样多路复用器)

接收一个 (IterDataPipe, 权重) 的 字典,并根据这些 DataPipes 的权重从中采样生成项目。

UnZipper(解压缩器)

接收一个序列的 DataPipe,解压缩每个序列,并根据元素在序列中的位置将它们返回到单独的 DataPipes 中(函数名:unzip)。

Zipper(压缩器)

将来自每个输入 DataPipes 的元素聚合成一个元组(函数名:zip)。

ZipperLongest(最长压缩器)

将来自每个输入 DataPipes 的元素聚合成一个元组(函数名:zip_longest)。

分组 DataPipes

这些 DataPipes 允许您对 DataPipe 内的样本进行分组。

Batcher(批处理程序)

创建数据的微批次(函数名:batch)。

BucketBatcher(桶式批处理程序)

从排序的桶中创建数据的微批次(函数名:bucketbatch)。

Collator(整理器)

通过自定义整理函数将 DataPipe 中的样本整理成张量(函数名:collate)。

Grouper(分组器)

根据 group_key_fn 生成的键值对对来自输入 IterDataPipe 的数据进行分组,如果定义了 group_size,则生成批次大小不超过 group_sizeDataChunk(函数名:groupby)。

MaxTokenBucketizer(最大词元桶化器)

从一个大小有限的最小堆中创建数据的微批次,并且每个批次中由 len_fn 返回的样本总长度将受到 max_token_count 的限制(函数名:max_token_bucketize)。

UnBatcher(反批处理程序)

撤销数据的批处理(函数名:unbatch)。

IO DataPipes

这些 DataPipes 帮助与文件系统或远程服务器进行交互(例如下载、打开、保存文件以及列出目录中的文件)。

AISFileLister(AIS 文件列表器)

可迭代的 Datapipe,用于列出具有给定 URL 前缀的 AIStore 后端中的文件(函数名:list_files_by_ais)。

AISFileLoader(AIS 文件加载器)

可迭代的 DataPipe,用于从 AIStore 加载具有给定 URL 的文件(函数名:load_files_by_ais)。

FSSpecFileLister(FSSpec 文件列表器)

列出提供的 root 路径名或 URL 处的目录内容,并生成目录中每个文件的完整路径名或 URL(函数名:list_files_by_fsspec)。

FSSpecFileOpener(FSSpec 文件打开器)

从包含 fsspec 路径的输入数据管道中打开文件,并生成路径名和打开的文件流的元组(函数名:open_files_by_fsspec)。

FSSpecSaver(FSSpec 保存器)

接收元数据和数据元组的 DataPipe,将数据保存到目标路径(由 filepath_fn 和元数据生成),并生成生成的 fsspec 路径(函数名:save_by_fsspec)。

FileLister(文件列表器)

给定根目录的路径,生成根目录中文件的路径名(路径+文件名)。

FileOpener(文件打开器)

给定路径名,打开文件并在元组中生成路径名和文件流(函数名:open_files)。

GDriveReader(谷歌云端硬盘读取器)

接收指向谷歌云端硬盘文件的 URL,并生成文件名和 IO 流的元组(函数名:read_from_gdrive)。

HttpReader(HTTP 读取器)

接收文件 URL(指向文件的 HTTP URL),并生成文件 URL 和 IO 流的元组(函数名:read_from_http)。

HuggingFaceHubReader(HuggingFace Hub 读取器)

接收数据集名称并返回一个可迭代的 HuggingFace 数据集。

IoPathFileLister(IoPath 文件列表器)

列出提供的 root 路径名或 URL 处的目录内容,并生成目录中每个文件的完整路径名或 URL(函数名:list_files_by_iopath)。

IoPathFileOpener(IoPath 文件打开器)

从包含路径名或 URL 的输入数据管道中打开文件,并生成路径名和打开的文件流的元组(函数名:open_files_by_iopath)。

IoPathSaver(IoPath 保存器)

接收元数据和数据元组的 DataPipe,将数据保存到由 filepath_fn 和元数据生成的目标路径,并以 iopath 格式生成生成的路径(函数名:save_by_iopath)。

OnlineReader(在线读取器)

接收文件 URL(可以是指向文件的 HTTP URL 或指向谷歌云端硬盘文件的 URL),并生成文件 URL 和 IO 流的元组(函数名:read_from_remote)。

ParquetDataFrameLoader(Parquet 数据帧加载器)

接收 Parquet 文件的路径,并为 Parquet 文件中的每个行组返回一个 TorchArrow 数据帧(函数名:load_parquet_as_df)。

S3FileLister(S3 文件列表器)

可迭代的 DataPipe,用于列出具有给定前缀的 Amazon S3 文件 URL(函数名:list_files_by_s3)。

S3FileLoader(S3 文件加载器)

可迭代的 DataPipe,用于从给定的 S3 URL 加载 Amazon S3 文件(函数名:load_files_by_s3)。

Saver(保存器)

接收元数据和数据元组的 DataPipe,将数据保存到由 filepath_fn 和元数据生成的目标路径,并在本地文件系统上生成文件路径(函数名:save_to_disk)。

映射 DataPipes

这些 DataPipes 将给定函数应用于 DataPipe 中的每个元素。

BatchAsyncMapper(异步批处理映射器)

将来自源 DataPipe 的元素组合成批次,并同时对批次中的每个元素应用协程函数,然后将输出扁平化为单个、非嵌套的 IterDataPipe(函数名:async_map_batches)。

BatchMapper(批处理映射器)

将来自源 DataPipe 的元素组合成批次,并对每个批次应用函数,然后将输出扁平化为单个、非嵌套的 IterDataPipe(函数名:map_batches)。

FlatMapper(扁平化映射器)

对来自源 DataPipe 的每个项目应用函数,然后将输出扁平化为单个、非嵌套的 IterDataPipe(函数名:flatmap)。

Mapper(映射器)

对来自源 DataPipe 的每个项目应用函数(函数名:map)。

ShuffledFlatMapper(随机排序扁平化映射器)

对来自源 DataPipe 的每个项目应用函数,然后将返回的可迭代对象收集到缓冲区中,然后,在每次迭代中,随机选择缓冲区中的一个可迭代对象,并从此可迭代对象中生成一个项目(函数名:shuffled_flatmap)。

ThreadPoolMapper(线程池映射器)

使用 ThreadPoolExecutor 并发地对来自源 DataPipe 的每个项目应用函数(函数名:threadpool_map)。

其他 DataPipes

一组具有不同功能的 DataPipes。

DataFrameMaker(数据帧创建器)

接收数据行,将其中的一些行组合在一起,并创建 TorchArrow 数据帧(函数名:dataframe)。

EndOnDiskCacheHolder(磁盘缓存结束持有器)

指示何时将先前 DataPipe 的结果保存到由 filepath_fn 指定的本地文件(函数名:end_caching)。

FullSync(完全同步)

同步分布式进程之间的数据,以防止在训练期间出现由不均匀分片数据导致的挂起(函数名:fullsync)。

HashChecker(哈希校验器)

计算并检查每个文件的哈希值,输入 DataPipe 为文件名和数据/流的元组(函数名:check_hash)。

InMemoryCacheHolder(内存缓存持有器)

将来自源 DataPipe 的元素存储在内存中,如果指定了大小限制,则存储到大小限制(函数名:in_memory_cache)。

IterableWrapper(可迭代对象包装器)

包装一个可迭代对象以创建一个 IterDataPipe。

LengthSetter(长度设置器)

设置 DataPipe 的长度属性,该属性由 __len__ 返回(函数名:set_length)。

MapToIterConverter(映射到迭代器转换器)

MapDataPipe 转换为 IterDataPipe(函数名:to_iter_datapipe)。

OnDiskCacheHolder(磁盘缓存持有器)

将多个 DataPipe 操作的输出缓存到本地文件,这些操作通常是性能瓶颈,例如下载、解压缩等(函数名:on_disk_cache)。

PinMemory(固定内存)

从源 DataPipe 中预取一个元素并将其移动到固定内存(函数名:pin_memory)。

Prefetcher(预取器)

从源 DataPipe 中预取元素并将它们放入缓冲区(函数名:prefetch)。

RandomSplitter(随机拆分器)

将来自源 DataPipe 的样本随机拆分为多个组(函数名:random_split)。

ShardExpander(分片扩展器)

将传入的分片字符串扩展为分片。

ShardingFilter(分片过滤器)

允许对 DataPipe 进行分片的包装器(函数名:sharding_filter)。

ShardingRoundRobinDispatcher(分片循环调度器)

该包装器表示 DataPipe 图的先前部分是不可复制的,并且将在一个单独的、单一的调度过程中进行迭代,以便在使用多处理时以循环的方式将数据分发给工作进程。

选择 DataPipes

这些 DataPipes 帮助您在 DataPipe 中选择特定的样本。

过滤器

根据输入的 filter_fn(函数名:filter)过滤掉源数据管道中的元素。

头部

从源 DataPipe 中提取元素,从头开始,直到指定的限制(函数名:header)。

删除器

通过索引删除输入 DataPipe 中的列/元素(函数名:drop)。

切片器

通过开始/停止/步长或索引返回输入 DataPipe 中元素的切片(函数名:slice)。

扁平化器

根据提供的索引,在每个样本/元素级别返回输入 DataPipe 的扁平化副本(函数名:flatten)。

文本 DataPipes

这些 DataPipes 帮助您解析、读取和转换文本文件和数据。

CSVDictParser

接受由文件名和 CSV 数据流元组组成的 DataPipe,一次读取并返回 CSV 文件中的一行内容(函数名:parse_csv_as_dict)。

CSVParser

接受由文件名和 CSV 数据流元组组成的 DataPipe,一次读取并返回 CSV 文件中的一行内容(函数名:parse_csv)。

JsonParser

从 JSON 数据流中读取数据,并生成文件名和 JSON 数据的元组(函数名:parse_json_files)。

LineReader

接受由文件名和字符串数据流元组组成的 DataPipe,并为流中的每一行生成文件名和行的元组(函数名:readlines)。

ParagraphAggregator

将来自同一文件的文本行聚合成一个段落(函数名:lines_to_paragraphs)。

RoutedDecoder

解码来自输入 DataPipe 的二进制流,在元组中生成路径名和解码后的数据(函数名:routed_decode)。

Rows2Columnar

接受具有数据批次的输入 DataPipe,并一次处理一个批次,并为每个批次生成一个 Dict,其中 column_names 作为键,来自每一行的相应值列表作为值(函数名:rows2columnar)。

StreamReader

给定 IO 流及其标签名称,在元组中生成带有标签名称的字节(函数名:read_from_stream)。

文档

访问 PyTorch 的综合开发者文档

查看文档

教程

获取针对初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并获得您的问题的答案

查看资源