BERT二分类学习 kaggle nlp-getting-started Natural Language Processing with Disaster Tweets

avatar 2024年04月29日23:15:50 0 144 views
博主分享免费Java教学视频,B站账号:Java刘哥

kaggle项目地址:https://www.kaggle.com/competitions/nlp-getting-started

结果:0.83

直接贴代码

代码结构

 

完整代码

1、自定义配置类

my_config.py

# 全局配置
import torch


class Config():
    def __init__(self):
        self.batch_size = 16
        # 文本的最大长度
        self.text_max_length = 128
        # 总训练的epochs数,我只是随便定义了个数
        self.epochs = 10
        # 取多少训练集的数据作为验证集
        self.validation_ratio = 0.1
        # 设备
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        # 每多少步,打印一次loss
        self.log_per_step = 50

 

2、自定义数据集类

my_dataset.py

import pandas as pd
import torch
from torch.utils.data import Dataset


class MyDataset(Dataset):

    def __init__(self, mode='train', pd_data=None):
        super(MyDataset, self).__init__()
        self.mode = mode
        # 拿到对应的数据
        if mode == 'train':
            self.dataset = pd_data
        elif mode == 'validation':
            self.dataset = pd_data
        elif mode == 'test':
            # 如果是测试模式,则返回推文和id。拿id做target主要是方便后面写入结果。
            self.dataset = pd_data
        else:
            raise Exception("Unknown mode {}".format(mode))

    def __getitem__(self, index):
        # 取第index条
        data = self.dataset.iloc[index]
        # 取其推文,做个简单的数据清理
        source = data['text'].replace("#", "").replace("@", "")
        # 取对应的推文
        if self.mode == 'test':
            # 如果是test,将id做为target
            target = data['id']
        else:
            target = data['target']
        # 返回推文和target
        return source, target

    def __len__(self):
        return len(self.dataset)


def load_train_data(train_data_path, cfg):
    pd_data = pd.read_csv(train_data_path)[['text', 'target']]
    pd_validation_data = pd_data.sample(frac=cfg.validation_ratio)  # 随机取10%的数据作为验证集
    pd_train_data = pd_data[~pd_data.index.isin(pd_validation_data.index)]  # 剩下的数据作为训练集
    return pd_train_data, pd_validation_data


def load_test_data(test_data_path):
    # pd_data = pd.read_csv('data/test.csv')[['text', 'id']]
    pd_data = pd.read_csv(test_data_path)[['text', 'id']]
    return pd_data


def my_collate_fn(tokenizer, cfg, batch):
    """
    将一个batch的文本句子转成tensor,并组成batch。
    :param batch: [('aaa', 1), ('bbb', 1), ('ccc', 0)]
    :return: 处理后的结果,例如:
             src: {'input_ids': tensor([[ 101, ..., 102, 0, 0, ...], ...]), 'attention_mask': tensor([[1, ..., 1, 0, ...], ...])}
             target:[1, 1, 0, ...]
    """
    text, target = zip(*batch)  # zip(*) 将元组里的每一列抽成单独一个元组。 text = ('aaa', 'bbb', 'ccc'),  target = (1, 1, 0)
    text, target = list(text), list(target)  # tuple 转 list。text = ['aaa', 'bbb', 'ccc'],  target = [1, 1, 0]

    # src是要送给bert的,所以不需要特殊处理,直接用tokenizer的结果即可
    # padding='max_length' 不够长度的进行填充
    # return_tensors='pt' 返回的是pytorch tensor
    # truncation=True 长度过长的进行裁剪
    src = tokenizer(text, padding='max_length', max_length=cfg.text_max_length, return_tensors='pt', truncation=True)

    # src={'input_ids': tensor([[], [], [], ...]), 'attention_mask': tensor([[1,1,1,...], [1,1,1,...], [1,1,1,...], ...]), 'token_type_ids': tensor([[0,0,0,...], [0,0,0,...], [0,0,0,...], ...]}
    # torch.LongTensor(target)=tensor([1, 1, 0, ...])

    # input_ids,它是每个 token 的 id 表示,101代表[CLS],102代表[SEP],0代表[PAD]
    # token_type_ids,它是一个 binary mask,用于标识 token 属于哪个 sequence。如果我们只有一个 sequence,那么所有的 token 类型 id 都将为 0。对于文本分类任务,token_type_ids是 BERT 模型的可选输入参数。
    # attention_mask,它是一个 binary mask,用于标识 token 是真实 word 还是只是由填充得到。如果 token 包含 [CLS]、[SEP] 或任何真实单词,则 mask 将为 1。如果 token 只是 [PAD] 填充,则 mask 将为 0

    return src, torch.LongTensor(target)

 

