happy-llm/docs/chapter6/code/pretrain.py
'''
Pre-training script.
'''
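# Example launch command (a minimal sketch; the paths, file names, and hyperparameter
# values below are illustrative assumptions, not taken from the course material):
#
#   torchrun --nproc_per_node=1 pretrain.py \
#       --config_name ./model_config \
#       --tokenizer_name ./tokenizer \
#       --train_files ./data/pretrain.jsonl \
#       --output_dir ./output \
#       --per_device_train_batch_size 8 \
#       --block_size 1024 \
#       --bf16 True \
#       --num_train_epochs 1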
import logging
import math
import os
import sys
from dataclasses import dataclass, field
from torchdata.datapipes.iter import IterableWrapper
from itertools import chain
import deepspeed
from typing import Optional, List
import datasets
import pandas as pd
import torch
from datasets import load_dataset
import transformers
from transformers import (
    AutoConfig,
    AutoModelForCausalLM,
    AutoTokenizer,
    HfArgumentParser,
    Trainer,
    TrainingArguments,
    default_data_collator,
    set_seed,
)
import datetime
from transformers.testing_utils import CaptureLogger
from transformers.trainer_utils import get_last_checkpoint
import wandb

logger = logging.getLogger(__name__)

# Hyperparameter classes
@dataclass
class ModelArguments:
    """
    Model-related arguments.
    """
    model_name_or_path: Optional[str] = field(
        default=None,
        metadata={
            "help": (
                "Used for post-training: path to the pretrained model weights"
            )
        },
    )
    config_name: Optional[str] = field(
        default=None, metadata={"help": "Path to the config file used when pre-training from scratch"}
    )
    tokenizer_name: Optional[str] = field(
        default=None, metadata={"help": "Path to the tokenizer used for pre-training"}
    )
    torch_dtype: Optional[str] = field(
        default=None,
        metadata={
            "help": (
                "Data type used for training; bfloat16 is recommended"
            ),
            "choices": ["auto", "bfloat16", "float16", "float32"],
        },
    )

@dataclass
class DataTrainingArguments:
    """
    Training-data-related arguments.
    """
    train_files: Optional[List[str]] = field(default=None, metadata={"help": "Paths to the training data files"})
    block_size: Optional[int] = field(
        default=None,
        metadata={
            "help": (
                "Length of each text block"
            )
        },
    )
    preprocessing_num_workers: Optional[int] = field(
        default=None,
        metadata={"help": "Number of processes used for preprocessing."},
    )
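
# The training data is loaded with load_dataset('json', ...), so each file is expected to be
# JSON Lines. The script looks for a "text" column and otherwise falls back to the first column;
# the record below is an illustrative assumption, not an excerpt from the actual dataset:
#     {"text": "Some pre-training document ..."}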


def main():
    # Parse the script arguments
    parser = HfArgumentParser((ModelArguments, DataTrainingArguments, TrainingArguments))
    model_args, data_args, training_args = parser.parse_args_into_dataclasses()
    # Initialize WandB
    wandb.init(project="pretrain", name="from_scratch")
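    # Note: depending on the transformers version and the --report_to setting, the Trainer
    # below may also log metrics to this WandB run through its built-in WandB integration.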
    # Set up logging
    logging.basicConfig(
        format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
        datefmt="%m/%d/%Y %H:%M:%S",
        handlers=[logging.StreamHandler(sys.stdout)],
    )
    # Set the log level to INFO
    transformers.utils.logging.set_verbosity_info()
    log_level = training_args.get_process_log_level()
    logger.setLevel(log_level)
    datasets.utils.logging.set_verbosity(log_level)
    transformers.utils.logging.set_verbosity(log_level)
    transformers.utils.logging.enable_default_handler()
    transformers.utils.logging.enable_explicit_format()
    # Log an overview of the training setup
    logger.warning(
        f"Process rank: {training_args.local_rank}, device: {training_args.device}, n_gpu: {training_args.n_gpu}, "
        + f"distributed training: {bool(training_args.local_rank != -1)}, 16-bits training: {training_args.fp16}"
    )
    logger.info(f"Training/evaluation parameters {training_args}")
    # Check for an existing checkpoint
    last_checkpoint = None
    if os.path.isdir(training_args.output_dir):
        last_checkpoint = get_last_checkpoint(training_args.output_dir)
        if last_checkpoint is None and len(os.listdir(training_args.output_dir)) > 0:
            raise ValueError(
                f"Output directory ({training_args.output_dir}) is not empty"
            )
        elif last_checkpoint is not None and training_args.resume_from_checkpoint is None:
            logger.info(
                f"Resuming training from checkpoint {last_checkpoint}"
            )
    # Set the random seed
    set_seed(training_args.seed)
    # Initialize the model
    if model_args.config_name is not None:
        # Train from scratch
        config = AutoConfig.from_pretrained(model_args.config_name)
        logger.warning("You are initializing a model from scratch")
        logger.info(f"Model config path: {model_args.config_name}")
        logger.info(f"Model config: {config}")
        model = AutoModelForCausalLM.from_config(config, trust_remote_code=True)
        n_params = sum({p.data_ptr(): p.numel() for p in model.parameters()}.values())
        logger.info(f"Pre-training a new model from scratch - Total size={n_params/2**20:.2f}M params")
    elif model_args.model_name_or_path is not None:
        logger.warning("You are loading a pretrained model")
        logger.info(f"Model weights path: {model_args.model_name_or_path}")
        model = AutoModelForCausalLM.from_pretrained(model_args.model_name_or_path, trust_remote_code=True)
        n_params = sum({p.data_ptr(): p.numel() for p in model.parameters()}.values())
        logger.info(f"Continuing from a pretrained model - Total size={n_params/2**20:.2f}M params")
    else:
        logger.error("config_name and model_name_or_path cannot both be empty")
        raise ValueError("config_name and model_name_or_path cannot both be empty")
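    # Note: the parameter count above deduplicates tensors by data_ptr(), so weights shared
    # between modules (e.g. tied input/output embeddings) are only counted once.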
    # Initialize the tokenizer
    tokenizer = AutoTokenizer.from_pretrained(model_args.tokenizer_name)
    logger.info("Tokenizer loaded")
    logger.info(f"Tokenizer path: {model_args.tokenizer_name}")
    # Load the pre-training data
    ds = load_dataset('json', data_files=data_args.train_files)
    logger.info("Training set loaded")
    logger.info(f"Training data paths: {data_args.train_files}")
    logger.info(f'Number of training samples: {len(ds["train"])}')
    # logger.info(f'Training set sample: {ds["train"][0]}')
    # Tokenize the text
    column_names = list(ds["train"].features)
    logger.info(f"Training set columns: {column_names}")
    text_column_name = "text" if "text" in column_names else column_names[0]
    # Tokenization function
    def tokenize_function(examples):
        output = tokenizer([item for item in examples[text_column_name]])
        return output
    # Run preprocessing on the main process first so the other processes can reuse the cache
    with training_args.main_process_first(desc="dataset map tokenization"):
        tokenized_datasets = ds.map(
            tokenize_function,
            batched=True,
            num_proc=data_args.preprocessing_num_workers,
            remove_columns=column_names,
            load_from_cache_file=True,
            desc="Running tokenizer on dataset"
        )
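    # After this map, each example has been replaced by the tokenizer output, typically
    # {"input_ids": [...], "attention_mask": [...]}; the original text column is dropped
    # via remove_columns.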
    # Split the text into fixed-size blocks
    if data_args.block_size is None:
        block_size = tokenizer.model_max_length
        if block_size > 1024:
            logger.warning(
                "The tokenizer supports a context length longer than 1K; defaulting block_size to 1024"
            )
            block_size = 1024
    else:
        if data_args.block_size > tokenizer.model_max_length:
            logger.warning(
                f"The configured block size ({data_args.block_size}) is larger than the model's context length; "
                f"falling back to the model's context length: {tokenizer.model_max_length}."
            )
        block_size = min(data_args.block_size, tokenizer.model_max_length)
    def group_texts(examples):
        # Concatenate all text segments
        concatenated_examples = {k: list(chain(*examples[k])) for k in examples.keys()}
        # Compute the total length of the concatenated sequences
        total_length = len(concatenated_examples[list(examples.keys())[0]])
        # If there is at least one full block, truncate so the length is a multiple of
        # block_size (the remainder is dropped)
        if total_length >= block_size:
            total_length = (total_length // block_size) * block_size
        # Split into blocks of block_size
        result = {
            k: [t[i : i + block_size] for i in range(0, total_length, block_size)]
            for k, t in concatenated_examples.items()
        }
        result["labels"] = result["input_ids"].copy()
        return result
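    # Worked example (illustrative values): with block_size = 4 and a batch whose concatenated
    # input_ids are [1, 2, 3, 4, 5, 6, 7, 8, 9], the trailing 9 is dropped and the batch becomes
    # input_ids = [[1, 2, 3, 4], [5, 6, 7, 8]], with labels an identical copy; the causal-LM
    # models in transformers shift the labels internally when computing the loss.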
    with training_args.main_process_first(desc="grouping texts into blocks"):
        lm_datasets = tokenized_datasets.map(
            group_texts,
            batched=True,
            num_proc=data_args.preprocessing_num_workers,
            load_from_cache_file=True,
            desc=f"Grouping texts into blocks of {block_size}",
            batch_size=40000,
        )
    logger.info("Data preprocessing finished")
    train_dataset = lm_datasets["train"]
    logger.info("Initializing the Trainer")
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=IterableWrapper(train_dataset),
        tokenizer=tokenizer,
        data_collator=default_data_collator
    )
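    # Since group_texts produces equal-length blocks, default_data_collator only needs to stack
    # the input_ids / attention_mask / labels lists into tensors; no padding is required.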
    # Resume from a checkpoint if one is available
    checkpoint = None
    if training_args.resume_from_checkpoint is not None:
        checkpoint = training_args.resume_from_checkpoint
    elif last_checkpoint is not None:
        checkpoint = last_checkpoint
    logger.info("Starting training")
    train_result = trainer.train(resume_from_checkpoint=checkpoint)
    trainer.save_model()
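    # save_model() writes the final weights and config to training_args.output_dir; because the
    # tokenizer was passed to the Trainer, it is saved alongside them as well.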


if __name__ == "__main__":
    main()