All pastes #2124730 Raw Edit

Anonymous

public text v1 · immutable
#2124730 ·published 2012-03-06 00:39 UTC
rendered paste body
# -*- coding: utf-8 -*-
import cPickle
import numpy
import random
import itertools


class Space(object):

    """Regular grid over a box-shaped, multi-dimensional value range.

    Each dimension is described by a ``(start, stop, points)`` triple and is
    sampled at ``points`` evenly spaced coordinates running from ``start`` to
    ``stop`` (both included).

    Attributes:
    dimensions -- the triples the space was built from
    shape      -- tuple with the number of grid points per dimension
    points     -- list holding one numpy array of coordinates per dimension

    Calling the space maps a tuple of values to grid indices; iterating over
    it yields every index tuple of the grid.
    """

    def __init__(self, dimensions):
        """Build the grid.

        Arguments:
        dimensions -- iterable of (start, stop, points) triples
        """
        self.dimensions = dimensions

        shape = []
        self.points = []
        for start, stop, points in dimensions:
            shape.append(points)
            # linspace also copes with a single-point dimension, where a
            # hand-computed step size would divide by zero.
            self.points.append(
                numpy.linspace(float(start), float(stop), points))

        # Plain tuple attribute (read it as ``space.shape``). An instance
        # attribute of this name shadows any method called ``shape``, so no
        # such method is defined.
        self.shape = tuple(shape)

    def __call__(self, value):
        """Return the index tuple of the grid point nearest to value.

        Arguments:
        value -- sequence with one coordinate per dimension
        """
        # The absolute distance is what must be minimised; the signed
        # difference would always select one end of the axis.
        return tuple(
            int(numpy.argmin(numpy.abs(axis - coordinate)))
            for axis, coordinate in zip(self.points, value))

    def __iter__(self):
        """Iterate over every index tuple of the grid."""
        return itertools.product(*(range(size) for size in self.shape))


class TablePolicy(object):

    """Tabular (state, action) -> value store backed by a numpy array.

    The table has one axis per state dimension followed by one axis per
    action dimension and starts out filled with uniform random values.
    """

    def __init__(self, state_space, action_space):
        """Create a randomly initialised table.

        Arguments:
        state_space  -- space describing the states
        action_space -- space describing the actions

        Both spaces must expose a ``shape`` tuple and map a tuple of values
        to a tuple of table indices when called; the action space must also
        be iterable over its index tuples.
        """
        self.state_space = state_space
        self.action_space = action_space
        # ``shape`` is a tuple attribute of the spaces, not a method.
        self.table = numpy.random.random(
            tuple(self.state_space.shape) +
            tuple(self.action_space.shape)
            )

    def _index(self, state, action):
        """Return the table index for a (state, action) pair of values."""
        # Each half has to be discretised by its own space.
        return self.state_space(state) + self.action_space(action)

    def store_value(self, state, action, new_value):
        """Update the (state, action) -> value relationship.
        """
        self.table[self._index(state, action)] = new_value

    def get_value(self, state, action):
        """Return the value for the (state, action) pair.
        """
        return self.table[self._index(state, action)]

    def get_max_value(self, state):
        """Return the maximum action value for the given state.
        """
        state_index = self.state_space(state)
        # Iterating the action space yields index tuples, which complete the
        # table index directly.
        return max(self.table[state_index + a] for a in self.action_space)

    def get_best_action(self, state):
        """Returns the best action for this state.

        The action is returned as the index tuple produced by iterating the
        action space; the first one wins on ties. None is returned when no
        action has a value greater than minus infinity.
        """
        # NOTE(review): get_value/store_value pass their action through
        # action_space(...) again. That only round-trips an index tuple
        # when the grid coordinates coincide with the indices -- confirm
        # which representation callers are meant to use.
        state_index = self.state_space(state)
        best_action = None
        best_value = float("-inf")
        for action in self.action_space:
            av = self.table[state_index + action]
            if av > best_value:
                best_value = av
                best_action = action

        return best_action

    @staticmethod
    def load(cls, filename=None):
        """Return the object pickled in a file written by save().

        Because this is a staticmethod that nevertheless declares ``cls``,
        both ``TablePolicy.load(filename)`` and the explicit
        ``TablePolicy.load(TablePolicy, filename)`` are accepted.

        save() pickles only the value table, so that is what comes back.
        """
        if filename is None:
            # Single-argument call: the only argument is the file name.
            filename = cls
        # NOTE(review): unpickling executes arbitrary code; only load files
        # from trusted sources.
        with open(filename, 'rb') as handler:
            return cPickle.load(handler)

    def save(self, filename):
        """Persist the value table to a file."""
        with open(filename, 'wb') as handler:
            cPickle.dump(self.table, handler)


