JohnJim0816
2021-03-23 16:10:11 +08:00
parent d4690c2058
commit bf0f2990cf
198 changed files with 1668 additions and 1545 deletions

codes/envs/blackjack.py Normal file

@@ -0,0 +1,122 @@
import gym
from gym import spaces
from gym.utils import seeding
def cmp(a, b):
return int((a > b)) - int((a < b))
# 1 = Ace, 2-10 = Number cards, Jack/Queen/King = 10
deck = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 10, 10]
def draw_card(np_random):
return np_random.choice(deck)
def draw_hand(np_random):
return [draw_card(np_random), draw_card(np_random)]
def usable_ace(hand): # Does this hand have a usable ace?
return 1 in hand and sum(hand) + 10 <= 21
def sum_hand(hand): # Return current hand total
if usable_ace(hand):
return sum(hand) + 10
return sum(hand)
def is_bust(hand): # Is this hand a bust?
return sum_hand(hand) > 21
def score(hand): # What is the score of this hand (0 if bust)
return 0 if is_bust(hand) else sum_hand(hand)
def is_natural(hand): # Is this hand a natural blackjack?
return sorted(hand) == [1, 10]
class BlackjackEnv(gym.Env):
"""Simple blackjack environment
Blackjack is a card game where the goal is to obtain cards that sum to as
    near as possible to 21 without going over. The player plays against a
    fixed dealer.
Face cards (Jack, Queen, King) have point value 10.
Aces can either count as 11 or 1, and it's called 'usable' at 11.
    This game is played with an infinite deck (i.e. cards are drawn with replacement).
The game starts with each (player and dealer) having one face up and one
face down card.
The player can request additional cards (hit=1) until they decide to stop
(stick=0) or exceed 21 (bust).
After the player sticks, the dealer reveals their facedown card, and draws
until their sum is 17 or greater. If the dealer goes bust the player wins.
If neither player nor dealer busts, the outcome (win, lose, draw) is
decided by whose sum is closer to 21. The reward for winning is +1,
drawing is 0, and losing is -1.
    The observation is a 3-tuple of: the player's current sum,
the dealer's one showing card (1-10 where 1 is ace),
and whether or not the player holds a usable ace (0 or 1).
This environment corresponds to the version of the blackjack problem
described in Example 5.1 in Reinforcement Learning: An Introduction
by Sutton and Barto (1998).
https://webdocs.cs.ualberta.ca/~sutton/book/the-book.html
"""
def __init__(self, natural=False):
self.action_space = spaces.Discrete(2)
self.observation_space = spaces.Tuple((
spaces.Discrete(32),
spaces.Discrete(11),
spaces.Discrete(2)))
self._seed()
# Flag to payout 1.5 on a "natural" blackjack win, like casino rules
# Ref: http://www.bicyclecards.com/how-to-play/blackjack/
self.natural = natural
# Start the first game
        self._reset()
        # Number of available actions (stick or hit)
        self.n_actions = 2
def reset(self):
return self._reset()
def step(self, action):
return self._step(action)
def _seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
def _step(self, action):
assert self.action_space.contains(action)
if action: # hit: add a card to players hand and return
self.player.append(draw_card(self.np_random))
if is_bust(self.player):
done = True
reward = -1
else:
done = False
reward = 0
else: # stick: play out the dealers hand, and score
done = True
while sum_hand(self.dealer) < 17:
self.dealer.append(draw_card(self.np_random))
reward = cmp(score(self.player), score(self.dealer))
if self.natural and is_natural(self.player) and reward == 1:
reward = 1.5
return self._get_obs(), reward, done, {}
def _get_obs(self):
return (sum_hand(self.player), self.dealer[0], usable_ace(self.player))
def _reset(self):
self.dealer = draw_hand(self.np_random)
self.player = draw_hand(self.np_random)
# Auto-draw another card if the score is less than 12
while sum_hand(self.player) < 12:
self.player.append(draw_card(self.np_random))
return self._get_obs()
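For reference, a minimal usage sketch (not part of the committed file; it assumes codes/envs is on the Python path so the module imports as blackjack). It plays one episode with the simple stick-on-20-or-21 policy from Example 5.1:

from blackjack import BlackjackEnv

env = BlackjackEnv()
obs = env.reset()                        # (player_sum, dealer_showing, usable_ace)
done = False
while not done:
    action = 0 if obs[0] >= 20 else 1    # stick (0) on 20 or 21, otherwise hit (1)
    obs, reward, done, info = env.step(action)
print("final observation:", obs, "reward:", reward)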


@@ -0,0 +1,84 @@
import numpy as np
import sys
from io import StringIO
from gym.envs.toy_text import discrete
UP = 0
RIGHT = 1
DOWN = 2
LEFT = 3
class CliffWalkingEnv(discrete.DiscreteEnv):
    """Cliff-walking gridworld (Sutton & Barto, Example 6.6): a 4x12 grid with the
    start at (3, 0) and the goal at (3, 11). Entering a cliff cell along the bottom
    row gives a reward of -100 and ends the episode; every other step gives -1."""
    metadata = {'render.modes': ['human', 'ansi']}
def _limit_coordinates(self, coord):
coord[0] = min(coord[0], self.shape[0] - 1)
coord[0] = max(coord[0], 0)
coord[1] = min(coord[1], self.shape[1] - 1)
coord[1] = max(coord[1], 0)
return coord
def _calculate_transition_prob(self, current, delta):
new_position = np.array(current) + np.array(delta)
new_position = self._limit_coordinates(new_position).astype(int)
new_state = np.ravel_multi_index(tuple(new_position), self.shape)
reward = -100.0 if self._cliff[tuple(new_position)] else -1.0
is_done = self._cliff[tuple(new_position)] or (tuple(new_position) == (3,11))
return [(1.0, new_state, reward, is_done)]
def __init__(self):
self.shape = (4, 12)
nS = np.prod(self.shape)
n_actions = 4
# Cliff Location
        self._cliff = np.zeros(self.shape, dtype=bool)
self._cliff[3, 1:-1] = True
# Calculate transition probabilities
P = {}
for s in range(nS):
position = np.unravel_index(s, self.shape)
P[s] = { a : [] for a in range(n_actions) }
P[s][UP] = self._calculate_transition_prob(position, [-1, 0])
P[s][RIGHT] = self._calculate_transition_prob(position, [0, 1])
P[s][DOWN] = self._calculate_transition_prob(position, [1, 0])
P[s][LEFT] = self._calculate_transition_prob(position, [0, -1])
# We always start in state (3, 0)
isd = np.zeros(nS)
isd[np.ravel_multi_index((3,0), self.shape)] = 1.0
super(CliffWalkingEnv, self).__init__(nS, n_actions, P, isd)
def render(self, mode='human', close=False):
self._render(mode, close)
def _render(self, mode='human', close=False):
if close:
return
outfile = StringIO() if mode == 'ansi' else sys.stdout
for s in range(self.nS):
position = np.unravel_index(s, self.shape)
# print(self.s)
if self.s == s:
output = " x "
elif position == (3,11):
output = " T "
elif self._cliff[position]:
output = " C "
else:
output = " o "
if position[1] == 0:
output = output.lstrip()
if position[1] == self.shape[1] - 1:
output = output.rstrip()
output += "\n"
outfile.write(output)
outfile.write("\n")

codes/envs/gridworld.py Normal file

@@ -0,0 +1,125 @@
import io
import numpy as np
import sys
from gym.envs.toy_text import discrete
UP = 0
RIGHT = 1
DOWN = 2
LEFT = 3
class GridworldEnv(discrete.DiscreteEnv):
"""
Grid World environment from Sutton's Reinforcement Learning book chapter 4.
You are an agent on an MxN grid and your goal is to reach the terminal
state at the top left or the bottom right corner.
For example, a 4x4 grid looks as follows:
T o o o
o x o o
o o o o
o o o T
x is your position and T are the two terminal states.
You can take actions in each direction (UP=0, RIGHT=1, DOWN=2, LEFT=3).
Actions going off the edge leave you in your current state.
You receive a reward of -1 at each step until you reach a terminal state.
"""
metadata = {'render.modes': ['human', 'ansi']}
    def __init__(self, shape=(4, 4)):
if not isinstance(shape, (list, tuple)) or not len(shape) == 2:
raise ValueError('shape argument must be a list/tuple of length 2')
self.shape = shape
nS = np.prod(shape)
n_actions = 4
MAX_Y = shape[0]
MAX_X = shape[1]
P = {}
grid = np.arange(nS).reshape(shape)
it = np.nditer(grid, flags=['multi_index'])
while not it.finished:
s = it.iterindex
y, x = it.multi_index
# P[s][a] = (prob, next_state, reward, is_done)
P[s] = {a : [] for a in range(n_actions)}
is_done = lambda s: s == 0 or s == (nS - 1)
reward = 0.0 if is_done(s) else -1.0
# We're stuck in a terminal state
if is_done(s):
P[s][UP] = [(1.0, s, reward, True)]
P[s][RIGHT] = [(1.0, s, reward, True)]
P[s][DOWN] = [(1.0, s, reward, True)]
P[s][LEFT] = [(1.0, s, reward, True)]
# Not a terminal state
else:
ns_up = s if y == 0 else s - MAX_X
ns_right = s if x == (MAX_X - 1) else s + 1
ns_down = s if y == (MAX_Y - 1) else s + MAX_X
ns_left = s if x == 0 else s - 1
P[s][UP] = [(1.0, ns_up, reward, is_done(ns_up))]
P[s][RIGHT] = [(1.0, ns_right, reward, is_done(ns_right))]
P[s][DOWN] = [(1.0, ns_down, reward, is_done(ns_down))]
P[s][LEFT] = [(1.0, ns_left, reward, is_done(ns_left))]
it.iternext()
# Initial state distribution is uniform
isd = np.ones(nS) / nS
# We expose the model of the environment for educational purposes
# This should not be used in any model-free learning algorithm
self.P = P
        super(GridworldEnv, self).__init__(nS, n_actions, P, isd)
    def render(self, mode='human', close=False):
        self._render(mode, close)
    def _render(self, mode='human', close=False):
""" Renders the current gridworld layout
For example, a 4x4 grid with the mode="human" looks like:
T o o o
o x o o
o o o o
o o o T
where x is your position and T are the two terminal states.
"""
if close:
return
outfile = io.StringIO() if mode == 'ansi' else sys.stdout
grid = np.arange(self.nS).reshape(self.shape)
it = np.nditer(grid, flags=['multi_index'])
while not it.finished:
s = it.iterindex
y, x = it.multi_index
if self.s == s:
output = " x "
elif s == 0 or s == self.nS - 1:
output = " T "
else:
output = " o "
if x == 0:
output = output.lstrip()
if x == self.shape[1] - 1:
output = output.rstrip()
outfile.write(output)
if x == self.shape[1] - 1:
outfile.write("\n")
it.iternext()
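Since the transition model P is exposed "for educational purposes", here is a minimal sketch (not part of the committed file; it assumes it runs in the same module as GridworldEnv) of one synchronous sweep of policy evaluation for the uniform random policy with discount 1, as in chapter 4 of the book:

import numpy as np

env = GridworldEnv()
V = np.zeros(env.nS)
new_V = np.zeros(env.nS)
for s in range(env.nS):
    for a in range(env.nA):
        for prob, next_s, reward, done in env.P[s][a]:
            new_V[s] += 0.25 * prob * (reward + V[next_s])   # pi(a|s) = 1/4, gamma = 1
print(new_V.reshape(env.shape))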

codes/envs/gridworld_env.py Normal file

@@ -0,0 +1,195 @@
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -*- coding: utf-8 -*-
import gym
import turtle
import numpy as np
# turtle tutorial : https://docs.python.org/3.3/library/turtle.html
def GridWorld(gridmap=None, is_slippery=False):
    if gridmap is None:
        gridmap = ['SFFF', 'FHFH', 'FFFH', 'HFFG']
    env = gym.make("FrozenLake-v0", desc=gridmap, is_slippery=is_slippery)
    env = FrozenLakeWapper(env)
    return env
class FrozenLakeWapper(gym.Wrapper):
def __init__(self, env):
gym.Wrapper.__init__(self, env)
self.max_y = env.desc.shape[0]
self.max_x = env.desc.shape[1]
self.t = None
self.unit = 50
def draw_box(self, x, y, fillcolor='', line_color='gray'):
self.t.up()
self.t.goto(x * self.unit, y * self.unit)
self.t.color(line_color)
self.t.fillcolor(fillcolor)
self.t.setheading(90)
self.t.down()
self.t.begin_fill()
for _ in range(4):
self.t.forward(self.unit)
self.t.right(90)
self.t.end_fill()
def move_player(self, x, y):
self.t.up()
self.t.setheading(90)
self.t.fillcolor('red')
self.t.goto((x + 0.5) * self.unit, (y + 0.5) * self.unit)
def render(self):
        if self.t is None:
self.t = turtle.Turtle()
self.wn = turtle.Screen()
self.wn.setup(self.unit * self.max_x + 100,
self.unit * self.max_y + 100)
self.wn.setworldcoordinates(0, 0, self.unit * self.max_x,
self.unit * self.max_y)
self.t.shape('circle')
self.t.width(2)
self.t.speed(0)
self.t.color('gray')
for i in range(self.desc.shape[0]):
for j in range(self.desc.shape[1]):
x = j
y = self.max_y - 1 - i
if self.desc[i][j] == b'S': # Start
self.draw_box(x, y, 'white')
elif self.desc[i][j] == b'F': # Frozen ice
self.draw_box(x, y, 'white')
elif self.desc[i][j] == b'G': # Goal
self.draw_box(x, y, 'yellow')
elif self.desc[i][j] == b'H': # Hole
self.draw_box(x, y, 'black')
else:
self.draw_box(x, y, 'white')
self.t.shape('turtle')
x_pos = self.s % self.max_x
y_pos = self.max_y - 1 - int(self.s / self.max_x)
self.move_player(x_pos, y_pos)
class CliffWalkingWapper(gym.Wrapper):
def __init__(self, env):
gym.Wrapper.__init__(self, env)
self.t = None
self.unit = 50
self.max_x = 12
self.max_y = 4
def draw_x_line(self, y, x0, x1, color='gray'):
assert x1 > x0
self.t.color(color)
self.t.setheading(0)
self.t.up()
self.t.goto(x0, y)
self.t.down()
self.t.forward(x1 - x0)
def draw_y_line(self, x, y0, y1, color='gray'):
assert y1 > y0
self.t.color(color)
self.t.setheading(90)
self.t.up()
self.t.goto(x, y0)
self.t.down()
self.t.forward(y1 - y0)
def draw_box(self, x, y, fillcolor='', line_color='gray'):
self.t.up()
self.t.goto(x * self.unit, y * self.unit)
self.t.color(line_color)
self.t.fillcolor(fillcolor)
self.t.setheading(90)
self.t.down()
self.t.begin_fill()
for i in range(4):
self.t.forward(self.unit)
self.t.right(90)
self.t.end_fill()
def move_player(self, x, y):
self.t.up()
self.t.setheading(90)
self.t.fillcolor('red')
self.t.goto((x + 0.5) * self.unit, (y + 0.5) * self.unit)
def render(self):
        if self.t is None:
self.t = turtle.Turtle()
self.wn = turtle.Screen()
self.wn.setup(self.unit * self.max_x + 100,
self.unit * self.max_y + 100)
self.wn.setworldcoordinates(0, 0, self.unit * self.max_x,
self.unit * self.max_y)
self.t.shape('circle')
self.t.width(2)
self.t.speed(0)
self.t.color('gray')
for _ in range(2):
self.t.forward(self.max_x * self.unit)
self.t.left(90)
self.t.forward(self.max_y * self.unit)
self.t.left(90)
for i in range(1, self.max_y):
self.draw_x_line(
y=i * self.unit, x0=0, x1=self.max_x * self.unit)
for i in range(1, self.max_x):
self.draw_y_line(
x=i * self.unit, y0=0, y1=self.max_y * self.unit)
for i in range(1, self.max_x - 1):
self.draw_box(i, 0, 'black')
self.draw_box(self.max_x - 1, 0, 'yellow')
self.t.shape('turtle')
x_pos = self.s % self.max_x
y_pos = self.max_y - 1 - int(self.s / self.max_x)
self.move_player(x_pos, y_pos)
if __name__ == '__main__':
    # Environment 1: FrozenLake, where you can configure whether the ice is slippery
    # 0 left, 1 down, 2 right, 3 up
env = gym.make("FrozenLake-v0", is_slippery=False)
env = FrozenLakeWapper(env)
    # Environment 2: CliffWalking, the cliff-walking environment
    # env = gym.make("CliffWalking-v0")  # 0 up, 1 right, 2 down, 3 left
    # env = CliffWalkingWapper(env)
    # Environment 3: a custom grid world with a configurable map
    # (S = Start, F = Floor, H = Hole, G = Goal)
# gridmap = [
# 'SFFF',
# 'FHFF',
# 'FFFF',
# 'HFGF' ]
# env = GridWorld(gridmap)
env.reset()
for step in range(10):
action = np.random.randint(0, 4)
obs, reward, done, info = env.step(action)
print('step {}: action {}, obs {}, reward {}, done {}, info {}'.format(\
step, action, obs, reward, done, info))
        # env.render()  # render one frame
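The custom grid-world option sketched in the comments above can be exercised in the same way; a minimal sketch (not part of the committed file; it assumes GridWorld and FrozenLakeWapper from this module are in scope):

import numpy as np

gridmap = [
    'SFFF',
    'FHFF',
    'FFFF',
    'HFGF']
env = GridWorld(gridmap)
obs = env.reset()
for step in range(10):
    obs, reward, done, info = env.step(np.random.randint(0, 4))
    env.render()                         # draws the grid and the agent with turtle
    if done:
        break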

codes/envs/racetrack_env.py Normal file

@@ -0,0 +1,260 @@
# Please do not make changes to this file - it will be overwritten with a clean
# version when your work is marked.
#
# This file contains code for the racetrack environment that you will be using
# as part of the second part of the CM50270: Reinforcement Learning coursework.
import time
import random
import numpy as np
import os
import matplotlib.pyplot as plt
import matplotlib.patheffects as pe
from IPython.display import clear_output
from matplotlib import colors
class RacetrackEnv(object) :
"""
Class representing a race-track environment inspired by exercise 5.12 in Sutton & Barto 2018 (p.111).
Please do not make changes to this class - it will be overwritten with a clean version when it comes to marking.
The dynamics of this environment are detailed in this coursework exercise's jupyter notebook, although I have
included rather verbose comments here for those of you who are interested in how the environment has been
implemented (though this should not impact your solution code).
If you find any *bugs* with this code, please let me know immediately - thank you for finding them, sorry that I didn't!
However, please do not suggest optimisations - some things have been purposely simplified for readability's sake.
"""
ACTIONS_DICT = {
0 : (1, -1), # Acc Vert., Brake Horiz.
1 : (1, 0), # Acc Vert., Hold Horiz.
2 : (1, 1), # Acc Vert., Acc Horiz.
3 : (0, -1), # Hold Vert., Brake Horiz.
4 : (0, 0), # Hold Vert., Hold Horiz.
5 : (0, 1), # Hold Vert., Acc Horiz.
6 : (-1, -1), # Brake Vert., Brake Horiz.
7 : (-1, 0), # Brake Vert., Hold Horiz.
8 : (-1, 1) # Brake Vert., Acc Horiz.
}
CELL_TYPES_DICT = {
0 : "track",
1 : "wall",
2 : "start",
3 : "goal"
}
def __init__(self) :
# Load racetrack map from file.
self.track = np.flip(np.loadtxt(os.path.dirname(__file__)+"/track.txt", dtype = int), axis = 0)
# Discover start grid squares.
self.initial_states = []
for y in range(self.track.shape[0]) :
for x in range(self.track.shape[1]) :
if (self.CELL_TYPES_DICT[self.track[y, x]] == "start") :
self.initial_states.append((y, x))
self.is_reset = False
#print("Racetrack Environment File Loaded Successfully.")
#print("Be sure to call .reset() before starting to initialise the environment and get an initial state!")
def step(self, action : int) :
"""
Takes a given action in the environment's current state, and returns a next state,
reward, and whether the next state is terminal or not.
Arguments:
action {int} -- The action to take in the environment's current state. Should be an integer in the range [0-8].
Raises:
RuntimeError: Raised when the environment needs resetting.\n
TypeError: Raised when an action of an invalid type is given.\n
ValueError: Raised when an action outside the range [0-8] is given.\n
Returns:
A tuple of:\n
{(int, int, int, int)} -- The next state, a tuple of (y_pos, x_pos, y_velocity, x_velocity).\n
{int} -- The reward earned by taking the given action in the current environment state.\n
{bool} -- Whether the environment's next state is terminal or not.\n
"""
# Check whether a reset is needed.
if (not self.is_reset) :
raise RuntimeError(".step() has been called when .reset() is needed.\n" +
"You need to call .reset() before using .step() for the first time, and after an episode ends.\n" +
".reset() initialises the environment at the start of an episode, then returns an initial state.")
# Check that action is the correct type (either a python integer or a numpy integer).
if (not (isinstance(action, int) or isinstance(action, np.integer))) :
raise TypeError("action should be an integer.\n" +
"action value {} of type {} was supplied.".format(action, type(action)))
# Check that action is an allowed value.
if (action < 0 or action > 8) :
raise ValueError("action must be an integer in the range [0-8] corresponding to one of the legal actions.\n" +
"action value {} was supplied.".format(action))
# Update Velocity.
        # With probability 0.8, update the velocity components as intended.
        if (np.random.uniform() < 0.8) :
            (d_y, d_x) = self.ACTIONS_DICT[action]
        # Otherwise (with probability 0.2), do not change the velocity components.
else :
(d_y, d_x) = (0, 0)
self.velocity = (self.velocity[0] + d_y, self.velocity[1] + d_x)
        # Keep each velocity component within the bounds [-10, 10].
        # (self.velocity is a tuple, so rebuild it rather than assigning to its items.)
        self.velocity = (max(-10, min(10, self.velocity[0])),
                         max(-10, min(10, self.velocity[1])))
# Update Position.
new_position = (self.position[0] + self.velocity[0], self.position[1] + self.velocity[1])
reward = 0
terminal = False
# If position is out-of-bounds, return to start and set velocity components to zero.
if (new_position[0] < 0 or new_position[1] < 0 or new_position[0] >= self.track.shape[0] or new_position[1] >= self.track.shape[1]) :
self.position = random.choice(self.initial_states)
self.velocity = (0, 0)
reward -= 10
# If position is in a wall grid-square, return to start and set velocity components to zero.
elif (self.CELL_TYPES_DICT[self.track[new_position]] == "wall") :
self.position = random.choice(self.initial_states)
self.velocity = (0, 0)
reward -= 10
        # If position is in a track grid-square or a start-square, update position.
elif (self.CELL_TYPES_DICT[self.track[new_position]] in ["track", "start"]) :
self.position = new_position
# If position is in a goal grid-square, end episode.
elif (self.CELL_TYPES_DICT[self.track[new_position]] == "goal") :
self.position = new_position
reward += 10
terminal = True
# If this gets reached, then the student has touched something they shouldn't have. Naughty!
else :
raise RuntimeError("You've met with a terrible fate, haven't you?\nDon't modify things you shouldn't!")
# Penalise every timestep.
reward -= 1
# Require a reset if the current state is terminal.
if (terminal) :
self.is_reset = False
# Return next state, reward, and whether the episode has ended.
return (self.position[0], self.position[1], self.velocity[0], self.velocity[1]), reward, terminal
def reset(self) :
"""
Resets the environment, ready for a new episode to begin, then returns an initial state.
The initial state will be a starting grid square randomly chosen using a uniform distribution,
with both components of the velocity being zero.
Returns:
{(int, int, int, int)} -- an initial state, a tuple of (y_pos, x_pos, y_velocity, x_velocity).
"""
# Pick random starting grid-square.
self.position = random.choice(self.initial_states)
# Set both velocity components to zero.
self.velocity = (0, 0)
self.is_reset = True
return (self.position[0], self.position[1], self.velocity[0], self.velocity[1])
def render(self, sleep_time : float = 0.1) :
"""
Renders a pretty matplotlib plot representing the current state of the environment.
Calling this method on subsequent timesteps will update the plot.
        This is VERY VERY SLOW and will slow down training a lot. Only use for debugging/testing.
Arguments:
sleep_time {float} -- How many seconds (or partial seconds) you want to wait on this rendered frame.
"""
# Turn interactive mode on.
plt.ion()
fig = plt.figure(num = "env_render")
ax = plt.gca()
ax.clear()
clear_output(wait = True)
# Prepare the environment plot and mark the car's position.
env_plot = np.copy(self.track)
env_plot[self.position] = 4
env_plot = np.flip(env_plot, axis = 0)
# Plot the gridworld.
cmap = colors.ListedColormap(["white", "black", "green", "red", "yellow"])
bounds = list(range(6))
norm = colors.BoundaryNorm(bounds, cmap.N)
ax.imshow(env_plot, cmap = cmap, norm = norm, zorder = 0)
# Plot the velocity.
if (not self.velocity == (0, 0)) :
ax.arrow(self.position[1], self.track.shape[0] - 1 - self.position[0], self.velocity[1], -self.velocity[0],
path_effects=[pe.Stroke(linewidth=1, foreground='black')], color = "yellow", width = 0.1, length_includes_head = True, zorder = 2)
# Set up axes.
ax.grid(which = 'major', axis = 'both', linestyle = '-', color = 'k', linewidth = 2, zorder = 1)
        ax.set_xticks(np.arange(-0.5, self.track.shape[1], 1))
        ax.set_xticklabels([])
        ax.set_yticks(np.arange(-0.5, self.track.shape[0], 1))
        ax.set_yticklabels([])
# Draw everything.
#fig.canvas.draw()
#fig.canvas.flush_events()
plt.show()
# Sleep if desired.
if (sleep_time > 0) :
time.sleep(sleep_time)
def get_actions(self) :
"""
Returns the available actions in the current state - will always be a list
of integers in the range [0-8].
"""
return [*self.ACTIONS_DICT]
# num_steps = 1000000
# env = RacetrackEnv()
# state = env.reset()
# print(state)
# for _ in range(num_steps) :
# next_state, reward, terminal = env.step(random.choice(env.get_actions()))
# print(next_state)
# env.render()
# if (terminal) :
# _ = env.reset()

codes/envs/track.txt Normal file

@@ -0,0 +1,15 @@
1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
1 1 1 1 1 1 1 1 0 0 0 0 0 3 3 3 3 3 1
1 1 1 1 1 1 0 0 0 0 0 0 0 3 3 3 3 3 1
1 1 1 1 1 0 0 0 0 0 0 0 0 3 3 3 3 3 1
1 1 1 1 0 0 0 0 0 0 0 0 0 3 3 3 3 3 1
1 1 1 0 0 0 0 0 0 1 1 1 1 1 1 1 1 1 1
1 1 1 0 0 0 0 0 1 1 1 1 1 1 1 1 1 1 1
1 1 0 0 0 0 0 1 1 1 1 1 1 1 1 1 1 1 1
1 1 0 0 0 0 1 1 1 1 1 1 1 1 1 1 1 1 1
1 0 0 0 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1
1 0 0 0 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1
1 0 0 0 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1
1 0 0 0 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1
1 2 2 2 2 1 1 1 1 1 1 1 1 1 1 1 1 1 1
1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
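For reference (not part of the committed files): the integers above map to the cell types defined in racetrack_env.py, i.e. 0 = track, 1 = wall, 2 = start, 3 = goal. A minimal sketch of inspecting the map, assuming the relative path below matches the repository layout:

import numpy as np

track = np.flip(np.loadtxt("codes/envs/track.txt", dtype=int), axis=0)
print(track.shape)                       # (15, 19)
print(np.argwhere(track == 2))           # start squares
print(np.argwhere(track == 3))           # goal squares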


@@ -0,0 +1,82 @@
import gym
import numpy as np
import sys
from io import StringIO
from gym.envs.toy_text import discrete
UP = 0
RIGHT = 1
DOWN = 2
LEFT = 3
class WindyGridworldEnv(discrete.DiscreteEnv):
    """Windy gridworld (Sutton & Barto, Example 6.5): a 7x10 grid where wind in
    columns 3-8 pushes the agent upward (strength 2 in columns 6 and 7, strength 1
    otherwise); every step gives -1 until the goal at (3, 7) is reached."""
    metadata = {'render.modes': ['human', 'ansi']}
def _limit_coordinates(self, coord):
coord[0] = min(coord[0], self.shape[0] - 1)
coord[0] = max(coord[0], 0)
coord[1] = min(coord[1], self.shape[1] - 1)
coord[1] = max(coord[1], 0)
return coord
def _calculate_transition_prob(self, current, delta, winds):
new_position = np.array(current) + np.array(delta) + np.array([-1, 0]) * winds[tuple(current)]
new_position = self._limit_coordinates(new_position).astype(int)
new_state = np.ravel_multi_index(tuple(new_position), self.shape)
is_done = tuple(new_position) == (3, 7)
return [(1.0, new_state, -1.0, is_done)]
def __init__(self):
self.shape = (7, 10)
nS = np.prod(self.shape)
n_actions = 4
# Wind strength
winds = np.zeros(self.shape)
winds[:,[3,4,5,8]] = 1
winds[:,[6,7]] = 2
# Calculate transition probabilities
P = {}
for s in range(nS):
position = np.unravel_index(s, self.shape)
P[s] = { a : [] for a in range(n_actions) }
P[s][UP] = self._calculate_transition_prob(position, [-1, 0], winds)
P[s][RIGHT] = self._calculate_transition_prob(position, [0, 1], winds)
P[s][DOWN] = self._calculate_transition_prob(position, [1, 0], winds)
P[s][LEFT] = self._calculate_transition_prob(position, [0, -1], winds)
# We always start in state (3, 0)
isd = np.zeros(nS)
isd[np.ravel_multi_index((3,0), self.shape)] = 1.0
super(WindyGridworldEnv, self).__init__(nS, n_actions, P, isd)
def render(self, mode='human', close=False):
self._render(mode, close)
def _render(self, mode='human', close=False):
if close:
return
outfile = StringIO() if mode == 'ansi' else sys.stdout
for s in range(self.nS):
position = np.unravel_index(s, self.shape)
# print(self.s)
if self.s == s:
output = " x "
elif position == (3,7):
output = " T "
else:
output = " o "
if position[1] == 0:
output = output.lstrip()
if position[1] == self.shape[1] - 1:
output = output.rstrip()
output += "\n"
outfile.write(output)
outfile.write("\n")