diff --git a/docs/chapter5/llama2_model.py b/docs/chapter5/code/model.py similarity index 96% rename from docs/chapter5/llama2_model.py rename to docs/chapter5/code/model.py index 49e808c..c0f4353 100644 --- a/docs/chapter5/llama2_model.py +++ b/docs/chapter5/code/model.py @@ -1,20 +1,8 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -''' -@File : llama2_model.py -@Time : 2024/04/14 22:26:35 -@Author : 不要葱姜蒜 -@Version : 1.0 -@Desc : 部分代码借鉴llama2.c仓库代码 -''' - import math import struct import inspect from dataclasses import dataclass from typing import Any, Optional, Tuple - -import numpy as np import torch import torch.nn.functional as F from torch import nn @@ -35,7 +23,7 @@ class ModelArgs: dropout: float = 0.0 # 丢弃率 -class LLaMA2RMSNorm(nn.Module): +class RMSNorm(nn.Module): def __init__(self, dim: int, eps: float): super().__init__() # eps是为了防止除以0的情况 @@ -128,7 +116,7 @@ def repeat_kv(x: torch.Tensor, n_rep: int) -> torch.Tensor: .reshape(bs, slen, n_kv_heads * n_rep, head_dim) # 重新塑形,合并键/值对头的数量和重复次数的维度 ) -class LLaMA2Attention(nn.Module): +class Attention(nn.Module): def __init__(self, args: ModelArgs): super().__init__() # 根据是否指定n_kv_heads,确定用于键(key)和值(value)的头的数量。 @@ -215,7 +203,7 @@ class LLaMA2Attention(nn.Module): output = self.resid_dropout(output) return output -class LLaMA2MLP(nn.Module): +class MLP(nn.Module): def __init__(self, dim: int, hidden_dim: int, multiple_of: int, dropout: float): super().__init__() # 如果没有指定隐藏层的维度,我们将其设置为输入维度的4倍 @@ -241,7 +229,7 @@ class LLaMA2MLP(nn.Module): return self.dropout(self.w2(F.silu(self.w1(x)) * self.w3(x))) -class LLaMA2DecoderLayer(nn.Module): +class DecoderLayer(nn.Module): def __init__(self, layer_id: int, args: ModelArgs): super().__init__() # 定义多头注意力的头数 @@ -251,9 +239,9 @@ class LLaMA2DecoderLayer(nn.Module): # 定义每个头的维度,等于输入维度除以头数 self.head_dim = args.dim // args.n_heads # 定义LLaMA2Attention对象,用于进行多头注意力计算 - self.attention = LLaMA2Attention(args) + self.attention = Attention(args) # 定义LLaMAMLP对象,用于进行前馈神经网络计算 - self.feed_forward = LLaMA2MLP( + self.feed_forward = MLP( dim=args.dim, hidden_dim=args.hidden_dim, multiple_of=args.multiple_of, @@ -262,9 +250,9 @@ class LLaMA2DecoderLayer(nn.Module): # 定义层的ID self.layer_id = layer_id # 定义注意力计算的归一化层 - self.attention_norm = LLaMA2RMSNorm(args.dim, eps=args.norm_eps) + self.attention_norm = RMSNorm(args.dim, eps=args.norm_eps) # 定义前馈神经网络计算的归一化层 - self.ffn_norm = LLaMA2RMSNorm(args.dim, eps=args.norm_eps) + self.ffn_norm = RMSNorm(args.dim, eps=args.norm_eps) def forward(self, x, freqs_cos, freqs_sin): # 前向传播函数 @@ -274,7 +262,7 @@ class LLaMA2DecoderLayer(nn.Module): out = h + self.feed_forward.forward(self.ffn_norm(h)) return out -class LLaMA2Model(nn.Module): +class Transformer(nn.Module): last_loss: Optional[torch.Tensor] def __init__(self, args: ModelArgs): @@ -293,9 +281,9 @@ class LLaMA2Model(nn.Module): # Decoder层 self.layers = torch.nn.ModuleList() for layer_id in range(args.n_layers): - self.layers.append(LLaMA2DecoderLayer(layer_id, args)) + self.layers.append(DecoderLayer(layer_id, args)) # 归一化层 - self.norm = LLaMA2RMSNorm(args.dim, eps=args.norm_eps) + self.norm = RMSNorm(args.dim, eps=args.norm_eps) # 输出层 self.output = nn.Linear(args.dim, args.vocab_size, bias=False) @@ -383,6 +371,7 @@ class LLaMA2Model(nn.Module): def estimate_mfu(self, fwdbwd_per_iter, dt): """ 估计模型的 FLOPs 利用率 (MFU) 单位:A100 bfloat16 的峰值 FLOPS """ # 计算每次迭代的 FLOPs 数量(参考 PaLM 论文的附录 B) + # PaLM: Scaling Language Modeling with Pathways: https://arxiv.org/abs/2204.02311 N = sum(p.numel() for p in self.parameters()) cfg = self.args L, H, Q, T = cfg.n_layers, cfg.n_heads, cfg.dim//cfg.n_heads, cfg.max_seq_len @@ -432,7 +421,7 @@ if __name__ == '__main__': # LLaMA2Model.forward 接受两个参数,tokens和targets,其中tokens是输入的张量, 应为int类型 x = torch.randint(0, 32000, (1, 50)) # [bs, seq_len] # 实例化LLaMA2Model - model = LLaMA2Model(args=args) + model = Transformer(args=args) # 计算model的全部参数 num_params = sum(p.numel() for p in model.parameters()) print('Number of parameters:', num_params) diff --git a/docs/chapter5/code/preprocess.py b/docs/chapter5/code/preprocess.py new file mode 100644 index 0000000..978dd60 --- /dev/null +++ b/docs/chapter5/code/preprocess.py @@ -0,0 +1,194 @@ +import glob +import json +import os +import random +from concurrent.futures import ProcessPoolExecutor +from functools import partial + +import numpy as np +import sentencepiece as spm +import torch +import torch.distributed as dist +from tqdm import tqdm + +from tokenizer import Tokenizer + +DATA_CACHE_DIR = 'data' +TOKENIZER_MODEL = "./data/tok4096.model" + + +# 定义分片处理函数 +def process_shard(args, vocab_size, tokenizer_model_path): + """ + 处理数据分片,将其中的文本进行分词并保存为二进制文件。 + + 参数: + args: tuple, 包含分片ID和分片文件名 + vocab_size: int, 词汇表大小,用于决定输出文件存储路径 + """ + # 提取分片ID和文件名 + shard_id, shard = args + + # 初始化分词器 + enc = Tokenizer(tokenizer_model_path) + + # 打开并读取当前分片的JSON文件 + with open(shard, "r") as f: + data = json.load(f) + + # 用于保存所有的分词后的token + all_tokens = [] + + # 遍历每一个例子,tqdm显示进度条 + for example in tqdm(data, position=shard_id): + # 提取故事文本,并去除首尾空白字符 + text = example["story"] + text = text.strip() # 去掉首尾空白字符 + + # 对文本进行编码,使用BOS(开始标志)但不使用EOS(结束标志) + tokens = enc.encode(text, bos=True, eos=False) + # 将当前文本的token添加到总token列表 + all_tokens.extend(tokens) + + # 将所有的token转换为uint16类型的NumPy数组 + all_tokens = np.array(all_tokens, dtype=np.uint16) + + # 根据词汇表大小确定输出文件名 + if vocab_size == 0: + # 如果词汇表大小为0,使用默认的Llama 2分词器,将文件保存到原路径 + tokenized_filename = shard.replace(".json", ".bin") + else: + # 如果有指定词汇表大小,保存到新目录`tok{vocab_size}`下 + bin_dir = os.path.join(DATA_CACHE_DIR, f"tok{vocab_size}") + shard_basename = os.path.basename(shard) + bin_basename = shard_basename.replace(".json", ".bin") + tokenized_filename = os.path.join(bin_dir, bin_basename) + + # 将token以二进制形式保存 + with open(tokenized_filename, "wb") as f: + f.write(all_tokens.tobytes()) + + # 计算平均序列长度(以BOS标记`1`分隔的序列) + avg_seq_len = all_tokens.size / ((all_tokens == 1).sum()) + print(f"Saved {tokenized_filename}, average seqlen: {avg_seq_len:.2f}") + + +# 定义预处理函数,用于对多个数据分片进行批量处理 +def pretokenize(vocab_size): + """ + 预处理所有的数据分片,并将分词后的数据保存为二进制文件。 + + 参数: + vocab_size: int, 词汇表大小,用于决定输出文件存储路径 + """ + # 数据所在目录 + data_dir = os.path.join(DATA_CACHE_DIR, "TinyStories_all_data") + + # 获取所有JSON文件的文件名列表,并按字典序排序 + shard_filenames = sorted(glob.glob(os.path.join(data_dir, "*.json"))) + + # 如果词汇表大小大于0,则创建对应的保存目录 + if vocab_size > 0: + bin_dir = os.path.join(DATA_CACHE_DIR, f"tok{vocab_size}") + os.makedirs(bin_dir, exist_ok=True) + + # 使用partial函数将vocab_size绑定到process_shard函数 + fun = partial(process_shard, vocab_size=vocab_size, tokenizer_model_path=TOKENIZER_MODEL) + + # 使用进程池并行处理每个分片 + with ProcessPoolExecutor() as executor: + executor.map(fun, enumerate(shard_filenames)) + + print("Done.") + + +class PretokDataset(torch.utils.data.IterableDataset): + """从磁盘加载已预处理的分词数据,并将其以 PyTorch 张量的形式返回。""" + + def __init__(self, split, max_seq_len, vocab_size, vocab_source): + """ + 初始化数据集。 + + 参数: + split: str, 数据集的分割方式('train' 或 'test')。 + max_seq_len: int, 最大序列长度,用于生成输入输出序列。 + vocab_size: int, 词汇表的大小。 + vocab_source: str, 词汇表的来源('llama2' 或 'custom')。 + """ + super().__init__() + self.split = split # 数据集划分(训练集或测试集) + self.max_seq_len = max_seq_len # 最大序列长度 + self.vocab_size = vocab_size # 词汇表大小 + self.vocab_source = vocab_source # 词汇表来源 + + def __iter__(self): + """ + 返回迭代器,按批次加载数据并生成模型输入/输出。 + """ + # 获取DataLoader的worker信息(用于并行数据加载) + worker_info = torch.utils.data.get_worker_info() + worker_id = worker_info.id if worker_info else 0 # worker ID + # 获取分布式训练的rank信息(用于多GPU训练) + rank = dist.get_rank() if dist.is_initialized() else 0 + # 基于worker_id和rank生成唯一的随机数种子,确保数据在每个worker和rank之间是唯一的 + seed = 42 + worker_id + 1337 * rank + rng = random.Random(seed) + print(f"Created a PretokDataset with rng seed {seed}") + + # 根据词汇表来源决定数据路径 + if self.vocab_source == "llama2": + # 如果使用 Llama 2 词汇表,.bin 文件和 .json 文件在同一目录下 + bin_dir = os.path.join(DATA_CACHE_DIR, "TinyStories_all_data") + shard_filenames = sorted(glob.glob(os.path.join(bin_dir, "*.bin"))) + elif self.vocab_source == "custom": + # 如果使用自定义词汇表,.bin 文件在 tok{N} 目录下 + bin_dir = os.path.join(DATA_CACHE_DIR, f"tok{self.vocab_size}") + shard_filenames = sorted(glob.glob(os.path.join(bin_dir, "*.bin"))) + + # 根据数据集划分使用不同的分片文件 + # 训练集使用所有分片文件,测试集只使用第一个分片 + shard_filenames = shard_filenames[1:] if self.split == "train" else shard_filenames[:1] + assert len(shard_filenames) > 0, f"在 {bin_dir} 中未找到任何 .bin 文件" + + while True: + # 随机打乱分片文件 + rng.shuffle(shard_filenames) + for shard in shard_filenames: + # 使用 memmap 读取文件,使得数据留在磁盘上,减少内存占用 + m = np.memmap(shard, dtype=np.uint16, mode="r") + # 计算该分片中的批次数量 + num_batches = len(m) // self.max_seq_len + num_batches -= 1 # 去掉最后一个不完整的批次 + assert num_batches > 0, "这个分片文件太小了?请检查。" + # 随机打乱批次索引 + ixs = list(range(num_batches)) + rng.shuffle(ixs) + # 对每个批次生成输入 x 和目标输出 y + for ix in ixs: + start = ix * self.max_seq_len # 批次起始索引 + end = start + self.max_seq_len + 1 # 批次结束索引 + # 将数据转换为 NumPy 数组并拷贝到 RAM 中 + chunk = torch.from_numpy((m[start:end]).astype(np.int64)) + # 模型输入 x 是当前批次的前 max_seq_len 个词元 + x = chunk[:-1] + # 模型输出 y 是下一个词元 + y = chunk[1:] + # 生成 x, y 对 + yield x, y + + +class Task: + @staticmethod + def iter_batches(batch_size, device, num_workers=0, **dataset_kwargs): + ds = PretokDataset(**dataset_kwargs) + dl = torch.utils.data.DataLoader( + ds, batch_size=batch_size, pin_memory=True, num_workers=num_workers + ) + for x, y in dl: + x = x.to(device, non_blocking=True) + y = y.to(device, non_blocking=True) + yield x, y + + +if __name__ == "__main__": + pretokenize(vocab_size=4096) \ No newline at end of file diff --git a/docs/chapter5/code/requirements.txt b/docs/chapter5/code/requirements.txt new file mode 100644 index 0000000..b5b4c57 --- /dev/null +++ b/docs/chapter5/code/requirements.txt @@ -0,0 +1,5 @@ +numpy==1.23.5 +Requests==2.31.0 +sentencepiece==0.1.99 +torch==2.0.1 +tqdm==4.64.1 \ No newline at end of file diff --git a/docs/chapter5/code/sample.py b/docs/chapter5/code/sample.py new file mode 100644 index 0000000..c5dd94b --- /dev/null +++ b/docs/chapter5/code/sample.py @@ -0,0 +1,104 @@ +import os +import pickle +from contextlib import nullcontext +import torch +from model import ModelArgs, Transformer +from tokenizer import Tokenizer +import argparse + +class TextGenerator: + def __init__(self, + checkpoint='output/ckpt.pt', # 模型检查点路径 + tokenizer_model_path='tok4096.model', # 分词器模型路径 + seed=1337, # 随机种子,确保可重复性 + device=None, # 设备,优先使用 CUDA,如果没有可用的 CUDA,则使用 CPU + dtype="float32"): # 数据类型,默认为 float32,可以选择 float16 或 bfloat16 + """ + 初始化 TextGenerator 类,加载模型、设置设备和分词器等。 + """ + # 模型加载配置 + self.checkpoint = checkpoint # 保存的模型检查点路径 + self.tokenizer_model_path = tokenizer_model_path # 分词器模型文件路径 + self.seed = seed # 随机数种子,用于生成的可重复性 + self.device = device or ('cuda' if torch.cuda.is_available() else 'cpu') # 根据硬件条件选择设备 + self.dtype = dtype # 模型的浮点数类型 + self.device_type = 'cuda' if 'cuda' in self.device else 'cpu' # 判断当前设备是否为 CUDA + + # 设置随机种子,确保生成的可重复性 + torch.manual_seed(seed) # 设置 CPU 随机种子 + torch.cuda.manual_seed(seed) # 设置 CUDA 随机种子 + torch.backends.cuda.matmul.allow_tf32 = True # 允许 CUDA 使用 TF32 精度进行矩阵乘法运算 + torch.backends.cudnn.allow_tf32 = True # 允许 cuDNN 使用 TF32 精度加速 + + # 根据 dtype 选择适当的自动混合精度上下文 + ptdtype = {'float32': torch.float32, 'bfloat16': torch.bfloat16, 'float16': torch.float16}[self.dtype] + self.ctx = nullcontext() if self.device_type == 'cpu' else torch.amp.autocast(device_type=self.device_type, dtype=ptdtype) + + # 加载模型检查点文件 + checkpoint_dict = torch.load(self.checkpoint, map_location=self.device) # 加载模型参数 + gptconf = ModelArgs(**checkpoint_dict['model_args']) # 初始化模型参数 + self.model = Transformer(gptconf) # 实例化 Transformer 模型 + state_dict = checkpoint_dict['model'] # 获取模型状态字典 + + # 去除状态字典中的不必要前缀 + unwanted_prefix = '_orig_mod.' # 这个前缀在保存时可能被添加,现在要去除它 + for k, v in list(state_dict.items()): + if k.startswith(unwanted_prefix): + state_dict[k[len(unwanted_prefix):]] = state_dict.pop(k) # 去除不必要的前缀 + + # 加载模型参数到模型中 + self.model.load_state_dict(state_dict, strict=False) + # 计算模型参数量 + num_params = sum(p.numel() for p in self.model.parameters() if p.requires_grad) + print(f"Model has {num_params} parameters.") + # 设置模型为评估模式(evaluation mode),防止训练模式下的 dropout 等操作影响结果 + self.model.eval() + # 将模型放置到正确的设备上(GPU 或 CPU) + self.model.to(self.device) + # 初始化分词器 + self.tokenizer = Tokenizer(tokenizer_model=self.tokenizer_model_path) # 根据指定的路径加载分词器 + + def sample(self, + start="Hello!", # 生成文本的起始提示词,可以是任意字符串 + num_samples=3, # 生成样本的数量,默认生成 3 个样本 + max_new_tokens=256, # 每个样本生成的最大 token 数,默认最多生成 256 个 token + temperature=1.0, # 控制生成的随机性,1.0 为标准,值越大越随机 + top_k=300): # 保留概率最高的 top_k 个 token,限制生成时的选择范围 + """ + 根据给定的起始文本生成样本。 + + :param start: 生成文本的起始提示词 + :param num_samples: 要生成的文本样本数 + :param max_new_tokens: 每个样本生成的最大 token 数 + :param temperature: 控制生成的随机性,值越小生成越确定,值越大生成越随机 + :param top_k: 限制生成时选择的 token 范围 + :return: 生成的文本样本列表 + """ + # 如果 start 是以 'FILE:' 开头,表示从文件中读取起始文本 + if start.startswith('FILE:'): + with open(start[5:], 'r', encoding='utf-8') as f: + start = f.read() # 读取文件内容作为起始文本 + + # 将起始文本编码为 token id 序列 + start_ids = self.tokenizer.encode(start, bos=True, eos=False) # bos=True 表示加上句首标记,eos=False 表示不加句尾标记 + x = (torch.tensor(start_ids, dtype=torch.long, device=self.device)[None, ...]) # 将编码后的 token id 转为 PyTorch 张量 + + generated_texts = [] # 用于保存生成的文本样本 + with torch.no_grad(): # 禁用梯度计算,提升效率 + with self.ctx: # 进入自动混合精度的上下文(如果是 GPU 并使用 float16 时) + for k in range(num_samples): # 循环生成指定数量的样本 + y = self.model.generate(x, max_new_tokens, temperature=temperature, top_k=top_k) # 生成文本 + generated_texts.append(self.tokenizer.decode(y[0].tolist())) # 解码生成的 token 序列为可读文本 + + return generated_texts # 返回生成的文本样本 + +# 示例使用 +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--prompt", type=str, default="One day, Lily met a Shoggoth") + args = parser.parse_args() + + generator = TextGenerator() # 初始化生成器 + samples = generator.sample(start=args.prompt, num_samples=3, max_new_tokens=256) # 生成 3 个样本 + for i, sample in enumerate(samples): + print(f"\nSample {i+1}:\n{sample}\n{'-'*20}") # 打印生成的样本并用分隔线分割 diff --git a/docs/chapter5/code/tok4096.model b/docs/chapter5/code/tok4096.model new file mode 100644 index 0000000..b4a6273 Binary files /dev/null and b/docs/chapter5/code/tok4096.model differ diff --git a/docs/chapter5/code/tokenizer.py b/docs/chapter5/code/tokenizer.py new file mode 100644 index 0000000..e2fe5b3 --- /dev/null +++ b/docs/chapter5/code/tokenizer.py @@ -0,0 +1,68 @@ +import os +import struct +from sentencepiece import SentencePieceProcessor +from typing import List + +TOKENIZER_MODEL = "./data/tok4096.model" + +class Tokenizer: + def __init__(self, tokenizer_model=None): + """ + 初始化分词器。加载预训练的SentencePiece模型,并设置一些特殊的token ID。 + + 参数: + tokenizer_model: str, 可选,分词器模型的路径,如果不指定则使用默认路径 TOKENIZER_MODEL。 + """ + # 如果提供了分词器模型路径,使用该路径;否则使用默认模型路径 + model_path = tokenizer_model if tokenizer_model else TOKENIZER_MODEL + # 确保模型文件存在 + assert os.path.isfile(model_path), model_path + + # 加载 SentencePiece 模型 + self.sp_model = SentencePieceProcessor(model_file=model_path) + self.model_path = model_path + + # 获取分词器的特殊token和词汇表大小 + self.n_words: int = self.sp_model.vocab_size() # 词汇表大小 + self.bos_id: int = self.sp_model.bos_id() # 句子开头 (BOS) 的ID + self.eos_id: int = self.sp_model.eos_id() # 句子结尾 (EOS) 的ID + self.pad_id: int = self.sp_model.pad_id() # 填充 (PAD) 的ID + + # 验证分词器词汇表大小是否正确 + assert self.sp_model.vocab_size() == self.sp_model.get_piece_size() + + def encode(self, s: str, bos: bool, eos: bool) -> List[int]: + """ + 将字符串编码为词元ID列表。可以选择是否添加句子开头 (BOS) 和句子结尾 (EOS) 标记。 + + 参数: + s: str, 要编码的字符串。 + bos: bool, 是否在编码的词元列表前添加 BOS 标记。 + eos: bool, 是否在编码的词元列表末尾添加 EOS 标记。 + + 返回: + List[int]: 编码后的词元ID列表。 + """ + # 确保输入是字符串类型 + assert type(s) is str + # 使用SentencePiece将字符串编码为词元ID + t = self.sp_model.encode(s) + # 如果需要BOS标记,将其添加到词元列表开头 + if bos: + t = [self.bos_id] + t + # 如果需要EOS标记,将其添加到词元列表末尾 + if eos: + t = t + [self.eos_id] + return t + + def decode(self, t: List[int]) -> str: + """ + 将词元ID列表解码为字符串。 + + 参数: + t: List[int], 词元ID列表。 + + 返回: + str: 解码后的字符串。 + """ + return self.sp_model.decode(t) \ No newline at end of file diff --git a/docs/chapter5/code/train.py b/docs/chapter5/code/train.py new file mode 100644 index 0000000..6c63d6c --- /dev/null +++ b/docs/chapter5/code/train.py @@ -0,0 +1,257 @@ +import math +import os +import time +from contextlib import nullcontext +from datetime import datetime +from functools import partial + +import torch +from model import Transformer, ModelArgs +from preprocess import Task + +# ----------------------------------------------------------------------------- +# I/O 配置,用于定义输出目录和训练时的日志记录与评估设置 +out_dir = "output" # 模型输出保存路径 +eval_interval = 2000 # 评估间隔步数 +log_interval = 1 # 日志记录间隔步数 +eval_iters = 100 # 每次评估时迭代的步数 +eval_only = False # 如果为True,脚本在第一次评估后立即退出 +always_save_checkpoint = False # 如果为True,在每次评估后总是保存检查点 +init_from = "scratch" # 可以选择从头开始训练('scratch')或从已有的检查点恢复('resume') + +# 数据配置 +batch_size = 8 # 每个微批次的样本数量,如果使用梯度累积,实际批次大小将更大 +max_seq_len = 256 # 最大序列长度 +vocab_size = 4096 # 自定义词汇表大小 + +# 模型配置 +dim = 288 # 模型的隐藏层维度 +n_layers = 8 # Transformer的层数 +n_heads = 8 # 注意力头的数量 +n_kv_heads = 4 # 模型分组 +multiple_of = 32 # 在某些层的维度必须是该数的倍数 +dropout = 0.0 # Dropout概率 + +# AdamW优化器配置 +gradient_accumulation_steps = 4 # 梯度累积步数,用于模拟更大的批次 +learning_rate = 5e-4 # 最大学习率 +max_iters = 100000 # 总的训练迭代次数 +weight_decay = 1e-1 # 权重衰减系数 +beta1 = 0.9 # AdamW优化器的β1参数 +beta2 = 0.95 # AdamW优化器的β2参数 +grad_clip = 1.0 # 梯度裁剪阈值,0表示不裁剪 + +# 学习率衰减配置 +decay_lr = True # 是否启用学习率衰减 +warmup_iters = 1000 # 学习率预热的步数 + +# 系统设置 +device = "cuda:0" # 设备选择:'cpu','cuda','cuda:0'等 +dtype = "bfloat16" # 数据类型:'float32','bfloat16','float16' + +# ----------------------------------------------------------------------------- +# 获取配置参数的键值对,便于后续的日志记录 +config_keys = [ + k + for k, v in globals().items() + if not k.startswith("_") and isinstance(v, (int, float, bool, str)) +] +config = {k: globals()[k] for k in config_keys} # 保存配置到字典中,便于日志记录 +# ----------------------------------------------------------------------------- + +# 固定一些超参数的默认值 +lr_decay_iters = max_iters # 学习率衰减步数,设置为等于最大迭代步数 +min_lr = 0.0 # 最小学习率,建议为学习率的十分之一 +vocab_source = 'custom' # 词汇表来源 +master_process = True # 用于区分主进程 +seed_offset = 0 # 随机种子偏移量 +ddp_world_size = 1 # 分布式数据并行的世界大小 +tokens_per_iter = batch_size * max_seq_len # 每次迭代处理的token数 + +# 设置随机种子,确保可重复性 +torch.manual_seed(1337 + seed_offset) +torch.backends.cuda.matmul.allow_tf32 = True # 允许在matmul上使用tf32 +torch.backends.cudnn.allow_tf32 = True # 允许在cudnn上使用tf32 +device_type = "cuda" if "cuda" in device else "cpu" # 用于自动选择设备类型 +ptdtype = torch.float16 # 设置训练时使用的数据类型 + +# 混合精度训练相关 +ctx = ( + nullcontext() + if device_type == "cpu" + else torch.amp.autocast(device_type=device_type, dtype=ptdtype) +) + +# 为特定任务设置批次迭代器 iter_batches +iter_batches = partial( + Task.iter_batches, # 调用 Task 类中的 iter_batches 方法 + batch_size=batch_size, # 每个批次的样本数量 + max_seq_len=max_seq_len, # 每个序列的最大长度 + vocab_size=vocab_size, # 词汇表大小 + vocab_source=vocab_source, # 词汇表来源(如 llama2 或 custom) + device=device, # 运行模型的设备(如 GPU 或 CPU) + num_workers=0, # 用于数据加载的 worker 数量,0 表示在主线程中加载 +) + +# 训练迭代数初始化 +iter_num = 0 # 记录当前迭代数 + +# 验证集上的最好损失初始值设置为一个极大值,用于后续模型验证时对比更新 +best_val_loss = 1e9 # 设置初始的最佳验证损失为非常大的值,以便在训练中更新 + +# 模型初始化参数设置 +model_args = dict( + dim=dim, # 模型的隐藏层维度 + n_layers=n_layers, # Transformer 的层数 + n_heads=n_heads, # 多头注意力机制中的头数 + n_kv_heads=n_kv_heads, # 分组数(可能是用于并行化或其他优化目的) + vocab_size=vocab_size, # 词汇表大小 + multiple_of=multiple_of, # 用于调整某些维度的参数,确保其为特定数的倍数 + max_seq_len=max_seq_len, # 最大序列长度 + dropout=dropout, # dropout 概率,用于防止过拟合 +) + +# =========================================================== +# 模型初始化 +gptconf = ModelArgs(**model_args) +model = Transformer(gptconf) + + +model.to(device) + +# 初始化 GradScaler,用于自动混合精度训练(AMP) +# 如果 enabled=False,表示禁用混合精度,scaler 将不起作用 +scaler = torch.cuda.amp.GradScaler(enabled=(dtype == "float16")) + +# 优化器初始化,调用模型的 configure_optimizers 方法 +optimizer = model.configure_optimizers( + weight_decay, # 权重衰减(L2 正则化) + learning_rate, # 学习率 + (beta1, beta2), # Adam 优化器中的 beta1 和 beta2 参数 + device_type # 当前训练设备(如 GPU 或 CPU) +) + +# 定义评估损失的流程 +@torch.no_grad() # 使用 no_grad 装饰器,确保在评估过程中不计算梯度,从而节省内存 +def estimate_loss(): + out = {} # 用于存储训练集和验证集上的平均损失 + model.eval() # 将模型设置为评估模式,这会影响 dropout 和 batchnorm 等层的行为 + for split in ["train", "val"]: # 分别对训练集和验证集进行评估 + batch_iter = iter_batches(split=split) # 获取对应数据集的批次迭代器 + losses = torch.zeros(eval_iters) # 初始化一个张量用于存储多次迭代的损失,放在 CPU 上 + for k in range(eval_iters): # 进行多次迭代以计算平均损失 + X, Y = next(batch_iter) # 从迭代器中获取下一个批次的输入数据 X 和标签 Y + with ctx: # 上下文管理器,可以是 torch.autocast(),用于自动混合精度训练 + logits = model(X, Y) # 前向传播,计算模型的输出 + loss = raw_model.last_loss # 从模型中获取损失值 + losses[k] = loss.item() # 将损失值转换为 Python 标量并存储在 losses 张量中 + out[split] = losses.mean() # 计算当前数据集上的平均损失并保存到字典中 + model.train() # 恢复模型为训练模式 + return out # 返回包含训练集和验证集平均损失的字典 + +# 定义学习率调度函数 +def get_lr(it): + """ + 根据当前的训练迭代步数 it 返回当前的学习率值。 + 学习率调整策略包括线性预热、余弦退火和最小学习率限制。 + """ + # 1) 线性预热阶段,在 warmup_iters 之前,学习率线性增加到目标学习率 + if it < warmup_iters: + return learning_rate * it / warmup_iters # 预热阶段,学习率线性增长 + + # 2) 如果迭代步数超过 lr_decay_iters,返回最小学习率 min_lr + if it > lr_decay_iters: + return min_lr # 训练进入尾声时,学习率达到最小值并保持不变 + + # 3) 余弦退火阶段,在 warmup_iters 和 lr_decay_iters 之间,学习率逐渐降低 + decay_ratio = (it - warmup_iters) / (lr_decay_iters - warmup_iters) + assert 0 <= decay_ratio <= 1 # 确保衰减比在合法范围内 + coeff = 0.5 * (1.0 + math.cos(math.pi * decay_ratio)) # 余弦函数计算衰减系数,范围为0到1 + return min_lr + coeff * (learning_rate - min_lr) # 根据衰减系数调整学习率 + +# 初始化训练数据的迭代器 +train_batch_iter = iter_batches(split="train") +X, Y = next(train_batch_iter) # 获取第一个批次的数据 +t0 = time.time() # 记录开始时间 +local_iter_num = 0 # 本进程中的迭代次数 +raw_model = model # 如果使用了分布式数据并行 (DDP),需要解包模型 +running_mfu = -1.0 # 初始化模型浮点运算利用率 + +os.makedirs(out_dir, exist_ok=True) + +while True: + # 或许当前step的学习率 + lr = get_lr(iter_num) if decay_lr else learning_rate + # 更新优化器中的学习率 + for param_group in optimizer.param_groups: + param_group["lr"] = lr + + # 在指定的评估间隔进行模型评估和保存检查点 + if iter_num % eval_interval == 0 and master_process: + losses = estimate_loss() # 评估当前模型在训练集和验证集上的损失 + print(f"step {iter_num}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}") + + # 如果验证损失降低,或者设置为始终保存检查点,则保存模型 + if losses["val"] < best_val_loss or always_save_checkpoint: + best_val_loss = losses["val"] + if iter_num > 0: + # 创建检查点字典,包含模型状态、优化器状态和其他信息 + checkpoint = { + "model": raw_model.state_dict(), + "optimizer": optimizer.state_dict(), + "model_args": model_args, + "iter_num": iter_num, + "best_val_loss": best_val_loss, + "config": config, + } + print(f"saving checkpoint to {out_dir}") + # 保存检查点到指定目录 + torch.save(checkpoint, os.path.join(out_dir, "ckpt.pt")) + # 如果只进行评估且已经完成第一次迭代,则退出循环 + if iter_num == 0 and eval_only: + break + + # 前向和反向传播过程,支持梯度累积 + for micro_step in range(gradient_accumulation_steps): + + with ctx: # 混合精度训练的上下文管理器 + logits = model(X, Y) # 前向传播,计算模型输出 + loss = raw_model.last_loss # 获取模型的损失值 + loss = loss / gradient_accumulation_steps # 平均损失以支持梯度累积 + + X, Y = next(train_batch_iter) # 获取下一个批次的数据 + # 反向传播,计算梯度 + scaler.scale(loss).backward() + # 梯度处理阶段 + if grad_clip != 0.0: + # 取消梯度缩放以进行梯度裁剪 + scaler.unscale_(optimizer) + # 对梯度进行裁剪,防止梯度爆炸 + torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip) + # 更新优化器和梯度缩放器(用于混合精度训练) + scaler.step(optimizer) + scaler.update() + # 清空优化器的梯度,释放显存 + optimizer.zero_grad(set_to_none=True) + + # 计时和日志记录 + t1 = time.time() + dt = t1 - t0 # 计算一次迭代所需时间 + t0 = t1 + if iter_num % log_interval == 0 and master_process: + # 获取当前损失值,并根据梯度累积步骤进行调整 + lossf = loss.item() * gradient_accumulation_steps + if local_iter_num >= 5: # 让训练循环先运行几个迭代再计算模型利用率 + mfu = raw_model.estimate_mfu(batch_size * gradient_accumulation_steps, dt) + # 使用滑动平均更新模型浮点运算利用率(MFU) + running_mfu = mfu if running_mfu == -1.0 else 0.9 * running_mfu + 0.1 * mfu + print( + f"{iter_num} | loss {lossf:.4f} | lr {lr:e} | {dt*1000:.2f}ms | mfu {running_mfu*100:.2f}%" + # mfu 表示模型浮点运算利用率 + ) + iter_num += 1 # 全局迭代次数自增 + local_iter_num += 1 # 本地迭代次数自增 + + # 终止条件,达到最大迭代次数则退出循环 + if iter_num > max_iters: + break \ No newline at end of file diff --git a/docs/chapter5/code/train_vocab.py b/docs/chapter5/code/train_vocab.py new file mode 100644 index 0000000..b260e19 --- /dev/null +++ b/docs/chapter5/code/train_vocab.py @@ -0,0 +1,147 @@ +import glob +import json +import os +from tqdm import tqdm +import requests +import sentencepiece as spm +import argparse + +DATA_CACHE_DIR = 'data' + +def download_file(url: str, fname: str, chunk_size=1024): + """发送HTTP GET请求以流式方式获取文件""" + resp = requests.get(url, stream=True) + + # 获取文件的总大小(以字节为单位),默认为0如果没有提供'content-length'头信息 + total = int(resp.headers.get("content-length", 0)) + + # 以写二进制模式打开一个文件以保存下载的内容 + with open(fname, "wb") as file, tqdm( + desc=fname, # 进度条前面的描述信息(通常是文件名) + total=total, # 总的字节数,用于设置进度条的总长度 + unit="iB", # 进度条的单位,'iB'代表二进制字节 + unit_scale=True, # 启用单位缩放,如KB、MB等 + unit_divisor=1024, # 设置单位换算的除数,这里为1024 + ) as bar: + # 逐块读取响应内容并写入文件 + for data in resp.iter_content(chunk_size=chunk_size): + size = file.write(data) # 写入数据块到文件 + bar.update(size) # 更新进度条 + +def download(): + """在DATA_CACHE_DIR中创建目录,如果目录不存在则创建""" + os.makedirs(DATA_CACHE_DIR, exist_ok=True) + + # 定义TinyStories数据集的下载URL和保存的文件名 + data_url = "https://www.modelscope.cn/datasets/AI-ModelScope/TinyStories/resolve/master/TinyStories_all_data.tar.gz" + data_filename = os.path.join(DATA_CACHE_DIR, "TinyStories_all_data.tar.gz") + + # 检查数据集是否已经下载,如果没有下载则进行下载 + if not os.path.exists(data_filename): + print(f"Downloading {data_url} to {data_filename}...") + download_file(data_url, data_filename) # 使用之前定义的download_file函数进行下载 + else: + print(f"{data_filename} already exists, skipping download...") + + # 定义解压缩后的数据目录 + data_dir = os.path.join(DATA_CACHE_DIR, "TinyStories_all_data") + + # 检查数据目录是否存在,如果不存在则解压缩数据集 + if not os.path.exists(data_dir): + os.makedirs(data_dir, exist_ok=True) # 创建数据目录 + print(f"Unpacking {data_filename}...") + os.system(f"tar -xzf {data_filename} -C {data_dir}") # 使用系统命令解压缩.tar.gz文件 + else: + print(f"{data_dir} already exists, skipping unpacking...") + + # 查找解压后的所有JSON文件,排序后获取文件名列表 + shard_filenames = sorted(glob.glob(os.path.join(data_dir, "*.json"))) + + # 打开第一个JSON文件并读取内容 + with open(shard_filenames[0], "r") as f: + data = json.load(f) # 将JSON文件内容加载到变量data中 + + print("Download done.") # 下载完成信息 + print(f"Number of shards: {len(shard_filenames)}") # 打印解压后数据分片的数量 + print(f"Example story:\n{data[0]}") # 打印第一个分片中的一个示例故事 + +def load_text_from_files(path): + path_list = glob.glob(path) + text_data = [] + for file_path in path_list: + with open(file_path, 'r', encoding='utf-8') as file: + text_data.extend(file.readlines()) + return text_data + +def batch_iterator(text_data, batch_size=648): + for i in range(0, len(text_data), batch_size): + yield text_data[i:i + batch_size] + +def train_vocab(vocab_size: int=32000, num_shards: int=20): + """ + vocab_size: int, 词汇表的大小,决定分词器的词汇量。 + num_shards: int, 用于加快词汇表训练的效率,指定要处理的分片数量。 + """ + # 确保词汇表大小为正数 + assert vocab_size > 0, "Vocab size must be positive" + + # SentencePiece 模型的前缀路径,将用于保存分词器 + prefix = os.path.join(DATA_CACHE_DIR, f"tok{vocab_size}") + + # 1) 将多个分片中的文本导出为单个文本文件 tiny.txt + tiny_file = os.path.join(DATA_CACHE_DIR, "tiny.txt") + data_dir = os.path.join(DATA_CACHE_DIR, "TinyStories_all_data") + shard_filenames = sorted(glob.glob(os.path.join(data_dir, "*.json"))) + + # 创建 tiny.txt 文件并写入指定数量的分片中的文本 + print(f"Writing temporary file {tiny_file} with {num_shards} shards...") + with open(tiny_file, "w", encoding="utf-8") as of: + # 遍历前 num_shards 个分片 + for shard in tqdm(shard_filenames[:num_shards]): + with open(shard, "r") as f: + data = json.load(f) # 读取分片中的JSON数据 + # 遍历每个例子,将其中的故事文本写入 tiny.txt 文件 + for example in data: + text = example["story"] + text = text.strip() # 去除文本首尾的空白字符 + of.write(text + "\n") # 每个文本写入一行 + + # 输出生成的 tiny.txt 文件的大小 + print(f"Size is: {os.path.getsize(tiny_file) / 1024 / 1024:.2f} MB") + + # 2) 使用 SentencePiece 训练分词器 + print("Will now train the vocab...") + spm.SentencePieceTrainer.train( + input=tiny_file, # 输入文件为之前生成的 tiny.txt + model_prefix=prefix, # 模型前缀路径 + model_type="bpe", # 使用 Byte-Pair Encoding (BPE) 训练分词器 + vocab_size=vocab_size, # 词汇表大小 + self_test_sample_size=0, # 自测样本大小设置为 0 + input_format="text", # 输入文件格式为纯文本 + character_coverage=1.0, # 覆盖所有字符(包括非常见字符) + num_threads=os.cpu_count(), # 使用 CPU 的线程数 + split_digits=True, # 拆分数字 + allow_whitespace_only_pieces=True, # 允许仅由空格组成的词元 + byte_fallback=True, # 启用字节级回退 + unk_surface=r" \342\201\207 ", # UNK token 表示未知字符的方式 + normalization_rule_name="identity" # 使用“identity”归一化规则 + ) + + # 3) 可选的清理操作,询问用户是否删除临时文件 tiny.txt + dec = input(f"Delete the temporary file {tiny_file}? [y/N] ") + if dec.lower() == "y": + os.remove(tiny_file) # 删除临时文件 + print(f"Deleted {tiny_file}") + + # 输出模型保存的路径 + print(f"Trained tokenizer is in {prefix}.model") + print("Done.") + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--download", type=bool, default=True, help="download the dataset") + parser.add_argument("--vocab_size", type=int, default=4096, help="vocab size") + args = parser.parse_args() + if args.download: + download() + train_vocab(args.vocab_size) \ No newline at end of file diff --git a/docs/chapter5/llama2.ipynb b/docs/chapter5/llama2.ipynb deleted file mode 100644 index 9867524..0000000 --- a/docs/chapter5/llama2.ipynb +++ /dev/null @@ -1,715 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": 1, - "metadata": {}, - "outputs": [], - "source": [ - "import math\n", - "import struct\n", - "import inspect\n", - "from dataclasses import dataclass\n", - "from typing import Any, Optional, Tuple\n", - "\n", - "import numpy as np\n", - "import torch\n", - "import torch.nn.functional as F\n", - "from torch import nn" - ] - }, - { - "cell_type": "code", - "execution_count": 2, - "metadata": {}, - "outputs": [], - "source": [ - "class ModelArgs:\n", - " # 自定义超参数\n", - " dim: int = 288 # 模型维度\n", - " n_layers: int = 6 # Transformer层数\n", - " n_heads: int = 6 # 注意力机制的头数\n", - " n_kv_heads: Optional[int] = 6 # 键/值头数,如果未指定,则默认为n_heads\n", - " vocab_size: int = 32000 # 词汇表大小\n", - " hidden_dim: Optional[int] = None # 隐藏层维度,如果未指定,则使用其他规则确定\n", - " multiple_of: int = 32 # MLP隐藏层大小是这个数的倍数\n", - " norm_eps: float = 1e-5 # 归一化层的epsilon值\n", - " max_seq_len: int = 256 # 最大序列长度\n", - " dropout: float = 0.0 # 丢弃率\n", - "\n", - "args = ModelArgs()" - ] - }, - { - "cell_type": "code", - "execution_count": 3, - "metadata": {}, - "outputs": [], - "source": [ - "class LLaMA2RMSNorm(nn.Module):\n", - " def __init__(self, dim: int, eps: float):\n", - " super().__init__()\n", - " # eps是为了防止除以0的情况\n", - " self.eps = eps\n", - " # weight是一个可学习的参数,全部初始化为1\n", - " self.weight = nn.Parameter(torch.ones(dim))\n", - "\n", - " def _norm(self, x):\n", - " # 计算RMSNorm的核心部分\n", - " # x.pow(2).mean(-1, keepdim=True)计算了输入x的平方的均值\n", - " # torch.rsqrt是平方根的倒数,这样就得到了RMSNorm的分母部分,再加上eps防止分母为0\n", - " # 最后乘以x,得到RMSNorm的结果\n", - " return x * torch.rsqrt(x.pow(2).mean(-1, keepdim=True) + self.eps)\n", - "\n", - " def forward(self, x):\n", - " # forward函数是模型的前向传播\n", - " # 首先将输入x转为float类型,然后进行RMSNorm,最后再转回原来的数据类型\n", - " # 最后乘以weight,这是RMSNorm的一个可学习的缩放因子\n", - " output = self._norm(x.float()).type_as(x)\n", - " return output * self.weight" - ] - }, - { - "cell_type": "code", - "execution_count": 4, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "torch.Size([1, 50, 288])\n" - ] - } - ], - "source": [ - "norm = LLaMA2RMSNorm(args.dim, args.norm_eps)\n", - "x = torch.randn(1, 50, args.dim)\n", - "output = norm(x)\n", - "print(output.shape)" - ] - }, - { - "cell_type": "code", - "execution_count": 5, - "metadata": {}, - "outputs": [], - "source": [ - "# 获得旋转嵌入的实部和虚部\n", - "# 注意:此处的dim应为 dim//n_head,因为我们是对每个head进行旋转嵌入\n", - "def precompute_freqs_cis(dim: int, end: int, theta: float = 10000.0):\n", - " # torch.arange(0, dim, 2)[: (dim // 2)].float()生成了一个从0开始,步长为2的序列,长度为dim的一半\n", - " # 然后每个元素除以dim,再取theta的倒数,得到频率\n", - " freqs = 1.0 / (theta ** (torch.arange(0, dim, 2)[: (dim // 2)].float() / dim))\n", - " # 生成一个从0到end的序列,长度为end\n", - " t = torch.arange(end, device=freqs.device)\n", - " # 计算外积,得到一个二维矩阵,每一行是t的元素乘以freqs的元素\n", - " freqs = torch.outer(t, freqs).float()\n", - " # 计算频率的余弦值,得到实部\n", - " freqs_cos = torch.cos(freqs)\n", - " # 计算频率的正弦值,得到虚部\n", - " freqs_sin = torch.sin(freqs)\n", - " return freqs_cos, freqs_sin" - ] - }, - { - "cell_type": "code", - "execution_count": 6, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "torch.Size([50, 24]) torch.Size([50, 24])\n" - ] - } - ], - "source": [ - "x = torch.randn(1, 50, 288)\n", - "freqs_cos, freqs_sin = precompute_freqs_cis(288//6, 50)\n", - "print(freqs_cos.shape, freqs_sin.shape)" - ] - }, - { - "cell_type": "code", - "execution_count": 7, - "metadata": {}, - "outputs": [], - "source": [ - "# 此函数的作用是将freqs_cis调整为与x的形状相同,以便能够与x进行广播操作\n", - "def reshape_for_broadcast(freqs_cis: torch.Tensor, x: torch.Tensor):\n", - " # 获取x的维度数\n", - " ndim = x.ndim\n", - " # 断言,确保1在x的维度范围内\n", - " assert 0 <= 1 < ndim\n", - " # 断言,确保freqs_cis的形状与x的第二维和最后一维相同\n", - " assert freqs_cis.shape == (x.shape[1], x.shape[-1])\n", - " # 构造一个新的形状,除了第二维和最后一维,其他维度都为1,这样做是为了能够将freqs_cis与x进行广播操作\n", - " shape = [d if i == 1 or i == ndim - 1 else 1 for i, d in enumerate(x.shape)]\n", - " # 将freqs_cis调整为新的形状,并返回\n", - " return freqs_cis.view(shape)" - ] - }, - { - "cell_type": "code", - "execution_count": 8, - "metadata": {}, - "outputs": [], - "source": [ - "def apply_rotary_emb(\n", - " xq: torch.Tensor,\n", - " xk: torch.Tensor,\n", - " freqs_cos: torch.Tensor,\n", - " freqs_sin: torch.Tensor\n", - ") -> Tuple[torch.Tensor, torch.Tensor]:\n", - "\n", - " # 将查询和键张量转换为浮点数,并重塑形状以分离实部和虚部\n", - " xq_r, xq_i = xq.float().reshape(xq.shape[:-1] + (-1, 2)).unbind(-1)\n", - " xk_r, xk_i = xk.float().reshape(xk.shape[:-1] + (-1, 2)).unbind(-1)\n", - "\n", - " # 重新塑形频率张量以进行广播\n", - " freqs_cos = reshape_for_broadcast(freqs_cos, xq_r)\n", - " freqs_sin = reshape_for_broadcast(freqs_sin, xq_r)\n", - "\n", - " # 应用旋转,分别计算旋转后的实部和虚部\n", - " xq_out_r = xq_r * freqs_cos - xq_i * freqs_sin\n", - " xq_out_i = xq_r * freqs_sin + xq_i * freqs_cos\n", - " xk_out_r = xk_r * freqs_cos - xk_i * freqs_sin\n", - " xk_out_i = xk_r * freqs_sin + xk_i * freqs_cos\n", - "\n", - " # 将最后两个维度合并,并还原为原始张量的形状\n", - " xq_out = torch.stack([xq_out_r, xq_out_i], dim=-1).flatten(3)\n", - " xk_out = torch.stack([xk_out_r, xk_out_i], dim=-1).flatten(3)\n", - "\n", - " return xq_out.type_as(xq), xk_out.type_as(xk)" - ] - }, - { - "cell_type": "code", - "execution_count": 9, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "torch.Size([50, 24]) torch.Size([50, 24])\n" - ] - }, - { - "data": { - "text/plain": [ - "(torch.Size([1, 50, 6, 48]), torch.Size([1, 50, 6, 48]))" - ] - }, - "execution_count": 9, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "xq = torch.randn(1, 50, 6, 48) # bs, seq_len, dim//n_head, n_head_dim\n", - "xk = torch.randn(1, 50, 6, 48) # bs, seq_len, dim//n_head, n_head_dim\n", - "\n", - "# 使用 precompute_freqs_cis 函数获取 sin和cos\n", - "cos, sin = precompute_freqs_cis(288//6, 50)\n", - "print(cos.shape, sin.shape)\n", - "xq_out, xk_out = apply_rotary_emb(xq, xk, cos, sin)\n", - "\n", - "xq_out.shape, xk_out.shape" - ] - }, - { - "cell_type": "code", - "execution_count": 10, - "metadata": {}, - "outputs": [], - "source": [ - "def repeat_kv(x: torch.Tensor, n_rep: int) -> torch.Tensor:\n", - " # 获取输入张量的形状:批量大小、序列长度、键/值对头的数量、每个头的维度大小\n", - " bs, slen, n_kv_heads, head_dim = x.shape\n", - " \n", - " # 如果重复次数为1,则不需要重复,直接返回原始张量\n", - " if n_rep == 1:\n", - " return x\n", - " \n", - " # 对张量进行扩展和重塑操作以重复键值对\n", - " return (\n", - " x[:, :, :, None, :] # 在第四个维度(头的维度前)添加一个新的维度\n", - " .expand(bs, slen, n_kv_heads, n_rep, head_dim) # 将新添加的维度扩展到n_rep大小,实现重复的效果\n", - " .reshape(bs, slen, n_kv_heads * n_rep, head_dim) # 重新塑形,合并键/值对头的数量和重复次数的维度\n", - " )\n" - ] - }, - { - "cell_type": "code", - "execution_count": 13, - "metadata": {}, - "outputs": [], - "source": [ - "class LLaMA2Attention(nn.Module):\n", - " def __init__(self, args: ModelArgs):\n", - " super().__init__()\n", - " # 根据是否指定n_kv_heads,确定用于键(key)和值(value)的头的数量。\n", - " self.n_kv_heads = args.n_heads if args.n_kv_heads is None else args.n_kv_heads\n", - " # 确保总头数可以被键值头数整除。\n", - " assert args.n_heads % self.n_kv_heads == 0\n", - "\n", - " # 模型并行处理大小,默认为1。\n", - " model_parallel_size = 1\n", - " # 本地计算头数,等于总头数除以模型并行处理大小。\n", - " self.n_local_heads = args.n_heads // model_parallel_size\n", - " # 本地键值头数,等于键值头数除以模型并行处理大小。\n", - " self.n_local_kv_heads = self.n_kv_heads // model_parallel_size\n", - " # 重复次数,用于扩展键和值的尺寸。\n", - " self.n_rep = self.n_local_heads // self.n_local_kv_heads\n", - " # 每个头的维度,等于模型维度除以头的总数。\n", - " self.head_dim = args.dim // args.n_heads\n", - "\n", - " # 定义权重矩阵。\n", - " self.wq = nn.Linear(args.dim, args.n_heads * self.head_dim, bias=False)\n", - " self.wk = nn.Linear(args.dim, self.n_kv_heads * self.head_dim, bias=False)\n", - " self.wv = nn.Linear(args.dim, self.n_kv_heads * self.head_dim, bias=False)\n", - " # 输出权重矩阵。\n", - " self.wo = nn.Linear(args.n_heads * self.head_dim, args.dim, bias=False)\n", - "\n", - " # 定义dropout。\n", - " self.attn_dropout = nn.Dropout(args.dropout)\n", - " self.resid_dropout = nn.Dropout(args.dropout)\n", - " # 保存dropout概率。\n", - " self.dropout = args.dropout\n", - "\n", - " # 检查是否使用Flash Attention(需要PyTorch >= 2.0)。\n", - " self.flash = hasattr(torch.nn.functional, 'scaled_dot_product_attention')\n", - " if not self.flash:\n", - " # 若不支持Flash Attention,则使用手动实现的注意力机制,并设置mask。\n", - " print(\"WARNING: using slow attention. Flash Attention requires PyTorch >= 2.0\")\n", - " # 创建一个上三角矩阵,用于遮蔽未来信息。\n", - " mask = torch.full((1, 1, args.max_seq_len, args.max_seq_len), float(\"-inf\"))\n", - " mask = torch.triu(mask, diagonal=1)\n", - " # 注册为模型的缓冲区\n", - " self.register_buffer(\"mask\", mask)\n", - "\n", - " def forward(self, x: torch.Tensor, freqs_cos: torch.Tensor, freqs_sin: torch.Tensor):\n", - " # 获取批次大小和序列长度,[batch_size, seq_len, dim]\n", - " bsz, seqlen, _ = x.shape\n", - "\n", - " # 计算查询(Q)、键(K)、值(V)。\n", - " xq, xk, xv = self.wq(x), self.wk(x), self.wv(x)\n", - " # 调整形状以适应头的维度。\n", - " xq = xq.view(bsz, seqlen, self.n_local_heads, self.head_dim)\n", - " xk = xk.view(bsz, seqlen, self.n_local_kv_heads, self.head_dim)\n", - " xv = xv.view(bsz, seqlen, self.n_local_kv_heads, self.head_dim)\n", - "\n", - " # 应用旋转位置嵌入(RoPE)。\n", - " xq, xk = apply_rotary_emb(xq, xk, freqs_cos, freqs_sin)\n", - "\n", - " # 对键和值进行扩展以适应重复次数。\n", - " xk = repeat_kv(xk, self.n_rep)\n", - " xv = repeat_kv(xv, self.n_rep)\n", - "\n", - " # 将头作为批次维度处理。\n", - " xq = xq.transpose(1, 2)\n", - " xk = xk.transpose(1, 2)\n", - " xv = xv.transpose(1, 2)\n", - "\n", - " # 根据是否支持Flash Attention,选择实现方式。\n", - " if self.flash:\n", - " # 使用Flash Attention。\n", - " output = torch.nn.functional.scaled_dot_product_attention(xq, xk, xv, attn_mask=None, dropout_p=self.dropout if self.training else 0.0, is_causal=True)\n", - " else:\n", - " # 使用手动实现的注意力机制。\n", - " scores = torch.matmul(xq, xk.transpose(2, 3)) / math.sqrt(self.head_dim)\n", - " assert hasattr(self, 'mask')\n", - " scores = scores + self.mask[:, :, :seqlen, :seqlen]\n", - " scores = F.softmax(scores.float(), dim=-1).type_as(xq)\n", - " scores = self.attn_dropout(scores)\n", - " output = torch.matmul(scores, xv)\n", - "\n", - " # 恢复时间维度并合并头。\n", - " output = output.transpose(1, 2).contiguous().view(bsz, seqlen, -1)\n", - "\n", - " # 最终投影回残差流。\n", - " output = self.wo(output)\n", - " output = self.resid_dropout(output)\n", - " return output" - ] - }, - { - "cell_type": "code", - "execution_count": 22, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "torch.Size([50, 24]) torch.Size([50, 24])\n", - "Output shape: torch.Size([1, 50, 288])\n" - ] - } - ], - "source": [ - "# 创建Attention实例\n", - "attention_model = LLaMA2Attention(args)\n", - "\n", - "# 模拟输入数据\n", - "batch_size = 1\n", - "seq_len = 50 # 假设实际使用的序列长度为50\n", - "dim = args.dim\n", - "x = torch.rand(batch_size, seq_len, dim) # 随机生成输入张量\n", - "# freqs_cos = torch.rand(seq_len, dim // 2) # 模拟cos频率,用于RoPE\n", - "# freqs_sin = torch.rand(seq_len, dim // 2) # 模拟sin频率,用于RoPE\n", - "\n", - "freqs_cos, freqs_sin = precompute_freqs_cis(dim//args.n_heads, seq_len)\n", - "\n", - "print(freqs_cos.shape, freqs_sin.shape)\n", - "\n", - "# 运行Attention模型\n", - "output = attention_model(x, freqs_cos, freqs_sin)\n", - "\n", - "# attention出来之后的形状 依然是[batch_size, seq_len, dim]\n", - "print(\"Output shape:\", output.shape)" - ] - }, - { - "cell_type": "code", - "execution_count": 16, - "metadata": {}, - "outputs": [], - "source": [ - "class LLaMA2MLP(nn.Module):\n", - " def __init__(self, dim: int, hidden_dim: int, multiple_of: int, dropout: float):\n", - " super().__init__()\n", - " # 如果没有指定隐藏层的维度,我们将其设置为输入维度的4倍\n", - " # 然后将其减少到2/3,最后确保它是multiple_of的倍数\n", - " if hidden_dim is None:\n", - " hidden_dim = 4 * dim\n", - " hidden_dim = int(2 * hidden_dim / 3)\n", - " hidden_dim = multiple_of * ((hidden_dim + multiple_of - 1) // multiple_of)\n", - " # 定义第一层线性变换,从输入维度到隐藏维度\n", - " self.w1 = nn.Linear(dim, hidden_dim, bias=False)\n", - " # 定义第二层线性变换,从隐藏维度到输入维度\n", - " self.w2 = nn.Linear(hidden_dim, dim, bias=False)\n", - " # 定义第三层线性变换,从输入维度到隐藏维度\n", - " self.w3 = nn.Linear(dim, hidden_dim, bias=False)\n", - " # 定义dropout层,用于防止过拟合\n", - " self.dropout = nn.Dropout(dropout)\n", - "\n", - " def forward(self, x):\n", - " # 前向传播函数\n", - " # 首先,输入x通过第一层线性变换和SILU激活函数\n", - " # 然后,结果乘以输入x通过第三层线性变换的结果\n", - " # 最后,通过第二层线性变换和dropout层\n", - " return self.dropout(self.w2(F.silu(self.w1(x)) * self.w3(x)))" - ] - }, - { - "cell_type": "code", - "execution_count": 19, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "torch.Size([1, 50, 288])\n" - ] - } - ], - "source": [ - "# 创建MLP实例\n", - "mlp = LLaMA2MLP(args.dim, args.hidden_dim, args.multiple_of, args.dropout)\n", - "# 随机生成数据\n", - "x = torch.randn(1, 50, 288)\n", - "# 运行MLP模型\n", - "output = mlp(x)\n", - "print(output.shape)" - ] - }, - { - "cell_type": "code", - "execution_count": 21, - "metadata": {}, - "outputs": [], - "source": [ - "class LLaMA2DecoderLayer(nn.Module):\n", - " def __init__(self, layer_id: int, args: ModelArgs):\n", - " super().__init__()\n", - " # 定义多头注意力的头数\n", - " self.n_heads = args.n_heads\n", - " # 定义输入维度\n", - " self.dim = args.dim\n", - " # 定义每个头的维度,等于输入维度除以头数\n", - " self.head_dim = args.dim // args.n_heads\n", - " # 定义LLaMA2Attention对象,用于进行多头注意力计算\n", - " self.attention = LLaMA2Attention(args)\n", - " # 定义LLaMAMLP对象,用于进行前馈神经网络计算\n", - " self.feed_forward = LLaMA2MLP(\n", - " dim=args.dim,\n", - " hidden_dim=args.hidden_dim,\n", - " multiple_of=args.multiple_of,\n", - " dropout=args.dropout,\n", - " )\n", - " # 定义层的ID\n", - " self.layer_id = layer_id\n", - " # 定义注意力计算的归一化层\n", - " self.attention_norm = LLaMA2RMSNorm(args.dim, eps=args.norm_eps)\n", - " # 定义前馈神经网络计算的归一化层\n", - " self.ffn_norm = LLaMA2RMSNorm(args.dim, eps=args.norm_eps)\n", - "\n", - " def forward(self, x, freqs_cos, freqs_sin):\n", - " # 前向传播函数\n", - " # 首先,输入x经过注意力归一化层,然后进行注意力计算,结果与输入x相加得到h\n", - " # 然后,h经过前馈神经网络归一化层,然后进行前馈神经网络计算,结果与h相加得到输出\n", - " h = x + self.attention.forward(self.attention_norm(x), freqs_cos, freqs_sin)\n", - " out = h + self.feed_forward.forward(self.ffn_norm(h))\n", - " return out" - ] - }, - { - "cell_type": "code", - "execution_count": 25, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "torch.Size([1, 50, 288]) torch.Size([50, 24]) torch.Size([50, 24])\n", - "torch.Size([1, 50, 288])\n" - ] - } - ], - "source": [ - "# LLaMADecoderLayer.forward 函数的输入是 x, freqs_cos, freqs_sin, 其中x的形状是[batch_size, seq_len, dim]\n", - "# 由于llama2使用了GQA Attention,所以precompute_freqs_cis函数输入参数应该为dim//n_heads,seq_len、\n", - "\n", - "# 创建LLaMADecoderLayer实例\n", - "decoderlayer = LLaMA2DecoderLayer(0, args)\n", - "\n", - "# 模拟输入数据\n", - "dim = args.dim\n", - "seq_len = 50\n", - "\n", - "x = torch.randn(1, seq_len, dim) # [bs, seq_len, dim]\n", - "\n", - "freqs_cos, freqs_sin = precompute_freqs_cis(dim//args.n_heads, seq_len)\n", - "print(x.shape, freqs_cos.shape, freqs_sin.shape)\n", - "\n", - "out = decoderlayer(x, freqs_cos, freqs_sin)\n", - "\n", - "print(out.shape) # 形状和输入的x一样 [batch_size, seq_len, dim]" - ] - }, - { - "cell_type": "code", - "execution_count": 33, - "metadata": {}, - "outputs": [], - "source": [ - "class LLaMA2Model(nn.Module):\n", - " last_loss: Optional[torch.Tensor]\n", - "\n", - " def __init__(self, args: ModelArgs):\n", - " super().__init__()\n", - " # 初始化模型参数\n", - " self.args = args\n", - " # 词汇表大小\n", - " self.vocab_size = args.vocab_size\n", - " # 层数\n", - " self.n_layers = args.n_layers\n", - "\n", - " # 词嵌入层\n", - " self.tok_embeddings = nn.Embedding(args.vocab_size, args.dim)\n", - " # Dropout层\n", - " self.dropout = nn.Dropout(args.dropout)\n", - " # Decoder层\n", - " self.layers = torch.nn.ModuleList()\n", - " for layer_id in range(args.n_layers):\n", - " self.layers.append(LLaMADecoderLayer(layer_id, args))\n", - " # 归一化层\n", - " self.norm = LLaMA2RMSNorm(args.dim, eps=args.norm_eps)\n", - " # 输出层\n", - " self.output = nn.Linear(args.dim, args.vocab_size, bias=False)\n", - "\n", - " # 将词嵌入层的权重与输出层的权重共享\n", - " self.tok_embeddings.weight = self.output.weight \n", - "\n", - " # 预计算相对位置嵌入的频率\n", - " freqs_cos, freqs_sin = precompute_freqs_cis(self.args.dim // self.args.n_heads, self.args.max_seq_len)\n", - " self.register_buffer(\"freqs_cos\", freqs_cos, persistent=False)\n", - " self.register_buffer(\"freqs_sin\", freqs_sin, persistent=False)\n", - "\n", - " # 初始化所有权重\n", - " self.apply(self._init_weights)\n", - " # 对残差投影进行特殊的缩放初始化\n", - " for pn, p in self.named_parameters():\n", - " if pn.endswith('w3.weight') or pn.endswith('wo.weight'):\n", - " torch.nn.init.normal_(p, mean=0.0, std=0.02/math.sqrt(2 * args.n_layers))\n", - "\n", - " # 初始化最后一次前向传播的损失属性\n", - " self.last_loss = None\n", - "\n", - " def _init_weights(self, module):\n", - " # 初始化权重的函数\n", - " if isinstance(module, nn.Linear):\n", - " torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)\n", - " if module.bias is not None:\n", - " torch.nn.init.zeros_(module.bias)\n", - " elif isinstance(module, nn.Embedding):\n", - " torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)\n", - " \n", - " def forward(self, tokens: torch.Tensor, targets: Optional[torch.Tensor] = None) -> torch.Tensor:\n", - " # 前向传播函数\n", - " _bsz, seqlen = tokens.shape\n", - " # 通过词嵌入层和Dropout层\n", - " h = self.tok_embeddings(tokens)\n", - " h = self.dropout(h)\n", - " # 获取相对位置嵌入的频率\n", - " freqs_cos = self.freqs_cos[:seqlen]\n", - " freqs_sin = self.freqs_sin[:seqlen]\n", - "\n", - " # 通过Decoder层\n", - " for layer in self.layers:\n", - " h = layer(h, freqs_cos, freqs_sin)\n", - " # 通过归一化层\n", - " h = self.norm(h)\n", - "\n", - " if targets is not None:\n", - " # 如果给定了目标,计算损失\n", - " logits = self.output(h)\n", - " self.last_loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1), ignore_index=-1)\n", - " else:\n", - " # 推理时的小优化:只对最后一个位置的输出进行前向传播\n", - " logits = self.output(h[:, [-1], :]) \n", - " self.last_loss = None\n", - "\n", - " return logits\n", - " \n", - " def configure_optimizers(self, weight_decay, learning_rate, betas, device_type):\n", - " # 获取所有需要更新的参数\n", - " param_dict = {pn: p for pn, p in self.named_parameters() if p.requires_grad}\n", - " \n", - " # 将参数分为需要权重衰减和不需要权重衰减的两组\n", - " decay_params = [p for n, p in param_dict.items() if p.dim() >= 2]\n", - " nodecay_params = [p for n, p in param_dict.items() if p.dim() < 2]\n", - " optim_groups = [\n", - " {'params': decay_params, 'weight_decay': weight_decay},\n", - " {'params': nodecay_params, 'weight_decay': 0.0}\n", - " ]\n", - " \n", - " # 打印参数数量信息\n", - " num_decay_params = sum(p.numel() for p in decay_params)\n", - " num_nodecay_params = sum(p.numel() for p in nodecay_params)\n", - " print(f\"num decayed parameter tensors: {len(decay_params)}, with {num_decay_params:,} parameters\")\n", - " print(f\"num non-decayed parameter tensors: {len(nodecay_params)}, with {num_nodecay_params:,} parameters\")\n", - " \n", - " # 根据设备类型选择使用标准 AdamW 或其融合版本\n", - " fused_available = 'fused' in inspect.signature(torch.optim.AdamW).parameters\n", - " use_fused = fused_available and device_type == 'cuda'\n", - " extra_args = dict(fused=True) if use_fused else dict()\n", - " optimizer = torch.optim.AdamW(optim_groups, lr=learning_rate, betas=betas, **extra_args)\n", - " print(f\"using fused AdamW: {use_fused}\")\n", - "\n", - " return optimizer\n", - " \n", - " def estimate_mfu(self, fwdbwd_per_iter, dt):\n", - " \"\"\" 估计模型的 FLOPs 利用率 (MFU) 单位:A100 bfloat16 的峰值 FLOPS \"\"\"\n", - " # 计算每次迭代的 FLOPs 数量(参考 PaLM 论文的附录 B)\n", - " N = sum(p.numel() for p in self.parameters())\n", - " cfg = self.args\n", - " L, H, Q, T = cfg.n_layers, cfg.n_heads, cfg.dim//cfg.n_heads, cfg.max_seq_len\n", - " flops_per_token = 6*N + 12*L*H*Q*T\n", - " flops_per_fwdbwd = flops_per_token * T\n", - " flops_per_iter = flops_per_fwdbwd * fwdbwd_per_iter\n", - " \n", - " # 将 FLOPs 吞吐量表示为 A100 bfloat16 峰值 FLOPS 的比例\n", - " flops_achieved = flops_per_iter * (1.0/dt) # 每秒计算的 FLOPs\n", - " flops_promised = 312e12 # A100 GPU bfloat16 的峰值 FLOPS 为 312 TFLOPS\n", - " mfu = flops_achieved / flops_promised\n", - " return mfu\n", - " \n", - " @torch.inference_mode()\n", - " def generate(self, idx, max_new_tokens, temperature=1.0, top_k=None):\n", - " \"\"\"\n", - " 给定输入序列 idx(形状为 (bz,seq_len) 的长整型张量),通过多次生成新 token 来完成序列。\n", - " 在 model.eval() 模式下运行。效率较低的采样版本,没有使用键k/v cache。\n", - " \"\"\"\n", - " for _ in range(max_new_tokens):\n", - " # 如果序列上下文过长,截断它到最大长度\n", - " idx_cond = idx if idx.size(1) <= self.args.max_seq_len else idx[:, -self.args.max_seq_len:]\n", - " \n", - " # 前向传播获取序列中最后一个位置的 logits\n", - " logits = self(idx_cond)\n", - " logits = logits[:, -1, :] # 只保留最后一个时间步的输出\n", - " \n", - " if temperature == 0.0:\n", - " # 选择最有可能的索引\n", - " _, idx_next = torch.topk(logits, k=1, dim=-1)\n", - " else:\n", - " # 缩放 logits 并应用 softmax\n", - " logits = logits / temperature\n", - " if top_k is not None:\n", - " v, _ = torch.topk(logits, min(top_k, logits.size(-1)))\n", - " logits[logits < v[:, [-1]]] = -float('Inf')\n", - " probs = F.softmax(logits, dim=-1)\n", - " idx_next = torch.multinomial(probs, num_samples=1)\n", - " \n", - " # 将采样的索引添加到序列中并继续\n", - " idx = torch.cat((idx, idx_next), dim=1)\n", - "\n", - " return idx" - ] - }, - { - "cell_type": "code", - "execution_count": 37, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Number of parameters: 15191712\n", - "torch.Size([1, 1, 32000])\n" - ] - } - ], - "source": [ - "# LLaMA2Model.forward 接受两个参数,tokens和targets,其中tokens是输入的张量, 应为int类型\n", - "x = torch.randint(0, 32000, (1, 50)) # [bs, seq_len]\n", - "# 实例化LLaMA2Model\n", - "model = LLaMA2Model(args=args)\n", - "# 计算model的全部参数\n", - "num_params = sum(p.numel() for p in model.parameters())\n", - "print('Number of parameters:', num_params)\n", - "\n", - "out = model(x)\n", - "print(out.shape) # [batch_size, 1, vocab_size]" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "nlp", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.10.13" - } - }, - "nbformat": 4, - "nbformat_minor": 2 -}