可迭代风格 DataPipes¶
可迭代风格数据集是 IterableDataset 子类的一个实例,它实现了 __iter__()
协议,表示数据样本上的一个可迭代对象。这种类型的数据集特别适用于随机读取成本很高甚至不可能的情况,以及批大小取决于获取的数据的情况。
例如,当调用 iter(iterdatapipe)
时,这样的数据集可以返回从数据库、远程服务器甚至实时生成的日志读取的数据流。
这是 torch
中 IterableDataset
的更新版本。
- class torchdata.datapipes.iter.IterDataPipe(*args, **kwds)¶
可迭代风格 DataPipe。
所有表示数据样本可迭代对象的 DataPipe 都应继承此类。当数据来自流或样本数量太大而无法全部放入内存时,这种风格的 DataPipe 特别有用。
IterDataPipe
是延迟初始化的,只有在IterDataPipe
的迭代器上调用next()
时才会计算其元素。所有子类都应该覆盖
__iter__()
,它将返回此 DataPipe 中样本的迭代器。调用IterDataPipe
的__iter__
会自动调用其方法reset()
,默认情况下该方法不执行任何操作。编写自定义IterDataPipe
时,如果需要,用户应覆盖reset()
。常见的用法包括重置自定义IterDataPipe
中的缓冲区、指针和各种状态变量。注意
每个
IterDataPipe
一次只能有一个有效的迭代器,创建第二个迭代器将使第一个迭代器失效。此约束是必要的,因为某些IterDataPipe
具有内部缓冲区,如果有多个迭代器,则其状态可能变得无效。下面的代码示例介绍了此约束在实践中的表现。如果您对此约束有任何反馈,请参阅 GitHub IterDataPipe 单迭代器问题。这些 DataPipe 可以通过两种方式调用:使用类构造函数或将其函数形式应用于现有的
IterDataPipe
(推荐,适用于大多数但并非所有 DataPipe)。您可以将多个 IterDataPipe 链接在一起,形成一个管道,依次执行多个操作。注意
当子类与
DataLoader
一起使用时,DataPipe 中的每个项目都将从DataLoader
迭代器中产生。当num_workers > 0
时,每个工作进程都将拥有 DataPipe 对象的不同副本,因此通常希望独立配置每个副本,以避免工作进程返回重复数据。在工作进程中调用get_worker_info()
会返回有关该工作进程的信息。它可以在数据集的__iter__()
方法或DataLoader
的worker_init_fn
选项中使用,以修改每个副本的行为。示例
- 一般用法
>>> # xdoctest: +SKIP >>> from torchdata.datapipes.iter import IterableWrapper, Mapper >>> dp = IterableWrapper(range(10)) >>> map_dp_1 = Mapper(dp, lambda x: x + 1) # Using class constructor >>> map_dp_2 = dp.map(lambda x: x + 1) # Using functional form (recommended) >>> list(map_dp_1) [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] >>> list(map_dp_2) [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] >>> filter_dp = map_dp_1.filter(lambda x: x % 2 == 0) >>> list(filter_dp) [2, 4, 6, 8, 10]
- 单迭代器约束示例
>>> from torchdata.datapipes.iter import IterableWrapper, Mapper >>> source_dp = IterableWrapper(range(10)) >>> it1 = iter(source_dp) >>> list(it1) [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] >>> it1 = iter(source_dp) >>> it2 = iter(source_dp) # The creation of a new iterator invalidates `it1` >>> next(it2) 0 >>> next(it1) # Further usage of `it1` will raise a `RunTimeError`
我们有不同类型的可迭代 DataPipe
归档 - 打开和解压缩不同格式的归档文件。
增强 - 增强您的样本(例如,添加索引或无限循环)。
组合 - 执行组合操作(例如,采样、洗牌)。
组合/拆分 - 通过组合多个 DataPipe 或将一个 DataPipe 拆分为多个来与之交互。
分组 - 对 DataPipe 中的样本进行分组
IO - 与文件系统或远程服务器交互(例如,下载、打开、保存文件以及列出目录中的文件)。
映射 - 将给定函数应用于 DataPipe 中的每个元素。
其他 - 执行各种操作集。
选择 - 选择 DataPipe 中的特定样本。
文本 - 解析、读取和转换文本文件和数据
归档 DataPipes¶
这些 DataPipe 帮助打开和解压缩不同格式的归档文件。
解压缩包含路径名和 bz2 二进制流元组的可迭代 DataPipe 中的 bz2 二进制流,并生成路径名和提取的二进制流元组(函数名称: |
|
获取路径和压缩数据流的元组,并返回路径和解压缩数据流的元组(函数名称: |
|
解压缩包含路径名和 rar 二进制流元组的输入可迭代 Datapipes 中的 rar 二进制流,并生成路径名和提取的二进制流元组(函数名称: |
|
打开/解压缩包含路径名和 tar 二进制流元组的可迭代 DataPipe 中的 tar 二进制流,并生成路径名和提取的二进制流元组(函数名称: |
|
打开/解压缩包含路径名和 tfrecord 二进制流元组的可迭代 DataPipe 中的 tfrecord 二进制流,并生成存储的记录(函数名称: |
|
可迭代 DataPipe,接受(路径、数据)元组流,通常表示 tar 归档文件的路径名和文件(函数名称: |
|
解压缩包含路径名和 xy 二进制流元组的可迭代 DataPipe 中的 xz (lzma) 二进制流,并生成路径名和提取的二进制流元组(函数名称: |
|
打开/解压缩包含路径名和 zip 二进制流元组的可迭代 DataPipe 中的 zip 二进制流,并生成路径名和提取的二进制流元组(函数名称: |
增强 DataPipes¶
这些 DataPipe 帮助增强您的样本。
默认情况下永久循环指定的输入,或循环指定的次数(函数名称: |
|
通过枚举向现有 DataPipe 添加索引,默认情况下索引从 0 开始(函数名称: |
|
向现有的可迭代 DataPipe 添加索引(函数名称: |
|
在移至下一个元素之前,重复生成源 DataPipe 的每个元素指定的次数(函数名称: |
组合 DataPipes¶
这些 DataPipe 帮助执行组合操作。
洗牌来自先前 DataPipe 的每个小批量(函数名称: |
|
使用提供的 |
|
使用缓冲区对输入 DataPipe 进行随机排序(函数名: |
组合/拆分 DataPipes¶
这些操作通常涉及多个 DataPipes,将它们组合在一起或将一个拆分为多个。
串联多个可迭代的 DataPipes(函数名: |
|
使用给定的分类函数,将输入 DataPipe 拆分为多个子 DataPipes(函数名: |
|
创建同一个可迭代 DataPipe 的多个实例(函数名: |
|
根据匹配的键值对压缩两个 IterDataPipes(函数名: |
|
将来自源 IterDataPipe 的项目与来自 MapDataPipe 的项目连接起来(函数名: |
|
从每个输入的 Iterable DataPipes 中一次生成一个元素(函数名: |
|
从每个输入的 Iterable DataPipes 中一次生成一个元素(函数名: |
|
以循环调度的方式将输入 DataPipe 拆分为多个子 DataPipes(函数名: |
|
接收一个 (IterDataPipe, 权重) 的 字典,并根据这些 DataPipes 的权重从中采样生成项目。 |
|
接收一个序列的 DataPipe,解压缩每个序列,并根据元素在序列中的位置将它们返回到单独的 DataPipes 中(函数名: |
|
将来自每个输入 DataPipes 的元素聚合成一个元组(函数名: |
|
将来自每个输入 DataPipes 的元素聚合成一个元组(函数名: |
分组 DataPipes¶
这些 DataPipes 允许您对 DataPipe 内的样本进行分组。
创建数据的微批次(函数名: |
|
从排序的桶中创建数据的微批次(函数名: |
|
通过自定义整理函数将 DataPipe 中的样本整理成张量(函数名: |
|
根据 |
|
从一个大小有限的最小堆中创建数据的微批次,并且每个批次中由 |
|
撤销数据的批处理(函数名: |
IO DataPipes¶
这些 DataPipes 帮助与文件系统或远程服务器进行交互(例如下载、打开、保存文件以及列出目录中的文件)。
可迭代的 Datapipe,用于列出具有给定 URL 前缀的 AIStore 后端中的文件(函数名: |
|
可迭代的 DataPipe,用于从 AIStore 加载具有给定 URL 的文件(函数名: |
|
列出提供的 |
|
从包含 fsspec 路径的输入数据管道中打开文件,并生成路径名和打开的文件流的元组(函数名: |
|
接收元数据和数据元组的 DataPipe,将数据保存到目标路径(由 filepath_fn 和元数据生成),并生成生成的 fsspec 路径(函数名: |
|
给定根目录的路径,生成根目录中文件的路径名(路径+文件名)。 |
|
给定路径名,打开文件并在元组中生成路径名和文件流(函数名: |
|
接收指向谷歌云端硬盘文件的 URL,并生成文件名和 IO 流的元组(函数名: |
|
接收文件 URL(指向文件的 HTTP URL),并生成文件 URL 和 IO 流的元组(函数名: |
|
接收数据集名称并返回一个可迭代的 HuggingFace 数据集。 |
|
列出提供的 |
|
从包含路径名或 URL 的输入数据管道中打开文件,并生成路径名和打开的文件流的元组(函数名: |
|
接收元数据和数据元组的 DataPipe,将数据保存到由 |
|
接收文件 URL(可以是指向文件的 HTTP URL 或指向谷歌云端硬盘文件的 URL),并生成文件 URL 和 IO 流的元组(函数名: |
|
接收 Parquet 文件的路径,并为 Parquet 文件中的每个行组返回一个 TorchArrow 数据帧(函数名: |
|
可迭代的 DataPipe,用于列出具有给定前缀的 Amazon S3 文件 URL(函数名: |
|
可迭代的 DataPipe,用于从给定的 S3 URL 加载 Amazon S3 文件(函数名: |
|
接收元数据和数据元组的 DataPipe,将数据保存到由 |
映射 DataPipes¶
这些 DataPipes 将给定函数应用于 DataPipe 中的每个元素。
将来自源 DataPipe 的元素组合成批次,并同时对批次中的每个元素应用协程函数,然后将输出扁平化为单个、非嵌套的 IterDataPipe(函数名: |
|
将来自源 DataPipe 的元素组合成批次,并对每个批次应用函数,然后将输出扁平化为单个、非嵌套的 IterDataPipe(函数名: |
|
对来自源 DataPipe 的每个项目应用函数,然后将输出扁平化为单个、非嵌套的 IterDataPipe(函数名: |
|
对来自源 DataPipe 的每个项目应用函数(函数名: |
|
对来自源 DataPipe 的每个项目应用函数,然后将返回的可迭代对象收集到缓冲区中,然后,在每次迭代中,随机选择缓冲区中的一个可迭代对象,并从此可迭代对象中生成一个项目(函数名: |
|
使用 |
其他 DataPipes¶
一组具有不同功能的 DataPipes。
接收数据行,将其中的一些行组合在一起,并创建 TorchArrow 数据帧(函数名: |
|
指示何时将先前 DataPipe 的结果保存到由 |
|
同步分布式进程之间的数据,以防止在训练期间出现由不均匀分片数据导致的挂起(函数名: |
|
计算并检查每个文件的哈希值,输入 DataPipe 为文件名和数据/流的元组(函数名: |
|
将来自源 DataPipe 的元素存储在内存中,如果指定了大小限制,则存储到大小限制(函数名: |
|
包装一个可迭代对象以创建一个 IterDataPipe。 |
|
设置 DataPipe 的长度属性,该属性由 |
|
将 |
|
将多个 DataPipe 操作的输出缓存到本地文件,这些操作通常是性能瓶颈,例如下载、解压缩等(函数名: |
|
从源 DataPipe 中预取一个元素并将其移动到固定内存(函数名: |
|
从源 DataPipe 中预取元素并将它们放入缓冲区(函数名: |
|
将来自源 DataPipe 的样本随机拆分为多个组(函数名: |
|
将传入的分片字符串扩展为分片。 |
|
允许对 DataPipe 进行分片的包装器(函数名: |
|
该包装器表示 |
选择 DataPipes¶
这些 DataPipes 帮助您在 DataPipe 中选择特定的样本。
根据输入的 |
|
从源 DataPipe 中提取元素,从头开始,直到指定的限制(函数名: |
|
通过索引删除输入 DataPipe 中的列/元素(函数名: |
|
通过开始/停止/步长或索引返回输入 DataPipe 中元素的切片(函数名: |
|
根据提供的索引,在每个样本/元素级别返回输入 DataPipe 的扁平化副本(函数名: |
文本 DataPipes¶
这些 DataPipes 帮助您解析、读取和转换文本文件和数据。
接受由文件名和 CSV 数据流元组组成的 DataPipe,一次读取并返回 CSV 文件中的一行内容(函数名: |
|
接受由文件名和 CSV 数据流元组组成的 DataPipe,一次读取并返回 CSV 文件中的一行内容(函数名: |
|
从 JSON 数据流中读取数据,并生成文件名和 JSON 数据的元组(函数名: |
|
接受由文件名和字符串数据流元组组成的 DataPipe,并为流中的每一行生成文件名和行的元组(函数名: |
|
将来自同一文件的文本行聚合成一个段落(函数名: |
|
解码来自输入 DataPipe 的二进制流,在元组中生成路径名和解码后的数据(函数名: |
|
接受具有数据批次的输入 DataPipe,并一次处理一个批次,并为每个批次生成一个 Dict,其中 |
|
给定 IO 流及其标签名称,在元组中生成带有标签名称的字节(函数名: |