Spaces:
Sleeping
Sleeping
| # -*- coding: utf-8 -*- | |
| """HabibiTranslator.ipynb | |
| Automatically generated by Colab. | |
| Original file is located at | |
| https://colab.research.google.com/drive/1lYP3XxUCWdiihU0mIejW_KCqTvy7-tz6 | |
| """ | |
| import torch | |
| torch.cuda.is_available() | |
| import torch | |
| import torch.nn as nn | |
| import torch.optim as optim | |
| import math | |
| from datasets import load_dataset | |
| import numpy as np | |
| from collections import Counter | |
| import gradio as gr | |
| # Seting random seed for reproducibility | |
| torch.manual_seed(42) | |
| device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
| dataset = load_dataset('Helsinki-NLP/tatoeba_mt', 'ara-eng', trust_remote_code=True) | |
| # tokenization (word-level) | |
| def tokenize(text): | |
| return text.split() | |
| # Building vocabulary from dataset | |
| def build_vocab(data, tokenizer, min_freq=2): | |
| counter = Counter() | |
| for example in data: | |
| counter.update(tokenizer(example['sourceString'])) | |
| counter.update(tokenizer(example['targetString'])) | |
| # Adding special tokens | |
| specials = ['<pad>', '<sos>', '<eos>', '<unk>'] | |
| vocab = specials + [word for word, freq in counter.items() if freq >= min_freq] | |
| word2idx = {word: idx for idx, word in enumerate(vocab)} | |
| idx2word = {idx: word for word, idx in word2idx.items()} | |
| return word2idx, idx2word | |
| # Converting text to tensor (adjusted to fit special tokens within max_len) | |
| def text_to_tensor(text, vocab, tokenizer, max_len=52): | |
| tokens = tokenizer(text)[:max_len - 2] # Reserving space for <sos> and <eos> | |
| tokens = ['<sos>'] + tokens + ['<eos>'] | |
| tensor = [vocab.get(token, vocab['<unk>']) for token in tokens] | |
| return torch.tensor(tensor, dtype=torch.long) | |
| train_data = dataset['validation'] # Using validation as training data for demo | |
| test_data = dataset['test'] | |
| # Building shared vocabulary (for simplicity, using both languages in one vocab) | |
| word2idx, idx2word = build_vocab(train_data, tokenize) | |
| # Hyperparameters for data | |
| max_len = 52 # Increased to account for <sos> and <eos> | |
| batch_size = 32 | |
| train_data_list = list(train_data) # Convert Dataset to list once | |
| print(f"Length of train_data_list: {len(train_data_list)}") | |
| def get_batches(data_list, batch_size, max_len=52): | |
| total_batches = len(data_list) // batch_size + (1 if len(data_list) % batch_size else 0) | |
| print(f"Total batches to process: {total_batches}") | |
| for i in range(0, len(data_list), batch_size): | |
| batch = data_list[i:i + batch_size] | |
| src_batch = [text_to_tensor(example['sourceString'], word2idx, tokenize, max_len) for example in batch] | |
| tgt_batch = [text_to_tensor(example['targetString'], word2idx, tokenize, max_len) for example in batch] | |
| src_batch = nn.utils.rnn.pad_sequence(src_batch, padding_value=word2idx['<pad>'], batch_first=False).to(device) | |
| tgt_batch = nn.utils.rnn.pad_sequence(tgt_batch, padding_value=word2idx['<pad>'], batch_first=False).to(device) | |
| if src_batch.size(0) > max_len: | |
| src_batch = src_batch[:max_len, :] | |
| elif src_batch.size(0) < max_len: | |
| padding = torch.full((max_len - src_batch.size(0), src_batch.size(1)), word2idx['<pad>'], dtype=torch.long).to(device) | |
| src_batch = torch.cat([src_batch, padding], dim=0) | |
| if tgt_batch.size(0) > max_len: | |
| tgt_batch = tgt_batch[:max_len, :] | |
| elif tgt_batch.size(0) < max_len: | |
| padding = torch.full((max_len - tgt_batch.size(0), tgt_batch.size(1)), word2idx['<pad>'], dtype=torch.long).to(device) | |
| tgt_batch = torch.cat([tgt_batch, padding], dim=0) | |
| src_batch = src_batch.transpose(0, 1) # [batch_size, seq_len] | |
| tgt_batch = tgt_batch.transpose(0, 1) # [batch_size, seq_len] | |
| yield src_batch, tgt_batch | |
| print("Revised Chunk 1 (Seventh Iteration) completed: Dataset loaded and preprocessing debugged.") | |
| class PositionalEncoding(nn.Module): | |
| def __init__(self, d_model, max_len=52): | |
| super().__init__() | |
| pe = torch.zeros(max_len, d_model) | |
| position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) | |
| div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)) | |
| pe[:, 0::2] = torch.sin(position * div_term) | |
| pe[:, 1::2] = torch.cos(position * div_term) | |
| pe = pe.unsqueeze(0) # Shape: (1, max_len, d_model) | |
| self.register_buffer('pe', pe) | |
| def forward(self, x): | |
| return x + self.pe[:, :x.size(1), :] | |
| class MultiHeadAttention(nn.Module): | |
| def __init__(self, d_model, num_heads): | |
| super().__init__() | |
| assert d_model % num_heads == 0 | |
| self.d_model = d_model | |
| self.num_heads = num_heads | |
| self.d_k = d_model // num_heads | |
| self.W_q = nn.Linear(d_model, d_model) | |
| self.W_k = nn.Linear(d_model, d_model) | |
| self.W_v = nn.Linear(d_model, d_model) | |
| self.W_o = nn.Linear(d_model, d_model) | |
| def scaled_dot_product_attention(self, Q, K, V, mask=None): | |
| scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k) | |
| if mask is not None: | |
| scores = scores.masked_fill(mask == 0, -1e9) | |
| attn = torch.softmax(scores, dim=-1) | |
| return torch.matmul(attn, V) | |
| def forward(self, Q, K, V, mask=None): | |
| batch_size = Q.size(0) | |
| seq_len_q = Q.size(1) | |
| seq_len_k = K.size(1) | |
| Q = self.W_q(Q) | |
| K = self.W_k(K) | |
| V = self.W_v(V) | |
| Q = Q.view(batch_size, seq_len_q, self.num_heads, self.d_k).transpose(1, 2) | |
| K = K.view(batch_size, seq_len_k, self.num_heads, self.d_k).transpose(1, 2) | |
| V = V.view(batch_size, seq_len_k, self.num_heads, self.d_k).transpose(1, 2) | |
| output = self.scaled_dot_product_attention(Q, K, V, mask) | |
| output = output.transpose(1, 2).contiguous().view(batch_size, seq_len_q, self.d_model) | |
| return self.W_o(output) | |
| class FeedForward(nn.Module): | |
| def __init__(self, d_model, d_ff): | |
| super().__init__() | |
| self.linear1 = nn.Linear(d_model, d_ff) | |
| self.linear2 = nn.Linear(d_ff, d_model) | |
| self.relu = nn.ReLU() | |
| def forward(self, x): | |
| return self.linear2(self.relu(self.linear1(x))) | |
| class EncoderLayer(nn.Module): | |
| def __init__(self, d_model, num_heads, d_ff, dropout=0.1): | |
| super().__init__() | |
| self.mha = MultiHeadAttention(d_model, num_heads) | |
| self.ff = FeedForward(d_model, d_ff) | |
| self.norm1 = nn.LayerNorm(d_model) | |
| self.norm2 = nn.LayerNorm(d_model) | |
| self.dropout = nn.Dropout(dropout) | |
| def forward(self, x, mask=None): | |
| attn_output = self.mha(x, x, x, mask) | |
| x = self.norm1(x + self.dropout(attn_output)) | |
| ff_output = self.ff(x) | |
| return self.norm2(x + self.dropout(ff_output)) | |
| class DecoderLayer(nn.Module): | |
| def __init__(self, d_model, num_heads, d_ff, dropout=0.1): | |
| super().__init__() | |
| self.mha1 = MultiHeadAttention(d_model, num_heads) | |
| self.mha2 = MultiHeadAttention(d_model, num_heads) | |
| self.ff = FeedForward(d_model, d_ff) | |
| self.norm1 = nn.LayerNorm(d_model) | |
| self.norm2 = nn.LayerNorm(d_model) | |
| self.norm3 = nn.LayerNorm(d_model) | |
| self.dropout = nn.Dropout(dropout) | |
| def forward(self, x, enc_output, src_mask=None, tgt_mask=None): | |
| attn1_output = self.mha1(x, x, x, tgt_mask) | |
| x = self.norm1(x + self.dropout(attn1_output)) | |
| attn2_output = self.mha2(x, enc_output, enc_output, src_mask) | |
| x = self.norm2(x + self.dropout(attn2_output)) | |
| ff_output = self.ff(x) | |
| return self.norm3(x + self.dropout(ff_output)) | |
| class Transformer(nn.Module): | |
| def __init__(self, src_vocab_size, tgt_vocab_size, d_model=256, num_heads=8, num_layers=3, d_ff=1024, max_len=52, dropout=0.1): | |
| super().__init__() | |
| self.d_model = d_model | |
| self.src_embedding = nn.Embedding(src_vocab_size, d_model) | |
| self.tgt_embedding = nn.Embedding(tgt_vocab_size, d_model) | |
| self.pos_encoding = PositionalEncoding(d_model, max_len) | |
| self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)]) | |
| self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)]) | |
| self.fc_out = nn.Linear(d_model, tgt_vocab_size) | |
| self.dropout = nn.Dropout(dropout) | |
| def generate_mask(self, src, tgt): | |
| src_mask = (src != word2idx['<pad>']).unsqueeze(1).unsqueeze(2) | |
| tgt_mask = (tgt != word2idx['<pad>']).unsqueeze(1).unsqueeze(3) | |
| seq_len = tgt.size(1) | |
| nopeak_mask = (1 - torch.triu(torch.ones(1, seq_len, seq_len), diagonal=1)).bool().to(device) | |
| tgt_mask = tgt_mask & nopeak_mask | |
| return src_mask, tgt_mask | |
| def forward(self, src, tgt): | |
| src_mask, tgt_mask = self.generate_mask(src, tgt) | |
| src_embedded = self.dropout(self.pos_encoding(self.src_embedding(src) * math.sqrt(self.d_model))) | |
| tgt_embedded = self.dropout(self.pos_encoding(self.tgt_embedding(tgt) * math.sqrt(self.d_model))) | |
| enc_output = src_embedded | |
| for enc_layer in self.encoder_layers: | |
| enc_output = enc_layer(enc_output, src_mask) | |
| dec_output = tgt_embedded | |
| for dec_layer in self.decoder_layers: | |
| dec_output = dec_layer(dec_output, enc_output, src_mask, tgt_mask) | |
| return self.fc_out(dec_output) | |
| print("Revised Chunk 2 (Fourth Iteration) completed: Transformer model fixed with max_len=52.") | |
| vocab_size = len(word2idx) | |
| model = Transformer( | |
| src_vocab_size=vocab_size, | |
| tgt_vocab_size=vocab_size, | |
| d_model=256, | |
| num_heads=8, | |
| num_layers=3, | |
| d_ff=1024, | |
| max_len=52, | |
| dropout=0.1 | |
| ).to(device) | |
| # Loss and optimizer | |
| criterion = nn.CrossEntropyLoss(ignore_index=word2idx['<pad>']) | |
| optimizer = optim.Adam(model.parameters(), lr=0.0001) | |
| # Training loop with progress feedback | |
| def train(model, data, epochs=20): | |
| model.train() | |
| total_batches = len(data) // batch_size + (1 if len(data) % batch_size else 0) | |
| print(f"Total batches per epoch: {total_batches}") | |
| for epoch in range(epochs): | |
| total_loss = 0 | |
| for batch_idx, (src_batch, tgt_batch) in enumerate(get_batches(data, batch_size, max_len=52), 1): | |
| if batch_idx % 100 == 0: # Printing every 100 batches for feedback | |
| print(f"Epoch {epoch + 1}, Batch {batch_idx}/{total_batches} ") | |
| optimizer.zero_grad() | |
| output = model(src_batch, tgt_batch[:, :-1]) | |
| loss = criterion(output.view(-1, vocab_size), tgt_batch[:, 1:].reshape(-1)) | |
| loss.backward() | |
| optimizer.step() | |
| total_loss += loss.item() | |
| avg_loss = total_loss / total_batches | |
| print(f"Epoch {epoch + 1}/{epochs}, Loss: {avg_loss:.4f}") | |
| # Main function | |
| def translate(model, sentence, max_len=52): | |
| model.eval() | |
| with torch.no_grad(): | |
| src = text_to_tensor(sentence, word2idx, tokenize, max_len).unsqueeze(0).to(device) | |
| tgt = torch.tensor([word2idx['<sos>']], dtype=torch.long).unsqueeze(0).to(device) | |
| for _ in range(max_len): | |
| output = model(src, tgt) | |
| next_token = output[:, -1, :].argmax(dim=-1).item() | |
| if next_token == word2idx['<eos>']: | |
| break | |
| tgt = torch.cat([tgt, torch.tensor([[next_token]], dtype=torch.long).to(device)], dim=1) | |
| translated = [idx2word[idx.item()] for idx in tgt[0] if idx.item() in idx2word] | |
| return ' '.join(translated[1:]) | |
| # Testing | |
| test_sentence = "ุนู ุฑู ุฑุงูุญ ุงูู ูุณููุ" | |
| translated = translate(model, test_sentence) | |
| print(f"Input: {test_sentence}") | |
| print(f"Translated: {translated}") | |
| print("Chunk 3 completed: Training and inference implemented.") | |
| device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
| # Instantiate the model (assuming train_dataset is already defined) | |
| model = Transformer( | |
| src_vocab_size=vocab_size, | |
| tgt_vocab_size=vocab_size | |
| ).to(device) | |
| # Load model checkpoint and set to evaluation mode | |
| model.load_state_dict(torch.load("habibi.pth", map_location=device)) | |
| model.eval() | |
| def gradio_translate(text): | |
| return translate(model, text) | |
| interface = gr.Interface( | |
| fn=gradio_translate, | |
| inputs=gr.Textbox(lines=2, placeholder="Enter Arabic sentence here..."), | |
| outputs="text", | |
| title="Habibi-Translator", | |
| description="Translate Arabic sentences to English using a Transformer model." | |
| ) | |
| interface.launch() | |
| print("Chunk 4 completed: Gradio interface deployed.") | |