From 64930084301f55e8a93e615152eb1b8f56224dd2 Mon Sep 17 00:00:00 2001 From: marwan2232004 <118024824+marwan2232004@users.noreply.github.com> Date: Fri, 11 Oct 2024 21:10:57 +0300 Subject: [PATCH] Initial commit --- .gitignore | 165 ++++++++++++++++++++++++++ Inference.py | 58 +++++++++ main.py | 97 +++++++++++++++ requirements.txt | 10 ++ utils/Audio_Processing.py | 38 ++++++ utils/Constants.py | 22 ++++ utils/MMS.py | 177 ++++++++++++++++++++++++++++ utils/NLP.py | 68 +++++++++++ utils/Transformer.py | 241 ++++++++++++++++++++++++++++++++++++++ utils/Translation.py | 21 ++++ 10 files changed, 897 insertions(+) create mode 100644 .gitignore create mode 100644 Inference.py create mode 100644 main.py create mode 100644 requirements.txt create mode 100644 utils/Audio_Processing.py create mode 100644 utils/Constants.py create mode 100644 utils/MMS.py create mode 100644 utils/NLP.py create mode 100644 utils/Transformer.py create mode 100644 utils/Translation.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4d99868 --- /dev/null +++ b/.gitignore @@ -0,0 +1,165 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class +.idea +ASR.pth +Test Arabic.mp3 +test_main.http +# C extensions +*.so +*.pth +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/latest/usage/project/#working-with-version-control +.pdm.toml +.pdm-python +.pdm-build/ + +# PEP 582; used by e.g. 
github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ \ No newline at end of file diff --git a/Inference.py b/Inference.py new file mode 100644 index 0000000..38ee7ac --- /dev/null +++ b/Inference.py @@ -0,0 +1,58 @@ +import numpy as np +from utils.Audio_Processing import preprocess_audio +from utils.Constants import * +from utils.MMS import get_device, MMS, greedyDecoder +from utils.NLP import preprocess_vocab +import torch + +############################################################################################ + + +model_path = "./ASR_2_1_220.pth" + + +############################################################################################ + + +def predict(audio_file): + device = get_device() + + processed_audios = [] + mel_spec, duration = preprocess_audio(audio_file) + processed_audios.append(mel_spec) + padded_audios = [ + ( + mel_spec.shape[-1], + np.pad( + mel_spec, + ((0, 0), (0, N_FRAMES - mel_spec.shape[-1])), + mode="constant", + ), + ) + for mel_spec in processed_audios + ] + + char2idx, idx2char, vocab_size = preprocess_vocab() + + # load model + + model = MMS( + vocab_size=vocab_size, + max_encoder_seq_len=math.ceil(N_FRAMES / 2), + max_decoder_seq_len=MAX_SEQ_LEN, + num_encoder_layers=2, + num_decoder_layers=1, + d_model=512, + nhead=8, + dim_feedforward=2048, + ) + + model.load_state_dict(torch.load(model_path, weights_only=True)) + model.to(device) + model.eval() + + result = greedyDecoder( + model, padded_audios[0][1], padded_audios[0][0], char2idx, idx2char, device + ) + + return result diff --git a/main.py b/main.py new file mode 100644 index 0000000..50229a7 --- /dev/null +++ b/main.py @@ -0,0 +1,97 @@ +import sys +import io + +from fastapi import ( + FastAPI, + File, + UploadFile, +) +from openai import OpenAI +import dotenv +from fastapi.middleware.cors import CORSMiddleware +from pydantic import BaseModel + +from Inference import predict + +import tempfile +import os +from utils.Translation import get_translate + +dotenv.load_dotenv() +app = FastAPI() +client = OpenAI() + +sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8") + +# Add CORS middleware +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], # Allow all origins + allow_credentials=True, + allow_methods=["*"], # Allow all methods (GET, POST, etc.) 
+ allow_headers=["*"], # Allow all headers +) + + +@app.get("/") +async def root(): + return {"message": "Hello World"} + + +@app.post("/translate-any") +async def translate(txt: str, language: str): + completion = client.chat.completions.create( + model="gpt-4o-mini", + messages=[ + { + "role": "system", + "content": f"Translate this {txt} into {language} directly without any other words", + } + ], + ) + print(completion.choices[0].message.content) + return {"response": completion.choices[0].message.content} + + +class TranslationRequest(BaseModel): + text: str + + +@app.post("/translate/auto") +async def translate(request: TranslationRequest): + response = get_translate(request.text) + return {"translation": response} + + +@app.post("/translate/en") +async def translate(request: TranslationRequest): + # response = get_translate(request.text) + return {"translation": "ليس بعد"} + + +@app.post("/audio2text") +async def upload_audio(file: UploadFile = File(...)): + # Read the uploaded audio file into memory + contents = await file.read() + + # Get the current working directory + current_dir = os.getcwd() + print(current_dir, flush=True) + + # Create a temporary file in the current working directory + with tempfile.NamedTemporaryFile( + dir=current_dir, delete=False, suffix=".wav" + ) as tmp_file: + tmp_file.write(contents) + tmp_file_path = tmp_file.name # Get the path of the temp file + + try: + # Pass the path of the saved file to the predict function + print(f"Temporary file created at: {tmp_file_path}", flush=True) + result = predict(tmp_file_path) + finally: + # Clean up the temporary file after prediction + os.remove(tmp_file_path) + print(f"Temporary file deleted: {tmp_file_path}", flush=True) + + return {"text": result} diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..f4fc38e --- /dev/null +++ b/requirements.txt @@ -0,0 +1,10 @@ +fastapi~=0.115.0 +openai~=1.50.2 +python-dotenv~=1.0.1 +numpy~=2.0.2 +torch~=2.4.1 +uvicorn~=0.31.0 +python-multipart~=0.0.12 +pydantic~=2.8.2 +librosa~=0.10.2.post1 +requests~=2.32.3 \ No newline at end of file diff --git a/utils/Audio_Processing.py b/utils/Audio_Processing.py new file mode 100644 index 0000000..0fe19f0 --- /dev/null +++ b/utils/Audio_Processing.py @@ -0,0 +1,38 @@ +import librosa +import numpy as np +from utils.Constants import * + + +def pad_or_trim(array, length=N_SAMPLES, axis=-1, padding=True): + if array.shape[axis] > length: + array = array.take(indices=range(length), axis=axis) + + if padding & (array.shape[axis] < length): + pad_widths = [(0, 0)] * array.ndim + pad_widths[axis] = (0, length - array.shape[axis]) + array = np.pad(array, pad_widths) + + return array + + +# Function to load and preprocess audio +def preprocess_audio(file_path): + audio_data, _ = librosa.load(file_path, sr=SAMPLE_RATE) + + duration = librosa.get_duration(y=audio_data, sr=SAMPLE_RATE) + + modified_audio = pad_or_trim(audio_data, padding=False) + + sgram = librosa.stft(y=modified_audio, n_fft=N_FFT, hop_length=HOP_LENGTH) + + sgram_mag, _ = librosa.magphase(sgram) + + mel_scale_sgram = librosa.feature.melspectrogram( + S=sgram_mag, sr=SAMPLE_RATE, n_fft=N_FFT, hop_length=HOP_LENGTH, n_mels=N_MELS + ) + + mel_sgram = librosa.amplitude_to_db(mel_scale_sgram, ref=np.min) + + del audio_data, modified_audio, sgram, mel_scale_sgram + + return mel_sgram, duration \ No newline at end of file diff --git a/utils/Constants.py b/utils/Constants.py new file mode 100644 index 0000000..5df6606 --- /dev/null +++ b/utils/Constants.py 
@@ -0,0 +1,22 @@ +import math + +N_ROWS = 50000 # NUMBER OF ROWS TAKEN FROM THE DATA +MAX_TEXT_LEN = 70 +MAX_SEQ_LEN = 70 +N_MELS = 128 +SAMPLE_RATE = 16000 # NUMBER OF SAMPLES PER SECOND +HOP_LENGTH = ( + 512 # THE STEP LENGTH OF THE SLIDING WINDOW, Commonly set to one-fourth of N_FFT +) +N_FFT = 2048 # NUMBER OF FFT POINTS (WINDOW SIZE) THIS CONTROLS THE RESOLUTION OF THE FREQUENCY DOMAIN ANALYSIS +CHUNK_LENGTH = 15 # 15 SECOND CHUNK +N_SAMPLES = SAMPLE_RATE * CHUNK_LENGTH +N_FRAMES = math.ceil(N_SAMPLES / HOP_LENGTH) +N_SAMPLES_PER_TOKEN = 2 * HOP_LENGTH +FRAMES_PER_SECOND = SAMPLE_RATE // HOP_LENGTH # OR N_FRAMES // CHUNK_LENGTH +TOKENS_PER_SECOND = SAMPLE_RATE // N_SAMPLES_PER_TOKEN +NEG_INFTY = -1e9 +special_tokens = ["", "", "", ""] + + +############################################################################################ \ No newline at end of file diff --git a/utils/MMS.py b/utils/MMS.py new file mode 100644 index 0000000..b387cda --- /dev/null +++ b/utils/MMS.py @@ -0,0 +1,177 @@ +from utils.Transformer import * +from utils.Constants import * +import gc +import numpy as np +from utils.NLP import TextDecoder, tokenize_text + + +def get_device(): + device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu") + print(f"Using device: {device}") + return torch.device("cpu") + + +def get_conv_Lout(L_in, conv): + return math.floor( + (L_in + 2 * conv.padding[0] - conv.dilation[0] * (conv.kernel_size[0] - 1) - 1) + / conv.stride[0] + + 1 + ) + + +class MMS(nn.Module): + def __init__(self, vocab_size, d_model=512, nhead=8, num_encoder_layers=6, num_decoder_layers=6, + dim_feedforward=2048, max_encoder_seq_len=100, max_decoder_seq_len=100, n_mels=N_MELS, + dropout=0.1): + super(MMS, self).__init__() + + self.transformer = Transformer(d_model=d_model, + num_heads=nhead, + num_encoder_layers=num_encoder_layers, + num_decoder_layers=num_decoder_layers, + ffn_hidden=dim_feedforward, + drop_prob=dropout) + + self.en_positional_encoding = PositionalEncoding(d_model, max_encoder_seq_len) + self.de_positional_encoding = PositionalEncoding(d_model, max_decoder_seq_len) + self.conv1 = nn.Conv1d(n_mels, d_model, kernel_size=3, padding=1) + self.conv2 = nn.Conv1d(d_model, d_model, kernel_size=3, stride=2, padding=1) + self.gelu = nn.GELU() + + self.embedding = nn.Embedding(vocab_size, d_model, padding_idx=0) + self.d_model = d_model + self.ff = nn.Linear(d_model, vocab_size) + + def get_encoder_seq_len(self, L_in): + return get_conv_Lout(get_conv_Lout(L_in, self.conv1), self.conv2) + + def forward(self, audio, text, encoder_self_attn_mask, decoder_self_attn_mask, decoder_padding_mask, + device): + + audio = self.gelu(self.conv1(audio)) + audio = self.gelu(self.conv2(audio)) + + audio = audio.permute(0, 2, 1) + + en_positional_encoding = self.en_positional_encoding().to(device) + + assert audio.shape[1:] == en_positional_encoding.shape[1:], "incorrect audio shape" + + audio += en_positional_encoding + + text = self.embedding(text) + + de_positional_encoding = self.de_positional_encoding().to(device) + + assert text.shape[1:] == de_positional_encoding.shape[1:], "incorrect text shape" + + text += de_positional_encoding + + out = self.transformer(src=audio, tgt=text, + encoder_self_attention_mask=encoder_self_attn_mask, + decoder_self_attention_mask=decoder_self_attn_mask, + decoder_cross_attention_mask=decoder_padding_mask) + + out = self.ff(out) + + del de_positional_encoding, en_positional_encoding + gc.collect() + + return out + + +def generate_padding_masks( + 
transcription, audio_original_len, conv_func, frames=N_FRAMES +): + batch_size, seq_len = transcription.size() + audio_len = conv_func(frames) + + look_ahead_mask = torch.full((seq_len, seq_len), True) + look_ahead_mask = torch.triu(look_ahead_mask, diagonal=1) + + encoder_self_attn_mask = torch.full([batch_size, audio_len, audio_len], False) + decoder_padding_self_attn_mask = torch.full([batch_size, seq_len, seq_len], False) + decoder_padding_cross_attn_mask = torch.full( + [batch_size, seq_len, audio_len], False + ) + + for i in range(batch_size): + audio_new_len = conv_func(audio_original_len[i]) + encoder_self_attn_mask[i, audio_new_len:, :] = True + encoder_self_attn_mask[i, :, audio_new_len:] = True + decoder_padding_cross_attn_mask[i, :, audio_new_len:] = True + + zero_indices = np.where(transcription[0].cpu().numpy() == 0)[0] + if len(zero_indices) > 0: + idx = zero_indices[0] + decoder_padding_self_attn_mask[i, idx:, :] = True + decoder_padding_self_attn_mask[i, :, idx:] = True + decoder_padding_cross_attn_mask[i, idx:, :] = True + + decoder_self_attention_mask = torch.where( + look_ahead_mask + decoder_padding_self_attn_mask, NEG_INFTY, 0 + ) + decoder_cross_attention_mask = torch.where( + decoder_padding_cross_attn_mask, NEG_INFTY, 0 + ) + encoder_self_attn_mask = torch.where(encoder_self_attn_mask, NEG_INFTY, 0) + + return ( + encoder_self_attn_mask, + decoder_self_attention_mask, + decoder_cross_attention_mask, + ) + + +def greedyDecoder(model, audio, org_len, char2idx, idx2char, device): + with torch.no_grad(): + transcription = "" + + audio = [torch.tensor(aud) for aud in audio] + audio = torch.stack(audio) + audio = audio.unsqueeze(0) + audio = audio.to(device) + + for i in range(MAX_TEXT_LEN): + + transcription_padded = torch.tensor( + tokenize_text( + transcription, char2idx, max_len=MAX_TEXT_LEN, end_token=False + ), + dtype=torch.long, + ) + + transcription_padded = transcription_padded.unsqueeze(0).to(device) + + encoder_self_attn_mask, decoder_self_attn_mask, decoder_padding_mask = ( + generate_padding_masks( + transcription_padded, [org_len], model.get_encoder_seq_len + ) + ) + + decoder_self_attn_mask = decoder_self_attn_mask.to(device) + decoder_padding_mask = decoder_padding_mask.to(device) + encoder_self_attn_mask = encoder_self_attn_mask.to(device) + # Forward pass + output = model( + audio, + transcription_padded, + encoder_self_attn_mask, + decoder_self_attn_mask, + decoder_padding_mask, + device + ) + + output = F.softmax(output, dim=-1) + + next_token = torch.argmax(output[0, i, :], dim=-1).unsqueeze( + 0 + ) # Get last token prediction + + transcription += TextDecoder(next_token, idx2char) + + # If an end-of-sequence token is predicted, stop + if next_token.item() == char2idx[""]: + break + + return transcription diff --git a/utils/NLP.py b/utils/NLP.py new file mode 100644 index 0000000..7091560 --- /dev/null +++ b/utils/NLP.py @@ -0,0 +1,68 @@ +from utils.Constants import * +import string +import torch + + +def preprocess_vocab(): + # Define the vocabulary: add lowercase letters, digits, and special tokens + english_characters = list( + string.ascii_lowercase + " " + ) # List of English characters + arabic_characters = list( + "ابتثجحخدذرزسشصضطظعغفقكلمنهويئءىةؤ" + ) # List of Arabic characters + + # Combine all characters + characters = english_characters + arabic_characters + vocab = special_tokens + characters # Combine special tokens and characters + + # Build dictionaries for character-to-index and index-to-character + char2idx = {char: idx for idx, 
char in enumerate(vocab)} + idx2char = {idx: char for idx, char in enumerate(vocab)} + + vocab_size = len(vocab) + print(f"Vocabulary size: {vocab_size}") + return char2idx, idx2char, vocab_size + + +def tokenize_text( + text, char2idx, max_len=MAX_TEXT_LEN, start_token=True, end_token=True +): + for char in text: + idx = char2idx.get(char, char2idx[""]) + if idx == 1: + print(char) + + tokens = [char2idx.get(char, char2idx[""]) for char in text] + + max_len -= start_token + end_token # fot the start and end token + + # pad to a maximum length (for batching) + if max_len is not None: + text_len = len(tokens) + if text_len < max_len: + if end_token: + tokens += [char2idx[""]] + tokens += [char2idx[""]] * (max_len - text_len) # Pad + else: + tokens = tokens[:max_len] # Truncate if longer than max_len + if end_token: + tokens += [char2idx[""]] + + if start_token: + tokens.insert(0, char2idx[""]) + + return tokens + + +def TextDecoder(sentence, idx2char): + out = "" + for token in sentence: + if isinstance(token, torch.Tensor): + token = token.item() + char = idx2char[token] + if char == "": + return out + if not (char in special_tokens): + out += char + return out diff --git a/utils/Transformer.py b/utils/Transformer.py new file mode 100644 index 0000000..373e093 --- /dev/null +++ b/utils/Transformer.py @@ -0,0 +1,241 @@ +import torch +import torch.nn.functional as F +import torch.nn as nn +import math + + +class PositionalEncoding(nn.Module): + def __init__(self, d_model, max_len): + super(PositionalEncoding, self).__init__() + self.max_len = max_len + self.d_model = d_model + + def forward(self): + # print('Positional Encoding') + pos = torch.arange(self.max_len, dtype=torch.float).unsqueeze(1) # (max_len,1) + _i = torch.arange(self.d_model, dtype=torch.float).unsqueeze(0) # (1,d_model) + + _i = 1 / torch.pow(torch.tensor(10000.0), (2 * (_i // 2)) / self.d_model) + angles = pos * _i # (max_len * d_model) + + angles[:, 0::2] = torch.sin(angles[:, 0::2]) # even indices + angles[:, 1::2] = torch.cos(angles[:, 1::2]) # odd indices + + return angles[:self.max_len].unsqueeze(0) # convert it to (1,max_len, d_model) so it can be added to a batch + + +def scaled_dot_product_attention(q, k, v, s_mask=None): + d_k = q.shape[-1] + scaled = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(d_k) + if s_mask is not None: + scaled = scaled.permute(1, 0, 2, 3) + s_mask + scaled = scaled.permute(1, 0, 2, 3) + attention = F.softmax(scaled, dim=-1) + output = torch.matmul(attention, v) + return output, attention + + +class MultiHeadAttention(nn.Module): + def __init__(self, _input_dim, d_model, _num_heads=8): + super(MultiHeadAttention, self).__init__() + assert d_model % _num_heads == 0 + self.num_heads = _num_heads + self.input_dim = _input_dim + self.d_model = d_model + self.head_dim = d_model // _num_heads + # This Represents multiplying input embeddings with W_q, W_k, W_v with will result then in Q, K, V matrices + # This is a normal Dense Layer like tf. + # Input matrix is seq_len * input_dim, W_q is output_dim * input_dim. + # The same for W_k and W_v + # Linear Layer = x * W_T + b → W: weights, b: bias, x: input + # so W_q, W_k, W_v will be input_dim * output_dim which will allow us to preform matmul. 
+ # For efficiency, we stack all the weight matrices together + # in one big matrix of size (3 * output_dim) * embedding_size + # when apply matmul this will result of q, k, v all stacked together + self.qkv_layer = nn.Linear(_input_dim, 3 * d_model) + self.output_layer = nn.Linear(d_model, d_model) + + def forward(self, mha_x, self_mask=None): + seq_len = mha_x.shape[1] + batch_size = mha_x.shape[0] + qkv = self.qkv_layer(mha_x) + qkv = qkv.reshape(batch_size, seq_len, self.num_heads, 3 * self.head_dim) + qkv = qkv.permute(0, 2, 1, 3) + q, k, v = qkv.chunk(3, dim=-1) + values, attention = scaled_dot_product_attention(q, k, v, self_mask) + values = values.permute(0, 2, 1, 3) + output = values.reshape(batch_size, seq_len, self.head_dim * self.num_heads) + output = self.output_layer(output) + return output + + +class MultiHeadCrossAttention(nn.Module): + def __init__(self, d_model, _num_heads=8): + super(MultiHeadCrossAttention, self).__init__() + assert d_model % _num_heads == 0 + self.num_heads = _num_heads + self.d_model = d_model + self.head_dim = d_model // _num_heads + self.kv_layer = nn.Linear(d_model, 2 * d_model) + self.q_layer = nn.Linear(d_model, d_model) + self.output_layer = nn.Linear(d_model, d_model) + + def forward(self, mhca_input, encoder_output, cross_mask=None): + decoder_seq_len = mhca_input.shape[1] + encoder_seq_len = encoder_output.shape[1] + batch_size = mhca_input.shape[0] + kv = self.kv_layer(encoder_output) + q = self.q_layer(mhca_input) + + kv = kv.reshape(batch_size, encoder_seq_len, self.num_heads, 2 * self.head_dim) + kv = kv.permute(0, 2, 1, 3) + + q = q.reshape(batch_size, decoder_seq_len, self.num_heads, self.head_dim) + q = q.permute(0, 2, 1, 3) + + k, v = kv.chunk(2, dim=-1) + values, attention = scaled_dot_product_attention(q, k, v, cross_mask) + values = values.permute(0, 2, 1, 3) + output = values.reshape(batch_size, decoder_seq_len, self.head_dim * self.num_heads) + output = self.output_layer(output) + return output + + +class LayerNorm(nn.Module): + def __init__(self, normalization_shape, eps=1e-6): + super(LayerNorm, self).__init__() + self.normalization_shape = normalization_shape + self.eps = eps + self.gamma = nn.Parameter(torch.ones(normalization_shape)) + self.beta = nn.Parameter(torch.zeros(normalization_shape)) + + def forward(self, layer): + dim = [-(i + 1) for i in range(len(self.normalization_shape))] + mean = layer.mean(dim=dim, keepdim=True) + std = (layer.var(dim=dim, keepdim=True) + self.eps).sqrt() + y = (layer - mean) / std + return self.gamma * y + self.beta + + +class PositionWiseFeedForward(nn.Module): + def __init__(self, d_model, hidden, dropout=0.1): + super(PositionWiseFeedForward, self).__init__() + self.linear1 = nn.Linear(d_model, hidden) + self.linear2 = nn.Linear(hidden, d_model) + self.gelu = nn.GELU() + self.dropout = nn.Dropout(p=dropout) + + def forward(self, pos_feed_input): + pos_feed_input = self.linear1(pos_feed_input) + pos_feed_input = self.gelu(pos_feed_input) + pos_feed_input = self.dropout(pos_feed_input) + pos_feed_input = self.linear2(pos_feed_input) + return pos_feed_input + + +class EncoderLayer(nn.Module): + def __init__(self, d_model, num_heads, ffn_hidden, dropout): + super(EncoderLayer, self).__init__() + self.self_attn = MultiHeadAttention(d_model, d_model, num_heads) + self.layer_norm = LayerNorm([d_model], eps=1e-6) + self.dropout = nn.Dropout(dropout) + self.ffn = PositionWiseFeedForward(d_model, ffn_hidden, dropout) + + def forward(self, encoder_layer_input, mask=None): + encoder_layer_residual = 
encoder_layer_input + encoder_layer_input = self.self_attn(encoder_layer_input, mask) + encoder_layer_input = self.dropout(encoder_layer_input) + encoder_layer_input = self.layer_norm(encoder_layer_input + encoder_layer_residual) + encoder_layer_residual = encoder_layer_input + encoder_layer_input = self.ffn(encoder_layer_input) + encoder_layer_input = self.dropout(encoder_layer_input) + encoder_layer_input = self.layer_norm(encoder_layer_input + encoder_layer_residual) + return encoder_layer_input + + +class SequentialEncoder(nn.Sequential): + def forward(self, *inputs): + sequential_encoder_x, mask = inputs + for module in self._modules.values(): + sequential_encoder_x = module(sequential_encoder_x, mask) + return sequential_encoder_x + + +class Encoder(nn.Module): + def __init__(self, d_model, ffn_hidden, num_heads, dropout, num_layers): + super(Encoder, self).__init__() + self.layers = SequentialEncoder( + *[EncoderLayer(d_model, num_heads, ffn_hidden, dropout) for _ in range(num_layers)]) + + def forward(self, encoder_input, encoder_self_attention_mask): + return self.layers(encoder_input, encoder_self_attention_mask) + + +class DecoderLayer(nn.Module): + def __init__(self, d_model, num_heads, ffn_hidden=2048, dropout=0.1): + super(DecoderLayer, self).__init__() + self.self_attn = MultiHeadAttention(d_model, d_model, num_heads) + self.layer_norm = LayerNorm([d_model], eps=1e-6) + self.dropout = nn.Dropout(dropout) + self.ffn = PositionWiseFeedForward(d_model, ffn_hidden, dropout) + self.cross_attn = MultiHeadCrossAttention(d_model, num_heads) + + def forward(self, decoder_layer_input, encoder_output, self_attn_mask, cross_attn_mask): + decoder_layer_residual = decoder_layer_input + decoder_layer_input = self.self_attn(decoder_layer_input, self_attn_mask) + decoder_layer_input = self.dropout(decoder_layer_input) + decoder_layer_input = self.layer_norm(decoder_layer_input + decoder_layer_residual) + + decoder_layer_residual = decoder_layer_input + decoder_layer_input = self.cross_attn(decoder_layer_input, encoder_output, cross_attn_mask) + decoder_layer_input = self.dropout(decoder_layer_input) + decoder_layer_input = self.layer_norm(decoder_layer_input + decoder_layer_residual) + + decoder_layer_residual = decoder_layer_input + decoder_layer_input = self.ffn(decoder_layer_input) + decoder_layer_input = self.dropout(decoder_layer_input) + decoder_layer_input = self.layer_norm(decoder_layer_input + decoder_layer_residual) + return decoder_layer_input + + +class SequentialDecoder(nn.Sequential): + def forward(self, *inputs): + sequential_decoder_x, encoder_output, self_mask, cross_mask = inputs + for module in self._modules.values(): + sequential_decoder_x = module(sequential_decoder_x, encoder_output, self_mask, cross_mask) + return sequential_decoder_x + + +class Decoder(nn.Module): + def __init__(self, d_model, ffn_hidden, num_heads, dropout, num_layers): + super(Decoder, self).__init__() + self.layers = SequentialDecoder(*[DecoderLayer(d_model, num_heads, ffn_hidden, dropout) + for _ in range(num_layers)]) + + def forward(self, decoder_input, encoder_output, self_attn_mask=None, cross_attn_mask=None): + return self.layers(decoder_input, encoder_output, self_attn_mask, cross_attn_mask) + + +class Transformer(nn.Module): + def __init__(self, + d_model, + ffn_hidden, + num_heads, + drop_prob, + num_encoder_layers, + num_decoder_layers + ): + super().__init__() + self.encoder = Encoder(d_model, ffn_hidden, num_heads, drop_prob, num_encoder_layers) + self.decoder = Decoder(d_model, 
ffn_hidden, num_heads, drop_prob, num_decoder_layers) + self.device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu') + + def forward(self, + src, + tgt, + encoder_self_attention_mask=None, + decoder_self_attention_mask=None, + decoder_cross_attention_mask=None): + src = self.encoder(src, encoder_self_attention_mask) + out = self.decoder(tgt, src, decoder_self_attention_mask, decoder_cross_attention_mask) + return out diff --git a/utils/Translation.py b/utils/Translation.py new file mode 100644 index 0000000..0230d15 --- /dev/null +++ b/utils/Translation.py @@ -0,0 +1,21 @@ +import requests +import sys +import io + + +def get_translate(article_ar): + url = "https://openl-translate.p.rapidapi.com/translate" + + payload = {"target_lang": "arz", "text": article_ar} + headers = { + "x-rapidapi-key": "394717c44cmsh82600cc7cdcfb18p1008a8jsne46053c531d3", + "x-rapidapi-host": "openl-translate.p.rapidapi.com", + "Content-Type": "application/json", + } + + response = requests.post(url, json=payload, headers=headers) + return response.json()["translatedText"] + + +# sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8") + # print(get_translate("Hello mohamed I want to go kitchen and eating chicken with your mom"))
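Usage sketch for the two main endpoints added above, assuming the app is served locally with uvicorn main:app (uvicorn's default bind of 127.0.0.1:8000), that OPENAI_API_KEY is available via the environment or the .env file (which .gitignore already excludes), and that the ASR checkpoint ASR_2_1_220.pth referenced by Inference.py is present in the working directory. The host, port, and sample.wav file name are placeholders; requests is already pinned in requirements.txt.

# client_sketch.py -- illustrative only; endpoint paths come from main.py,
# while the base URL and audio file name are assumptions.
import requests

BASE_URL = "http://127.0.0.1:8000"  # default uvicorn host/port

# Speech-to-text: /audio2text expects a multipart upload under the field name "file".
with open("sample.wav", "rb") as audio_file:
    response = requests.post(
        f"{BASE_URL}/audio2text",
        files={"file": ("sample.wav", audio_file, "audio/wav")},
    )
response.raise_for_status()
print(response.json())  # {"text": "..."}

# Translation: /translate/auto expects a JSON body matching the TranslationRequest model.
response = requests.post(f"{BASE_URL}/translate/auto", json={"text": "How are you today?"})
response.raise_for_status()
print(response.json())  # {"translation": "..."}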