-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.py
116 lines (95 loc) · 3.75 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import whisper
import os
import json
import nltk
import requests
import threading
import time
import logging
from datetime import datetime
# Fetch the NLTK "punkt" tokenizer data at import time; nltk.word_tokenize
# (used by threat_detection) needs it.
nltk.download("punkt")
# Identifier embedded in the names of the saved audio files.
# NOTE(review): matches the numeric suffix of the stream URL used in the
# __main__ block -- presumably the Broadcastify feed id; confirm.
STREAM_ID = 1189
# Multiplied by 60 in data_gathering to size each downloaded chunk.
# NOTE(review): presumably bytes of stream data per second, making each saved
# file roughly one minute of audio -- confirm against the stream bitrate.
CHUNK_PER_SECOND = 1024 * 2
# Pipeline folders: raw audio -> transcripts -> detected threats.
RAW_AUDIO_PATH = "./files/raw_audio"
TRANSCRIBED_PATH = "./files/transcribed"
THREATS_PATH = "./files/threats"
# JSON file loaded by threat_detection; it is iterated as a sequence of
# entries that each provide a "keywords" member.
THREAT_LIBRARY = "threat_library.json"
logging.basicConfig(level=logging.INFO)
# Shared shutdown flag polled by all worker loops; set by the main block.
stop_event = threading.Event()
def data_gathering(url, stream_id, chunk_per_second):
    """Continuously download an audio stream and save it as chunk files.

    Connects to ``url`` and writes every received chunk of up to
    ``chunk_per_second * 60`` bytes to its own timestamped ``.mp3`` file in
    ``RAW_AUDIO_PATH``. When the connection ends or fails, it reconnects after
    a short pause. Runs until ``stop_event`` is set.

    Args:
        url: HTTP(S) address of the audio stream.
        stream_id: Identifier used as the file-name prefix.
        chunk_per_second: Multiplied by 60 to obtain the chunk size in bytes.
    """
    chunk_size = chunk_per_second * 60
    # (connect, read) timeouts so a dead server cannot block the thread forever.
    request_timeout = (10, 60)
    # Pause between connection attempts; avoids a busy loop when the server
    # closes the stream immediately or is unreachable.
    reconnect_delay = 5

    while not stop_event.is_set():
        try:
            # The context manager releases the connection even on errors.
            with requests.get(url, stream=True, timeout=request_timeout) as response:
                # Do not store an HTTP error page as if it were audio.
                response.raise_for_status()
                for chunk in response.iter_content(chunk_size=chunk_size):
                    if not chunk:
                        continue
                    formatted_date = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
                    file_path = os.path.join(
                        RAW_AUDIO_PATH, f"{stream_id}_{formatted_date}.mp3"
                    )
                    with open(file_path, "wb") as f:
                        f.write(chunk)
                    logging.info("Saved audio chunk at %s", file_path)
                    # Without this check the loop would keep downloading for
                    # as long as the stream stays open, ignoring shutdown.
                    if stop_event.is_set():
                        break
        except requests.RequestException:
            # Network problems are expected for a long-running stream; log
            # and reconnect rather than letting the thread die.
            logging.exception("Audio stream request failed for %s", url)
        # Event.wait returns early when shutdown is requested.
        stop_event.wait(reconnect_delay)
def transcription():
    """Transcribe saved audio chunks to text files until shutdown.

    Loads the Whisper ``small.en`` model once, then repeatedly scans
    ``RAW_AUDIO_PATH``. Each audio file without a transcript yet is
    transcribed; non-empty text is written to ``TRANSCRIBED_PATH`` as
    ``<audio file name>.txt``, while audio that yields no text is deleted.
    Runs until ``stop_event`` is set.
    """
    model = whisper.load_model("small.en")
    # The original built an options dict with this threshold but never passed
    # it to transcribe(), so the library default was silently used instead.
    no_speech_threshold = 0.3

    while not stop_event.is_set():
        for file in sorted(os.listdir(RAW_AUDIO_PATH)):
            if stop_event.is_set():
                break
            audio_path = os.path.join(RAW_AUDIO_PATH, file)
            output_path = os.path.join(TRANSCRIBED_PATH, f"{file}.txt")
            # Audio files are kept after transcription, so without this check
            # every file would be transcribed again on every pass.
            if os.path.exists(output_path):
                continue
            try:
                result = whisper.transcribe(
                    model, audio_path, no_speech_threshold=no_speech_threshold
                )
            except Exception:
                # Worker-loop boundary: one unreadable file must not kill the
                # thread. The file is retried on the next pass.
                logging.exception("Failed to transcribe %s", audio_path)
                continue
            if result and result["text"]:
                with open(output_path, "w", encoding="utf-8") as f:
                    f.write(result["text"])
                logging.info("Saved transcription at %s", output_path)
            else:
                # Nothing was recognized; drop the audio so it is not retried.
                os.remove(audio_path)
        # Pause between scans; returns early when shutdown is requested.
        stop_event.wait(10)
