PettingZoo 多智能体强化学习环境库教程

1. 项目介绍

PettingZoo是一个多智能体强化学习环境库,由Farama Foundation开发和维护,旨在为多智能体强化学习研究和开发提供标准化的环境接口。它是OpenAI Gym的多智能体扩展,提供了多种多智能体环境,包括合作、竞争和混合场景。

  • GitHub链接https://github.com/Farama-Foundation/PettingZoo
  • Star数量:4.5k+
  • 主要功能
    • 提供多种多智能体环境
    • 与OpenAI Gym兼容的接口
    • 支持多种环境类型:Atari、棋盘游戏、连续控制等
    • 标准化的环境接口
    • 详细的文档和示例
    • 支持自定义环境

2. 安装指南

2.1 系统要求

  • Python 3.6+
  • OpenAI Gym 0.18.0+
  • 支持的操作系统:Linux, macOS, Windows

2.2 安装步骤

  1. 安装PettingZoo:
pip install pettingzoo
  1. 安装特定环境的依赖:
# 安装所有环境的依赖
pip install "pettingzoo[all]"

# 或者安装特定环境的依赖
pip install "pettingzoo[atari]"  # Atari环境
pip install "pettingzoo[classic]"  # 经典棋盘游戏
pip install "pettingzoo[butterfly]"  # 合作环境
pip install "pettingzoo[mamultienv]"  # 多智能体环境
  1. 验证安装:
python -c "import pettingzoo; print('PettingZoo installed successfully')"

3. 核心概念

3.1 环境(Environment)

环境是智能体与之交互的外部世界,PettingZoo提供了多种多智能体环境,每种环境都有其特定的规则和状态空间。

3.2 智能体(Agent)

智能体是在环境中行动的实体,每个智能体都有自己的观察空间、动作空间和奖励信号。

3.3 观察空间(Observation Space)

观察空间定义了智能体可以接收到的环境信息的类型和范围。

3.4 动作空间(Action Space)

动作空间定义了智能体可以执行的动作的类型和范围。

3.5 奖励(Reward)

奖励是环境给予智能体的反馈信号,智能体的目标是最大化累积奖励。

3.6 回合(Episode)

回合是指从环境重置到终止的完整交互过程。

4. 基本使用

4.1 基本环境交互

import pettingzoo.butterfly.pistonball_v6 as pistonball

# 创建环境
env = pistonball.env()

# 重置环境
env.reset()

# 与环境交互
for agent in env.agent_iter():
    observation, reward, done, info = env.last()
    if done:
        action = None
    else:
        # 简单的随机策略
        action = env.action_space(agent).sample()
    env.step(action)

4.2 环境信息

import pettingzoo.butterfly.pistonball_v6 as pistonball

# 创建环境
env = pistonball.env()

# 重置环境
env.reset()

# 获取环境信息
print(f"环境名称: {env.metadata['name']}")
print(f"智能体列表: {env.agents}")
print(f"观察空间: {env.observation_space(env.agents[0])}")
print(f"动作空间: {env.action_space(env.agents[0])}")

4.3 环境渲染

import pettingzoo.butterfly.pistonball_v6 as pistonball

# 创建环境
env = pistonball.env(render_mode="human")

# 重置环境
env.reset()

# 与环境交互
for agent in env.agent_iter():
    observation, reward, done, info = env.last()
    if done:
        action = None
    else:
        action = env.action_space(agent).sample()
    env.step(action)

4.4 保存和加载环境

import pettingzoo.butterfly.pistonball_v6 as pistonball
import pickle

# 创建环境
env = pistonball.env()

# 重置环境
env.reset()

# 保存环境状态
state = env.state()
with open("env_state.pkl", "wb") as f:
    pickle.dump(state, f)

# 加载环境状态
with open("env_state.pkl", "rb") as f:
    loaded_state = pickle.load(f)
env.load_state(loaded_state)

5. 高级功能

5.1 自定义环境

import gym
from pettingzoo import AECEnv
from pettingzoo.utils import agent_selector
import numpy as np

