# -*- coding: utf-8 -*-
import cPickle
import numpy
import random
import itertools
class Space(object):
    """Regular grid that discretises an n-dimensional box of values.

    Each dimension is described by a ``(start, stop, points)`` triple and is
    sampled at ``points`` evenly spaced values from ``start`` to ``stop``
    (both inclusive).

    Attributes:
    dimensions -- the ``(start, stop, points)`` triples as given
    shape -- tuple with the number of grid points per dimension; this is a
             plain attribute, not a method (read ``space.shape``)
    points -- list with one numpy array of grid values per dimension

    Calling the space with a value maps it to grid indices; iterating over
    the space yields every index tuple of the grid.
    """

    def __init__(self, dimensions):
        """Build the grid.

        Arguments:
        dimensions -- iterable of ``(start, stop, points)`` triples
        """
        self.dimensions = dimensions
        self.shape = tuple(points for _start, _stop, points in dimensions)
        # linspace also copes with points == 1 (a single value at ``start``),
        # where a hand-computed step size would divide by zero.
        self.points = [
            numpy.linspace(float(start), float(stop), points)
            for start, stop, points in dimensions
        ]

    def __call__(self, value):
        """Return the index tuple of the grid point closest to ``value``.

        Arguments:
        value -- sequence with one coordinate per dimension

        The nearest point is selected independently on every axis; the
        absolute distance is used so the closest grid value wins (the first
        one on ties), not simply the smallest grid value.
        """
        return tuple(
            int(numpy.argmin(numpy.abs(axis - coordinate)))
            for axis, coordinate in zip(self.points, value)
        )

    def __iter__(self):
        """Iterate over every index tuple of the grid."""
        return itertools.product(*(range(size) for size in self.shape))
class TablePolicy(object):
    """Action-value table indexed by discretised (state, action) pairs.

    The table is a numpy array whose leading axes follow the state space
    grid and whose trailing axes follow the action space grid. Both spaces
    must expose ``shape``, be callable (value -> index tuple) and be
    iterable (yielding index tuples).

    NOTE(review): ``store_value``/``get_value`` discretise the action they
    are given through ``action_space(action)``, while ``get_max_value`` and
    ``get_best_action`` work with the index tuples obtained by iterating
    ``action_space``. Confirm which representation callers pass around.
    """

    def __init__(self, state_space, action_space):
        """Create a table filled with random values in [0, 1).

        Arguments:
        state_space -- space used to discretise states
        action_space -- space used to discretise actions
        """
        self.state_space = state_space
        self.action_space = action_space
        self.table = numpy.random.random(
            self._shape_of(state_space) + self._shape_of(action_space)
        )

    @staticmethod
    def _shape_of(space):
        """Return the grid shape of ``space`` as a tuple.

        Accepts both a ``shape`` attribute and a ``shape()`` method, because
        calling a plain tuple attribute would raise TypeError.
        """
        shape = space.shape
        if callable(shape):
            shape = shape()
        return tuple(shape)

    def _index(self, state, action):
        """Return the table index for a raw (state, action) pair."""
        return (tuple(self.state_space(state)) +
                tuple(self.action_space(action)))

    def store_value(self, state, action, new_value):
        """Update the (state, action) -> value relationship."""
        self.table[self._index(state, action)] = new_value

    def get_value(self, state, action):
        """Return the value for the (state, action) pair."""
        return self.table[self._index(state, action)]

    def get_max_value(self, state):
        """Return the maximum action value for the given state.

        The state is discretised with the state space first; the maximum is
        then taken over every index tuple of the action space.
        """
        state_index = tuple(self.state_space(state))
        return max(
            self.table[state_index + tuple(action_index)]
            for action_index in self.action_space
        )

    def get_best_action(self, state):
        """Return the best action for this state.

        The result is the action index tuple (as yielded by iterating the
        action space) with the highest table value; the first one wins on
        ties. Returns None when the action space yields nothing.
        """
        state_index = tuple(self.state_space(state))
        best_action = None
        best_value = float("-inf")
        for action_index in self.action_space:
            value = self.table[state_index + tuple(action_index)]
            if value > best_value:
                best_value = value
                best_action = action_index
        return best_action

    @classmethod
    def load(cls, filename):
        """Retrieve a persisted policy from a file.

        Returns whatever object was pickled into ``filename``: a policy for
        files written by ``save``. A file holding only a pickled table
        yields that bare array.

        SECURITY: unpickling can execute arbitrary code. Only load files
        that come from a trusted source.
        """
        with open(filename, 'rb') as handler:
            return cPickle.load(handler)

    def save(self, filename):
        """Persist the policy to a file.

        The whole policy (both spaces and the table) is pickled so that
        ``load`` can return a usable policy again.
        """
        with open(filename, 'wb') as handler:
            cPickle.dump(self, handler)
