BatchSizeTransform¶
- class torchrl.envs.transforms.BatchSizeTransform(*, batch_size: torch.Size | None = None, reshape_fn: Callable[[TensorDictBase], TensorDictBase] | None = None, reset_func: Callable[[TensorDictBase, TensorDictBase], TensorDictBase] | None =None, env_kwarg: bool =False)[source]¶
一个用于修改环境批大小的 transform。
此 transform 有两种不同的用法:它可以用于为非批锁定的(例如无状态的)环境设置批大小,以便使用数据收集器进行数据收集。它也可以用于修改环境的批大小(例如,squeeze、unsqueeze 或 reshape 操作)。
此 transform 修改环境批大小以匹配提供的批大小。它要求父环境的批大小可以扩展到提供的批大小。
- 关键字参数:
batch_size (torch.Size 或 等效类型, 可选) – 环境的新批大小。与
reshape_fn
互斥。reshape_fn (可调用对象, 可选) –
一个用于修改环境批大小的可调用对象。与
batch_size
互斥。注意
目前,支持涉及
reshape
、flatten
、unflatten
、squeeze
和unsqueeze
的转换。如果需要其他 reshape 操作,请在 TorchRL github 上提交功能请求。reset_func (可调用对象, 可选) – 一个生成 reset tensordict 的函数。签名必须匹配
Callable[[TensorDictBase, TensorDictBase], TensorDictBase]
,其中第一个输入参数是在调用reset()
时传递给环境的可选 tensordict,第二个参数是TransformedEnv.base_env.reset
的输出。如果env_kwarg=True
,它也可以支持一个可选的env
关键字参数。env_kwarg (布尔值, 可选) – 如果为
True
,则reset_func
必须支持一个env
关键字参数。默认为False
。传递的环境将是带有其 transform 的环境。
示例
>>> # Changing the batch-size with a function >>> from torchrl.envs import GymEnv >>> base_env = GymEnv("CartPole-v1") >>> env = TransformedEnv(base_env, BatchSizeTransform(reshape_fn=lambda data: data.reshape(1, 1))) >>> env.rollout(4) >>> # Setting the shape of a stateless environment >>> class MyEnv(EnvBase): ... batch_locked = False ... def __init__(self): ... super().__init__() ... self.observation_spec = Composite(observation=Unbounded(3)) ... self.reward_spec = Unbounded(1) ... self.action_spec = Unbounded(1) ... ... def _reset(self, tensordict: TensorDictBase, **kwargs) -> TensorDictBase: ... tensordict_batch_size = tensordict.batch_size if tensordict is not None else torch.Size([]) ... result = self.observation_spec.rand(tensordict_batch_size) ... result.update(self.full_done_spec.zero(tensordict_batch_size)) ... return result ... ... def _step( ... self, ... tensordict: TensorDictBase, ... ) -> TensorDictBase: ... result = self.observation_spec.rand(tensordict.batch_size) ... result.update(self.full_done_spec.zero(tensordict.batch_size)) ... result.update(self.full_reward_spec.zero(tensordict.batch_size)) ... return result ... ... def _set_seed(self, seed: Optional[int]): ... pass ... >>> env = TransformedEnv(MyEnv(), BatchSizeTransform([5])) >>> assert env.batch_size == torch.Size([5]) >>> assert env.rollout(10).shape == torch.Size([5, 10])
reset_func
可以创建一个具有所需批大小的 tensordict,从而实现细粒度的 reset 调用>>> def reset_func(tensordict, tensordict_reset, env): ... result = env.observation_spec.rand() ... result.update(env.full_done_spec.zero()) ... assert result.batch_size != torch.Size([]) ... return result >>> env = TransformedEnv(MyEnv(), BatchSizeTransform([5], reset_func=reset_func, env_kwarg=True)) >>> print(env.rollout(2)) TensorDict( fields={ action: Tensor(shape=torch.Size([5, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False), done: Tensor(shape=torch.Size([5, 2, 1]), device=cpu, dtype=torch.bool, is_shared=False), next: TensorDict( fields={ done: Tensor(shape=torch.Size([5, 2, 1]), device=cpu, dtype=torch.bool, is_shared=False), observation: Tensor(shape=torch.Size([5, 2, 3]), device=cpu, dtype=torch.float32, is_shared=False), reward: Tensor(shape=torch.Size([5, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False), terminated: Tensor(shape=torch.Size([5, 2, 1]), device=cpu, dtype=torch.bool, is_shared=False)}, batch_size=torch.Size([5, 2]), device=None, is_shared=False), observation: Tensor(shape=torch.Size([5, 2, 3]), device=cpu, dtype=torch.float32, is_shared=False), terminated: Tensor(shape=torch.Size([5, 2, 1]), device=cpu, dtype=torch.bool, is_shared=False)}, batch_size=torch.Size([5, 2]), device=None, is_shared=False)
此 transform 可用于在数据收集器内部部署非批锁定的环境
>>> from torchrl.collectors import SyncDataCollector >>> collector = SyncDataCollector(env, lambda td: env.rand_action(td), frames_per_batch=10, total_frames=-1) >>> for data in collector: ... print(data) ... break TensorDict( fields={ action: Tensor(shape=torch.Size([5, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False), collector: TensorDict( fields={ traj_ids: Tensor(shape=torch.Size([5, 2]), device=cpu, dtype=torch.int64, is_shared=False)}, batch_size=torch.Size([5, 2]), device=None, is_shared=False), done: Tensor(shape=torch.Size([5, 2, 1]), device=cpu, dtype=torch.bool, is_shared=False), next: TensorDict( fields={ done: Tensor(shape=torch.Size([5, 2, 1]), device=cpu, dtype=torch.bool, is_shared=False), observation: Tensor(shape=torch.Size([5, 2, 3]), device=cpu, dtype=torch.float32, is_shared=False), reward: Tensor(shape=torch.Size([5, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False), terminated: Tensor(shape=torch.Size([5, 2, 1]), device=cpu, dtype=torch.bool, is_shared=False)}, batch_size=torch.Size([5, 2]), device=None, is_shared=False), observation: Tensor(shape=torch.Size([5, 2, 3]), device=cpu, dtype=torch.float32, is_shared=False), terminated: Tensor(shape=torch.Size([5, 2, 1]), device=cpu, dtype=torch.bool, is_shared=False)}, batch_size=torch.Size([5, 2]), device=None, is_shared=False) >>> collector.shutdown()
- forward(tensordict: TensorDictBase) TensorDictBase ¶
读取输入的 tensordict,并对选定的键应用 transform。
- transform_input_spec(input_spec: Composite) Composite [source]¶
转换输入 spec,使结果 spec 与 transform 映射匹配。
- 参数:
input_spec (TensorSpec) – transform 之前的 spec
- 返回:
transform 之后的预期 spec