Exploration and Exploitation

  • Exploitation
    • Choose the greedy action to get the most reward by exploiting the agent’s current action-value estimates.
    • But being greedy w.r.t. the current action-value estimates may not actually yield the most reward, because those estimates can be inaccurate.
    • A purely greedy agent can easily get stuck in sub-optimal behaviour.
  • Exploration
    • Improve the agent's current knowledge about each action, which brings long-term benefit.
    • More accurate action-value estimates enable the agent to make better-informed decisions in the future.
    • However, too much exploration slows down convergence.

\(\epsilon\)-greedy Action Selection

  • A simple method that balances exploration and exploitation by choosing randomly between the two: with probability \(\epsilon\) explore, otherwise exploit.

Definition

\[a_t = \begin{cases} \arg\max_a Q(s_t,a) & P \ge \epsilon, \\ a_{random} & P < \epsilon. \end{cases}\]

where \(P\) denotes a random number drawn from the uniform distribution on \([0, 1]\).

Code of \(\epsilon\)-greedy Action Selection

import numpy as np
import matplotlib.pyplot as plt

# Define the Actions class: one bandit arm with true mean reward m
class Actions:
    def __init__(self, m):
        self.m = m        # true mean reward of this action
        self.mean = 0     # current estimate of the action value
        self.N = 0        # number of times this action has been chosen

    # Sample a reward for the chosen action: true mean plus unit Gaussian noise
    def choose(self):
        return np.random.randn() + self.m

    # Update the running sample-mean estimate of the action value
    def update(self, x):
        self.N += 1
        self.mean = (1 - 1.0 / self.N) * self.mean + 1.0 / self.N * x

def run_experiment(m1, m2, m3, eps, N):
    actions = [Actions(m1), Actions(m2), Actions(m3)]
    data = np.empty(N)

    for i in range(N):
        # epsilon-greedy action selection
        p = np.random.rand()
        if p < eps:
            j = np.random.choice(3)  # explore: pick a random action
        else:
            j = np.argmax([a.mean for a in actions])  # exploit: pick the greedy action
        x = actions[j].choose()
        actions[j].update(x)

        # for the plot
        data[i] = x
    cumulative_average = np.cumsum(data) / (np.arange(N) + 1)
    return cumulative_average

if __name__ == '__main__':

    c_1 = run_experiment(1.0, 2.0, 3.0, 0.1, 100000)
    c_05 = run_experiment(1.0, 2.0, 3.0, 0.05, 100000)
    c_01 = run_experiment(1.0, 2.0, 3.0, 0.01, 100000)

    # log-scale plot of the cumulative average reward for each epsilon
    plt.plot(c_1, label='eps = 0.1')
    plt.plot(c_05, label='eps = 0.05')
    plt.plot(c_01, label='eps = 0.01')
    plt.legend()
    plt.xscale('log')
    plt.show()
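
In `update()`, the estimate uses the standard incremental form of the sample mean, so `self.mean` always equals the average of all rewards observed for that action without storing them. Writing \(Q_N\) for the estimate after the \(N\)-th observed reward \(x_N\):

\[Q_N = \left(1 - \frac{1}{N}\right) Q_{N-1} + \frac{1}{N} x_N = Q_{N-1} + \frac{1}{N}\left(x_N - Q_{N-1}\right).\]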

Results of the code: the cumulative average reward over time for \(\epsilon = 0.1, 0.05, 0.01\), plotted with a log-scale x-axis.

Ornstein-Uhlenbeck Process

  • Used in DDPG to generate temporally correlated exploration noise, which makes exploration more efficient in physical control problems with inertia.
  • Because the OU process is strongly correlated in time, it lets the agent explore environments that have momentum (inertia) effectively [1].
  • The OU process is essentially mean reversion with noise: when \(x_{t-1} > \mu\), \(x_t\) decreases; when \(x_{t-1} < \mu\), \(x_t\) increases, so the process is pulled back towards the mean.

The Ornstein-Uhlenbeck process is a stationary Gauss-Markov process, which means that it is a Gaussian process, a Markov process, and is temporally homogeneous.

Definition

For continuous time:

\[dx_t = \theta (\mu - x_t) dt + \sigma dW_t,\]

where \(\theta > 0\) is a parameter controlling the speed of mean reversion, \(\sigma > 0\) is the weight of the stochastic noise, \(\mu\) is the mean value of \(x_t\), and \(W_t\) denotes the Wiener process (Brownian motion), for which \((W_{t_2} - W_{t_1}) \sim N(0,\, t_2 - t_1)\). A corollary useful for simulation is that for \(t_1 < t_2\) we can write \(W_{t_2} - W_{t_1} = \sqrt{t_2 - t_1}\cdot N\), where \(N \sim N(0,1)\) is an independent standard normal variable.

For discrete time:

\[x_t - x_{t-1} = \theta (\mu - x_{t-1}) \Delta t + \sigma (W_t - W_{t-1}),\]

where \(\Delta t\) is the length of one time step.
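
Substituting the simulation corollary \(W_t - W_{t-1} = \sqrt{\Delta t}\cdot N\) with \(N \sim N(0,1)\) gives the explicit update rule that the code below implements (its `dt` argument plays the role of \(\Delta t\)):

\[x_t = x_{t-1} + \theta (\mu - x_{t-1}) \Delta t + \sigma \sqrt{\Delta t}\cdot N.\]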

Code of OU Action Noise

import numpy as np
import matplotlib.pyplot as plt

class OrnsteinUhlenbeckActionNoise:
    def __init__(self, mu, sigma=0.2, theta=0.15, dt=1e-2, x0=None):
        self.theta = theta
        self.mu = mu
        self.sigma = sigma
        self.dt = dt
        self.x0 = x0
        self.reset()
 
    def __call__(self):
        x = self.x_prev + self.theta * (self.mu - self.x_prev) * self.dt + \
                self.sigma * np.sqrt(self.dt) * np.random.normal(size=self.mu.shape)
        self.x_prev = x
        return x
 
    def reset(self):
        self.x_prev = self.x0 if self.x0 is not None else np.zeros_like(self.mu)
 
    def __repr__(self):
        return 'OrnsteinUhlenbeckActionNoise(mu={}, sigma={})'.format(self.mu, self.sigma)

def main():
    # Sample a 2-dimensional OU noise process and plot both components over time
    ou_noise = OrnsteinUhlenbeckActionNoise(mu=np.zeros(2))
    plt.figure('OU_noise')
    y = []
    t = np.linspace(0, 100, 1000)
    for _ in t:
        y.append(ou_noise())
    plt.plot(t, y)
    plt.show()

if __name__ == "__main__":
    main()

Results of the code: a sample path of the 2-dimensional OU noise process, with both components plotted against time.

Used in action selection:

# One noise process per action dimension
ou_noise = OrnsteinUhlenbeckActionNoise(mu=np.zeros(env.action_space.shape[0]),
                                        sigma=float(args.noise_scale) * np.ones(env.action_space.shape[0]))

# Reset the noise process at the start of each episode
def reset_noise(a_noise):
    if a_noise is not None:
        a_noise.reset()

reset_noise(ou_noise)

# Deterministic action from the actor, perturbed by OU exploration noise
mu = self.actor(state)
mu = mu.data
if ou_noise is not None:
    mu += self.Tensor(ou_noise()).to(self.device)
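
This snippet is an excerpt from a DDPG-style agent, so `self.actor`, `self.Tensor`, `self.device`, `env`, and `args.noise_scale` come from the surrounding code. As a rough self-contained sketch of the same pattern, assuming a hypothetical two-dimensional action space and a placeholder actor network:

import numpy as np
import torch
import torch.nn as nn

# Hypothetical actor: maps a 4-dimensional state to a 2-dimensional action in [-1, 1]
actor = nn.Sequential(nn.Linear(4, 32), nn.ReLU(), nn.Linear(32, 2), nn.Tanh())

ou_noise = OrnsteinUhlenbeckActionNoise(mu=np.zeros(2), sigma=0.2)
ou_noise.reset()  # reset the noise process at the start of each episode

state = torch.zeros(1, 4)                  # placeholder state
with torch.no_grad():
    mu = actor(state).numpy().squeeze(0)   # deterministic action from the actor
action = mu + ou_noise()                   # add temporally correlated exploration noise
action = np.clip(action, -1.0, 1.0)        # keep the noisy action within bounds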

Adaptive Parameter Noise

Definition

Add noise to a copy of the actor network (actor_perturbed) by perturbing its parameters \(W\):

\[W \leftarrow W + N \cdot P_{noise},\]

where

  • \(W\) denotes the parameters of the actor network.
  • \(N\) is noise sampled from a standard normal distribution with the same shape as \(W\).
  • \(P_{noise}\) denotes the current value of the adaptive parameter noise (a standard deviation).

The value of \(P_{noise}\) is adapted according to a distance metric between the perturbed actions \(A_{noise}\) and the normal actions \(A_{normal}\) produced without noise:

\[d = \sqrt{\mathbb{E}\left[(A_{noise} - A_{normal})^2\right]},\] \[P_{noise} = \begin{cases} P_{noise}/C & d > \delta, \\ P_{noise} \cdot C & d \le \delta, \end{cases}\]

where \(C = 1.01\) is the adaptation coefficient and \(\delta\) denotes the desired value of the distance.

Code of Adaptive Parameter Noise

class AdaptiveParamNoiseSpec:
    def __init__(self, initial_stddev=0.1, desired_action_stddev=0.2, adaptation_coefficient=1.01):
        """
        Note that initial_stddev and current_stddev refer to std of parameter noise,
        but desired_action_stddev refers to (as name notes) desired std in action space
        """
        self.initial_stddev = initial_stddev
        self.desired_action_stddev = desired_action_stddev
        self.adaptation_coefficient = adaptation_coefficient

        self.current_stddev = initial_stddev

    def adapt(self, distance):
        if distance > self.desired_action_stddev:
            # Decrease stddev.
            self.current_stddev /= self.adaptation_coefficient
        else:
            # Increase stddev.
            self.current_stddev *= self.adaptation_coefficient

    def get_stats(self):
        stats = {
            'param_noise_stddev': self.current_stddev,
        }
        return stats

    def __repr__(self):
        fmt = 'AdaptiveParamNoiseSpec(initial_stddev={}, desired_action_stddev={}, adaptation_coefficient={})'
        return fmt.format(self.initial_stddev, self.desired_action_stddev, self.adaptation_coefficient)

param_noise = AdaptiveParamNoiseSpec(initial_stddev=args.noise_scale, desired_action_stddev=args.noise_scale)

# Update param_noise based on the distance metric, computed on a batch of transitions sampled from the replay buffer while computing the policy loss
episode_transitions = agent.memory.sample(args.batch_size)
states = torch.stack([transition[0] for transition in episode_transitions], 0)
unperturbed_actions = agent.select_action(states, None, None)
perturbed_actions = torch.stack([transition[1] for transition in episode_transitions], 0)
ddpg_dist = ddpg_distance_metric(perturbed_actions.cpu().numpy(), unperturbed_actions.cpu().numpy())
param_noise.adapt(ddpg_dist)

def ddpg_distance_metric(actions1, actions2):
    """
    Compute "distance" between actions taken by two policies at the same states
    Expects numpy arrays
    """
    diff = actions1 - actions2
    mean_diff = np.mean(np.square(diff), axis=0)
    dist = np.sqrt(np.mean(mean_diff))
    return dist

# reset and update the parameters of actor_perturbed
def perturb_actor_parameters(self, param_noise):
    """Apply parameter noise to actor model, for exploration"""
    hard_update(self.actor_perturbed, self.actor)
    params = self.actor_perturbed.state_dict()
    for name in params:
        if 'ln' in name:
            continue  # skip layer-norm parameters
        param = params[name]
        param += torch.randn(param.shape).to(self.device) * param_noise.current_stddev

    """Apply parameter noise to adversary model, for exploration"""
    hard_update(self.adversary_perturbed, self.adversary)
    params = self.adversary_perturbed.state_dict()
    for name in params:
        if 'ln' in name:
            continue  # skip layer-norm parameters
        param = params[name]
        param += torch.randn(param.shape).to(self.device) * param_noise.current_stddev

def reset_noise(p_noise):
    if p_noise is not None:
        perturb_actor_parameters(p_noise)

reset_noise(param_noise)

# select the action: use the perturbed actor when parameter noise is enabled
if param_noise is not None:
    mu = self.actor_perturbed(state)
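
Putting the pieces together, a minimal sketch of a single adaptation step, using the classes and functions above but with hypothetical placeholder arrays standing in for the perturbed and unperturbed action batches:

import numpy as np

param_noise = AdaptiveParamNoiseSpec(initial_stddev=0.1, desired_action_stddev=0.2)

# Hypothetical action batches (batch_size x action_dim); in practice the perturbed
# actions come from the replay buffer and the unperturbed ones from the clean actor
perturbed_actions = np.random.randn(64, 2) * 0.3
unperturbed_actions = np.random.randn(64, 2) * 0.3

d = ddpg_distance_metric(perturbed_actions, unperturbed_actions)
param_noise.adapt(d)            # shrink the stddev if d > 0.2, grow it otherwise
print(param_noise.get_stats())  # {'param_noise_stddev': ...}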

References