Обучение с подкреплением

Reinforcement learning (RL)¶

В заключительной лекции нашего курса мы познакомимся с ещё одной парадигмой машинного обучения, обучением с подкреплением, которая в настоящее время крайне активно развивается и имеет свои приложения в различных предметных областях. Для удобства и непротиворечивости определения данной парадигмы рассмотрим сперва ключевые особенности уже известных нам подходов. В предыдущих лекциях мы с вами познакомились с такими парадигмами машинного обучения, как обучение с учителем (supervised learning) и обучение без учителя (unsupervised learning):

Обучение с учителем:

  • Существует исчерпывающий набор данных (датасет), который эмпирически задает распределение признакового описания объектов в решаемой задаче.
  • Для всех объектов в датасете известно соответствующее значение целевой переменной (может быть меткой номера класса, вещественным числом и тому подобным).
  • Нужно подобрать неизвестную функцию, которая ставит в соответствие признаковому описанию некоторого объекта число — целевую переменную.
  • Можно построить дифференцируемую функцию (так называемую функцию потерь, loss), которая характеризует качество подобранной функции как аппроксиматора искомой зависимости между признаками и целевой переменной.

Типичный пример — решение задачи классификации рукописных цифр в датасете MNIST при помощи CNN.

Обучение без учителя:

  • Существует исчерпывающий набор данных (датасет), который эмпирически задает распределение признакового описания объектов в решаемой задаче.
  • Для объектов в датасете неизвестно соответствующее значение целевой переменной.
  • Нужно преобразовать имеющиеся признаковые описания объектов в новое представление, в котором будут видны закономерности и скрытые связи между ними.

Типичный пример — использование метода k-средних для решения задачи кластеризации.

Легко заметить, что ключевой особенностью этих ML-парадигм является работа с датасетом как с исчерпывающей и не изменяющейся сущностью. По сути, именно набор данных и определяет решаемую в этих подходах задачу.

Обучение с подкреплением¶

Drawing
Обучение с учителем vs обучение с подкреплением

Легко придумать постановку задачи, в которой невозможно составить такой исчерпывающий датасет. Например, разрабатывая автопилот для автомобиля, мы вряд ли сможем составить обучающую выборку, в которой будут перечислены все возможные дорожные ситуации в условиях плотного городского трафика, для которых к тому же будет известна разметка "ground true" целевой переменной о предпочтительном решении автопилота в них.

Невозможность собрать исчерпывающий датасет во многом может быть связана с тем, как в примере с автопилотом автомобиля, что результат работы алгоритма может существенно изменить состояние окружающей его среды, которая и является источником данных. При этом, не имея всей картины возможных действий и их последствий, мы можем предоставить нашему алгоритму возможность взаимодействовать с источником данных (средой) и постфактум оценить действия нашего алгоритма (агента), и, используя эту оценку, подстроить алгоритм так, чтобы он чаще совершал желательные действия, и реже — нежелательные. В литературе такую оценку называют вознаграждением (reward). В этом случае обучение строится таким образом, чтобы алгоритм (агент) стремился максимизировать получаемое вознаграждение.

Функция вознаграждения (reward) в этом случае является аналогом функции потерь из парадигмы обучения с учителем, но имеет ряд отличий:

  • может быть определена как для одного предсказания алгоритма (решения аганта), так и для серии предсказаний

Для вычисления функции потерь в парадигме обучения с учителем нам было достаточно одного ответа модели для сравнения с целевой переменной. В свою очередь, reward может быть связан с конкретной последовательностью предсказаний алгоритма (решений). Например: получению очков за победу в шахматной партии соответствует выбор целой последовательности правильных ходов игрока; безаварийная поездка беспилотного автомобиля между двумя точками на карте раскладывается на огромное количество решений о замедлении, ускорении и поворотах.

  • не обязана быть дифференцируемой

Весь используемый нами ранее аппарат обучения с учителем основывался на том, что при помощи градиентных методов мы подбирали такие веса модели машинного обучения, чтобы дифференцируемая loss-функция имела при них локальный минимум. Некоторые постановки задач, такие как рассмотренные ранее задачи построения автопилота или написание играющего в шахматы алгоритма, не допускают возможности составить дифференцируемую функцию награды. Убирая требование дифференцируемости reward-функции, мы лишаемся возможности применить известные нам алгоритмы оптимизации, такие как стохастический градиентный спуск и его улучшения.

Если у нас есть набор примеров с правильными ответами, то мы используем эту выборку для обучения нашей модели, а после обучения применяем её к неразмеченным данным. Именно этот подход мы использовали, когда обучали классификатор для MNIST, подавая на вход сети картинки с изображениями рукописных цифр и считая градиент для подстройки весов на основе разницы между известным лэйблом цифры и выходом нейросети.

Таким образом, мы можем резюмировать особенности парадигмы обучения с подкреплением.

Обучение с подкрплением:

  • Существует произвольный источник данных — среда.
  • Среда не обязана быть определена в виде исчерпывающего и стационарного датасета.
  • Среда может изменяться после каждого предсказания алгоритма (решения агента).
  • Существует некоторый алгоритм (агент), который строит свое предсказание на основе данных, полученных из среды.
  • Можно сформулировать количественную оценку характера взаимодействия агента со средой (reward, функцию награды).
  • Функция награды не обязана быть дифференцируемой по весам модели машинного обучения (агента).
  • Функция награды может быть определена для последовательности решений агента.

Типичные примеры — оптимизация дискретных функционалов, задачи оптимального управления, RLHF.

Достаточную интуицию о парадигме обучения с подкреплением и об области решаемых с её помощью задач можно сформировать, если поставить знак равенства между терминами "обучение с подкреплением" и "обучение методом проб и ошибок".

Обучение с подкреплением

Примеры задач, решаемых с использованием RL¶

  • Управление беспилотным транспортом, задачи математической теории оптимального управления

[arxiv] 🎓 Deep Reinforcement Learning for Autonomous Driving: A Survey
[arxiv] 🎓 Reinforcement Learning and Control as Probabilistic Inference: Tutorial and Review
[git] 🐾 Lecture notes and exercises for control theory course

Управление системой охлаждения датацентра при помощи методов обучения с подкреплением
Source: Safety-first AI for autonomous data centre cooling and industrial control

В настоящее время большое число исследовательских групп занято разработкой решений для создания беспилотного транспорта. Алгоритм принятия решений в таких решениях часто основан на алгоритмах обучения с подкреплением (Deep reinforcement learning on-board an autonomous car ✏️[blog]):

In [1]:
from IPython.display import HTML
from base64 import b64encode

# Source: https://wayve.ai/wp-content/uploads/2022/06/ezgif.com-gif-maker1.mp4
!wget -qN https://edunet.kea.su/repo/EduNet-web_dependencies/dev-2.0/L15/rl_control_system.mp4

mp4 = open("rl_control_system.mp4", "rb").read()
data_url = f"data:video/mp4;base64,{b64encode(mp4).decode()}"
HTML(f"<video width=1000 controls><source src={data_url} type='video/mp4'></video>")
Out[1]:
  • Оптимизация недифференцируемых функционалов

[article] 🎓 Reinforcement learning for combinatorial optimization: A survey
[arxiv] 🎓 Neural Combinatorial Optimization with Reinforcement Learning

  • Создание игровых агентов

[arxiv] 🎓 Mastering Chess and Shogi by Self-Play with a General Reinforcement Learning Algorithm
[blog] ✏️ AlphaGo

The trained network is used to guide a search algorithm – known as Monte-Carlo Tree Search (MCTS) — to select the most promising moves in games. For each move, AlphaZero searches only a small fraction of the positions considered by traditional chess engines. In Chess, for example, it searches only 60 thousand positions per second, compared to roughly 60 million for Stockfish.
Source: AlphaZero: Shedding new light on chess, shogi, and Go
  • Обработка естественного языка

[book] 📚 Reinforcement Learning for NLP
[arxiv] 🎓 Training language models to follow instructions with human feedback

Source: RLHF: Reinforcement Learning from Human Feedback
  • Прочие приложения:

    • Рекомендательные системы
    • Трейдинг, принятие решений в условиях неопределенности, управление инвестициями
    • Drug discovery
    • (Neural Architecture Search)

Примеры работ с использованием обучения с подкреплением у выпускников курса¶

В работе Управление движением сферического робота с помощью нейронной сети решалась задача подбора управляющего сигнала для робота, позволяющего ему двигаться по определенной заранее траектории на плоскости.

Сферический робот
Source: EduNet-archive: RL sphere robot
Сравнение целевых траекторий движения с полученными вследствие управления роботом при помощи RL-алгоритма
Source: EduNet-archive: RL sphere robot

Подробнее:

  • [video] 📺 Видеозапись выступления
  • [slides] 📊 Презентация
  • [git] 🐾 Код

В работе Самоюстирующиеся оптические системы на основе машинного обучения с подкреплением был разработан управляющий алгоритм, позволяющий автоматизировать и ускорить процесс настройки сложной оптической экспериментальной установки.

Схема оптической установки, генерирующей рентгеновские лучи
Source: EduNet-archive: NN for controlling z-axis
Результаты работы алгоритма стабилизации выходного сигнала на основе RL-алгоритма
Source: EduNet-archive: NN for controlling z-axis

Подробнее:

  • [video] 📺 Видеозапись выступления
  • [slides] 📊 Презентация
  • [git] 🐾 Код

Устоявшаяся терминология¶

Drawing

Агент и среда — ключевые понятия в обучении с подкреплением.

Агент — программа, принимающая решение о дальнейших действиях на основе информации о состоянии среды.

Среда — это всё, с чем агент может прямо или косвенно взаимодействовать. Среда обладает состоянием (State). Агент может влиять на среду, совершая какие-то действия (Actions), переводя среду при этом из одного состояния в другое и получая какое-то вознаграждение. Среда описывается пространством возможных состояний. Конкретное состояние — вектор в этом пространстве.

В зависимости от конкретной задачи агент может наблюдать либо полное состояние среды, либо только некоторую его часть. Во втором случае агенту может потребоваться какое-то внутреннее представление полного состояния, которое будет обновляться по мере получения новых данных.

Функция награды (reward) — вводимая программистом формула вычисления ценности действия на основе финального результата, ожидания этого результата, промежуточных результатов и любых других параметров, которые будут подсказывать путь к наилучшей последовательности действий агента. Это некоторый аналог функции потерь, без которой непонятно, чему учиться. Например, в шахматах истинная награда — это победа, но взятая фигура соперника тоже ценна и должна увеличивать награду, если мы хотим подсказать агенту, что брать чужие фигуры полезно. Может ли после этого агент получить мат, позарившись на незащищенную фигуру? Да, ровно как и неопытный шахматный игрок.

Попытка передать через дополнительные неосновные награды подсказки к получению основной награды называется reward shaping.

Эпизод — серия взаимодействий агента со средой (например, партия в игре).

Stateless environment in RL¶

Рассмотрим простейший пример среды для задачи обучения с подкреплением:

  • среда стационарна, т.е. не изменяется во времени
  • среда не изменяется после действий агента

Фактически можно сказать, что такая среда всегда находится в одном состоянии или вообще не имеет состояния. Для нас будет важно, что выдаваемый средой reward не обязан быть дифференцируемым, и потому работа агента по взаимодействию с такой вырожденной средой не может быть корректно решена в рамках парадигмы обучения с учителем.

Задача о многоруких бандитах¶

Примером stateless-среды может послужить классическая задача о многоруких бандитах 📚[wiki].

Согласно условиям данной задачи мы имеем:

  • $K$ игровых автоматов, которые мы можем дернуть за ручку в надежде получить выигрыш,
  • $k$-тый игровой автомат возвращает выигрыш с вероятностью $p_k$,
  • вероятности выигрыша $\{p_k\}$ не изменяются во времени и вследствие решений игрока,
  • вероятности выигрыша неизвестны игроку.

Задача: игроку необходимо придумать такую стратегию нажатия на рычаги многорукого бандита, которая бы обеспечивала максимум возможного выигрыша.

Для того, чтобы приступить к решению этой задачи, нам необходимо создать модель описанной среды. Воспользуемся библиотекой Gymnasium, которую часто используют при решении задач методами обучения с подкреплением.

Gymnasium framework¶

Перед тем, как приступить к созданию интересующей нас среды, разберемся, что такое Gymnasium и какие возможности он предоставляет для обучения с подкреплением.

Gymnasium (Gym) — это набор инструментов для разработки и сравнения алгоритмов RL, позволяющий стандартизировать взаимодействие между разными алгоритмами RL и средами. Также он предоставляет набор стандартных сред, которые могут, в том числе, использоваться для бенчмаркинга.

  • [git] 🐾 Gymnasium
  • [doc] 🛠️ Документация
  • [colab] 🥨 Colab demonstration

Для начала рассмотрим устройство среды Gym в целом. Среда — это некоторая модель мира, отвечающая за предоставление наблюдений и вознаграждений, в которой существует агент. Состояние среды будет изменяться в зависимости от действий агента.

Рассмотрим для начала стандартную среду MountainCar, в которой стоит задача — довести машину до вершины горы.

