# Files
# easy-rl/notebooks/envs/simple_grid.py
# 2022-12-04 20:54:36 +08:00
#
# 303 lines
# 9.9 KiB
# Python

#!/usr/bin/env python
# simple_grid.py
# based on frozen_lake.py
# adapted by Frans Oliehoek.
#
import sys
from contextlib import closing
import numpy as np
from io import StringIO
#from six import StringIO, b
import gym
from gym import utils
from gym import Env, spaces
from gym.utils import seeding
def categorical_sample(prob_n, np_random):
    """
    Sample one class index from a categorical distribution.

    :param prob_n: sequence of class probabilities (expected to sum to 1)
    :param np_random: NumPy random source; may be a legacy ``RandomState``
        or a ``Generator``
    :return: index of the first class whose cumulative probability exceeds
        a uniform draw from [0, 1)
    """
    cumulative = np.cumsum(np.asarray(prob_n))
    # ``numpy.random.Generator`` only provides ``random()``, while very old
    # ``RandomState`` objects may only provide ``rand()``. Prefer ``random``
    # and fall back to ``rand`` so both kinds of random source work.
    draw = np_random.random if hasattr(np_random, "random") else np_random.rand
    return (cumulative > draw()).argmax()
class DiscreteEnv(Env):
    """
    Tabular environment with a finite number of states and actions.

    Has the following members
    - nS: number of states
    - nA: number of actions
    - P: transitions (*)
    - isd: initial state distribution (**)
    (*) dictionary of lists, where
    P[s][a] == [(probability, nextstate, reward, done), ...]
    (**) list or array of length nS
    """

    def __init__(self, nS, nA, P, isd):
        # Model description.
        self.nS = nS
        self.nA = nA
        self.P = P
        self.isd = isd
        self.lastaction = None  # for rendering

        # Both spaces are plain index ranges over actions / states.
        self.action_space = spaces.Discrete(self.nA)
        self.observation_space = spaces.Discrete(self.nS)

        # The random source must exist before the first state is drawn.
        self.seed()
        self.s = categorical_sample(self.isd, self.np_random)

    def seed(self, seed=None):
        """(Re)create the random source; returns the seed used, in a list."""
        rng, used_seed = seeding.np_random(seed)
        self.np_random = rng
        return [used_seed]

    def reset(self):
        """Draw a fresh start state from ``isd`` and return it as an int."""
        self.s = categorical_sample(self.isd, self.np_random)
        self.lastaction = None
        return int(self.s)

    def step(self, a):
        """
        Sample one outcome of taking action ``a`` in the current state.

        Returns ``(next_state, reward, done, info)`` where ``info["prob"]``
        is the probability of the sampled transition.
        """
        outcomes = self.P[self.s][a]
        weights = [outcome[0] for outcome in outcomes]
        chosen = categorical_sample(weights, self.np_random)
        prob, next_state, reward, done = outcomes[chosen]
        self.s = next_state
        self.lastaction = a
        return (int(next_state), reward, done, {"prob": prob})
# Action indices; the order matters because sideways drift is computed
# as (a - 1) % 4 and (a + 1) % 4.
LEFT, DOWN, RIGHT, UP = range(4)

# Predefined layouts: 'S' start, '.' pavement, 'H' pothole, 'G' goal.
MAPS = {
    "theAlley": ["S...H...H...G"],
    "walkInThePark": [
        "S.......",
        ".....H..",
        "........",
        "......H.",
        "........",
        "...H...G",
    ],
    "1Dtest": [],
    "4x4": [
        "S...",
        ".H.H",
        "...H",
        "H..G",
    ],
    "8x8": [
        "S.......",
        "........",
        "...H....",
        ".....H..",
        "...H....",
        ".HH...H.",
        ".H..H.H.",
        "...H...G",
    ],
}

