In the previous post we used multi-threading to solve the problem; today we discuss Deep Deterministic Policy Gradient (DDPG).
Theory
DDPG rests on two main ideas:
First, the "deterministic" in the name means it uses a deterministic policy. Earlier policy-gradient methods used a stochastic policy (the action is sampled at random from a distribution), which becomes impractical when the action space is high-dimensional or continuous, so here the policy drops the sampling and maps a state directly to a single action:
$$\pi_\theta(s) = a$$
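To make the contrast concrete, here is a minimal sketch of a stochastic policy versus a deterministic one; the class names and layer sizes are my own illustration and do not appear in the implementation further below.

import torch
from torch import nn

# Stochastic policy: outputs a probability distribution over discrete actions and samples one.
class StochasticPolicy(nn.Module):
    def __init__(self, s_dim, n_actions):
        super().__init__()
        self.net = nn.Sequential(nn.Linear(s_dim, 32), nn.ReLU(), nn.Linear(32, n_actions))

    def forward(self, s):
        probs = torch.softmax(self.net(s), dim=-1)
        return torch.distributions.Categorical(probs).sample()  # action is drawn at random

# Deterministic policy: maps the state straight to one continuous action, a = pi_theta(s).
class DeterministicPolicy(nn.Module):
    def __init__(self, s_dim, a_dim, a_bound):
        super().__init__()
        self.net = nn.Sequential(nn.Linear(s_dim, 32), nn.ReLU(), nn.Linear(32, a_dim), nn.Tanh())
        self.a_bound = a_bound  # scalar action bound

    def forward(self, s):
        return self.net(s) * self.a_bound  # the same state always yields the same action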
Second, it uses double (current/target) networks.
Just as DDQN has a current Q network and a target Q network, DDPG has a current network and a target network for both the Actor and the Critic, four networks in total:
1. Actor current network: iteratively updates the policy parameters θ and selects the current action A for the current state S, which is used to interact with the environment and produce S′ and R.
2. Actor target network: selects the best next action A′ for the next state S′ sampled from the replay buffer. Its parameters θ′ are periodically copied from θ.
3. Critic current network: iteratively updates the value-network parameters w and computes the current value Q(S, A, w). The target Q value is $y_i = R + \gamma Q'(S', A', w')$.
4. Critic target network: computes the $Q'(S', A', w')$ part of the target Q value. Its parameters w′ are periodically copied from w.
Besides these four networks, DDPG also uses experience replay; a rough sketch of how the pieces fit together is shown below.
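This sketch assumes hypothetical Actor/Critic classes; the actual implementation further below stores transitions in a fixed-size tensor rather than a deque, but the idea is the same.

import random
from collections import deque

class ReplayBuffer:
    # Minimal experience replay: store (s, a, r, s_next, is_end) tuples, sample uniformly.
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def store(self, s, a, r, s_next, is_end):
        self.buffer.append((s, a, r, s_next, is_end))

    def sample(self, batch_size):
        return random.sample(self.buffer, batch_size)

# The four networks: current/target Actor and current/target Critic.
# actor_eval, actor_target = Actor(), Actor()
# critic_eval, critic_target = Critic(), Critic()
# actor_target.load_state_dict(actor_eval.state_dict())    # theta' = theta
# critic_target.load_state_dict(critic_eval.state_dict())  # w' = w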
Next, let's look at the DDPG update formulas.
For the Critic current network, the loss function is the same mean-squared error used in DQN:
$$J(w) = \frac{1}{m}\sum_{j=1}^m \left(y_j - Q(\phi(S_j), A_j, w)\right)^2$$
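In PyTorch this is just a mean-squared error between the current Q estimate and the TD target; a minimal sketch with placeholder tensor names:

import torch.nn.functional as F

def critic_loss(q_current, y_target):
    # q_current: Q(phi(S_j), A_j, w) from the Critic current network, shape (m, 1)
    # y_target:  y_j built from the target networks; detach so no gradient flows into them
    return F.mse_loss(q_current, y_target.detach())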
The Actor is optimized with the following gradient (θ denotes the parameters of π):
$$\nabla_\theta J(\theta) \approx \frac{1}{m}\sum_{j=1}^m \nabla_\theta Q(s, a, w)\Big|_{s=s_j,\, a=\pi_\theta(s_j)} = \frac{1}{m}\sum_{j=1}^m \nabla_a Q(s, a, w)\Big|_{s=s_j,\, a=\pi_\theta(s_j)} \, \nabla_\theta \pi_\theta(s)\Big|_{s=s_j}$$
Since we want to maximize Q (gradient ascent on the expression above), in practice we simply minimize its negative, so the Actor loss is:
$$J(\theta) = -\frac{1}{m}\sum_{j=1}^m Q(s_j, a_j, w), \qquad a_j = \pi_\theta(s_j)$$
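In code this amounts to feeding the current Actor's action back into the current Critic and minimizing the negative mean Q value; a sketch assuming an actor_eval(states) call signature (the implementation below also passes the action bound):

def actor_loss(actor_eval, critic_eval, states):
    # a_j = pi_theta(s_j): the action comes from the current Actor, not from the replay buffer
    actions = actor_eval(states)
    # J(theta) = -1/m * sum_j Q(s_j, pi_theta(s_j), w); minimizing this maximizes Q
    return -critic_eval(states, actions).mean()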
Algorithm flow
Input: the Actor current network, Actor target network, Critic current network and Critic target network, with parameters θ, θ′, w, w′ respectively; the discount factor γ; the soft-update coefficient τ; the mini-batch size m; the target-network update frequency C; the maximum number of iterations (episodes) T; and a random noise process $\mathcal{N}$.
Output: the optimal Actor current-network parameters θ and Critic current-network parameters w.
1. Randomly initialize θ and w, set w′ = w and θ′ = θ, and empty the experience replay buffer D.
2. for i from 1 to T:
a) Initialize S as the first state of the current episode and obtain its feature vector φ(S).
b) Use the Actor current network to get the action $A = \pi_\theta(\phi(S)) + \mathcal{N}$ for state S.
c) Execute action A and observe the new state S′, the reward R, and the termination flag is_end.
d) Store the tuple {φ(S), A, R, φ(S′), is_end} in the replay buffer D.
e) Set S = S′.
f) Sample m transitions {φ(S_j), A_j, R_j, φ(S′_j), is_end_j}, j = 1, 2, …, m, from D and compute the target Q value y_j (see the sketch after this list):
$$y_j = \begin{cases} R_j & \text{if } is\_end_j \text{ is true} \\ R_j + \gamma Q'(\phi(S'_j), \pi_{\theta'}(\phi(S'_j)), w') & \text{if } is\_end_j \text{ is false} \end{cases}$$
g) Update all parameters w of the Critic current network by backpropagating the mean-squared error loss $\frac{1}{m}\sum_{j=1}^m (y_j - Q(\phi(S_j), A_j, w))^2$.
h) Update all parameters θ of the Actor current network by backpropagating $J(\theta) = -\frac{1}{m}\sum_{j=1}^m Q(s_j, \pi_\theta(s_j), w)$.
i) Every C steps (i.e. when the step counter modulo C equals 1), soft-update the Critic target network and the Actor target network:
$$w' \gets \tau w + (1-\tau) w'$$
$$\theta' \gets \tau \theta + (1-\tau) \theta'$$
j) If S′ is a terminal state, the current episode is finished; otherwise go back to step b).
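The implementation below leaves out the is_end branch when forming y_j; here is a hedged sketch of steps f) and i) with the termination flag kept explicit (function and tensor names are placeholders, not part of the code further down):

import torch

def compute_target(r, s_next, done, actor_target, critic_target, gamma):
    # y_j = R_j                                    if is_end_j is true
    #     = R_j + gamma * Q'(S'_j, pi'(S'_j), w')  otherwise
    with torch.no_grad():
        a_next = actor_target(s_next)
        q_next = critic_target(s_next, a_next)
        return r + gamma * (1.0 - done) * q_next  # done is 1.0 at terminal states, else 0.0

def soft_update(eval_net, target_net, tau):
    # w' <- tau*w + (1-tau)*w'   and   theta' <- tau*theta + (1-tau)*theta'
    for tp, ep in zip(target_net.parameters(), eval_net.parameters()):
        tp.data.copy_(tau * ep.data + (1.0 - tau) * tp.data)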
Code
PyTorch implementation, adapted from a TensorFlow version.
import torch
import numpy as np
import gym
from torch import nn
from torch.nn import functional as F
import time
##################### hyper parameters ####################
MAX_EPISODES = 2000
MAX_EP_STEPS = 200
LR_A = 0.001 # learning rate for actor
LR_C = 0.002 # learning rate for critic
GAMMA = 0.9 # reward discount
TAU = 0.3 # soft replacement
MEMORY_CAPACITY = 10000
BATCH_SIZE = 32
RENDER = False
ENV_NAME = 'Pendulum-v0'
env = gym.make(ENV_NAME)
env = env.unwrapped
env.seed(1)
s_dim = env.observation_space.shape[0]
a_dim = env.action_space.shape[0]
a_bound = env.action_space.high
def convert_eval_to_target(e, t):
    # Soft update: target <- TAU * eval + (1 - TAU) * target
    for target_param, eval_param in zip(t.parameters(), e.parameters()):
        target_param.data.mul_(1 - TAU)
        target_param.data.add_(TAU * eval_param.data)
class Actor(torch.nn.Module):
    def __init__(self):
        super(Actor, self).__init__()
        self.fc1 = torch.nn.Linear(s_dim, 30)
        self.fc1.weight.data.normal_(0, 0.1)
        self.fc2 = torch.nn.Linear(30, a_dim)
        self.fc2.weight.data.normal_(0, 0.1)

    def forward(self, state_input, a_bound):
        net = F.relu(self.fc1(state_input))
        a = torch.tanh(self.fc2(net))
        # scale the tanh output to the action range of the environment
        return a * a_bound
class Critic(torch.nn.Module):
    def __init__(self):
        super(Critic, self).__init__()
        self.w1_s = nn.Linear(s_dim, 30)
        self.w1_s.weight.data.normal_(0, 0.1)
        self.w1_a = nn.Linear(a_dim, 30)
        self.w1_a.weight.data.normal_(0, 0.1)
        self.out = nn.Linear(30, 1)
        self.out.weight.data.normal_(0, 0.1)  # initialization

    def forward(self, s, a):
        # Q(s, a): merge the state and action branches, then output a scalar value
        net = F.relu(self.w1_s(s) + self.w1_a(a))
        return self.out(net)
class DDPG(object):
    def __init__(self, a_dim, s_dim, a_bound):
        # replay memory: each row stores (s, a, r, s_)
        self.memory = torch.zeros((MEMORY_CAPACITY, s_dim * 2 + a_dim + 1), dtype=torch.float32)
        self.pointer = 0
        self.a_dim, self.s_dim, self.a_bound = a_dim, s_dim, torch.FloatTensor(a_bound)
        self.actor_eval = Actor()
        self.actor_target = Actor()
        self.critic_eval = Critic()
        self.critic_target = Critic()
        # start with the target networks equal to the current networks (theta' = theta, w' = w)
        self.actor_target.load_state_dict(self.actor_eval.state_dict())
        self.critic_target.load_state_dict(self.critic_eval.state_dict())
        self.ae_optimizer = torch.optim.Adam(params=self.actor_eval.parameters(), lr=LR_A)
        self.ce_optimizer = torch.optim.Adam(params=self.critic_eval.parameters(), lr=LR_C)
        self.mse = nn.MSELoss()

    def return_c_loss(self, S, a, R, S_):
        # TD target: y = R + gamma * Q'(S', pi'(S'), w'); target networks are detached
        a_ = self.actor_target(S_, self.a_bound).detach()
        q = self.critic_eval(S, a)
        q_ = self.critic_target(S_, a_).detach()
        q_target = R + GAMMA * q_
        td_error = self.mse(q_target, q)
        return td_error

    def return_a_loss(self, S):
        # actor loss: J(theta) = -mean Q(s, pi_theta(s), w)
        a = self.actor_eval(S, self.a_bound)
        q = self.critic_eval(S, a)
        a_loss = -q.mean()
        return a_loss

    def choose_action(self, s):
        return self.actor_eval(s[np.newaxis, :], self.a_bound)[0]

    def learn(self):
        # soft target replacement
        convert_eval_to_target(self.actor_eval, self.actor_target)
        convert_eval_to_target(self.critic_eval, self.critic_target)
        # sample a mini-batch from the replay memory
        indices = np.random.choice(MEMORY_CAPACITY, size=BATCH_SIZE)
        bt = self.memory[indices, :]
        bs = bt[:, :self.s_dim]
        ba = bt[:, self.s_dim: self.s_dim + self.a_dim]
        br = bt[:, -self.s_dim - 1: -self.s_dim]
        bs_ = bt[:, -self.s_dim:]
        a_loss = self.return_a_loss(bs)
        c_loss = self.return_c_loss(bs, ba, br, bs_)
        # update the Actor current network
        self.ae_optimizer.zero_grad()
        a_loss.backward()
        self.ae_optimizer.step()
        # update the Critic current network
        self.ce_optimizer.zero_grad()
        c_loss.backward()
        self.ce_optimizer.step()

    def store_transition(self, s, a, r, s_):
        transition = torch.FloatTensor(np.hstack((s, a, [r], s_)))
        index = self.pointer % MEMORY_CAPACITY  # replace the old memory with new memory
        self.memory[index, :] = transition
        self.pointer += 1
############################### training ####################################
env = gym.make(ENV_NAME)
env = env.unwrapped
env.seed(1)

s_dim = env.observation_space.shape[0]
a_dim = env.action_space.shape[0]
a_bound = env.action_space.high

ddpg = DDPG(a_dim, s_dim, a_bound)

var = 3  # control exploration
t1 = time.time()
for episode in range(MAX_EPISODES):
    s = env.reset()
    ep_reward = 0
    for j in range(MAX_EP_STEPS):
        # if RENDER:
        #     env.render()

        # Add exploration noise
        a = ddpg.choose_action(torch.FloatTensor(s))
        a = np.clip(np.random.normal(a.detach().numpy(), var), -2, 2)  # add randomness to action selection for exploration
        s_, r, done, info = env.step(a)

        ddpg.store_transition(s, a, r / 10, s_)

        if ddpg.pointer > MEMORY_CAPACITY:
            var *= .9995  # decay the action randomness
            ddpg.learn()

        s = s_
        ep_reward += r
        if j == MAX_EP_STEPS - 1:
            print('Episode:', episode, ' Reward: %i' % int(ep_reward), 'Explore: %.2f' % var, )
            # if ep_reward > -300: RENDER = True
            break

    if episode % 100 == 0:
        # evaluate the deterministic policy (no exploration noise) over 10 episodes
        total_reward = 0
        for i in range(10):
            state = env.reset()
            for j in range(MAX_EP_STEPS):
                # env.render()
                action = ddpg.choose_action(torch.FloatTensor(state))  # direct action for test
                state, reward, done, _ = env.step(action.detach().numpy())
                total_reward += reward
                if done:
                    break
        ave_reward = total_reward / 10  # average return over the 10 evaluation episodes
        print('episode: ', episode, 'Evaluation Average Reward:', ave_reward)

print('Running time: ', time.time() - t1)
A more detailed walkthrough will be added later.