快捷方式

torchrec.modules

Torchrec 通用模块

torchrec 模块包含各种模块的集合。

这些模块包括
  • nn.Embeddingnn.EmbeddingBag 的扩展,分别称为 EmbeddingBagCollectionEmbeddingCollection

  • 已建立的模块,例如 DeepFMCrossNet

  • 常见的模块模式,例如 MLPSwishLayerNorm

  • TorchRec 的自定义模块,例如 PositionWeightedModuleLazyModuleExtensionMixin

  • EmbeddingTowerEmbeddingTowerCollection,传递给提供的交互模块的嵌入的逻辑“塔”。

torchrec.modules.activation

激活模块

class torchrec.modules.activation.SwishLayerNorm(input_dims: Union[int, List[int], Size], device: Optional[device] = None)

基类:Module

应用具有层归一化的 Swish 函数:Y = X * Sigmoid(LayerNorm(X))

参数::
  • input_dims (Union[int, List[int], torch.Size]) – 要归一化的维度。如果输入张量的形状为 [batch_size, d1, d2, d3],则设置 input_dim=[d2, d3] 将对最后两个维度进行层归一化。

  • device (Optional[torch.device]) – 默认计算设备。

示例

sln = SwishLayerNorm(100)
forward(input: Tensor) Tensor
参数::

input (torch.Tensor) – 输入张量。

返回值::

输出张量。

返回类型::

torch.Tensor

training: bool

torchrec.modules.crossnet

CrossNet API

class torchrec.modules.crossnet.CrossNet(in_features: int, num_layers: int)

基类:Module

交叉网络:

Cross Net 是对形状为 \((*, N)\) 的张量的“交叉”操作的堆栈,有效地创建了对输入张量的 \(N\) 个可学习的多项式函数。

在这个模块中,交叉操作是根据一个满秩矩阵 (NxN) 定义的,因此交叉效应可以覆盖每层上的所有位。在每一层 l 上,张量被转换为

\[x_{l+1} = x_0 * (W_l \cdot x_l + b_l) + x_l\]

其中 \(W_l\) 是一个方阵 \((NxN)\)\(*\) 表示逐元素相乘,\(\cdot\) 表示矩阵相乘。

参数::
  • in_features (int) – 输入的维度。

  • num_layers (int) – 模块中的层数。

示例

batch_size = 3
num_layers = 2
in_features = 10
input = torch.randn(batch_size, in_features)
dcn = CrossNet(num_layers=num_layers)
output = dcn(input)
forward(input: Tensor) Tensor
参数::

input (torch.Tensor) – 形状为 [batch_size, in_features] 的张量。

返回值::

形状为 [batch_size, in_features] 的张量。

返回类型::

torch.Tensor

training: bool
class torchrec.modules.crossnet.LowRankCrossNet(in_features: int, num_layers: int, low_rank: int = 1)

基类:Module

低秩交叉网络是一种高效的交叉网络。它不是在每一层使用满秩交叉矩阵 (NxN),而是使用两个核 \(W (N x r)\)\(V (r x N)\),其中 r << N,来简化矩阵相乘。

在每一层 l 上,张量被转换为

\[x_{l+1} = x_0 * (W_l \cdot (V_l \cdot x_l) + b_l) + x_l\]

其中 \(W_l\) 是一个向量,\(*\) 表示逐元素相乘,\(\cdot\) 表示矩阵相乘。

注意

r 应该明智地选择。通常,我们期望 r < N/2 具有计算节省;我们应该期望 \(r ~= N/4\) 保留满秩交叉网络的精度。

参数::
  • in_features (int) – 输入的维度。

  • num_layers (int) – 模块中的层数。

  • low_rank (int) – 交叉矩阵的秩设置 (默认 = 1)。值必须始终 >= 1。

示例

batch_size = 3
num_layers = 2
in_features = 10
input = torch.randn(batch_size, in_features)
dcn = LowRankCrossNet(num_layers=num_layers, low_rank=3)
output = dcn(input)
forward(input: Tensor) Tensor
参数::

input (torch.Tensor) – 形状为 [batch_size, in_features] 的张量。

返回值::

形状为 [batch_size, in_features] 的张量。

返回类型::

torch.Tensor

training: bool
class torchrec.modules.crossnet.LowRankMixtureCrossNet(in_features: int, num_layers: int, num_experts: int = 1, low_rank: int = 1, activation: ~typing.Union[~torch.nn.modules.module.Module, ~typing.Callable[[~torch.Tensor], ~torch.Tensor]] = <built-in method relu of type object>)

基类:Module

低秩混合交叉网络是来自 论文 的 DCN V2 实现

LowRankMixtureCrossNet 将每层的可学习交叉参数定义为一个低秩矩阵 \((N*r)\) 以及专家混合。与 LowRankCrossNet 相比,该模块不是依靠单个专家学习特征交叉,而是利用 \(K\) 个专家;每个专家在不同的子空间学习特征交互,并使用依赖于输入 \(x\) 的门控机制自适应地组合学习的交叉。

在每一层 l 上,张量被转换为

\[x_{l+1} = MoE({expert_i : i \in K_{experts}}) + x_l\]

每个 \(expert_i\) 定义为

\[expert_i = x_0 * (U_{li} \cdot g(C_{li} \cdot g(V_{li} \cdot x_l)) + b_l)\]

其中 \(U_{li} (N, r)\)\(C_{li} (r, r)\)\(V_{li} (r, N)\) 是低秩矩阵,\(*\) 表示元素级乘法,\(x\) 表示矩阵乘法,\(g()\) 是非线性激活函数。

当 num_expert 为 1 时,将跳过门控评估和 MOE 以节省计算。

参数::
  • in_features (int) – 输入的维度。

  • num_layers (int) – 模块中的层数。

  • low_rank (int) – 交叉矩阵的秩设置(默认 = 1)。值必须始终 >= 1

  • activation (Union[torch.nn.Module, Callable[[torch.Tensor], torch.Tensor]]) – 非线性激活函数,用于定义专家。默认值为 relu。

示例

batch_size = 3
num_layers = 2
in_features = 10
input = torch.randn(batch_size, in_features)
dcn = LowRankCrossNet(num_layers=num_layers, num_experts=5, low_rank=3)
output = dcn(input)
forward(input: Tensor) Tensor
参数::

input (torch.Tensor) – 形状为 [batch_size, in_features] 的张量。

返回值::

形状为 [batch_size, in_features] 的张量。

返回类型::

torch.Tensor

training: bool
class torchrec.modules.crossnet.VectorCrossNet(in_features: int, num_layers: int)

基类:Module

向量交叉网络可以称为 DCN-V1

它也是一种专门的低秩交叉网络,其中秩=1。在此版本中,在每一层,我们不保留两个内核 W 和 V,只保留一个向量内核 W (Nx1)。我们使用点运算来计算特征的“交叉”效应,从而节省两个矩阵乘法,以进一步降低计算成本并减少可学习参数的数量。

在每一层 l 上,张量被转换为

\[x_{l+1} = x_0 * (W_l . x_l + b_l) + x_l\]

其中 \(W_l\) 是一个向量,\(*\) 表示元素级乘法;\(.\) 表示点运算。

参数::
  • in_features (int) – 输入的维度。

  • num_layers (int) – 模块中的层数。

示例

batch_size = 3
num_layers = 2
in_features = 10
input = torch.randn(batch_size, in_features)
dcn = VectorCrossNet(num_layers=num_layers)
output = dcn(input)
forward(input: Tensor) Tensor
参数::