# Chance of tripping when acting from a pothole tile.
POTHOLE_PROB = 0.2
# Reward received when tripping in a pothole.
BROKEN_LEG_PENALTY = -5
# Reward for every move that does not end on the goal.
SLEEP_DEPRIVATION_PENALTY = -0.0
# Reward for reaching the goal.
REWARD = 10
def generate_random_map(size=8, p=0.8):
    """Generates a random valid map (one that has a path from start to goal)

    The start 'S' is placed in the top-left corner and the goal 'G' in the
    bottom-right corner. Every other tile is normal pavement '.' with
    probability ``p`` and a pothole 'H' otherwise. Maps are redrawn until
    the goal is reachable from the start without stepping on a pothole.

    :param size: size of each side of the grid (at least 2)
    :param p: probability that a tile is normal pavement; values above 1
        are clamped to 1
    :return: list of ``size`` strings, each of length ``size``
    :raises ValueError: if ``size < 2`` (start and goal would share a tile)
        or ``p <= 0`` (no valid map can ever be drawn); both cases would
        otherwise retry forever
    """
    if size < 2:
        raise ValueError("size must be at least 2, got {}".format(size))
    if p <= 0:
        raise ValueError("p must be positive, got {}".format(p))
    p = min(1, p)  # loop-invariant, so clamp once up front
    directions = ((1, 0), (0, 1), (-1, 0), (0, -1))

    # DFS to check that it's a valid path.
    def is_valid(res):
        frontier, discovered = [(0, 0)], set()
        while frontier:
            r, c = frontier.pop()
            if (r, c) in discovered:
                continue
            discovered.add((r, c))
            for x, y in directions:
                r_new = r + x
                c_new = c + y
                if r_new < 0 or r_new >= size or c_new < 0 or c_new >= size:
                    continue
                if res[r_new][c_new] == 'G':
                    return True
                if res[r_new][c_new] not in '#H':
                    frontier.append((r_new, c_new))
        return False

    while True:
        res = np.random.choice(['.', 'H'], (size, size), p=[p, 1 - p])
        res[0][0] = 'S'
        res[-1][-1] = 'G'
        if is_valid(res):
            return ["".join(x) for x in res]