class CustomEnv(AECEnv):
    def __init__(self):
        super().__init__()
        self.agents = ["agent_0", "agent_1"]
        self.possible_agents = self.agents.copy()
        self.action_spaces = {
            agent: gym.spaces.Discrete(2) for agent in self.agents
        }
        self.observation_spaces = {
            agent: gym.spaces.Box(low=0, high=1, shape=(2,), dtype=np.float32) for agent in self.agents
        }
        self._agent_selector = agent_selector(self.agents)
        self.agent_selection = self._agent_selector.next()
        self.states = {agent: np.zeros(2) for agent in self.agents}
        self.rewards = {agent: 0 for agent in self.agents}
        self.dones = {agent: False for agent in self.agents}
        self.infos = {agent: {} for agent in self.agents}
    
    def reset(self, seed=None, return_info=False, options=None):
        self.agents = self.possible_agents.copy()
        self._agent_selector = agent_selector(self.agents)
        self.agent_selection = self._agent_selector.next()
        self.states = {agent: np.zeros(2) for agent in self.agents}
        self.rewards = {agent: 0 for agent in self.agents}
        self.dones = {agent: False for agent in self.agents}
        self.infos = {agent: {} for agent in self.agents}
    
    def step(self, action):
        if self.dones[self.agent_selection]:
            return self._was_done_step(action)
        
        agent = self.agent_selection
        self.states[agent][0] = action
        
        # 计算奖励
        if agent == "agent_0":
            self.rewards[agent] = action
            self.rewards["agent_1"] = 1 - action
        else:
            self.rewards[agent] = action
            self.rewards["agent_0"] = 1 - action
        
        # 检查是否完成
        if np.sum(self.states[agent]) >= 1:
            self.dones[agent] = True
        
        # 选择下一个智能体
        self.agent_selection = self._agent_selector.next()
    
    def observe(self, agent):
        return self.states[agent]
    
    def state(self):
        return np.concatenate([self.states[agent] for agent in self.agents])

# 使用自定义环境
env = CustomEnv()
env.reset()

for agent in env.agent_iter():
    observation, reward, done, info = env.last()
    if done:
        action = None
    else:
        action = env.action_space(agent).sample()
    env.step(action)

5.2 环境包装器

import pettingzoo.butterfly.pistonball_v6 as pistonball
from pettingzoo.utils.wrappers import OrderEnforcingWrapper, AssertOutOfBoundsWrapper

# 创建环境
env = pistonball.env()

# 应用包装器
env = OrderEnforcingWrapper(env)
env = AssertOutOfBoundsWrapper(env)

# 与环境交互
env.reset()

for agent in env.agent_iter():
    observation, reward, done, info = env.last()
    if done:
        action = None
    else:
        action = env.action_space(agent).sample()
    env.step(action)

5.3 并行环境

import pettingzoo.butterfly.pistonball_v6 as pistonball
from pettingzoo.utils.conversions import aec_to_parallel

# 创建AEC环境
aec_env = pistonball.env()

# 转换为并行环境
parallel_env = aec_to_parallel(aec_env)

# 重置环境
observations = parallel_env.reset()

# 与环境交互
while True:
    # 为每个智能体选择动作
    actions = {}
    for agent, obs in observations.items():
        actions[agent] = parallel_env.action_space(agent).sample()
    
    # 执行一步
    observations, rewards, dones, infos = parallel_env.step(actions)
    
    # 检查是否所有智能体都完成
    if all(dones.values()):
        break

5.4 环境注册

from pettingzoo import AECEnv
from pettingzoo.utils import agent_selector
import numpy as np
import gym

class CustomEnv(AECEnv):
    def __init__(self):
        super().__init__()
        self.agents = ["agent_0", "agent_1"]
        self.possible_agents = self.agents.copy()
        self.action_spaces = {
            agent: gym.spaces.Discrete(2) for agent in self.agents
        }
        self.observation_spaces = {
            agent: gym.spaces.Box(low=0, high=1, shape=(2,), dtype=np.float32) for agent in self.agents
        }
        self._agent_selector = agent_selector(self.agents)
        self.agent_selection = self._agent_selector.next()
        self.states = {agent: np.zeros(2) for agent in self.agents}
        self.rewards = {agent: 0 for agent in self.agents}
        self.dones = {agent: False for agent in self.agents}
        self.infos = {agent: {} for agent in self.agents}
    
    def reset(self, seed=None, return_info=False, options=None):
        self.agents = self.possible_agents.copy()
        self._agent_selector = agent_selector(self.agents)
        self.agent_selection = self._agent_selector.next()
        self.states = {agent: np.zeros(2) for agent in self.agents}
        self.rewards = {agent: 0 for agent in self.agents}
        self.dones = {agent: False for agent in self.agents}
        self.infos = {agent: {} for agent in self.agents}
    
    def step(self, action):
        if self.dones[self.agent_selection]:
            return self._was_done_step(action)
        
        agent = self.agent_selection
        self.states[agent][0] = action
        
        # 计算奖励
        if agent == "agent_0":
            self.rewards[agent] = action
            self.rewards["agent_1"] = 1 - action
        else:
            self.rewards[agent] = action
            self.rewards["agent_0"] = 1 - action
        
        # 检查是否完成
        if np.sum(self.states[agent]) >= 1:
            self.dones[agent] = True
        
        # 选择下一个智能体
        self.agent_selection = self._agent_selector.next()
    
    def observe(self, agent):
        return self.states[agent]
    
    def state(self):
        return np.concatenate([self.states[agent] for agent in self.agents])