def threat_detection():
    """Scan transcripts for threat keywords until shutdown.

    Loads the keyword library from ``THREAT_LIBRARY`` once, then repeatedly
    scans ``TRANSCRIBED_PATH``. Each transcript is tokenized with NLTK; every
    token that exactly equals a library keyword is recorded, and the matches
    are written one per line to ``THREATS_PATH`` as ``<transcript name>.txt``.
    Transcripts with no matches produce no output file and are left in place.
    Runs until ``stop_event`` is set.
    """
    with open(THREAT_LIBRARY, encoding="utf-8") as f:
        threats_keywords = json.load(f)

    # Flatten all categories into one set so each token costs one lookup.
    # NOTE(review): assumes every entry's "keywords" member is a list of
    # strings -- confirm against threat_library.json.
    keywords = set()
    for category in threats_keywords:
        keywords.update(category["keywords"])

    # Modification time of each transcript when last scanned; an unchanged
    # file is skipped so it is not re-tokenized and re-logged every pass.
    scanned_mtimes = {}

    while not stop_event.is_set():
        for file in os.listdir(TRANSCRIBED_PATH):
            if stop_event.is_set():
                break
            transcribed_path = os.path.join(TRANSCRIBED_PATH, file)
            threats_file_path = os.path.join(THREATS_PATH, f"{file}.txt")

            mtime = os.path.getmtime(transcribed_path)
            if scanned_mtimes.get(file) == mtime:
                continue

            # Transcripts are written as UTF-8, so read them the same way
            # and do not rely on the platform default encoding.
            with open(transcribed_path, encoding="utf-8") as f:
                text = f.read()

            # NOTE(review): matching is exact and case-sensitive, as before;
            # capitalized or multi-word keywords will not match -- confirm
            # this is intended.
            found_threats = keywords.intersection(nltk.word_tokenize(text))

            if found_threats:
                with open(threats_file_path, "w", encoding="utf-8") as f:
                    # Sorted for a stable, reproducible file.
                    f.write("\n".join(sorted(found_threats)))
                logging.info("Saved found threats at %s", threats_file_path)
            scanned_mtimes[file] = mtime
        # Pause between scans; returns early when shutdown is requested.
        stop_event.wait(10)
def setup_folders():
    """Create the raw-audio, transcript and threat folders if missing.

    Raises:
        FileExistsError: If one of the paths exists but is not a directory.
    """
    for folder in (RAW_AUDIO_PATH, TRANSCRIBED_PATH, THREATS_PATH):
        # exist_ok avoids the check-then-create race of a separate
        # os.path.exists() test.
        os.makedirs(folder, exist_ok=True)
if __name__ == "__main__":
    # Entry point: start the three pipeline stages as threads and keep the
    # main thread responsive to Ctrl-C until they finish.
    setup_folders()

    # Threads are created before the try block so the shutdown code below
    # can always refer to them.
    workers = [
        threading.Thread(
            target=data_gathering,
            args=(
                "https://broadcastify.cdnstream1.com/1189",
                STREAM_ID,
                CHUNK_PER_SECOND,
            ),
            name="data_gathering",
        ),
        threading.Thread(target=transcription, name="transcription"),
        threading.Thread(target=threat_detection, name="threat_detection"),
    ]

    try:
        for worker in workers:
            worker.start()
        # Join with a timeout so KeyboardInterrupt is delivered promptly, and
        # stop waiting once every worker has exited.
        while not stop_event.is_set() and any(w.is_alive() for w in workers):
            for worker in workers:
                worker.join(1)
    except KeyboardInterrupt:
        logging.info("Exiting...")
    finally:
        # Always signal shutdown, including on unexpected errors, so the
        # non-daemon workers do not keep the process alive.
        stop_event.set()
        for worker in workers:
            # is_alive() is False for a thread that was never started, and
            # joining such a thread would raise RuntimeError.
            if worker.is_alive():
                worker.join()