class DrunkenWalkEnv(DiscreteEnv):
    """
    A simple grid environment, completely based on the code of 'FrozenLake', credits to
    the original authors.

    You're finding your way home (G) after a great party which was happening at (S).
    Unfortunately, due to recreational intoxication you find yourself only moving into
    the intended direction 80% of the time, and perpendicular to that the other 20%.
    To make matters worse, the local community has been cutting the budgets for pavement
    maintenance, which means that the way to home is full of potholes, which are very likely
    to make you trip. If you fall, you are obviously magically transported back to the party,
    without getting some of that hard-earned sleep.

        S...
        .H.H
        ...H
        H..G

    S : starting point
    . : normal pavement
    H : pothole, you have a POTHOLE_PROB chance of tripping
    G : goal, time for bed

    The episode ends when you reach the goal or trip.
    You receive a reward of REWARD (+10) if you reach the goal, a
    BROKEN_LEG_PENALTY if you trip in a pothole, and a
    SLEEP_DEPRIVATION_PENALTY for every other move.
    """

    metadata = {'render.modes': ['human', 'ansi']}

    def __init__(self, desc=None, map_name="4x4", is_slippery=True):
        """ This generates a map and sets all transition probabilities.
        (by passing constructed nS, nA, P, isd to DiscreteEnv)

        :param desc: explicit map as a list of equally long strings; when
            None, the map is taken from ``map_name``
        :param map_name: key into MAPS; when both ``desc`` and ``map_name``
            are None a random map is generated
        :param is_slippery: accepted in the signature but not read by this
            constructor; movement is always stochastic
        :raises KeyError: if ``map_name`` is not a key of MAPS
        :raises ValueError: if the map is not a non-ragged 2D grid
        """
        if desc is None and map_name is None:
            desc = generate_random_map()
        elif desc is None:
            desc = MAPS[map_name]
        self.desc = desc = np.asarray(desc, dtype='c')
        if desc.ndim != 2:
            # e.g. an empty map list; fail with a readable message instead
            # of an opaque tuple-unpacking error.
            raise ValueError(
                "map must be a 2D grid of characters, got shape {}".format(desc.shape))
        self.nrow, self.ncol = nrow, ncol = desc.shape
        # Every reward that appears in the transition table below.
        possible_rewards = (REWARD, SLEEP_DEPRIVATION_PENALTY, BROKEN_LEG_PENALTY, 0)
        self.reward_range = (min(possible_rewards), max(possible_rewards))

        nA = 4
        nS = nrow * ncol

        # Start uniformly at random among all tiles marked 'S'.
        isd = np.array(desc == b'S').astype('float64').ravel()
        isd /= isd.sum()

        # We need to pass 'P' to DiscreteEnv:
        # P dictionary dict of dicts of lists, where
        # P[s][a] == [(probability, nextstate, reward, done), ...]
        P = {s: {a: [] for a in range(nA)} for s in range(nS)}

        def convert_rc_to_s(row, col):
            """Flatten a (row, col) position into a state index."""
            return row * ncol + col

        def intended_destination(row, col, a):
            """Position reached by moving in direction a, clipped at the grid border."""
            if a == LEFT:
                col = max(col - 1, 0)
            elif a == DOWN:
                row = min(row + 1, nrow - 1)
            elif a == RIGHT:
                col = min(col + 1, ncol - 1)
            elif a == UP:
                row = max(row - 1, 0)
            return (row, col)

        def construct_transition_for_intended(row, col, a, prob, li):
            """ this constructs a transition to the "intended_destination(row, col, a)"
            and adds it to the transition list (which could be for a different action b).
            """
            newrow, newcol = intended_destination(row, col, a)
            newstate = convert_rc_to_s(newrow, newcol)
            reached_goal = bool(desc[newrow, newcol] == b'G')
            rew = REWARD if reached_goal else SLEEP_DEPRIVATION_PENALTY
            li.append((prob, newstate, rew, reached_goal))

        # THIS IS WHERE THE MATRIX OF TRANSITION PROBABILITIES IS COMPUTED.
        for row in range(nrow):
            for col in range(ncol):
                # specify transitions for s=(row, col)
                s = convert_rc_to_s(row, col)
                letter = desc[row, col]
                for a in range(nA):
                    # specify transitions for action a
                    li = P[s][a]
                    if letter == b'G':
                        # We are at the goal ('G')....
                        # This is a strange case:
                        # - conceptually, we can think of this as:
                        #   always transition to a 'terminated' state where we will get 0 reward.
                        #
                        # - But in gym, in practice, this case should not be happening at all!!!
                        #   Gym will already have returned 'done' when transitioning TO the goal
                        #   state (not from it). So we will never use the transition
                        #   probabilities *from* the goal state.
                        #   So, from gym's perspective we could specify anything we like here. E.g.,:
                        #       li.append((1.0, 59, 42000000, True))
                        #
                        # However, if we want to be able to use the transition matrix to do value
                        # iteration, it is important that we get 0 reward ever after.
                        # This must be the ONLY entry for a goal state, hence the
                        # elif chain below: falling through to the pavement branch
                        # would add extra probability mass and extra reward.
                        li.append((1.0, s, 0, True))
                    elif letter == b'H':
                        # We are at a pothole ('H')
                        # when we are at a pothole, we trip with prob. POTHOLE_PROB
                        li.append((POTHOLE_PROB, s, BROKEN_LEG_PENALTY, True))
                        # otherwise we move exactly as intended
                        construct_transition_for_intended(row, col, a, 1.0 - POTHOLE_PROB, li)
                    else:
                        # We are at normal pavement (.)
                        # with prob. 0.8 we move as intended:
                        construct_transition_for_intended(row, col, a, 0.8, li)
                        # but with prob. 0.1 each we move sideways to intended:
                        for b in [(a - 1) % 4, (a + 1) % 4]:
                            construct_transition_for_intended(row, col, b, 0.1, li)

        super().__init__(nS, nA, P, isd)

    def action_to_string(self, action_index):
        """Return the human-readable name of an action index."""
        return ["Left", "Down", "Right", "Up"][action_index]

    def render(self, mode='human'):
        """
        Draw the grid with the current position highlighted.

        With ``mode='ansi'`` the drawing is returned as a string; otherwise
        it is written to stdout and None is returned.
        """
        outfile = StringIO() if mode == 'ansi' else sys.stdout

        row, col = self.s // self.ncol, self.s % self.ncol
        desc = [[c.decode('utf-8') for c in line] for line in self.desc.tolist()]
        desc[row][col] = utils.colorize(desc[row][col], "red", highlight=True)
        if self.lastaction is not None:
            outfile.write(" (last action was '{action}')\n".format(
                action=self.action_to_string(self.lastaction)))
        else:
            outfile.write("\n")
        outfile.write("\n".join(''.join(line) for line in desc) + "\n")

        # Only the in-memory buffer is closed and returned; sys.stdout must
        # never be closed, whatever mode string was passed.
        if mode == 'ansi':
            with closing(outfile):
                return outfile.getvalue()
if __name__ == "__main__":
    # Smoke test: build one of the predefined maps and read the sizes of
    # its state and action spaces ("walkInThePark" is another option).
    env = DrunkenWalkEnv(map_name="theAlley")
    n_states, n_actions = env.observation_space.n, env.action_space.n