input (torch.Tensor) – 形状为 [batch_size, in_features] 的张量。

返回值::

形状为 [batch_size, in_features] 的张量。

返回类型::

torch.Tensor

training: bool

torchrec.modules.deepfm

深度分解机模块

以下模块基于 深度分解机 (DeepFM) 论文

  • 类 DeepFM 实现 DeepFM 框架

  • 类 FactorizationMachine 实现 FM,如上述论文所述。

class torchrec.modules.deepfm.DeepFM(dense_module: Module)

基类:Module

这是 DeepFM 模块

此模块不涵盖已发布论文的端到端功能。相反,它只涵盖出版物中的深度组件。它用于学习高阶特征交互。如果应学习低阶特征交互,请使用 FactorizationMachine 模块,该模块将共享与该模块相同的嵌入输入。

为了支持建模灵活性,我们将关键组件定制为

  • 与公开论文不同,我们将输入从原始稀疏特征更改为特征的嵌入。它允许嵌入维度和嵌入数量的灵活性,只要所有嵌入张量具有相同的批次大小。

  • 在公开论文的基础上,我们允许用户自定义隐藏层,使其可以是任何模块,而不限于 MLP。

该模块的一般架构如下

        1 x 10                  output
         /|\
          |                     pass into `dense_module`
          |
        1 x 90
         /|\
          |                     concat
          |
1 x 20, 1 x 30, 1 x 40          list of embeddings
参数::

dense_module (nn.Module) – DeepFM 中可以使用任何自定义模块(例如 MLP)。该模块的 in_features 必须等于元素计数。例如,如果输入嵌入是 [randn(3, 2, 3), randn(3, 4, 5)],则 in_features 应为:2*3+4*5。

示例

import torch
from torchrec.fb.modules.deepfm import DeepFM
from torchrec.fb.modules.mlp import LazyMLP
batch_size = 3
output_dim = 30
# the input embedding are a torch.Tensor of [batch_size, num_embeddings, embedding_dim]
input_embeddings = [
    torch.randn(batch_size, 2, 64),
    torch.randn(batch_size, 2, 32),
]
dense_module = nn.Linear(192, output_dim)
deepfm = DeepFM(dense_module=dense_module)
deep_fm_output = deepfm(embeddings=input_embeddings)
forward(embeddings: List[Tensor]) Tensor
参数::

embeddings (List[torch.Tensor]) –

所有嵌入的列表(例如 dense、common_sparse、specialized_sparse、embedding_features、raw_embedding_features),形状为

(batch_size, num_embeddings, embedding_dim)

为了便于操作,具有相同嵌入维度的嵌入可以选择堆叠成单个张量。例如,当我们有一个训练过的嵌入,其维度=32,5 个本地嵌入,其维度=64,以及 3 个稠密特征,其维度=16 时,我们可以将嵌入列表准备为以下列表:

tensor(B, 1, 32) (trained_embedding with num_embeddings=1, embedding_dim=32)
tensor(B, 5, 64) (native_embedding with num_embeddings=5, embedding_dim=64)
tensor(B, 3, 16) (dense_features with num_embeddings=3, embedding_dim=32)

注意

所有输入张量的 batch_size 需要相同。

返回值::

以扁平化和连接的 embeddings 作为输入的 dense_module 的输出。

返回类型::

torch.Tensor

training: bool
class torchrec.modules.deepfm.FactorizationMachine

基类:Module

这是 DeepFM 论文 中提到的分解机模块

此模块不涵盖已发布论文的端到端功能。相反,它只涵盖出版物中的 FM 部分,用于学习二阶特征交互。

为了支持建模灵活性,我们将关键组件定制为与公开论文不同

我们将输入从原始稀疏特征更改为特征的嵌入。这允许嵌入维度和嵌入数量的灵活性,只要所有嵌入张量具有相同的批次大小。

该模块的一般架构如下

        1 x 10                  output
         /|\
          |                     pass into `dense_module`
          |
        1 x 90
         /|\
          |                     concat
          |
1 x 20, 1 x 30, 1 x 40          list of embeddings

示例

batch_size = 3
# the input embedding are in torch.Tensor of [batch_size, num_embeddings, embedding_dim]
input_embeddings = [
    torch.randn(batch_size, 2, 64),
    torch.randn(batch_size, 2, 32),
]
fm = FactorizationMachine()
output = fm(embeddings=input_embeddings)
forward(embeddings: List[Tensor]) Tensor
参数::

embeddings (List[torch.Tensor]) –

所有嵌入的列表(例如 dense、common_sparse、specialized_sparse、embedding_features、raw_embedding_features),形状为

(batch_size, num_embeddings, embedding_dim)

为了便于操作,具有相同嵌入维度的嵌入可以选择堆叠成单个张量。例如,当我们有一个训练过的嵌入,其维度=32,5 个本地嵌入,其维度=64,以及 3 个稠密特征,其维度=16 时,我们可以将嵌入列表准备为以下列表:

tensor(B, 1, 32) (trained_embedding with num_embeddings=1, embedding_dim=32)
tensor(B, 5, 64) (native_embedding with num_embeddings=5, embedding_dim=64)
tensor(B, 3, 16) (dense_features with num_embeddings=3, embedding_dim=32)

注意

所有输入张量的 batch_size 需要相同。

返回值::

以扁平化和连接的 embeddings 作为输入的 fm 的输出。预计为 [B, 1]。

返回类型::

torch.Tensor

training: bool

torchrec.modules.embedding_configs

class torchrec.modules.embedding_configs.BaseEmbeddingConfig(num_embeddings: int, embedding_dim: int, name: str = '', data_type: torchrec.types.DataType = <DataType.FP32: 'FP32'>, feature_names: List[str] = <factory>, weight_init_max: Union[float, NoneType] = None, weight_init_min: Union[float, NoneType] = None, pruning_indices_remapping: Union[torch.Tensor, NoneType] = None, init_fn: Union[Callable[[torch.Tensor], Union[torch.Tensor, NoneType]], NoneType] = None, need_pos: bool = False)

Bases: object

data_type: DataType = 'FP32'
embedding_dim: int
feature_names: List[str]
get_weight_init_max() float
get_weight_init_min() float
init_fn: Optional[Callable[[Tensor], Optional[Tensor]]] = None
name: str = ''
need_pos: bool = False
num_embeddings: int
num_features() int
pruning_indices_remapping: Optional[Tensor] = None
weight_init_max: Optional[float] = None
weight_init_min: Optional[float] = None
class torchrec.modules.embedding_configs.EmbeddingBagConfig(num_embeddings: int, embedding_dim: int, name: str = '', data_type: torchrec.types.DataType = <DataType.FP32: 'FP32'>, feature_names: List[str] = <factory>, weight_init_max: Union[float, NoneType] = None, weight_init_min: Union[float, NoneType] = None, pruning_indices_remapping: Union[torch.Tensor, NoneType] = None, init_fn: Union[Callable[[torch.Tensor], Union[torch.Tensor, NoneType]], NoneType] = None, need_pos: bool = False, pooling: torchrec.modules.embedding_configs.PoolingType = <PoolingType.SUM: 'SUM'>)

Bases: BaseEmbeddingConfig

