Part I (First Eight Chapters): Code
Chapter 2 Code.2
Code 2.1 Multi-Armed Bandit Experiment
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
# ----------------------------
# Class: multi-armed bandit environment (BanditEnv)
# ----------------------------
class BanditEnv:
def __init__(self, k=10):
self.k = k
        # Draw the true mean reward q* of each of the k actions from a standard normal distribution
self.q_star = np.random.normal(0, 1, k)
        # Index of the optimal action in this environment
self.best_action = np.argmax(self.q_star)
def get_reward(self, action):
"""
        Sample a reward from N(q_star[action], 1), i.e. a normal distribution centered
        at the chosen action's true mean self.q_star[action], and return it.
"""
return np.random.normal(self.q_star[action], 1)
# ----------------------------
# Class: ε-greedy agent (EpsilonGreedyAgent)
# ----------------------------
class EpsilonGreedyAgent:
def __init__(self, k, epsilon):
        # Exploration probability ε
self.epsilon = epsilon
self.k = k
        # Initialize the value estimates Q[a] = 0 for all k actions
self.Q = np.zeros(k)
        # Initialize each action's selection count N[a] = 0
self.N = np.zeros(k)
def select_action(self):
"""
        ε-greedy policy:
        - with probability ε, explore by choosing a random action;
        - otherwise (probability 1−ε), act greedily: choose the action with the largest Q[a],
          breaking ties at random.
"""
if np.random.rand() < self.epsilon:
return np.random.randint(0, self.k)
else:
max_Q = np.max(self.Q)
candidates = np.where(self.Q == max_Q)[0]
return np.random.choice(candidates)
def update(self, action, reward):
"""
        Sample-average update of the Q value:
N[a] += 1
Q[a] ← Q[a] + (1 / N[a]) * (reward − Q[a])
"""
self.N[action] += 1
self.Q[action] += (reward - self.Q[action]) / self.N[action]
# ----------------------------
# Function: run_one_episode
# Run a single trial with fixed k, steps, and ε
# ----------------------------
def run_one_episode(k, steps, epsilon):
"""
    A single trial (steps time steps):
    - first create the environment (BanditEnv) and the agent (EpsilonGreedyAgent);
    - at each time step t:
        1. a = agent.select_action()
        2. reward = env.get_reward(a)
        3. agent.update(a, reward)
        4. record rewards[t] and, in optimal_actions[t], whether the optimal action was chosen
    Returns:
        rewards: reward sequence of length steps
        optimal_actions: binary sequence of length steps (1 means the optimal action was chosen at that step)
"""
env = BanditEnv(k)
agent = EpsilonGreedyAgent(k, epsilon)
rewards = np.zeros(steps)
optimal_actions = np.zeros(steps)
for t in range(steps):
a = agent.select_action()
r = env.get_reward(a)
agent.update(a, r)
rewards[t] = r
if a == env.best_action:
optimal_actions[t] = 1
return rewards, optimal_actions
# ----------------------------
# Function: run_experiment
# For each ε value, run many independent trials (e.g., 2000) and average the results;
# tqdm displays progress over the runs loop.
# ----------------------------
def run_experiment(k=10, steps=1000, runs=2000, epsilons=[0, 0.01, 0.1]):
"""
    Returns:
        avg_rewards: array of shape (len(epsilons), steps); each row is the per-step average reward for one ε
        optimal_rates: array of shape (len(epsilons), steps); each row is the per-step optimal-action rate (%) for one ε
"""
avg_rewards = np.zeros((len(epsilons), steps))
optimal_rates = np.zeros((len(epsilons), steps))
for idx, epsilon in enumerate(epsilons):
        # tqdm shows the progress of the runs trials for the current ε
for _ in tqdm(range(runs), desc=f"ε = {epsilon}"):
rewards, optimal_actions = run_one_episode(k, steps, epsilon)
avg_rewards[idx] += rewards
optimal_rates[idx] += optimal_actions
        # Average over the runs trials
avg_rewards[idx] /= runs
        # Convert hit counts to percentages
optimal_rates[idx] = (optimal_rates[idx] / runs) * 100
return avg_rewards, optimal_rates
# ----------------------------
# Main entry point
# ----------------------------
if __name__ == "__main__":
    # Parameter settings
k = 10
steps = 1000
runs = 2000
epsilons = [0, 0.01, 0.1]
    # Run the experiment (progress bars are shown)
avg_rewards, optimal_rates = run_experiment(k, steps, runs, epsilons)
    # Plot: average-reward curves for different ε values
plt.figure(figsize=(12, 5))
for idx, epsilon in enumerate(epsilons):
plt.plot(avg_rewards[idx], label=f"ε = {epsilon}")
plt.xlabel("Time Step")
plt.ylabel("Average Reward")
plt.legend()
plt.title("Average Reward over Time for Different ε Values")
plt.grid(True)
plt.show()
    # Plot: optimal-action selection rate (%) curves for different ε values
plt.figure(figsize=(12, 5))
for idx, epsilon in enumerate(epsilons):
plt.plot(optimal_rates[idx], label=f"ε = {epsilon}")
plt.xlabel("Time Step")
plt.ylabel("Optimal Action Percentage (%)")
plt.legend()
plt.title("Percentage of Optimal Action over Time for Different ε Values")
plt.grid(True)
plt.show()
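The incremental rule in EpsilonGreedyAgent.update is simply the running sample mean written without storing past rewards. As a quick sanity check (a standalone sketch, assuming the classes above are available in the same module), the estimate after n updates should match the arithmetic mean of the n observed rewards:

import numpy as np

agent = EpsilonGreedyAgent(k=1, epsilon=0.0)
observed = np.random.normal(0.5, 1.0, size=100)
for r in observed:
    agent.update(0, r)
# Q[0], built incrementally, should equal the plain average of the samples.
print(np.isclose(agent.Q[0], observed.mean()))  # expected: True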
Code 2.2 Nonstationary Experiment
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm # Import tqdm for progress bars
class NonstationaryBandit:
def __init__(self, k=10, q_init=0.0, random_walk_std=0.01):
self.k = k
self.q_true = np.ones(k) * q_init
self.random_walk_std = random_walk_std
def get_reward(self, action):
# Sample reward from N(q_true[action], 1)
reward = np.random.normal(loc=self.q_true[action], scale=1.0)
# Apply random walk to q_true for all actions
self.q_true += np.random.normal(loc=0.0, scale=self.random_walk_std, size=self.k)
return reward
def optimal_action(self):
return np.argmax(self.q_true)
class Agent:
def __init__(self, k=10, epsilon=0.1, alpha=None):
self.k = k
self.epsilon = epsilon
self.alpha = alpha # None for sample-average, otherwise fixed-step size
self.Q = np.zeros(k)
self.N = np.zeros(k, dtype=int)
def select_action(self):
if np.random.rand() < self.epsilon:
return np.random.randint(self.k)
else:
max_val = np.max(self.Q)
candidates = np.where(self.Q == max_val)[0]
return np.random.choice(candidates)
def update(self, action, reward):
self.N[action] += 1
if self.alpha is None:
n = self.N[action]
self.Q[action] += (1.0 / n) * (reward - self.Q[action])
else:
self.Q[action] += self.alpha * (reward - self.Q[action])
def run_experiment(n_runs=2000, n_steps=10000, epsilon=0.1, alpha=None):
avg_rewards = np.zeros(n_steps)
optimal_action_counts = np.zeros(n_steps)
# Use tqdm to display progress over runs
for _ in tqdm(range(n_runs), desc=f'Running {"Sample-Average" if alpha is None else "Constant-Step α="+str(alpha)}'):
env = NonstationaryBandit()
agent = Agent(epsilon=epsilon, alpha=alpha)
for t in range(n_steps):
action = agent.select_action()
reward = env.get_reward(action)
agent.update(action, reward)
avg_rewards[t] += reward
if action == env.optimal_action():
optimal_action_counts[t] += 1
avg_rewards /= n_runs
optimal_action_percents = (optimal_action_counts / n_runs) * 100.0
return avg_rewards, optimal_action_percents
# Parameters
n_runs = 2000
n_steps = 10000 # Reduced from 100000 for reasonable output
epsilon = 0.1
# Run and plot for Sample-Average method
avg_rewards_sample, opt_percents_sample = run_experiment(
n_runs=n_runs, n_steps=n_steps, epsilon=epsilon, alpha=None
)
# Run and plot for Constant-Step method (alpha = 0.1)
avg_rewards_const, opt_percents_const = run_experiment(
n_runs=n_runs, n_steps=n_steps, epsilon=epsilon, alpha=0.1
)
# Plot average rewards
plt.figure(figsize=(10, 4))
plt.plot(avg_rewards_sample, label="Sample-average")
plt.plot(avg_rewards_const, label="Constant-step α=0.1")
plt.xlabel("Time steps")
plt.ylabel("Average reward")
plt.legend()
plt.title("Average Reward vs. Time (Nonstationary 10-Arm Bandit)")
plt.show()
# Plot % optimal action
plt.figure(figsize=(10, 4))
plt.plot(opt_percents_sample, label="Sample-average")
plt.plot(opt_percents_const, label="Constant-step α=0.1")
plt.xlabel("Time steps")
plt.ylabel("% Optimal action")
plt.legend()
plt.title("Optimal Action % vs. Time (ε=0.1)")
plt.show()
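With a constant step size α, the estimate Q_{n+1} is an exponential recency-weighted average of past rewards: Q_{n+1} = (1−α)^n Q_1 + Σ_{i=1..n} α(1−α)^{n−i} R_i, which is why the constant-step agent tracks the drifting q_true better than the sample average. A small standalone sketch of how quickly old rewards are forgotten (not part of the experiment above):

import numpy as np

alpha, n = 0.1, 50
# Weight that reward R_i (i = 1..n) receives in Q_{n+1} under a constant step size
weights = alpha * (1 - alpha) ** (n - np.arange(1, n + 1))
print(weights[-5:])                      # the most recent rewards carry the largest weights
print(weights.sum() + (1 - alpha) ** n)  # together with the initial-value term the weights sum to 1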
Code 2.3 Optimistic Initial Values
import numpy as np
import matplotlib.pyplot as plt
# Bandit environment
class SlotMachine:
    def __init__(self, k=10):
        self.k = k
        self.real_q = np.random.normal(0, 1, k)  # true expected reward of each action
    def reward(self, a):
        return np.random.normal(self.real_q[a], 1)  # reward distribution N(real_q[a], 1)
# Agent
class Agent:
    def __init__(self, Q_init, epsilon, alpha=0.1, k=10):
        self.Q = np.ones(k) * Q_init  # initialize the Q table
        self.epsilon = epsilon        # ε-greedy parameter
        self.alpha = alpha            # step size (learning rate)
        self.k = k
    def choose_action(self):
        if np.random.rand() < self.epsilon:
            return np.random.randint(0, self.k)  # explore
        else:
            return np.argmax(self.Q)  # exploit
    def update_Q(self, a, reward):
        self.Q[a] += self.alpha * (reward - self.Q[a])  # constant-step-size Q update
# Main experiment function
def main(runs=2000, steps=1000):
    avg_rewards_1 = np.zeros(steps)  # optimistic-initial-value strategy
    avg_rewards_2 = np.zeros(steps)  # ε-greedy strategy
    for run in range(runs):
        bandit = SlotMachine()
        agent1 = Agent(Q_init=5, epsilon=0)    # optimistic greedy
        agent2 = Agent(Q_init=0, epsilon=0.1)  # ε-greedy
rewards1 = []
rewards2 = []
for t in range(steps):
# Agent 1
a1 = agent1.choose_action()
r1 = bandit.reward(a1)
agent1.update_Q(a1, r1)
rewards1.append(r1)
# Agent 2
a2 = agent2.choose_action()
r2 = bandit.reward(a2)
agent2.update_Q(a2, r2)
rewards2.append(r2)
avg_rewards_1 += np.array(rewards1)
avg_rewards_2 += np.array(rewards2)
    # Average over runs
avg_rewards_1 /= runs
avg_rewards_2 /= runs
    # Plot
plt.figure(figsize=(10, 6))
plt.plot(avg_rewards_1, label='Optimistic Greedy Q=5, ε=0')
plt.plot(avg_rewards_2, label='ε-Greedy Q=0, ε=0.1')
plt.xlabel("Steps")
plt.ylabel("Average Reward")
plt.title("Optimistic Initial Values vs ε-Greedy")
plt.legend()
plt.grid(True)
plt.show()
# Run the experiment
if __name__ == "__main__":
main()
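Why Q_init=5 forces early exploration even with ε=0: every untried arm still looks like it is worth 5, so after each pull drags that arm's estimate down toward a realistic value, the greedy argmax moves on to a different untried arm. A standalone illustration using the classes above (the seed is an arbitrary choice, only for reproducibility):

import numpy as np

np.random.seed(0)
bandit = SlotMachine()
agent = Agent(Q_init=5, epsilon=0)  # purely greedy with an optimistic start
first_actions = []
for _ in range(10):
    a = agent.choose_action()
    agent.update_Q(a, bandit.reward(a))
    first_actions.append(a)
# Rewards are almost surely below 5, so each pulled arm drops below the untried ones
# and the first 10 choices typically sweep through all 10 arms.
print(first_actions)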
Code 2.4 UCB
import numpy as np
import matplotlib.pyplot as plt
np.random.seed(42)
class Bandit:
def __init__(self, k=10):
self.k = k
self.real_q = np.random.normal(0, 1, k)
def reward(self, a):
return np.random.normal(self.real_q[a], 1)
class AgentUCB:
def __init__(self, k=10, c=2):
self.k = k
self.c = c
self.Q = np.zeros(k)
self.counts = np.zeros(k, dtype=int)
self.total_count = 0
    def choose_action(self):
        self.total_count += 1
        # UCB score: value estimate plus exploration bonus c * sqrt(ln(t) / N(a));
        # the tiny constant avoids division by zero for arms not yet tried.
        ucb_values = self.Q + self.c * np.sqrt(np.log(self.total_count) / (self.counts + 1e-10))
        # Pull every arm at least once before relying on the bonus term.
        zero_mask = (self.counts == 0)
        if np.any(zero_mask):
            return np.argmax(zero_mask)
        return np.argmax(ucb_values)
def update(self, a, reward):
self.counts[a] += 1
step = 1 / self.counts[a]
self.Q[a] += step * (reward - self.Q[a])
class AgentEpsilon:
def __init__(self, k=10, epsilon=0.1):
self.k = k
self.epsilon = epsilon
self.Q = np.zeros(k)
self.counts = np.zeros(k, dtype=int)
def choose_action(self):
if np.random.rand() < self.epsilon:
return np.random.randint(self.k)
return np.argmax(self.Q)
def update(self, a, reward):
self.counts[a] += 1
step = 1 / self.counts[a]
self.Q[a] += step * (reward - self.Q[a])
def main(runs=2000, steps=1000):
avg_rewards_ucb = np.zeros(steps)
avg_rewards_eps = np.zeros(steps)
for _ in range(runs):
bandit = Bandit()
agent_ucb = AgentUCB()
agent_eps = AgentEpsilon(epsilon=0.1)
rewards_ucb = np.zeros(steps)
rewards_eps = np.zeros(steps)
for t in range(steps):
a_ucb = agent_ucb.choose_action()
r_ucb = bandit.reward(a_ucb)
agent_ucb.update(a_ucb, r_ucb)
rewards_ucb[t] = r_ucb
a_eps = agent_eps.choose_action()
r_eps = bandit.reward(a_eps)
agent_eps.update(a_eps, r_eps)
rewards_eps[t] = r_eps
avg_rewards_ucb += rewards_ucb
avg_rewards_eps += rewards_eps
avg_rewards_ucb /= runs
avg_rewards_eps /= runs
plt.figure(figsize=(10, 6))
plt.plot(avg_rewards_ucb, label='UCB (c=2)')
plt.plot(avg_rewards_eps, label='Epsilon-Greedy (ε=0.1)')
plt.xlabel('Steps')
plt.ylabel('Average Reward')
plt.title('Average Reward: UCB vs Epsilon-Greedy')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
if __name__ == "__main__":
main()
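The exploration bonus c * sqrt(ln t / N(a)) used in AgentUCB grows only logarithmically with the total step count t but shrinks roughly as 1/sqrt(N(a)) as an arm is pulled more often, so rarely tried arms keep getting revisited. A standalone numeric sketch of that trade-off (values chosen arbitrarily for illustration):

import numpy as np

c, t = 2.0, 1000
pulls = np.array([1, 5, 10, 50, 100, 500])
bonus = c * np.sqrt(np.log(t) / pulls)
for n, b in zip(pulls, bonus):
    print(f"N(a) = {int(n):4d}   bonus = {b:.3f}")  # decays roughly as 1/sqrt(N(a))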
Code 2.5 Gradient Bandit
import numpy as np
import matplotlib.pyplot as plt
# Gradient Bandit with optional baseline
class Bandit:
def __init__(self, k=10):
self.k = k
self.q_star = np.random.normal(loc=0.0, scale=1.0, size=k)
self.optimal_action = np.argmax(self.q_star)
def reward(self, a):
return np.random.normal(loc=self.q_star[a], scale=1.0)
class Agent:
def __init__(self, k, alpha, use_baseline=True):
self.k = k
self.alpha = alpha
self.use_baseline = use_baseline
self.H = np.zeros(k)
self.baseline = 0.0
self.t = 0
    def select_action(self):
        # Softmax over preferences (shifted by max(H) for numerical stability)
        expH = np.exp(self.H - np.max(self.H))
        pi = expH / np.sum(expH)
        action = np.random.choice(self.k, p=pi)
        return action, pi
    def update(self, a, R, pi):
        self.t += 1
        if self.use_baseline:
            # Incremental average of all rewards so far serves as the baseline
            self.baseline += (R - self.baseline) / self.t
            advantage = R - self.baseline
        else:
            advantage = R
        # Gradient-bandit preference update:
        #   H[a] += α·(R − baseline)·(1 − π(a));  H[i] −= α·(R − baseline)·π(i) for i ≠ a
        self.H[a] += self.alpha * advantage * (1 - pi[a])
        for i in range(self.k):
            if i != a:
                self.H[i] -= self.alpha * advantage * pi[i]
def simulate(runs=1000, time_steps=1000, k=10, alphas=[0.1, 0.4]):
optimal_results = {}
reward_results = {}
for alpha in alphas:
for use_baseline in [True, False]:
optimal_counts = np.zeros(time_steps)
reward_sums = np.zeros(time_steps)
for run in range(runs):
bandit = Bandit(k)
agent = Agent(k, alpha, use_baseline)
for t in range(time_steps):
a, pi = agent.select_action()
R = bandit.reward(a)
agent.update(a, R, pi)
reward_sums[t] += R
if a == bandit.optimal_action:
optimal_counts[t] += 1
optimal_pct = (optimal_counts / runs) * 100
avg_rewards = reward_sums / runs
label = f"{'with' if use_baseline else 'without'} baseline, α={alpha}"
optimal_results[label] = optimal_pct
reward_results[label] = avg_rewards
return optimal_results, reward_results
# Run simulation
opt_results, rew_results = simulate(runs=1000, time_steps=1000, k=10, alphas=[0.1, 0.4])
# Plotting Optimal Action %
plt.figure(figsize=(10, 6))
for label, data in opt_results.items():
plt.plot(data, label=label)
plt.xlabel('Steps')
plt.ylabel('% Optimal Action')
plt.title('Gradient Bandit: Optimal Action % over Time')
plt.legend()
plt.grid(True)
plt.show()
# Plotting Average Rewards
plt.figure(figsize=(10, 6))
for label, data in rew_results.items():
plt.plot(data, label=label)
plt.xlabel('Steps')
plt.ylabel('Average Reward')
plt.title('Gradient Bandit: Average Reward over Time')
plt.legend()
plt.grid(True)
plt.show()
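One quick property of the update above: the preference increments sum to zero across actions (α·adv·(1 − π(a)) for the chosen action cancels the −α·adv·π(i) terms of the others), so only relative preferences change and the softmax policy is unaffected by any common offset in H. A minimal standalone check using the Agent class defined above:

import numpy as np

agent = Agent(k=10, alpha=0.1, use_baseline=True)
a, pi = agent.select_action()
H_before = agent.H.copy()
agent.update(a, 1.5, pi)
# The total change in H should be (numerically) zero.
print(np.isclose(np.sum(agent.H - H_before), 0.0))  # expected: True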
Code 2.6 Parameter-Study Plot for the Nonstationary Case
Standard version:
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
class NonstationaryBandit:
def __init__(self, k=10, q_init=0.0, random_walk_std=0.01):
self.k = k
self.q_true = np.ones(k) * q_init
self.random_walk_std = random_walk_std
def get_reward(self, action):
reward = np.random.normal(loc=self.q_true[action], scale=1.0)
self.q_true += np.random.normal(0.0, self.random_walk_std, size=self.k)
return reward
def optimal_action(self):
return np.argmax(self.q_true)
class Agent:
def __init__(self, k=10, epsilon=0.1, alpha=0.1):
self.k = k
self.epsilon = epsilon
self.alpha = alpha
self.Q = np.zeros(k)
def select_action(self):
if np.random.rand() < self.epsilon:
return np.random.randint(self.k)
else:
max_val = np.max(self.Q)
candidates = np.where(self.Q == max_val)[0]
return np.random.choice(candidates)
def update(self, action, reward):
self.Q[action] += self.alpha * (reward - self.Q[action])
def parameter_study(epsilon_list, n_runs=2000, n_steps=200000, eval_start=100000):
avg_final_rewards = []
for epsilon in tqdm(epsilon_list, desc="Parameter sweep"):
rewards = np.zeros(n_steps)
for _ in tqdm(range(n_runs), leave=False, desc=f"ε = {epsilon:.3f}"):
env = NonstationaryBandit()
agent = Agent(epsilon=epsilon, alpha=0.1)
for t in range(n_steps):
a = agent.select_action()
r = env.get_reward(a)
agent.update(a, r)
rewards[t] += r
rewards /= n_runs
final_avg_reward = np.mean(rewards[eval_start:])
avg_final_rewards.append(final_avg_reward)
return avg_final_rewards
# Parameter settings
epsilons = np.logspace(-3, 0, num=10) # ε from 0.001 to 1.0
avg_rewards = parameter_study(epsilons)
# Plot
plt.figure(figsize=(8, 5))
plt.plot(epsilons, avg_rewards, marker='o')
plt.xscale('log')
plt.xlabel("ε (exploration rate)")
plt.ylabel("Average reward (last 100k steps)")
plt.title("Parameter Study of ε-Greedy with α=0.1 (Nonstationary Bandit)")
plt.grid(True)
plt.show()
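The nested loops above (10 ε values × 2000 runs × 200,000 steps) are slow in pure Python, which is the motivation for the JAX version below. Even staying in NumPy, the 2000 runs can be advanced in lockstep as rows of a matrix. A rough sketch of one vectorized time step under the same environment assumptions (not a drop-in replacement for parameter_study; greedy ties are broken by lowest index here):

import numpy as np

def vectorized_step(Q, q_true, epsilon, alpha, rng, walk_std=0.01):
    """Advance all runs by one time step; rows index independent bandit problems."""
    n_runs, k = Q.shape
    greedy = Q.argmax(axis=1)
    explore = rng.random(n_runs) < epsilon
    actions = np.where(explore, rng.integers(0, k, size=n_runs), greedy)
    rows = np.arange(n_runs)
    rewards = rng.normal(q_true[rows, actions], 1.0)
    Q[rows, actions] += alpha * (rewards - Q[rows, actions])
    q_true += rng.normal(0.0, walk_std, size=q_true.shape)  # random walk of the true values
    return rewards

# Usage sketch: Q = np.zeros((n_runs, k)); q_true = np.zeros((n_runs, k));
# call vectorized_step in a loop over n_steps and average the rewards of the last half.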
GPU-accelerated (JAX) version:
import jax
import jax.numpy as jnp
from jax import random, vmap
import matplotlib.pyplot as plt
from tqdm import tqdm
def run_bandit(key, epsilon, alpha,
k=10,
steps=200_000,
eval_start=100_000,
random_walk_std=0.01):
"""
    Run one ε-greedy experiment on the nonstationary multi-armed bandit:
    - each arm's initial true value q_true(i, 0) ~ N(0, 1)
    - pulling arm i at step t yields reward ~ N(q_true(i, t), 1)
    - q_true performs an i.i.d. Gaussian random walk: q_true(i, t+1) = q_true(i, t) + N(0, random_walk_std^2)
    - the estimate is updated with a constant step size α: Q[action] += α * (reward − Q[action])
    - returns the average reward over the last (steps − eval_start) steps.
"""
    # 1) Use a subkey to initialize q_true(0) ~ N(0, 1)
key, init_key = random.split(key)
q_true = random.normal(init_key, shape=(k,)) # shape=(k,)
    # 2) Value estimates Q start at zero
Q = jnp.zeros((k,))
def step(carry, _):
q_true, Q, key = carry
        # 3) Split the key into 5 subkeys: the ε-greedy uniform draw, the random action, the reward noise, the random walk, plus the carried-over key
key, sub_a, sub_b, sub_c, sub_d = random.split(key, 5)
        # 3.1) ε-greedy action selection
        greedy_a = jnp.argmax(Q)
        rand_a = random.randint(sub_b, (), 0, k)        # random action
        is_explore = (random.uniform(sub_a) < epsilon)  # explore or exploit?
action = jnp.where(is_explore, rand_a, greedy_a)
        # 3.2) Sample the reward: reward ~ N(q_true[action], 1)
reward = q_true[action] + random.normal(sub_c)
        # 3.3) Update the estimate Q with the constant step size α
Q = Q.at[action].set(Q[action] + alpha * (reward - Q[action]))
        # 3.4) Nonstationarity: Gaussian random walk on every arm's q_true
q_true = q_true + random.normal(sub_d, shape=(k,)) * random_walk_std
return (q_true, Q, key), reward
    # 4) Iterate the step function `steps` times with lax.scan
    #    signature: lax.scan(f, init_carry, xs, length)
(_, _, _), rewards = jax.lax.scan(
step,
(q_true, Q, key), # init carry
        None,                 # xs=None because the scan body reads no per-step inputs
steps # length=steps
)
    # 5) Average reward over the second half (from eval_start on)
avg_reward = jnp.mean(rewards[eval_start:])
return avg_reward
def run_all(key, epsilon, alpha, n_runs=2000):
"""
    Run run_bandit n_runs times in parallel, each with a different subkey.
    Returns an array of per-run average rewards with shape (n_runs,).
"""
subkeys = random.split(key, n_runs)
batched_fn = vmap(lambda k: run_bandit(k, epsilon, alpha))
return batched_fn(subkeys)
def parameter_sweep(epsilons, alpha, n_runs=2000):
"""
    Sweep over a set of ε values. For each ε:
        1) split a subkey this_key from the current main key
        2) run run_all(this_key, ε, α) to obtain avg_reward for n_runs experiments
        3) average avg_reward over those n_runs experiments
    Returns an avg_rewards array of shape (len(epsilons),).
"""
key = random.PRNGKey(42)
avg_rewards = []
for eps in tqdm(epsilons, desc="Sweeping ε"):
key, this_key = random.split(key)
rewards = run_all(this_key, eps, alpha, n_runs)
avg_rewards.append(jnp.mean(rewards))
return jnp.array(avg_rewards)
if __name__ == "__main__":
    # Parameter settings
    epsilons = jnp.logspace(-3, 0, num=10)  # 10 ε values from 0.001 to 1.0
alpha = 0.1
n_runs = 2000
    # Run the parameter sweep
avg_rewards = parameter_sweep(epsilons, alpha, n_runs)
    # Plot the results
plt.figure(figsize=(8, 5))
plt.plot(epsilons, avg_rewards, marker='o')
plt.xscale('log')
plt.xlabel("ε (exploration rate)")
plt.ylabel("Average reward (last 100k steps)")
plt.title("JAX ε-Greedy with α=0.1 in Nonstationary Bandit")
plt.grid(True)
plt.show()
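Depending on the JAX version and backend, wrapping the batched function in jax.jit and passing ε and α as arguments can avoid re-tracing run_bandit for every ε in the sweep. A hedged sketch of that variant (an assumption about usage, not part of the original listing):

import jax
from jax import vmap, random

# Compile once: the key axis is vmapped, while epsilon and alpha are ordinary traced arguments.
batched_run = jax.jit(vmap(run_bandit, in_axes=(0, None, None)))

def run_all_jit(key, epsilon, alpha, n_runs=2000):
    subkeys = random.split(key, n_runs)
    return batched_run(subkeys, epsilon, alpha)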
Chapter 3 Code.3
Code 3.1 CartPole (Pole-Balancing Cart)
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
class CartPoleEnvironment:
"""
CartPole environment wrapper with discretization and optional Sutton-Barto reward and rendering.
"""
def __init__(self,
n_bins=(6, 6, 12, 12),
obs_bounds=((-4.8, 4.8), (-5, 5), (-0.418, 0.418), (-5, 5)),
sutton_barto_reward=False,
render_mode=None):
self.env = gym.make("CartPole-v1", render_mode=render_mode)
self.sutton_barto = sutton_barto_reward
self.n_bins = n_bins
self.bins = [np.linspace(low, high, bins - 1)
for (low, high), bins in zip(obs_bounds, n_bins)]
def discretize(self, obs):
return tuple(np.digitize(obs[i], self.bins[i]) for i in range(len(obs)))
def reset(self, seed=None):
obs, _ = self.env.reset(seed=seed)
return self.discretize(obs)
def step(self, action):
obs, reward, terminated, truncated, _ = self.env.step(action)
done = terminated or truncated
if self.sutton_barto:
reward = -1 if done else 0
return self.discretize(obs), reward, done
class MonteCarloAgent:
"""
On-Policy Monte Carlo Control agent with epsilon-greedy policy.
"""
def __init__(self, n_bins, n_actions, epsilon=0.1, gamma=0.99):
self.n_bins = n_bins
self.n_actions = n_actions
self.epsilon = epsilon
self.gamma = gamma
self.Q = np.zeros(n_bins + (n_actions,))
self.N = np.zeros(n_bins + (n_actions,), dtype=int)
def choose_action(self, state, greedy=False):
if not greedy and np.random.rand() < self.epsilon:
return np.random.randint(self.n_actions)
return int(np.argmax(self.Q[state]))
def update(self, episode):
# Compute discounted returns for the episode
G = 0
returns = []
for state, action, reward in reversed(episode):
G = reward + self.gamma * G
returns.insert(0, G)
# Sample-average update of Q
for (state, action, _), Gt in zip(episode, returns):
self.N[state + (action,)] += 1
alpha = 1.0 / self.N[state + (action,)]
self.Q[state + (action,)] += alpha * (Gt - self.Q[state + (action,)])
def main():
episodes = 500
stats = {
'Default': {'lengths': [], 'returns': [], 'totals': []},
'Sutton-Barto': {'lengths': [], 'returns': [], 'totals': []}
}
# Training both modes
for mode, sb in [('Default', False), ('Sutton-Barto', True)]:
env = CartPoleEnvironment(sutton_barto_reward=sb)
agent = MonteCarloAgent(env.n_bins, env.env.action_space.n,
epsilon=0.1, gamma=0.99)
for ep in range(episodes):
state = env.reset(seed=ep)
episode = []
steps = 0
total_reward = 0
done = False
while not done:
action = agent.choose_action(state)
nxt, r, done = env.step(action)
episode.append((state, action, r))
state = nxt
total_reward += r
steps += 1
# Record metrics
stats[mode]['lengths'].append(steps)
# Discounted return G0
G0 = 0
for (_, _, r) in reversed(episode):
G0 = r + agent.gamma * G0
stats[mode]['returns'].append(G0)
stats[mode]['totals'].append(total_reward)
agent.update(episode)
# Plot Default Reward metrics
fig, axes = plt.subplots(3, 1, figsize=(10, 12))
axes[0].plot(stats['Default']['lengths'])
axes[0].set_title('Episode Length - Default Reward')
axes[0].set_xlabel('Episode')
axes[0].set_ylabel('Length (steps)')
axes[0].grid(True)
axes[1].plot(stats['Default']['returns'])
axes[1].set_title('Discounted Return $G_0$ - Default Reward')
axes[1].set_xlabel('Episode')
axes[1].set_ylabel('$G_0$')
axes[1].grid(True)
axes[2].plot(stats['Default']['totals'])
axes[2].set_title('Total Reward - Default Reward')
axes[2].set_xlabel('Episode')
axes[2].set_ylabel('Total')
axes[2].grid(True)
plt.tight_layout()
plt.show()
# Plot Sutton-Barto Reward metrics
fig, axes = plt.subplots(3, 1, figsize=(10, 12))
axes[0].plot(stats['Sutton-Barto']['lengths'])
axes[0].set_title('Episode Length - Sutton-Barto Reward')
axes[0].set_xlabel('Episode')
axes[0].set_ylabel('Length (steps)')
axes[0].grid(True)
axes[1].plot(stats['Sutton-Barto']['returns'])
axes[1].set_title('Discounted Return $G_0$ - Sutton-Barto Reward')
axes[1].set_xlabel('Episode')
axes[1].set_ylabel('$G_0$')
axes[1].grid(True)
axes[2].plot(stats['Sutton-Barto']['totals'])
axes[2].set_title('Total Reward - Sutton-Barto Reward')
axes[2].set_xlabel('Episode')
axes[2].set_ylabel('Total')
axes[2].grid(True)
plt.tight_layout()
plt.show()
# Demonstrate Sutton-Barto policy
demo_env = CartPoleEnvironment(sutton_barto_reward=True, render_mode='human')
state = demo_env.reset()
done = False
print("Demonstrating Sutton-Barto policy...")
while not done:
action = agent.choose_action(state, greedy=True)
state, _, done = demo_env.step(action)
demo_env.env.close()
if __name__ == '__main__':
main()
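MonteCarloAgent.update performs every-visit Monte Carlo updates: every occurrence of a (state, action) pair in an episode contributes its return. If a first-visit variant is preferred, a hedged sketch of such a method (hypothetical, not part of the original listing; it could be added to MonteCarloAgent):

def update_first_visit(self, episode):
    # Backward pass to compute discounted returns, as in update()
    G = 0
    returns = []
    for state, action, reward in reversed(episode):
        G = reward + self.gamma * G
        returns.insert(0, G)
    seen = set()
    for (state, action, _), Gt in zip(episode, returns):
        if (state, action) in seen:
            continue  # skip later visits of the same (state, action) pair
        seen.add((state, action))
        self.N[state + (action,)] += 1
        alpha = 1.0 / self.N[state + (action,)]
        self.Q[state + (action,)] += alpha * (Gt - self.Q[state + (action,)])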