class QLearner(object):
    """Learner class implementing the Q algorithm."""

    def __init__(self, alpha, gamma, alpha_decay=1, min_alpha=None):
        """Initialize the learner.

        Arguments:
        alpha -- learning rate
        gamma -- discount rate

        Keyword arguments:
        alpha_decay -- factor the learning rate is multiplied by in finish()
        min_alpha -- lower bound for the learning rate (None: no bound)
        """
        self.gamma = gamma
        self.alpha = alpha
        self.min_alpha = min_alpha
        self.alpha_decay = alpha_decay

    def finish(self, policy):
        """Decay the learning rate, never going below ``min_alpha``.

        ``policy`` is accepted but not used.
        """
        self.alpha *= self.alpha_decay
        floor = self.min_alpha
        if floor is None:
            return
        self.alpha = max(floor, self.alpha)

    def update(self, policy, selector, state, action, reward, next_state):
        """Update the (state, action, next_state) -> reward relationship.

        Reads the current value of (state, action) and the best value
        reachable from ``next_state`` from the policy, then stores the
        Q-learning estimate back into it. ``selector`` is not used.
        """
        current = policy.get_value(state, action)
        best_next = policy.get_max_value(next_state)
        # Gap between the bootstrapped target and the current estimate.
        correction = reward + self.gamma * best_next - current
        updated = current + self.alpha * correction
        policy.store_value(state, action, updated)
class EGreedySelector(object):
    """Epsilon-greedy selector class."""

    def __init__(self, epsilon, decay=1, min_epsilon=0):
        """Initialize the selector.

        Arguments:
        epsilon -- random action rate.
        decay -- random action rate decay.
        min_epsilon -- minimum random action rate.
        """
        # object.__init__ accepts no extra arguments; passing ``self`` a
        # second time raised TypeError on every construction.
        super(EGreedySelector, self).__init__()
        self.epsilon = epsilon
        self.decay = decay
        self.min_epsilon = min_epsilon

    def new_episode(self):
        """Start a new episode.

        Multiplies epsilon by ``decay`` without letting it drop below
        ``min_epsilon``.
        """
        self.epsilon = max(self.min_epsilon, self.epsilon * self.decay)

    def finish(self):
        """Alias of new_episode() for callers that invoke ``finish()``."""
        self.new_episode()

    def select_action(self, policy, state):
        """Return an action.

        With probability epsilon a uniformly random element of
        ``policy.action_space`` is returned; otherwise the result of
        ``policy.get_best_action(state)``.
        """
        if random.random() < self.epsilon:
            return random.choice(list(policy.action_space))
        return policy.get_best_action(state)
class Agent(object):
    """Bundle a policy with the learner and selector that operate on it."""

    def __init__(self, policy, learner=None, selector=None):
        """Initialize the agent.

        Arguments:
        policy -- policy handed to the learner and the selector

        Keyword arguments:
        learner -- learner to use; defaults to ``QLearner(.1, .9)``
        selector -- selector to use; defaults to ``EGreedySelector(0.3)``
        """
        self.policy = policy
        if learner is None:
            learner = QLearner(.1, .9)
        self.learner = learner
        if selector is None:
            selector = EGreedySelector(0.3)
        self.selector = selector

    def finish(self):
        """Notify the learner and the selector that an episode is over."""
        self.learner.finish(self.policy)
        # Selectors in this module expose new_episode() rather than
        # finish(); prefer finish() when present and fall back otherwise.
        end_episode = getattr(self.selector, "finish", None)
        if end_episode is None:
            end_episode = self.selector.new_episode
        end_episode()

    def update(self, old_state, action, new_state, reward):
        """Feed one observed transition to the learner.

        Arguments:
        old_state -- state the action was taken in
        action -- action that was taken
        new_state -- state reached afterwards
        reward -- reward received for the transition
        """
        # The learner takes (state, action, reward, next_state): reward
        # comes before the next state, unlike in this method's signature.
        self.learner.update(self.policy, self.selector,
                            old_state, action, reward, new_state)

    def select_action(self, state):
        """Return the action the selector picks for ``state``."""
        return self.selector.select_action(self.policy, state)