seq搜索引擎优化至少包括那几步?(关于第一个就是数据增强阶段的一些事儿,你知道吗?)
优采云 发布时间: 2021-11-07 03:06seq搜索引擎优化至少包括那几步?(关于第一个就是数据增强阶段的一些事儿,你知道吗?)
TinyBERT提供了一个经过General Distillation阶段的checkpoint,可以认为是一个小型的BERT,包括6L786H版本和4L312H版本。而我们的后续复刻是基于4L312H v2版本。值得注意的是,TinyBERT 对任务数据集进行了数据增强操作,通过基于 Glove's Embedding Distance 和 BERT MLM 预测替换的相似词替换,将原创数据集扩大到 20 倍。我们遇到的第一个错误是在数据增强阶段。
数据增强中的错误
我们可以根据官方代码对数据进行增强,但是在QNLI上会报错:
索引错误:索引 514 超出维度 1,大小为 512
当数据增强一半时程序崩溃。为什么?
很简单,因为数据增强代码BERT MLM换词模块对很长(> 512)个句子,导致下标越界)没有特殊处理,具体请参考#Issue50。
只需在对应函数中判断边界即可:
def _masked_language_model(self, sent, word_pieces, mask_id):
if mask_id > 511: # if mask id is longer than max length
return []
tokenized_text = self.tokenizer.tokenize(sent)
tokenized_text = ['[CLS]'] + tokenized_text
tokenized_len = len(tokenized_text)
tokenized_text = word_pieces + ['[SEP]'] + tokenized_text[1:] + ['[SEP]']
segments_ids = [0] * (tokenized_len + 1) + [1] * (len(tokenized_text) - tokenized_len - 1)
if len(tokenized_text) > 512: # truncation
tokenized_text = tokenized_text[:512]
segments_ids = segments_ids[:512]
token_ids = self.tokenizer.convert_tokens_to_ids(tokenized_text)
tokens_tensor = torch.tensor([token_ids]).to(device)
segments_tensor = torch.tensor([segments_ids]).to(device)
self.model.to(device)
predictions = self.model(tokens_tensor, segments_tensor)
word_candidates = torch.argsort(predictions[0, mask_id], descending=True)[:self.M].tolist()
word_candidates = self.tokenizer.convert_ids_to_tokens(word_candidates)
return list(filter(lambda x: x.find("##"), word_candidates))
数据并行加速
在我们辛苦快乐地完成了数据增强之后,下一步就是在Task Specific Distillation中进行Step 1,General Distillation。对于一些像MRPC这样的小数据集,20倍增长后的数据量仍然不到80k,所以训练速度还是很快的,单卡大概半天可以跑20轮。但是对于像MNLI(390k)这样GLUE中最大的数据集,20倍增强的数据集(增强用了2天左右),如果用单卡训练10轮,可能需要半个月。到时候,黄花菜怕是不冷了。所以我计划使用多卡进行培训。乍一看,官方实现通过nn.DataParallel支持Doka。好吧,直接CUDA_VISIBLE_DEVICES="0,1,2,3"加载4张卡。不知道跑不跑 加载数据(标记化,填充)花了 1 个小时,我终于开始了。当我打开 nvidia-smi 时,我震惊了。GPU 利用率在 50% 左右。看看预计的时间,大概是21小时。在一轮中,10 个纪元四舍五入为一周半。男孩,我还能试验这个吗?这时候去查看PyTorch文档,发现PyTorch不再推荐使用nn。数据并行,为什么?主要原因是DataParallel的实现是一个单一的进程。每次主卡读取数据,然后将其发送到其他卡。这部分故障会带来额外的计算开销,并会导致主卡的GPU内存被占用。明显高于其他卡,导致潜在的批量大小限制;此外,在这种模式下,其他 GPU 必须在计算完成后发送回主卡进行同步。这一步会受到 GIL(全局解释器锁)的限制,进一步降低效率。另外,还有一些DataParallel不支持的功能,比如多机、模型切片,但是另一个DistributedDataParallel模块支持。所以,废话少说,你得把原来的TinyBERT DataParallel(DP)改成DistributedDataParallel(DDP)。那么,DP转DDP需要几步?答:大概,就这么多步骤。核心代码是做一些初始化,用DDP替换DP:还有一些DataParallel 不支持的功能,比如多机和模型切片,但另一个DistributedDataParallel 模块支持。所以,废话少说,你得把原来的TinyBERT DataParallel(DP)改成DistributedDataParallel(DDP)。那么,DP转DDP需要几步?答:大概,就这么多步骤。核心代码是做一些初始化,用DDP替换DP:还有一些DataParallel 不支持的功能,比如多机和模型切片,但另一个DistributedDataParallel 模块支持。所以,废话少说,你得把原来的TinyBERT DataParallel(DP)改成DistributedDataParallel(DDP)。那么,DP转DDP需要几步?答:大概,就这么多步骤。核心代码是做一些初始化,用DDP替换DP:
from torch.nn.parallel import DistributedDataParallel as DDP
import torch.distributed as dist
# 给 parser 增加一个 local rank 参数来在启动的时候传入 rank
parser.add_argument('--local_rank',
type=int,
default=-1)
# ...
# 初始化
logger.info("Initializing Distributed Environment")
torch.cuda.set_device(args.local_rank)
dist.init_process_group(backend="nccl")
# 设置 devicec
local_rank = args.local_rank
torch.cuda.set_device(local_rank)
# ...
# 初始化模型 并且 放到 device 上
student_model = TinyBertForSequenceClassification.from_pretrained(args.student_model, num_labels=num_labels).to(device)
teacher_model = TinyBertForSequenceClassification.from_pretrained(args.teacher_model, num_labels=num_labels).to(device)
# 用 DDP 包裹模型
student_model = DDP(student_model, device_ids=[local_rank], output_device=local_rank)
teacher_model = DDP(teacher_model, device_ids=[local_rank], output_device=local_rank)
# ..
# 用 DistributedSampler 替换原来的 Random Sampler
train_sampler = torch.utils.data.DistributedSampler(train_data)
然后,你就大功告成了,一键开始:
GPU=”0,1,2,3”
CUDA_VISIBLE_DEVICEES=$GPU python -m torch.distributed.launch –n_proc_per_node 4 task_disti.py
创业成功了吗?模型再次开始处理数据......
一小时后,机器突然卡住,程序日志停止。我打开htop看了看。好家伙,256G内存满了,程序是D状态。怎么了?
数据加载加速
我先尝试了少量数据,下采样到10k,程序运行没问题,DDP速度很快;我试过单卡加载,虽然又加载了一个小时,但是还好,程序还是可以运行的,那么,问题是怎么发生的呢?单卡,我看了一下加载全量数据后的内存使用情况。大约是60G。考虑到DDP是多进程的,每个进程必须独立加载数据。4张卡4个进程,大约是250G内存,所以内存爆了,后续数据的io卡住了(无法从磁盘加载到内存),导致程序D状态。看看下一组的机器,最大的是250G的内存。也就是说,如果我只用3张卡,那么我可以运行,但是如果其他人上来启动程序并吃掉一部分内存,那么很有可能内存会爆炸,然后大家的程序都被抹掉了,这不是很好。一个不太优雅的解决方案是将数据切成块,然后读取一小段训练,然后读取下一段,再次训练,再次读取。我咨询了小组中的一位高级研究员。另一种方式是实现一个数据读取程序,将数据存储在磁盘上,每次使用时将其加载到内存中,从而避免内存爆炸的问题。好吧,让我们做吧,但你不能从头开始制造轮子,对吧?哲哥提到了拥抱脸(yyds)的数据集可以支持这个功能。查了一下文档,发现他实现了一个基于pyarrow的内存映射数据读取。根据我在 Huggingface Transformer 方面的经验,它似乎能够实现这一点。
首先,加载增强数据。datasets提供的load_dataset函数最接近load_dataset('csv', data_file),然后我们就可以逐列获取数据并进行预处理。写了一会儿,发现读了一部分数据后,总是列数不对的错误。我猜原创的 MNLI 数据集不能保证每一列都在那里。我查看了在MnliProcessor中处理的代码,发现Line[8]和line[9]被写成sentence_a和sentence_b。无奈之下,只能用最粗暴的方式用文本方式读入,每一行都是一条数据,然后拆分:
from datasets import
processor = processors[task_name]()
output_mode = output_modes[task_name]
label_list = processor.get_labels()
num_labels = len(label_list)
tokenizer = BertTokenizer.from_pretrained(args.student_model, do_lower_case=args.do_lower_case)
# 用 text
mnli_datasets = load_dataset("text", data_files=os.path.join(args.data_dir, "train_aug.tsv"))
label_classes = processor.get_labels()
label_map = {label: i for i, label in enumerate(label_classes)}
def preprocess_func(examples, max_seq_length=args.max_seq_length):
splits = [e.split('\t') for e in examples['text']] # split
# tokenize for sent1 & sent2
tokens_s1 = [tokenizer.tokenize(e[8]) for e in splits]
tokens_s2 = [tokenizer.tokenize(e[9]) for e in splits]
for t1, t2 in zip(tokens_s1, tokens_s2):
truncate_seq_pair(t1, t2, max_length=max_seq_length - 3)
input_ids_list = []
input_mask_list = []
segment_ids_list = []
seq_length_list = []
labels_list = []
labels = [e[-1] for e in splits] # last column is label column
for token_a, token_b, l in zip(tokens_s1, tokens_s2, labels): # zip(tokens_as, tokens_bs):
tokens = ["[CLS]"] + token_a + ["[SEP]"]
segment_ids = [0] * len(tokens)
tokens += token_b + ["[SEP]"]
segment_ids += [1] * (len(token_b) + 1)
input_ids = tokenizer.convert_tokens_to_ids(tokens) # tokenize to id
input_mask = [1] * len(input_ids)
seq_length = len(input_ids)
padding = [0] * (max_seq_length - len(input_ids))
input_ids += padding
input_mask += padding
segment_ids += padding
assert len(input_ids) == max_seq_length
assert len(input_mask) == max_seq_length
assert len(segment_ids) == max_seq_length
input_ids_list.append(input_ids)
input_mask_list.append(input_mask)
segment_ids_list.append(segment_ids)
seq_length_list.append(seq_length)
labels_list.append(label_map[l])
results = {"input_ids": input_ids_list,
"input_mask": input_mask_list,
"segment_ids": segment_ids_list,
"seq_length": seq_length_list,
"label_ids": labels_list}
return results
# map datasets
mnli_datasets = mnli_datasets.map(preprocess_func, batched=True)
# remove column
train_data = mnli_datasets['train'].remove_columns('text')
写完这个preprocess_func,我觉得胜利就在眼前,但还有几个坑需要解决:
inputs = {}
for k, v in batch.items():
if isinstance(v, torch.Tensor):
inputs[k] = v.to(device)
elif isinstance(v, List):
inputs[k] = torch.stack(v, dim=1).to(device)
到目前为止,只需将之前代码的 train_data 替换为当前版本即可。
另外,为了进一步加速,我还集成了混合精度。现在Pytorch和自己支持混合精度,代码量很少,但是有一个陷阱就是loss的计算必须用auto()包裹,同时,所有模型的输出都必须参与loss的计算,对于只做预测或者隐藏状态对齐的loss不友好,所以只能手动计算一个系数为0的额外loss项(所以他参与了Training但不影响梯度)。