torchrec.modules¶
Torchrec 通用模块
torchrec 模块包含各种模块的集合。
- 这些模块包括
torchrec.modules.activation¶
激活模块
- class torchrec.modules.activation.SwishLayerNorm(input_dims: Union[int, List[int], Size], device: Optional[device] = None)¶
基类:
Module
应用具有层归一化的 Swish 函数:Y = X * Sigmoid(LayerNorm(X))。
- 参数::
input_dims (Union[int, List[int], torch.Size]) – 要归一化的维度。如果输入张量的形状为 [batch_size, d1, d2, d3],则设置 input_dim=[d2, d3] 将对最后两个维度进行层归一化。
device (Optional[torch.device]) – 默认计算设备。
示例
sln = SwishLayerNorm(100)
- forward(input: Tensor) Tensor ¶
- 参数::
input (torch.Tensor) – 输入张量。
- 返回值::
输出张量。
- 返回类型::
torch.Tensor
- training: bool¶
torchrec.modules.crossnet¶
CrossNet API
- class torchrec.modules.crossnet.CrossNet(in_features: int, num_layers: int)¶
基类:
Module
交叉网络:
Cross Net 是对形状为 \((*, N)\) 的张量的“交叉”操作的堆栈,有效地创建了对输入张量的 \(N\) 个可学习的多项式函数。
在这个模块中,交叉操作是根据一个满秩矩阵 (NxN) 定义的,因此交叉效应可以覆盖每层上的所有位。在每一层 l 上,张量被转换为
\[x_{l+1} = x_0 * (W_l \cdot x_l + b_l) + x_l\]其中 \(W_l\) 是一个方阵 \((NxN)\),\(*\) 表示逐元素相乘,\(\cdot\) 表示矩阵相乘。
- 参数::
in_features (int) – 输入的维度。
num_layers (int) – 模块中的层数。
示例
batch_size = 3 num_layers = 2 in_features = 10 input = torch.randn(batch_size, in_features) dcn = CrossNet(num_layers=num_layers) output = dcn(input)
- forward(input: Tensor) Tensor ¶
- 参数::
input (torch.Tensor) – 形状为 [batch_size, in_features] 的张量。
- 返回值::
形状为 [batch_size, in_features] 的张量。
- 返回类型::
torch.Tensor
- training: bool¶
- class torchrec.modules.crossnet.LowRankCrossNet(in_features: int, num_layers: int, low_rank: int = 1)¶
基类:
Module
低秩交叉网络是一种高效的交叉网络。它不是在每一层使用满秩交叉矩阵 (NxN),而是使用两个核 \(W (N x r)\) 和 \(V (r x N)\),其中 r << N,来简化矩阵相乘。
在每一层 l 上,张量被转换为
\[x_{l+1} = x_0 * (W_l \cdot (V_l \cdot x_l) + b_l) + x_l\]其中 \(W_l\) 是一个向量,\(*\) 表示逐元素相乘,\(\cdot\) 表示矩阵相乘。
注意
秩 r 应该明智地选择。通常,我们期望 r < N/2 具有计算节省;我们应该期望 \(r ~= N/4\) 保留满秩交叉网络的精度。
- 参数::
in_features (int) – 输入的维度。
num_layers (int) – 模块中的层数。
low_rank (int) – 交叉矩阵的秩设置 (默认 = 1)。值必须始终 >= 1。
示例
batch_size = 3 num_layers = 2 in_features = 10 input = torch.randn(batch_size, in_features) dcn = LowRankCrossNet(num_layers=num_layers, low_rank=3) output = dcn(input)
- forward(input: Tensor) Tensor ¶
- 参数::
input (torch.Tensor) – 形状为 [batch_size, in_features] 的张量。
- 返回值::
形状为 [batch_size, in_features] 的张量。
- 返回类型::
torch.Tensor
- training: bool¶
- class torchrec.modules.crossnet.LowRankMixtureCrossNet(in_features: int, num_layers: int, num_experts: int = 1, low_rank: int = 1, activation: ~typing.Union[~torch.nn.modules.module.Module, ~typing.Callable[[~torch.Tensor], ~torch.Tensor]] = <built-in method relu of type object>)¶
基类:
Module
低秩混合交叉网络是来自 论文 的 DCN V2 实现
LowRankMixtureCrossNet 将每层的可学习交叉参数定义为一个低秩矩阵 \((N*r)\) 以及专家混合。与 LowRankCrossNet 相比,该模块不是依靠单个专家学习特征交叉,而是利用 \(K\) 个专家;每个专家在不同的子空间学习特征交互,并使用依赖于输入 \(x\) 的门控机制自适应地组合学习的交叉。
在每一层 l 上,张量被转换为
\[x_{l+1} = MoE({expert_i : i \in K_{experts}}) + x_l\]每个 \(expert_i\) 定义为
\[expert_i = x_0 * (U_{li} \cdot g(C_{li} \cdot g(V_{li} \cdot x_l)) + b_l)\]其中 \(U_{li} (N, r)\)、\(C_{li} (r, r)\) 和 \(V_{li} (r, N)\) 是低秩矩阵,\(*\) 表示元素级乘法,\(x\) 表示矩阵乘法,\(g()\) 是非线性激活函数。
当 num_expert 为 1 时,将跳过门控评估和 MOE 以节省计算。
- 参数::
in_features (int) – 输入的维度。
num_layers (int) – 模块中的层数。
low_rank (int) – 交叉矩阵的秩设置(默认 = 1)。值必须始终 >= 1
activation (Union[torch.nn.Module, Callable[[torch.Tensor], torch.Tensor]]) – 非线性激活函数,用于定义专家。默认值为 relu。
示例
batch_size = 3 num_layers = 2 in_features = 10 input = torch.randn(batch_size, in_features) dcn = LowRankCrossNet(num_layers=num_layers, num_experts=5, low_rank=3) output = dcn(input)
- forward(input: Tensor) Tensor ¶
- 参数::
input (torch.Tensor) – 形状为 [batch_size, in_features] 的张量。
- 返回值::
形状为 [batch_size, in_features] 的张量。
- 返回类型::
torch.Tensor
- training: bool¶
- class torchrec.modules.crossnet.VectorCrossNet(in_features: int, num_layers: int)¶
基类:
Module
向量交叉网络可以称为 DCN-V1。
它也是一种专门的低秩交叉网络,其中秩=1。在此版本中,在每一层,我们不保留两个内核 W 和 V,只保留一个向量内核 W (Nx1)。我们使用点运算来计算特征的“交叉”效应,从而节省两个矩阵乘法,以进一步降低计算成本并减少可学习参数的数量。
在每一层 l 上,张量被转换为
\[x_{l+1} = x_0 * (W_l . x_l + b_l) + x_l\]其中 \(W_l\) 是一个向量,\(*\) 表示元素级乘法;\(.\) 表示点运算。
- 参数::
in_features (int) – 输入的维度。
num_layers (int) – 模块中的层数。
示例
batch_size = 3 num_layers = 2 in_features = 10 input = torch.randn(batch_size, in_features) dcn = VectorCrossNet(num_layers=num_layers) output = dcn(input)
- forward(input: Tensor) Tensor ¶
- 参数::
input (torch.Tensor) – 形状为 [batch_size, in_features] 的张量。
- 返回值::
形状为 [batch_size, in_features] 的张量。
- 返回类型::
torch.Tensor
- training: bool¶
torchrec.modules.deepfm¶
深度分解机模块
以下模块基于 深度分解机 (DeepFM) 论文
类 DeepFM 实现 DeepFM 框架
类 FactorizationMachine 实现 FM,如上述论文所述。
- class torchrec.modules.deepfm.DeepFM(dense_module: Module)¶
基类:
Module
这是 DeepFM 模块
此模块不涵盖已发布论文的端到端功能。相反,它只涵盖出版物中的深度组件。它用于学习高阶特征交互。如果应学习低阶特征交互,请使用 FactorizationMachine 模块,该模块将共享与该模块相同的嵌入输入。
为了支持建模灵活性,我们将关键组件定制为
与公开论文不同,我们将输入从原始稀疏特征更改为特征的嵌入。它允许嵌入维度和嵌入数量的灵活性,只要所有嵌入张量具有相同的批次大小。
在公开论文的基础上,我们允许用户自定义隐藏层,使其可以是任何模块,而不限于 MLP。
该模块的一般架构如下
1 x 10 output /|\ | pass into `dense_module` | 1 x 90 /|\ | concat | 1 x 20, 1 x 30, 1 x 40 list of embeddings
- 参数::
dense_module (nn.Module) – DeepFM 中可以使用任何自定义模块(例如 MLP)。该模块的 in_features 必须等于元素计数。例如,如果输入嵌入是 [randn(3, 2, 3), randn(3, 4, 5)],则 in_features 应为:2*3+4*5。
示例
import torch from torchrec.fb.modules.deepfm import DeepFM from torchrec.fb.modules.mlp import LazyMLP batch_size = 3 output_dim = 30 # the input embedding are a torch.Tensor of [batch_size, num_embeddings, embedding_dim] input_embeddings = [ torch.randn(batch_size, 2, 64), torch.randn(batch_size, 2, 32), ] dense_module = nn.Linear(192, output_dim) deepfm = DeepFM(dense_module=dense_module) deep_fm_output = deepfm(embeddings=input_embeddings)
- forward(embeddings: List[Tensor]) Tensor ¶
- 参数::
embeddings (List[torch.Tensor]) –
所有嵌入的列表(例如 dense、common_sparse、specialized_sparse、embedding_features、raw_embedding_features),形状为
(batch_size, num_embeddings, embedding_dim)
为了便于操作,具有相同嵌入维度的嵌入可以选择堆叠成单个张量。例如,当我们有一个训练过的嵌入,其维度=32,5 个本地嵌入,其维度=64,以及 3 个稠密特征,其维度=16 时,我们可以将嵌入列表准备为以下列表:
tensor(B, 1, 32) (trained_embedding with num_embeddings=1, embedding_dim=32) tensor(B, 5, 64) (native_embedding with num_embeddings=5, embedding_dim=64) tensor(B, 3, 16) (dense_features with num_embeddings=3, embedding_dim=32)
注意
所有输入张量的 batch_size 需要相同。
- 返回值::
以扁平化和连接的 embeddings 作为输入的 dense_module 的输出。
- 返回类型::
torch.Tensor
- training: bool¶
- class torchrec.modules.deepfm.FactorizationMachine¶
基类:
Module
这是 DeepFM 论文 中提到的分解机模块
此模块不涵盖已发布论文的端到端功能。相反,它只涵盖出版物中的 FM 部分,用于学习二阶特征交互。
为了支持建模灵活性,我们将关键组件定制为与公开论文不同
我们将输入从原始稀疏特征更改为特征的嵌入。这允许嵌入维度和嵌入数量的灵活性,只要所有嵌入张量具有相同的批次大小。
该模块的一般架构如下
1 x 10 output /|\ | pass into `dense_module` | 1 x 90 /|\ | concat | 1 x 20, 1 x 30, 1 x 40 list of embeddings
示例
batch_size = 3 # the input embedding are in torch.Tensor of [batch_size, num_embeddings, embedding_dim] input_embeddings = [ torch.randn(batch_size, 2, 64), torch.randn(batch_size, 2, 32), ] fm = FactorizationMachine() output = fm(embeddings=input_embeddings)
- forward(embeddings: List[Tensor]) Tensor ¶
- 参数::
embeddings (List[torch.Tensor]) –
所有嵌入的列表(例如 dense、common_sparse、specialized_sparse、embedding_features、raw_embedding_features),形状为
(batch_size, num_embeddings, embedding_dim)
为了便于操作,具有相同嵌入维度的嵌入可以选择堆叠成单个张量。例如,当我们有一个训练过的嵌入,其维度=32,5 个本地嵌入,其维度=64,以及 3 个稠密特征,其维度=16 时,我们可以将嵌入列表准备为以下列表:
tensor(B, 1, 32) (trained_embedding with num_embeddings=1, embedding_dim=32) tensor(B, 5, 64) (native_embedding with num_embeddings=5, embedding_dim=64) tensor(B, 3, 16) (dense_features with num_embeddings=3, embedding_dim=32)
注意
所有输入张量的 batch_size 需要相同。
- 返回值::
以扁平化和连接的 embeddings 作为输入的 fm 的输出。预计为 [B, 1]。
- 返回类型::
torch.Tensor
- training: bool¶
torchrec.modules.embedding_configs¶
- class torchrec.modules.embedding_configs.BaseEmbeddingConfig(num_embeddings: int, embedding_dim: int, name: str = '', data_type: torchrec.types.DataType = <DataType.FP32: 'FP32'>, feature_names: List[str] = <factory>, weight_init_max: Union[float, NoneType] = None, weight_init_min: Union[float, NoneType] = None, pruning_indices_remapping: Union[torch.Tensor, NoneType] = None, init_fn: Union[Callable[[torch.Tensor], Union[torch.Tensor, NoneType]], NoneType] = None, need_pos: bool = False)¶
Bases:
object
- data_type: DataType = 'FP32'¶
- embedding_dim: int¶
- feature_names: List[str]¶
- get_weight_init_max() float ¶
- get_weight_init_min() float ¶
- init_fn: Optional[Callable[[Tensor], Optional[Tensor]]] = None¶
- name: str = ''¶
- need_pos: bool = False¶
- num_embeddings: int¶
- num_features() int ¶
- pruning_indices_remapping: Optional[Tensor] = None¶
- weight_init_max: Optional[float] = None¶
- weight_init_min: Optional[float] = None¶
- class torchrec.modules.embedding_configs.EmbeddingBagConfig(num_embeddings: int, embedding_dim: int, name: str = '', data_type: torchrec.types.DataType = <DataType.FP32: 'FP32'>, feature_names: List[str] = <factory>, weight_init_max: Union[float, NoneType] = None, weight_init_min: Union[float, NoneType] = None, pruning_indices_remapping: Union[torch.Tensor, NoneType] = None, init_fn: Union[Callable[[torch.Tensor], Union[torch.Tensor, NoneType]], NoneType] = None, need_pos: bool = False, pooling: torchrec.modules.embedding_configs.PoolingType = <PoolingType.SUM: 'SUM'>)¶
Bases:
BaseEmbeddingConfig
- pooling: PoolingType = 'SUM'¶
- class torchrec.modules.embedding_configs.EmbeddingConfig(num_embeddings: int, embedding_dim: int, name: str = '', data_type: torchrec.types.DataType = <DataType.FP32: 'FP32'>, feature_names: List[str] = <factory>, weight_init_max: Union[float, NoneType] = None, weight_init_min: Union[float, NoneType] = None, pruning_indices_remapping: Union[torch.Tensor, NoneType] = None, init_fn: Union[Callable[[torch.Tensor], Union[torch.Tensor, NoneType]], NoneType] = None, need_pos: bool = False)¶
Bases:
BaseEmbeddingConfig
- embedding_dim: int¶
- feature_names: List[str]¶
- num_embeddings: int¶
- class torchrec.modules.embedding_configs.EmbeddingTableConfig(num_embeddings: int, embedding_dim: int, name: str = '', data_type: torchrec.types.DataType = <DataType.FP32: 'FP32'>, feature_names: List[str] = <factory>, weight_init_max: Union[float, NoneType] = None, weight_init_min: Union[float, NoneType] = None, pruning_indices_remapping: Union[torch.Tensor, NoneType] = None, init_fn: Union[Callable[[torch.Tensor], Union[torch.Tensor, NoneType]], NoneType] = None, need_pos: bool = False, pooling: torchrec.modules.embedding_configs.PoolingType = <PoolingType.SUM: 'SUM'>, is_weighted: bool = False, has_feature_processor: bool = False, embedding_names: List[str] = <factory>)¶
Bases:
BaseEmbeddingConfig
- embedding_names: List[str]¶
- has_feature_processor: bool = False¶
- is_weighted: bool = False¶
- pooling: PoolingType = 'SUM'¶
- class torchrec.modules.embedding_configs.PoolingType(value)¶
Bases:
Enum
一个枚举。
- MEAN = 'MEAN'¶
- NONE = 'NONE'¶
- SUM = 'SUM'¶
- class torchrec.modules.embedding_configs.QuantConfig(activation, weight, per_table_weight_dtype)¶
Bases:
tuple
- activation: PlaceholderObserver¶
Alias for field number 0
- per_table_weight_dtype: Optional[Dict[str, dtype]]¶
Alias for field number 2
- weight: PlaceholderObserver¶
Alias for field number 1
- class torchrec.modules.embedding_configs.ShardingType(value)¶
Bases:
Enum
Well-known sharding types, used by inter-module optimizations.
- COLUMN_WISE = 'column_wise'¶
- DATA_PARALLEL = 'data_parallel'¶
- ROW_WISE = 'row_wise'¶
- TABLE_COLUMN_WISE = 'table_column_wise'¶
- TABLE_ROW_WISE = 'table_row_wise'¶
- TABLE_WISE = 'table_wise'¶
- torchrec.modules.embedding_configs.data_type_to_dtype(data_type: DataType) dtype ¶
- torchrec.modules.embedding_configs.data_type_to_sparse_type(data_type: DataType) SparseType ¶
- torchrec.modules.embedding_configs.dtype_to_data_type(dtype: dtype) DataType ¶
- torchrec.modules.embedding_configs.pooling_type_to_pooling_mode(pooling_type: PoolingType, sharding_type: Optional[ShardingType] = None) PoolingMode ¶
- torchrec.modules.embedding_configs.pooling_type_to_str(pooling_type: PoolingType) str ¶
torchrec.modules.embedding_modules¶
- class torchrec.modules.embedding_modules.EmbeddingBagCollection(tables: List[EmbeddingBagConfig], is_weighted: bool = False, device: Optional[device] = None)¶
Bases:
EmbeddingBagCollectionInterface
EmbeddingBagCollection 代表一组池化嵌入 (EmbeddingBags).
注意
EmbeddingBagCollection 是一个无分片模块,没有性能优化。对于性能敏感的场景,请考虑使用分片版本 ShardedEmbeddingBagCollection.
它处理以 KeyedJaggedTensor 形式的稀疏数据,其值为 [F X B X L],其中
F: 特征 (键)
B: 批量大小
L: 稀疏特征的长度 (参差不齐)
并输出一个值为 [B * (F * D)] 的 KeyedTensor,其中
F: 特征 (键)
D: 每个特征 (键) 的嵌入维度
B: 批量大小
- 参数::
tables (List[EmbeddingBagConfig]) – 嵌入表列表.
is_weighted (bool) – 输入 KeyedJaggedTensor 是否加权.
device (Optional[torch.device]) – 默认计算设备。
示例
table_0 = EmbeddingBagConfig( name="t1", embedding_dim=3, num_embeddings=10, feature_names=["f1"] ) table_1 = EmbeddingBagConfig( name="t2", embedding_dim=4, num_embeddings=10, feature_names=["f2"] ) ebc = EmbeddingBagCollection(tables=[table_0, table_1]) # 0 1 2 <-- batch # "f1" [0,1] None [2] # "f2" [3] [4] [5,6,7] # ^ # feature features = KeyedJaggedTensor( keys=["f1", "f2"], values=torch.tensor([0, 1, 2, 3, 4, 5, 6, 7]), offsets=torch.tensor([0, 2, 2, 3, 4, 5, 8]), ) pooled_embeddings = ebc(features) print(pooled_embeddings.values()) tensor([[-0.8899, -0.1342, -1.9060, -0.0905, -0.2814, -0.9369, -0.7783], [ 0.0000, 0.0000, 0.0000, 0.1598, 0.0695, 1.3265, -0.1011], [-0.4256, -1.1846, -2.1648, -1.0893, 0.3590, -1.9784, -0.7681]], grad_fn=<CatBackward0>) print(pooled_embeddings.keys()) ['f1', 'f2'] print(pooled_embeddings.offset_per_key()) tensor([0, 3, 7])
- property device: device¶
- embedding_bag_configs() List[EmbeddingBagConfig] ¶
- forward(features: KeyedJaggedTensor) KeyedTensor ¶
- 参数::
features (KeyedJaggedTensor) – 形式为 [F X B X L] 的 KJT.
- 返回值::
KeyedTensor
- is_weighted() bool ¶
- reset_parameters() None ¶
- training: bool¶
- class torchrec.modules.embedding_modules.EmbeddingBagCollectionInterface(*args, **kwargs)¶
Bases:
ABC
,Module
EmbeddingBagCollection 的接口.
- abstract embedding_bag_configs() List[EmbeddingBagConfig] ¶
- abstract forward(features: KeyedJaggedTensor) KeyedTensor ¶
定义每次调用时执行的计算。
所有子类都应该覆盖它。
注意
虽然前向传递的方案需要在此函数中定义,但应在之后调用
Module
实例,而不是此函数,因为前者负责运行注册的钩子,而后者则会静默地忽略它们。
- abstract is_weighted() bool ¶
- training: bool¶
- class torchrec.modules.embedding_modules.EmbeddingCollection(tables: List[EmbeddingConfig], device: Optional[device] = None, need_indices: bool = False)¶
Bases:
EmbeddingCollectionInterface
EmbeddingCollection 表示一组非池化嵌入。
注意
EmbeddingCollection 是一个非分片模块,未进行性能优化。对于性能敏感的场景,请考虑使用分片版本 ShardedEmbeddingCollection。
它以 KeyedJaggedTensor 的形式处理稀疏数据,形式为 [F X B X L],其中
F: 特征 (键)
B: 批量大小
L:稀疏特征的长度(可变)
并输出 Dict[feature (key), JaggedTensor]。每个 JaggedTensor 包含形式为 (B * L) X D 的值,其中
B: 批量大小
L: 稀疏特征的长度 (参差不齐)
D:每个特征(键)的嵌入维度,长度为 L
- 参数::
tables (List[EmbeddingConfig]) – 嵌入表的列表。
device (Optional[torch.device]) – 默认计算设备。
need_indices (bool) – 如果需要将索引传递给最终的查找字典。
示例
e1_config = EmbeddingConfig( name="t1", embedding_dim=3, num_embeddings=10, feature_names=["f1"] ) e2_config = EmbeddingConfig( name="t2", embedding_dim=3, num_embeddings=10, feature_names=["f2"] ) ec = EmbeddingCollection(tables=[e1_config, e2_config]) # 0 1 2 <-- batch # 0 [0,1] None [2] # 1 [3] [4] [5,6,7] # ^ # feature features = KeyedJaggedTensor.from_offsets_sync( keys=["f1", "f2"], values=torch.tensor([0, 1, 2, 3, 4, 5, 6, 7]), offsets=torch.tensor([0, 2, 2, 3, 4, 5, 8]), ) feature_embeddings = ec(features) print(feature_embeddings['f2'].values()) tensor([[-0.2050, 0.5478, 0.6054], [ 0.7352, 0.3210, -3.0399], [ 0.1279, -0.1756, -0.4130], [ 0.7519, -0.4341, -0.0499], [ 0.9329, -1.0697, -0.8095]], grad_fn=<EmbeddingBackward>)
- property device: device¶
- embedding_configs() List[EmbeddingConfig] ¶
- embedding_dim() int ¶
- embedding_names_by_table() List[List[str]] ¶
- forward(features: KeyedJaggedTensor) Dict[str, JaggedTensor] ¶
- 参数::
features (KeyedJaggedTensor) – 形式为 [F X B X L] 的 KJT.
- 返回值::
Dict[str, JaggedTensor]
- need_indices() bool ¶
- reset_parameters() None ¶
- training: bool¶
- class torchrec.modules.embedding_modules.EmbeddingCollectionInterface(*args, **kwargs)¶
Bases:
ABC
,Module
用于 EmbeddingCollection 的接口。
- abstract embedding_configs() List[EmbeddingConfig] ¶
- abstract embedding_dim() int ¶
- abstract embedding_names_by_table() List[List[str]] ¶
- abstract forward(features: KeyedJaggedTensor) Dict[str, JaggedTensor] ¶
定义每次调用时执行的计算。
所有子类都应该覆盖它。
注意
虽然前向传递的方案需要在此函数中定义,但应在之后调用
Module
实例,而不是此函数,因为前者负责运行注册的钩子,而后者则会静默地忽略它们。
- abstract need_indices() bool ¶
- training: bool¶
- torchrec.modules.embedding_modules.get_embedding_names_by_table(tables: Union[List[EmbeddingBagConfig], List[EmbeddingConfig]]) List[List[str]] ¶
- torchrec.modules.embedding_modules.process_pooled_embeddings(pooled_embeddings: List[Tensor], inverse_indices: Tensor) Tensor ¶
- torchrec.modules.embedding_modules.reorder_inverse_indices(inverse_indices: Optional[Tuple[List[str], Tensor]], feature_names: List[str]) Tensor ¶
torchrec.modules.feature_processor¶
- class torchrec.modules.feature_processor.BaseFeatureProcessor(*args, **kwargs)¶
基类:
Module
特征处理器抽象基类。
- abstract forward(features: Dict[str, JaggedTensor]) Dict[str, JaggedTensor] ¶
定义每次调用时执行的计算。
所有子类都应该覆盖它。
注意
虽然前向传递的方案需要在此函数中定义,但应在之后调用
Module
实例,而不是此函数,因为前者负责运行注册的钩子,而后者则会静默地忽略它们。
- training: bool¶
- class torchrec.modules.feature_processor.BaseGroupedFeatureProcessor(*args, **kwargs)¶
基类:
Module
分组特征处理器的抽象基类
- abstract forward(features: KeyedJaggedTensor) KeyedJaggedTensor ¶
定义每次调用时执行的计算。
所有子类都应该覆盖它。
注意
虽然前向传递的方案需要在此函数中定义,但应在之后调用
Module
实例,而不是此函数,因为前者负责运行注册的钩子,而后者则会静默地忽略它们。
- training: bool¶
- class torchrec.modules.feature_processor.PositionWeightedModule(max_feature_lengths: Dict[str, int], device: Optional[device] = None)¶
Bases:
BaseFeatureProcessor
为 id 列表特征添加位置权重。
- 参数::
max_feature_lengths (Dict[str, int]) – 特征名称到 max_length 映射。 max_length,也称为截断大小,指定每个样本的最大 id 数。 对于每个特征,其位置权重参数大小为 max_length。
- forward(features: Dict[str, JaggedTensor]) Dict[str, JaggedTensor] ¶
- 参数::
features (Dict[str, JaggedTensor]) – 键到 JaggedTensor 的字典,表示特征。
- 返回值::
与输入特征相同,但 weights 字段已填充。
- 返回类型::
Dict[str, JaggedTensor]
- reset_parameters() None ¶
- training: bool¶
- class torchrec.modules.feature_processor.PositionWeightedProcessor(max_feature_lengths: Dict[str, int], device: Optional[device] = None)¶
Bases:
BaseGroupedFeatureProcessor
PositionWeightedProcessor 表示一个处理器,用于对 KeyedJaggedTensor 应用位置权重。
它可以处理未分片和分片输入,并输出相应的输出
- 参数::
max_feature_lengths (Dict[str, int]) – feature_lengths 的字典,键是 feature_name,值是长度。
device (Optional[torch.device]) – 默认计算设备。
示例
keys=["Feature0", "Feature1", "Feature2"] values=torch.tensor([0, 1, 2, 3, 4, 5, 6, 7, 3, 4, 5, 6, 7]) lengths=torch.tensor([2, 0, 1, 1, 1, 3, 2, 3, 0]) features = KeyedJaggedTensor.from_lengths_sync(keys=keys, values=values, lengths=lengths) pw = FeatureProcessorCollection( feature_processor_modules={key: PositionWeightedFeatureProcessor(max_feature_length=100) for key in keys} ) result = pw(features) # result is # KeyedJaggedTensor({ # "Feature0": { # "values": [[0, 1], [], [2]], # "weights": [[1.0, 1.0], [], [1.0]] # }, # "Feature1": { # "values": [[3], [4], [5, 6, 7]], # "weights": [[1.0], [1.0], [1.0, 1.0, 1.0]] # }, # "Feature2": { # "values": [[3, 4], [5, 6, 7], []], # "weights": [[1.0, 1.0], [1.0, 1.0, 1.0], []] # } # })
- forward(features: KeyedJaggedTensor) KeyedJaggedTensor ¶
在未分片或非流水线模型中,输入特征包含 fp_feature 和 non_fp_features,输出将过滤掉 non_fp 特征。 在分片流水线模型中,输入特征只能包含无或全部 feature_processed 特征,因为输入特征来自 ebc 的 input_dist(),它将过滤掉不在 ebc 中的键。 并且输入大小与输出大小相同。
- 参数::
features (KeyedJaggedTensor) – 输入特征
- 返回值::
KeyedJaggedTensor
- named_buffers(prefix: str = '', recurse: bool = True, remove_duplicate: bool = True) Iterator[Tuple[str, Tensor]] ¶
返回模块缓冲区的迭代器,同时返回缓冲区的名称和缓冲区本身。
- 参数::
prefix (str) – 要添加到所有缓冲区名称之前的 前缀。
recurse (bool, 可选) – 如果为 True,则会生成此模块和所有子模块的缓冲区。 否则,仅生成此模块的直接成员缓冲区。 默认为 True。
remove_duplicate (bool, 可选) – 是否删除结果中重复的缓冲区。 默认为 True。
- 生成:
(str, torch.Tensor) – 包含名称和缓冲区的元组
示例
>>> # xdoctest: +SKIP("undefined vars") >>> for name, buf in self.named_buffers(): >>> if name in ['running_var']: >>> print(buf.size())
- state_dict(destination: Optional[Dict[str, Any]] = None, prefix: str = '', keep_vars: bool = False) Dict[str, Any] ¶
返回一个字典,其中包含对模块的整个状态的引用。
包括参数和持久缓冲区(例如运行平均值)。 键是相应的参数和缓冲区名称。 设置为
None
的参数和缓冲区不会被包含。注意
返回的对象是浅拷贝。 它包含对模块参数和缓冲区的引用。
警告
目前
state_dict()
也接受destination
、prefix
和keep_vars
的位置参数。 但是,这将被弃用,在未来的版本中将强制使用关键字参数。警告
请避免使用参数
destination
,因为它不是为最终用户设计的。- 参数::
destination (dict, 可选) – 如果提供,模块的状态将更新到字典中,并返回相同对象。 否则,将创建并返回一个
OrderedDict
。 默认为None
。prefix (str, 可选) – 添加到参数和缓冲区名称之前的 前缀,以组成
state_dict
中的键。 默认为''
。keep_vars (bool, 可选) – 默认为
False
,则在状态字典中返回的Tensor
将从自动梯度中分离。 如果设置为True
,则不会执行分离。 默认为False
。
- 返回值::
包含模块的整个状态的字典
- 返回类型::
dict
示例
>>> # xdoctest: +SKIP("undefined vars") >>> module.state_dict().keys() ['bias', 'weight']
- training: bool¶
- torchrec.modules.feature_processor.offsets_to_range_traceble(offsets: Tensor, values: Tensor) Tensor ¶
- torchrec.modules.feature_processor.position_weighted_module_update_features(features: Dict[str, JaggedTensor], weighted_features: Dict[str, JaggedTensor]) Dict[str, JaggedTensor] ¶
torchrec.modules.lazy_extension¶
- class torchrec.modules.lazy_extension.LazyModuleExtensionMixin(*args, **kwargs)¶
基类:
LazyModuleMixin
这是 LazyModuleMixin 的临时扩展,用于支持将关键字参数传递给延迟模块的 forward 方法。
长期计划是将此功能上游到 LazyModuleMixin。 有关详细信息,请参见 https://github.com/pytorch/pytorch/issues/59923。
- 请参见 TestLazyModuleExtensionMixin,其中包含确保
LazyModuleExtensionMixin._infer_parameters 的源代码与 torch.nn.modules.lazy.LazyModuleMixin._infer_parameters 相同,只是前者可以接受关键字参数。
LazyModuleExtensionMixin._call_impl 的源代码与 torch.nn.Module._call_impl 相同,只是前者可以将关键字参数传递给 forward 预钩子。
- apply(fn: Callable[[Module], None]) Module ¶
递归地将 fn 应用于每个子模块(如 .children() 所返回),以及自身。 典型用法包括初始化模型的参数。
注意
对未初始化的延迟模块调用 apply() 将导致错误。 用户需要在对延迟模块调用 apply() 之前初始化延迟模块(通过执行一个虚拟前向传递)。
- 参数::
fn (torch.nn.Module -> None) – 要应用于每个子模块的函数。
- 返回值::
self
- 返回类型::
torch.nn.Module
示例
@torch.no_grad() def init_weights(m): print(m) if type(m) == torch.nn.LazyLinear: m.weight.fill_(1.0) print(m.weight) linear = torch.nn.LazyLinear(2) linear.apply(init_weights) # this fails, because `linear` (a lazy-module) hasn't been initialized yet input = torch.randn(2, 10) linear(input) # run a dummy forward pass to initialize the lazy-module linear.apply(init_weights) # this works now
- torchrec.modules.lazy_extension.lazy_apply(module: Module, fn: Callable[[Module], None]) Module ¶
将函数附加到模块,该函数将在第一次前向传递后(即所有子模块和参数初始化后)递归地应用于模块的每个子模块(如 .children() 所返回)以及模块本身。
典型用法包括初始化延迟模块的参数的数值(即从 LazyModuleMixin 继承的模块)。
注意
lazy_apply() 可以用于延迟模块和非延迟模块。
- 参数::
module (torch.nn.Module) – 要递归应用 fn 的模块。
fn (Callable[[torch.nn.Module], None]) – 要附加到 module 并在稍后应用于 module 的每个子模块和 module 本身的函数。
- 返回值::
已附加 fn 的 module。
- 返回类型::
torch.nn.Module
示例
@torch.no_grad() def init_weights(m): print(m) if type(m) == torch.nn.LazyLinear: m.weight.fill_(1.0) print(m.weight) linear = torch.nn.LazyLinear(2) lazy_apply(linear, init_weights) # doesn't run `init_weights` immediately input = torch.randn(2, 10) linear(input) # runs `init_weights` only once, right after first forward pass seq = torch.nn.Sequential(torch.nn.LazyLinear(2), torch.nn.LazyLinear(2)) lazy_apply(seq, init_weights) # doesn't run `init_weights` immediately input = torch.randn(2, 10) seq(input) # runs `init_weights` only once, right after first forward pass
torchrec.modules.mlp¶
- class torchrec.modules.mlp.MLP(in_size: int, layer_sizes: ~typing.List[int], bias: bool = True, activation: ~typing.Union[str, ~typing.Callable[[], ~torch.nn.modules.module.Module], ~torch.nn.modules.module.Module, ~typing.Callable[[~torch.Tensor], ~torch.Tensor]] = <built-in method relu of type object>, device: ~typing.Optional[~torch.device] = None, dtype: ~torch.dtype = torch.float32)¶
基类:
Module
依次应用感知器模块堆栈(即多层感知器)。
- 参数::
in_size (int) – 输入的 in_size。
layer_sizes (List[int]) – 每个感知器模块的 out_size。
bias (bool) – 如果设置为 False,则层将不会学习附加偏差。默认值:True。
activation (str, Union[Callable[[], torch.nn.Module], torch.nn.Module, Callable[[torch.Tensor], torch.Tensor]]) – 要应用于每个感知器模块的线性变换输出的激活函数。如果 activation 是一个 str,我们目前只支持以下字符串,如“relu”、“sigmoid”和“swish_layernorm”。如果 activation 是一个 Callable[[], torch.nn.Module],则每个感知器模块将调用一次 activation() 以生成该感知器模块的激活模块,并且这些激活模块之间不会共享参数。一种用例是当所有激活模块共享相同的构造函数参数时,但不共享实际的模块参数。默认值:torch.relu。
device (Optional[torch.device]) – 默认计算设备。
示例
batch_size = 3 in_size = 40 input = torch.randn(batch_size, in_size) layer_sizes = [16, 8, 4] mlp_module = MLP(in_size, layer_sizes, bias=True) output = mlp_module(input) assert list(output.shape) == [batch_size, layer_sizes[-1]]
- forward(input: Tensor) Tensor ¶
- 参数::
input (torch.Tensor) – 形状为 (B, I) 的张量,其中 I 是每个输入样本中的元素数量。
- 返回值::
形状为 (B, O) 的张量,其中 O 是最后一个感知器模块的 out_size。
- 返回类型::
torch.Tensor
- training: bool¶
- class torchrec.modules.mlp.Perceptron(in_size: int, out_size: int, bias: bool = True, activation: ~typing.Union[~torch.nn.modules.module.Module, ~typing.Callable[[~torch.Tensor], ~torch.Tensor]] = <built-in method relu of type object>, device: ~typing.Optional[~torch.device] = None, dtype: ~torch.dtype = torch.float32)¶
基类:
Module
应用线性变换和激活。
- 参数::
in_size (int) – 每个输入样本中的元素数量。
out_size (int) – 每个输出样本中的元素数量。
bias (bool) – 如果设置为
False
,则层将不会学习附加偏差。默认值:True
。activation (Union[torch.nn.Module, Callable[[torch.Tensor], torch.Tensor]]) – 要应用于线性变换输出的激活函数。默认值:torch.relu。
device (Optional[torch.device]) – 默认计算设备。
示例
batch_size = 3 in_size = 40 input = torch.randn(batch_size, in_size) out_size = 16 perceptron = Perceptron(in_size, out_size, bias=True) output = perceptron(input) assert list(output) == [batch_size, out_size]
- forward(input: Tensor) Tensor ¶
- 参数::
input (torch.Tensor) – 形状为 (B, I) 的张量,其中 I 是每个输入样本中的元素数量。
- 返回值::
- 形状为 (B, O) 的张量,其中 O 是每个输出样本中每个通道的元素数量(即 out_size)。
channel in each output sample (i.e. out_size).
- 返回类型::
torch.Tensor
- training: bool¶
torchrec.modules.utils¶
- class torchrec.modules.utils.SequenceVBEContext(recat: torch.Tensor, unpadded_lengths: torch.Tensor, reindexed_lengths: torch.Tensor, reindexed_length_per_key: List[int], reindexed_values: Union[torch.Tensor, NoneType] = None)¶
Bases:
Multistreamable
- recat: Tensor¶
- record_stream(stream: Stream) None ¶
See https://pytorch.ac.cn/docs/stable/generated/torch.Tensor.record_stream.html
- reindexed_length_per_key: List[int]¶
- reindexed_lengths: Tensor¶
- reindexed_values: Optional[Tensor] = None¶
- unpadded_lengths: Tensor¶
- torchrec.modules.utils.check_module_output_dimension(module: Union[Iterable[Module], Module], in_features: int, out_features: int) bool ¶
验证给定模块或模块列表的 out_features 是否与指定数量匹配。如果给定模块列表或 ModuleList,则递归检查所有子模块。
- torchrec.modules.utils.construct_jagged_tensors(embeddings: Tensor, features: KeyedJaggedTensor, embedding_names: List[str], need_indices: bool = False, features_to_permute_indices: Optional[Dict[str, List[int]]] = None, original_features: Optional[KeyedJaggedTensor] = None, reverse_indices: Optional[Tensor] = None, seq_vbe_ctx: Optional[SequenceVBEContext] = None) Dict[str, JaggedTensor] ¶
- torchrec.modules.utils.construct_jagged_tensors_inference(embeddings: Tensor, lengths: Tensor, values: Tensor, embedding_names: List[str], need_indices: bool = False, features_to_permute_indices: Optional[Dict[str, List[int]]] = None, reverse_indices: Optional[Tensor] = None) Dict[str, JaggedTensor] ¶
- torchrec.modules.utils.construct_modulelist_from_single_module(module: Module, sizes: Tuple[int, ...]) Module ¶
给定一个模块,通过复制提供的模块并重新初始化线性层来构建大小为 sizes 的(嵌套)ModuleList。
- torchrec.modules.utils.convert_list_of_modules_to_modulelist(modules: Iterable[Module], sizes: Tuple[int, ...]) Module ¶
- torchrec.modules.utils.deterministic_dedup(ids: Tensor) Tuple[Tensor, Tensor] ¶
为了消除冲突更新中的竞态条件,删除重复的 ID。 仅保留重复 ID 的最后一次出现。 返回排序的唯一 ID 和最后一次出现的 ID。
- torchrec.modules.utils.extract_module_or_tensor_callable(module_or_callable: Union[Callable[[], Module], Module, Callable[[Tensor], Tensor]]) Union[Module, Callable[[Tensor], Tensor]] ¶
- torchrec.modules.utils.get_module_output_dimension(module: Union[Callable[[Tensor], Tensor], Module], in_features: int) int ¶
- torchrec.modules.utils.init_mlp_weights_xavier_uniform(m: Module) None ¶
- torchrec.modules.utils.jagged_index_select_with_empty(values: Tensor, ids: Tensor, offsets: Tensor, output_offsets: Tensor) Tensor ¶
torchrec.modules.mc_modules¶
- class torchrec.modules.mc_modules.DistanceLFU_EvictionPolicy(decay_exponent: float = 1.0, threshold_filtering_func: Optional[Callable[[Tensor], Tuple[Tensor, Union[float, Tensor]]]] = None)¶
Bases:
MCHEvictionPolicy
- coalesce_history_metadata(current_iter: int, history_metadata: Dict[str, Tensor], unique_ids_counts: Tensor, unique_inverse_mapping: Tensor, additional_ids: Optional[Tensor] = None, threshold_mask: Optional[Tensor] = None) Dict[str, Tensor] ¶
Args: history_metadata (Dict[str, torch.Tensor]): 历史元数据字典 additional_ids (torch.Tensor): 用作历史的一部分的额外 id unique_inverse_mapping (torch.Tensor): 由
torch.cat[history_accumulator, additional_ids] 生成的 torch.unique 逆映射。用于将历史元数据张量索引映射到它们合并的张量索引。
合并元数据历史缓冲区并返回已处理的元数据张量字典。
- property metadata_info: List[MCHEvictionPolicyMetadataInfo]¶
- record_history_metadata(current_iter: int, incoming_ids: Tensor, history_metadata: Dict[str, Tensor]) None ¶
Args: current_iter (int): 当前迭代 incoming_ids (torch.Tensor): 接收的 id history_metadata (Dict[str, torch.Tensor]): 历史元数据字典
- 根据接收的 id 计算并记录元数据
对于实现的驱逐策略。
- update_metadata_and_generate_eviction_scores(current_iter: int, mch_size: int, coalesced_history_argsort_mapping
Args
- Returns Tuple of (evicted_indices, selected_new_indices) where
evicted_indices are indices in the mch map to be evicted, and selected_new_indices are the indices of the ids in the coalesced history that are to be added to the mch.
- class torchrec.modules.mc_modules.LFU_EvictionPolicy(threshold_filtering_func: Optional[Callable[[Tensor], Tuple[Tensor, Union[float, Tensor]]]] = None)¶
Bases:
MCHEvictionPolicy
- coalesce_history_metadata(current_iter: int, history_metadata: Dict[str, Tensor], unique_ids_counts: Tensor, unique_inverse_mapping: Tensor, additional_ids: Optional[Tensor] = None, threshold_mask: Optional[Tensor] = None) Dict[str, Tensor] ¶
Args: history_metadata (Dict[str, torch.Tensor]): 历史元数据字典 additional_ids (torch.Tensor): 用作历史的一部分的额外 id unique_inverse_mapping (torch.Tensor): 由
torch.cat[history_accumulator, additional_ids] 生成的 torch.unique 逆映射。用于将历史元数据张量索引映射到它们合并的张量索引。
合并元数据历史缓冲区并返回已处理的元数据张量字典。
- property metadata_info: List[MCHEvictionPolicyMetadataInfo]¶
- record_history_metadata(current_iter: int, incoming_ids: Tensor, history_metadata: Dict[str, Tensor]) None ¶
Args: current_iter (int): 当前迭代 incoming_ids (torch.Tensor): 接收的 id history_metadata (Dict[str, torch.Tensor]): 历史元数据字典
- 根据接收的 id 计算并记录元数据
对于实现的驱逐策略。
- update_metadata_and_generate_eviction_scores(current_iter: int, mch_size: int, coalesced_history_argsort_mapping: Tensor, coalesced_history_sorted_unique_ids_counts: Tensor, coalesced_history_mch_matching_elements_mask: Tensor, coalesced_history_mch_matching_indices: Tensor, mch_metadata: Dict[str, Tensor], coalesced_history_metadata: Dict[str, Tensor]) Tuple[Tensor, Tensor] ¶
Args
- Returns Tuple of (evicted_indices, selected_new_indices) where
evicted_indices are indices in the mch map to be evicted, and selected_new_indices are the indices of the ids in the coalesced history that are to be added to the mch.
- class torchrec.modules.mc_modules.LRU_EvictionPolicy(decay_exponent: float = 1.0, threshold_filtering_func: Optional[Callable[[Tensor], Tuple[Tensor, Union[float, Tensor]]]] = None)¶
Bases:
MCHEvictionPolicy
- coalesce_history_metadata(current_iter: int, history_metadata: Dict[str, Tensor], unique_ids_counts: Tensor, unique_inverse_mapping: Tensor, additional_ids: Optional[Tensor] = None, threshold_mask: Optional[Tensor] = None
Args: history_metadata (Dict[str, torch.Tensor]): 历史元数据字典 additional_ids (torch.Tensor): 用作历史的一部分的额外 id unique_inverse_mapping (torch.Tensor): 由
torch.cat[history_accumulator, additional_ids] 生成的 torch.unique 逆映射。用于将历史元数据张量索引映射到它们合并的张量索引。
合并元数据历史缓冲区并返回已处理的元数据张量字典。
- property metadata_info: List[MCHEvictionPolicyMetadataInfo]¶
- record_history_metadata(current_iter: int, incoming_ids: Tensor, history_metadata: Dict[str, Tensor]) None ¶
Args: current_iter (int): 当前迭代 incoming_ids (torch.Tensor): 接收的 id history_metadata (Dict[str, torch.Tensor]): 历史元数据字典
- 根据接收的 id 计算并记录元数据
对于实现的驱逐策略。
- update_metadata_and_generate_eviction_scores(current_iter: int, mch_size: int, coalesced_history_argsort_mapping: Tensor, coalesced_history_sorted_unique_ids_counts: Tensor, coalesced_history_mch_matching_elements_mask: Tensor, coalesced_history_mch_matching_indices: Tensor, mch_metadata: Dict[str, Tensor], coalesced_history_metadata: Dict[str, Tensor]) Tuple[Tensor, Tensor] ¶
Args
- Returns Tuple of (evicted_indices, selected_new_indices) where
evicted_indices are indices in the mch map to be evicted, and selected_new_indices are the indices of the ids in the coalesced history that are to be added to the mch.
- class torchrec.modules.mc_modules.MCHEvictionPolicy(metadata_info: List[MCHEvictionPolicyMetadataInfo], threshold_filtering_func: Optional[Callable[[Tensor], Tuple[Tensor, Union[float, Tensor]]]] = None)¶
Bases:
ABC
- abstract coalesce_history_metadata(current_iter: int, history_metadata: Dict[str, Tensor], unique_ids_counts: Tensor, unique_inverse_mapping: Tensor, additional_ids: Optional[Tensor] = None, threshold_mask: Optional[Tensor] = None) Dict[str, Tensor] ¶
Args: history_metadata (Dict[str, torch.Tensor]): 历史元数据字典 additional_ids (torch.Tensor): 用作历史的一部分的额外 id unique_inverse_mapping (torch.Tensor): 由
torch.cat[history_accumulator, additional_ids] 生成的 torch.unique 逆映射。用于将历史元数据张量索引映射到它们合并的张量索引。
合并元数据历史缓冲区并返回已处理的元数据张量字典。
- abstract property metadata_info: List[MCHEvictionPolicyMetadataInfo]¶
- abstract record_history_metadata(current_iter: int, incoming_ids: Tensor, history_metadata: Dict[str, Tensor]) None ¶
Args: current_iter (int): 当前迭代 incoming_ids (torch.Tensor): 接收的 id history_metadata (Dict[str, torch.Tensor]): 历史元数据字典
- 根据接收的 id 计算并记录元数据
对于实现的驱逐策略。
- abstract update_metadata_and_generate_eviction_scores(current_iter: int, mch_size: int, coalesced_history_argsort_mapping: Tensor, coalesced_history_sorted_unique_ids_counts: Tensor, coalesced_history_mch_matching_elements_mask: Tensor, coalesced_history_mch_matching_indices: Tensor, mch_metadata: Dict[str, Tensor], coalesced_history_metadata: Dict[str, Tensor]) Tuple[Tensor, Tensor] ¶
Args
- Returns Tuple of (evicted_indices, selected_new_indices) where
evicted_indices are indices in the mch map to be evicted, and selected_new_indices are the indices of the ids in the coalesced history that are to be added to the mch.
- class torchrec.modules.mc_modules.MCHEvictionPolicyMetadataInfo(metadata_name, is_mch_metadata, is_history_metadata)¶
Bases:
tuple
- is_history_metadata: bool¶
Alias for field number 2
- is_mch_metadata: bool¶
Alias for field number 1
- metadata_name: str¶
Alias for field number 0
- class torchrec.modules.mc_modules.MCHManagedCollisionModule(zch_size: int, device: device, eviction_policy: MCHEvictionPolicy, eviction_interval: int, input_hash_size: int = 9223372036854775808, input_hash_func: Optional[Callable[[Tensor, int], Tensor]] = None, mch_size: Optional[int] = None, mch_hash_func: Optional[Callable[[Tensor, int], Tensor]] = None, name: Optional[str] = None, output_global_offset: int = 0)¶
Bases:
ManagedCollisionModule
ZCH / MCH 管理的冲突模块
- 参数::
zch_size (int) – 输出 ID 的范围,在 [output_size_offset, output_size_offset + zch_size - 1) 内
device (torch.device) – 此模块将在此设备上执行
eviction_policy (驱逐策略) – 要使用的驱逐策略
eviction_interval (int) – 驱逐策略触发的间隔
input_hash_size (int) – 输入特征 ID 范围,将作为第二个参数传递给 input_hash_func
input_hash_func (Optional[Callable]) – 用于为输入特征生成哈希值的函数。此函数通常用于驱动对与输入数据相同或更大的范围的均匀分布
mch_size (Optional[int]) – 剩余输出的大小(即传统 MCH),实验性功能。ID 在内部由 output_size_offset + zch_output_range 偏移
mch_hash_func (Optional[Callable]) – 用于为剩余特征生成哈希值的函数。将哈希到 mch_size。
output_global_offset (int) – 输出范围的输出 ID 的偏移量,通常仅在分片应用程序中使用。
- evict() Optional[Tensor] ¶
如果本轮不需要驱逐,则返回 None。否则,返回要重置的插槽的 ID。在驱逐时,此模块应重置其这些插槽的状态,并假设下游模块将正确处理这种情况。
- forward(features: Dict[str, JaggedTensor]) Dict[str, JaggedTensor] ¶
Args: feature (JaggedTensor): 特征表示 :returns: 修改后的 JT :rtype: Dict[str, JaggedTensor]
- input_size() int ¶
返回输入的数值范围,用于分片信息
- output_size() int ¶
返回输出的数值范围,用于验证与下游嵌入查找
- preprocess(features: Dict[str, JaggedTensor]) Dict[str, JaggedTensor] ¶
- profile(features: Dict[str, JaggedTensor]) Dict[str, JaggedTensor] ¶
- rebuild_with_output_id_range(output_id_range: Tuple[int, int], device: Optional[device] = None) MCHManagedCollisionModule ¶
用于创建 RW 分片的本地 MC 模块,目前是临时解决方案
- remap(features: Dict[str, JaggedTensor]) Dict[str, JaggedTensor] ¶
- training: bool¶
- class torchrec.modules.mc_modules.ManagedCollisionCollection(managed_collision_modules: Dict[str, ManagedCollisionModule], embedding_configs: List[BaseEmbeddingConfig])¶
基类:
Module
ManagedCollisionCollection 表示一组管理的冲突模块。传递给 MCC 的输入将被管理的冲突模块重新映射
并返回。
- 参数::
managed_collision_modules (Dict[str, ManagedCollisionModule]) – 管理的冲突模块字典
embedding_confgs (List[BaseEmbeddingConfig]) – 嵌入配置列表,每个带管理冲突模块的表
- embedding_configs() List[BaseEmbeddingConfig] ¶
- evict() Dict[str, Optional[Tensor]] ¶
- forward(features: KeyedJaggedTensor) KeyedJaggedTensor ¶
定义每次调用时执行的计算。
所有子类都应该覆盖它。
注意
虽然前向传递的方案需要在此函数中定义,但应在之后调用
Module
实例,而不是此函数,因为前者负责运行注册的钩子,而后者则会静默地忽略它们。
- training: bool¶
- class torchrec.modules.mc_modules.ManagedCollisionModule(device: device)¶
基类:
Module
ManagedCollisionModule 的抽象基类。将输入 ID 映射到范围 [0, max_output_id)。
- 参数::
max_output_id (int) – 重新映射 ID 的最大输出值。
input_hash_size (int) – 输入范围的最大值,即 [0, input_hash_size)
remapping_range_start_index (int) – 重新映射范围的相对起始索引
device (torch.device) – 默认计算设备。
- 示例:
jt = JaggedTensor(…) mcm = ManagedCollisionModule(…) mcm_jt = mcm(fp)
- property device: device¶
- abstract evict() Optional[Tensor] ¶
如果本轮不需要驱逐,则返回 None。否则,返回要重置的插槽的 ID。在驱逐时,此模块应重置其这些插槽的状态,并假设下游模块将正确处理这种情况。
- abstract forward(features: Dict[str, JaggedTensor]) Dict[str, JaggedTensor] ¶
定义每次调用时执行的计算。
所有子类都应该覆盖它。
注意
虽然前向传递的方案需要在此函数中定义,但应在之后调用
Module
实例,而不是此函数,因为前者负责运行注册的钩子,而后者则会静默地忽略它们。
- abstract input_size() int ¶
返回输入的数值范围,用于分片信息
- abstract output_size() int ¶
返回输出的数值范围,用于验证与下游嵌入查找
- abstract preprocess(features: Dict[str, JaggedTensor]) Dict[str, JaggedTensor] ¶
- abstract rebuild_with_output_id_range(output_id_range: Tuple[int, int], device: Optional[device] = None) ManagedCollisionModule ¶
用于创建 RW 分片的本地 MC 模块,目前是临时解决方案
- training: bool¶
- torchrec.modules.mc_modules.apply_mc_method_to_jt_dict(method: str, features_dict: Dict[str, JaggedTensor], table_to_features: Dict[str, List[str]], managed_collisions: ModuleDict) Dict[str, JaggedTensor] ¶
将 MC 方法应用于 JaggedTensor 字典,并返回具有相同排序的更新字典
- torchrec.modules.mc_modules.average_threshold_filter(id_counts: Tensor) Tuple[Tensor, Tensor] ¶
阈值为 id_counts 的平均值。如果 id 的计数严格大于平均值,则添加该 id。
- torchrec.modules.mc_modules.dynamic_threshold_filter(id_counts: Tensor, threshold_skew_multiplier: float = 10.0) Tuple[Tensor, Tensor] ¶
阈值为 total_count / num_ids * threshold_skew_multiplier。如果 id 的计数严格大于阈值,则添加该 id。
- torchrec.modules.mc_modules.probabilistic_threshold_filter(id_counts: Tensor, per_id_probability: float = 0.01) Tuple[Tensor, Tensor] ¶
每个 id 被添加的概率为 per_id_probability。例如,如果 per_id_probability 为 0.01,而 id 出现 100 次,则它被添加的概率为 60%。更准确地说,id 分数为 1 - (1 - per_id_probability) ^ id_count,对于随机生成的阈值,id 分数为它被添加的概率。
torchrec.modules.mc_embedding_modules¶
- class torchrec.modules.mc_embedding_modules.BaseManagedCollisionEmbeddingCollection(embedding_module: Union[EmbeddingBagCollection, EmbeddingCollection], managed_collision_collection: ManagedCollisionCollection, return_remapped_features: bool = False)¶
基类:
Module
BaseManagedCollisionEmbeddingCollection 表示一个 EC/EBC 模块和一组受控冲突模块。进入 MC-EC/EBC 的输入将在被传递到嵌入集合之前首先被受控冲突模块修改。
- 参数::
embedding_module – 用于查找嵌入的 EmbeddingCollection
managed_collision_modules – 受控冲突模块的字典
return_remapped_features (bool) – 是否除了嵌入之外还返回重映射的输入特征
- forward(features: KeyedJaggedTensor) Tuple[Union[KeyedTensor, Dict[str, JaggedTensor]], Optional[KeyedJaggedTensor]] ¶
定义每次调用时执行的计算。
所有子类都应该覆盖它。
注意
虽然前向传递的方案需要在此函数中定义,但应在之后调用
Module
实例,而不是此函数,因为前者负责运行注册的钩子,而后者则会静默地忽略它们。
- training: bool¶
- class torchrec.modules.mc_embedding_modules.ManagedCollisionEmbeddingBagCollection(embedding_bag_collection: EmbeddingBagCollection, managed_collision_collection: ManagedCollisionCollection, return_remapped_features: bool = False)¶
Bases:
BaseManagedCollisionEmbeddingCollection
ManagedCollisionEmbeddingBagCollection 表示一个 EmbeddingBagCollection 模块和一组受控冲突模块。进入 MC-EBC 的输入将在被传递到嵌入包集合之前首先被受控冲突模块修改。
有关输入和输出类型的信息,请参阅 EmbeddingBagCollection
- 参数::
embedding_module – 用于查找嵌入的 EmbeddingBagCollection
managed_collision_modules – 受控冲突模块的字典
return_remapped_features (bool) – 是否除了嵌入之外还返回重映射的输入特征
- training: bool¶
- class torchrec.modules.mc_embedding_modules.ManagedCollisionEmbeddingCollection(embedding_collection: EmbeddingCollection, managed_collision_collection: ManagedCollisionCollection, return_remapped_features: bool = False)¶
Bases:
BaseManagedCollisionEmbeddingCollection
ManagedCollisionEmbeddingCollection 表示一个 EmbeddingCollection 模块和一组受控冲突模块。进入 MC-EC 的输入将在被传递到嵌入集合之前首先被受控冲突模块修改。
有关输入和输出类型的信息,请参阅 EmbeddingCollection
- 参数::
embedding_module – 用于查找嵌入的 EmbeddingCollection
managed_collision_modules – 受控冲突模块的字典
return_remapped_features (bool) – 是否除了嵌入之外还返回重映射的输入特征
- training: bool¶
- torchrec.modules.mc_embedding_modules.evict(evictions: Dict[str, Optional[Tensor]], ebc: Union[EmbeddingBagCollection, EmbeddingCollection]) None ¶