Table of Contents

- 1. Introduction to Text Sentiment Analysis
- 2. Text Sentiment Classification Tasks
  - 2.1 Lexicon-based methods
  - 2.2 Machine-learning-based methods
- 3. LSTM in PyTorch
- 4. Sentiment Classification Pipeline with PyTorch and LSTM

Theory for this part: NLP学习—10.循环神经网络RNN及其变体LSTM、GRU、双向LSTM
Dataset and code link
1. Introduction to Text Sentiment Analysis
Sentiment analysis uses algorithms to extract the emotion expressed in a text: whether a sentence conveys a positive, neutral, or negative judgment, or an emotion such as happiness, sadness, or anger. If a computer can perform this text-to-sentiment mapping automatically, it saves a great deal of time, which matters given today's massive volume of text data. With sentiment analysis we can, for example, surface well-reviewed products in e-commerce, or find highly rated venues for dining and lodging.
Text sentiment analysis has three main tasks: sentiment feature extraction, sentiment feature classification, and sentiment feature retrieval and summarization.
2. Text Sentiment Classification Tasks
2.1 Lexicon-based methods
The first approach scores text with a sentiment lexicon.
For example, the sentence 这个/电影/不是/太好看 segments into four tokens: 这个 (this), 电影 (movie), 不是 (is not), and 太好看 (very good). 太好看 appears in the positive (pos) lexicon, so its pos_score is 1. We then scan backward for a degree adverb (there is none here) and for a negation word, and find 不是, which contributes -1. Multiplying the two gives a final score of -1.
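A minimal Python sketch of this scoring scheme follows; the lexicons, the backward window of three tokens, and the degree weights are all illustrative assumptions, not a full implementation:

# Lexicon-based scoring sketch (illustrative lexicons, segmentation assumed done).
pos_words = {'太好看', '精彩'}               # hypothetical positive lexicon
neg_words = {'难看'}                         # hypothetical negative lexicon
negations = {'不是', '不'}                   # hypothetical negation words
degree_words = {'非常': 2.0, '有点': 0.5}    # hypothetical degree adverbs

def lexicon_score(tokens):
    score = 0.0
    for i, tok in enumerate(tokens):
        if tok in pos_words or tok in neg_words:
            base = 1.0 if tok in pos_words else -1.0
            # scan backward for degree adverbs and negation words
            for prev in tokens[max(0, i - 3):i]:
                if prev in degree_words:
                    base *= degree_words[prev]
                if prev in negations:
                    base *= -1.0
            score += base
    return score

print(lexicon_score(['这个', '电影', '不是', '太好看']))  # prints -1.0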
A lexicon can be built in the following ways:
- Manual construction of the sentiment dictionary (human-curated annotation)
- Automatic construction (knowledge-base driven): starting from seed emotion keywords (happy, sad, angry, etc.), mine words that carry the same sentiment
  - Use gensim to find the nearest word vectors (see the sketch after this list)
  - Use a web crawler or dictionary lookups to substitute synonyms
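As a sketch of the gensim route, assuming a pretrained word2vec file in binary format (the file name below is a placeholder):

# Expand a seed sentiment word with its nearest neighbors in word2vec space.
import gensim

model = gensim.models.KeyedVectors.load_word2vec_format(
    'pretrained_word2vec.bin', binary=True)  # placeholder path
for word, similarity in model.most_similar('高兴', topn=10):
    print(word, similarity)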
2.2 Machine-learning-based methods
Commonly used methods include:
- Naive Bayes
- SVM classifiers
- Ensemble learning
- Deep learning methods
Here we cover LSTM and LSTM + Attention, both of which fuse information across the whole sequence.
In downstream tasks such as part-of-speech tagging, we must consider not only the left context but also the right context, which calls for a bidirectional LSTM. A bidirectional LSTM can be viewed as two LSTMs trained simultaneously, running in opposite directions with separate parameters. The hidden state $h_t$ at the current time step is the concatenation of the two $h_t$ vectors produced by the forward and backward LSTMs, so the model captures both the past and the future features around time $t$. The bidirectional LSTM is trained with backpropagation as usual.
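In symbols, the combined hidden state is the concatenation of the forward and backward states:

$$h_t = [\overrightarrow{h}_t \,;\, \overleftarrow{h}_t]$$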
If we add attention on top of the bidirectional LSTM (the attention here is static), the network proceeds as follows:
Each hidden state is mapped to a scalar score; the scores across the sentence go through a softmax to produce attention weights; the hidden states are then averaged with these weights into a single summary vector, which passes through a classification layer whose softmax yields a score for each class.
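Written out, with $W$ corresponding to weight_W and $v$ to weight_proj in the LSTM_attention code later in this post:

$$u_t = \tanh(W h_t), \qquad \alpha_t = \frac{\exp(v^\top u_t)}{\sum_k \exp(v^\top u_k)}, \qquad s = \sum_t \alpha_t h_t$$

The summary vector $s$ is what the classification layers consume.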
3. LSTM in PyTorch
torch.nn.LSTM(*args, **kwargs)
Parameters:
- input_size – the number of expected features in the input
- hidden_size – the number of features in the hidden state
- num_layers – number of recurrent layers; e.g., setting num_layers=2 stacks two LSTMs, with the second LSTM taking the outputs of the first and computing the final result. Default: 1
- bias – if False, the layer does not use bias weights. Default: True
- batch_first – if True, the input and output tensors are provided as (batch, seq, feature) instead of (seq, batch, feature); note that this does not apply to the hidden or cell states. Default: False
- dropout – if non-zero, introduces a dropout layer on the outputs of each LSTM layer except the last, with dropout probability equal to dropout. Default: 0
- bidirectional – if True, becomes a bidirectional LSTM. Default: False
- proj_size – if > 0, uses an LSTM with projections of the corresponding size. Default: 0
Inputs: input, (h_0, c_0)
Outputs: output, (h_n, c_n)
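A quick shape check of these inputs and outputs (all sizes chosen arbitrarily for illustration):

import torch
import torch.nn as nn

# Illustrative sizes only: 10 input features, 20 hidden units, 2 layers.
rnn = nn.LSTM(input_size=10, hidden_size=20, num_layers=2)
inp = torch.randn(5, 3, 10)    # (seq_len=5, batch=3, input_size=10)
h0 = torch.randn(2, 3, 20)     # (num_layers, batch, hidden_size)
c0 = torch.randn(2, 3, 20)
output, (hn, cn) = rnn(inp, (h0, c0))
print(output.shape)            # torch.Size([5, 3, 20])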
4. Sentiment Classification Pipeline with PyTorch and LSTM
- Get the text, tokenize it, and clean the data (remove stopwords)
- Build the word2index and index2word tables
- Prepare pretrained word embeddings (or start from one-hot vectors)
- Set up the Dataset / DataLoader
- Build the model (soft attention / hard attention / self-attention / scaled dot-product self-attention)
- Configure the hyperparameters
- Train
- Evaluate
- Save the model
Data preprocessing code: Sentiment_Analysis_DataProcess.py
from __future__ import unicode_literals, print_function, division
from io import open
import torch
import re
import numpy as np
import gensim
from torch.utils.data import Dataset
from Sentiment_Analysis_Config import Config
class Data_set(Dataset):
    """
    Custom dataset class; only __len__ and __getitem__ need to be defined.
    We could iterate over the data directly, but that makes batching,
    shuffling, and multi-process loading hard, so we load it through
    torch.utils.data.DataLoader instead.
    """
    def __init__(self, Data, Label):
        self.Data = Data
        self.Label = Label  # None for unlabeled data

    def __len__(self):
        return len(self.Data)

    def __getitem__(self, index):
        data = torch.from_numpy(self.Data[index])
        if self.Label is not None:
            label = torch.from_numpy(self.Label[index])
            return data, label
        return data
def stopwordslist():
    """
    Load the stopword list.
    :return: list of stopwords
    """
    with open('word2vec_data/stopword.txt', encoding='UTF-8') as f:
        stopwords = [line.strip() for line in f.readlines()]
    return stopwords
def build_word2id(file):
    """
    Build the word2id dictionary (key: word, value: index) and write it to a file.
    :param file: path where word2id is saved
    :return: None
    """
    stopwords = stopwordslist()
    word2id = {'_PAD_': 0}
    path = [Config.train_path, Config.val_path]
    for _path in path:
        with open(_path, encoding='utf-8') as f:
            for line in f.readlines():
                out_list = []
                sp = line.strip().split()
                for word in sp[1:]:  # sp[0] is the label
                    if word not in stopwords:
                        rt = re.findall('[a-zA-Z]+', word)
                        if word != '\t':
                            if len(rt) == 1:
                                # skip tokens that are a single run of ASCII letters
                                continue
                            else:
                                out_list.append(word)
                for word in out_list:
                    if word not in word2id.keys():
                        word2id[word] = len(word2id)
    with open(file, 'w', encoding='utf-8') as f:
        for w in word2id:
            f.write(w + '\t')
            f.write(str(word2id[w]))
            f.write('\n')
def build_word2vec(fname, word2id, save_to_path=None):
    """
    Encode the vocabulary with pretrained word2vec vectors.
    :param fname: path to the pretrained word2vec file
    :param word2id: vocabulary of the corpus
    :param save_to_path: optional path to save the corpus word vectors locally
    :return: word vectors for the vocabulary, indexed by word id
    """
    n_words = max(word2id.values()) + 1
    model = gensim.models.KeyedVectors.load_word2vec_format(fname, binary=True)
    # start from random vectors; words missing from the pretrained model keep them
    word_vecs = np.array(np.random.uniform(-1., 1., [n_words, model.vector_size]))
    for word in word2id.keys():
        try:
            word_vecs[word2id[word]] = model[word]
        except KeyError:
            pass
    if save_to_path:
        with open(save_to_path, 'w', encoding='utf-8') as f:
            for vec in word_vecs:
                vec = [str(w) for w in vec]
                f.write(' '.join(vec))
                f.write('\n')
    return word_vecs
def text_to_array(word2id, seq_lenth, path):
    """
    Convert labeled text to sequences of word indices.
    :param word2id: word2id dictionary
    :param seq_lenth: maximum sentence length
    :param path: file path
    :return: (sentence index array, label list)
    """
    lable_array = []
    i = 0
    sa = []
    # first pass: count sentences so the output array can be pre-allocated
    with open(path, encoding='utf-8') as f1:
        for l1 in f1.readlines():
            s = l1.strip().split()
            s1 = s[1:]
            new_s = [word2id.get(word, 0) for word in s1]
            sa.append(new_s)
    # second pass: left-pad (or truncate) each sentence to seq_lenth
    with open(path, encoding='utf-8') as f:
        sentences_array = np.zeros(shape=(len(sa), seq_lenth))
        for line in f.readlines():
            sl1 = line.strip().split()
            sen = sl1[1:]
            new_sen = [word2id.get(word, 0) for word in sen]
            new_sen_np = np.array(new_sen).reshape(1, -1)
            if np.size(new_sen_np, 1) < seq_lenth:
                sentences_array[i, seq_lenth - np.size(new_sen_np, 1):] = new_sen_np[0, :]
            else:
                sentences_array[i, 0:seq_lenth] = new_sen_np[0, 0:seq_lenth]
            i = i + 1
            lable = int(sl1[0])
            lable_array.append(lable)
    return np.array(sentences_array), lable_array
def text_to_array_nolable(word2id, seq_lenth, path):
    """
    Convert unlabeled text to sequences of word indices; identical to the
    function above except that no labels are processed.
    :param word2id: word2id dictionary
    :param seq_lenth: sequence length
    :param path: file path
    :return: sentence index array
    """
    i = 0
    sa = []
    with open(path, encoding='utf-8') as f1:
        for l1 in f1.readlines():
            s = l1.strip().split()
            s1 = s[1:]
            new_s = [word2id.get(word, 0) for word in s1]
            sa.append(new_s)
    with open(path, encoding='utf-8') as f:
        sentences_array = np.zeros(shape=(len(sa), seq_lenth))
        for line in f.readlines():
            sl1 = line.strip().split()
            sen = sl1[1:]
            new_sen = [word2id.get(word, 0) for word in sen]
            new_sen_np = np.array(new_sen).reshape(1, -1)
            if np.size(new_sen_np, 1) < seq_lenth:
                sentences_array[i, seq_lenth - np.size(new_sen_np, 1):] = new_sen_np[0, :]
            else:
                sentences_array[i, 0:seq_lenth] = new_sen_np[0, 0:seq_lenth]
            i = i + 1
    return np.array(sentences_array)
def to_categorical(y, num_classes=None):
    """
    Convert class labels to one-hot encoding.
    :param y: labels
    :param num_classes: number of classes
    :return: one-hot encoded array
    """
    y = np.array(y, dtype='int')
    input_shape = y.shape
    if input_shape and input_shape[-1] == 1 and len(input_shape) > 1:
        input_shape = tuple(input_shape[:-1])
    y = y.ravel()
    if not num_classes:
        num_classes = np.max(y) + 1
    n = y.shape[0]
    categorical = np.zeros((n, num_classes))
    categorical[np.arange(n), y] = 1
    output_shape = input_shape + (num_classes,)
    categorical = np.reshape(categorical, output_shape)
    return categorical
def prepare_data(w2id, train_path, val_path, test_path, seq_lenth):
    """
    Produce index-encoded sentences and labels for all three splits.
    :param w2id: word2id dictionary
    :param train_path: training file path
    :param val_path: validation file path
    :param test_path: test file path
    :param seq_lenth: maximum sentence length
    :return: arrays and labels for train / val / test
    """
    train_array, train_lable = text_to_array(w2id, seq_lenth=seq_lenth, path=train_path)
    val_array, val_lable = text_to_array(w2id, seq_lenth=seq_lenth, path=val_path)
    test_array, test_lable = text_to_array(w2id, seq_lenth=seq_lenth, path=test_path)
    # reshape labels to column vectors; applying to_categorical instead would
    # give one-hot rows such as [[0. 1.], [0. 1.], ..., [1. 0.]]
    train_lable = np.array([train_lable]).T
    val_lable = np.array([val_lable]).T
    test_lable = np.array([test_lable]).T
    return train_array, train_lable, val_array, val_lable, test_array, test_lable
if __name__ == '__main__':
    build_word2id('./word2vec_data/word2id.txt')
    splist = []
    word2id = {}
    with open('./word2vec_data/word2id.txt', encoding='utf-8') as f:
        for line in f.readlines():
            sp = line.strip().split()
            splist.append(sp)
        word2id = dict(splist)
    for key in word2id:
        word2id[key] = int(word2id[key])
    id2word = {}
    for key, val in word2id.items():
        id2word[val] = key
    w2vec = build_word2vec(Config.pre_word2vec_path, word2id, Config.corpus_word2vec_path)
    train_array, train_lable, val_array, val_lable, test_array, test_label = prepare_data(word2id,
                                                                                          train_path=Config.train_path,
                                                                                          val_path=Config.val_path,
                                                                                          test_path=Config.test_path,
                                                                                          seq_lenth=Config.max_sen_len)
    np.savetxt('./word2vec_data/train_data.txt', train_array, fmt='%d')
    np.savetxt('./word2vec_data/val_data.txt', val_array, fmt='%d')
    np.savetxt('./word2vec_data/test_data.txt', test_array, fmt='%d')
Model code: Sentiment_model.py
"""
模型部分
"""
from __future__ import unicode_literals, print_function, division
import torch
import torch.nn as nn
import torch.nn.functional as F
class LSTMModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, pretrained_weight, update_w2v, hidden_dim,
                 num_layers, drop_keep_prob, n_class, bidirectional, **kwargs):
        super(LSTMModel, self).__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.n_class = n_class
        self.bidirectional = bidirectional
        self.embedding = nn.Embedding.from_pretrained(pretrained_weight)
        self.embedding.weight.requires_grad = update_w2v
        self.encoder = nn.LSTM(input_size=embedding_dim, hidden_size=self.hidden_dim,
                               num_layers=num_layers, bidirectional=self.bidirectional,
                               dropout=drop_keep_prob)  # used as a dropout probability, despite the name
        # the encoding below concatenates two time steps, so the classifier input
        # is twice the per-step feature size (which itself doubles if bidirectional)
        if self.bidirectional:
            self.decoder1 = nn.Linear(hidden_dim * 4, hidden_dim)
            self.decoder2 = nn.Linear(hidden_dim, n_class)
        else:
            self.decoder1 = nn.Linear(hidden_dim * 2, hidden_dim)
            self.decoder2 = nn.Linear(hidden_dim, n_class)

    def forward(self, inputs):
        """
        Forward pass.
        :param inputs: [batch, seq_len]
        :return: class scores [batch, n_class]
        """
        embeddings = self.embedding(inputs)
        # nn.LSTM expects (seq_len, batch, feature) by default, so swap dims
        states, hidden = self.encoder(embeddings.permute([1, 0, 2]))
        # concatenate the hidden states of the first and last time steps
        encoding = torch.cat([states[0], states[-1]], dim=1)
        outputs = self.decoder1(encoding)
        outputs = self.decoder2(outputs)
        return outputs
class LSTM_attention(nn.Module):
    def __init__(self, vocab_size, embedding_dim, pretrained_weight, update_w2v, hidden_dim,
                 num_layers, drop_keep_prob, n_class, bidirectional, **kwargs):
        super(LSTM_attention, self).__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.n_class = n_class
        self.bidirectional = bidirectional
        self.embedding = nn.Embedding.from_pretrained(pretrained_weight)
        self.embedding.weight.requires_grad = update_w2v
        self.encoder = nn.LSTM(input_size=embedding_dim, hidden_size=self.hidden_dim,
                               num_layers=num_layers, bidirectional=self.bidirectional,
                               dropout=drop_keep_prob, batch_first=True)
        # per-step feature size of the encoder output (doubled when bidirectional);
        # the original sized these weights for the bidirectional case only
        out_dim = hidden_dim * 2 if bidirectional else hidden_dim
        # attention parameters: a projection matrix W and a context vector v
        self.weight_W = nn.Parameter(torch.Tensor(out_dim, out_dim))
        self.weight_proj = nn.Parameter(torch.Tensor(out_dim, 1))
        nn.init.uniform_(self.weight_W, -0.1, 0.1)
        nn.init.uniform_(self.weight_proj, -0.1, 0.1)
        self.decoder1 = nn.Linear(out_dim, hidden_dim)
        self.decoder2 = nn.Linear(hidden_dim, n_class)

    def forward(self, inputs):
        """
        Forward pass.
        :param inputs: [batch, seq_len]
        :return: class scores [batch, n_class]
        """
        embeddings = self.embedding(inputs)
        # batch_first=True, so the embeddings are already (batch, seq_len, feature)
        states, hidden = self.encoder(embeddings)
        # score each time step, softmax over the sequence, then take the
        # attention-weighted sum of the hidden states
        u = torch.tanh(torch.matmul(states, self.weight_W))
        att = torch.matmul(u, self.weight_proj)
        att_score = F.softmax(att, dim=1)
        scored_x = states * att_score
        encoding = torch.sum(scored_x, dim=1)
        outputs = self.decoder1(encoding)
        outputs = self.decoder2(outputs)
        return outputs
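As a quick smoke test of these classes, with all dimensions hypothetical and a random matrix standing in for the pretrained word2vec weights:

# Hypothetical smoke test: random embeddings stand in for real word2vec weights.
import torch

vocab_size, embedding_dim, hidden_dim = 100, 50, 100       # hypothetical sizes
fake_w2v = torch.randn(vocab_size, embedding_dim)
model = LSTM_attention(vocab_size, embedding_dim, fake_w2v, update_w2v=True,
                       hidden_dim=hidden_dim, num_layers=2, drop_keep_prob=0.3,
                       n_class=2, bidirectional=True)
x = torch.randint(0, vocab_size, (4, 75))                  # (batch=4, seq_len=75)
print(model(x).shape)                                      # torch.Size([4, 2])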
Evaluation code: Sentiment_Analysis_eval.py
from __future__ import unicode_literals, print_function, division
from io import open
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader
from sklearn.metrics import confusion_matrix, f1_score, recall_score, precision_score
import os
from Sentiment_model import LSTMModel, LSTM_attention
from Sentiment_Analysis_Config import Config
from Sentiment_Analysis_DataProcess import prepare_data, build_word2vec, text_to_array_nolable, Data_set
def val_accuary(model, val_dataloader, device, criterion):
    model.eval()
    model = model.to(device)
    with torch.no_grad():
        correct1 = 0
        total1 = 0
        val_loss = 0.0
        for j, data_1 in enumerate(val_dataloader, 0):
            input1, target1 = data_1[0], data_1[1]
            input1 = input1.type(torch.LongTensor)
            target1 = target1.type(torch.LongTensor)
            target1 = target1.squeeze(1)  # (batch, 1) -> (batch,)
            input1 = input1.to(device)
            target1 = target1.to(device)
            output1 = model(input1)
            loss1 = criterion(output1, target1)
            val_loss += loss1.item()
            _, predicted1 = torch.max(output1, 1)
            total1 += target1.size(0)
            correct1 += (predicted1 == target1).sum().item()
            # note: F1 and Recall are recomputed each iteration, so the values
            # printed below reflect the last batch only
            F1 = f1_score(target1.cpu(), predicted1.cpu(), average='weighted')
            Recall = recall_score(target1.cpu(), predicted1.cpu(), average='micro')
        print(
            '\nVal accuracy : {:.3f}%,val_loss:{:.3f}, F1_score:{:.3f}%, Recall:{:.3f}%'.format(100 * correct1 / total1,
                                                                                                val_loss, 100 * F1,
                                                                                                100 * Recall))
        return 100 * correct1 / total1
def test_accuary(model, test_dataloader, device):
    model = model.to(device)
    with torch.no_grad():
        correct = 0
        total = 0
        for k, data_test in enumerate(test_dataloader, 0):
            input_test, target_ = data_test[0], data_test[1]
            input_test = input_test.type(torch.LongTensor)
            target_ = target_.type(torch.LongTensor)
            target_ = target_.squeeze(1)
            input_test = input_test.to(device)
            target_ = target_.to(device)
            output2 = model(input_test)
            _, predicted_test = torch.max(output2, 1)
            total += target_.size(0)
            correct += (predicted_test == target_).sum().item()
            # as above, these metrics reflect the last batch only
            F1 = f1_score(target_.cpu(), predicted_test.cpu(), average='weighted')
            Recall = recall_score(target_.cpu(), predicted_test.cpu(), average='micro')
            CM = confusion_matrix(target_.cpu(), predicted_test.cpu())
        print('test accuracy : {:.3f}%, F1_score:{:.3f}%, Recall:{:.3f}%,Confusion_matrix:{}'.format(
            100 * correct / total, 100 * F1, 100 * Recall, CM))
def pre(word2id, model, seq_lenth, path):
    model.to("cpu")
    with torch.no_grad():
        input_array = text_to_array_nolable(word2id, seq_lenth, path)
        sen_p = torch.from_numpy(input_array)
        sen_p = sen_p.type(torch.LongTensor)
        output_p = model(sen_p)
        _, pred = torch.max(output_p, 1)
        for i in pred:
            print('Predicted class:', i.item())
if __name__ == '__main__':
    splist = []
    word2id = {}
    with open(Config.word2id_path, encoding='utf-8') as f:
        for line in f.readlines():
            sp = line.strip().split()
            splist.append(sp)
        word2id = dict(splist)
    for key in word2id:
        word2id[key] = int(word2id[key])
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    train_array, train_lable, val_array, val_lable, test_array, test_lable = prepare_data(word2id,
                                                                                          train_path=Config.train_path,
                                                                                          val_path=Config.val_path,
                                                                                          test_path=Config.test_path,
                                                                                          seq_lenth=Config.max_sen_len)
    test_loader = Data_set(test_array, test_lable)
    test_dataloader = DataLoader(test_loader,
                                 batch_size=Config.batch_size,
                                 shuffle=True,
                                 num_workers=0)
    w2vec = build_word2vec(Config.pre_word2vec_path,
                           word2id,
                           None)
    w2vec = torch.from_numpy(w2vec)
    w2vec = w2vec.float()
    model = LSTM_attention(Config.vocab_size, Config.embedding_dim, w2vec, Config.update_w2v,
                           Config.hidden_dim, Config.num_layers, Config.drop_keep_prob, Config.n_class,
                           Config.bidirectional)
    # the freshly built model is replaced by the trained one saved to disk
    model = torch.load('./word2vec_data/sen_model_best.pkl')
    test_accuary(model, test_dataloader, device)
    pre(word2id, model, Config.max_sen_len, Config.pre_path)
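The training script itself is not included in this post. A minimal sketch of what it might look like, reusing the names from the __main__ block above, follows; the optimizer choice, learning rate, and epoch count are assumptions, not the author's original settings:

# Hypothetical training loop (a sketch, not the original training script).
import torch.nn as nn

train_dataloader = DataLoader(Data_set(train_array, train_lable),
                              batch_size=Config.batch_size, shuffle=True)
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)  # assumed learning rate

for epoch in range(5):                                     # assumed epoch count
    model.train()
    for data, target in train_dataloader:
        data = data.type(torch.LongTensor).to(device)
        target = target.type(torch.LongTensor).squeeze(1).to(device)
        optimizer.zero_grad()
        loss = criterion(model(data), target)
        loss.backward()
        optimizer.step()
torch.save(model, './word2vec_data/sen_model_best.pkl')    # the path the eval code loads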
Original: https://blog.csdn.net/weixin_46649052/article/details/119814292
Author: 哎呦-_-不错
Title: NLP学习—11.实现基于PyTorch与LSTM的情感分类