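Introduction to TorchRL
This demo was presented at the ICML 2022 industry demo day.
It gives a good overview of TorchRL's functionality. If you have questions or comments, feel free to reach out to vmoens@fb.com or submit an issue.
TorchRL is an open-source Reinforcement Learning (RL) library for PyTorch.
The PyTorch ecosystem team (Meta) has decided to invest in this library to provide a leading platform for developing RL solutions in research settings.
It provides PyTorch- and Python-first, low-level and high-level abstractions for RL that are intended to be efficient, documented and properly tested. The code is aimed at supporting research in RL. Most of it is written in Python in a highly modular way, so that researchers can easily swap components, transform them or write new ones with little effort.
This repository attempts to align with the existing PyTorch ecosystem libraries in that it has a dataset pillar (torchrl/envs), transforms, models, data utilities (e.g. collectors and containers), and so on. TorchRL aims to have as few dependencies as possible (the Python standard library, NumPy and PyTorch). Common environment libraries (e.g. OpenAI Gym) are optional.
Unlike other domains, RL is less about media than about algorithms. As a result, it is harder to build truly independent components.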
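What TorchRL is not
A collection of algorithms: we do not intend to provide SOTA implementations of RL algorithms; the algorithms we ship serve only as examples of how to use the library.
A research framework: modularity in TorchRL comes in two flavours. First, we try to build reusable components that can easily be swapped with one another. Second, we do our best to make components usable independently of the rest of the library.
TorchRL has very few core dependencies, mainly PyTorch and NumPy. All other dependencies (Gym, Torchvision, WandB/TensorBoard) are optional.
Data
TensorDict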
import torch
from tensordict import TensorDict
Let's create a TensorDict.
batch_size = 5
tensordict = TensorDict(
source={
"key 1": torch.zeros(batch_size, 3),
"key 2": torch.zeros(batch_size, 5, 6, dtype=torch.bool),
},
batch_size=[batch_size],
)
print(tensordict)
TensorDict(
fields={
key 1: Tensor(shape=torch.Size([5, 3]), device=cpu, dtype=torch.float32, is_shared=False),
key 2: Tensor(shape=torch.Size([5, 5, 6]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([5]),
device=None,
is_shared=False)
You can index a TensorDict as well as query its keys.
print(tensordict[2])
print(tensordict["key 1"] is tensordict.get("key 1"))
TensorDict(
fields={
key 1: Tensor(shape=torch.Size([3]), device=cpu, dtype=torch.float32, is_shared=False),
key 2: Tensor(shape=torch.Size([5, 6]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False)
True
The following shows how to stack multiple TensorDicts.
tensordict1 = TensorDict(
source={
"key 1": torch.zeros(batch_size, 1),
"key 2": torch.zeros(batch_size, 5, 6, dtype=torch.bool),
},
batch_size=[batch_size],
)
tensordict2 = TensorDict(
source={
"key 1": torch.ones(batch_size, 1),
"key 2": torch.ones(batch_size, 5, 6, dtype=torch.bool),
},
batch_size=[batch_size],
)
tensordict = torch.stack([tensordict1, tensordict2], 0)
tensordict.batch_size, tensordict["key 1"]
(torch.Size([2, 5]), tensor([[[0.],
[0.],
[0.],
[0.],
[0.]],
[[1.],
[1.],
[1.],
[1.],
[1.]]]))
Here are some other features of TensorDict.
print(
"view(-1): ",
tensordict.view(-1).batch_size,
tensordict.view(-1).get("key 1").shape,
)
print("to device: ", tensordict.to("cpu"))
# print("pin_memory: ", tensordict.pin_memory())
print("share memory: ", tensordict.share_memory_())
print(
"permute(1, 0): ",
tensordict.permute(1, 0).batch_size,
tensordict.permute(1, 0).get("key 1").shape,
)
print(
"expand: ",
tensordict.expand(3, *tensordict.batch_size).batch_size,
tensordict.expand(3, *tensordict.batch_size).get("key 1").shape,
)
view(-1): torch.Size([10]) torch.Size([10, 1])
to device: TensorDict(
fields={
key 1: Tensor(shape=torch.Size([2, 5, 1]), device=cpu, dtype=torch.float32, is_shared=False),
key 2: Tensor(shape=torch.Size([2, 5, 5, 6]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([2, 5]),
device=cpu,
is_shared=False)
share memory: TensorDict(
fields={
key 1: Tensor(shape=torch.Size([2, 5, 1]), device=cpu, dtype=torch.float32, is_shared=True),
key 2: Tensor(shape=torch.Size([2, 5, 5, 6]), device=cpu, dtype=torch.bool, is_shared=True)},
batch_size=torch.Size([2, 5]),
device=None,
is_shared=True)
permute(1, 0): torch.Size([5, 2]) torch.Size([5, 2, 1])
expand: torch.Size([3, 2, 5]) torch.Size([3, 2, 5, 1])
You can also create a nested TensorDict.
tensordict = TensorDict(
source={
"key 1": torch.zeros(batch_size, 3),
"key 2": TensorDict(
source={"sub-key 1": torch.zeros(batch_size, 2, 1)},
batch_size=[batch_size, 2],
),
},
batch_size=[batch_size],
)
tensordict
TensorDict(
fields={
key 1: Tensor(shape=torch.Size([5, 3]), device=cpu, dtype=torch.float32, is_shared=False),
key 2: TensorDict(
fields={
sub-key 1: Tensor(shape=torch.Size([5, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([5, 2]),
device=None,
is_shared=False)},
batch_size=torch.Size([5]),
device=None,
is_shared=False)
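The shapes printed above follow one rule: a TensorDict's batch_size is the leading part of every entry's shape, and operations such as indexing and stacking act on those leading dimensions only. The following is a plain-Python sketch of that bookkeeping on shape tuples. It does not use tensordict, and the helper names (`is_valid_batch_size`, `stacked_batch_size`, `indexed_shape`) are made up for illustration.

```python
def is_valid_batch_size(batch_size, shape):
    """Return True if `batch_size` matches the leading dimensions of `shape`."""
    return tuple(shape[: len(batch_size)]) == tuple(batch_size)


def stacked_batch_size(batch_size, n, dim=0):
    """Batch size after stacking `n` containers along `dim`."""
    out = list(batch_size)
    out.insert(dim, n)
    return tuple(out)


def indexed_shape(shape):
    """Shape of an entry after integer-indexing the first batch dimension."""
    return tuple(shape[1:])


# "key 1" and "key 2" from the first example, batch_size=[5]
assert is_valid_batch_size((5,), (5, 3))
assert is_valid_batch_size((5,), (5, 5, 6))
# the nested TensorDict may carry a longer batch size than its parent
assert is_valid_batch_size((5, 2), (5, 2, 1))
# a batch size that is not a prefix of the shape is rejected
assert not is_valid_batch_size((3,), (5, 3))

print(stacked_batch_size((5,), 2))  # (2, 5)
print(indexed_shape((5, 5, 6)))     # (5, 6)
```

Dimensions after the batch dimensions (3, or 5 x 6 here) belong to the individual entries and are free to differ from key to key.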
Replay buffers
from torchrl.data import PrioritizedReplayBuffer, ReplayBuffer
rb = ReplayBuffer(collate_fn=lambda x: x)
rb.add(1)
rb.sample(1)
[1]
rb.extend([2, 3])
rb.sample(3)
[2, 1, 3]
rb = PrioritizedReplayBuffer(alpha=0.7, beta=1.1, collate_fn=lambda x: x)
rb.add(1)
rb.sample(1)
rb.update_priority(1, 0.5)
Here is an example of using a replay buffer with TensorDicts.
collate_fn = torch.stack
rb = ReplayBuffer(collate_fn=collate_fn)
rb.add(TensorDict({"a": torch.randn(3)}, batch_size=[]))
len(rb)
1
rb.extend(TensorDict({"a": torch.randn(2, 3)}, batch_size=[2]))
print(len(rb))
print(rb.sample(10))
print(rb.sample(2).contiguous())
3
TensorDict(
fields={
a: Tensor(shape=torch.Size([10, 3]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([10]),
device=None,
is_shared=False)
TensorDict(
fields={
a: Tensor(shape=torch.Size([2, 3]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([2]),
device=None,
is_shared=False)
torch.manual_seed(0)
from torchrl.data import TensorDictPrioritizedReplayBuffer
rb = TensorDictPrioritizedReplayBuffer(alpha=0.7, beta=1.1, priority_key="td_error")
rb.extend(TensorDict({"a": torch.randn(2, 3)}, batch_size=[2]))
tensordict_sample = rb.sample(2).contiguous()
tensordict_sample
TensorDict(
fields={
_weight: Tensor(shape=torch.Size([2]), device=cpu, dtype=torch.float32, is_shared=False),
a: Tensor(shape=torch.Size([2, 3]), device=cpu, dtype=torch.float32, is_shared=False),
index: Tensor(shape=torch.Size([2]), device=cpu, dtype=torch.int64, is_shared=False)},
batch_size=torch.Size([2]),
device=None,
is_shared=False)
tensordict_sample["index"]
tensor([0, 0])
tensordict_sample["td_error"] = torch.rand(2)
rb.update_tensordict_priority(tensordict_sample)
for i, val in enumerate(rb._sampler._sum_tree):
    print(i, val)
    if i == len(rb):
        break
try:
    import gymnasium as gym
except ModuleNotFoundError:
    import gym
0 0.28791671991348267
1 1.0
2 0.0
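The `alpha` and `beta` arguments used above control how strongly priorities skew sampling and how strongly the resulting bias is corrected. As a rough, library-independent sketch, assuming the usual proportional prioritization scheme (priorities raised to `alpha`, importance weights raised to `-beta` and normalized by their maximum): TorchRL's internals (sum-tree storage, epsilon, default priority) may differ, and `sampling_probabilities` and `importance_weights` are hypothetical helper names.

```python
def sampling_probabilities(priorities, alpha=0.7):
    """P(i) = p_i**alpha / sum_k p_k**alpha."""
    scaled = [p ** alpha for p in priorities]
    total = sum(scaled)
    return [s / total for s in scaled]


def importance_weights(probabilities, beta=1.1):
    """w_i = (N * P(i))**(-beta), rescaled so that the largest weight is 1."""
    n = len(probabilities)
    raw = [(n * p) ** (-beta) for p in probabilities]
    largest = max(raw)
    return [w / largest for w in raw]


priorities = [0.1, 1.0, 0.5]  # illustrative values, e.g. absolute TD errors
probs = sampling_probabilities(priorities)
weights = importance_weights(probs)
for p, prob, w in zip(priorities, probs, weights):
    print(f"priority={p:.1f}  P(sample)={prob:.3f}  weight={w:.3f}")
```

Under this scheme, `alpha=0` makes every item equally likely, and items that are sampled rarely receive the largest weights.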
Environments
from torchrl.envs.libs.gym import GymEnv, GymWrapper
gym_env = gym.make("Pendulum-v1")
env = GymWrapper(gym_env)
env = GymEnv("Pendulum-v1")
tensordict = env.reset()
env.rand_step(tensordict)
TensorDict(
fields={
action: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.float32, is_shared=False),
done: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
next: TensorDict(
fields={
done: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
observation: Tensor(shape=torch.Size([3]), device=cpu, dtype=torch.float32, is_shared=False),
reward: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False),
observation: Tensor(shape=torch.Size([3]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False)
Changing the environment configuration
env = GymEnv("Pendulum-v1", frame_skip=3, from_pixels=True, pixels_only=False)
env.reset()
TensorDict(
fields={
done: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
observation: Tensor(shape=torch.Size([3]), device=cpu, dtype=torch.float32, is_shared=False),
pixels: Tensor(shape=torch.Size([500, 500, 3]), device=cpu, dtype=torch.uint8, is_shared=False),
terminated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False)
env.close()
del env
from torchrl.envs import (
Compose,
NoopResetEnv,
ObservationNorm,
ToTensorImage,
TransformedEnv,
)
base_env = GymEnv("Pendulum-v1", frame_skip=3, from_pixels=True, pixels_only=False)
env = TransformedEnv(base_env, Compose(NoopResetEnv(3), ToTensorImage()))
env.append_transform(ObservationNorm(in_keys=["pixels"], loc=2, scale=1))
TransformedEnv(
env=GymEnv(env=Pendulum-v1, batch_size=torch.Size([]), device=None),
transform=Compose(
NoopResetEnv(noops=3, random=True),
ToTensorImage(keys=['pixels']),
ObservationNorm(loc=2.0000, scale=1.0000, keys=['pixels'])))
Transforms
from torchrl.envs import (
Compose,
NoopResetEnv,
ObservationNorm,
StepCounter,
ToTensorImage,
TransformedEnv,
)
base_env = GymEnv("Pendulum-v1", frame_skip=3, from_pixels=True, pixels_only=False)
env = TransformedEnv(base_env, Compose(NoopResetEnv(3), ToTensorImage()))
env.append_transform(ObservationNorm(in_keys=["pixels"], loc=2, scale=1))
TransformedEnv(
env=GymEnv(env=Pendulum-v1, batch_size=torch.Size([]), device=None),
transform=Compose(
NoopResetEnv(noops=3, random=True),
ToTensorImage(keys=['pixels']),
ObservationNorm(loc=2.0000, scale=1.0000, keys=['pixels'])))
env.reset()
TensorDict(
fields={
done: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
observation: Tensor(shape=torch.Size([3]), device=cpu, dtype=torch.float32, is_shared=False),
pixels: Tensor(shape=torch.Size([3, 500, 500]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False)
print("env: ", env)
print("last transform parent: ", env.transform[2].parent)
env: TransformedEnv(
env=GymEnv(env=Pendulum-v1, batch_size=torch.Size([]), device=None),
transform=Compose(
NoopResetEnv(noops=3, random=True),
ToTensorImage(keys=['pixels']),
ObservationNorm(loc=2.0000, scale=1.0000, keys=['pixels'])))
last transform parent: TransformedEnv(
env=GymEnv(env=Pendulum-v1, batch_size=torch.Size([]), device=None),
transform=Compose(
NoopResetEnv(noops=3, random=True),
ToTensorImage(keys=['pixels'])))
Vectorized environments
from torchrl.envs import ParallelEnv
base_env = ParallelEnv(
4,
lambda: GymEnv("Pendulum-v1", frame_skip=3, from_pixels=True, pixels_only=False),
mp_start_method="fork", # This will break on Windows machines! Remove and decorate with if __name__ == "__main__"
)
env = TransformedEnv(
base_env, Compose(StepCounter(), ToTensorImage())
) # applies transforms on batch of envs
env.append_transform(ObservationNorm(in_keys=["pixels"], loc=2, scale=1))
env.reset()
TensorDict(
fields={
done: Tensor(shape=torch.Size([4, 1]), device=cpu, dtype=torch.bool, is_shared=False),
observation: Tensor(shape=torch.Size([4, 3]), device=cpu, dtype=torch.float32, is_shared=False),
pixels: Tensor(shape=torch.Size([4, 3, 500, 500]), device=cpu, dtype=torch.float32, is_shared=False),
step_count: Tensor(shape=torch.Size([4, 1]), device=cpu, dtype=torch.int64, is_shared=False),
terminated: Tensor(shape=torch.Size([4, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([4, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([4]),
device=None,
is_shared=False)
print(env.action_spec)
env.close()
del env
BoundedTensorSpec(
shape=torch.Size([4, 1]),
space=ContinuousBox(
low=Tensor(shape=torch.Size([4, 1]), device=cpu, dtype=torch.float32, contiguous=True),
high=Tensor(shape=torch.Size([4, 1]), device=cpu, dtype=torch.float32, contiguous=True)),
device=cpu,
dtype=torch.float32,
domain=continuous)
Modules
Models
Example of an MLP model
from torch import nn
from torchrl.modules import ConvNet, MLP
from torchrl.modules.models.utils import SquashDims
net = MLP(num_cells=[32, 64], out_features=4, activation_class=nn.ELU)
print(net)
print(net(torch.randn(10, 3)).shape)
MLP(
(0): LazyLinear(in_features=0, out_features=32, bias=True)
(1): ELU(alpha=1.0)
(2): Linear(in_features=32, out_features=64, bias=True)
(3): ELU(alpha=1.0)
(4): Linear(in_features=64, out_features=4, bias=True)
)
torch.Size([10, 4])
Example of a CNN model
cnn = ConvNet(
num_cells=[32, 64],
kernel_sizes=[8, 4],
strides=[2, 1],
aggregator_class=SquashDims,
)
print(cnn)
print(cnn(torch.randn(10, 3, 32, 32)).shape) # last tensor is squashed
ConvNet(
(0): LazyConv2d(0, 32, kernel_size=(8, 8), stride=(2, 2))
(1): ELU(alpha=1.0)
(2): Conv2d(32, 64, kernel_size=(4, 4), stride=(1, 1))
(3): ELU(alpha=1.0)
(4): SquashDims()
)
torch.Size([10, 6400])
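The flattened size of 6400 can be checked by hand with the standard convolution output-size formula, out = floor((in + 2 * padding - kernel) / stride) + 1. A small sketch, assuming zero padding and no dilation (which is consistent with the layers printed above); `conv_out` is a helper defined here for illustration only.

```python
def conv_out(size, kernel, stride, padding=0):
    """Spatial output size of one convolution along a single dimension."""
    return (size + 2 * padding - kernel) // stride + 1


size = 32  # input height and width
for kernel, stride in zip([8, 4], [2, 1]):
    size = conv_out(size, kernel, stride)
    print(size)  # 13, then 10

channels = 64  # num_cells[-1]
flat_features = channels * size * size
print(flat_features)  # 6400
```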
TensorDictModules
from tensordict.nn import TensorDictModule
tensordict = TensorDict({"key 1": torch.randn(10, 3)}, batch_size=[10])
module = nn.Linear(3, 4)
td_module = TensorDictModule(module, in_keys=["key 1"], out_keys=["key 2"])
td_module(tensordict)
print(tensordict)
TensorDict(
fields={
key 1: Tensor(shape=torch.Size([10, 3]), device=cpu, dtype=torch.float32, is_shared=False),
key 2: Tensor(shape=torch.Size([10, 4]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([10]),
device=None,
is_shared=False)
Sequences of modules
from tensordict.nn import TensorDictSequential
backbone_module = nn.Linear(5, 3)
backbone = TensorDictModule(
backbone_module, in_keys=["observation"], out_keys=["hidden"]
)
actor_module = nn.Linear(3, 4)
actor = TensorDictModule(actor_module, in_keys=["hidden"], out_keys=["action"])
value_module = MLP(out_features=1, num_cells=[4, 5])
value = TensorDictModule(value_module, in_keys=["hidden", "action"], out_keys=["value"])
sequence = TensorDictSequential(backbone, actor, value)
print(sequence)
TensorDictSequential(
module=ModuleList(
(0): TensorDictModule(
module=Linear(in_features=5, out_features=3, bias=True),
device=cpu,
in_keys=['observation'],
out_keys=['hidden'])
(1): TensorDictModule(
module=Linear(in_features=3, out_features=4, bias=True),
device=cpu,
in_keys=['hidden'],
out_keys=['action'])
(2): TensorDictModule(
module=MLP(
(0): LazyLinear(in_features=0, out_features=4, bias=True)
(1): Tanh()
(2): Linear(in_features=4, out_features=5, bias=True)
(3): Tanh()
(4): Linear(in_features=5, out_features=1, bias=True)
),
device=cpu,
in_keys=['hidden', 'action'],
out_keys=['value'])
),
device=cpu,
in_keys=['observation'],
out_keys=['hidden', 'action', 'value'])
print(sequence.in_keys, sequence.out_keys)
['observation'] ['hidden', 'action', 'value']
tensordict = TensorDict(
{"observation": torch.randn(3, 5)},
[3],
)
backbone(tensordict)
actor(tensordict)
value(tensordict)
TensorDict(
fields={
action: Tensor(shape=torch.Size([3, 4]), device=cpu, dtype=torch.float32, is_shared=False),
hidden: Tensor(shape=torch.Size([3, 3]), device=cpu, dtype=torch.float32, is_shared=False),
observation: Tensor(shape=torch.Size([3, 5]), device=cpu, dtype=torch.float32, is_shared=False),
value: Tensor(shape=torch.Size([3, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([3]),
device=None,
is_shared=False)
tensordict = TensorDict(
{"observation": torch.randn(3, 5)},
[3],
)
sequence(tensordict)
print(tensordict)
TensorDict(
fields={
action: Tensor(shape=torch.Size([3, 4]), device=cpu, dtype=torch.float32, is_shared=False),
hidden: Tensor(shape=torch.Size([3, 3]), device=cpu, dtype=torch.float32, is_shared=False),
observation: Tensor(shape=torch.Size([3, 5]), device=cpu, dtype=torch.float32, is_shared=False),
value: Tensor(shape=torch.Size([3, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([3]),
device=None,
is_shared=False)
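The `in_keys` and `out_keys` reported for the sequence follow from the keys of its parts: an input key is required from the caller only if no earlier module has produced it, and every output key is exposed. A plain-Python sketch of that bookkeeping follows; the function `sequence_keys` is hypothetical and is not the tensordict implementation.

```python
def sequence_keys(modules):
    """Compute the external in_keys and the out_keys of a chain of modules.

    `modules` is a list of (in_keys, out_keys) pairs, in execution order.
    """
    in_keys, out_keys = [], []
    for module_in, module_out in modules:
        for key in module_in:
            # a key produced by an earlier module is not required from the caller
            if key not in out_keys and key not in in_keys:
                in_keys.append(key)
        for key in module_out:
            if key not in out_keys:
                out_keys.append(key)
    return in_keys, out_keys


modules = [
    (["observation"], ["hidden"]),      # backbone
    (["hidden"], ["action"]),           # actor
    (["hidden", "action"], ["value"]),  # value
]
print(sequence_keys(modules))
# (['observation'], ['hidden', 'action', 'value'])
```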
Functional programming (ensembling / meta-RL)
from tensordict import TensorDict
params = TensorDict.from_module(sequence)
print("extracted params", params)
extracted params TensorDict(
fields={
module: TensorDict(
fields={
0: TensorDict(
fields={
module: TensorDict(
fields={
bias: Parameter(shape=torch.Size([3]), device=cpu, dtype=torch.float32, is_shared=False),
weight: Parameter(shape=torch.Size([3, 5]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False),
1: TensorDict(
fields={
module: TensorDict(
fields={
bias: Parameter(shape=torch.Size([4]), device=cpu, dtype=torch.float32, is_shared=False),
weight: Parameter(shape=torch.Size([4, 3]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False),
2: TensorDict(
fields={
module: TensorDict(
fields={
0: TensorDict(
fields={
bias: Parameter(shape=torch.Size([4]), device=cpu, dtype=torch.float32, is_shared=False),
weight: Parameter(shape=torch.Size([4, 7]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False),
2: TensorDict(
fields={
bias: Parameter(shape=torch.Size([5]), device=cpu, dtype=torch.float32, is_shared=False),
weight: Parameter(shape=torch.Size([5, 4]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False),
4: TensorDict(
fields={
bias: Parameter(shape=torch.Size([1]), device=cpu, dtype=torch.float32, is_shared=False),
weight: Parameter(shape=torch.Size([1, 5]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False)
Functional call using a TensorDict of parameters
with params.to_module(sequence):
    sequence(tensordict)
Using vectorized map (vmap) for model ensembling
TensorDict(
fields={
action: Tensor(shape=torch.Size([4, 3, 4]), device=cpu, dtype=torch.float32, is_shared=False),
hidden: Tensor(shape=torch.Size([4, 3, 3]), device=cpu, dtype=torch.float32, is_shared=False),
observation: Tensor(shape=torch.Size([4, 3, 5]), device=cpu, dtype=torch.float32, is_shared=False),
value: Tensor(shape=torch.Size([4, 3, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([4, 3]),
device=None,
is_shared=False)
Specialized classes
torch.manual_seed(0)
from torchrl.data import BoundedTensorSpec
from torchrl.modules import SafeModule
spec = BoundedTensorSpec(-torch.ones(3), torch.ones(3))
base_module = nn.Linear(5, 3)
module = SafeModule(
module=base_module, spec=spec, in_keys=["obs"], out_keys=["action"], safe=True
)
tensordict = TensorDict({"obs": torch.randn(5)}, batch_size=[])
module(tensordict)["action"]
tensor([-0.0137, 0.1524, -0.0641], grad_fn=<ViewBackward0>)
tensordict = TensorDict({"obs": torch.randn(5) * 100}, batch_size=[])
module(tensordict)["action"] # safe=True projects the result within the set
tensor([-1., 1., -1.], grad_fn=<AsStridedBackward0>)
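With `safe=True`, an output that falls outside the spec is projected back into it. For a box such as the one above, that projection amounts to clamping each component to its bounds, which is why the second call returns only -1 and 1. A minimal pure-Python sketch of the idea (the `project` helper is illustrative and the input values are made up):

```python
def project(values, low, high):
    """Clamp each component of `values` to the interval [low_i, high_i]."""
    return [min(max(v, lo), hi) for v, lo, hi in zip(values, low, high)]


low, high = [-1.0] * 3, [1.0] * 3
print(project([-0.01, 0.15, -0.06], low, high))  # inside the box: unchanged
print(project([-42.0, 17.5, -3.2], low, high))   # [-1.0, 1.0, -1.0]
```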
from torchrl.modules import Actor
base_module = nn.Linear(5, 3)
actor = Actor(base_module, in_keys=["obs"])
tensordict = TensorDict({"obs": torch.randn(5)}, batch_size=[])
actor(tensordict) # action is the default value
from tensordict.nn import (
ProbabilisticTensorDictModule,
ProbabilisticTensorDictSequential,
)
# Probabilistic modules
from torchrl.modules import NormalParamExtractor, TanhNormal
td = TensorDict({"input": torch.randn(3, 5)}, [3])
net = nn.Sequential(
nn.Linear(5, 4), NormalParamExtractor()
) # splits the output in loc and scale
module = TensorDictModule(net, in_keys=["input"], out_keys=["loc", "scale"])
td_module = ProbabilisticTensorDictSequential(
module,
ProbabilisticTensorDictModule(
in_keys=["loc", "scale"],
out_keys=["action"],
distribution_class=TanhNormal,
return_log_prob=False,
),
)
td_module(td)
print(td)
TensorDict(
fields={
action: Tensor(shape=torch.Size([3, 2]), device=cpu, dtype=torch.float32, is_shared=False),
input: Tensor(shape=torch.Size([3, 5]), device=cpu, dtype=torch.float32, is_shared=False),
loc: Tensor(shape=torch.Size([3, 2]), device=cpu, dtype=torch.float32, is_shared=False),
scale: Tensor(shape=torch.Size([3, 2]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([3]),
device=None,
is_shared=False)
# returning the log-probability
td = TensorDict({"input": torch.randn(3, 5)}, [3])
td_module = ProbabilisticTensorDictSequential(
module,
ProbabilisticTensorDictModule(
in_keys=["loc", "scale"],
out_keys=["action"],
distribution_class=TanhNormal,
return_log_prob=True,
),
)
td_module(td)
print(td)
TensorDict(
fields={
action: Tensor(shape=torch.Size([3, 2]), device=cpu, dtype=torch.float32, is_shared=False),
input: Tensor(shape=torch.Size([3, 5]), device=cpu, dtype=torch.float32, is_shared=False),
loc: Tensor(shape=torch.Size([3, 2]), device=cpu, dtype=torch.float32, is_shared=False),
sample_log_prob: Tensor(shape=torch.Size([3]), device=cpu, dtype=torch.float32, is_shared=False),
scale: Tensor(shape=torch.Size([3, 2]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([3]),
device=None,
is_shared=False)
# Sampling vs mode / mean
from torchrl.envs.utils import ExplorationType, set_exploration_type
td = TensorDict({"input": torch.randn(3, 5)}, [3])
torch.manual_seed(0)
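with set_exploration_type(ExplorationType.RANDOM):
    td_module(td)
    print("random:", td["action"])
with set_exploration_type(ExplorationType.MODE):
    td_module(td)
    print("mode:", td["action"])
# NOTE: this block also runs in MODE, so the values printed as "mean" match the mode;
# switch to ExplorationType.MEAN to query the distribution mean instead.
with set_exploration_type(ExplorationType.MODE):
    td_module(td)
    print("mean:", td["action"])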
random: tensor([[ 0.8728, -0.1334],
[-0.9833, 0.3494],
[-0.6887, -0.6402]], grad_fn=<_SafeTanhBackward>)
mode: tensor([[-0.1132, 0.1762],
[-0.3430, -0.2668],
[ 0.2918, 0.6239]], grad_fn=<_SafeTanhBackward>)
mean: tensor([[-0.1132, 0.1762],
[-0.3430, -0.2668],
[ 0.2918, 0.6239]], grad_fn=<_SafeTanhBackward>)
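The exploration type decides how an action is read off the distribution: `RANDOM` draws a sample, while the deterministic settings return a fixed summary of it. A rough pure-Python sketch for a tanh-squashed normal follows. It assumes the deterministic value is taken to be tanh(loc), which is a simplification made for this sketch; the exact statistic depends on the distribution class and library version. `sample_action` and `deterministic_action` are hypothetical names.

```python
import math
import random


def sample_action(loc, scale, rng):
    """Draw x ~ Normal(loc, scale) and squash it into (-1, 1) with tanh."""
    return math.tanh(rng.gauss(loc, scale))


def deterministic_action(loc):
    """Squash the location parameter; no randomness involved."""
    return math.tanh(loc)


rng = random.Random(0)
loc, scale = 0.3, 1.0
print([round(sample_action(loc, scale, rng), 3) for _ in range(3)])  # varies with the seed
print(deterministic_action(loc) == deterministic_action(loc))  # True
```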
Using environments and modules
from torchrl.envs.utils import step_mdp
env = GymEnv("Pendulum-v1")
action_spec = env.action_spec
actor_module = nn.Linear(3, 1)
actor = SafeModule(
actor_module, spec=action_spec, in_keys=["observation"], out_keys=["action"]
)
torch.manual_seed(0)
env.set_seed(0)
max_steps = 100
tensordict = env.reset()
tensordicts = TensorDict({}, [max_steps])
for i in range(max_steps):
    actor(tensordict)
    tensordicts[i] = env.step(tensordict)
    if tensordict["done"].any():
        break
    tensordict = step_mdp(tensordict)  # roughly equivalent to obs = next_obs
tensordicts_prealloc = tensordicts.clone()
print("total steps:", i)
print(tensordicts)
total steps: 99
TensorDict(
fields={
action: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.float32, is_shared=False),
done: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False),
next: TensorDict(
fields={
done: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False),
observation: Tensor(shape=torch.Size([100, 3]), device=cpu, dtype=torch.float32, is_shared=False),
reward: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([100]),
device=None,
is_shared=False),
observation: Tensor(shape=torch.Size([100, 3]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([100]),
device=None,
is_shared=False)
# equivalent
torch.manual_seed(0)
env.set_seed(0)
max_steps = 100
tensordict = env.reset()
tensordicts = []
for i in range(max_steps):
    actor(tensordict)
    tensordicts.append(env.step(tensordict))
    if tensordict["done"].any():
        break
    tensordict = step_mdp(tensordict)  # roughly equivalent to obs = next_obs
tensordicts_stack = torch.stack(tensordicts, 0)
print("total steps:", i)
print(tensordicts_stack)
total steps: 99
TensorDict(
fields={
action: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.float32, is_shared=False),
done: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False),
next: TensorDict(
fields={
done: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False),
observation: Tensor(shape=torch.Size([100, 3]), device=cpu, dtype=torch.float32, is_shared=False),
reward: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([100]),
device=None,
is_shared=False),
observation: Tensor(shape=torch.Size([100, 3]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([100]),
device=None,
is_shared=False)
(tensordicts_stack == tensordicts_prealloc).all()
True
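The comment "roughly equivalent to obs = next_obs" can be made concrete with plain dictionaries: after a step, the entries stored under "next" become the root entries for the following step. A simplified sketch, assuming the reward and the previous action are dropped when moving on; `advance` is a hypothetical stand-in, not the `step_mdp` implementation.

```python
def advance(step_result, drop=("reward",)):
    """Build the input of step t+1 from the output of step t."""
    return {k: v for k, v in step_result["next"].items() if k not in drop}


step_result = {
    "observation": [0.1, 0.2, 0.3],
    "action": [0.5],
    "done": False,
    "next": {"observation": [0.4, 0.5, 0.6], "reward": -1.2, "done": False},
}
print(advance(step_result))
# {'observation': [0.4, 0.5, 0.6], 'done': False}
```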
torch.manual_seed(0)
env.set_seed(0)
tensordict_rollout = env.rollout(policy=actor, max_steps=max_steps)
tensordict_rollout
(tensordict_rollout == tensordicts_prealloc).all()
from tensordict.nn import TensorDictModule
Collectors
from torchrl.collectors import MultiaSyncDataCollector, MultiSyncDataCollector
from torchrl.envs import EnvCreator, SerialEnv
from torchrl.envs.libs.gym import GymEnv
EnvCreator makes sure that a lambda function can be sent from one process to another. We use a SerialEnv for simplicity, but a ParallelEnv is better suited to larger jobs.
parallel_env = SerialEnv(
3,
EnvCreator(lambda: GymEnv("Pendulum-v1")),
)
create_env_fn = [parallel_env, parallel_env]
actor_module = nn.Linear(3, 1)
actor = TensorDictModule(actor_module, in_keys=["observation"], out_keys=["action"])
Synchronous data collector
devices = ["cpu", "cpu"]
collector = MultiSyncDataCollector(
create_env_fn=create_env_fn, # either a list of functions or a ParallelEnv
policy=actor,
total_frames=240,
max_frames_per_traj=-1, # envs are terminating, we don't need to stop them early
frames_per_batch=60, # we want 60 frames at a time (we have 3 envs per sub-collector)
device=devices,
)
for i, d in enumerate(collector):
    if i == 0:
        print(d)  # trajectories are split automatically in [6 workers x 10 steps]
    collector.update_policy_weights_()  # make sure that our policies have the latest weights if working on multiple devices
print(i)
collector.shutdown()
del collector
TensorDict(
fields={
action: Tensor(shape=torch.Size([2, 3, 10, 1]), device=cpu, dtype=torch.float32, is_shared=False),
collector: TensorDict(
fields={
traj_ids: Tensor(shape=torch.Size([2, 3, 10]), device=cpu, dtype=torch.int64, is_shared=False)},
batch_size=torch.Size([2, 3, 10]),
device=cpu,
is_shared=False),
done: Tensor(shape=torch.Size([2, 3, 10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
next: TensorDict(
fields={
done: Tensor(shape=torch.Size([2, 3, 10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
observation: Tensor(shape=torch.Size([2, 3, 10, 3]), device=cpu, dtype=torch.float32, is_shared=False),
reward: Tensor(shape=torch.Size([2, 3, 10, 1]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([2, 3, 10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([2, 3, 10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([2, 3, 10]),
device=cpu,
is_shared=False),
observation: Tensor(shape=torch.Size([2, 3, 10, 3]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([2, 3, 10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([2, 3, 10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([2, 3, 10]),
device=cpu,
is_shared=False)
3
# async data collector: keeps working while you update your model
collector = MultiaSyncDataCollector(
create_env_fn=create_env_fn, # either a list of functions or a ParallelEnv
policy=actor,
total_frames=240,
max_frames_per_traj=-1, # envs are terminating, we don't need to stop them early
frames_per_batch=60, # we want 60 frames at a time (we have 3 envs per sub-collector)
device=devices,
)
for i, d in enumerate(collector):
    if i == 0:
        print(d)  # trajectories are split automatically in [6 workers x 10 steps]
    collector.update_policy_weights_()  # make sure that our policies have the latest weights if working on multiple devices
print(i)
collector.shutdown()
del collector
del create_env_fn
del parallel_env
TensorDict(
fields={
action: Tensor(shape=torch.Size([3, 20, 1]), device=cpu, dtype=torch.float32, is_shared=False),
collector: TensorDict(
fields={
traj_ids: Tensor(shape=torch.Size([3, 20]), device=cpu, dtype=torch.int64, is_shared=False)},
batch_size=torch.Size([3, 20]),
device=cpu,
is_shared=False),
done: Tensor(shape=torch.Size([3, 20, 1]), device=cpu, dtype=torch.bool, is_shared=False),
next: TensorDict(
fields={
done: Tensor(shape=torch.Size([3, 20, 1]), device=cpu, dtype=torch.bool, is_shared=False),
observation: Tensor(shape=torch.Size([3, 20, 3]), device=cpu, dtype=torch.float32, is_shared=False),
reward: Tensor(shape=torch.Size([3, 20, 1]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([3, 20, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([3, 20, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([3, 20]),
device=cpu,
is_shared=False),
observation: Tensor(shape=torch.Size([3, 20, 3]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([3, 20, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([3, 20, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([3, 20]),
device=cpu,
is_shared=False)
3
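The batch shapes and the final loop index printed above follow from simple frame accounting. A sketch of the arithmetic, using the numbers from the collectors above (2 sub-collectors, each running a `SerialEnv` of 3 environments):

```python
total_frames = 240
frames_per_batch = 60
num_collectors = 2       # len(create_env_fn)
envs_per_collector = 3   # SerialEnv(3, ...)

# Synchronous collection: every sub-collector contributes to each batch.
sync_steps = frames_per_batch // (num_collectors * envs_per_collector)
print((num_collectors, envs_per_collector, sync_steps))  # (2, 3, 10)

# Asynchronous collection: each batch comes from a single sub-collector.
async_steps = frames_per_batch // envs_per_collector
print((envs_per_collector, async_steps))  # (3, 20)

# Both loops yield the same number of batches, so the last index is 3.
num_batches = total_frames // frames_per_batch
print(num_batches - 1)  # 3
```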
Objectives
# TorchRL delivers meta-RL compatible loss functions
# Disclaimer: This API may change in the future
from torchrl.objectives import DDPGLoss
actor_module = nn.Linear(3, 1)
actor = TensorDictModule(actor_module, in_keys=["observation"], out_keys=["action"])
class ConcatModule(nn.Linear):
    def forward(self, obs, action):
        return super().forward(torch.cat([obs, action], -1))
value_module = ConcatModule(4, 1)
value = TensorDictModule(
value_module, in_keys=["observation", "action"], out_keys=["state_action_value"]
)
loss_fn = DDPGLoss(actor, value)
loss_fn.make_value_estimator(loss_fn.default_value_estimator, gamma=0.99)
tensordict = TensorDict(
{
"observation": torch.randn(10, 3),
"next": {
"observation": torch.randn(10, 3),
"reward": torch.randn(10, 1),
"done": torch.zeros(10, 1, dtype=torch.bool),
},
"action": torch.randn(10, 1),
},
batch_size=[10],
device="cpu",
)
loss_td = loss_fn(tensordict)
print(loss_td)
TensorDict(
fields={
loss_actor: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False),
loss_value: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False),
pred_value: Tensor(shape=torch.Size([10]), device=cpu, dtype=torch.float32, is_shared=False),
pred_value_max: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False),
target_value: Tensor(shape=torch.Size([10]), device=cpu, dtype=torch.float32, is_shared=False),
target_value_max: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False),
td_error: Tensor(shape=torch.Size([10]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([]),
device=None,
is_shared=False)
print(tensordict)
TensorDict(
fields={
action: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.float32, is_shared=False),
next: TensorDict(
fields={
done: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
observation: Tensor(shape=torch.Size([10, 3]), device=cpu, dtype=torch.float32, is_shared=False),
reward: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([10]),
device=cpu,
is_shared=False),
observation: Tensor(shape=torch.Size([10, 3]), device=cpu, dtype=torch.float32, is_shared=False),
td_error: Tensor(shape=torch.Size([10]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([10]),
device=cpu,
is_shared=False)
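The `td_error` entry written back into the input compares the critic's prediction with a bootstrapped target. As a sketch, assuming a one-step TD(0) target with `gamma=0.99` as configured above: TorchRL's exact error definition and its handling of target networks are not reproduced here, and all numbers and helper names are illustrative.

```python
def td0_target(reward, done, next_value, gamma=0.99):
    """One-step target: r + gamma * V(s') while the episode continues, else r."""
    return reward + gamma * (0.0 if done else next_value)


predicted = 1.5   # Q(s, a) from the critic (made-up value)
next_value = 2.0  # Q(s', actor(s')) (made-up value)

target = td0_target(reward=1.0, done=False, next_value=next_value)
print(round(target, 2))  # 2.98
print(td0_target(reward=1.0, done=True, next_value=next_value))  # 1.0
print((predicted - target) ** 2)  # squared error, one common choice of distance
```

When `done` is set, the bootstrap term vanishes and the target reduces to the reward alone.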
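State of the library
TorchRL is currently an alpha release: there may be bugs, and there is no guarantee against backward-compatibility-breaking changes. We should be able to move to a beta release by the end of the year. Our roadmap to get there comprises:
Distributed solutions
Offline RL
Greater support for meta-RL
Multi-task and hierarchical RL
Contributing
We are actively looking for contributors and early users. If you are working in RL (or are just curious), try it! Give us feedback: what will make TorchRL successful is how well it covers researchers' needs, and to do that we need their input. Since the library is still young, this is a great time to shape it the way you want.
Installing the library
The library is on PyPI: pip install torchrl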
Total running time of the script: (3 minutes 43.926 seconds)
Estimated memory usage: 324 MB