Skip to content

Commit

Permalink
multiple transcribers support added
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanshin committed Aug 20, 2023
1 parent fc3e793 commit 299c32c
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 40 deletions.
25 changes: 25 additions & 0 deletions components/config_structure.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from typing import Text, Union, Dict, Iterable
from pathlib import Path
from pydantic import BaseModel, model_validator

# Base model for configuration file
class ConfigStructure(BaseModel):
working_dir: Union[Text,Path]
output_dir: Union[Text,Path]
clean_audio_dir: Union[Text,Path] = None
model: Union[Text,None] = 'large-v2'
devices: Union[Text, Iterable, None] = 'cpu'
logs_to_db: bool
logs_db_path: Union[Text,Path,None]
class Config:
extra = 'forbid'
validate_assigment = True

@model_validator(mode= 'before')
@classmethod
def set_null_feilds(cls, field_values):
if field_values['devices'] is None:
field_values['devices'] = ['cpu']
if field_values['model'] is None:
field_values['model'] = 'tiny'
return field_values
34 changes: 13 additions & 21 deletions components/configs_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,45 +4,37 @@
"""
from typing import Text, Union, Dict
from pathlib import Path
from pydantic import BaseModel
from components.config_structure import ConfigStructure
import os
import yaml

# Base model for configuration file
class ConfigStructure(BaseModel):
working_dir: Union[Text,Path]
output_dir: Union[Text,Path]
logs_to_db: bool
logs_db_path: Union[Text,Path,None]
class Config:
extra = 'forbid'


def create_dirs(configs_dict: Dict) -> None:
def create_dirs(configs_dict: ConfigStructure) -> None:
""" Create necessary directories """
clean_audio_path = os.path.join(configs_dict['working_dir'], 'CLEAN_AUDIO')
clean_audio_path = os.path.join(configs_dict.working_dir, 'CLEAN_AUDIO')
if not os.path.exists(clean_audio_path):
os.makedirs(clean_audio_path)
configs_dict['clean_audio_dir'] = clean_audio_path
if not os.path.exists(configs_dict['output_dir']):
os.makedirs(configs_dict['output_dir'])
if configs_dict['logs_to_db'] == True:
if not os.path.exists(configs_dict['logs_db_path']):
os.makedirs(configs_dict['logs_db_path'])
configs_dict.clean_audio_dir = clean_audio_path
if not os.path.exists(configs_dict.output_dir):
os.makedirs(configs_dict.output_dir)
if configs_dict.logs_to_db == True:
if not os.path.exists(configs_dict.logs_db_path):
os.makedirs(configs_dict.logs_db_path)

return

def validate(configs_dict: Dict) -> None:
valid_conf_model = ConfigStructure(**configs_dict)
return
return valid_conf_model

def read_configs(config_file: Union[Text,Path] = "config.yaml") -> Dict:
def read_configs(config_file: Union[Text,Path] = "config.yaml") -> ConfigStructure:
""" Read configs"""
configs_dict = dict()
with open(config_file, "r") as stream:
try:
configs_dict = yaml.safe_load(stream)
validate(configs_dict)
configs_dict = validate(configs_dict)
configs_dict.devices = list(configs_dict.devices)
create_dirs(configs_dict)
return configs_dict
except yaml.YAMLError as exc:
Expand Down
4 changes: 2 additions & 2 deletions components/logs_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ def configure_loger(APP_CONFIGS) -> logging.Logger:
attributes_list = ['asctime', 'levelname', 'service_name', 'message']
formatter = logging.Formatter('%(' + ((')s' + db_logs_handler.DEFAULT_SEPARATOR + '%(').join(attributes_list)) + ')s')

if APP_CONFIGS['logs_to_db'] == True:
if APP_CONFIGS.logs_to_db == True:
logger.propagate = False
database = os.path.join(APP_CONFIGS['logs_db_path'], 'LOGS.db')
database = os.path.join(APP_CONFIGS.logs_db_path, 'LOGS.db')
table = 'asr_logs'
sql_handler = db_logs_handler.SQLiteHandler(database = database, table = table, attributes_list = attributes_list)
sql_handler.setLevel(logging.INFO)
Expand Down
2 changes: 1 addition & 1 deletion components/noisereducer.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def cleaner_worker(configs_dict, queue, logs_queue) -> None:
if not queue.empty():
f_path = queue.get()
logs_queue.put(f'{f_path} Clean start' + '|' + SERVICE_NAME)
reduce_noise(f_path, configs_dict['clean_audio_dir'])
reduce_noise(f_path, configs_dict.clean_audio_dir)
logs_queue.put(f'{f_path} Clean end'+'|' + SERVICE_NAME)
os.remove(f_path)
pass
19 changes: 11 additions & 8 deletions components/transcriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Text, Union
from pathlib import Path
from huggingsound import SpeechRecognitionModel
import signal
import whisper
import torch
import json
Expand All @@ -25,20 +26,22 @@ def transcribe_audio(
json.dump(transcription, out_file, ensure_ascii= False)
return None

def transcriber_worker(configs_dict, queue, logs_queue) -> None:

def transcriber_worker(configs_dict, queue, logs_queue, device) -> None:
""" Daemon cleaner worker """
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
#check or load model
model = whisper.load_model("large-v2", device)
#model = whisper.load_model("medium", device)
#model = SpeechRecognitionModel("jonatasgrosman/wav2vec2-large-xlsr-53-russian", device)
try:
model = whisper.load_model(configs_dict.model, torch.device(device))
except RuntimeError as e:
print(e)
os.kill(os.getppid(), signal.SIGTERM) # kill parent proc
f_path = []
while True:
if not queue.empty():
f_path = queue.get()
logs_queue.put(f'{f_path} Transcribe start' + '|' + SERVICE_NAME)
transcribe_audio(f_path, configs_dict['output_dir'], model)
logs_queue.put(f'{f_path} Transcribe end' + '|' + SERVICE_NAME)
logs_queue.put(f'{f_path} Transcribe start' + '|' + SERVICE_NAME + f'_on_{device}_{os.getpid()}')
transcribe_audio(f_path, configs_dict.output_dir, model)
logs_queue.put(f'{f_path} Transcribe end' + '|' + SERVICE_NAME + f'_on_{device}')
torch.cuda.empty_cache()
gc.collect()
os.remove(f_path)
Expand Down
15 changes: 15 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,20 @@
#YAML
# [directories settings]
working_dir: /media/ivan/Диск/WAV2VEC_WD
output_dir: /media/ivan/Диск/WAV2VEC_OUTPUT

# [transcriber settings]
model: #large-v2 # 'tiny' by default if blank (Whisper model settings)

# [multiple GPU support]
devices:
- cpu
# set your cuda devices: #- cuda:0
#- cuda:1
# OR
#set blank if CPU (set same device multiple times to spawn more workers: #- cpu
#- cpu)

# [logging settings]
logs_to_db: True # False
logs_db_path: /media/ivan/Диск/WAV2VEC_DB # set path where to store SQLite DB if logs_to_db = True| else None
35 changes: 27 additions & 8 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,40 @@

# subprocesses
# 1) watchdog with queue to cleaning
watchdog_cleaner_proc = mp.Process(target= create_observer, args= (APP_CONFIGS['working_dir'], queue_to_cleaning))
watchdog_cleaner_proc = mp.Process(target= create_observer, args= (APP_CONFIGS.working_dir,\
queue_to_cleaning))
watchdog_cleaner_proc.daemon= True
watchdog_cleaner_proc.start()
##watchdog_proc.join()


## 2) cleaner
cleaner = mp.Process(target= cleaner_worker, args= (APP_CONFIGS, queue_to_cleaning, logs_queue))
cleaner = mp.Process(target= cleaner_worker, args= (APP_CONFIGS,\
queue_to_cleaning,\
logs_queue))
cleaner.daemon= True
cleaner.start()


# 3) watchdog with queue to transcribation
watchdog_transcribe_proc = mp.Process(target= create_observer, args= (APP_CONFIGS['clean_audio_dir'], queue_to_transcribe))
watchdog_transcribe_proc = mp.Process(target= create_observer, args= (APP_CONFIGS.clean_audio_dir,\
queue_to_transcribe))
watchdog_transcribe_proc.daemon= True
watchdog_transcribe_proc.start()
# 4) Russian wav2vec implementation
transcriber_proc = mp.Process(target= transcriber_worker, args= (APP_CONFIGS, queue_to_transcribe, logs_queue))
transcriber_proc.daemon= True
transcriber_proc.start()


# 4) Whisper transcriber implementation (multiple GPU support)
transcriber_proc = []
for device in APP_CONFIGS.devices:
t_proc = mp.Process(target= transcriber_worker, args= (APP_CONFIGS,\
queue_to_transcribe,\
logs_queue,\
device))
t_proc.daemon= True
transcriber_proc.append(t_proc)
t_proc.start()


logging.LoggerAdapter(logger, {'service_name': 'MAIN'}).info('Startup success')
try:
while True:
Expand All @@ -43,7 +61,8 @@
watchdog_cleaner_proc.terminate()
cleaner.terminate()
watchdog_transcribe_proc.terminate()
transcriber_proc.terminate()
for proc in transcriber_proc:
proc.terminate()
logging.LoggerAdapter(logger, {'service_name': 'MAIN'}).info('All processes terminated')


0 comments on commit 299c32c

Please sign in to comment.