In [2]:
!pip install -q gymnasium
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 953.9/953.9 kB 8.6 MB/s eta 0:00:00
In [3]:
import gym as openaigym
import gymnasium as gym

env = gym.make("MountainCar-v0")

В Gym среды представлены классом gym.Env, который является унифицированным интерфейсом среды со следующими атрибутами и методами:

  • action_space: описание действий, допустимых в данной среде;
  • observation_space: структура и допустимые значения наблюдений состояния среды;
  • reset(): сбрасывает среду и возвращает случайное исходное состояние;
  • step(action): метод, продвигающий развитие окружающей среды на одно действие и возвращающий информацию о результате этого действия, а именно:
    • observation — следующее наблюдение;
    • reward — локальное вознаграждение;
    • done — флаг конца эпизода.

Также в классе gym.Env есть несколько вспомогательных методов, например, render(), позволяющий представить наблюдение в понятной человеку форме, но мы не будем их касаться в нашей лекции.

Пространства действий и наблюдений¶

Действия агента могут быть дискретными, непрерывными и комбинированными. Дискретные действия представлены фиксированным набором и взаимно исключают друг друга, например, нажатие/отпускание клавиши. Непрерывным действиям соответствуют значения в некотором диапазоне, например, поворот руля от -720 до 720 градусов. В среде мы не ограничены каким-то одним действием, и вполне допустимо одновременное нажатие нескольких кнопок и одновременный поворот руля. Аналогичным образом наблюдения могут быть дискретными (лампочка включена/выключена) или непрерывными (тензоры, соответствующие цветным изображениям).

Давайте посмотрим на то, как выглядят пространства действий и наблюдений в среде MountainCar:

In [4]:
from IPython.display import clear_output


# Action and observation space
action_space = env.action_space
obs_space = env.observation_space

clear_output()

print(f"The action space: {action_space}")
print(f"The observation space: {obs_space}")
The action space: Discrete(3)
The observation space: Box([-1.2  -0.07], [0.6  0.07], (2,), float32)

Мы видим, что пространство наблюдений и действий представлено некоторыми классами Box и Discrete соответственно. Что же это за классы? Для объединения нескольких пространств действий (например, непрерывных и дискретных) в одно действие, в Gym существует специальный класс контейнеров.

  • Класс Discrete представляет набор $n$ взаимоисключающих элементов. Например, Discrete(n=4) может быть использован для пространства действий с 4 направлениями движения ($\leftarrow \downarrow \rightarrow \uparrow$).
  • Класс Box представляет n-мерный тензор рациональных чисел в некотором диапазоне [low, high]. Например, нажатие педали газа от минимального значения 0 до максимального — 1, можно закодировать как с, аналогично наблюдение экрана игры, можно закодировать как Box(low=0, high=255, shape=(100, 50, 3), dtype=np.float32).

Давайте посмотрим на диапазон допустимых значений пространства наблюдений в среде MountainCar:

In [5]:
print("Upper Bound for Env Observation", env.observation_space.high)
print("Lower Bound for Env Observation", env.observation_space.low)
Upper Bound for Env Observation [0.6  0.07]
Lower Bound for Env Observation [-1.2  -0.07]

Обе структуры данных происходят от класса gym.Space

In [6]:
print(type(env.action_space))
print(type(env.observation_space))
<class 'gymnasium.spaces.discrete.Discrete'>
<class 'gymnasium.spaces.box.Box'>

Также стоит упомянуть еще один дочерний класс gym.Space — Tuple, позволяющий объединять несколько экземпляров класса gym.Space вместе. Благодаря этому классу мы можем создавать пространства действий и наблюдений любой сложности.

В классе gym.Space реализованы методы, позволяющие взаимодействовать с пространствами действий и наблюдений:

  • sample(): возвращает случайный пример из пространства наблюдений,
  • contains(x): проверяет принадлежность аргумента к пространству наблюдений.

Давайте возьмем случайное действие, доступное нам в исходном состоянии среды MountainCar:

In [7]:
random_action = env.action_space.sample()  # random number from 0 to 2
print(random_action)
1

Взаимодействие со средой¶

Давайте попробуем совершить случайное действие, которое мы выбрали выше, для этого перезапустим среду, чтобы вернуться в изначальное состояние, и сделаем шаг с помощью метода step():

In [8]:
# Reset the environment and see the initial observation
obs, info = env.reset()
print(f"The initial observation is {obs}")

# Take the action and get the new observation space
new_obs, reward, done, truncated, info = env.step(random_action)
print(f"The new observation is {new_obs}")
The initial observation is [-0.57499045  0.        ]
The new observation is [-5.7460654e-01  3.8391253e-04]

Создание своей среды¶

Одна из простых задач обучения с подкреплением — задача о многоруких бандитах. В этой задаче состояние среды не меняется, а у агента есть фиксированный набор действий, которые он может выполнять.

Эту задачу можно представить таким образом: вы игрок в казино с игровыми автоматами, в которых с различной вероятностью возможно выиграть приз различной величины. Ваша задача как агента — выиграть как можно больше денег, бросая монетки в автоматы на ваш выбор.

Допустим, в казино находятся 4 автомата, лишь для одного из которых средний выигрыш будет > 1.

Давайте напишем класс, который будет описывать эту ситуацию.

In [9]:
import random
import numpy as np
from gym import Env, spaces


np.random.seed(42)


class MultiArmedBanditEnv(Env):
    def __init__(self, n=4):
        """
        n - number of arms in the bandit
        """
        self.num_bandits = n
        self.action_space = spaces.Discrete(self.num_bandits)
        self.observation_space = spaces.Discrete(1)  # the reward of the last action
        self.bandit_success_prob = np.array(
            [0.5, 0.1, 0.9, 0.2]
        )  # success probabilities
        self.bandit_reward = np.array([2, 20, 1, 3])

    def step(self, action):
        done = True
        result_prob = np.random.random()
        if result_prob < self.bandit_success_prob[action]:
            reward = self.bandit_reward[action]
        else:
            reward = 0
        return reward, done

Поиск оптимальной стратегии решения¶

Жадная стратегия¶

В нашем казино для того, чтобы принять участие в игре, нужно заплатить 1 монету. После оплаты игрок может нажать на любой рычаг игрового автомата и получить выигрыш с соответствующей вероятностью.

Как именно в таком случае мы можем определить, за какой рычаг нам стоит дергать, чтобы максимизировать выигрыш? Ведь мы как игрок не знаем вероятности выигрыша для разных рычагов.

Попробуем сделать это жадным способом: на каждом шаге будем совершать действие, имеющее максимальную оценку, и, в зависимости от результата, будем обновлять эту оценку.

Давайте разберемся, что она представляет.

Оценить ценность того или иного действия можно по среднему выигрышу, к которому оно приведет.

Пусть $R_n$ — награда, полученная за выполнение $n$-го действия. Тогда оценкой ценности действия будет $\displaystyle Q_n=\frac{R_1+R_2+...+R_n}{n}$. Величина $Q_n$ по сути представляет собой простое скользящее среднее (simple moving average), которое может вычисляться постепенно:

$\displaystyle Q_n = \frac{1}{n}(R_n+(n-1)Q_{n-1}) = \frac{1}{n}(R_n+nQ_{n-1}-Q_{n-1}) = Q_{n-1}+\frac{1}{n}(R_n-Q_{n-1})$

В целом, подобная схема обновления параметров в обучении с подкреплением, представимая как новая оценка:=старая оценка + шаг[цель — старая оценка], встречается довольно часто и называется incremental update rule.

Для получения оценок действий жадным способом нам понадобится функция argmax, которая будет выбирать действие с наибольшей оценкой, а в случае равенства максимальной оценки будет выбирать случайное действие.

In [10]:
def argmax(x):
    return np.random.choice(np.flatnonzero(x == x.max()))


class GreedyAgent:
    def __init__(self):
        """
        Our agent initializes reward estimates as zeros.
        This estimates will be updated incrementally after each
        interaction with the environment.
        """
        self.reward_estimates = np.zeros(4)
        self.action_count = np.zeros(4)
        self.cache = 1000  # initial amount of coins agent posesses

    def get_action(self):
        # Pay 1 coin for the action
        self.cache -= 1

        # Select greedy action
        action = argmax(self.reward_estimates)

        # Add a 1 to action selected in the action count
        self.action_count[action] += 1

        return action

    def update_estimates(self, reward, action):
        # Update amount of cache by reward from our previuos action
        self.cache += reward

        # Compute the difference between the received rewards vs the reward estimates
        error = reward - self.reward_estimates[action]

        # Update the reward estimate incementally
        n = self.action_count[action]
        self.reward_estimates[action] += (1 / n) * error
In [11]:
import matplotlib.pyplot as plt


env = MultiArmedBanditEnv()
agent = GreedyAgent()

mean_reward_greedy = []
rewards = []

while (agent.cache > 0) and (len(mean_reward_greedy) != 10000):
    act = agent.get_action()
    reward, done = env.step(act)
    agent.update_estimates(reward, act)
    rewards.append(reward)
    mean_reward_greedy.append(sum(rewards) / (len(rewards)))

print(f"Action counts: {agent.action_count}")
print(f"Reward estimates: {agent.reward_estimates}")
plt.plot(mean_reward_greedy)
plt.show()
Action counts: [    0.     0. 10000.     0.]
Reward estimates: [0.     0.     0.9051 0.    ]

$\epsilon$-жадная стратегия¶

Видно, что жадная стратегия не позволяет нам добиться оптимального принятия решений, поскольку оптимальную стратегию легко пропустить, если в начале агенту не повезет.

Вместо жадной стратегии будем использовать $\varepsilon$-жадную стратегию, которую можно описать как "оптимизм при неопределенности". Суть $\varepsilon$-жадной стратегии заключается в следующем: с некоторой вероятностью $\varepsilon$ совершать случайное действие, а с вероятностью $1-\varepsilon$ вести себя жадно.

$\varepsilon$ — важный параметр алгоритма. Как правило, в начале используют большие значения $\varepsilon$, тогда агент действует почти случайно и "исследует мир", а затем их уменьшают, и действия агента становятся близкими к оптимальным.

Выбор величины $\varepsilon$ связан с так называемым exploration-exploitation trade-off. Он может быть легко проиллюстрирован примером, когда вы не можете решить, пойти ли сегодня в любимый ресторан или попробовать сходить в новый? Новый ресторан может оказаться лучшего привычного, но это решение связано с риском разочарования.

Drawing

In [12]:
class EpsilonGreedyAgent(GreedyAgent):
    def __init__(self, epsilon):
        GreedyAgent.__init__(self)
        # Store the epsilon value
        assert epsilon >= 0 and epsilon <= 1
        self.epsilon = epsilon

    def get_action(self):
        # We need to redefine this function so that it takes an exploratory action with epsilon probability

        # Pay 1 coin for the action
        self.cache -= 1
        # One hot encoding: 0 if exploratory, 1 otherwise
        action_type = int(np.random.random() > self.epsilon)
        # Generate both types of actions for every experiment
        exploratory_action = np.random.randint(4)
        greedy_action = argmax(self.reward_estimates)
        # Use the one hot encoding to mask the actions for each experiment
        action = greedy_action * action_type + exploratory_action * (1 - action_type)

        self.action_count[action] += 1

        return action
In [13]:
agent = EpsilonGreedyAgent(epsilon=0.25)

mean_reward_e_greedy = []
rewards = []

while (agent.cache > 0) and (len(mean_reward_e_greedy) != 10000):
    act = agent.get_action()
    reward, done = env.step(act)
    agent.update_estimates(reward, act)
    rewards.append(reward)
    mean_reward_e_greedy.append(sum(rewards) / len(rewards))
    # Decrease epsilon with time until we reach 0.01 chanse to perform exploratory action
    if agent.epsilon > 0.01:
        agent.epsilon -= 0.0001

print(f"Action counts: {agent.action_count}")
print(f"Reward estimates: {agent.reward_estimates}")
plt.plot(mean_reward_greedy)
plt.plot(mean_reward_e_greedy)
plt.show()
Action counts: [  91. 9683.  137.   89.]
Reward estimates: [0.96703297 2.00970774 0.8540146  0.80898876]

softmax-стратегия¶

Использование $\epsilon$-жадной стратегии позволило нам иногда совершать "разведывательные" решения, которые могли быть не оптимальны относительно буквальной интерпретации накопленного нами ранее опыта. Аналогичного эффекта мы бы могли достигнуть, если бы определили нашу стратегию принятия решения вероятностной.

Пусть мы находимся в состоянии $s$, для которого возможны $K$ действий $\{a_i\}$. Определим величину $\pi(a_i|s)$:

  • $\pi(a_i| s)$ — вероятность принять решение $a_i$, если агент находится в состоянии $s$.

Данное распределение вероятности принято называть политикой (policy).

Для "игры" с использованием такой политики нам нужно:

  1. Вычислить распределение вероятности $\pi(a_i| s)$.
  2. Сгенерировать случайное решение $a$, удовлетворяющее данному распределению: $a \sim \pi(a_i| s)$.
In [14]:
from numpy.random import choice
from scipy.special import softmax


