An important milestone on the way to understanding modern neural network architectures is grasping and implementing RNNs, encoder-decoder models, and the attention mechanism.
These ideas allowed deep learning techniques to perform remarkably well at machine translation around 2014-2018, before Transformer models began to dominate the state of the art. But before we get to Transformers, we need to understand sequence-to-sequence models.
Recurrent Neural Networks (RNNs) are models that can process sequential inputs, such as sentences for translation or generative tasks. Inherently sequential, RNNs work by carrying their hidden state forward from one timestep to the next.
There are multiple types of RNNs, each suited for a different type of problem:
The attention mechanism is a solution to the first three of the above concerns. By directing its attention to specific parts of the sequence, the model is able to:
The mechanism of attention is simple, and rests on two key points:
Network architectures may change, but in essence the idea of attention is as simple as that: choose what to focus on, based on parameters the network learns. It's much like real life, really: you can't learn the contents of a book by going through it strictly sequentially. You need to learn to focus on specific pieces of content - multiple ones at times.
The idea is so revolutionary that it turns out to be essentially the only thing we need to make models really powerful. That is what Transformers did, but we'll leave that for another time.
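To make the idea concrete before we build the real model, here is a minimal, self-contained sketch of attention. It uses toy random vectors and simple dot-product scoring purely for illustration - the model we build below computes its weights with a learned layer instead:

```python
import torch
import torch.nn.functional as F

torch.manual_seed(0)

# Hypothetical setup: 5 hidden states of dimension 4, and a query vector
# (think of it as the decoder's current hidden state).
hidden_states = torch.randn(5, 4)
query = torch.randn(4)

# Score each hidden state against the query, then softmax the scores
# into attention weights that sum to 1.
scores = hidden_states @ query          # shape (5,)
weights = F.softmax(scores, dim=0)      # shape (5,), sums to 1

# The context vector is the attention-weighted sum of the hidden states:
# the network "focuses" on the states with the largest weights.
context = weights @ hidden_states       # shape (4,)
print(weights, context.shape)
```

That weighted sum is the whole trick; everything else is plumbing around how the scores are produced.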
For now, we'll focus on implementation. We'll implement a sequence-to-sequence model - a fancy term for an encoder-decoder model - that translates French sentences (sequences of words) to English sentences by using RNNs and attention in PyTorch. The implementation will be based on the ideas described above, so don't worry about the exact architecture details.
First, we'll need to lay some groundwork for processing words, strings and dictionaries. We'll use the English-French translation dataset found here: https://download.pytorch.org/tutorial/data.zip
# Import essential libraries
from __future__ import unicode_literals, print_function, division
from io import open
import unicodedata
import string
import re
import random
import torch
import torch.nn as nn
from torch import optim
import torch.nn.functional as F
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
SOS_token = 0 # Start of sentence.
EOS_token = 1 # End of sentence
# This class will help us handle our language dataset.
# We'll represent words as one-hot vectors. We could also use word embeddings here!
class Lang:
    def __init__(self, name):
        self.name = name                        # name of language
        self.word2index = {}                    # word -> index map
        self.word2count = {}                    # word -> word count map
        self.index2word = {0: "SOS", 1: "EOS"}  # index -> word map
        self.n_words = 2                        # Count SOS and EOS

    def addSentence(self, sentence):
        for word in sentence.split(' '):
            self.addWord(word)

    def addWord(self, word):
        if word not in self.word2index:
            # Append new word to end of dictionary
            self.word2index[word] = self.n_words
            self.word2count[word] = 1
            self.index2word[self.n_words] = word
            self.n_words += 1
        else:
            self.word2count[word] += 1
# Other utility functions

# Turn a Unicode string into plain ASCII, thanks to
# https://stackoverflow.com/a/518232/2809427
def unicodeToAscii(s):
    return ''.join(
        c for c in unicodedata.normalize('NFD', s)
        if unicodedata.category(c) != 'Mn'
    )

# Lowercase, trim, and remove non-letter characters
def normalizeString(s):
    s = unicodeToAscii(s.lower().strip())
    s = re.sub(r"([.!?])", r" \1", s)
    s = re.sub(r"[^a-zA-Z.!?]+", r" ", s)
    return s
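A quick demonstration of what the normalization does (the two helpers are repeated here so the snippet runs standalone): accents are stripped via NFD decomposition, punctuation gets a space in front of it, and everything that isn't a letter or sentence-ending punctuation collapses to a space.

```python
import re
import unicodedata

def unicodeToAscii(s):
    # Decompose accented characters and drop the combining marks (category 'Mn').
    return ''.join(c for c in unicodedata.normalize('NFD', s)
                   if unicodedata.category(c) != 'Mn')

def normalizeString(s):
    s = unicodeToAscii(s.lower().strip())
    s = re.sub(r"([.!?])", r" \1", s)       # pad .!? with a leading space
    s = re.sub(r"[^a-zA-Z.!?]+", r" ", s)   # everything else -> single space
    return s

print(normalizeString("Ça va?"))   # -> "ca va ?"
```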
#
# This function will help us parse the 'dictionary' dataset.
#
def readLangs(lang1, lang2, reverse=False):
    print("Reading lines...")
    # Read the file and split into lines. Each line has the form
    # [English] [\t] [Other language]
    lines = open('data/%s-%s.txt' % (lang1, lang2), encoding='utf-8').\
        read().strip().split('\n')
    # Split every line into pairs and normalize
    pairs = [[normalizeString(s) for s in l.split('\t')] for l in lines]
    # Reverse pairs, make Lang instances
    if reverse:
        pairs = [list(reversed(p)) for p in pairs]
        input_lang = Lang(lang2)
        output_lang = Lang(lang1)
    else:
        input_lang = Lang(lang1)
        output_lang = Lang(lang2)
    return input_lang, output_lang, pairs
#
# We want to trim down our dataset because it is massive!
# We will only consider sentence pairs with fewer than MAX_LENGTH words per side.
# We also limit our sentences to ones starting with some pre-determined prefixes.
# All that for simplicity. With enough time and resources, we could train our model on the entire dataset!
#
MAX_LENGTH = 10

eng_prefixes = (
    "i am ", "i m ",
    "he is", "he s ",
    "she is", "she s ",
    "you are", "you re ",
    "we are", "we re ",
    "they are", "they re "
)

def filterPair(p):
    return len(p[0].split(' ')) < MAX_LENGTH and \
        len(p[1].split(' ')) < MAX_LENGTH and \
        p[1].startswith(eng_prefixes)

def filterPairs(pairs):
    return [pair for pair in pairs if filterPair(pair)]
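To see the filter in action (definitions repeated so the snippet runs standalone; the example pairs are made up), note that `str.startswith` accepts a tuple of prefixes and returns True if any of them matches:

```python
MAX_LENGTH = 10
eng_prefixes = ("i am ", "i m ", "he is", "he s ", "she is", "she s ",
                "you are", "you re ", "we are", "we re ", "they are", "they re ")

def filterPair(p):
    # p = [French sentence, English sentence]; keep pairs where both sides
    # are short and the English side starts with one of the prefixes.
    return len(p[0].split(' ')) < MAX_LENGTH and \
        len(p[1].split(' ')) < MAX_LENGTH and \
        p[1].startswith(eng_prefixes)

print(filterPair(["j ai froid .", "i am cold ."]))     # True
print(filterPair(["il fait froid .", "it is cold ."]))  # False: no matching prefix
```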
#
# Function to prepare our data:
#
def prepareData(lang1, lang2, reverse=False):
    # Parse the dictionary file into pairs
    input_lang, output_lang, pairs = readLangs(lang1, lang2, reverse)
    print("Read %s sentence pairs" % len(pairs))
    # Filter the pairs to simplify
    pairs = filterPairs(pairs)
    print("Trimmed to %s sentence pairs" % len(pairs))
    # Add the sentences and words to our language objects so that we can create our one-hot encodings.
    print("Counting words...")
    for pair in pairs:
        input_lang.addSentence(pair[0])
        output_lang.addSentence(pair[1])
    print("Counted words:")
    print(input_lang.name, input_lang.n_words)
    print(output_lang.name, output_lang.n_words)
    # Return the objects that we will work with.
    return input_lang, output_lang, pairs

# Let's try it for French->English!
input_lang, output_lang, pairs = prepareData('eng', 'fra', True)
print(random.choice(pairs))
Now that the data processing is out of the way and we have a clean, organized way of dealing with the vocabularies of both languages, we are ready to build our model!
Recall that our model consists of:
Let's talk about the encoder first. Here is the architecture we will use. Each encoder "layer" corresponds to a single timestep and looks like a feedforward neural network, except that across timesteps these layers "borrow" information from each other.
A detail we need to address is the use of Gated Recurrent Units (GRUs) in this RNN.
I could probably write a more extensive set of notes on this topic, but in short, GRUs are an 'evolved' version of an RNN architecture called Long Short-Term Memory (LSTM). Both GRUs and LSTMs use the concept of gates: quantities calculated at each timestep that serve different purposes. There are gates that help the network retain information across timesteps, gates that help it forget, and gates that combine all the information collected at each timestep. The values of the gates are combined in certain ways to produce the hidden state of the RNN at each timestep.
Let's stick to a rough description of GRUs: as shown above, a GRU takes the input at time $t$, $x_t$, and the hidden state from the previous timestep, $h_{t-1}$, and produces a new hidden state $h_t$.
Let's see how it does that.
This is what a fully gated unit looks like architecturally.
We have two gates ($\sigma$ is the sigmoid function): the reset gate $r_t = \sigma(W_r x_t + U_r h_{t-1} + b_r)$ and the update gate $z_t = \sigma(W_z x_t + U_z h_{t-1} + b_z)$.
The reset gate is used to create the candidate activation vector $\hat{h}_t = \phi(W_h x_t + U_h (r_t \odot h_{t-1}) + b_h)$, in which the network has the chance to reset its memory ($\phi$ is the hyperbolic tangent).
And that is finally combined with the update gate, in which the network has the chance to retain memory long term: $$h_t = (1-z_t)\odot h_{t-1} + z_t \odot \hat{h}_t$$
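To make the formulas concrete, here is a single GRU step written directly from the equations above, with toy dimensions and randomly initialized weights (a sketch of the math, not PyTorch's optimized implementation):

```python
import torch

torch.manual_seed(0)
d = 4  # toy input/hidden size

# Randomly initialized parameters for the two gates and the candidate.
Wr, Ur, br = torch.randn(d, d), torch.randn(d, d), torch.zeros(d)
Wz, Uz, bz = torch.randn(d, d), torch.randn(d, d), torch.zeros(d)
Wh, Uh, bh = torch.randn(d, d), torch.randn(d, d), torch.zeros(d)

x_t = torch.randn(d)       # input at time t
h_prev = torch.zeros(d)    # previous hidden state h_{t-1}

r_t = torch.sigmoid(Wr @ x_t + Ur @ h_prev + br)          # reset gate
z_t = torch.sigmoid(Wz @ x_t + Uz @ h_prev + bz)          # update gate
h_hat = torch.tanh(Wh @ x_t + Uh @ (r_t * h_prev) + bh)   # candidate activation
h_t = (1 - z_t) * h_prev + z_t * h_hat                    # new hidden state
print(h_t)
```

Note how $z_t$ interpolates between keeping the old state and adopting the candidate: with $z_t$ near 0, $h_{t-1}$ survives almost unchanged, which is what lets the network retain memory long term.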
Again, the architecture of our encoder is portrayed in the following figure:
#
# This class represents the encoder RNN
#
class EncoderRNN(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(EncoderRNN, self).__init__()
        self.hidden_size = hidden_size
        # We generate embeddings of hidden_size dimensions for each word.
        # Think of `self.embedding` as a |V| x hidden_size matrix, where if an input index is provided we output
        # the row that represents that word's embedding.
        self.embedding = nn.Embedding(input_size, hidden_size)
        # Our RNN is composed of a single GRU layer.
        self.gru = nn.GRU(hidden_size, hidden_size)

    def forward(self, input, hidden):
        # `input` is an index (1x1 tensor) for a word in the dictionary.
        # Generate its embedding: 1x1x`hidden_size` tensor.
        embedded = self.embedding(input).view(1, 1, -1)
        # Pass the embedded vector into our GRU layer, which returns the same information twice:
        # * `output`: 1x1x`hidden_size` -> the hidden state for this timestep.
        # * `hidden`: 1x1x`hidden_size` -> the same hidden state, in the form the GRU expects as its next input.
        # We'll be saving `output` and feeding it to our decoder, while `hidden` will be passed to the next GRU
        # call with the next word.
        # Granted, we could probably make this work with a single GRU call over the whole sequence, instead of
        # invoking forward once per word.
        output = embedded
        output, hidden = self.gru(output, hidden)
        return output, hidden

    # The initial hidden state is a vector of zeros (it could also be random)
    def initHidden(self):
        return torch.zeros(1, 1, self.hidden_size, device=device)
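A quick shape check of the encoder's two building blocks in isolation, using toy sizes assumed here for illustration (vocabulary of 10 words, hidden size 8):

```python
import torch
import torch.nn as nn

vocab_size, hidden_size = 10, 8   # toy sizes
embedding = nn.Embedding(vocab_size, hidden_size)
gru = nn.GRU(hidden_size, hidden_size)

word_index = torch.tensor([[3]])                 # a single word index, 1x1
embedded = embedding(word_index).view(1, 1, -1)  # 1 x 1 x hidden_size
hidden = torch.zeros(1, 1, hidden_size)          # initial hidden state
output, hidden = gru(embedded, hidden)
print(output.shape, hidden.shape)  # both 1 x 1 x hidden_size
```

With a single timestep and a single layer, `output` and `hidden` carry exactly the same values, which is why the encoder comment above says the GRU returns the same information twice.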
# Implementing the non-attention-based decoder RNN.
class DecoderRNN(nn.Module):
    def __init__(self, hidden_size, output_size):
        super(DecoderRNN, self).__init__()
        self.hidden_size = hidden_size
        # An embedding layer over the entire vocabulary of the target language (English).
        # `output_size` is simply the size of that vocabulary.
        self.embedding = nn.Embedding(output_size, hidden_size)
        # Again, we use a GRU layer as before.
        self.gru = nn.GRU(hidden_size, hidden_size)
        # To produce the output word, the mechanism is reminiscent of a FFNN: a linear layer and a log softmax
        # to generate (log-)probabilities for each word.
        self.out = nn.Linear(hidden_size, output_size)
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, input, hidden):
        # First generate the embedding of the word.
        # Our input word initially is `SOS` (start of sentence).
        output = self.embedding(input).view(1, 1, -1)
        # Introduce a layer of non-linearity with an element-wise ReLU.
        output = F.relu(output)
        # The 'meat' of the decoder - the GRU layer.
        output, hidden = self.gru(output, hidden)
        # We have our hidden state - let's now generate the prediction!
        output = self.softmax(self.out(output[0]))
        return output, hidden

    # As before, the initial hidden state is defined by default to be zeros, even though we'll end up using
    # the encoder's final hidden state.
    def initHidden(self):
        return torch.zeros(1, 1, self.hidden_size, device=device)
We'll be focusing on attention-based decoders for the remainder of this exposition. Recall the main idea behind them: we preserve all of the encoder's hidden states, and the decoder uses its own hidden state to compute weights that tell it which input words to focus on when producing each output word.
class AttnDecoderRNN(nn.Module):
    def __init__(self, hidden_size, output_size, dropout_p=0.1, max_length=MAX_LENGTH):
        super(AttnDecoderRNN, self).__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.dropout_p = dropout_p
        self.max_length = max_length
        # The hidden state of the decoder will be used to compute the attention weights.
        # The input word from the target language is embedded at each timestep...
        self.embedding = nn.Embedding(self.output_size, self.hidden_size)
        # ... then fed into a dropout layer, which has empirically been found to reduce overfitting.
        self.dropout = nn.Dropout(self.dropout_p)
        # To actually calculate the attention weights, we'll use a FFNN whose feature vectors
        # are the concatenations of the embeddings from before and the previous hidden state.
        # The attention weights have `max_length` entries.
        self.attn = nn.Linear(self.hidden_size * 2, self.max_length)
        # When we finally have the weights applied to the encoder's outputs, we have our context vector.
        # We combine it with the embedding of the previous target word, and after a last FFNN-GRU-FFNN
        # architecture we have both our hidden state for the next iteration and our output target word!
        self.attn_combine = nn.Linear(self.hidden_size * 2, self.hidden_size)
        self.gru = nn.GRU(self.hidden_size, self.hidden_size)
        self.out = nn.Linear(self.hidden_size, self.output_size)

    def forward(self, input, hidden, encoder_outputs):
        # `input` is an index in the target language vocabulary.
        # Generate the embedding and mask it through the dropout.
        embedded = self.embedding(input).view(1, 1, -1)
        embedded = self.dropout(embedded)
        # Now we generate the attention weights and apply them to the hidden states of the encoder.
        # `attn_weights` is a `max_length` vector.
        # `encoder_outputs` is typically a |source sequence length| x `hidden_size` tensor.
        # However, we pad it with zeroes to `max_length` rows to make the BMM (batch matrix-matrix product) work out.
        attn_weights = F.softmax(
            self.attn(torch.cat((embedded[0], hidden[0]), 1)), dim=1)
        attn_applied = torch.bmm(attn_weights.unsqueeze(0),
                                 encoder_outputs.unsqueeze(0))
        # `attn_applied` ends up having a length of `hidden_size`!
        # Concatenate the context vector and the embedding and combine them through another linear layer.
        output = torch.cat((embedded[0], attn_applied[0]), 1)
        output = self.attn_combine(output).unsqueeze(0)
        # Finally, our GRU layer will give us the output (which will become the translated word) and the next
        # timestep's hidden state.
        output = F.relu(output)
        output, hidden = self.gru(output, hidden)
        output = F.log_softmax(self.out(output[0]), dim=1)
        return output, hidden, attn_weights

    def initHidden(self):
        return torch.zeros(1, 1, self.hidden_size, device=device)
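To see why the zero-padding of `encoder_outputs` matters, here is the bmm step in isolation with toy sizes (hypothetical random weights, not the trained model): `torch.bmm` needs two 3-D tensors with matching inner dimensions, so the `max_length` weights must multiply exactly `max_length` encoder rows.

```python
import torch
import torch.nn.functional as F

max_length, hidden_size = 10, 8   # toy sizes
attn_weights = F.softmax(torch.randn(1, max_length), dim=1)  # 1 x max_length
encoder_outputs = torch.zeros(max_length, hidden_size)       # padded with zeros

# bmm multiplies (1, 1, max_length) by (1, max_length, hidden_size):
attn_applied = torch.bmm(attn_weights.unsqueeze(0),
                         encoder_outputs.unsqueeze(0))
print(attn_applied.shape)   # (1, 1, hidden_size)
```

The padded rows are all zeros, so any attention weight that lands on them contributes nothing to the context vector.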
Let's train our model!
A couple of things to keep in mind as we do this:
# Some helper functions for evaluation and data processing...
import time
import math
import matplotlib.pyplot as plt
import numpy as np

# Seconds -> Minutes
def asMinutes(s):
    m = math.floor(s / 60)
    s -= m * 60
    return '%dm %ds' % (m, s)

# Given a start time and the fraction of work completed, returns the elapsed time
# and an estimate of the time remaining.
def timeSince(since, percent):
    now = time.time()
    s = now - since
    es = s / (percent)
    rs = es - s
    return '%s (- %s)' % (asMinutes(s), asMinutes(rs))

# Extracts indices from a sentence
def indexesFromSentence(lang, sentence):
    return [lang.word2index[word] for word in sentence.split(' ')]

# Creates a tensor of indices from a sentence.
def tensorFromSentence(lang, sentence):
    indexes = indexesFromSentence(lang, sentence)
    indexes.append(EOS_token)
    return torch.tensor(indexes, dtype=torch.long, device=device).view(-1, 1)

# Creates a pair of tensors of indices from a pair of sentences.
def tensorsFromPair(pair):
    input_tensor = tensorFromSentence(input_lang, pair[0])
    target_tensor = tensorFromSentence(output_lang, pair[1])
    return (input_tensor, target_tensor)
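A quick sanity check of the time formatting helper (repeated here so the snippet runs standalone):

```python
import math

def asMinutes(s):
    # Split a number of seconds into whole minutes and leftover seconds.
    m = math.floor(s / 60)
    s -= m * 60
    return '%dm %ds' % (m, s)

print(asMinutes(125))   # -> "2m 5s"
```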
teacher_forcing_ratio = 0.5

# This function trains our model on *one* training example.
def train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion, max_length=MAX_LENGTH):
    # Initialize the hidden state of the encoder to all zeroes.
    encoder_hidden = encoder.initHidden()
    # Zero the gradients in the optimizers.
    encoder_optimizer.zero_grad()
    decoder_optimizer.zero_grad()
    # `input_tensor` and `target_tensor` represent the sequences of words to translate between.
    input_length = input_tensor.size(0)
    target_length = target_tensor.size(0)
    # Initialize the encoder outputs array. As mentioned above, this array conceptually has input_length rows,
    # but we pad with zeroes up to max_length.
    encoder_outputs = torch.zeros(max_length, encoder.hidden_size, device=device)
    # Keep track of the loss.
    loss = 0
    # Go through all timesteps (words in the input sequence) and run the encoder.
    for ei in range(input_length):
        # The hidden state is recycled as input to the next iteration.
        encoder_output, encoder_hidden = encoder(
            input_tensor[ei], encoder_hidden)
        # The hidden state is retained.
        encoder_outputs[ei] = encoder_output[0, 0]
    # The initial token to give to the decoder is the SOS token.
    decoder_input = torch.tensor([[SOS_token]], device=device)
    # The initial hidden state of the decoder is the final state of the encoder.
    decoder_hidden = encoder_hidden
    # Decide whether to use teacher forcing for this example.
    use_teacher_forcing = True if random.random() < teacher_forcing_ratio else False
    if use_teacher_forcing:
        # Teacher forcing: feed the target as the next input.
        for di in range(target_length):
            # For each target word, run the decoder with the input, its previous hidden state and the
            # encoder outputs.
            decoder_output, decoder_hidden, decoder_attention = decoder(
                decoder_input, decoder_hidden, encoder_outputs)
            # See how we did.
            loss += criterion(decoder_output, target_tensor[di])
            decoder_input = target_tensor[di]  # Teacher forcing
    else:
        # Without teacher forcing: use the model's own predictions as the next input.
        for di in range(target_length):
            decoder_output, decoder_hidden, decoder_attention = decoder(
                decoder_input, decoder_hidden, encoder_outputs)
            topv, topi = decoder_output.topk(1)
            decoder_input = topi.squeeze().detach()  # detach from history as input
            # See how we did.
            loss += criterion(decoder_output, target_tensor[di])
            if decoder_input.item() == EOS_token:
                break
    # Backpropagate to collect the gradients.
    loss.backward()
    # Step through the optimizers.
    encoder_optimizer.step()
    decoder_optimizer.step()
    return loss.item() / target_length
# End-to-end training!
def trainIters(encoder, decoder, n_iters, print_every=1000, plot_every=100, learning_rate=0.01):
    start = time.time()
    plot_losses = []
    print_loss_total = 0  # Reset every print_every
    plot_loss_total = 0   # Reset every plot_every
    # Stochastic gradient descent with negative log-likelihood loss.
    encoder_optimizer = optim.SGD(encoder.parameters(), lr=learning_rate)
    decoder_optimizer = optim.SGD(decoder.parameters(), lr=learning_rate)
    training_pairs = [tensorsFromPair(random.choice(pairs))
                      for i in range(n_iters)]
    criterion = nn.NLLLoss()
    # For each iteration (one randomly chosen sentence pair per iteration)...
    for iter in range(1, n_iters + 1):
        # Collect the input and target sentence tensors (i.e. French -> English).
        training_pair = training_pairs[iter - 1]
        input_tensor = training_pair[0]
        target_tensor = training_pair[1]
        # Calculate the loss for this training example.
        loss = train(input_tensor, target_tensor, encoder,
                     decoder, encoder_optimizer, decoder_optimizer, criterion)
        print_loss_total += loss
        plot_loss_total += loss
        # Print some progress stats.
        if iter % print_every == 0:
            print_loss_avg = print_loss_total / print_every
            print_loss_total = 0
            print('%s (%d %d%%) %.4f' % (timeSince(start, iter / n_iters),
                                         iter, iter / n_iters * 100, print_loss_avg))
        # Gather data for our plot.
        if iter % plot_every == 0:
            plot_loss_avg = plot_loss_total / plot_every
            plot_losses.append(plot_loss_avg)
            plot_loss_total = 0
    # Show our training plot.
    plt.plot(plot_losses)
    plt.show()
Let's evaluate our model by seeing how it does on our dataset. There are more rigorous protocols we could use, such as cross-validation and holdout sets, but we won't be very pedantic here.
# Evaluating our model on a sentence. This basically runs a forward pass through the encoder and decoder
# and spits out the decoded sentence!
def evaluate(encoder, decoder, sentence, max_length=MAX_LENGTH):
    with torch.no_grad():
        input_tensor = tensorFromSentence(input_lang, sentence)
        input_length = input_tensor.size()[0]
        encoder_hidden = encoder.initHidden()
        encoder_outputs = torch.zeros(max_length, encoder.hidden_size, device=device)
        for ei in range(input_length):
            encoder_output, encoder_hidden = encoder(input_tensor[ei],
                                                     encoder_hidden)
            encoder_outputs[ei] += encoder_output[0, 0]
        decoder_input = torch.tensor([[SOS_token]], device=device)  # SOS
        decoder_hidden = encoder_hidden
        decoded_words = []
        decoder_attentions = torch.zeros(max_length, max_length)
        for di in range(max_length):
            decoder_output, decoder_hidden, decoder_attention = decoder(
                decoder_input, decoder_hidden, encoder_outputs)
            decoder_attentions[di] = decoder_attention.data
            topv, topi = decoder_output.data.topk(1)
            if topi.item() == EOS_token:
                decoded_words.append('<EOS>')
                break
            else:
                decoded_words.append(output_lang.index2word[topi.item()])
            decoder_input = topi.squeeze().detach()
        return decoded_words, decoder_attentions[:di + 1]
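The greedy pick inside the loop above can be seen in isolation: `topk(1)` over the decoder's log-probabilities returns the value and index of the most likely word (toy scores over a made-up 4-word vocabulary):

```python
import torch
import torch.nn.functional as F

# Toy scores over a 4-word vocabulary; log_softmax mirrors the decoder's output.
log_probs = F.log_softmax(torch.tensor([[0.1, 2.0, 0.3, -1.0]]), dim=1)
topv, topi = log_probs.topk(1)
print(topi.item())   # 1 -> the highest-scoring word wins
```

This is greedy decoding: at each step we commit to the single best word, which is simple but can miss higher-probability sentences that beam search would find.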
# We'll evaluate randomly chosen sentences from the training set.
def evaluateRandomly(encoder, decoder, n=10):
    for i in range(n):
        pair = random.choice(pairs)
        print('>', pair[0])
        print('=', pair[1])
        output_words, attentions = evaluate(encoder, decoder, pair[0])
        output_sentence = ' '.join(output_words)
        print('<', output_sentence)
        print('')
# Moment of truth!
hidden_size = 256
encoder1 = EncoderRNN(input_lang.n_words, hidden_size).to(device)
attn_decoder1 = AttnDecoderRNN(hidden_size, output_lang.n_words, dropout_p=0.1).to(device)
trainIters(encoder1, attn_decoder1, 50000, print_every=5000)
evaluateRandomly(encoder1, attn_decoder1)
Just out of curiosity, let's examine the attention weights for a specific translation. That intuitively shows us which encoder hidden states the network learned to focus on when translating each word.
output_words, attentions = evaluate(
    encoder1, attn_decoder1, "je suis trop froid .")
plt.matshow(attentions.numpy())
# Let's add some extra finery for the viewing experience.
import matplotlib.ticker as ticker

def showAttention(input_sentence, output_words, attentions):
    # Set up figure with colorbar
    fig = plt.figure()
    ax = fig.add_subplot(111)
    cax = ax.matshow(attentions.numpy(), cmap='bone')
    fig.colorbar(cax)
    # Set up axes
    ax.set_xticklabels([''] + input_sentence.split(' ') +
                       ['<EOS>'], rotation=90)
    ax.set_yticklabels([''] + output_words)
    # Show a label at every tick
    ax.xaxis.set_major_locator(ticker.MultipleLocator(1))
    ax.yaxis.set_major_locator(ticker.MultipleLocator(1))
    plt.show()
def evaluateAndShowAttention(input_sentence):
    output_words, attentions = evaluate(
        encoder1, attn_decoder1, input_sentence)
    print('input =', input_sentence)
    print('output =', ' '.join(output_words))
    showAttention(input_sentence, output_words, attentions)
evaluateAndShowAttention("elle a cinq ans de moins que moi .")
evaluateAndShowAttention("elle est trop petit .")
evaluateAndShowAttention("je ne crains pas de mourir .")
evaluateAndShowAttention("c est un jeune directeur plein de talent .")