Building a Transformer Translator with TensorFlow

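The Transformer is a seq2seq model architecture proposed by Google in the 2017 paper "Attention Is All You Need". Its key innovation is self-attention, which captures how the words in a sequence attend to one another. The model has been hugely successful in NLP. In recent years the architecture has also made striking progress in computer vision, matching or exceeding the performance of CNN models on tasks such as image recognition and object detection. The Transformer is therefore arguably one of the architectures most worth following and learning in AI today. Many articles online already explain the architecture and its details, so I will not repeat that material here. Instead, this post focuses on practice: building a Transformer model with TensorFlow that translates French into English.

The TensorFlow website has a detailed tutorial on building a Transformer that translates Portuguese into English. I worked through that tutorial and then adapted it to translate French into English.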

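Preparing the dataset

The site Tab-delimited Bilingual Sentence Pairs from the Tatoeba Project (Good for Anki and Similar Flashcard Applications) offers sentence pairs between English and many other languages. Here we download the French-English data to use as the training and validation sets. After downloading and unzipping http://www.manythings.org/anki/fra-eng.zip, we can see that each line contains an English sentence, a French sentence, and the attribution for the sentences, separated by TABs.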

The following code reads the data from the file so that we can inspect the French and English sentences:

fra = []
eng = []
with open('fra.txt', 'r', encoding='utf-8') as f:
    content = f.readlines()
    for line in content:
        temp = line.split(sep='\t')
        eng.append(temp[0])
        fra.append(temp[1])

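Looking at the sentences, we can see that some contain special characters, for example 'Cours\u202f!'. We need to get rid of these invisible characters (\u202f, \xa0, ...). The code below replaces every whitespace character with an ordinary space: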

import re

new_fra = []
new_eng = []
for item in fra:
    # r'\s' matches any Unicode whitespace, including \u202f and \xa0
    new_fra.append(re.sub(r'\s', ' ', item).strip().lower())
for item in eng:
    new_eng.append(re.sub(r'\s', ' ', item).strip().lower())
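As a quick check of the cleaning step, the sketch below applies the same substitution to the example string from the text. The helper name `clean_sentence` is hypothetical and not part of the original code.

```python
import re

def clean_sentence(sentence):
    """Replace every whitespace character with a plain space, then trim and lowercase."""
    return re.sub(r'\s', ' ', sentence).strip().lower()

# '\u202f' (narrow no-break space) and '\xa0' (no-break space) both count as
# whitespace for the regex, so they become ordinary spaces.
print(repr(clean_sentence('Cours\u202f!')))       # 'cours !'
print(repr(clean_sentence('Qui\xa0est-ce ?\n')))  # 'qui est-ce ?'
```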

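Converting words to tokens

Because the model can only process numbers, the French and English words have to be converted into tokens. Here we use the BERT tokenizer approach; for details, see the TensorFlow tutorial Subword tokenizers | Text | TensorFlow.

First, create two datasets, one holding the French sentences and one holding the English sentences.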

import tensorflow as tf

ds_fra = tf.data.Dataset.from_tensor_slices(new_fra)
ds_eng = tf.data.Dataset.from_tensor_slices(new_eng)

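Use TensorFlow's bert_vocab library to build the vocabularies. We define a few reserved tokens for special purposes: for example, [START] marks the beginning of a sentence and [UNK] stands for a word that does not appear in the vocabulary.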

from tensorflow_text.tools.wordpiece_vocab import bert_vocab_from_dataset as bert_vocab

bert_tokenizer_params=dict(lower_case=True)
reserved_tokens=["[PAD]", "[UNK]", "[START]", "[END]"]

bert_vocab_args = dict(
    # The target vocabulary size
    vocab_size = 8000,
    # Reserved tokens that must be included in the vocabulary
    reserved_tokens=reserved_tokens,
    # Arguments for text.BertTokenizer
    bert_tokenizer_params=bert_tokenizer_params,
    # Arguments for wordpiece_vocab.wordpiece_tokenizer_learner_lib.learn
    learn_params={},
)

fr_vocab = bert_vocab.bert_vocab_from_dataset(
    ds_fra.batch(1000).prefetch(2),
    **bert_vocab_args
)

en_vocab = bert_vocab.bert_vocab_from_dataset(
    ds_eng.batch(1000).prefetch(2),
    **bert_vocab_args
)

Once the vocabulary has been built, we can look at what it contains:

print(en_vocab[:10])
print(en_vocab[100:110])
print(en_vocab[1000:1010])
print(en_vocab[-10:])

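The output is shown below. Note that the vocabulary is not split strictly by whole English words. For example, '##ers' means that a word ending in "ers" can be split so that the ending becomes a separate '##ers' token.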

['[PAD]', '[UNK]', '[START]', '[END]', '!', '"', '$', '%', '&', "'"]
['ll', 'there', 've', 'and', 'him', 'time', 'here', 'about', 'get', 'didn']
['##ers', 'chair', 'earth', 'honest', 'succeed', '##ted', 'animals', 'bill', 'drank', 'lend']
['##?', '##j', '##q', '##z', '##°', '##–', '##—', '##‘', '##’', '##€']
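To make the '##' convention concrete, here is a minimal sketch of the greedy longest-match-first splitting that WordPiece-style tokenizers apply to a single word. The toy vocabulary and the function name `wordpiece_tokenize` are made up for illustration; the real BertTokenizer also handles normalization and punctuation, which this sketch ignores.

```python
def wordpiece_tokenize(word, vocab, unk="[UNK]"):
    """Greedily split one word into the longest subword pieces found in vocab."""
    tokens = []
    start = 0
    while start < len(word):
        end = len(word)
        piece = None
        while start < end:
            candidate = word[start:end]
            if start > 0:
                # Pieces that continue a word carry the '##' prefix.
                candidate = "##" + candidate
            if candidate in vocab:
                piece = candidate
                break
            end -= 1
        if piece is None:
            # No piece matches at this position: the whole word is unknown.
            return [unk]
        tokens.append(piece)
        start = end
    return tokens

# Hypothetical toy vocabulary, not the one learned from the dataset.
toy_vocab = {"teach", "work", "##ers", "##ed", "##s"}
print(wordpiece_tokenize("teachers", toy_vocab))  # ['teach', '##ers']
print(wordpiece_tokenize("worked", toy_vocab))    # ['work', '##ed']
print(wordpiece_tokenize("xyz", toy_vocab))       # ['[UNK]']
```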

Save the vocabularies to files. We can then instantiate two tokenizers to tokenize the French and English sentences.

import tensorflow_text as text

def write_vocab_file(filepath, vocab):
    # The vocabulary contains non-ASCII tokens, so write it as UTF-8
    with open(filepath, 'w', encoding='utf-8') as f:
        for token in vocab:
            print(token, file=f)
write_vocab_file('fr_vocab.txt', fr_vocab)
write_vocab_file('en_vocab.txt', en_vocab)

fr_tokenizer = text.BertTokenizer('fr_vocab.txt', **bert_tokenizer_params)
en_tokenizer = text.BertTokenizer('en_vocab.txt', **bert_tokenizer_params)

Now let's test tokenizing a few English sentences. We need to add the special tokens [START] and [END] to the beginning and end of each sentence, which makes later model training easier.

START = tf.argmax(tf.constant(reserved_tokens) == "[START]")
END = tf.argmax(tf.constant(reserved_tokens) == "[END]")

def add_start_end(ragged):
    count = ragged.bounding_shape()[0]
    starts = tf.fill([count,1], START)
    ends = tf.fill([count,1], END)
    return tf.concat([starts, ragged, ends], axis=1)

sentences = ["Hello Roy!", "The sky is blue.", "Nice to meet you!"]

add_start_end(en_tokenizer.tokenize(sentences).merge_dims(1,2)).to_tensor()

The output is as follows:

<tf.Tensor: shape=(3, 7), dtype=int64, numpy=
array([[   2, 1830,   45, 3450,    4,    3,    0],
       [   2,   62, 1132,   64,  996,   13,    3],
       [   2,  353,   61,  416,   60,    4,    3]])>
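The effect of add_start_end followed by to_tensor() can be sketched with plain Python lists: wrap each token list in the start and end ids, then pad every row with zeros to the longest length. The function name `add_start_end_and_pad` is hypothetical, and the token ids are taken from the first two rows of the output above.

```python
def add_start_end_and_pad(token_lists, start_id=2, end_id=3, pad_id=0):
    """Wrap each row in start/end ids and right-pad all rows to the same width."""
    wrapped = [[start_id] + tokens + [end_id] for tokens in token_lists]
    width = max(len(row) for row in wrapped)
    return [row + [pad_id] * (width - len(row)) for row in wrapped]

rows = add_start_end_and_pad([[1830, 45, 3450, 4], [62, 1132, 64, 996, 13]])
for row in rows:
    print(row)
# [2, 1830, 45, 3450, 4, 3, 0]
# [2, 62, 1132, 64, 996, 13, 3]
```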

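Building the dataset

Now we can build the training and validation sets. Each dataset must contain both the French and the English sentences: the French sentences are the input to the Transformer encoder, and the English sentences serve both as the decoder input and as the target for the model output. We use Pandas to build a DataFrame, randomly assign 80% of the data to the training set and the rest to the validation set, and then convert both into TensorFlow datasets.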

import random

import pandas as pd

df = pd.DataFrame(data={'fra':new_fra, 'eng':new_eng})

# Shuffle the DataFrame
recordnum = df.count()['fra']
# Use range(recordnum) so that the last record is included as well
indexlist = list(range(recordnum))
random.shuffle(indexlist)
df_train = df.loc[indexlist[:int(recordnum*0.8)]]
df_val = df.loc[indexlist[int(recordnum*0.8):]]

ds_train = tf.data.Dataset.from_tensor_slices((df_train.fra.values, df_train.eng.values))
ds_val = tf.data.Dataset.from_tensor_slices((df_val.fra.values, df_val.eng.values))

Check the maximum number of tokens in a training-set sentence:

import matplotlib.pyplot as plt
import numpy as np

lengths = []

for fr_examples, en_examples in ds_train.batch(1024):
    fr_tokens = fr_tokenizer.tokenize(fr_examples)
    lengths.append(fr_tokens.row_lengths())

    en_tokens = en_tokenizer.tokenize(en_examples)
    lengths.append(en_tokens.row_lengths())
    print('.', end='', flush=True)

all_lengths = np.concatenate(lengths)

plt.hist(all_lengths, np.linspace(0, 100, 11))
plt.ylim(plt.ylim())
max_length = max(all_lengths)
plt.plot([max_length, max_length], plt.ylim())
plt.title(f'Max tokens per example: {max_length}');

The result shows that, after tokenization, the longest sentence in the training set contains 67 tokens:

[Figure: histogram of tokens per example, with the maximum length marked]

We can then generate batches for the dataset, as in the following code:

BUFFER_SIZE = 20000
BATCH_SIZE = 64
MAX_TOKENS = 67

def filter_max_tokens(fr, en):
    num_tokens = tf.maximum(tf.shape(fr)[1],tf.shape(en)[1])
    return num_tokens < MAX_TOKENS

def tokenize_pairs(fr, en):
    fr = add_start_end(fr_tokenizer.tokenize(fr).merge_dims(1,2))
    # Convert from ragged to dense, padding with zeros.

    fr = fr.to_tensor()

    en = add_start_end(en_tokenizer.tokenize(en).merge_dims(1,2))
    # Convert from ragged to dense, padding with zeros.

    en = en.to_tensor()
    return fr, en

def make_batches(ds):
    return (
        ds
        .cache()
        .shuffle(BUFFER_SIZE)
        .batch(BATCH_SIZE)
        .map(tokenize_pairs, num_parallel_calls=tf.data.AUTOTUNE)
        .filter(filter_max_tokens)
        .prefetch(tf.data.AUTOTUNE))

train_batches = make_batches(ds_train)
val_batches = make_batches(ds_val)

We can generate one batch to take a look:

for a in train_batches.take(1):
    print(a)

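The result is shown below. Each batch contains two tensors, holding the token vectors of the French and English sentences respectively. Every sentence starts with token 2 and ends with token 3: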

(<tf.Tensor: shape=(64, 24), dtype=int64, numpy=array([[2, 39, 9, ..., 0, 0], ...])>, <tf.Tensor: shape=(64, 20), dtype=int64, numpy=array([[2, 36, 76, ..., 0, 0], ...])>)

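Adding positional information to the input

Feeding the batches obtained above into an embedding layer converts each token into a high-dimensional vector, for example a 128-dimensional vector. We then need to add positional information to this vector to indicate where the token sits in the sentence. The paper gives a method for encoding positional information, shown in the following formulas: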

$$PE_{(pos,\,2i)} = \sin\left(\frac{pos}{10000^{2i/d_{model}}}\right)$$

$$PE_{(pos,\,2i+1)} = \cos\left(\frac{pos}{10000^{2i/d_{model}}}\right)$$

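In the formulas, pos is the position of the word. For example, if a sentence has 50 words, pos ranges from 0 to 49. d_model is the embedding dimension; if each word is mapped to a 128-dimensional vector, d_model=128. i indexes pairs of dimensions within those 128 dimensions, so 2i and 2i+1 together cover dimensions 0-127 (i itself ranges from 0 to 63).
The formulas therefore mean that, for the N-th word, the corresponding positional value is added to every dimension of its 128-dimensional embedding vector.

Take the third word as an example, with pos=2. In its 128-dimensional vector, the even dimensions (0, 2, 4, ...) get sin(2/10000^(2i/128)) added, where 2i takes the values (0, 2, 4, ...). The odd dimensions 2i+1 (1, 3, 5, ...) get cos(2/10000^(2i/128)) added, where 2i again takes the values (0, 2, 4, ...).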

The following code generates the positional encoding, which can then be added to the token embeddings.

def get_angles(pos, i, d_model):
    angle_rates = 1 / np.power(10000, (2 * (i//2)) / np.float32(d_model))
    return pos * angle_rates

def positional_encoding(position, d_model):
    angle_rads = get_angles(np.arange(position)[:, np.newaxis],
                          np.arange(d_model)[np.newaxis, :],
                          d_model)

    # apply sin to even indices in the array; 2i
    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])

    # apply cos to odd indices in the array; 2i+1
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])

    pos_encoding = angle_rads[np.newaxis, ...]

    return tf.cast(pos_encoding, dtype=tf.float32)
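The same computation can be checked for a single position in plain Python. This is a minimal sketch that mirrors get_angles above without NumPy or TensorFlow; the function name `positional_encoding_row` is hypothetical.

```python
import math

def positional_encoding_row(pos, d_model):
    """Return the d_model-dimensional positional encoding for one position."""
    row = []
    for j in range(d_model):
        # Dimensions 2i and 2i+1 share the same angle rate, hence j // 2.
        angle = pos / (10000 ** ((2 * (j // 2)) / d_model))
        # Even dimensions use sin, odd dimensions use cos.
        row.append(math.sin(angle) if j % 2 == 0 else math.cos(angle))
    return row

# At position 0 every angle is 0, so the row alternates sin(0) and cos(0).
print(positional_encoding_row(0, 4))  # [0.0, 1.0, 0.0, 1.0]

# The third word (pos=2): dimensions 0 and 1 are sin(2) and cos(2).
row = positional_encoding_row(2, 128)
print(row[0] == math.sin(2), row[1] == math.cos(2))  # True True
```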

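Creating the padding mask and the look-ahead mask

The padding mask marks the positions in the input sequence whose value is 0: wherever the input is 0, the mask is 1. This keeps padding tokens from taking part in model training.
The look-ahead mask hides future tokens during prediction. For example, when translating a French sentence, the corresponding English sentence is the target. During training, when predicting the first English word, the whole English sentence must be hidden; when predicting the second English word, everything after the first word must be hidden. The goal is to prevent the model from seeing the words it is about to predict, which would compromise training.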

def create_padding_mask(seq):
    seq = tf.cast(tf.math.equal(seq, 0), tf.float32)

    # add extra dimensions to add the padding
    # to the attention logits.

    return seq[:, tf.newaxis, tf.newaxis, :]  # (batch_size, 1, 1, seq_len)

def create_look_ahead_mask(size):
    mask = 1 - tf.linalg.band_part(tf.ones((size, size)), -1, 0)
    return mask  # (seq_len, seq_len)
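To see what the two masks look like, here is a plain-Python sketch that produces the same 0/1 patterns for a single sequence. The function names are hypothetical, and the extra broadcast dimensions added by the TensorFlow versions are omitted.

```python
def padding_mask_row(seq, pad_id=0):
    """1.0 where the token is padding, 0.0 elsewhere."""
    return [1.0 if token == pad_id else 0.0 for token in seq]

def look_ahead_mask_rows(size):
    """1.0 above the diagonal: row r may not attend to columns after r."""
    return [[1.0 if col > row else 0.0 for col in range(size)]
            for row in range(size)]

print(padding_mask_row([2, 39, 9, 3, 0, 0]))
# [0.0, 0.0, 0.0, 0.0, 1.0, 1.0]

for mask_row in look_ahead_mask_rows(3):
    print(mask_row)
# [0.0, 1.0, 1.0]
# [0.0, 0.0, 1.0]
# [0.0, 0.0, 0.0]
```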

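Computing self-attention

We now arrive at the core concept of the Transformer. The input vectors are passed through three linear transformation matrices to produce three vectors: Q, K, and V.
The attention coefficients are obtained by computing the similarity between Q and K, and are then multiplied with V to get the output values, as shown in the figure below: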

[Figure: scaled dot-product attention]

The attention weights are computed with the following formula:

$$\mathrm{Attention}(Q, K, V) = \mathrm{softmax}\left(\frac{QK^T}{\sqrt{d_k}}\right)V$$

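To explain this formula: K and V stand for Key and Value, and Q is the query. Suppose we have the sentence "Tom is a boy", which has 4 words, that is, 4 tokens. After the linear transformations, every token has its own Q, K, and V. When we query with the Q of the token Tom, it is compared with the K of all 4 tokens to see which is most similar, and an attention weight is computed for each. Suppose that Tom is most similar to itself and second most similar to boy; the attention weights after softmax might then be [0.9, 0.005, 0.005, 0.09]. These weights are multiplied with the V of each token and the results are summed to give the final attention value. This value is the sum of the tokens' V values weighted by the attention weights, so it encodes the relationships between tokens.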

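An e-commerce site offers another analogy. Each product is described by a Key, for example "PS3 game console", and the Value is the product's price. When we enter the Query "PS games", the site compares it against the keys and shows the most similar products.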

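Applying this to the formula above, suppose each token is encoded as a 128-dimensional vector and is transformed by the three linear matrices for Q, K, and V, where the Q and K matrices have an output dimension of 64 and the V matrix has an output dimension of 100. Take a batch of 32 sentences as the input, the longest of which has 20 tokens, so the input has shape 32×20×128. After the transformations, Q is 32×20×64, K is 32×20×64, and V is 32×20×100. Multiplying Q by the transpose of K, that is matmul(Q, K'), gives a result of shape 32×20×20: the Q of every token in a sentence is dot-multiplied with the K of every token in the same sentence to measure similarity. The formula also scales this result by dividing by the square root of the dimension, here the square root of 64, which is 8, so that the variance of the result stays the same regardless of the dimension of Q and K. Softmax then normalizes the result into the attention weights between each token and the other tokens. Multiplying these weights with V gives a result of shape 32×20×100: every token in every sentence gets a 100-dimensional representation that encodes some of its relationships with the other tokens.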

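In the code, the sentence's padding_mask is also multiplied by a large negative number and added to the scaled attention scores before softmax is applied. Where padding_mask is 1, the token is padding and carries no meaning. Adding a large negative number to the score at that position makes its softmax output close to 0, which removes the influence of padding tokens.

The code is as follows: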

def scaled_dot_product_attention(q, k, v, mask):
    """Calculate the attention weights.

    q, k, v must have matching leading dimensions.

    k, v must have matching penultimate dimension, i.e.: seq_len_k = seq_len_v.

    The mask has different shapes depending on its type(padding or look ahead)
    but it must be broadcastable for addition.

    Args:
    q: query shape == (..., seq_len_q, depth)
    k: key shape == (..., seq_len_k, depth)
    v: value shape == (..., seq_len_v, depth_v)
    mask: Float tensor with shape broadcastable
          to (..., seq_len_q, seq_len_k). Defaults to None.

    Returns:
    output, attention_weights
"""

    matmul_qk = tf.matmul(q, k, transpose_b=True)  # (..., seq_len_q, seq_len_k)

    # scale matmul_qk
    dk = tf.cast(tf.shape(k)[-1], tf.float32)
    scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)

    # add the mask to the scaled tensor.

    if mask is not None:
        scaled_attention_logits += (mask * -1e9)

    # softmax is normalized on the last axis (seq_len_k) so that the scores
    # add up to 1.

    attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)  # (..., seq_len_q, seq_len_k)

    output = tf.matmul(attention_weights, v)  # (..., seq_len_q, depth_v)

    return output, attention_weights
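The effect of the mask is easiest to see on a tiny example. The sketch below reimplements the same steps (dot product, scaling, masking, softmax, weighted sum) for a single unbatched sequence in plain Python; `toy_attention` and the input values are made up for illustration and are not part of the original code.

```python
import math

def softmax(scores):
    """Numerically stable softmax over a list of floats."""
    peak = max(scores)
    exps = [math.exp(s - peak) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]

def toy_attention(q, k, v, mask=None):
    """q: [len_q][depth], k: [len_k][depth], v: [len_k][depth_v],
    mask: [len_q][len_k] with 1.0 at positions to hide."""
    dk = len(k[0])
    outputs, weights = [], []
    for qi, q_vec in enumerate(q):
        # Scaled dot product between this query and every key.
        logits = [sum(a * b for a, b in zip(q_vec, k_vec)) / math.sqrt(dk)
                  for k_vec in k]
        if mask is not None:
            # Masked positions receive a very large negative score.
            logits = [l + m * -1e9 for l, m in zip(logits, mask[qi])]
        w = softmax(logits)
        weights.append(w)
        # Weighted sum of the value vectors.
        outputs.append([sum(w[j] * v[j][d] for j in range(len(v)))
                        for d in range(len(v[0]))])
    return outputs, weights

q = [[1.0, 0.0]]
k = [[1.0, 0.0], [0.0, 1.0], [1.0, 0.0]]
v = [[10.0, 0.0], [0.0, 10.0], [5.0, 5.0]]
# Hide the third key, as if it were a padding token.
out, w = toy_attention(q, k, v, mask=[[0.0, 0.0, 1.0]])
print(w[0])    # the third weight is 0.0, the first two sum to 1
print(out[0])
```

Although the first and third keys are identical, the masked third key receives no weight, so its value vector does not contribute to the output.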

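Multi-head attention

Now that we understand how attention works, we can build multi-head attention. "Multi-head" means the model can attend to the relationships between tokens from different perspectives. For example, we can imagine one head attending to semantic relationships between tokens and another attending to grammatical relationships.

The multi-head structure is shown in the figure below: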

[Figure: multi-head attention structure]

The multi-head structure consists of three parts:

  • Linear transformation layers
  • Scaled dot product attention
  • A final linear transformation layer

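In the actual implementation, we can merge these layers across all heads, compute everything at once, and split the result per head afterwards.
For example, with 8 heads whose linear layers each produce a 32-dimensional output, we can use one large linear layer with a 32*8-dimensional output and reshape the result to [..., 8, 32]. A single scaled dot product attention call then processes all heads together, after which the per-head results are concatenated and passed through the final linear layer. The code below implements this, wrapped as a Keras layer: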

class MultiHeadAttention(tf.keras.layers.Layer):
    def __init__(self,*, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model

        assert d_model % self.num_heads == 0

        self.depth = d_model // self.num_heads

        self.wq = tf.keras.layers.Dense(d_model)
        self.wk = tf.keras.layers.Dense(d_model)
        self.wv = tf.keras.layers.Dense(d_model)

        self.dense = tf.keras.layers.Dense(d_model)

    def split_heads(self, x, batch_size):
        """Split the last dimension into (num_heads, depth).

        Transpose the result such that the shape is (batch_size, num_heads, seq_len, depth)
"""
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask):
        batch_size = tf.shape(q)[0]

        q = self.wq(q)  # (batch_size, seq_len, d_model)
        k = self.wk(k)  # (batch_size, seq_len, d_model)
        v = self.wv(v)  # (batch_size, seq_len, d_model)

        q = self.split_heads(q, batch_size)  # (batch_size, num_heads, seq_len_q, depth)
        k = self.split_heads(k, batch_size)  # (batch_size, num_heads, seq_len_k, depth)
        v = self.split_heads(v, batch_size)  # (batch_size, num_heads, seq_len_v, depth)

        # scaled_attention.shape == (batch_size, num_heads, seq_len_q, depth)
        # attention_weights.shape == (batch_size, num_heads, seq_len_q, seq_len_k)
        scaled_attention, attention_weights = scaled_dot_product_attention(
            q, k, v, mask)

        scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])  # (batch_size, seq_len_q, num_heads, depth)

        concat_attention = tf.reshape(scaled_attention,
                                      (batch_size, -1, self.d_model))  # (batch_size, seq_len_q, d_model)

        output = self.dense(concat_attention)  # (batch_size, seq_len_q, d_model)

        return output, attention_weights
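The reshape in split_heads and the later concatenation are just index bookkeeping. The following plain-Python sketch shows the same split and merge for one unbatched sequence; the function names and the small sizes (d_model=8, 4 heads) are chosen only for illustration.

```python
def split_into_heads(x, num_heads):
    """x: [seq_len][d_model] -> [num_heads][seq_len][depth]."""
    d_model = len(x[0])
    assert d_model % num_heads == 0
    depth = d_model // num_heads
    # Head h takes the contiguous slice h*depth .. (h+1)*depth of every token.
    return [[row[h * depth:(h + 1) * depth] for row in x]
            for h in range(num_heads)]

def merge_heads(heads):
    """Inverse of split_into_heads: [num_heads][seq_len][depth] -> [seq_len][d_model]."""
    seq_len = len(heads[0])
    return [[value for head in heads for value in head[t]]
            for t in range(seq_len)]

# 3 tokens, d_model = 8, split into 4 heads of depth 2.
x = [[i + 10 * t for i in range(8)] for t in range(3)]
heads = split_into_heads(x, num_heads=4)
print(len(heads), len(heads[0]), len(heads[0][0]))  # 4 3 2
print(heads[1][0])                                  # [2, 3]
print(merge_heads(heads) == x)                      # True
```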

Point wise feed forward network

The output of multi-head attention then goes through a point wise feed forward network. This network consists of two fully connected layers with a ReLU activation between them. The code is as follows:

def point_wise_feed_forward_network(d_model, dff):
    return tf.keras.Sequential([
      tf.keras.layers.Dense(dff, activation='relu'),  # (batch_size, seq_len, dff)
      tf.keras.layers.Dense(d_model)  # (batch_size, seq_len, d_model)
    ])

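The Transformer model

With the building blocks above in place, we can assemble the whole Transformer model. The model consists of two main parts, an encoder and a decoder, as shown in the figure below: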

[Figure: Transformer architecture, with the encoder on the left and the decoder on the right]

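Let's look at the encoder on the left first. The encoder consists of N encoder layers connected in sequence. The first encoder layer receives the input at the bottom, which in our example is the embedded French sentences. This could be, for instance, a tensor of shape [64, 32, 128]: each batch has 64 sentences, the longest sentence in the batch has 32 tokens, and each token is encoded as a 128-dimensional vector. After the positional encoding is added, this becomes the input to the first encoder layer.

Every encoder layer after the first takes the output of the previous encoder layer as its input. The output of the last encoder layer is passed to the decoder, where it serves as V and K.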

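Now look at the decoder on the right. The decoder likewise consists of N decoder layers connected in sequence. Each decoder layer contains two multi-head attention (MHA) modules. The first decoder layer receives the input at the bottom, which in our example is the embedded English translations of the French sentences. This could be, for instance, a tensor of shape [64, 48, 128]: each batch has 64 sentences, the longest sentence in the batch has 48 tokens, and each token is encoded as a 128-dimensional vector. After the positional encoding is added, this becomes the input to the first decoder layer. The input is processed by the first MHA of that layer, and the result is fed to the second MHA as Q, while the V and K inputs of the second MHA come from the encoder output. The output of this decoder layer becomes the input to the first MHA of the second decoder layer, whose output in turn is the Q of the second MHA, with V and K again taken from the encoder output, which yields the output of the second decoder layer. This continues until the N-th decoder layer has finished. The result then goes through a linear transformation, and softmax computes the predicted probabilities.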

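The decoder input must be accompanied by the corresponding look-ahead mask so that the model cannot see the words it is supposed to predict.
For example, suppose we input a French sentence whose English translation is "Tom is a boy". After tokenization this sentence has 6 tokens, including the [START] and [END] tokens, and the corresponding look-ahead mask is a 6*6 matrix.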

The encoder

An encoder can contain multiple encoder layers. First we define the encoder layer, as in the following code:

class EncoderLayer(tf.keras.layers.Layer):
    def __init__(self,*, d_model, num_heads, dff, rate=0.1):
        super(EncoderLayer, self).__init__()
        self.mha = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
        self.ffn = point_wise_feed_forward_network(d_model, dff)
        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)

    def call(self, x, training, mask):
        attn_output, _ = self.mha(x, x, x, mask)  # (batch_size, input_seq_len, d_model)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(x + attn_output)  # (batch_size, input_seq_len, d_model)

        ffn_output = self.ffn(out1)  # (batch_size, input_seq_len, d_model)
        ffn_output = self.dropout2(ffn_output, training=training)
        out2 = self.layernorm2(out1 + ffn_output)  # (batch_size, input_seq_len, d_model)
        return out2

The encoder is defined next. It consists of the following three parts:

  • Input embedding
  • Positional encoding
  • Multiple encoder layers

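After each word of the input sentence is tokenized, the token id is used to look up the corresponding embedding vector, and positional encoding is added according to the token's position. The result is the input to the encoder. The final output of the encoder becomes an input to the decoder.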

class Encoder(tf.keras.layers.Layer):
    def __init__(self,*, num_layers, d_model, num_heads, dff, input_vocab_size, rate=0.1):
        super(Encoder, self).__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = tf.keras.layers.Embedding(input_vocab_size, d_model)
        self.pos_encoding = positional_encoding(MAX_TOKENS, self.d_model)

        self.enc_layers = [
            EncoderLayer(d_model=d_model, num_heads=num_heads, dff=dff, rate=rate)
            for _ in range(num_layers)]

        self.dropout = tf.keras.layers.Dropout(rate)

    def call(self, x, training, mask):

        seq_len = tf.shape(x)[1]

        # adding embedding and position encoding.

        x = self.embedding(x)  # (batch_size, input_seq_len, d_model)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]

        x = self.dropout(x, training=training)

        for i in range(self.num_layers):
            x = self.enc_layers[i](x, training, mask)

        return x  # (batch_size, input_seq_len, d_model)

The decoder

The following is the code for the decoder layer:

class DecoderLayer(tf.keras.layers.Layer):
    def __init__(self,*, d_model, num_heads, dff, rate=0.1):
        super(DecoderLayer, self).__init__()
        self.mha1 = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
        self.mha2 = MultiHeadAttention(d_model=d_model, num_heads=num_heads)

        self.ffn = point_wise_feed_forward_network(d_model, dff)

        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)
        self.dropout3 = tf.keras.layers.Dropout(rate)

    def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
        # enc_output.shape == (batch_size, input_seq_len, d_model)

        attn1, attn_weights_block1 = self.mha1(x, x, x, look_ahead_mask)  # (batch_size, target_seq_len, d_model)
        attn1 = self.dropout1(attn1, training=training)
        out1 = self.layernorm1(attn1 + x)

        attn2, attn_weights_block2 = self.mha2(
            enc_output, enc_output, out1, padding_mask)  # (batch_size, target_seq_len, d_model)
        attn2 = self.dropout2(attn2, training=training)
        out2 = self.layernorm2(attn2 + out1)  # (batch_size, target_seq_len, d_model)

        ffn_output = self.ffn(out2)  # (batch_size, target_seq_len, d_model)
        ffn_output = self.dropout3(ffn_output, training=training)
        out3 = self.layernorm3(ffn_output + out2)  # (batch_size, target_seq_len, d_model)

        return out3, attn_weights_block1, attn_weights_block2

Define the decoder:

class Decoder(tf.keras.layers.Layer):
    def __init__(self,*, num_layers, d_model, num_heads, dff, target_vocab_size,
               rate=0.1):
        super(Decoder, self).__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = tf.keras.layers.Embedding(target_vocab_size, d_model)
        self.pos_encoding = positional_encoding(MAX_TOKENS, d_model)

        self.dec_layers = [
            DecoderLayer(d_model=d_model, num_heads=num_heads, dff=dff, rate=rate)
            for _ in range(num_layers)]
        self.dropout = tf.keras.layers.Dropout(rate)

    def call(self, x, enc_output, training,
           look_ahead_mask, padding_mask):

        seq_len = tf.shape(x)[1]
        attention_weights = {}

        x = self.embedding(x)  # (batch_size, target_seq_len, d_model)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]

        x = self.dropout(x, training=training)

        for i in range(self.num_layers):
            x, block1, block2 = self.dec_layers[i](x, enc_output, training, look_ahead_mask, padding_mask)

            attention_weights[f'decoder_layer{i+1}_block1'] = block1
            attention_weights[f'decoder_layer{i+1}_block2'] = block2

        # x.shape == (batch_size, target_seq_len, d_model)
        return x, attention_weights

Assembling the model

Once the encoder and decoder are defined, the entire model can be assembled.

class Transformer(tf.keras.Model):
    def __init__(self,*, num_layers, d_model, num_heads, dff, input_vocab_size,
               target_vocab_size, rate=0.1):
        super().__init__()
        self.encoder = Encoder(num_layers=num_layers, d_model=d_model,
                               num_heads=num_heads, dff=dff,
                               input_vocab_size=input_vocab_size, rate=rate)

        self.decoder = Decoder(num_layers=num_layers, d_model=d_model,
                               num_heads=num_heads, dff=dff,
                               target_vocab_size=target_vocab_size, rate=rate)

        self.final_layer = tf.keras.layers.Dense(target_vocab_size)

    def call(self, inputs, training):
        # Keras models prefer if you pass all your inputs in the first argument
        inp, tar = inputs

        padding_mask, look_ahead_mask = self.create_masks(inp, tar)

        enc_output = self.encoder(inp, training, padding_mask)  # (batch_size, inp_seq_len, d_model)

        # dec_output.shape == (batch_size, tar_seq_len, d_model)
        dec_output, attention_weights = self.decoder(
            tar, enc_output, training, look_ahead_mask, padding_mask)

        final_output = self.final_layer(dec_output)  # (batch_size, tar_seq_len, target_vocab_size)

        return final_output, attention_weights

    def create_masks(self, inp, tar):
        # Encoder padding mask (Used in the 2nd attention block in the decoder too.)
        padding_mask = create_padding_mask(inp)

        # Used in the 1st attention block in the decoder.

        # It is used to pad and mask future tokens in the input received by
        # the decoder.

        look_ahead_mask = create_look_ahead_mask(tf.shape(tar)[1])
        dec_target_padding_mask = create_padding_mask(tar)
        look_ahead_mask = tf.maximum(dec_target_padding_mask, look_ahead_mask)

        return padding_mask, look_ahead_mask

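Defining the optimizer

Following the paper, the optimizer is Adam, with the learning rate computed by the following formula:

$$lrate = d_{model}^{-0.5} \cdot \min\left(step\_num^{-0.5},\ step\_num \cdot warmup\_steps^{-1.5}\right)$$

class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):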
    def __init__(self, d_model, warmup_steps=4000):
        super(CustomSchedule, self).__init__()

        self.d_model = d_model
        self.d_model = tf.cast(self.d_model, tf.float32)

        self.warmup_steps = warmup_steps

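    def __call__(self, step):
        # In newer TensorFlow versions the step can arrive as an integer,
        # which rsqrt does not accept, so cast it to float32 first.
        step = tf.cast(step, dtype=tf.float32)
        arg1 = tf.math.rsqrt(step)
        arg2 = step * (self.warmup_steps ** -1.5)

        return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)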

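# NOTE: the original post does not list its hyperparameters. The values below
# are an assumption, taken from the defaults in the TensorFlow tutorial.
num_layers = 4
d_model = 128
dff = 512
num_heads = 8
dropout_rate = 0.1

learning_rate = CustomSchedule(d_model)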

optimizer = tf.keras.optimizers.Adam(learning_rate, beta_1=0.9, beta_2=0.98, epsilon=1e-9)
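The shape of this schedule is easy to inspect without TensorFlow. The sketch below evaluates the same formula in plain Python; the function name `transformer_lr` is hypothetical, and d_model=128 is an assumed value used only for illustration.

```python
import math

def transformer_lr(step, d_model=128, warmup_steps=4000):
    """Learning rate from the schedule: linear warmup, then inverse-sqrt decay."""
    step = float(step)
    return d_model ** -0.5 * min(step ** -0.5, step * warmup_steps ** -1.5)

# The rate rises linearly during warmup, peaks at step == warmup_steps,
# and then decays proportionally to 1/sqrt(step).
for s in (1, 1000, 4000, 16000):
    print(s, transformer_lr(s))
```

At the warmup boundary both arguments of min are equal, so the peak rate is (d_model * warmup_steps) ** -0.5.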

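The loss function

The model predicts token ids, which can be treated as classes, so we use categorical cross-entropy to compute the loss. The following code defines a loss function and a function that computes the model's accuracy metric. Both ignore padding positions (token id 0).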

loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')

def loss_function(real, pred):
    mask = tf.math.logical_not(tf.math.equal(real, 0))
    loss_ = loss_object(real, pred)

    mask = tf.cast(mask, dtype=loss_.dtype)
    loss_ *= mask

    return tf.reduce_sum(loss_)/tf.reduce_sum(mask)

def accuracy_function(real, pred):
    accuracies = tf.equal(real, tf.argmax(pred, axis=2))

    mask = tf.math.logical_not(tf.math.equal(real, 0))
    accuracies = tf.math.logical_and(mask, accuracies)

    accuracies = tf.cast(accuracies, dtype=tf.float32)
    mask = tf.cast(mask, dtype=tf.float32)
    return tf.reduce_sum(accuracies)/tf.reduce_sum(mask)

train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.Mean(name='train_accuracy')
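The key point of loss_function is that padded positions are excluded from the average. Here is a minimal plain-Python sketch of that idea; for simplicity it takes probabilities rather than logits, which is a deviation from the TensorFlow code above, and the name `masked_cross_entropy` is hypothetical.

```python
import math

def masked_cross_entropy(real, probs, pad_id=0):
    """Average -log p(target) over the positions whose target is not padding.

    real:  list of target token ids
    probs: one probability distribution (list of floats) per position
    """
    total, count = 0.0, 0
    for target, dist in zip(real, probs):
        if target == pad_id:
            continue  # padding contributes nothing to the loss
        total += -math.log(dist[target])
        count += 1
    return total / count

real = [2, 1, 0]  # the last position is padding
probs = [[0.1, 0.1, 0.8], [0.2, 0.7, 0.1], [0.9, 0.05, 0.05]]
loss = masked_cross_entropy(real, probs)
print(loss)

# Changing the prediction at the padded position leaves the loss unchanged.
probs_changed = [probs[0], probs[1], [0.05, 0.05, 0.9]]
print(masked_cross_entropy(real, probs_changed) == loss)  # True
```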

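Training the model

We can now train the model. The input consists of pairs of French and English sentences that have been tokenized and converted to vectors. The French data is the encoder input, while the English data is split into two parts, tar_inp and tar_real. tar_inp is the decoder input, and tar_real is the training target, which is compared with the model's predictions to compute the loss.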

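For example, suppose the English sentence is 'SOS A lion in the jungle is sleeping EOS', where SOS and EOS are the special tokens marking the start and end. Then tar_inp is 'SOS A lion in the jungle is sleeping' and tar_real is 'A lion in the jungle is sleeping EOS'. Conceptually, we first feed the corresponding French sentence to the encoder and the first token of tar_inp, 'SOS', to the decoder. We expect the model to produce the first English word, which is compared with the target 'A' from tar_real to compute the loss. We then feed the first two tokens of tar_inp, 'SOS A', to the decoder and expect the model to produce the second English word, whose loss is computed against the target 'lion' from tar_real. This continues through the last token of tar_inp. In actual training, tar_inp and tar_real are passed to the model all at once, and the look_ahead_mask makes the computation equivalent to the step-by-step process above.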
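The shift between decoder input and target can be illustrated with a plain Python list. This is only a sketch of the slicing; in the training code the same slices are taken on token-id tensors.

```python
tokens = ['SOS', 'A', 'lion', 'in', 'the', 'jungle', 'is', 'sleeping', 'EOS']

tar_inp = tokens[:-1]   # decoder input: everything except the last token
tar_real = tokens[1:]   # target: everything except the first token

# At step t the decoder sees tar_inp[:t + 1] and should predict tar_real[t].
for t in range(3):
    print(tar_inp[:t + 1], '->', tar_real[t])
# ['SOS'] -> A
# ['SOS', 'A'] -> lion
# ['SOS', 'A', 'lion'] -> in
```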

First we instantiate a Transformer, as in the following code:

input_vocab_size = 0
target_vocab_size = 0
with open('fr_vocab.txt', 'r') as f:
    input_vocab_size = len(f.readlines())
with open('en_vocab.txt', 'r') as f:
    target_vocab_size = len(f.readlines())

transformer = Transformer(
    num_layers=num_layers,
    d_model=d_model,
    num_heads=num_heads,
    dff=dff,
    input_vocab_size=input_vocab_size,
    target_vocab_size=target_vocab_size,
    rate=dropout_rate)

Define a checkpoint to save the model during training:

checkpoint_path = './checkpoints/train'

# The two trackable objects to save: the model and the optimizer
ckpt = tf.train.Checkpoint(transformer=transformer, optimizer=optimizer)

ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=5)

# If a checkpoint exists, restore the latest checkpoint.

if ckpt_manager.latest_checkpoint:
    ckpt.restore(ckpt_manager.latest_checkpoint)
    print('Latest checkpoint restored!!')

Define a training function:

EPOCHS = 20

# The @tf.function trace-compiles train_step into a TF graph for faster
# execution. The function specializes to the precise shape of the argument
# tensors. To avoid re-tracing due to the variable sequence lengths or variable
# batch sizes (the last batch is smaller), use input_signature to specify
# more generic shapes.

train_step_signature = [
    tf.TensorSpec(shape=(None, None), dtype=tf.int64),
    tf.TensorSpec(shape=(None, None), dtype=tf.int64),
]

@tf.function(input_signature=train_step_signature)
def train_step(inp, tar):
    tar_inp = tar[:, :-1]
    tar_real = tar[:, 1:]
    print(tar_real)
    with tf.GradientTape() as tape:
        predictions, _ = transformer([inp, tar_inp], training = True)
        loss = loss_function(tar_real, predictions)

    gradients = tape.gradient(loss, transformer.trainable_variables)
    optimizer.apply_gradients(zip(gradients, transformer.trainable_variables))

    train_loss(loss)
    train_accuracy(accuracy_function(tar_real, predictions))

Now we can start training. After 20 epochs, the accuracy reaches 86.3%:

import time

for epoch in range(EPOCHS):
    start = time.time()

    train_loss.reset_states()
    train_accuracy.reset_states()

    # inp -> french, tar -> english
    for (batch, (inp, tar)) in enumerate(train_batches):
        try:
            train_step(inp, tar)
        except ValueError:
            print(inp)
            print('-------')
            print(tar)

        if batch % 50 == 0:
            print(f'Epoch {epoch + 1} Batch {batch} Loss {train_loss.result():.4f} Accuracy {train_accuracy.result():.4f}')

    if (epoch + 1) % 5 == 0:
        ckpt_save_path = ckpt_manager.save()
        print(f'Saving checkpoint for epoch {epoch+1} at {ckpt_save_path}')

    print(f'Epoch {epoch + 1} Loss {train_loss.result():.4f} Accuracy {train_accuracy.result():.4f}')

    print(f'Time taken for 1 epoch: {time.time() - start:.2f} secs\n')

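Making predictions with the model

With training finished, the exciting moment has arrived: we can check whether this French-to-English translator really can translate. To do so, we build a Translator class. When translating, it takes a French sentence and first adds the START and END tokens to it. The model then predicts English tokens one at a time until the predicted token is END.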

class Translator(tf.Module):
    START = tf.argmax(tf.constant(reserved_tokens) == "[START]")
    END = tf.argmax(tf.constant(reserved_tokens) == "[END]")

    def __init__(self, fr_tokenizer, en_tokenizer, transformer):
        self.fr_tokenizer = fr_tokenizer
        self.en_tokenizer = en_tokenizer
        self.transformer = transformer

    def _add_start_end(self, ragged):
        count = ragged.bounding_shape()[0]
        starts = tf.fill([count,1], START)
        ends = tf.fill([count,1], END)
        return tf.concat([starts, ragged, ends], axis=1)

    def __call__(self, sentence, max_length=MAX_TOKENS):
        # input sentence is french, hence adding the start and end token
        assert isinstance(sentence, tf.Tensor)
        if len(sentence.shape) == 0:
            sentence = sentence[tf.newaxis]
        #print(sentence)
        #print(self.fr_tokenizer.tokenize(sentence))
        #print(self.fr_tokenizer.tokenize(sentence).merge_dims(1,2))
        sentence = self._add_start_end(self.fr_tokenizer.tokenize(sentence).merge_dims(1,2)).to_tensor()

        encoder_input = sentence

        # As the output language is english, initialize the output with the
        # english start token.

        #start_end = self.en_tokenizer.tokenize([''])[0]
        start_end = self._add_start_end(en_tokenizer.tokenize(['']).merge_dims(1,2))[0]
        start = start_end[0][tf.newaxis]
        end = start_end[1][tf.newaxis]

        # tf.TensorArray is required here (instead of a python list) so that the
        # dynamic-loop can be traced by tf.function.

        output_array = tf.TensorArray(dtype=tf.int64, size=0, dynamic_size=True)
        output_array = output_array.write(0, start)

        for i in tf.range(max_length):
            output = tf.transpose(output_array.stack())
            predictions, _ = self.transformer([encoder_input, output], training=False)

            # select the last token from the seq_len dimension
            predictions = predictions[:, -1:, :]  # (batch_size, 1, vocab_size)

            predicted_id = tf.argmax(predictions, axis=-1)

            # concatentate the predicted_id to the output which is given to the decoder
            # as its input.

            output_array = output_array.write(i+1, predicted_id[0])

            if predicted_id == end:
                break

        output = tf.transpose(output_array.stack())
        # output.shape (1, tokens)
        text = en_tokenizer.detokenize(output)[0]  # shape: ()

        #tokens = en_tokenizer.lookup(output)[0]

        # tf.function prevents us from using the attention_weights that were
        # calculated on the last iteration of the loop. So recalculate them outside
        # the loop.

        _, attention_weights = self.transformer([encoder_input, output[:,:-1]], training=False)

        #return text, tokens, attention_weights
        return text, attention_weights

translator = Translator(fr_tokenizer, en_tokenizer, transformer)
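Stripped of the TensorFlow details, the loop in Translator is greedy decoding: feed the tokens produced so far, take the most likely next token, append it, and stop at the end token. The sketch below shows that control flow in plain Python. `greedy_decode` and `fake_model` are hypothetical stand-ins: `fake_model` replaces the trained Transformer with a canned lookup so that the example runs on its own.

```python
def greedy_decode(next_token_fn, start_id, end_id, max_length):
    """Generate token ids one at a time until end_id or max_length is reached."""
    output = [start_id]
    for _ in range(max_length):
        predicted = next_token_fn(output)  # most likely next token id
        output.append(predicted)
        if predicted == end_id:
            break
    return output

# Hypothetical stand-in for the model: the reply depends only on prefix length.
canned = {1: 10, 2: 11, 3: 3}

def fake_model(prefix):
    return canned[len(prefix)]

print(greedy_decode(fake_model, start_id=2, end_id=3, max_length=10))
# [2, 10, 11, 3]
```

If the model never emits the end token, the loop stops after max_length predictions, which is why the class above passes MAX_TOKENS as the default limit.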

Define a helper function that prints the French input sentence, the model's predicted English sentence, and the reference English sentence:

def print_translation(sentence, tokens, ground_truth):
    prediction_text = []
    tokens_numpy = tokens.numpy()
    for i in range(1, tokens_numpy.shape[0]-1):
        prediction_text.append(tokens_numpy[i].decode("utf-8"))
    prediction_text = ' '.join(prediction_text)
    print(f'{"Input:":15s}: {sentence}')
    print(f'{"Prediction":15s}: {prediction_text}')
    print(f'{"Ground truth":15s}: {ground_truth}')

Next, we can test a few French sentences from the validation set:

sentence = "c'est une histoire tellement triste."
ground_truth = "this is such a sad story."

translated_text, attention_weights = translator(
    tf.constant(sentence))
print_translation(sentence, translated_text, ground_truth)

The output is as follows:

Input:         : c'est une histoire tellement triste.
Prediction     : that ' s such a sad story .
Ground truth   : this is such a sad story.

Next I tried entering arbitrary French sentences. Since I do not know French, I first wrote an English sentence and then used Google Translate to turn it into French.

sentence = "Ces pratiques sont essentiellement inefficaces et peuvent entraîner des risques pour la santé et la pollution de l'environnement."
ground_truth = "These practices are essentially ineffective, and can cause health hazards and environmental pollution."

translated_text, attention_weights = translator(
    tf.constant(sentence))
print_translation(sentence, translated_text, ground_truth)

The result is shown below. The translation is not very accurate, but the general meaning is still close. This suggests that the current training set is not large enough; with more data, the model's performance should improve.

Input:         : Ces pratiques sont essentiellement inefficaces et peuvent entraîner des risques pour la santé et la pollution de l'environnement.
Prediction     : these practices are essentially invinivities and practicing health and pollution .
Ground truth   : These practices are essentially ineffective, and can cause health hazards and environmental pollution.

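Conclusion

By working through the Transformer tutorial on the TensorFlow website, I implemented a model that translates French into English. A possible next step is translating Chinese into English. According to the TensorFlow site, tokenization for languages such as Chinese and Japanese differs from that for English and French and requires a different tokenization method, which I leave for future study.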

Original: https://blog.csdn.net/gzroy/article/details/124460547
Author: gzroy
Title: Building a Transformer Translator with TensorFlow