3、自定义模型类

my_model.py

from torch import nn
from transformers import AutoModel


class MyModel(nn.Module):

    def __init__(self):
        super(MyModel, self).__init__()

        # 加载bert模型
        self.bert = AutoModel.from_pretrained("K:/workspace-sync/models/bert-base-uncased")

        # 最后的预测层
        # 以下代码等价于

        self.predictor = nn.Sequential(
            nn.Linear(768, 256),
            nn.ReLU(),
            nn.Linear(256, 1),
            nn.Sigmoid()
        )  # 等价于下面的代码
        # self.predictor = nn.Sequential()
        # self.predictor.add_module('linear1', nn.Linear(768, 256))
        # self.predictor.add_module('relu', nn.ReLU())
        # self.predictor.add_module('linear2', nn.Linear(256, 1))
        # self.predictor.add_module('sigmoid', nn.Sigmoid())

    def forward(self, src):
        """
        :param src: 分词后的推文数据
        """

        # 将src直接序列解包传入bert,因为bert和tokenizer是一套的,所以可以这么做。
        # 得到encoder的输出,用最前面[CLS]的输出作为最终线性层的输入

        # last_hidden_state[:, 0, :] 取出每个batch的第一个token的输出,即[CLS]的输出
        outputs = self.bert(**src).last_hidden_state[:, 0, :]  # last_hidden_state的维度是(batch_size, seq_len, hidden_size)

        # 使用线性层来做最终的预测
        return self.predictor(outputs)  # 输出的维度是(batch_size, 1)

 

4、训练类

run_train.py

import torch
from torch import nn
from torch.utils.data import DataLoader
from transformers import AutoTokenizer  # 用于加载bert模型的分词器

from my_config import Config
from my_dataset import MyDataset, load_train_data
from my_model import MyModel


# 1. 定义一些常用的函数
# 将一个batch的文本句子转成tensor,并组成batch。
def my_collate_fn(batch):
    """
    将一个batch的文本句子转成tensor,并组成batch。
    :param batch: [('aaa', 1), ('bbb', 1), ('ccc', 0)]
    :return: 处理后的结果,例如:
             src: {'input_ids': tensor([[ 101, ..., 102, 0, 0, ...], ...]), 'attention_mask': tensor([[1, ..., 1, 0, ...], ...])}
             target:[1, 1, 0, ...]
    """
    text, target = zip(*batch)  # zip(*) 将元组里的每一列抽成单独一个元组。 text = ('aaa', 'bbb', 'ccc'),  target = (1, 1, 0)
    text, target = list(text), list(target)  # tuple 转 list。text = ['aaa', 'bbb', 'ccc'],  target = [1, 1, 0]

    # src是要送给bert的,所以不需要特殊处理,直接用tokenizer的结果即可
    # padding='max_length' 不够长度的进行填充
    # return_tensors='pt' 返回的是pytorch tensor
    # truncation=True 长度过长的进行裁剪
    src = tokenizer(text, padding='max_length', max_length=cfg.text_max_length, return_tensors='pt', truncation=True)

    # src={'input_ids': tensor([[], [], [], ...]), 'attention_mask': tensor([[1,1,1,...], [1,1,1,...], [1,1,1,...], ...]), 'token_type_ids': tensor([[0,0,0,...], [0,0,0,...], [0,0,0,...], ...]}
    # torch.LongTensor(target)=tensor([1, 1, 0, ...])

    # input_ids,它是每个 token 的 id 表示,101代表[CLS],102代表[SEP],0代表[PAD]
    # token_type_ids,它是一个 binary mask,用于标识 token 属于哪个 sequence。如果我们只有一个 sequence,那么所有的 token 类型 id 都将为 0。对于文本分类任务,token_type_ids是 BERT 模型的可选输入参数。
    # attention_mask,它是一个 binary mask,用于标识 token 是真实 word 还是只是由填充得到。如果 token 包含 [CLS]、[SEP] 或任何真实单词,则 mask 将为 1。如果 token 只是 [PAD] 填充,则 mask 将为 0

    return src, torch.LongTensor(target)