pooling: PoolingType = 'SUM'
class torchrec.modules.embedding_configs.EmbeddingConfig(num_embeddings: int, embedding_dim: int, name: str = '', data_type: torchrec.types.DataType = <DataType.FP32: 'FP32'>, feature_names: List[str] = <factory>, weight_init_max: Union[float, NoneType] = None, weight_init_min: Union[float, NoneType] = None, pruning_indices_remapping: Union[torch.Tensor, NoneType] = None, init_fn: Union[Callable[[torch.Tensor], Union[torch.Tensor, NoneType]], NoneType] = None, need_pos: bool = False)

Bases: BaseEmbeddingConfig

embedding_dim: int
feature_names: List[str]
num_embeddings: int
class torchrec.modules.embedding_configs.EmbeddingTableConfig(num_embeddings: int, embedding_dim: int, name: str = '', data_type: torchrec.types.DataType = <DataType.FP32: 'FP32'>, feature_names: List[str] = <factory>, weight_init_max: Union[float, NoneType] = None, weight_init_min: Union[float, NoneType] = None, pruning_indices_remapping: Union[torch.Tensor, NoneType] = None, init_fn: Union[Callable[[torch.Tensor], Union[torch.Tensor, NoneType]], NoneType] = None, need_pos: bool = False, pooling: torchrec.modules.embedding_configs.PoolingType = <PoolingType.SUM: 'SUM'>, is_weighted: bool = False, has_feature_processor: bool = False, embedding_names: List[str] = <factory>)

Bases: BaseEmbeddingConfig

embedding_names: List[str]
has_feature_processor: bool = False
is_weighted: bool = False
pooling: PoolingType = 'SUM'
class torchrec.modules.embedding_configs.PoolingType(value)

Bases: Enum

一个枚举。

MEAN = 'MEAN'
NONE = 'NONE'
SUM = 'SUM'
class torchrec.modules.embedding_configs.QuantConfig(activation, weight, per_table_weight_dtype)

Bases: tuple

activation: PlaceholderObserver

Alias for field number 0

per_table_weight_dtype: Optional[Dict[str, dtype]]

Alias for field number 2

weight: PlaceholderObserver

Alias for field number 1

class torchrec.modules.embedding_configs.ShardingType(value)

Bases: Enum

Well-known sharding types, used by inter-module optimizations.

COLUMN_WISE = 'column_wise'
DATA_PARALLEL = 'data_parallel'
ROW_WISE = 'row_wise'
TABLE_COLUMN_WISE = 'table_column_wise'
TABLE_ROW_WISE = 'table_row_wise'
TABLE_WISE = 'table_wise'
torchrec.modules.embedding_configs.data_type_to_dtype(data_type: DataType) dtype
torchrec.modules.embedding_configs.data_type_to_sparse_type(data_type: DataType) SparseType
torchrec.modules.embedding_configs.dtype_to_data_type(dtype: dtype) DataType
torchrec.modules.embedding_configs.pooling_type_to_pooling_mode(pooling_type: PoolingType, sharding_type: Optional[ShardingType] = None) PoolingMode
torchrec.modules.embedding_configs.pooling_type_to_str(pooling_type: PoolingType) str

torchrec.modules.embedding_modules

class torchrec.modules.embedding_modules.EmbeddingBagCollection(tables: List[EmbeddingBagConfig], is_weighted: bool = False, device: Optional[device] = None)

Bases: EmbeddingBagCollectionInterface

EmbeddingBagCollection 代表一组池化嵌入 (EmbeddingBags).

注意

EmbeddingBagCollection 是一个无分片模块,没有性能优化。对于性能敏感的场景,请考虑使用分片版本 ShardedEmbeddingBagCollection.

它处理以 KeyedJaggedTensor 形式的稀疏数据,其值为 [F X B X L],其中

  • F: 特征 (键)

  • B: 批量大小

  • L: 稀疏特征的长度 (参差不齐)

并输出一个值为 [B * (F * D)] 的 KeyedTensor,其中

  • F: 特征 (键)

  • D: 每个特征 (键) 的嵌入维度

  • B: 批量大小

参数::
  • tables (List[EmbeddingBagConfig]) – 嵌入表列表.

  • is_weighted (bool) – 输入 KeyedJaggedTensor 是否加权.

  • device (Optional[torch.device]) – 默认计算设备。

示例

table_0 = EmbeddingBagConfig(
    name="t1", embedding_dim=3, num_embeddings=10, feature_names=["f1"]
)
table_1 = EmbeddingBagConfig(
    name="t2", embedding_dim=4, num_embeddings=10, feature_names=["f2"]
)

ebc = EmbeddingBagCollection(tables=[table_0, table_1])

#        0       1        2  <-- batch
# "f1"   [0,1] None    [2]
# "f2"   [3]    [4]    [5,6,7]
#  ^
# feature

features = KeyedJaggedTensor(
    keys=["f1", "f2"],
    values=torch.tensor([0, 1, 2, 3, 4, 5, 6, 7]),
    offsets=torch.tensor([0, 2, 2, 3, 4, 5, 8]),
)

pooled_embeddings = ebc(features)
print(pooled_embeddings.values())
tensor([[-0.8899, -0.1342, -1.9060, -0.0905, -0.2814, -0.9369, -0.7783],
    [ 0.0000,  0.0000,  0.0000,  0.1598,  0.0695,  1.3265, -0.1011],
    [-0.4256, -1.1846, -2.1648, -1.0893,  0.3590, -1.9784, -0.7681]],
    grad_fn=<CatBackward0>)
print(pooled_embeddings.keys())
['f1', 'f2']
print(pooled_embeddings.offset_per_key())
tensor([0, 3, 7])
property device: device
embedding_bag_configs() List[EmbeddingBagConfig]
forward(features: KeyedJaggedTensor) KeyedTensor
参数::

features (KeyedJaggedTensor) – 形式为 [F X B X L] 的 KJT.

返回值::

KeyedTensor

is_weighted() bool
reset_parameters() None
training: bool
class torchrec.modules.embedding_modules.EmbeddingBagCollectionInterface(*args, **kwargs)

Bases: ABC, Module

EmbeddingBagCollection 的接口.

abstract embedding_bag_configs() List[EmbeddingBagConfig]
abstract forward(features: KeyedJaggedTensor) KeyedTensor

定义每次调用时执行的计算。

所有子类都应该覆盖它。

注意

虽然前向传递的方案需要在此函数中定义,但应在之后调用Module实例,而不是此函数,因为前者负责运行注册的钩子,而后者则会静默地忽略它们。

abstract is_weighted() bool
training: bool
class torchrec.modules.embedding_modules.EmbeddingCollection(tables: List[EmbeddingConfig], device: Optional[device] = None, need_indices: bool = False)

Bases: EmbeddingCollectionInterface

EmbeddingCollection 表示一组非池化嵌入。

注意

EmbeddingCollection 是一个非分片模块,未进行性能优化。对于性能敏感的场景,请考虑使用分片版本 ShardedEmbeddingCollection。

它以 KeyedJaggedTensor 的形式处理稀疏数据,形式为 [F X B X L],其中

  • F: 特征 (键)

  • B: 批量大小

  • L:稀疏特征的长度(可变)

并输出 Dict[feature (key), JaggedTensor]。每个 JaggedTensor 包含形式为 (B * L) X D 的值,其中

  • B: 批量大小

  • L: 稀疏特征的长度 (参差不齐)

  • D:每个特征(键)的嵌入维度,长度为 L

