Planner¶
The TorchRec Planner is responsible for determining the most performant, balanced sharding plan for distributed training and inference.
The main API for generating a sharding plan is EmbeddingShardingPlanner.plan.
- class torchrec.distributed.types.ShardingPlan(plan: Dict[str, ModuleShardingPlan])¶
Representation of a sharding plan. This uses the FQNs of the larger wrapped model (i.e., the model wrapped with DistributedModelParallel). EmbeddingModuleShardingPlan should be used when TorchRec composability is desired.
- plan¶
Dict keyed by module path of dicts of parameter sharding specs keyed by parameter name.
- Type:
Dict[str, EmbeddingModuleShardingPlan]
- get_plan_for_module(module_path: str) Optional[ModuleShardingPlan] ¶
- Parameters:
module_path (str) –
- Returns:
dict of parameter sharding specs keyed by parameter name. None if no sharding spec exists for the given module_path.
- Return type:
Optional[ModuleShardingPlan]
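Conceptually, the plan attribute is a two-level mapping, and get_plan_for_module is a lookup into its first level. The sketch below illustrates the lookup semantics in plain Python; SketchShardingPlan is a hypothetical name and the string values stand in for real ParameterSharding objects:

```python
from typing import Dict, Optional


class SketchShardingPlan:
    """Hypothetical stand-in for ShardingPlan's dict-of-dicts structure."""

    def __init__(self, plan: Dict[str, Dict[str, str]]) -> None:
        # keyed by module path -> {parameter name -> sharding spec}
        self.plan = plan

    def get_plan_for_module(self, module_path: str) -> Optional[Dict[str, str]]:
        # Returns None when no sharding spec exists for the module path.
        return self.plan.get(module_path)


plan = SketchShardingPlan(
    {"sparse.ebc": {"t1.weight": "table_wise", "t2.weight": "row_wise"}}
)
print(plan.get_plan_for_module("sparse.ebc"))
print(plan.get_plan_for_module("missing.path"))  # None
```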
- class torchrec.distributed.planner.planners.EmbeddingShardingPlanner(topology: Optional[Topology] = None, batch_size: Optional[int] = None, enumerator: Optional[Enumerator] = None, storage_reservation: Optional[StorageReservation] = None, proposer: Optional[Union[Proposer, List[Proposer]]] = None, partitioner: Optional[Partitioner] = None, performance_model: Optional[PerfModel] = None, stats: Optional[Union[Stats, List[Stats]]] = None, constraints: Optional[Dict[str, ParameterConstraints]] = None, debug: bool = True, callbacks: Optional[List[Callable[[List[ShardingOption]], List[ShardingOption]]]] = None)¶
Provides an optimized sharding plan for a given module with shardable parameters according to the provided sharders, topology, and constraints.
- Parameters:
topology (Optional[Topology]) – the topology of the current process group.
batch_size (Optional[int]) – the batch size of the model.
enumerator (Optional[Enumerator]) – the enumerator to use.
storage_reservation (Optional[StorageReservation]) – the storage reservation to use.
proposer (Optional[Union[Proposer, List[Proposer]]]) – the proposer(s) to use.
partitioner (Optional[Partitioner]) – the partitioner to use.
performance_model (Optional[PerfModel]) – the performance model to use.
stats (Optional[Union[Stats, List[Stats]]]) – the stats to use.
constraints (Optional[Dict[str, ParameterConstraints]]) – per-table sharding constraints.
debug (bool) – whether to print debug information.
Example
ebc = EmbeddingBagCollection(tables=eb_configs, device=torch.device("meta"))
planner = EmbeddingShardingPlanner()
plan = planner.plan(
    module=ebc,
    sharders=[EmbeddingBagCollectionSharder()],
)
- collective_plan(module: Module, sharders: Optional[List[ModuleSharder[Module]]] = None, pg: Optional[ProcessGroup] = None) ShardingPlan ¶
Calls self.plan(…) on rank 0 and broadcasts the result.
- Parameters:
module (nn.Module) – the module to shard.
sharders (Optional[List[ModuleSharder[nn.Module]]]) – the sharders to use for sharding.
pg (Optional[dist.ProcessGroup]) – the process group to use for collective operations.
- Returns:
the sharding plan for the module.
- Return type:
ShardingPlan
- plan(module: Module, sharders: List[ModuleSharder[Module]]) ShardingPlan ¶
Provides an optimized sharding plan for a given module with shardable parameters according to the provided sharders, topology, and constraints.
- Parameters:
module (nn.Module) – the module to shard.
sharders (List[ModuleSharder[nn.Module]]) – the sharders to use for sharding.
- Returns:
the sharding plan for the module.
- Return type:
ShardingPlan
- class torchrec.distributed.planner.enumerators.EmbeddingEnumerator(topology: Topology, batch_size: int, constraints: Optional[Dict[str, ParameterConstraints]] = None, estimator: Optional[Union[ShardEstimator, List[ShardEstimator]]] = None, use_exact_enumerate_order: Optional[bool] = False)¶
Generates embedding sharding options for a given nn.Module, taking into account user-provided constraints.
- Parameters:
topology (Topology) – the device topology.
batch_size (int) – the batch size.
constraints (Optional[Dict[str, ParameterConstraints]]) – dict of parameter names to provided ParameterConstraints.
estimator (Optional[Union[ShardEstimator, List[ShardEstimator]]]) – shard performance estimator(s).
use_exact_enumerate_order (bool) – whether to enumerate shardable parameters in the exact name_children enumeration order.
- enumerate(module: Module, sharders: List[ModuleSharder[Module]]) List[ShardingOption] ¶
Generates relevant sharding options given the module and sharders.
- Parameters:
module (nn.Module) – the module to shard.
sharders (List[ModuleSharder[nn.Module]]) – provided sharders for the module.
- Returns:
valid sharding options with values populated.
- Return type:
List[ShardingOption]
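Informally, enumeration crosses each shardable parameter with the sharding types and compute kernels its sharder supports, filtered by any constraints. A simplified, self-contained sketch of that idea with hypothetical option names (not the real Enumerator logic):

```python
from itertools import product

# Hypothetical supported options; a real ModuleSharder reports these via
# its sharding_types() and compute_kernels() methods.
SHARDING_TYPES = ["table_wise", "row_wise", "column_wise"]
COMPUTE_KERNELS = ["dense", "fused"]


def enumerate_options(param_names, constraints=None):
    """Cross each parameter with (sharding_type, kernel) pairs, honoring
    an optional per-parameter allow-list of sharding types."""
    constraints = constraints or {}
    options = []
    for name in param_names:
        allowed = constraints.get(name, SHARDING_TYPES)
        for st, kernel in product(SHARDING_TYPES, COMPUTE_KERNELS):
            if st in allowed:
                options.append((name, st, kernel))
    return options


opts = enumerate_options(["t1.weight"], {"t1.weight": ["row_wise"]})
print(opts)  # [('t1.weight', 'row_wise', 'dense'), ('t1.weight', 'row_wise', 'fused')]
```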
- populate_estimates(sharding_options: List[ShardingOption]) None ¶
See class description.
- class torchrec.distributed.planner.partitioners.GreedyPerfPartitioner(sort_by: SortBy = SortBy.STORAGE, balance_modules: bool = False)¶
Greedy partitioner.
- Parameters:
sort_by (SortBy) – sort sharding options by storage or perf in descending order (i.e., large tables will be placed first).
balance_modules (bool) – whether to sort by modules first, where smaller modules will be sorted first. In effect, this places the tables in each module in a balanced way.
- partition(proposal: List[ShardingOption], storage_constraint: Topology) List[ShardingOption] ¶
Places the sharding options on the topology based on each sharding option's partition_by attribute. The topology, storage, and perfs are updated at the end of placement.
- Parameters:
proposal (List[ShardingOption]) – the list of populated sharding options.
storage_constraint (Topology) – the device topology.
- Returns:
the list of sharding options for the selected plan.
- Return type:
List[ShardingOption]
Example
sharding_options = [
    ShardingOption(partition_by="uniform",
        shards=[
            Shards(storage=1, perf=1),
            Shards(storage=1, perf=1),
        ]),
    ShardingOption(partition_by="uniform",
        shards=[
            Shards(storage=2, perf=2),
            Shards(storage=2, perf=2),
        ]),
    ShardingOption(partition_by="device",
        shards=[
            Shards(storage=3, perf=3),
            Shards(storage=3, perf=3),
        ]),
    ShardingOption(partition_by="device",
        shards=[
            Shards(storage=4, perf=4),
            Shards(storage=4, perf=4),
        ]),
]

topology = Topology(world_size=2)

# First sharding_options[0] and sharding_options[1] will be placed on the
# topology with the uniform strategy, resulting in
topology.devices[0].perf.total = (1,2)
topology.devices[1].perf.total = (1,2)

# Finally sharding_options[2] and sharding_options[3] will be placed on the
# topology with the device strategy (see docstring of `partition_by_device`
# for more details).
topology.devices[0].perf.total = (1,2) + (3,4)
topology.devices[1].perf.total = (1,2) + (3,4)

# The topology updates are done after the end of all the placements (the order
# in the example is just for clarity).
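The device-strategy placement above can be approximated in plain Python: sort tables by total storage descending, then drop each shard onto the device with the least accumulated perf. This is a hypothetical simplification of GreedyPerfPartitioner (the real partitioner also honors storage constraints and the uniform strategy):

```python
def greedy_partition(options, world_size):
    """options: one list of (storage, perf) shards per table.
    Places each shard on the currently least-loaded device by perf."""
    device_perf = [0.0] * world_size
    placement = []  # (table_idx, shard_idx, device) triples
    # Larger tables (by total storage) are placed first.
    order = sorted(
        range(len(options)),
        key=lambda i: sum(s for s, _ in options[i]),
        reverse=True,
    )
    for ti in order:
        for si, (_storage, perf) in enumerate(options[ti]):
            dev = min(range(world_size), key=lambda d: device_perf[d])
            device_perf[dev] += perf
            placement.append((ti, si, dev))
    return placement, device_perf


placement, perfs = greedy_partition([[(4, 4)], [(3, 3)], [(1, 1)]], world_size=2)
print(perfs)  # [4.0, 4.0] -- the two largest tables land on different devices
```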
- class torchrec.distributed.planner.storage_reservations.HeuristicalStorageReservation(percentage: float, parameter_multiplier: float = 6.0, dense_tensor_estimate: Optional[int] = None)¶
Reserves storage for the model to be sharded using a heuristical calculation. The storage reservation comprises dense tensor storage, KJT storage, and an extra percentage of total storage.
- Parameters:
percentage (float) – the extra storage percentage to reserve as a margin of error beyond the heuristic storage calculation.
parameter_multiplier (float) – the heuristic multiplier for total parameter storage.
dense_tensor_estimate (Optional[int]) – the storage estimate for dense tensors; the default heuristic estimate is used if not provided.
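As a rough sketch of the reservation arithmetic (hypothetical formula shape and numbers; the real heuristic is more detailed, e.g. it derives KJT storage from input stats per rank):

```python
def reserve_storage(total_hbm, percentage, dense_tensor_bytes,
                    kjt_bytes, parameter_multiplier=6.0):
    """HBM left for sharded embedding tables after reserving a safety
    margin, dense storage (scaled by the parameter multiplier
    heuristic), and KJT storage."""
    margin = total_hbm * percentage
    dense = dense_tensor_bytes * parameter_multiplier
    return total_hbm - margin - dense - kjt_bytes


# e.g., 16 GB of HBM, a 15% margin, 100 MB of dense params, 50 MB of KJTs
GB, MB = 1024 ** 3, 1024 ** 2
left = reserve_storage(16 * GB, 0.15, 100 * MB, 50 * MB)
print(left / GB)
```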
- class torchrec.distributed.planner.proposers.GreedyProposer(use_depth: bool = True, threshold: Optional[int] = None)¶
Proposes sharding plans in a greedy fashion.
Sorts the sharding options of each shardable parameter by perf. On each iteration, finds the parameter with the largest current storage usage and tries its next sharding option.
- Parameters:
use_depth (bool) – when enabled, the sharding options of an FQN are sorted based on max(shard.perf.total); otherwise sharding options are sorted by sum(shard.perf.total).
threshold (Optional[int]) – the threshold for early stopping. When specified, the proposer stops proposing once the proposals have had consecutive perf ratings worse than best_perf_rating.
- feedback(partitionable: bool, plan: Optional[List[ShardingOption]] = None, perf_rating: Optional[float] = None, storage_constraint: Optional[Topology] = None) None ¶
Provides feedback to the proposer.
- Parameters:
partitionable (bool) – whether the plan is partitionable.
plan (Optional[List[ShardingOption]]) – the plan to provide feedback on.
perf_rating (Optional[float]) – the performance rating of the plan.
storage_constraint (Optional[Topology]) – the storage constraint of the plan.
- load(search_space: List[ShardingOption], enumerator: Optional[Enumerator] = None) None ¶
Loads the search space into the proposer.
- Parameters:
search_space (List[ShardingOption]) – the search space to load.
enumerator (Enumerator) – the enumerator used to generate the search space.
- propose() Optional[List[ShardingOption]] ¶
Proposes a sharding plan.
- Returns:
the proposed plan.
- Return type:
Optional[List[ShardingOption]]
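A Proposer is driven by the planner through a load/propose/feedback loop. The sketch below illustrates that protocol with a toy proposer and a hypothetical scoring function (not the actual planner loop; lower perf ratings are better):

```python
def search(proposer, evaluate):
    """Drive a proposer: propose, score, feed back, repeat until exhausted."""
    best_plan, best_rating = None, float("inf")
    plan = proposer.propose()
    while plan is not None:
        rating = evaluate(plan)  # e.g., max device perf
        partitionable = rating is not None
        if partitionable and rating < best_rating:
            best_plan, best_rating = plan, rating
        proposer.feedback(partitionable=partitionable, plan=plan,
                          perf_rating=rating)
        plan = proposer.propose()
    return best_plan, best_rating


class ListProposer:
    """Toy proposer that walks a fixed list of candidate plans."""
    def __init__(self, plans):
        self._plans = iter(plans)

    def propose(self):
        return next(self._plans, None)

    def feedback(self, partitionable, plan=None, perf_rating=None,
                 storage_constraint=None):
        pass  # a real proposer would adapt its next proposal here


plan, rating = search(
    ListProposer([["a"], ["b"], ["c"]]),
    evaluate=lambda p: {"a": 3.0, "b": 1.0, "c": 2.0}[p[0]],
)
print(plan, rating)  # ['b'] 1.0
```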
- class torchrec.distributed.planner.shard_estimators.EmbeddingPerfEstimator(topology: Topology, constraints: Optional[Dict[str, ParameterConstraints]] = None, is_inference: bool = False)¶
Embedding Wall Time Perf Estimator. This estimator estimates the wall time of a given sharding option.
- Parameters:
topology (Topology) – the device topology.
constraints (Optional[Dict[str, ParameterConstraints]]) – parameter constraints.
is_inference (bool) – whether or not the estimator is used for inference.
- estimate(sharding_options: List[ShardingOption], sharder_map: Optional[Dict[str, ModuleSharder[Module]]] = None) None ¶
Estimates the wall time of a given sharding option.
- Parameters:
sharding_options (List[ShardingOption]) – list of sharding options.
sharder_map (Optional[Dict[str, ModuleSharder[nn.Module]]]) – sharder map.
- classmethod perf_func_emb_wall_time(shard_sizes: List[List[int]], compute_kernel: str, compute_device: str, sharding_type: str, batch_sizes: List[int], world_size: int, local_world_size: int, input_lengths: List[float], input_data_type_size: float, table_data_type_size: float, output_data_type_size: float, fwd_a2a_comm_data_type_size: float, bwd_a2a_comm_data_type_size: float, fwd_sr_comm_data_type_size: float, bwd_sr_comm_data_type_size: float, num_poolings: List[float], hbm_mem_bw: float, ddr_mem_bw: float, hbm_to_ddr_mem_bw: float, intra_host_bw: float, inter_host_bw: float, bwd_compute_multiplier: float, weighted_feature_bwd_compute_multiplier: float, is_pooled: bool, is_weighted: bool = False, caching_ratio: Optional[float] = None, is_inference: bool = False, prefetch_pipeline: bool = False, expected_cache_fetches: float = 0, uneven_sharding_perf_multiplier: float = 1.0) List[Perf] ¶
Attempts to model perfs as a function of relative wall times.
- Parameters:
shard_sizes (List[List[int]]) – the list of (local_rows, local_cols) of each shard.
compute_kernel (str) – compute kernel.
compute_device (str) – compute device.
sharding_type (str) – tw, rw, cw, twrw, dp.
batch_sizes (List[int]) – batch size for each input feature.
world_size (int) – the number of devices for all hosts.
local_world_size (int) – the number of the device for each host.
input_lengths (List[float]) – the list of the average number of lookups of each input query feature.
input_data_type_size (float) – the data type size of the distributed data_parallel input.
table_data_type_size (float) – the data type size of the table.
output_data_type_size (float) – the data type size of the output embeddings.
fwd_a2a_comm_data_type_size (float) – the data type size of the forward all-to-all communication.
bwd_a2a_comm_data_type_size (float) – the data type size of the backward all-to-all communication.
fwd_sr_comm_data_type_size (float) – the data type size of the forward reduce-scatter communication.
bwd_sr_comm_data_type_size (float) – the data type size of the backward reduce-scatter communication.
num_poolings (List[float]) – number of poolings per sample, typically 1.0.
hbm_mem_bw (float) – the bandwidth of the device HBM.
ddr_mem_bw (float) – the bandwidth of the system DDR memory.
hbm_to_ddr_mem_bw (float) – the bandwidth between device HBM and system DDR.
intra_host_bw (float) – the bandwidth within a single host like multiple threads.
inter_host_bw (float) – the bandwidth between two hosts like multiple machines.
is_pooled (bool) – True if the embedding output is pooled (i.e., EmbeddingBag), False if unpooled/sequential (i.e., Embedding).
is_weighted (bool = False) – if the module is an EBC and is weighted, typically signifying an id score list feature.
is_inference (bool = False) – if planning for inference.
caching_ratio (Optional[float] = None) – cache ratio to determine the bandwidth of device.
prefetch_pipeline (bool = False) – whether prefetch pipeline is enabled.
expected_cache_fetches (float) – the number of expected cache fetches across the global batch.
uneven_sharding_perf_multiplier (float = 1.0) – multiplier to account for uneven sharding perf.
- Returns:
the list of perfs for each shard.
- Return type:
List[Perf]
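Schematically, each returned perf is a sum of relative wall times for the forward/backward compute and communication stages. A toy decomposition with hypothetical stage costs (not the actual estimation formulas):

```python
def shard_perf(fwd_compute, fwd_comms, bwd_compute, bwd_comms,
               bwd_compute_multiplier=1.0, prefetch_compute=0.0):
    """Total relative wall time of one shard: compute + comms for both
    passes, with the backward compute scaled by its multiplier."""
    return (fwd_compute + fwd_comms
            + bwd_compute * bwd_compute_multiplier + bwd_comms
            + prefetch_compute)


total = shard_perf(fwd_compute=1.0, fwd_comms=0.5,
                   bwd_compute=2.0, bwd_comms=0.5,
                   bwd_compute_multiplier=2.0)
print(total)  # 6.0
```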
- class torchrec.distributed.planner.shard_estimators.EmbeddingStorageEstimator(topology: Topology, constraints: Optional[Dict[str, ParameterConstraints]] = None, pipeline_type: PipelineType = PipelineType.NONE, run_embedding_at_peak_memory: bool = False, is_inference: bool = False)¶
Embedding Storage Usage Estimator
- Parameters:
topology (Topology) – the device topology.
constraints (Optional[Dict[str, ParameterConstraints]]) – parameter constraints.
pipeline_type (PipelineType) – The type of pipeline, if any. Will determine the input replication factor during memory estimation.
run_embedding_at_peak_memory (bool) –
Whether the embedding forward/backward will execute when HBM usage is at its peak. When set to True, any temporary memory allocation during embedding forward/backward, as well as output sizes before output_dist, will be counted toward the HBM storage cost. Otherwise they won't, since they'll be "hidden" by the real memory peak.
This only takes effect if pipeline_type is set, for backward compatibility (it does not affect models using the old pipeline-agnostic formula).
Defaults to False because this is typically false for RecSys, since the memory peak happens at the end of the dense forward / beginning of the dense backward instead.
is_inference (bool) – whether the model is an inference model. Defaults to False.
- estimate(sharding_options: List[ShardingOption], sharder_map: Optional[Dict[str, ModuleSharder[Module]]] = None) None ¶
Estimate the storage cost of each sharding option.
- Parameters:
sharding_options (List[ShardingOption]) – list of sharding options.
sharder_map (Optional[Dict[str, ModuleSharder[nn.Module]]]) – map from module type to sharder.
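As a rough sketch, a shard's storage cost combines the table bytes, optimizer state, and activation I/O, with inputs replicated according to the pipeline type. The formula shape and numbers below are hypothetical simplifications of what the estimator models:

```python
def shard_storage_bytes(local_rows, local_cols, element_size,
                        optimizer_multiplier=1.0, input_bytes=0,
                        output_bytes=0, pipeline_replication=1):
    """Table bytes (weights + optimizer state) plus activation I/O,
    with inputs replicated by the pipeline's overlap factor."""
    table = local_rows * local_cols * element_size
    optimizer = table * optimizer_multiplier
    return table + optimizer + input_bytes * pipeline_replication + output_bytes


# e.g., a 1000 x 64 fp32 shard with optimizer state the same size as the
# table, and inputs held twice due to pipeline overlap
cost = shard_storage_bytes(1000, 64, 4, optimizer_multiplier=1.0,
                           input_bytes=4096, output_bytes=8192,
                           pipeline_replication=2)
print(cost)
```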