Deep Reinforcement Learning: Implementing a Lunar Lander with the PPO Algorithm in TensorFlow 2

Types of Reinforcement Learning Algorithms


On-policy vs Off-policy:

  • On-policy: the training data is generated by the current agent continually interacting with the environment
  • Off-policy: the agent being trained and the agent interacting with the environment are not the same; in other words, another agent interacts with the environment to supply the training data
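The operational difference can be sketched with a toy loop (the `collect` function and the commented-out update steps are placeholders, not real RL code):

```python
import random

def collect(n):
    # Stand-in for agent-environment interaction: random "transitions".
    return [random.random() for _ in range(n)]

# On-policy (e.g. PPO): training data must come from the current policy,
# so the buffer is thrown away after every update.
on_policy_buffer = []
for step in range(3):
    on_policy_buffer.extend(collect(5))
    # ... update the policy on on_policy_buffer here ...
    on_policy_buffer.clear()  # stale data from the old policy is discarded

# Off-policy (e.g. DQN): a replay buffer keeps transitions generated by
# earlier policies, and each update samples from that mixed history.
replay_buffer = []
for step in range(3):
    replay_buffer.extend(collect(5))
    batch = random.sample(replay_buffer, 4)  # reuse old experience
    # ... update the policy on batch here ...

print(len(on_policy_buffer), len(replay_buffer))  # 0 15
```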

The PPO Algorithm

PPO (Proximal Policy Optimization) is an on-policy algorithm. By performing small-batch, proximal updates, it addresses the problem that an overly large gap between the new and old policies during training makes learning unstable.
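The heart of PPO is the clipped surrogate objective, implemented with TensorFlow later in this article; here is a standalone NumPy sketch of just the clipping (the `ratio` and `advantage` values are made up for illustration):

```python
import numpy as np

def clipped_surrogate(ratio, advantage, eps=0.2):
    """PPO clipped surrogate: take the more pessimistic of the unclipped
    and clipped objectives, so moving the ratio far outside [1-eps, 1+eps]
    stops contributing any extra objective value."""
    unclipped = ratio * advantage
    clipped = np.clip(ratio, 1 - eps, 1 + eps) * advantage
    return np.minimum(unclipped, clipped)

# A ratio far above 1+eps gains nothing beyond the clipped value:
print(clipped_surrogate(np.array([1.5]), np.array([1.0])))   # [1.2]
# With a negative advantage, the minimum keeps the worse (clipped) value:
print(clipped_surrogate(np.array([0.5]), np.array([-1.0])))  # [-0.8]
```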


The Actor-Critic Algorithm

The Actor-Critic algorithm consists of two parts. The first is the policy function, the Actor, which generates actions and interacts with the environment; the second is the value function, the Critic, which evaluates the Actor's performance.
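As a toy illustration of the two roles (random weights, purely to show the shape and meaning of each output; the real networks are built with Keras below):

```python
import numpy as np

rng = np.random.default_rng(0)

# A single 8-dim state, like a LunarLander observation.
state = rng.normal(size=(1, 8))

# One random linear layer per head, standing in for the real networks.
W_actor = rng.normal(size=(8, 4))
W_critic = rng.normal(size=(8, 1))

logits = state @ W_actor
probs = np.exp(logits) / np.exp(logits).sum()  # Actor: distribution over 4 actions
value = (state @ W_critic).item()              # Critic: scalar estimate of the state's value

print(probs.shape)  # (1, 4)
```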


Gym

Gym is a package frequently used in reinforcement learning; it collects environments for many games. Below we will use LunarLander-v2 to implement an automated version of the "Apollo moon landing".


Installation:

pip install gym

If you encounter the error:

AttributeError: module 'gym.envs.box2d' has no attribute 'LunarLander'

Fix:

pip install gym[box2d]

LunarLander-v2

LunarLander-v2 is a lunar lander environment. The landing pad is at coordinates (0, 0), and these coordinates are the first two numbers of the state vector. The reward for moving from the top of the screen to the landing pad with zero speed is roughly 100 to 140 points. The episode ends when the lander crashes or comes to rest, receiving an additional -100 or +100 points respectively. Each leg touching the ground is worth +10 points, and firing the main engine costs -0.3 points per frame. The environment counts as solved at 200 points.
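For reference when reading the code below, the 8-dimensional state vector can be unpacked like this (the field names follow the standard LunarLander-v2 state layout; worth double-checking against your gym version):

```python
def unpack_state(s):
    """Name the 8 components of a LunarLander-v2 observation."""
    x, y, vx, vy, angle, angular_vel, leg_left, leg_right = s
    return {
        "x": x, "y": y,              # position relative to the pad at (0, 0)
        "vx": vx, "vy": vy,          # linear velocity
        "angle": angle,              # lander tilt
        "angular_vel": angular_vel,  # rotation speed
        "leg_left": leg_left,        # 1.0 if the left leg touches the ground
        "leg_right": leg_right,      # 1.0 if the right leg touches the ground
    }

# Example: a lander resting on the pad with both legs down.
state = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0]
print(unpack_state(state)["leg_left"])  # 1.0
```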


Launching the Lander

import gym

# Create the environment
env = gym.make("LunarLander-v2")

# Reset the environment
env.reset()

# Run
for i in range(180):

  # Render the environment
  env.render()

  # Take a random action
  observation, reward, done, info = env.step(env.action_space.sample())

  if i % 10 == 0:
      # Debug output
      print("Observation:", observation)
      print("Reward:", reward)

env.close()

Implementing the Lunar Lander with PPO

PPO.py

import numpy as np
import tensorflow as tf
from tensorflow_probability.python.distributions import Categorical


class Memory:
  def __init__(self):
      """Initialize the buffers"""
      self.actions = []  # actions (4 possible)
      self.states = []  # states, each made up of 8 numbers
      self.logprobs = []  # log-probabilities of the actions
      self.rewards = []  # rewards
      self.is_terminals = []  # whether the episode ended

  def clear_memory(self):
      """Clear the memory"""
      del self.actions[:]
      del self.states[:]
      del self.logprobs[:]
      del self.rewards[:]
      del self.is_terminals[:]


class ActorCritic(tf.keras.Model):
  def __init__(self, state_dim, action_dim, n_latent_var):
      super(ActorCritic, self).__init__()

      # Actor (policy) network
      self.action_layer = tf.keras.Sequential([
          # [b, 8] => [b, 64]
          tf.keras.layers.Dense(n_latent_var, activation="tanh"),

          # [b, 64] => [b, 64]
          tf.keras.layers.Dense(n_latent_var, activation="tanh"),

          # [b, 64] => [b, 4]
          tf.keras.layers.Dense(action_dim, activation="softmax")
      ])

      # Critic (value) network
      self.value_layer = tf.keras.Sequential([
          # [b, 8] => [b, 64]
          tf.keras.layers.Dense(n_latent_var, activation="tanh"),

          # [b, 64] => [b, 64]
          tf.keras.layers.Dense(n_latent_var, activation="tanh"),

          # [b, 64] => [b, 1]
          tf.keras.layers.Dense(1)
      ])

  def forward(self):
      """Forward pass, superseded by act()"""

      raise NotImplementedError

  def build(self, input_shape):

      # No weight to train.
      super(ActorCritic, self).build(input_shape)  # Be sure to call this at the end

  def act(self, state, memory):
      """Compute the action"""

      # Probabilities of the 4 actions
      action_probs = self.action_layer(state)

      # Sample the action from the categorical distribution
      dist = Categorical(action_probs)
      action = dist.sample()

      # Store into memory
      memory.states.append(state)
      memory.actions.append(action)
      memory.logprobs.append(dist.log_prob(action))

      # Return the action
      return action.numpy()[0]

  def evaluate(self, state, action):
      """
      Evaluate a batch of states and actions
      :param state: states in batches of 2000, shape [2000, 8]
      :param action: actions in batches of 2000, shape [2000]
      :return:
      """

      # Action probabilities
      action_probs = self.action_layer(state)
      dist = Categorical(action_probs)  # wrap as a categorical distribution

      # Log-probabilities of the taken actions
      action_logprobs = dist.log_prob(action)

      # Entropy of the action distribution
      dist_entropy = dist.entropy()
      dist_entropy = tf.squeeze(dist_entropy)

      # Critic value
      state_value = self.value_layer(state)
      state_value = tf.squeeze(state_value)  # [2000, 1] => [2000]

      # Return log-probs, state values, and entropy
      return action_logprobs, state_value, dist_entropy


class PPO:
  def __init__(self, state_dim, action_dim, n_latent_var, lr, betas, gamma, K_epochs, eps_clip):
      self.lr = lr  # learning rate
      self.betas = betas  # Adam betas
      self.gamma = gamma  # discount factor
      self.eps_clip = eps_clip  # clip range for the probability ratio
      self.K_epochs = K_epochs  # optimization epochs per update

      # Initialize the policies
      self.policy = ActorCritic(state_dim, action_dim, n_latent_var)
      self.policy_old = ActorCritic(state_dim, action_dim, n_latent_var)

      self.optimizer = tf.keras.optimizers.Adam(learning_rate=lr, beta_1=betas[0], beta_2=betas[1])  # optimizer
      self.MseLoss = tf.keras.losses.MeanSquaredError()  # value loss

  def update(self, memory):
      """Update the policy"""

      # Monte Carlo estimate of the state returns
      rewards = []
      discounted_reward = 0
      for reward, is_terminal in zip(reversed(memory.rewards), reversed(memory.is_terminals)):
          # Episode boundary: reset the running return
          if is_terminal:
              discounted_reward = 0

          # Discounted return (current reward + gamma * return of the next state)
          discounted_reward = reward + (self.gamma * discounted_reward)

          # Prepend, since we iterate backwards
          rewards.insert(0, discounted_reward)

      # Normalize the returns
      rewards = tf.convert_to_tensor(rewards, dtype=tf.float32)
      rewards = (rewards - np.mean(rewards)) / (np.std(rewards) + 1e-5)

      # Convert the memory to tensors
      old_states = tf.stack(memory.states)
      old_actions = tf.stack(memory.actions)
      old_logprobs = tf.stack(memory.logprobs)

      # Optimize the policy for K epochs:
      for _ in range(self.K_epochs):
          with tf.GradientTape() as tape:

              # Evaluate the old states and actions under the current policy
              logprobs, state_values, dist_entropy = self.policy.evaluate(old_states, old_actions)

              # Probability ratio pi_new / pi_old
              ratios = tf.exp(logprobs - old_logprobs)
              ratios = tf.squeeze(ratios)

              # Clipped surrogate loss (advantages are detached so the
              # critic is trained only through the MSE term)
              advantages = rewards - tf.stop_gradient(state_values)
              surr1 = ratios * advantages
              surr2 = tf.clip_by_value(ratios, 1 - self.eps_clip, 1 + self.eps_clip) * advantages
              loss = tf.reduce_mean(-tf.minimum(surr1, surr2) + 0.5 * self.MseLoss(state_values, rewards) - 0.01 * dist_entropy)

          # Apply the gradients
          grads = tape.gradient(loss, self.policy.action_layer.trainable_variables + self.policy.value_layer.trainable_variables)
          self.optimizer.apply_gradients(zip(grads, self.policy.action_layer.trainable_variables + self.policy.value_layer.trainable_variables))

      # Point the old policy at the updated layers (equivalent to copying the
      # weights here, since the old log-probs were stored at collection time)
      self.policy_old.action_layer = self.policy.action_layer
      self.policy_old.value_layer = self.policy.value_layer
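The Monte Carlo return loop at the top of update() can be sanity-checked in isolation; here is a pure-Python version run on a made-up buffer containing two episodes:

```python
def discounted_returns(rewards, is_terminals, gamma=0.99):
    """Walk the trajectory backwards, resetting the running return at
    every episode boundary - the same loop used in PPO.update()."""
    returns = []
    running = 0.0
    for reward, done in zip(reversed(rewards), reversed(is_terminals)):
        if done:
            running = 0.0
        running = reward + gamma * running
        returns.insert(0, running)
    return returns

# Two episodes in one buffer: [1, 1 | 1]. The terminal flag stops the
# second episode's reward from leaking into the first episode's returns.
print(discounted_returns([1.0, 1.0, 1.0], [False, True, False], gamma=0.5))
# [1.5, 1.0, 1.0]
```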

main.py

import gym
import tensorflow as tf
from PPO import Memory, PPO

############## Hyperparameters ##############
env_name = "LunarLander-v2"  # environment name
env = gym.make(env_name)
state_dim = 8  # state dimension
action_dim = 4  # action dimension
render = False  # render to screen
solved_reward = 230  # stop condition (average reward > 230)
log_interval = 20  # print avg reward in the interval
max_episodes = 50000  # max training episodes
max_timesteps = 300  # max steps per episode
n_latent_var = 64  # width of the hidden fully connected layers
update_timestep = 2000  # update the policy every 2000 steps
lr = 0.002  # learning rate
betas = (0.9, 0.999)  # Adam betas
gamma = 0.99  # discount factor
K_epochs = 4  # policy update epochs per PPO update
eps_clip = 0.2  # PPO clip range


#############################################

def main():
  # Instantiate
  memory = Memory()
  ppo = PPO(state_dim, action_dim, n_latent_var, lr, betas, gamma, K_epochs, eps_clip)

  # Accumulators
  total_reward = 0
  total_length = 0
  timestep = 0

  # Training loop
  for i_episode in range(1, max_episodes + 1):

      # Reset the environment (start a new episode)
      state = env.reset()

      # Convert to a tensor
      state = tf.convert_to_tensor(state)
      state = tf.reshape(state, [1, 8])

      # Step through the episode
      for t in range(max_timesteps):
          timestep += 1

          # Get an action from the old policy
          action = ppo.policy_old.act(state, memory)

          # Step the environment
          state, reward, done, _ = env.step(action)  # (new state, reward, episode finished, debug info)

          # Convert to a tensor
          state = tf.convert_to_tensor(state)
          state = tf.reshape(state, [1, 8])

          # Update the memory (reward / whether the episode finished)
          memory.rewards.append(reward)
          memory.is_terminals.append(done)

          # Update the policy
          if timestep % update_timestep == 0:
              ppo.update(memory)

              # Clear the memory
              memory.clear_memory()

              # Reset the step counter
              timestep = 0

          # Accumulate the reward
          total_reward += reward

          # Render
          if render:
              env.render()

          # Episode finished: stop stepping
          if done:
              break

      # Accumulate the episode length
      total_length += t

      # Stop once the interval's accumulated reward exceeds the target (average > 230)
      if total_reward >= (log_interval * solved_reward):
          print("########## Solved! ##########")

          # Save the models
          tf.keras.models.save_model(ppo.policy.action_layer, "./model/action")
          tf.keras.models.save_model(ppo.policy.value_layer, "./model/value")

          # Stop training
          break

      # Log every 20 episodes
      if i_episode % log_interval == 0:

          # Average length / reward over the last 20 episodes
          avg_length = int(total_length / log_interval)
          running_reward = int(total_reward / log_interval)

          # Debug output
          print('Episode {} \t avg length: {} \t average_reward: {}'.format(i_episode, avg_length, running_reward))

          # Reset the accumulators
          total_reward = 0
          total_length = 0

if __name__ == '__main__':
  main()