class SoftmaxAgent(GreedyAgent):
    def __init__(self):
        GreedyAgent.__init__(self)

    def get_action(self):
        # We need to redefine this function so that it takes an action with probability pi(a,s)

        # Pay 1 coin for the action
        self.cache -= 1

        # get \pi(a_i|s):
        pi = softmax(self.reward_estimates)
        # sample action
        action = choice(np.arange(0, 4, 1), 1, p=pi)[0]

        self.action_count[action] += 1

        return action
In [15]:
agent = SoftmaxAgent()

mean_reward_softmax_agent = []
rewards = []

while (agent.cache > 0) and (len(mean_reward_softmax_agent) != 10000):
    act = agent.get_action()
    reward, done = env.step(act)
    agent.update_estimates(reward, act)
    rewards.append(reward)
    mean_reward_softmax_agent.append(sum(rewards) / len(rewards))

clear_output()

print(f"Action counts: {agent.action_count}")
print(f"Reward estimates: {agent.reward_estimates}")
plt.plot(mean_reward_greedy)
plt.plot(mean_reward_e_greedy)
plt.plot(mean_reward_softmax_agent)
plt.show()
Action counts: [1766. 5262. 1660. 1312.]
Reward estimates: [0.99207248 2.05245154 0.89578313 0.59222561]

Cross-entropy method (CEM)¶

Одним из простейших алгоритмов нахождения оптимальной политики является Cross-entropy method (CEM). Он состоит из следующих простых этапов:

  1. Сэмплирование действий из начального случайного распределения, нулевого приближения к стратегии.
  2. Отбор 5–10% "лучших" действий во всех посещенных состояниях — по сути создание датасета для supervised обучения.
  3. Изменение стратегии для повышения вероятности генерации лучших (шаг обучения на созданном датасете).

Пример оптимизации недифференцируемого функционала¶

Drawing

Пример CEM в Gym¶

In [16]:
import torch

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device
Out[16]:
device(type='cuda', index=0)

Воспользуемся готовой средой MountainCarContinuous 🛠️[doc]:

In [17]:
import gym
from IPython.display import clear_output

env = gym.make("MountainCarContinuous-v0")
env.seed(42)
clear_output()

print("observation space:", env.observation_space)
print("action space:", env.action_space)
print("  - low:", env.action_space.low)
print("  - high:", env.action_space.high)
observation space: Box([-1.2  -0.07], [0.6  0.07], (2,), float32)
action space: Box(-1.0, 1.0, (1,), float32)
  - low: [-1.]
  - high: [1.]

Создадим агента:

In [18]:
import math
import torch.nn as nn
import torch.nn.functional as F


class Agent(nn.Module):
    def __init__(self, env, h_size=16):
        super(Agent, self).__init__()
        self.env = env
        # state, hidden layer, action sizes
        self.s_size = env.observation_space.shape[0]
        self.h_size = h_size
        self.a_size = env.action_space.shape[0]
        # define layers (we used 2 layers)
        self.fc1 = nn.Linear(self.s_size, self.h_size)
        self.fc2 = nn.Linear(self.h_size, self.a_size)

    def set_weights(self, weights):
        s_size = self.s_size
        h_size = self.h_size
        a_size = self.a_size
        # separate the weights for each layer
        fc1_end = (s_size * h_size) + h_size
        fc1_W = torch.from_numpy(weights[: s_size * h_size].reshape(s_size, h_size))
        fc1_b = torch.from_numpy(weights[s_size * h_size : fc1_end])
        fc2_W = torch.from_numpy(
            weights[fc1_end : fc1_end + (h_size * a_size)].reshape(h_size, a_size)
        )
        fc2_b = torch.from_numpy(weights[fc1_end + (h_size * a_size) :])
        # set the weights for each layer
        self.fc1.weight.data.copy_(fc1_W.view_as(self.fc1.weight.data))
        self.fc1.bias.data.copy_(fc1_b.view_as(self.fc1.bias.data))
        self.fc2.weight.data.copy_(fc2_W.view_as(self.fc2.weight.data))
        self.fc2.bias.data.copy_(fc2_b.view_as(self.fc2.bias.data))

    def get_weights_dim(self):
        return (self.s_size + 1) * self.h_size + (self.h_size + 1) * self.a_size

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = torch.tanh(self.fc2(x))
        return x.cpu().data

    def act(self, state):
        state = torch.from_numpy(state).float().to(device)
        with torch.no_grad():
            action = self.forward(state)
        return action

    def evaluate(self, weights, gamma=1.0, max_t=5000):
        self.set_weights(weights)
        episode_return = 0.0
        state = self.env.reset()
        for t in range(max_t):
            state = torch.from_numpy(state).float().to(device)
            action = self.forward(state)
            state, reward, done, _ = self.env.step(action)
            episode_return += reward * math.pow(gamma, t)
            if done:
                break
        return episode_return


clear_output()

Реализуем Cross-Entropy Method:

In [19]:
import numpy as np
from collections import deque
from tqdm.notebook import tqdm


np.random.seed(42)


def cem(
    agent,
    n_iterations=5,
    max_t=1000,
    gamma=1.0,
    print_every=10,
    pop_size=50,
    elite_frac=0.2,
    sigma=0.5,
):
    """PyTorch implementation of the cross-entropy method.

    Params
    ======
        Agent (object): agent instance
        n_iterations (int): maximum number of training iterations
        max_t (int): maximum number of timesteps per episode
        gamma (float): discount rate
        print_every (int): how often to print average score (over last 100 episodes)
        pop_size (int): size of population at each iteration
        elite_frac (float): percentage of top performers to use in update
        sigma (float): standard deviation of additive noise
    """
    n_elite = int(pop_size * elite_frac)

    scores_deque = deque(maxlen=100)
    scores = []
    # Initialize the weight with random noise
    best_weight = sigma * np.random.randn(agent.get_weights_dim())

    for i_iteration in tqdm(range(1, n_iterations + 1)):
        # Define the cadidates and get the reward of each candidate
        weights_pop = [
            best_weight + (sigma * np.random.randn(agent.get_weights_dim()))
            for i in range(pop_size)
        ]
        rewards = np.array(
            [agent.evaluate(weights, gamma, max_t) for weights in weights_pop]
        )

        # Select best candidates from collected rewards
        elite_idxs = rewards.argsort()[-n_elite:]
        elite_weights = [weights_pop[i] for i in elite_idxs]
        best_weight = np.array(elite_weights).mean(axis=0)

        reward = agent.evaluate(best_weight, gamma=1.0)
        scores_deque.append(reward)
        scores.append(reward)

        torch.save(agent.state_dict(), "checkpoint.pth")

        if i_iteration % print_every == 0:
            print(
                "Episode {}\tAverage Score: {:.2f}".format(
                    i_iteration, np.mean(scores_deque)
                )
            )

        if np.mean(scores_deque) >= 90.0:
            print(
                "\nEnvironment solved in {:d} iterations!\tAverage Score: {:.2f}".format(
                    i_iteration - 100, np.mean(scores_deque)
                )
            )
            break
    return scores
In [20]:
np.bool8 = np.bool_
agent = Agent(env).to(device)
scores = cem(agent)
  0%|          | 0/5 [00:00<?, ?it/s]
In [21]:
import matplotlib.pyplot as plt

# Plot the scores
fig = plt.figure()
ax = fig.add_subplot(111)
plt.plot(np.arange(1, len(scores) + 1), scores)
plt.ylabel("Score")
plt.xlabel("Episode #")
plt.show()
In [22]:
!mkdir -p video
In [23]:
import glob
import base64, io
from IPython import display
from IPython.display import HTML
from gym.wrappers.monitoring import video_recorder


def show_video(env_name):
    mp4list = glob.glob("video/*.mp4")
    if len(mp4list) > 0:
        mp4 = "video/{}.mp4".format(env_name)
        video = io.open(mp4, "r+b").read()
        encoded = base64.b64encode(video)
        display.display(
            HTML(
                data="""<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>""".format(
                    encoded.decode("ascii")
                )
            )
        )
    else:
        print("Could not find video")


def show_video_of_model(agent, env_name):
    env = gym.make(env_name)
    vid = video_recorder.VideoRecorder(env, path="video/{}.mp4".format(env_name))
    agent.load_state_dict(torch.load("checkpoint.pth"))
    state = env.reset()
    done = False
    while not done:
        vid.capture_frame()

        action = agent.act(state)
        next_state, reward, done, _ = env.step(action)
        state = next_state
        if done:
            break
    env.close()
In [24]:
agent = Agent(env).to(device)
show_video_of_model(agent, "MountainCarContinuous-v0")
clear_output()
In [25]:
clear_output()
print("Необученная модель:")
show_video("MountainCarContinuous-v0")
Необученная модель:

Загрузим веса обученной в течение 500 эпизодов модели:

In [26]:
!wget -qN https://edunet.kea.su/repo/EduNet-content/dev-2.0/L15/weights/checkpoint.pth
In [27]:
agent = Agent(env).to(device)
show_video_of_model(agent, "MountainCarContinuous-v0")
clear_output()
In [28]:
clear_output()
print("Обученная модель:")
show_video("MountainCarContinuous-v0")
Обученная модель:

Марковский процесс принятия решений (Markov decision process, MDP)¶

Drawing

Марковский процесс принятия решений

Markov property¶

Итак, мы разобрались с самой простой ситуацией, многорукими бандитами, в которой среда постоянно находится в одном и том же состоянии. Однако в реальности чаще всего состояние среды, из которой агент делает новый ход, будет изменяться. Например, при игре в шахматы агент должен учитывать то, что после его хода и хода оппонента позиция на доске изменится.

Нам нужно определить некий процесс, в котором агент последовательно переходит из состояния в состояние в зависимости от своих действий, в некоторых состояниях получая награды.

Сделаем также важное предположение о природе нашего процесса: он полностью описывается своим текущим состоянием. Все, что произойдет в будущем, не зависит от информации из прошлого, кроме той, что мы уже наблюдаем в настоящем. Такой процесс называется марковским. Заметим, что описанный нами процесс в примере с многорукими бандитами также является марковским.

Приведем другие классические примеры марковского процесса:

  1. Игральный кубик. Мы знаем, что на нем выпадет любая из граней с некоторой фиксированной вероятностью. На это никак не влияет то, что до этого на кубике выпало 6 шестерок подряд. Это может повлиять на нашу оценку вероятностей выпадения той или иной грани, но не на реальную вероятность.

  2. Шахматы. "Текущая позиция на доске + чей ход" однозначно описывает игру.

  3. А подходит ли покер?

С одной стороны, да. Текущее количество денег у каждого из игроков однозначно описывает игру. Но это если мы не учитываем блеф и другие факторы, которые могут сработать / не сработать в зависимости от предыдущих ситуаций в игре. То есть, все зависит от того, как именно мы будем описывать покер и какие допущения накладываем.

И получение хорошего представления S — тоже важная задача.

Определение Markov Property¶

"The future is independent of the past given the present"

Состояние $S_{t}$ является Марковским тогда и только тогда, когда: $$ \large p\left(r_{t}, s_{t+1} \mid s_{0}, a_{0}, r_{0}, \ldots, s_{t}, a_{t}\right)=p\left(r_{t}, s_{t+1} \mid s_{t}, a_{t}\right) $$

Что происходит потом, зависит только от предыдущего состояния.

  • Вся актуальная информация берется из истории.
  • Как только состояние среды станет известно, историю можно выбросить.
  • То есть, состояние среды — достаточная статистика для будущего обучения.

Вероятность перехода в новое состояние зависит только от текущего состояния и текущего действия.

Марковский процесс описывает полностью наблюдаемые (fully observable) среды. Можно описать их так:

  • Информации, которую получает агент в момент времени t, достаточно, чтобы принять оптимальное решение для момента времени t.
  • Текущее состояние среды содержит всю релевантную информацию из прошлого.
  • Текущее состояние — это достаточная статистика будущего $\rightarrow$ не существует никакой дополнительной информации, которая могла бы улучшить наше описание будущего.

Markov process¶

Представим себе, что студент живет вот по такой схеме. Заметим, что влиять в такой схеме на свои решения он не может: все решается подкидыванием кубика.

Определение¶

Марковский процесс (цепь) — это кортеж $(S, P)$, где

  • $S$ — принимает дискретные (конечные) значения,
  • $P$ — матрица переходов (transition matrix): $$\large P_{s s^{\prime}}=\operatorname{Pr}\left(S_{t+1}=s^{\prime} \mid S_{t}=s\right) $$

Строго говоря, необходимо еще распределение начальных состояний (но мы предполагаем, что оно вырождено, т.е. мы знаем, где начинаем, с вероятностью 1).

Марковский процесс — основа для RL. Мы будем постепенно усложнять эту модель, добавляя rewards и actions.

В определении нам встретилась матрица переходов, зададим ее тоже формально.

Марковский процесс принятия решений¶

Markov decision process (MDP)

До этого у нас получалось не совсем адекватное представление процесса — студент не мог ни на что повлиять. На самом же деле студент может решать, куда ему надо: в аудиторию, домой спать или в бар. Однако наше действие не всегда определяет состояние, в которое мы перейдем.

Хочет наш студент пойти на лекцию, но по пути встречает товарища, и идут они в бар. Хочет он пойти спать, а по пути к выходу из университета встречает лектора с хорошей памятью и идет на лекцию. И т.д.

