Text Serialization
Issues to consider when serializing text (illustrated by the short sketch after this list):
- Use a dictionary to map words and numbers to each other one-to-one
- Use that dictionary to turn sentences into numeric sequences, and numeric sequences back into sentences
- Sentences have different lengths, so the sentences within each batch must be brought to the same length
- Words occur with very different frequencies, so high-frequency and low-frequency words need to be filtered out and the total vocabulary size capped
- Out-of-vocabulary (OOV) words are handled by replacing them with a special UNK token
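A minimal sketch of these ideas with a hand-built vocabulary (the words and ids here are made up for illustration; the Word2Sequence class below does this properly):

# Toy vocabulary: ids 0 and 1 are reserved for the UNK and PAD special tokens.
word2id = {"UNK": 0, "PAD": 1, "this": 2, "movie": 3, "is": 4, "great": 5}
id2word = {idx: word for word, idx in word2id.items()}

tokens = ["this", "movie", "is", "astonishing"]            # "astonishing" is OOV
ids = [word2id.get(w, word2id["UNK"]) for w in tokens]     # -> [2, 3, 4, 0]
ids = ids + [word2id["PAD"]] * (6 - len(ids))              # pad to a fixed length of 6 -> [2, 3, 4, 0, 1, 1]
print(ids)
print([id2word[i] for i in ids])                           # -> ['this', 'movie', 'is', 'UNK', 'PAD', 'PAD']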
Approach
- Tokenize every sentence
- Store the words in a dictionary, count their frequencies, and filter out low-frequency words (see the sketch after this list)
- Implement a method that converts text into a numeric sequence
- Implement a method that converts a numeric sequence back into text
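A minimal sketch of the counting-and-filtering step using collections.Counter (the Word2Sequence class below implements the same idea with a plain dict, and min_count here is just an illustrative threshold):

from collections import Counter

sentences = [["this", "movie", "is", "great"],
             ["this", "movie", "is", "awful"]]

count = Counter()
for sentence in sentences:
    count.update(sentence)

# Keep only the words that appear at least min_count times.
min_count = 2
vocab = {word for word, freq in count.items() if freq >= min_count}
print(vocab)   # {'this', 'movie', 'is'}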
Implementation
1. Preparing the dataset
- When the input is raw text, the collate_fn method used by the DataLoader has to be overridden (see the sketch after this list)
- word2sequence implementation:
  - Define a dictionary that holds all the words
  - Keep or drop words according to their frequency
  - Unify the sentence length within each batch
  - Implement the sentence <==> sequence conversion in both directions
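Why collate_fn has to be overridden: with variable-length token lists, PyTorch's default collation cannot stack the samples. A minimal sketch with toy data (on recent PyTorch versions this typically fails with a RuntimeError about elements of unequal size):

from torch.utils.data import DataLoader

# Two samples whose token lists have different lengths.
samples = [(["this", "movie", "rocks"], 1), (["terrible"], 0)]

try:
    for batch in DataLoader(samples, batch_size=2):
        print(batch)
except RuntimeError as err:
    print("default collate failed:", err)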
2. Building the model
- Instantiate embedding = nn.Embedding(vocab_size, embedding_dim)
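For reference, nn.Embedding maps a LongTensor of token ids of shape [batch_size, max_len] to a float tensor of shape [batch_size, max_len, embedding_dim]. A small shape check with made-up sizes:

import torch
import torch.nn as nn

embedding = nn.Embedding(num_embeddings=10, embedding_dim=4)   # vocabulary of 10 words, 4-dim vectors
token_ids = torch.LongTensor([[2, 3, 4, 1, 1],                  # shape [batch_size=2, max_len=5]
                              [5, 6, 0, 1, 1]])
vectors = embedding(token_ids)
print(vectors.shape)                                            # torch.Size([2, 5, 4])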
1.1 Preparing the dataset
# dataset.py
import torch
from torch.utils.data import DataLoader, Dataset
import re
import os

from lib import ws, max_len
# contraction_mapping holds the contractions that should be expanded
# filters is the list of patterns that should be replaced
from data_clear import contraction_mapping, filters


# English tokenization
# step 1: use a regular expression to strip useless punctuation, HTML tags, etc. from the corpus
# step 2: lowercase all letters and split on whitespace
def tokenize(text):
    # re_words ends up looking like "\t|\n|\x97"; | means "or",
    # so every pattern joined this way is replaced with a space below
    re_words = "|".join(filters)
    # re.sub takes the regular expression re_words, the replacement " ", and the string text to process
    clear_content = re.sub(re_words, " ", text)
    # split() treats runs of whitespace as one separator, while split(" ") would split on every single space
    # lowercase each word first, then expand it if it is a known contraction
    tokens_list = [contraction_mapping[word.lower()] if word.lower() in contraction_mapping else word.lower()
                   for word in clear_content.strip().split()]
    return tokens_list
class ImdbDataset(Dataset):
    def __init__(self, train=True):
        self.train_data_path = r"/home/wangmin/workspace/myStudy/learnPytorchNLP/sentimentAnalysis/data/aclImdb_v1/aclImdb/train"
        self.test_data_path = r"/home/wangmin/workspace/myStudy/learnPytorchNLP/sentimentAnalysis/data/aclImdb_v1/aclImdb/test"
        data_path = self.train_data_path if train else self.test_data_path
        # put the file paths of both the positive and the negative examples into one list
        temp_data_path = [os.path.join(data_path, "pos"), os.path.join(data_path, "neg")]
        self.total_file_path = []
        for path in temp_data_path:
            # listdir returns every file name in the directory
            file_name_list = os.listdir(path)
            # only add files ending in .txt to the list
            file_path_list = [os.path.join(path, filename) for filename in file_name_list if filename.endswith(".txt")]
            self.total_file_path.extend(file_path_list)

    def __getitem__(self, index):
        file_path = self.total_file_path[index]
        # get the tokens
        tokens = tokenize(open(file_path).read())
        # get the label from the parent directory name ("pos" or "neg")
        label_str = file_path.split("/")[-2]
        label = 0 if "neg" == label_str else 1
        return tokens, label

    def __len__(self):
        return len(self.total_file_path)
# collate_fn assembles batch_size individual samples into one batch for training
def collate_fn(batch):
    '''
    :param batch: a list of __getitem__ results [(tokens, label), (tokens, label)], here batch_size == 2
    :return: contents=([tokens], [tokens]), labels=(label, label)
    '''
    # the star unpacks the batch, so zip(*batch) transposes
    # [(tokens, label), ...] into (all tokens, all labels)
    contents, labels = zip(*batch)
    # apply word2seq: turn each token list into a fixed-length id sequence
    contents = [ws.transform(i, max_len=max_len) for i in contents]
    # convert to tensors so the embedding layer can consume them later
    contents = torch.LongTensor(contents)
    labels = torch.LongTensor(labels)
    return contents, labels
def get_dataloader(train=True):
    imdb_dataset = ImdbDataset(train)
    # because the raw inputs are strings, collate_fn must be overridden, otherwise the default collation fails
    data_loader = DataLoader(imdb_dataset, batch_size=2, shuffle=True, collate_fn=collate_fn)
    return data_loader  # yields ([tokens], [tokens]), (label, label) converted to tensors


if __name__ == '__main__':
    for idx, (input, target) in enumerate(get_dataloader()):
        print(input)   # LongTensor of shape [batch_size, max_len]
        print(target)  # LongTensor of shape [batch_size]
        break
    # imdb_dataset = ImdbDataset()
    # print(imdb_dataset[0])
1.2 Implementing word2seq
# word2seq.py
class Word2Sequence:
    UNK_TAG = "UNK"
    PAD_TAG = "PAD"
    UNK = 0
    PAD = 1

    def __init__(self):
        # vocabulary {word: id}
        self.dict = {
            self.UNK_TAG: self.UNK,
            self.PAD_TAG: self.PAD
        }
        # word frequency counts
        self.count = {}
    def fit(self, sentence):
        """
        Accumulate the words of a single sentence into the frequency counts
        :param sentence: [word1, word2, ...]
        :return:
        """
        for word in sentence:
            self.count[word] = self.count.get(word, 0) + 1

    def build_vocab(self, min_vocab=5, max_vocab=999, max_features=None):
        """
        Build the vocabulary
        :param min_vocab: minimum number of times a word must occur
        :param max_vocab: maximum number of times a word may occur
        :param max_features: maximum number of words to keep
        :return:
        """
        # drop words whose frequency is below min_vocab or above max_vocab
        if min_vocab is not None and max_vocab is not None:
            self.count = {word: value for word, value in self.count.items() if min_vocab <= value <= max_vocab}
        # limit the number of words that are kept
        if max_features is not None:
            # sort the dict by value and keep the max_features most frequent words
            tmp_count_list = sorted(self.count.items(), key=lambda x: x[-1], reverse=True)[:max_features]
            self.count = dict(tmp_count_list)
        for word in self.count:
            # word -> id: each new word gets the current length of dict as its id, a neat trick
            self.dict[word] = len(self.dict)
        # build the inverse vocabulary id -> word
        self.inverse_dict = dict(zip(self.dict.values(), self.dict.keys()))
    def transform(self, sentence, max_len=None):
        """
        Convert a sentence into an id sequence
        :param sentence: [word1, word2, ...]
        :param max_len: int, pad or truncate the sentence to this length
        :return: [id1, id2, ...]
        """
        if max_len is not None:
            # pad
            if max_len > len(sentence):
                sentence += ([self.PAD_TAG] * (max_len - len(sentence)))
            # truncate
            if max_len < len(sentence):
                sentence = sentence[:max_len]
        return [self.dict.get(word, self.UNK) for word in sentence]

    def inverse_transform(self, ids):
        """
        Convert an id sequence back into a sentence
        :param ids: [id1, id2, ...]
        :return:
        """
        return [self.inverse_dict.get(idx, self.UNK_TAG) for idx in ids]

    def __len__(self):
        return len(self.dict)
if __name__ == '__main__':
    # ws = Word2Sequence()
    # ws.fit(["我", "爱", "你"])
    # ws.fit(["我", "是", "我"])
    # ws.build_vocab(min_vocab=1, max_vocab=10)
    # print(ws.dict)
    #
    # ret = ws.transform(["我", "爱", "北京"], max_len=10)
    # ret = ws.inverse_transform(ret)
    # print(ret)
    pass
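A quick round-trip check of the class on English tokens closer to the IMDB data (a minimal sketch; the module name word2seq follows the file-name comment above):

from word2seq import Word2Sequence

ws = Word2Sequence()
ws.fit(["this", "movie", "is", "great"])
ws.fit(["this", "movie", "is", "awful"])
ws.build_vocab(min_vocab=1, max_vocab=10)

ids = ws.transform(["this", "movie", "was", "great"], max_len=6)
print(ids)                        # "was" maps to UNK (0), the padded tail maps to PAD (1)
print(ws.inverse_transform(ids))  # ['this', 'movie', 'UNK', 'great', 'PAD', 'PAD']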
1.3 Saving the word2seq result to a pickle file
# main.py
from word2seq import Word2Sequence
import pickle
import os
from dataset import tokenize
from tqdm import tqdm

# Note: dataset.py does "from lib import ws, max_len" at module level and lib.py loads ws.pkl,
# so ws.pkl has to exist (or that import has to be deferred) before this script can run for the first time.
if __name__ == '__main__':
    ws = Word2Sequence()
    data_path = r"/home/wangmin/workspace/myStudy/learnPytorchNLP/sentimentAnalysis/data/aclImdb_v1/aclImdb/train"
    temp_data_path = [os.path.join(data_path, "pos"), os.path.join(data_path, "neg")]
    for data_path in temp_data_path:
        file_paths = [os.path.join(data_path, file_name) for file_name in os.listdir(data_path) if file_name.endswith("txt")]
        for file_path in tqdm(file_paths):
            sentence = tokenize(open(file_path).read())
            ws.fit(sentence)
    ws.build_vocab(min_vocab=10, max_features=20000)
    pickle.dump(ws, open("../model/ws.pkl", "wb"))
    print(len(ws))
1.4 Loading the saved word2seq pickle file
# lib.py
import pickle
max_len = 20
ws = pickle.load(open("../model/ws.pkl", "rb"))
2 Building the RNN model (unfinished)
2.1 Building the model
# myModel.py
"""
Model definition
"""
import torch
import torch.nn as nn
from torch.optim import Adam

from dataset import get_dataloader
from lib import ws, max_len
class MyModel(nn.Module):
    def __init__(self):
        super(MyModel, self).__init__()
        self.embedding = nn.Embedding(len(ws), 100)
        self.fc = nn.Linear(max_len * 100, 2)

    def forward(self, input):
        # embedding lookup, output shape: [batch_size, max_len, 100]
        x = self.embedding(input)
        # flatten to [batch_size, max_len * 100]
        x = x.view([-1, max_len * 100])
        output = self.fc(x)
        return output
model = MyModel()
dataloader = get_dataloader(train=True)
optimizer = Adam(model.parameters(), 0.001)
criterion = nn.CrossEntropyLoss()


def train(epoch):
    for idx, (input, target) in enumerate(dataloader):
        # 1. zero the gradients
        optimizer.zero_grad()
        # 2. forward pass
        output = model(input)
        # 3. compute the loss
        loss = criterion(output, target)
        # 4. backward pass
        loss.backward()
        # 5. update the parameters
        optimizer.step()
        print(loss.item())


if __name__ == '__main__':
    for i in range(1):
        train(i)
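The RNN part is marked as unfinished above. As a rough sketch of the direction it could take (an assumption on my part, not the author's final design), the flatten-plus-Linear head of MyModel could be replaced with an LSTM over the embedded sequence:

# myRnnModel.py (hypothetical file name) -- sketch of an LSTM-based variant, not the author's code
import torch
import torch.nn as nn
from lib import ws


class MyRnnModel(nn.Module):
    def __init__(self, hidden_size=128):
        super(MyRnnModel, self).__init__()
        self.embedding = nn.Embedding(len(ws), 100)
        # batch_first=True so inputs/outputs have shape [batch_size, seq_len, features]
        self.lstm = nn.LSTM(input_size=100, hidden_size=hidden_size,
                            num_layers=2, batch_first=True, bidirectional=True)
        # bidirectional => the final forward and backward hidden states are concatenated
        self.fc = nn.Linear(hidden_size * 2, 2)

    def forward(self, input):
        x = self.embedding(input)          # [batch_size, max_len, 100]
        output, (h_n, c_n) = self.lstm(x)  # h_n: [num_layers * 2, batch_size, hidden_size]
        # concatenate the last layer's forward and backward hidden states
        out = torch.cat([h_n[-2, :, :], h_n[-1, :, :]], dim=-1)
        return self.fc(out)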
Copyright notice: this is an original article by bazinga014, licensed under the CC 4.0 BY-SA license. Please include a link to the original source and this notice when reposting.