论文:Attention is All You Need
2017年Google在论文《Attention is All You Need》中提出了Transformer模型,并成功应用到NLP领域。 该模型完全基于自注意力机制Attention mechanism实现,弥补了传统的RNN模型的不足。
1 2 3 4 5 6 7 8 9 10 11 import torchimport torch.nn as nnimport numpy as npimport mathd_model = 512 d_ff = 2048 src_vocab_size = 10 tgt_vocab_size = 10 n_layers = 6
宏观上,Transformer可以看作一个黑箱操作的Seq2Seq模型。 拆开黑箱,可以看到模型的本质是一个Encoder-Decoder结构。
Embedding:词嵌入层,将输入的词转换为词向量
Positional Encoding:位置编码,为了让模型学习到序列的位置信息
Multi-Head Attention:多头注意力机制,用于捕捉输入序列的全局依赖关系
Add & Norm:残差连接和层归一化,用于加速训练、缓解梯度消失问题
Add残差连接:把网络的输入和输出相加,有效解决梯度消失问题
Norm层归一化:对网络的输出进行归一化处理,加速训练
Feed Forward:前馈神经网络,用于对特征进行非线性变换
Linear:线性变换层,用于将特征映射到输出空间
Softmax:Softmax层,用于输出概率分布
1.Encoder
1.1 Embedding 词嵌入 模型无法直接处理文本数据,需要把文本数据转换成计算机能够识别的向量形式。 将文本转化为向量通常有两种方式:
One-hot编码:词典大小为N,每个词用一个N维向量表示,当前词对应的维度为1,其余维度为0
优点:简单直观
缺点:向量维度高、稀疏、缺乏语义之间的联系
Embedding词嵌入:通过训练学习到的词向量,将词映射到一个低维空间
输入Imput的维度是[batch_size, seq_len],Embedding层的输出维度是[batch_size, seq_len, d_model]。
batch_size:批次大小(句子个数)
seq_len:最长句子长度
d_model:词向量维度
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 class Embeddings (nn.Module): def __init__ (self, vocab_size, d_model ): super (Embeddings, self).__init__() self.lut = nn.Embedding(vocab_size, d_model) self.d_model = d_model def forward (self, x ): """Embedding层的前向传播 Args: x: 输入的词索引张量,形状为(batch_size, seq_len) Returns: 词嵌入张量,形状为(batch_size, seq_len, d_model) """ return self.lut(x) * math.sqrt(self.d_model)
1.2 Positional Encoding 位置编码 Embedding层只能表示词的语义信息,无法表示词的位置信息。 Transformer使用的是自注意力机制来提取信息,一个句子中的每个字/词是并行计算,虽然处理每个字的时候考虑到了所有字对其的影响,但是并没有考虑到各个字相互之间的位置信息,也就是上下文,所以需要引入位置信息
为了让模型学习到序列的位置信息,需要在Embedding层的输出上加上位置编码,得到含有位置信息的词向量$\alpha$。
Transformer中使用Positional Encoding表示每个字、词的位置信息,公式如下:
其中:
$PE_{(pos, i)}$:表示第$pos$个字/词的Encoding向量第$i$维的编码值
$pos$:位置信息,表示句子中的第几个字/词,从0开始
$i$:位置编码的维度,从0开始
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 class PositionalEncoding (nn.Module): def __init__ (self, d_model, dropout=0.1 , max_len=5000 ): super (PositionalEncoding, self).__init__() self.dropout = nn.Dropout(p=dropout) pe = torch.zeros(max_len, d_model) position = torch.arange(0 , max_len).unsqueeze(1 ).float () div_term = torch.exp(torch.arange(0 , d_model, 2 ).float () * -(math.log(10000.0 ) / d_model)) pe[:, 0 ::2 ] = torch.sin(position * div_term) pe[:, 1 ::2 ] = torch.cos(position * div_term) pe = pe.unsqueeze(0 ) self.register_buffer('pe' , pe) def forward (self, x ): """位置编码层的前向传播 Args: x: 输入的词嵌入张量,形状为(batch_size, seq_len, d_model) Returns: 添加了位置编码的词嵌入张量,形状为(batch_size, seq_len, d_model) """ x = x + self.pe[:, :x.size(1 )].clone().detach() return self.dropout(x)
1.3 Self Attention 自注意力机制 一句话中,与语义紧密相关的关键词需要予以更多的关注,而无关的连接词和辅助词则可以忽略。 在机器翻译时,更多的注意表现为更大的权重,越重要的词权重越大。
包含位置信息的词向量$\alpha^i$(句子的第i+1个词的词向量)作为Self Attention的输入,分别乘以三个权重矩阵$W^Q$、$W^K$、$W^V$,得到:
$q^i$:Query,查询向量
$k^i$:Key,键向量,“被查”时的向量
$v^i$:Value,值向量,“内容”的向量
以4个字的句子“我是学生”为例,计算第0个字“我”的Self Attention:
计算 $q^0$ 和 $k^0,k^1,k^2,k^3$ 的点积,得到4个注意力值 $\alpha{00},\alpha {01},\alpha{02},\alpha {03}$
经过Softmax归一化,得到4个注意力分数 $\hat{\alpha{00}},\hat{\alpha {01}},\hat{\alpha{02}},\hat{\alpha {03}}$ ,它们的和为1
将这些分数作为权重,对 $v^0,v^1,v^2,v^3$ 进行加权求和,得到Self Attention的输出
$b^0=\hat{\alpha{00}}v^0+\hat{\alpha {01}}v^1+\hat{\alpha{02}}v^2+\hat{\alpha {03}}v^3$
为了加速计算,可以使用矩阵运算:
其中$\alpha_{ij}=\dfrac{q_i * k_j^T}{\sqrt{d_k}}$,$d_k$是词向量的维度
最后计算$b^i=\sum{j=0}^{n-1}\hat{\alpha {ij}}v_j$
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 class ScaledDotProductAttention (nn.Module): def __init__ (self, scale_factor, dropout=0.0 ): super ().__init__() self.scale_factor = scale_factor self.dropout = nn.Dropout(dropout) def forward (self, Q, K, V, mask=None ): """前向传播 batch_size: 批大小 num_heads: 多头注意力的头数,论文默认为8 seq_len: 序列长度 d_k, d_v: 键和值的维度,默认都是64 Args: Q: 查询张量,形状为(batch_size, num_heads, seq_len, d_k) K: 键张量,形状为(batch_size, num_heads, seq_len, d_k) V: 值张量,形状为(batch_size, num_heads, seq_len, d_v) mask: 掩码张量,形状为(batch_size, seq_len, seq_len),默认为None Returns: 上下文张量和注意力张量 """ attn = torch.matmul(Q / self.scale_factor, K.transpose(2 , 3 )) attn = self.dropout(torch.softmax(attn, dim=-1 )) output = torch.matmul(attn, V) return output, attn
1.4 Multi-Head Attention 多头注意力机制 多头注意力机制就是把$q^i, k^i, v^i$三个矩阵从特征维度(词向量长度)方向上拆分成为形状相同的小矩阵。 再将每个Head Attention的输出拼接起来,得到最终的Multi-Head Attention输出。
理解:多个头分别关注不同的特征子空间,最后再将这些子空间的信息融合起来。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 class MultiHeadAttention (nn.Module): def __init__ (self, n_head=8 , d_model=512 , d_k=64 , d_v=64 , droupout=0.1 ): super ().__init__() self.n_head = n_head self.d_k = d_k self.d_v = d_v self.w_qs = nn.Linear(d_model, n_head * d_k, bias=False ) self.w_ks = nn.Linear(d_model, n_head * d_k, bias=False ) self.w_vs = nn.Linear(d_model, n_head * d_v, bias=False ) self.fc = nn.Linear(n_head * d_v, d_model, bias=False ) self.attention = ScaledDotProductAttention(scale_factor=d_k ** 0.5 ) self.dropout = nn.Dropout(droupout) self.layer_norm = nn.LayerNorm(d_model, eps=1e-6 ) def forward (self, Q, K, V, mask=None ): d_k, d_v, n_head = self.d_k, self.d_v, self.n_head batch_size, len_q, len_k, len_v = Q.size(0 ), Q.size(1 ), K.size(1 ), V.size(1 ) residual = Q Q = self.layer_norm(Q) K = self.layer_norm(K) V = self.layer_norm(V) Q = self.w_qs(Q).view(batch_size, len_q, n_head, d_k) K = self.w_ks(K).view(batch_size, len_k, n_head, d_k) V = self.w_vs(V).view(batch_size, len_v, n_head, d_v) Q, K, V = Q.transpose(1 , 2 ), K.transpose(1 , 2 ), V.transpose(1 , 2 ) if mask is not None : mask = mask.unsqueeze(1 ) Q, attn = self.attention(Q, K, V, mask=mask) Q = Q.transpose(1 , 2 ).contiguous().view(batch_size, len_q, -1 ) Q = self.dropout(self.fc(Q)) Q += residual Q = self.layer_norm(Q) return Q, attn
1.5 Add & Norm 残差连接和层归一化 Add残差链接就是将网络的输入和输出直接相加,主要是为了解决梯度消失问题。
Layer Normalization是对网络的输出进行归一化处理,加速训练。 使$b$的每一行,也就是每个句子,归一化为标准正态分布,输出为$\hat b$,归一化公式如下:
均值:$\mui=\dfrac{1}{d}\sum {j=1}^{d}b_{ij}$
方差:$\sigmai^2=\dfrac{1}{d}\sum {j=1}^{d}(b_{ij}-\mu_i)^2$
归一化:$\hat b{ij}=\dfrac{b {ij}-\mu_i}{\sqrt{\sigma_i^2+\epsilon}}*\gamma+\beta$
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 class LayerNorm (nn.Module): def __init__ (self, d_model, eps=1e-12 ): super (LayerNorm, self).__init__() self.gamma = nn.Parameter(torch.ones(d_model)) self.beta = nn.Parameter(torch.zeros(d_model)) self.eps = eps def forward (self, x ): mean = x.mean(-1 , keepdim=True ) var = x.var(-1 , unbiased=False , keepdim=True ) out = (x - mean) / torch.sqrt(var + self.eps) out = self.gamma * out + self.beta return out
1.6 Feed Forward 前馈神经网络 Add & Norm层后接一个全连接的前馈神经网络,用于对特征进行非线性变换
前馈神经网络的结构是两个全连接层,第一个全连接层的激活函数是ReLU,第二个全连接层没有激活函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 class PositionwiseFeedForward (nn.Module): def __init__ (self ): super (PositionwiseFeedForward, self).__init__() self.fc = nn.Sequential( nn.Linear(d_model, d_ff, bias=False ), nn.ReLU(), nn.Linear(d_ff, d_model, bias=False )) self.layer_norm = nn.LayerNorm(d_model, eps=1e-6 ) def forward (self, inputs ): resdual = inputs output = self.fc(inputs) output = self.layer_norm(output + resdual) return output
1.7 Mask掉 停用词 句子中没有意义的占位符,例如“我是学生P”中的P是停止符,没有实际意义,需要将其mask掉。
1 2 3 4 5 6 def get_attn_pad_mask (seq_q, seq_k ): batch_size, len_q = seq_q.size() batch_size, len_k = seq_k.size() pad_attn_mask = seq_k.data.eq(0 ).unsqueeze(1 ) return pad_attn_mask.expand(batch_size, len_q, len_k)
1.8 EncoderLayer 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 class EncoderLayer (nn.Module): def __init__ (self ): super (EncoderLayer, self).__init__() self.enc_self_attn = MultiHeadAttention() self.pos_ffn = PositionwiseFeedForward() def forward (self, enc_inputs, enc_self_attn_mask ): """前向传播 Args: enc_inputs: 编码器输入张量,形状为(batch_size, seq_len, d_model) enc_self_attn_mask: 编码器自注意力掩码,形状为(batch_size, seq_len, seq_len) Returns: 编码器输出张量,形状为(batch_size, seq_len, d_model) """ enc_outputs, attn = self.enc_self_attn(enc_inputs, enc_inputs, enc_inputs, enc_self_attn_mask) enc_outputs = self.pos_ffn(enc_outputs) return enc_outputs, attn
1.9 Encoder 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 class Encoder (nn.Module): def __init__ (self ): super (Encoder, self).__init__() self.src_emb = Embeddings(src_vocab_size, d_model) self.pos_emb = PositionalEncoding(d_model) self.layers = nn.ModuleList([EncoderLayer() for _ in range (n_layers)]) def forward (self, enc_inputs ): enc_outputs = self.src_emb(enc_inputs) enc_outputs = self.pos_emb(enc_outputs) enc_self_attn_mask = get_attn_pad_mask(enc_inputs, enc_inputs) enc_self_attns = [] for layer in self.layers: enc_outputs, enc_self_attn = layer(enc_outputs, enc_self_attn_mask) enc_self_attns.append(enc_self_attn) return enc_outputs, enc_self_attns
2.Decoder Masked Multi-Head Attention:与MultiHead Attention类似,只是在计算Self Attention时,需要mask掉未来的信息
Multi-Head Attention:与Encoder中的MultiHead Attention相同
Decoder的输出预测:Decoder输出矩阵形状是[seq_len, word_dim],经过nn.Linear全连接层,再通过softmax函数得到每个词的概率,然后选择概率最大的词作为预测结果。
Decoder的输入是最后一个Encoder的输出,在训练时,同时输入目标句子的词向量,以便计算Loss。
“我是学生E”->“S I am a student”
T0时刻:输入开始符“S”,输出预测的第一个词“I”
T1时刻:输入“S I”,输出预测的第二个词“am”
…
输入使用上三角矩阵进行mask,避免Decoder看到未来的信息。
1 2 3 4 5 def get_attn_subsequent_mask (seq ): attn_shape = [seq.size(0 ), seq.size(1 ), seq.size(1 )] subsequent_mask = np.triu(np.ones(attn_shape), k=1 ) subsequent_mask = torch.from_numpy(subsequent_mask).byte() return subsequent_mask
2.2 DecoderLayer 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 class DecoderLayer (nn.Module): def __init__ (self ): super (DecoderLayer, self).__init__() self.dec_self_attn = MultiHeadAttention() self.dec_enc_attn = MultiHeadAttention() self.pos_ffn = PositionwiseFeedForward() def forward (self, dec_inputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask ): """前向传播 Args: dec_inputs: 解码器输入张量,形状为(batch_size, tgt_len, d_model) enc_outputs: 编码器输出张量,形状为(batch_size, seq_len, d_model) dec_self_attn_mask: 解码器自注意力掩码,形状为(batch_size, tgt_len, tgt_len) dec_enc_attn_mask: 解码器-编码器注意力掩码,形状为(batch_size, tgt_len, seq_len) Returns: 解码器输出张量,形状为(batch_size, tgt_len, d_model) """ dec_outputs, dec_self_attn = self.dec_self_attn(dec_inputs, dec_inputs, dec_inputs, dec_self_attn_mask) dec_outputs, dec_enc_attn = self.dec_enc_attn(dec_outputs, enc_outputs, enc_outputs, dec_enc_attn_mask) dec_outputs = self.pos_ffn(dec_outputs) return dec_outputs, dec_self_attn, dec_enc_attn
2.3 Decoder 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 class Decoder (nn.Module): def __init__ (self ): super (Decoder, self).__init__() self.tgt_emb = Embeddings(tgt_vocab_size, d_model) self.pos_emb = PositionalEncoding(d_model) self.layers = nn.ModuleList([DecoderLayer() for _ in range (n_layers)]) def forward (self, dec_inputs, enc_inputs, enc_outputs ): """前向传播 Args: dec_inputs: 解码器输入张量,形状为(batch_size, tgt_len) enc_inputs: 编码器输入张量,形状为(batch_size, seq_len) enc_outputs: 编码器输出张量,形状为(batch_size, seq_len, d_model) Returns: 解码器输出张量,形状为(batch_size, tgt_len, d_model) """ dec_outputs = self.tgt_emb(dec_inputs) dec_outputs = self.pos_emb(dec_outputs) dec_self_attn_pad_mask = get_attn_pad_mask(dec_inputs, dec_inputs) dec_self_attn_subsequent_mask = get_attn_subsequent_mask(dec_inputs) dec_self_attn_mask = torch.gt((dec_self_attn_pad_mask + dec_self_attn_subsequent_mask), 0 ) dec_enc_attn_mask = get_attn_pad_mask(dec_inputs, enc_inputs) dec_self_attns, dec_enc_attns = [], [] for layer in self.layers: dec_outputs, dec_self_attn, dec_enc_attn = layer(dec_outputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask) dec_self_attns.append(dec_self_attn) dec_enc_attns.append(dec_enc_attn) return dec_outputs, dec_self_attns, dec_enc_attns
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 class Transformer (nn.Module): def __init__ (self ): super (Transformer, self).__init__() self.encoder = Encoder() self.decoder = Decoder() self.projection = nn.Linear(d_model, tgt_vocab_size, bias=False ) def forward (self, enc_inputs, dec_inputs ): """前向传播 Args: enc_inputs: 编码器输入张量,形状为(batch_size, seq_len) dec_inputs: 解码器输入张量,形状为(batch_size, tgt_len) Returns: 解码器输出张量,形状为(batch_size * tgt_len, tgt_vocab_size) enc_self_attns: 编码器自注意力张量列表 dec_self_attns: 解码器自注意力张量列表 dec_enc_attns: 解码器-编码器注意力张量列表 """ enc_outputs, enc_self_attns = self.encoder(enc_inputs) dec_outputs, dec_self_attns, dec_enc_attns = self.decoder(dec_inputs, enc_inputs, enc_outputs) dec_logits = self.projection(dec_outputs) return dec_logits.view(-1 , dec_logits.size(-1 )), enc_self_attns, dec_self_attns, dec_enc_attns
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 model = Transformer() criterion = nn.CrossEntropyLoss() optimizer = torch.optim.Adam(model.parameters(), lr=0.001 ) word2idx = { "S" : 0 , "我" : 1 , "是" : 2 , "学" : 3 , "生" : 4 , "I" : 5 , "am" : 6 , "a" : 7 , "student" : 8 } idx2word = {i: w for w, i in word2idx.items()} enc_inputs = torch.LongTensor([[word2idx[w] for w in ["我" , "是" , "学" , "生" , "S" ]]]) dec_inputs = torch.LongTensor([[word2idx[w] for w in ["S" , "I" , "am" , "a" , "student" ]]]) target_batch = torch.LongTensor([[word2idx[w] for w in ["S" , "I" , "am" , "a" , "student" ]]]) print (enc_inputs, dec_inputs, target_batch)for epoch in range (20 ): optimizer.zero_grad() outputs, enc_self_attns, dec_self_attns, dec_enc_attns = model(enc_inputs, dec_inputs) loss = criterion(outputs, target_batch.view(-1 )) print ('Epoch:' , '%04d' % (epoch + 1 ), 'cost =' , '{:.6f}' .format (loss)) loss.backward() optimizer.step() predict, _, _, _ = model(enc_inputs, dec_inputs) predict = predict.data.max (1 , keepdim=True )[1 ] print (enc_inputs, '->' , [idx2word[n.item()] for n in predict.squeeze()])
tensor([[1, 2, 3, 4, 0]]) tensor([[0, 5, 6, 7, 8]]) tensor([[0, 5, 6, 7, 8]])
Epoch: 0001 cost = 2.244576
Epoch: 0002 cost = 0.970345
Epoch: 0003 cost = 2.741649
Epoch: 0004 cost = 3.185768
Epoch: 0005 cost = 4.521887
Epoch: 0006 cost = 3.544580
Epoch: 0007 cost = 3.160498
Epoch: 0008 cost = 2.823286
Epoch: 0009 cost = 0.625107
Epoch: 0010 cost = 0.672708
Epoch: 0011 cost = 0.561514
Epoch: 0012 cost = 0.959138
Epoch: 0013 cost = 0.673696
Epoch: 0014 cost = 0.326180
Epoch: 0015 cost = 0.268545
Epoch: 0016 cost = 0.215698
Epoch: 0017 cost = 0.168979
Epoch: 0018 cost = 0.057510
Epoch: 0019 cost = 0.087601
Epoch: 0020 cost = 0.056923
tensor([[1, 2, 3, 4, 0]]) -> ['S', 'I', 'am', 'a', 'student']