Writing Acrostic Poems with NLP

This is a simple project, comparable to MNIST or CIFAR classification, and can serve as an introduction to the NLP field.

Data Preprocessing

I chose what is probably the most comprehensive Chinese poetry dataset around: https://github.com/chinese-poetry/chinese-poetry/

The dataset really is extensive, spanning several millennia of Chinese poetry. After downloading it, we only use the Tang poems in the json folder (i.e. poet.tang.xxxxx.json), which is already plenty: more than forty thousand poems.

All we need is the plain poem text for training, so we parse the json files and write the text into a txt file.

Create writedata.py and write everything into poets.txt. To keep the poem format consistent, we only keep regular five-character and seven-character lines, selected with a regular expression.

import json,re
poets=open('poets.txt','w',encoding='utf-8')
for x in range(0,58000,1000):# the Tang poem files are numbered from 0 to 57000
  filename='poet.tang.'+str(x)+'.json'
  localfile=json.load(open(filename,'r',encoding='utf-8'))# parse the json file into a list of dicts
  for poet in localfile:
      matched=re.findall(r'\w{5},\w{5}。|\w{7},\w{7}。',''.join(poet['paragraphs']))# keep only five- and seven-character lines
      if matched:poets.writelines(''.join(matched)+'\n')# one poem per line
poets.close()
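As a quick illustration (not part of the original script), here is what that regular expression keeps when applied to a made-up paragraph string:

import re

sample = '床前明月光,疑是地上霜。举头望明月,低头思故乡。'
print(re.findall(r'\w{5},\w{5}。|\w{7},\w{7}。', sample))
# -> ['床前明月光,疑是地上霜。', '举头望明月,低头思故乡。']
# Anything that is not a complete five- or seven-character couplet is simply dropped.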

Next we define a custom dataset, just as we would for a CV task. The only difference is that PyTorch cannot take Chinese characters as input directly, so we build a dict mapping each character to an integer index (which also makes any later one-hot encoding convenient), and we define two special tokens marking the start and end of each poem. When iterating over the dataset we therefore iterate over character indices.

class Mydata(Dataset):
  def __init__(self,file="D:\\Documents\\Dataset\\chinese-poetry-master\\json\\poets.txt",seq_len=48):# the data file and the length of one training sequence
    sos,eos=0,1# indices marking the start and end of each poem
    self.seq_len=seq_len
    with open(file,encoding='utf-8') as f:
      lines=f.read().splitlines()
      self.wordindex={'<sos>':sos,'<eos>':eos}# mapping from Chinese character to index
      point,wordnum=list(),2# character indices start after the two special tokens
      for line in lines:
        point.append(sos)
        for word in line:
          if word not in self.wordindex:
            self.wordindex[word]=wordnum
            wordnum+=1
          point.append(self.wordindex[word])
        point.append(eos)
    self.indexword={y:x for x,y in self.wordindex.items()}# index-to-character mapping, for decoding back to text
    self.data=np.array(point,dtype=np.int64)
  def __len__(self):
    return (len(self.data)-1)//self.seq_len
  def __getitem__(self,i):
    start=i*self.seq_len
    end=start+self.seq_len
    return(torch.as_tensor(self.data[start:end]),torch.as_tensor(self.data[start+1:end+1]))# input and target for the network; the target is shifted by one character

It's a good idea to map the integers back to characters and check that the text is reconstructed correctly.
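A minimal sanity check might look like this (a rough sketch, assuming the Mydata class above plus the imports from the full script at the end and poets.txt in place):

dataset = Mydata(seq_len=48)
x, y = dataset[0]
print(''.join(dataset.indexword[int(i)] for i in x))  # should reproduce the beginning of poets.txt, with '<sos>'/'<eos>' markers in between
print(''.join(dataset.indexword[int(i)] for i in y))  # the same text shifted forward by one character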


Building the Network

The network structure can be very simple, since we are implementing the most basic sequence generation model. Here I only use an Embedding layer plus an LSTM, followed by two linear layers for vec2word.

class PoetNet(nn.Module):
  def __init__(self, vocab_size, embed_size, hidden_size, lstm_layers):
    super().__init__()

    self.embedding = nn.Embedding(vocab_size, embed_size)
    self.lstm = nn.LSTM(embed_size, hidden_size, lstm_layers, batch_first=True)
    self.h2h = nn.Linear(hidden_size, hidden_size)
    self.h2o = nn.Linear(hidden_size, vocab_size)

  def forward(self, word_ids, lstm_hidden=None):
    embedded = self.embedding(word_ids)
    lstm_out, lstm_hidden = self.lstm(embedded, lstm_hidden)
    out = self.h2h(lstm_out)
    out = self.h2o(out)
    return out, lstm_hidden
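As a quick shape check (an illustrative sketch; the vocabulary size of 5000 below is just a placeholder, the real value comes from the dataset, and torch is assumed to be imported as in the full script):

net = PoetNet(vocab_size=5000, embed_size=100, hidden_size=1024, lstm_layers=2)
dummy = torch.randint(0, 5000, (32, 48))  # (batch_size, seq_len) of character indices
out, hidden = net(dummy)
print(out.shape)  # torch.Size([32, 48, 5000]): one score per vocabulary entry at every position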

Training the Network

The training process is much like MNIST, except that the network is fed characters and, at each position, predicts the next character; the prediction is compared with the corresponding character of the original poem using cross-entropy loss, and the gradients are computed to update the weights. The network quickly learns to produce lines in the xxxxx,xxxxx。 format.
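Concretely, the target is just the input shifted by one position, and since F.cross_entropy expects 2-D scores and 1-D class indices, the (batch, seq_len, vocab_size) output and the (batch, seq_len) target are flattened before computing the loss. A rough illustration (not from the original post):

# input_ : [<sos>, c1, c2, c3, c4]   (integer indices)
# target : [c1,    c2, c3, c4, c5]   -> at every position, predict the next character
# output : (batch_size, seq_len, vocab_size), target : (batch_size, seq_len)
loss = F.cross_entropy(output.reshape(-1, vocab_size), target.flatten())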

The training procedure for one epoch of the network:

def training_step():
  for i, (input_, target) in enumerate(train_loader):
    model.train()
    input_, target = input_.to(device), target.to(device)
    output, _ = model(input_)
    loss = F.cross_entropy(output.reshape(-1, vocab_size), target.flatten())
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    acc = accuracy(output, target)
    print(
      "Training: Epoch=%d, Batch=%d/%d, Loss=%.4f, Accuracy=%.4f"
      % (epoch, i, len(train_loader), loss.item(), acc)
    )

Finally, the results are shown below.

[Figure: sample generated poems]

Full Code

import glob
import datetime

import numpy as np

import torch
import torch.optim as op
from torch import nn
import torch.nn.functional as F

from torch.utils.data import DataLoader,Dataset,random_split
from torch.utils.tensorboard import SummaryWriter

# Initial hyper-parameter settings
debug=False
embed_size=100
hidden_size=1024
lr=0.001
lstm_layers=2
batch_size=32
epochs=128
seq_len = 64
checkpoint_dir="./"
# Dataset class
class Mydata(Dataset):
  def __init__(self,file="./CPM/json/poets.txt",seq_len=64):
    sos,eos=0,1
    self.seq_len=seq_len
    with open(file,encoding='utf-8') as f:
      lines=f.read().splitlines()
      self.wordindex={'<sos>':sos,'<eos>':eos}
      point,wordnum=list(),2  # character indices start after the two special tokens
      for line in lines:
        point.append(sos)
        for word in line:
          if word not in self.wordindex:
            self.wordindex[word]=wordnum
            wordnum+=1
          point.append(self.wordindex[word])
        point.append(eos)
    self.indexword={x:y for y,x in self.wordindex.items()}
    self.data=np.array(point,dtype=np.int64)
  def __len__(self):
    return (len(self.data)-1)//self.seq_len
  def __getitem__(self,i):
    start=i*self.seq_len
    end=start+self.seq_len
    return(torch.as_tensor(self.data[start:end]),torch.as_tensor(self.data[start+1:end+1]))
class PoetNet(nn.Module):
  def __init__(self, vocab_size, embed_size, hidden_size, lstm_layers):
    super().__init__()

    self.embedding = nn.Embedding(vocab_size, embed_size)
    self.lstm = nn.LSTM(embed_size, hidden_size, lstm_layers, batch_first=True)
    self.h2h = nn.Linear(hidden_size, hidden_size)
    self.h2o = nn.Linear(hidden_size, vocab_size)

  def forward(self, word_ids, lstm_hidden=None):
    embedded = self.embedding(word_ids)

    lstm_out, lstm_hidden = self.lstm(embedded, lstm_hidden)

    out = self.h2h(lstm_out)

    out = self.h2o(out)

    return out, lstm_hidden

def accuracy(output, target):
  """Compute the accuracy between model output and ground truth.

  Args:
      output: (batch_size, seq_len, vocab_size)
      target: (batch_size, seq_len)
  Returns:
      float: accuracy value between 0 and 1
"""
  output = output.reshape(-1, vocab_size)
  target = target.flatten()

  a = output.topk(1).indices.flatten()
  b = target
  return a.eq(b).sum().item() / len(a)

def generate(start_phrases):
  # start_phrases is a string such as "深。度。学。习": one head phrase per generated line
  start_phrases = start_phrases.split("。")

  hidden = None

  def next_word(input_word):
    nonlocal hidden
    input_word_index = dataset.wordindex[input_word]
    input_ = torch.Tensor([[input_word_index]]).long().to(device)
    output, hidden = model(input_, hidden)
    top_word_index = output[0].topk(1).indices.item()

    return dataset.indexword[top_word_index]

  result = []  # a list of output words
  cur_word = "&#x3002;"

  for i in range(seq_len):
    if cur_word == "&#x3002;":  # end of a sentence
      result.append(cur_word)
      next_word(cur_word)

      if len(start_phrases) == 0:
        break

      for w in start_phrases.pop(0):
        result.append(w)
        cur_word = next_word(w)

    else:
      result.append(cur_word)
      cur_word = next_word(cur_word)

  result = "".join(result)
  result = result.strip("。")
  return result

def training_step():
  for i, (input_, target) in enumerate(train_loader):
    model.train()

    input_, target = input_.to(device), target.to(device)

    output, _ = model(input_)
    loss = F.cross_entropy(output.reshape(-1, vocab_size), target.flatten())

    optimizer.zero_grad()  # Make sure gradient does not accumulate
    loss.backward()  # Compute gradient
    optimizer.step()  # Update NN weights

    acc = accuracy(output, target)

    print(
      "Training: Epoch=%d, Batch=%d/%d, Loss=%.4f, Accuracy=%.4f"
      % (epoch, i, len(train_loader), loss.item(), acc)
    )

    if not debug:
      step = epoch * len(train_loader) + i
      writer.add_scalar("loss/training", loss.item(), step)
      writer.add_scalar("accuracy/training", acc, step)

      if i % 50 == 0:
        generated_lyrics = generate("&#x7687;&#x3002;&#x51FD;&#x3002;&#x8C37;&#x3002;&#x5C45;")
        writer.add_text("generated_lyrics", generated_lyrics, i)
        writer.flush()

def evaluation_step():
  model.eval()

  epoch_loss = 0
  epoch_acc = 0

  with torch.no_grad():
    for data in test_loader:
      input_, target = data[0].to(device), data[1].to(device)

      output, _ = model(input_)
      loss = F.cross_entropy(output.reshape(-1, vocab_size), target.flatten())

      epoch_acc += accuracy(output, target)

      epoch_loss += loss.item()

  epoch_loss /= len(test_loader)
  epoch_acc /= len(test_loader)
  print(
    "Validation: Epoch=%d, Loss=%.4f, Accuracy=%.4f"
    % (epoch, epoch_loss, epoch_acc)
  )

  if not debug:
    writer.add_scalar("loss/validation", epoch_loss, epoch)
    writer.add_scalar("accuracy/validation", epoch_acc, epoch)
    writer.flush()

def save_checkpoint():
  torch.save(
    {
      "epoch": epoch,
      "model_state_dict": model.state_dict(),
      "optimizer_state_dict": optimizer.state_dict(),
    },
    "checkpoint-%s.pth" % datetime.datetime.now().strftime("%y%m%d-%H%M%S"),
  )

def load_checkpoint(file):
  global epoch

  ckpt = torch.load(file)

  print("Loading checkpoint from %s." % file)

  model.load_state_dict(ckpt["model_state_dict"])

  optimizer.load_state_dict(ckpt["optimizer_state_dict"])

  epoch = ckpt["epoch"]

if __name__ == "__main__":
  # Create cuda device to train model on GPU
  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

  # Define dataset
  dataset = Mydata(seq_len=seq_len)

  # Split dataset into training and validation
  data_length = len(dataset)
  lengths = [int(data_length - 1000), 1000]
  train_data, test_data = random_split(dataset, lengths)

  # Create data loader
  train_loader = DataLoader(
    train_data, batch_size=batch_size, shuffle=True, num_workers=0
  )
  test_loader = DataLoader(
    test_data, batch_size=batch_size, shuffle=True, num_workers=0
  )
  if debug:
    train_loader = [next(iter(train_loader))]
    test_loader = [next(iter(test_loader))]

  # Create NN model
  vocab_size = len(dataset.wordindex)
  model = PoetNet(
    vocab_size=vocab_size,
    embed_size=embed_size,
    hidden_size=hidden_size,
    lstm_layers=lstm_layers,
  )
  model = model.to(device)

  # Create optimizer
  optimizer = op.Adam(model.parameters(), lr=lr)
  # optimizer = op.SGD(model.parameters(), lr=0.01, momentum=0.9)

  # Load checkpoint
  checkpoint_files = glob.glob("checkpoint-*.pth")
  if (
      not debug
      and len(checkpoint_files) > 0
      and input("Enter y to load %s: " % checkpoint_files[-1]) == "y"
  ):
    load_checkpoint(checkpoint_files[-1])
  else:
    epoch = 0

  if (
      input("Enter y to enter inference mode, anything else to enter training mode: ")
      == "y"
  ):
    # Inference loop
    while True:
      start_words = input("Enter start-words divided by '。' (e.g. '深。度。学。习'): ")
      if not start_words:
        break

      print(generate(start_words))

  else:
    if not debug:
      writer = SummaryWriter()

    # Optimization loop
    while epoch < epochs:
      training_step()
      evaluation_step()

      if not debug:
        save_checkpoint()

      epoch += 1
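For reference, this is how the inference mode is meant to be used (a sketch, assuming a trained checkpoint has already been loaded): each segment separated by '。' is placed at the start of one generated line, so the four head characters below yield a four-line acrostic.

print(generate("深。度。学。习"))  # the generated lines begin with 深, 度, 学, 习 respectively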

Original: https://blog.csdn.net/weixin_51018649/article/details/121270846
Author: weixin_51018649
Title: NLP藏头诗写作
