diff --git a/components/configs_reader.py b/components/configs_reader.py index 4dc8631..2493172 100644 --- a/components/configs_reader.py +++ b/components/configs_reader.py @@ -4,22 +4,36 @@ """ from typing import Text, Union, Dict from pathlib import Path +from pydantic import BaseModel import os import yaml +# Base model for configuration file +class ConfigStructure(BaseModel): + working_dir: Union[Text,Path] + output_dir: Union[Text,Path] + logs_to_db: bool + logs_db_path: Union[Text,Path,None] + class Config: + extra = 'forbid' + + def create_dirs(configs_dict: Dict) -> None: """ Create necessary directories """ clean_audio_path = os.path.join(configs_dict['working_dir'], 'CLEAN_AUDIO') - if not os.path.exists(clean_audio_path): + if not os.path.exists(clean_audio_path): os.makedirs(clean_audio_path) configs_dict['clean_audio_dir'] = clean_audio_path - if not os.path.exists(configs_dict['output_dir']): + if not os.path.exists(configs_dict['output_dir']): os.makedirs(configs_dict['output_dir']) + if configs_dict['logs_to_db'] == True: + if not os.path.exists(configs_dict['logs_db_path']): + os.makedirs(configs_dict['logs_db_path']) + return def validate(configs_dict: Dict) -> None: - #TODO: using Pydantic validate config dictionary - # https://stackoverflow.com/questions/45812387/how-to-validate-structure-or-schema-of-dictionary-in-python + valid_conf_model = ConfigStructure(**configs_dict) return def read_configs(config_file: Union[Text,Path] = "config.yaml") -> Dict: diff --git a/components/db_logs_handler.py b/components/db_logs_handler.py new file mode 100644 index 0000000..e333fe9 --- /dev/null +++ b/components/db_logs_handler.py @@ -0,0 +1,59 @@ +import logging +import sqlite3 + + +DEFAULT_SEPARATOR = '|' +DEFAULT_DATA_TYPE = 'TEXT' + + +#WARNING: attributes must be choosen from https://docs.python.org/3/library/logging.html#formatter-objects +DEFAULT_ATTRIBUTES_LIST = ['asctime', 'levelname', 'name', 'message'] + +class SQLiteHandler(logging.Handler): + 
def __init__(self, database, table, attributes_list): + ''' + SQLiteHandler class constructor + Parameters: + self: instance of the class + database: database + table: log table name + attributes_list: log table columns + Returns: + None + ''' + #super(SQLiteHandler, self).__init__() # for python 2.X + super().__init__() # for python 3.X + self.database = database + self.table = table + self.attributes = attributes_list + + # Create table if needed + create_table_sql = 'CREATE TABLE IF NOT EXISTS ' + self.table + ' (' + ((' ' + DEFAULT_DATA_TYPE + ', ').join(self.attributes)) + ' ' + DEFAULT_DATA_TYPE + ');' + conn = sqlite3.connect(self.database) + conn.execute(create_table_sql) + conn.commit() + conn.close() + + def emit(self, record): + ''' + Save the log record + Parameters: + self: instance of the class + record: log record to be saved + Returns: + None + ''' + # Use default formatting if no formatter is set + self.format(record) + + #print(record.__dict__) + record_values = [record.__dict__[k] for k in self.attributes] + str_record_values = ', '.join("'{0}'".format(v.replace("'", '').replace('"', '').replace('\n', ' ')) for v in record_values) + #print(str_record_values) + + insert_sql = 'INSERT INTO ' + self.table + ' (' + (', '.join(self.attributes)) + ') VALUES (' + str_record_values + ');' + #print(insert_sql) + conn = sqlite3.connect(self.database) + conn.execute(insert_sql) + conn.commit() + conn.close() \ No newline at end of file diff --git a/components/logs_writer.py b/components/logs_writer.py new file mode 100644 index 0000000..31f4e02 --- /dev/null +++ b/components/logs_writer.py @@ -0,0 +1,42 @@ +""" Logger configurer depends on `logs_to_db` option + in configuretion file: + True -> write to set path db + False -> default console logging +""" +from components import db_logs_handler +import os +import logging + +def configure_loger(APP_CONFIGS) -> logging.Logger: + logger = logging.getLogger(__name__) + logger.setLevel(logging.INFO) + 
attributes_list = ['asctime', 'levelname', 'service_name', 'message'] + formatter = logging.Formatter('%(' + ((')s' + db_logs_handler.DEFAULT_SEPARATOR + '%(').join(attributes_list)) + ')s') + + if APP_CONFIGS['logs_to_db'] == True: + logger.propagate = False + database = os.path.join(APP_CONFIGS['logs_db_path'], 'LOGS.db') + table = 'asr_logs' + sql_handler = db_logs_handler.SQLiteHandler(database = database, table = table, attributes_list = attributes_list) + sql_handler.setLevel(logging.INFO) + sql_handler.setFormatter(formatter) + logger.addHandler(sql_handler) + else: + console_handler = logging.StreamHandler() + console_handler.setLevel(logging.DEBUG) + console_handler.setFormatter(formatter) + + + + error_file_handler = logging.FileHandler('error.log') + error_file_handler.setLevel(logging.ERROR) + error_file_handler.setFormatter(formatter) + + critical_file_handler = logging.FileHandler('critical.log') + critical_file_handler.setLevel(logging.CRITICAL) + critical_file_handler.setFormatter(formatter) + + logger.addHandler(console_handler) + logger.addHandler(error_file_handler) + logger.addHandler(critical_file_handler) + return logger \ No newline at end of file diff --git a/components/noisereducer.py b/components/noisereducer.py index 1d81477..437eec6 100644 --- a/components/noisereducer.py +++ b/components/noisereducer.py @@ -10,6 +10,8 @@ import os import noisereduce as nr +SERVICE_NAME = 'NOISE_CLEANER' + def reduce_noise(path_to_audio_file: Union[Text,Path], output_dir: Union[Text,Path]) -> None: """ Reduce noize from single audio file """ @@ -22,11 +24,13 @@ def reduce_noise(path_to_audio_file: Union[Text,Path], output_dir: Union[Text,Pa wavfile.write(os.path.join(output_dir, ts + "_" + file_name), rate, reduced_noise) return None -def cleaner_worker(configs_dict, queue) -> None: +def cleaner_worker(configs_dict, queue, logs_queue) -> None: """ Daemon cleaner worker """ while True: if not queue.empty(): f_path = queue.get() + logs_queue.put(f'{f_path} 
Clean start' + '|' + SERVICE_NAME) reduce_noise(f_path, configs_dict['clean_audio_dir']) + logs_queue.put(f'{f_path} Clean end'+'|' + SERVICE_NAME) os.remove(f_path) pass diff --git a/components/transcriber.py b/components/transcriber.py index e338d89..8b7079d 100644 --- a/components/transcriber.py +++ b/components/transcriber.py @@ -7,6 +7,8 @@ import json import os +SERVICE_NAME = 'TRANSCRIBER' + def transcribe_audio( path_to_audio_file: Union[Text,Path], output_dir: Union[Text,Path], @@ -19,12 +21,14 @@ json.dump(transcription, out_file, ensure_ascii= False) return None -def transcriber_worker(configs_dict, queue) -> None: +def transcriber_worker(configs_dict, queue, logs_queue) -> None: """ Daemon transcriber worker """ model = SpeechRecognitionModel("jonatasgrosman/wav2vec2-large-xlsr-53-russian") while True: if not queue.empty(): f_path = queue.get() + logs_queue.put(f'{f_path} Transcribe start' + '|' + SERVICE_NAME) transcribe_audio(f_path, configs_dict['output_dir'], model) + logs_queue.put(f'{f_path} Transcribe end' + '|' + SERVICE_NAME) os.remove(f_path) pass \ No newline at end of file diff --git a/components/watchdog_daemon.py b/components/watchdog_daemon.py index 57559fb..a458a10 100644 --- a/components/watchdog_daemon.py +++ b/components/watchdog_daemon.py @@ -1,5 +1,6 @@ from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler +import mimetypes as mime """ Create Handler for new audio files in working directory""" class AudioFilesHandler(FileSystemEventHandler): @@ -9,7 +10,7 @@ def __init__(self, queue) -> None: def on_created(self, event): #if event.is_directory == False: - if event: + if event and ('audio' in mime.guess_type(event.src_path)[0]): self.queue.put(event.src_path) print("added_to_queue", event.src_path) diff --git a/config.yaml b/config.yaml index 4a00f8c..27932c5 100644 --- a/config.yaml +++ b/config.yaml @@ -1,3 +1,5 @@ #YAML working_dir: D:\\WAV2VEC_ASR_WD\ -output_dir: 
D:\\WAV2VEC_ASR_OUPUT\ \ No newline at end of file +output_dir: D:\\WAV2VEC_ASR_OUPUT\ +logs_to_db: True # False +logs_db_path: D:\\WAV2VEC_ASR_DB\ # set path where to store SQLite DB if logs_to_db = True| else None \ No newline at end of file diff --git a/critical.log b/critical.log new file mode 100644 index 0000000..e69de29 diff --git a/error.log b/error.log new file mode 100644 index 0000000..e69de29 diff --git a/main.py b/main.py index 0db3fa9..675da63 100644 --- a/main.py +++ b/main.py @@ -2,16 +2,18 @@ from components.watchdog_daemon import create_observer from components.noisereducer import cleaner_worker from components.transcriber import transcriber_worker -from time import sleep +from components.logs_writer import configure_loger import multiprocessing as mp +import logging - -APP_CONFIGS = read_configs() # read configuration file +APP_CONFIGS = read_configs() # read and validate configuration file if __name__ == '__main__': + logger = configure_loger(APP_CONFIGS) # variables - queue_to_cleaning = mp.Queue() - queue_to_transcribe = mp.Queue() + logs_queue = mp.Queue(-1) + queue_to_cleaning = mp.Queue(-1) + queue_to_transcribe = mp.Queue(-1) # subprocesses # 1) watchdog with queue to cleaning @@ -20,7 +22,7 @@ watchdog_cleaner_proc.start() #watchdog_proc.join() # 2) cleaner - cleaner = mp.Process(target= cleaner_worker, args= (APP_CONFIGS, queue_to_cleaning)) + cleaner = mp.Process(target= cleaner_worker, args= (APP_CONFIGS, queue_to_cleaning, logs_queue)) cleaner.daemon= True cleaner.start() # 3) watchdog with queue to transcribation @@ -28,16 +30,20 @@ watchdog_transcribe_proc.daemon= True watchdog_transcribe_proc.start() # 4) Russian wav2vec implementation - transcriber_proc = mp.Process(target= transcriber_worker, args= (APP_CONFIGS, queue_to_transcribe)) + transcriber_proc = mp.Process(target= transcriber_worker, args= (APP_CONFIGS, queue_to_transcribe, logs_queue)) transcriber_proc.daemon= True - transcriber_proc.start() + transcriber_proc.start() + 
logging.LoggerAdapter(logger, {'service_name': 'MAIN'}).info('Startup success') try: while True: + message= logs_queue.get().split('|') + logging.LoggerAdapter(logger, {'service_name': f'{message[1]}'}).info(message[0]) pass except KeyboardInterrupt: watchdog_cleaner_proc.terminate() cleaner.terminate() watchdog_transcribe_proc.terminate() transcriber_proc.terminate() + logging.LoggerAdapter(logger, {'service_name': 'MAIN'}).info('All processes terminated') diff --git a/requirements.txt b/requirements.txt index 9c5a795..e69de29 100644 Binary files a/requirements.txt and b/requirements.txt differ