# 注册环境
from pettingzoo.utils import registry

registry.register(
    id="custom_env_v0",
    entry_point=CustomEnv,
    kwargs={},
    order_enforce=True,
)

# 使用注册的环境
import pettingzoo

env = pettingzoo.make("custom_env_v0")
env.reset()

for agent in env.agent_iter():
    observation, reward, done, info = env.last()
    if done:
        action = None
    else:
        action = env.action_space(agent).sample()
    env.step(action)

6. 实用案例

6.1 使用PettingZoo环境训练多智能体强化学习模型

场景:使用PettingZoo的PistonBall环境训练多智能体强化学习模型

实现

import pettingzoo.butterfly.pistonball_v6 as pistonball
import numpy as np

# 创建环境
env = pistonball.env()

# 训练参数
num_episodes = 100
max_steps = 1000

# 训练循环
for episode in range(num_episodes):
    env.reset()
    episode_rewards = {agent: 0 for agent in env.agents}
    
    for step in range(max_steps):
        for agent in env.agent_iter():
            observation, reward, done, info = env.last()
            
            if done:
                action = None
            else:
                # 简单的随机策略
                action = env.action_space(agent).sample()
            
            env.step(action)
            episode_rewards[agent] += reward
        
        # 检查是否所有智能体都完成
        if all(env.dones.values()):
            break
    
    print(f"Episode {episode+1}: Rewards = {episode_rewards}")
env.close()

6.2 使用PettingZoo与Stable Baselines3一起训练

场景:使用PettingZoo的环境和Stable Baselines3的PPO算法训练多智能体模型

实现

import pettingzoo.butterfly.pistonball_v6 as pistonball
from pettingzoo.utils.conversions import aec_to_parallel
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv
import numpy as np

# 创建AEC环境
aec_env = pistonball.env()

# 转换为并行环境
parallel_env = aec_to_parallel(aec_env)

# 包装为DummyVecEnv
def make_env():
    return parallel_env

vec_env = DummyVecEnv([make_env])

# 训练PPO模型
model = PPO("MlpPolicy", vec_env, verbose=1)
model.learn(total_timesteps=100000)

# 测试模型
obs = vec_env.reset()
for _ in range(1000):
    action, _states = model.predict(obs)
    obs, rewards, dones, info = vec_env.step(action)
    vec_env.render()
    if all(dones):
        obs = vec_env.reset()
vec_env.close()

6.3 使用PettingZoo创建自定义多智能体环境

场景:创建一个简单的多智能体博弈环境

实现

import gym
from pettingzoo import AECEnv
from pettingzoo.utils import agent_selector
import numpy as np

