注意
转到末尾 下载完整的示例代码。
基于TorchRL的竞争性多智能体强化学习 (DDPG) 教程¶
**作者**:Matteo Bettini
另请参阅
该 BenchMARL 库提供了使用 TorchRL 的最先进的 MARL 算法实现。
本教程演示了如何使用 PyTorch 和 TorchRL 来解决竞争性多智能体强化学习 (MARL) 问题。
为了方便使用,本教程将遵循现有的 基于TorchRL的多智能体强化学习 (PPO) 教程 的总体结构。
在本教程中,我们将使用 MADDPG 论文 中的 simple_tag 环境。此环境是称为 多智能体粒子环境 (MPE) 的一组环境的一部分,该环境是在论文中引入的。
目前有多个模拟器提供 MPE 环境。在本教程中,我们将展示如何使用 TorchRL 训练此环境,可以使用以下两种方法:
PettingZoo,在环境的传统 CPU 版本中;
VMAS,它在 PyTorch 中提供了矢量化实现,能够在 GPU 上模拟多个环境以加快计算速度。
关键学习点
如何在 TorchRL 中使用竞争性多智能体环境,它们的技术规格以及它们如何与库集成;
如何在 TorchRL 中使用并行 PettingZoo 和 VMAS 环境以及多个智能体组;
如何在 TorchRL 中创建不同的多智能体网络架构(例如,使用参数共享、集中式评论家)
我们如何使用
TensorDict
来承载多智能体多组数据;我们如何在离线策略多智能体 MADDPG/IDDPG 训练循环中将所有库组件(收集器、模块、回放缓冲区和损失函数)联系起来。
如果您在 Google Colab 中运行此代码,请确保安装以下依赖项
!pip3 install torchrl
!pip3 install vmas
!pip3 install pettingzoo[mpe]==1.24.3
!pip3 install tqdm
深度确定性策略梯度 (DDPG) 是一种离线策略演员-评论家算法,其中确定性策略使用来自评论家网络的梯度进行优化。有关更多信息,请参阅 深度确定性策略梯度 论文。这种算法通常使用离线策略进行训练。有关离线策略学习的更多信息,请参阅 Sutton, Richard S., and Andrew G. Barto. 强化学习:导论。麻省理工学院出版社,2018。
这种方法已扩展到 混合协作竞争环境中的多智能体演员-评论家 中的多智能体学习,该论文介绍了多智能体 DDPG (MADDPG) 算法。在多智能体设置中,情况略有不同。我们现在有多个策略 \(\mathbf{\pi}\),每个智能体一个。策略通常是局部的和分散的。这意味着单个智能体的策略将仅根据其观察结果为该智能体输出一个动作。在 MARL 文学中,这被称为**分散执行**。另一方面,对于评论家存在不同的公式,主要有
在 MADDPG 中,评论家是集中的,并以系统的全局状态和全局动作作为输入。全局状态可以是全局观察结果,也可以是智能体观察结果的简单连接。全局动作是智能体动作的连接。MADDPG 可用于执行**集中式训练**的上下文中,因为它需要访问全局信息。
在 IDDPG 中,评论家仅以一个智能体的观察结果和动作作为输入。这允许**分散式训练**,因为评论家和策略只需要本地信息来计算其输出。
集中式评论家有助于克服多个智能体同时学习的非平稳性,但另一方面,它们可能会受到其大型输入空间的影响。在本教程中,我们将能够训练这两种公式,并且我们将讨论参数共享(跨智能体共享网络参数的做法)如何影响每种公式。
本教程的结构如下
最初,我们将建立一组要使用的超参数。
随后,我们将构建一个多智能体环境,利用 TorchRL 的包装器来使用 PettingZoo 或 VMAS。
然后,我们将制定策略和评论家网络,讨论各种选择对参数共享和评论家集中化的影响。
之后,我们将创建采样收集器和回放缓冲区。
最后,我们将执行我们的训练循环并检查结果。
如果您在 Colab 或具有 GUI 的机器上运行此代码,您还可以在训练过程之前和之后渲染和可视化您自己的训练策略。
导入我们的依赖项
import copy
import tempfile
import torch
from matplotlib import pyplot as plt
from tensordict import TensorDictBase
from tensordict.nn import TensorDictModule, TensorDictSequential
from torch import multiprocessing
from torchrl.collectors import SyncDataCollector
from torchrl.data import LazyMemmapStorage, RandomSampler, ReplayBuffer
from torchrl.envs import (
check_env_specs,
ExplorationType,
PettingZooEnv,
RewardSum,
set_exploration_type,
TransformedEnv,
VmasEnv,
)
from torchrl.modules import (
AdditiveGaussianModule,
MultiAgentMLP,
ProbabilisticActor,
TanhDelta,
)
from torchrl.objectives import DDPGLoss, SoftUpdate, ValueEstimators
from torchrl.record import CSVLogger, PixelRenderTransform, VideoRecorder
from tqdm import tqdm
# Check if we're building the doc, in which case disable video rendering
try:
is_sphinx = __sphinx_build__
except NameError:
is_sphinx = False
定义超参数¶
我们为本教程设置了超参数。根据可用资源,可以选择在 GPU 或其他设备上执行策略和模拟器。您可以调整其中一些值以调整计算需求。
# Seed
seed = 0
torch.manual_seed(seed)
# Devices
is_fork = multiprocessing.get_start_method() == "fork"
device = (
torch.device(0)
if torch.cuda.is_available() and not is_fork
else torch.device("cpu")
)
# Sampling
frames_per_batch = 1_000 # Number of team frames collected per sampling iteration
n_iters = 10 # Number of sampling and training iterations
total_frames = frames_per_batch * n_iters
# We will stop training the evaders after this many iterations,
# should be 0 <= iteration_when_stop_training_evaders <= n_iters
iteration_when_stop_training_evaders = n_iters // 2
# Replay buffer
memory_size = 1_000_000 # The replay buffer of each group can store this many frames
# Training
n_optimiser_steps = 100 # Number of optimisation steps per training iteration
train_batch_size = 128 # Number of frames trained in each optimiser step
lr = 3e-4 # Learning rate
max_grad_norm = 1.0 # Maximum norm for the gradients
# DDPG
gamma = 0.99 # Discount factor
polyak_tau = 0.005 # Tau for the soft-update of the target network
环境¶
多智能体环境模拟多个智能体与世界交互。TorchRL API 允许集成各种类型多智能体环境风格。在本教程中,我们将重点介绍多个智能体组并行交互的环境。也就是说:在每个步骤中,所有智能体都将同步获取观察结果并采取行动。
此外,TorchRL MARL API 允许将智能体分成组。每组将成为 tensordict 中的单独条目。组内智能体的数据会堆叠在一起。因此,通过选择如何对智能体进行分组,您可以决定哪些数据会被堆叠/保留为单独的条目。分组策略可以在 VMAS 和 PettingZoo 等环境的构造时指定。有关分组的更多信息,请参阅 MarlGroupMapType
。
在simple_tag环境中,有两个代理团队:追捕者(或“对手”)(红色圆圈)和躲避者(或“代理”)(绿色圆圈)。追捕者触碰躲避者会获得奖励(+10)。一旦发生接触,追捕者团队会集体获得奖励,被触碰的躲避者则会受到相同值的惩罚(-10)。躲避者的速度和加速度都高于追捕者。环境中还有障碍物(黑色圆圈)。代理和障碍物根据均匀随机分布生成。代理在一个具有阻力和弹性碰撞的二维连续世界中行动。它们的行动是决定其加速度的二维连续力。每个代理观察其位置、速度、与所有其他代理和障碍物的相对位置,以及躲避者的速度。
PettingZoo 和 VMAS 版本在奖励函数上略有不同,因为 PettingZoo 会惩罚躲避者越界,而 VMAS 会通过物理方式阻止它。这就是为什么你会观察到在 VMAS 中,两个团队的奖励相同,只是符号相反,而在 PettingZoo 中,躲避者的奖励会更低。
现在我们将实例化环境。在本教程中,我们将把 episodes 限制为max_steps
,之后将设置 terminated 标志。此功能已在 PettingZoo 和 VMAS 模拟器中提供,但 TorchRL 的StepCounter
变换可以作为替代使用。
max_steps = 100 # Environment steps before done
n_chasers = 2
n_evaders = 1
n_obstacles = 2
use_vmas = True # Set this to True for a great performance speedup
if not use_vmas:
base_env = PettingZooEnv(
task="simple_tag_v3",
parallel=True, # Use the Parallel version
seed=seed,
# Scenario specific
continuous_actions=True,
num_good=n_evaders,
num_adversaries=n_chasers,
num_obstacles=n_obstacles,
max_cycles=max_steps,
)
else:
num_vmas_envs = (
frames_per_batch // max_steps
) # Number of vectorized environments. frames_per_batch collection will be divided among these environments
base_env = VmasEnv(
scenario="simple_tag",
num_envs=num_vmas_envs,
continuous_actions=True,
max_steps=max_steps,
device=device,
seed=seed,
# Scenario specific
num_good_agents=n_evaders,
num_adversaries=n_chasers,
num_landmarks=n_obstacles,
)
组映射¶
PettingZoo 和 VMAS 环境使用 TorchRL MARL 分组 API。我们可以访问组映射,将每个组映射到其中的代理,如下所示
print(f"group_map: {base_env.group_map}")
group_map: {'adversary': ['adversary_0', 'adversary_1'], 'agent': ['agent_0']}
我们可以看到它包含 2 个组:“agents”(躲避者)和“adversaries”(追捕者)。
环境不仅由其模拟器和变换定义,还由一系列描述其执行过程中可以预期内容的元数据定义。出于效率考虑,TorchRL 在环境规范方面非常严格,但您可以轻松检查您的环境规范是否充分。在我们的示例中,模拟器包装器负责为您的 base_env 设置正确的规范,因此您无需担心这一点。
有四个规范需要查看
action_spec
定义动作空间;reward_spec
定义奖励域;done_spec
定义 done 域;observation_spec
定义环境步骤的所有其他输出的域;
print("action_spec:", base_env.full_action_spec)
print("reward_spec:", base_env.full_reward_spec)
print("done_spec:", base_env.full_done_spec)
print("observation_spec:", base_env.observation_spec)
action_spec: CompositeSpec(
adversary: CompositeSpec(
action: BoundedTensorSpec(
shape=torch.Size([10, 2, 2]),
space=ContinuousBox(
low=Tensor(shape=torch.Size([10, 2, 2]), device=cpu, dtype=torch.float32, contiguous=True),
high=Tensor(shape=torch.Size([10, 2, 2]), device=cpu, dtype=torch.float32, contiguous=True)),
device=cpu,
dtype=torch.float32,
domain=continuous),
device=cpu,
shape=torch.Size([10, 2])),
agent: CompositeSpec(
action: BoundedTensorSpec(
shape=torch.Size([10, 1, 2]),
space=ContinuousBox(
low=Tensor(shape=torch.Size([10, 1, 2]), device=cpu, dtype=torch.float32, contiguous=True),
high=Tensor(shape=torch.Size([10, 1, 2]), device=cpu, dtype=torch.float32, contiguous=True)),
device=cpu,
dtype=torch.float32,
domain=continuous),
device=cpu,
shape=torch.Size([10, 1])),
device=cpu,
shape=torch.Size([10]))
reward_spec: CompositeSpec(
adversary: CompositeSpec(
reward: UnboundedContinuousTensorSpec(
shape=torch.Size([10, 2, 1]),
space=None,
device=cpu,
dtype=torch.float32,
domain=continuous),
device=cpu,
shape=torch.Size([10, 2])),
agent: CompositeSpec(
reward: UnboundedContinuousTensorSpec(
shape=torch.Size([10, 1, 1]),
space=None,
device=cpu,
dtype=torch.float32,
domain=continuous),
device=cpu,
shape=torch.Size([10, 1])),
device=cpu,
shape=torch.Size([10]))
done_spec: CompositeSpec(
done: DiscreteTensorSpec(
shape=torch.Size([10, 1]),
space=DiscreteBox(n=2),
device=cpu,
dtype=torch.bool,
domain=discrete),
terminated: DiscreteTensorSpec(
shape=torch.Size([10, 1]),
space=DiscreteBox(n=2),
device=cpu,
dtype=torch.bool,
domain=discrete),
device=cpu,
shape=torch.Size([10]))
observation_spec: CompositeSpec(
adversary: CompositeSpec(
observation: UnboundedContinuousTensorSpec(
shape=torch.Size([10, 2, 14]),
space=None,
device=cpu,
dtype=torch.float32,
domain=continuous),
device=cpu,
shape=torch.Size([10, 2])),
agent: CompositeSpec(
observation: UnboundedContinuousTensorSpec(
shape=torch.Size([10, 1, 12]),
space=None,
device=cpu,
dtype=torch.float32,
domain=continuous),
device=cpu,
shape=torch.Size([10, 1])),
device=cpu,
shape=torch.Size([10]))
使用刚刚显示的命令,我们可以访问每个值的域。
我们可以看到所有规范都以字典的形式构建,根节点始终包含组名称。此结构将在进出环境的所有 tensordict 数据中遵循。此外,每个组的规范具有前导形状(n_agents_in_that_group)
(代理为 1,对手为 2),这意味着该组的张量数据将始终具有该前导形状(组内的代理将数据堆叠起来)。
查看done_spec
,我们可以看到有一些键位于代理组之外("done", "terminated", "truncated"
),它们没有前导多代理维度。这些键由所有代理共享,并表示用于重置的环境全局 done 状态。默认情况下,就像本例一样,当任何代理完成时,并行 PettingZoo 环境就会完成,但可以通过在 PettingZoo 环境构建时设置done_on_any
来覆盖此行为。
为了快速访问 tensordicts 中这些值的每个键,我们可以简单地向环境请求相应的键,我们就会立即了解哪些是每个代理的,哪些是共享的。此信息对于告诉所有其他 TorchRL 组件在何处查找每个值非常有用
print("action_keys:", base_env.action_keys)
print("reward_keys:", base_env.reward_keys)
print("done_keys:", base_env.done_keys)
action_keys: [('adversary', 'action'), ('agent', 'action')]
reward_keys: [('adversary', 'reward'), ('agent', 'reward')]
done_keys: ['done', 'terminated']
变换¶
我们可以将任何需要的 TorchRL 变换附加到我们的环境中。这些将以某种所需的方式修改其输入/输出。我们强调,在多代理上下文中,显式提供要修改的键至关重要。
例如,在本例中,我们将实例化一个RewardSum
变换,它将在整个 episode 中对奖励求和。我们将告诉此变换在何处查找每个奖励键的重置键。从本质上讲,我们只是说每个组的 episode 奖励应该在设置"_reset"
tensordict 键时重置,这意味着调用了env.reset()
。转换后的环境将继承包装环境的设备和元数据,并根据其包含的变换序列转换这些数据。
env = TransformedEnv(
base_env,
RewardSum(
in_keys=base_env.reward_keys,
reset_keys=["_reset"] * len(base_env.group_map.keys()),
),
)
the check_env_specs()
函数运行一个小 rollout 并将其输出与环境规范进行比较。如果未引发错误,我们可以确信规范已正确定义
check_env_specs(env)
Rollout¶
为了好玩,让我们看看简单的随机 rollout 是什么样子。您可以调用env.rollout(n_steps) 并概述环境输入和输出的外观。动作将自动从动作规范域中随机抽取。
n_rollout_steps = 5
rollout = env.rollout(n_rollout_steps)
print(f"rollout of {n_rollout_steps} steps:", rollout)
print("Shape of the rollout TensorDict:", rollout.batch_size)
rollout of 5 steps: TensorDict(
fields={
adversary: TensorDict(
fields={
action: Tensor(shape=torch.Size([10, 5, 2, 2]), device=cpu, dtype=torch.float32, is_shared=False),
episode_reward: Tensor(shape=torch.Size([10, 5, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False),
observation: Tensor(shape=torch.Size([10, 5, 2, 14]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([10, 5, 2]),
device=cpu,
is_shared=False),
agent: TensorDict(
fields={
action: Tensor(shape=torch.Size([10, 5, 1, 2]), device=cpu, dtype=torch.float32, is_shared=False),
episode_reward: Tensor(shape=torch.Size([10, 5, 1, 1]), device=cpu, dtype=torch.float32, is_shared=False),
observation: Tensor(shape=torch.Size([10, 5, 1, 12]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([10, 5, 1]),
device=cpu,
is_shared=False),
done: Tensor(shape=torch.Size([10, 5, 1]), device=cpu, dtype=torch.bool, is_shared=False),
next: TensorDict(
fields={
adversary: TensorDict(
fields={
episode_reward: Tensor(shape=torch.Size([10, 5, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False),
observation: Tensor(shape=torch.Size([10, 5, 2, 14]), device=cpu, dtype=torch.float32, is_shared=False),
reward: Tensor(shape=torch.Size([10, 5, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([10, 5, 2]),
device=cpu,
is_shared=False),
agent: TensorDict(
fields={
episode_reward: Tensor(shape=torch.Size([10, 5, 1, 1]), device=cpu, dtype=torch.float32, is_shared=False),
observation: Tensor(shape=torch.Size([10, 5, 1, 12]), device=cpu, dtype=torch.float32, is_shared=False),
reward: Tensor(shape=torch.Size([10, 5, 1, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([10, 5, 1]),
device=cpu,
is_shared=False),
done: Tensor(shape=torch.Size([10, 5, 1]), device=cpu, dtype=torch.bool, is_shared=False),
terminated: Tensor(shape=torch.Size([10, 5, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([10, 5]),
device=cpu,
is_shared=False),
terminated: Tensor(shape=torch.Size([10, 5, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([10, 5]),
device=cpu,
is_shared=False)
Shape of the rollout TensorDict: torch.Size([10, 5])
我们可以看到我们的 rollout 具有batch_size
为(n_rollout_steps)
。这意味着其中的所有张量都将具有此前导维度。
更深入地了解,我们可以看到输出 tensordict 可以如下划分
在根节点(通过运行
rollout.exclude("next")
访问)中,我们将找到在第一次时间步调用重置后可用的所有键。我们可以通过索引n_rollout_steps
维度来查看它们在 rollout 步中的演变。在这些键中,我们将找到在rollout[group_name]
tensordicts 中每个代理不同的键,这些键的批大小为(n_rollout_steps, n_agents_in_group)
,表示它正在存储额外的代理维度。组 tensordicts 之外的键将是共享键。在 next(通过运行
rollout.get("next")
访问)中。我们将找到与根节点相同的结构,但有一些细微的差异,如下所示。
在 TorchRL 中,约定是 done 和 observations 将同时出现在根节点和 next 中(因为这些在重置时和步骤之后都可用)。Action 仅在根节点中可用(因为步骤不会产生 action),而 reward 仅在 next 中可用(因为重置时没有 reward)。此结构遵循**强化学习:导论(萨顿和巴托)**中的结构,其中根节点表示时间\(t\) 的数据,next 表示时间\(t+1\) 的数据的世界步骤。
渲染随机 rollout¶
如果您使用的是 Google Colab 或具有 OpenGL 和 GUI 的机器,您实际上可以渲染随机 rollout。这将让您了解随机策略在此任务中将取得什么成就,以便将其与您自己训练的策略进行比较!
要渲染 rollout,请按照本教程末尾“渲染”部分中的说明进行操作,只需从env.rollout()
中删除行policy=agents_exploration_policy
即可。
策略¶
DDPG 使用确定性策略。这意味着我们的神经网络将输出要采取的动作。由于动作是连续的,因此我们使用 Tanh-Delta 分布来尊重动作空间边界。此类唯一做的事情是应用 Tanh 变换以确保动作在域边界内。
我们需要做出的另一个重要决定是,我们是否希望团队中的代理**共享策略参数**。一方面,共享参数意味着它们将共享相同的策略,这将使它们能够从彼此的经验中获益。这还将导致更快的训练。另一方面,它将使它们的行为同质化,因为它们实际上将共享相同的模型。在本例中,我们将启用共享,因为我们不介意同质性并且可以从计算速度中获益,但始终需要在您自己的问题中考虑此决定!
我们分三个步骤设计策略。
**首先**:定义一个神经网络n_obs_per_agent
-> n_actions_per_agents
为此,我们使用MultiAgentMLP
,这是一个专门为多个代理设计的 TorchRL 模块,并且可以进行大量自定义。
我们将为每个组定义一个不同的策略,并将它们存储在字典中。
policy_modules = {}
for group, agents in env.group_map.items():
share_parameters_policy = True # Can change this based on the group
policy_net = MultiAgentMLP(
n_agent_inputs=env.observation_spec[group, "observation"].shape[
-1
], # n_obs_per_agent
n_agent_outputs=env.full_action_spec[group, "action"].shape[
-1
], # n_actions_per_agents
n_agents=len(agents), # Number of agents in the group
centralised=False, # the policies are decentralised (i.e., each agent will act from its local observation)
share_params=share_parameters_policy,
device=device,
depth=2,
num_cells=256,
activation_class=torch.nn.Tanh,
)
# Wrap the neural network in a :class:`~tensordict.nn.TensorDictModule`.
# This is simply a module that will read the ``in_keys`` from a tensordict, feed them to the
# neural networks, and write the
# outputs in-place at the ``out_keys``.
policy_module = TensorDictModule(
policy_net,
in_keys=[(group, "observation")],
out_keys=[(group, "param")],
) # We just name the input and output that the network will read and write to the input tensordict
policy_modules[group] = policy_module
**其次**:将TensorDictModule
包装在ProbabilisticActor
中
现在我们需要构建 TanhDelta 分布。我们指示ProbabilisticActor
类根据策略动作参数构建TanhDelta
。我们还提供了此分布的最小值和最大值,这些值是从环境规范中收集的。
in_keys
的名称(以及因此来自上面TensorDictModule
的out_keys
的名称)必须以TanhDelta
分布构造函数关键字参数(param)结尾。
policies = {}
for group, _agents in env.group_map.items():
policy = ProbabilisticActor(
module=policy_modules[group],
spec=env.full_action_spec[group, "action"],
in_keys=[(group, "param")],
out_keys=[(group, "action")],
distribution_class=TanhDelta,
distribution_kwargs={
"low": env.full_action_spec[group, "action"].space.low,
"high": env.full_action_spec[group, "action"].space.high,
},
return_log_prob=False,
)
policies[group] = policy
**第三**:探索
由于 DDPG 策略是确定性的,因此我们需要一种方法在收集期间执行探索。
为此,我们需要在将策略传递给收集器之前,向策略中附加一个探索层。在本例中,我们使用AdditiveGaussianModule
,它会向我们的动作添加高斯噪声(如果噪声使动作超出范围,则将其裁剪)。
此探索包装器使用一个sigma
参数,该参数乘以噪声以确定其大小。Sigma 可以在整个训练过程中进行退火以减少探索。Sigma 将在annealing_num_steps
中从sigma_init
变为sigma_end
。
exploration_policies = {}
for group, _agents in env.group_map.items():
exploration_policy = TensorDictSequential(
policies[group],
AdditiveGaussianModule(
spec=policies[group].spec,
annealing_num_steps=total_frames
// 2, # Number of frames after which sigma is sigma_end
action_key=(group, "action"),
sigma_init=0.9, # Initial value of the sigma
sigma_end=0.1, # Final value of the sigma
),
)
exploration_policies[group] = exploration_policy
评论家网络¶
评论家网络是DDPG算法的一个关键组成部分,即使它在采样时未使用。此模块将读取观察结果和采取的操作,并返回相应的价值估计。
和之前一样,应该仔细考虑在代理组内**共享评论家参数**的决定。通常,参数共享将加快训练收敛速度,但需要考虑一些重要事项。
当代理具有不同的奖励函数时,不建议共享,因为评论家需要学习为相同的状态分配不同的值(例如,在混合合作竞争设置中)。在这种情况下,由于两个组已经使用单独的网络,因此共享决策仅适用于组内的代理,我们已经知道它们具有相同的奖励函数。
在分散式训练设置中,如果不使用额外的基础设施来同步参数,则无法执行共享。
在所有其他情况下,如果组中所有代理的奖励函数(与奖励区分开来)都相同(如当前场景),则共享可以提高性能。这可能会以代理策略的同质性为代价。通常,了解哪种选择更可取的最佳方法是快速尝试这两种选项。
这里也是我们必须在**MADDPG和IDDPG**之间做出选择的地方。
使用MADDPG,我们将获得一个具有完全可观察性的中心评论家(即,它将获取所有连接的全局代理观察结果和操作作为输入)。我们可以这样做,因为我们在模拟器中,并且训练是集中式的。
使用IDDPG,我们将拥有一个本地分散的评论家,就像策略一样。
无论如何,评论家输出的形状将为(..., n_agents_in_group, 1)
。如果评论家是集中式和共享的,则沿着n_agents_in_group
维的所有值都将相同。
与策略一样,我们为每个组创建一个评论家网络,并将它们存储在字典中。
critics = {}
for group, agents in env.group_map.items():
share_parameters_critic = True # Can change for each group
MADDPG = True # IDDPG if False, can change for each group
# This module applies the lambda function: reading the action and observation entries for the group
# and concatenating them in a new ``(group, "obs_action")`` entry
cat_module = TensorDictModule(
lambda obs, action: torch.cat([obs, action], dim=-1),
in_keys=[(group, "observation"), (group, "action")],
out_keys=[(group, "obs_action")],
)
critic_module = TensorDictModule(
module=MultiAgentMLP(
n_agent_inputs=env.observation_spec[group, "observation"].shape[-1]
+ env.full_action_spec[group, "action"].shape[-1],
n_agent_outputs=1, # 1 value per agent
n_agents=len(agents),
centralised=MADDPG,
share_params=share_parameters_critic,
device=device,
depth=2,
num_cells=256,
activation_class=torch.nn.Tanh,
),
in_keys=[(group, "obs_action")], # Read ``(group, "obs_action")``
out_keys=[
(group, "state_action_value")
], # Write ``(group, "state_action_value")``
)
critics[group] = TensorDictSequential(
cat_module, critic_module
) # Run them in sequence
让我们尝试我们的策略和评论家模块。如前所述,使用TensorDictModule
可以使我们能够直接读取环境的输出以运行这些模块,因为它们知道要读取哪些信息以及在哪里写入信息。
我们可以看到,在每个组的网络运行后,它们的输出键将添加到数据组的条目下。
从这一点开始,特定于多代理的组件已实例化,我们将简单地使用与单代理学习中相同的组件。这难道不是太棒了吗?
reset_td = env.reset()
for group, _agents in env.group_map.items():
print(
f"Running value and policy for group '{group}':",
critics[group](policies[group](reset_td)),
)
Running value and policy for group 'adversary': TensorDict(
fields={
adversary: TensorDict(
fields={
action: Tensor(shape=torch.Size([10, 2, 2]), device=cpu, dtype=torch.float32, is_shared=False),
episode_reward: Tensor(shape=torch.Size([10, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False),
obs_action: Tensor(shape=torch.Size([10, 2, 16]), device=cpu, dtype=torch.float32, is_shared=False),
observation: Tensor(shape=torch.Size([10, 2, 14]), device=cpu, dtype=torch.float32, is_shared=False),
param: Tensor(shape=torch.Size([10, 2, 2]), device=cpu, dtype=torch.float32, is_shared=False),
state_action_value: Tensor(shape=torch.Size([10, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([10, 2]),
device=cpu,
is_shared=False),
agent: TensorDict(
fields={
episode_reward: Tensor(shape=torch.Size([10, 1, 1]), device=cpu, dtype=torch.float32, is_shared=False),
observation: Tensor(shape=torch.Size([10, 1, 12]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([10, 1]),
device=cpu,
is_shared=False),
done: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
terminated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([10]),
device=cpu,
is_shared=False)
Running value and policy for group 'agent': TensorDict(
fields={
adversary: TensorDict(
fields={
action: Tensor(shape=torch.Size([10, 2, 2]), device=cpu, dtype=torch.float32, is_shared=False),
episode_reward: Tensor(shape=torch.Size([10, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False),
obs_action: Tensor(shape=torch.Size([10, 2, 16]), device=cpu, dtype=torch.float32, is_shared=False),
observation: Tensor(shape=torch.Size([10, 2, 14]), device=cpu, dtype=torch.float32, is_shared=False),
param: Tensor(shape=torch.Size([10, 2, 2]), device=cpu, dtype=torch.float32, is_shared=False),
state_action_value: Tensor(shape=torch.Size([10, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([10, 2]),
device=cpu,
is_shared=False),
agent: TensorDict(
fields={
action: Tensor(shape=torch.Size([10, 1, 2]), device=cpu, dtype=torch.float32, is_shared=False),
episode_reward: Tensor(shape=torch.Size([10, 1, 1]), device=cpu, dtype=torch.float32, is_shared=False),
obs_action: Tensor(shape=torch.Size([10, 1, 14]), device=cpu, dtype=torch.float32, is_shared=False),
observation: Tensor(shape=torch.Size([10, 1, 12]), device=cpu, dtype=torch.float32, is_shared=False),
param: Tensor(shape=torch.Size([10, 1, 2]), device=cpu, dtype=torch.float32, is_shared=False),
state_action_value: Tensor(shape=torch.Size([10, 1, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([10, 1]),
device=cpu,
is_shared=False),
done: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
terminated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([10]),
device=cpu,
is_shared=False)
数据收集器¶
TorchRL提供了一套数据收集器类。简而言之,这些类执行三个操作:重置环境,使用策略和最新观察结果计算动作,在环境中执行一步,并重复最后两步,直到环境发出停止信号(或达到完成状态)。
我们将使用最简单的数据收集器,它与环境展开具有相同的输出,唯一的区别在于它将自动重置完成状态,直到收集到所需的帧。
我们需要向它提供我们的探索策略。此外,为了像运行一个一样运行所有组的策略,我们将它们放在一个序列中。它们不会相互干扰,因为每个组在不同的位置写入和读取键。
# Put exploration policies from each group in a sequence
agents_exploration_policy = TensorDictSequential(*exploration_policies.values())
collector = SyncDataCollector(
env,
agents_exploration_policy,
device=device,
frames_per_batch=frames_per_batch,
total_frames=total_frames,
)
回放缓冲区¶
回放缓冲区是离策略强化学习算法的常见构建块。缓冲区有很多类型,在本教程中,我们使用一个基本缓冲区来随机存储和采样tensordict数据。
replay_buffers = {}
for group, _agents in env.group_map.items():
replay_buffer = ReplayBuffer(
storage=LazyMemmapStorage(
memory_size, device=device
), # We will store up to memory_size multi-agent transitions
sampler=RandomSampler(),
batch_size=train_batch_size, # We will sample batches of this size
)
replay_buffers[group] = replay_buffer
损失函数¶
为了方便起见,DDPG损失可以直接从TorchRL导入,使用DDPGLoss
类。这是利用DDPG最简单的方法:它隐藏了DDPG的数学运算及其相关的控制流。
也可以为每个组使用不同的策略。
losses = {}
for group, _agents in env.group_map.items():
loss_module = DDPGLoss(
actor_network=policies[group], # Use the non-explorative policies
value_network=critics[group],
delay_value=True, # Whether to use a target network for the value
loss_function="l2",
)
loss_module.set_keys(
state_action_value=(group, "state_action_value"),
reward=(group, "reward"),
done=(group, "done"),
terminated=(group, "terminated"),
)
loss_module.make_value_estimator(ValueEstimators.TD0, gamma=gamma)
losses[group] = loss_module
target_updaters = {
group: SoftUpdate(loss, tau=polyak_tau) for group, loss in losses.items()
}
optimisers = {
group: {
"loss_actor": torch.optim.Adam(
loss.actor_network_params.flatten_keys().values(), lr=lr
),
"loss_value": torch.optim.Adam(
loss.value_network_params.flatten_keys().values(), lr=lr
),
}
for group, loss in losses.items()
}
训练实用程序¶
我们确实需要定义两个辅助函数,我们将在训练循环中使用它们。它们非常简单,不包含任何重要的逻辑。
def process_batch(batch: TensorDictBase) -> TensorDictBase:
"""
If the `(group, "terminated")` and `(group, "done")` keys are not present, create them by expanding
`"terminated"` and `"done"`.
This is needed to present them with the same shape as the reward to the loss.
"""
for group in env.group_map.keys():
keys = list(batch.keys(True, True))
group_shape = batch.get_item_shape(group)
nested_done_key = ("next", group, "done")
nested_terminated_key = ("next", group, "terminated")
if nested_done_key not in keys:
batch.set(
nested_done_key,
batch.get(("next", "done")).unsqueeze(-1).expand((*group_shape, 1)),
)
if nested_terminated_key not in keys:
batch.set(
nested_terminated_key,
batch.get(("next", "terminated"))
.unsqueeze(-1)
.expand((*group_shape, 1)),
)
return batch
训练循环¶
现在我们拥有了编写训练循环所需的所有部分。步骤包括
- 收集所有组的数据
- 循环遍历组
将组数据存储在组缓冲区中
- 循环遍历时期
从组缓冲区采样
计算采样数据的损失
反向传播损失
优化
重复
重复
重复
pbar = tqdm(
total=n_iters,
desc=", ".join(
[f"episode_reward_mean_{group} = 0" for group in env.group_map.keys()]
),
)
episode_reward_mean_map = {group: [] for group in env.group_map.keys()}
train_group_map = copy.deepcopy(env.group_map)
# Training/collection iterations
for iteration, batch in enumerate(collector):
current_frames = batch.numel()
batch = process_batch(batch) # Util to expand done keys if needed
# Loop over groups
for group in train_group_map.keys():
group_batch = batch.exclude(
*[
key
for _group in env.group_map.keys()
if _group != group
for key in [_group, ("next", _group)]
]
) # Exclude data from other groups
group_batch = group_batch.reshape(
-1
) # This just affects the leading dimensions in batch_size of the tensordict
replay_buffers[group].extend(group_batch)
for _ in range(n_optimiser_steps):
subdata = replay_buffers[group].sample()
loss_vals = losses[group](subdata)
for loss_name in ["loss_actor", "loss_value"]:
loss = loss_vals[loss_name]
optimiser = optimisers[group][loss_name]
loss.backward()
# Optional
params = optimiser.param_groups[0]["params"]
torch.nn.utils.clip_grad_norm_(params, max_grad_norm)
optimiser.step()
optimiser.zero_grad()
# Soft-update the target network
target_updaters[group].step()
# Exploration sigma anneal update
exploration_policies[group].step(current_frames)
# Stop training a certain group when a condition is met (e.g., number of training iterations)
if iteration == iteration_when_stop_training_evaders:
del train_group_map["agent"]
# Logging
for group in env.group_map.keys():
episode_reward_mean = (
batch.get(("next", group, "episode_reward"))[
batch.get(("next", group, "done"))
]
.mean()
.item()
)
episode_reward_mean_map[group].append(episode_reward_mean)
pbar.set_description(
", ".join(
[
f"episode_reward_mean_{group} = {episode_reward_mean_map[group][-1]}"
for group in env.group_map.keys()
]
),
refresh=False,
)
pbar.update()
Traceback (most recent call last):
File "/pytorch/rl/docs/source/reference/generated/tutorials/multiagent_competitive_ddpg.py", line 820, in <module>
exploration_policies[group].step(current_frames)
File "/pytorch/rl/env/lib/python3.8/site-packages/tensordict/nn/common.py", line 1302, in __getattr__
return getattr(super().__getattr__("module"), name)
File "/pytorch/rl/env/lib/python3.8/site-packages/torch/nn/modules/module.py", line 1914, in __getattr__
raise AttributeError(
AttributeError: 'ModuleList' object has no attribute 'step'
结果¶
我们可以绘制每集获得的平均奖励。
要延长训练时间,请增加n_iters
超参数。
在本地运行此脚本时,您可能需要关闭打开的窗口才能继续处理屏幕的其余部分。
fig, axs = plt.subplots(2, 1)
for i, group in enumerate(env.group_map.keys()):
axs[i].plot(episode_reward_mean_map[group], label=f"Episode reward mean {group}")
axs[i].set_ylabel("Reward")
axs[i].axvline(
x=iteration_when_stop_training_evaders,
label="Agent (evader) stop training",
color="orange",
)
axs[i].legend()
axs[-1].set_xlabel("Training iterations")
plt.show()
渲染¶
渲染说明适用于VMAS,即在使用use_vmas=True
运行时。
TorchRL提供了一些实用程序来记录和保存渲染的视频。您可以在此处了解有关这些工具的更多信息。
在以下代码块中,我们添加了一个转换,它将调用VMAS包装环境中的render()
方法,并将帧堆栈保存到mp4文件中,其位置由自定义日志记录器video_logger确定。请注意,此代码可能需要一些外部依赖项,例如torchvision。
if use_vmas and not is_sphinx:
# Replace tmpdir with any desired path where the video should be saved
with tempfile.TemporaryDirectory() as tmpdir:
video_logger = CSVLogger("vmas_logs", tmpdir, video_format="mp4")
print("Creating rendering env")
env_with_render = TransformedEnv(env.base_env, env.transform.clone())
env_with_render = env_with_render.append_transform(
PixelRenderTransform(
out_keys=["pixels"],
# the np.ndarray has a negative stride and needs to be copied before being cast to a tensor
preproc=lambda x: x.copy(),
as_non_tensor=True,
# asking for array rather than on-screen rendering
mode="rgb_array",
)
)
env_with_render = env_with_render.append_transform(
VideoRecorder(logger=video_logger, tag="vmas_rendered")
)
with set_exploration_type(ExplorationType.MODE):
print("Rendering rollout...")
env_with_render.rollout(100, policy=agents_exploration_policy)
print("Saving the video...")
env_with_render.transform.dump()
print("Saved! Saved directory tree:")
video_logger.print_log_dir()
结论和后续步骤¶
在本教程中,我们看到了
如何在TorchRL中创建竞争性多组多代理环境,其规范如何工作以及它如何与库集成;
如何在TorchRL中为多个组创建多代理网络架构;
我们如何使用
tensordict.TensorDict
来承载多代理多组数据;我们如何在多代理多组MADDPG/IDDPG训练循环中将所有库组件(收集器、模块、回放缓冲区和损失)联系起来。
现在您已经熟练掌握了多代理DDPG,您可以查看GitHub存储库中所有TorchRL多代理实现。这些是许多MARL算法的仅代码脚本,例如本教程中看到的QMIX、MADDPG、IQL等等!
也请记住查看我们的教程:使用TorchRL进行多智能体强化学习(PPO)教程。
最后,您可以修改本教程的参数以尝试许多其他配置和场景,以成为MARL大师。
PettingZoo和VMAS包含更多场景。以下是一些您可以在VMAS中尝试的可能场景的视频。
脚本的总运行时间:(0分钟58.493秒)
估计内存使用量:3114 MB