参数::
  • tables (List[EmbeddingConfig]) – 嵌入表的列表。

  • device (Optional[torch.device]) – 默认计算设备。

  • need_indices (bool) – 如果需要将索引传递给最终的查找字典。

示例

e1_config = EmbeddingConfig(
    name="t1", embedding_dim=3, num_embeddings=10, feature_names=["f1"]
)
e2_config = EmbeddingConfig(
    name="t2", embedding_dim=3, num_embeddings=10, feature_names=["f2"]
)

ec = EmbeddingCollection(tables=[e1_config, e2_config])

#     0       1        2  <-- batch
# 0   [0,1] None    [2]
# 1   [3]    [4]    [5,6,7]
# ^
# feature

features = KeyedJaggedTensor.from_offsets_sync(
    keys=["f1", "f2"],
    values=torch.tensor([0, 1, 2, 3, 4, 5, 6, 7]),
    offsets=torch.tensor([0, 2, 2, 3, 4, 5, 8]),
)
feature_embeddings = ec(features)
print(feature_embeddings['f2'].values())
tensor([[-0.2050,  0.5478,  0.6054],
[ 0.7352,  0.3210, -3.0399],
[ 0.1279, -0.1756, -0.4130],
[ 0.7519, -0.4341, -0.0499],
[ 0.9329, -1.0697, -0.8095]], grad_fn=<EmbeddingBackward>)
property device: device
embedding_configs() List[EmbeddingConfig]
embedding_dim() int
embedding_names_by_table() List[List[str]]
forward(features: KeyedJaggedTensor) Dict[str, JaggedTensor]
参数::

features (KeyedJaggedTensor) – 形式为 [F X B X L] 的 KJT.

返回值::

Dict[str, JaggedTensor]

need_indices() bool
reset_parameters() None
training: bool
class torchrec.modules.embedding_modules.EmbeddingCollectionInterface(*args, **kwargs)

Bases: ABC, Module

用于 EmbeddingCollection 的接口。

abstract embedding_configs() List[EmbeddingConfig]
abstract embedding_dim() int
abstract embedding_names_by_table() List[List[str]]
abstract forward(features: KeyedJaggedTensor) Dict[str, JaggedTensor]

定义每次调用时执行的计算。

所有子类都应该覆盖它。

注意

虽然前向传递的方案需要在此函数中定义,但应在之后调用Module实例,而不是此函数,因为前者负责运行注册的钩子,而后者则会静默地忽略它们。

abstract need_indices() bool
training: bool
torchrec.modules.embedding_modules.get_embedding_names_by_table(tables: Union[List[EmbeddingBagConfig], List[EmbeddingConfig]]) List[List[str]]
torchrec.modules.embedding_modules.process_pooled_embeddings(pooled_embeddings: List[Tensor], inverse_indices: Tensor) Tensor
torchrec.modules.embedding_modules.reorder_inverse_indices(inverse_indices: Optional[Tuple[List[str], Tensor]], feature_names: List[str]) Tensor

torchrec.modules.feature_processor

class torchrec.modules.feature_processor.BaseFeatureProcessor(*args, **kwargs)

基类:Module

特征处理器抽象基类。

abstract forward(features: Dict[str, JaggedTensor]) Dict[str, JaggedTensor]

定义每次调用时执行的计算。

所有子类都应该覆盖它。

注意

虽然前向传递的方案需要在此函数中定义,但应在之后调用Module实例,而不是此函数,因为前者负责运行注册的钩子,而后者则会静默地忽略它们。

training: bool
class torchrec.modules.feature_processor.BaseGroupedFeatureProcessor(*args, **kwargs)

基类:Module

分组特征处理器的抽象基类

abstract forward(features: KeyedJaggedTensor) KeyedJaggedTensor

定义每次调用时执行的计算。

所有子类都应该覆盖它。

注意

虽然前向传递的方案需要在此函数中定义,但应在之后调用Module实例,而不是此函数,因为前者负责运行注册的钩子,而后者则会静默地忽略它们。

training: bool
class torchrec.modules.feature_processor.PositionWeightedModule(max_feature_lengths: Dict[str, int], device: Optional[device] = None)

Bases: BaseFeatureProcessor

为 id 列表特征添加位置权重。

参数::

max_feature_lengths (Dict[str, int]) – 特征名称到 max_length 映射。 max_length,也称为截断大小,指定每个样本的最大 id 数。 对于每个特征,其位置权重参数大小为 max_length

forward(features: Dict[str, JaggedTensor]) Dict[str, JaggedTensor]
参数::

features (Dict[str, JaggedTensor]) – 键到 JaggedTensor 的字典,表示特征。

返回值::

与输入特征相同,但 weights 字段已填充。

返回类型::

Dict[str, JaggedTensor]

reset_parameters() None
training: bool
class torchrec.modules.feature_processor.PositionWeightedProcessor(max_feature_lengths: Dict[str, int], device: Optional[device] = None)

Bases: BaseGroupedFeatureProcessor

PositionWeightedProcessor 表示一个处理器,用于对 KeyedJaggedTensor 应用位置权重。

它可以处理未分片和分片输入,并输出相应的输出

参数::
  • max_feature_lengths (Dict[str, int]) – feature_lengths 的字典,键是 feature_name,值是长度。

  • device (Optional[torch.device]) – 默认计算设备。

示例

keys=["Feature0", "Feature1", "Feature2"]
values=torch.tensor([0, 1, 2, 3, 4, 5, 6, 7, 3, 4, 5, 6, 7])
lengths=torch.tensor([2, 0, 1, 1, 1, 3, 2, 3, 0])
features = KeyedJaggedTensor.from_lengths_sync(keys=keys, values=values, lengths=lengths)
pw = FeatureProcessorCollection(
    feature_processor_modules={key: PositionWeightedFeatureProcessor(max_feature_length=100) for key in keys}
)
result = pw(features)
# result is
# KeyedJaggedTensor({
#     "Feature0": {
#         "values": [[0, 1], [], [2]],
#         "weights": [[1.0, 1.0], [], [1.0]]
#     },
#     "Feature1": {
#         "values": [[3], [4], [5, 6, 7]],
#         "weights": [[1.0], [1.0], [1.0, 1.0, 1.0]]
#     },
#     "Feature2": {
#         "values": [[3, 4], [5, 6, 7], []],
#         "weights": [[1.0, 1.0], [1.0, 1.0, 1.0], []]
#     }
# })
forward(features: KeyedJaggedTensor) KeyedJaggedTensor

在未分片或非流水线模型中,输入特征包含 fp_feature 和 non_fp_features,输出将过滤掉 non_fp 特征。 在分片流水线模型中,输入特征只能包含无或全部 feature_processed 特征,因为输入特征来自 ebc 的 input_dist(),它将过滤掉不在 ebc 中的键。 并且输入大小与输出大小相同。

参数::

features (KeyedJaggedTensor) – 输入特征

返回值::

KeyedJaggedTensor

named_buffers(prefix: str = '', recurse: bool = True, remove_duplicate: bool = True) Iterator[Tuple[str, Tensor]]

返回模块缓冲区的迭代器,同时返回缓冲区的名称和缓冲区本身。

参数::
  • prefix (str) – 要添加到所有缓冲区名称之前的 前缀。

  • recurse (bool, 可选) – 如果为 True,则会生成此模块和所有子模块的缓冲区。 否则,仅生成此模块的直接成员缓冲区。 默认为 True。

  • remove_duplicate (bool, 可选) – 是否删除结果中重复的缓冲区。 默认为 True。