# 由于inputs是字典类型的,定义一个辅助函数帮助to(device)
def to_device(dict_tensors):
    result_tensors = {}
    for key, value in dict_tensors.items():
        result_tensors[key] = value.to(cfg.device)
    return result_tensors


# 求准确率和loss
def validate():
    model.eval()
    total_loss = 0.
    total_correct = 0
    for inputs, targets in validation_loader:
        inputs, targets = to_device(inputs), targets.to(cfg.device)
        outputs = model(inputs)  # 前向传播
        loss = criteria(outputs.view(-1), targets.float())  # 计算loss
        total_loss += float(loss)

        correct_num = (((
                                outputs >= 0.5).float() * 1).flatten() == targets).sum()  # flatten()将tensor展平,==判断是否相等,sum()统计True的个数
        # 以上代码等价于四行代码
        # correct_num = torch.tensor(0)
        # for i in range(len(targets)):
        #     if (outputs[i] >= 0.5).float() == targets[i]:
        #         correct_num += 1

        total_correct += correct_num

    return total_correct / len(validation_dataset), total_loss / len(validation_dataset)


# 2. 准备数据和模型,以及一些准备工作
# 准备数据
cfg = Config()
pd_train_data, pd_validation_data = load_train_data("data/train.csv", cfg)
train_dataset = MyDataset('train', pd_train_data)
validation_dataset = MyDataset('validation', pd_validation_data)
tokenizer = AutoTokenizer.from_pretrained("K:/workspace-sync/models/bert-base-uncased")
train_loader = DataLoader(train_dataset, batch_size=cfg.batch_size, shuffle=True, collate_fn=my_collate_fn)
validation_loader = DataLoader(validation_dataset, batch_size=cfg.batch_size, shuffle=False, collate_fn=my_collate_fn)

# 准备模型
model = MyModel()
model = model.to(cfg.device)
criteria = nn.BCELoss()  # 二分类交叉熵
optimizer = torch.optim.Adam(model.parameters(), lr=3e-5)  # lr=3*10^-5

# 首先将模型调成训练模式
model.train()

# 清空一下cuda缓存
if torch.cuda.is_available():
    torch.cuda.empty_cache()

