可迭代式 DataPipes¶
可迭代式数据集是 IterableDataset 子类的实例,它实现了 __iter__()
协议,并表示对数据样本的可迭代对象。这种类型的数据集特别适合于随机读取代价高昂甚至不可能的情况,以及批次大小取决于获取的数据的情况。
例如,当调用 iter(iterdatapipe)
时,这种数据集可以返回从数据库、远程服务器甚至实时生成的日志中读取的数据流。
这是 torch
中 IterableDataset
的更新版本。
- class torchdata.datapipes.iter.IterDataPipe(*args, **kwds)¶
可迭代式 DataPipe。
所有表示数据样本可迭代对象的数据管道都应该继承自它。这种类型的数据管道在数据来自流或样本数量过大而无法全部放入内存时特别有用。
IterDataPipe
是延迟初始化的,它的元素只有在next()
在IterDataPipe
的迭代器上被调用时才会计算。所有子类都应该覆盖
__iter__()
,它将返回此 DataPipe 中样本的迭代器。调用__iter__
的IterDataPipe
会自动调用它的方法reset()
,该方法默认情况下不会执行任何操作。在编写自定义IterDataPipe
时,用户应根据需要覆盖reset()
。常见的用途包括重置自定义IterDataPipe
中的缓冲区、指针和各种状态变量。注意
每个
IterDataPipe
一次只能有一个迭代器有效,创建第二个迭代器将使第一个迭代器失效。此约束是必要的,因为某些IterDataPipe
具有内部缓冲区,如果存在多个迭代器,其状态可能会变得无效。下面的代码示例介绍了此约束在实践中的表现形式。如果您对这个约束有任何反馈,请查看 GitHub IterDataPipe 单个迭代器问题。这些 DataPipes 可以通过两种方式调用,使用类构造函数或将它们的函数形式应用到现有
IterDataPipe
上(推荐,大多数 DataPipes 都支持,但并非所有 DataPipes 都支持)。您可以将多个 IterDataPipe 连接在一起,形成一个管道,该管道将依次执行多个操作。注意
当子类与
DataLoader
一起使用时,DataPipe 中的每个项都将从DataLoader
迭代器中生成。当num_workers > 0
时,每个工作进程将拥有 DataPipe 对象的不同副本,因此通常需要独立配置每个副本,以避免从工作进程返回重复数据。get_worker_info()
在工作进程中被调用时,返回有关工作进程的信息。它可以用于数据集的__iter__()
方法或DataLoader
的worker_init_fn
选项,以修改每个副本的行为。示例
- 一般用法
>>> # xdoctest: +SKIP >>> from torchdata.datapipes.iter import IterableWrapper, Mapper >>> dp = IterableWrapper(range(10)) >>> map_dp_1 = Mapper(dp, lambda x: x + 1) # Using class constructor >>> map_dp_2 = dp.map(lambda x: x + 1) # Using functional form (recommended) >>> list(map_dp_1) [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] >>> list(map_dp_2) [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] >>> filter_dp = map_dp_1.filter(lambda x: x % 2 == 0) >>> list(filter_dp) [2, 4, 6, 8, 10]
- 单个迭代器约束示例
>>> from torchdata.datapipes.iter import IterableWrapper, Mapper >>> source_dp = IterableWrapper(range(10)) >>> it1 = iter(source_dp) >>> list(it1) [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] >>> it1 = iter(source_dp) >>> it2 = iter(source_dp) # The creation of a new iterator invalidates `it1` >>> next(it2) 0 >>> next(it1) # Further usage of `it1` will raise a `RunTimeError`
我们有不同类型的可迭代数据管道
归档 - 打开和解压缩不同格式的归档文件。
增强 - 增强您的样本(例如,添加索引,或无限循环)。
组合 - 执行组合操作(例如,采样,洗牌)。
组合/拆分 - 通过组合多个 DataPipes 或将一个 DataPipe 拆分为多个 DataPipes 来与它们交互。
分组 - 在 DataPipe 中对样本进行分组
IO - 与文件系统或远程服务器交互(例如,下载、打开、保存文件,以及列出目录中的文件)。
映射 - 将给定函数应用于 DataPipe 中的每个元素。
其他 - 执行各种各样的操作。
选择 - 在 DataPipe 中选择特定的样本。
文本 - 解析、读取和转换文本文件和数据
归档 DataPipes¶
这些 DataPipes 有助于打开和解压缩不同格式的归档文件。
从包含路径名和 bz2 二进制流的元组的可迭代 DataPipe 中解压缩 bz2 二进制流,并生成路径名和解压缩二进制流的元组(函数名: |
|
获取路径和压缩数据流的元组,并返回路径和解压缩数据流的元组(函数名: |
|
从包含路径名和 rar 二进制流的元组的输入可迭代 DataPipes 中解压缩 rar 二进制流,并生成路径名和解压缩二进制流的元组(函数名: |
|
从包含路径名和 tar 二进制流的元组的可迭代 DataPipe 中打开/解压缩 tar 二进制流,并生成路径名和解压缩二进制流的元组(函数名: |
|
从包含路径名和 tfrecord 二进制流的元组的可迭代 DataPipe 中打开/解压缩 tfrecord 二进制流,并生成存储的记录(函数名: |
|
可迭代 DataPipe,它接受(路径,数据)元组流,通常表示 tar 归档文件的路径名和文件(函数名: |
|
从包含路径名和 xy 二进制流的元组的可迭代 DataPipe 中解压缩 xz(lzma)二进制流,并生成路径名和解压缩二进制流的元组(函数名: |
|
从包含路径名称和 zip 二进制流的 Iterable DataPipe 中打开/解压缩 zip 二进制流,并生成路径名称和解压缩的二进制流的元组(函数名称: |
增强 DataPipes¶
这些 DataPipes 有助于增强您的样本。
默认情况下,无限期地循环指定的输入,或者循环指定的次数(函数名称: |
|
通过枚举为现有 DataPipe 添加索引,默认情况下索引从 0 开始(函数名称: |
|
为现有的 Iterable DataPipe 添加索引(函数名称: |
|
在移至下一个元素之前,为指定的次数重复生成源 DataPipe 的每个元素(函数名称: |
组合 DataPipes¶
这些 DataPipes 有助于执行组合操作。
对来自先前 DataPipe 的每个小批量数据进行洗牌(函数名称: |
|
使用提供的 |
|
使用缓冲区对输入 DataPipe 进行洗牌(函数名称: |
组合/拆分 DataPipes¶
这些通常涉及多个 DataPipes,将它们组合起来或将一个 DataPipe 拆分为多个 DataPipe。
连接多个 Iterable DataPipes(函数名称: |
|
使用给定的分类函数将输入 DataPipe 拆分为多个子 DataPipe(函数名称: |
|
创建同一 Iterable DataPipe 的多个实例(函数名称: |
|
根据匹配的键将两个 IterDataPipes 压缩在一起(函数名称: |
|
将来自源 IterDataPipe 的项目与来自 MapDataPipe 的项目连接在一起(函数名称: |
|
一次从每个输入 Iterable DataPipe 生成一个元素(函数名称: |
|
一次从每个输入 Iterable DataPipe 生成一个元素(函数名称: |
|
以循环轮询顺序将输入 DataPipe 拆分为多个子 DataPipe(函数名称: |
|
接受 (IterDataPipe, Weight) 的 Dict,并根据其权重从这些 DataPipes 中采样,生成项目。 |
|
接受 Sequences 的 DataPipe,解压缩每个 Sequence,并根据它们在 Sequence 中的位置将元素返回到单独的 DataPipes 中(函数名称: |
|
将来自每个输入 DataPipe 的元素聚合为元组(函数名称: |
|
将来自每个输入 DataPipe 的元素聚合为元组(函数名称: |
分组 DataPipes¶
这些 DataPipes 允许您对 DataPipe 中的样本进行分组。
创建数据的小批量(函数名称: |
|
从排序的桶中创建数据的小批量(函数名称: |
|
通过自定义整理函数将 DataPipe 中的样本整理为 Tensor(函数名称: |
|
根据来自 |
|
从具有有限大小的最小堆中创建数据的小批量,并且每个批次中由 |
|
撤消数据的批处理(函数名称: |
IO DataPipes¶
这些 DataPipes 有助于与文件系统或远程服务器交互(例如,下载、打开、保存文件以及列出目录中的文件)。
Iterable Datapipe,列出具有给定 URL 前缀的 AIStore 后端的文件(函数名称: |
|
Iterable DataPipe,使用给定的 URL 从 AIStore 加载文件(函数名称: |
|
列出提供给 |
|
从包含 fsspec 路径的输入 datapipe 中打开文件,并生成路径名和打开的文件流的元组(函数名称: |
|
接受元数据和数据的 DataPipe,将数据保存到目标路径(由 filepath_fn 和元数据生成),并生成生成的 fsspec 路径(函数名称: |
|
给定根目录的路径,生成根目录内的文件路径名(路径 + 文件名)。 |
|
给定路径名,打开文件,并以元组形式生成路径名和文件流(函数名称: |
|
接受指向 GDrive 文件的 URL,并生成文件名和 IO 流的元组(函数名称: |
|
接受文件 URL(指向文件的 HTTP URL),并生成文件 URL 和 IO 流的元组(函数名称: |
|
接受数据集名称并返回一个 Iterable HuggingFace 数据集。 |
|
列出提供给 |
|
从包含路径名或 URL 的输入 datapipe 中打开文件,并生成路径名和打开的文件流的元组(函数名称: |
|
接受元数据和数据的 DataPipe,将数据保存到由 |
|
接受文件 URL(可以是指向文件的 HTTP URL 或指向 GDrive 文件的 URL),并生成文件 URL 和 IO 流的元组(函数名称: |
|
接受指向 Parquet 文件的路径,并为 Parquet 文件中的每个行组返回一个 TorchArrow DataFrame(函数名称: |
|
[已弃用] 改用 https://github.com/awslabs/s3-connector-for-pytorch。 |
|
[已弃用] 改用 https://github.com/awslabs/s3-connector-for-pytorch。 |
|
接受元数据和数据的 DataPipe,将数据保存到由 |
映射 DataPipes¶
这些 DataPipes 将给定函数应用于 DataPipe 中的每个元素。
将来自源 DataPipe 的元素组合成批次,并对每个批次中的元素并发地应用协程函数,然后将输出展平为单个、非嵌套的 IterDataPipe(函数名称: |
|
将来自源 DataPipe 的元素组合成批次,并对每个批次应用函数,然后将输出展平为单个、非嵌套的 IterDataPipe(函数名称: |
|
对来自源 DataPipe 的每个项目应用函数,然后将输出展平为单个、非嵌套的 IterDataPipe(函数名称: |
|
对来自源 DataPipe 的每个项目应用函数(函数名称: |
|
对来自源 DataPipe 的每个项目应用函数,然后将返回的迭代器收集到一个缓冲区中,然后,在每次迭代时,随机选择缓冲区中的一个迭代器并生成该迭代器中的一个项目(函数名称: |
|
使用 |
其他 DataPipes¶
一组具有不同功能的杂项 DataPipes。
接受数据行,将一定数量的行批处理在一起,并创建 TorchArrow DataFrames(函数名称: |
|
指示先前 DataPipe 的结果何时将保存到由 |
|
同步分布式进程中的数据,以防止训练期间出现挂起,这是由不均匀的切片数据引起的(函数名称: |
|
计算并检查每个文件的哈希值,来自包含文件名和数据/流的元组的输入 DataPipe(函数名称: |
|
将来自源 DataPipe 的元素存储在内存中,如果指定了大小限制,则最多存储该大小限制(函数名称: |
|
包装一个可迭代对象以创建一个 IterDataPipe。 |
|
设置 DataPipe 的 length 属性,该属性由 |
|
将 |
|
将多个 DataPipe 操作的输出缓存到本地文件,这些文件通常是性能瓶颈,例如下载、解压缩等(函数名: |
|
从源 DataPipe 预取一个元素并将其移动到固定内存(函数名: |
|
从源 DataPipe 预取元素并将其放入缓冲区(函数名: |
|
随机将源 DataPipe 中的样本分成组(函数名: |
|
将传入的 shard 字符串扩展为 shard。 |
|
允许 DataPipe 进行分片的包装器(函数名: |
|
包装器,指示 |
选择 DataPipes¶
这些 DataPipes 可帮助您选择 DataPipe 中的特定样本。
根据输入 |
|
从源 DataPipe 的开头生成元素,直到指定的限制(函数名: |
|
通过其索引删除输入 DataPipe 中的列/元素(函数名: |
|
通过开始/停止/步长或索引返回输入 DataPipe 中的元素切片(函数名: |
|
根据提供的索引,在每个样本/元素级别返回输入 DataPipe 的扁平副本(函数名: |
文本 DataPipes¶
这些 DataPipes 可帮助您解析、读取和转换文本文件和数据。
接受由文件名和 CSV 数据流组成的元组组成的 DataPipe,读取并返回 CSV 文件中的内容,每次一行(函数名: |
|
接受由文件名和 CSV 数据流组成的元组组成的 DataPipe,读取并返回 CSV 文件中的内容,每次一行(函数名: |
|
从 JSON 数据流中读取并生成文件名和 JSON 数据的元组(函数名: |
|
接受由文件名和字符串数据流组成的元组组成的 DataPipe,并为流中的每一行生成文件名和该行的元组(函数名: |
|
将来自同一文件的文本行聚合到一个段落中(函数名: |
|
解码来自输入 DataPipe 的二进制流,以元组形式生成路径名和解码后的数据。 |
|
接受具有数据批次的输入 DataPipe,并一次处理一批数据,并为每批数据生成一个 Dict, |
|
给出 IO 流及其标签名称,以元组形式生成带有标签名称的字节。 |