生成:

(str, torch.Tensor) – 包含名称和缓冲区的元组

示例

>>> # xdoctest: +SKIP("undefined vars")
>>> for name, buf in self.named_buffers():
>>>     if name in ['running_var']:
>>>         print(buf.size())
state_dict(destination: Optional[Dict[str, Any]] = None, prefix: str = '', keep_vars: bool = False) Dict[str, Any]

返回一个字典,其中包含对模块的整个状态的引用。

包括参数和持久缓冲区(例如运行平均值)。 键是相应的参数和缓冲区名称。 设置为 None 的参数和缓冲区不会被包含。

注意

返回的对象是浅拷贝。 它包含对模块参数和缓冲区的引用。

警告

目前 state_dict() 也接受 destinationprefixkeep_vars 的位置参数。 但是,这将被弃用,在未来的版本中将强制使用关键字参数。

警告

请避免使用参数 destination,因为它不是为最终用户设计的。

参数::
  • destination (dict, 可选) – 如果提供,模块的状态将更新到字典中,并返回相同对象。 否则,将创建并返回一个 OrderedDict。 默认为 None

  • prefix (str, 可选) – 添加到参数和缓冲区名称之前的 前缀,以组成 state_dict 中的键。 默认为 ''

  • keep_vars (bool, 可选) – 默认为 False,则在状态字典中返回的 Tensor 将从自动梯度中分离。 如果设置为 True,则不会执行分离。 默认为 False

返回值::

包含模块的整个状态的字典

返回类型::

dict

示例

>>> # xdoctest: +SKIP("undefined vars")
>>> module.state_dict().keys()
['bias', 'weight']
training: bool
torchrec.modules.feature_processor.offsets_to_range_traceble(offsets: Tensor, values: Tensor) Tensor
torchrec.modules.feature_processor.position_weighted_module_update_features(features: Dict[str, JaggedTensor], weighted_features: Dict[str, JaggedTensor]) Dict[str, JaggedTensor]

torchrec.modules.lazy_extension

class torchrec.modules.lazy_extension.LazyModuleExtensionMixin(*args, **kwargs)

基类: LazyModuleMixin

这是 LazyModuleMixin 的临时扩展,用于支持将关键字参数传递给延迟模块的 forward 方法。

长期计划是将此功能上游到 LazyModuleMixin。 有关详细信息,请参见 https://github.com/pytorch/pytorch/issues/59923

请参见 TestLazyModuleExtensionMixin,其中包含确保
  • LazyModuleExtensionMixin._infer_parameters 的源代码与 torch.nn.modules.lazy.LazyModuleMixin._infer_parameters 相同,只是前者可以接受关键字参数。

  • LazyModuleExtensionMixin._call_impl 的源代码与 torch.nn.Module._call_impl 相同,只是前者可以将关键字参数传递给 forward 预钩子。

apply(fn: Callable[[Module], None]) Module

递归地将 fn 应用于每个子模块(如 .children() 所返回),以及自身。 典型用法包括初始化模型的参数。

注意

对未初始化的延迟模块调用 apply() 将导致错误。 用户需要在对延迟模块调用 apply() 之前初始化延迟模块(通过执行一个虚拟前向传递)。

参数::

fn (torch.nn.Module -> None) – 要应用于每个子模块的函数。

返回值::

self

返回类型::

torch.nn.Module

示例

@torch.no_grad()
def init_weights(m):
    print(m)
    if type(m) == torch.nn.LazyLinear:
        m.weight.fill_(1.0)
        print(m.weight)

linear = torch.nn.LazyLinear(2)
linear.apply(init_weights)  # this fails, because `linear` (a lazy-module) hasn't been initialized yet

input = torch.randn(2, 10)
linear(input)  # run a dummy forward pass to initialize the lazy-module

linear.apply(init_weights)  # this works now
torchrec.modules.lazy_extension.lazy_apply(module: Module, fn: Callable[[Module], None]) Module

将函数附加到模块,该函数将在第一次前向传递后(即所有子模块和参数初始化后)递归地应用于模块的每个子模块(如 .children() 所返回)以及模块本身。

典型用法包括初始化延迟模块的参数的数值(即从 LazyModuleMixin 继承的模块)。

注意

lazy_apply() 可以用于延迟模块和非延迟模块。

参数::
  • module (torch.nn.Module) – 要递归应用 fn 的模块。

  • fn (Callable[[torch.nn.Module], None]) – 要附加到 module 并在稍后应用于 module 的每个子模块和 module 本身的函数。

返回值::

已附加 fnmodule

返回类型::

torch.nn.Module

示例

@torch.no_grad()
def init_weights(m):
    print(m)
    if type(m) == torch.nn.LazyLinear:
        m.weight.fill_(1.0)
        print(m.weight)

linear = torch.nn.LazyLinear(2)
lazy_apply(linear, init_weights)  # doesn't run `init_weights` immediately
input = torch.randn(2, 10)
linear(input)  # runs `init_weights` only once, right after first forward pass

seq = torch.nn.Sequential(torch.nn.LazyLinear(2), torch.nn.LazyLinear(2))
lazy_apply(seq, init_weights)  # doesn't run `init_weights` immediately
input = torch.randn(2, 10)
seq(input)  # runs `init_weights` only once, right after first forward pass

torchrec.modules.mlp

class torchrec.modules.mlp.MLP(in_size: int, layer_sizes: ~typing.List[int], bias: bool = True, activation: ~typing.Union[str, ~typing.Callable[[], ~torch.nn.modules.module.Module], ~torch.nn.modules.module.Module, ~typing.Callable[[~torch.Tensor], ~torch.Tensor]] = <built-in method relu of type object>, device: ~typing.Optional[~torch.device] = None, dtype: ~torch.dtype = torch.float32)

基类:Module

依次应用感知器模块堆栈(即多层感知器)。

参数::
  • in_size (int) – 输入的 in_size

  • layer_sizes (List[int]) – 每个感知器模块的 out_size

  • bias (bool) – 如果设置为 False,则层将不会学习附加偏差。默认值:True。

  • activation (str, Union[Callable[[], torch.nn.Module], torch.nn.Module, Callable[[torch.Tensor], torch.Tensor]]) – 要应用于每个感知器模块的线性变换输出的激活函数。如果 activation 是一个 str,我们目前只支持以下字符串,如“relu”、“sigmoid”和“swish_layernorm”。如果 activation 是一个 Callable[[], torch.nn.Module],则每个感知器模块将调用一次 activation() 以生成该感知器模块的激活模块,并且这些激活模块之间不会共享参数。一种用例是当所有激活模块共享相同的构造函数参数时,但不共享实际的模块参数。默认值:torch.relu。

  • device (Optional[torch.device]) – 默认计算设备。

示例

batch_size = 3
in_size = 40
input = torch.randn(batch_size, in_size)

layer_sizes = [16, 8, 4]
mlp_module = MLP(in_size, layer_sizes, bias=True)
output = mlp_module(input)
assert list(output.shape) == [batch_size, layer_sizes[-1]]
forward(input: Tensor) Tensor
参数::

input (torch.Tensor) – 形状为 (B, I) 的张量,其中 I 是每个输入样本中的元素数量。

返回值::

形状为 (B, O) 的张量,其中 O 是最后一个感知器模块的 out_size

返回类型::

torch.Tensor

