快捷方式

MPPIPlanner

class torchrl.modules.MPPIPlanner(*args, **kwargs)[source]

MPPI 规划器模块。

参考
  • 使用协方差变量重要性的模型预测路径积分控制

采样。(Williams, G., Aldrich, A. 和 Theodorou, E. A.) https://arxiv.org/abs/1509.01149 - 用于模型预测控制的时间差分学习

(Hansen N., Wang X., Su H.) https://arxiv.org/abs/2203.04955

此模块将在给定包含初始状态的 TensorDict 时执行 MPPI 规划步骤。

对模块的调用返回在给定规划范围内的经验上最大化回报的动作

参数:
  • env (EnvBase) – 要在其上执行规划步骤的环境(可以是 ModelBasedEnvEnvBase)。

  • planning_horizon (int) – 模拟轨迹的长度

  • optim_steps (int) – MPC 规划器使用的优化步骤数

  • num_candidates (int) – 从高斯分布中采样的候选数。

  • top_k (int) – 用于更新高斯分布的均值和标准差的顶级候选数。

  • reward_key (str, optional) – 用于检索奖励的 TensorDict 中的键。默认为“reward”。

  • action_key (str, optional) – 用于存储动作的 TensorDict 中的键。默认为“action”

示例

>>> from tensordict import TensorDict
>>> from torchrl.data import CompositeSpec, UnboundedContinuousTensorSpec
>>> from torchrl.envs.model_based import ModelBasedEnvBase
>>> from tensordict.nn import TensorDictModule
>>> from torchrl.modules import ValueOperator
>>> from torchrl.objectives.value import TDLambdaEstimator
>>> class MyMBEnv(ModelBasedEnvBase):
...     def __init__(self, world_model, device="cpu", dtype=None, batch_size=None):
...         super().__init__(world_model, device=device, dtype=dtype, batch_size=batch_size)
...         self.state_spec = CompositeSpec(
...             hidden_observation=UnboundedContinuousTensorSpec((4,))
...         )
...         self.observation_spec = CompositeSpec(
...             hidden_observation=UnboundedContinuousTensorSpec((4,))
...         )
...         self.action_spec = UnboundedContinuousTensorSpec((1,))
...         self.reward_spec = UnboundedContinuousTensorSpec((1,))
...
...     def _reset(self, tensordict: TensorDict) -> TensorDict:
...         tensordict = TensorDict(
...             {},
...             batch_size=self.batch_size,
...             device=self.device,
...         )
...         tensordict = tensordict.update(
...             self.full_state_spec.rand())
...         tensordict = tensordict.update(
...             self.full_action_spec.rand())
...         tensordict = tensordict.update(
...             self.full_observation_spec.rand())
...         return tensordict
...
>>> from torchrl.modules import MLP, WorldModelWrapper
>>> import torch.nn as nn
>>> world_model = WorldModelWrapper(
...     TensorDictModule(
...         MLP(out_features=4, activation_class=nn.ReLU, activate_last_layer=True, depth=0),
...         in_keys=["hidden_observation", "action"],
...         out_keys=["hidden_observation"],
...     ),
...     TensorDictModule(
...         nn.Linear(4, 1),
...         in_keys=["hidden_observation"],
...         out_keys=["reward"],
...     ),
... )
>>> env = MyMBEnv(world_model)
>>> value_net = nn.Linear(4, 1)
>>> value_net = ValueOperator(value_net, in_keys=["hidden_observation"])
>>> adv = TDLambdaEstimator(
...     gamma=0.99,
...     lmbda=0.95,
...     value_network=value_net,
... )
>>> # Build a planner and use it as actor
>>> planner = MPPIPlanner(
...     env,
...     adv,
...     temperature=1.0,
...     planning_horizon=10,
...     optim_steps=11,
...     num_candidates=7,
...     top_k=3)
>>> env.rollout(5, planner)
TensorDict(
    fields={
        action: Tensor(shape=torch.Size([5, 1]), device=cpu, dtype=torch.float32, is_shared=False),
        done: Tensor(shape=torch.Size([5, 1]), device=cpu, dtype=torch.bool, is_shared=False),
        hidden_observation: Tensor(shape=torch.Size([5, 4]), device=cpu, dtype=torch.float32, is_shared=False),
        next: TensorDict(
            fields={
                done: Tensor(shape=torch.Size([5, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                hidden_observation: Tensor(shape=torch.Size([5, 4]), device=cpu, dtype=torch.float32, is_shared=False),
                reward: Tensor(shape=torch.Size([5, 1]), device=cpu, dtype=torch.float32, is_shared=False),
                terminated: Tensor(shape=torch.Size([5, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
            batch_size=torch.Size([5]),
            device=cpu,
            is_shared=False),
        terminated: Tensor(shape=torch.Size([5, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
    batch_size=torch.Size([5]),
    device=cpu,
    is_shared=False)
planning(tensordict: TensorDictBase) Tensor[source]

执行 MPC 规划步骤。

参数:

td (TensorDict) – 要在其上执行规划步骤的 TensorDict。

文档

访问 PyTorch 的全面开发者文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并获得问题的解答

查看资源