class QLearner(object):

    """Q-learning update rule with an optionally decaying learning rate."""

    def __init__(self, alpha, gamma, alpha_decay=1, min_alpha=None):
        """Set up the learner.

        Arguments:
        alpha -- learning rate
        gamma -- discount rate

        Keyword arguments:
        alpha_decay -- factor applied to the learning rate by finish()
        min_alpha   -- lower bound for the learning rate (None: no bound)

        """
        self.gamma = gamma
        self.alpha = alpha
        self.min_alpha = min_alpha
        self.alpha_decay = alpha_decay

    def finish(self, policy):
        """Decay the learning rate; the policy argument is not used."""
        self.alpha *= self.alpha_decay
        if self.min_alpha is None:
            return
        self.alpha = max(self.min_alpha, self.alpha)

    def update(self, policy, selector, state, action, reward, next_state):
        """Store the Q-learning estimate for (state, action) in the policy.

        The stored value moves from the current one towards
        ``reward + gamma * max_value(next_state)`` by a fraction alpha.
        The selector argument is not used.
        """
        current = policy.get_value(state, action)
        best_next = policy.get_max_value(next_state)

        target = reward + self.gamma * best_next
        policy.store_value(
            state, action, current + self.alpha * (target - current))


class EGreedySelector(object):

    """Epsilon-greedy selector class."""

    def __init__(self, epsilon, decay=1, min_epsilon=0):
        """Initialize the selector.

        Arguments:
        epsilon -- random action rate.
        decay   -- random action rate decay.
        min_epsilon -- minimum random action rate.
        """
        self.epsilon = epsilon
        self.decay = decay
        self.min_epsilon = min_epsilon
        # object.__init__ takes no extra arguments; passing self again
        # raises TypeError.
        super(EGreedySelector, self).__init__()

    def new_episode(self):
        """Start a new episode by decaying epsilon down to min_epsilon."""
        self.epsilon = max(self.min_epsilon, self.epsilon * self.decay)

    def finish(self):
        """End the current episode; same effect as new_episode().

        Provided because Agent.finish() calls ``selector.finish()``.
        """
        self.new_episode()

    def select_action(self, policy, state):
        """Return an action.

        The action returned is a random one from ``policy.action_space``
        with probability epsilon, and ``policy.get_best_action(state)``
        otherwise.
        """
        if random.random() < self.epsilon:
            return random.choice(list(policy.action_space))

        return policy.get_best_action(state)


class Agent(object):

    """Bundle of a policy, a learner and an action selector."""

    def __init__(self, policy, learner=None, selector=None):
        """Initialize the agent.

        Arguments:
        policy -- policy handed to the learner and the selector

        Keyword arguments:
        learner  -- learner to use; defaults to QLearner(.1, .9)
        selector -- selector to use; defaults to EGreedySelector(0.3)
        """
        self.policy = policy
        if learner is None:
            learner = QLearner(.1, .9)
        self.learner = learner

        if selector is None:
            selector = EGreedySelector(0.3)
        self.selector = selector

    def finish(self):
        """Tell the learner and the selector that the episode is over."""
        self.learner.finish(self.policy)
        # Use the selector's finish() hook when it has one; otherwise fall
        # back to new_episode(), the name EGreedySelector uses for its
        # per-episode decay step.
        end_episode = getattr(self.selector, "finish", None)
        if end_episode is None:
            end_episode = self.selector.new_episode
        end_episode()

    def update(self, old_state, action, new_state, reward):
        """Feed one observed transition to the learner."""
        # The learner's signature is (..., state, action, reward,
        # next_state): reward comes before the new state.
        self.learner.update(self.policy, self.selector,
            old_state, action, reward, new_state)

    def select_action(self, state):
        """Return the action the selector picks for state."""
        return self.selector.select_action(self.policy, state)