Как это отразить на схеме так, чтобы это можно было прочитать? Введем промежуточные состояния, куда нас переводят действия студента. А уже из этих промежуточных состояний случайно будем переходить в состояния среды.

Drawing

Формальное описание MDP¶

MDP — это кортеж $(S, A, R, P, \gamma)$, где:

  • $S$ — состояния (дискретное пространство),
  • $A$ — действия (дискретное пространство),
  • $R$ — функция reward: $R_{s,s'}^{a}$ (мгновенное вознаграждение за переход из состояния $s$ в $s'$ при выборе действия $a$)
  • $P$ — матрица переходов (transition matrix): $P_{s s^{\prime}}^{a}=\operatorname{Pr}\left(S_{t+1}=s^{\prime} \mid S_{t}=s, A_{t}=a\right)$,
  • $\gamma$ — discount factor.

Пример¶

Drawing

  • черные беззнаковые числа — вероятности переходов;
  • знаковые — это награды.

Воспользуемся примером 🐾[git]:

In [29]:
from IPython.display import clear_output


# After this setup we can import mdp package
!wget -q https://raw.githubusercontent.com/yandexdataschool/Practical_RL/master/setup_colab.sh -O- | bash
!wget -q https://raw.githubusercontent.com/yandexdataschool/Practical_RL/master/week02_value_based/mdp.py
!touch .setup_complete

clear_output()
In [30]:
from mdp import MDP

transition_probs = {
    "s0": {"a0": {"s0": 0.5, "s2": 0.5}, "a1": {"s2": 1}},
    "s1": {"a0": {"s0": 0.7, "s1": 0.1, "s2": 0.2}, "a1": {"s1": 0.95, "s2": 0.05}},
    "s2": {"a0": {"s0": 0.4, "s2": 0.6}, "a1": {"s0": 0.3, "s1": 0.3, "s2": 0.4}},
}
rewards = {"s1": {"a0": {"s0": +5}}, "s2": {"a1": {"s0": -1}}}

mdp = MDP(transition_probs, rewards, initial_state="s0")

# We can now use MDP just as any other gym environment:
print("initial state =", mdp.reset())
next_state, reward, done, info = mdp.step("a1")
print("next_state = %s, reward = %s, done = %s" % (next_state, reward, done))
initial state = s0
next_state = s2, reward = 0.0, done = False
In [31]:
# MDP methods

print("mdp.get_all_states =", mdp.get_all_states())
print("mdp.get_possible_actions('s1') = ", mdp.get_possible_actions("s1"))
print("mdp.get_next_states('s1', 'a0') = ", mdp.get_next_states("s1", "a0"))

# state, action, next_state
print("mdp.get_reward('s1', 'a0', 's0') = ", mdp.get_reward("s1", "a0", "s0"))

# get_transition_prob(self, state, action, next_state)
print(
    "mdp.get_transition_prob('s1', 'a0', 's0') = ",
    mdp.get_transition_prob("s1", "a0", "s0"),
)
mdp.get_all_states = ('s0', 's1', 's2')
mdp.get_possible_actions('s1') =  ('a0', 'a1')
mdp.get_next_states('s1', 'a0') =  {'s0': 0.7, 's1': 0.1, 's2': 0.2}
mdp.get_reward('s1', 'a0', 's0') =  5
mdp.get_transition_prob('s1', 'a0', 's0') =  0.7
In [32]:
from IPython.display import display_jpeg
import mdp as MDP
import warnings

warnings.filterwarnings("ignore", category=DeprecationWarning)

display_jpeg(MDP.plot_graph(mdp))

Функция награды (reward)¶

Наши состояния, очевидно, неравноценны. Давайте к каждому состоянию привяжем награду $R_s$. Она может быть как нулевой, так и положительной либо отрицательной.

Например, в марковском процессе принятия решений (MDP), описывающем игру в шахматы или GO, награда агента может быть положительной только при переходе в состояние "партия выиграна".

На всех остальных ходах награда будет нулевой.

Для перемещающегося робота награда на каждом шаге может быть отрицательной, так как на перемещение требуется энергия и т.п.

Для покера наградой можно назначить изменение текущей суммы игрока после сыгранной раздачи.

Схема для студента в этом случае модифицируется следующим образом. Заметьте, награды тут расставляются с точки зрения актора — студента:

Суммарная награда (Return)¶

Для турнира по шахматам, где играется 10 партий, финальный reward будет складываться из reward за каждую партию в отдельности.

Если у нас есть тест из 40 вопросов, то опять же, нам не важно, как именно мы получили 30 баллов, главное — мы их получили.

То есть, в таких случаях мы можем считать Return следующим образом:

$$\text{Return} = \sum_i {R_i}$$

А теперь представим себе, что мы можем работать 2 года бесплатно и получить разом 5 миллионов рублей. А можем работать 2 года и каждый год получать 70 тысяч.

Или мы можем 6 лет учиться, получая сторонними активностями 20 тысяч в месяц, а затем сразу начать получать 150 тысяч в месяц, а затем и больше. А можем сразу пойти на работу и начать зарабатывать 50 тысяч в месяц, постепенно дойдя до 100.

Что лучше: опубликовать две статьи в журнале с импакт фактором 5 в этом году или одну, но через 2 года и в журнале с импакт-фактором 20?

Что выгоднее?

Очевидно, в таких ситуациях лучше учитывать не только Reward, но и то, когда он получен.

$\gamma$-коэффициент дисконтирования¶

Поэтому при оценке кумулятивной награды на шаге $t$ $(G_t)$ используется коэффициент дисконтирования $\gamma$, который показывает, насколько ценными являются будущие награды на текущий момент (см. Markov Decision Processes (David Silver Lectures) 📚[book]):

$$\large G_t = R_{t+1} + \gamma R_{t+2} + ... =\sum^{\infty}_{k=0} \gamma ^ kR_{t+k+1}, $$

где:

  • $R_{t+1}, R_{t+1}, \dots$ — вознаграждения по рёбрам переходов,
  • дисконтирование $\gamma \in[0,1]$ — это текущая стоимость будущих вознаграждений,
  • $\gamma^{k} R$ — ценность получения награды $R$ после $k+1$ шагов.

При этом:

  • немедленное вознаграждение ценится выше отложенного вознаграждения,
  • $\gamma$ близко к 0 приводит к «близорукости»,
  • $\gamma$ близко к 1 приводит к «дальновидной» оценке.

Какое значение $\gamma$ выбрать?

Можно ли его выбрать равным 1?

Да, мы уже это делали ранее.

Можно ли его выбрать равным 0?

Тоже да. В этом случае у нас получится "жадный" алгоритм: мы всегда выбираем решение, которое дает максимальную награду сейчас, нас не волнуют будущие награды.

Может $\gamma = 0$ привести к проблемам?

Да, например:

  • Вариант 1: Получить сегодня 1000 рублей.

  • Вариант 2: Получить завтра 100000 рублей

Обычно, второй вариант более предпочтителен. Но жадный алгоритм его не увидит.

Может $\gamma = 1$ привести к проблемам?

Представим себе генератор шуток про медведя. Большая часть людей знает эту шутку, поэтому было бы хорошо посередине вставлять дополнительные детали — "для ценителей". Но заканчиваться шутка должна одинаково.

Мы решили написать программу, которая генерирует шутки про медведя. Шутка должна начинаться с "Шел медведь по лесу" и заканчиваться "Сел в машину и сгорел". Как бы это описать?

Есть состояния, а модель сама решает, куда ей переходить. Вероятностей мы тут не добавили. Что модель решила, то и будет.

Сразу видим и проблему, которую мы получаем при $\gamma = 1$, пусть и в утрированном виде. Ничто не запрещает нашей модели бесконечно добавлять детали к анекдоту, тем самым увеличивая $G_t$. Работать с такой моделью невозможно.

Если в какой-то момент обучения наша модель стала походить на эту, то ничего осмысленного мы и далее не получим. А вот если введением дискаунта дать модели понять, что слушатели анекдота все же хотят вскоре услышать его конец, то проблема уйдет.

Обычно $0 \le \gamma \le 1$

Обычно $\gamma$ ставят равной чему-то между двумя этими крайностями.

Близость $\gamma$ к 0 отражает нашу "нетерпеливость" — насколько важно получить награду именно сейчас.

Наличие такого $\gamma$ позволяет нам не делать различия между моделями с ограниченным числом шагов и неограниченным: теперь в любом случае return будет конечным числом, т.к. выражение для return ограничено сверху суммой бесконечно убывающей геометрической прогрессии.

Пусть $R_{\max} = \max R_i$ — максимальная награда, которую мы в принципе можем получить в каком-то состоянии.

$\displaystyle G_t = R_{t+1} + \gamma \cdot R_{t+2} + \gamma^2 \cdot R_{t+3} + ... \le R_{\max} + \gamma \cdot R_{\max} + \gamma^2 \cdot R_{\max} + ... = R_{\max} (1 + \gamma + \gamma^2 + ... ) \le R_{\max} \cdot \frac{1}{1-\gamma} = const $

Discounting makes sums finite

Drawing

\begin{equation} G_{0}=\sum_{k=0}^{\infty} \gamma^{k}=\frac{1}{1-\gamma} \end{equation}

Важно понимать, что выбор $\gamma$ меняет задачу, которую мы решаем, и, соответственно, меняет решение.

Проблема поиска оптимальной политики¶

Нужно найти последовательность переходов, которой будет соответствовать максимальная награда ($G_t$).

Политика (policy)¶

Политика (policy) $\pi$ — это функция, которая для текущего состояния $s$ дает распределение вероятностей на множестве действий $A$.

$$ \large \pi(a|s)=\mathbb{P}\left[A_{t}=a \mid S_{t}=s\right] $$
  • Политика полностью определяет поведение агента.
  • Политики MDP зависят от текущего состояния среды (а не от прошлых состояний).
  • Т. е. политики являются стационарными (не зависящими от времени):
$$ \large A_{t} \sim \pi\left(\cdot \mid S_{t}\right), \forall t>0 $$

Для нашего примера со студентом:

Drawing

State-value function $v_{\pi}(s)$ (V-функция)¶

  • $v_\pi(s)$ — измеряет ценность каждого состояния, а именно какое ожидаемое вознаграждение можно получить, если начать двигаться из состояния $s$ в течение всего оставшегося времени, придерживаясь политики $\pi$.

Так, например, агент может следовать политике, заключающейся в переходе между состояниями с высокой ценностью. Однако функция ценности состояния удалена от принятия решений агентом, поскольку агент зачастую не может сам выбрать следующее состояние, в которое перейдет, поскольку на него будет оказываться влияние случайности (например, броски кубиков) или внешних сил (ход оппонента). Поэтому при выборе оптимальной политики рассматривают ценность не состояний, а действий.

Drawing

Формально V-функция $v_\pi(s)$ вводится в качестве математического ожидания (по политике $\pi$ и по MDP среды $P_{s s^{\prime}}^{a}$) от будущей дисконтированной награды $G_t$, в случае если мы начинаем свои действия в состоянии среды $s$ и будем действовать строго согласно нашей политики $\pi$:

$$\large \begin{equation} \begin{aligned} v_{\pi}(s) &=\mathbb{E}_{\pi}\left[G_{t} \mid S_{t}=s\right] \end{aligned} \end{equation}$$

Action-value function $q_{\pi}(s, a)$ (Q-функция)¶

  • $q_\pi(s,a)$ — аналогично функции ценности состояния, но при условии, что в состоянии $s (s_0 = s)$ было выбрано действие $a$ (мы фиксируем действие). То есть в $q$ мы, в отличие от $v$, фиксируем наше первое действие (не обязательно соответствующее политике), а в $v$ мы первое действие выбираем согласно политике $\pi$.

Формально Q-функция вводится в качестве математического ожидания от будущей дисконтированной награды $G_t$, в случае если мы находясь в состоянии $s$ выбрали конкретное действие $a$, а все последующие действия совершали строго согласно политике $\pi$:

$$\large \begin{equation} \begin{aligned} q_{\pi}(s, a) &=\mathbb{E}_{\pi}\left[G_{t} \mid S_{t}=s \mid A_{t}=a\right]. \end{aligned} \end{equation}$$

Сразу отметим, что Q и V функции, очевидно, связаны. Если мы воспользуемся нашей политикой $\pi$ для выбора действия $a$, то сразу же придём к определению V-функции (то есть усредним значения Q-функции согласно вероятностям принятия решений, которые задаёт политика): $$\large \begin{equation} \begin{aligned} \sum_a \pi(a|s) q_{\pi}(s, a) &=\mathbb{E}_{\pi}\left[G_{t} \mid S_{t}=s \right]\\ &=v_{\pi}(s). \end{aligned} \end{equation}$$

То есть V-функцию легко записать через Q функцию и политику $\pi$. Обратное соотношение, позволяющее выразить Q функцию через V функцию мы рассмотрим ниже.

Уравнение Беллмана¶

Беллман 📚[wiki] показал, что задача динамической оптимизации в дискретном времени может быть сформулирована в рекурсивной пошаговой форме, путем записи связи между функцией ценности в текущий период и функцией ценности в следующем периоде.

