update projects

johnjim0816
2022-07-31 23:42:12 +08:00
parent e9b3e92141
commit ffab9e3028
236 changed files with 370 additions and 133 deletions

View File

@@ -0,0 +1,6 @@
## Environment Overview
- [OpenAI Gym](./gym_info.md)
- [MuJoCo](./mujoco_info.md)

(Nine binary image files added; previews not shown.)

View File

@@ -0,0 +1,122 @@
import gym
from gym import spaces
from gym.utils import seeding
def cmp(a, b):
return int((a > b)) - int((a < b))
# 1 = Ace, 2-10 = Number cards, Jack/Queen/King = 10
deck = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 10, 10]
def draw_card(np_random):
return np_random.choice(deck)
def draw_hand(np_random):
return [draw_card(np_random), draw_card(np_random)]
def usable_ace(hand): # Does this hand have a usable ace?
return 1 in hand and sum(hand) + 10 <= 21
def sum_hand(hand): # Return current hand total
if usable_ace(hand):
return sum(hand) + 10
return sum(hand)
def is_bust(hand): # Is this hand a bust?
return sum_hand(hand) > 21
def score(hand): # What is the score of this hand (0 if bust)
return 0 if is_bust(hand) else sum_hand(hand)
def is_natural(hand): # Is this hand a natural blackjack?
return sorted(hand) == [1, 10]
class BlackjackEnv(gym.Env):
"""Simple blackjack environment
Blackjack is a card game where the goal is to obtain cards that sum to as
near as possible to 21 without going over. They're playing against a fixed
dealer.
Face cards (Jack, Queen, King) have point value 10.
Aces can either count as 11 or 1, and it's called 'usable' at 11.
    This game is played with an infinite deck (i.e. cards are drawn with replacement).
The game starts with each (player and dealer) having one face up and one
face down card.
The player can request additional cards (hit=1) until they decide to stop
(stick=0) or exceed 21 (bust).
After the player sticks, the dealer reveals their facedown card, and draws
until their sum is 17 or greater. If the dealer goes bust the player wins.
If neither player nor dealer busts, the outcome (win, lose, draw) is
decided by whose sum is closer to 21. The reward for winning is +1,
drawing is 0, and losing is -1.
    The observation is a 3-tuple of: the player's current sum,
the dealer's one showing card (1-10 where 1 is ace),
and whether or not the player holds a usable ace (0 or 1).
This environment corresponds to the version of the blackjack problem
described in Example 5.1 in Reinforcement Learning: An Introduction
by Sutton and Barto (1998).
https://webdocs.cs.ualberta.ca/~sutton/book/the-book.html
"""
def __init__(self, natural=False):
self.action_space = spaces.Discrete(2)
self.observation_space = spaces.Tuple((
spaces.Discrete(32),
spaces.Discrete(11),
spaces.Discrete(2)))
self._seed()
# Flag to payout 1.5 on a "natural" blackjack win, like casino rules
# Ref: http://www.bicyclecards.com/how-to-play/blackjack/
self.natural = natural
# Start the first game
        self._reset()
        self.n_actions = 2  # number of available actions (stick=0, hit=1)
def reset(self):
return self._reset()
def step(self, action):
return self._step(action)
def _seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
def _step(self, action):
assert self.action_space.contains(action)
if action: # hit: add a card to players hand and return
self.player.append(draw_card(self.np_random))
if is_bust(self.player):
done = True
reward = -1
else:
done = False
reward = 0
else: # stick: play out the dealers hand, and score
done = True
while sum_hand(self.dealer) < 17:
self.dealer.append(draw_card(self.np_random))
reward = cmp(score(self.player), score(self.dealer))
if self.natural and is_natural(self.player) and reward == 1:
reward = 1.5
return self._get_obs(), reward, done, {}
def _get_obs(self):
return (sum_hand(self.player), self.dealer[0], usable_ace(self.player))
def _reset(self):
self.dealer = draw_hand(self.np_random)
self.player = draw_hand(self.np_random)
# Auto-draw another card if the score is less than 12
while sum_hand(self.player) < 12:
self.player.append(draw_card(self.np_random))
return self._get_obs()
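# A minimal interaction sketch (commented out, for reference; how this file is imported is not
# shown in this commit, so BlackjackEnv is instantiated directly):
# env = BlackjackEnv(natural=True)
# obs = env.reset()
# done = False
# while not done:
#     action = env.action_space.sample()           # 0 = stick, 1 = hit
#     obs, reward, done, info = env.step(action)
# print(obs, reward)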

View File

@@ -0,0 +1,84 @@
import numpy as np
import sys
from io import StringIO
from gym.envs.toy_text import discrete
UP = 0
RIGHT = 1
DOWN = 2
LEFT = 3
class CliffWalkingEnv(discrete.DiscreteEnv):
metadata = {'render.modes': ['human', 'ansi']}
def _limit_coordinates(self, coord):
coord[0] = min(coord[0], self.shape[0] - 1)
coord[0] = max(coord[0], 0)
coord[1] = min(coord[1], self.shape[1] - 1)
coord[1] = max(coord[1], 0)
return coord
def _calculate_transition_prob(self, current, delta):
new_position = np.array(current) + np.array(delta)
new_position = self._limit_coordinates(new_position).astype(int)
new_state = np.ravel_multi_index(tuple(new_position), self.shape)
reward = -100.0 if self._cliff[tuple(new_position)] else -1.0
is_done = self._cliff[tuple(new_position)] or (tuple(new_position) == (3,11))
return [(1.0, new_state, reward, is_done)]
def __init__(self):
self.shape = (4, 12)
nS = np.prod(self.shape)
n_actions = 4
# Cliff Location
        self._cliff = np.zeros(self.shape, dtype=bool)
self._cliff[3, 1:-1] = True
# Calculate transition probabilities
P = {}
for s in range(nS):
position = np.unravel_index(s, self.shape)
P[s] = { a : [] for a in range(n_actions) }
P[s][UP] = self._calculate_transition_prob(position, [-1, 0])
P[s][RIGHT] = self._calculate_transition_prob(position, [0, 1])
P[s][DOWN] = self._calculate_transition_prob(position, [1, 0])
P[s][LEFT] = self._calculate_transition_prob(position, [0, -1])
# We always start in state (3, 0)
isd = np.zeros(nS)
isd[np.ravel_multi_index((3,0), self.shape)] = 1.0
super(CliffWalkingEnv, self).__init__(nS, n_actions, P, isd)
def render(self, mode='human', close=False):
self._render(mode, close)
def _render(self, mode='human', close=False):
if close:
return
outfile = StringIO() if mode == 'ansi' else sys.stdout
for s in range(self.nS):
position = np.unravel_index(s, self.shape)
# print(self.s)
if self.s == s:
output = " x "
elif position == (3,11):
output = " T "
elif self._cliff[position]:
output = " C "
else:
output = " o "
if position[1] == 0:
output = output.lstrip()
if position[1] == self.shape[1] - 1:
output = output.rstrip()
output += "\n"
outfile.write(output)
outfile.write("\n")

View File

@@ -0,0 +1,125 @@
import io
import numpy as np
import sys
from gym.envs.toy_text import discrete
UP = 0
RIGHT = 1
DOWN = 2
LEFT = 3
class GridworldEnv(discrete.DiscreteEnv):
"""
Grid World environment from Sutton's Reinforcement Learning book chapter 4.
You are an agent on an MxN grid and your goal is to reach the terminal
state at the top left or the bottom right corner.
For example, a 4x4 grid looks as follows:
T o o o
o x o o
o o o o
o o o T
x is your position and T are the two terminal states.
You can take actions in each direction (UP=0, RIGHT=1, DOWN=2, LEFT=3).
Actions going off the edge leave you in your current state.
You receive a reward of -1 at each step until you reach a terminal state.
"""
metadata = {'render.modes': ['human', 'ansi']}
def __init__(self, shape=[4,4]):
if not isinstance(shape, (list, tuple)) or not len(shape) == 2:
raise ValueError('shape argument must be a list/tuple of length 2')
self.shape = shape
nS = np.prod(shape)
n_actions = 4
MAX_Y = shape[0]
MAX_X = shape[1]
P = {}
grid = np.arange(nS).reshape(shape)
it = np.nditer(grid, flags=['multi_index'])
while not it.finished:
s = it.iterindex
y, x = it.multi_index
# P[s][a] = (prob, next_state, reward, is_done)
P[s] = {a : [] for a in range(n_actions)}
is_done = lambda s: s == 0 or s == (nS - 1)
reward = 0.0 if is_done(s) else -1.0
# We're stuck in a terminal state
if is_done(s):
P[s][UP] = [(1.0, s, reward, True)]
P[s][RIGHT] = [(1.0, s, reward, True)]
P[s][DOWN] = [(1.0, s, reward, True)]
P[s][LEFT] = [(1.0, s, reward, True)]
# Not a terminal state
else:
ns_up = s if y == 0 else s - MAX_X
ns_right = s if x == (MAX_X - 1) else s + 1
ns_down = s if y == (MAX_Y - 1) else s + MAX_X
ns_left = s if x == 0 else s - 1
P[s][UP] = [(1.0, ns_up, reward, is_done(ns_up))]
P[s][RIGHT] = [(1.0, ns_right, reward, is_done(ns_right))]
P[s][DOWN] = [(1.0, ns_down, reward, is_done(ns_down))]
P[s][LEFT] = [(1.0, ns_left, reward, is_done(ns_left))]
it.iternext()
# Initial state distribution is uniform
isd = np.ones(nS) / nS
# We expose the model of the environment for educational purposes
# This should not be used in any model-free learning algorithm
self.P = P
super(GridworldEnv, self).__init__(nS, n_actions, P, isd)
def _render(self, mode='human', close=False):
""" Renders the current gridworld layout
For example, a 4x4 grid with the mode="human" looks like:
T o o o
o x o o
o o o o
o o o T
where x is your position and T are the two terminal states.
"""
if close:
return
outfile = io.StringIO() if mode == 'ansi' else sys.stdout
grid = np.arange(self.nS).reshape(self.shape)
it = np.nditer(grid, flags=['multi_index'])
while not it.finished:
s = it.iterindex
y, x = it.multi_index
if self.s == s:
output = " x "
elif s == 0 or s == self.nS - 1:
output = " T "
else:
output = " o "
if x == 0:
output = output.lstrip()
if x == self.shape[1] - 1:
output = output.rstrip()
outfile.write(output)
if x == self.shape[1] - 1:
outfile.write("\n")
it.iternext()

View File

@@ -0,0 +1,50 @@
# OpenAI Gym Environment Notes
## Basic Control
### [CartPole-v0](https://github.com/openai/gym/wiki/CartPole-v0)
<img src="assets/image-20200820174307301.png" alt="image-20200820174307301" style="zoom:50%;" />
The pole is kept balanced by pushing the cart left or right, so the action space consists of two actions. A reward of +1 is given at every step; once the pole can no longer be kept balanced, done becomes true and the episode ends in failure. Ideally an episode runs for the full 200 steps, i.e. the total reward per episode reaches 200.
### CartPole-v1
The ```CartPole-v1``` environment is identical to ```CartPole-v0``` except for the maximum number of steps per episode (max_episode_steps) and the reward threshold (reward_threshold). The relevant source code is shown below:
![](assets/gym_info_20211130180023.png)
First, a note on the reward threshold (reward_threshold): it is a passing bar set by Gym. For example, for ```CartPole-v0```, an algorithm that converges to a reward above 195 is considered to have solved the environment. In fact, the maximum number of steps per episode (max_episode_steps) of ```CartPole-v0``` is 200 and the maximum reward per step is 1, so the maximum return per episode is 200, which is higher than the threshold Gym sets. My guess is that the threshold is meant as a reference line for learners; it is not actually used when implementing algorithms, so it can be ignored.
Turning to the maximum number of steps per episode, ```CartPole-v1``` allows longer episodes and accordingly requires a higher return, so ```v1``` can be viewed as a harder version of ```v0```.
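These registration values can also be inspected programmatically; below is a minimal sketch (assuming a classic, pre-0.26 gym release, where `gym.spec` exposes these fields):
```python
import gym

for env_id in ["CartPole-v0", "CartPole-v1"]:
    spec = gym.spec(env_id)
    # CartPole-v0 is registered with 200 steps / threshold 195.0,
    # CartPole-v1 with 500 steps / threshold 475.0.
    print(env_id, spec.max_episode_steps, spec.reward_threshold)
```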
### [Pendulum-v0](https://github.com/openai/gym/wiki/Pendulum-v0)
In gym versions after 0.18.0, Pendulum-v0 has been renamed to Pendulum-v1.
<img src="assets/image-20200820174814084.png" alt="image-20200820174814084" style="zoom:50%;" />
The pendulum starts in a random position, and the goal is to swing it up so that it stays upright. The action space is continuous, with values in [-2, 2]. The reward per step ranges from -16.27 (worst) to 0 (best). The best reported result is a total reward over 100 episodes of -123.11 ± 6.86.
### CliffWalking-v0
The cliff walking problem (CliffWalking) is set on a 4 x 12 grid. The agent starts at the bottom-left corner of the grid, and the goal is the bottom-right corner. At each step the agent moves one cell up, down, left, or right, and each move yields a reward of -1.
<img src="./assets/image-20201007211441036.png" alt="image-20201007211441036" style="zoom:50%;" />
As shown in the figure, the red cells are the cliff, and the numbers are the position indices the agent observes (the observation), which takes 48 distinct values from 0 to 47. The agent's movement is subject to the following constraints:
* The agent cannot move off the grid; if it tries to, it stays in place, but the step still incurs a reward of -1.
* If the agent "falls off the cliff", it is immediately returned to the start position and receives a reward of -100.
* When the agent reaches the goal, the episode ends, and the episode return is the sum of the per-step rewards.
The actual rendering looks like this:
<img src="./assets/image-20201007211858925.png" alt="image-20201007211858925" style="zoom:50%;" />
Since the shortest path from start to goal takes 13 steps and each step yields a reward of -1, an optimal policy achieves an episode return of -13.
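A minimal interaction sketch with the registered environment (assuming a classic gym release that ships `CliffWalking-v0` and uses the 4-tuple `step` API):
```python
import gym

env = gym.make("CliffWalking-v0")
state = env.reset()          # integer observation in [0, 47]; the start is the bottom-left cell
episode_return = 0
for _ in range(100):         # cap the rollout length for this demo
    state, reward, done, info = env.step(env.action_space.sample())  # random action
    episode_return += reward
    if done:
        break
print("episode return:", episode_return)   # an optimal policy achieves -13
```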
## References
[Gym environment source code](https://github.com/openai/gym/tree/master/gym/envs)

View File

@@ -0,0 +1,42 @@
# MuJoCo
MuJoCo (Multi-Joint dynamics with Contact) is a physics simulator used for research in robot control, optimization, and related areas. For installation, see [Installing MuJoCo and mujoco_py on Mac](https://blog.csdn.net/JohnJim0/article/details/115656392?spm=1001.2014.3001.5501).
## HalfCheetah-v2
This environment is based on the MuJoCo simulation engine. The goal is to make a two-legged "half cheetah" run as fast as possible (the image below is from the Gym page for HalfCheetah-v2: https://gym.openai.com/envs/HalfCheetah-v2/).
<img src="assets/image-20210429150630806.png" alt="image-20210429150630806" style="zoom:50%;" />
Action space: Box(6,). Each leg has three controllable joints, six joints in total, and each joint takes a value in [-1, 1].
State space: Box(17,). Each value has the range ![img](assets/9cd6ae68c9aad008ede4139da358ec26.svg); the state mainly describes the cheetah's own pose and related quantities.
Reward definition: the reward at each step depends on the cheetah's velocity during that step and the control cost of the action. The code that defines the reward is shown below.
```python
def step(self, action):
xposbefore = self.sim.data.qpos[0]
self.do_simulation(action, self.frame_skip)
xposafter = self.sim.data.qpos[0]
ob = self._get_obs()
reward_ctrl = - 0.1 * np.square(action).sum()
reward_run = (xposafter - xposbefore)/self.dt
# =========== reward ===========
reward = reward_ctrl + reward_run
# =========== reward ===========
done = False
return ob, reward, done, dict(reward_run=reward_run, reward_ctrl=reward_ctrl)
```
Note that `done` is always `False` in the step function above, so the episode does not terminate when the cheetah loses its balance and falls over; in practice the episode is ended by the TimeLimit wrapper (1000 steps by default for HalfCheetah-v2).
This environment also has a quirk: I have not found a documented per-episode reward ceiling, and in my experiments a well-trained episode can even run off the end of the platform:
<img src="assets/image-20210429150622353.png" alt="image-20210429150622353" style="zoom:50%;" />
With limited training time, the reward kept rising slowly throughout training; I suspect this may be a bug (or at least a rough edge) in gym.
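For reference, a random-action rollout looks like this (a minimal sketch, assuming gym with mujoco_py installed and the classic 4-tuple `step` API):
```python
import gym

env = gym.make("HalfCheetah-v2")
obs = env.reset()
episode_return = 0.0
for _ in range(1000):                     # HalfCheetah-v2 is registered with a 1000-step time limit
    action = env.action_space.sample()    # Box(6,) action, each component in [-1, 1]
    obs, reward, done, info = env.step(action)
    episode_return += reward
    if done:
        break
print("episode return:", episode_return)
```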

View File

@@ -0,0 +1,37 @@
## The Racetrack Environment
We have implemented a custom environment called "Racetrack" for you to use during this piece of coursework. It is inspired by the environment described in the course textbook (Reinforcement Learning, Sutton & Barto, 2018, Exercise 5.12), but is not exactly the same.
### Environment Description
Consider driving a race car around a turn on a racetrack. In order to complete the race as quickly as possible, you would want to drive as fast as you can but, to avoid running off the track, you must slow down while turning.
In our simplified racetrack environment, the agent is at one of a discrete set of grid positions. The agent also has a discrete speed in two directions, $x$ and $y$. So the state is represented as follows:
$$(\text{position}_y, \text{position}_x, \text{velocity}_y, \text{velocity}_x)$$
The agent collects a reward of -1 at each time step, an additional -10 for leaving the track (i.e., ending up on a black grid square in the figure below), and an additional +10 for reaching the finish line (any of the red grid squares). The agent starts each episode in a randomly selected grid-square on the starting line (green grid squares) with a speed of zero in both directions. At each time step, the agent can change its speed in both directions. Each speed can be changed by +1, -1 or 0, giving a total of nine actions. For example, the agent may change its speed in the $x$ direction by -1 and its speed in the $y$ direction by +1. The agent's speed cannot be greater than +10 or less than -10 in either direction.
<img src="assets/track_big.png" style="width: 600px;"/>
The agent's next state is determined by its current grid square, its current speed in two directions, and the changes it makes to its speed in the two directions. This environment is stochastic. When the agent tries to change its speed, no change occurs (in either direction) with probability 0.2. In other words, 20% of the time, the agent's action is ignored and the car's speed remains the same in both directions.
If the agent leaves the track, it is returned to a random start grid-square and has its speed set to zero in both directions; the episode continues. An episode ends only when the agent transitions to a goal grid-square.
### Environment Implementation
We have implemented the above environment in the `racetrack_env.py` file, for you to use in this coursework. Please use this implementation instead of writing your own, and please do not modify the environment.
We provide a `RacetrackEnv` class for your agents to interact with. The class has the following methods:
- **`reset()`** - this method initialises the environment, chooses a random starting state, and returns it. This method should be called before the start of every episode.
- **`step(action)`** - this method takes an integer action (more on this later), and executes one time-step in the environment. It returns a tuple containing the next state, the reward collected, and whether the next state is a terminal state.
- **`render(sleep_time)`** - this method renders a matplotlib graph representing the environment. It takes an optional float parameter giving the number of seconds to display each time-step. This method is useful for testing and debugging, but should not be used during training since it is *very* slow. **Do not use this method in your final submission**.
- **`get_actions()`** - a simple method that returns the available actions in the current state. Always returns a list containing integers in the range [0-8] (more on this later).
In our code, states are represented as Python tuples - specifically a tuple of four integers. For example, if the agent is in a grid square with coordinates ($Y = 2$, $X = 3$), and is moving zero cells vertically and one cell horizontally per time-step, the state is represented as `(2, 3, 0, 1)`. Tuples of this kind will be returned by the `reset()` and `step(action)` methods.
There are nine actions available to the agent in each state, as described above. However, to simplify your code, we have represented each of the nine actions as an integer in the range [0-8]. The table below shows the index of each action, along with the corresponding changes it will cause to the agent's speed in each direction.
<img src="assets/action_grid.png" style="width: 250px;"/>
For example, taking action 8 will increase the agent's speed in the $x$ direction, but decrease its speed in the $y$ direction.
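For example, a single random-policy episode could look like this (a minimal sketch, assuming `racetrack_env.py` is importable from the working directory, with `track.txt` alongside it):
```python
import random
from racetrack_env import RacetrackEnv

env = RacetrackEnv()
state = env.reset()                        # (y_pos, x_pos, y_velocity, x_velocity)
terminal = False
episode_return = 0
while not terminal:
    action = random.choice(env.get_actions())       # integer in [0-8]
    state, reward, terminal = env.step(action)
    episode_return += reward
print("episode return:", episode_return)
```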

View File

@@ -0,0 +1,260 @@
# Please do not make changes to this file - it will be overwritten with a clean
# version when your work is marked.
#
# This file contains code for the racetrack environment that you will be using
# as part of the second part of the CM50270: Reinforcement Learning coursework.
import time
import random
import numpy as np
import os
import matplotlib.pyplot as plt
import matplotlib.patheffects as pe
from IPython.display import clear_output
from matplotlib import colors
class RacetrackEnv(object) :
"""
Class representing a race-track environment inspired by exercise 5.12 in Sutton & Barto 2018 (p.111).
Please do not make changes to this class - it will be overwritten with a clean version when it comes to marking.
The dynamics of this environment are detailed in this coursework exercise's jupyter notebook, although I have
included rather verbose comments here for those of you who are interested in how the environment has been
implemented (though this should not impact your solution code).
If you find any *bugs* with this code, please let me know immediately - thank you for finding them, sorry that I didn't!
However, please do not suggest optimisations - some things have been purposely simplified for readability's sake.
"""
ACTIONS_DICT = {
0 : (1, -1), # Acc Vert., Brake Horiz.
1 : (1, 0), # Acc Vert., Hold Horiz.
2 : (1, 1), # Acc Vert., Acc Horiz.
3 : (0, -1), # Hold Vert., Brake Horiz.
4 : (0, 0), # Hold Vert., Hold Horiz.
5 : (0, 1), # Hold Vert., Acc Horiz.
6 : (-1, -1), # Brake Vert., Brake Horiz.
7 : (-1, 0), # Brake Vert., Hold Horiz.
8 : (-1, 1) # Brake Vert., Acc Horiz.
}
CELL_TYPES_DICT = {
0 : "track",
1 : "wall",
2 : "start",
3 : "goal"
}
def __init__(self) :
# Load racetrack map from file.
self.track = np.flip(np.loadtxt(os.path.dirname(__file__)+"/track.txt", dtype = int), axis = 0)
# Discover start grid squares.
self.initial_states = []
for y in range(self.track.shape[0]) :
for x in range(self.track.shape[1]) :
if (self.CELL_TYPES_DICT[self.track[y, x]] == "start") :
self.initial_states.append((y, x))
self.is_reset = False
#print("Racetrack Environment File Loaded Successfully.")
#print("Be sure to call .reset() before starting to initialise the environment and get an initial state!")
def step(self, action : int) :
"""
Takes a given action in the environment's current state, and returns a next state,
reward, and whether the next state is terminal or not.
Arguments:
action {int} -- The action to take in the environment's current state. Should be an integer in the range [0-8].
Raises:
RuntimeError: Raised when the environment needs resetting.\n
TypeError: Raised when an action of an invalid type is given.\n
ValueError: Raised when an action outside the range [0-8] is given.\n
Returns:
A tuple of:\n
{(int, int, int, int)} -- The next state, a tuple of (y_pos, x_pos, y_velocity, x_velocity).\n
{int} -- The reward earned by taking the given action in the current environment state.\n
{bool} -- Whether the environment's next state is terminal or not.\n
"""
# Check whether a reset is needed.
if (not self.is_reset) :
raise RuntimeError(".step() has been called when .reset() is needed.\n" +
"You need to call .reset() before using .step() for the first time, and after an episode ends.\n" +
".reset() initialises the environment at the start of an episode, then returns an initial state.")
# Check that action is the correct type (either a python integer or a numpy integer).
if (not (isinstance(action, int) or isinstance(action, np.integer))) :
raise TypeError("action should be an integer.\n" +
"action value {} of type {} was supplied.".format(action, type(action)))
# Check that action is an allowed value.
if (action < 0 or action > 8) :
raise ValueError("action must be an integer in the range [0-8] corresponding to one of the legal actions.\n" +
"action value {} was supplied.".format(action))
        # Update Velocity.
        # With probability 0.8, update the velocity components as intended.
        if (np.random.uniform() < 0.8) :
            (d_y, d_x) = self.ACTIONS_DICT[action]
        # With probability 0.2, the action is ignored and the velocity components are left unchanged.
        else :
            (d_y, d_x) = (0, 0)
self.velocity = (self.velocity[0] + d_y, self.velocity[1] + d_x)
        # Keep each velocity component within the bounds [-10, 10].
        # (self.velocity is a tuple, so rebuild it rather than assigning to its elements.)
        self.velocity = (max(-10, min(10, self.velocity[0])),
                         max(-10, min(10, self.velocity[1])))
# Update Position.
new_position = (self.position[0] + self.velocity[0], self.position[1] + self.velocity[1])
reward = 0
terminal = False
# If position is out-of-bounds, return to start and set velocity components to zero.
if (new_position[0] < 0 or new_position[1] < 0 or new_position[0] >= self.track.shape[0] or new_position[1] >= self.track.shape[1]) :
self.position = random.choice(self.initial_states)
self.velocity = (0, 0)
reward -= 10
# If position is in a wall grid-square, return to start and set velocity components to zero.
elif (self.CELL_TYPES_DICT[self.track[new_position]] == "wall") :
self.position = random.choice(self.initial_states)
self.velocity = (0, 0)
reward -= 10
        # If position is in a track grid-square or a start grid-square, update position.
elif (self.CELL_TYPES_DICT[self.track[new_position]] in ["track", "start"]) :
self.position = new_position
# If position is in a goal grid-square, end episode.
elif (self.CELL_TYPES_DICT[self.track[new_position]] == "goal") :
self.position = new_position
reward += 10
terminal = True
# If this gets reached, then the student has touched something they shouldn't have. Naughty!
else :
raise RuntimeError("You've met with a terrible fate, haven't you?\nDon't modify things you shouldn't!")
# Penalise every timestep.
reward -= 1
# Require a reset if the current state is terminal.
if (terminal) :
self.is_reset = False
# Return next state, reward, and whether the episode has ended.
return (self.position[0], self.position[1], self.velocity[0], self.velocity[1]), reward, terminal
def reset(self) :
"""
Resets the environment, ready for a new episode to begin, then returns an initial state.
The initial state will be a starting grid square randomly chosen using a uniform distribution,
with both components of the velocity being zero.
Returns:
{(int, int, int, int)} -- an initial state, a tuple of (y_pos, x_pos, y_velocity, x_velocity).
"""
# Pick random starting grid-square.
self.position = random.choice(self.initial_states)
# Set both velocity components to zero.
self.velocity = (0, 0)
self.is_reset = True
return (self.position[0], self.position[1], self.velocity[0], self.velocity[1])
def render(self, sleep_time : float = 0.1) :
"""
Renders a pretty matplotlib plot representing the current state of the environment.
Calling this method on subsequent timesteps will update the plot.
        This is VERY VERY SLOW and will slow down training a lot. Only use for debugging/testing.
Arguments:
sleep_time {float} -- How many seconds (or partial seconds) you want to wait on this rendered frame.
"""
# Turn interactive mode on.
plt.ion()
fig = plt.figure(num = "env_render")
ax = plt.gca()
ax.clear()
clear_output(wait = True)
# Prepare the environment plot and mark the car's position.
env_plot = np.copy(self.track)
env_plot[self.position] = 4
env_plot = np.flip(env_plot, axis = 0)
# Plot the gridworld.
cmap = colors.ListedColormap(["white", "black", "green", "red", "yellow"])
bounds = list(range(6))
norm = colors.BoundaryNorm(bounds, cmap.N)
ax.imshow(env_plot, cmap = cmap, norm = norm, zorder = 0)
# Plot the velocity.
if (not self.velocity == (0, 0)) :
ax.arrow(self.position[1], self.track.shape[0] - 1 - self.position[0], self.velocity[1], -self.velocity[0],
path_effects=[pe.Stroke(linewidth=1, foreground='black')], color = "yellow", width = 0.1, length_includes_head = True, zorder = 2)
# Set up axes.
ax.grid(which = 'major', axis = 'both', linestyle = '-', color = 'k', linewidth = 2, zorder = 1)
        ax.set_xticks(np.arange(-0.5, self.track.shape[1], 1))
        ax.set_xticklabels([])
        ax.set_yticks(np.arange(-0.5, self.track.shape[0], 1))
ax.set_yticklabels([])
# Draw everything.
#fig.canvas.draw()
#fig.canvas.flush_events()
plt.show()
# Sleep if desired.
if (sleep_time > 0) :
time.sleep(sleep_time)
def get_actions(self) :
"""
Returns the available actions in the current state - will always be a list
of integers in the range [0-8].
"""
return [*self.ACTIONS_DICT]
# num_steps = 1000000
# env = RacetrackEnv()
# state = env.reset()
# print(state)
# for _ in range(num_steps) :
# next_state, reward, terminal = env.step(random.choice(env.get_actions()))
# print(next_state)
# env.render()
# if (terminal) :
# _ = env.reset()

View File

@@ -0,0 +1,38 @@
# Snake
Snake is an arcade game dating back to Blockade (1976). The player steers the snake up, down, left, or right to eat food; each piece of food makes the body grow and the snake move faster, until the snake hits a wall or its own body, which ends the game.
![image-20200901202636603](img/image-20200901202636603.png)
As shown above, the game board in this task is 560x560. The green squares are our agent, the snake; the red square is the food; and the walls run along the four edges. Once a piece of food is eaten, a new one appears at a random location. Each snake segment and the food are 40x40, and excluding the walls (also 40 thick) the snake can move within a 480x480 area, i.e. a 12x12 grid. The environment's state and related information are as follows:
* The state is a tuple (adjoining_wall_x, adjoining_wall_y, food_dir_x, food_dir_y, adjoining_body_top, adjoining_body_bottom, adjoining_body_left, adjoining_body_right); see the indexing sketch after this list.
* [adjoining_wall_x, adjoining_wall_y] indicates whether the snake head is adjacent to a wall; together these take 9 states:
  adjoining_wall_x: 0 means no wall adjacent to the head along the x axis, 1 means a wall to the left of the head, 2 means a wall to the right; adjoining_wall_y: 0 means no wall adjacent along the y axis, 1 means a wall above the head, 2 means a wall below.
  Note that [0, 0] also covers the case where the snake has run outside the 480x480 area.
* [food_dir_x, food_dir_y] describes the food's position relative to the snake head:
  food_dir_x: 0 means the food and the head share the same x coordinate, 1 means the food is to the left of the head (not necessarily adjacent), 2 means it is to the right;
  food_dir_y: 0 means the food and the head share the same y coordinate, 1 means the food is above the head, 2 means it is below.
* [adjoining_body_top, adjoining_body_bottom, adjoining_body_left, adjoining_body_right] indicates whether the snake's body is adjacent to the head:
  adjoining_body_top: 1 means there is a body segment directly above the head, 0 means there is not;
  adjoining_body_bottom: 1 means there is a body segment directly below the head, 0 means there is not;
  adjoining_body_left: 1 means there is a body segment directly to the left of the head, 0 means there is not;
  adjoining_body_right: 1 means there is a body segment directly to the right of the head, 0 means there is not.
* The actions are up, down, left, and right.
* Reward: +1 for eating a piece of food, -1 when the snake dies, and -0.1 otherwise.
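To make the discretization concrete, the tabular agent below (see `utils.py` and `agent.py`) stores its Q-values in an array indexed by exactly this state tuple plus an action index; a minimal sketch (the state values here are made up for illustration):
```python
import numpy as np

# Same shape as utils.create_q_table():
# 3*3 wall states x 3*3 food-direction states x 2^4 adjoining-body flags x 4 actions.
Q = np.zeros((3, 3, 3, 3, 2, 2, 2, 2, 4))

# Hypothetical discretized state: wall to the left, food below and to the right, no body adjacent.
state = (1, 0, 2, 2, 0, 0, 0, 0)
action = 0
Q[state][action] += 0.1    # update a single entry
print(Q[state])            # Q-values of the 4 actions in this state
```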

View File

@@ -0,0 +1,106 @@
import numpy as np
import utils
import random
import math
class Agent:
def __init__(self, actions, Ne, C, gamma):
self.actions = actions
self.Ne = Ne # used in exploration function
self.C = C
self.gamma = gamma
# Create the Q and N Table to work with
self.Q = utils.create_q_table()
self.N = utils.create_q_table()
self.reset()
def train(self):
self._train = True
def eval(self):
self._train = False
# At the end of training save the trained model
def save_model(self, model_path):
utils.save(model_path, self.Q)
# Load the trained model for evaluation
def load_model(self, model_path):
self.Q = utils.load(model_path)
def reset(self):
self.points = 0
self.s = None
self.a = None
def f(self,u,n):
if n < self.Ne:
return 1
return u
def R(self,points,dead):
if dead:
return -1
elif points > self.points:
return 1
return -0.1
def get_state(self, state):
# [adjoining_wall_x, adjoining_wall_y]
adjoining_wall_x = int(state[0] == utils.WALL_SIZE) + 2 * int(state[0] == utils.DISPLAY_SIZE - utils.WALL_SIZE)
adjoining_wall_y = int(state[1] == utils.WALL_SIZE) + 2 * int(state[1] == utils.DISPLAY_SIZE - utils.WALL_SIZE)
# [food_dir_x, food_dir_y]
food_dir_x = 1 + int(state[0] < state[3]) - int(state[0] == state[3])
food_dir_y = 1 + int(state[1] < state[4]) - int(state[1] == state[4])
# [adjoining_body_top, adjoining_body_bottom, adjoining_body_left, adjoining_body_right]
        adjoining_body = [(state[0] - body_state[0], state[1] - body_state[1]) for body_state in state[2]]
        adjoining_body_top = int((0, utils.GRID_SIZE) in adjoining_body)
        adjoining_body_bottom = int((0, -utils.GRID_SIZE) in adjoining_body)
        adjoining_body_left = int((utils.GRID_SIZE, 0) in adjoining_body)
        adjoining_body_right = int((-utils.GRID_SIZE, 0) in adjoining_body)
return adjoining_wall_x, adjoining_wall_y, food_dir_x, food_dir_y, adjoining_body_top, adjoining_body_bottom, adjoining_body_left, adjoining_body_right
def update(self, _state, points, dead):
if self.s:
maxq = max(self.Q[_state])
reward = self.R(points,dead)
alpha = self.C / (self.C + self.N[self.s][self.a])
self.Q[self.s][self.a] += alpha * (reward + self.gamma * maxq - self.Q[self.s][self.a])
self.N[self.s][self.a] += 1.0
def choose_action(self, state, points, dead):
'''
:param state: a list of [snake_head_x, snake_head_y, snake_body, food_x, food_y] from environment.
:param points: float, the current points from environment
:param dead: boolean, if the snake is dead
        :return: the index of the action. Here 0, 1, 2, 3 correspond to right, left, up, down respectively (see Snake.move in snake_env.py)
Return the index of action the snake needs to take, according to the state and points known from environment.
Tips: you need to discretize the state to the state space defined on the webpage first.
(Note that [adjoining_wall_x=0, adjoining_wall_y=0] is also the case when snake runs out of the 480x480 board)
'''
_state = self.get_state(state)
Qs = self.Q[_state][:]
if self._train:
self.update(_state, points, dead)
if dead:
self.reset()
return
Ns = self.N[_state]
Fs = [self.f(Qs[a], Ns[a]) for a in self.actions]
action = np.argmax(Fs)
self.s = _state
self.a = action
else:
if dead:
self.reset()
return
action = np.argmax(Qs)
self.points = points
return action

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,185 @@
import pygame
from pygame.locals import *
import argparse
from agent import Agent
from snake_env import SnakeEnv
import utils
import time
def get_args():
parser = argparse.ArgumentParser(description='CS440 MP4 Snake')
parser.add_argument('--human', default = False, action="store_true",
help='making the game human playable - default False')
    parser.add_argument('--model_name', dest="model_name", type=str, default="checkpoint3.npy",
                        help='name of model to save if training or to load if evaluating - default checkpoint3.npy')
parser.add_argument('--train_episodes', dest="train_eps", type=int, default=10000,
help='number of training episodes - default 10000')
parser.add_argument('--test_episodes', dest="test_eps", type=int, default=1000,
help='number of testing episodes - default 1000')
parser.add_argument('--show_episodes', dest="show_eps", type=int, default=10,
help='number of displayed episodes - default 10')
parser.add_argument('--window', dest="window", type=int, default=100,
help='number of episodes to keep running stats for during training - default 100')
parser.add_argument('--Ne', dest="Ne", type=int, default=40,
help='the Ne parameter used in exploration function - default 40')
parser.add_argument('--C', dest="C", type=int, default=40,
help='the C parameter used in learning rate - default 40')
    parser.add_argument('--gamma', dest="gamma", type=float, default=0.2,
                        help='the gamma (discount factor) parameter - default 0.2')
parser.add_argument('--snake_head_x', dest="snake_head_x", type=int, default=200,
help='initialized x position of snake head - default 200')
parser.add_argument('--snake_head_y', dest="snake_head_y", type=int, default=200,
help='initialized y position of snake head - default 200')
parser.add_argument('--food_x', dest="food_x", type=int, default=80,
help='initialized x position of food - default 80')
parser.add_argument('--food_y', dest="food_y", type=int, default=80,
help='initialized y position of food - default 80')
cfg = parser.parse_args()
return cfg
class Application:
def __init__(self, args):
self.args = args
self.env = SnakeEnv(args.snake_head_x, args.snake_head_y, args.food_x, args.food_y)
self.agent = Agent(self.env.get_actions(), args.Ne, args.C, args.gamma)
def execute(self):
if not self.args.human:
if self.args.train_eps != 0:
self.train()
self.eval()
self.show_games()
def train(self):
print("Train Phase:")
self.agent.train()
window = self.args.window
self.points_results = []
first_eat = True
start = time.time()
for game in range(1, self.args.train_eps + 1):
state = self.env.get_state()
dead = False
action = self.agent.choose_action(state, 0, dead)
while not dead:
state, points, dead = self.env.step(action)
                # For debug convenience, you can check if your Q-table matches ours for a given setting of parameters
# (see Debug Convenience part on homework 4 web page)
if first_eat and points == 1:
self.agent.save_model(utils.CHECKPOINT)
first_eat = False
action = self.agent.choose_action(state, points, dead)
points = self.env.get_points()
self.points_results.append(points)
if game % self.args.window == 0:
print(
"Games:", len(self.points_results) - window, "-", len(self.points_results),
"Points (Average:", sum(self.points_results[-window:])/window,
"Max:", max(self.points_results[-window:]),
"Min:", min(self.points_results[-window:]),")",
)
self.env.reset()
print("Training takes", time.time() - start, "seconds")
self.agent.save_model(self.args.model_name)
def eval(self):
print("Evaling Phase:")
self.agent.eval()
self.agent.load_model(self.args.model_name)
points_results = []
start = time.time()
for game in range(1, self.args.test_eps + 1):
state = self.env.get_state()
dead = False
action = self.agent.choose_action(state, 0, dead)
while not dead:
state, points, dead = self.env.step(action)
action = self.agent.choose_action(state, points, dead)
points = self.env.get_points()
points_results.append(points)
self.env.reset()
print("Testing takes", time.time() - start, "seconds")
print("Number of Games:", len(points_results))
print("Average Points:", sum(points_results)/len(points_results))
print("Max Points:", max(points_results))
print("Min Points:", min(points_results))
def show_games(self):
print("Display Games")
self.env.display()
pygame.event.pump()
self.agent.eval()
points_results = []
end = False
for game in range(1, self.args.show_eps + 1):
state = self.env.get_state()
dead = False
action = self.agent.choose_action(state, 0, dead)
count = 0
while not dead:
count +=1
pygame.event.pump()
keys = pygame.key.get_pressed()
if keys[K_ESCAPE] or self.check_quit():
end = True
break
state, points, dead = self.env.step(action)
# Qlearning agent
if not self.args.human:
action = self.agent.choose_action(state, points, dead)
# for human player
else:
for event in pygame.event.get():
if event.type == pygame.KEYDOWN:
if event.key == pygame.K_UP:
action = 2
elif event.key == pygame.K_DOWN:
action = 3
elif event.key == pygame.K_LEFT:
action = 1
elif event.key == pygame.K_RIGHT:
action = 0
if end:
break
self.env.reset()
points_results.append(points)
print("Game:", str(game)+"/"+str(self.args.show_eps), "Points:", points)
if len(points_results) == 0:
return
print("Average Points:", sum(points_results)/len(points_results))
def check_quit(self):
for event in pygame.event.get():
if event.type == pygame.QUIT:
return True
return False
def main():
cfg = get_args()
app = Application(cfg)
app.execute()
if __name__ == "__main__":
main()

Binary file not shown.

View File

@@ -0,0 +1,202 @@
import random
import pygame
import utils
class SnakeEnv:
def __init__(self, snake_head_x, snake_head_y, food_x, food_y):
self.game = Snake(snake_head_x, snake_head_y, food_x, food_y)
self.render = False
def get_actions(self):
return self.game.get_actions()
def reset(self):
return self.game.reset()
def get_points(self):
return self.game.get_points()
def get_state(self):
return self.game.get_state()
def step(self, action):
state, points, dead = self.game.step(action)
if self.render:
self.draw(state, points, dead)
# return state, reward, done
return state, points, dead
def draw(self, state, points, dead):
snake_head_x, snake_head_y, snake_body, food_x, food_y = state
self.display.fill(utils.BLUE)
pygame.draw.rect( self.display, utils.BLACK,
[
utils.GRID_SIZE,
utils.GRID_SIZE,
utils.DISPLAY_SIZE - utils.GRID_SIZE * 2,
utils.DISPLAY_SIZE - utils.GRID_SIZE * 2
])
# draw snake head
pygame.draw.rect(
self.display,
utils.GREEN,
[
snake_head_x,
snake_head_y,
utils.GRID_SIZE,
utils.GRID_SIZE
],
3
)
# draw snake body
for seg in snake_body:
pygame.draw.rect(
self.display,
utils.GREEN,
[
seg[0],
seg[1],
utils.GRID_SIZE,
utils.GRID_SIZE,
],
1
)
# draw food
pygame.draw.rect(
self.display,
utils.RED,
[
food_x,
food_y,
utils.GRID_SIZE,
utils.GRID_SIZE
]
)
text_surface = self.font.render("Points: " + str(points), True, utils.BLACK)
text_rect = text_surface.get_rect()
text_rect.center = ((280),(25))
self.display.blit(text_surface, text_rect)
pygame.display.flip()
if dead:
# slow clock if dead
self.clock.tick(1)
else:
self.clock.tick(5)
return
def display(self):
pygame.init()
pygame.display.set_caption('MP4: Snake')
self.clock = pygame.time.Clock()
pygame.font.init()
self.font = pygame.font.Font(pygame.font.get_default_font(), 15)
self.display = pygame.display.set_mode((utils.DISPLAY_SIZE, utils.DISPLAY_SIZE), pygame.HWSURFACE)
self.draw(self.game.get_state(), self.game.get_points(), False)
self.render = True
class Snake:
def __init__(self, snake_head_x, snake_head_y, food_x, food_y):
        self.init_snake_head_x, self.init_snake_head_y = snake_head_x, snake_head_y  # initial position of the snake head
        self.init_food_x, self.init_food_y = food_x, food_y  # initial position of the food
self.reset()
def reset(self):
self.points = 0
self.snake_head_x, self.snake_head_y = self.init_snake_head_x, self.init_snake_head_y
self.food_x, self.food_y = self.init_food_x, self.init_food_y
        self.snake_body = []  # list of (x, y) positions of the snake's body segments
def get_points(self):
return self.points
def get_actions(self):
return [0, 1, 2, 3]
def get_state(self):
return [
self.snake_head_x,
self.snake_head_y,
self.snake_body,
self.food_x,
self.food_y
]
def move(self, action):
        '''Move the snake head according to the given action and return whether the snake has died.
        '''
        delta_x = delta_y = 0
        if action == 0:    # right
            delta_x = utils.GRID_SIZE
        elif action == 1:  # left
            delta_x = - utils.GRID_SIZE
        elif action == 2:  # up (pygame y decreases upward)
            delta_y = - utils.GRID_SIZE
        elif action == 3:  # down
            delta_y = utils.GRID_SIZE
old_body_head = None
if len(self.snake_body) == 1:
old_body_head = self.snake_body[0]
self.snake_body.append((self.snake_head_x, self.snake_head_y))
self.snake_head_x += delta_x
self.snake_head_y += delta_y
        if len(self.snake_body) > self.points:  # no food was eaten this step, so drop the tail segment
del(self.snake_body[0])
self.handle_eatfood()
        # When the body is non-empty, the head overlapping any body segment counts as the snake colliding with itself
if len(self.snake_body) >= 1:
for seg in self.snake_body:
if self.snake_head_x == seg[0] and self.snake_head_y == seg[1]:
return True
        # When the body has length 1, moving back onto the previous head position also counts as colliding with itself
if len(self.snake_body) == 1:
if old_body_head == (self.snake_head_x, self.snake_head_y):
return True
        # Check whether the head has hit a wall
if (self.snake_head_x < utils.GRID_SIZE or self.snake_head_y < utils.GRID_SIZE or
self.snake_head_x + utils.GRID_SIZE > utils.DISPLAY_SIZE-utils.GRID_SIZE or self.snake_head_y + utils.GRID_SIZE > utils.DISPLAY_SIZE-utils.GRID_SIZE):
return True
return False
def step(self, action):
is_dead = self.move(action)
return self.get_state(), self.get_points(), is_dead
def handle_eatfood(self):
if (self.snake_head_x == self.food_x) and (self.snake_head_y == self.food_y):
self.random_food()
self.points += 1
def random_food(self):
        '''Spawn food at a random grid-aligned position.
        '''
max_x = (utils.DISPLAY_SIZE - utils.WALL_SIZE - utils.GRID_SIZE)
max_y = (utils.DISPLAY_SIZE - utils.WALL_SIZE - utils.GRID_SIZE)
self.food_x = random.randint(utils.WALL_SIZE, max_x)//utils.GRID_SIZE * utils.GRID_SIZE
self.food_y = random.randint(utils.WALL_SIZE, max_y)//utils.GRID_SIZE * utils.GRID_SIZE
        while self.check_food_on_snake():  # the food must not spawn on the snake
self.food_x = random.randint(utils.WALL_SIZE, max_x)//utils.GRID_SIZE * utils.GRID_SIZE
self.food_y = random.randint(utils.WALL_SIZE, max_y)//utils.GRID_SIZE * utils.GRID_SIZE
def check_food_on_snake(self):
if self.food_x == self.snake_head_x and self.food_y == self.snake_head_y:
return True
for seg in self.snake_body:
if self.food_x == seg[0] and self.food_y == seg[1]:
return True
return False

View File

@@ -0,0 +1,55 @@
import numpy as np
DISPLAY_SIZE = 560
GRID_SIZE = 40
WALL_SIZE = 40
WHITE = (255, 255, 255)
RED = (255, 0, 0)
BLUE = (72, 61, 139)
BLACK = (0, 0, 0)
GREEN = (0, 255, 0)
NUM_ADJOINING_WALL_X_STATES=3
NUM_ADJOINING_WALL_Y_STATES=3
NUM_FOOD_DIR_X=3
NUM_FOOD_DIR_Y=3
NUM_ADJOINING_BODY_TOP_STATES=2
NUM_ADJOINING_BODY_BOTTOM_STATES=2
NUM_ADJOINING_BODY_LEFT_STATES=2
NUM_ADJOINING_BODY_RIGHT_STATES=2
NUM_ACTIONS = 4
CHECKPOINT = 'checkpoint.npy'
def create_q_table():
return np.zeros((NUM_ADJOINING_WALL_X_STATES, NUM_ADJOINING_WALL_Y_STATES, NUM_FOOD_DIR_X, NUM_FOOD_DIR_Y,
NUM_ADJOINING_BODY_TOP_STATES, NUM_ADJOINING_BODY_BOTTOM_STATES, NUM_ADJOINING_BODY_LEFT_STATES,
NUM_ADJOINING_BODY_RIGHT_STATES, NUM_ACTIONS))
def sanity_check(arr):
if (type(arr) is np.ndarray and
arr.shape==(NUM_ADJOINING_WALL_X_STATES, NUM_ADJOINING_WALL_Y_STATES, NUM_FOOD_DIR_X, NUM_FOOD_DIR_Y,
NUM_ADJOINING_BODY_TOP_STATES, NUM_ADJOINING_BODY_BOTTOM_STATES, NUM_ADJOINING_BODY_LEFT_STATES,
NUM_ADJOINING_BODY_RIGHT_STATES,NUM_ACTIONS)):
return True
else:
return False
def save(filename, arr):
if sanity_check(arr):
np.save(filename,arr)
return True
else:
print("Failed to save model")
return False
def load(filename):
try:
arr = np.load(filename)
if sanity_check(arr):
print("Loaded model successfully")
return arr
print("Model loaded is not in the required format")
return None
    except Exception:
        print("Filename doesn't exist")
return None

View File

@@ -0,0 +1,53 @@
#!/usr/bin/env python
# coding=utf-8
'''
Author: John
Email: johnjim0816@gmail.com
Date: 2021-03-24 22:12:19
LastEditor: John
LastEditTime: 2021-03-26 17:12:43
Description:
Environment:
'''
import numpy as np
import random
class StochasticMDP:
def __init__(self):
self.end = False
self.curr_state = 2
self.n_actions = 2
self.n_states = 6
self.p_right = 0.5
def reset(self):
self.end = False
self.curr_state = 2
state = np.zeros(self.n_states)
state[self.curr_state - 1] = 1.
return state
def step(self, action):
if self.curr_state != 1:
if action == 1:
if random.random() < self.p_right and self.curr_state < self.n_states:
self.curr_state += 1
else:
self.curr_state -= 1
if action == 0:
self.curr_state -= 1
if self.curr_state == self.n_states:
self.end = True
state = np.zeros(self.n_states)
state[self.curr_state - 1] = 1.
if self.curr_state == 1:
if self.end:
return state, 1.00, True, {}
else:
return state, 1.00/100.00, True, {}
else:
return state, 0.0, False, {}
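# Example rollout (commented out, for reference; a minimal sketch of how this environment is driven):
# env = StochasticMDP()
# state = env.reset()
# done = False
# while not done:
#     state, reward, done, _ = env.step(random.randint(0, env.n_actions - 1))
# print("final state:", np.argmax(state) + 1, "reward:", reward)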

View File

@@ -0,0 +1,15 @@
1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
1 1 1 1 1 1 1 1 0 0 0 0 0 3 3 3 3 3 1
1 1 1 1 1 1 0 0 0 0 0 0 0 3 3 3 3 3 1
1 1 1 1 1 0 0 0 0 0 0 0 0 3 3 3 3 3 1
1 1 1 1 0 0 0 0 0 0 0 0 0 3 3 3 3 3 1
1 1 1 0 0 0 0 0 0 1 1 1 1 1 1 1 1 1 1
1 1 1 0 0 0 0 0 1 1 1 1 1 1 1 1 1 1 1
1 1 0 0 0 0 0 1 1 1 1 1 1 1 1 1 1 1 1
1 1 0 0 0 0 1 1 1 1 1 1 1 1 1 1 1 1 1
1 0 0 0 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1
1 0 0 0 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1
1 0 0 0 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1
1 0 0 0 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1
1 2 2 2 2 1 1 1 1 1 1 1 1 1 1 1 1 1 1
1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1

View File

@@ -0,0 +1,82 @@
import numpy as np
import sys
from io import StringIO
from gym.envs.toy_text import discrete
UP = 0
RIGHT = 1
DOWN = 2
LEFT = 3
class WindyGridworldEnv(discrete.DiscreteEnv):
metadata = {'render.modes': ['human', 'ansi']}
def _limit_coordinates(self, coord):
coord[0] = min(coord[0], self.shape[0] - 1)
coord[0] = max(coord[0], 0)
coord[1] = min(coord[1], self.shape[1] - 1)
coord[1] = max(coord[1], 0)
return coord
def _calculate_transition_prob(self, current, delta, winds):
new_position = np.array(current) + np.array(delta) + np.array([-1, 0]) * winds[tuple(current)]
new_position = self._limit_coordinates(new_position).astype(int)
new_state = np.ravel_multi_index(tuple(new_position), self.shape)
is_done = tuple(new_position) == (3, 7)
return [(1.0, new_state, -1.0, is_done)]
def __init__(self):
self.shape = (7, 10)
nS = np.prod(self.shape)
n_actions = 4
# Wind strength
winds = np.zeros(self.shape)
winds[:,[3,4,5,8]] = 1
winds[:,[6,7]] = 2
# Calculate transition probabilities
P = {}
for s in range(nS):
position = np.unravel_index(s, self.shape)
P[s] = { a : [] for a in range(n_actions) }
P[s][UP] = self._calculate_transition_prob(position, [-1, 0], winds)
P[s][RIGHT] = self._calculate_transition_prob(position, [0, 1], winds)
P[s][DOWN] = self._calculate_transition_prob(position, [1, 0], winds)
P[s][LEFT] = self._calculate_transition_prob(position, [0, -1], winds)
# We always start in state (3, 0)
isd = np.zeros(nS)
isd[np.ravel_multi_index((3,0), self.shape)] = 1.0
super(WindyGridworldEnv, self).__init__(nS, n_actions, P, isd)
def render(self, mode='human', close=False):
self._render(mode, close)
def _render(self, mode='human', close=False):
if close:
return
outfile = StringIO() if mode == 'ansi' else sys.stdout
for s in range(self.nS):
position = np.unravel_index(s, self.shape)
# print(self.s)
if self.s == s:
output = " x "
elif position == (3,7):
output = " T "
else:
output = " o "
if position[1] == 0:
output = output.lstrip()
if position[1] == self.shape[1] - 1:
output = output.rstrip()
output += "\n"
outfile.write(output)
outfile.write("\n")