Learning Transformer from Scratch: Simple QA Task for Beginners
In part 1, we will learn how to use the basic transformer architecture for a simple QA task aimed at beginners. The goal of this task is to understand how transformers work by keeping the example as simple as possible, using a dummy dataset of Question-Answer pairs. Later parts of this series will explore more comprehensive uses of transformers for robust tasks.
Importing the necessary libraries and tools: PyTorch is used throughout this task.
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import pandas as pd
import torchtext
torchtext.disable_torchtext_deprecation_warning()
import torch.nn.functional as F
import math
import time
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from torch.nn.utils.rnn import pad_sequence
Selecting a device: if you have a GPU, CUDA will be selected; otherwise, the CPU will be used.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f">>We are using {device} device.<<")
>>We are using cuda device.<<
Dataset Importing: We have used a tiny dataset with 1782 question-answer pairs.
df = pd.read_csv("file/QA_V2.csv")
df['Question'] = df['Question'].astype(str)
df['Answer'] = df['Answer'].astype(str)
print(df['Question'].head(4))
0    What is United International University?
1    What is UIU
2    When did the government approved United Intern...
3    Who supported United International University
Name: Question, dtype: object
Tokenization: We have used the basic_english tokenizer from PyTorch's TorchText library. It performs basic normalization and tokenization for English text. The normalization includes lowercasing, removing certain punctuation (e.g., quotation marks), adding spaces around certain punctuation (e.g., periods and commas), and replacing multiple spaces with a single space.
tokenizer = get_tokenizer("basic_english")
df['question_tokenized'] = df['Question'].apply(tokenizer)
df['answer_tokenized'] = df['Answer'].apply(tokenizer)
df['question_tokenized'].head(4)
0    [what, is, united, international, university, ?]
1    [what, is, uiu]
2    [when, did, the, government, approved, united,...
3    [who, supported, united, international, univer...
Name: question_tokenized, dtype: object
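As a quick illustration, here is what the tokenizer does to a short, made-up string (this sentence is not from the dataset):
# Lowercases, spaces out punctuation, collapses repeated spaces, then splits on whitespace.
print(tokenizer("Hello,   World! What is UIU?"))
# -> ['hello', ',', 'world', '!', 'what', 'is', 'uiu', '?']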
Vocabulary Creation: We have used build_vocab_from_iterator to build a vocabulary that maps the unique tokens in both Questions and Answers to numerical IDs. We could create separate vocabularies for questions and answers, but since our dataset is tiny, we decided to create a merged vocabulary for both questions and answers. Here, <pad> represents padding, <unk> represents unknown tokens, <sos> represents the start of sequence, and <eos> represents the end of sequence.
vocab = build_vocab_from_iterator(df['question_tokenized'] + df['answer_tokenized'],
specials=["<pad>","<unk>", "<sos>", "<eos>"])
vocab.set_default_index(vocab["<unk>"])
Let's see how our vocab looks for our tiny dataset:
sorted_vocab_items = sorted(vocab.get_stoi().items(), key=lambda item: item[1])
for token, index in sorted_vocab_items:
print(f"{token}: {index}")
<pad>: 0
<unk>: 1
<sos>: 2
<eos>: 3
the: 4
,: 5
.: 6
and: 7
of: 8
?: 9
in: 10
to: 11
students: 12
for: 13
a: 14
what: 15
uiu: 16
is: 17
): 18
(: 19
...(hidden)
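Because we set the default index to <unk>, any token that never appeared in the dataset maps to ID 1. A quick check (the misspelled word below is a made-up example, assumed to be absent from the dataset):
# Known tokens get their own IDs; unseen tokens fall back to <unk> (ID 1).
print(vocab(["what", "is", "uiu"]))   # -> [15, 17, 16]
print(vocab(["universsityy"]))        # -> [1], i.e. <unk>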
Encoding: Now, replace the tokens with their unique numerical IDs from the vocab.
df['question_encoded'] = df['question_tokenized'].apply(lambda x: vocab(x))
df['answer_encoded'] = df['answer_tokenized'].apply(lambda x: vocab(x))
Let's see how our questions look after encoding.
df['question_encoded'].head(4)
0    [15, 17, 286, 58, 47, 9]
1    [15, 17, 16]
2    [200, 4044, 4, 1616, 1112, 286, 58, 47, 9]
3    [117, 3342, 286, 58, 47]
Name: question_encoded, dtype: object
Now, we need to make all the input sequences (questions and answers) the same length (max_seq_length). Sequences longer than this will be truncated, and shorter ones will be padded. First, let's find the longest question and the longest answer in our dataset.
question_big = 0
answer_big = 0
for i in range(len(df['question_encoded'])):
if len(df['question_encoded'][i]) > question_big:
question_big = len(df['question_encoded'][i])
if len(df['answer_encoded'][i]) > answer_big:
answer_big = len(df['answer_encoded'][i])
print(f">>Max length of questions: {question_big}, Max length of answers: {answer_big}<<")
>>Max length of questions: 29, Max length of answers: 299<<
To ensure that all sequences (questions and answers) within our dataset have the same length, which is essential for feeding our model, we have a function called nahidorg_trunc_pad_soseos_tensor that performs the following actions:
- Truncate: questions and answers longer than the specified max_seq_length are truncated.
- Pad: the <pad> token (numerical ID 0 in our vocabulary) is appended to questions and answers shorter than the specified max_seq_length.
- Start/End of Sequence tokens: the <sos> token (numerical ID 2 in our vocabulary) and the <eos> token (numerical ID 3 in our vocabulary) are added before and after each sequence, respectively, to mark its start and end. This gives the model clear boundaries for the question and answer sequences, making it easier to learn patterns and generate accurate responses.
- Tensorize: the encoded questions and answers are converted into PyTorch tensors, the standard data format for PyTorch models.
def nahidorg_trunc_pad_soseos_tensor(data_series, max_seq_length):
    # Reserve two positions for the <sos> and <eos> tokens.
    max_seq_length = max_seq_length - 2
    padded_sequences = []
    for seq in data_series:
        if len(seq) > max_seq_length:
            # Truncate long sequences, then close them with <eos>.
            padded_sequences.append(seq[:max_seq_length] + [vocab["<eos>"]])
        else:
            # Append <eos>, then pad short sequences with <pad> (ID 0).
            padding_length = max_seq_length - len(seq)
            padded_sequences.append(seq + [vocab["<eos>"]] + [0] * padding_length)
    # Prepend <sos> to every sequence and convert the whole batch to a tensor.
    padded_sequences = [[vocab["<sos>"]] + x for x in padded_sequences]
    return torch.tensor(padded_sequences)
max_seq_length = 64
src_data = nahidorg_trunc_pad_soseos_tensor(df['question_encoded'], max_seq_length)
tgt_data = nahidorg_trunc_pad_soseos_tensor(df['answer_encoded'], max_seq_length)
Let's see how the first question in our dataset looks after truncation, padding, the addition of <sos> (vocab ID 2) and <eos> (vocab ID 3) tokens, and tensorization.
src_data[0]
tensor([ 2, 15, 17, 286, 58, 47, 9, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
If we print the shape of src_data, we see torch.Size([1782, 64]), which means there are 1782 questions in total, each of length 64.
src_data.shape
torch.Size([1782, 64])
Now, our data is ready. It's time to divide the dataset into training and validation sets. Additionally, we implemented batch loading to train the model. We used a batch size of 256, with 85% of the total instances used for training and the rest used for validation.
batch_size = 256
dataset = data.TensorDataset(src_data, tgt_data)
train_size = int(0.85 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])
train_loader = data.DataLoader(train_dataset, batch_size=batch_size, shuffle=False)
val_loader = data.DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
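Before moving on, it can help to pull a single batch from the loader and confirm its shape; this optional check is not part of the original pipeline:
# One batch of question/answer tensors, each of shape (batch_size, max_seq_length).
sample_src, sample_tgt = next(iter(train_loader))
print(sample_src.shape, sample_tgt.shape)  # torch.Size([256, 64]) torch.Size([256, 64])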
Now, it's time to design our model architecture. We will use the Transformer architecture from Vaswani et al.'s paper "Attention Is All You Need".
Multi Head Attention: The MultiHeadAttention class is a crucial module that enables the model to weigh the importance of different words in a sequence when making predictions. It does this by computing attention scores for each word in relation to every other word.
class MultiHeadAttention(nn.Module):
def __init__(self, d_model, num_heads):
super(MultiHeadAttention, self).__init__()
assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
self.d_model = d_model
self.num_heads = num_heads
self.d_k = d_model // num_heads
self.W_q = nn.Linear(d_model, d_model) # Query transformation
self.W_k = nn.Linear(d_model, d_model) # Key transformation
self.W_v = nn.Linear(d_model, d_model) # Value transformation
self.W_o = nn.Linear(d_model, d_model) # Output transformation
def scaled_dot_product_attention(self, Q, K, V, mask=None):
attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
if mask is not None:
attn_scores = attn_scores.masked_fill(mask == 0, -1e9)
attn_probs = torch.softmax(attn_scores, dim=-1)
output = torch.matmul(attn_probs, V)
return output
def split_heads(self, x):
batch_size, seq_length, d_model = x.size()
return x.view(batch_size, seq_length, self.num_heads, self.d_k).transpose(1, 2)
def combine_heads(self, x):
batch_size, _, seq_length, d_k = x.size()
return x.transpose(1, 2).contiguous().view(batch_size, seq_length, self.d_model)
def forward(self, Q, K, V, mask=None):
Q = self.split_heads(self.W_q(Q))
K = self.split_heads(self.W_k(K))
V = self.split_heads(self.W_v(V))
attn_output = self.scaled_dot_product_attention(Q, K, V, mask)
output = self.W_o(self.combine_heads(attn_output))
return output
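As a quick, illustrative sanity check (not part of the training pipeline), we can push a random tensor through the attention block and confirm that the output keeps the input's (batch, seq_len, d_model) shape:
# Self-attention: Q, K and V are all the same tensor.
mha = MultiHeadAttention(d_model=512, num_heads=8)
x = torch.randn(2, 10, 512)    # (batch, seq_len, d_model)
print(mha(x, x, x).shape)      # torch.Size([2, 10, 512])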
Feed Forward: The PositionWiseFeedForward class enhances the representation of each word in the sequence by introducing non-linear transformations. This allows the model to capture more complex patterns and relationships that might be missed by the attention mechanism alone.
class PositionWiseFeedForward(nn.Module):
def __init__(self, d_model, d_ff):
super(PositionWiseFeedForward, self).__init__()
self.fc1 = nn.Linear(d_model, d_ff)
self.fc2 = nn.Linear(d_ff, d_model)
self.relu = nn.ReLU()
def forward(self, x):
return self.fc2(self.relu(self.fc1(x)))
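The feed-forward block is applied to every position independently and, like attention, preserves the input shape; a small illustrative check:
ffn = PositionWiseFeedForward(d_model=512, d_ff=2048)
print(ffn(torch.randn(2, 10, 512)).shape)   # torch.Size([2, 10, 512])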
Positional Encoding: Transformers have no inherent sense of the order of words in a sequence (due to their self-attention mechanism), so this module injects positional information into the input embeddings.
class PositionalEncoding(nn.Module):
def __init__(self, d_model, max_seq_length):
super(PositionalEncoding, self).__init__()
pe = torch.zeros(max_seq_length, d_model)
position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
self.register_buffer('pe', pe.unsqueeze(0))
def forward(self, x):
return x + self.pe[:, :x.size(1)]
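A minimal sketch (using the same sizes we use later) to confirm that the module simply adds a fixed, position-dependent signal to whatever embeddings it receives:
# With an all-zero input, the output is exactly the sinusoidal encoding itself.
pos_enc = PositionalEncoding(d_model=512, max_seq_length=64)
pe_only = pos_enc(torch.zeros(1, 64, 512))            # (batch, seq_len, d_model)
print(pe_only.shape)                                   # torch.Size([1, 64, 512])
print(torch.equal(pe_only[0, 0], pe_only[0, 1]))       # False: each position gets a different vector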
Encoder Layer: The EncoderLayer class is the core processing unit of the Transformer's encoder. It takes in a sequence of words, uses the attention mechanism to weigh the importance of each word in relation to the others, and then applies a feed-forward network to further refine each word's representation. The Transformer encoder typically consists of multiple stacked EncoderLayers, where the output of one layer serves as the input to the next. This stacking allows the model to learn increasingly complex representations of the input sequence.
class EncoderLayer(nn.Module):
def __init__(self, d_model, num_heads, d_ff, dropout):
super(EncoderLayer, self).__init__()
self.self_attn = MultiHeadAttention(d_model, num_heads)
self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, x, mask):
attn_output = self.self_attn(x, x, x, mask)
x = self.norm1(x + self.dropout(attn_output))
ff_output = self.feed_forward(x)
x = self.norm2(x + self.dropout(ff_output))
return x
Decoder Layer: The DecoderLayer class is responsible for generating the output sequence one word at a time, attending both to the input sequence (encoded by the encoder) and to the previously generated words in the output sequence.
class DecoderLayer(nn.Module):
def __init__(self, d_model, num_heads, d_ff, dropout):
super(DecoderLayer, self).__init__()
self.self_attn = MultiHeadAttention(d_model, num_heads)
self.cross_attn = MultiHeadAttention(d_model, num_heads)
self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.norm3 = nn.LayerNorm(d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, x, enc_output, src_mask, tgt_mask):
attn_output = self.self_attn(x, x, x, tgt_mask)
x = self.norm1(x + self.dropout(attn_output))
attn_output = self.cross_attn(x, enc_output, enc_output, src_mask)
x = self.norm2(x + self.dropout(attn_output))
ff_output = self.feed_forward(x)
x = self.norm3(x + self.dropout(ff_output))
return x
Now, we will combine all of these modules to build our final Transformer architecture.
class Transformer(nn.Module):
def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout):
super(Transformer, self).__init__()
self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)
self.positional_encoding = PositionalEncoding(d_model, max_seq_length)
self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
self.fc = nn.Linear(d_model, tgt_vocab_size)
self.dropout = nn.Dropout(dropout)
def generate_mask(self, src, tgt):
src_mask = (src != 0).unsqueeze(1).unsqueeze(2)
tgt_mask = (tgt != 0).unsqueeze(1).unsqueeze(3)
seq_length = tgt.size(1)
nopeak_mask = (1 - torch.triu(torch.ones(1, seq_length, seq_length, device=tgt_mask.device), diagonal=1)).bool()
tgt_mask = tgt_mask & nopeak_mask.to(tgt_mask.device)
return src_mask.to(device), tgt_mask.to(device)
def forward(self, src, tgt):
src_mask, tgt_mask = self.generate_mask(src, tgt)
src_embedded = self.dropout(self.positional_encoding(self.encoder_embedding(src)))
tgt_embedded = self.dropout(self.positional_encoding(self.decoder_embedding(tgt)))
enc_output = src_embedded
for enc_layer in self.encoder_layers:
enc_output = enc_layer(enc_output, src_mask)
dec_output = tgt_embedded
for dec_layer in self.decoder_layers:
dec_output = dec_layer(dec_output, enc_output, src_mask, tgt_mask)
output = self.fc(dec_output)
return output
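Before committing to the full vocabulary and six layers, it can be reassuring to run a deliberately tiny, untrained configuration on random token IDs and check the output shape; the sizes below are made up for this smoke test only:
# Output shape should be (batch, tgt_len, tgt_vocab_size).
tiny = Transformer(src_vocab_size=100, tgt_vocab_size=100, d_model=32, num_heads=4,
                   num_layers=2, d_ff=64, max_seq_length=16, dropout=0.1).to(device)
demo_src = torch.randint(1, 100, (2, 16), device=device)   # random non-pad token IDs
demo_tgt = torch.randint(1, 100, (2, 16), device=device)
print(tiny(demo_src, demo_tgt).shape)                       # torch.Size([2, 16, 100])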
Now, setting the hyperparameters:
- src_vocab_size: defines the size of the source vocabulary, encompassing all unique words or tokens in questions.
- tgt_vocab_size: similarly, this sets the size of the target vocabulary, which usually includes the answer words.
- d_model: sets the dimensionality of the word embeddings and internal representations within the model.
- num_heads: determines the number of attention heads in the multi-head attention mechanism.
- num_layers: defines the number of encoder and decoder layers stacked in the Transformer.
- d_ff: specifies the dimensionality of the inner layer in the position-wise feed-forward network.
- dropout: sets the dropout probability for regularization, where neurons are randomly dropped out during training.
src_vocab_size = len(vocab)
tgt_vocab_size = len(vocab)
d_model = 512
num_heads = 8
num_layers = 6
d_ff = 2048
dropout = 0.1
Now we instantiate the Transformer model, define our loss function, CrossEntropyLoss (with the padding index 0 ignored), and define our optimizer, Adam.
transformer = Transformer(src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout)
transformer = transformer.to(device)
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.Adam(transformer.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)
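Optionally, we can count the trainable parameters to get a feel for the model's size relative to our tiny dataset (the exact number depends on the vocabulary size):
num_params = sum(p.numel() for p in transformer.parameters() if p.requires_grad)
print(f"Trainable parameters: {num_params:,}")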
Now, it's finally time to train our model. Note that the decoder input is batch_tgt[:, :-1] (the answer without its final token), while the loss is computed against batch_tgt[:, 1:] (the answer without <sos>), so at every position the model learns to predict the next token (teacher forcing).
epochs = 100
best_val_loss = float('inf')
epochs_without_improvement = 0
patience = 3
best_model_state_dict = None
best_epoch = None
start_time = time.time()
for epoch in range(epochs):
total_loss = 0
transformer.train()
num_batches = len(train_loader)
for batch_idx, (batch_src, batch_tgt) in enumerate(train_loader):
batch_src = batch_src.to(device)
batch_tgt = batch_tgt.to(device)
optimizer.zero_grad()
output = transformer(batch_src, batch_tgt[:, :-1])
loss = criterion(output.contiguous().view(-1, tgt_vocab_size), batch_tgt[:, 1:].contiguous().view(-1))
loss.backward()
optimizer.step()
total_loss += loss.item()
average_loss = total_loss / (batch_idx + 1)
if (batch_idx + 1) % 10 == 0 or batch_idx == num_batches - 1:
print(f"Epoch: {epoch+1}, Batch: {batch_idx+1}/{num_batches}, Training Loss: {average_loss:.4f}")
transformer.eval()
val_loss = 0
with torch.no_grad():
for batch_src, batch_tgt in val_loader:
batch_src = batch_src.to(device)
batch_tgt = batch_tgt.to(device)
output = transformer(batch_src, batch_tgt[:, :-1])
loss = criterion(output.contiguous().view(-1, tgt_vocab_size), batch_tgt[:, 1:].contiguous().view(-1))
val_loss += loss.item()
average_val_loss = val_loss / len(val_loader)
print(f"Epoch: {epoch+1} | Training Average Loss: {average_loss:.4f} | Validation Average Loss: {average_val_loss:.4f}")
if average_val_loss < best_val_loss:
best_val_loss = average_val_loss
epochs_without_improvement = 0
best_model_state_dict = transformer.state_dict()
best_epoch = epoch + 1
else:
epochs_without_improvement += 1
if epochs_without_improvement >= patience:
print("Stopped training by early stopping!")
break
end_time = time.time()
total_time = end_time - start_time
print(f"\nTotal training time: {total_time:.2f} seconds")
if best_model_state_dict is not None:
print(f"Best model found at epoch {best_epoch}, saving...")
torch.save(best_model_state_dict, "RAN4/best_model.pth")
torch.save(vocab, "RAN4/transformer_vocab.pth")
torch.save(tokenizer, "RAN4/tokenizer.pth")
print("Saved successfully!")
else:
print("No improvement in validation loss. Model not saved.")
Epoch: 1, Batch: 6/6, Training Loss: 7.6788
Epoch: 1 | Training Average Loss: 7.6788 | Validation Average Loss: 7.1758
Epoch: 2, Batch: 6/6, Training Loss: 7.0957
Epoch: 2 | Training Average Loss: 7.0957 | Validation Average Loss: 6.9321
Epoch: 3, Batch: 6/6, Training Loss: 6.8269
Epoch: 3 | Training Average Loss: 6.8269 | Validation Average Loss: 6.6339
... (hidden)
Epoch: 52, Batch: 6/6, Training Loss: 2.4255
Epoch: 52 | Training Average Loss: 2.4255 | Validation Average Loss: 3.8989
Epoch: 53, Batch: 6/6, Training Loss: 2.3816
Epoch: 53 | Training Average Loss: 2.3816 | Validation Average Loss: 3.8735
Epoch: 54, Batch: 6/6, Training Loss: 2.3405
Epoch: 54 | Training Average Loss: 2.3405 | Validation Average Loss: 3.8800
Epoch: 55, Batch: 6/6, Training Loss: 2.2947
Epoch: 55 | Training Average Loss: 2.2947 | Validation Average Loss: 3.9165
Epoch: 56, Batch: 6/6, Training Loss: 2.2534
Epoch: 56 | Training Average Loss: 2.2534 | Validation Average Loss: 3.8778
Stopped training by early stopping!

Total training time: 119.53 seconds
Best model found at epoch 53, saving...
Saved successfully!
We have successfully trained the model. Initially, we set the number of epochs to a large value (100); since we have an early stopping mechanism, training stops once the validation loss no longer improves and the model starts to overfit. Finally, we saved the best-performing model to our local directory for later use. Now, we will load the saved model to test its performance.
transformer_state_dict = torch.load("RAN4/best_model.pth")
vocab = torch.load("RAN4/transformer_vocab.pth")
tokenizer = torch.load("RAN4/tokenizer.pth")
transformer = Transformer(src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout)
transformer.load_state_dict(transformer_state_dict)
transformer = transformer.to(device)
We loaded our model, tokenizer, and vocabulary to generate responses. Now, we will write a function that lets our trained model generate answers. As parameters, we pass the model, tokenizer, vocabulary, and the question to be answered. This function uses greedy search: at every decoding step, it picks the single most probable next token.
def nahidorg_greedy_search(model, tokenizer, vocab, question, max_length=50):
model.eval()
question_tokens = tokenizer(question)
question_encoded = [vocab["<sos>"]] + vocab(question_tokens) + [vocab["<eos>"]]
question_tensor = torch.tensor(question_encoded, dtype=torch.long).unsqueeze(0).to(device)
answer_encoded = [vocab["<sos>"]]
answer_tensor = torch.tensor(answer_encoded, dtype=torch.long).unsqueeze(0).to(device)
for _ in range(max_length):
with torch.no_grad():
output = model(question_tensor, answer_tensor)
next_token_id = output.argmax(dim=-1)[:, -1].item()
if next_token_id == vocab["<eos>"]:
break
answer_encoded.append(next_token_id)
answer_tensor = torch.tensor(answer_encoded, dtype=torch.long).unsqueeze(0).to(device)
answer_tokens = vocab.lookup_tokens(answer_encoded[1:])
answer = " ".join(answer_tokens)
return answer
Generating answer using the greedy search method:
question = "What is United International University?"
response = nahidorg_greedy_search(transformer, tokenizer, vocab, question)
print("Question:", question)
print("Response:", response)
Question: What is United International University? Response: united international university ( uiu ) is a university in bangladesh .
Now it's time for post-processing. Small models trained on tiny datasets like this one tend to generate answers with syntactic and semantic errors due to their incomplete grasp of grammar. Therefore, post-processing the output is crucial for improving answer quality for end users. Here, we will capitalize proper nouns and correct whitespace and punctuation.
import spacy
import re
nlp = spacy.load("en_core_web_lg")
def nahidorg_basic_clean_capitalize(text):
doc = nlp(text)
cleaned_text = []
capitalize_next = True
for token in doc:
token_text = token.text
if token_text.startswith("(") and token_text.endswith(")"):
token_text = re.sub(r"\s+", "", token_text)
if capitalize_next:
token_text = token_text.capitalize()
capitalize_next = False
elif token.pos_ == "PROPN" or token.ent_type_ != "" or token.is_title:
token_text = token_text.capitalize()
if token.text in {'.', '!', '?'}:
capitalize_next = True
cleaned_text.append(token_text)
cleaned_text_str = " ".join(cleaned_text)
cleaned_text_str = re.sub(r"\s'\s", "'", cleaned_text_str)
cleaned_text_str = re.sub(r"'\s", "'", cleaned_text_str)
cleaned_text_str = re.sub(r'\(\s*', '(', cleaned_text_str)
cleaned_text_str = re.sub(r'\s*\)', ')', cleaned_text_str)
cleaned_text_str = re.sub(r'\s([\',?.!"](?:\s|$))', r'\1', cleaned_text_str)
if not cleaned_text_str.endswith('.'):
cleaned_text_str += '.'
return cleaned_text_str
cleaned_text = nahidorg_basic_clean_capitalize(response)
print(cleaned_text)
United International University (Uiu) is a university in Bangladesh.
Now, we will explore further. Let's see if we can improve the answers by trying Beam Search. Beam Search explores multiple potential answer paths simultaneously, considering the top-k probable tokens at each step. Unlike greedy search, which only considers the single most likely token, Beam Search's broader exploration allows it to recover from early mistakes and find more optimal solutions, leading to higher quality and more fluent answers.
def nahidorg_beam_search(model, tokenizer, vocab, question, max_length=50, beam_size=4):
    model.eval()
    question_tokens = tokenizer(question)
    question_encoded = [vocab["<sos>"]] + vocab(question_tokens) + [vocab["<eos>"]]
    question_tensor = torch.tensor(question_encoded, dtype=torch.long).unsqueeze(0).to(device)
    # Each beam is a (token ID sequence, cumulative log-probability) pair.
    beams = [([vocab["<sos>"]], 0.0)]
    for _ in range(max_length):
        new_beams = []
        for sequence, log_prob in beams:
            answer_tensor = torch.tensor(sequence, dtype=torch.long).unsqueeze(0).to(device)
            with torch.no_grad():
                output = model(question_tensor, answer_tensor)
            # Turn the logits at the last position into log-probabilities
            # before accumulating beam scores.
            log_probs = F.log_softmax(output[:, -1], dim=-1)
            topk_log_probs, topk_indices = torch.topk(log_probs, beam_size)
            for token_log_prob, index in zip(topk_log_probs[0], topk_indices[0]):
                new_sequence = sequence + [index.item()]
                new_log_prob = log_prob + token_log_prob.item()
                new_beams.append((new_sequence, new_log_prob))
        # Keep only the beam_size best candidates.
        beams = sorted(new_beams, key=lambda x: x[1], reverse=True)[:beam_size]
        if beams[0][0][-1] == vocab["<eos>"]:
            break
    best_sequence = beams[0][0]
    if best_sequence[-1] == vocab["<eos>"]:
        best_sequence = best_sequence[:-1]
    answer_tokens = vocab.lookup_tokens(best_sequence[1:])  # Remove <sos>
    answer = " ".join(answer_tokens)
    return answer
Generating answer using the beam search method:
question = "What is United International University?"
response = nahidorg_beam_search(transformer, tokenizer, vocab, question)
print("Question:", question)
print("Response:", response)
Question: What is United International University? Response: united international university ( uiu ) is a total of united international university in bangladesh .
cleaned_text = nahidorg_basic_clean_capitalize(response)
print(cleaned_text)
United International University (Uiu) is a total of United International University in Bangladesh.
Beam search usually helps generate better results, but in this case greedy search produced the better answer. This is the end of this tutorial.