Exploration and Exploitation in Policy Optimization
Exploration and Exploitation
- Exploitation
  - Choose the greedy action to get the most reward by exploiting the agent's current action-value estimates.
  - But the estimates may be inaccurate, so being greedy w.r.t. the current action-value estimates may not actually yield the most reward.
  - The model falls into sub-optimal behaviour more easily.
- Exploration
  - Improve the current knowledge about each action, leading to long-term benefit.
  - Improving the accuracy of the estimated action-values enables the agent to make more informed decisions in the future.
  - The convergence rate of the model will be slower.
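The risk of pure exploitation can be seen in a minimal sketch. It assumes a two-action bandit with deterministic rewards and action-value estimates initialised to zero; the function name `run_greedy` and the reward values are illustrative only. Because the first action tried already looks better than the untried one, the greedy agent never discovers the better action.

```python
import numpy as np

def run_greedy(true_rewards, n_steps):
    """Purely greedy agent on a bandit with deterministic rewards."""
    n_actions = len(true_rewards)
    q = np.zeros(n_actions)                  # action-value estimates, all start at 0
    counts = np.zeros(n_actions, dtype=int)  # how often each action was chosen
    for _ in range(n_steps):
        a = int(np.argmax(q))                # greedy choice; ties go to the lowest index
        r = true_rewards[a]                  # deterministic reward, to keep the example simple
        counts[a] += 1
        q[a] += (r - q[a]) / counts[a]       # incremental sample-average update
    return q, counts

# Action 1 is better (reward 2.0), but action 0 is tried first and never abandoned.
q, counts = run_greedy(true_rewards=[1.0, 2.0], n_steps=100)
print(q, counts)  # [1. 0.] [100   0]
```

Any amount of exploration would eventually try action 1 and correct the estimate, at the cost of some reward in the short term.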
\(\epsilon\)-greedy Action Selection
- It is a simple method to balance exploration and exploitation: at each step, the agent explores (takes a random action) with probability \(\epsilon\) and exploits (takes the greedy action) otherwise.
Definition
\[a_t = \begin{cases} \arg\max_a Q(s_t,a) & P \ge \epsilon, \\ a_{random} & P < \epsilon, \end{cases}\]
where \(P\) denotes a random number drawn from the uniform distribution on \([0, 1)\) and \(a_{random}\) is an action chosen uniformly at random.
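The rule above can be sketched as a standalone function. This is a minimal illustration, not a reference implementation: the name `epsilon_greedy`, the example Q-values and the seed are assumptions. Note that the random branch can also return the greedy action, so the greedy action is selected with probability \(1 - \epsilon + \epsilon/|A|\), where \(|A|\) is the number of actions.

```python
import numpy as np

def epsilon_greedy(q_values, eps, rng):
    """Return an action index according to the epsilon-greedy rule."""
    if rng.random() < eps:                       # P < eps: explore
        return int(rng.integers(len(q_values)))  # uniform random action
    return int(np.argmax(q_values))              # P >= eps: exploit

rng = np.random.default_rng(0)   # seed chosen arbitrarily
q_values = [0.2, 1.5, 0.7]       # hypothetical estimates; action 1 is greedy
picks = [epsilon_greedy(q_values, eps=0.1, rng=rng) for _ in range(10000)]
greedy_fraction = np.mean(np.array(picks) == 1)
print(greedy_fraction)           # close to 1 - eps + eps/3
```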
Code of \(\epsilon\)-greedy Action Selection
import numpy as np
import matplotlib.pyplot as plt

# Define Action class
class Actions:
    def __init__(self, m):
        self.m = m      # true mean reward of this action
        self.mean = 0   # current action-value estimate
        self.N = 0      # number of times this action was chosen

    # Return a noisy reward for the chosen action
    def choose(self):
        return np.random.randn() + self.m

    # Update the action-value estimate (incremental sample average)
    def update(self, x):
        self.N += 1
        self.mean = (1 - 1.0 / self.N)*self.mean + 1.0 / self.N * x

def run_experiment(m1, m2, m3, eps, N):
    actions = [Actions(m1), Actions(m2), Actions(m3)]
    data = np.empty(N)
    for i in range(N):
        # epsilon greedy
        p = np.random.rand()
        if p < eps:
            j = np.random.choice(3)
        else:
            j = np.argmax([a.mean for a in actions])
        x = actions[j].choose()
        actions[j].update(x)
        # for the plot
        data[i] = x
    cumulative_average = np.cumsum(data) / (np.arange(N) + 1)
    return cumulative_average

if __name__ == '__main__':
    c_1 = run_experiment(1.0, 2.0, 3.0, 0.1, 100000)
    c_05 = run_experiment(1.0, 2.0, 3.0, 0.05, 100000)
    c_01 = run_experiment(1.0, 2.0, 3.0, 0.01, 100000)
    # log scale plot
    plt.plot(c_1, label='eps = 0.1')
    plt.plot(c_05, label='eps = 0.05')
    plt.plot(c_01, label='eps = 0.01')
    plt.legend()
    plt.xscale('log')
    plt.show()
Results of the code:
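As a rough check on these curves, the level each cumulative average should approach can be computed by hand. Assuming the estimates have converged so that the greedy action is the best one, the expected reward per step is \((1-\epsilon)\max_a m_a + \epsilon \cdot \text{mean}_a(m_a)\). The helper name `asymptotic_average_reward` is hypothetical.

```python
import numpy as np

def asymptotic_average_reward(means, eps):
    """Long-run expected reward per step of epsilon-greedy,
    assuming the greedy action has converged to the best arm."""
    means = np.asarray(means, dtype=float)
    return (1.0 - eps) * means.max() + eps * means.mean()

for eps in (0.1, 0.05, 0.01):
    # true means 1.0, 2.0, 3.0 as in run_experiment above
    print(eps, asymptotic_average_reward([1.0, 2.0, 3.0], eps))
```

For the three settings this gives approximately 2.9, 2.95 and 2.99. A smaller \(\epsilon\) has the higher limit, but it typically needs more steps to identify the best action in the first place.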
Ornstein-Uhlenbeck Process
- Used in DDPG to generate temporally correlated exploration noise, for exploration efficiency in physical control problems with inertia.
- The OU process is strongly correlated in time, which lets the agent explore environments with momentum effectively.
- The OU process is a mean-reverting process with noise: when \(x_{t-1} > \mu\), the drift pushes \(x_t\) down; when \(x_{t-1} < \mu\), it pushes \(x_t\) up, back toward the mean.
The Ornstein-Uhlenbeck process is a stationary Gauss-Markov process, which means that it is a Gaussian process, a Markov process, and temporally homogeneous.
Definition
In continuous time:
\[dx_t = \theta (\mu - x_t) dt + \sigma dW_t,\]
where \(\theta > 0\) is the rate of mean reversion, \(\sigma > 0\) is the weight of the stochastic noise \(W\), \(\mu\) is the long-run mean of \(x_t\), and \(W_t\) denotes the Wiener process (Brownian motion), whose increments satisfy \((W_{t_2} - W_{t_1}) \sim N(0, t_2 - t_1)\) for \(t_1 < t_2\). A corollary useful for simulation is that we can write \(W_{t_2} - W_{t_1} = \sqrt{t_2 - t_1}\cdot N\), where \(N \sim N(0,1)\) is an independent standard normal variable.
In discrete time, with step size \(dt\):
\[x_t - x_{t-1} = \theta (\mu - x_{t-1}) dt + \sigma (W_t - W_{t-1}),\]
where \(W_t - W_{t-1} = \sqrt{dt}\cdot N\) by the corollary above.
Code of OU Action Noise
import numpy as np
import matplotlib.pyplot as plt

class OrnsteinUhlenbeckActionNoise:
    def __init__(self, mu, sigma=0.2, theta=0.15, dt=1e-2, x0=None):
        self.theta = theta
        self.mu = mu
        self.sigma = sigma
        self.dt = dt
        self.x0 = x0
        self.reset()

    def __call__(self):
        # One discrete-time step: mean-reverting drift plus a scaled Gaussian increment
        x = self.x_prev + self.theta * (self.mu - self.x_prev) * self.dt + \
            self.sigma * np.sqrt(self.dt) * np.random.normal(size=self.mu.shape)
        self.x_prev = x
        return x

    def reset(self):
        self.x_prev = self.x0 if self.x0 is not None else np.zeros_like(self.mu)

    def __repr__(self):
        return 'OrnsteinUhlenbeckActionNoise(mu={}, sigma={})'.format(self.mu, self.sigma)

def main():
    ou_noise = OrnsteinUhlenbeckActionNoise(mu=np.zeros(2))
    plt.figure('OU_noise')
    y = []
    # 1000 steps of size dt, so that the time axis matches the simulated time
    t = np.arange(1000) * ou_noise.dt
    for _ in t:
        y.append(ou_noise())
    plt.plot(t, y)
    plt.show()

if __name__ == "__main__":
    main()
Results of the code:
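A quick way to sanity-check the simulated noise is to compare its spread with the stationary standard deviation. For the continuous process this is \(\sigma/\sqrt{2\theta}\), which is about 0.365 for the defaults \(\sigma = 0.2\), \(\theta = 0.15\). For the discretised update the stationary variance works out to \(\sigma^2 / (2\theta - \theta^2 dt)\). The sketch below is an illustration under these assumptions; the function names and the parameter values used in the run are chosen only so that the chains reach stationarity in few steps.

```python
import numpy as np

def simulate_ou(n_chains, n_steps, mu=0.0, sigma=0.2, theta=0.15, dt=1e-2, seed=0):
    """Run n_chains independent OU chains with the discrete update
    and return their final values."""
    rng = np.random.default_rng(seed)
    x = np.full(n_chains, mu, dtype=float)
    for _ in range(n_steps):
        x += theta * (mu - x) * dt + sigma * np.sqrt(dt) * rng.normal(size=n_chains)
    return x

def stationary_std(sigma, theta, dt):
    """Stationary std of the discretised chain; tends to
    sigma / sqrt(2 * theta) as dt -> 0."""
    return sigma / np.sqrt(2.0 * theta - theta**2 * dt)

# Larger theta and dt than the defaults, only to shorten the burn-in.
x = simulate_ou(n_chains=20000, n_steps=200, sigma=0.5, theta=1.0, dt=0.1)
print(x.std(), stationary_std(0.5, 1.0, 0.1))
```

The two printed values should agree to within sampling error. With the default parameters the correlation time is \(1/\theta \approx 6.7\) time units, so a trajectory needs several hundred steps of size \(dt = 0.01\) before it forgets its starting point.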
Used in action selection:
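# Create the noise process, one dimension per action component
ou_noise = OrnsteinUhlenbeckActionNoise(mu=np.zeros(env.action_space.shape[0]),
                                        sigma=float(args.noise_scale) * np.ones(env.action_space.shape[0]))

def reset_noise(a_noise):
    if a_noise is not None:
        a_noise.reset()

# Reset the noise state (e.g. at the start of an episode)
reset_noise(ou_noise)

# Inside the agent's action-selection method: add OU noise to the actor output
mu = self.actor(state)
mu = mu.data
if ou_noise is not None:
    mu += self.Tensor(ou_noise()).to(self.device)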
Adaptive Parameter Noise
Definition
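Add noise to the actor network (actor_perturbed) by perturbing the parameters \(W\) of the network:
\[\widetilde{W} = W + P_{noise} \cdot N,\]
where
- \(W\) denotes the parameters of the actor network, and \(\widetilde{W}\) those of actor_perturbed.
- \(N\) is noise sampled from a standard normal distribution, with the same shape as \(W\).
- \(P_{noise}\) denotes the standard deviation (scale) of the adaptive parameter noise.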
The value of \(P_{noise}\) is adapted according to the distance between the perturbed action \(A_{noise}\) and the normal action \(A_{normal}\) without noise:
\[d = \sqrt{\mathbb{E}\mathbb{E}\left[(A_{noise} - A_{normal})^2\right]},\]
\[P_{noise} = \begin{cases} P_{noise}/C & d > \delta, \\ P_{noise} \cdot C & d \le \delta, \end{cases}\]
where the two expectations average over the sampled states and over the action dimensions, \(C = 1.01\) is the adaptation coefficient, and \(\delta\) denotes the desired value of the distance.
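To get a feel for how fast this rule reacts, the sketch below counts how many consecutive updates with \(d > \delta\) are needed to halve \(P_{noise}\) when \(C = 1.01\). The function name `adapt` and the starting value 0.1 are illustrative assumptions.

```python
def adapt(p_noise, d, delta, c=1.01):
    """One step of the adaptation rule: shrink if d > delta, else grow."""
    return p_noise / c if d > delta else p_noise * c

p_noise, steps = 0.1, 0
while p_noise > 0.05:                            # stop once the scale has halved
    p_noise = adapt(p_noise, d=1.0, delta=0.2)   # d is held above delta
    steps += 1
print(steps)  # 70
```

Since \(\ln 2 / \ln 1.01 \approx 69.7\), the scale changes by a factor of two roughly every 70 updates, so the adaptation is deliberately gradual.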
Code of Adaptive Parameter Noise
class AdaptiveParamNoiseSpec:
    def __init__(self, initial_stddev=0.1, desired_action_stddev=0.2, adaptation_coefficient=1.01):
        """
        Note that initial_stddev and current_stddev refer to std of parameter noise,
        but desired_action_stddev refers to (as name notes) desired std in action space
        """
        self.initial_stddev = initial_stddev
        self.desired_action_stddev = desired_action_stddev
        self.adaptation_coefficient = adaptation_coefficient
        self.current_stddev = initial_stddev

    def adapt(self, distance):
        if distance > self.desired_action_stddev:
            # Decrease stddev.
            self.current_stddev /= self.adaptation_coefficient
        else:
            # Increase stddev.
            self.current_stddev *= self.adaptation_coefficient

    def get_stats(self):
        stats = {
            'param_noise_stddev': self.current_stddev,
        }
        return stats

    def __repr__(self):
        fmt = 'AdaptiveParamNoiseSpec(initial_stddev={}, desired_action_stddev={}, adaptation_coefficient={})'
        return fmt.format(self.initial_stddev, self.desired_action_stddev, self.adaptation_coefficient)
import numpy as np
import torch

def ddpg_distance_metric(actions1, actions2):
    """
    Compute "distance" between actions taken by two policies at the same states
    Expects numpy arrays
    """
    diff = actions1 - actions2
    mean_diff = np.mean(np.square(diff), axis=0)  # mean over the batch
    dist = np.sqrt(np.mean(mean_diff))            # mean over action dimensions, then root
    return dist

param_noise = AdaptiveParamNoiseSpec(initial_stddev=args.noise_scale, desired_action_stddev=args.noise_scale)

# Update param_noise based on the distance metric, computed on a batch sampled from the buffer
# (done while computing the policy loss)
episode_transitions = agent.memory.sample(args.batch_size)
states = torch.stack([transition[0] for transition in episode_transitions], 0)
unperturbed_actions = agent.select_action(states, None, None)
perturbed_actions = torch.stack([transition[1] for transition in episode_transitions], 0)
ddpg_dist = ddpg_distance_metric(perturbed_actions.cpu().numpy(), unperturbed_actions.cpu().numpy())
param_noise.adapt(ddpg_dist)
# reset and update the parameters of actor_perturbed
def perturb_actor_parameters(self, param_noise):
    """Apply parameter noise to actor model, for exploration"""
    hard_update(self.actor_perturbed, self.actor)
    params = self.actor_perturbed.state_dict()
    for name in params:
        if 'ln' in name:
            continue  # leave layer-norm parameters unperturbed
        param = params[name]
        param += torch.randn(param.shape).to(self.device) * param_noise.current_stddev

    # Apply parameter noise to adversary model, for exploration
    hard_update(self.adversary_perturbed, self.adversary)
    params = self.adversary_perturbed.state_dict()
    for name in params:
        if 'ln' in name:
            continue  # leave layer-norm parameters unperturbed
        param = params[name]
        param += torch.randn(param.shape).to(self.device) * param_noise.current_stddev

def reset_noise(p_noise):
    if p_noise is not None:
        agent.perturb_actor_parameters(p_noise)

reset_noise(param_noise)

# select the action (inside the agent's action-selection method)
if param_noise is not None:
    mu = self.actor_perturbed(state)
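The pieces above (perturb the parameters, measure the action-space distance, adapt the noise scale) can be put together in a small self-contained sketch. It is a toy under stated assumptions, not the training code: the actor is a random linear map in NumPy rather than a torch network, the batch of states is random rather than sampled from a replay buffer, and all names and sizes are hypothetical.

```python
import numpy as np

rng = np.random.default_rng(0)        # arbitrary seed
W = rng.normal(size=(4, 8))           # toy linear "actor": action = W @ state
states = rng.normal(size=(256, 8))    # stand-in for a batch from the replay buffer

def act(weights, s):
    return s @ weights.T              # shape (batch, action_dim)

def distance(a1, a2):
    # same metric as ddpg_distance_metric: mean over batch, mean over dims, root
    return np.sqrt(np.mean(np.mean(np.square(a1 - a2), axis=0)))

p_noise, delta, c = 0.1, 0.2, 1.01
history = []
for _ in range(1500):
    W_perturbed = W + p_noise * rng.normal(size=W.shape)   # perturb the parameters
    d = distance(act(W_perturbed, states), act(W, states))
    p_noise = p_noise / c if d > delta else p_noise * c    # adapt the noise scale
    history.append(d)

print(np.mean(history[-500:]), delta)
```

After the initial transient, the measured distance should hover around \(\delta\): the noise is specified in parameter space, but the adaptation keeps its effect in action space at roughly the desired level.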