CEMPlanner¶
- class torchrl.modules.CEMPlanner(*args, **kwargs)[source]¶
CEMPlanner 模块。
参考:The cross-entropy method for optimization, Botev et al. 2013 (交叉熵优化方法,Botev 等人,2013)
当给定包含初始状态的 TensorDict 时,此模块将执行 CEM 规划步骤。 CEM 规划步骤通过从均值为零、单位方差的高斯分布中采样动作来执行。然后使用采样的动作在环境中执行 rollout。然后对 rollout 获得的累积奖励进行排名。我们选择前 k 个 episode,并使用它们的动作来更新动作分布的均值和标准差。 CEM 规划步骤重复指定的步骤数。
调用该模块会返回根据规划范围经验性地最大化回报的动作
- 参数:
env (EnvBase) – 要在其上执行规划步骤的环境(可以是 ModelBasedEnv 或
EnvBase
)。planning_horizon (int) – 模拟轨迹的长度
optim_steps (int) – MPC 规划器使用的优化步骤数
num_candidates (int) – 从高斯分布中采样的候选数量。
top_k (int) – 用于更新高斯分布的均值和标准差的顶部候选数量。
reward_key (str, 可选) – TensorDict 中用于检索奖励的键。默认为 “reward”。
action_key (str, 可选) – TensorDict 中用于存储动作的键。默认为 “action”
示例
>>> from tensordict import TensorDict >>> from torchrl.data import Composite, Unbounded >>> from torchrl.envs.model_based import ModelBasedEnvBase >>> from torchrl.modules import SafeModule >>> class MyMBEnv(ModelBasedEnvBase): ... def __init__(self, world_model, device="cpu", dtype=None, batch_size=None): ... super().__init__(world_model, device=device, dtype=dtype, batch_size=batch_size) ... self.state_spec = Composite( ... hidden_observation=Unbounded((4,)) ... ) ... self.observation_spec = Composite( ... hidden_observation=Unbounded((4,)) ... ) ... self.action_spec = Unbounded((1,)) ... self.reward_spec = Unbounded((1,)) ... ... def _reset(self, tensordict: TensorDict) -> TensorDict: ... tensordict = TensorDict( ... {}, ... batch_size=self.batch_size, ... device=self.device, ... ) ... tensordict = tensordict.update( ... self.full_state_spec.rand()) ... tensordict = tensordict.update( ... self.full_action_spec.rand()) ... tensordict = tensordict.update( ... self.full_observation_spec.rand()) ... return tensordict ... >>> from torchrl.modules import MLP, WorldModelWrapper >>> import torch.nn as nn >>> world_model = WorldModelWrapper( ... SafeModule( ... MLP(out_features=4, activation_class=nn.ReLU, activate_last_layer=True, depth=0), ... in_keys=["hidden_observation", "action"], ... out_keys=["hidden_observation"], ... ), ... SafeModule( ... nn.Linear(4, 1), ... in_keys=["hidden_observation"], ... out_keys=["reward"], ... ), ... ) >>> env = MyMBEnv(world_model) >>> # Build a planner and use it as actor >>> planner = CEMPlanner(env, 10, 11, 7, 3) >>> env.rollout(5, planner) TensorDict( fields={ action: Tensor(shape=torch.Size([5, 1]), device=cpu, dtype=torch.float32, is_shared=False), done: Tensor(shape=torch.Size([5, 1]), device=cpu, dtype=torch.bool, is_shared=False), hidden_observation: Tensor(shape=torch.Size([5, 4]), device=cpu, dtype=torch.float32, is_shared=False), next: TensorDict( fields={ done: Tensor(shape=torch.Size([5, 1]), device=cpu, dtype=torch.bool, is_shared=False), hidden_observation: Tensor(shape=torch.Size([5, 4]), device=cpu, dtype=torch.float32, is_shared=False), reward: Tensor(shape=torch.Size([5, 1]), device=cpu, dtype=torch.float32, is_shared=False), terminated: Tensor(shape=torch.Size([5, 1]), device=cpu, dtype=torch.bool, is_shared=False)}, batch_size=torch.Size([5]), device=cpu, is_shared=False), terminated: Tensor(shape=torch.Size([5, 1]), device=cpu, dtype=torch.bool, is_shared=False)}, batch_size=torch.Size([5]), device=cpu, is_shared=False)