注意
前往末尾 下载完整的示例代码。
使用 TorchRL 教程进行竞争性多智能体强化学习 (DDPG)¶
作者: Matteo Bettini
另请参阅
BenchMARL 库提供了使用 TorchRL 的最先进的 MARL 算法实现。
本教程演示了如何使用 PyTorch 和 TorchRL 解决竞争性多智能体强化学习 (MARL) 问题。
为了便于使用,本教程将遵循已有的 使用 TorchRL 教程进行多智能体强化学习 (PPO) 的通用结构。
在本教程中,我们将使用 MADDPG 论文 中的 simple_tag 环境。此环境是与论文一起引入的名为 MultiAgentParticleEnvironments (MPE) 的一组环境的一部分。
目前有多个模拟器提供 MPE 环境。在本教程中,我们将展示如何使用 TorchRL 在以下环境中训练此环境:
PettingZoo,在环境的传统 CPU 版本中;
VMAS,它在 PyTorch 中提供了一个向量化实现,能够在 GPU 上模拟多个环境以加速计算。
data:image/s3,"s3://crabby-images/68b6e/68b6e8c404e04e3396b7606c48aeaecf17683133" alt="Simple tag"
多智能体 simple_tag 场景¶
主要学习内容
如何在 TorchRL 中使用竞争性多智能体环境,它们的规范如何工作,以及它们如何与库集成;
如何在 TorchRL 中使用带有多个智能体组的 Parallel PettingZoo 和 VMAS 环境;
如何在 TorchRL 中创建不同的多智能体网络架构(例如,使用参数共享、集中式评论家);
我们如何使用
TensorDict
来携带多智能体多组数据;我们如何将所有库组件(收集器、模块、回放缓冲区和损失函数)绑定到一个离线策略多智能体 MADDPG/IDDPG 训练循环中。
如果你在 Google Colab 中运行此代码,请确保安装以下依赖项
!pip3 install torchrl
!pip3 install vmas
!pip3 install pettingzoo[mpe]==1.24.3
!pip3 install tqdm
深度确定性策略梯度 (DDPG) 是一种离线策略 Actor-Critic 算法,其中使用来自评论家网络的梯度优化确定性策略。有关更多信息,请参阅 深度确定性策略梯度 论文。这种算法通常以离线方式训练。有关离线策略学习的更多信息,请参阅 Sutton, Richard S., 和 Andrew G. Barto。强化学习:导论。MIT 出版社,2018。
data:image/s3,"s3://crabby-images/ad5cc/ad5cc75f8a153fd8f03f5bc7a8c6cddc5c9a0b9c" alt="Off-policy learning"
离线策略学习¶
这种方法已扩展到 混合合作-竞争环境的多智能体 Actor-Critic 中的多智能体学习,其中介绍了多智能体 DDPG (MADDPG) 算法。在多智能体设置中,情况略有不同。我们现在有多个策略 \(\mathbf{\pi}\),每个智能体一个。策略通常是本地和去中心化的。这意味着单个智能体的策略将仅基于其观察结果输出该智能体的动作。在 MARL 文献中,这被称为去中心化执行。另一方面,评论家存在不同的公式,主要是
在 MADDPG 中,评论家是集中式的,并将系统的全局状态和全局动作作为输入。全局状态可以是全局观察,也可以只是智能体观察的串联。全局动作是智能体动作的串联。MADDPG 可用于执行集中式训练的上下文中,因为它需要访问全局信息。
在 IDDPG 中,评论家仅将一个智能体的观察和动作作为输入。这允许去中心化训练,因为评论家和策略都只需要本地信息来计算其输出。
集中式评论家有助于克服多个智能体并发学习的非平稳性,但另一方面,它们可能会受到其庞大输入空间的影响。在本教程中,我们将能够训练这两种公式,并且还将讨论参数共享(跨智能体共享网络参数的做法)如何影响每种公式。
本教程的结构如下:
首先,我们将建立一组超参数以供使用。
随后,我们将构建一个多智能体环境,利用 TorchRL 的 PettingZoo 或 VMAS 包装器。
接下来,我们将制定策略和评论家网络,讨论各种选择对参数共享和评论家中心化的影响。
之后,我们将创建采样收集器和回放缓冲区。
最后,我们将执行我们的训练循环并检查结果。
如果你在 Colab 或具有 GUI 的机器上操作此代码,你还将有机会在训练过程前后渲染和可视化你自己的训练策略。
导入我们的依赖项
import copy
import tempfile
import torch
from matplotlib import pyplot as plt
from tensordict import TensorDictBase
from tensordict.nn import TensorDictModule, TensorDictSequential
from torch import multiprocessing
from torchrl.collectors import SyncDataCollector
from torchrl.data import LazyMemmapStorage, RandomSampler, ReplayBuffer
from torchrl.envs import (
check_env_specs,
ExplorationType,
PettingZooEnv,
RewardSum,
set_exploration_type,
TransformedEnv,
VmasEnv,
)
from torchrl.modules import (
AdditiveGaussianModule,
MultiAgentMLP,
ProbabilisticActor,
TanhDelta,
)
from torchrl.objectives import DDPGLoss, SoftUpdate, ValueEstimators
from torchrl.record import CSVLogger, PixelRenderTransform, VideoRecorder
from tqdm import tqdm
# Check if we're building the doc, in which case disable video rendering
try:
is_sphinx = __sphinx_build__
except NameError:
is_sphinx = False
定义超参数¶
我们为本教程设置超参数。根据可用资源,可以选择在 GPU 或其他设备上执行策略和模拟器。你可以调整其中一些值以调整计算要求。
# Seed
seed = 0
torch.manual_seed(seed)
# Devices
is_fork = multiprocessing.get_start_method() == "fork"
device = (
torch.device(0)
if torch.cuda.is_available() and not is_fork
else torch.device("cpu")
)
# Sampling
frames_per_batch = 1_000 # Number of team frames collected per sampling iteration
n_iters = 10 # Number of sampling and training iterations
total_frames = frames_per_batch * n_iters
# We will stop training the evaders after this many iterations,
# should be 0 <= iteration_when_stop_training_evaders <= n_iters
iteration_when_stop_training_evaders = n_iters // 2
# Replay buffer
memory_size = 1_000_000 # The replay buffer of each group can store this many frames
# Training
n_optimiser_steps = 100 # Number of optimization steps per training iteration
train_batch_size = 128 # Number of frames trained in each optimiser step
lr = 3e-4 # Learning rate
max_grad_norm = 1.0 # Maximum norm for the gradients
# DDPG
gamma = 0.99 # Discount factor
polyak_tau = 0.005 # Tau for the soft-update of the target network
环境¶
多智能体环境模拟多个智能体与世界的互动。TorchRL API 允许集成各种类型的多智能体环境风格。在本教程中,我们将重点关注多个智能体组并行交互的环境。也就是说:在每个步骤中,所有智能体都将同步获得观察结果并采取行动。
此外,TorchRL MARL API 允许将智能体分成组。每个组都将是 tensordict 中的一个单独条目。组内智能体的数据堆叠在一起。因此,通过选择如何对智能体进行分组,你可以决定哪些数据是堆叠/保留为单独条目的。可以在 VMAS 和 PettingZoo 等环境的构造中指定分组策略。有关分组的更多信息,请参阅 MarlGroupMapType
。
在 simple_tag 环境中,有两队智能体:追逐者(或“对手”)(红色圆圈)和逃避者(或“智能体”)(绿色圆圈)。追逐者因接触到逃避者而获得奖励 (+10)。一旦发生接触,追逐者团队将获得集体奖励,而被接触的逃避者将受到相同的惩罚 (-10)。逃避者的速度和加速度高于追逐者。环境中还有障碍物(黑色圆圈)。智能体和障碍物根据均匀随机分布生成。智能体在一个具有阻力和弹性碰撞的 2D 连续世界中行动。他们的动作是 2D 连续力,决定了他们的加速度。每个智能体观察其位置、速度、与其他所有智能体和障碍物的相对位置以及逃避者的速度。
PettingZoo 和 VMAS 版本在奖励函数方面略有不同,因为 PettingZoo 会惩罚逃避者超出边界,而 VMAS 会在物理上阻止它。这就是为什么你会观察到在 VMAS 中,两队的奖励是相同的,只是符号相反,而在 PettingZoo 中,逃避者的奖励会更低的原因。
我们现在将实例化环境。对于本教程,我们将 эпизод 限制为 max_steps
,之后将设置 terminated 标志。此功能已在 PettingZoo 和 VMAS 模拟器中提供,但 TorchRL StepCounter
转换也可以替代使用。
max_steps = 100 # Environment steps before done
n_chasers = 2
n_evaders = 1
n_obstacles = 2
use_vmas = True # Set this to True for a great performance speedup
if not use_vmas:
base_env = PettingZooEnv(
task="simple_tag_v3",
parallel=True, # Use the Parallel version
seed=seed,
# Scenario specific
continuous_actions=True,
num_good=n_evaders,
num_adversaries=n_chasers,
num_obstacles=n_obstacles,
max_cycles=max_steps,
)
else:
num_vmas_envs = (
frames_per_batch // max_steps
) # Number of vectorized environments. frames_per_batch collection will be divided among these environments
base_env = VmasEnv(
scenario="simple_tag",
num_envs=num_vmas_envs,
continuous_actions=True,
max_steps=max_steps,
device=device,
seed=seed,
# Scenario specific
num_good_agents=n_evaders,
num_adversaries=n_chasers,
num_landmarks=n_obstacles,
)
组映射¶
PettingZoo 和 VMAS 环境使用 TorchRL MARL 分组 API。我们可以访问组映射,将每个组映射到其中的智能体,如下所示
print(f"group_map: {base_env.group_map}")
group_map: {'adversary': ['adversary_0', 'adversary_1'], 'agent': ['agent_0']}
正如我们所见,它包含 2 个组:“agents”(逃避者)和“adversaries”(追逐者)。
环境不仅由其模拟器和转换定义,还由一系列元数据定义,这些元数据描述了在执行期间可以预期的内容。出于效率目的,TorchRL 在环境规范方面非常严格,但你可以轻松检查你的环境规范是否足够。在我们的示例中,模拟器包装器负责为你的 base_env 设置适当的规范,因此你不必担心这一点。
有四个规范需要查看
action_spec
定义了动作空间;reward_spec
定义了奖励域;done_spec
定义了完成域;observation_spec
定义了环境步骤的所有其他输出的域;
print("action_spec:", base_env.full_action_spec)
print("reward_spec:", base_env.full_reward_spec)
print("done_spec:", base_env.full_done_spec)
print("observation_spec:", base_env.observation_spec)
action_spec: Composite(
adversary: Composite(
action: BoundedContinuous(
shape=torch.Size([10, 2, 2]),
space=ContinuousBox(
low=Tensor(shape=torch.Size([10, 2, 2]), device=cpu, dtype=torch.float32, contiguous=True),
high=Tensor(shape=torch.Size([10, 2, 2]), device=cpu, dtype=torch.float32, contiguous=True)),
device=cpu,
dtype=torch.float32,
domain=continuous),
device=cpu,
shape=torch.Size([10, 2])),
agent: Composite(
action: BoundedContinuous(
shape=torch.Size([10, 1, 2]),
space=ContinuousBox(
low=Tensor(shape=torch.Size([10, 1, 2]), device=cpu, dtype=torch.float32, contiguous=True),
high=Tensor(shape=torch.Size([10, 1, 2]), device=cpu, dtype=torch.float32, contiguous=True)),
device=cpu,
dtype=torch.float32,
domain=continuous),
device=cpu,
shape=torch.Size([10, 1])),
device=cpu,
shape=torch.Size([10]))
reward_spec: Composite(
adversary: Composite(
reward: UnboundedContinuous(
shape=torch.Size([10, 2, 1]),
space=ContinuousBox(
low=Tensor(shape=torch.Size([10, 2, 1]), device=cpu, dtype=torch.float32, contiguous=True),
high=Tensor(shape=torch.Size([10, 2, 1]), device=cpu, dtype=torch.float32, contiguous=True)),
device=cpu,
dtype=torch.float32,
domain=continuous),
device=cpu,
shape=torch.Size([10, 2])),
agent: Composite(
reward: UnboundedContinuous(
shape=torch.Size([10, 1, 1]),
space=ContinuousBox(
low=Tensor(shape=torch.Size([10, 1, 1]), device=cpu, dtype=torch.float32, contiguous=True),
high=Tensor(shape=torch.Size([10, 1, 1]), device=cpu, dtype=torch.float32, contiguous=True)),
device=cpu,
dtype=torch.float32,
domain=continuous),
device=cpu,
shape=torch.Size([10, 1])),
device=cpu,
shape=torch.Size([10]))
done_spec: Composite(
done: Categorical(
shape=torch.Size([10, 1]),
space=CategoricalBox(n=2),
device=cpu,
dtype=torch.bool,
domain=discrete),
terminated: Categorical(
shape=torch.Size([10, 1]),
space=CategoricalBox(n=2),
device=cpu,
dtype=torch.bool,
domain=discrete),
device=cpu,
shape=torch.Size([10]))
observation_spec: Composite(
adversary: Composite(
observation: UnboundedContinuous(
shape=torch.Size([10, 2, 14]),
space=ContinuousBox(
low=Tensor(shape=torch.Size([10, 2, 14]), device=cpu, dtype=torch.float32, contiguous=True),
high=Tensor(shape=torch.Size([10, 2, 14]), device=cpu, dtype=torch.float32, contiguous=True)),
device=cpu,
dtype=torch.float32,
domain=continuous),
device=cpu,
shape=torch.Size([10, 2])),
agent: Composite(
observation: UnboundedContinuous(
shape=torch.Size([10, 1, 12]),
space=ContinuousBox(
low=Tensor(shape=torch.Size([10, 1, 12]), device=cpu, dtype=torch.float32, contiguous=True),
high=Tensor(shape=torch.Size([10, 1, 12]), device=cpu, dtype=torch.float32, contiguous=True)),
device=cpu,
dtype=torch.float32,
domain=continuous),
device=cpu,
shape=torch.Size([10, 1])),
device=cpu,
shape=torch.Size([10]))
使用刚刚显示的命令,我们可以访问每个值的域。
我们可以看到所有规范都结构化为一个字典,根始终包含组名。此结构将在所有传入和传出环境的 tensordict 数据中遵循。此外,每个组的规范都有前导形状 (n_agents_in_that_group)
(智能体为 1,对手为 2),这意味着该组的张量数据将始终具有该前导形状(组内的智能体的数据已堆叠)。
查看 done_spec
,我们可以看到有一些键位于智能体组之外("done"、 "terminated"、 "truncated"
),它们没有前导多智能体维度。这些键由所有智能体共享,并表示用于重置的环境全局完成状态。默认情况下,就像在本例中一样,当任何智能体完成时,并行 PettingZoo 环境都会完成,但可以通过在 PettingZoo 环境构造中设置 done_on_any
来覆盖此行为。
为了快速访问 tensordict 中每个值的键,我们可以简单地向环境询问各自的键,我们将立即了解哪些是每个智能体的,哪些是共享的。此信息将有助于告诉所有其他 TorchRL 组件在哪里找到每个值
print("action_keys:", base_env.action_keys)
print("reward_keys:", base_env.reward_keys)
print("done_keys:", base_env.done_keys)
action_keys: [('adversary', 'action'), ('agent', 'action')]
reward_keys: [('adversary', 'reward'), ('agent', 'reward')]
done_keys: ['done', 'terminated']
转换¶
我们可以将我们需要的任何 TorchRL 转换附加到我们的环境。这些将以某种期望的方式修改其输入/输出。我们强调,在多智能体上下文中,显式提供要修改的键至关重要。
例如,在本例中,我们将实例化一个 RewardSum
转换,它将对 эпизод 的奖励求和。我们将告诉此转换在哪里找到每个奖励键的重置键。本质上,我们只是说每个组的 эпизод 奖励应该在设置 "_reset"
tensordict 键时重置,这意味着调用了 env.reset()
。转换后的环境将继承包装环境的设备和元数据,并根据其包含的转换序列转换这些数据。
env = TransformedEnv(
base_env,
RewardSum(
in_keys=base_env.reward_keys,
reset_keys=["_reset"] * len(base_env.group_map.keys()),
),
)
check_env_specs()
函数运行一个小型的 rollout 并将其输出与环境规范进行比较。如果没有引发错误,我们可以确信规范已正确定义
check_env_specs(env)
Rollout¶
为了好玩,让我们看看一个简单的随机 rollout 是什么样的。你可以调用 env.rollout(n_steps) 并大致了解环境输入和输出的样子。动作将自动从动作规范域中随机抽取。
n_rollout_steps = 5
rollout = env.rollout(n_rollout_steps)
print(f"rollout of {n_rollout_steps} steps:", rollout)
print("Shape of the rollout TensorDict:", rollout.batch_size)
rollout of 5 steps: TensorDict(
fields={
adversary: TensorDict(
fields={
action: Tensor(shape=torch.Size([10, 5, 2, 2]), device=cpu, dtype=torch.float32, is_shared=False),
episode_reward: Tensor(shape=torch.Size([10, 5, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False),
observation: Tensor(shape=torch.Size([10, 5, 2, 14]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([10, 5, 2]),
device=cpu,
is_shared=False),
agent: TensorDict(
fields={
action: Tensor(shape=torch.Size([10, 5, 1, 2]), device=cpu, dtype=torch.float32, is_shared=False),
episode_reward: Tensor(shape=torch.Size([10, 5, 1, 1]), device=cpu, dtype=torch.float32, is_shared=False),
observation: Tensor(shape=torch.Size([10, 5, 1, 12]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([10, 5, 1]),
device=cpu,
is_shared=False),
done: Tensor(shape=torch.Size([10, 5, 1]), device=cpu, dtype=torch.bool, is_shared=False),
next: TensorDict(
fields={
adversary: TensorDict(
fields={
episode_reward: Tensor(shape=torch.Size([10, 5, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False),
observation: Tensor(shape=torch.Size([10, 5, 2, 14]), device=cpu, dtype=torch.float32, is_shared=False),
reward: Tensor(shape=torch.Size([10, 5, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([10, 5, 2]),
device=cpu,
is_shared=False),
agent: TensorDict(
fields={
episode_reward: Tensor(shape=torch.Size([10, 5, 1, 1]), device=cpu, dtype=torch.float32, is_shared=False),
observation: Tensor(shape=torch.Size([10, 5, 1, 12]), device=cpu, dtype=torch.float32, is_shared=False),
reward: Tensor(shape=torch.Size([10, 5, 1, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([10, 5, 1]),
device=cpu,
is_shared=False),
done: Tensor(shape=torch.Size([10, 5, 1]), device=cpu, dtype=torch.bool, is_shared=False),
terminated: Tensor(shape=torch.Size([10, 5, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([10, 5]),
device=cpu,
is_shared=False),
terminated: Tensor(shape=torch.Size([10, 5, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([10, 5]),
device=cpu,
is_shared=False)
Shape of the rollout TensorDict: torch.Size([10, 5])
我们可以看到我们的 rollout 的 batch_size
为 (n_rollout_steps)
。这意味着其中的所有张量都将具有此引导维度。
更深入地了解,我们可以看到输出 tensordict 可以按以下方式划分
在根目录中(通过运行
rollout.exclude("next")
访问)我们将找到在第一个时间步调用重置后可用的所有键。我们可以通过索引n_rollout_steps
维度来查看它们在 rollout 步骤中的演变。在这些键中,我们将找到rollout[group_name]
tensordict 中每个智能体不同的键,它们的批大小为(n_rollout_steps, n_agents_in_group)
,表示它正在存储额外的智能体维度。组 tensordict 之外的键将是共享键。在下一个(通过运行
rollout.get("next")
访问)中。我们将找到与根相同的结构,但有一些细微的差异在下面突出显示。
在 TorchRL 中,约定是 done 和 observations 将同时存在于根和下一个中(因为这些在重置时和步骤后都可用)。动作将仅在根中可用(因为没有步骤产生的动作),奖励将仅在下一个中可用(因为重置时没有奖励)。此结构遵循 强化学习:导论(Sutton 和 Barto) 中的结构,其中根表示时间 \(t\) 的数据,下一个表示世界步骤的时间 \(t+1\) 的数据。
渲染随机 rollout¶
如果你在 Google Colab 上,或者在具有 OpenGL 和 GUI 的机器上,你实际上可以渲染随机 rollout。这将让你了解随机策略在此任务中将实现什么,以便将其与你将自己训练的策略进行比较!
要渲染 rollout,请按照本教程末尾渲染部分中的说明进行操作,只需从 env.rollout()
中删除行 policy=agents_exploration_policy
。
策略¶
DDPG 利用确定性策略。这意味着我们的神经网络将输出要采取的动作。由于动作是连续的,我们使用 Tanh-Delta 分布来尊重动作空间边界。此类唯一做的事情是对动作应用 Tanh 变换,以确保动作在域边界内。
我们需要做出的另一个重要决定是,我们是否希望团队内的智能体共享策略参数。一方面,共享参数意味着他们都将共享相同的策略,这将使他们能够从彼此的经验中受益。这也将加快训练速度。另一方面,它将使他们在行为上同质化,因为他们实际上将共享相同的模型。对于此示例,我们将启用共享,因为我们不介意同质性,并且可以从计算速度中受益,但在你自己的问题中始终考虑此决定非常重要!
我们分三个步骤设计策略。
第一步:定义神经网络 n_obs_per_agent
-> n_actions_per_agents
为此,我们使用 MultiAgentMLP
,这是一个专为多个智能体设计的 TorchRL 模块,具有许多可用的自定义项。
我们将为每个组定义不同的策略,并将它们存储在字典中。
policy_modules = {}
for group, agents in env.group_map.items():
share_parameters_policy = True # Can change this based on the group
policy_net = MultiAgentMLP(
n_agent_inputs=env.observation_spec[group, "observation"].shape[
-1
], # n_obs_per_agent
n_agent_outputs=env.full_action_spec[group, "action"].shape[
-1
], # n_actions_per_agents
n_agents=len(agents), # Number of agents in the group
centralised=False, # the policies are decentralised (i.e., each agent will act from its local observation)
share_params=share_parameters_policy,
device=device,
depth=2,
num_cells=256,
activation_class=torch.nn.Tanh,
)
# Wrap the neural network in a :class:`~tensordict.nn.TensorDictModule`.
# This is simply a module that will read the ``in_keys`` from a tensordict, feed them to the
# neural networks, and write the
# outputs in-place at the ``out_keys``.
policy_module = TensorDictModule(
policy_net,
in_keys=[(group, "observation")],
out_keys=[(group, "param")],
) # We just name the input and output that the network will read and write to the input tensordict
policy_modules[group] = policy_module
第二步:将 TensorDictModule
包装在 ProbabilisticActor
中
我们现在需要构建 TanhDelta 分布。我们指示 ProbabilisticActor
类从策略动作参数中构建 TanhDelta
。我们还提供了此分布的最小值和最大值,我们从环境规范中收集这些值。
in_keys
的名称(以及因此 TensorDictModule
上述代码的 out_keys
的名称)必须以 TanhDelta
分布构造函数关键字参数 (param) 结尾。
policies = {}
for group, _agents in env.group_map.items():
policy = ProbabilisticActor(
module=policy_modules[group],
spec=env.full_action_spec[group, "action"],
in_keys=[(group, "param")],
out_keys=[(group, "action")],
distribution_class=TanhDelta,
distribution_kwargs={
"low": env.full_action_spec[group, "action"].space.low,
"high": env.full_action_spec[group, "action"].space.high,
},
return_log_prob=False,
)
policies[group] = policy
第三步:探索
由于 DDPG 策略是确定性的,我们需要一种在收集期间执行探索的方法。
为此,我们需要在将策略传递给收集器之前,将探索层附加到我们的策略。在本例中,我们使用 AdditiveGaussianModule
,它将高斯噪声添加到我们的动作中(如果噪声使动作超出边界,则将其钳制)。
此探索包装器使用 sigma
参数,该参数乘以噪声以确定其大小。Sigma 可以在整个训练过程中进行退火以减少探索。Sigma 将在 annealing_num_steps
中从 sigma_init
变为 sigma_end
。
exploration_policies = {}
for group, _agents in env.group_map.items():
exploration_policy = TensorDictSequential(
policies[group],
AdditiveGaussianModule(
spec=policies[group].spec,
annealing_num_steps=total_frames
// 2, # Number of frames after which sigma is sigma_end
action_key=(group, "action"),
sigma_init=0.9, # Initial value of the sigma
sigma_end=0.1, # Final value of the sigma
),
)
exploration_policies[group] = exploration_policy
评论家网络¶
评论家网络是 DDPG 算法的关键组件,即使它在采样时不使用。此模块将读取观察结果和采取的动作,并返回相应的值估计。
与之前一样,应该仔细考虑在智能体组内共享评论家参数的决定。一般来说,参数共享将加快训练收敛速度,但需要考虑一些重要的因素
当智能体具有不同的奖励函数时,不建议共享,因为评论家将需要学习为同一状态分配不同的值(例如,在混合合作-竞争设置中)。在这种情况下,由于两个组已经使用单独的网络,因此共享决定仅适用于组内的智能体,我们已经知道它们具有相同的奖励函数。
在去中心化训练设置中,如果没有额外的基础设施来同步参数,则无法执行共享。
在所有其他情况下,当组中所有智能体的奖励函数(与奖励区分开来)相同时(如当前场景中),共享可以提供改进的性能。这可能会以智能体策略的同质性为代价。一般来说,了解哪种选择更可取的最佳方法是快速尝试这两种选择。
这也是我们必须在 MADDPG 和 IDDPG 之间做出选择的地方
使用 MADDPG,我们将获得一个具有完全可观察性的中心评论家(即,它将所有串联的全局智能体观察结果和动作作为输入)。我们可以这样做,因为我们是在模拟器中,并且训练是集中的。
使用 IDDPG,我们将拥有一个本地去中心化评论家,就像策略一样。
在任何情况下,评论家输出都将具有形状 (..., n_agents_in_group, 1)
。如果评论家是集中的和共享的,则沿 n_agents_in_group
维度的所有值都将相同。
与策略一样,我们为每个组创建一个评论家网络,并将它们存储在字典中。
critics = {}
for group, agents in env.group_map.items():
share_parameters_critic = True # Can change for each group
MADDPG = True # IDDPG if False, can change for each group
# This module applies the lambda function: reading the action and observation entries for the group
# and concatenating them in a new ``(group, "obs_action")`` entry
cat_module = TensorDictModule(
lambda obs, action: torch.cat([obs, action], dim=-1),
in_keys=[(group, "observation"), (group, "action")],
out_keys=[(group, "obs_action")],
)
critic_module = TensorDictModule(
module=MultiAgentMLP(
n_agent_inputs=env.observation_spec[group, "observation"].shape[-1]
+ env.full_action_spec[group, "action"].shape[-1],
n_agent_outputs=1, # 1 value per agent
n_agents=len(agents),
centralised=MADDPG,
share_params=share_parameters_critic,
device=device,
depth=2,
num_cells=256,
activation_class=torch.nn.Tanh,
),
in_keys=[(group, "obs_action")], # Read ``(group, "obs_action")``
out_keys=[
(group, "state_action_value")
], # Write ``(group, "state_action_value")``
)
critics[group] = TensorDictSequential(
cat_module, critic_module
) # Run them in sequence
让我们尝试我们的策略和评论家模块。正如前面指出的,TensorDictModule
的使用使得可以直接读取环境的输出以运行这些模块,因为它们知道要读取什么信息以及在哪里写入信息。
我们可以看到,在每个组的网络运行后,它们的输出键被添加到组条目下的数据中。
从那时起,特定于多智能体的组件已被实例化,我们将简单地使用与单智能体学习中相同的组件。这不是很棒吗?
reset_td = env.reset()
for group, _agents in env.group_map.items():
print(
f"Running value and policy for group '{group}':",
critics[group](policies[group](reset_td)),
)
Running value and policy for group 'adversary': TensorDict(
fields={
adversary: TensorDict(
fields={
action: Tensor(shape=torch.Size([10, 2, 2]), device=cpu, dtype=torch.float32, is_shared=False),
episode_reward: Tensor(shape=torch.Size([10, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False),
obs_action: Tensor(shape=torch.Size([10, 2, 16]), device=cpu, dtype=torch.float32, is_shared=False),
observation: Tensor(shape=torch.Size([10, 2, 14]), device=cpu, dtype=torch.float32, is_shared=False),
param: Tensor(shape=torch.Size([10, 2, 2]), device=cpu, dtype=torch.float32, is_shared=False),
state_action_value: Tensor(shape=torch.Size([10, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([10, 2]),
device=cpu,
is_shared=False),
agent: TensorDict(
fields={
episode_reward: Tensor(shape=torch.Size([10, 1, 1]), device=cpu, dtype=torch.float32, is_shared=False),
observation: Tensor(shape=torch.Size([10, 1, 12]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([10, 1]),
device=cpu,
is_shared=False),
done: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
terminated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([10]),
device=cpu,
is_shared=False)
Running value and policy for group 'agent': TensorDict(
fields={
adversary: TensorDict(
fields={
action: Tensor(shape=torch.Size([10, 2, 2]), device=cpu, dtype=torch.float32, is_shared=False),
episode_reward: Tensor(shape=torch.Size([10, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False),
obs_action: Tensor(shape=torch.Size([10, 2, 16]), device=cpu, dtype=torch.float32, is_shared=False),
observation: Tensor(shape=torch.Size([10, 2, 14]), device=cpu, dtype=torch.float32, is_shared=False),
param: Tensor(shape=torch.Size([10, 2, 2]), device=cpu, dtype=torch.float32, is_shared=False),
state_action_value: Tensor(shape=torch.Size([10, 2, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([10, 2]),
device=cpu,
is_shared=False),
agent: TensorDict(
fields={
action: Tensor(shape=torch.Size([10, 1, 2]), device=cpu, dtype=torch.float32, is_shared=False),
episode_reward: Tensor(shape=torch.Size([10, 1, 1]), device=cpu, dtype=torch.float32, is_shared=False),
obs_action: Tensor(shape=torch.Size([10, 1, 14]), device=cpu, dtype=torch.float32, is_shared=False),
observation: Tensor(shape=torch.Size([10, 1, 12]), device=cpu, dtype=torch.float32, is_shared=False),
param: Tensor(shape=torch.Size([10, 1, 2]), device=cpu, dtype=torch.float32, is_shared=False),
state_action_value: Tensor(shape=torch.Size([10, 1, 1]), device=cpu, dtype=torch.float32, is_shared=False)},
batch_size=torch.Size([10, 1]),
device=cpu,
is_shared=False),
done: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False),
terminated: Tensor(shape=torch.Size([10, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([10]),
device=cpu,
is_shared=False)
数据收集器¶
TorchRL 提供了一组数据收集器类。简而言之,这些类执行三个操作:重置环境,使用策略和最新的观察结果计算动作,在环境中执行步骤,并重复最后两个步骤,直到环境发出停止信号(或达到完成状态)。
我们将使用最简单的数据收集器,它的输出与环境 rollout 相同,唯一的区别是它将自动重置完成状态,直到收集到所需的帧数。
我们需要向其馈送我们的探索策略。此外,为了像运行一个策略一样运行所有组的策略,我们将它们放在一个序列中。它们不会相互干扰,因为每个组在不同的位置写入和读取键。
# Put exploration policies from each group in a sequence
agents_exploration_policy = TensorDictSequential(*exploration_policies.values())
collector = SyncDataCollector(
env,
agents_exploration_policy,
device=device,
frames_per_batch=frames_per_batch,
total_frames=total_frames,
)
回放缓冲区¶
回放缓冲区是离线策略 RL 算法的常见构建块。缓冲区有很多类型,在本教程中,我们使用基本缓冲区来随机存储和采样 tensordict 数据。
replay_buffers = {}
for group, _agents in env.group_map.items():
replay_buffer = ReplayBuffer(
storage=LazyMemmapStorage(
memory_size, device=device
), # We will store up to memory_size multi-agent transitions
sampler=RandomSampler(),
batch_size=train_batch_size, # We will sample batches of this size
)
replay_buffers[group] = replay_buffer
损失函数¶
为了方便起见,可以使用 DDPGLoss
类直接从 TorchRL 导入 DDPG 损失函数。这是利用 DDPG 的最简单方法:它隐藏了 DDPG 的数学运算和随之而来的控制流。
每个组也可以有不同的策略。
losses = {}
for group, _agents in env.group_map.items():
loss_module = DDPGLoss(
actor_network=policies[group], # Use the non-explorative policies
value_network=critics[group],
delay_value=True, # Whether to use a target network for the value
loss_function="l2",
)
loss_module.set_keys(
state_action_value=(group, "state_action_value"),
reward=(group, "reward"),
done=(group, "done"),
terminated=(group, "terminated"),
)
loss_module.make_value_estimator(ValueEstimators.TD0, gamma=gamma)
losses[group] = loss_module
target_updaters = {
group: SoftUpdate(loss, tau=polyak_tau) for group, loss in losses.items()
}
optimisers = {
group: {
"loss_actor": torch.optim.Adam(
loss.actor_network_params.flatten_keys().values(), lr=lr
),
"loss_value": torch.optim.Adam(
loss.value_network_params.flatten_keys().values(), lr=lr
),
}
for group, loss in losses.items()
}
训练实用程序¶
我们确实必须定义两个辅助函数,我们将在训练循环中使用它们。它们非常简单,不包含任何重要的逻辑。
def process_batch(batch: TensorDictBase) -> TensorDictBase:
"""
If the `(group, "terminated")` and `(group, "done")` keys are not present, create them by expanding
`"terminated"` and `"done"`.
This is needed to present them with the same shape as the reward to the loss.
"""
for group in env.group_map.keys():
keys = list(batch.keys(True, True))
group_shape = batch.get_item_shape(group)
nested_done_key = ("next", group, "done")
nested_terminated_key = ("next", group, "terminated")
if nested_done_key not in keys:
batch.set(
nested_done_key,
batch.get(("next", "done")).unsqueeze(-1).expand((*group_shape, 1)),
)
if nested_terminated_key not in keys:
batch.set(
nested_terminated_key,
batch.get(("next", "terminated"))
.unsqueeze(-1)
.expand((*group_shape, 1)),
)
return batch
训练循环¶
我们现在拥有编写训练循环所需的所有部分。步骤包括:
- 收集所有组的数据
- 循环遍历组
将组数据存储在组缓冲区中
- 循环遍历 эпоха
从组缓冲区采样
计算采样数据的损失
反向传播损失
优化
重复
重复
重复
pbar = tqdm(
total=n_iters,
desc=", ".join(
[f"episode_reward_mean_{group} = 0" for group in env.group_map.keys()]
),
)
episode_reward_mean_map = {group: [] for group in env.group_map.keys()}
train_group_map = copy.deepcopy(env.group_map)
# Training/collection iterations
for iteration, batch in enumerate(collector):
current_frames = batch.numel()
batch = process_batch(batch) # Util to expand done keys if needed
# Loop over groups
for group in train_group_map.keys():
group_batch = batch.exclude(
*[
key
for _group in env.group_map.keys()
if _group != group
for key in [_group, ("next", _group)]
]
) # Exclude data from other groups
group_batch = group_batch.reshape(
-1
) # This just affects the leading dimensions in batch_size of the tensordict
replay_buffers[group].extend(group_batch)
for _ in range(n_optimiser_steps):
subdata = replay_buffers[group].sample()
loss_vals = losses[group](subdata)
for loss_name in ["loss_actor", "loss_value"]:
loss = loss_vals[loss_name]
optimiser = optimisers[group][loss_name]
loss.backward()
# Optional
params = optimiser.param_groups[0]["params"]
torch.nn.utils.clip_grad_norm_(params, max_grad_norm)
optimiser.step()
optimiser.zero_grad()
# Soft-update the target network
target_updaters[group].step()
# Exploration sigma anneal update
exploration_policies[group][-1].step(current_frames)
# Stop training a certain group when a condition is met (e.g., number of training iterations)
if iteration == iteration_when_stop_training_evaders:
del train_group_map["agent"]
# Logging
for group in env.group_map.keys():
episode_reward_mean = (
batch.get(("next", group, "episode_reward"))[
batch.get(("next", group, "done"))
]
.mean()
.item()
)
episode_reward_mean_map[group].append(episode_reward_mean)
pbar.set_description(
", ".join(
[
f"episode_reward_mean_{group} = {episode_reward_mean_map[group][-1]}"
for group in env.group_map.keys()
]
),
refresh=False,
)
pbar.update()
episode_reward_mean_adversary = 0, episode_reward_mean_agent = 0: 0%| | 0/10 [00:00<?, ?it/s]
episode_reward_mean_adversary = 1.0, episode_reward_mean_agent = -1.0: 10%|█ | 1/10 [00:02<00:26, 2.96s/it]
episode_reward_mean_adversary = 0.0, episode_reward_mean_agent = 0.0: 20%|██ | 2/10 [00:06<00:24, 3.07s/it]
episode_reward_mean_adversary = 1.0, episode_reward_mean_agent = -1.0: 30%|███ | 3/10 [00:09<00:21, 3.14s/it]
episode_reward_mean_adversary = 0.0, episode_reward_mean_agent = 0.0: 40%|████ | 4/10 [00:12<00:18, 3.15s/it]
episode_reward_mean_adversary = 2.0, episode_reward_mean_agent = -2.0: 50%|█████ | 5/10 [00:15<00:15, 3.15s/it]
episode_reward_mean_adversary = 0.0, episode_reward_mean_agent = 0.0: 60%|██████ | 6/10 [00:18<00:12, 3.13s/it]
episode_reward_mean_adversary = 2.0, episode_reward_mean_agent = -2.0: 70%|███████ | 7/10 [00:21<00:08, 2.86s/it]
episode_reward_mean_adversary = 0.0, episode_reward_mean_agent = 0.0: 80%|████████ | 8/10 [00:23<00:05, 2.69s/it]
episode_reward_mean_adversary = 0.0, episode_reward_mean_agent = 0.0: 90%|█████████ | 9/10 [00:25<00:02, 2.58s/it]
episode_reward_mean_adversary = 1.0, episode_reward_mean_agent = -1.0: 100%|██████████| 10/10 [00:27<00:00, 2.47s/it]
结果¶
我们可以绘制每个 эпизод 获得的平均奖励。
为了使训练持续更长时间,请增加 n_iters
超参数。
当在本地运行此脚本时,您可能需要关闭打开的窗口才能继续执行屏幕的其余部分。
fig, axs = plt.subplots(2, 1)
for i, group in enumerate(env.group_map.keys()):
axs[i].plot(episode_reward_mean_map[group], label=f"Episode reward mean {group}")
axs[i].set_ylabel("Reward")
axs[i].axvline(
x=iteration_when_stop_training_evaders,
label="Agent (evader) stop training",
color="orange",
)
axs[i].legend()
axs[-1].set_xlabel("Training iterations")
plt.show()
data:image/s3,"s3://crabby-images/8f80b/8f80b61b0ef0a1c80249039e5e992aaa3db78177" alt="multiagent competitive ddpg"
渲染¶
渲染指令适用于 VMAS,也即在使用 use_vmas=True
运行时。
TorchRL 提供了一些实用工具来记录和保存渲染的视频。您可以在此处了解有关这些工具的更多信息。
在以下代码块中,我们附加了一个变换,它将调用 VMAS 包装环境中的 render()
方法,并将帧堆栈保存到 mp4 文件中,该文件的位置由自定义记录器 video_logger 确定。请注意,此代码可能需要一些外部依赖项,例如 torchvision。
if use_vmas and not is_sphinx:
# Replace tmpdir with any desired path where the video should be saved
with tempfile.TemporaryDirectory() as tmpdir:
video_logger = CSVLogger("vmas_logs", tmpdir, video_format="mp4")
print("Creating rendering env")
env_with_render = TransformedEnv(env.base_env, env.transform.clone())
env_with_render = env_with_render.append_transform(
PixelRenderTransform(
out_keys=["pixels"],
# the np.ndarray has a negative stride and needs to be copied before being cast to a tensor
preproc=lambda x: x.copy(),
as_non_tensor=True,
# asking for array rather than on-screen rendering
mode="rgb_array",
)
)
env_with_render = env_with_render.append_transform(
VideoRecorder(logger=video_logger, tag="vmas_rendered")
)
with set_exploration_type(ExplorationType.DETERMINISTIC):
print("Rendering rollout...")
env_with_render.rollout(100, policy=agents_exploration_policy)
print("Saving the video...")
env_with_render.transform.dump()
print("Saved! Saved directory tree:")
video_logger.print_log_dir()
结论和后续步骤¶
在本教程中,我们已经了解了
如何在 TorchRL 中创建竞争性的多组多智能体环境,其规范如何工作,以及它如何与库集成;
如何在 TorchRL 中为多个组创建多智能体网络架构;
我们如何使用
tensordict.TensorDict
来携带多智能体多组数据;我们如何在多智能体多组 MADDPG/IDDPG 训练循环中 связать 所有库组件(收集器、模块、重放缓冲区和损失)。
既然您已经精通多智能体 DDPG,您可以查看 GitHub 存储库中 TorchRL 的所有多智能体实现。这些是许多 MARL 算法的纯代码脚本,例如本教程中看到的算法、QMIX、MADDPG、IQL 以及更多!
还请务必查看我们的教程:使用 TorchRL 的多智能体强化学习 (PPO) 教程。
最后,您可以修改本教程的参数,尝试许多其他配置和场景,成为 MARL 大师。
PettingZoo 和 VMAS 包含更多场景。以下是一些您可以在 VMAS 中尝试的可能场景的视频。
data:image/s3,"s3://crabby-images/fe067/fe06771c11ac4d64578604621d5bd72bd8b4d47a" alt="VMAS scenarios"
脚本总运行时间: (1 分钟 31.974 秒)
估计内存使用量: 323 MB