training: bool
class torchrec.modules.mlp.Perceptron(in_size: int, out_size: int, bias: bool = True, activation: ~typing.Union[~torch.nn.modules.module.Module, ~typing.Callable[[~torch.Tensor], ~torch.Tensor]] = <built-in method relu of type object>, device: ~typing.Optional[~torch.device] = None, dtype: ~torch.dtype = torch.float32)

基类:Module

应用线性变换和激活。

参数::
  • in_size (int) – 每个输入样本中的元素数量。

  • out_size (int) – 每个输出样本中的元素数量。

  • bias (bool) – 如果设置为 False,则层将不会学习附加偏差。默认值:True

  • activation (Union[torch.nn.Module, Callable[[torch.Tensor], torch.Tensor]]) – 要应用于线性变换输出的激活函数。默认值:torch.relu。

  • device (Optional[torch.device]) – 默认计算设备。

示例

batch_size = 3
in_size = 40
input = torch.randn(batch_size, in_size)

out_size = 16
perceptron = Perceptron(in_size, out_size, bias=True)

output = perceptron(input)
assert list(output) == [batch_size, out_size]
forward(input: Tensor) Tensor
参数::

input (torch.Tensor) – 形状为 (B, I) 的张量,其中 I 是每个输入样本中的元素数量。

返回值::

形状为 (B, O) 的张量,其中 O 是每个输出样本中每个通道的元素数量(即 out_size)。

channel in each output sample (i.e. out_size).

返回类型::

torch.Tensor

training: bool

torchrec.modules.utils

class torchrec.modules.utils.SequenceVBEContext(recat: torch.Tensor, unpadded_lengths: torch.Tensor, reindexed_lengths: torch.Tensor, reindexed_length_per_key: List[int], reindexed_values: Union[torch.Tensor, NoneType] = None)

Bases: Multistreamable

recat: Tensor
record_stream(stream: Stream) None

See https://pytorch.ac.cn/docs/stable/generated/torch.Tensor.record_stream.html

reindexed_length_per_key: List[int]
reindexed_lengths: Tensor
reindexed_values: Optional[Tensor] = None
unpadded_lengths: Tensor
torchrec.modules.utils.check_module_output_dimension(module: Union[Iterable[Module], Module], in_features: int, out_features: int) bool

验证给定模块或模块列表的 out_features 是否与指定数量匹配。如果给定模块列表或 ModuleList,则递归检查所有子模块。

torchrec.modules.utils.construct_jagged_tensors(embeddings: Tensor, features: KeyedJaggedTensor, embedding_names: List[str], need_indices: bool = False, features_to_permute_indices: Optional[Dict[str, List[int]]] = None, original_features: Optional[KeyedJaggedTensor] = None, reverse_indices: Optional[Tensor] = None, seq_vbe_ctx: Optional[SequenceVBEContext] = None) Dict[str, JaggedTensor]
torchrec.modules.utils.construct_jagged_tensors_inference(embeddings: Tensor, lengths: Tensor, values: Tensor, embedding_names: List[str], need_indices: bool = False, features_to_permute_indices: Optional[Dict[str, List[int]]] = None, reverse_indices: Optional[Tensor] = None) Dict[str, JaggedTensor]
torchrec.modules.utils.construct_modulelist_from_single_module(module: Module, sizes: Tuple[int, ...]) Module

给定一个模块,通过复制提供的模块并重新初始化线性层来构建大小为 sizes 的(嵌套)ModuleList。

torchrec.modules.utils.convert_list_of_modules_to_modulelist(modules: Iterable[Module], sizes: Tuple[int, ...]) Module
torchrec.modules.utils.deterministic_dedup(ids: Tensor) Tuple[Tensor, Tensor]

为了消除冲突更新中的竞态条件,删除重复的 ID。 仅保留重复 ID 的最后一次出现。 返回排序的唯一 ID 和最后一次出现的 ID。

torchrec.modules.utils.extract_module_or_tensor_callable(module_or_callable: Union[Callable[[], Module], Module, Callable[[Tensor], Tensor]]) Union[Module, Callable[[Tensor], Tensor]]
torchrec.modules.utils.get_module_output_dimension(module: Union[Callable[[Tensor], Tensor], Module], in_features: int) int
torchrec.modules.utils.init_mlp_weights_xavier_uniform(m: Module) None
torchrec.modules.utils.jagged_index_select_with_empty(values: Tensor, ids: Tensor, offsets: Tensor, output_offsets: Tensor) Tensor

torchrec.modules.mc_modules

class torchrec.modules.mc_modules.DistanceLFU_EvictionPolicy(decay_exponent: float = 1.0, threshold_filtering_func: Optional[Callable[[Tensor], Tuple[Tensor, Union[float, Tensor]]]] = None)

Bases: MCHEvictionPolicy

coalesce_history_metadata(current_iter: int, history_metadata: Dict[str, Tensor], unique_ids_counts: Tensor, unique_inverse_mapping: Tensor, additional_ids: Optional[Tensor] = None, threshold_mask: Optional[Tensor] = None) Dict[str, Tensor]

Args: history_metadata (Dict[str, torch.Tensor]): 历史元数据字典 additional_ids (torch.Tensor): 用作历史的一部分的额外 id unique_inverse_mapping (torch.Tensor): 由

torch.cat[history_accumulator, additional_ids] 生成的 torch.unique 逆映射。用于将历史元数据张量索引映射到它们合并的张量索引。

合并元数据历史缓冲区并返回已处理的元数据张量字典。

property metadata_info: List[MCHEvictionPolicyMetadataInfo]
record_history_metadata(current_iter: int, incoming_ids: Tensor, history_metadata: Dict[str, Tensor]) None

Args: current_iter (int): 当前迭代 incoming_ids (torch.Tensor): 接收的 id history_metadata (Dict[str, torch.Tensor]): 历史元数据字典

根据接收的 id 计算并记录元数据

对于实现的驱逐策略。