$$\large \begin{equation} \begin{aligned} v_{\pi}(s) &=\mathbb{E}_{\pi}\left[G_{t} \mid S_{t}=s\right] \\ &=\mathbb{E}_{\pi}\left[R_{t+1}+\gamma R_{t+2}+\gamma^{2} R_{t+3}+\ldots \mid S_{t}=s\right] \\ &=\mathbb{E}_{\pi}\left[R_{t+1}+\gamma\left(R_{t+2}+\gamma R_{t+3}+\ldots\right) \mid S_{t}=s\right] \\ &=\mathbb{E}_{\pi}\left[R_{t+1}+\gamma G_{t+1} \mid S_{t}=s\right]\\ &=\mathbb{E}_{\pi}\left[R_{t+1}\right]+\mathbb{E}_{\pi}\left[\gamma G_{t+1} \mid S_{t}=s\right],\\ \end{aligned} \end{equation}$$

где последнее слагаемое можно записать по определению V-функции, учитывая что вероятность перейти из состояния $s$ в возможное состояние $s'$ в момент времени $t$ определяется значением политики $\pi(a|s)$ и MDP средой $P_{s s^{\prime}}^{a}$: $$\large \begin{equation} \begin{aligned} \mathbb{E}_{\pi}\left[\gamma G_{t+1} \mid S_{t}=s\right] = \sum_{a}\pi(a|s)\sum_{s}P_{s s^{\prime}}^{a}\left[{\mathbb{E}_{\pi}\left[\gamma G_{t+1} \mid S_{t+1}=s'\right]}\right] = \sum_{a}\pi(a|s)\sum_{s}P_{s s^{\prime}}^{a}\left[v_{\pi}(s')\right], \end{aligned} \end{equation}$$ что теперь позволяет записать окончательно так называемое уравнение Беллмана: $$\Large \begin{equation} \begin{aligned} v_{\pi}(s) = \mathbb{E}_{\pi}\left[R_{t+1}\right] + \sum_{a}\pi(a|s)\sum_{s}P_{s s^{\prime}}^{a}\left[v_{\pi}(s')\right]. \end{aligned} \end{equation}$$

Уравнение Беллмана - рекуррентное соотношение, позволяющее связать значение V-функции для двух последовательных состояний $s$ и $s'$, если нам известна политика $\pi$ и матрица перехода в MDP $P$. Данное уравнение является ключевым результатом для построения всех последующих RL-алгоритмов.