# 3. 开始训练
total_loss = 0.  # 定义几个变量,帮助打印loss
step = 0  # 记录步数
best_accuracy = 0  # 记录在验证集上最好的准确率
for epoch in range(cfg.epochs):
    model.train()

    # input=[{'input_ids': tensor([[ 101, ..., 102, 0, 0, ...], ...]), 'attention_mask': tensor([[1, ..., 1, 0, ...], ...]}, 'token_type_ids': ...]
    # targets=[1, 1, 0, ...]
    for i, (inputs, targets) in enumerate(train_loader):
        # 从batch中拿到训练数据
        inputs, targets = to_device(inputs), targets.to(cfg.device)
        # 传入模型进行前向传递, output=tensor([[0.4752],[0.4805],[0.4780],...])
        outputs = model(inputs)
        # 计算损失, criteria = nn.BCELoss()。outputs.view(-1)=tensor([0.4752, 0.4805, 0.4780, ...])
        loss = criteria(outputs.view(-1), targets.float())  # loss=tensor(0.6893)
        loss.backward()  # 反向传播
        optimizer.step()  # 更新参数
        optimizer.zero_grad()  # 梯度清零

        total_loss += float(loss)  # 计算总的loss
        step += 1  # 记录步数

        if step % cfg.log_per_step == 0:
            print("Epoch {}/{}, Step: {}/{}, total loss:{:.4f}".format(epoch + 1, cfg.epochs, i, len(train_loader),
                                                                       total_loss))
            total_loss = 0

        del inputs, targets  # 释放内存

    # 一个epoch后,使用过验证集进行验证
    accuracy, validation_loss = validate()  # 准确率,验证集上的loss
    print("Epoch {}, accuracy: {:.4f}, validation loss: {:.4f}".format(epoch + 1, accuracy, validation_loss))
    torch.save(model, f"model/model_{epoch}.pt")  # 保存模型,每个文件400MB

    # 保存最好的模型
    if accuracy > best_accuracy:
        torch.save(model, "model/model_best.pt")
        best_accuracy = accuracy

 

5、测试类

run_test.py

import torch
from torch.utils.data import DataLoader
from tqdm import tqdm
from transformers import AutoTokenizer  # 用于加载bert模型的分词器

from my_dataset import MyDataset, load_test_data
from my_config import Config


# 1. 定义一些常用的函数
def my_collate_fn(batch):
    """
    将一个batch的文本句子转成tensor,并组成batch。
    :param batch: 一个batch的句子,例如: [('推文', target), ('推文', target), ...]
    :return: 处理后的结果,例如:
             src: {'input_ids': tensor([[ 101, ..., 102, 0, 0, ...], ...]), 'attention_mask': tensor([[1, ..., 1, 0, ...], ...])}
             target:[1, 1, 0, ...]
    """
    text, target = zip(*batch)
    text, target = list(text), list(target)

    # src是要送给bert的,所以不需要特殊处理,直接用tokenizer的结果即可
    # padding='max_length' 不够长度的进行填充
    # truncation=True 长度过长的进行裁剪
    src = tokenizer(text, padding='max_length', max_length=cfg.text_max_length, return_tensors='pt', truncation=True)

    return src, torch.LongTensor(target)


# 2. 加载模型和数据
cfg = Config()
model = torch.load("model/model_best.pt")
model = model.eval()
tokenizer = AutoTokenizer.from_pretrained("K:/workspace-sync/models/bert-base-uncased")
pd_test_data = load_test_data("data/train.csv")
test_dataset = MyDataset('test', pd_test_data)
test_loader = DataLoader(test_dataset, batch_size=cfg.batch_size, shuffle=False, collate_fn=my_collate_fn)

# 3. 开始预测
results = []
for inputs, ids in tqdm(test_loader):
    outputs = model(inputs.to(cfg.device))
    # print(outputs)
    outputs = (outputs >= 0.5).int().flatten().tolist()
    ids = ids.tolist()
    results = results + [(id, result) for result, id in
                         zip(outputs, ids)]  # zip(outputs, ids) 会返回一个迭代器,里面的元素是(outputs[i], ids[i])

# 4. 保存结果
with open('data/results.csv', 'w', encoding='utf-8') as f:
    f.write('id,target\n')
    for id, result in results:
        f.write(f"{id},{result}\n")
print("Finished!")

 

参考文章:https://blog.csdn.net/zhaohongfei_358/article/details/126426855

  • 微信
  • 交流学习,有偿服务
  • weinxin
  • 博客/Java交流群
  • 资源分享,问题解决,技术交流。群号:590480292
  • weinxin
avatar

发表评论

avatar 登录者:匿名
匿名评论,评论回复后会有邮件通知

  

已通过评论:0   待审核评论数:0