update_metadata_and_generate_eviction_scores(current_iter: int, mch_size: int, coalesced_history_argsort_mapping

Args

Returns Tuple of (evicted_indices, selected_new_indices) where

evicted_indices are indices in the mch map to be evicted, and selected_new_indices are the indices of the ids in the coalesced history that are to be added to the mch.

class torchrec.modules.mc_modules.LFU_EvictionPolicy(threshold_filtering_func: Optional[Callable[[Tensor], Tuple[Tensor, Union[float, Tensor]]]] = None)

Bases: MCHEvictionPolicy

coalesce_history_metadata(current_iter: int, history_metadata: Dict[str, Tensor], unique_ids_counts: Tensor, unique_inverse_mapping: Tensor, additional_ids: Optional[Tensor] = None, threshold_mask: Optional[Tensor] = None) Dict[str, Tensor]

Args: history_metadata (Dict[str, torch.Tensor]): 历史元数据字典 additional_ids (torch.Tensor): 用作历史的一部分的额外 id unique_inverse_mapping (torch.Tensor): 由

torch.cat[history_accumulator, additional_ids] 生成的 torch.unique 逆映射。用于将历史元数据张量索引映射到它们合并的张量索引。

合并元数据历史缓冲区并返回已处理的元数据张量字典。

property metadata_info: List[MCHEvictionPolicyMetadataInfo]
record_history_metadata(current_iter: int, incoming_ids: Tensor, history_metadata: Dict[str, Tensor]) None

Args: current_iter (int): 当前迭代 incoming_ids (torch.Tensor): 接收的 id history_metadata (Dict[str, torch.Tensor]): 历史元数据字典

根据接收的 id 计算并记录元数据

对于实现的驱逐策略。

update_metadata_and_generate_eviction_scores(current_iter: int, mch_size: int, coalesced_history_argsort_mapping: Tensor, coalesced_history_sorted_unique_ids_counts: Tensor, coalesced_history_mch_matching_elements_mask: Tensor, coalesced_history_mch_matching_indices: Tensor, mch_metadata: Dict[str, Tensor], coalesced_history_metadata: Dict[str, Tensor]) Tuple[Tensor, Tensor]

Args

Returns Tuple of (evicted_indices, selected_new_indices) where

evicted_indices are indices in the mch map to be evicted, and selected_new_indices are the indices of the ids in the coalesced history that are to be added to the mch.

class torchrec.modules.mc_modules.LRU_EvictionPolicy(decay_exponent: float = 1.0, threshold_filtering_func: Optional[Callable[[Tensor], Tuple[Tensor, Union[float, Tensor]]]] = None)

Bases: MCHEvictionPolicy

coalesce_history_metadata(current_iter: int, history_metadata: Dict[str, Tensor], unique_ids_counts: Tensor, unique_inverse_mapping: Tensor, additional_ids: Optional[Tensor] = None, threshold_mask: Optional[Tensor] = None

Args: history_metadata (Dict[str, torch.Tensor]): 历史元数据字典 additional_ids (torch.Tensor): 用作历史的一部分的额外 id unique_inverse_mapping (torch.Tensor): 由

torch.cat[history_accumulator, additional_ids] 生成的 torch.unique 逆映射。用于将历史元数据张量索引映射到它们合并的张量索引。

合并元数据历史缓冲区并返回已处理的元数据张量字典。

property metadata_info: List[MCHEvictionPolicyMetadataInfo]
record_history_metadata(current_iter: int, incoming_ids: Tensor, history_metadata: Dict[str, Tensor]) None

Args: current_iter (int): 当前迭代 incoming_ids (torch.Tensor): 接收的 id history_metadata (Dict[str, torch.Tensor]): 历史元数据字典

根据接收的 id 计算并记录元数据

对于实现的驱逐策略。

update_metadata_and_generate_eviction_scores(current_iter: int, mch_size: int, coalesced_history_argsort_mapping: Tensor, coalesced_history_sorted_unique_ids_counts: Tensor, coalesced_history_mch_matching_elements_mask: Tensor, coalesced_history_mch_matching_indices: Tensor, mch_metadata: Dict[str, Tensor], coalesced_history_metadata: Dict[str, Tensor]) Tuple[Tensor, Tensor]

Args

Returns Tuple of (evicted_indices, selected_new_indices) where

evicted_indices are indices in the mch map to be evicted, and selected_new_indices are the indices of the ids in the coalesced history that are to be added to the mch.

class torchrec.modules.mc_modules.MCHEvictionPolicy(metadata_info: List[MCHEvictionPolicyMetadataInfo], threshold_filtering_func: Optional[Callable[[Tensor], Tuple[Tensor, Union[float, Tensor]]]] = None)

Bases: ABC

abstract coalesce_history_metadata(current_iter: int, history_metadata: Dict[str, Tensor], unique_ids_counts: Tensor, unique_inverse_mapping: Tensor, additional_ids: Optional[Tensor] = None, threshold_mask: Optional[Tensor] = None) Dict[str, Tensor]

Args: history_metadata (Dict[str, torch.Tensor]): 历史元数据字典 additional_ids (torch.Tensor): 用作历史的一部分的额外 id unique_inverse_mapping (torch.Tensor): 由

torch.cat[history_accumulator, additional_ids] 生成的 torch.unique 逆映射。用于将历史元数据张量索引映射到它们合并的张量索引。

合并元数据历史缓冲区并返回已处理的元数据张量字典。

abstract property metadata_info: List[MCHEvictionPolicyMetadataInfo]
abstract record_history_metadata(current_iter: int, incoming_ids: Tensor, history_metadata: Dict[str, Tensor]) None

Args: current_iter (int): 当前迭代 incoming_ids (torch.Tensor): 接收的 id history_metadata (Dict[str, torch.Tensor]): 历史元数据字典

根据接收的 id 计算并记录元数据

对于实现的驱逐策略。

abstract update_metadata_and_generate_eviction_scores(current_iter: int, mch_size: int, coalesced_history_argsort_mapping: Tensor, coalesced_history_sorted_unique_ids_counts: Tensor, coalesced_history_mch_matching_elements_mask: Tensor, coalesced_history_mch_matching_indices: Tensor, mch_metadata: Dict[str, Tensor], coalesced_history_metadata: Dict[str, Tensor]) Tuple[Tensor, Tensor]

Args

Returns Tuple of (evicted_indices, selected_new_indices) where

evicted_indices are indices in the mch map to be evicted, and selected_new_indices are the indices of the ids in the coalesced history that are to be added to the mch.

class torchrec.modules.mc_modules.MCHEvictionPolicyMetadataInfo(metadata_name, is_mch_metadata, is_history_metadata)

Bases: tuple

is_history_metadata: bool

Alias for field number 2

is_mch_metadata: bool

Alias for field number 1

metadata_name: str

Alias for field number 0

class torchrec.modules.mc_modules.MCHManagedCollisionModule(zch_size: int, device: device, eviction_policy: MCHEvictionPolicy, eviction_interval: int, input_hash_size: int = 9223372036854775808, input_hash_func: Optional[Callable[[Tensor, int], Tensor]] = None, mch_size: Optional[int] = None, mch_hash_func: Optional[Callable[[Tensor, int], Tensor]] = None, name: Optional[str] = None, output_global_offset: int = 0)

Bases: ManagedCollisionModule

ZCH / MCH 管理的冲突模块

参数::
  • zch_size (int) – 输出 ID 的范围,在 [output_size_offset, output_size_offset + zch_size - 1) 内

  • device (torch.device) – 此模块将在此设备上执行

  • eviction_policy (驱逐策略) – 要使用的驱逐策略

  • eviction_interval (int) – 驱逐策略触发的间隔

  • input_hash_size (int) – 输入特征 ID 范围,将作为第二个参数传递给 input_hash_func

  • input_hash_func (Optional[Callable]) – 用于为输入特征生成哈希值的函数。此函数通常用于驱动对与输入数据相同或更大的范围的均匀分布

  • mch_size (Optional[int]) – 剩余输出的大小(即传统 MCH),实验性功能。ID 在内部由 output_size_offset + zch_output_range 偏移

  • mch_hash_func (Optional[Callable]) – 用于为剩余特征生成哈希值的函数。将哈希到 mch_size。

  • output_global_offset (int) – 输出范围的输出 ID 的偏移量,通常仅在分片应用程序中使用。

evict() Optional[Tensor]

如果本轮不需要驱逐,则返回 None。否则,返回要重置的插槽的 ID。在驱逐时,此模块应重置其这些插槽的状态,并假设下游模块将正确处理这种情况。

forward(features: Dict[str, JaggedTensor]) Dict[str, JaggedTensor]

Args: feature (JaggedTensor): 特征表示 :returns: 修改后的 JT :rtype: Dict[str, JaggedTensor]

input_size() int

返回输入的数值范围,用于分片信息

output_size() int

返回输出的数值范围,用于验证与下游嵌入查找

preprocess(features: Dict[str, JaggedTensor]) Dict[str, JaggedTensor]
profile(features: Dict[str, JaggedTensor]) Dict[str, JaggedTensor]
rebuild_with_output_id_range(output_id_range: Tuple[int, int], device: Optional[device] = None) MCHManagedCollisionModule

用于创建 RW 分片的本地 MC 模块,目前是临时解决方案

remap(features: Dict[str, JaggedTensor]) Dict[str, JaggedTensor]
training: bool
class torchrec.modules.mc_modules.ManagedCollisionCollection(managed_collision_modules: Dict[str, ManagedCollisionModule], embedding_configs: List[BaseEmbeddingConfig])

基类:Module

ManagedCollisionCollection 表示一组管理的冲突模块。传递给 MCC 的输入将被管理的冲突模块重新映射

并返回。

参数::
embedding_configs() List[BaseEmbeddingConfig]
evict() Dict[str, Optional[Tensor]]
forward(features: KeyedJaggedTensor) KeyedJaggedTensor

定义每次调用时执行的计算。

所有子类都应该覆盖它。

注意

虽然前向传递的方案需要在此函数中定义,但应在之后调用Module实例,而不是此函数,因为前者负责运行注册的钩子,而后者则会静默地忽略它们。

training: bool
class torchrec.modules.mc_modules.ManagedCollisionModule(device: device)

基类:Module

ManagedCollisionModule 的抽象基类。将输入 ID 映射到范围 [0, max_output_id)。

参数::
  • max_output_id (int) – 重新映射 ID 的最大输出值。

  • input_hash_size (int) – 输入范围的最大值,即 [0, input_hash_size)

  • remapping_range_start_index (int) – 重新映射范围的相对起始索引

  • device (torch.device) – 默认计算设备。

示例:

jt = JaggedTensor(…) mcm = ManagedCollisionModule(…) mcm_jt = mcm(fp)

property device: device
abstract evict() Optional[Tensor]

如果本轮不需要驱逐,则返回 None。否则,返回要重置的插槽的 ID。在驱逐时,此模块应重置其这些插槽的状态,并假设下游模块将正确处理这种情况。

abstract forward(features: Dict[str, JaggedTensor]) Dict[str, JaggedTensor]

定义每次调用时执行的计算。

所有子类都应该覆盖它。

注意

虽然前向传递的方案需要在此函数中定义,但应在之后调用Module实例,而不是此函数,因为前者负责运行注册的钩子,而后者则会静默地忽略它们。

abstract input_size() int

返回输入的数值范围,用于分片信息

abstract output_size() int

返回输出的数值范围,用于验证与下游嵌入查找

abstract preprocess(features: Dict[str, JaggedTensor]) Dict[str, JaggedTensor]
abstract rebuild_with_output_id_range(output_id_range: Tuple[int, int], device: Optional[device] = None) ManagedCollisionModule

用于创建 RW 分片的本地 MC 模块,目前是临时解决方案

training: bool
torchrec.modules.mc_modules.apply_mc_method_to_jt_dict(method: str, features_dict: Dict[str, JaggedTensor], table_to_features: Dict[str, List[str]], managed_collisions: ModuleDict) Dict[str, JaggedTensor]

将 MC 方法应用于 JaggedTensor 字典,并返回具有相同排序的更新字典

torchrec.modules.mc_modules.average_threshold_filter(id_counts: Tensor) Tuple[Tensor, Tensor]

阈值为 id_counts 的平均值。如果 id 的计数严格大于平均值,则添加该 id。

torchrec.modules.mc_modules.dynamic_threshold_filter(id_counts: Tensor, threshold_skew_multiplier: float = 10.0) Tuple[Tensor, Tensor]

阈值为 total_count / num_ids * threshold_skew_multiplier。如果 id 的计数严格大于阈值,则添加该 id。

torchrec.modules.mc_modules.probabilistic_threshold_filter(id_counts: Tensor, per_id_probability: float = 0.01) Tuple[Tensor, Tensor]

每个 id 被添加的概率为 per_id_probability。例如,如果 per_id_probability 为 0.01,而 id 出现 100 次,则它被添加的概率为 60%。更准确地说,id 分数为 1 - (1 - per_id_probability) ^ id_count,对于随机生成的阈值,id 分数为它被添加的概率。

torchrec.modules.mc_embedding_modules

class torchrec.modules.mc_embedding_modules.BaseManagedCollisionEmbeddingCollection(embedding_module: Union[EmbeddingBagCollection, EmbeddingCollection], managed_collision_collection: ManagedCollisionCollection, return_remapped_features: bool = False)

基类:Module

BaseManagedCollisionEmbeddingCollection 表示一个 EC/EBC 模块和一组受控冲突模块。进入 MC-EC/EBC 的输入将在被传递到嵌入集合之前首先被受控冲突模块修改。

参数::
  • embedding_module – 用于查找嵌入的 EmbeddingCollection

  • managed_collision_modules – 受控冲突模块的字典

  • return_remapped_features (bool) – 是否除了嵌入之外还返回重映射的输入特征

forward(features: KeyedJaggedTensor) Tuple[Union[KeyedTensor, Dict[str, JaggedTensor]], Optional[KeyedJaggedTensor]]

定义每次调用时执行的计算。

所有子类都应该覆盖它。

注意

虽然前向传递的方案需要在此函数中定义,但应在之后调用Module实例,而不是此函数,因为前者负责运行注册的钩子,而后者则会静默地忽略它们。

training: bool
class torchrec.modules.mc_embedding_modules.ManagedCollisionEmbeddingBagCollection(embedding_bag_collection: EmbeddingBagCollection, managed_collision_collection: ManagedCollisionCollection, return_remapped_features: bool = False)

Bases: BaseManagedCollisionEmbeddingCollection

ManagedCollisionEmbeddingBagCollection 表示一个 EmbeddingBagCollection 模块和一组受控冲突模块。进入 MC-EBC 的输入将在被传递到嵌入包集合之前首先被受控冲突模块修改。

有关输入和输出类型的信息,请参阅 EmbeddingBagCollection

参数::
  • embedding_module – 用于查找嵌入的 EmbeddingBagCollection

  • managed_collision_modules – 受控冲突模块的字典

  • return_remapped_features (bool) – 是否除了嵌入之外还返回重映射的输入特征

training: bool
class torchrec.modules.mc_embedding_modules.ManagedCollisionEmbeddingCollection(embedding_collection: EmbeddingCollection, managed_collision_collection: ManagedCollisionCollection, return_remapped_features: bool = False)

Bases: BaseManagedCollisionEmbeddingCollection

ManagedCollisionEmbeddingCollection 表示一个 EmbeddingCollection 模块和一组受控冲突模块。进入 MC-EC 的输入将在被传递到嵌入集合之前首先被受控冲突模块修改。

有关输入和输出类型的信息,请参阅 EmbeddingCollection

参数::
  • embedding_module – 用于查找嵌入的 EmbeddingCollection

  • managed_collision_modules – 受控冲突模块的字典

  • return_remapped_features (bool) – 是否除了嵌入之外还返回重映射的输入特征

training: bool
torchrec.modules.mc_embedding_modules.evict(evictions: Dict[str, Optional[Tensor]], ebc: Union[EmbeddingBagCollection, EmbeddingCollection]) None

文档

访问 PyTorch 的全面开发者文档

查看文档

教程

获取针对初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并获得问题的解答

查看资源