強化学習の初心者向けに、Q学習でCart-Poleを動かしてみます。
【強化学習入門】Q学習でCart-Poleを動かす(Google Colab+OpenAI Gym)
強化学習の入門として「Cart-Pole問題」というものがあります(強化学習の”Hello World!”と呼ばれているらしい)。Cart-Pole問題とは、棒(Pole)の支点を台車(Cart)に固定して、その台車を「左右」に動かすことでバランスをとり、棒が倒れないようにする問題のことです。
Cart-Poleのモデルを自身で実装してもよいのですが、少し面倒なので、この記事ではOpenAI Gymという有名なライブラリを利用します。
そして、そのOpenAI GymのCart-PoleをQ学習で動かしてみます。
(Q学習と深層学習を組み合わせたDQNで動かしてみたい方々はコチラ)
実行環境はGoogle Colaboratoryを想定しています。ブラウザでGoogle Colaboratoryをひらき、下図のように、本記事のプログラムをコピー&ペーストすれば実行できます。
この記事のポイント
- 強化学習の入門者向け
- 強化学習を実装レベルで勉強(i.e. アルゴリズムの詳細解説は省略)
- Google Colaboratoryで動かせるコードをご紹介(i.e. ブラウザ上だけで完結可能)
パッケージのインストール
Google Colaboratoryで以下のコマンドをコピー&ペーストして、OpenAI Gymをインストールします。
!pip install gym
表示のためのパッケージもインストールします。
!apt update !apt install xvfb !pip install pyvirtualdisplay
まずはランダムに動かしてみる
OpenAI GymのCart-Poleの挙動を見るために、まずはランダムに動かしてみます。
Google Colaboratoryで動かすために、描画関連の実装が少し複雑になっています。その部分は理解する必要がないので、軽く読み飛ばすとよいでしょう。
ソースコード
## OpenAI Gym import gym ## 表示用のパッケージ import base64 import io from gym.wrappers import Monitor from IPython import display from pyvirtualdisplay import Display ## 仮想のディスプレイを用意 virtual_display = Display() virtual_display.start() ## Cart-Poleの環境を用意 ## Google Colabで描画するためにgym.wrappers.Monitorを利用 env = Monitor(gym.make('CartPole-v0'),'./videos/', force=True) print("env.observation_space.shape = ", env.observation_space.shape) print("env.action_space.n = ", env.action_space.n) obs = env.reset() ## ランダムに100ステップ動かす for t in range(100): obs, reward, is_done, info = env.step(env.action_space.sample()) print("obs = ", obs) print("reward = ", reward) print("info = ", info) ## 終了判定(e.g. ポールが倒れた場合など) if is_done: print("Episode finished after {} timesteps".format(t+1)) env.reset() break ## 以下は描画用 for frame in env.videos: print("frame = ", frame) video = io.open(frame[0], 'r+b').read() encoded = base64.b64encode(video) ## 動画を埋め込み display.display(display.HTML(data=""" <video controls> <source src="data:video/mp4;base64,{0}" type="video/mp4" /> </video> """.format(encoded.decode('ascii'))))
結果
ランダムに動かしているだけなので、すぐに倒れてしまいます。
Q学習で動かしてみる
ソースコード
## OpenAI Gym import gym ## 表示用のパッケージ import base64 import io from gym.wrappers import Monitor from IPython import display from pyvirtualdisplay import Display ## その他必要なパッケージ import numpy as np ## Brainクラス class Brain: def __init__(self, num_states, list_state_range, list_state_reso, num_actions, gamma, r, lr): ## パラメータをセット self.num_states = num_states self.list_state_range = list_state_range self.list_state_reso = list_state_reso self.num_actions = num_actions self.eps = 1.0 # for epsilon greedy algorithm self.gamma = gamma self.r = r self.lr = lr ## Qテーブルを用意 self.q_table = np.random.rand(np.prod(list_state_reso), num_actions) ## ビンの配列(等差数列)を生成 ## 引数:最初の値、最後の値、要素数 def bins(self, clip_min, clip_max, num): return np.linspace(clip_min, clip_max, num + 1)[1:-1] ## 観測情報をQテーブル上のインデックスへ変換 ## 引数:観測情報 def getStateIndex(self, obs): list_index = [] for i in range(self.num_states): index = np.digitize(obs[i], bins=self.bins(self.list_state_range[i][0], self.list_state_range[i][1], self.list_state_reso[i])) # obs[i]が所属するビンのインデックスを取得 list_index.append(index) return sum([index*int(np.prod(self.list_state_reso[:i])) for i, index in enumerate(list_index)]) # 4次元のインデックスを1次元に変換して返す ## Qテーブルを更新 ## 引数:観測情報、アクションのインデックス、報酬、アクション後の観測情報 def updateQtable(self, obs, action, reward, next_obs): q = self.q_table[self.getStateIndex(obs), action] next_q_max = np.max(self.q_table[self.getStateIndex(next_obs)]) self.q_table[self.getStateIndex(obs), action] = q + self.lr*(reward + self.gamma*next_q_max - q) ## アクションを決定 ## 引数:観測情報、訓練フラグ def getAction(self, obs, is_training): if is_training and np.random.rand() < self.eps: action = np.random.randint(self.num_actions) else: action = np.argmax(self.q_table[self.getStateIndex(obs)]) ## epsを更新 if is_training and self.eps > 0.1: self.eps *= self.r return action ## Agentクラス class Agent: def __init__(self, num_states, list_state_range, list_state_reso, num_actions, gamma, r, lr): ## Brainを用意 self.brain = Brain(num_states, list_state_range, list_state_reso, num_actions, gamma, r, lr) ## Qテーブルを更新 ## 引数:観測情報、アクションのインデックス、報酬、アクション後の観測情報 def updateQtable(self, obs, action, reward, next_obs): self.brain.updateQtable(obs, action, reward, next_obs) ## アクションを決定 ## 引数:観測情報、訓練フラグ def getAction(self, obs, is_training): action = self.brain.getAction(obs, is_training) return action ## Environmentクラス class Environment: def __init__(self, num_episodes, max_step, gamma, r, lr): ## パラメータをセット self.num_episodes = num_episodes self.max_step = max_step ## Cart-Poleの環境を用意 self.env = Monitor(gym.make('CartPole-v0'), './videos/', video_callable=(lambda ep: ep % 100 == 0), force=True) # 100エピソードごとの動画を保存 ## Agentを用意 num_states = self.env.observation_space.shape[0] # position, velocity, angle, angular velocity list_state_range = [] for i in range(num_states): list_state_range.append([self.env.observation_space.low[i], self.env.observation_space.high[i]]) list_state_range[1] = [-3.0, 3.0] # 適当に範囲を設定 list_state_range[3] = [-0.5, 0.5] # 適当に範囲を設定 print("list_state_range = ", list_state_range) list_state_reso = [4, 4, 6, 6] # 各状態量のQテーブルにおける解像度を適当に設定 num_actions = self.env.action_space.n # アクションは「右」、「左」の2つ self.agent = Agent(num_states, list_state_range, list_state_reso, num_actions, gamma, r, lr) ## 訓練 ## 引数:なし def train(self): num_completed_episodes = 0 ## 指定するエピソード数でループ for episode in range(self.num_episodes): obs = self.env.reset() episode_reward = 0 ## 指定する最大ステップ数でループ for step in range(self.max_step): ## アクションを決定 action = self.agent.getAction(obs, is_training=True) ## アクション後の状態を取得 next_obs, _, is_done, _ = self.env.step(action) ## 報酬を付与 if is_done: if step < max_step - 1: reward = -100 else: reward = 1 num_completed_episodes += 1 else: reward = 1 episode_reward += reward ## Qテーブルを更新 self.agent.updateQtable(obs, action, reward, next_obs) ## 次のステップへ obs = next_obs ## 終了判定 if is_done: print('{0} Episode: Finished after {1} time steps with reward {2}'.format(episode, step+1, episode_reward)) break print("num_completed_episodes = ", num_completed_episodes) ## 評価(Q値が最大となるアクションを選択して、Qテーブルの更新はしない) ## 引数:なし def evaluate(self): obs = self.env.reset() for step in range(self.max_step): ## Q値が最大となるアクションを選択 action = self.agent.getAction(obs, is_training=False) ## アクション後の状態を取得 next_obs, _, is_done, _ = self.env.step(action) ## 次のステップへ obs = next_obs ## 終了判定 if is_done: print('Evaluation: Finished after {} time steps'.format(step+1)) break ## 描画用の関数 def show_video(env): env.reset() for frame in env.videos: print("frame = ", frame) video = io.open(frame[0], 'r+b').read() encoded = base64.b64encode(video) display.display(display.HTML(data=""" <video controls> <source src="data:video/mp4;base64,{0}" type="video/mp4" /> </video> """.format(encoded.decode('ascii'))) ) ## 仮想のディスプレイを用意 virtual_display = Display() virtual_display.start() ## パラメータ num_episodes = 500 max_step = 200 gamma = 0.9 r = 0.99 lr = 0.5 ## 実行 cartpole_env = Environment(num_episodes, max_step, gamma, r, lr) cartpole_env.train() cartpole_env.evaluate() show_video(cartpole_env.env)
結果
学習終了後は、上手くバランスをとって倒れなくなりました。
さいごに
Google Colaboratory上で、OpenAI GymのCart-PoleをQ学習で動かしてみました。
少しでも参考になれば幸いです。
以上です。
関連記事
Q学習の実装を理解した次は、Q学習と深層学習を組み合わせたDQN(Deep Q Network)を実装してみませんか?
コメント