class PrisonersDilemmaEnv(AECEnv):
    def __init__(self):
        super().__init__()
        self.agents = ["prisoner_0", "prisoner_1"]
        self.possible_agents = self.agents.copy()
        self.action_spaces = {
            agent: gym.spaces.Discrete(2) for agent in self.agents  # 0: 合作, 1: 背叛
        }
        self.observation_spaces = {
            agent: gym.spaces.Discrete(2) for agent in self.agents  # 上一轮对方的动作
        }
        self._agent_selector = agent_selector(self.agents)
        self.agent_selection = self._agent_selector.next()
        self.rewards = {agent: 0 for agent in self.agents}
        self.dones = {agent: False for agent in self.agents}
        self.infos = {agent: {} for agent in self.agents}
        self.last_actions = {agent: 0 for agent in self.agents}
    
    def reset(self, seed=None, return_info=False, options=None):
        self.agents = self.possible_agents.copy()
        self._agent_selector = agent_selector(self.agents)
        self.agent_selection = self._agent_selector.next()
        self.rewards = {agent: 0 for agent in self.agents}
        self.dones = {agent: False for agent in self.agents}
        self.infos = {agent: {} for agent in self.agents}
        self.last_actions = {agent: 0 for agent in self.agents}
    
    def step(self, action):
        if self.dones[self.agent_selection]:
            return self._was_done_step(action)
        
        agent = self.agent_selection
        self.last_actions[agent] = action
        
        # 检查是否所有智能体都已行动
        if len([a for a, done in self.dones.items() if not done]) == 1:
            # 计算奖励(囚徒困境的奖励矩阵)
            a0_action = self.last_actions["prisoner_0"]
            a1_action = self.last_actions["prisoner_1"]
            
            if a0_action == 0 and a1_action == 0:
                # 双方合作
                self.rewards["prisoner_0"] = 3
                self.rewards["prisoner_1"] = 3
            elif a0_action == 0 and a1_action == 1:
                # 囚徒0合作,囚徒1背叛
                self.rewards["prisoner_0"] = 0
                self.rewards["prisoner_1"] = 5
            elif a0_action == 1 and a1_action == 0:
                # 囚徒0背叛,囚徒1合作
                self.rewards["prisoner_0"] = 5
                self.rewards["prisoner_1"] = 0
            else:
                # 双方背叛
                self.rewards["prisoner_0"] = 1
                self.rewards["prisoner_1"] = 1
            
            # 结束回合
            for a in self.agents:
                self.dones[a] = True
        
        # 选择下一个智能体
        self.agent_selection = self._agent_selector.next()
    
    def observe(self, agent):
        # 返回对方上一轮的动作
        other_agent = [a for a in self.agents if a != agent][0]
        return self.last_actions[other_agent]
    
    def state(self):
        return np.array([self.last_actions[agent] for agent in self.agents])

# 使用自定义环境
env = PrisonersDilemmaEnv()

# 运行多个回合
for episode in range(10):
    env.reset()
    episode_rewards = {agent: 0 for agent in env.agents}
    
    for agent in env.agent_iter():
        observation, reward, done, info = env.last()
        
        if done:
            action = None
        else:
            # 随机策略
            action = env.action_space(agent).sample()
        
        env.step(action)
        episode_rewards[agent] += reward
    
    print(f"Episode {episode+1}: Rewards = {episode_rewards}")
env.close()

7. 性能优化

7.1 环境优化

  • 使用向量化环境(VecEnv)并行运行多个环境
  • 对于不需要渲染的训练,关闭渲染
  • 选择合适的环境包装器,如OrderEnforcingWrapper、AssertOutOfBoundsWrapper等

7.2 算法优化

  • 根据环境类型选择合适的算法(离散动作空间:DQN、PPO;连续动作空间:A2C、PPO)
  • 调整算法超参数,如学习率、批量大小、gamma等
  • 使用适当的网络架构,如CNN用于图像输入

7.3 计算优化

  • 使用GPU加速训练
  • 调整batch_size和nsteps以充分利用GPU
  • 使用多进程并行训练
  • 对于大型模型,考虑使用梯度裁剪和学习率调度

8. 常见问题与解决方案

8.1 环境重置问题

问题:环境重置后智能体列表为空

解决方案

  • 确保在reset方法中正确重置智能体列表
  • 使用OrderEnforcingWrapper包装环境,它会检查环境的使用顺序是否正确

8.2 动作空间不匹配

问题:智能体的动作不在动作空间范围内

解决方案

  • 使用AssertOutOfBoundsWrapper包装环境,它会检查动作是否在动作空间范围内
  • 在选择动作前检查动作空间的边界

8.3 训练速度慢

问题:训练过程速度慢,迭代时间长

解决方案

  • 使用向量化环境并行训练
  • 使用GPU加速
  • 调整批量大小和学习率
  • 对于大型环境,考虑使用更高效的预处理

8.4 内存不足

问题:训练过程中内存不足

解决方案

  • 减少并行环境的数量
  • 减小批量大小
  • 使用更小的网络架构

9. 总结

PettingZoo作为一个多智能体强化学习环境库,为多智能体强化学习研究和开发提供了标准化的环境接口。它不仅提供了多种多智能体环境,还支持自定义环境和与其他强化学习库的集成。

通过本教程的学习,您应该能够:

  • 理解PettingZoo的核心概念和功能
  • 成功安装和配置PettingZoo
  • 使用不同的多智能体环境
  • 自定义多智能体环境
  • 与其他强化学习库集成
  • 优化训练性能
  • 解决常见问题

PettingZoo的设计理念是提供一个统一的多智能体环境接口,使得不同的多智能体强化学习算法可以在相同的环境上进行比较和评估。它的出现为多智能体强化学习研究和应用提供了重要的工具支持。