Представленный выше вывод можно повторить для Q-функции и записать аналогичное рекуррентное соотношение: $$\Large \begin{equation} \begin{aligned} q_{\pi}(s, a) = \mathbb{E}_{\pi}\left[R_{t+1}\right] + \sum_{a}\pi(a|s)\sum_{s}P_{s s^{\prime}}^{a}\left[\sum_{a'}\pi(a'|s')q_{\pi}(s', a')\right]. \end{aligned} \end{equation}$$

Функцию ценности состояния можно представить в виде резервной диаграммы 🎓[article] (backup diagram):

Drawing

$ \mathbf {s} $ — начальное состояние, из него исходят стрелки, отображающие вероятность совершить то или иное действие в соответствии с политикой $ \mathbf{\pi} $, действия $\mathbf{a}$ обозначаются переходом из кружка в чёрную точку, черные точки обозначают $\mathbf{q} $.

Если раскрыть знак матожидания для награды $R_t$ получим запись системы уравнений наиболее широко используемую в литературе:

$$ \large v_{\pi}(s) = \sum_a\pi(a|s)\sum_{s'}P_{ss'}^{a}(R_{ss'}^{a}+\gamma\mathbb{E}_{\pi}\left[G_{t+1}\mid S_{t+1}=s'\right]) = \sum_a\pi(a|s)\sum_{s'}P_{ss'}^{a}(R_{ss'}^{a}+\gamma v_{\pi}(s')). $$

Аналогично для функции ценности действия:

$$ \large q_{\pi}(s, a) = \sum_{s'}P_{ss'}^{a}(R_{ss'}^{a}+\gamma\sum_{a'}\pi(a'|s') q_{\pi}(s', a')) = \sum_{s'}P_{ss'}^{a}(R_{ss'}^{a}+\gamma \large v_{\pi}(s')). $$

Важно отметить, что описанные выше рекуррентные соотношения выполняются в том числе и для оптимального выбора политики $\pi$, к поиску которого мы и стремимся. Для этого воспользуемся ещё одной идеей, которая лежит в определении V и Q функций.

Принцип оптимальности Беллмана

Беллманом был сформулирован принцип, позволяющий из анализа выписанных выше рекуррентных соотношений определить оптимальную политику принятия решений в MDP.

Оптимальная политика обладает тем свойством, что какими бы ни были начальное состояние и начальное действие, остальные действие должны представлять собой оптимальную политику в отношении состояния, полученного в результате первого действия. (Bellman, 1957, Chap. III.3.)

Backup Diagram for State Value $v_\pi(s)$ and Action Value $q_\pi(s,a)$ with all components
  1. Ценность (Value) $v$ в состоянии $s$ — это $v_{\pi}(s)$

  2. Из состояния $s$ агент может сделать 3 действия: $(a_1, a_2, a_3)$

  3. Ценность действия (Action Value) — это $v_{\pi}(s,a)$, где $a = \{a_1, a_2, a_3\}$

  4. Агент предпринял действие $a_3$. А так он может перейти в состояния $s’_1$, $s’_2$ или $s’_3$ с вероятностью перехода p1, p2 или p3 соответственно

  5. Полученная награда — $r_1$, $r_2$ или $r_3$ в зависимости от текущего состояния

Понять это несколько громоздкое определение можно последовательно задав следующие вопросы:

  1. Что вообще можно назвать оптимальной политикой?

Оптимальным решением для любого узла графа MDP, очевидно, будет такой выбор стратегии принятия решений $a$, при котором матожидание будущей дисконтированной награды будет максимально. Будущую дисконтированную награду измеряет функция V. То есть $\pi^*$ - оптимальная политика, если среди всех допустимых уравнением Беллмана V-функций $v_{\pi^*}$ такая, что для любого $s$ она принимает максимально возможное значение ($v_{\pi^{*}}(s) \rightarrow max$)

  1. Как мы можем оценить значение V-функции для известной политики?

Вместо того, чтобы рассчитывать ценность состояния $ \mathbf{s}$ на основе ценности всех состояний $ \mathbf{s'}$, в которые мы можем перейти, мы можем рассчитать ценность состояния, учитывая ценность всех действий $\mathbf{a}$, которые мы можем совершить, находясь в состоянии $\mathbf{s}$. Получаем таким образом выражение одной функции через другую:

$$\large v_{\pi}(s) = \sum_a\pi(a|s)q_{\pi}(s,a).$$
  1. Как оценивать ценность выбора действия для определенного состояния $s$ в MDP?

Для этого мы ввели Q-функцию. В случае нахождения оптимальной политики $q_{\pi^{*}}$ лучшим с точки зрения матожидания награды будет выбор действия $\text{argmax}_a q_{\pi^{*}}(s,a)$. Отсюда сразу следует, что в случае оптимальной игры максимальное значение V функции достигается на максимальном значении Q функции: $$\large v_{\pi^*}(s) = \max_{a(s)}q_{\pi^*}(s,a) = \max_{a(s)}\sum_{s'}P_{ss'}^{a}(R_{ss'}^{a}+\gamma v_{\pi^*}(s')).$$

Отсюда сразу следует основной результат для Беллман-оптимальной политики $\pi^*$: $$\large \pi^*(s) = \text{argmax}_aq_*(s,a)$$

Примечание: $\text{argmax}$ может быть заменен на $\text{softmax}$, если матрица перехода MDP явно зависит от $a$.

Поиск оптимальной политики Беллмана для MDP (решение "MDP")¶

Методы динамического программирования¶

Возьмём рассмотренные выше уравнения Беллмана и определим с их помощью оператор $\mathcal{T}$ по его действию на $V$ и $Q$ функции:

$$\large \mathcal{T}[\color{red}{v}](s) = \sum_a\pi(a|s)\sum_{s'}P_{ss'}^{a}(R_{ss'}^{a}+\gamma \color{red}{v(s')}),$$$$\large \mathcal{T}[\color{red}{q}](s,a) = \sum_{s'}P_{ss'}^{a}(R_{ss'}^{a}+\gamma\sum_{a'}\pi(a'|s') \color{red}{q(s', a')}).$$

Как подробно показано в Reinforcement Learning: An Introduction 📚[book], такой оператор $\mathcal{T}$ является сжимающим. То есть если взять любую пару функций $V_1$ и $V_2$, найдётся такое неотрицательное число $C$, что:

$$\large || \mathcal{T}[V_1] - \mathcal{T}[V_2]|| \leq C||V_1 - V_2||,$$

что означает, что после применения оператора $\mathcal{T}$ функции $V_1$ и $V_2$ становятся ближе друг к другу.

По теореме о неподвижной точке 📚[wiki] это означает, что существует как минимум одна функция, которая не изменяется оператором $\mathcal{T}$. Так как мы построили наш оператор как правую часть уравнения Беллмана, то эта неподвижная точка — искомое нами решение. Отсюда появляется идея итеративного поиска решения для функций $Q$ или $V$:

  1. Инициализируем произвольную $V_0$.
  2. Действуем на неё оператором $\mathcal{T}[V_0] = V_1$.
  3. Повторяем $\mathcal{T}[V_k] = V_{k+1}$ пока не $||V_{k} - V_{k+1}|| \rightarrow 0$.

Policy iteration¶

Весь процесс поиска оптимальной политики можно разделить на два этапа.

Policy evaluation

Фиксируем произвольную политику $\pi$ и V-функцию $V_0$. Проведём итерационную процедуру с оператором $\large \mathcal{T}[\color{red}{v}](s)$, пока не сойдёмся к неподвижной точке.

Policy improvement

Теперь мы знаем правило $v$ для нашей политики. Как нам его улучшить? Будем в каждом состоянии менять нашу политику таким образом, чтобы мы шли в состояние с лучшим $q$. Мы могли бы улучшить это, действуя жадно $\mathrm{q}(\mathrm{s}, \mathrm{a}) !$ $$ \large \pi^{\prime}(s) \leftarrow \underset{a}{\arg \max } \overbrace{\sum_{r, s^{\prime}} P_{ss'}^a\left[r+\gamma v_{\pi}\left(s^{\prime}\right)\right]}^{q_{\pi}(s, a)} $$

Эта процедура гарантированно приведет к улучшению политики!

$$ \large \begin{equation} \begin{aligned} &\text { если } \quad q_{\pi}\left(s, \pi^{\prime}(s)\right) \geq v_{\pi}(s) \quad \text { для всех состояний, } \\ &\text { тогда } \quad v_{\pi^{\prime}}(s) \geq v_{\pi}(s) \text { означает, } \\ &\text { что } \qquad \pi^{\prime} \geq \pi . \end{aligned} \end{equation} $$

Метод действительно сходится:

Drawing

Value Iteration¶

Фактически этап policy evaluation от policy iteration может быть сокращен различными способами без потери гарантий сходимости policy iteration.

Один важный особый случай — это когда оценка политики останавливается сразу после одного цикла (одно обновление каждого состояния). Этот алгоритм называется value iteration. Его можно записать как простую операцию обновления, которая сочетает в себе этапы улучшения политики и усеченной policy evaluation (один шаг).

Это позволяет, в частности, вообще не записывать политику в явном виде, т.к. она однозначно определяется Q-функцией.

Value iteration (VI) vs. Policy iteration (PI):

  • VI быстрее за одну итерацию $-\mathrm{O}\left(|\mathrm{A} \| \mathrm{S}|^{2}\right)$
  • VI требуется много итераций -PI медленнее за одну итерацию $-\mathrm{O}\left(|\mathrm{A} \| \mathrm{S}|^{2}+|\mathrm{S}|^{3}\right)$
  • PI требуется мало итераций

Проведем эксперимент с каким-то количеством шагов для нахождения лучшей политики.

Алгоритм

  1. Инициализация:
  • создаем массив V с количеством элементов, равным количеству состояний,
  • заполняем его нулями.
  1. Оценка политики (Policy evaluation):
  • для всех состояний считаем Q(s,a),
  • обновляем массив V[s] $\rightarrow$ max(Q(s,a)).

*В отличие от Policy iteration, сама политика в памяти не хранится, она генерируется при помощи Q-функции.

  1. Обновление политики (Policy improvement):
  • для каждого состояния считаем Q(s,a) для всех a,
  • выбираем a, для которого Q(s,a) максимально,
  • если политика изменилась, переходим к шагу 2, иначе останавливаемся.

Теперь давайте построим что-нибудь, чтобы решить эту MDP.

Запишем решения для этого MDP. Самый простой алгоритм — это Value Iteration.

Псевдо код для VI:


1. Initialize $V^{(0)}(s)=0$, for all $s$

2. For $i=0, 1, 2, \dots$

3. $ \quad V_{(i+1)}(s) = \max_a \sum_{s'} P_{ss'}^a \cdot [ R_{ss'}^a + \gamma V_{i}(s')]$, for all $s$


$R_{ss'}^a$ — награда, соответствующая выбору действия a в s,

$s$ — исходное состояние,

$s'$ — новое состояние.

Функция для вычисления функции значения состояния-действия $Q^{\pi}$ определяется следующим образом:

$$\large Q_i(s, a) = \sum_{s'} P_{ss'}^a \cdot [ R_{ss'}^a + \gamma V_{i}(s')],$$

$s'$ — зависит от вероятности перехода.

In [33]:
def get_action_value(mdp, state_values, state, action, gamma):
    """
    Computes Q(s,a) as in formula above

    mdp : MDP object
    state_values : dictionayry of { state_i : V_i }
    state: string id of current state
    gamma: float discount coeff

    """

    next_states = mdp.get_next_states(state, action)

    Q = 0.0

    for next_state in next_states.keys():
        # alternatively p = mdp.get_transition_prob(state, action, next_state)
        p = next_states[next_state]
        Q += p * (
            mdp.get_reward(state, action, next_state) + gamma * state_values[next_state]
        )
    return Q

Используя $Q(s,a)$, мы можем определить "следующее" V(s) для VI:

$$\large V_{(i+1)}(s) = \max_a \sum_{s'} P_{ss'}^a \cdot [ R_{ss'}^a + \gamma V_{i}(s')] = \max_a Q_i(s,a)$$
In [34]:
def get_new_state_value(mdp, state_values, state, gamma):
    """Computes next V(s) as in formula above. Please do not change state_values in process."""
    if mdp.is_terminal(state):
        return 0  # Game over

    q_max = float("-inf")
    actions = mdp.get_possible_actions(state)
    for a in actions:
        q = get_action_value(mdp, state_values, state, a, gamma)
        q_max = max(q_max, q)
    return q_max

Наконец, объединим все, что мы написали, в алгоритм итерации рабочего значения.

In [35]:
# parameters
gamma = 0.9  # discount for MDP
num_iter = 100  # maximum iterations, excluding initialization
# stop VI if new values are this close to old values (or closer)
min_difference = 0.001

# initialize V(s)
state_values = {s: 0 for s in mdp.get_all_states()}

display_jpeg(MDP.plot_graph_with_state_values(mdp, state_values))

Здесь нет никакого "текущего состояния"! Добавим матрицу состояний и построим новый граф:

In [36]:
for i in range(num_iter):
    # Compute new state values using the functions you defined above.
    # It must be a dict {state : float V_new(state)}

    new_state_values = {}
    for s in state_values.keys():
        new_state_values[s] = get_new_state_value(mdp, state_values, s, gamma)

    # Compute difference
    diff = max(abs(new_state_values[s] - state_values[s]) for s in mdp.get_all_states())
    print("iter %4i   |   diff: %6.5f   |   " % (i, diff), end="")
    print("   ".join("V(%s) = %.3f" % (s, v) for s, v in state_values.items()))
    state_values = new_state_values

    if diff < min_difference:
        print("Terminated")
        break
iter    0   |   diff: 3.50000   |   V(s0) = 0.000   V(s1) = 0.000   V(s2) = 0.000
iter    1   |   diff: 0.64500   |   V(s0) = 0.000   V(s1) = 3.500   V(s2) = 0.000
iter    2   |   diff: 0.58050   |   V(s0) = 0.000   V(s1) = 3.815   V(s2) = 0.645
iter    3   |   diff: 0.43582   |   V(s0) = 0.581   V(s1) = 3.959   V(s2) = 0.962
iter    4   |   diff: 0.30634   |   V(s0) = 0.866   V(s1) = 4.395   V(s2) = 1.272
iter    5   |   diff: 0.27571   |   V(s0) = 1.145   V(s1) = 4.670   V(s2) = 1.579
iter    6   |   diff: 0.24347   |   V(s0) = 1.421   V(s1) = 4.926   V(s2) = 1.838
iter    7   |   diff: 0.21419   |   V(s0) = 1.655   V(s1) = 5.169   V(s2) = 2.075
iter    8   |   diff: 0.19277   |   V(s0) = 1.868   V(s1) = 5.381   V(s2) = 2.290
iter    9   |   diff: 0.17327   |   V(s0) = 2.061   V(s1) = 5.573   V(s2) = 2.481
iter   10   |   diff: 0.15569   |   V(s0) = 2.233   V(s1) = 5.746   V(s2) = 2.654
iter   11   |   diff: 0.14012   |   V(s0) = 2.389   V(s1) = 5.902   V(s2) = 2.810
iter   12   |   diff: 0.12610   |   V(s0) = 2.529   V(s1) = 6.042   V(s2) = 2.950
iter   13   |   diff: 0.11348   |   V(s0) = 2.655   V(s1) = 6.168   V(s2) = 3.076
iter   14   |   diff: 0.10213   |   V(s0) = 2.769   V(s1) = 6.282   V(s2) = 3.190
iter   15   |   diff: 0.09192   |   V(s0) = 2.871   V(s1) = 6.384   V(s2) = 3.292
iter   16   |   diff: 0.08272   |   V(s0) = 2.963   V(s1) = 6.476   V(s2) = 3.384
iter   17   |   diff: 0.07445   |   V(s0) = 3.045   V(s1) = 6.558   V(s2) = 3.467
iter   18   |   diff: 0.06701   |   V(s0) = 3.120   V(s1) = 6.633   V(s2) = 3.541
iter   19   |   diff: 0.06031   |   V(s0) = 3.187   V(s1) = 6.700   V(s2) = 3.608
iter   20   |   diff: 0.05428   |   V(s0) = 3.247   V(s1) = 6.760   V(s2) = 3.668
iter   21   |   diff: 0.04885   |   V(s0) = 3.301   V(s1) = 6.814   V(s2) = 3.723
iter   22   |   diff: 0.04396   |   V(s0) = 3.350   V(s1) = 6.863   V(s2) = 3.771
iter   23   |   diff: 0.03957   |   V(s0) = 3.394   V(s1) = 6.907   V(s2) = 3.815
iter   24   |   diff: 0.03561   |   V(s0) = 3.434   V(s1) = 6.947   V(s2) = 3.855
iter   25   |   diff: 0.03205   |   V(s0) = 3.469   V(s1) = 6.982   V(s2) = 3.891
iter   26   |   diff: 0.02884   |   V(s0) = 3.502   V(s1) = 7.014   V(s2) = 3.923
iter   27   |   diff: 0.02596   |   V(s0) = 3.530   V(s1) = 7.043   V(s2) = 3.951
iter   28   |   diff: 0.02336   |   V(s0) = 3.556   V(s1) = 7.069   V(s2) = 3.977
iter   29   |   diff: 0.02103   |   V(s0) = 3.580   V(s1) = 7.093   V(s2) = 4.001
iter   30   |   diff: 0.01892   |   V(s0) = 3.601   V(s1) = 7.114   V(s2) = 4.022
iter   31   |   diff: 0.01703   |   V(s0) = 3.620   V(s1) = 7.133   V(s2) = 4.041
iter   32   |   diff: 0.01533   |   V(s0) = 3.637   V(s1) = 7.150   V(s2) = 4.058
iter   33   |   diff: 0.01380   |   V(s0) = 3.652   V(s1) = 7.165   V(s2) = 4.073
iter   34   |   diff: 0.01242   |   V(s0) = 3.666   V(s1) = 7.179   V(s2) = 4.087
iter   35   |   diff: 0.01117   |   V(s0) = 3.678   V(s1) = 7.191   V(s2) = 4.099
iter   36   |   diff: 0.01006   |   V(s0) = 3.689   V(s1) = 7.202   V(s2) = 4.110
iter   37   |   diff: 0.00905   |   V(s0) = 3.699   V(s1) = 7.212   V(s2) = 4.121
iter   38   |   diff: 0.00815   |   V(s0) = 3.708   V(s1) = 7.221   V(s2) = 4.130
iter   39   |   diff: 0.00733   |   V(s0) = 3.717   V(s1) = 7.230   V(s2) = 4.138
iter   40   |   diff: 0.00660   |   V(s0) = 3.724   V(s1) = 7.237   V(s2) = 4.145
iter   41   |   diff: 0.00594   |   V(s0) = 3.731   V(s1) = 7.244   V(s2) = 4.152
iter   42   |   diff: 0.00534   |   V(s0) = 3.736   V(s1) = 7.249   V(s2) = 4.158
iter   43   |   diff: 0.00481   |   V(s0) = 3.742   V(s1) = 7.255   V(s2) = 4.163
iter   44   |   diff: 0.00433   |   V(s0) = 3.747   V(s1) = 7.260   V(s2) = 4.168
iter   45   |   diff: 0.00390   |   V(s0) = 3.751   V(s1) = 7.264   V(s2) = 4.172
iter   46   |   diff: 0.00351   |   V(s0) = 3.755   V(s1) = 7.268   V(s2) = 4.176
iter   47   |   diff: 0.00316   |   V(s0) = 3.758   V(s1) = 7.271   V(s2) = 4.179
iter   48   |   diff: 0.00284   |   V(s0) = 3.762   V(s1) = 7.275   V(s2) = 4.183
iter   49   |   diff: 0.00256   |   V(s0) = 3.764   V(s1) = 7.277   V(s2) = 4.185
iter   50   |   diff: 0.00230   |   V(s0) = 3.767   V(s1) = 7.280   V(s2) = 4.188
iter   51   |   diff: 0.00207   |   V(s0) = 3.769   V(s1) = 7.282   V(s2) = 4.190
iter   52   |   diff: 0.00186   |   V(s0) = 3.771   V(s1) = 7.284   V(s2) = 4.192
iter   53   |   diff: 0.00168   |   V(s0) = 3.773   V(s1) = 7.286   V(s2) = 4.194
iter   54   |   diff: 0.00151   |   V(s0) = 3.775   V(s1) = 7.288   V(s2) = 4.196
iter   55   |   diff: 0.00136   |   V(s0) = 3.776   V(s1) = 7.289   V(s2) = 4.197
iter   56   |   diff: 0.00122   |   V(s0) = 3.778   V(s1) = 7.291   V(s2) = 4.199
iter   57   |   diff: 0.00110   |   V(s0) = 3.779   V(s1) = 7.292   V(s2) = 4.200
iter   58   |   diff: 0.00099   |   V(s0) = 3.780   V(s1) = 7.293   V(s2) = 4.201
Terminated
In [37]:
display_jpeg(MDP.plot_graph_with_state_values(mdp, state_values))

Теперь воспользуемся этим $V^{*}(s)$, чтобы найти оптимальные действия в каждом состоянии:

$$\pi^*(s) = \text{argmax}_a \sum_{s'} P_{ss'}^a \cdot [ R_{ss'}^a + \gamma V_{i}(s')] = \text{argmax}_a Q_i(s,a)$$

Единственное отличие от $V(s)$ в том, что здесь мы берем не $\max$, а $\text{argmax}$: найти действие с максимальным $Q(s,a)$.

In [38]:
def get_optimal_action(mdp, state_values, state, gamma=0.9):
    """Finds optimal action using formula above."""
    if mdp.is_terminal(state):
        return None

    best_action = None
    q_max = float("-inf")
    actions = mdp.get_possible_actions(state)
    for a in actions:
        q = get_action_value(mdp, state_values, state, a, gamma)
        if q > q_max:
            best_action = a
            q_max = q

    return best_action
In [39]:
display_jpeg(
    MDP.plot_graph_optimal_strategy_and_state_values(
        mdp, state_values, get_action_value
    )
)
In [40]:
import numpy as np

# Measure agent's average reward

s = mdp.reset()
rewards = []
for _ in range(10000):
    s, r, done, _ = mdp.step(get_optimal_action(mdp, state_values, s, gamma))
    rewards.append(r)

print("average reward: ", np.mean(rewards))

assert 0.40 < np.mean(rewards) < 0.55
average reward:  0.4532

Temporal difference (TD)-обучение (TD-learning)¶

Итак, в тех случаях, когда известна вероятность $P_{ss'}^a$, задача обучения может быть решена с помощью методов динамического программирования, как было рассмотрено выше. В реальных же задачах динамика среды зачастую неизвестна. Два главных подхода, позволяющие обходить незнание динамики среды, включают метод Монте Карло (MC) и temporal difference (TD)-обучение (TD-learning). Оба эти подхода основываются на симулировании опыта взаимодействия со средой в виде траекторий (MDP trajectory).

MDP trajectory

Drawing

Траектория представляет из себя последовательность

• states ($s$) • actions ($a$) • rewards ($r$)

При этом MC оперирует полными траекториями, оканчивающимися в терминальном состоянии, а TD-learning позволяет учиться на неполных эпизодах, не дожидаясь конечного состояния.

Drawing

Рассмотрим второй подход подробнее.

TD-learning — один из наиболее мощных подходов, используемых во многих алгоритмах RL. Смысл в том, чтобы обновлять оценки ценности (состояния или действия) на основе уже обученных оценок для последующих состояний. Эту процедуру можно представить как "подтягивание" значения функции ценности того состояния, в котором мы находились на предыдущем шаге, к значению функции ценности того состояния, в котором мы находимся сейчас. Каким образом это можно сделать?

Базовый алгоритм TD-обучения — $TD(0)$ заключается в следующем:

Инициализируем функцию ценности, например, $V(s)$, и стратегию $\pi$. Пусть, у нас есть текущее состояние $s$ из которого мы:

  • выбираем действие $a$ по стратегии $\pi$,
  • совершаем действие $a$, наблюдаем награду $r$ и следующее состояние $s'$,
  • обновляем оценку $V(s)$:
$$V(s) := V(s) + \alpha(\underbrace{\overbrace{r+\gamma V(s')}^{\text{TD target}} - \overbrace{V(s)}^{\text{old estimate}}}_{\text{TD error}}),$$
  • переходим к следующему шагу $s:=s'$

При оптимальной политике $\pi$: $$\overbrace{r+\gamma V(s')}^{\text{TD target}} = \overbrace{V(s)}^{\text{old estimate}}$$

Cоответственно, их разницу, называемую TD-ошибкой (TD-error), мы и будем пытаться минимизировать.

Почему это работает? Ведь мы обучаем случайно инициализированную оценку $V(s)$ на основе других оценок $V(s')$, которые также были инициализированы случайно. Дело в том, что состояния, которые предшествуют получению реальных наград (например, за победу) будут обучаться первыми и затем распространять свою ценность на состояния, предшествующие им.

Стоит отметить также, что TD-обучение не обязательно всегда будет смотреть на один шаг вперед. Алгоритм, называемый $TD(\lambda)$, будет обновлять оценки функции ценности сразу на несколько шагов назад.

Разновидностями TD-обучения являются алгоритмы Q-learning и SARSA.

Q-Learning¶

Матрица перехода $P_{ss'}^{a}$ может быть нам неизвестна явно, что затруднит прямое применение алгоритма динамического программирования для нахождения Q или V функций в общем случае: $$ \large q_{\pi}(s, a) = \sum_{s'}P_{ss'}^{a}(R_{ss'}^{a}+\gamma\sum_{a'}\pi(a'|s') q_{\pi}(s', a')) = \mathbb{E}(R_{ss'}^a +\gamma \mathbb{E}_{\pi} q_{\pi}(s', a')). $$

Мы можем обойти это ограничение, заменив "честное" матожидание по MDP, на выборочное среднее от серии наблюдений вдоль различных траекторий процесса:

$$ \large q_{\pi}(s, a) = \frac{1}{N}\sum_n(R_{ss'}^a +\gamma \mathbb{E}_{\pi} q_{\pi}(s', a')). $$

Допустим, мы ходим по полю размером 12×4. Наша цель — попасть в ячейку (3,11). Максимизируя сумму будущих наград, мы также находим самый быстрый путь к цели, поэтому наша цель сейчас — найти способ сделать это!

Drawing
Source: Понимание Q-learning, проблема «Прогулка по скале»

Начнем с построения таблицы, которая измеряет, насколько хорошо будет выполнить определенное действие в любом состоянии (чем больше значение, тем лучше действие).

  • Таблица будет иметь одну строку для каждого состояния и один столбец для каждого действия. Сетка имеет 48 (4 по Y на 12 по X) состояний, и разрешены 4 действия ($\leftarrow \downarrow \rightarrow \uparrow$), поэтому таблица будет 48×4.
  • Значения, хранящиеся в этой таблице, называются «Q-values».
  • Это оценки суммы будущих наград. Другими словами, они оценивают, сколько еще вознаграждения мы можем получить до конца игры, находясь в состоянии S и выполняя действие A.
  • Мы инициализируем таблицу случайными значениями (или некоторой константой, например, всеми нулями).

Оптимальная «Q-table» имеет значения, которые позволяют нам предпринимать лучшие действия в каждом состоянии, давая нам в итоге лучший путь к победе.

Drawing
Source: Понимание Q-learning, проблема «Прогулка по скале»

Q-learning — это алгоритм, который «изучает» эти значения. На каждом шагу мы получаем больше информации о мире. Эта информация используется для обновления значений в таблице.

Drawing
Source: Понимание Q-learning, проблема «Прогулка по скале»

Итак, мы не знаем вероятности переходов. Мы можем только генерировать траектории и учиться на них.

Drawing

Возникает вопрос, какую из функций ценности нам лучше пытаться учить: $V(s)$ или $Q(s,a)$? Поскольку $V(s)$ не позволит определять политику без знания $P_{ss'}^a$, будет выгоднее учить $Q(s,a)$.

Drawing

Инициализировать таблицу $Q(s, a)$ нулями.

  • Цикл:
    • Семплировать $<s, a, r, s^{'}>$ из среды
    • Вычислить $\hat{Q}(s, a) = r(s,a) + \gamma Q(s^{'}, a_{i})$
    • Обновить $Q(s,a) \longleftarrow \alpha \hat{Q}(s, a) + (1-\alpha)Q(s,a)$

$\color{red}\triangle$ Мы хотим несмещённую оценку с меньшей дисперисей, поэтому используем скользящее среднее.

$\color{red}\triangle$ Пересчёт функции (поле в таблице) выполняется после перехода.

$\hat{Q}$ — расчетное значение, $Q$ — то, что было в таблице, $\alpha$ — шаг обучения.

Данный алгоритм называется Q-learning. Он напрямую аппроксимирует функцию $q_*(s,a)$, которая фактически представляет оптимальную политику $\pi_*$. В данном алгоритме оценка $Q(s, a)$ обновляется в соответствии с правилами TD-обучения:

$$\large Q(s,a) := Q(s,a) + \alpha(R_{ss'}^a+\gamma\max_{a'}Q(s',a') -Q(s,a))$$

Видно, что за счет максимума Q-learning будет аппроксимировать именно $q_*(s,a)$, независимо от того, какой политики мы придерживаемся (которая может быть, например $\varepsilon$-жадной). Наш вариант Q-learning не учитывает того, что мы хотим иногда "рандомить". В совокупности группа методов, при которых мы можем придерживаться абсолютно любой стратегии, а обучаться все равно будут правильные оптимальные значения политики, называется off-policy.

Deep Q-Learning¶

Есть задачи, где количество состояний и переходов огромно, и поддерживать таблицу значений не представляется возможным.

Иллюстрация игры Space Invaders

Количество состояний для $8$-мибитной цветной ($3$ RGB-канала) игры Space Invaders с размером игрового поля $210 \times 160$ пикселей:

$$\large |S| = 210\times160\times2^{8\times3}$$

В таких ситуациях Q-функцию не считают в явном виде, а аппроксимируют нейросетью.

Drawing

Чтобы считать градиент, приходится принимать значение Q-функции фиксированным и не зависящим от параметров нашей нейросети. Что, конечно, не так.

Loss¶

Drawing

Approximate Q-learning¶

Q-values: $$ \large \hat{Q}\left(s_{t}, a_{t}\right)=r+\gamma \cdot \max _{a^{\prime}} Q\left(s_{t+1}, a^{\prime}\right) $$ Objective: $$ \large L=\left(Q\left(s_{t}, a_{t}\right)-\left[r+\gamma \cdot \max _{a^{\prime}} Q\left(s_{t+1}, a^{\prime}\right)\right]\right)^{2} $$ Gradient step: $$ \large w_{t+1}=w_{t}-\alpha \cdot \frac{\delta L}{\delta w} $$

Алгоритм обучения¶

Basic deep Q-learning

Drawing

$\varepsilon -greedy$ нужна для исследования среды

Experience replay¶

Проблема:

Drawing

В большинстве окружений информация, получаемая агентом, распределена не независимо. Т.е. последовательные наблюдения агента сильно коррелированы между собой (что понятно из интуитивных соображений, т.к. большинство окружений, в которых применяется RL, предполагают, что все изменения в них последовательны).

Мы можем столкнуться с такой проблемой при обучении нейросети, если в игре есть большое количество стандартных персонажей и редко встречающиеся "боссы". После повторения большого количества типовых действий нейросеть будет забывать, как проходить боссов.

Корреляция примеров ухудшает сходимость стохастического градиентного спуска. Таким образом, нам нужен способ, который позволяет улучшить распределение примеров для обучения (устранить или снизить корреляцию между ними).

Drawing

Идея: сохранять несколько предыдущих взаимодействий $<s,a,r,s^{'}>$.

Обучать на случайных подвыборках.

Процесс обучения:

  • Сыграть 1 шаг и записать его
  • Выбрать $N$ случайных записей для обучения

Profit: не требуется заново возвращаться в те же пары состояние-действие $(s,a)$, чтобы выучить их.

  • Можно получить хорошую стратегию, сделав меньше взаимодействий со средой
  • Боремся с катастрофическим забыванием данных

$\color{red}\triangle$ Работает только с алгоритмами без политик.

Обычно используется метод проигрывания опыта (experience replay). Суть этого метода в том, что мы сохраняем некоторое количество примеров (состояние, действия, вознаграждение) в специальном буфере и для обучения выбираем случайные мини-батчи из этого буфера.

Так же experience replay позволяет агенту эффективнее использовать свой прошлый опыт.

Target network¶

В DQN для эффективного обучения должны использоваться две сети: одна непосредственно обучается, другая определяет цель (TD target). Почему мы не можем пользоваться одной и той же сетью для получения оценки текущего $Q(s', a')$ и предыдущего $Q(s, a)$ состояний?

Напомним, что в процессе TD-обучения мы "подтягиваем" значение функции ценности того состояния, в котором мы находились на предыдущем шаге, к значению функции ценности того состояния, в котором мы находимся сейчас, а в DQN мы делаем это с помощью нейронной сети, и если использовать одну и ту же сеть для оценки $Q(s', a')$ и $Q(s, a)$, то этот процесс может превратиться в нескончаемую погоню за целью, которую невозможно достичь, поскольку с каждым обновлением весов, приближающим нас к цели, одновременно сама цель будет удаляться от нас (как на рисунке ниже):

Drawing

К счастью, эта проблема решаема: нужно сделать так, чтобы сеть обучалась некоторое время аппроксимировать оценку $\max_{a'} Q(s', a')$, полученную без использования обновленных весов Q-сети. Для этого используют целевую сеть (target network) с зафиксированными весами, обновляемыми раз в несколько эпизодов обучения (например, эпох или игровых эпизодов, если обучение происходит после каждого игрового эпизода).

Drawing

Таким образом, параллельно будут существовать две сети, одна из которых будет отвечать за выбор действия, другая — за целевую функцию, одна из которых будет обучаться с отставанием.

Drawing

Пример c CartPole DQN¶

Сеть Deep Q = DQN (Deep Q-Network).

CartPole — перевернутый маятник с центром тяжести над своей точкой поворота. Он нестабилен, но его можно контролировать, перемещая точку поворота под центром масс. Цель состоит в том, чтобы сохранить равновесие, прикладывая соответствующие усилия к точке поворота.

Другой env можно использовать без каких-либо изменений кода. Пространство состояний должно быть единым вектором, действия должны быть дискретными.

CartPole — самый простой. На ее решение должно уйти несколько минут.

Для LunarLander может потребоваться 1–2 часа, чтобы получить 200 баллов (хороший балл) в Colab, и прогресс в обучении не выглядит информативным.

In [41]:
from IPython.display import clear_output
import os

################################################
# For CartPole
################################################

!wget -q https://raw.githubusercontent.com/yandexdataschool/Practical_RL/master/setup_colab.sh -O- | bash

!wget -q https://raw.githubusercontent.com/yandexdataschool/Practical_RL/master/week04_approx_rl/atari_wrappers.py
!wget -q https://raw.githubusercontent.com/yandexdataschool/Practical_RL/master/week04_approx_rl/utils.py
!wget -q https://raw.githubusercontent.com/yandexdataschool/Practical_RL/master/week04_approx_rl/replay_buffer.py
!wget -q https://raw.githubusercontent.com/yandexdataschool/Practical_RL/master/week04_approx_rl/framebuffer.py

!pip install swig
!pip install gym[box2d]

!touch .setup_complete


# This code creates a virtual display to draw game images on.
# It will have no effect if your machine has a monitor.
if type(os.environ.get("DISPLAY")) is not str or len(os.environ.get("DISPLAY")) == 0:
    !bash ../xvfb start
    os.environ["DISPLAY"] = ":1"

clear_output()
In [42]:
import gym

ENV_NAME = "CartPole-v1"


def make_env(seed=None):
    # some envs are wrapped with a time limit wrapper by default
    env = gym.make(ENV_NAME, render_mode="rgb_array", new_step_api=True).unwrapped
    if seed is not None:
        env.seed(seed)
    return env
In [43]:
import matplotlib.pyplot as plt
import numpy as np
from warnings import simplefilter

simplefilter("ignore", category=DeprecationWarning)

env = make_env()
env.reset()
env_img = np.squeeze(env.render())
clear_output()
plt.imshow(env_img)
state_shape, n_actions = env.observation_space.shape, env.action_space.n

Архитектура сети¶

Теперь нам нужно построить нейронную сеть, которая может сопоставлять наблюдения с состоянием q-значений.

Модель не должна быть слишком сложной: 1–2 скрытых слоя с < 200 нейронами и активацией ReLU, вероятно, будет достаточно.

Batch Normalization и Dropout могут все испортить, поэтому их не используем.

In [44]:
import torch

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# those who have a GPU but feel unfair to use it can uncomment:
# device = torch.device('cpu')

print(state_shape)
(4,)
In [45]:
from torch import nn


class DQNAgent(nn.Module):
    def __init__(self, state_shape, n_actions, epsilon=0):
        super().__init__()
        self.epsilon = epsilon
        self.n_actions = n_actions
        self.state_shape = state_shape
        # Define your network body here. Please make sure agent is fully contained here
        assert len(state_shape) == 1
        state_dim = state_shape[0]

        # Define NN
        ##############################################
        hidden_size = 150
        self._nn = nn.Sequential(
            nn.Linear(state_dim, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, n_actions),
            nn.ReLU(),
        )
        ##############################################

    def forward(self, state_t):
        """
        takes agent's observation (tensor), returns qvalues (tensor)
        :param state_t: a batch states, shape = [batch_size, *state_dim=4]
        """
        # Use your network to compute qvalues for given state

        ##############################################
        qvalues = self._nn(state_t)
        ##############################################

        assert qvalues.requires_grad, "qvalues must be a torch tensor with grad"
        assert (
            len(qvalues.shape) == 2
            and qvalues.shape[0] == state_t.shape[0]
            and qvalues.shape[1] == n_actions
        )

        return qvalues

    def get_qvalues(self, states):
        """
        like forward, but works on numpy arrays, not tensors
        """
        model_device = next(self.parameters()).device
        states = torch.tensor(states, device=model_device, dtype=torch.float32)
        qvalues = self.forward(states)
        return qvalues.data.cpu().numpy()

    def sample_actions(self, qvalues):
        """pick actions given qvalues. Uses epsilon-greedy exploration strategy."""
        epsilon = self.epsilon
        batch_size, n_actions = qvalues.shape

        random_actions = np.random.choice(n_actions, size=batch_size)
        best_actions = qvalues.argmax(axis=-1)

        should_explore = np.random.choice([0, 1], batch_size, p=[1 - epsilon, epsilon])
        return np.where(should_explore, random_actions, best_actions)
In [46]:
agent = DQNAgent(state_shape, n_actions, epsilon=0.5).to(device)
In [47]:
def evaluate(env, agent, n_games=1, greedy=False, t_max=10000):
    """Plays n_games full games. If greedy, picks actions as argmax(qvalues). Returns mean reward."""
    rewards = []
    for _ in range(n_games):
        s = env.reset()
        reward = 0
        for _ in range(t_max):
            qvalues = agent.get_qvalues([s])
            action = (
                qvalues.argmax(axis=-1)[0]
                if greedy
                else agent.sample_actions(qvalues)[0]
            )
            s, r, done, _, _ = env.step(action)
            reward += r
            if done:
                break

        rewards.append(reward)
    return np.mean(rewards)

Experience Replay Buffer and Target Networks¶

Интерфейс довольно прост:

  • exp_replay.add(obs, act, rw, next_obs, done) — сохраняет (s,a,r,s',done) кортеж в буфер?
  • exp_replay.sample(batch_size) — возвращает observations, actions, rewards, next_observations и is_done для batch_size random samples,
  • len(exp_replay) — возвращает количество элементов, хранящихся в replay buffer.
In [48]:
from replay_buffer import ReplayBuffer

exp_replay = ReplayBuffer(2000)


target_network = DQNAgent(agent.state_shape, agent.n_actions, epsilon=0.5).to(device)
# This is how you can load weights from agent into target network
target_network.load_state_dict(agent.state_dict())
Out[48]:
<All keys matched successfully>

TD-Loss¶

Вычислим ошибку TD Q-learning:

$$ \large L = { 1 \over N} \sum_i [ Q_{\theta}(s,a) - Q_\text{reference}(s,a) ] ^2 $$

С Q-reference, определенным как:

$$ \large Q_\text{reference}(s,a) = r(s,a) + \gamma \cdot \max_{a'} Q_\text{target}(s', a'), $$

где:

  • $Q_\text{target}(s',a')$ обозначает $Q$-значение следующего предсказанного состояния и следующего действия target_network,
  • $s, a, r, s'$ — текущее состояние, действие, вознаграждение и следующее состояние соответственно,
  • $\gamma$ является коэффициентом дисконтирования, определенным двумя ячейками выше.
In [49]:
def compute_td_loss(
    states,
    actions,
    rewards,
    next_states,
    is_done,
    agent,
    target_network,
    gamma=0.99,
    check_shapes=False,
    device=device,
):
    """Compute td loss using torch operations only. Use the formulae above."""
    states = torch.tensor(
        states, device=device, dtype=torch.float32
    )  # shape: [batch_size, *state_shape]
    actions = torch.tensor(
        actions, device=device, dtype=torch.int64
    )  # shape: [batch_size]
    rewards = torch.tensor(
        rewards, device=device, dtype=torch.float32
    )  # shape: [batch_size]
    # shape: [batch_size, *state_shape]
    next_states = torch.tensor(next_states, device=device, dtype=torch.float)
    is_done = torch.tensor(
        is_done.astype("float32"),
        device=device,
        dtype=torch.float32,
    )  # shape: [batch_size]
    is_not_done = 1 - is_done

    # get q-values for all actions in current states
    predicted_qvalues = agent(states)  # shape: [batch_size, n_actions]

    # compute q-values for all actions in next states
    # with torch.no_grad():
    predicted_next_qvalues = target_network(
        next_states
    )  # shape: [batch_size, n_actions]

    # select q-values for chosen actions
    predicted_qvalues_for_actions = predicted_qvalues[
        range(len(actions)), actions
    ]  # shape: [batch_size]

    # compute V*(next_states) using predicted next q-values
    ##############################################
    next_state_values = predicted_next_qvalues.max(axis=-1)[0]
    ##############################################

    assert (
        next_state_values.dim() == 1 and next_state_values.shape[0] == states.shape[0]
    ), "must predict one value per state"

    # compute "target q-values" for loss - it's what's inside square parentheses in the above formula.
    # at the last state use the simplified formula: Q(s,a) = r(s,a) since s' doesn't exist
    # you can multiply next state values by is_not_done to achieve this.
    ###############################################
    target_qvalues_for_actions = rewards + gamma * next_state_values * is_not_done
    ##############################################

    # mean squared error loss to minimize
    loss = torch.mean(
        (predicted_qvalues_for_actions - target_qvalues_for_actions.detach()) ** 2
    )

    if check_shapes:
        assert (
            predicted_next_qvalues.data.dim() == 2
        ), "make sure you predicted q-values for all actions in next state"
        assert (
            next_state_values.data.dim() == 1
        ), "make sure you computed V(s') as maximum over just the actions axis and not all axes"
        assert (
            target_qvalues_for_actions.data.dim() == 1
        ), "there's something wrong with target q-values, they must be a vector"

    return loss

Реализация алгоритма¶

In [50]:
import random

seed = 42

random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)

env = make_env()
env.reset(seed=seed)
state_dim = env.observation_space.shape
n_actions = env.action_space.n
state = env.reset()

agent = DQNAgent(state_dim, n_actions, epsilon=1).to(device)
target_network = DQNAgent(state_dim, n_actions, epsilon=1).to(device)
target_network.load_state_dict(agent.state_dict())
Out[50]:
<All keys matched successfully>

Один агент для игры, второй для обучения. Периодически веса обученного агента копируются в "игрока".

In [51]:
def play_and_record(initial_state, agent, env, exp_replay, n_steps=1):
    """
    Play the game for exactly n_steps, record every (s,a,r,s', done) to replay buffer.
    Whenever game ends, add record with done=True and reset the game.
    It is guaranteed that env has done=False when passed to this function.

    PLEASE DO NOT RESET ENV UNLESS IT IS "DONE"

    :returns: return sum of rewards over time and the state in which the env stays

    hint: use agent.sample.actions
    """
    s = initial_state
    sum_rewards = 0

    # Play the game for n_steps as per instructions above
    for _ in range(n_steps):
        qvalues = agent.get_qvalues([s])

        action = agent.sample_actions(qvalues)[0]
        # action = action.argmax(axis=-1)[0]
        state, reward, done, _, _ = env.step(action)
        sum_rewards += reward

        exp_replay.add(s, action, reward, state, done)

        if done:
            state = env.reset()

        s = state

    return sum_rewards, s
In [52]:
import utils
from warnings import simplefilter

simplefilter("ignore", category=UserWarning)

REPLAY_BUFFER_SIZE = 10**4

exp_replay = ReplayBuffer(REPLAY_BUFFER_SIZE)
for i in range(100):
    if not utils.is_enough_ram(min_available_gb=0.1):
        print(
            """
            Less than 100 Mb RAM available.
            Make sure the buffer size in not too huge.
            Also check, maybe other processes consume RAM heavily.
            """
        )
        break
    play_and_record(state, agent, env, exp_replay, n_steps=10**2)
    if len(exp_replay) == REPLAY_BUFFER_SIZE:
        break
print(len(exp_replay))
10000
In [53]:
import time

timesteps_per_epoch = 1
batch_size = 32
total_steps = 4 * 10**4
decay_steps = 1 * 10**4

optimizer = torch.optim.Adam(agent.parameters(), lr=1e-4)

init_epsilon = 1
final_epsilon = 0.1

loss_freq = 20
refresh_target_network_freq = 100
eval_freq = 1000

max_grad_norm = 5000

mean_rw_history = []
td_loss_history = []
grad_norm_history = []
initial_state_v_history = []
step = 0


def wait_for_keyboard_interrupt():
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
In [54]:
from tqdm import trange

state = env.reset()
with trange(step, total_steps + 1) as progress_bar:
    for step in progress_bar:
        if not utils.is_enough_ram():
            print("less that 100 Mb RAM available, freezing")
            print("make sure everything is ok and use KeyboardInterrupt to continue")
            wait_for_keyboard_interrupt()

        agent.epsilon = utils.linear_decay(
            init_epsilon, final_epsilon, step, decay_steps
        )

        # play
        _, state = play_and_record(state, agent, env, exp_replay, timesteps_per_epoch)

        # train
        # sample batch_size of data from experience replay
        s, a, r, next_s, is_done = exp_replay.sample(batch_size)
        # loss = compute TD loss
        loss = compute_td_loss(s, a, r, next_s, is_done, agent, target_network)

        loss.backward()
        grad_norm = nn.utils.clip_grad_norm_(agent.parameters(), max_grad_norm)
        optimizer.step()
        optimizer.zero_grad()

        if step % loss_freq == 0:
            td_loss_history.append(loss.data.cpu().item())
            grad_norm_history.append(grad_norm.data.cpu().item())

        if step % refresh_target_network_freq == 0:
            # Load agent weights into target_network
            target_network.load_state_dict(agent.state_dict())

        if step % eval_freq == 0:
            mean_rw_history.append(
                evaluate(make_env(seed=step), agent, n_games=3, greedy=True, t_max=1000)
            )
            initial_state_q_values = agent.get_qvalues([make_env(seed=step).reset()])
            initial_state_v_history.append(np.max(initial_state_q_values))

            clear_output(True)
            print("buffer size = %i, epsilon = %.5f" % (len(exp_replay), agent.epsilon))

            plt.figure(figsize=[16, 9])

            plt.subplot(2, 2, 1)
            plt.title("Mean reward per episode")
            plt.plot(mean_rw_history)
            plt.grid()

            assert not np.isnan(td_loss_history[-1])
            plt.subplot(2, 2, 2)
            plt.title("TD loss history (smoothened)")
            plt.plot(utils.smoothen(td_loss_history))
            plt.grid()

            plt.subplot(2, 2, 3)
            plt.title("Initial state V")
            plt.plot(initial_state_v_history)
            plt.grid()

            plt.subplot(2, 2, 4)
            plt.title("Grad norm history (smoothened)")
            plt.plot(utils.smoothen(grad_norm_history))
            plt.grid()

            plt.show()
buffer size = 10000, epsilon = 0.10000
100%|██████████| 40001/40001 [05:35<00:00, 119.07it/s]
In [55]:
final_score = evaluate(make_env(), agent, n_games=30, greedy=True, t_max=1000)
print("final score:", final_score)
if final_score > 300:
    print("Well done")
else:
    print("not good enough for DQN")
final score: 98.9
not good enough for DQN

Литература

  • [arxiv] 🎓 Deep Reinforcement Learning for Autonomous Driving: A Survey
  • [arxiv] 🎓 Reinforcement Learning and Control as Probabilistic Inference: Tutorial and Review
  • [git] 🐾 Lecture notes and exercises for control theory course
  • [article] 🎓 Reinforcement learning for combinatorial optimization: A survey
  • [arxiv] 🎓 Neural Combinatorial Optimization with Reinforcement Learning
  • [arxiv] 🎓 Mastering Chess and Shogi by Self-Play with a General Reinforcement Learning Algorithm
  • [blog] ✏️ AlphaGo
  • [book] 📚 Reinforcement Learning for NLP
  • [arxiv] 🎓 Training language models to follow instructions with human feedback
  • [doc] 🛠️ Gymnasium

Дополнительно:

  • [git] 🐾 A course in reinforcement learning in the wild
  • [book] 📚 8.1. Обучение с подкреплением, Учебник по машинному обучению ШАД
  • [course] 📚 UC Berkeley CS188 Intro to AI
  • [course] 📚 Deep Reinforcement Learning