Andrej Karpathy刚发布了一个仅用约 250 行纯 Python 代码就实现了 GPT 训练和推理全过程的演示,非常适合用来理解大型语言模型底层的数学原理。

源文件在这里:https://gist.github.com/karpathy/8627fe009c40f57531cb18360106ce95
我用AI添加了注释并手动整理了一下:
# 导入所需依赖库
import torch # PyTorch核心库,用于张量计算和神经网络构建
import torch.nn as nn # PyTorch的神经网络模块,包含层、损失函数等
from torch.nn import functional as F # PyTorch的优化器模块,用于模型参数更新
import torch.optim as optim # GPT2的分词器,用于文本的编码和解码
from transformers import GPT2Tokenizer # 从huggingface/transformers导入GPT2分词器,适配英文文本
# 定义全局超参数,控制模型训练和结构
batch_size = 16 # 每次训练的样本数,小批量梯度下降用
block_size = 32 # 上下文窗口大小,模型能看到的最大文本长度
max_iters = 5000 # 训练的最大迭代次数
eval_interval = 100 # 每多少轮迭代评估一次模型性能
learning_rate = 1e-3 # 优化器的学习率,控制参数更新步长
device = 'cuda' if torch.cuda.is_available() else 'cpu' # 模型运行的设备,优先使用GPU(cuda),无则用CPU
eval_iters = 200 # 每次评估时的迭代次数,取平均减少波动
n_embd = 64 # 嵌入层的维度,模型中隐藏层的特征维度
n_head = 4 # 注意力头的数量,实现多头自注意力
n_layer = 4 # Transformer解码器的层数
# 固定随机种子,保证实验结果可复现
torch.manual_seed(1337)
# 加载GPT2分词器,设置为不使用填充(padding)和未知词(unk)的特殊处理
# GPT2分词器基于BPE(字节对编码),适配英文自然语言,词表大小约50k
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
# 禁用填充token,因为本微型GPT不做批量填充对齐
tokenizer.pad_token = None
# 禁用未知token,遇到未登录词时直接拆分为子词
tokenizer.unk_token = None
# 加载训练数据:这里使用经典的莎士比亚文本作为训练语料
# 从github拉取原始文本文件,读取为字符串格式
with open('https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt', 'r', encoding='utf-8') as f:
text = f.read()
# 对原始文本进行分词编码,将字符串转换为模型可处理的整数张量
# return_tensors='pt':返回PyTorch张量;truncation=True:超长文本截断
# max_length=None:不限制单条文本长度,后续按block_size切分
encoded_text = tokenizer(text, return_tensors='pt', truncation=True, max_length=None)
# 提取编码后的输入id,展平为一维张量(shape: [总词数])
data = encoded_text.input_ids.flatten()
# 划分训练集和验证集:90%数据用于训练,10%用于验证
n = int(0.9 * len(data))
train_data = data[:n]
val_data = data[n:]
# 数据加载函数:随机生成一批训练/验证样本
# split:指定数据集('train'/'val'),控制加载训练集还是验证集
def get_batch(split):
# 根据split选择对应的数据集
data = train_data if split == 'train' else val_data
# 随机生成batch_size个起始索引,范围:[0, 数据长度-block_size),保证能取到连续的block_size个词
ix = torch.randint(len(data) - block_size, (batch_size,))
# 构造输入张量x:取每个起始索引后连续的block_size个词,shape: [batch_size, block_size]
x = torch.stack([data[i:i+block_size] for i in ix])
# 构造目标张量y:取每个起始索引后偏移1的block_size个词(语言模型的预测目标是下一个词),shape: [batch_size, block_size]
y = torch.stack([data[i+1:i+block_size+1] for i in ix])
# 将张量移到指定设备(GPU/CPU)
x, y = x.to(device), y.to(device)
return x, y
# 定义评估函数:计算模型在训练/验证集上的平均损失(无梯度计算,提升效率)
# model:待评估的模型实例
@torch.no_grad() # 装饰器,禁用梯度计算,减少内存占用
def estimate_loss(model):
# 初始化损失字典,存储训练集和验证集的损失
out = {}
# 将模型设为评估模式,关闭Dropout等训练特有的层
model.eval()
# 遍历训练集和验证集
for split in ['train', 'val']:
# 初始化损失数组,存储每次迭代的损失
losses = torch.zeros(eval_iters)
# 循环eval_iters次,计算平均损失
for k in range(eval_iters):
# 获取一批样本
X, Y = get_batch(split)
# 前向传播,得到模型输出和损失
logits, loss = model(X, Y)
# 记录当前迭代的损失
losses[k] = loss.item()
# 计算该数据集的平均损失,存入字典
out[split] = losses.mean()
# 将模型恢复为训练模式,开启Dropout等层
model.train()
return out
# 定义单头自注意力层:实现自注意力的核心逻辑(缩放点积注意力)
class Head(nn.Module):
def __init__(self, head_size):
super().__init__()
# 定义查询(q)、键(k)、值(v)的线性投影层,将n_embd维映射到head_size维
self.key = nn.Linear(n_embd, head_size, bias=False)
self.query = nn.Linear(n_embd, head_size, bias=False)
self.value = nn.Linear(n_embd, head_size, bias=False)
# 注册三角掩码张量,用于遮蔽未来的词(自回归语言模型,不能看到未来信息)
# tril:下三角矩阵,upper三角部分填充为-∞,后续softmax后为0
self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))
# 定义Dropout层,防止过拟合,随机失活20%的神经元
self.dropout = nn.Dropout(0.2)
def forward(self, x):
# x:输入张量,shape: [batch_size, block_size, n_embd]
B, T, C = x.shape
# 线性投影得到q、k、v,shape均为[batch_size, block_size, head_size]
k = self.key(x)
q = self.query(x)
v = self.value(x)
# 计算注意力权重:q @ k.T / sqrt(head_size)(缩放点积)
# wei shape: [batch_size, block_size, block_size]
wei = q @ k.transpose(-2, -1) * C**-0.5
# 应用掩码:将上三角部分设为-∞,遮蔽未来的词
wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf'))
# softmax归一化,得到注意力权重(每行和为1)
wei = F.softmax(wei, dim=-1)
# 应用Dropout,随机失活部分注意力权重
wei = self.dropout(wei)
# 注意力加权求和:权重 @ v,得到输出,shape: [batch_size, block_size, head_size]
out = wei @ v
return out
# 定义多头自注意力层:将多个单头注意力的输出拼接,实现多维度的特征提取
class MultiHeadAttention(nn.Module):
def __init__(self, num_heads, head_size):
super().__init__()
# 创建num_heads个单头注意力层,存入nn.ModuleList(可被PyTorch识别的层列表)
self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
# 线性投影层,将拼接后的特征映射回n_embd维
self.proj = nn.Linear(head_size * num_heads, n_embd)
# Dropout层,防止过拟合
self.dropout = nn.Dropout(0.2)
def forward(self, x):
# 拼接所有单头注意力的输出,dim=-1表示在最后一维拼接
# 输出shape: [batch_size, block_size, head_size*num_heads]
out = torch.cat([h(x) for h in self.heads], dim=-1)
# 线性投影+Dropout,完成多头注意力的输出变换
out = self.dropout(self.proj(out))
return out
# 定义前馈网络层:Transformer解码器中的全连接层,实现特征的非线性变换
class FeedFoward(nn.Module):
def __init__(self, n_embd):
super().__init__()
# 两层线性层+ReLU激活+Dropout,隐藏层维度设为n_embd*4(Transformer原论文设定)
self.net = nn.Sequential(
nn.Linear(n_embd, 4 * n_embd), # 升维
nn.ReLU(), # 非线性激活
nn.Linear(4 * n_embd, n_embd), # 降维回原维度
nn.Dropout(0.2), # 随机失活,防止过拟合
)
def forward(self, x):
# 前向传播,输入输出shape均为[batch_size, block_size, n_embd]
return self.net(x)
# 定义Transformer解码器块:由「多头自注意力 + 前馈网络」组成,带残差连接和层归一化
class Block(nn.Module):
def __init__(self, n_embd, n_head):
super().__init__()
# 计算每个注意力头的维度:总嵌入维 / 头数
head_size = n_embd // n_head
# 多头自注意力层
self.sa = MultiHeadAttention(n_head, head_size)
# 前馈网络层
self.ffwd = FeedFoward(n_embd)
# 层归一化层(Pre-LN架构,Transformer原论文是Post-LN,Pre-LN更易训练)
self.ln1 = nn.LayerNorm(n_embd)
self.ln2 = nn.LayerNorm(n_embd)
def forward(self, x):
# 残差连接 + 层归一化 + 多头自注意力:x = x + sa(ln1(x))
# 残差连接缓解深度网络的梯度消失问题
x = x + self.sa(self.ln1(x))
# 残差连接 + 层归一化 + 前馈网络:x = x + ffwd(ln2(x))
x = x + self.ffwd(self.ln2(x))
return x
# 定义微型GPT模型核心类,继承自nn.Module(PyTorch所有网络的基类)
class GPTLanguageModel(nn.Module):
def __init__(self, vocab_size):
super().__init__()
# 词嵌入层:将词的整数ID映射为n_embd维的向量,vocab_size为词表大小
self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
# 位置嵌入层:将位置索引映射为n_embd维的向量,捕捉文本的位置信息
# 因为Transformer是并行计算,无内置位置信息,需手动加入
self.position_embedding_table = nn.Embedding(block_size, n_embd)
# Transformer解码器块序列:n_layer个Block堆叠
self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head) for _ in range(n_layer)])
# 最后的层归一化层
self.ln_f = nn.LayerNorm(n_embd)
# 输出线性层:将n_embd维的特征映射回词表大小,用于预测下一个词的概率
self.lm_head = nn.Linear(n_embd, vocab_size)
# 初始化模型参数:使用自定义的初始化方式,提升训练稳定性
self.apply(self._init_weights)
# 模型参数初始化函数
def _init_weights(self, module):
# 如果是线性层,初始化权重为正态分布(均值0,标准差0.02),偏置为0
if isinstance(module, nn.Linear):
torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
if module.bias is not None:
torch.nn.init.zeros_(module.bias)
# 如果是嵌入层,初始化权重为正态分布(均值0,标准差0.02)
elif isinstance(module, nn.Embedding):
torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
# 模型前向传播函数:输入x(词ID),y(目标词ID,可选),返回预测logits和损失
def forward(self, idx, targets=None):
# idx shape: [batch_size, block_size]
# targets shape: [batch_size, block_size]
B, T = idx.shape
# 词嵌入 + 位置嵌入,shape均为[batch_size, block_size, n_embd]
tok_emb = self.token_embedding_table(idx)
pos_emb = self.position_embedding_table(torch.arange(T, device=device))
# 嵌入层输出:词嵌入+位置嵌入,捕捉词的语义和位置信息
x = tok_emb + pos_emb
# 经过Transformer解码器块序列,输出shape不变:[batch_size, block_size, n_embd]
x = self.blocks(x)
# 最后的层归一化
x = self.ln_f(x)
# 输出线性层,得到logits(未归一化的概率),shape: [batch_size, block_size, vocab_size]
logits = self.lm_head(x)
# 如果没有目标值,仅返回logits(用于生成文本)
if targets is None:
loss = None
else:
# 重塑logits和targets,适配交叉熵损失的输入格式
# cross_entropy要求输入为[B*T, vocab_size],目标为[B*T]
B, T, C = logits.shape
logits = logits.view(B*T, C)
targets = targets.view(B*T)
# 计算交叉熵损失(语言模型的核心损失,预测下一个词的概率)
loss = F.cross_entropy(logits, targets)
return logits, loss
# 文本生成函数:基于当前输入idx,生成后续max_new_tokens个词
# 自回归生成:每次预测一个词,拼接到输入后,继续预测下一个
def generate(self, idx, max_new_tokens):
# idx: 初始输入张量,shape: [batch_size, block_size]
for _ in range(max_new_tokens):
# 截取最后block_size个词,保证输入长度不超过模型的上下文窗口
idx_cond = idx[:, -block_size:]
# 前向传播,得到logits(无目标值,loss=None)
logits, loss = self(idx_cond)
# 取最后一个时间步的logits(预测下一个词的logits),shape: [batch_size, vocab_size]
logits = logits[:, -1, :]
# softmax归一化,得到下一个词的概率分布
probs = F.softmax(logits, dim=-1)
# 根据概率分布随机采样一个词ID(也可以用argmax取最可能的词,即贪心生成)
idx_next = torch.multinomial(probs, num_samples=1)
# 将采样的词ID拼接到输入后,更新输入张量
idx = torch.cat((idx, idx_next), dim=1)
# 返回生成后的完整词ID张量
return idx
# 主程序入口:实例化模型、优化器,开始训练和生成
if __name__ == "__main__":
# 获取GPT2分词器的词表大小,作为模型的vocab_size
vocab_size = tokenizer.vocab_size
# 实例化微型GPT模型,移到指定设备
model = GPTLanguageModel(vocab_size).to(device)
# 打印模型参数量,查看模型规模
print(f"Model parameters: {sum(p.numel() for p in model.parameters())/1e6:.2f}M")
# 定义优化器:使用AdamW(Adam的改进版,带权重衰减,防止过拟合)
optimizer = optim.AdamW(model.parameters(), lr=learning_rate)
# 训练循环:迭代max_iters次
for iter in range(max_iters):
# 每隔eval_interval次迭代,评估模型损失并打印
if iter % eval_interval == 0:
losses = estimate_loss(model)
print(f"step {iter}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")
# 获取一批训练样本
xb, yb = get_batch('train')
# 前向传播,得到logits和损失
logits, loss = model(xb, yb)
# 梯度清零:PyTorch梯度会累加,每次迭代前需清零
optimizer.zero_grad(set_to_none=True)
# 反向传播:计算损失对模型参数的梯度
loss.backward()
# 优化器步骤:更新模型参数
optimizer.step()
# 训练完成后,进行文本生成
# 初始化输入:<|endoftext|>是GPT2的特殊起始token,编码为张量并移到设备
# shape: [1, 1](batch_size=1, block_size=1)
start_idx = tokenizer.encode("<|endoftext|>", return_tensors='pt').to(device)
# 生成max_new_tokens=500个词
generated_ids = model.generate(start_idx, max_new_tokens=500)
# 将生成的词ID解码为字符串,跳过特殊token
generated_text = tokenizer.decode(generated_ids[0], skip_special_tokens=True)
# 打印生成的文本
print("\nGenerated text:\n")
print(generated_text)