Reinforcement Learning in AI Applications: From Q-Learning to PPO

Abstract

Reinforcement learning (RL) is a major branch of machine learning in which an agent learns an optimal policy by interacting with its environment. It has produced notable results in game AI, robot control, recommender systems, and other domains. Starting from the basic concepts of reinforcement learning, this article traces the technical evolution from the classic Q-Learning algorithm to the modern PPO (Proximal Policy Optimization) algorithm, and combines concrete code implementations with application examples to give readers a comprehensive guide to the field.

1. Reinforcement Learning Fundamentals

1.1 Core Concepts of Reinforcement Learning

Reinforcement learning trains an agent through trial-and-error interaction and delayed rewards. Its core elements are:

  • Agent: the learning entity that executes actions
  • Environment: the external world the agent operates in
  • State: a description of the environment's current situation
  • Action: an operation the agent can perform
  • Reward: the environment's feedback on the agent's action
  • Policy: the rule the agent uses to choose actions
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict, deque
import random
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Categorical

class RLEnvironment:
    """Base class for reinforcement learning environments."""

    def __init__(self):
        self.state = None
        self.done = False
        self.reward = 0

    def reset(self):
        """Reset the environment to its initial state."""
        raise NotImplementedError

    def step(self, action):
        """Execute an action; return the new state, reward, and done flag."""
        raise NotImplementedError

    def get_valid_actions(self):
        """Return the valid actions in the current state."""
        raise NotImplementedError

    def render(self):
        """Visualize the current state."""
        pass

class GridWorldEnvironment(RLEnvironment):
    """Example grid-world environment."""

    def __init__(self, width=5, height=5):
        super().__init__()
        self.width = width
        self.height = height
        self.start_pos = (0, 0)
        self.goal_pos = (width - 1, height - 1)
        self.obstacles = [(2, 2), (3, 2), (2, 3)]  # obstacle positions
        self.reset()

    def reset(self):
        """Reset the agent to the start position."""
        self.agent_pos = self.start_pos
        self.done = False
        return self._get_state()

    def _get_state(self):
        """Encode the current position as a single integer state."""
        return self.agent_pos[0] * self.height + self.agent_pos[1]

    def step(self, action):
        """Execute an action: 0 = up, 1 = right, 2 = down, 3 = left."""
        if self.done:
            return self._get_state(), 0, True

        # Action-to-move mapping
        moves = [(-1, 0), (0, 1), (1, 0), (0, -1)]
        dx, dy = moves[action]

        new_x = max(0, min(self.width - 1, self.agent_pos[0] + dx))
        new_y = max(0, min(self.height - 1, self.agent_pos[1] + dy))
        new_pos = (new_x, new_y)

        # Only move if the new position is not an obstacle
        if new_pos not in self.obstacles:
            self.agent_pos = new_pos

        # Compute the reward
        if self.agent_pos == self.goal_pos:
            reward = 100
            self.done = True
        elif self.agent_pos in self.obstacles:
            reward = -10
        else:
            reward = -1  # small per-step penalty to encourage reaching the goal quickly

        return self._get_state(), reward, self.done

    def get_valid_actions(self):
        """All four directions can always be attempted."""
        return list(range(4))

    def render(self):
        """Print the grid world as text."""
        grid = np.zeros((self.width, self.height))

        # Mark obstacles
        for obs in self.obstacles:
            grid[obs] = -1

        # Mark the goal
        grid[self.goal_pos] = 2

        # Mark the agent
        grid[self.agent_pos] = 1

        print("Grid World:")
        symbols = {0: '.', -1: 'X', 1: 'A', 2: 'G'}
        for i in range(self.width):
            row = ''
            for j in range(self.height):
                row += symbols[int(grid[i, j])] + ' '
            print(row)
        print()

1.2 Markov Decision Processes (MDPs)

Reinforcement learning problems are usually modeled as Markov decision processes, which have the following properties (a one-step Bellman backup sketch follows the list):

  1. Markov property: the future state depends only on the current state, not on the history
  2. Transition probabilities: P(s'|s,a) is the probability of moving to state s' after taking action a in state s
  3. Reward function: R(s,a,s') is the immediate reward for that transition
  4. Discount factor: γ ∈ [0,1] balances immediate and future rewards
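To make these quantities concrete, the short sketch below (illustrative only; the tiny two-state MDP and its numbers are made up for this example) performs a single Bellman optimality backup, V(s) ← max_a Σ_s' P(s'|s,a) [R(s,a,s') + γ V(s')]:

def bellman_backup(state, actions, transitions, rewards, values, gamma=0.9):
    """One Bellman optimality backup for a single state."""
    best = float('-inf')
    for a in actions:
        q = sum(p * (rewards.get((state, a, s2), 0) + gamma * values[s2])
                for s2, p in transitions[state][a].items())
        best = max(best, q)
    return best

# Tiny two-state example (hypothetical numbers)
transitions = {'s0': {'stay': {'s0': 1.0}, 'go': {'s1': 1.0}}}
rewards = {('s0', 'go', 's1'): 1.0}
values = {'s0': 0.0, 's1': 10.0}
print(bellman_backup('s0', ['stay', 'go'], transitions, rewards, values))  # 10.0 = 1.0 + 0.9 * 10.0

The MDPSolver class below repeats this backup over all states until the values converge (value iteration), or alternately evaluates and greedily improves a fixed policy (policy iteration):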
class MDPSolver:
    """Solver for Markov decision processes."""

    def __init__(self, states, actions, transitions, rewards, gamma=0.9):
        self.states = states
        self.actions = actions
        self.transitions = transitions  # P(s'|s,a)
        self.rewards = rewards          # R(s,a,s')
        self.gamma = gamma
        self.values = {s: 0.0 for s in states}
        self.policy = {s: random.choice(actions) for s in states}

    def value_iteration(self, theta=1e-6, max_iterations=1000):
        """Value iteration."""
        for iteration in range(max_iterations):
            delta = 0
            new_values = self.values.copy()

            for state in self.states:
                if state in self.transitions:
                    # Compute the Q-value of every available action
                    action_values = []
                    for action in self.actions:
                        if action in self.transitions[state]:
                            q_value = 0
                            for next_state, prob in self.transitions[state][action].items():
                                reward = self.rewards.get((state, action, next_state), 0)
                                q_value += prob * (reward + self.gamma * self.values[next_state])
                            action_values.append(q_value)

                    if action_values:
                        new_values[state] = max(action_values)
                        delta = max(delta, abs(new_values[state] - self.values[state]))

            self.values = new_values

            if delta < theta:
                print(f"Value iteration converged after {iteration + 1} iterations")
                break

        # Extract the optimal policy
        self._extract_policy()
        return self.values, self.policy

    def _extract_policy(self):
        """Extract the greedy policy from the value function."""
        for state in self.states:
            if state in self.transitions:
                best_action = None
                best_value = float('-inf')

                for action in self.actions:
                    if action in self.transitions[state]:
                        q_value = 0
                        for next_state, prob in self.transitions[state][action].items():
                            reward = self.rewards.get((state, action, next_state), 0)
                            q_value += prob * (reward + self.gamma * self.values[next_state])

                        if q_value > best_value:
                            best_value = q_value
                            best_action = action

                if best_action is not None:
                    self.policy[state] = best_action

    def policy_iteration(self, max_iterations=100):
        """Policy iteration."""
        for iteration in range(max_iterations):
            # Policy evaluation
            self._policy_evaluation()

            # Policy improvement
            policy_stable = self._policy_improvement()

            if policy_stable:
                print(f"Policy iteration converged after {iteration + 1} iterations")
                break

        return self.values, self.policy

    def _policy_evaluation(self, theta=1e-6, max_iterations=1000):
        """Evaluate the current policy."""
        for _ in range(max_iterations):
            delta = 0
            new_values = self.values.copy()

            for state in self.states:
                if state in self.transitions:
                    action = self.policy[state]
                    if action in self.transitions[state]:
                        value = 0
                        for next_state, prob in self.transitions[state][action].items():
                            reward = self.rewards.get((state, action, next_state), 0)
                            value += prob * (reward + self.gamma * self.values[next_state])

                        new_values[state] = value
                        delta = max(delta, abs(new_values[state] - self.values[state]))

            self.values = new_values

            if delta < theta:
                break

    def _policy_improvement(self):
        """Greedily improve the policy; return True if it did not change."""
        policy_stable = True

        for state in self.states:
            if state in self.transitions:
                old_action = self.policy[state]
                best_action = None
                best_value = float('-inf')

                for action in self.actions:
                    if action in self.transitions[state]:
                        q_value = 0
                        for next_state, prob in self.transitions[state][action].items():
                            reward = self.rewards.get((state, action, next_state), 0)
                            q_value += prob * (reward + self.gamma * self.values[next_state])

                        if q_value > best_value:
                            best_value = q_value
                            best_action = action

                if best_action is not None:
                    self.policy[state] = best_action
                    if old_action != best_action:
                        policy_stable = False

        return policy_stable

2. Q-Learning in Detail

2.1 The Basics of Q-Learning

Q-Learning is a model-free reinforcement learning algorithm that finds an optimal policy by learning the action-value function Q(s,a), the expected cumulative reward of taking action a in state s.

The Q-Learning update rule is:

Q(s,a) ← Q(s,a) + α [r + γ max_a' Q(s',a') - Q(s,a)]

where (a numeric example follows the list):

  • α is the learning rate
  • γ is the discount factor
  • r is the immediate reward
  • s' is the next state
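As a quick sanity check with made-up numbers: suppose α = 0.1, γ = 0.95, Q(s,a) = 2.0, r = -1, and max_a' Q(s',a') = 5.0. The update moves Q(s,a) part of the way toward the bootstrapped target -1 + 0.95 × 5.0 = 3.75:

alpha, gamma = 0.1, 0.95
q_sa, reward, max_q_next = 2.0, -1.0, 5.0

target = reward + gamma * max_q_next    # 3.75
q_sa = q_sa + alpha * (target - q_sa)   # 2.0 + 0.1 * 1.75
print(q_sa)                             # 2.175

The complete tabular agent below wraps this update in an ε-greedy training loop: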
class QLearningAgent:
    """Tabular Q-Learning agent."""

    def __init__(self, state_size, action_size, learning_rate=0.1,
                 discount_factor=0.95, epsilon=1.0, epsilon_decay=0.995,
                 epsilon_min=0.01):
        self.state_size = state_size
        self.action_size = action_size
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min

        # Q-table
        self.q_table = defaultdict(lambda: np.zeros(action_size))

        # Training statistics
        self.training_scores = []
        self.training_steps = []

    def get_action(self, state, training=True):
        """Select an action with an ε-greedy policy."""
        if training and np.random.random() <= self.epsilon:
            return np.random.choice(self.action_size)
        else:
            return np.argmax(self.q_table[state])

    def update_q_table(self, state, action, reward, next_state, done):
        """Apply the Q-Learning update."""
        current_q = self.q_table[state][action]

        if done:
            target_q = reward
        else:
            target_q = reward + self.discount_factor * np.max(self.q_table[next_state])

        # Q-Learning update
        self.q_table[state][action] += self.learning_rate * (target_q - current_q)

    def decay_epsilon(self):
        """Decay the exploration rate."""
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def train(self, env, episodes=1000, max_steps=200):
        """Train the Q-Learning agent."""
        scores = []

        for episode in range(episodes):
            state = env.reset()
            total_reward = 0
            steps = 0

            for step in range(max_steps):
                action = self.get_action(state, training=True)
                next_state, reward, done = env.step(action)

                self.update_q_table(state, action, reward, next_state, done)

                state = next_state
                total_reward += reward
                steps += 1

                if done:
                    break

            scores.append(total_reward)
            self.training_scores.append(total_reward)
            self.training_steps.append(steps)

            self.decay_epsilon()

            # Print training progress
            if (episode + 1) % 100 == 0:
                avg_score = np.mean(scores[-100:])
                print(f"Episode {episode + 1}, Average Score: {avg_score:.2f}, Epsilon: {self.epsilon:.3f}")

        return self.training_scores

    def test(self, env, episodes=10, render=False):
        """Evaluate the trained agent."""
        test_scores = []

        for episode in range(episodes):
            state = env.reset()
            total_reward = 0
            steps = 0

            if render:
                print(f"\n=== Test Episode {episode + 1} ===")
                env.render()

            while True:
                action = self.get_action(state, training=False)
                next_state, reward, done = env.step(action)

                state = next_state
                total_reward += reward
                steps += 1

                if render:
                    print(f"Step {steps}, Action: {action}, Reward: {reward}")
                    env.render()

                if done:
                    break

            test_scores.append(total_reward)

            if render:
                print(f"Episode {episode + 1} finished with total reward: {total_reward}")

        avg_score = np.mean(test_scores)
        print(f"\nTest Results: Average Score: {avg_score:.2f} over {episodes} episodes")
        return test_scores

    def save_q_table(self, filename):
        """Save the Q-table to disk."""
        import pickle
        with open(filename, 'wb') as f:
            pickle.dump(dict(self.q_table), f)
        print(f"Q-table saved to {filename}")

    def load_q_table(self, filename):
        """Load a Q-table from disk."""
        import pickle
        with open(filename, 'rb') as f:
            q_table_dict = pickle.load(f)
        self.q_table = defaultdict(lambda: np.zeros(self.action_size))
        self.q_table.update(q_table_dict)
        print(f"Q-table loaded from {filename}")

2.2 A Q-Learning Example in Practice

Let's train a Q-Learning agent in the grid-world environment:

def run_qlearning_example():
    """Run the Q-Learning example on the grid world."""
    # Create the environment
    env = GridWorldEnvironment(width=5, height=5)

    # Create the Q-Learning agent
    agent = QLearningAgent(
        state_size=25,   # 5x5 grid
        action_size=4,   # up / right / down / left
        learning_rate=0.1,
        discount_factor=0.95,
        epsilon=1.0,
        epsilon_decay=0.995,
        epsilon_min=0.01
    )

    print("Training the Q-Learning agent...")
    training_scores = agent.train(env, episodes=1000, max_steps=100)

    # Plot the training curves
    plt.figure(figsize=(12, 4))

    plt.subplot(1, 2, 1)
    plt.plot(training_scores)
    plt.title('Training Scores')
    plt.xlabel('Episode')
    plt.ylabel('Total Reward')

    # Compute a moving average
    window_size = 50
    moving_avg = []
    for i in range(len(training_scores)):
        start_idx = max(0, i - window_size + 1)
        moving_avg.append(np.mean(training_scores[start_idx:i+1]))

    plt.subplot(1, 2, 2)
    plt.plot(moving_avg)
    plt.title(f'Moving Average (window={window_size})')
    plt.xlabel('Episode')
    plt.ylabel('Average Reward')

    plt.tight_layout()
    plt.show()

    # Evaluate the trained agent
    print("\nEvaluating the trained agent:")
    test_scores = agent.test(env, episodes=5, render=True)

    # Save the Q-table
    agent.save_q_table('qlearning_gridworld.pkl')

    return agent, training_scores

# Run the example
if __name__ == "__main__":
    agent, scores = run_qlearning_example()

2.3 Double Q-Learning

Double Q-Learning reduces the overestimation bias of Q-Learning by maintaining two Q-tables.
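On each step one table is chosen at random for the update; when Q1 is updated (the symmetric rule swaps Q1 and Q2), Q1 selects the greedy next action and Q2 evaluates it:

Q1(s,a) ← Q1(s,a) + α [r + γ Q2(s', argmax_a' Q1(s',a')) - Q1(s,a)]

The implementation below applies exactly this rule, picking one of its two Q-tables at random on every step: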

class DoubleQLearningAgent:
    """Double Q-Learning agent."""

    def __init__(self, state_size, action_size, learning_rate=0.1,
                 discount_factor=0.95, epsilon=1.0, epsilon_decay=0.995,
                 epsilon_min=0.01):
        self.state_size = state_size
        self.action_size = action_size
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min

        # Two Q-tables
        self.q_table_1 = defaultdict(lambda: np.zeros(action_size))
        self.q_table_2 = defaultdict(lambda: np.zeros(action_size))

        self.training_scores = []

    def get_action(self, state, training=True):
        """Select an action from the average of the two Q-tables."""
        if training and np.random.random() <= self.epsilon:
            return np.random.choice(self.action_size)
        else:
            # Average the two Q-tables
            combined_q = (self.q_table_1[state] + self.q_table_2[state]) / 2
            return np.argmax(combined_q)

    def update_q_tables(self, state, action, reward, next_state, done):
        """Update one of the two Q-tables."""
        # Randomly choose which table to update
        if np.random.random() < 0.5:
            # Update table 1
            current_q = self.q_table_1[state][action]
            if done:
                target_q = reward
            else:
                # Table 1 selects the action, table 2 evaluates it
                best_action = np.argmax(self.q_table_1[next_state])
                target_q = reward + self.discount_factor * self.q_table_2[next_state][best_action]

            self.q_table_1[state][action] += self.learning_rate * (target_q - current_q)
        else:
            # Update table 2
            current_q = self.q_table_2[state][action]
            if done:
                target_q = reward
            else:
                # Table 2 selects the action, table 1 evaluates it
                best_action = np.argmax(self.q_table_2[next_state])
                target_q = reward + self.discount_factor * self.q_table_1[next_state][best_action]

            self.q_table_2[state][action] += self.learning_rate * (target_q - current_q)

    def decay_epsilon(self):
        """Decay the exploration rate."""
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def train(self, env, episodes=1000, max_steps=200):
        """Train the Double Q-Learning agent."""
        scores = []

        for episode in range(episodes):
            state = env.reset()
            total_reward = 0

            for step in range(max_steps):
                action = self.get_action(state, training=True)
                next_state, reward, done = env.step(action)

                self.update_q_tables(state, action, reward, next_state, done)

                state = next_state
                total_reward += reward

                if done:
                    break

            scores.append(total_reward)
            self.training_scores.append(total_reward)
            self.decay_epsilon()

            if (episode + 1) % 100 == 0:
                avg_score = np.mean(scores[-100:])
                print(f"Episode {episode + 1}, Average Score: {avg_score:.2f}, Epsilon: {self.epsilon:.3f}")

        return self.training_scores

3. Deep Q-Networks (DQN)

3.1 How DQN Works

When the state space becomes too large, a tabular Q-table is no longer feasible. The Deep Q-Network (DQN) approximates the Q-function with a neural network and can therefore handle high-dimensional state spaces.
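Two ingredients stabilize training: an experience replay buffer that breaks the correlation between consecutive samples, and a separate target network with frozen parameters θ⁻ that supplies the regression target

y = r + γ max_a' Q(s', a'; θ⁻)

which is exactly what the replay() method below implements: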

class DQNNetwork(nn.Module):
    """Deep Q-network."""

    def __init__(self, state_size, action_size, hidden_sizes=[64, 64]):
        super(DQNNetwork, self).__init__()

        layers = []
        input_size = state_size

        # Hidden layers
        for hidden_size in hidden_sizes:
            layers.append(nn.Linear(input_size, hidden_size))
            layers.append(nn.ReLU())
            input_size = hidden_size

        # Output layer
        layers.append(nn.Linear(input_size, action_size))

        self.network = nn.Sequential(*layers)

    def forward(self, x):
        return self.network(x)

class ReplayBuffer:
    """Experience replay buffer."""

    def __init__(self, capacity=10000):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Store a transition."""
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        """Sample a random mini-batch."""
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        return (
            torch.FloatTensor(np.array(states)),
            torch.LongTensor(actions),
            torch.FloatTensor(rewards),
            torch.FloatTensor(np.array(next_states)),
            torch.BoolTensor(dones)
        )

    def __len__(self):
        return len(self.buffer)

class DQNAgent:
    """DQN agent."""

    def __init__(self, state_size, action_size, learning_rate=0.001,
                 discount_factor=0.95, epsilon=1.0, epsilon_decay=0.995,
                 epsilon_min=0.01, buffer_size=10000, batch_size=32,
                 target_update_freq=100):

        self.state_size = state_size
        self.action_size = action_size
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min
        self.batch_size = batch_size
        self.target_update_freq = target_update_freq

        # Networks
        self.q_network = DQNNetwork(state_size, action_size)
        self.target_network = DQNNetwork(state_size, action_size)
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=learning_rate)

        # Experience replay
        self.replay_buffer = ReplayBuffer(buffer_size)

        # Training statistics
        self.training_scores = []
        self.losses = []
        self.update_count = 0

        # Initialize the target network
        self.update_target_network()

    def update_target_network(self):
        """Copy the online network weights into the target network."""
        self.target_network.load_state_dict(self.q_network.state_dict())

    def get_action(self, state, training=True):
        """Select an action (ε-greedy during training)."""
        if training and np.random.random() <= self.epsilon:
            return np.random.choice(self.action_size)

        with torch.no_grad():
            state_tensor = torch.FloatTensor(state).unsqueeze(0)
            q_values = self.q_network(state_tensor)
            return q_values.argmax().item()

    def remember(self, state, action, reward, next_state, done):
        """Store a transition in the replay buffer."""
        self.replay_buffer.push(state, action, reward, next_state, done)

    def replay(self):
        """Train on a mini-batch sampled from the replay buffer."""
        if len(self.replay_buffer) < self.batch_size:
            return

        # Sample a mini-batch
        states, actions, rewards, next_states, dones = self.replay_buffer.sample(self.batch_size)

        # Current Q-values
        current_q_values = self.q_network(states).gather(1, actions.unsqueeze(1))

        # Target Q-values from the target network
        with torch.no_grad():
            next_q_values = self.target_network(next_states).max(1)[0]
            target_q_values = rewards + (self.discount_factor * next_q_values * ~dones)

        # Loss
        loss = F.mse_loss(current_q_values.squeeze(), target_q_values)

        # Backpropagation
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        self.losses.append(loss.item())
        self.update_count += 1

        # Periodically refresh the target network
        if self.update_count % self.target_update_freq == 0:
            self.update_target_network()

    def decay_epsilon(self):
        """Decay the exploration rate."""
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def train(self, env, episodes=1000, max_steps=200):
        """Train the DQN agent."""
        scores = []

        for episode in range(episodes):
            state = env.reset()
            # One-hot encode discrete states if necessary
            if isinstance(state, int):
                state_vector = np.zeros(self.state_size)
                state_vector[state] = 1.0
                state = state_vector

            total_reward = 0

            for step in range(max_steps):
                action = self.get_action(state, training=True)
                next_state, reward, done = env.step(action)

                # Encode the next state the same way
                if isinstance(next_state, int):
                    next_state_vector = np.zeros(self.state_size)
                    next_state_vector[next_state] = 1.0
                    next_state = next_state_vector

                self.remember(state, action, reward, next_state, done)
                self.replay()

                state = next_state
                total_reward += reward

                if done:
                    break

            scores.append(total_reward)
            self.training_scores.append(total_reward)
            self.decay_epsilon()

            if (episode + 1) % 100 == 0:
                avg_score = np.mean(scores[-100:])
                avg_loss = np.mean(self.losses[-100:]) if self.losses else 0
                print(f"Episode {episode + 1}, Average Score: {avg_score:.2f}, "
                      f"Average Loss: {avg_loss:.4f}, Epsilon: {self.epsilon:.3f}")

        return self.training_scores

3.2 Double DQN and Dueling DQN

Double DQN reduces overestimation by decoupling action selection from action evaluation.
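Concretely, the online network picks the greedy next action and the target network scores it, so the target becomes

y = r + γ Q(s', argmax_a' Q(s', a'; θ); θ⁻)

as implemented in the replay() override below: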

class DoubleDQNAgent(DQNAgent):
    """Double DQN agent."""

    def replay(self):
        """Experience replay with the Double DQN target."""
        if len(self.replay_buffer) < self.batch_size:
            return

        states, actions, rewards, next_states, dones = self.replay_buffer.sample(self.batch_size)

        # Current Q-values
        current_q_values = self.q_network(states).gather(1, actions.unsqueeze(1))

        # Double DQN: the online network selects the action, the target network evaluates it
        with torch.no_grad():
            next_actions = self.q_network(next_states).argmax(1)
            next_q_values = self.target_network(next_states).gather(1, next_actions.unsqueeze(1)).squeeze()
            target_q_values = rewards + (self.discount_factor * next_q_values * ~dones)

        loss = F.mse_loss(current_q_values.squeeze(), target_q_values)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        self.losses.append(loss.item())
        self.update_count += 1

        if self.update_count % self.target_update_freq == 0:
            self.update_target_network()

class DuelingDQNNetwork(nn.Module):
    """Dueling DQN network."""

    def __init__(self, state_size, action_size, hidden_size=64):
        super(DuelingDQNNetwork, self).__init__()

        # Shared feature layers
        self.feature_layer = nn.Sequential(
            nn.Linear(state_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU()
        )

        # Value stream
        self.value_stream = nn.Sequential(
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, 1)
        )

        # Advantage stream
        self.advantage_stream = nn.Sequential(
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, action_size)
        )

    def forward(self, x):
        features = self.feature_layer(x)

        value = self.value_stream(features)
        advantage = self.advantage_stream(features)

        # Q(s,a) = V(s) + A(s,a) - mean(A(s,a))
        q_values = value + advantage - advantage.mean(dim=1, keepdim=True)

        return q_values

4. Policy Gradient Methods

4.1 The REINFORCE Algorithm

REINFORCE is the most basic policy gradient algorithm; it optimizes the policy parameters directly.
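Its gradient estimate weights the log-probability of each taken action by the discounted return G_t that followed it:

∇_θ J(θ) = E[ G_t ∇_θ log π_θ(a_t|s_t) ]

In the implementation below, the returns are additionally standardized before the update, a common variance-reduction trick: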

class PolicyNetwork(nn.Module):
    """Policy network."""

    def __init__(self, state_size, action_size, hidden_size=64):
        super(PolicyNetwork, self).__init__()

        self.network = nn.Sequential(
            nn.Linear(state_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, action_size),
            nn.Softmax(dim=-1)
        )

    def forward(self, x):
        return self.network(x)

class REINFORCEAgent:
    """REINFORCE agent."""

    def __init__(self, state_size, action_size, learning_rate=0.001,
                 discount_factor=0.99):
        self.state_size = state_size
        self.action_size = action_size
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor

        # Policy network
        self.policy_network = PolicyNetwork(state_size, action_size)
        self.optimizer = optim.Adam(self.policy_network.parameters(), lr=learning_rate)

        # Trajectory storage
        self.states = []
        self.actions = []
        self.rewards = []
        self.log_probs = []

        self.training_scores = []

    def get_action(self, state):
        """Sample an action from the policy."""
        if isinstance(state, int):
            state_vector = np.zeros(self.state_size)
            state_vector[state] = 1.0
            state = state_vector

        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        action_probs = self.policy_network(state_tensor)

        # Sample from the action distribution
        dist = Categorical(action_probs)
        action = dist.sample()
        log_prob = dist.log_prob(action)

        return action.item(), log_prob

    def store_transition(self, state, action, reward, log_prob):
        """Store one transition."""
        self.states.append(state)
        self.actions.append(action)
        self.rewards.append(reward)
        self.log_probs.append(log_prob)

    def calculate_returns(self):
        """Compute discounted returns."""
        returns = []
        G = 0

        # Accumulate backwards through the episode
        for reward in reversed(self.rewards):
            G = reward + self.discount_factor * G
            returns.insert(0, G)

        # Standardize the returns
        returns = torch.FloatTensor(returns)
        returns = (returns - returns.mean()) / (returns.std() + 1e-8)

        return returns

    def update_policy(self):
        """Update the policy from the collected trajectory."""
        returns = self.calculate_returns()

        policy_loss = []
        for log_prob, G in zip(self.log_probs, returns):
            policy_loss.append(-log_prob * G)

        policy_loss = torch.stack(policy_loss).sum()

        self.optimizer.zero_grad()
        policy_loss.backward()
        self.optimizer.step()

        # Clear the trajectory
        self.states.clear()
        self.actions.clear()
        self.rewards.clear()
        self.log_probs.clear()

        return policy_loss.item()

    def train(self, env, episodes=1000, max_steps=200):
        """Train the REINFORCE agent."""
        scores = []

        for episode in range(episodes):
            state = env.reset()
            total_reward = 0

            # Collect one full trajectory
            for step in range(max_steps):
                action, log_prob = self.get_action(state)
                next_state, reward, done = env.step(action)

                self.store_transition(state, action, reward, log_prob)

                state = next_state
                total_reward += reward

                if done:
                    break

            # Update the policy
            loss = self.update_policy()

            scores.append(total_reward)
            self.training_scores.append(total_reward)

            if (episode + 1) % 100 == 0:
                avg_score = np.mean(scores[-100:])
                print(f"Episode {episode + 1}, Average Score: {avg_score:.2f}, Loss: {loss:.4f}")

        return self.training_scores

4.2 Actor-Critic Methods

Actor-Critic combines the strengths of value-function methods and policy gradients.
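The critic estimates V(s) and supplies a one-step TD error that replaces the Monte-Carlo return G_t as the policy weight, which lowers variance and allows online updates:

δ_t = r_t + γ V(s_{t+1}) - V(s_t)

The agent below updates the critic with δ_t² and the actor with -log π_θ(a_t|s_t) · δ_t: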

class ActorNetwork(nn.Module):
    """Actor network (policy)."""

    def __init__(self, state_size, action_size, hidden_size=64):
        super(ActorNetwork, self).__init__()

        self.network = nn.Sequential(
            nn.Linear(state_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, action_size),
            nn.Softmax(dim=-1)
        )

    def forward(self, x):
        return self.network(x)

class CriticNetwork(nn.Module):
    """Critic network (state value)."""

    def __init__(self, state_size, hidden_size=64):
        super(CriticNetwork, self).__init__()

        self.network = nn.Sequential(
            nn.Linear(state_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, 1)
        )

    def forward(self, x):
        return self.network(x)

class ActorCriticAgent:
    """Actor-Critic agent."""

    def __init__(self, state_size, action_size, actor_lr=0.001,
                 critic_lr=0.001, discount_factor=0.99):
        self.state_size = state_size
        self.action_size = action_size
        self.discount_factor = discount_factor

        # Actor and critic networks
        self.actor = ActorNetwork(state_size, action_size)
        self.critic = CriticNetwork(state_size)

        # Optimizers
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=critic_lr)

        self.training_scores = []

    def get_action(self, state):
        """Sample an action from the actor."""
        if isinstance(state, int):
            state_vector = np.zeros(self.state_size)
            state_vector[state] = 1.0
            state = state_vector

        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        action_probs = self.actor(state_tensor)

        dist = Categorical(action_probs)
        action = dist.sample()
        log_prob = dist.log_prob(action)

        return action.item(), log_prob

    def get_value(self, state):
        """Return the critic's value estimate for a state."""
        if isinstance(state, int):
            state_vector = np.zeros(self.state_size)
            state_vector[state] = 1.0
            state = state_vector

        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        value = self.critic(state_tensor)

        return value

    def update(self, state, action, reward, next_state, done, log_prob):
        """Update the actor and the critic from one transition."""
        # TD error
        current_value = self.get_value(state)

        if done:
            target_value = reward
        else:
            # The bootstrap target is treated as a constant (semi-gradient TD)
            next_value = self.get_value(next_state).detach()
            target_value = reward + self.discount_factor * next_value

        td_error = target_value - current_value

        # Update the critic
        critic_loss = td_error.pow(2)
        self.critic_optimizer.zero_grad()
        critic_loss.backward(retain_graph=True)
        self.critic_optimizer.step()

        # Update the actor
        actor_loss = -log_prob * td_error.detach()
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        return critic_loss.item(), actor_loss.item()

    def train(self, env, episodes=1000, max_steps=200):
        """Train the Actor-Critic agent."""
        scores = []

        for episode in range(episodes):
            state = env.reset()
            total_reward = 0

            for step in range(max_steps):
                action, log_prob = self.get_action(state)
                next_state, reward, done = env.step(action)

                critic_loss, actor_loss = self.update(
                    state, action, reward, next_state, done, log_prob
                )

                state = next_state
                total_reward += reward

                if done:
                    break

            scores.append(total_reward)
            self.training_scores.append(total_reward)

            if (episode + 1) % 100 == 0:
                avg_score = np.mean(scores[-100:])
                print(f"Episode {episode + 1}, Average Score: {avg_score:.2f}")

        return self.training_scores

5. PPO in Detail

5.1 How PPO Works

Proximal Policy Optimization (PPO) is one of the most widely used policy gradient algorithms today. It keeps training stable by limiting how far each update can move the policy.

The PPO-Clip objective is:

L^CLIP(θ) = E_t[ min( r_t(θ) A_t, clip(r_t(θ), 1-ε, 1+ε) A_t ) ]

where (a minimal sketch of this computation follows the list):

  • r_t(θ) = π_θ(a_t|s_t) / π_θ_old(a_t|s_t) is the importance-sampling ratio
  • A_t is the advantage estimate
  • ε is the clipping parameter
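The clipped surrogate itself is only a few lines of PyTorch. The snippet below (illustrative, with made-up toy tensors) mirrors the loss computed inside PPOAgent.update_policy() further down:

import torch

def clipped_surrogate(new_log_probs, old_log_probs, advantages, clip_eps=0.2):
    """PPO-Clip policy loss (to be minimized)."""
    ratio = torch.exp(new_log_probs - old_log_probs)   # r_t(theta)
    surr1 = ratio * advantages
    surr2 = torch.clamp(ratio, 1 - clip_eps, 1 + clip_eps) * advantages
    return -torch.min(surr1, surr2).mean()

# Toy example: two transitions with hypothetical values
loss = clipped_surrogate(torch.tensor([-0.1, -2.0]),
                         torch.tensor([-0.3, -0.5]),
                         torch.tensor([1.0, -0.5]))
print(loss.item())  # approx. -0.40

The full agent, including GAE advantages plus the value and entropy terms, follows: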
class PPOAgent:
    """PPO agent."""

    def __init__(self, state_size, action_size, actor_lr=3e-4, critic_lr=3e-4,
                 discount_factor=0.99, gae_lambda=0.95, clip_epsilon=0.2,
                 entropy_coef=0.01, value_coef=0.5, max_grad_norm=0.5):

        self.state_size = state_size
        self.action_size = action_size
        self.discount_factor = discount_factor
        self.gae_lambda = gae_lambda
        self.clip_epsilon = clip_epsilon
        self.entropy_coef = entropy_coef
        self.value_coef = value_coef
        self.max_grad_norm = max_grad_norm

        # Networks
        self.actor = ActorNetwork(state_size, action_size)
        self.critic = CriticNetwork(state_size)

        # Optimizers
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=critic_lr)

        # Rollout storage
        self.states = []
        self.actions = []
        self.rewards = []
        self.values = []
        self.log_probs = []
        self.dones = []

        self.training_scores = []

    def get_action_and_value(self, state):
        """Sample an action and estimate the state value."""
        if isinstance(state, int):
            state_vector = np.zeros(self.state_size)
            state_vector[state] = 1.0
            state = state_vector

        state_tensor = torch.FloatTensor(state).unsqueeze(0)

        # No gradients are needed while collecting rollouts
        with torch.no_grad():
            # Action probabilities
            action_probs = self.actor(state_tensor)
            dist = Categorical(action_probs)
            action = dist.sample()
            log_prob = dist.log_prob(action)

            # State value
            value = self.critic(state_tensor)

        return action.item(), log_prob, value.squeeze()

    def store_transition(self, state, action, reward, value, log_prob, done):
        """Store one transition."""
        self.states.append(state)
        self.actions.append(action)
        self.rewards.append(reward)
        self.values.append(value)
        self.log_probs.append(log_prob)
        self.dones.append(done)

    def compute_gae(self, next_value=0):
        """Compute generalized advantage estimates (GAE)."""
        advantages = []
        gae = 0

        # Append a bootstrap value (for rollouts that did not terminate)
        values = [v.item() for v in self.values] + [next_value]

        # Work backwards through the rollout
        for i in reversed(range(len(self.rewards))):
            delta = self.rewards[i] + self.discount_factor * values[i + 1] * (1 - self.dones[i]) - values[i]
            gae = delta + self.discount_factor * self.gae_lambda * (1 - self.dones[i]) * gae
            advantages.insert(0, gae)

        # Returns are advantages plus the value baseline
        returns = []
        for i, advantage in enumerate(advantages):
            returns.append(advantage + values[i])

        return torch.FloatTensor(advantages), torch.FloatTensor(returns)

    def update_policy(self, epochs=4, batch_size=64):
        """Run several epochs of PPO updates on the collected rollout."""
        # Advantages and returns
        advantages, returns = self.compute_gae()

        # Standardize the advantages
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        # Convert the rollout to tensors
        states = torch.FloatTensor(np.array(self.states))
        actions = torch.LongTensor(self.actions)
        old_log_probs = torch.stack(self.log_probs).squeeze()
        old_values = torch.stack(self.values)

        # Multiple update epochs
        for epoch in range(epochs):
            # Shuffle the rollout
            indices = torch.randperm(len(states))

            for start in range(0, len(states), batch_size):
                end = start + batch_size
                batch_indices = indices[start:end]

                batch_states = states[batch_indices]
                batch_actions = actions[batch_indices]
                batch_old_log_probs = old_log_probs[batch_indices]
                batch_advantages = advantages[batch_indices]
                batch_returns = returns[batch_indices]
                batch_old_values = old_values[batch_indices]

                # New action probabilities and values
                action_probs = self.actor(batch_states)
                dist = Categorical(action_probs)
                new_log_probs = dist.log_prob(batch_actions)
                entropy = dist.entropy().mean()

                new_values = self.critic(batch_states).squeeze()

                # Importance-sampling ratio
                ratio = torch.exp(new_log_probs - batch_old_log_probs)

                # Clipped PPO objective
                surr1 = ratio * batch_advantages
                surr2 = torch.clamp(ratio, 1 - self.clip_epsilon, 1 + self.clip_epsilon) * batch_advantages
                actor_loss = -torch.min(surr1, surr2).mean()

                # Value-function loss
                value_loss = F.mse_loss(new_values, batch_returns)

                # Total loss
                total_loss = actor_loss + self.value_coef * value_loss - self.entropy_coef * entropy

                # Update both networks
                self.actor_optimizer.zero_grad()
                self.critic_optimizer.zero_grad()
                total_loss.backward()

                # Gradient clipping
                torch.nn.utils.clip_grad_norm_(self.actor.parameters(), self.max_grad_norm)
                torch.nn.utils.clip_grad_norm_(self.critic.parameters(), self.max_grad_norm)

                self.actor_optimizer.step()
                self.critic_optimizer.step()

        # Clear the rollout buffer
        self.clear_memory()

    def clear_memory(self):
        """Clear the stored rollout."""
        self.states.clear()
        self.actions.clear()
        self.rewards.clear()
        self.values.clear()
        self.log_probs.clear()
        self.dones.clear()

    def train(self, env, episodes=1000, max_steps=200, update_freq=2048):
        """Train the PPO agent."""
        scores = []
        step_count = 0

        for episode in range(episodes):
            state = env.reset()
            # One-hot encode discrete states so stored states match the network input
            if isinstance(state, int):
                state_vector = np.zeros(self.state_size)
                state_vector[state] = 1.0
                state = state_vector
            total_reward = 0

            for step in range(max_steps):
                action, log_prob, value = self.get_action_and_value(state)
                next_state, reward, done = env.step(action)
                if isinstance(next_state, int):
                    next_state_vector = np.zeros(self.state_size)
                    next_state_vector[next_state] = 1.0
                    next_state = next_state_vector

                self.store_transition(state, action, reward, value, log_prob, done)

                state = next_state
                total_reward += reward
                step_count += 1

                # Periodically run a policy update
                if step_count % update_freq == 0:
                    self.update_policy()

                if done:
                    break

            scores.append(total_reward)
            self.training_scores.append(total_reward)

            if (episode + 1) % 100 == 0:
                avg_score = np.mean(scores[-100:])
                print(f"Episode {episode + 1}, Average Score: {avg_score:.2f}")

        return self.training_scores

5.2 PPO in Practice: Comparing the Algorithms

def compare_algorithms():
    """Compare the performance of the different RL algorithms."""
    env = GridWorldEnvironment(width=5, height=5)

    # Create the agents
    agents = {
        'Q-Learning': QLearningAgent(25, 4, learning_rate=0.1),
        'DQN': DQNAgent(25, 4, learning_rate=0.001),
        'REINFORCE': REINFORCEAgent(25, 4, learning_rate=0.001),
        'Actor-Critic': ActorCriticAgent(25, 4, actor_lr=0.001, critic_lr=0.001),
        'PPO': PPOAgent(25, 4, actor_lr=3e-4, critic_lr=3e-4)
    }

    results = {}

    # Train each agent
    for name, agent in agents.items():
        print(f"\nTraining the {name} agent...")
        scores = agent.train(env, episodes=500, max_steps=100)
        results[name] = scores

    # Plot the comparison
    plt.figure(figsize=(15, 10))

    # Training curves
    plt.subplot(2, 2, 1)
    for name, scores in results.items():
        plt.plot(scores, label=name, alpha=0.7)
    plt.title('Training Scores')
    plt.xlabel('Episode')
    plt.ylabel('Total Reward')
    plt.legend()
    plt.grid(True)

    # Moving averages
    plt.subplot(2, 2, 2)
    window_size = 50
    for name, scores in results.items():
        moving_avg = []
        for i in range(len(scores)):
            start_idx = max(0, i - window_size + 1)
            moving_avg.append(np.mean(scores[start_idx:i+1]))
        plt.plot(moving_avg, label=name, alpha=0.7)
    plt.title(f'Moving Average (window={window_size})')
    plt.xlabel('Episode')
    plt.ylabel('Average Reward')
    plt.legend()
    plt.grid(True)

    # Final performance
    plt.subplot(2, 2, 3)
    final_scores = [np.mean(scores[-100:]) for scores in results.values()]
    plt.bar(list(results.keys()), final_scores)
    plt.title('Final Performance (Last 100 Episodes)')
    plt.ylabel('Average Reward')
    plt.xticks(rotation=45)

    # Convergence speed
    plt.subplot(2, 2, 4)
    convergence_episodes = []
    for name, scores in results.items():
        # First episode that reaches 90% of the final performance
        final_perf = np.mean(scores[-100:])
        target = 0.9 * final_perf
        for i, score in enumerate(scores):
            if score >= target:
                convergence_episodes.append(i)
                break
        else:
            convergence_episodes.append(len(scores))

    plt.bar(list(results.keys()), convergence_episodes)
    plt.title('Convergence Speed (Episodes to 90% Performance)')
    plt.ylabel('Episodes')
    plt.xticks(rotation=45)

    plt.tight_layout()
    plt.show()

    return results

# Run the comparison
if __name__ == "__main__":
    results = compare_algorithms()

6. Practical Applications of Reinforcement Learning

6.1 Game AI

Reinforcement learning has been enormously successful in game AI, from Atari games to complex games such as Go and StarCraft:

from PIL import Image  # needed by preprocess_frame for resizing

class AtariDQNAgent:
    """DQN agent for Atari games."""

    def __init__(self, action_size, learning_rate=0.00025,
                 discount_factor=0.99, epsilon=1.0, epsilon_decay=0.995,
                 epsilon_min=0.1, buffer_size=1000000, batch_size=32):

        self.action_size = action_size
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min
        self.batch_size = batch_size

        # Convolutional networks
        self.q_network = self._build_cnn()
        self.target_network = self._build_cnn()
        self.optimizer = optim.RMSprop(self.q_network.parameters(), lr=learning_rate)

        # Experience replay
        self.replay_buffer = ReplayBuffer(buffer_size)

        # Initialize the target network
        self.update_target_network()

    def update_target_network(self):
        """Copy the online network weights into the target network."""
        self.target_network.load_state_dict(self.q_network.state_dict())

    def _build_cnn(self):
        """Build the convolutional Q-network (input: a stack of 4 grayscale 84x84 frames)."""
        return nn.Sequential(
            nn.Conv2d(4, 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(64 * 7 * 7, 512),
            nn.ReLU(),
            nn.Linear(512, self.action_size)
        )

    def preprocess_frame(self, frame):
        """Preprocess a raw game frame."""
        # Convert to grayscale
        gray = np.dot(frame[..., :3], [0.299, 0.587, 0.114])
        # Resize to 84x84
        resized = np.array(Image.fromarray(gray).resize((84, 84)))
        # Normalize to [0, 1]
        normalized = resized / 255.0
        return normalized

    def get_action(self, state, training=True):
        """Select an action (ε-greedy during training)."""
        if training and np.random.random() <= self.epsilon:
            return np.random.choice(self.action_size)

        with torch.no_grad():
            state_tensor = torch.FloatTensor(state).unsqueeze(0)
            q_values = self.q_network(state_tensor)
            return q_values.argmax().item()

6.2 Robot Control

Reinforcement learning is also applied to robot control, especially continuous-control tasks:

class ContinuousActorCritic:
    """Actor-Critic for continuous action spaces."""

    def __init__(self, state_size, action_size, action_bound,
                 actor_lr=0.001, critic_lr=0.002):
        self.state_size = state_size
        self.action_size = action_size
        self.action_bound = action_bound

        # Actor network (outputs the mean and log standard deviation of the action)
        self.actor = self._build_actor()
        self.critic = self._build_critic()

        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=critic_lr)

    def _build_actor(self):
        """Build the actor network."""
        return nn.Sequential(
            nn.Linear(self.state_size, 64),
            nn.ReLU(),
            nn.Linear(64, 64),
            nn.ReLU(),
            nn.Linear(64, self.action_size * 2)  # mean and log std
        )

    def _build_critic(self):
        """Build the critic network."""
        return nn.Sequential(
            nn.Linear(self.state_size, 64),
            nn.ReLU(),
            nn.Linear(64, 64),
            nn.ReLU(),
            nn.Linear(64, 1)
        )

    def get_action(self, state):
        """Sample a continuous action."""
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        actor_output = self.actor(state_tensor)

        # Split into mean and log standard deviation
        mean = actor_output[:, :self.action_size]
        log_std = actor_output[:, self.action_size:]
        std = torch.exp(log_std)

        # Gaussian policy
        dist = torch.distributions.Normal(mean, std)
        action = dist.sample()
        # Note: this log-prob ignores the tanh squashing correction (a simplification)
        log_prob = dist.log_prob(action).sum(dim=-1)

        # Squash and scale to the action bounds
        action = torch.tanh(action) * self.action_bound

        return action.squeeze().detach().numpy(), log_prob

6.3 Recommender Systems

In recommender systems, reinforcement learning can optimize for long-term user satisfaction:

class RecommendationAgent:
    """Reinforcement learning agent for recommendation."""

    def __init__(self, user_features, item_features, embedding_dim=64):
        self.user_features = user_features
        self.item_features = item_features
        self.embedding_dim = embedding_dim

        # User and item embeddings
        self.user_embedding = nn.Embedding(user_features, embedding_dim)
        self.item_embedding = nn.Embedding(item_features, embedding_dim)

        # Policy network
        self.policy_network = nn.Sequential(
            nn.Linear(embedding_dim * 2, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, item_features),
            nn.Softmax(dim=-1)
        )

        # Value network
        self.value_network = nn.Sequential(
            nn.Linear(embedding_dim * 2, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 1)
        )

        self.optimizer = optim.Adam(
            list(self.user_embedding.parameters()) +
            list(self.item_embedding.parameters()) +
            list(self.policy_network.parameters()) +
            list(self.value_network.parameters())
        )

    def get_recommendation(self, user_id, candidate_items):
        """Score candidate items for a user."""
        user_emb = self.user_embedding(torch.LongTensor([user_id]))

        recommendations = []
        for item_id in candidate_items:
            item_emb = self.item_embedding(torch.LongTensor([item_id]))
            state = torch.cat([user_emb, item_emb], dim=1)

            # Probability of recommending this item
            prob = self.policy_network(state)
            recommendations.append((item_id, prob[0, item_id].item()))

        # Sort by probability
        recommendations.sort(key=lambda x: x[1], reverse=True)
        return recommendations

    def update_from_feedback(self, user_id, item_id, reward, next_user_state):
        """Update the model from user feedback."""
        user_emb = self.user_embedding(torch.LongTensor([user_id]))
        item_emb = self.item_embedding(torch.LongTensor([item_id]))
        state = torch.cat([user_emb, item_emb], dim=1)

        # Value and policy terms
        value = self.value_network(state)
        policy_prob = self.policy_network(state)[0, item_id]

        # Simplified policy-gradient update with a value baseline
        policy_loss = -torch.log(policy_prob) * (reward - value.detach())
        value_loss = F.mse_loss(value, torch.FloatTensor([[reward]]))

        total_loss = policy_loss + value_loss

        self.optimizer.zero_grad()
        total_loss.backward()
        self.optimizer.step()

7. Challenges and Solutions

7.1 Sample Efficiency

Reinforcement learning usually needs a very large number of samples to learn an effective policy. Common remedies include:

  1. Model-based reinforcement learning
  2. Meta-learning
  3. Transfer learning
class ModelBasedAgent:
    """Model-based reinforcement learning agent."""

    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size

        # Environment models
        self.dynamics_model = self._build_dynamics_model()
        self.reward_model = self._build_reward_model()

        # Policy network
        self.policy_network = self._build_policy_network()

        # Optimizers
        self.dynamics_optimizer = optim.Adam(self.dynamics_model.parameters())
        self.reward_optimizer = optim.Adam(self.reward_model.parameters())
        self.policy_optimizer = optim.Adam(self.policy_network.parameters())

        # Experience buffers
        self.real_buffer = ReplayBuffer(10000)
        self.model_buffer = ReplayBuffer(100000)

    def _build_dynamics_model(self):
        """Build the dynamics model."""
        return nn.Sequential(
            nn.Linear(self.state_size + self.action_size, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, self.state_size)
        )

    def _build_reward_model(self):
        """Build the reward model."""
        return nn.Sequential(
            nn.Linear(self.state_size + self.action_size, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 1)
        )

    def _build_policy_network(self):
        """Build the policy network."""
        return nn.Sequential(
            nn.Linear(self.state_size, 64),
            nn.ReLU(),
            nn.Linear(64, 64),
            nn.ReLU(),
            nn.Linear(64, self.action_size),
            nn.Softmax(dim=-1)
        )

    def train_models(self, batch_size=32):
        """Train the environment models on real experience."""
        if len(self.real_buffer) < batch_size:
            return

        # Sample real transitions
        states, actions, rewards, next_states, dones = self.real_buffer.sample(batch_size)

        # One-hot encode actions so the input matches state_size + action_size
        actions_onehot = F.one_hot(actions, self.action_size).float()
        state_action = torch.cat([states, actions_onehot], dim=1)

        # Train the dynamics model
        predicted_next_states = self.dynamics_model(state_action)
        dynamics_loss = F.mse_loss(predicted_next_states, next_states)

        self.dynamics_optimizer.zero_grad()
        dynamics_loss.backward()
        self.dynamics_optimizer.step()

        # Train the reward model
        predicted_rewards = self.reward_model(state_action)
        reward_loss = F.mse_loss(predicted_rewards.squeeze(), rewards)

        self.reward_optimizer.zero_grad()
        reward_loss.backward()
        self.reward_optimizer.step()

    def generate_model_data(self, num_samples=1000):
        """Generate imagined transitions with the learned models."""
        # Sample starting states from the real buffer
        if len(self.real_buffer) == 0:
            return

        for _ in range(num_samples):
            # Pick a random real state as the starting point
            idx = np.random.randint(len(self.real_buffer.buffer))
            state = self.real_buffer.buffer[idx][0]

            # Choose an action with the policy network
            state_tensor = torch.FloatTensor(state).unsqueeze(0)
            action_probs = self.policy_network(state_tensor)
            action = torch.multinomial(action_probs, 1).item()

            # Predict the next state and reward with the models
            action_onehot = F.one_hot(torch.tensor([action]), self.action_size).float()
            state_action = torch.cat([state_tensor, action_onehot], dim=1)

            with torch.no_grad():
                next_state = self.dynamics_model(state_action).squeeze().numpy()
                reward = self.reward_model(state_action).item()

            # Store in the model buffer
            self.model_buffer.push(state, action, reward, next_state, False)

7.2 Balancing Exploration and Exploitation

An effective exploration strategy is critical for reinforcement learning.
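One approach is curiosity-driven exploration: the agent receives an intrinsic bonus proportional to how badly a learned forward model predicts the next state, so poorly understood states look rewarding:

r_intrinsic = || f(s_t, a_t) - s_{t+1} ||²

The agent below follows this idea, using the forward model's prediction error as the bonus: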

class CuriosityDrivenAgent:
    """Curiosity-driven exploration agent."""

    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size

        # Main networks
        self.policy_network = PolicyNetwork(state_size, action_size)
        self.value_network = CriticNetwork(state_size)

        # Curiosity module
        self.forward_model = self._build_forward_model()
        self.inverse_model = self._build_inverse_model()

        # Optimizers
        self.policy_optimizer = optim.Adam(self.policy_network.parameters())
        self.value_optimizer = optim.Adam(self.value_network.parameters())
        self.curiosity_optimizer = optim.Adam(
            list(self.forward_model.parameters()) +
            list(self.inverse_model.parameters())
        )

    def _build_forward_model(self):
        """Forward model: predict the next state from (state, action)."""
        return nn.Sequential(
            nn.Linear(self.state_size + self.action_size, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, self.state_size)
        )

    def _build_inverse_model(self):
        """Inverse model: predict the action from the state change."""
        return nn.Sequential(
            nn.Linear(self.state_size * 2, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, self.action_size)
        )

    def compute_intrinsic_reward(self, state, action, next_state):
        """Compute the intrinsic (curiosity) reward."""
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        # One-hot encode the discrete action so the input matches the forward model
        action_tensor = F.one_hot(torch.tensor([action]), self.action_size).float()
        next_state_tensor = torch.FloatTensor(next_state).unsqueeze(0)

        # Predict the next state
        state_action = torch.cat([state_tensor, action_tensor], dim=1)
        predicted_next_state = self.forward_model(state_action)

        # The prediction error serves as the intrinsic reward
        prediction_error = F.mse_loss(predicted_next_state, next_state_tensor)
        intrinsic_reward = prediction_error.item()

        return intrinsic_reward

    def update_curiosity_models(self, state, action, next_state):
        """Update the forward and inverse models."""
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        action_tensor = F.one_hot(torch.tensor([action]), self.action_size).float()
        next_state_tensor = torch.FloatTensor(next_state).unsqueeze(0)

        # Forward-model loss
        state_action = torch.cat([state_tensor, action_tensor], dim=1)
        predicted_next_state = self.forward_model(state_action)
        forward_loss = F.mse_loss(predicted_next_state, next_state_tensor)

        # Inverse-model loss
        state_next_state = torch.cat([state_tensor, next_state_tensor], dim=1)
        predicted_action = self.inverse_model(state_next_state)
        inverse_loss = F.cross_entropy(predicted_action, torch.LongTensor([action]))

        # Total curiosity loss
        curiosity_loss = forward_loss + inverse_loss

        self.curiosity_optimizer.zero_grad()
        curiosity_loss.backward()
        self.curiosity_optimizer.step()

        return forward_loss.item(), inverse_loss.item()

8. Summary and Outlook

8.1 Key Contributions

This article has covered reinforcement learning from its theoretical foundations to state-of-the-art algorithms:

  1. Foundations: the core concepts, Markov decision processes, and the basic algorithmic framework
  2. Classic algorithms: the principles and implementation of value-based methods such as Q-Learning and DQN
  3. Policy methods: policy gradient algorithms including REINFORCE, Actor-Critic, and PPO
  4. Applications: uses of reinforcement learning in game AI, robot control, and recommender systems
  5. Challenges: key issues such as sample efficiency and the exploration-exploitation trade-off, along with possible solutions

8.2 Technology Trends

The field is evolving rapidly; major trends include:

  1. Large-scale pretraining: reinforcement learning combined with large language models
  2. Multi-agent systems: learning algorithms for cooperative and competitive settings
  3. Offline reinforcement learning: learning from logged data without online interaction
  4. Interpretability: making reinforcement learning decisions more transparent and understandable
  5. Safe reinforcement learning: guaranteeing the safety of both the learning process and its outcomes

8.3 Application Outlook

Reinforcement learning will play an important role in many more domains:

  • Autonomous driving: decision-making and control in complex traffic environments
  • Financial trading: investment strategies in dynamic markets
  • Medical decision support: optimizing personalized treatment plans
  • Energy management: smart-grid and renewable-energy scheduling
  • Education technology: adaptive learning systems and personalized teaching

As one of the key paths toward artificial intelligence, reinforcement learning will continue to drive the development and application of AI and provide a powerful tool for solving complex real-world problems. Through continued theoretical and engineering advances, it is set to play a central role in building more intelligent and autonomous AI systems.



Keywords: reinforcement learning, Q-Learning, DQN, PPO, policy gradient, Actor-Critic, deep reinforcement learning, AI applications
