kaggle项目地址:https://www.kaggle.com/competitions/nlp-getting-started
结果:0.83
直接贴代码
代码结构
完整代码
1、自定义配置类
my_config.py
# 全局配置
import torch
class Config():
def __init__(self):
self.batch_size = 16
# 文本的最大长度
self.text_max_length = 128
# 总训练的epochs数,我只是随便定义了个数
self.epochs = 10
# 取多少训练集的数据作为验证集
self.validation_ratio = 0.1
# 设备
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# 每多少步,打印一次loss
self.log_per_step = 50
2、自定义数据集类
my_dataset.py
import pandas as pd
import torch
from torch.utils.data import Dataset
class MyDataset(Dataset):
def __init__(self, mode='train', pd_data=None):
super(MyDataset, self).__init__()
self.mode = mode
# 拿到对应的数据
if mode == 'train':
self.dataset = pd_data
elif mode == 'validation':
self.dataset = pd_data
elif mode == 'test':
# 如果是测试模式,则返回推文和id。拿id做target主要是方便后面写入结果。
self.dataset = pd_data
else:
raise Exception("Unknown mode {}".format(mode))
def __getitem__(self, index):
# 取第index条
data = self.dataset.iloc[index]
# 取其推文,做个简单的数据清理
source = data['text'].replace("#", "").replace("@", "")
# 取对应的推文
if self.mode == 'test':
# 如果是test,将id做为target
target = data['id']
else:
target = data['target']
# 返回推文和target
return source, target
def __len__(self):
return len(self.dataset)
def load_train_data(train_data_path, cfg):
pd_data = pd.read_csv(train_data_path)[['text', 'target']]
pd_validation_data = pd_data.sample(frac=cfg.validation_ratio) # 随机取10%的数据作为验证集
pd_train_data = pd_data[~pd_data.index.isin(pd_validation_data.index)] # 剩下的数据作为训练集
return pd_train_data, pd_validation_data
def load_test_data(test_data_path):
# pd_data = pd.read_csv('data/test.csv')[['text', 'id']]
pd_data = pd.read_csv(test_data_path)[['text', 'id']]
return pd_data
def my_collate_fn(tokenizer, cfg, batch):
"""
将一个batch的文本句子转成tensor,并组成batch。
:param batch: [('aaa', 1), ('bbb', 1), ('ccc', 0)]
:return: 处理后的结果,例如:
src: {'input_ids': tensor([[ 101, ..., 102, 0, 0, ...], ...]), 'attention_mask': tensor([[1, ..., 1, 0, ...], ...])}
target:[1, 1, 0, ...]
"""
text, target = zip(*batch) # zip(*) 将元组里的每一列抽成单独一个元组。 text = ('aaa', 'bbb', 'ccc'), target = (1, 1, 0)
text, target = list(text), list(target) # tuple 转 list。text = ['aaa', 'bbb', 'ccc'], target = [1, 1, 0]
# src是要送给bert的,所以不需要特殊处理,直接用tokenizer的结果即可
# padding='max_length' 不够长度的进行填充
# return_tensors='pt' 返回的是pytorch tensor
# truncation=True 长度过长的进行裁剪
src = tokenizer(text, padding='max_length', max_length=cfg.text_max_length, return_tensors='pt', truncation=True)
# src={'input_ids': tensor([[], [], [], ...]), 'attention_mask': tensor([[1,1,1,...], [1,1,1,...], [1,1,1,...], ...]), 'token_type_ids': tensor([[0,0,0,...], [0,0,0,...], [0,0,0,...], ...]}
# torch.LongTensor(target)=tensor([1, 1, 0, ...])
# input_ids,它是每个 token 的 id 表示,101代表[CLS],102代表[SEP],0代表[PAD]
# token_type_ids,它是一个 binary mask,用于标识 token 属于哪个 sequence。如果我们只有一个 sequence,那么所有的 token 类型 id 都将为 0。对于文本分类任务,token_type_ids是 BERT 模型的可选输入参数。
# attention_mask,它是一个 binary mask,用于标识 token 是真实 word 还是只是由填充得到。如果 token 包含 [CLS]、[SEP] 或任何真实单词,则 mask 将为 1。如果 token 只是 [PAD] 填充,则 mask 将为 0
return src, torch.LongTensor(target)
3、自定义模型类
my_model.py
from torch import nn
from transformers import AutoModel
class MyModel(nn.Module):
def __init__(self):
super(MyModel, self).__init__()
# 加载bert模型
self.bert = AutoModel.from_pretrained("K:/workspace-sync/models/bert-base-uncased")
# 最后的预测层
# 以下代码等价于
self.predictor = nn.Sequential(
nn.Linear(768, 256),
nn.ReLU(),
nn.Linear(256, 1),
nn.Sigmoid()
) # 等价于下面的代码
# self.predictor = nn.Sequential()
# self.predictor.add_module('linear1', nn.Linear(768, 256))
# self.predictor.add_module('relu', nn.ReLU())
# self.predictor.add_module('linear2', nn.Linear(256, 1))
# self.predictor.add_module('sigmoid', nn.Sigmoid())
def forward(self, src):
"""
:param src: 分词后的推文数据
"""
# 将src直接序列解包传入bert,因为bert和tokenizer是一套的,所以可以这么做。
# 得到encoder的输出,用最前面[CLS]的输出作为最终线性层的输入
# last_hidden_state[:, 0, :] 取出每个batch的第一个token的输出,即[CLS]的输出
outputs = self.bert(**src).last_hidden_state[:, 0, :] # last_hidden_state的维度是(batch_size, seq_len, hidden_size)
# 使用线性层来做最终的预测
return self.predictor(outputs) # 输出的维度是(batch_size, 1)
4、训练类
run_train.py
import torch
from torch import nn
from torch.utils.data import DataLoader
from transformers import AutoTokenizer # 用于加载bert模型的分词器
from my_config import Config
from my_dataset import MyDataset, load_train_data
from my_model import MyModel
# 1. 定义一些常用的函数
# 将一个batch的文本句子转成tensor,并组成batch。
def my_collate_fn(batch):
"""
将一个batch的文本句子转成tensor,并组成batch。
:param batch: [('aaa', 1), ('bbb', 1), ('ccc', 0)]
:return: 处理后的结果,例如:
src: {'input_ids': tensor([[ 101, ..., 102, 0, 0, ...], ...]), 'attention_mask': tensor([[1, ..., 1, 0, ...], ...])}
target:[1, 1, 0, ...]
"""
text, target = zip(*batch) # zip(*) 将元组里的每一列抽成单独一个元组。 text = ('aaa', 'bbb', 'ccc'), target = (1, 1, 0)
text, target = list(text), list(target) # tuple 转 list。text = ['aaa', 'bbb', 'ccc'], target = [1, 1, 0]
# src是要送给bert的,所以不需要特殊处理,直接用tokenizer的结果即可
# padding='max_length' 不够长度的进行填充
# return_tensors='pt' 返回的是pytorch tensor
# truncation=True 长度过长的进行裁剪
src = tokenizer(text, padding='max_length', max_length=cfg.text_max_length, return_tensors='pt', truncation=True)
# src={'input_ids': tensor([[], [], [], ...]), 'attention_mask': tensor([[1,1,1,...], [1,1,1,...], [1,1,1,...], ...]), 'token_type_ids': tensor([[0,0,0,...], [0,0,0,...], [0,0,0,...], ...]}
# torch.LongTensor(target)=tensor([1, 1, 0, ...])
# input_ids,它是每个 token 的 id 表示,101代表[CLS],102代表[SEP],0代表[PAD]
# token_type_ids,它是一个 binary mask,用于标识 token 属于哪个 sequence。如果我们只有一个 sequence,那么所有的 token 类型 id 都将为 0。对于文本分类任务,token_type_ids是 BERT 模型的可选输入参数。
# attention_mask,它是一个 binary mask,用于标识 token 是真实 word 还是只是由填充得到。如果 token 包含 [CLS]、[SEP] 或任何真实单词,则 mask 将为 1。如果 token 只是 [PAD] 填充,则 mask 将为 0
return src, torch.LongTensor(target)
# 由于inputs是字典类型的,定义一个辅助函数帮助to(device)
def to_device(dict_tensors):
result_tensors = {}
for key, value in dict_tensors.items():
result_tensors[key] = value.to(cfg.device)
return result_tensors
# 求准确率和loss
def validate():
model.eval()
total_loss = 0.
total_correct = 0
for inputs, targets in validation_loader:
inputs, targets = to_device(inputs), targets.to(cfg.device)
outputs = model(inputs) # 前向传播
loss = criteria(outputs.view(-1), targets.float()) # 计算loss
total_loss += float(loss)
correct_num = (((
outputs >= 0.5).float() * 1).flatten() == targets).sum() # flatten()将tensor展平,==判断是否相等,sum()统计True的个数
# 以上代码等价于四行代码
# correct_num = torch.tensor(0)
# for i in range(len(targets)):
# if (outputs[i] >= 0.5).float() == targets[i]:
# correct_num += 1
total_correct += correct_num
return total_correct / len(validation_dataset), total_loss / len(validation_dataset)
# 2. 准备数据和模型,以及一些准备工作
# 准备数据
cfg = Config()
pd_train_data, pd_validation_data = load_train_data("data/train.csv", cfg)
train_dataset = MyDataset('train', pd_train_data)
validation_dataset = MyDataset('validation', pd_validation_data)
tokenizer = AutoTokenizer.from_pretrained("K:/workspace-sync/models/bert-base-uncased")
train_loader = DataLoader(train_dataset, batch_size=cfg.batch_size, shuffle=True, collate_fn=my_collate_fn)
validation_loader = DataLoader(validation_dataset, batch_size=cfg.batch_size, shuffle=False, collate_fn=my_collate_fn)
# 准备模型
model = MyModel()
model = model.to(cfg.device)
criteria = nn.BCELoss() # 二分类交叉熵
optimizer = torch.optim.Adam(model.parameters(), lr=3e-5) # lr=3*10^-5
# 首先将模型调成训练模式
model.train()
# 清空一下cuda缓存
if torch.cuda.is_available():
torch.cuda.empty_cache()
# 3. 开始训练
total_loss = 0. # 定义几个变量,帮助打印loss
step = 0 # 记录步数
best_accuracy = 0 # 记录在验证集上最好的准确率
for epoch in range(cfg.epochs):
model.train()
# input=[{'input_ids': tensor([[ 101, ..., 102, 0, 0, ...], ...]), 'attention_mask': tensor([[1, ..., 1, 0, ...], ...]}, 'token_type_ids': ...]
# targets=[1, 1, 0, ...]
for i, (inputs, targets) in enumerate(train_loader):
# 从batch中拿到训练数据
inputs, targets = to_device(inputs), targets.to(cfg.device)
# 传入模型进行前向传递, output=tensor([[0.4752],[0.4805],[0.4780],...])
outputs = model(inputs)
# 计算损失, criteria = nn.BCELoss()。outputs.view(-1)=tensor([0.4752, 0.4805, 0.4780, ...])
loss = criteria(outputs.view(-1), targets.float()) # loss=tensor(0.6893)
loss.backward() # 反向传播
optimizer.step() # 更新参数
optimizer.zero_grad() # 梯度清零
total_loss += float(loss) # 计算总的loss
step += 1 # 记录步数
if step % cfg.log_per_step == 0:
print("Epoch {}/{}, Step: {}/{}, total loss:{:.4f}".format(epoch + 1, cfg.epochs, i, len(train_loader),
total_loss))
total_loss = 0
del inputs, targets # 释放内存
# 一个epoch后,使用过验证集进行验证
accuracy, validation_loss = validate() # 准确率,验证集上的loss
print("Epoch {}, accuracy: {:.4f}, validation loss: {:.4f}".format(epoch + 1, accuracy, validation_loss))
torch.save(model, f"model/model_{epoch}.pt") # 保存模型,每个文件400MB
# 保存最好的模型
if accuracy > best_accuracy:
torch.save(model, "model/model_best.pt")
best_accuracy = accuracy
5、测试类
run_test.py
import torch
from torch.utils.data import DataLoader
from tqdm import tqdm
from transformers import AutoTokenizer # 用于加载bert模型的分词器
from my_dataset import MyDataset, load_test_data
from my_config import Config
# 1. 定义一些常用的函数
def my_collate_fn(batch):
"""
将一个batch的文本句子转成tensor,并组成batch。
:param batch: 一个batch的句子,例如: [('推文', target), ('推文', target), ...]
:return: 处理后的结果,例如:
src: {'input_ids': tensor([[ 101, ..., 102, 0, 0, ...], ...]), 'attention_mask': tensor([[1, ..., 1, 0, ...], ...])}
target:[1, 1, 0, ...]
"""
text, target = zip(*batch)
text, target = list(text), list(target)
# src是要送给bert的,所以不需要特殊处理,直接用tokenizer的结果即可
# padding='max_length' 不够长度的进行填充
# truncation=True 长度过长的进行裁剪
src = tokenizer(text, padding='max_length', max_length=cfg.text_max_length, return_tensors='pt', truncation=True)
return src, torch.LongTensor(target)
# 2. 加载模型和数据
cfg = Config()
model = torch.load("model/model_best.pt")
model = model.eval()
tokenizer = AutoTokenizer.from_pretrained("K:/workspace-sync/models/bert-base-uncased")
pd_test_data = load_test_data("data/train.csv")
test_dataset = MyDataset('test', pd_test_data)
test_loader = DataLoader(test_dataset, batch_size=cfg.batch_size, shuffle=False, collate_fn=my_collate_fn)
# 3. 开始预测
results = []
for inputs, ids in tqdm(test_loader):
outputs = model(inputs.to(cfg.device))
# print(outputs)
outputs = (outputs >= 0.5).int().flatten().tolist()
ids = ids.tolist()
results = results + [(id, result) for result, id in
zip(outputs, ids)] # zip(outputs, ids) 会返回一个迭代器,里面的元素是(outputs[i], ids[i])
# 4. 保存结果
with open('data/results.csv', 'w', encoding='utf-8') as f:
f.write('id,target\n')
for id, result in results:
f.write(f"{id},{result}\n")
print("Finished!")
参考文章:https://blog.csdn.net/zhaohongfei_358/article/details/126426855
您可以选择一种方式赞助本站
支付宝扫一扫赞助
微信钱包扫描赞助
赏